Commit ddeb2661 by luoqi

docs(deployment): 对齐 sync harness 重构(PR1-5b + 统一 sync 入口)

deployment-data-ingest.md 更新到当前实现:
- §2.2 加 sync 资源调优 env(COHORT_BATCH_SIZE/CONCURRENCY/WRITE_BATCH_SIZE)
  + Docker Compose --env-file 必带 + connection_limit 说明
- §2.4 加 docker compose prod 部署(--env-file + pac-migrate/pac-service 两镜像都要 rebuild)
- §3 重写数据摄入 SOP:
  · 3.1 cold-import → 统一 pnpm sync 首跑(cohort 分批 + 并行,~1.5-2h vs 旧 9-12h)
  · 3.2 增量走同一 importDirectory(并发锁 + run_start cursor)
  · 3.3 幂等 DB 双 UNIQUE(含补的 source_event partial unique)+ cohort 宿主无关
  · 3.4 新增资源调优表(cohort/concurrency/write-batch/query 并行)
- §7.3 cursor 倒退加 --full;§7.4 新增 stale running 锁清理 SOP
- §8 已知边界更新(去掉过时的 LIMIT 100;加 COPY/看门狗/并行锁)
- 一句话归纳重写

反映:cold-import/sync-incremental 合并;cohort batch(PR2);
统一 mode(PR3);bulk write(PR4);query 并行(PR5a);cohort 并行(PR5b);
source_event partial unique 补建;宿主无关 cohort 配置。
parent 45255896
......@@ -100,11 +100,22 @@ PAC_INCREMENTAL_CRON=30 2 * * * # 每天 02:30(DW 02:00 刷完后)
PAC_INCREMENTAL_HOSTS=jvs-dw # 多 host 用逗号分隔
PAC_INCREMENTAL_DATA_DIR=/opt/pac/apps/pac-service/data
# ─── Sync 资源调优(全可选,不设走默认)───
PAC_COHORT_BATCH_SIZE=5000 # 按患者分批(内存友好);0 = single-shot
PAC_COHORT_CONCURRENCY=1 # cohort 并发度;1=串行(默认稳),3-4=并行榨资源
PAC_WRITE_BATCH_SIZE=1000 # 每批 createMany/bulkWrite 行数;0 = per-row
# ─── 数据滞后告警阈值 ───
PAC_LAG_WARN_HOURS=24 # > 24h 黄色 WARN
PAC_LAG_ERROR_HOURS=48 # > 48h 红色 ERROR
```
> ⚠️ **Docker Compose 部署必带 `--env-file apps/pac-service/.env`**:
> compose 的 `${POSTGRES_PASSWORD}` 等变量插值从 *项目根 .env / shell* 读,**不读** service 的 `env_file`。
> 漏了 `--env-file` → 密码插值成默认 `pac` → migrate / 连库认证失败。
> `connection_limit`:并行 sync(concurrency>1)时 `DATABASE_URL` 加 `?...&connection_limit=20`,
> 否则并发 cohort 抢连接排队。
### 2.3 起 Postgres + Redis
```bash
......@@ -134,80 +145,130 @@ cd apps/pac-service && node --enable-source-maps dist/src/main &
cd apps/pac-web && pnpm start &
```
**Docker Compose 部署(prod,服务器实际用法)**:
```bash
cd ~/pac
# ⚠️ 全程带 --env-file(否则 ${POSTGRES_PASSWORD} 插值成默认值 → 认证失败)
# ⚠️ 代码更新后,pac-migrate 跟 pac-service 是【两个独立镜像】,都要 rebuild
docker compose --env-file apps/pac-service/.env -f docker-compose.prod.yml build pac-migrate pac-service
docker compose --env-file apps/pac-service/.env -f docker-compose.prod.yml up -d
# pac-migrate(restart:no)起来自动跑 prisma migrate deploy 后退出;pac-service 依赖它健康
# 验证迁移:docker compose ... logs pac-migrate | grep "migrations have been applied"
```
---
## 三、数据摄入 SOP
### 3.1 首次冷启动(Cold Import)— 把全量历史拉进 PAC
> **统一 sync 入口(2026-05-28 重构)**:存量 / 增量合并为单一 `pnpm sync`。
> 首跑(cursor 空)= 等价全量;之后 cron 增量。`pnpm cold-import` 保留为 legacy
> 「忽略 cursor 强制全量」alias,日常用 `pnpm sync`。
### 3.1 首次全量(`pnpm sync` 首跑)— 把全量历史拉进 PAC
**用途**:首次部署 / 重建库 / yaml 改大版本时
**配置**:`apps/pac-service/data/jvs-dw/manifest.yaml``sql_source.queries` 各表 SQL。Dev 期间带 `LIMIT 100 OFFSET 100` 控量,**生产首跑前必须去掉 LIMIT**
**机制**:cursor 为空 → 等价全量。**按患者 cohort 分批**(默认 5000/批),每批
load → transform → assemble → bulk write → 释放内存 → 下一批。内存恒定 ~500MB-1GB,
**不会 OOM**(对比旧 cold-import 一次性 5-10GB 撞墙)。
```bash
cd /opt/pac/apps/pac-service
pnpm cold-import -- --dir=./data/jvs-dw
# → 患者 100% 拉 / facts 全量 / 各表去重
# 预估:13 万 patients ≈ 10-30 min,内存峰值 ~2GB
# docker 部署(prod)— 必带 --env-file
cd ~/pac
docker compose --env-file apps/pac-service/.env -f docker-compose.prod.yml exec -T \
-e PAC_COHORT_CONCURRENCY=3 \
pac-service pnpm sync:prod -- --dir=./data/jvs-dw --cohort-batch=500 --no-recompute
# 本地 / 裸机
PAC_COHORT_CONCURRENCY=3 pnpm sync -- --dir=./data/jvs-dw --cohort-batch=5000 --no-recompute
```
参数:
- `--cohort-batch=N` — 每批 N 个患者(默认 5000;远程 DW 慢可调小 500 看进度更细)
- `PAC_COHORT_CONCURRENCY=3` — 3 个 cohort 并发(榨资源;默认 1 串行);需 `connection_limit` 配套
- `--no-recompute` — 只拉数据,persona/plan 单独跑(大数据量分阶段更可控)
**预估(13 万 patients,远程 DW)**:concurrency=3 ≈ 1.5-2h;内存峰值 ~800MB。
(对比旧串行 cold-import 9-12h 且撞 OOM/磁盘满)
**verify**:
```sql
SELECT 'patients' AS t, count(*) FROM patients
UNION ALL SELECT 'facts', count(*) FROM patient_facts
UNION ALL SELECT 'failed_in_sync_log', failed FROM sync_logs ORDER BY started_at DESC LIMIT 1;
-- failed 应当 = 0
UNION ALL SELECT 'tx', count(*) FROM patient_transactions
UNION ALL SELECT 'facts', count(*) FROM patient_facts;
-- 完整性自检:应当都 = 0
SELECT count(*) AS dup_tx FROM (SELECT source_event_id,host_id,tenant_id FROM patient_transactions
WHERE source_event_id IS NOT NULL GROUP BY 1,2,3 HAVING count(*)>1) x;
SELECT count(*) AS multi_active FROM (SELECT subject_id,host_id,tenant_id FROM patient_facts
WHERE status='active' GROUP BY 1,2,3 HAVING count(*)>1) y;
```
冷启完成后,**手动触发一次 persona + plan**:
全量完成后,**手动触发 persona + plan**(增量模式 cron 会自动联动,首跑用 --no-recompute 时手动补):
```bash
pnpm recompute-persona -- --host=jvs-dw
pnpm recompute-plans -- --host=jvs-dw
pnpm recompute-persona:prod -- --host=jvs-dw # 13万 ~45min
pnpm recompute-plans:prod -- --host=jvs-dw # ~10min
```
### 3.2 日级增量(Incremental Sync)— 自动跑,无需人工
### 3.2 日级增量(自动跑,无需人工)
**机制**:`SyncIncrementalSchedulerService` `@Cron` 注解,服务起来自动注册。
**机制**:`SyncIncrementalSchedulerService` `@Cron`,服务起来自动注册,走同一个
`importDirectory`(存量增量同一套代码)。
```
每天 02:30(可改 PAC_INCREMENTAL_CRON env)
每天 02:30(PAC_INCREMENTAL_CRON)
ColdImportService.importDirectory({ incremental: true })
- 读 sync_logs 上次 cursor_after → SQL 注入 WHERE updated_date > '...'
- 拉到新/改的 fact 行 + 反向拉主档(C 方案)+ stub auto-create(A 方案)
- 写新 cursor 到 sync_logs
importDirectory({ incremental: true })
- 并发锁:sync_logs partial UNIQUE (host_id) WHERE status='running'
同 host 已在跑 → skip(SyncAlreadyRunningError),等下次 cron
- 读 sync_logs 上次 cursor_after → SQL WHERE updated_date > cursor(增量;数据量小通常 1 批)
- 拉新/改 fact + 反向拉主档(C 方案)+ stub auto-create(A 方案)
- ⭐ 写 cursor_after = run_start ISO(不是 max(updated_date))
保证跑期间 DW 任何新写入,下次都能捞回(不漏批次间变更)
Persona recompute(本次涉及 distinct patient)
Persona recompute(本次涉及 distinct patient)
Plan recompute(per tenant — SQL 召回 + 6 因子打分)
Sync log status=success,带每表 cursor_after JSON
```
**结果可见**:DW 今天的新数据 → 明天早晨 03:00 ± 客服工作台可见。
**结果可见**:DW 今天新数据 → 明早 03:00 ± 客服工作台可见。
**手动触发**(debug / 紧急补跑):
```bash
pnpm sync-incremental -- --dir=./data/jvs-dw
# 跳过 persona/plan(只拉数据):
pnpm sync-incremental -- --dir=./data/jvs-dw --no-recompute
# dry-run(不写库,看 SQL cursor 注入预览):
pnpm sync-incremental -- --dir=./data/jvs-dw --dry-run
pnpm sync -- --dir=./data/jvs-dw # 增量 + 联动重算
pnpm sync -- --dir=./data/jvs-dw --no-recompute # 只拉数据
pnpm sync -- --dir=./data/jvs-dw --dry-run # 不写库,看 cursor 注入
pnpm sync -- --dir=./data/jvs-dw --full # 忽略 cursor 强制全量(灾后)
```
### 3.3 数据完整性保证 — A+C 双修
### 3.3 数据完整性保证
**A:Patient Stub Auto-Create**
- 增量拉 fact 但 patient 主档 cursor 没动 → 用 `(hostId, tenantId, externalId)` 三段建空 stub
- 真实主档 upsert 时填上姓名/电话(W4 加固)
**幂等(DB 双 UNIQUE)** — 任何并发 / 重跑都一致:
- `patient_transactions` partial UNIQUE `(host_id, tenant_id, source_event_id) WHERE NOT NULL`
(migration 20260528000002 补;之前漏建,createMany skipDuplicates 曾失效)
- `patient_facts` UNIQUE `(host_id, tenant_id, subject_id, version)` + active partial UNIQUE
**C:反向拉主档**
- 每次跑完 fact 表,收集 `(patient_id, brand)` 集合 → 追加 `WHERE (patient_id, brand) IN (...)` 强拉主档
- 跟 fact 同 run,主档跟事实 100% 同步
**source_event_id 含 updatedAt** → DW 行级 in-place 更新自动产生新 tx + 新 fact 版本
(supersede 旧版);同行同 updatedAt → 幂等 skip。
**实测**(W3 末):
- 修前:51% transactions patient_id 为 null
- 修后:**0%** patient_id 为 null
**A+C 双修**(patient_id 不丢):
- **A Patient Stub Auto-Create**:增量拉 fact 但主档 cursor 没动 → 用三段键建空 stub,真实主档 upsert 补 PII
- **C 反向拉主档**:跑完 fact 收集 `(patient_id, brand)``WHERE (patient_id,brand) IN (...)` 强拉主档同 run
- 实测:修前 51% tx patient_id null → 修后 **0%**
**cohort 宿主无关**`manifest.sql_source.cohort` 声明 `patient_list_from` / `patient_key_column`
/ `tenant_key_column` / `list_cursor_column`,代码不硬编码表名列名。新 host 改 yaml 即可。
### 3.4 资源调优(并行 / 内存 / 速度)
| 旋钮 | env | 作用 | 建议 |
|---|---|---|---|
| cohort 大小 | `PAC_COHORT_BATCH_SIZE` / `--cohort-batch=N` | 每批患者数(控内存) | 5000;远程 DW 看进度可 500 |
| cohort 并发 | `PAC_COHORT_CONCURRENCY` | 并发 cohort 数(榨资源) | 默认 1 稳;3-4 并行(需 connection_limit) |
| 写批大小 | `PAC_WRITE_BATCH_SIZE` | 每次 createMany 行数 | 1000;0=退回 per-row(回滚开关) |
| query 并行 | (自动) | N 个 CH query Promise.all | load 阶段 ~4x,无需配置 |
**并行数据完整性**:cohort = disjoint 患者集,所有写患者级隔离 + DB 双 UNIQUE 兜底
→ 任何并发下一致。并行模式 checkpoint resume 禁用(靠幂等从头重跑)。
---
......@@ -311,17 +372,28 @@ UPDATE sync_logs SET cursor_after = '{"fact_emr_treatment_out":"2026-04-01",...}
WHERE id = (SELECT id FROM sync_logs WHERE resource='incremental_bundle' AND status='success' ORDER BY started_at DESC LIMIT 1);
```
下次 cron 自动从这个 cursor 拉。
下次 cron 自动从这个 cursor 拉。或直接 `pnpm sync -- --full` 忽略 cursor 全量重拉(幂等,UPSERT skip 已有)。
### 7.4 stale 'running' 锁清理(进程崩溃留下)
并发锁是 sync_logs `status='running'` 行;进程崩溃没 finalize 会留 stale 锁挡住后续 sync。
```sql
-- 看门狗:running 超 12h 视为 stale,人工 abort 释放锁
UPDATE sync_logs SET status='aborted', ended_at=NOW()
WHERE status='running' AND started_at < NOW() - INTERVAL '12 hours';
```
(CLI 撞锁退出 code=4;日志含 existing sync_log id 便于定位)
---
## 八、当前已知边界(W4 末)
## 八、当前已知边界
| 项 | 现状 | 影响 | 何时解决 |
|---|---|---|---|
| 首跑全量 cohort LIMIT 100 | dev manifest 里写死 LIMIT | 生产首跑必须去掉 LIMIT(可能 30 min 跑 13 万 patient) | 部署前手动改 |
| sync-incremental reparse 模式缺 | yaml 改后历史数据需要 truncate 重导 | 改 yaml 不能"原地补"老 facts | task #46(W5+) |
| reparse 模式缺 | yaml 改后历史数据需 truncate + `pnpm sync --full` 重导 | 改 yaml 不能"原地补"老 facts | task #46(W5+) |
| 告警接通道 | log ERROR + 人工看 | 滞后无即时告警 | W5+ 加 webhook 钉钉/飞书 |
| 并行 stale 锁清理 | 人工 SQL(7.4) | 崩溃后需手动 abort | W5+ 加 cron 看门狗自动清 |
| COPY 未上 | 用 prisma createMany(已 10-20x) | 极限性能还差 COPY 的 3-5x | PR5c(可选,按需) |
| 多 host 部署 | 支持(`PAC_INCREMENTAL_HOSTS=jvs-dw,friday`)| — | 已就绪 |
| 单机 vs 集群 | 当前单机 | 高 QPS 时 pac-service 可水平扩展(无状态) | 试点期不需要 |
......@@ -329,4 +401,4 @@ WHERE id = (SELECT id FROM sync_logs WHERE resource='incremental_bundle' AND sta
## ▎一句话归纳
> **PAC 部署 = Postgres + Redis + pac-service + pac-web 四件套。数据摄入 = 首次 cold-import(一次性)+ 日级 sync-incremental(自动 cron)。监控 = lag monitor + Bull Board + 日常 SQL 检查。**
> **PAC 部署 = Postgres + Redis + pac-service + pac-web 四件套(docker compose 必带 `--env-file`)。数据摄入 = 统一 `pnpm sync`:首跑全量(cohort 分批 + 可并行)+ 日级 cron 增量(同一套代码)。幂等靠 DB 双 UNIQUE,内存靠 cohort batch 恒定,速度靠 query 并行 + bulk write + cohort 并发。监控 = lag monitor + Bull Board + 日常 SQL 检查。**
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment