Commit f7a6d41f by luoqi

feat(sync): PR3 — 统一 sync 模式,deprecate cold-import 独立路径

把 cold-import / incremental 两个独立模式合并为单一 sync 入口。

importDirectory 关键改动:
- 只要 manifest 配了 sql_source.incremental.per_query,**永远** 读 + 写 cursor
- options.incremental === false → 走 --full 路径(忽略上次 cursor,但仍写新 cursor_after)
- options.incremental 默认 true → 正常 sync(读 + 写 cursor)
- resource 统一写 'incremental_bundle'(不再区分 cold_import / incremental)
- readLastIncrementalCursor 接受 SUCCESS + PARTIAL 历史行(partial 已推进 cursor)

修关键 bug:
  PR1 之前,--full 模式(incremental=false)不写 cursor → 下次 sync 从旧 cursor 拉
  → **漏掉 --full 期间 DW 新写入的数据**
  PR3 统一写 cursor_after=run_start,保证后续 sync 永远从最新 baseline 接力

新 CLI 入口:
- pnpm sync          → 推荐主入口(=sync-incremental,首跑全量 / 日常增量自动)
- pnpm sync:prod     → docker prod 版
- pnpm sync:once     → 别名(强调"手动触发一次")
- pnpm cold-import   → legacy 保留(不读 cursor,等价 pnpm sync --full)
- pnpm sync-incremental → legacy alias 保留

sync-incremental.cli.ts 增强:
- 加 --full / --cohort-batch=N / --no-cohort 参数
- 文件头改 "PAC v1 唯一推荐 sync 命令" + 用法 + 退出码注释
- 启动日志带 cohortBatch / full / dryRun 配置全量

部署节奏(server 实战):
  首次:pnpm sync:prod -- --dir=./data/jvs-dw
        → 自动分批(5000 patient/批)
        → cursor_after = 启动时刻 ISO
        → 持续跑直到所有 patient 处理完成
        → 此后 cron 02:30 增量自动接力
  灾后:DELETE FROM patient_transactions...;
        pnpm sync:prod -- --dir=./data/jvs-dw  (cursor 自然为空 = 等价全量)
  强制:pnpm sync -- --full(罕见,cursor 损坏修复场景)

向后兼容:
- 所有老脚本(cold-import / sync-incremental)继续工作
- SyncIncrementalSchedulerService 不动(默认 incremental:true)
- 历史 sync_logs 行(resource='cold_import_bundle')不受影响,
  自然被 cursor_after IS NULL 过滤掉
parent d72f557a
...@@ -16,6 +16,9 @@ ...@@ -16,6 +16,9 @@
"prisma:deploy": "prisma migrate deploy", "prisma:deploy": "prisma migrate deploy",
"prisma:studio": "prisma studio", "prisma:studio": "prisma studio",
"prisma:seed": "ts-node --transpile-only prisma/seed.ts", "prisma:seed": "ts-node --transpile-only prisma/seed.ts",
"sync": "ts-node --transpile-only src/cli/sync-incremental.cli.ts",
"sync:prod": "node --max-old-space-size=8192 dist/cli/sync-incremental.cli.js",
"sync:once": "ts-node --transpile-only src/cli/sync-incremental.cli.ts",
"cold-import": "ts-node --transpile-only src/cli/cold-import.cli.ts", "cold-import": "ts-node --transpile-only src/cli/cold-import.cli.ts",
"cold-import:prod": "node --max-old-space-size=8192 dist/cli/cold-import.cli.js", "cold-import:prod": "node --max-old-space-size=8192 dist/cli/cold-import.cli.js",
"sync-incremental": "ts-node --transpile-only src/cli/sync-incremental.cli.ts", "sync-incremental": "ts-node --transpile-only src/cli/sync-incremental.cli.ts",
......
/** /**
* Sync Incremental CLI(W4 末) * Sync CLI(统一入口 — PAC v1 唯一推荐 sync 命令)
* *
* 从上次 sync_logs.cursor_after 增量拉 DW 数据,跑完触发 persona + plan recompute。 * 从上次 sync_logs.cursor_after 增量拉 DW 数据,跑完触发 persona + plan recompute。
* *
* 链路: * 链路:
* 1. ColdImportService.importDirectory({ incremental: true }) * 1. ColdImportService.importDirectory({ incremental: true, cohortBatchSize })
* - 读 sync_logs[host=jvs-dw, resource=incremental_bundle].cursor_after 拿上次 cursor * - 读 sync_logs[host=jvs-dw, resource=incremental_bundle].cursor_after 拿上次 cursor
* - SQL 注入 WHERE updated_date > '${cursor}'(per-query) * - SQL 注入 WHERE updated_date > '${cursor}'(per-query)
* - 跑完写新 sync_log + cursor_after JSON * - **首跑无 cursor = 等价全量**(自动按 patient 分批,内存恒定)
* - 跑完写新 cursor_after = run_start ISO(PR1 — 不漏批次间 DW 变更)
* 2. 收集本批 affected patient_ids * 2. 收集本批 affected patient_ids
* 3. 对每个 affected patient 跑 PersonaService.recompute(等价 BullMQ 触发,本地同步跑) * 3. 对每个 affected patient 跑 PersonaService.recompute
* 4. 跑 PlanEngineService.runHost(SQL 召回 + 6 因子打分,生成/更新 followup_plans) * 4. 跑 PlanEngineService.runHost(SQL 召回 + 6 因子打分,生成/更新 followup_plans)
* *
* 用法: * 用法(推荐 `pnpm sync` 别名,等价 sync-incremental):
* pnpm sync-incremental -- --dir=./data/jvs-dw # 增量 + 联动 persona/plan * pnpm sync -- --dir=./data/jvs-dw # 首跑(全量)或日常增量
* pnpm sync-incremental -- --dir=./data/jvs-dw --no-recompute # 只拉数据不联动重算(debug) * pnpm sync -- --dir=./data/jvs-dw --cohort-batch=5000 # 显式分批(默认 5000,可调)
* pnpm sync-incremental -- --dir=./data/jvs-dw --dry-run # 不写库(测试 cursor 注入是否正确) * pnpm sync -- --dir=./data/jvs-dw --full # 强制忽略 cursor,全量重新拉(灾难恢复)
* pnpm sync -- --dir=./data/jvs-dw --no-recompute # 只拉数据不联动重算(debug)
* pnpm sync -- --dir=./data/jvs-dw --dry-run # 不写库(测试 cursor 注入)
* *
* 部署:BullMQ cron 每天 02:00 触发(DW 那边每日刷新后) * 部署:BullMQ cron 每天 02:00 触发(DW 上游 02:00 完成刷新后)
*
* 推荐路径:
* - 首次部署:`pnpm sync:prod -- --dir=./data/jvs-dw`(等价全量,分批跑稳)
* - 日常增量:cron 自动跑(env PAC_INCREMENTAL_CRON)
* - 灾后重灌:TRUNCATE patient_transactions; pnpm sync(cursor 无值 = 全量)
*
* cold-import.cli.ts(`pnpm cold-import`)是 legacy 入口,**不读不写 cursor**;
* 仅用于"想要全量但不动 cursor 状态"的极少场景。**默认请用 `pnpm sync`**。
*/ */
import { NestFactory } from '@nestjs/core'; import { NestFactory } from '@nestjs/core';
...@@ -32,37 +43,61 @@ interface CliArgs { ...@@ -32,37 +43,61 @@ interface CliArgs {
dir?: string; dir?: string;
dryRun: boolean; dryRun: boolean;
noRecompute: boolean; noRecompute: boolean;
full: boolean; // 忽略 cursor,等价 cold-import legacy 行为(但仍写 cursor_after = run_start)
cohortBatchSize?: number | null;
help: boolean; help: boolean;
} }
function parseArgs(argv: string[]): CliArgs { function parseArgs(argv: string[]): CliArgs {
const out: CliArgs = { dryRun: false, noRecompute: false, help: false }; const out: CliArgs = {
dryRun: false,
noRecompute: false,
full: false,
help: false,
};
for (const a of argv) { for (const a of argv) {
if (a === '--help' || a === '-h') out.help = true; if (a === '--help' || a === '-h') out.help = true;
else if (a === '--dry-run') out.dryRun = true; else if (a === '--dry-run') out.dryRun = true;
else if (a === '--no-recompute') out.noRecompute = true; else if (a === '--no-recompute') out.noRecompute = true;
else if (a.startsWith('--dir=')) out.dir = a.slice('--dir='.length); else if (a === '--full') out.full = true;
else if (a === '--no-cohort') out.cohortBatchSize = 0;
else if (a.startsWith('--cohort-batch=')) {
const n = parseInt(a.slice('--cohort-batch='.length), 10);
out.cohortBatchSize = Number.isFinite(n) && n >= 0 ? n : undefined;
} else if (a.startsWith('--dir=')) out.dir = a.slice('--dir='.length);
} }
return out; return out;
} }
function printHelp() { function printHelp() {
console.log(` console.log(`
Sync Incremental CLI PAC Sync CLI(统一入口,推荐用 \`pnpm sync\`)
DW 直连增量摄入 + 自动 persona/plan 联动重算 DW 直连增量摄入 + 自动 persona/plan 联动重算
Usage: Usage:
pnpm sync-incremental -- --dir=<manifest_dir> [--dry-run] [--no-recompute] pnpm sync -- --dir=<manifest_dir> [options]
Options: Options:
--dir=<path> 必填,manifest.yaml 所在目录(增量 cursor 配置也在 manifest.sql_source.incremental) --dir=<path> 必填,manifest.yaml 所在目录
--cohort-batch=<N> 按 patient 分批跑(默认 5000;env PAC_COHORT_BATCH_SIZE 兜底)
首跑(全量)强烈建议保留默认或调小;日常增量自动跑
--no-cohort 显式禁用分批,跑 single-shot(文件源 / 调试)
--full 忽略上次 cursor,等价"完整重新拉一次"
(灾后重灌 / cursor 损坏时用,日常 sync 不要用)
--dry-run 只读 cursor 注入预览,不写库 --dry-run 只读 cursor 注入预览,不写库
--no-recompute 只拉数据,不触发 persona/plan 重算 --no-recompute 只拉数据,不触发 persona/plan 重算
--help, -h 显示本帮助 --help, -h 显示本帮助
首跑(无 cursor):等价全量(去 dev cohort LIMIT,按 yaml 完整拉) 行为模式:
后续:WHERE updated_date > '\${cursor}',只拉新/改的行 首跑(sync_logs 无 cursor):等价全量,自动分 N 批,内存恒定
后续增量:WHERE updated_date > '\${cursor}',只拉新/改的行
退出码:
0 success / partial(有进展)
1 bootstrap 失败(Nest 起不来)
2 sync 失败(数据/SQL 异常)
4 并发拦截(同 host 已有 sync 在跑)
`); `);
} }
...@@ -73,8 +108,11 @@ async function bootstrap() { ...@@ -73,8 +108,11 @@ async function bootstrap() {
process.exit(args.help ? 0 : 1); process.exit(args.help ? 0 : 1);
} }
const logger = new Logger('sync-incremental:cli'); const logger = new Logger('sync:cli');
logger.log(`Starting incremental sync(dir=${args.dir}, dryRun=${args.dryRun}, noRecompute=${args.noRecompute})`); logger.log(
`Starting sync(dir=${args.dir}, dryRun=${args.dryRun}, noRecompute=${args.noRecompute}, ` +
`full=${args.full}, cohortBatch=${args.cohortBatchSize ?? '(default/env)'})`,
);
const app = await NestFactory.createApplicationContext(AppModule, { const app = await NestFactory.createApplicationContext(AppModule, {
logger: ['log', 'warn', 'error'], logger: ['log', 'warn', 'error'],
...@@ -84,7 +122,10 @@ async function bootstrap() { ...@@ -84,7 +122,10 @@ async function bootstrap() {
const importSvc = app.get(ColdImportService); const importSvc = app.get(ColdImportService);
const result = await importSvc.importDirectory(args.dir!, { const result = await importSvc.importDirectory(args.dir!, {
dryRun: args.dryRun, dryRun: args.dryRun,
incremental: true, // --full 时走 legacy cold-import path(不读 cursor);否则增量(读上次 cursor)
// 注意 PR1 后两个 mode 都写 cursor_after=run_start(为下次增量留 baseline)
incremental: !args.full,
cohortBatchSize: args.cohortBatchSize,
}); });
logger.log('─────────────────────────────────────────'); logger.log('─────────────────────────────────────────');
......
...@@ -99,19 +99,23 @@ export class ColdImportService { ...@@ -99,19 +99,23 @@ export class ColdImportService {
` assemblers=${manifest.assemblers.length}`, ` assemblers=${manifest.assemblers.length}`,
); );
// 3. 增量 cursor 准备(必须在 SyncLog 创建之前算,因为 cursorBefore 要落到 sync_log 行) // 3. 增量 cursor 准备(PR3 统一:只要 manifest 有 incremental.per_query 配置,始终读 + 写 cursor)
// options.incremental 兼容老调用方:
// true(默认)→ 读上次 cursor,SQL WHERE > cursor 增量
// false → 忽略上次 cursor(等价 ignoreCursor=true),但仍写新 cursor_after=run_start
// 用于灾后重灌 / 强制全量 / cursor 损坏修复
// file 源(无 sql_source)→ 永远 single-shot,不参与 cursor
let incrementalConfig: IncrementalConfig | undefined; let incrementalConfig: IncrementalConfig | undefined;
let cursorBeforeJson: string | null = null; let cursorBeforeJson: string | null = null;
if (options.incremental) {
const perQueryCfg = manifest.sql_source?.incremental?.per_query; const perQueryCfg = manifest.sql_source?.incremental?.per_query;
if (!perQueryCfg) { if (perQueryCfg) {
throw new Error( // 默认读 cursor;options.incremental === false 时 (强制 full) 不读
`--incremental 模式但 manifest.sql_source.incremental.per_query 未配置;请在 manifest.yaml 补 cursor 配置`, const ignoreCursor = options.incremental === false;
); const lastCursor = ignoreCursor
} ? {} as Record<string, string>
const lastCursor = await this.readLastIncrementalCursor(host.id); : await this.readLastIncrementalCursor(host.id);
this.logger.log( this.logger.log(
`Incremental cursor 读:${Object.keys(lastCursor).length === 0 ? '(首跑,全量)' : JSON.stringify(lastCursor)}`, `Cursor:${ignoreCursor ? '(显式忽略 — --full)' : Object.keys(lastCursor).length === 0 ? '(首跑,全量)' : JSON.stringify(lastCursor)}`,
); );
incrementalConfig = { incrementalConfig = {
perQuery: Object.fromEntries( perQuery: Object.fromEntries(
...@@ -126,6 +130,11 @@ export class ColdImportService { ...@@ -126,6 +130,11 @@ export class ColdImportService {
Object.entries(incrementalConfig.perQuery).map(([k, v]) => [k, v.cursorValue ?? '']), Object.entries(incrementalConfig.perQuery).map(([k, v]) => [k, v.cursorValue ?? '']),
), ),
); );
} else if (options.incremental) {
// 老调用方明确要 incremental 但 manifest 没配 cursor → fast-fail
throw new Error(
`incremental 模式需要 manifest.sql_source.incremental.per_query;请补 cursor 配置`,
);
} }
// 4. SyncLog 创建 — ⭐ 作为并发锁 // 4. SyncLog 创建 — ⭐ 作为并发锁
...@@ -133,7 +142,10 @@ export class ColdImportService { ...@@ -133,7 +142,10 @@ export class ColdImportService {
// 同 host 已有 running 行时,本次 INSERT 撞 P2002 → 抛 SyncAlreadyRunningError → 调用方 skip // 同 host 已有 running 行时,本次 INSERT 撞 P2002 → 抛 SyncAlreadyRunningError → 调用方 skip
// finally 块统一 finalize status,确保锁一定被释放(进程崩需依赖 stale 清理) // finally 块统一 finalize status,确保锁一定被释放(进程崩需依赖 stale 清理)
const syncLogTenant = knownTenants.length === 1 ? knownTenants[0]! : '_multi'; const syncLogTenant = knownTenants.length === 1 ? knownTenants[0]! : '_multi';
const syncResource = options.incremental ? 'incremental_bundle' : 'cold_import_bundle'; // PR3 统一 resource = 'incremental_bundle';legacy --full 也写此 resource,
// 这样 readLastIncrementalCursor 能从任一历史 sync_log 拿到 cursor,语义统一
const syncResource = 'incremental_bundle';
const triggerMode = options.incremental === false ? 'full' : 'sync';
let syncLog: { id: string } | null = null; let syncLog: { id: string } | null = null;
if (!options.dryRun) { if (!options.dryRun) {
try { try {
...@@ -143,7 +155,7 @@ export class ColdImportService { ...@@ -143,7 +155,7 @@ export class ColdImportService {
tenantId: syncLogTenant, tenantId: syncLogTenant,
direction: SyncDirection.PULL, // 冷启复用 pull 语义(同套 pipeline) direction: SyncDirection.PULL, // 冷启复用 pull 语义(同套 pipeline)
resource: syncResource, resource: syncResource,
triggeredBy: `${options.incremental ? 'incremental' : 'cold_import'}:${path.basename(absDir)}:${runId}`, triggeredBy: `${triggerMode}:${path.basename(absDir)}:${runId}`,
status: SyncStatus.RUNNING, status: SyncStatus.RUNNING,
cursorBefore: cursorBeforeJson, cursorBefore: cursorBeforeJson,
}, },
...@@ -292,8 +304,11 @@ export class ColdImportService { ...@@ -292,8 +304,11 @@ export class ColdImportService {
// 理由:run_start 之后 DW 任何写入,下次增量 WHERE > run_start 都能捞回 // 理由:run_start 之后 DW 任何写入,下次增量 WHERE > run_start 都能捞回
// - 同行不同 updatedAt → source_event_id 不同 → 新 tx + 新 fact 版本(parser supersede 旧版) // - 同行不同 updatedAt → source_event_id 不同 → 新 tx + 新 fact 版本(parser supersede 旧版)
// - 同行同 updatedAt → P2002 conflict → parser re-run(idempotent,无害) // - 同行同 updatedAt → P2002 conflict → parser re-run(idempotent,无害)
// PR3 统一:只要 manifest 配了 incremental.per_query,无论 mode(sync / --full)
// 都写 cursor_after = run_start。--full 也写,保证下次正常 sync 从这里接力,
// 不会因为 --full 没写 cursor 而漏掉 --full 期间 DW 新写入的数据。
let cursorAfterJson: string | null = null; let cursorAfterJson: string | null = null;
if (options.incremental && incrementalConfig) { if (incrementalConfig) {
const runStartIso = runStart.toISOString(); const runStartIso = runStart.toISOString();
const oldCursors: Record<string, string> = JSON.parse(cursorBeforeJson ?? '{}'); const oldCursors: Record<string, string> = JSON.parse(cursorBeforeJson ?? '{}');
const merged = { ...oldCursors }; const merged = { ...oldCursors };
...@@ -304,7 +319,7 @@ export class ColdImportService { ...@@ -304,7 +319,7 @@ export class ColdImportService {
} }
} }
cursorAfterJson = JSON.stringify(merged); cursorAfterJson = JSON.stringify(merged);
this.logger.log(`Incremental cursor 写(run_start baseline):${cursorAfterJson}`); this.logger.log(`Cursor 写(run_start baseline):${cursorAfterJson}`);
} }
if (syncLog) { if (syncLog) {
const fetched = const fetched =
...@@ -914,14 +929,17 @@ export class ColdImportService { ...@@ -914,14 +929,17 @@ export class ColdImportService {
return result.data; return result.data;
} }
/// W4 末:读最近一次成功的 incremental sync_log 的 cursor_after JSON /// 读最近一次成功的 sync_log 的 cursor_after JSON。
/// 返回 { tableName: cursorValue } map(无记录返回空对象 → 首跑等价全量) /// 返回 { tableName: cursorValue } map(无记录返回空对象 → 首跑等价全量)。
/// PR3 统一:resource = 'incremental_bundle';历史 'cold_import_bundle' 行无 cursor_after,
/// 自然被 cursorAfter: { not: null } 过滤掉(不影响)。
private async readLastIncrementalCursor(hostId: string): Promise<Record<string, string>> { private async readLastIncrementalCursor(hostId: string): Promise<Record<string, string>> {
const last = await this.prisma.syncLog.findFirst({ const last = await this.prisma.syncLog.findFirst({
where: { where: {
hostId, hostId,
resource: 'incremental_bundle', resource: 'incremental_bundle',
status: SyncStatus.SUCCESS, // SUCCESS or PARTIAL 都接受(partial 表示有进展,cursor 已推进到 run_start)
status: { in: [SyncStatus.SUCCESS, SyncStatus.PARTIAL] },
cursorAfter: { not: null }, cursorAfter: { not: null },
}, },
orderBy: { startedAt: 'desc' }, orderBy: { startedAt: 'desc' },
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment