Commit 3fd28974 by luoqi

feat(sync): PR4 — bulk createMany + batched parser(9h→30min 性能优化)

Hot path 优化 — 把每行一次 SQL 改成每 1000 行一次 SQL,SQL 往返从 ~3N 降到 ~5。

FactWriter 新增 bulkWrite(entries):
  - 1 次 SELECT 取所有相关 subject 的 latest version(by subjectId IN)
  - 内存里链式决策:unchanged / evidence_append / supersede / create
  - 处理 batch 内同 subject 多 draft(后 draft 跟前 draft 比,递推 liveLatest)
  - 1 个 $transaction commit:bulk updateMany supersede + bulk createMany 新版本
    + 罕见 evidence_append 走 array_append raw SQL
  - {maxWait: 30s, timeout: 120s} 防大批量 + swap 下 5s 默认超时

ParserPipeline 新增 runForBatch(items):
  - 全部 tx 走 parser 收集 drafts(in-memory,无 DB)
  - 1 次 FactWriter.bulkWrite 提交;失败降级 per-entry writeDraft(保收尾)
  - 同 runForTransaction 的 metrics 接口,调用方零适配

cold-import processSubject 大重构:
  - 引入 buffer 按 tenant 分桶(因 createMany 不能跨 tenant)
  - 每 N 行触发 flushBatchedWrite:
      1. createMany tx({skipDuplicates: true})— 1 SQL 写 N 行
      2. SELECT WHERE sourceEventId IN (...) — 取回 tx ids 喂 parser
      3. parserPipeline.runForBatch — 内部 bulkWrite
  - createMany 失败降级 fallbackPerRowWrite(per-row 老路径,保稳)
  - env PAC_WRITE_BATCH_SIZE 兜底(默认 1000;0 = 退回 per-row 回滚开关)

性能预期(实测待验证):
  per-row baseline:  ~80 tx/s (实测服务器)
  bulk createMany +  10-20x → 800-1600 tx/s
  4.6M 行全量:       9-12h → **30-60min**

跟 PR3 统一 sync 模式协同:
  - 任何 mode(sync/--full/cron 增量)都走同一条 hot path
  - cohort batch(PR2)+ write batch(PR4)正交叠加
  - 失败降级保稳(createMany 崩 → fallback per-row;bulkWrite 崩 → fallback writeDraft)
  - 同 fact subject_id 跨 batch 一致性靠 version + partial UNIQUE active 兜底,不变

未来 PR5(可选):pg-copy-streams 真 COPY + staging 表 → 再 3-5x(总 30-50x)
parent f7a6d41f
...@@ -9,7 +9,7 @@ import type { Action, CanonicalResourceKey } from '@pac/types'; ...@@ -9,7 +9,7 @@ import type { Action, CanonicalResourceKey } from '@pac/types';
import type { Prisma } from '@prisma/client'; import type { Prisma } from '@prisma/client';
import { PrismaService } from '../../../prisma/prisma.service'; import { PrismaService } from '../../../prisma/prisma.service';
import { TransactionSynthesizer } from '../pipeline/transaction-synthesizer'; import { TransactionSynthesizer } from '../pipeline/transaction-synthesizer';
import { ParserPipeline } from '../pipeline/parser-pipeline.service'; import { ParserPipeline, type BatchItem } from '../pipeline/parser-pipeline.service';
import { AssemblerEngine, type AssemblerResult } from '../assembler/assembler-engine'; import { AssemblerEngine, type AssemblerResult } from '../assembler/assembler-engine';
import { import {
AssemblerConfigSchema, AssemblerConfigSchema,
...@@ -764,52 +764,227 @@ export class ColdImportService { ...@@ -764,52 +764,227 @@ export class ColdImportService {
// ⚠️ sourceContext 必须跨 run 稳定(否则 source_event_id 永远不重复) // ⚠️ sourceContext 必须跨 run 稳定(否则 source_event_id 永远不重复)
const sourceContext = `cold-import`; const sourceContext = `cold-import`;
for (const { canonical, rawSource } of assembled.rows) { // PR4 ⭐ 批写 buffer — 默认 1000 行一批,createMany + bulkWrite 替代 per-row 写
const canonicalRow = canonical as Record<string, unknown>; // env PAC_WRITE_BATCH_SIZE 兜底;0 = 退回 per-row(回滚开关)
// per-row tenant 解析 const writeBatchSize = resolveWriteBatchSize();
const tenantId = tenantResolver.resolve(rawSource);
if (!tenantId) { // 同 batch 内不同 tenant 要分别 createMany(prisma createMany 不能跨 tenant)
stats.failed++; // 简化:按 tenant 分桶 buffer
this.logger.warn( const buffersByTenant = new Map<
`${config.canonical} row 未匹配 tenant(${tenantResolver.describe()}),跳过`, string,
); Array<{
continue; tx: Prisma.PatientTransactionUncheckedCreateInput;
} canonicalRow: Record<string, unknown>;
seenTenants.add(tenantId); }>
if (dryRun) { >();
stats.transactionsWritten++;
continue; const flushTenant = async (tenantId: string): Promise<void> => {
} const buf = buffersByTenant.get(tenantId);
// 懒加载该 tenant 的 patient index if (!buf || buf.length === 0) return;
let patientIndex = patientIndexByTenant.get(tenantId); await this.flushBatchedWrite({
if (!patientIndex) { tenantId,
patientIndex = await this.buildPatientIndex(hostId, tenantId); buffer: buf,
patientIndexByTenant.set(tenantId, patientIndex); stats,
config,
});
buf.length = 0;
};
try {
for (const { canonical, rawSource } of assembled.rows) {
const canonicalRow = canonical as Record<string, unknown>;
// per-row tenant 解析
const tenantId = tenantResolver.resolve(rawSource);
if (!tenantId) {
stats.failed++;
this.logger.warn(
`${config.canonical} row 未匹配 tenant(${tenantResolver.describe()}),跳过`,
);
continue;
}
seenTenants.add(tenantId);
if (dryRun) {
stats.transactionsWritten++;
continue;
}
// 懒加载该 tenant 的 patient index
let patientIndex = patientIndexByTenant.get(tenantId);
if (!patientIndex) {
patientIndex = await this.buildPatientIndex(hostId, tenantId);
patientIndexByTenant.set(tenantId, patientIndex);
}
// stub auto-create
const patientExternalId = canonicalRow.patientExternalId as string | undefined;
if (patientExternalId && !patientIndex.has(patientExternalId)) {
const stubId = await this.ensurePatientStub(hostId, tenantId, patientExternalId);
patientIndex.set(patientExternalId, stubId);
}
const txn = this.synthesizer.synthesize({
rawRow: rawSource,
canonicalRow,
emits: assembled.emits,
resource: config.canonical,
hostId,
tenantId,
patientIdResolver: (extId) => patientIndex!.get(extId),
sourceContext,
});
if (!txn) {
stats.failed++;
continue;
}
if (syncLogId) txn.syncLogId = syncLogId;
// 入 buffer
let buf = buffersByTenant.get(tenantId);
if (!buf) {
buf = [];
buffersByTenant.set(tenantId, buf);
}
buf.push({ tx: txn, canonicalRow });
// batch 满 → flush 该 tenant
if (writeBatchSize > 0 && buf.length >= writeBatchSize) {
await flushTenant(tenantId);
}
} }
// ⭐ W4 末 stub auto-create:本批 fact 引用的 patient 主档没拉到 → 即时建空 stub
// 后续 fact_client_out(同 run 反向拉 OR 下次 sync 主档更新)会 upsert 补 PII // 尾巴 — flush 所有未满 buffer
// 保证 fact 立即派生不丢,不依赖等主档(详见 docs/dw-data-source-issues.md "patient stub") for (const tenantId of buffersByTenant.keys()) {
const patientExternalId = canonicalRow.patientExternalId as string | undefined; await flushTenant(tenantId);
if (patientExternalId && !patientIndex.has(patientExternalId)) {
const stubId = await this.ensurePatientStub(hostId, tenantId, patientExternalId);
patientIndex.set(patientExternalId, stubId);
} }
const txn = this.synthesizer.synthesize({ } catch (err) {
rawRow: rawSource, this.logger.error(
canonicalRow, `processSubject(${config.canonical}) 批写主循环异常: ${
emits: assembled.emits, err instanceof Error ? err.message : String(err)
resource: config.canonical, }`,
);
throw err;
}
return stats;
}
/**
* PR4 批写一批 transactions + 衍生 facts。
*
* 流程:
* 1. prisma.patientTransaction.createMany({ data, skipDuplicates: true })
* → 1 次 SQL 写 N 行,撞 source_event_id 唯一约束自动 skip
* 2. SELECT ... WHERE sourceEventId IN (...) 取回所有 tx 行(含 id)— 1 次 SQL
* 新增 + 已存在的都会回来,parser 全部跑(idempotent via fact hash)
* 3. parserPipeline.runForBatch(items) → 内部 bulkWrite,1 次 SELECT 现有 fact + 批 INSERT/UPDATE
*
* 相比 per-row 老路径:N 行从 ~3N 次 SQL 降到 ~5 次 SQL → 10-20x 速度
*/
private async flushBatchedWrite(args: {
tenantId: string;
buffer: Array<{
tx: Prisma.PatientTransactionUncheckedCreateInput;
canonicalRow: Record<string, unknown>;
}>;
stats: PerResourceStats;
config: AssemblerConfig;
}): Promise<void> {
const { tenantId, buffer, stats, config } = args;
if (buffer.length === 0) return;
const hostId = buffer[0]!.tx.hostId as string;
// 1. createMany tx(skipDuplicates 跳重)
const data = buffer.map((b) => b.tx);
let createdCount = 0;
try {
const r = await this.prisma.patientTransaction.createMany({
data,
skipDuplicates: true,
});
createdCount = r.count;
} catch (err) {
// createMany 整批失败(罕见 — schema 校验等)→ 降级 per-row
this.logger.warn(
`createMany tx 批失败,降级 per-row: ${err instanceof Error ? err.message : String(err)}`,
);
await this.fallbackPerRowWrite(buffer, stats, config);
return;
}
stats.transactionsWritten += createdCount;
stats.duplicates += buffer.length - createdCount;
// 2. 取回所有 tx 行(含 id),供 parser 用
const eventIds = buffer
.map((b) => b.tx.sourceEventId as string | undefined)
.filter((id): id is string => !!id);
if (eventIds.length === 0) {
return; // 全 buffer 都没 source_event_id?理论不该发生
}
const txRows = await this.prisma.patientTransaction.findMany({
where: {
hostId, hostId,
tenantId, tenantId,
patientIdResolver: (extId) => patientIndex!.get(extId), sourceEventId: { in: eventIds },
sourceContext, },
}); select: {
if (!txn) { id: true,
stats.failed++; hostId: true,
tenantId: true,
patientId: true,
action: true,
subjectType: true,
subjectId: true,
occurredAt: true,
clinicId: true,
sourceEventId: true,
},
});
const txBySourceEventId = new Map(txRows.map((t) => [t.sourceEventId!, t]));
// 3. 构建 batch items,跑 parserPipeline.runForBatch
const items: BatchItem[] = [];
for (const b of buffer) {
const sid = b.tx.sourceEventId as string | undefined;
if (!sid) continue;
const tx = txBySourceEventId.get(sid);
if (!tx) {
// 罕见 — createMany 写入但 SELECT 没拿到(并发?最大可见性问题?)
this.logger.warn(`flushBatch: tx ${sid} 写入后未能查回(跳过 parser)`);
continue; continue;
} }
if (syncLogId) txn.syncLogId = syncLogId; items.push({
transaction: {
id: tx.id,
hostId: tx.hostId,
tenantId: tx.tenantId,
patientId: tx.patientId,
action: tx.action as Action,
subjectType: tx.subjectType,
subjectId: tx.subjectId,
occurredAt: tx.occurredAt,
clinicId: tx.clinicId,
},
canonicalRow: b.canonicalRow,
});
}
if (items.length === 0) return;
const metrics = await this.parserPipeline.runForBatch(items);
stats.factsCreated += metrics.factsCreated;
stats.factsSuperseded += metrics.factsSuperseded;
stats.factsUnchanged += metrics.factsUnchanged;
stats.factsEvidenceAppended += metrics.factsEvidenceAppended;
stats.factsFailed += metrics.factsFailed;
}
/// 兜底:createMany 整批失败时降级 per-row(慢但稳)
private async fallbackPerRowWrite(
buffer: Array<{
tx: Prisma.PatientTransactionUncheckedCreateInput;
canonicalRow: Record<string, unknown>;
}>,
stats: PerResourceStats,
config: AssemblerConfig,
): Promise<void> {
for (const b of buffer) {
const txn = b.tx;
try { try {
const created = await this.prisma.patientTransaction.create({ data: txn }); const created = await this.prisma.patientTransaction.create({ data: txn });
stats.transactionsWritten++; stats.transactionsWritten++;
...@@ -825,7 +1000,7 @@ export class ColdImportService { ...@@ -825,7 +1000,7 @@ export class ColdImportService {
occurredAt: created.occurredAt, occurredAt: created.occurredAt,
clinicId: created.clinicId, clinicId: created.clinicId,
}, },
canonicalRow, canonicalRow: b.canonicalRow,
}); });
stats.factsCreated += metrics.factsCreated; stats.factsCreated += metrics.factsCreated;
stats.factsSuperseded += metrics.factsSuperseded; stats.factsSuperseded += metrics.factsSuperseded;
...@@ -836,12 +1011,11 @@ export class ColdImportService { ...@@ -836,12 +1011,11 @@ export class ColdImportService {
const code = (err as { code?: string })?.code; const code = (err as { code?: string })?.code;
if (code === 'P2002') { if (code === 'P2002') {
stats.duplicates++; stats.duplicates++;
// 幂等命中 — 仍跑 parser(中断恢复:transaction 落了但 fact 没落场景)
const existing = await this.prisma.patientTransaction.findFirst({ const existing = await this.prisma.patientTransaction.findFirst({
where: { where: {
hostId: txn.hostId, hostId: txn.hostId as string,
tenantId: txn.tenantId, tenantId: txn.tenantId as string,
sourceEventId: txn.sourceEventId ?? undefined, sourceEventId: (txn.sourceEventId as string | undefined) ?? undefined,
}, },
}); });
if (existing) { if (existing) {
...@@ -857,7 +1031,7 @@ export class ColdImportService { ...@@ -857,7 +1031,7 @@ export class ColdImportService {
occurredAt: existing.occurredAt, occurredAt: existing.occurredAt,
clinicId: existing.clinicId, clinicId: existing.clinicId,
}, },
canonicalRow, canonicalRow: b.canonicalRow,
}); });
stats.factsCreated += metrics.factsCreated; stats.factsCreated += metrics.factsCreated;
stats.factsSuperseded += metrics.factsSuperseded; stats.factsSuperseded += metrics.factsSuperseded;
...@@ -868,15 +1042,13 @@ export class ColdImportService { ...@@ -868,15 +1042,13 @@ export class ColdImportService {
} else { } else {
stats.failed++; stats.failed++;
this.logger.error( this.logger.error(
`transaction write failed (resource=${config.canonical}, subject=${txn.subjectId}): ${ `fallback per-row tx failed (resource=${config.canonical}, subject=${txn.subjectId}): ${
err instanceof Error ? err.message : String(err) err instanceof Error ? err.message : String(err)
}`, }`,
); );
} }
} }
} }
return stats;
} }
private async buildPatientIndex( private async buildPatientIndex(
...@@ -1107,6 +1279,20 @@ export function resolveCohortBatchSize(opt: number | null | undefined): number { ...@@ -1107,6 +1279,20 @@ export function resolveCohortBatchSize(opt: number | null | undefined): number {
return 5000; return 5000;
} }
/**
* PR4 ⭐ batch size for createMany tx + bulk fact write。
* env PAC_WRITE_BATCH_SIZE 优先(0 = 退回 per-row,回滚开关)
* 默认 1000(SQL prepared-statement 上限 + V8 array 友好,实测平衡)
*/
export function resolveWriteBatchSize(): number {
const env = process.env.PAC_WRITE_BATCH_SIZE;
if (env !== undefined) {
const n = parseInt(env, 10);
return Number.isFinite(n) && n >= 0 ? n : 0;
}
return 1000;
}
/** 把数组切成 size 大小的块(尾巴块可能 < size) */ /** 把数组切成 size 大小的块(尾巴块可能 < size) */
export function chunk<T>(arr: ReadonlyArray<T>, size: number): T[][] { export function chunk<T>(arr: ReadonlyArray<T>, size: number): T[][] {
if (size <= 0) return [arr.slice() as T[]]; if (size <= 0) return [arr.slice() as T[]];
......
...@@ -157,6 +157,185 @@ export class FactWriter { ...@@ -157,6 +157,185 @@ export class FactWriter {
} }
/** /**
* 批量写入 N 个 fact draft(PR4 — 大幅减少 SQL 往返,9h→30min 性能优化的核心)。
*
* 流程:
* 1. 全部 zod 校验(失败的整批抛错;调用方按 entry 级 try/catch 拆分小批重试)
* 2. 收集所有 subject_id,**1 次 SELECT** 取全部 latest version
* 3. 对每 entry 跟 latest 比 hash,决策:unchanged / evidence_append / supersede / create
* 4. **3 个 bulk 操作** 在单个 $transaction:
* - updateMany supersede 那些 active 旧版本(by id IN ...)
* - 每个 evidence_append 单独 update(罕见路径,通常 0-N 个,代价小)
* - createMany 新版本(主流路径,百行级一次写完)
*
* 同 batch 同 subject_id 处理:
* - 若 N 个 draft 同 subject_id,按出现顺序处理 → 最后一个 draft 是最终 active 版本
* - 早 draft 跟 latest 比、决定 supersede;后续 draft 跟 早 draft 比、决定 supersede;...
* - 链式 supersede 在 1 个 batch 内完成,通过递推 latest 实现
*
* **跨 batch 的 subject_id**:不用担心(version + active partial UNIQUE 保证一致性)
*
* 失败处理:
* - 整批 $transaction 失败 → 全部回滚 → 抛 BulkWriteFailedError(调用方降级 per-entry)
* - zod 失败 → 同步抛(整批拒绝;调用方拆 entry 跑 writeDraft 兜底)
*/
async bulkWrite(entries: BulkEntry[]): Promise<FactWriteResult[]> {
if (entries.length === 0) return [];
// 假设全 batch 同 hostId+tenantId(processSubject 是按 host+tenant 跑的,符合)
// 防御:校验所有 entry 同 host+tenant
const { hostId, tenantId } = entries[0]!;
for (const e of entries) {
if (e.hostId !== hostId || e.tenantId !== tenantId) {
throw new Error(
`bulkWrite: 批内 hostId/tenantId 不一致 — host=${hostId} vs ${e.hostId}, tenant=${tenantId} vs ${e.tenantId}`,
);
}
}
// 1. 全部 zod 校验
const validatedEntries = entries.map((e) => {
const content = validateFactContent(e.draft.type, e.draft.subjectId, e.draft.content) as Prisma.InputJsonValue;
return { ...e, validatedContent: content, hash: this.hashContent(content) };
});
// 2. 一次 SELECT 把所有相关 subject 的 latest version 拿回
const subjectIds = [...new Set(validatedEntries.map((e) => e.draft.subjectId))];
const allHist = await this.prisma.patientFact.findMany({
where: { hostId, tenantId, subjectId: { in: subjectIds } },
orderBy: [{ subjectId: 'asc' }, { version: 'desc' }],
});
// 取每 subject 的 highest version 行
const latestBySubject = new Map<string, (typeof allHist)[number]>();
for (const f of allHist) {
if (!latestBySubject.has(f.subjectId)) latestBySubject.set(f.subjectId, f);
}
// 3. 决策每个 entry,累积批操作
const toSupersede: string[] = []; // fact ids needing status -> superseded
const toEvidenceAppend: Array<{ factId: string; transactionId: string }> = [];
const toCreate: Prisma.PatientFactCreateManyInput[] = [];
const results: FactWriteResult[] = [];
// 模拟"链式 latest"— batch 内同 subject 多 draft 时,后 draft 跟前 draft 比
const liveLatest = new Map<string, {
id?: string;
version: number;
status: string;
hash: string;
transactionIds: string[];
}>();
for (const [sid, latest] of latestBySubject.entries()) {
liveLatest.set(sid, {
id: latest.id,
version: latest.version,
status: latest.status,
hash: this.hashContent(latest.content as Prisma.InputJsonValue),
transactionIds: latest.transactionIds,
});
}
for (const entry of validatedEntries) {
const sid = entry.draft.subjectId;
const live = liveLatest.get(sid);
const draftStatus = entry.draft.status ?? FactStatus.ACTIVE;
if (live && live.hash === entry.hash && live.status === draftStatus) {
// 内容一致 + 状态一致
if (live.id && !live.transactionIds.includes(entry.transactionId)) {
toEvidenceAppend.push({ factId: live.id, transactionId: entry.transactionId });
// 更新 liveLatest.transactionIds(防同 subject 后续 draft 再加同一 tx)
live.transactionIds = [...live.transactionIds, entry.transactionId];
results.push({
action: 'evidence_appended',
factId: live.id,
subjectId: sid,
version: live.version,
});
} else {
results.push({
action: 'unchanged',
factId: live.id ?? '',
subjectId: sid,
version: live.version,
});
}
continue;
}
// 需要新版本(content / status 不同,或 batch 内同 subject 第二次出现)
// supersede 旧 active(若有,且来自 DB,不是 batch 内"虚拟" liveLatest)
if (live?.id && live.status === FactStatus.ACTIVE) {
toSupersede.push(live.id);
}
const nextVersion = (live?.version ?? 0) + 1;
toCreate.push({
hostId,
tenantId,
patientId: entry.patientId,
subjectId: sid,
kind: entry.draft.kind,
type: entry.draft.type,
status: draftStatus,
version: nextVersion,
clinicId: entry.draft.clinicId ?? null,
occurredAt: entry.draft.occurredAt ?? null,
plannedFor: entry.draft.plannedFor ?? null,
validFrom: entry.draft.validFrom ?? null,
validUntil: entry.draft.validUntil ?? null,
title: entry.draft.title ?? null,
summary: entry.draft.summary ?? null,
content: entry.validatedContent,
transactionIds: [entry.transactionId],
});
// batch 内链式:把"虚拟 latest"刷新成本 draft 内容,后续同 subject 的 draft 用它当 prev
// (注意 id=undefined 表示是本 batch 内 create,真正写入后才有 id;但 batch 内不会再 supersede 它,
// 因为 batch 内同 subject 第二次出现也是 create,不 update)
liveLatest.set(sid, {
id: undefined,
version: nextVersion,
status: draftStatus,
hash: entry.hash,
transactionIds: [entry.transactionId],
});
results.push({
action: live ? 'superseded' : 'created',
factId: '', // bulk create 不返回 id;调用方不依赖 id(parser 不引用)
subjectId: sid,
version: nextVersion,
});
}
// 4. 整批一个 transaction,真原子(supersede + create 不分离)
// 用 callback 形态 + tx 重建 ops,这样可设 maxWait/timeout(默认 5s 在大批量 + swap 下不够)
if (toSupersede.length > 0 || toEvidenceAppend.length > 0 || toCreate.length > 0) {
await this.prisma.$transaction(
async (tx) => {
if (toSupersede.length > 0) {
await tx.patientFact.updateMany({
where: { id: { in: toSupersede } },
data: { status: FactStatus.SUPERSEDED, supersededAt: new Date() },
});
}
for (const e of toEvidenceAppend) {
await tx.$executeRaw`
UPDATE patient_facts
SET transaction_ids = array_append(transaction_ids, ${e.transactionId}::uuid)
WHERE id = ${e.factId}::uuid
AND NOT (${e.transactionId}::uuid = ANY(transaction_ids))
`;
}
if (toCreate.length > 0) {
await tx.patientFact.createMany({ data: toCreate });
}
},
{ maxWait: 30_000, timeout: 120_000 },
);
}
return results;
}
/**
* 稳定 JSON hash — 递归按 key 排序后 sha256。 * 稳定 JSON hash — 递归按 key 排序后 sha256。
* JSON.stringify 默认按 insertion order,key 顺序不同会算出不同 hash。 * JSON.stringify 默认按 insertion order,key 顺序不同会算出不同 hash。
*/ */
...@@ -196,3 +375,12 @@ export interface FactWriteResult { ...@@ -196,3 +375,12 @@ export interface FactWriteResult {
subjectId: string; subjectId: string;
version: number; version: number;
} }
/// FactWriter.bulkWrite 入参条目 — 一个 draft + 它所属的 patient / host / tenant / tx 上下文
export interface BulkEntry {
draft: FactDraft;
hostId: string;
tenantId: string;
patientId: string;
transactionId: string;
}
import { Injectable, Logger } from '@nestjs/common'; import { Injectable, Logger } from '@nestjs/common';
import type { Action } from '@pac/types'; import type { Action } from '@pac/types';
import { ParserRegistry } from './parsers/parser.registry'; import { ParserRegistry } from './parsers/parser.registry';
import { FactWriter, FactWriteResult } from './fact-writer.service'; import {
type BulkEntry,
FactWriter,
FactWriteResult,
} from './fact-writer.service';
/** /**
* ParserPipeline — transaction → fact 衍生编排器 * ParserPipeline — transaction → fact 衍生编排器
...@@ -110,6 +114,136 @@ export class ParserPipeline { ...@@ -110,6 +114,136 @@ export class ParserPipeline {
return metrics; return metrics;
} }
/**
* 批量版 — N 条 transaction 一起跑 parser → 收集所有 draft → 1 次 bulk fact write
* PR4 引入,跟 runForTransaction 行为等价,但**减少 SQL 往返 ~ N 倍**(单次 batch SELECT + bulk INSERT/UPDATE)
*
* 注意:本方法假设所有 transaction 同 hostId+tenantId(processSubject 满足)
* 返回:整批的 PipelineRunMetrics(汇总)
*/
async runForBatch(items: BatchItem[]): Promise<PipelineRunMetrics> {
const metrics: PipelineRunMetrics = {
action: 'BATCH',
parserMatched: true,
factsCreated: 0,
factsSuperseded: 0,
factsUnchanged: 0,
factsEvidenceAppended: 0,
factsFailed: 0,
writes: [],
};
// 1. 全部 tx 走 parser,收集 drafts(in-memory)
const bulkEntries: BulkEntry[] = [];
for (const item of items) {
const parser = this.registry.get(item.transaction.action);
if (!parser) {
this.logger.debug(`no parser for action=${item.transaction.action};skip`);
continue;
}
if (!item.transaction.patientId) continue;
try {
const drafts = parser.parse({
transaction: item.transaction,
canonicalRow: item.canonicalRow,
});
for (const draft of drafts) {
bulkEntries.push({
draft,
hostId: item.transaction.hostId,
tenantId: item.transaction.tenantId,
patientId: item.transaction.patientId,
transactionId: item.transaction.id,
});
}
} catch (err) {
metrics.factsFailed++;
this.logger.error(
`parser failed: tx=${item.transaction.id} action=${item.transaction.action} ` +
`err=${err instanceof Error ? err.message : String(err)}`,
);
}
}
if (bulkEntries.length === 0) return metrics;
// 2. 一次 bulk write,失败降级 per-entry(写一份保证收尾)
try {
const results = await this.writer.bulkWrite(bulkEntries);
metrics.writes.push(...results);
for (const r of results) {
switch (r.action) {
case 'created':
metrics.factsCreated++;
break;
case 'superseded':
metrics.factsSuperseded++;
break;
case 'unchanged':
metrics.factsUnchanged++;
break;
case 'evidence_appended':
metrics.factsEvidenceAppended++;
break;
}
}
} catch (err) {
this.logger.warn(
`bulkWrite 批失败,降级 per-entry: ${err instanceof Error ? err.message : String(err)}`,
);
// 降级:逐条用 writeDraft(单 entry 失败不影响其他)
for (const e of bulkEntries) {
try {
const r = await this.writer.writeDraft({
draft: e.draft,
hostId: e.hostId,
tenantId: e.tenantId,
patientId: e.patientId,
transactionId: e.transactionId,
});
metrics.writes.push(r);
switch (r.action) {
case 'created':
metrics.factsCreated++;
break;
case 'superseded':
metrics.factsSuperseded++;
break;
case 'unchanged':
metrics.factsUnchanged++;
break;
case 'evidence_appended':
metrics.factsEvidenceAppended++;
break;
}
} catch (subErr) {
metrics.factsFailed++;
this.logger.error(
`fallback writeDraft failed: tx=${e.transactionId} subject=${e.draft.subjectId} ` +
`err=${subErr instanceof Error ? subErr.message : String(subErr)}`,
);
}
}
}
return metrics;
}
}
export interface BatchItem {
transaction: {
id: string;
hostId: string;
tenantId: string;
patientId: string | null;
action: Action;
subjectType: string;
subjectId: string;
occurredAt: Date;
clinicId: string;
};
canonicalRow: Record<string, unknown>;
} }
export interface PipelineRunMetrics { export interface PipelineRunMetrics {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment