cold-import.service.ts
55.8 KB
-
feat(sync): PR5b — cohort 级并行 worker pool(资源充分利用) · 45255896
env PAC_COHORT_CONCURRENCY: 默认 1 = 完全串行(行为不变,最稳) >1 = worker pool 并发处理 N 个 cohort 并发安全(数据完整性靠架构,非"小心写代码"): - cohort = disjoint 患者集(chunk distinct pairs)→ 所有写入患者级隔离 - source_event_id partial UNIQUE(本 PR 前一 commit 补)+ fact (subject_id,version) UNIQUE 双保险 → 任何并发下 DB 强制一致 - totals/seenTenants/firstError 共享但 JS 单线程,+= / .add() 同步原子无 race - cursorAdvances race 无害(PR1 后 cursor=run_start 不读它) worker pool 实现:N 个 runner 从 indices 队列各自拉下一个直到耗尽(无额外依赖)。 并行模式调整: - checkpoint resume 禁用(完成乱序前缀语义失效;靠 source_event_id 幂等从头重跑安全) - metadata.cohortDone 改 count-based(近似进度)+ 记 cohortMode/concurrency 本地验证(concurrency=4, 9 batch × 50 患者): - 4 cohort 同时 start,worker pool 滚动,9 批 8 秒(串行约 20-45s) - 内存峰值 338MB(单 cohort × 并发,14GB 有余量) - 数据完整性自查全过:0 dup_source_events / 0 multi_active_facts - 幂等:重跑全 skip 运维注意(已在 resolveCohortConcurrency 注释): 并行需 bump Prisma 连接池 DATABASE_URL?connection_limit=N×N+余量; 内存 N× 单 cohort(并发 4 ≈ 1.3GB) 服务器用法:pnpm sync:prod -- --dir=./data/jvs-dw(默认串行); 榨资源:PAC_COHORT_CONCURRENCY=4 pnpm sync:prod ...luoqi committed