数据正确性 2 件:
1. **并发锁**:sync_logs 加 partial UNIQUE (host_id) WHERE status='running'
同 host 同时只能 1 个 sync 在跑(存量 / 增量 cron / 手动一律抢同一把锁)
INSERT 撞 P2002 → 抛 SyncAlreadyRunningError → 调用方 skip
scheduler 捕获该 error 时 warn 不 error,下次 cron 自然 retry
CLI 撞锁退出 code=4(区分于 2=真失败 / 3=bootstrap 崩)
2. **cursor=run_start 而非 max(updated_date)**:
存量跑期间 DW 持续写入(已摄入患者更新 / 未摄入患者新增),
max(updated_date) cursor 会漏:
· batch 1 摄入患者 100,T+1h DW 又写患者 100 一笔(updated_date=T+1h)
max cursor 推到 T+4:25(末批的 max),下次增量 WHERE > T+4:25 → 漏掉这笔
run_start cursor 保证捞回:
· 下次 WHERE > T+0 → 全部 T+0~now 的变化都进入增量
· 同行同 updatedAt → source_event_id 一致 → P2002 path → parser re-run idempotent
· 同行不同 updatedAt → 不同 source_event_id → 新 tx + 新 fact 版本
重复读浪费 read 但无害,数据正确性 > 带宽优化
importDirectory 重构:
- runStart 入口冻结
- SyncLog create 提前(在 table load 前),作为锁 acquisition 点
- 整段 work 包 try/finally,finally 统一 finalize syncLog(释放锁 + 写 cursor)
- 之前没有 finally,work throw 时 syncLog 卡在 status='running' = 永久死锁
- 加 SyncAlreadyRunningError 导出类,scheduler / CLI 分别处理
本地验证(docker exec psql):
✅ 1st INSERT running OK
✅ 2nd INSERT running for same host → unique violation
✅ UPDATE 1st status='success' 后,新 running INSERT OK(锁释放)
stale 锁兜底(进程崩留 status='running'):
目前需人工清:
UPDATE sync_logs SET status='aborted' WHERE status='running'
AND started_at < NOW() - INTERVAL '12 hours';
PR2/PR3 期间会加 cron 看门狗自动清。
| Name |
Last commit
|
Last update |
|---|---|---|
| .. | ||
| processors | Loading commit data... | |
| bull-board.module.ts | Loading commit data... | |
| dw-lag-monitor.service.ts | Loading commit data... | |
| job-payloads.ts | Loading commit data... | |
| queue-names.ts | Loading commit data... | |
| queue-producer.service.ts | Loading commit data... | |
| queues.module.ts | Loading commit data... | |
| stale-scan.service.ts | Loading commit data... | |
| sync-incremental.scheduler.ts | Loading commit data... |