| Name | Last commit | Last update |
|---|---|---|
| .. | ||
| adapters | ||
| assembler | ||
| cold-import | ||
| dto | ||
| pipeline | ||
| pull | ||
| push | ||
| reconcile | ||
| transforms | ||
| sync.controller.ts | ||
| sync.module.ts | ||
| sync.service.ts | ||

env PAC_COHORT_CONCURRENCY:
default 1 = fully serial (behavior unchanged, most stable)
>1 = a worker pool processes N cohorts concurrently
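
As a rough illustration of the env contract above, the value could be parsed as follows. This is a minimal sketch, not the repository's actual `resolveCohortConcurrency`: the signature and the fallback to 1 on invalid input are assumptions.

```typescript
// Hypothetical sketch: the real resolveCohortConcurrency may differ.
// The environment is passed in as a plain object so the function is easy to test.
function resolveCohortConcurrency(
  env: Record<string, string | undefined>,
): number {
  const raw = env.PAC_COHORT_CONCURRENCY;
  // Unset or empty: keep the documented default of 1 (fully serial).
  if (raw === undefined || raw.trim() === "") return 1;
  const parsed = Number(raw.trim());
  // Assumption: anything that is not a positive integer falls back to serial.
  if (!Number.isInteger(parsed) || parsed < 1) return 1;
  return parsed;
}
```

In a Node process this would be called as `resolveCohortConcurrency(process.env)`.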
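Concurrency safety (data integrity is guaranteed by the architecture, not by "writing code carefully"):
- cohort = disjoint patient set (chunks of distinct pairs) → all writes are isolated per patient
- source_event_id partial UNIQUE (added in the previous commit of this PR) + fact (subject_id,version) UNIQUE as a double safeguard → the DB enforces consistency under any concurrency
- totals/seenTenants/firstError are shared, but JS is single-threaded, so += / .add() are synchronous and atomic, with no race
- the cursorAdvances race is harmless (since PR1, cursor=run_start and does not read it)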
Worker pool implementation: N runners each pull the next index from the indices queue until it is exhausted (no extra dependencies).
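
The worker pool described above can be sketched roughly as follows. The names `runCohortPool` and `processCohort` are hypothetical, and error handling (for example recording a `firstError`) is left out; this shows the pattern, not the code in this commit.

```typescript
// Hypothetical sketch of a dependency-free worker pool over cohort indices.
async function runCohortPool<T>(
  cohorts: readonly T[],
  concurrency: number,
  processCohort: (cohort: T, index: number) => Promise<void>,
): Promise<void> {
  let next = 0; // shared cursor into the queue of indices

  const runner = async (): Promise<void> => {
    while (true) {
      // Read-and-increment happens synchronously (no await in between),
      // so two runners can never claim the same index.
      const index = next++;
      if (index >= cohorts.length) return; // queue exhausted
      await processCohort(cohorts[index] as T, index);
    }
  };

  // Never start more runners than there are cohorts, and always at least one.
  const size = Math.max(1, Math.min(concurrency, cohorts.length));
  await Promise.all(Array.from({ length: size }, () => runner()));
}
```

With `concurrency = 1` the loop degenerates to strictly serial processing, which matches the default behavior described above.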
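Parallel-mode adjustments:
- checkpoint resume is disabled (with out-of-order completion the prefix semantics no longer hold; rerunning from scratch is safe thanks to source_event_id idempotency)
- metadata.cohortDone becomes count-based (approximate progress), and cohortMode/concurrency are recorded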
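
To make the count-based progress concrete, here is a minimal sketch. Only `cohortDone`, `cohortMode` and `concurrency` are named in the notes above; the `"serial" | "parallel"` values and the helper names are assumptions for illustration.

```typescript
// Hypothetical metadata shape; the real one may carry more fields.
interface RunMetadata {
  cohortDone: number; // number of finished cohorts, not a completed prefix
  cohortMode: "serial" | "parallel"; // assumed values
  concurrency: number;
}

function createProgress(concurrency: number) {
  const metadata: RunMetadata = {
    cohortDone: 0,
    cohortMode: concurrency > 1 ? "parallel" : "serial",
    concurrency,
  };
  return {
    metadata,
    // Called once per finished cohort, in whatever order cohorts complete.
    markCohortDone(): void {
      metadata.cohortDone += 1;
    },
    // "Cohorts 0..k-1 are done" only holds when cohorts finish in index order,
    // so a prefix-based checkpoint is only meaningful in serial mode.
    canResumeFromCheckpoint(): boolean {
      return metadata.cohortMode === "serial";
    },
  };
}
```

In parallel mode `cohortDone = 5` only says that five cohorts have finished, not which five, which is why the progress is approximate.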
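Local verification (concurrency=4, 9 batches × 50 patients):
- 4 cohorts start simultaneously and the worker pool rolls forward; 9 batches finish in 8 seconds (serial takes about 20-45s)
- peak memory 338MB (single cohort × concurrency; ample headroom on 14GB)
- data-integrity self-checks all pass: 0 dup_source_events / 0 multi_active_facts
- idempotency: a rerun skips everything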
Ops notes (already in the resolveCohortConcurrency comments):
parallel mode requires bumping the Prisma connection pool: DATABASE_URL?connection_limit=N×N+headroom;
memory is N× a single cohort (concurrency 4 ≈ 1.3GB)
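
The sizing rule in the ops notes can be turned into a small calculator. This is a sketch under stated assumptions: the function name and the headroom parameter are hypothetical, and the per-cohort peak has to be measured for the actual workload.

```typescript
// Hypothetical helper applying the rule of thumb from the ops notes:
//   connection_limit = N × N + headroom, memory = N × single-cohort peak.
function estimateParallelBudget(
  concurrency: number,
  perCohortPeakMb: number,
  headroomConnections: number, // assumption: chosen by the operator
): { connectionLimit: number; peakMemoryMb: number } {
  return {
    connectionLimit: concurrency * concurrency + headroomConnections,
    peakMemoryMb: concurrency * perCohortPeakMb,
  };
}
```

For example, if the 338MB figure is read as a per-cohort peak, `estimateParallelBudget(4, 338, 4)` gives a connection limit of 20 and about 1352MB of memory, in line with the ≈ 1.3GB estimate above.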
Server usage: pnpm sync:prod -- --dir=./data/jvs-dw (serial by default);
to squeeze out more resources: PAC_COHORT_CONCURRENCY=4 pnpm sync:prod ...