Commit 54691f62 by luoqi

feat(push): 形态 A 推原生表行 — POST /push/rows 复用 pull/cold-import 同一摄入管线

宿主把原生表行(导出给数仓那种)照抄推来,PAC 复用 transforms+assembler+processSubject
同核落库(拆分/映射/外键/幂等全 PAC 做),与 pull/reparse 完全一致,只是数据入口是 webhook。

- ColdImportService.ingestRawTables({hostName, source, rows}):tables={[source]:rows}
  → transformEngine.run → 对"终端原生表=source"的每个 assembler cfg 跑 processSubject;
  tenant 走 manifest brand resolver(同 pull,非 stub);返回 touched 患者(带 tenant)。
- push.schema:PushRowsRequest/Response。
- PushReceiverService.receiveRows:ingest → 对 touched 患者 enqueuePersonaRecompute
  (复用现有 BullMQ 触发 + watermark 幂等 + 凌晨 cron 兜底);plan 不在此触发(定时任务保)。
- PushController:POST /push/rows(HMAC 验签复用 /push/events 同逻辑)。

本地核验:原样推=同口径去重(dup=6/txn=0);bump updated_date 推=新 txn=6/touched=1
(tenant 正确解析);版本流无重复(active facts 9→9)。main.ts 已 rawBody:true。

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
parent cb303524
......@@ -223,6 +223,142 @@ export class ColdImportService {
return { perResource, affectedPatientIds, dryRunDiffs };
}
/**
* 形态 A push 入口:宿主推来某张【原生源表】的原生行 → 复用 transforms + assembler + processSubject 落库。
*
* 跟 cold-import / reparse 同一个核(raw → canonical → txn → fact 版本流),唯一差异:
* 原生行来自 webhook 请求,而不是 ClickHouse SQL / transactions.rawPayload。
*
* 不触发 persona / plan(由 caller 决定);返回 touched 患者(带 tenant)供 caller enqueue 画像重算。
* tenant 走 manifest 的 brand resolver(每行解析),与 pull / reparse 完全一致。
* 幂等:processSubject 内 sourceContext 固定,source_event_id 与 pull 同口径去重。
*/
async ingestRawTables(opts: {
hostName: string;
source: string; // 原生源表名(= manifest tables[].table / transform 终端表)
rows: Record<string, unknown>[];
triggeredBy?: string;
}): Promise<{
syncLogId: string;
perResource: PerResourceStats[];
touched: Array<{ patientId: string; tenantId: string }>;
}> {
const absDir = this.resolveHostDir(opts.hostName);
const manifest = this.readManifest(absDir);
const host = await this.prisma.host.findFirst({ where: { name: opts.hostName } });
if (!host) throw new Error(`ingestRawTables: host '${opts.hostName}' not found`);
const tenantResolver = buildTenantResolver(manifest);
const normalize = { amountUnit: manifest.amount_unit, timezone: manifest.timezone };
const transforms = manifest.transforms ?? [];
// 找出"终端原生表 === source"的所有 subject cfg(一张 EMR 源表可喂 诊断/治疗/emr/建议 多 cfg)
const subjectCfgs = this.loadAllAssemblers(absDir, manifest).filter((c) => !!c.emits);
const targetCfgs = subjectCfgs.filter(
(cfg) => traceRawSourceTable(cfg.primary.table, transforms) === opts.source,
);
if (targetCfgs.length === 0) {
const known = [
...new Set(subjectCfgs.map((c) => traceRawSourceTable(c.primary.table, transforms))),
];
throw new Error(
`ingestRawTables: source='${opts.source}' 无匹配 assembler(可选源表=${known.join(',')})`,
);
}
const syncLog = await this.prisma.syncLog.create({
data: {
hostId: host.id,
tenantId: 't_pending', // push 多 tenant 由行决定;finalize 时回填首个 seen tenant
direction: SyncDirection.PUSH,
resource: opts.source,
triggeredBy: opts.triggeredBy ?? `push:${opts.hostName}:${opts.source}`,
status: SyncStatus.RUNNING,
},
});
// tables = { [source]: rows } + transform 空输入兜底 → transform → processSubject
const tables: Record<string, Record<string, unknown>[]> = { [opts.source]: opts.rows };
for (const tf of transforms) {
const inp = (tf as { input?: string }).input;
if (inp && !tables[inp]) tables[inp] = [];
}
const transformed =
transforms.length > 0 ? this.transformEngine.run({ tables, transforms }) : tables;
const seenTenants = new Set<string>();
const perResource: PerResourceStats[] = [];
let firstError: string | null = null;
try {
for (const cfg of targetCfgs) {
const stats = await this.processSubject(
transformed,
cfg,
host.id,
tenantResolver,
seenTenants,
normalize,
syncLog.id,
false,
);
perResource.push(stats);
}
} catch (err) {
firstError = err instanceof Error ? err.message : String(err);
this.logger.error(`ingestRawTables(${opts.source}) 异常: ${firstError}`);
}
// touched 患者 = 本 syncLog 写入的 transaction 的 distinct (patientId, tenantId)
const touchedRows = await this.prisma.patientTransaction.findMany({
where: { syncLogId: syncLog.id, patientId: { not: null } },
select: { patientId: true, tenantId: true },
distinct: ['patientId'],
});
const touched = touchedRows
.filter((t): t is { patientId: string; tenantId: string } => !!t.patientId)
.map((t) => ({ patientId: t.patientId, tenantId: t.tenantId }));
const agg = perResource.reduce(
(a, s) => ({
txn: a.txn + s.transactionsWritten,
dup: a.dup + s.duplicates,
failed: a.failed + s.failed,
facts: a.facts + s.factsCreated + s.factsSuperseded,
}),
{ txn: 0, dup: 0, failed: 0, facts: 0 },
);
await this.prisma.syncLog.update({
where: { id: syncLog.id },
data: {
status:
firstError && agg.txn === 0
? SyncStatus.FAILED
: firstError
? SyncStatus.PARTIAL
: SyncStatus.SUCCESS,
tenantId: [...seenTenants][0] ?? 't_unknown',
fetched: opts.rows.length,
transactionsWritten: agg.txn,
factsEmitted: agg.facts,
duplicates: agg.dup,
failed: agg.failed,
errorMessage: firstError,
endedAt: new Date(),
},
});
this.logger.log(
`ingestRawTables host=${opts.hostName} source=${opts.source} rows=${opts.rows.length} ` +
`txn=${agg.txn} dup=${agg.dup} failed=${agg.failed} touched=${touched.length}`,
);
return { syncLogId: syncLog.id, perResource, touched };
}
/// host → 配置目录(= sync.service.resolveDataDir 同约定:env PAC_INCREMENTAL_DATA_DIR ?? cwd/data,再拼 hostName)
private resolveHostDir(hostName: string): string {
const base = process.env.PAC_INCREMENTAL_DATA_DIR ?? path.resolve(process.cwd(), 'data');
return path.join(base, hostName);
}
async importDirectory(
dir: string,
options: {
......
......@@ -2,8 +2,10 @@ import { Injectable, Logger } from '@nestjs/common';
import { SyncDirection, SyncStatus } from '@pac/types';
import { PrismaService } from '../../../prisma/prisma.service';
import { PipelineDispatcher, type EmitsResolver } from '../pipeline/pipeline-dispatcher.service';
import { ColdImportService } from '../cold-import/cold-import.service';
import { QueueProducer } from '../../../queues/queue-producer.service';
import type { EmitsConfig } from '../assembler/assembler.schema';
import type { PushEvent, PushEventsResponse } from './push.schema';
import type { PushEvent, PushEventsResponse, PushRowsResponse } from './push.schema';
/**
* PushReceiver — 处理已验签的 push 事件批。
......@@ -25,8 +27,75 @@ export class PushReceiverService {
constructor(
private readonly prisma: PrismaService,
private readonly dispatcher: PipelineDispatcher,
private readonly coldImport: ColdImportService,
private readonly queue: QueueProducer,
) {}
/**
* 形态 A:接收原生表行(/push/rows)。
*
* 复用 cold-import/reparse 同核(ColdImportService.ingestRawTables):transforms + assembler + 版本流落库,
* 然后对 touched 患者入队【画像重算】(plan 不在此触发,由定时任务兜底)。
*/
async receiveRows(input: {
hostId: string;
hostName: string;
source: string;
rows: Record<string, unknown>[];
}): Promise<PushRowsResponse> {
const { hostId, hostName, source, rows } = input;
const r = await this.coldImport.ingestRawTables({
hostName,
source,
rows,
triggeredBy: `push:${hostName}:${source}`,
});
// 事件驱动:touched 患者入队 persona-recompute(BullMQ jobId 去重 + eventWatermark 幂等 + 凌晨 cron 兜底)
// ⚠️ plan 不在此触发(交给现有定时任务),与单患者刷新路径有意区分。
let personaEnqueued = 0;
for (const { patientId, tenantId } of r.touched) {
try {
await this.queue.enqueuePersonaRecompute({
hostId,
tenantId,
patientId,
triggeredBy: `push:${hostName}`,
});
personaEnqueued++;
} catch (err) {
this.logger.warn(
`enqueue persona-recompute failed patient=${patientId}: ${err instanceof Error ? err.message : err}`,
);
}
}
const agg = r.perResource.reduce(
(a, s) => ({
txn: a.txn + s.transactionsWritten,
dup: a.dup + s.duplicates,
failed: a.failed + s.failed,
}),
{ txn: 0, dup: 0, failed: 0 },
);
this.logger.log(
`push/rows host=${hostName} source=${source} rows=${rows.length} ` +
`txn=${agg.txn} dup=${agg.dup} failed=${agg.failed} personaEnqueued=${personaEnqueued}`,
);
return {
syncLogId: r.syncLogId,
source,
accepted: rows.length,
transactionsWritten: agg.txn,
duplicates: agg.dup,
failed: agg.failed,
personaEnqueued,
};
}
async receive(input: {
hostId: string;
hostName: string;
......
......@@ -11,7 +11,12 @@ import { ApiOperation, ApiTags } from '@nestjs/swagger';
import { Public } from '../../../common/decorators/public.decorator';
import { HmacVerifier } from './hmac-verifier.service';
import { PushReceiverService } from './push-receiver.service';
import { PushEventsRequestSchema, type PushEventsRequest } from './push.schema';
import {
PushEventsRequestSchema,
type PushEventsRequest,
PushRowsRequestSchema,
type PushRowsRequest,
} from './push.schema';
/**
* PushController — 宿主主动推送事件入口(webhook)。
......@@ -73,4 +78,44 @@ export class PushController {
events: parsed.events,
});
}
/**
* 形态 A:推原生表行(推荐路径)。
*
* Body: { tenantId?, source, rows: [{原生字段照抄}, ...] }
* 复用 pull/cold-import 同一条摄入管线(transforms + assembler + 版本流),
* 落库后对 touched 患者入队画像重算(plan 由定时任务兜底)。
*/
@Post('rows')
@Public()
@HttpCode(200)
@ApiOperation({
summary: '宿主推送原生表行 webhook(形态 A,HMAC 验签)',
description: 'Headers 必填 X-PAC-Host-Id / X-PAC-Timestamp / X-PAC-Signature。Body: { tenantId?, source, rows[] }。',
})
async receiveRows(
@Req() req: Request & { rawBody?: Buffer },
@Headers('x-pac-host-id') hostIdHeader: string | undefined,
@Headers('x-pac-timestamp') timestampHeader: string | undefined,
@Headers('x-pac-signature') signatureHeader: string | undefined,
@Body() body: PushRowsRequest,
) {
const rawBody = req.rawBody ? req.rawBody.toString('utf8') : JSON.stringify(body);
const { hostId, hostName } = await this.verifier.verify({
hostIdHeader,
timestampHeader,
signatureHeader,
rawBody,
});
const parsed = PushRowsRequestSchema.parse(body);
return this.receiver.receiveRows({
hostId,
hostName,
source: parsed.source,
rows: parsed.rows,
});
}
}
......@@ -43,3 +43,38 @@ export const PushEventsResponseSchema = z.object({
});
export type PushEventsResponse = z.infer<typeof PushEventsResponseSchema>;
/**
* 形态 A:推【原生表行】契约 — 宿主把"导出给数仓的那种原生表行"照抄推来。
*
* { tenantId?, source, rows: [ {原生字段照抄...}, ... ] }
*
* - `source` = 原生源表名(= 宿主数据字典里的表名 / manifest tables[].table)
* - `rows` = 原生行,PAC 用该 host 的 yaml 跑 transforms + assembler 拆分/映射/落库
* - `tenantId`= 选填;PAC 默认按 manifest 的 brand→tenant 逐行解析(同 pull/reparse)
*
* 跟 pull / cold-import / reparse 走同一条摄入管线,只是数据入口是 webhook。
*/
export const PushRowsRequestSchema = z.object({
tenantId: z.string().min(1).optional(),
source: z.string().min(1).describe('原生源表名(= manifest tables[].table)'),
rows: z
.array(z.record(z.string(), z.unknown()))
.min(1)
.max(2000)
.describe('原生表行,字段照抄'),
});
export type PushRowsRequest = z.infer<typeof PushRowsRequestSchema>;
export const PushRowsResponseSchema = z.object({
syncLogId: z.string().uuid(),
source: z.string(),
accepted: z.number().int().describe('收到的原生行数'),
transactionsWritten: z.number().int(),
duplicates: z.number().int(),
failed: z.number().int(),
personaEnqueued: z.number().int().describe('入队画像重算的患者数(plan 不在此触发,由定时任务保)'),
});
export type PushRowsResponse = z.infer<typeof PushRowsResponseSchema>;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment