Commit fcc2a9d6 by luoqi

feat(sync): PR1 — partial-unique 并发锁 + run_start baseline cursor

数据正确性 2 件:
1. **并发锁**:sync_logs 加 partial UNIQUE (host_id) WHERE status='running'
   同 host 同时只能 1 个 sync 在跑(存量 / 增量 cron / 手动一律抢同一把锁)
   INSERT 撞 P2002 → 抛 SyncAlreadyRunningError → 调用方 skip
   scheduler 捕获该 error 时 warn 不 error,下次 cron 自然 retry
   CLI 撞锁退出 code=4(区分于 2=真失败 / 3=bootstrap 崩)

2. **cursor=run_start 而非 max(updated_date)**:
   存量跑期间 DW 持续写入(已摄入患者更新 / 未摄入患者新增),
   max(updated_date) cursor 会漏:
     · batch 1 摄入患者 100,T+1h DW 又写患者 100 一笔(updated_date=T+1h)
       max cursor 推到 T+4:25(末批的 max),下次增量 WHERE > T+4:25 → 漏掉这笔
   run_start cursor 保证捞回:
     · 下次 WHERE > T+0 → 全部 T+0~now 的变化都进入增量
     · 同行同 updatedAt → source_event_id 一致 → P2002 path → parser re-run idempotent
     · 同行不同 updatedAt → 不同 source_event_id → 新 tx + 新 fact 版本
   重复读浪费 read 但无害,数据正确性 > 带宽优化

importDirectory 重构:
- runStart 入口冻结
- SyncLog create 提前(在 table load 前),作为锁 acquisition 点
- 整段 work 包 try/finally,finally 统一 finalize syncLog(释放锁 + 写 cursor)
- 之前没有 finally,work throw 时 syncLog 卡在 status='running' = 永久死锁
- 加 SyncAlreadyRunningError 导出类,scheduler / CLI 分别处理

本地验证(docker exec psql):
   1st INSERT running OK
   2nd INSERT running for same host → unique violation
   UPDATE 1st status='success' 后,新 running INSERT OK(锁释放)

stale 锁兜底(进程崩留 status='running'):
  目前需人工清:
    UPDATE sync_logs SET status='aborted' WHERE status='running'
                                            AND started_at < NOW() - INTERVAL '12 hours';
  PR2/PR3 期间会加 cron 看门狗自动清。
parent f19434d7
-- 同 host 同时只能有 1 个 status='running' 的 sync_log
-- 用 partial UNIQUE 索引把"锁"内嵌到 sync_logs 行:
-- - INSERT status='running' 时撞 unique → P2002,调用方 skip(并发拦截)
-- - finalize 时 UPDATE status='success'/'failed'/'partial' → partial 条件不满足 → 锁释放
-- - 进程崩留下 stale running:依赖 cron/手动 UPDATE status='aborted' 清理(留 TTL 兜底,见
-- SyncIncrementalSchedulerService 的看门狗)
--
-- 比独立 sync_locks 表简洁:沿用现有 schema,锁状态跟 sync_log 行同步,无需双写
CREATE UNIQUE INDEX "sync_logs_one_running_per_host"
ON "sync_logs" ("host_id")
WHERE "status" = 'running';
......@@ -1339,5 +1339,8 @@ model SyncLog {
@@index([status])
/// 单患者 pull 反查:"该患者最近被刷新过几次"
@@index([patientId, startedAt])
/// MIGRATION 20260528000000:partial UNIQUE (host_id) WHERE status='running'
/// host 同时只能有 1 running ,作为存量/增量 sync 的并发锁。
/// Prisma 不支持声明式 partial UNIQUE,migration.sql 是单一真理源。
@@map("sync_logs")
}
......@@ -11,7 +11,10 @@
import { NestFactory } from '@nestjs/core';
import { Logger } from '@nestjs/common';
import { AppModule } from '../app.module';
import { ColdImportService } from '../modules/sync/cold-import/cold-import.service';
import {
ColdImportService,
SyncAlreadyRunningError,
} from '../modules/sync/cold-import/cold-import.service';
interface CliArgs {
dir?: string;
......@@ -100,9 +103,19 @@ async function bootstrap() {
}
logger.log('─────────────────────────────────────────');
} catch (err) {
logger.error('Cold import failed:');
logger.error(err instanceof Error ? err.stack ?? err.message : String(err));
process.exitCode = 2;
if (err instanceof SyncAlreadyRunningError) {
logger.error(`并发拦截:已有其他 sync 在跑同 host:${err.message}`);
logger.error('如果上次 sync 异常崩了未清理,手动 SQL 清:');
logger.error(
` UPDATE sync_logs SET status='aborted', ended_at=NOW() ` +
`WHERE host_id='<hostId>' AND status='running' AND started_at < NOW() - INTERVAL '12 hours';`,
);
process.exitCode = 4; // 4 = lock conflict(区分于 2 = 真失败 / 3 = bootstrap 崩)
} else {
logger.error('Cold import failed:');
logger.error(err instanceof Error ? err.stack ?? err.message : String(err));
process.exitCode = 2;
}
} finally {
await app.close();
}
......
......@@ -62,8 +62,12 @@ export class ColdImportService {
): Promise<ImportRunResult> {
const absDir = path.resolve(dir);
const runId = randomUUID();
// ⭐ run_start 在函数入口冻结,作为 cursor_after baseline(见下方 §finalize)
// 保证存量跑期间任何 DW 新写入(无论已摄入 / 未摄入患者),都能被下次增量 cron 捞回
const runStart = new Date();
this.logger.log(
`Cold import starting: ${absDir} (dryRun=${!!options.dryRun}, runId=${runId})`,
`Sync starting: ${absDir} (dryRun=${!!options.dryRun}, ` +
`incremental=${!!options.incremental}, runId=${runId}, run_start=${runStart.toISOString()})`,
);
// 1. Manifest
......@@ -89,9 +93,9 @@ export class ColdImportService {
` assemblers=${manifest.assemblers.length}`,
);
// 3. 一次性加载所有 raw tables(文件 / ClickHouse 二选一)
// W4 末:incremental 模式构建 cursor map(从最近一次成功 sync_log 的 cursor_after JSON 读)
// 3. 增量 cursor 准备(必须在 SyncLog 创建之前算,因为 cursorBefore 要落到 sync_log 行)
let incrementalConfig: IncrementalConfig | undefined;
let cursorBeforeJson: string | null = null;
if (options.incremental) {
const perQueryCfg = manifest.sql_source?.incremental?.per_query;
if (!perQueryCfg) {
......@@ -111,46 +115,23 @@ export class ColdImportService {
]),
),
};
}
let tables = await this.loadAllTables(absDir, manifest, incrementalConfig);
// 3.5 Layer A.5 transforms — yaml 声明式形态改造(JSON 拆行 / 派生 / 路由等)
if (manifest.transforms && manifest.transforms.length > 0) {
this.logger.log(
`Layer A.5 transforms:${manifest.transforms.length} 步,sources=${Object.keys(tables).length}`,
);
// cast 兼容:loadAllTables 返回 unknown[] per row,transforms 内部按 Record<string, unknown> 处理
const transformInputs = tables as Record<string, Record<string, unknown>[]>;
tables = this.transformEngine.run({ tables: transformInputs, transforms: manifest.transforms });
this.logger.log(
`Layer A.5 done,tables 现有 ${Object.keys(tables).length} 张` +
` (含 transform 产出):${Object.entries(tables)
.map(([k, v]) => `${k}=${v.length}`)
.join(', ')}`,
cursorBeforeJson = JSON.stringify(
Object.fromEntries(
Object.entries(incrementalConfig.perQuery).map(([k, v]) => [k, v.cursorValue ?? '']),
),
);
}
// 4. 一次性读所有 assembler configs
const assemblerConfigs = this.loadAllAssemblers(absDir, manifest);
// 必须有 patient assembler(主档)
const patientCfg = assemblerConfigs.find((c) => c.canonical === 'patient');
const subjectCfgs = assemblerConfigs.filter((c) => c.canonical !== 'patient');
// 5. SyncLog start
// SyncLog 自身的 tenantId 列只能填一个;多 tenant 跑用 '_multi' sentinel
// 4. SyncLog 创建 — ⭐ 作为并发锁
// sync_logs 表有 partial UNIQUE (host_id) WHERE status='running'(migration 20260528000000)
// 同 host 已有 running 行时,本次 INSERT 撞 P2002 → 抛 SyncAlreadyRunningError → 调用方 skip
// finally 块统一 finalize status,确保锁一定被释放(进程崩需依赖 stale 清理)
const syncLogTenant = knownTenants.length === 1 ? knownTenants[0]! : '_multi';
// W4 末:incremental 模式 → resource='incremental_bundle' + cursor_before 写入老 cursor
const syncResource = options.incremental ? 'incremental_bundle' : 'cold_import_bundle';
const cursorBeforeJson = options.incremental && incrementalConfig
? JSON.stringify(
Object.fromEntries(
Object.entries(incrementalConfig.perQuery).map(([k, v]) => [k, v.cursorValue ?? '']),
),
)
: null;
const syncLog = options.dryRun
? null
: await this.prisma.syncLog.create({
let syncLog: { id: string } | null = null;
if (!options.dryRun) {
try {
syncLog = await this.prisma.syncLog.create({
data: {
hostId: host.id,
tenantId: syncLogTenant,
......@@ -160,116 +141,173 @@ export class ColdImportService {
status: SyncStatus.RUNNING,
cursorBefore: cursorBeforeJson,
},
select: { id: true },
});
} catch (err) {
const code = (err as { code?: string }).code;
if (code === 'P2002') {
// 并发拦截 — 已有同 host 的 running sync_log,本次直接抛弃
const existing = await this.prisma.syncLog.findFirst({
where: { hostId: host.id, status: SyncStatus.RUNNING },
orderBy: { startedAt: 'desc' },
select: { id: true, startedAt: true, triggeredBy: true },
});
this.logger.warn(
`Sync 已在跑(host=${host.name}) — 本次 skip(partial-unique 锁拦截)。` +
`existing sync_log id=${existing?.id} started_at=${existing?.startedAt.toISOString()} ` +
`triggered_by=${existing?.triggeredBy}`,
);
throw new SyncAlreadyRunningError(host.name, existing);
}
throw err;
}
}
// 6. 逐 assembler 处理
// ─── 进入 try/finally:任何抛错都要走 finally finalize syncLog,释放锁 ───
const perResource: PerResourceStats[] = [];
const totals = this.zeroTotals();
let firstError: string | null = null;
let status: string = SyncStatus.FAILED;
const normalize = { amountUnit: manifest.amount_unit, timezone: manifest.timezone };
try {
// 5. 一次性加载所有 raw tables(文件 / ClickHouse 二选一)
let tables = await this.loadAllTables(absDir, manifest, incrementalConfig);
// patients 先做(建立 patient_id 索引,供 subject 资源关联)
if (patientCfg) {
const stats = await this.processPatients(
tables,
patientCfg,
host.id,
tenantResolver,
seenTenants,
normalize,
options.dryRun ?? false,
);
perResource.push(stats);
totals.patientsUpserted += stats.patientsUpserted;
totals.failed += stats.failed;
if (!firstError && stats.failed > 0)
firstError ||= `patient: ${stats.failed} rows failed`;
} else {
this.logger.warn(
`No assembler for canonical='patient';skipping patient master upsert`,
);
}
// 5.5 Layer A.5 transforms — yaml 声明式形态改造(JSON 拆行 / 派生 / 路由等)
if (manifest.transforms && manifest.transforms.length > 0) {
this.logger.log(
`Layer A.5 transforms:${manifest.transforms.length} ,sources=${Object.keys(tables).length}`,
);
const transformInputs = tables as Record<string, Record<string, unknown>[]>;
tables = this.transformEngine.run({ tables: transformInputs, transforms: manifest.transforms });
this.logger.log(
`Layer A.5 done,tables 现有 ${Object.keys(tables).length} ` +
` ( transform 产出):${Object.entries(tables)
.map(([k, v]) => `${k}=${v.length}`)
.join(', ')}`,
);
}
// 其他资源(transaction 合成 + parser 衍生 fact)
for (const cfg of subjectCfgs) {
try {
const stats = await this.processSubject(
// 6. 一次性读所有 assembler configs
const assemblerConfigs = this.loadAllAssemblers(absDir, manifest);
const patientCfg = assemblerConfigs.find((c) => c.canonical === 'patient');
const subjectCfgs = assemblerConfigs.filter((c) => c.canonical !== 'patient');
const normalize = { amountUnit: manifest.amount_unit, timezone: manifest.timezone };
// 7. patients 先做(建立 patient_id 索引,供 subject 资源关联)
if (patientCfg) {
const stats = await this.processPatients(
tables,
cfg,
patientCfg,
host.id,
tenantResolver,
seenTenants,
normalize,
syncLog?.id,
options.dryRun ?? false,
);
perResource.push(stats);
totals.transactionsWritten += stats.transactionsWritten;
totals.duplicates += stats.duplicates;
totals.patientsUpserted += stats.patientsUpserted;
totals.failed += stats.failed;
totals.factsCreated += stats.factsCreated;
totals.factsSuperseded += stats.factsSuperseded;
totals.factsUnchanged += stats.factsUnchanged;
totals.factsEvidenceAppended += stats.factsEvidenceAppended;
totals.factsFailed += stats.factsFailed;
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
firstError ||= `${cfg.canonical}: ${msg}`;
this.logger.error(`Resource ${cfg.canonical} failed: ${msg}`);
if (!firstError && stats.failed > 0)
firstError = `patient: ${stats.failed} rows failed`;
} else {
this.logger.warn(
`No assembler for canonical='patient';skipping patient master upsert`,
);
}
}
// 7. SyncLog finalize
const fetched =
totals.patientsUpserted +
totals.transactionsWritten +
totals.duplicates +
totals.failed;
const status =
totals.failed === 0
? SyncStatus.SUCCESS
: totals.patientsUpserted + totals.transactionsWritten > 0
? SyncStatus.PARTIAL
: SyncStatus.FAILED;
// 8. 其他资源(transaction 合成 + parser 衍生 fact)
for (const cfg of subjectCfgs) {
try {
const stats = await this.processSubject(
tables,
cfg,
host.id,
tenantResolver,
seenTenants,
normalize,
syncLog?.id,
options.dryRun ?? false,
);
perResource.push(stats);
totals.transactionsWritten += stats.transactionsWritten;
totals.duplicates += stats.duplicates;
totals.failed += stats.failed;
totals.factsCreated += stats.factsCreated;
totals.factsSuperseded += stats.factsSuperseded;
totals.factsUnchanged += stats.factsUnchanged;
totals.factsEvidenceAppended += stats.factsEvidenceAppended;
totals.factsFailed += stats.factsFailed;
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
firstError ||= `${cfg.canonical}: ${msg}`;
this.logger.error(`Resource ${cfg.canonical} failed: ${msg}`);
}
}
// W4 末:incremental cursor_after — 合并老 cursor + 本批新增 max
// 本批某表 0 行(无新数据)→ 保留老 cursor 不变(下次同样 WHERE > old_cursor)
// 本批某表有行 → cursor 推进到本批 max(updated_date)
let cursorAfterJson: string | null = null;
if (options.incremental && incrementalConfig) {
const oldCursors: Record<string, string> = JSON.parse(cursorBeforeJson ?? '{}');
const advances = incrementalConfig.cursorAdvances ?? {};
const merged = { ...oldCursors };
for (const [tbl, newVal] of Object.entries(advances)) {
const old = merged[tbl];
if (!old || newVal > old) merged[tbl] = newVal;
status =
totals.failed === 0
? SyncStatus.SUCCESS
: totals.patientsUpserted + totals.transactionsWritten > 0
? SyncStatus.PARTIAL
: SyncStatus.FAILED;
} catch (workErr) {
// 重大工作异常(载入失败 / transform 引擎崩 / DB 整体故障等)→ failed,但仍走 finally 释放锁
const msg = workErr instanceof Error ? workErr.message : String(workErr);
firstError ||= `fatal: ${msg}`;
status = SyncStatus.FAILED;
this.logger.error(`Sync work fatal: ${msg}`);
// 不 rethrow — 让 finally 把状态写回 DB,然后再 throw 给调用方
// 但调用方需要知道这次失败,所以 finally 之后重抛
// eslint-disable-next-line no-unsafe-finally,@typescript-eslint/no-unsafe-finally
} finally {
// ⭐ 不管成功失败,把 syncLog 标 final status(释放 partial-unique 锁)
// ⭐ cursor_after = run_start ISO(关键!不是 max(updated_date) — 避免漏批次间 DW 新增)
// 理由:run_start 之后 DW 任何写入,下次增量 WHERE > run_start 都能捞回
// - 同行不同 updatedAt → source_event_id 不同 → 新 tx + 新 fact 版本(parser supersede 旧版)
// - 同行同 updatedAt → P2002 conflict → parser re-run(idempotent,无害)
let cursorAfterJson: string | null = null;
if (options.incremental && incrementalConfig) {
const runStartIso = runStart.toISOString();
const oldCursors: Record<string, string> = JSON.parse(cursorBeforeJson ?? '{}');
const merged = { ...oldCursors };
for (const tbl of Object.keys(incrementalConfig.perQuery)) {
// 防御:cursor 不倒退(老 cursor > runStart 时保留老 — 罕见,跨时钟漂移)
if (!merged[tbl] || runStartIso > merged[tbl]) {
merged[tbl] = runStartIso;
}
}
cursorAfterJson = JSON.stringify(merged);
this.logger.log(`Incremental cursor 写(run_start baseline):${cursorAfterJson}`);
}
if (syncLog) {
const fetched =
totals.patientsUpserted +
totals.transactionsWritten +
totals.duplicates +
totals.failed;
await this.prisma.syncLog.update({
where: { id: syncLog.id },
data: {
status,
fetched,
transactionsWritten: totals.transactionsWritten,
factsEmitted: totals.factsCreated + totals.factsSuperseded,
duplicates: totals.duplicates,
failed: totals.failed,
errorMessage: firstError,
endedAt: new Date(),
cursorAfter: cursorAfterJson,
},
});
}
cursorAfterJson = JSON.stringify(merged);
this.logger.log(`Incremental cursor 写:${cursorAfterJson}`);
}
if (syncLog) {
await this.prisma.syncLog.update({
where: { id: syncLog.id },
data: {
status,
fetched,
transactionsWritten: totals.transactionsWritten,
factsEmitted: totals.factsCreated + totals.factsSuperseded,
duplicates: totals.duplicates,
failed: totals.failed,
errorMessage: firstError,
endedAt: new Date(),
cursorAfter: cursorAfterJson,
},
});
}
const result: ImportRunResult = {
runId,
hostId: host.id,
hostName: host.name,
// 实际跑过程见到的 tenant 集合(pass-through 模式下 declared 为空,这里是真实结果)
tenantIds: [...seenTenants].sort(),
dryRun: !!options.dryRun,
syncLogId: syncLog?.id,
......@@ -279,7 +317,7 @@ export class ColdImportService {
};
this.logger.log(
`Cold import done: status=${status} patients=${totals.patientsUpserted} txns=${totals.transactionsWritten} dups=${totals.duplicates} failed=${totals.failed} ` +
`Sync done: status=${status} patients=${totals.patientsUpserted} txns=${totals.transactionsWritten} dups=${totals.duplicates} failed=${totals.failed} ` +
`facts(created=${totals.factsCreated} superseded=${totals.factsSuperseded} unchanged=${totals.factsUnchanged} evidence=${totals.factsEvidenceAppended} failed=${totals.factsFailed})`,
);
return result;
......@@ -913,3 +951,27 @@ export interface ImportRunResult {
totals: TotalsBlock;
perResource: PerResourceStats[];
}
/**
* 并发拦截 — 同 host 已有 status='running' 的 sync_log 时抛出。
* 由 sync_logs 的 partial UNIQUE (host_id) WHERE status='running' 约束触发(P2002)。
*
* 调用方处理建议:
* - cron(SyncIncrementalScheduler):捕获本异常 → warn log + skip + 等下次 cron
* - 手动 CLI:打印 error + 退出 code 2(让运维知道有其他 sync 在跑)
* - 单患者 refresh:本异常不会抛(走 resource='patient_refresh' 不参与该锁)
*/
export class SyncAlreadyRunningError extends Error {
constructor(
public readonly hostName: string,
public readonly existing: { id: string; startedAt: Date; triggeredBy: string | null } | null,
) {
super(
`SYNC_ALREADY_RUNNING:host=${hostName}` +
(existing
? ` existing=${existing.id} started_at=${existing.startedAt.toISOString()} triggered_by=${existing.triggeredBy}`
: ''),
);
this.name = 'SyncAlreadyRunningError';
}
}
......@@ -2,7 +2,10 @@ import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import * as path from 'path';
import { PrismaService } from '../prisma/prisma.service';
import { ColdImportService } from '../modules/sync/cold-import/cold-import.service';
import {
ColdImportService,
SyncAlreadyRunningError,
} from '../modules/sync/cold-import/cold-import.service';
import { PersonaService } from '../modules/persona/persona.service';
import { PlanEngineService } from '../modules/plan/engine/plan-engine.service';
......@@ -56,9 +59,16 @@ export class SyncIncrementalSchedulerService {
try {
await this.runOne(path.join(dataDir, host));
} catch (err) {
this.logger.error(
`sync-incremental: host=${host} failed: ${(err as Error).message}`,
);
if (err instanceof SyncAlreadyRunningError) {
// 并发拦截 — 已有手动跑或上次 cron 还没完。下次 cron 自然 retry。
this.logger.warn(
`sync-incremental: host=${host} skip(并发锁拦截):${err.message}`,
);
} else {
this.logger.error(
`sync-incremental: host=${host} failed: ${(err as Error).message}`,
);
}
// 不抛 — 下个 host 继续;cursor 没前进自然下次 catchup
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment