Commit 45255896 by luoqi

feat(sync): PR5b — cohort 级并行 worker pool(资源充分利用)

env PAC_COHORT_CONCURRENCY:
  默认 1 = 完全串行(行为不变,最稳)
  >1 = worker pool 并发处理 N 个 cohort

并发安全(数据完整性靠架构,非"小心写代码"):
  - cohort = disjoint 患者集(chunk distinct pairs)→ 所有写入患者级隔离
  - source_event_id partial UNIQUE(本 PR 前一 commit 补)+ fact (subject_id,version)
    UNIQUE 双保险 → 任何并发下 DB 强制一致
  - totals/seenTenants/firstError 共享但 JS 单线程,+= / .add() 同步原子无 race
  - cursorAdvances race 无害(PR1 后 cursor=run_start 不读它)

worker pool 实现:N 个 runner 从 indices 队列各自拉下一个直到耗尽(无额外依赖)。

并行模式调整:
  - checkpoint resume 禁用(完成乱序前缀语义失效;靠 source_event_id 幂等从头重跑安全)
  - metadata.cohortDone 改 count-based(近似进度)+ 记 cohortMode/concurrency

本地验证(concurrency=4, 9 batch × 50 患者):
  - 4 cohort 同时 start,worker pool 滚动,9 批 8 秒(串行约 20-45s)
  - 内存峰值 338MB(单 cohort × 并发,14GB 有余量)
  - 数据完整性自查全过:0 dup_source_events / 0 multi_active_facts
  - 幂等:重跑全 skip

运维注意(已在 resolveCohortConcurrency 注释):
  并行需 bump Prisma 连接池:DATABASE_URL?connection_limit=N×(write_batch 并发数+余量);
  内存 N× 单 cohort(并发 4 ≈ 1.3GB)

服务器用法:pnpm sync:prod -- --dir=./data/jvs-dw(默认串行);
  榨资源:PAC_COHORT_CONCURRENCY=4 pnpm sync:prod ...
parent bf921bbc
......@@ -218,17 +218,24 @@ export class ColdImportService {
this.logger.log(`Cohort: 0 patients in scope(增量空跑或源数据为空) skip batch`);
} else {
const batches = chunk(allPairs, cohortBatchSize);
const resumeFrom = await this.readCheckpointOffset(syncLog?.id);
const concurrency = resolveCohortConcurrency();
// ⭐ PR5b:并行模式不做 checkpoint resume(完成顺序乱,前缀语义失效)
// 靠 source_event_id 幂等,崩溃从头重跑安全;串行模式保留 resume
const resumeFrom = concurrency > 1 ? 0 : await this.readCheckpointOffset(syncLog?.id);
this.logger.log(
`Cohort 模式:patients=${allPairs.length} batchSize=${cohortBatchSize} batches=${batches.length}` +
(resumeFrom > 0 ? ` resumeFrom=${resumeFrom}` : ''),
`Cohort 模式:patients=${allPairs.length} batchSize=${cohortBatchSize} ` +
`batches=${batches.length} concurrency=${concurrency}` +
(resumeFrom > 0 ? ` resumeFrom=${resumeFrom}` : '') +
(concurrency > 1 ? ' (并行:checkpoint resume 禁用,靠幂等)' : ''),
);
for (let i = resumeFrom; i < batches.length; i++) {
let doneCount = resumeFrom;
const runOneBatch = async (i: number): Promise<void> => {
const batchStart = Date.now();
const batch = batches[i]!;
this.logger.log(
`─── cohort batch ${i + 1}/${batches.length} ` +
`(patients ${i * cohortBatchSize}..${i * cohortBatchSize + batch.length - 1}) ─────`,
`─── cohort batch ${i + 1}/${batches.length} start ` +
`(${batch.length} patients) ─────`,
);
await this.processCohort({
absDir,
......@@ -247,19 +254,20 @@ export class ColdImportService {
totals,
onFirstError: (msg) => { if (!firstError) firstError = msg; },
});
// 6b. checkpoint 写入(给 --resume + 进度可观测用)
doneCount++;
// checkpoint 写入(串行=精确前缀;并行=count-based 近似进度)
if (syncLog) {
const elapsedMs = Date.now() - batchStart;
const memMb = Math.round(process.memoryUsage().rss / 1024 / 1024);
await this.prisma.syncLog.update({
where: { id: syncLog.id },
data: {
metadata: {
cohortTotal: batches.length,
cohortDone: i + 1,
cohortDone: doneCount,
cohortMode: concurrency > 1 ? 'parallel' : 'serial',
concurrency,
batchSize: cohortBatchSize,
lastBatchMs: elapsedMs,
lastBatchRssMb: memMb,
lastBatchMs: Date.now() - batchStart,
lastBatchRssMb: Math.round(process.memoryUsage().rss / 1024 / 1024),
lastBatchAt: new Date().toISOString(),
},
},
......@@ -267,7 +275,29 @@ export class ColdImportService {
}
this.logger.log(
`─── cohort batch ${i + 1}/${batches.length} done ${Date.now() - batchStart}ms ` +
`rss=${Math.round(process.memoryUsage().rss / 1024 / 1024)}MB ─────`,
`rss=${Math.round(process.memoryUsage().rss / 1024 / 1024)}MB ` +
`(${doneCount}/${batches.length}) ─────`,
);
};
const indices = Array.from(
{ length: batches.length - resumeFrom },
(_, k) => resumeFrom + k,
);
if (concurrency <= 1) {
// 串行(默认,行为不变)
for (const i of indices) await runOneBatch(i);
} else {
// 并行 worker pool — N 个 runner 各自从 indices 队列拉下一个直到耗尽
let cursor = 0;
const runner = async (): Promise<void> => {
while (cursor < indices.length) {
const myIdx = indices[cursor++]!;
await runOneBatch(myIdx);
}
};
await Promise.all(
Array.from({ length: Math.min(concurrency, indices.length) }, () => runner()),
);
}
}
......@@ -1302,6 +1332,27 @@ export function resolveWriteBatchSize(): number {
return 1000;
}
/**
 * PR5b: resolves how many cohorts are processed concurrently.
 *
 * Reads the `PAC_COHORT_CONCURRENCY` environment variable and parses it as a
 * base-10 integer. A value greater than 1 enables the parallel worker pool
 * that processes N cohorts at a time. When the variable is unset, does not
 * parse to a finite number, or is below 1, the result is 1 (fully serial,
 * behavior unchanged, the most conservative setting).
 *
 * Operational notes for parallel mode, as stated by the author:
 *   - Data integrity is not affected (cohorts are disjoint patient sets;
 *     source_event_id and the fact-version partial UNIQUE constraints act as
 *     a double safeguard).
 *   - Checkpoint resume is disabled (batches complete out of order; a rerun
 *     from the start relies on idempotency).
 *   - The Prisma connection pool should be raised:
 *     DATABASE_URL?connection_limit=N×(write_batch concurrency + headroom).
 *     Otherwise concurrent createMany/bulkWrite calls queue for a connection
 *     (no data corruption, only slower).
 *   - Memory scales to N× a single cohort (a single cohort is ~330MB;
 *     concurrency 4 ≈ 1.3GB, which leaves headroom on a 14GB machine).
 *
 * @returns The cohort concurrency level, always an integer >= 1.
 */
export function resolveCohortConcurrency(): number {
  const raw = process.env.PAC_COHORT_CONCURRENCY;
  // Unset variable: keep the serial default.
  if (raw === undefined) return 1;

  const parsed = parseInt(raw, 10);
  // NaN (unparseable input) and anything below 1 fall back to serial mode.
  if (!Number.isFinite(parsed) || parsed < 1) return 1;
  return parsed;
}
/** 把数组切成 size 大小的块(尾巴块可能 < size) */
export function chunk<T>(arr: ReadonlyArray<T>, size: number): T[][] {
if (size <= 0) return [arr.slice() as T[]];
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment