Commit 4c93c669 by luoqi

feat(alerting): 企微 webhook 告警 + 增量空转探针 + DW 滞后告警接通

背景:增量游标 ISO 格式 bug 空转三天才被人工发现 — "success+fetched=0" 无人知晓。

- AlertService:识别企微 webhook(qyapi.weixin.qq.com)→ markdown 格式(级别 emoji/颜色,
  context 引用块,body 截断);企微 HTTP 永远 200 → 检查 body.errcode 才算送达。
- 增量空转探针(ColdImportService 收尾):带游标的增量跑出 0 写入时,反查 DW
  「比游标新的行数」— DW 也 0 = 真没数据(静默);DW>0 = 拉不到但有 → 当天 critical 告警。
  探针(ClickHouseSourceService.probeNewRowCounts)任何异常吞掉,绝不影响同步主流程。
- DwLagMonitorService:🟡/🔴 滞后从"只打日志"接到 AlertService(企微同收)。

验证:本地用编译产物真发企微群,payload 被接受(无 errcode 报错)。tsc 0。

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
parent 7375f010
...@@ -37,18 +37,26 @@ export class AlertService { ...@@ -37,18 +37,26 @@ export class AlertService {
if (webhookUrl) { if (webhookUrl) {
try { try {
await fetch(webhookUrl, { const payload = webhookUrl.includes('qyapi.weixin.qq.com')
? buildWecomPayload(alert)
: {
level: alert.level,
title: alert.title,
body: alert.body,
context: alert.context,
timestamp: new Date().toISOString(),
};
const res = await fetch(webhookUrl, {
method: 'POST', method: 'POST',
headers: { 'Content-Type': 'application/json' }, headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ body: JSON.stringify(payload),
level: alert.level,
title: alert.title,
body: alert.body,
context: alert.context,
timestamp: new Date().toISOString(),
}),
signal: AbortSignal.timeout(5_000), signal: AbortSignal.timeout(5_000),
}); });
// 企微即使格式错也回 HTTP 200,错误在 body.errcode — 必须看 body
const text = await res.text();
if (!res.ok || /"errcode":\s*[1-9]/.test(text)) {
this.logger.error(`Alert webhook rejected: HTTP ${res.status} ${text.slice(0, 160)}`);
}
} catch (err) { } catch (err) {
this.logger.error( this.logger.error(
`Failed to deliver alert webhook: ${err instanceof Error ? err.message : String(err)}`, `Failed to deliver alert webhook: ${err instanceof Error ? err.message : String(err)}`,
...@@ -57,3 +65,25 @@ export class AlertService { ...@@ -57,3 +65,25 @@ export class AlertService {
} }
} }
} }
/// 企微群机器人 payload(markdown)。颜色:企微只支持 info绿/comment灰/warning橙 → 级别用 emoji 区分。
/// 内容上限 4096 字节 → body 截断;context 逐行附在引用块里。
function buildWecomPayload(alert: Alert): Record<string, unknown> {
const icon = alert.level === 'critical' ? '🔴' : alert.level === 'warning' ? '🟡' : '🟢';
const color = alert.level === 'info' ? 'info' : 'warning';
const ctxLines = alert.context
? Object.entries(alert.context)
.map(([k, v]) => `> ${k}: ${typeof v === 'string' ? v : JSON.stringify(v)}`)
.join('\n')
: '';
const body = alert.body.length > 1500 ? `${alert.body.slice(0, 1500)}…` : alert.body;
const content = [
`${icon} **<font color="${color}">[PAC ${alert.level.toUpperCase()}] ${alert.title}</font>**`,
body,
ctxLines,
`> time: ${new Date().toISOString()}`,
]
.filter(Boolean)
.join('\n');
return { msgtype: 'markdown', markdown: { content } };
}
...@@ -105,6 +105,44 @@ export class ClickHouseSourceService { ...@@ -105,6 +105,44 @@ export class ClickHouseSourceService {
return { url, database, username, password }; return { url, database, username, password };
} }
/// 增量空转探针:按(已规范化的)游标去 DW 数"比游标新的行数"。
/// 用途:增量 fetched=0 时分辨「DW 真没新数据(静默)」vs「查询条件失效在空转(告警)」
/// —— 2026-06-10 游标 ISO 格式 bug 让增量空转三天才被人工发现,此探针让它当天就暴露。
/// 探针绝不能影响同步主流程:任何异常吞掉返回 {}。
async probeNewRowCounts(
source: ClickHouseSource,
perQuery: IncrementalConfig['perQuery'],
): Promise<Record<string, number>> {
const out: Record<string, number> = {};
try {
const conn = this.resolveConnection(source);
const client = createClient({
url: conn.url,
database: conn.database,
username: conn.username,
password: conn.password,
request_timeout: 30_000,
});
try {
for (const [tableName, cfg] of Object.entries(perQuery)) {
if (!cfg.cursorValue) continue;
const sql = source.queries[tableName];
const m = sql?.match(/FROM\s+([\w.]+)/i);
if (!m) continue;
const q = `SELECT count(*) AS c FROM ${m[1]} WHERE ${cfg.cursorColumn} > '${cfg.cursorValue.replace(/'/g, "''")}'`;
const rs = await client.query({ query: q, format: 'JSONEachRow' });
const rows = (await rs.json()) as Array<{ c: string | number }>;
out[tableName] = Number(rows[0]?.c ?? 0);
}
} finally {
await client.close();
}
} catch (e) {
this.logger.warn(`[probe] 空转探针失败(忽略): ${e instanceof Error ? e.message : String(e)}`);
}
return out;
}
async loadAllTables( async loadAllTables(
source: ClickHouseSource, source: ClickHouseSource,
incremental?: IncrementalConfig, incremental?: IncrementalConfig,
......
...@@ -33,6 +33,7 @@ import { ...@@ -33,6 +33,7 @@ import {
type IncrementalConfig, type IncrementalConfig,
type PatientScope, type PatientScope,
} from './clickhouse-source.service'; } from './clickhouse-source.service';
import { AlertService } from '../../../common/alerting/alert.service';
import { TransformEngine } from '../transforms/transform-engine'; import { TransformEngine } from '../transforms/transform-engine';
import { buildTenantResolver, type TenantResolver } from './tenant-resolver'; import { buildTenantResolver, type TenantResolver } from './tenant-resolver';
import { mergePatientPreferences } from '../pipeline/patient-preferences.util'; import { mergePatientPreferences } from '../pipeline/patient-preferences.util';
...@@ -63,6 +64,7 @@ export class ColdImportService { ...@@ -63,6 +64,7 @@ export class ColdImportService {
private readonly assembler: AssemblerEngine, private readonly assembler: AssemblerEngine,
private readonly chSource: ClickHouseSourceService, private readonly chSource: ClickHouseSourceService,
private readonly transformEngine: TransformEngine, private readonly transformEngine: TransformEngine,
private readonly alerter: AlertService,
) {} ) {}
/** /**
...@@ -824,6 +826,31 @@ export class ColdImportService { ...@@ -824,6 +826,31 @@ export class ColdImportService {
}, },
}); });
} }
// ── 增量空转探针(2026-06-10 游标格式 bug 空转三天才被发现的教训)──
// fetched=0 且确实带游标跑的增量 → 反查 DW「比游标新的行数」:
// DW 也是 0 → 真没新数据,静默;DW > 0 → PAC 拉不到但 DW 有 → 当天告警(企微)。
const usedCursor =
incrementalConfig &&
Object.values(incrementalConfig.perQuery).some((c) => c.cursorValue !== null);
if (!options.dryRun && usedCursor && totals.transactionsWritten + totals.duplicates === 0 && manifest.sql_source) {
const counts = await this.chSource.probeNewRowCounts(
manifest.sql_source,
incrementalConfig!.perQuery,
);
const missed = Object.entries(counts).filter(([, n]) => n > 0);
if (missed.length > 0) {
await this.alerter.send({
level: 'critical',
title: '增量空转:PAC 拉到 0 行,但 DW 有新数据',
body:
`本次增量 fetched=0,但按当前游标反查 DW 仍有未拉取的行 — ` +
`大概率是游标条件/格式失效(参照 2026-06-10 ISO 游标字典序 bug)。\n` +
missed.map(([t, n]) => `${t}: ${n} 行`).join('\n'),
context: { host: host.name, syncLogId: syncLog?.id ?? '' },
});
}
}
} }
const result: ImportRunResult = { const result: ImportRunResult = {
......
import { Injectable, Logger } from '@nestjs/common'; import { Injectable, Logger } from '@nestjs/common';
import { Cron } from '@nestjs/schedule'; import { Cron } from '@nestjs/schedule';
import { PrismaService } from '../prisma/prisma.service'; import { PrismaService } from '../prisma/prisma.service';
import { AlertService } from '../common/alerting/alert.service';
/** /**
* DwLagMonitorService — 每小时检查 DW 增量数据滞后(W4 末) * DwLagMonitorService — 每小时检查 DW 增量数据滞后(W4 末)
...@@ -21,7 +22,10 @@ import { PrismaService } from '../prisma/prisma.service'; ...@@ -21,7 +22,10 @@ import { PrismaService } from '../prisma/prisma.service';
export class DwLagMonitorService { export class DwLagMonitorService {
private readonly logger = new Logger(DwLagMonitorService.name); private readonly logger = new Logger(DwLagMonitorService.name);
constructor(private readonly prisma: PrismaService) {} constructor(
private readonly prisma: PrismaService,
private readonly alerter: AlertService,
) {}
/// DW 数据滞后告警 cron — env 驱动 /// DW 数据滞后告警 cron — env 驱动
/// PAC_LAG_MONITOR_CRON: /// PAC_LAG_MONITOR_CRON:
...@@ -70,9 +74,20 @@ export class DwLagMonitorService { ...@@ -70,9 +74,20 @@ export class DwLagMonitorService {
const msg = `dw-lag: ${tag} host=${host.name} max_cursor=${new Date(maxCursorTs).toISOString()} lag=${diffHours.toFixed(1)}h (warn=${warnH} error=${errorH})`; const msg = `dw-lag: ${tag} host=${host.name} max_cursor=${new Date(maxCursorTs).toISOString()} lag=${diffHours.toFixed(1)}h (warn=${warnH} error=${errorH})`;
if (diffHours > errorH) { if (diffHours > errorH) {
this.logger.error(msg); this.logger.error(msg);
// TODO W5+: 接 alerting service(webhook/email/钉钉) — 现在 log ERROR 让 ops 看 await this.alerter.send({
level: 'critical',
title: 'DW 数据严重滞后',
body: `增量游标已 ${diffHours.toFixed(1)} 小时未推进(阈值 ${errorH}h)— 事件可能漏召。`,
context: { host: host.name, maxCursor: new Date(maxCursorTs).toISOString() },
});
} else if (diffHours > warnH) { } else if (diffHours > warnH) {
this.logger.warn(msg); this.logger.warn(msg);
await this.alerter.send({
level: 'warning',
title: 'DW 数据滞后(注意)',
body: `增量游标已 ${diffHours.toFixed(1)} 小时未推进(阈值 ${warnH}h)— DW 可能没刷新。`,
context: { host: host.name, maxCursor: new Date(maxCursorTs).toISOString() },
});
} else { } else {
this.logger.log(msg); this.logger.log(msg);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment