Commit 24a7ce2c by luoqi

refactor(sync): cohort config 宿主无关化 — 表名/列名移到 manifest

修复 PR2/PR4 引入的硬编码违反"各宿主只 yaml 不同"原则的问题。

问题:
  listPatientPairs / loadTablesForCohort / injectCohortFilter 硬编码了
  jvs-dw 专属的 `dw_group.fact_client_out` + `patient_id` + `brand`,
  其它 host 接入需改代码 → 违反 PAC 核心设计(摄入流程跟宿主无关)。

修复:
  manifest.schema 加 sql_source.cohort 配置段:
    patient_list_from    列患者清单的源表全名(库.表)
    patient_key_column   患者主键列(所有源表共用做 cohort 过滤,默认 patient_id)
    tenant_key_column    租户区分列(可选;jvs-dw=brand;单 tenant host 删此行)
    list_cursor_column   列患者增量 cursor 列(对应主档表时间列)

  ClickHouseSourceService:
    - CohortKey 类型 { key, tenant? } 替代 { patient_id, brand }(值载体,列名外置)
    - listPatientPairs:SELECT DISTINCT <key>[,<tenant>] FROM <list_from>
      [WHERE <cursor_col> > x] ORDER BY <key> — 全从 cohort 配置读
    - loadTablesForCohort + buildCohortClause:
      有 tenant_key → (key,tenant) IN ((..))  无 → key IN (..)
    - injectCohortFilter 接收已构造好的 clause,不再硬编码列名

  cold-import.service:
    - cohort 类型改 CohortKey;canCohort 检查 manifest.sql_source.cohort 存在
    - 配了 cohortBatchSize 但没 cohort 段 → warn + 退回 single-shot

  manifest.yaml(jvs-dw)加 cohort 段:
    patient_list_from: dw_group.fact_client_out
    patient_key_column: patient_id
    tenant_key_column: brand
    list_cursor_column: last_visit_time

本地端到端验证(33,400 患者 / 759k tx):
   内存峰值 389MB(对比服务器 OOM 7.6GB)— cohort batching 决定性
   8 个 subject_type 全覆盖(含之前服务器 0 的 emr/payment/image)
   并发锁拦截 + cursor=run_start 推进
   每批增量提交(checkpoint)
   plan compose:10,028 plans / 17,828 reasons
   sub_key tooth-overlap union-find(impacted_tooth@18;28;38;48 多牙合并)
   K01-K08 全场景召回触发

新 host 接入清单(零代码):
  1. manifest.yaml 写 connection + queries + incremental.per_query + cohort 段
  2. 写 assemblers/*.yaml(canonical 映射)
  3. 写 transforms(如需 JSON 拆行等)
  完事 — sync / cohort / 增量 / 锁 全部复用
parent 3fd28974
...@@ -88,6 +88,14 @@ sql_source: ...@@ -88,6 +88,14 @@ sql_source:
fact_settlement_out_refund: { cursor_column: updated_date } fact_settlement_out_refund: { cursor_column: updated_date }
fact_settlement_mode_out: { cursor_column: updated_date } fact_settlement_mode_out: { cursor_column: updated_date }
# ⭐ Cohort 分批配置(PR2/PR4)— 宿主无关性的关键
# PAC 摄入代码不硬编码任何表名/列名,全从这里读;别的 host 接入只改本段
cohort:
patient_list_from: dw_group.fact_client_out # 列患者清单的源表(患者主档表)
patient_key_column: patient_id # 患者主键列(所有源表共用此列名做 cohort 过滤)
tenant_key_column: brand # 租户区分列(jvs-dw 多 brand;单 tenant host 删本行)
list_cursor_column: last_visit_time # 列患者时增量 cursor 列(对应患者主档表时间列)
# SQL 最朴素化 — host(DW)给的数据范围就是 PAC 该消化的范围。 # SQL 最朴素化 — host(DW)给的数据范围就是 PAC 该消化的范围。
# PAC 这边不预设诊所 / brand / 时间过滤,数据来什么就是什么。 # PAC 这边不预设诊所 / brand / 时间过滤,数据来什么就是什么。
# dev 期的 cohort LIMIT 已去除 → 全量;只保留必要业务过滤(退费状态)。 # dev 期的 cohort LIMIT 已去除 → 全量;只保留必要业务过滤(退费状态)。
......
...@@ -21,6 +21,13 @@ export interface IncrementalConfig { ...@@ -21,6 +21,13 @@ export interface IncrementalConfig {
cursorAdvances?: Record<string, string>; cursorAdvances?: Record<string, string>;
} }
/// Cohort batch 的患者 key — key=患者主键值,tenant=租户区分值(可选,仅配了 tenant_key_column)
/// 列名由 manifest.sql_source.cohort 声明,本接口只承载值(宿主无关)
export interface CohortKey {
key: string;
tenant?: string;
}
/// W4 末:单患者刷新 scope(详情页"刷新"按钮) /// W4 末:单患者刷新 scope(详情页"刷新"按钮)
/// 把 manifest queries 的 cohort 子查询替换为 `WHERE patient_id='X' AND brand='Y'` /// 把 manifest queries 的 cohort 子查询替换为 `WHERE patient_id='X' AND brand='Y'`
/// 不跑 cursor、不写 cursor、不反向拉主档(本身就是单患者全量) /// 不跑 cursor、不写 cursor、不反向拉主档(本身就是单患者全量)
...@@ -234,18 +241,31 @@ export class ClickHouseSourceService { ...@@ -234,18 +241,31 @@ export class ClickHouseSourceService {
} }
/** /**
* 列出全部 patient (patient_id, brand) pair,作为 cohort batch 的 ORDER 边界。 * 列出全部 patient cohort key,作为 cohort batch 的 ORDER 边界。
*
* ⭐ 宿主无关:表名 / 主键列 / 租户列 / cursor 列全部从 manifest.sql_source.cohort 读,
* 代码不硬编码任何 jvs-dw 专属名称。各 host 改 yaml 即可。
* *
* 用 fact_client_out 当 source of truth(它就是 patient 主档表); * SQL 形态:
* 增量模式 cursorAfter 用 last_visit_time 列(同 manifest.sql_source.incremental.per_query.fact_client_out)。 * SELECT DISTINCT <patient_key>[, <tenant_key>]
* FROM <patient_list_from>
* [WHERE <list_cursor_column> > '<cursor>']
* ORDER BY <patient_key> ← 排序保证 chunk 分页稳定
* *
* ORDER BY patient_id ASC 保证 OFFSET LIMIT 分页稳定; * 返回 CohortKey[]:{ key, tenant? }(tenant 仅当配了 tenant_key_column)
* 跨 batch 时同一 patient 不会拆开(LIMIT 5000 = 拿 5000 个 distinct patient)。
*/ */
async listPatientPairs( async listPatientPairs(
source: ClickHouseSource, source: ClickHouseSource,
incremental?: IncrementalConfig, incremental?: IncrementalConfig,
): Promise<Array<{ patient_id: string; brand: string }>> { ): Promise<CohortKey[]> {
const cohort = source.cohort;
if (!cohort) {
throw new Error(
'listPatientPairs 需要 manifest.sql_source.cohort 配置(patient_list_from / patient_key_column 等)',
);
}
const { patient_list_from, patient_key_column, tenant_key_column, list_cursor_column } = cohort;
const conn = this.resolveConnection(source); const conn = this.resolveConnection(source);
const client = createClient({ const client = createClient({
url: conn.url, url: conn.url,
...@@ -256,45 +276,60 @@ export class ClickHouseSourceService { ...@@ -256,45 +276,60 @@ export class ClickHouseSourceService {
compression: { response: true, request: false }, compression: { response: true, request: false },
}); });
try { try {
const cursorCfg = incremental?.perQuery['fact_client_out']; // 增量 cursor:patient 列表表的 cursor 值(从 perQuery 拿,key = patient_list_from 表名)
// 表名可能含库前缀(dw_group.fact_client_out),perQuery key 是不带库前缀的(fact_client_out)
const listTableShort = patient_list_from.includes('.')
? patient_list_from.split('.').pop()!
: patient_list_from;
const cursorCfg = incremental?.perQuery[listTableShort];
const cursorCol = list_cursor_column ?? cursorCfg?.cursorColumn;
const whereParts: string[] = []; const whereParts: string[] = [];
if (cursorCfg?.cursorValue) { if (cursorCol && cursorCfg?.cursorValue) {
whereParts.push( whereParts.push(`${cursorCol} > '${cursorCfg.cursorValue.replace(/'/g, "''")}'`);
`${cursorCfg.cursorColumn} > '${cursorCfg.cursorValue.replace(/'/g, "''")}'`,
);
} }
const whereSql = whereParts.length > 0 ? ` WHERE ${whereParts.join(' AND ')}` : ''; const whereSql = whereParts.length > 0 ? ` WHERE ${whereParts.join(' AND ')}` : '';
const sql = `SELECT DISTINCT patient_id, brand FROM dw_group.fact_client_out${whereSql} ORDER BY patient_id`; const selectCols = tenant_key_column
this.logger.log(`[clickhouse·cohort] list patient pairs — ${sql.slice(0, 200)}`); ? `${patient_key_column}, ${tenant_key_column}`
: patient_key_column;
const sql = `SELECT DISTINCT ${selectCols} FROM ${patient_list_from}${whereSql} ORDER BY ${patient_key_column}`;
this.logger.log(`[clickhouse·cohort] list patient keys — ${sql.slice(0, 200)}`);
const started = Date.now(); const started = Date.now();
const result = await client.query({ query: sql, format: 'JSONEachRow' }); const result = await client.query({ query: sql, format: 'JSONEachRow' });
const rows = (await result.json()) as Array<{ patient_id: string; brand: string }>; const rows = (await result.json()) as Array<Record<string, unknown>>;
const keys: CohortKey[] = rows.map((r) => ({
key: String(r[patient_key_column] ?? ''),
tenant: tenant_key_column ? String(r[tenant_key_column] ?? '') : undefined,
}));
this.logger.log( this.logger.log(
`[clickhouse·cohort] → ${rows.length} distinct (patient_id, brand) pairs, ${Date.now() - started} ms`, `[clickhouse·cohort] → ${keys.length} distinct cohort key${tenant_key_column ? ' (含 tenant)' : ''},${Date.now() - started} ms`,
); );
return rows; return keys;
} finally { } finally {
await client.close(); await client.close();
} }
} }
/** /**
* 加载指定 patient 集合的所有 source 表(cohort batch 模式入口)。 * 加载指定 cohort key 集合的所有 source 表(cohort batch 模式入口)。
* *
* 跟 loadAllTables 的差别: * ⭐ 宿主无关:cohort 过滤列从 manifest.sql_source.cohort 读,不硬编码。
* - 每张表 SQL 注入 `(patient_id, brand) IN ((p1,b1),(p2,b2),...)` 过滤
* - 增量 cursor 跟全量一样支持(WHERE cursor_col > val AND in (...) 二者并存)
* - 不做反向拉主档(本批 fact_client_out 已含本批所有 patient 主档,无需补)
* - default_limit 仍然应用(防御性,本批数据通常远小于 limit)
* *
* 用途:importDirectory 在 cohort batch 模式下按 patient 段调用, * 跟 loadAllTables 的差别:
* 每批跑完释放内存,bound 资源使用。 * - 每张表 SQL 注入 cohort 过滤:
* 有 tenant_key:`(patient_key, tenant_key) IN ((k1,t1),(k2,t2),...)`
* 无 tenant_key:`patient_key IN (k1,k2,...)`
* - 增量 cursor 跟全量一样支持(二者 AND 并存)
* - 不做反向拉主档(本批 patient 主档已含本批所有 patient,无需补)
*/ */
async loadTablesForCohort( async loadTablesForCohort(
source: ClickHouseSource, source: ClickHouseSource,
pairs: ReadonlyArray<{ patient_id: string; brand: string }>, cohortKeys: ReadonlyArray<CohortKey>,
incremental?: IncrementalConfig, incremental?: IncrementalConfig,
): Promise<Record<string, unknown[]>> { ): Promise<Record<string, unknown[]>> {
const cohort = source.cohort;
if (!cohort) {
throw new Error('loadTablesForCohort 需要 manifest.sql_source.cohort 配置');
}
const conn = this.resolveConnection(source); const conn = this.resolveConnection(source);
const client = createClient({ const client = createClient({
url: conn.url, url: conn.url,
...@@ -305,13 +340,8 @@ export class ClickHouseSourceService { ...@@ -305,13 +340,8 @@ export class ClickHouseSourceService {
compression: { response: true, request: false }, compression: { response: true, request: false },
}); });
// 构造 IN tuple 列表(同 reversePullPatientMaster 风格) // 构造 cohort IN 子句(按是否配 tenant_key 分两种形态)
const tuples = pairs const cohortClause = this.buildCohortClause(cohort, cohortKeys);
.map(
(p) =>
`('${(p.patient_id ?? '').replace(/'/g, "''")}', '${(p.brand ?? '').replace(/'/g, "''")}')`,
)
.join(',');
const tables: Record<string, unknown[]> = {}; const tables: Record<string, unknown[]> = {};
const defaultLimit = source.default_limit ?? 100_000; const defaultLimit = source.default_limit ?? 100_000;
...@@ -323,8 +353,8 @@ export class ClickHouseSourceService { ...@@ -323,8 +353,8 @@ export class ClickHouseSourceService {
const sqlWithCursor = incCfg const sqlWithCursor = incCfg
? this.injectIncrementalCursor(sql, incCfg.cursorColumn, incCfg.cursorValue) ? this.injectIncrementalCursor(sql, incCfg.cursorColumn, incCfg.cursorValue)
: sql; : sql;
// 2. cohort 注入:在 WHERE 后追加 AND (patient_id, brand) IN (...) // 2. cohort 注入
const sqlWithCohort = this.injectCohortFilter(sqlWithCursor, tuples); const sqlWithCohort = this.injectCohortFilter(sqlWithCursor, cohortClause);
const sqlWithLimit = this.applyDefaultLimit(sqlWithCohort, defaultLimit); const sqlWithLimit = this.applyDefaultLimit(sqlWithCohort, defaultLimit);
this.logger.log( this.logger.log(
`[clickhouse·cohort] query "${tableName}" — ${sqlWithLimit.slice(0, 140).replace(/\s+/g, ' ')}...`, `[clickhouse·cohort] query "${tableName}" — ${sqlWithLimit.slice(0, 140).replace(/\s+/g, ' ')}...`,
...@@ -336,7 +366,6 @@ export class ClickHouseSourceService { ...@@ -336,7 +366,6 @@ export class ClickHouseSourceService {
this.logger.log( this.logger.log(
`[clickhouse·cohort] "${tableName}" → ${rows.length} 行,${Date.now() - started} ms`, `[clickhouse·cohort] "${tableName}" → ${rows.length} 行,${Date.now() - started} ms`,
); );
// 增量 cursor 推进(同 loadAllTables;cohort 模式下也累积,run_start cursor 会覆盖)
if (incremental && incCfg && rows.length > 0) { if (incremental && incCfg && rows.length > 0) {
const maxVal = this.computeMax(rows as Record<string, unknown>[], incCfg.cursorColumn); const maxVal = this.computeMax(rows as Record<string, unknown>[], incCfg.cursorColumn);
if (maxVal) { if (maxVal) {
...@@ -354,10 +383,25 @@ export class ClickHouseSourceService { ...@@ -354,10 +383,25 @@ export class ClickHouseSourceService {
return tables; return tables;
} }
/// 把 (patient_id, brand) IN tuple 过滤注入到已有 SQL 的 WHERE 末尾(无 WHERE 时新建) /// 按 cohort 配置构造 IN 子句(有 tenant_key → tuple 形态;无 → 单列形态)
private injectCohortFilter(sql: string, tuplesCsv: string): string { private buildCohortClause(
if (!tuplesCsv) return sql; // 空 cohort 不注入(理论上不该发生) cohort: NonNullable<ClickHouseSource['cohort']>,
const cohortClause = `(patient_id, brand) IN (${tuplesCsv})`; keys: ReadonlyArray<CohortKey>,
): string {
const esc = (s: string) => s.replace(/'/g, "''");
if (cohort.tenant_key_column) {
const tuples = keys
.map((k) => `('${esc(k.key)}', '${esc(k.tenant ?? '')}')`)
.join(',');
return `(${cohort.patient_key_column}, ${cohort.tenant_key_column}) IN (${tuples})`;
}
const list = keys.map((k) => `'${esc(k.key)}'`).join(',');
return `${cohort.patient_key_column} IN (${list})`;
}
/// 把 cohort IN 子句注入到已有 SQL 的 WHERE 末尾(无 WHERE 时新建)
private injectCohortFilter(sql: string, cohortClause: string): string {
if (!cohortClause) return sql; // 空 cohort 不注入(理论上不该发生)
// 找最外层 WHERE(若有);若已有 ORDER BY/LIMIT/GROUP BY,要插在它们之前 // 找最外层 WHERE(若有);若已有 ORDER BY/LIMIT/GROUP BY,要插在它们之前
const hasWhere = /\bWHERE\b/i.test(sql); const hasWhere = /\bWHERE\b/i.test(sql);
if (!hasWhere) { if (!hasWhere) {
......
...@@ -22,6 +22,7 @@ import { ...@@ -22,6 +22,7 @@ import {
} from './manifest.schema'; } from './manifest.schema';
import { import {
ClickHouseSourceService, ClickHouseSourceService,
type CohortKey,
type IncrementalConfig, type IncrementalConfig,
type PatientScope, type PatientScope,
} from './clickhouse-source.service'; } from './clickhouse-source.service';
...@@ -195,14 +196,22 @@ export class ColdImportService { ...@@ -195,14 +196,22 @@ export class ColdImportService {
const normalize = { amountUnit: manifest.amount_unit, timezone: manifest.timezone }; const normalize = { amountUnit: manifest.amount_unit, timezone: manifest.timezone };
// 6. 决定 cohort 模式: // 6. 决定 cohort 模式:
// - cohortBatchSize > 0 + ClickHouse 源 → 分批跑(内存友好,resume 友好) // - cohortBatchSize > 0 + ClickHouse 源 + 配了 cohort → 分批跑(内存友好,resume 友好)
// - 未设 / 文件源 → 单 shot(向后兼容,csv 模式数据量小不需分批) // - 未设 / 文件源 / 没配 cohort → 单 shot(向后兼容,csv 模式数据量小不需分批)
const cohortBatchSize = resolveCohortBatchSize(options.cohortBatchSize); const cohortBatchSize = resolveCohortBatchSize(options.cohortBatchSize);
if (cohortBatchSize && manifest.sql_source) { const canCohort = cohortBatchSize > 0 && !!manifest.sql_source?.cohort;
if (cohortBatchSize > 0 && manifest.sql_source && !manifest.sql_source.cohort) {
this.logger.warn(
`cohortBatchSize=${cohortBatchSize} manifest.sql_source.cohort 未配置 ` +
`退回 single-shot(要分批请在 manifest cohort )`,
);
}
if (canCohort && manifest.sql_source) {
const sqlSource = manifest.sql_source;
// ── cohort 模式 ── // ── cohort 模式 ──
// 6a. 列出全部 patient pair(增量 cursor 同样适用) // 6a. 列出全部 patient pair(增量 cursor 同样适用)
const allPairs = await this.chSource.listPatientPairs( const allPairs = await this.chSource.listPatientPairs(
manifest.sql_source, sqlSource,
incrementalConfig, incrementalConfig,
); );
if (allPairs.length === 0) { if (allPairs.length === 0) {
...@@ -539,7 +548,7 @@ export class ColdImportService { ...@@ -539,7 +548,7 @@ export class ColdImportService {
subjectCfgs: AssemblerConfig[]; subjectCfgs: AssemblerConfig[];
syncLogId: string | undefined; syncLogId: string | undefined;
dryRun: boolean; dryRun: boolean;
cohort: ReadonlyArray<{ patient_id: string; brand: string }> | null; cohort: ReadonlyArray<CohortKey> | null;
incrementalConfig: IncrementalConfig | undefined; incrementalConfig: IncrementalConfig | undefined;
/// 累加进调用方;由调用方维护(跨 cohort 全 run 汇总) /// 累加进调用方;由调用方维护(跨 cohort 全 run 汇总)
perResource: PerResourceStats[]; perResource: PerResourceStats[];
......
...@@ -79,6 +79,26 @@ export const ClickHouseSourceSchema = z.object({ ...@@ -79,6 +79,26 @@ export const ClickHouseSourceSchema = z.object({
), ),
}) })
.optional(), .optional(),
/// ⭐ Cohort 分批配置(PR2/PR4)— 宿主无关性的关键:
/// patient 列表来源 + 主键列 + 租户区分列全部声明在这,代码不硬编码任何表名/列名。
/// 不配 cohort → 不分批(single-shot,文件源 / 小数据集)。
/// 各 host 只改本段 + queries SQL,摄入流程代码零改动。
cohort: z
.object({
/// 列患者清单的源表全名(库.表);通常是 patient 主档表
/// 例:dw_group.fact_client_out
patient_list_from: z.string().min(1),
/// 患者主键列名 — 所有源表共用此列名做 (key) IN (...) cohort 过滤
/// 例:patient_id
patient_key_column: z.string().min(1).default('patient_id'),
/// 租户区分列名(可选)— 多 tenant per host 时用(jvs-dw 用 brand 区分瑞尔/瑞泰)
/// 配了 → cohort 用 (patient_key, tenant_key) 复合键;不配 → 仅 patient_key 单键
tenant_key_column: z.string().min(1).optional(),
/// 列患者时的增量 cursor 列(对应 patient_list_from 表的时间列)
/// 例:last_visit_time;不配 → 列患者不做增量过滤(总是全量列再 cohort 切)
list_cursor_column: z.string().min(1).optional(),
})
.optional(),
}); });
export type ClickHouseSource = z.infer<typeof ClickHouseSourceSchema>; export type ClickHouseSource = z.infer<typeof ClickHouseSourceSchema>;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment