Commit d3635199 by luoqi

perf(sync): PR5a — 并行化 CH query(load 阶段 ~4x)

loadTablesForCohort + loadAllTables 的 N 个 ClickHouse query 从串行
for-await 改 Promise.all 并行。

瓶颈分析:远程 DW 每 query 往返 170-580ms,5-6 个串行 = ~2.3s/批,
是 cohort batch 里 load 阶段的主要耗时。@clickhouse/client 基于 HTTP,
单 client 可并发多请求。

并发安全:
- cursorAdvances 在 Promise.all 前预初始化(避免并行 callback 各自 ?? {} 互相覆盖)
- 每个 tableName 写自己的 key,无竞争
- tables[tableName] 各写各的 key,无竞争

本地验证(2 批 × ~100 患者):
  并行前(串行累加):191+193+329+393+579+584 = ~2269ms/批
  并行后(取最慢):  max ≈ 584ms/批  → ~4x load 加速
  幂等正确:superseded=2 unchanged=285 evidence=1632(重读决策正确)

资源利用:之前 1 核串行等 IO,现在 6 query 并发打满网络/CH;
内存不变(本来就 hold 全部表)。

PR5b(可选,未做):cohort 级并行 worker pool(env PAC_COHORT_CONCURRENCY)
  — 多 cohort 并发处理,3-4x 吞吐;但需 bump prisma pool + checkpoint 改 count-based,
  风险略高,留作后续按需开。
parent 24a7ce2c
...@@ -83,16 +83,16 @@ export class ClickHouseSourceService { ...@@ -83,16 +83,16 @@ export class ClickHouseSourceService {
const defaultLimit = source.default_limit ?? 100_000; const defaultLimit = source.default_limit ?? 100_000;
try { try {
for (const [tableName, sql] of Object.entries(source.queries)) { // ⭐ PR5a:N 个 query 并行(同 loadTablesForCohort)
if (incremental) incremental.cursorAdvances = incremental.cursorAdvances ?? {};
await Promise.all(
Object.entries(source.queries).map(async ([tableName, sql]) => {
// W4 末:incremental 模式注入 cursor 条件 + cohort LIMIT 移除(增量不需要 dev cohort) // W4 末:incremental 模式注入 cursor 条件 + cohort LIMIT 移除(增量不需要 dev cohort)
const incCfg = incremental?.perQuery[tableName]; const incCfg = incremental?.perQuery[tableName];
const sqlWithIncremental = incCfg const sqlWithIncremental = incCfg
? this.injectIncrementalCursor(sql, incCfg.cursorColumn, incCfg.cursorValue) ? this.injectIncrementalCursor(sql, incCfg.cursorColumn, incCfg.cursorValue)
: sql; : sql;
const sqlWithLimit = this.applyDefaultLimit(sqlWithIncremental, defaultLimit); const sqlWithLimit = this.applyDefaultLimit(sqlWithIncremental, defaultLimit);
this.logger.log(
`[clickhouse] query "${tableName}" — ${sqlWithLimit.slice(0, 120).replace(/\s+/g, ' ')}...`,
);
const started = Date.now(); const started = Date.now();
const result = await client.query({ const result = await client.query({
query: sqlWithLimit, query: sqlWithLimit,
...@@ -105,14 +105,14 @@ export class ClickHouseSourceService { ...@@ -105,14 +105,14 @@ export class ClickHouseSourceService {
`[clickhouse] "${tableName}" → ${rows.length} 行,${elapsed} ms`, `[clickhouse] "${tableName}" → ${rows.length} 行,${elapsed} ms`,
); );
// W4 末:incremental 跑完算新 cursor(= max(cursor_column)) // W4 末:incremental 跑完算新 cursor(= max(cursor_column))
if (incremental && incCfg && rows.length > 0) { if (incremental?.cursorAdvances && incCfg && rows.length > 0) {
const maxVal = this.computeMax(rows as Record<string, unknown>[], incCfg.cursorColumn); const maxVal = this.computeMax(rows as Record<string, unknown>[], incCfg.cursorColumn);
if (maxVal) { if (maxVal) {
incremental.cursorAdvances = incremental.cursorAdvances ?? {};
incremental.cursorAdvances[tableName] = maxVal; incremental.cursorAdvances[tableName] = maxVal;
} }
} }
} }),
);
// ⭐ W4 末 反向拉主档(方案 C):增量模式下,fact 表拉来的 patient_id 不一定在 fact_client_out 里 // ⭐ W4 末 反向拉主档(方案 C):增量模式下,fact 表拉来的 patient_id 不一定在 fact_client_out 里
// (例:患者 EMR 编辑了 last_visit_time 不变,主档 cursor 拉不到他) // (例:患者 EMR 编辑了 last_visit_time 不变,主档 cursor 拉不到他)
...@@ -347,18 +347,19 @@ export class ClickHouseSourceService { ...@@ -347,18 +347,19 @@ export class ClickHouseSourceService {
const defaultLimit = source.default_limit ?? 100_000; const defaultLimit = source.default_limit ?? 100_000;
try { try {
for (const [tableName, sql] of Object.entries(source.queries)) { // ⭐ PR5a:N 个 query 并行(远程 DW 往返是瓶颈,串行 N×170ms → 并行 ~170ms)
// @clickhouse/client 基于 HTTP,单 client 可并发多请求;每个 query 独立 result
// 先初始化 cursorAdvances(避免并行 callback 各自 ?? {} 互相覆盖的竞态)
if (incremental) incremental.cursorAdvances = incremental.cursorAdvances ?? {};
const queryStart = Date.now();
await Promise.all(
Object.entries(source.queries).map(async ([tableName, sql]) => {
const incCfg = incremental?.perQuery[tableName]; const incCfg = incremental?.perQuery[tableName];
// 1. cursor 注入(同 loadAllTables 路径)
const sqlWithCursor = incCfg const sqlWithCursor = incCfg
? this.injectIncrementalCursor(sql, incCfg.cursorColumn, incCfg.cursorValue) ? this.injectIncrementalCursor(sql, incCfg.cursorColumn, incCfg.cursorValue)
: sql; : sql;
// 2. cohort 注入
const sqlWithCohort = this.injectCohortFilter(sqlWithCursor, cohortClause); const sqlWithCohort = this.injectCohortFilter(sqlWithCursor, cohortClause);
const sqlWithLimit = this.applyDefaultLimit(sqlWithCohort, defaultLimit); const sqlWithLimit = this.applyDefaultLimit(sqlWithCohort, defaultLimit);
this.logger.log(
`[clickhouse·cohort] query "${tableName}" — ${sqlWithLimit.slice(0, 140).replace(/\s+/g, ' ')}...`,
);
const started = Date.now(); const started = Date.now();
const result = await client.query({ query: sqlWithLimit, format: 'JSONEachRow' }); const result = await client.query({ query: sqlWithLimit, format: 'JSONEachRow' });
const rows = (await result.json()) as unknown[]; const rows = (await result.json()) as unknown[];
...@@ -366,17 +367,19 @@ export class ClickHouseSourceService { ...@@ -366,17 +367,19 @@ export class ClickHouseSourceService {
this.logger.log( this.logger.log(
`[clickhouse·cohort] "${tableName}" → ${rows.length} 行,${Date.now() - started} ms`, `[clickhouse·cohort] "${tableName}" → ${rows.length} 行,${Date.now() - started} ms`,
); );
if (incremental && incCfg && rows.length > 0) { // cursor advance(并发安全:cursorAdvances 已预初始化,每个 tableName 写自己的 key)
if (incremental?.cursorAdvances && incCfg && rows.length > 0) {
const maxVal = this.computeMax(rows as Record<string, unknown>[], incCfg.cursorColumn); const maxVal = this.computeMax(rows as Record<string, unknown>[], incCfg.cursorColumn);
if (maxVal) { if (maxVal) {
incremental.cursorAdvances = incremental.cursorAdvances ?? {};
const prev = incremental.cursorAdvances[tableName]; const prev = incremental.cursorAdvances[tableName];
if (!prev || maxVal > prev) { if (!prev || maxVal > prev) {
incremental.cursorAdvances[tableName] = maxVal; incremental.cursorAdvances[tableName] = maxVal;
} }
} }
} }
} }),
);
this.logger.debug(`[clickhouse·cohort] ${Object.keys(source.queries).length} query 并行完成 ${Date.now() - queryStart}ms`);
} finally { } finally {
await client.close(); await client.close();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment