Commit 9caef1b0 by luoqi

feat(sync): 加 reparse 离线重解析 —— 改 yaml 字典不再需要全量重摄(task #46)

ColdImportService.reparseFromTransactions + `pnpm reparse` CLI:用已存的
transaction.rawPayload 重跑 transformEngine + assembler + processSubject(**不连 DW**),
fact 走版本流(内容变 supersede 升版本,不变幂等 no-op),只对真正变更的患者定向重算 persona/plan。

- 非破坏:不 truncate、不碰 transaction 账本 → plan_executions(客服回写)/ 分配全保住;
  比全量重摄快一个量级(省掉 DW 拉取)。
- 覆盖 field/enum/keyword_mapping + transforms 算子 + parser 改动(长尾字典修补 99%);
  非 transform 产出的资源(如 image_finding_rows = CH SQL 视图)自动跳过(其 rawPayload 非装配输入,需走 DW)。
- traceRawSourceTable:沿 transforms output→input 链回溯到原始源表(rawPayload 就是它的行)。
- 用法:pnpm reparse -- --host=jvs-dw --subject-type=diagnosis [--patient=..] [--dry-run] [--no-recompute]
- 本地实测:diagnosis 全量 superseded=36 / unchanged=3820 / created=0(零 churn),单患者 end-to-end
  fact 升版本 + 定向重算 + 正畸召回正确出现。

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
parent 5696e951
...@@ -21,6 +21,8 @@ ...@@ -21,6 +21,8 @@
"sync:once": "ts-node --transpile-only src/cli/sync-incremental.cli.ts", "sync:once": "ts-node --transpile-only src/cli/sync-incremental.cli.ts",
"cold-import": "ts-node --transpile-only src/cli/cold-import.cli.ts", "cold-import": "ts-node --transpile-only src/cli/cold-import.cli.ts",
"cold-import:prod": "node --max-old-space-size=8192 dist/cli/cold-import.cli.js", "cold-import:prod": "node --max-old-space-size=8192 dist/cli/cold-import.cli.js",
"reparse": "ts-node --transpile-only src/cli/reparse.cli.ts",
"reparse:prod": "node --max-old-space-size=8192 dist/cli/reparse.cli.js",
"sync-incremental": "ts-node --transpile-only src/cli/sync-incremental.cli.ts", "sync-incremental": "ts-node --transpile-only src/cli/sync-incremental.cli.ts",
"sync-incremental:prod": "node --max-old-space-size=4096 dist/cli/sync-incremental.cli.js", "sync-incremental:prod": "node --max-old-space-size=4096 dist/cli/sync-incremental.cli.js",
"recompute-persona": "ts-node --transpile-only src/cli/recompute-persona.cli.ts", "recompute-persona": "ts-node --transpile-only src/cli/recompute-persona.cli.ts",
......
/**
* Reparse CLI — 用已存的 transaction.rawPayload 离线重跑装配+解析(不连 DW)。
*
* 改了 assembler yaml(enum/keyword/field_mapping / transforms)后,把存量 fact 按新口径
* 重新衍生(版本流 supersede),**不 truncate、不碰账本、不拉 DW**,执行历史/分配全保住。
*
* Usage:
* pnpm reparse -- --host=jvs-dw --subject-type=diagnosis # 实跑 + 定向重算
* pnpm reparse -- --host=jvs-dw --subject-type=diagnosis --dry-run # 只报会改多少,不写
* pnpm reparse -- --host=jvs-dw --patient=<uuid>,<uuid> # 限定患者
* pnpm reparse -- --host=jvs-dw --subject-type=diagnosis --no-recompute # 只重衍 fact,稍后单独 recompute
*
* 通过 NestApplicationContext 启动(不起 HTTP),复用 DI。
*/
import { NestFactory } from '@nestjs/core';
import { Logger } from '@nestjs/common';
import { AppModule } from '../app.module';
import { PrismaService } from '../prisma/prisma.service';
import { ColdImportService } from '../modules/sync/cold-import/cold-import.service';
import { PersonaService } from '../modules/persona/persona.service';
import { PlanEngineService } from '../modules/plan/engine/plan-engine.service';
interface CliArgs {
dir: string;
host: string;
subjectTypes?: string[];
patientIds?: string[];
dryRun: boolean;
recompute: boolean;
help: boolean;
}
function parseArgs(argv: string[]): CliArgs {
const args: CliArgs = { dir: './data/jvs-dw', host: 'jvs-dw', dryRun: false, recompute: true, help: false };
for (const a of argv) {
if (a === '--help' || a === '-h') args.help = true;
else if (a === '--dry-run') args.dryRun = true;
else if (a === '--no-recompute') args.recompute = false;
else if (a.startsWith('--dir=')) args.dir = a.slice('--dir='.length);
else if (a.startsWith('--host=')) args.host = a.slice('--host='.length);
else if (a.startsWith('--subject-type=')) args.subjectTypes = a.slice('--subject-type='.length).split(',').map((s) => s.trim()).filter(Boolean);
else if (a.startsWith('--patient=')) args.patientIds = a.slice('--patient='.length).split(',').map((s) => s.trim()).filter(Boolean);
}
return args;
}
async function main(): Promise<void> {
const logger = new Logger('reparse-cli');
const args = parseArgs(process.argv.slice(2));
if (args.help) {
// eslint-disable-next-line no-console
console.log(
'pnpm reparse -- --host=<name> [--dir=./data/<host>] [--subject-type=diagnosis,treatment]\n' +
' [--patient=<uuid>,...] [--dry-run] [--no-recompute]',
);
return;
}
const app = await NestFactory.createApplicationContext(AppModule, { logger: ['log', 'warn', 'error'] });
try {
const prisma = app.get(PrismaService);
const coldImport = app.get(ColdImportService);
const persona = app.get(PersonaService);
const planEngine = app.get(PlanEngineService);
logger.log(
`reparse start: host=${args.host} dir=${args.dir} ` +
`subjectTypes=${args.subjectTypes?.join(',') ?? 'ALL'} ` +
`patients=${args.patientIds?.length ?? 'all'} dryRun=${args.dryRun} recompute=${args.recompute}`,
);
const result = await coldImport.reparseFromTransactions({
dir: args.dir,
hostName: args.host,
subjectTypes: args.subjectTypes,
patientIds: args.patientIds,
dryRun: args.dryRun,
});
if (args.dryRun) {
logger.log(
`[DRY-RUN] 上方 reparse[dry] 日志列出了各资源的源行数 会装配的 canonical 数。` +
`不写库。去掉 --dry-run 实跑(非破坏:版本流 supersede 变更的、跳过不变的,会精确报 superseded/unchanged)`,
);
await app.close();
return;
}
const totalSuperseded = result.perResource.reduce((s, r) => s + r.factsSuperseded, 0);
const totalCreated = result.perResource.reduce((s, r) => s + r.factsCreated, 0);
logger.log(
`reparse fact 重衍完成:superseded=${totalSuperseded} created=${totalCreated} ` +
`affectedPatients=${result.affectedPatientIds.length}`,
);
if (!args.recompute) {
logger.log(`--no-recompute:跳过 persona/plan 重算。记得稍后跑 recompute-persona / recompute-plans`);
await app.close();
return;
}
// 定向重算受影响患者(persona force=true:reparse 不动 transaction 水位,需跳幂等闸)
const ids = result.affectedPatientIds;
logger.log(`定向重算 ${ids.length} 个患者的 persona + plan `);
let done = 0;
let personaErr = 0;
let planErr = 0;
for (const patientId of ids) {
const patient = await prisma.patient.findUnique({
where: { id: patientId },
select: { hostId: true, tenantId: true },
});
if (!patient) continue;
try {
await persona.recompute({ patientId, source: 'reparse', force: true });
} catch (e) {
personaErr++;
logger.warn(`persona recompute 失败 patient=${patientId}: ${(e as Error).message}`);
}
try {
await planEngine.recomputeForPatient({ hostId: patient.hostId, tenantId: patient.tenantId, patientId });
} catch (e) {
planErr++;
logger.warn(`plan recompute 失败 patient=${patientId}: ${(e as Error).message}`);
}
if (++done % 200 === 0) logger.log(` recompute 进度 ${done}/${ids.length}`);
}
logger.log(`reparse DONE:重算 ${done} 患者(persona 失败 ${personaErr} / plan 失败 ${planErr})`);
await app.close();
} catch (err) {
logger.error(`reparse 失败:${err instanceof Error ? err.message : String(err)}`);
await app.close();
process.exitCode = 1;
}
}
void main();
...@@ -58,6 +58,151 @@ export class ColdImportService { ...@@ -58,6 +58,151 @@ export class ColdImportService {
private readonly transformEngine: TransformEngine, private readonly transformEngine: TransformEngine,
) {} ) {}
/**
* Reparse — 用已存的 `transaction.rawPayload` 离线重跑 装配+解析(**不连 DW**)。
*
* 场景:改了 assembler yaml(enum_mapping / keyword_mapping / field_mapping / transforms)后,
* 把存量 patient_facts 按新口径重新衍生 —— **不 truncate、不碰 transaction 账本、不拉 DW**,
* 因此执行历史(plan_executions)/ 分配全保住,比全量重摄快一个量级(省掉 DW 拉取)。
*
* 原理:`transaction.rawPayload` 存的就是 host 原始源行(如 EMR 行)。重建出原始源表 →
* 喂回 `transformEngine` + `processSubject`;transaction 经 source_event_id 幂等命中已存
* (createMany skipDuplicates 跳过重写),但 SELECT-back 后 **parser 照常重跑** → fact 走
* 版本流(内容变 → supersede 升版本,不变 → no-op)。
*
* 覆盖:field/enum/keyword_mapping + transforms 算子 + parser 改动(长尾字典修补 99%)。
* 不覆盖:改了 CH `sql_source` SELECT 本身(要从 DW 多拉原来没拉的列/表)→ 那才需真重摄。
*
* @returns 受影响 patientId(调用方据此重算 persona/plan);dryRun 时只 diff 不写。
*/
async reparseFromTransactions(opts: {
dir: string;
hostName: string;
subjectTypes?: string[]; // 限定 canonical(如 ['diagnosis']);空 = 全部 subject 资源
patientIds?: string[];
dryRun?: boolean;
}): Promise<{
perResource: PerResourceStats[];
affectedPatientIds: string[];
dryRunDiffs: Array<{ resource: string; subjectId: string; field: string; from: string; to: string }>;
}> {
const absDir = path.resolve(opts.dir);
const manifest = this.readManifest(absDir);
const host = await this.prisma.host.findFirst({ where: { name: opts.hostName } });
if (!host) throw new Error(`reparse: host '${opts.hostName}' not found`);
const tenantResolver = buildTenantResolver(manifest);
const normalize = { amountUnit: manifest.amount_unit, timezone: manifest.timezone };
const subjectCfgs = this.loadAllAssemblers(absDir, manifest).filter((c) => !!c.emits);
const targetCfgs = opts.subjectTypes?.length
? subjectCfgs.filter((c) => opts.subjectTypes!.includes(c.canonical))
: subjectCfgs;
if (targetCfgs.length === 0) {
throw new Error(
`reparse: 无匹配 subject 配置(subjectTypes=${opts.subjectTypes?.join(',') ?? 'all'});` +
`可选=${subjectCfgs.map((c) => c.canonical).join(',')}`,
);
}
const transforms = manifest.transforms ?? [];
// 只能离线 reparse「transform 产出的 primary 表」:其 rawPayload 是 transform 的源行,可重放。
// 非 transform 产出的(如 image_finding_rows = CH SQL 视图)→ rawPayload 不是其装配输入 → 跳过(需走 DW)。
const reparseableCfgs = targetCfgs.filter((cfg) => {
const raw = traceRawSourceTable(cfg.primary.table, transforms);
if (raw === cfg.primary.table) {
this.logger.warn(
`reparse: 跳过 ${cfg.canonical}(file primary=${cfg.primary.table} transform 产出 ` +
`rawPayload 不是其装配输入,无法离线 reparse;该资源如需更新走 DW 重摄)`,
);
return false;
}
return true;
});
if (reparseableCfgs.length === 0) {
throw new Error('reparse: 目标资源都非 transform 产出,无法离线 reparse(需走 DW 重摄)');
}
this.logger.log(
`reparse: host=${host.name} resources=[${reparseableCfgs.map((c) => c.canonical + '/' + c.primary.table).join(',')}] ` +
`dryRun=${!!opts.dryRun} patientScope=${opts.patientIds?.length ?? 'all'}`,
);
// 1. 拉 scoped transactions,按 raw 源表收集 distinct rawPayload(payloadHash 去重)
const tables: Record<string, Record<string, unknown>[]> = {};
const seenHashByTable = new Map<string, Set<string>>();
for (const cfg of reparseableCfgs) {
const subjectType = cfg.emits!.subjectType;
const rawTable = traceRawSourceTable(cfg.primary.table, transforms);
const bucket = (tables[rawTable] ??= []);
const seen = seenHashByTable.get(rawTable) ?? new Set<string>();
seenHashByTable.set(rawTable, seen);
const txs = await this.prisma.patientTransaction.findMany({
where: {
hostId: host.id,
subjectType,
...(opts.patientIds?.length ? { patientId: { in: opts.patientIds } } : {}),
},
select: { rawPayload: true, payloadHash: true },
});
for (const t of txs) {
if (seen.has(t.payloadHash)) continue;
seen.add(t.payloadHash);
bucket.push(t.rawPayload as Record<string, unknown>);
}
this.logger.log(
`reparse: ${cfg.canonical}(subjectType=${subjectType}) raw=${rawTable} ` +
`txns=${txs.length} distinctSrcRows=${bucket.length}`,
);
}
// transforms 引用但未重建的输入表 → 补空,避免引擎缺表
for (const tf of transforms) {
const inp = (tf as { input?: string }).input;
if (inp && !tables[inp]) tables[inp] = [];
}
const transformed =
transforms.length > 0 ? this.transformEngine.run({ tables, transforms }) : tables;
// 2a. dryRun:只报「会处理多少」的 scope(跑一遍 assembler 看产出量),**不写库**。
// 不做 fact 级 diff —— fact.subjectId 由 parser 自行派生(≠ canonical.externalId),
// 离线精确预测需重跑 parser;而 reparse 本身非破坏(版本流 supersede + 幂等),
// 实跑会精确报 superseded/unchanged,故 dry-run 给 scope 即够。
const dryRunDiffs: Array<{ resource: string; subjectId: string; field: string; from: string; to: string }> = [];
if (opts.dryRun) {
for (const cfg of reparseableCfgs) {
const assembled = this.assembler.assemble({ tables: transformed, config: cfg, normalize, hostId: host.id });
this.logger.log(
`reparse[dry]: ${cfg.canonical} 源行=${(transformed[traceRawSourceTable(cfg.primary.table, transforms)] as unknown[])?.length ?? 0} ` +
` 装配出 ${assembled.rows.length} canonical(assembler 失败 ${assembled.stats.failed});实跑将按版本流 supersede 变更的、跳过不变的`,
);
}
return { perResource: [], affectedPatientIds: [], dryRunDiffs };
}
// 2b. 实跑:processSubject(复用摄入路径)→ transaction 幂等命中 → parser 重衍生 fact(版本流)
const runStart = new Date();
const perResource: PerResourceStats[] = [];
const seenTenants = new Set<string>();
for (const cfg of reparseableCfgs) {
const stats = await this.processSubject(transformed, cfg, host.id, tenantResolver, seenTenants, normalize, undefined, false);
perResource.push(stats);
this.logger.log(
`reparse: ${cfg.canonical} txns=${stats.transactionsWritten} ` +
`factsSuperseded=${stats.factsSuperseded} factsUnchanged=${stats.factsUnchanged} factsCreated=${stats.factsCreated}`,
);
}
// 3. 受影响 patientId = 本次真正被 supersede(内容变了)的 fact 的 distinct patient → 只重算这些。
// 比"scope 内所有患者"精准得多(避免对没变的患者做无谓 persona/plan 重算 churn)。
const changed = await this.prisma.patientFact.findMany({
where: {
hostId: host.id,
supersededAt: { gte: runStart },
...(opts.patientIds?.length ? { patientId: { in: opts.patientIds } } : {}),
},
select: { patientId: true },
distinct: ['patientId'],
});
const affectedPatientIds = changed.map((a) => a.patientId).filter((x): x is string => !!x);
return { perResource, affectedPatientIds, dryRunDiffs };
}
async importDirectory( async importDirectory(
dir: string, dir: string,
options: { options: {
...@@ -1619,6 +1764,28 @@ interface TotalsBlock { ...@@ -1619,6 +1764,28 @@ interface TotalsBlock {
factsFailed: number; factsFailed: number;
} }
/// 沿 transforms 的 output→input 链回溯,找到非任何 transform 产出的"原始源表"名。
/// 例:diagnosis_rows ← _diagnosis_norm ← _diagnosis_raw ← fact_emr_treatment_out(终点=源表)。
/// 用于 reparse:transaction.rawPayload 存的就是这张源表的行。
function traceRawSourceTable(primaryTable: string, transforms: ReadonlyArray<unknown>): string {
const byOutput = new Map<string, string>(); // output 表名 → input 表名
for (const tf of transforms) {
const t = tf as { input?: string; output?: string; outputs?: Array<{ output?: string }> };
if (t.output && t.input) byOutput.set(t.output, t.input);
// route_by_pattern 多 output:每个 output 都回到同一 input
if (Array.isArray(t.outputs) && t.input) {
for (const o of t.outputs) if (o?.output) byOutput.set(o.output, t.input);
}
}
let cur = primaryTable;
const seen = new Set<string>();
while (byOutput.has(cur) && !seen.has(cur)) {
seen.add(cur);
cur = byOutput.get(cur)!;
}
return cur;
}
export interface PerResourceStats extends TotalsBlock { export interface PerResourceStats extends TotalsBlock {
resource: string; resource: string;
fetched: number; fetched: number;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment