Commit e2960fa2 by luoqi

perf(sync): reparse 按患者分批,避免全量时把全 host rawPayload 拉进内存 OOM

全量 reparse 时一次性 findMany 所有 rawPayload 可达 GB 级(服务器 diagnosis 975MB/541k txn)。
改为按患者 cohort 分批(PAC_REPARSE_BATCH,默认 3000):每批只重建该批的 distinct 源行 →
transform → processSubject,内存恒定(本地实测 rss ~460MB);跨批累加 stats,supersededAt 圈出
变更患者定向重算。dry-run 改报 scope(count,不分批装配)。

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
parent 84820f32
......@@ -124,72 +124,92 @@ export class ColdImportService {
`dryRun=${!!opts.dryRun} patientScope=${opts.patientIds?.length ?? 'all'}`,
);
// 1. 拉 scoped transactions,按 raw 源表收集 distinct rawPayload(payloadHash 去重)
const tables: Record<string, Record<string, unknown>[]> = {};
const seenHashByTable = new Map<string, Set<string>>();
for (const cfg of reparseableCfgs) {
const subjectType = cfg.emits!.subjectType;
const rawTable = traceRawSourceTable(cfg.primary.table, transforms);
const bucket = (tables[rawTable] ??= []);
const seen = seenHashByTable.get(rawTable) ?? new Set<string>();
seenHashByTable.set(rawTable, seen);
const txs = await this.prisma.patientTransaction.findMany({
where: {
hostId: host.id,
subjectType,
...(opts.patientIds?.length ? { patientId: { in: opts.patientIds } } : {}),
},
select: { rawPayload: true, payloadHash: true },
// 1. 解析 patient 范围 —— **按患者分批**重建源表,避免一次性把全 host rawPayload(可达 GB 级)
// 拉进内存 OOM(全量 reparse 时尤为关键)。
const subjectTypes = reparseableCfgs.map((c) => c.emits!.subjectType);
let scopePatientIds = opts.patientIds;
if (!scopePatientIds?.length) {
const rows = await this.prisma.patientTransaction.findMany({
where: { hostId: host.id, subjectType: { in: subjectTypes }, patientId: { not: null } },
select: { patientId: true },
distinct: ['patientId'],
});
for (const t of txs) {
if (seen.has(t.payloadHash)) continue;
seen.add(t.payloadHash);
bucket.push(t.rawPayload as Record<string, unknown>);
}
this.logger.log(
`reparse: ${cfg.canonical}(subjectType=${subjectType}) raw=${rawTable} ` +
`txns=${txs.length} distinctSrcRows=${bucket.length}`,
);
scopePatientIds = rows.map((r) => r.patientId).filter((x): x is string => !!x);
}
// transforms 引用但未重建的输入表 → 补空,避免引擎缺表
for (const tf of transforms) {
const inp = (tf as { input?: string }).input;
if (inp && !tables[inp]) tables[inp] = [];
}
const transformed =
transforms.length > 0 ? this.transformEngine.run({ tables, transforms }) : tables;
// 2a. dryRun:只报「会处理多少」的 scope(跑一遍 assembler 看产出量),**不写库**。
// 不做 fact 级 diff —— fact.subjectId 由 parser 自行派生(≠ canonical.externalId),
// 离线精确预测需重跑 parser;而 reparse 本身非破坏(版本流 supersede + 幂等),
// 实跑会精确报 superseded/unchanged,故 dry-run 给 scope 即够。
const dryRunDiffs: Array<{ resource: string; subjectId: string; field: string; from: string; to: string }> = [];
// 2a. dryRun:报 scope(患者数 + 各资源 txn 数),不写、不分批装配(便宜)。
if (opts.dryRun) {
for (const cfg of reparseableCfgs) {
const assembled = this.assembler.assemble({ tables: transformed, config: cfg, normalize, hostId: host.id });
this.logger.log(
`reparse[dry]: ${cfg.canonical} 源行=${(transformed[traceRawSourceTable(cfg.primary.table, transforms)] as unknown[])?.length ?? 0} ` +
` 装配出 ${assembled.rows.length} canonical(assembler 失败 ${assembled.stats.failed});实跑将按版本流 supersede 变更的、跳过不变的`,
);
const n = await this.prisma.patientTransaction.count({
where: { hostId: host.id, subjectType: cfg.emits!.subjectType, ...(opts.patientIds?.length ? { patientId: { in: opts.patientIds } } : {}) },
});
this.logger.log(`reparse[dry]: ${cfg.canonical}(${cfg.emits!.subjectType}) txns=${n} 实跑按版本流 supersede 变更的、跳过不变的`);
}
this.logger.log(`reparse[dry]: 范围 ${scopePatientIds.length} 患者;去掉 --dry-run 实跑(非破坏)`);
return { perResource: [], affectedPatientIds: [], dryRunDiffs };
}
// 2b. 实跑:processSubject(复用摄入路径)→ transaction 幂等命中 → parser 重衍生 fact(版本流)
// 2b. 分批实跑:每批患者 → 重建源表(只这批 distinct rawPayload)→ transform → processSubject
// (transaction 幂等命中已存 → parser 重衍生 fact,版本流 supersede)。
const BATCH = Math.max(1, Number(process.env.PAC_REPARSE_BATCH) || 3000);
const runStart = new Date();
const perResource: PerResourceStats[] = [];
const aggByResource = new Map<string, PerResourceStats>();
const seenTenants = new Set<string>();
for (const cfg of reparseableCfgs) {
const stats = await this.processSubject(transformed, cfg, host.id, tenantResolver, seenTenants, normalize, undefined, false);
perResource.push(stats);
this.logger.log(
`reparse: ${cfg.canonical} txns=${stats.transactionsWritten} ` +
`factsSuperseded=${stats.factsSuperseded} factsUnchanged=${stats.factsUnchanged} factsCreated=${stats.factsCreated}`,
);
const totalBatches = Math.ceil(scopePatientIds.length / BATCH);
this.logger.log(`reparse: 范围 ${scopePatientIds.length} 患者, ${totalBatches} (${BATCH}/批)`);
for (let bi = 0; bi < totalBatches; bi++) {
const batch = scopePatientIds.slice(bi * BATCH, (bi + 1) * BATCH);
const tables: Record<string, Record<string, unknown>[]> = {};
const seenHashByTable = new Map<string, Set<string>>();
for (const cfg of reparseableCfgs) {
const rawTable = traceRawSourceTable(cfg.primary.table, transforms);
const bucket = (tables[rawTable] ??= []);
const seen = seenHashByTable.get(rawTable) ?? new Set<string>();
seenHashByTable.set(rawTable, seen);
const txs = await this.prisma.patientTransaction.findMany({
where: { hostId: host.id, subjectType: cfg.emits!.subjectType, patientId: { in: batch } },
select: { rawPayload: true, payloadHash: true },
});
for (const t of txs) {
if (seen.has(t.payloadHash)) continue;
seen.add(t.payloadHash);
bucket.push(t.rawPayload as Record<string, unknown>);
}
}
for (const tf of transforms) {
const inp = (tf as { input?: string }).input;
if (inp && !tables[inp]) tables[inp] = [];
}
const transformed = transforms.length > 0 ? this.transformEngine.run({ tables, transforms }) : tables;
for (const cfg of reparseableCfgs) {
const stats = await this.processSubject(transformed, cfg, host.id, tenantResolver, seenTenants, normalize, undefined, false);
const agg = aggByResource.get(cfg.canonical);
if (!agg) {
aggByResource.set(cfg.canonical, stats);
} else {
agg.transactionsWritten += stats.transactionsWritten;
agg.duplicates += stats.duplicates;
agg.failed += stats.failed;
agg.factsCreated += stats.factsCreated;
agg.factsSuperseded += stats.factsSuperseded;
agg.factsUnchanged += stats.factsUnchanged;
agg.factsEvidenceAppended += stats.factsEvidenceAppended;
agg.factsFailed += stats.factsFailed;
agg.fetched += stats.fetched;
}
}
if ((bi + 1) % 5 === 0 || bi + 1 === totalBatches) {
const sup = [...aggByResource.values()].reduce((s, r) => s + r.factsSuperseded, 0);
this.logger.log(
`reparse: batch ${bi + 1}/${totalBatches} 累计 superseded=${sup} ` +
`rss=${Math.round(process.memoryUsage().rss / 1024 / 1024)}MB`,
);
}
}
const perResource = [...aggByResource.values()];
// 3. 受影响 patientId = 本次真正被 supersede(内容变了)的 fact 的 distinct patient → 只重算这些。
// 比"scope 内所有患者"精准得多(避免对没变的患者做无谓 persona/plan 重算 churn)。
const changed = await this.prisma.patientFact.findMany({
where: {
hostId: host.id,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment