Commit d9a77a65 by luoqi

feat(sync): 自动同步开关挪进 manifest(auto_sync)— 消除 PAC_INCREMENTAL_HOSTS drift 坑

问题:每日增量同步的 host 清单写在 env(PAC_INCREMENTAL_HOSTS=jvs-dw)。接入新 host
(manifest + 冷启都做了)却忘了改这条 env → 静默不被 cron 同步,不报错。env 跟 host
配置分离,易 drift。

改:把"是否自动增量"声明进 host 自己的 manifest(顶层 auto_sync: bool,默认 false)。
- manifest.schema 加 auto_sync 字段。
- ColdImportService.discoverAutoSyncHostDirs(dataDir):扫各 host 子目录 manifest,
  返回 auto_sync=true 的目录名(宽松:无 manifest/解析失败/无 flag 跳过)。
- scheduler:env 设了 → 显式 override(escape hatch);未设 → 自动发现。无 host → warn+skip。
- jvs-dw manifest 置 auto_sync: true;.env.example PAC_INCREMENTAL_HOSTS 改为可选/默认空。

效果:接入新 host 只在其 manifest 置 auto_sync: true 即纳入每晚同步,不碰 env。
验证:real data/ → [jvs-dw];synthetic(有flag/无flag/无manifest/坏yaml)→ 只 [hostA]。
tsc 0,全量 89 测试通过。

注:服务器 .env 现仍有 PAC_INCREMENTAL_HOSTS=jvs-dw(override 生效,行为不变);
要切到 manifest 自动发现,部署时清空该 env 即可。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
parent 2cb3710c
......@@ -113,8 +113,11 @@ DW_CLICKHOUSE_PASSWORD=
# production: 30 2 * * * (02:30 — DW 02:00 完成全量后 30 分钟)
PAC_INCREMENTAL_CRON=
# 多 host 用逗号分隔(目前只一个 jvs-dw)
PAC_INCREMENTAL_HOSTS=jvs-dw
# 自动增量同步的 host:【可选】。
# 默认(留空)→ 自动发现 data/<host>/manifest.yaml 中 `auto_sync: true` 的 host
# —— 接入新 host 只在其 manifest 置 auto_sync,不必动本 env(防 drift / 漏配静默不同步)。
# 设值(逗号分隔)→ 显式 override,只同步列出的 host(staging 错峰 / 临时只跑某 host 用)。
PAC_INCREMENTAL_HOSTS=
# 其他 cron(同步策略:env 不设 → 不跑)
# local: 全部不设
......
......@@ -40,6 +40,7 @@
host_name: jvs-dw
display_name: 瑞尔集团 DW(单 host 多 brand)
auto_sync: true # 纳入每日自动增量同步(scheduler 自动发现;取代 PAC_INCREMENTAL_HOSTS env)
# ── tenant 路由(临时方案,W3 末)──
# 理想态:DW 所有 ADS 表都带 tenant_id UUID 字段,PAC 直接 tenant_field=tenant_id pass-through。
......
......@@ -1189,6 +1189,29 @@ export class ColdImportService {
// Manifest + assembler + raw tables 读盘
// ─────────────────────────────────────────────────────────
/// 扫 dataDir 下各 host 子目录的 manifest,返回 `auto_sync: true` 的【目录名】列表。
/// 给 SyncIncrementalSchedulerService 自动发现"该每晚自动拉的 host" —— 取代 env 清单,
/// 让"是否自动同步"跟 host 配置同处声明,接入新 host 不必再改 env(防 drift)。
/// 宽松:无 manifest / 解析失败 / auto_sync ≠ true 的目录跳过(完整校验在 importDirectory 时做)。
discoverAutoSyncHostDirs(dataDir: string): string[] {
if (!fs.existsSync(dataDir)) return [];
const out: string[] = [];
for (const entry of fs.readdirSync(dataDir, { withFileTypes: true })) {
if (!entry.isDirectory()) continue;
const mPath = path.join(dataDir, entry.name, 'manifest.yaml');
if (!fs.existsSync(mPath)) continue;
try {
const parsed = yaml.load(fs.readFileSync(mPath, 'utf-8')) as
| { auto_sync?: unknown }
| undefined;
if (parsed?.auto_sync === true) out.push(entry.name);
} catch {
// 解析失败跳过 — 不阻塞其他 host 的发现
}
}
return out.sort();
}
private readManifest(dir: string): ColdImportManifest {
const p = path.join(dir, 'manifest.yaml');
if (!fs.existsSync(p)) throw new Error(`manifest.yaml not found at ${p}`);
......
......@@ -108,6 +108,12 @@ export const ColdImportManifestSchema = z
host_name: z.string().min(1),
display_name: z.string().optional(),
/// 是否纳入每日自动增量同步(SyncIncrementalSchedulerService 启动时自动发现)。
/// 默认 false:host 接入(冷启)后,显式置 true 才会被每晚 cron 自动拉。
/// 取代旧 `PAC_INCREMENTAL_HOSTS` env 清单 —— 声明式、跟 host 配置同处、不会 drift
///(接入新 host 忘了改 env 导致静默不同步的坑)。env 仍可作显式 override(见 scheduler)。
auto_sync: z.boolean().optional().default(false),
/// **单 tenant 场景**:静态 tenant_id,所有行共用一个 tenant
tenant_id: z.string().min(1).optional(),
......
......@@ -21,7 +21,8 @@ import { PlanEngineService } from '../modules/plan/engine/plan-engine.service';
* 2. Persona 重算(affected patient — 本次 sync 写入的 distinct patient_id)
* 3. Plan 重算(per tenant,scenario SQL + 6 因子打分)
*
* 多 host:循环 PAC_INCREMENTAL_HOSTS env(逗号分隔,默认 'jvs-dw')
* 多 host:默认自动发现 data/<host>/manifest.yaml 中 `auto_sync: true` 的 host;
* PAC_INCREMENTAL_HOSTS env(逗号分隔)若设则作显式 override(escape hatch)。
* 每个 host 期望 data/<host>/manifest.yaml 存在
*
* 跑失败:
......@@ -49,12 +50,24 @@ export class SyncIncrementalSchedulerService {
name: 'sync-incremental-daily',
})
async runDaily(): Promise<void> {
const hosts = (process.env.PAC_INCREMENTAL_HOSTS ?? 'jvs-dw')
const dataDir = process.env.PAC_INCREMENTAL_DATA_DIR ?? path.resolve(__dirname, '../../data');
// host 来源:env 显式 override(逗号分隔)优先;未设 → 自动发现 manifest.auto_sync=true。
// 自动发现取代旧 `PAC_INCREMENTAL_HOSTS=jvs-dw` 默认值,接入新 host 只改其 manifest,不碰 env(防 drift)。
const envHosts = (process.env.PAC_INCREMENTAL_HOSTS ?? '')
.split(',')
.map((h) => h.trim())
.filter(Boolean);
const dataDir = process.env.PAC_INCREMENTAL_DATA_DIR ?? path.resolve(__dirname, '../../data');
this.logger.log(`sync-incremental: START hosts=[${hosts.join(',')}] dataDir=${dataDir}`);
const hosts = envHosts.length > 0 ? envHosts : this.coldImport.discoverAutoSyncHostDirs(dataDir);
const source = envHosts.length > 0 ? 'env-override' : 'manifest auto_sync';
if (hosts.length === 0) {
this.logger.warn(
`sync-incremental: 无 host 可同步(env PAC_INCREMENTAL_HOSTS 未设,且 ${dataDir} 下无 manifest.auto_sync=true);skip`,
);
return;
}
this.logger.log(
`sync-incremental: START hosts=[${hosts.join(',')}] (source=${source}) dataDir=${dataDir}`,
);
for (const host of hosts) {
try {
await this.runOne(path.join(dataDir, host));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment