Commit bab32e83 by luoqi

feat(sync): 宿主改表检测(suspectFields)— 列缺席/删/加 漂移可见化

回答"宿主改了表结构/字段/枚举没通知,PAC 能不能察觉":在已有 mappingMiss(枚举漂移)基础上加
"字段/列漂移"检测,两路:
- assembler 层(column_absent):映射字段对应 host 列在装配行里整列缺席 → 进 stats.suspectFields。
  对直连原生表的资源有效;transform 产出表会补 key 掩盖,故补第二路 ↓
- raw 层(form A,column_removed/added,最可靠):ingestRawTables 在 transform 前快照推送列集,
  对比"历史同源 rawPayload 列集"(基线抽样 50 条,排除本 run 防自污染,基线<5 跳过)→ 删列/加列。
两路都并入 SyncLog.metadata.suspectFields,/admin/mapping-miss 端点一并透出(报告含 misses+suspectFields)。

本地直连验证:删 doctor_name + 加 new_weird_col 推送 →
suspectFields=[{doctor_name,column_removed},{new_weird_col,column_added}]。

不建表(复用 SyncLog.metadata)。只检测+可见化,不告警/不阻断(按需后续接 webhook)。

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
parent 43fa2672
...@@ -25,7 +25,8 @@ export class MappingMissAdminController { ...@@ -25,7 +25,8 @@ export class MappingMissAdminController {
@Get() @Get()
@ApiOperation({ @ApiOperation({
summary: '映射覆盖漏(落 _default 的原值)按量倒序;扫最近 N 次 sync run 的 metadata(默认 100,上限 500)', summary:
'摄入漂移报告:misses(落 _default 的原值,扩字典)+ suspectFields(整列缺席,疑似宿主改表)。扫最近 N 次 sync run 的 metadata(默认 100,上限 500)',
}) })
recent(@TenantScope() scope: TenantScopeContext, @Query('limit') limit?: string) { recent(@TenantScope() scope: TenantScopeContext, @Query('limit') limit?: string) {
return this.svc.recent(scope, { limit: limit ? Number(limit) : undefined }); return this.svc.recent(scope, { limit: limit ? Number(limit) : undefined });
......
...@@ -2,7 +2,9 @@ import { Injectable } from '@nestjs/common'; ...@@ -2,7 +2,9 @@ import { Injectable } from '@nestjs/common';
import { PrismaService } from '../../prisma/prisma.service'; import { PrismaService } from '../../prisma/prisma.service';
import { import {
mergeMappingMisses, mergeMappingMisses,
mergeSuspectFields,
type MappingMiss, type MappingMiss,
type SuspectField,
} from '../sync/assembler/assembler-engine'; } from '../sync/assembler/assembler-engine';
import type { TenantScopeContext } from '../../common/decorators/tenant-scope.decorator'; import type { TenantScopeContext } from '../../common/decorators/tenant-scope.decorator';
...@@ -23,7 +25,12 @@ export class MappingMissService { ...@@ -23,7 +25,12 @@ export class MappingMissService {
async recent( async recent(
scope: TenantScopeContext, scope: TenantScopeContext,
opts: { limit?: number }, opts: { limit?: number },
): Promise<{ scannedRuns: number; runsWithMisses: number; distinct: number; misses: MappingMiss[] }> { ): Promise<{
scannedRuns: number;
runsWithDrift: number;
misses: MappingMiss[];
suspectFields: SuspectField[];
}> {
const take = Math.min(Math.max(opts.limit ?? 100, 1), 500); const take = Math.min(Math.max(opts.limit ?? 100, 1), 500);
const logs = await this.prisma.syncLog.findMany({ const logs = await this.prisma.syncLog.findMany({
where: { hostId: scope.hostId }, where: { hostId: scope.hostId },
...@@ -31,17 +38,29 @@ export class MappingMissService { ...@@ -31,17 +38,29 @@ export class MappingMissService {
take, take,
select: { metadata: true }, select: { metadata: true },
}); });
const lists: MappingMiss[][] = []; const missLists: MappingMiss[][] = [];
const suspectLists: SuspectField[][] = [];
let runsWithDrift = 0;
for (const l of logs) { for (const l of logs) {
const md = l.metadata as { mappingMisses?: MappingMiss[] } | null; const md = l.metadata as
if (md?.mappingMisses?.length) lists.push(md.mappingMisses); | { mappingMisses?: MappingMiss[]; suspectFields?: SuspectField[] }
| null;
let hit = false;
if (md?.mappingMisses?.length) {
missLists.push(md.mappingMisses);
hit = true;
}
if (md?.suspectFields?.length) {
suspectLists.push(md.suspectFields);
hit = true;
}
if (hit) runsWithDrift++;
} }
const misses = mergeMappingMisses(lists);
return { return {
scannedRuns: logs.length, scannedRuns: logs.length,
runsWithMisses: lists.length, runsWithDrift,
distinct: misses.length, misses: mergeMappingMisses(missLists),
misses, suspectFields: mergeSuspectFields(suspectLists),
}; };
} }
} }
...@@ -14,7 +14,9 @@ import { ...@@ -14,7 +14,9 @@ import {
AssemblerEngine, AssemblerEngine,
type AssemblerResult, type AssemblerResult,
type MappingMiss, type MappingMiss,
type SuspectField,
mergeMappingMisses, mergeMappingMisses,
mergeSuspectFields,
} from '../assembler/assembler-engine'; } from '../assembler/assembler-engine';
import { import {
AssemblerConfigSchema, AssemblerConfigSchema,
...@@ -203,6 +205,7 @@ export class ColdImportService { ...@@ -203,6 +205,7 @@ export class ColdImportService {
agg.factsFailed += stats.factsFailed; agg.factsFailed += stats.factsFailed;
agg.fetched += stats.fetched; agg.fetched += stats.fetched;
agg.mappingMisses = mergeMappingMisses([agg.mappingMisses, stats.mappingMisses]); agg.mappingMisses = mergeMappingMisses([agg.mappingMisses, stats.mappingMisses]);
agg.suspectFields = mergeSuspectFields([agg.suspectFields, stats.suspectFields]);
} }
} }
if ((bi + 1) % 5 === 0 || bi + 1 === totalBatches) { if ((bi + 1) % 5 === 0 || bi + 1 === totalBatches) {
...@@ -215,7 +218,7 @@ export class ColdImportService { ...@@ -215,7 +218,7 @@ export class ColdImportService {
} }
const perResource = [...aggByResource.values()]; const perResource = [...aggByResource.values()];
// 保障机制:映射覆盖漏审计 —— 全量 reparse 是看"还有哪些原值没映射上"的最佳时机。 // 保障机制:摄入漂移审计 —— 全量 reparse 是看"还有哪些原值没映射上 / 哪些列消失了"的最佳时机。
const allMisses = mergeMappingMisses(perResource.map((s) => s.mappingMisses)); const allMisses = mergeMappingMisses(perResource.map((s) => s.mappingMisses));
if (allMisses.length > 0) { if (allMisses.length > 0) {
this.logger.warn( this.logger.warn(
...@@ -226,6 +229,13 @@ export class ColdImportService { ...@@ -226,6 +229,13 @@ export class ColdImportService {
.join('\n'), .join('\n'),
); );
} }
const allSuspect = mergeSuspectFields(perResource.map((s) => s.suspectFields));
if (allSuspect.length > 0) {
this.logger.warn(
`⚠️ reparse 疑似宿主改表 ${allSuspect.length} 列整列缺席:\n` +
allSuspect.map((s) => ` ${s.resource}.${s.field}(host 列 ${s.hostField})`).join('\n'),
);
}
// 3. 受影响 patientId = 本次真正被 supersede(内容变了)的 fact 的 distinct patient → 只重算这些。 // 3. 受影响 patientId = 本次真正被 supersede(内容变了)的 fact 的 distinct patient → 只重算这些。
const changed = await this.prisma.patientFact.findMany({ const changed = await this.prisma.patientFact.findMany({
...@@ -294,6 +304,10 @@ export class ColdImportService { ...@@ -294,6 +304,10 @@ export class ColdImportService {
}, },
}); });
// ⚠️ transform 可能 in-place 改源行(剥未知列),故在 transform【之前】快照推送列集,供 raw 层列漂移检测。
const pushedCols = new Set<string>();
for (const r of opts.rows) for (const k of Object.keys(r)) if (!k.startsWith('_')) pushedCols.add(k);
// tables = { [source]: rows } + transform 空输入兜底 → transform → processSubject // tables = { [source]: rows } + transform 空输入兜底 → transform → processSubject
const tables: Record<string, Record<string, unknown>[]> = { [opts.source]: opts.rows }; const tables: Record<string, Record<string, unknown>[]> = { [opts.source]: opts.rows };
for (const tf of transforms) { for (const tf of transforms) {
...@@ -344,8 +358,25 @@ export class ColdImportService { ...@@ -344,8 +358,25 @@ export class ColdImportService {
}), }),
{ txn: 0, dup: 0, failed: 0, facts: 0 }, { txn: 0, dup: 0, failed: 0, facts: 0 },
); );
// 映射覆盖漏(保障机制):本批落 _default 的原值汇总 → SyncLog.metadata,供审计/扩字典闭环。 // ── raw 层列漂移检测(form A 专属,最可靠):本次推送行的列集 vs 历史同源 rawPayload 列集 ──
// transform 会掩盖 assembler 层的删列(补 key),故在原生行层面直接对比。基线 = 已存 transaction,
// 不需新存储。基线样本太少(新 host 冷启)则跳过,避免误报。
const rawDrift = await this.detectRawColumnDrift({
hostId: host.id,
source: opts.source,
subjectTypes: targetCfgs.map((c) => c.emits!.subjectType),
pushedCols,
rowCount: opts.rows.length,
excludeSyncLogId: syncLog.id, // ⚠️ 基线排除本 run 刚写的行(否则加列会被自己污染成基线)
});
// 摄入漂移(保障机制)→ SyncLog.metadata,供审计:
// mappingMisses = 落 _default 的原值(扩字典);suspectFields = 疑似宿主改表(列缺席/删/加)。
const mappingMisses = mergeMappingMisses(perResource.map((s) => s.mappingMisses)); const mappingMisses = mergeMappingMisses(perResource.map((s) => s.mappingMisses));
const suspectFields = mergeSuspectFields([
...perResource.map((s) => s.suspectFields),
rawDrift,
]);
if (mappingMisses.length > 0) { if (mappingMisses.length > 0) {
this.logger.warn( this.logger.warn(
`ingestRawTables(${opts.source}) 映射覆盖漏 ${mappingMisses.length} 种:` + `ingestRawTables(${opts.source}) 映射覆盖漏 ${mappingMisses.length} 种:` +
...@@ -356,6 +387,19 @@ export class ColdImportService { ...@@ -356,6 +387,19 @@ export class ColdImportService {
(mappingMisses.length > 5 ? ' …' : ''), (mappingMisses.length > 5 ? ' …' : ''),
); );
} }
if (suspectFields.length > 0) {
this.logger.warn(
`⚠️ ingestRawTables(${opts.source}) 疑似宿主改表 ${suspectFields.length} 项:` +
suspectFields.map((s) => `[${s.reason}] ${s.resource}.${s.field}`).join(', '),
);
}
const driftMeta =
mappingMisses.length > 0 || suspectFields.length > 0
? {
...(mappingMisses.length > 0 ? { mappingMisses } : {}),
...(suspectFields.length > 0 ? { suspectFields } : {}),
}
: null;
await this.prisma.syncLog.update({ await this.prisma.syncLog.update({
where: { id: syncLog.id }, where: { id: syncLog.id },
...@@ -374,9 +418,7 @@ export class ColdImportService { ...@@ -374,9 +418,7 @@ export class ColdImportService {
failed: agg.failed, failed: agg.failed,
errorMessage: firstError, errorMessage: firstError,
endedAt: new Date(), endedAt: new Date(),
...(mappingMisses.length > 0 ...(driftMeta ? { metadata: driftMeta as unknown as Prisma.InputJsonValue } : {}),
? { metadata: { mappingMisses } as unknown as Prisma.InputJsonValue }
: {}),
}, },
}); });
...@@ -393,6 +435,49 @@ export class ColdImportService { ...@@ -393,6 +435,49 @@ export class ColdImportService {
return path.join(base, hostName); return path.join(base, hostName);
} }
/**
 * Raw-layer column drift detection (form A): compares the column set of the rows
 * pushed in this run against the column set seen in previously stored rawPayloads.
 *
 * - present in history, absent from this push -> `column_removed` (host likely dropped/renamed a column)
 * - present in this push, absent from history -> `column_added`   (host likely added/renamed a column)
 *
 * The baseline is built from up to 50 of the most recently created transactions for
 * the same host and subject types, excluding rows written by the current sync run so
 * a newly added column cannot pollute its own baseline. No extra storage is needed.
 *
 * Detection is skipped (returns `[]`) when there is not enough evidence to compare:
 * - nothing was pushed (`rowCount === 0`), since an empty column set would otherwise
 *   flag every historical column as removed;
 * - fewer than 5 usable baseline payloads exist (e.g. a cold-started host). Only
 *   plain-object payloads count as usable; null / array / primitive payloads carry
 *   no column names and must not satisfy the minimum.
 *
 * Keys starting with `_` are internal meta keys and are ignored on the baseline side;
 * the caller is expected to have applied the same filter to `pushedCols`.
 *
 * @param args.hostId           host whose stored transactions form the baseline
 * @param args.source           pushed source table name; reported as `resource`
 * @param args.subjectTypes     subject types whose transactions are sampled
 * @param args.pushedCols       column names observed in this push
 * @param args.rowCount         number of pushed rows; reported as `total`
 * @param args.excludeSyncLogId sync log id of the current run, excluded from the baseline
 * @returns one entry per removed or added column (removed first, then added)
 */
private async detectRawColumnDrift(args: {
  hostId: string;
  source: string;
  subjectTypes: string[];
  pushedCols: Set<string>;
  rowCount: number;
  excludeSyncLogId: string;
}): Promise<SuspectField[]> {
  const MIN_BASELINE = 5;
  // No rows pushed (or nothing to sample against): there is nothing to compare, and
  // running the diff would report the whole baseline as removed.
  if (args.rowCount === 0 || args.subjectTypes.length === 0) return [];
  const sample = await this.prisma.patientTransaction.findMany({
    where: {
      hostId: args.hostId,
      subjectType: { in: args.subjectTypes },
      syncLogId: { not: args.excludeSyncLogId }, // exclude rows written by this run (avoid self-pollution)
    },
    select: { rawPayload: true },
    take: 50,
    orderBy: { createdAt: 'desc' },
  });
  const baseline = new Set<string>();
  let usablePayloads = 0;
  for (const s of sample) {
    const payload: unknown = s.rawPayload;
    // Only plain JSON objects describe columns; arrays/primitives would contribute
    // index keys, and nulls contribute nothing.
    if (payload === null || typeof payload !== 'object' || Array.isArray(payload)) continue;
    usablePayloads++;
    for (const key of Object.keys(payload)) {
      if (!key.startsWith('_')) baseline.add(key);
    }
  }
  // Too little history to trust the baseline: skip rather than raise false positives.
  if (usablePayloads < MIN_BASELINE) return [];
  const out: SuspectField[] = [];
  for (const col of baseline) {
    if (!args.pushedCols.has(col)) {
      out.push({
        resource: args.source,
        field: col,
        hostField: col,
        total: args.rowCount,
        reason: 'column_removed',
      });
    }
  }
  for (const col of args.pushedCols) {
    if (!baseline.has(col)) {
      out.push({
        resource: args.source,
        field: col,
        hostField: col,
        total: args.rowCount,
        reason: 'column_added',
      });
    }
  }
  return out;
}
async importDirectory( async importDirectory(
dir: string, dir: string,
options: { options: {
...@@ -1381,6 +1466,7 @@ export class ColdImportService { ...@@ -1381,6 +1466,7 @@ export class ColdImportService {
stats.fetched = assembled.stats.fetched; stats.fetched = assembled.stats.fetched;
stats.failed += assembled.stats.failed; // assembler 自己的失败计在内 stats.failed += assembled.stats.failed; // assembler 自己的失败计在内
stats.mappingMisses = assembled.stats.mappingMisses; // 映射覆盖漏(落 _default 的原值) stats.mappingMisses = assembled.stats.mappingMisses; // 映射覆盖漏(落 _default 的原值)
stats.suspectFields = assembled.stats.suspectFields; // 字段/列漂移(整列缺席)
if (assembled.rows.length > 0) if (assembled.rows.length > 0)
stats.sampleCanonical.push(assembled.rows[0]!.canonical); stats.sampleCanonical.push(assembled.rows[0]!.canonical);
...@@ -1936,6 +2022,7 @@ export class ColdImportService { ...@@ -1936,6 +2022,7 @@ export class ColdImportService {
factsFailed: 0, factsFailed: 0,
sampleCanonical: [], sampleCanonical: [],
mappingMisses: [], mappingMisses: [],
suspectFields: [],
}; };
} }
} }
...@@ -1984,6 +2071,8 @@ export interface PerResourceStats extends TotalsBlock { ...@@ -1984,6 +2071,8 @@ export interface PerResourceStats extends TotalsBlock {
sampleCanonical: unknown[]; sampleCanonical: unknown[];
/// 映射覆盖漏(assembler 透传):该资源装配时落 _default 的原值(field+rawValue 聚合) /// 映射覆盖漏(assembler 透传):该资源装配时落 _default 的原值(field+rawValue 聚合)
mappingMisses: MappingMiss[]; mappingMisses: MappingMiss[];
/// 字段/列漂移(assembler 透传):整列缺席的映射字段(疑似宿主改名/删列)
suspectFields: SuspectField[];
} }
/** /**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment