Commit c5ffe5ca by luoqi

fix(sync): CH query 加瞬时错误重试(远程 DW 抖动健壮性)

服务器全量 sync 在 batch 180/260 撞远程阿里云 DW 瞬时错误
"Response from the Engine was empty" → 整跑 failed(数据已提交 180 批,
锁正常释放,cursor 因 status=failed 被过滤不误推进 — 都按设计正常工作)。

根因:一次全量 260 批 × 6 query = 1560 次远程查询,只要一次网络抖动就挂。

修复:queryJsonWithRetry —— 对瞬时错误(empty/timeout/reset/socket/5xx)
指数退避重试,默认最多尝试 3 次(两次重试前分别等待 0.5s / 1.5s;4.5s 仅在 maxAttempts ≥ 4 时才会用到);确定性错误(SQL/权限)不重试快速失败。
应用到 3 个热路径:listPatientPairs / loadTablesForCohort / loadAllTables。
(reversePull / 单患者 refresh 数据量小,暂不包)
parent d849b1b2
......@@ -42,6 +42,46 @@ export interface PatientScope {
export class ClickHouseSourceService {
private readonly logger = new Logger(ClickHouseSourceService.name);
/**
 * Runs a ClickHouse query and returns its rows, retrying transient failures.
 *
 * The remote DW occasionally fails with transient errors such as
 * "Response from the Engine was empty", connection resets, or timeouts.
 * In a large full sync (260 batches x 6 queries = 1560 remote queries) a
 * single hiccup would otherwise fail the whole run, so every query is retried
 * with exponential backoff.
 *
 * Backoff before retry k is `500ms * 3^(k-1)`: with the default of 3 total
 * attempts the waits are 0.5s and 1.5s (a 4.5s wait only happens when
 * `maxAttempts` is 4 or more).
 *
 * Only errors whose message looks transient are retried; deterministic errors
 * (SQL syntax, permissions, ...) are rethrown immediately.
 *
 * @param client ClickHouse client used to run the query.
 * @param sql Query text; it is sent with format `JSONEachRow`.
 * @param label Short name of the query, used only in the retry warning log.
 * @param maxAttempts Total number of attempts including the first one
 *   (default 3). Values below 1 or NaN are treated as 1; fractional values
 *   are rounded down.
 * @returns The rows produced by `result.json()`.
 * @throws The error of the last attempt, unchanged, when it is not transient
 *   or when the attempts are exhausted.
 */
private async queryJsonWithRetry(
  client: ClickHouseClient,
  sql: string,
  label: string,
  maxAttempts = 3,
): Promise<unknown[]> {
  // Always try at least once, so a bad maxAttempts can never end in `throw undefined`.
  const attempts = maxAttempts >= 1 ? Math.floor(maxAttempts) : 1;
  // Message fragments treated as transient. ETIMEDOUT is listed explicitly because
  // neither "timeout" nor "timed out" matches it; 504 covers gateway timeouts.
  const transientPattern =
    /empty|timeout|timed out|ETIMEDOUT|ECONNRESET|socket|network|EAI_AGAIN|504|503|502|too many|reset by peer/i;

  for (let attempt = 1; ; attempt++) {
    try {
      const result = await client.query({ query: sql, format: 'JSONEachRow' });
      return (await result.json()) as unknown[];
    } catch (err) {
      const msg = err instanceof Error ? err.message : String(err);
      // Deterministic errors, or no attempts left: fail fast with the original error.
      if (attempt >= attempts || !transientPattern.test(msg)) {
        throw err;
      }
      const backoffMs = 500 * 3 ** (attempt - 1); // 500 / 1500 / 4500 ...
      this.logger.warn(
        `[clickhouse] "${label}" 第 ${attempt}/${attempts} 次失败(瞬时:${msg.slice(0, 80)}),${backoffMs}ms 后重试`,
      );
      await new Promise<void>((resolve) => setTimeout(resolve, backoffMs));
    }
  }
}
/// W4 末:env 覆盖 manifest connection(便于多环境部署 不必改 yaml)
/// 优先级:env → manifest → 抛错
private resolveConnection(source: ClickHouseSource): {
......@@ -94,11 +134,7 @@ export class ClickHouseSourceService {
: sql;
const sqlWithLimit = this.applyDefaultLimit(sqlWithIncremental, defaultLimit);
const started = Date.now();
const result = await client.query({
query: sqlWithLimit,
format: 'JSONEachRow',
});
const rows = (await result.json()) as unknown[];
const rows = await this.queryJsonWithRetry(client, sqlWithLimit, tableName);
const elapsed = Date.now() - started;
tables[tableName] = rows;
this.logger.log(
......@@ -294,8 +330,7 @@ export class ClickHouseSourceService {
const sql = `SELECT DISTINCT ${selectCols} FROM ${patient_list_from}${whereSql} ORDER BY ${patient_key_column}`;
this.logger.log(`[clickhouse·cohort] list patient keys — ${sql.slice(0, 200)}`);
const started = Date.now();
const result = await client.query({ query: sql, format: 'JSONEachRow' });
const rows = (await result.json()) as Array<Record<string, unknown>>;
const rows = (await this.queryJsonWithRetry(client, sql, 'list-patient-keys')) as Array<Record<string, unknown>>;
const keys: CohortKey[] = rows.map((r) => ({
key: String(r[patient_key_column] ?? ''),
tenant: tenant_key_column ? String(r[tenant_key_column] ?? '') : undefined,
......@@ -361,8 +396,7 @@ export class ClickHouseSourceService {
const sqlWithCohort = this.injectCohortFilter(sqlWithCursor, cohortClause);
const sqlWithLimit = this.applyDefaultLimit(sqlWithCohort, defaultLimit);
const started = Date.now();
const result = await client.query({ query: sqlWithLimit, format: 'JSONEachRow' });
const rows = (await result.json()) as unknown[];
const rows = await this.queryJsonWithRetry(client, sqlWithLimit, `cohort:${tableName}`);
tables[tableName] = rows;
this.logger.log(
`[clickhouse·cohort] "${tableName}" → ${rows.length} 行,${Date.now() - started} ms`,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment