Commit 48d9aee0 by luoqi

fix(sync): 增量列患者改为各游标表变更患者的并集 — 修"约诊未到诊"类永久漏数

根因(2026-06-11 哨兵空转告警,服务器 DW 逐行核验):
增量 cohort 列患者只查 patient_list_from.last_visit_time > cursor(=近期来过诊的人),
漏掉"没来就诊但数据有变"的患者 — 典型即 PAC 自身业务闭环:客服电话约复诊(预约
updated_date 变了,人没来)、医生回头补写病历。当日 24 行预约 + 3 行 EMR 超游标,
27/27 全属此类患者(去重 22 人,老口径列出 0 人);且等患者到诊时游标已越过这些行
→ 永久漏数。

修复:增量模式下列患者 = 每张配了 cursor 的源表按各自游标取变更行患者,各分支 UNION ALL
合并后由外层 SELECT DISTINCT 去重(manifest 驱动,FROM 主表正则提取,不写死表名;宿主无关)。
- 全量/无游标首跑:保持原 patient_list_from 行为不变;
- 采样 LIMIT:union 模式下不再按 list_cursor_column 排序(各 union 分支只选出 key 列,无该列可排),回退为按 patient key 排序(仅 dev 用);
- refund 分支与 settlement 同表重复,DISTINCT 吃掉,无害。

验证(只读):服务器 DW 实跑新 union SQL → 22 患者全部捞回;service tsc 0。
仅本地修复,未部署(待统一打包)。

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
parent 838d26b9
...@@ -357,8 +357,29 @@ export class ClickHouseSourceService { ...@@ -357,8 +357,29 @@ export class ClickHouseSourceService {
: patient_list_from; : patient_list_from;
const cursorCfg = incremental?.perQuery[listTableShort]; const cursorCfg = incremental?.perQuery[listTableShort];
const cursorCol = list_cursor_column ?? cursorCfg?.cursorColumn; const cursorCol = list_cursor_column ?? cursorCfg?.cursorColumn;
// ⭐ 增量列患者 = 「任一源表有 > 各自游标的行」的患者并集(UNION ALL,manifest 驱动)。
// 只看 patient_list_from.last_visit_time 会漏"没来就诊但数据有变"的患者 ——
// 典型:客服电话约了复诊(预约 updated_date 变了,人没来)、医生回头补写病历;
// 且等患者真来就诊时游标已越过那些行 → 永久漏数。
// 2026-06-11 服务器实测:增量 fetched=0 而 DW 有 24 行预约 + 3 行 EMR 超游标,
// 逐行核验 27/27 全部属于"无新就诊"患者(哨兵空转告警由此而来)。
const incBranches: string[] = [];
if (incremental) {
for (const [tbl, cfg] of Object.entries(incremental.perQuery)) {
if (!cfg.cursorColumn || !cfg.cursorValue) continue;
const m = source.queries[tbl]?.match(/FROM\s+([\w.]+)/i);
const fqn = m?.[1] ?? tbl;
const keyCols = tenant_key_column
? `${patient_key_column}, ${tenant_key_column}`
: patient_key_column;
incBranches.push(
`SELECT ${keyCols} FROM ${fqn} WHERE ${cfg.cursorColumn} > '${cfg.cursorValue.replace(/'/g, "''")}'`,
);
}
}
const unionMode = incBranches.length > 0;
const whereParts: string[] = []; const whereParts: string[] = [];
if (cursorCol && cursorCfg?.cursorValue) { if (!unionMode && cursorCol && cursorCfg?.cursorValue) {
whereParts.push(`${cursorCol} > '${cursorCfg.cursorValue.replace(/'/g, "''")}'`); whereParts.push(`${cursorCol} > '${cursorCfg.cursorValue.replace(/'/g, "''")}'`);
} }
// dev/ops:只摄入指定患者(PAC_COHORT_ONLY_PATIENT=<patient_id> 或逗号列表 id1,id2,...) // dev/ops:只摄入指定患者(PAC_COHORT_ONLY_PATIENT=<patient_id> 或逗号列表 id1,id2,...)
...@@ -382,7 +403,7 @@ export class ClickHouseSourceService { ...@@ -382,7 +403,7 @@ export class ClickHouseSourceService {
if (sampleMode === 'random') { if (sampleMode === 'random') {
extraSelect = ', rand() AS _samp'; // 放进 SELECT 规避 DISTINCT + ORDER BY 非选列限制 extraSelect = ', rand() AS _samp'; // 放进 SELECT 规避 DISTINCT + ORDER BY 非选列限制
orderTail = ` ORDER BY _samp LIMIT ${cohortLimit}`; orderTail = ` ORDER BY _samp LIMIT ${cohortLimit}`;
} else if (list_cursor_column) { } else if (list_cursor_column && !unionMode) {
const dir = sampleMode === 'oldest' ? 'ASC' : 'DESC'; const dir = sampleMode === 'oldest' ? 'ASC' : 'DESC';
extraSelect = `, ${list_cursor_column}`; extraSelect = `, ${list_cursor_column}`;
orderTail = ` ORDER BY ${list_cursor_column} ${dir} LIMIT ${cohortLimit}`; orderTail = ` ORDER BY ${list_cursor_column} ${dir} LIMIT ${cohortLimit}`;
...@@ -392,8 +413,11 @@ export class ClickHouseSourceService { ...@@ -392,8 +413,11 @@ export class ClickHouseSourceService {
} }
const selectCols = const selectCols =
(tenant_key_column ? `${patient_key_column}, ${tenant_key_column}` : patient_key_column) + extraSelect; (tenant_key_column ? `${patient_key_column}, ${tenant_key_column}` : patient_key_column) + extraSelect;
const sql = `SELECT DISTINCT ${selectCols} FROM ${patient_list_from}${whereSql}${orderTail}`; const listFrom = unionMode ? `(${incBranches.join(' UNION ALL ')})` : patient_list_from;
this.logger.log(`[clickhouse·cohort] list patient keys — ${sql.slice(0, 200)}`); const sql = `SELECT DISTINCT ${selectCols} FROM ${listFrom}${whereSql}${orderTail}`;
this.logger.log(
`[clickhouse·cohort] list patient keys${unionMode ? `(union of ${incBranches.length} cursored tables)` : ''}${sql.slice(0, 300)}`,
);
const started = Date.now(); const started = Date.now();
const rows = (await this.queryJsonWithRetry(client, sql, 'list-patient-keys')) as Array<Record<string, unknown>>; const rows = (await this.queryJsonWithRetry(client, sql, 'list-patient-keys')) as Array<Record<string, unknown>>;
const keys: CohortKey[] = rows.map((r) => ({ const keys: CohortKey[] = rows.map((r) => ({
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment