Commit cc3eb275 by luoqi

fix(sync): 增量空跑不推进游标 + 回看重叠窗 — 修 DW 迟到导致的静默漏拉

根因:DW 当天落库晚于 cron(03:30 沪)→ 增量 cohort 拉 0,但 cursor_after 在
finally 里【无条件】推进到 run_start,使 updated_date≤run_start 但落库迟到的行被
永久埋在游标下方(2026-06-14 实测:DW 有 ~3368 行 06-13 变更,03:30 跑却 fetched=0,
游标推进后这批再也捞不回)。

A 空跑不推进:增量 cohort 为空(已含回看仍 0 行)→ 保留上次 ISO 水位、不推进到
  run_start,下次增量重试同窗口,DW 落库后即可捞回。
B 回看重叠窗:每次增量查询下界 = 游标 − PAC_INCREMENTAL_LOOKBACK_HOURS(默认 48h),
  靠 source_event_id 幂等去重容忍 DW 迟到 / 部分落库;设 0 关闭。

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
parent 943c7d44
......@@ -119,6 +119,12 @@ PAC_INCREMENTAL_CRON=
# 设值(逗号分隔)→ 显式 override,只同步列出的 host(staging 错峰 / 临时只跑某 host 用)。
PAC_INCREMENTAL_HOSTS=
# 增量回看重叠窗(小时)— 容忍 DW 落库迟到(updated_date 旧但落库晚于上次 run_start)。
# 每次增量查询下界 = 游标 − 本值,靠 source_event_id 幂等去重(重扫已摄入行无害)。
# 不设 → 默认 48h。设 0 → 关闭(严格 > 游标,有静默漏拉风险,不建议生产用)。
# 配合「空跑不推进游标」(代码内)双保险:DW 当天刷新晚于 cron 时不会漏数据。
PAC_INCREMENTAL_LOOKBACK_HOURS=
# 其他 cron(同步策略:env 不设 → 不跑)
# local: 全部不设
# staging/prod:
......
......@@ -531,6 +531,10 @@ export class ColdImportService {
// file 源(无 sql_source)→ 永远 single-shot,不参与 cursor
let incrementalConfig: IncrementalConfig | undefined;
let cursorBeforeJson: string | null = null;
// A 修复:增量 cohort 空跑标记 + 上次水位 baseline(ISO)。空跑时 finally 不推进游标,
// 避免推进到 run_start 后把 DW 迟到的旧 updated_date 行永久埋在游标下方(静默漏拉)。
let incrementalNoData = false;
let incrementalBaselineCursor: Record<string, string> = {};
const perQueryCfg = manifest.sql_source?.incremental?.per_query;
if (perQueryCfg) {
// 默认读 cursor;options.incremental === false 时 (强制 full) 不读
......@@ -538,8 +542,11 @@ export class ColdImportService {
const lastCursor = ignoreCursor
? {} as Record<string, string>
: await this.readLastIncrementalCursor(host.id);
incrementalBaselineCursor = lastCursor; // A:空跑时回退到此(原始 ISO 水位)
const lookbackHours = resolveIncrementalLookbackHours(); // B:回看重叠窗
this.logger.log(
`Cursor:${ignoreCursor ? '(显式忽略 — --full)' : Object.keys(lastCursor).length === 0 ? '(首跑,全量)' : JSON.stringify(lastCursor)}`,
`Cursor:${ignoreCursor ? '(显式忽略 — --full)' : Object.keys(lastCursor).length === 0 ? '(首跑,全量)' : JSON.stringify(lastCursor)}` +
(lookbackHours > 0 ? ` (回看窗 -${lookbackHours}h)` : ''),
);
incrementalConfig = {
perQuery: Object.fromEntries(
......@@ -553,7 +560,10 @@ export class ColdImportService {
// 都"小于"游标 → 增量永远 fetched=0(2026-06-10 服务器实测三天空转)。
// 这里读取侧转成 DW 时区的同款格式,旧游标无需迁移。
cursorValue: lastCursor[table]
? toDwCursorLiteral(lastCursor[table], manifest.timezone)
? toDwCursorLiteral(
shiftIsoHours(lastCursor[table], -lookbackHours),
manifest.timezone,
)
: null,
},
]),
......@@ -656,6 +666,8 @@ export class ColdImportService {
incrementalConfig,
);
if (allPairs.length === 0) {
// A:增量模式 + cohort 空 → DW 本窗口(已含回看)无任何数据 → 标记不推进游标
if (incrementalConfig) incrementalNoData = true;
this.logger.log(`Cohort: 0 patients in scope(增量空跑或源数据为空) skip batch`);
} else {
const batches = chunk(allPairs, cohortBatchSize);
......@@ -793,17 +805,31 @@ export class ColdImportService {
// 不会因为 --full 没写 cursor 而漏掉 --full 期间 DW 新写入的数据。
let cursorAfterJson: string | null = null;
if (incrementalConfig) {
const runStartIso = runStart.toISOString();
const oldCursors: Record<string, string> = JSON.parse(cursorBeforeJson ?? '{}');
const merged = { ...oldCursors };
for (const tbl of Object.keys(incrementalConfig.perQuery)) {
// 防御:cursor 不倒退(老 cursor > runStart 时保留老 — 罕见,跨时钟漂移)
if (!merged[tbl] || runStartIso > merged[tbl]) {
merged[tbl] = runStartIso;
if (incrementalNoData) {
// ⭐ A 修复:增量空跑(DW 本窗口含回看仍 0 行)→ 不推进游标,保留上次水位。
// 根因:DW 当天落库可能晚于 run_start;若空跑也推进到 run_start,那些
// updated_date≤run_start 但落库迟到的行会被永久埋在游标下方(静默漏拉,2026-06-14 实测)。
// 保留上次 cursor_after(ISO baseline)→ 下次增量重试同窗口,DW 落库后即可捞回。
cursorAfterJson =
Object.keys(incrementalBaselineCursor).length > 0
? JSON.stringify(incrementalBaselineCursor)
: null;
this.logger.log(
`Cursor 不推进(增量空跑,保留上次水位):${cursorAfterJson ?? '(首跑无 cursor)'}`,
);
} else {
const runStartIso = runStart.toISOString();
const oldCursors: Record<string, string> = JSON.parse(cursorBeforeJson ?? '{}');
const merged = { ...oldCursors };
for (const tbl of Object.keys(incrementalConfig.perQuery)) {
// 防御:cursor 不倒退(老 cursor > runStart 时保留老 — 罕见,跨时钟漂移)
if (!merged[tbl] || runStartIso > merged[tbl]) {
merged[tbl] = runStartIso;
}
}
cursorAfterJson = JSON.stringify(merged);
this.logger.log(`Cursor (run_start baseline):${cursorAfterJson}`);
}
cursorAfterJson = JSON.stringify(merged);
this.logger.log(`Cursor 写(run_start baseline):${cursorAfterJson}`);
}
if (syncLog) {
const fetched =
......@@ -2214,6 +2240,26 @@ function toDwCursorLiteral(value: string, timeZone: string): string {
}).format(d);
}
/// 增量回看重叠窗(小时)— B 修复。游标查询下界统一减去该窗口,靠 source_event_id 幂等去重。
/// 作用:容忍 DW 落库迟到(updated_date 旧、但落库晚于上次 run_start;否则被推进的游标埋掉 → 静默漏拉)。
/// 默认 48h;PAC_INCREMENTAL_LOOKBACK_HOURS=0 关闭(回到严格 > cursor)。
function resolveIncrementalLookbackHours(): number {
const raw = process.env.PAC_INCREMENTAL_LOOKBACK_HOURS;
if (raw == null || raw === '') return 48;
const n = Number(raw);
return Number.isFinite(n) && n >= 0 ? n : 48;
}
/// 把 ISO 时间戳前移 deltaHours 小时(负数=往前),返回新 ISO。
/// 非 ISO(罕见 legacy DW 字面量)或非法值 → 原样返回(下游 toDwCursorLiteral 会再兜底)。
function shiftIsoHours(value: string, deltaHours: number): string {
if (deltaHours === 0) return value;
if (!/\d{4}-\d{2}-\d{2}T/.test(value)) return value;
const d = new Date(value);
if (Number.isNaN(d.getTime())) return value;
return new Date(d.getTime() + deltaHours * 3600_000).toISOString();
}
/// 回访治疗项「大类 · 子项」合并值归一(transforms H 的 concat)。
/// 两列皆空 → 合并出来只剩 " · "(纯分隔符/空白)→ 视为无治疗项,归 null;有真实内容则 trim 返回。
function normalizeMergedItems(v: string | undefined): string | null {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment