Commit d72f557a by luoqi

feat(sync): PR2 — cohort batch + checkpoint(内存稳 + 进度可观测)

资源 + 续跑 2 件:
1. **Cohort batch** — 按 patient 分批 load+transform+assemble+write,
   每批跑完后中间表离开作用域,由 V8 GC 释放,峰值内存从 5-10GB 降到 500MB-1GB;
   14GB 机器可以稳定跑完全量,不再撞上类似 PG panic 的磁盘 / OOM 问题。

2. **Per-batch checkpoint** — sync_logs.metadata JSONB 记 cohortDone /
   cohortTotal / lastBatchMs / lastBatchRssMb,Dashboard + 监控可观测;
   readCheckpointOffset 从同 syncLogId 读 cohortDone(为 PR3 --resume 留口)。

变更:
  prisma migration 20260528000001:syncLog 加 metadata JSONB 列
  prisma schema 同步 metadata Json? 字段
  ClickHouseSourceService:
    - listPatientPairs:DISTINCT (patient_id, brand) FROM fact_client_out
      ORDER BY patient_id,增量 cursor 同步过滤;返回 batch 的边界
    - loadTablesForCohort:跟 loadAllTables 同形态,SQL 注入
      (patient_id, brand) IN (tuples) 过滤,增量 cursor 仍生效;
      不做反向拉主档(本批 fact_client_out 已含本批所有 patient 主档)
    - injectCohortFilter:把 IN tuple 在原 SQL 的 WHERE 末 / ORDER BY 前插入
  ColdImportService.importDirectory:
    - 加 cohortBatchSize option(env PAC_COHORT_BATCH_SIZE 兜底,默认 5000)
    - 抽出 processCohort 私有方法(单 cohort 完整 load→transform→write 流程)
    - cohortBatchSize > 0 + sql_source → 分批 loop,每批结束更新 metadata
    - 否则 single-shot(向后兼容,文件源走此路径)
    - chunk + resolveCohortBatchSize 导出工具函数(给 PR4 + 测试用)
  CLI cold-import.cli.ts:
    - 加 --incremental / --cohort-batch=N / --no-cohort 参数
    - 启动日志打印分批配置

向后兼容:
  - 既有 importDirectory({dryRun, incremental}) 调用全不动
  - 文件源(manifest.tables[])仍走 single-shot
  - ClickHouse 源默认走 5000 cohort,可 --no-cohort 退回 single-shot

PR3 后续:
  - 加 --resume 用 readCheckpointOffset(stale running lock 需手动 abort 后才能用)
  - 加 cron 看门狗自动清 stale running
parent fcc2a9d6
-- sync_logs.metadata: cohort-batch / assembler progress, resource profile, and custom tags.
-- Shape (JSONB, no enforced schema):
-- { "cohortTotal": 26, "cohortDone": 5, "lastAssembler": "diagnosis", "rssPeakMb": 980, ... }
-- Intended uses:
-- 1. Observable progress: monitoring / dashboards can see which batch the run has reached.
-- 2. Checkpoint resume: after a crash or manual abort, restarting the CLI with --resume
--    continues from cohortDone+1.
-- 3. Resource profile: rssPeakMb records peak memory to help tune the batch size.
-- The column is added without NOT NULL or DEFAULT, so existing rows hold NULL.
ALTER TABLE "sync_logs" ADD COLUMN "metadata" JSONB;
......@@ -1331,6 +1331,11 @@ model SyncLog {
startedAt DateTime @default(now()) @map("started_at") @db.Timestamptz(3)
endedAt DateTime? @map("ended_at") @db.Timestamptz(3)
/// cohort batch / assembler 进度 + 资源画像(JSONB,无强 schema)
/// 形态见 packages/types schema(可选,目前用 unknown);典型字段:
/// { cohortTotal, cohortDone, lastAssembler, rssPeakMb, batchTimings: [...] }
metadata Json?
host Host @relation(fields: [hostId], references: [id])
transactions PatientTransaction[]
......
......@@ -20,14 +20,21 @@ interface CliArgs {
dir?: string;
dryRun: boolean;
help: boolean;
incremental: boolean;
cohortBatchSize?: number | null; // null = 显式禁用分批,undefined = 用 env / 默认
}
/**
 * Parse the cold-import CLI flags into a {@link CliArgs} object.
 *
 * Recognised flags:
 * - `--help` / `-h`        → `help = true`
 * - `--dry-run`            → `dryRun = true`
 * - `--incremental`        → `incremental = true`
 * - `--no-cohort`          → `cohortBatchSize = 0` (explicitly disables batching, single-shot run)
 * - `--cohort-batch=<N>`   → `cohortBatchSize = N` when N is a plain non-negative decimal integer
 * - `--dir=<path>`         → `dir = path`
 *
 * Unknown flags are ignored. When the same flag appears more than once the last one wins.
 *
 * @param argv Raw argument list (without the node / script entries).
 * @returns The parsed arguments; `cohortBatchSize` stays `undefined` when not given or invalid,
 *          which lets the callee fall back to its env / default value.
 */
function parseArgs(argv: string[]): CliArgs {
  const args: CliArgs = { dryRun: false, help: false, incremental: false };
  for (const a of argv) {
    if (a === '--help' || a === '-h') args.help = true;
    else if (a === '--dry-run') args.dryRun = true;
    else if (a === '--incremental') args.incremental = true;
    else if (a === '--no-cohort') args.cohortBatchSize = 0;
    else if (a.startsWith('--cohort-batch=')) {
      const raw = a.slice('--cohort-batch='.length).trim();
      // Strict decimal check: parseInt would silently turn "1e3" into 1 and "12abc" into 12,
      // which would start a run with a batch size the operator never asked for.
      const n = /^\d+$/.test(raw) ? Number(raw) : Number.NaN;
      args.cohortBatchSize = Number.isSafeInteger(n) ? n : undefined;
    } else if (a.startsWith('--dir=')) args.dir = a.slice('--dir='.length);
  }
  return args;
}
......@@ -43,13 +50,18 @@ function printHelp() {
' pnpm cold-import -- --dir=<dir> [--dry-run]',
'',
'Options:',
' --dir=<path> 必填,manifest.yaml 所在目录(相对 / 绝对路径)',
' --dry-run 只读 + 翻译预览,不写库',
' --help, -h 显示本帮助',
' --dir=<path> 必填,manifest.yaml 所在目录(相对 / 绝对路径)',
' --dry-run 只读 + 翻译预览,不写库',
' --incremental 增量模式(读上次 cursor,SQL 注 WHERE > cursor;首跑等价全量)',
' --cohort-batch=<N> 按 patient 分批跑(default 5000;env PAC_COHORT_BATCH_SIZE 兜底)',
' --no-cohort 显式禁用分批,跑 single-shot(文件源或调试场景)',
' --help, -h 显示本帮助',
'',
'Examples:',
' pnpm cold-import -- --dir=./data/jvs-dw --dry-run',
' pnpm cold-import -- --dir=./data/jvs-dw',
' pnpm cold-import -- --dir=./data/jvs-dw --cohort-batch=2000',
' pnpm cold-import -- --dir=./data/jvs-dw --incremental',
'',
];
// eslint-disable-next-line no-console
......@@ -64,7 +76,10 @@ async function bootstrap() {
}
const logger = new Logger('cold-import:cli');
logger.log(`Starting cold-import CLI(dir=${args.dir}, dryRun=${args.dryRun})`);
logger.log(
`Starting cold-import CLI(dir=${args.dir}, dryRun=${args.dryRun}, ` +
`incremental=${args.incremental}, cohortBatch=${args.cohortBatchSize ?? '(default/env)'})`,
);
const app = await NestFactory.createApplicationContext(AppModule, {
logger: ['log', 'warn', 'error'],
......@@ -72,7 +87,11 @@ async function bootstrap() {
try {
const svc = app.get(ColdImportService);
const result = await svc.importDirectory(args.dir!, { dryRun: args.dryRun });
const result = await svc.importDirectory(args.dir!, {
dryRun: args.dryRun,
incremental: args.incremental,
cohortBatchSize: args.cohortBatchSize,
});
logger.log('─────────────────────────────────────────');
logger.log(`Result:`);
......
......@@ -233,6 +233,151 @@ export class ClickHouseSourceService {
return tables;
}
/**
 * List every distinct (patient_id, brand) pair found in `dw_group.fact_client_out`.
 *
 * The whole result set is fetched in one query and returned as an array; this method does
 * no paging itself, so splitting the list into cohort batches is left to the caller.
 *
 * Ordering is `patient_id, brand` so that the sequence is fully deterministic: with
 * `ORDER BY patient_id` alone, pairs sharing a patient_id may come back in any order,
 * which would make batch boundaries (and any checkpoint offset derived from them)
 * differ between runs.
 *
 * Incremental mode: when `incremental.perQuery['fact_client_out']` carries a non-empty
 * `cursorValue`, only rows with `cursorColumn > cursorValue` are considered.
 * NOTE(review): patients whose only new rows live in other tables are therefore not
 * listed in incremental mode — confirm this is intended.
 *
 * @param source      ClickHouse source definition used to resolve the connection.
 * @param incremental Optional incremental configuration (cursor per query).
 * @returns Distinct (patient_id, brand) pairs ordered by patient_id, then brand.
 */
async listPatientPairs(
  source: ClickHouseSource,
  incremental?: IncrementalConfig,
): Promise<Array<{ patient_id: string; brand: string }>> {
  const conn = this.resolveConnection(source);
  const client = createClient({
    url: conn.url,
    database: conn.database,
    username: conn.username,
    password: conn.password,
    request_timeout: 60_000,
    compression: { response: true, request: false },
  });
  try {
    const cursorCfg = incremental?.perQuery['fact_client_out'];
    const whereParts: string[] = [];
    if (cursorCfg?.cursorValue) {
      // ClickHouse string literals honour backslash escapes, so `\` must be escaped
      // before `'` is doubled; otherwise a value ending in `\` swallows the closing quote.
      const literal = cursorCfg.cursorValue.replace(/\\/g, '\\\\').replace(/'/g, "''");
      whereParts.push(`${cursorCfg.cursorColumn} > '${literal}'`);
    }
    const whereSql = whereParts.length > 0 ? ` WHERE ${whereParts.join(' AND ')}` : '';
    const sql = `SELECT DISTINCT patient_id, brand FROM dw_group.fact_client_out${whereSql} ORDER BY patient_id, brand`;
    this.logger.log(`[clickhouse·cohort] list patient pairs — ${sql.slice(0, 200)}`);
    const started = Date.now();
    const result = await client.query({ query: sql, format: 'JSONEachRow' });
    const rows = (await result.json()) as Array<{ patient_id: string; brand: string }>;
    this.logger.log(
      `[clickhouse·cohort] → ${rows.length} distinct (patient_id, brand) pairs, ${Date.now() - started} ms`,
    );
    return rows;
  } finally {
    // Always release the connection, including when the query throws.
    await client.close();
  }
}
/**
 * Load every source table restricted to one cohort of patients (cohort-batch entry point).
 *
 * For each entry of `source.queries` the SQL is rewritten in three steps:
 *   1. incremental cursor injection (only when `incremental.perQuery[tableName]` exists),
 *   2. cohort injection: `(patient_id, brand) IN ((p1,b1),(p2,b2),...)`,
 *   3. `default_limit` (falls back to 100 000 when the source does not set one).
 *
 * Queries run sequentially on a single client, which is closed in `finally`.
 * When an incremental config is supplied, the maximum cursor value seen per table is
 * accumulated into `incremental.cursorAdvances` (the argument is mutated).
 *
 * @param source      ClickHouse source definition (connection, queries, default_limit).
 * @param pairs       (patient_id, brand) pairs that make up this cohort.
 * @param incremental Optional incremental configuration; updated in place with cursor advances.
 * @returns Rows per table name. For an empty cohort every table maps to an empty array.
 */
async loadTablesForCohort(
  source: ClickHouseSource,
  pairs: ReadonlyArray<{ patient_id: string; brand: string }>,
  incremental?: IncrementalConfig,
): Promise<Record<string, unknown[]>> {
  // An empty cohort yields an empty tuple list, and injectCohortFilter skips injection
  // for an empty list — the queries would then run unfiltered over the whole dataset.
  // Zero patients means zero rows, so answer without touching ClickHouse.
  if (pairs.length === 0) {
    const empty: Record<string, unknown[]> = {};
    for (const tableName of Object.keys(source.queries)) {
      empty[tableName] = [];
    }
    return empty;
  }

  const conn = this.resolveConnection(source);
  const client = createClient({
    url: conn.url,
    database: conn.database,
    username: conn.username,
    password: conn.password,
    request_timeout: 60_000,
    compression: { response: true, request: false },
  });

  // ClickHouse string literals honour backslash escapes, so `\` must be escaped before
  // `'` is doubled; otherwise a value ending in `\` swallows the closing quote.
  const quote = (value: string | null | undefined): string =>
    `'${(value ?? '').replace(/\\/g, '\\\\').replace(/'/g, "''")}'`;
  const tuples = pairs.map((p) => `(${quote(p.patient_id)}, ${quote(p.brand)})`).join(',');

  const tables: Record<string, unknown[]> = {};
  const defaultLimit = source.default_limit ?? 100_000;
  try {
    for (const [tableName, sql] of Object.entries(source.queries)) {
      const incCfg = incremental?.perQuery[tableName];
      // 1. cursor injection
      const sqlWithCursor = incCfg
        ? this.injectIncrementalCursor(sql, incCfg.cursorColumn, incCfg.cursorValue)
        : sql;
      // 2. cohort injection
      const sqlWithCohort = this.injectCohortFilter(sqlWithCursor, tuples);
      // 3. defensive row cap
      const sqlWithLimit = this.applyDefaultLimit(sqlWithCohort, defaultLimit);
      this.logger.log(
        `[clickhouse·cohort] query "${tableName}" — ${sqlWithLimit.slice(0, 140).replace(/\s+/g, ' ')}...`,
      );
      const started = Date.now();
      const result = await client.query({ query: sqlWithLimit, format: 'JSONEachRow' });
      const rows = (await result.json()) as unknown[];
      tables[tableName] = rows;
      this.logger.log(
        `[clickhouse·cohort] "${tableName}" → ${rows.length} 行,${Date.now() - started} ms`,
      );
      if (rows.length >= defaultLimit) {
        // Reaching the cap usually means rows were cut off; surface it instead of
        // silently importing a partial batch.
        this.logger.warn(
          `[clickhouse·cohort] "${tableName}" returned ${rows.length} rows (default_limit=${defaultLimit}); ` +
            `result may be truncated — lower the cohort batch size or raise default_limit`,
        );
      }
      // Track the highest cursor value seen for this table across all cohort batches.
      if (incremental && incCfg && rows.length > 0) {
        const maxVal = this.computeMax(rows as Record<string, unknown>[], incCfg.cursorColumn);
        if (maxVal) {
          incremental.cursorAdvances = incremental.cursorAdvances ?? {};
          const prev = incremental.cursorAdvances[tableName];
          if (!prev || maxVal > prev) {
            incremental.cursorAdvances[tableName] = maxVal;
          }
        }
      }
    }
  } finally {
    // Always release the connection, including when a query throws.
    await client.close();
  }
  return tables;
}
/**
 * Inject a `(patient_id, brand) IN (...)` predicate into a SELECT statement.
 *
 * Only the outermost query level is inspected: text inside parentheses (subqueries,
 * function calls), quoted strings / identifiers and comments is ignored when looking
 * for keywords, so a WHERE or ORDER BY that belongs to a subquery is never mistaken for
 * the outer one.
 *
 * - Existing top-level WHERE: its predicate is wrapped in parentheses and the cohort
 *   clause is AND-ed on, so `a OR b` becomes `(a OR b) AND cohort` rather than
 *   `a OR (b AND cohort)`.
 * - No top-level WHERE: a new WHERE is inserted before the first top-level
 *   GROUP BY / HAVING / ORDER BY / LIMIT / SETTINGS / FORMAT clause, or appended at the end.
 * - Trailing whitespace and semicolons are dropped so the result stays one statement.
 * - Newlines are used at the splice points so a trailing `--` comment cannot swallow
 *   the injected text.
 *
 * Top-level UNION / INTERSECT / EXCEPT is not handled: only the last branch is filtered.
 *
 * @param sql       Source SELECT statement (cursor filter, if any, already injected).
 * @param tuplesCsv Comma-separated, already-escaped `(id, brand)` tuples.
 * @returns The rewritten SQL, or `sql` unchanged when `tuplesCsv` is empty.
 */
private injectCohortFilter(sql: string, tuplesCsv: string): string {
  if (!tuplesCsv) return sql; // empty cohort: nothing to inject
  const cohortClause = `(patient_id, brand) IN (${tuplesCsv})`;

  const stmt = sql.replace(/[\s;]+$/, '');

  // Build a same-length copy where everything that is not top-level SQL text is replaced
  // by '#'. '#' is neither whitespace nor a word character, so `\s+` / `\b` in the
  // keyword patterns below cannot match into or across a masked region.
  let masked = '';
  let depth = 0;
  let pos = 0;
  while (pos < stmt.length) {
    const ch = stmt.charAt(pos);
    const two = stmt.slice(pos, pos + 2);
    let end = pos + 1; // exclusive end of the token starting at pos
    let opaque = depth > 0;
    if (ch === "'" || ch === '"' || ch === '`') {
      // Quoted string / identifier: skip to the closing quote, honouring `\x` and doubled quotes.
      while (end < stmt.length) {
        const c = stmt.charAt(end);
        if (c === '\\') {
          end += 2;
        } else if (c === ch && stmt.charAt(end + 1) === ch) {
          end += 2;
        } else if (c === ch) {
          end += 1;
          break;
        } else {
          end += 1;
        }
      }
      end = Math.min(end, stmt.length);
      opaque = true;
    } else if (two === '--') {
      const newline = stmt.indexOf('\n', pos);
      end = newline === -1 ? stmt.length : newline;
      opaque = true;
    } else if (two === '/*') {
      const close = stmt.indexOf('*/', pos + 2);
      end = close === -1 ? stmt.length : close + 2;
      opaque = true;
    } else if (ch === '(') {
      depth += 1;
      opaque = true;
    } else if (ch === ')') {
      depth = Math.max(0, depth - 1);
      opaque = true;
    }
    masked += opaque ? '#'.repeat(end - pos) : ch;
    pos = end;
  }

  // Last top-level occurrence of a keyword; returns [-1, -1] when absent.
  const lastTopLevel = (pattern: RegExp): [number, number] => {
    let found: [number, number] = [-1, -1];
    for (let m = pattern.exec(masked); m !== null; m = pattern.exec(masked)) {
      found = [m.index, m.index + m[0].length];
    }
    return found;
  };

  const [whereStart, whereEnd] = lastTopLevel(/\bWHERE\b/gi);
  const [, fromEnd] = lastTopLevel(/\bFROM\b/gi);

  // Tail clauses are only searched after WHERE (or after FROM when there is no WHERE) so
  // that a select-list column that happens to be called e.g. `limit` is not taken for a clause.
  const tailPattern = /\s+(?:GROUP\s+BY|HAVING|ORDER\s+BY|LIMIT|SETTINGS|FORMAT)\b/gi;
  tailPattern.lastIndex = whereStart !== -1 ? whereEnd : Math.max(fromEnd, 0);
  const tailMatch = tailPattern.exec(masked);
  const tailStart = tailMatch ? tailMatch.index : stmt.length;
  const tail = stmt.slice(tailStart);

  if (whereStart === -1) {
    return `${stmt.slice(0, tailStart)}\nWHERE ${cohortClause}${tail}`;
  }
  const predicate = stmt.slice(whereEnd, tailStart).trim();
  return `${stmt.slice(0, whereEnd)} (${predicate}\n) AND ${cohortClause}${tail}`;
}
/// 把原 SQL 的 cohort/cursor/ORDER/LIMIT 全部剥离,改写为 patient_id+brand 精确过滤
private injectPatientFilter(
originalSql: string,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment