Commit 7aa3e1a8 by luoqi

feat(assembler): 映射覆盖漏监控(mappingMiss)— code=null 等悄悄丢的常设可见化

保障机制:enum_mapping 精确+keyword 都没命中、落到 _default 的原值,过去静默(code=null
悄悄烂),现在按 field+rawValue 聚合记账 → 落 SyncLog.metadata,供"看漏了啥 → 扩字典 →
reparse"闭环。pull(cold-import)/push/reparse 三入口同享(都过 assembler)。

- assembler-engine:applyEnum 落 _default 时 recordMappingMiss(单值+数组);
  AssemblerResult.stats.mappingMisses[];导出 MappingMiss + mergeMappingMisses。
- cold-import:PerResourceStats.mappingMisses;processSubject 携带;
  ingestRawTables(push)写 SyncLog.metadata + warn;reparse 聚合 + top20 日志(批量审计面)。
- 不建表、不加列(复用既有 SyncLog.metadata Json)。

本地验证:push 一条含"乳牙列"(故意不映射)的 EMR 行 → metadata.mappingMisses 精确捕获
[{field:code, rawValue:乳牙列, fellBackTo:"", count:1}]。

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
parent 54691f62
...@@ -10,7 +10,12 @@ import type { Prisma } from '@prisma/client'; ...@@ -10,7 +10,12 @@ import type { Prisma } from '@prisma/client';
import { PrismaService } from '../../../prisma/prisma.service'; import { PrismaService } from '../../../prisma/prisma.service';
import { TransactionSynthesizer } from '../pipeline/transaction-synthesizer'; import { TransactionSynthesizer } from '../pipeline/transaction-synthesizer';
import { ParserPipeline, type BatchItem } from '../pipeline/parser-pipeline.service'; import { ParserPipeline, type BatchItem } from '../pipeline/parser-pipeline.service';
import { AssemblerEngine, type AssemblerResult } from '../assembler/assembler-engine'; import {
AssemblerEngine,
type AssemblerResult,
type MappingMiss,
mergeMappingMisses,
} from '../assembler/assembler-engine';
import { import {
AssemblerConfigSchema, AssemblerConfigSchema,
type AssemblerConfig, type AssemblerConfig,
...@@ -197,6 +202,7 @@ export class ColdImportService { ...@@ -197,6 +202,7 @@ export class ColdImportService {
agg.factsEvidenceAppended += stats.factsEvidenceAppended; agg.factsEvidenceAppended += stats.factsEvidenceAppended;
agg.factsFailed += stats.factsFailed; agg.factsFailed += stats.factsFailed;
agg.fetched += stats.fetched; agg.fetched += stats.fetched;
agg.mappingMisses = mergeMappingMisses([agg.mappingMisses, stats.mappingMisses]);
} }
} }
if ((bi + 1) % 5 === 0 || bi + 1 === totalBatches) { if ((bi + 1) % 5 === 0 || bi + 1 === totalBatches) {
...@@ -209,6 +215,18 @@ export class ColdImportService { ...@@ -209,6 +215,18 @@ export class ColdImportService {
} }
const perResource = [...aggByResource.values()]; const perResource = [...aggByResource.values()];
// 保障机制:映射覆盖漏审计 —— 全量 reparse 是看"还有哪些原值没映射上"的最佳时机。
const allMisses = mergeMappingMisses(perResource.map((s) => s.mappingMisses));
if (allMisses.length > 0) {
this.logger.warn(
`reparse 映射覆盖漏 ${allMisses.length} 种(落 _default,按量倒序 top 20):\n` +
allMisses
.slice(0, 20)
.map((m) => ` ${m.field}='${m.rawValue}' ×${m.count} → '${m.fellBackTo}'`)
.join('\n'),
);
}
// 3. 受影响 patientId = 本次真正被 supersede(内容变了)的 fact 的 distinct patient → 只重算这些。 // 3. 受影响 patientId = 本次真正被 supersede(内容变了)的 fact 的 distinct patient → 只重算这些。
const changed = await this.prisma.patientFact.findMany({ const changed = await this.prisma.patientFact.findMany({
where: { where: {
...@@ -326,6 +344,19 @@ export class ColdImportService { ...@@ -326,6 +344,19 @@ export class ColdImportService {
}), }),
{ txn: 0, dup: 0, failed: 0, facts: 0 }, { txn: 0, dup: 0, failed: 0, facts: 0 },
); );
// 映射覆盖漏(保障机制):本批落 _default 的原值汇总 → SyncLog.metadata,供审计/扩字典闭环。
const mappingMisses = mergeMappingMisses(perResource.map((s) => s.mappingMisses));
if (mappingMisses.length > 0) {
this.logger.warn(
`ingestRawTables(${opts.source}) 映射覆盖漏 ${mappingMisses.length} 种:` +
mappingMisses
.slice(0, 5)
.map((m) => `${m.field}='${m.rawValue}${m.count}`)
.join(', ') +
(mappingMisses.length > 5 ? ' …' : ''),
);
}
await this.prisma.syncLog.update({ await this.prisma.syncLog.update({
where: { id: syncLog.id }, where: { id: syncLog.id },
data: { data: {
...@@ -343,6 +374,9 @@ export class ColdImportService { ...@@ -343,6 +374,9 @@ export class ColdImportService {
failed: agg.failed, failed: agg.failed,
errorMessage: firstError, errorMessage: firstError,
endedAt: new Date(), endedAt: new Date(),
...(mappingMisses.length > 0
? { metadata: { mappingMisses } as unknown as Prisma.InputJsonValue }
: {}),
}, },
}); });
...@@ -1346,6 +1380,7 @@ export class ColdImportService { ...@@ -1346,6 +1380,7 @@ export class ColdImportService {
const assembled = this.assembler.assemble({ tables, config, normalize, hostId }); const assembled = this.assembler.assemble({ tables, config, normalize, hostId });
stats.fetched = assembled.stats.fetched; stats.fetched = assembled.stats.fetched;
stats.failed += assembled.stats.failed; // assembler 自己的失败计在内 stats.failed += assembled.stats.failed; // assembler 自己的失败计在内
stats.mappingMisses = assembled.stats.mappingMisses; // 映射覆盖漏(落 _default 的原值)
if (assembled.rows.length > 0) if (assembled.rows.length > 0)
stats.sampleCanonical.push(assembled.rows[0]!.canonical); stats.sampleCanonical.push(assembled.rows[0]!.canonical);
...@@ -1900,6 +1935,7 @@ export class ColdImportService { ...@@ -1900,6 +1935,7 @@ export class ColdImportService {
factsEvidenceAppended: 0, factsEvidenceAppended: 0,
factsFailed: 0, factsFailed: 0,
sampleCanonical: [], sampleCanonical: [],
mappingMisses: [],
}; };
} }
} }
...@@ -1946,6 +1982,8 @@ export interface PerResourceStats extends TotalsBlock { ...@@ -1946,6 +1982,8 @@ export interface PerResourceStats extends TotalsBlock {
resource: string; resource: string;
fetched: number; fetched: number;
sampleCanonical: unknown[]; sampleCanonical: unknown[];
/// 映射覆盖漏(assembler 透传):该资源装配时落 _default 的原值(field+rawValue 聚合)
mappingMisses: MappingMiss[];
} }
/** /**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment