cold-import.service.ts
46 KB
-
feat(sync): PR2 — cohort batch + checkpoint(内存稳 + 进度可观测) · d72f557a
资源 + 续跑 2 件: 1. **Cohort batch** — 按 patient 分批 load+transform+assemble+write, 每批跑完中间表出作用域 → V8 GC 释放,峰值内存从 5-10GB 降到 500MB-1GB 14GB 机器全量跑稳,不再撞 PG panic 那种磁盘 / OOM。 2. **Per-batch checkpoint** — sync_logs.metadata JSONB 记 cohortDone / cohortTotal / lastBatchMs / lastBatchRssMb,Dashboard + 监控可观测; readCheckpointOffset 从同 syncLogId 读 cohortDone(为 PR3 --resume 留口)。 变更: prisma migration 20260528000001:syncLog 加 metadata JSONB 列 prisma schema 同步 metadata Json? 字段 ClickHouseSourceService: - listPatientPairs:DISTINCT (patient_id, brand) FROM fact_client_out ORDER BY patient_id,增量 cursor 同步过滤;返回 batch 的边界 - loadTablesForCohort:跟 loadAllTables 同形态,SQL 注入 (patient_id, brand) IN (tuples) 过滤,增量 cursor 仍生效; 不做反向拉主档(本批 fact_client_out 已含本批所有 patient 主档) - injectCohortFilter:把 IN tuple 在原 SQL 的 WHERE 末 / ORDER BY 前插入 ColdImportService.importDirectory: - 加 cohortBatchSize option(env PAC_COHORT_BATCH_SIZE 兜底,默认 5000) - 抽出 processCohort 私有方法(单 cohort 完整 load→transform→write 流程) - cohortBatchSize > 0 + sql_source → 分批 loop,每批结束更新 metadata - 否则 single-shot(向后兼容,文件源走此路径) - chunk + resolveCohortBatchSize 导出工具函数(给 PR4 + 测试用) CLI cold-import.cli.ts: - 加 --incremental / --cohort-batch=N / --no-cohort 参数 - 启动日志打印分批配置 向后兼容: - 既有 importDirectory({dryRun, incremental}) 调用全不动 - 文件源(manifest.tables[])仍走 single-shot - ClickHouse 源默认走 5000 cohort,可 --no-cohort 退回 single-shot PR3 后续: - 加 --resume 用 readCheckpointOffset(stale running lock 需手动 abort 后才能用) - 加 cron 看门狗自动清 stale runningluoqi committed