Commit 41d7ef61 by luoqi

perf(sync): 单患者刷新从 O(全租户) 降到 O(1)

详情页"刷新"原走 recomputeForPatient→runAllForHost,对整个租户跑召回 SQL
并重写全员 plan(13万规模拖到分钟级 / >120s)。

- ScenarioScope 加可选 patientId;scenario SQL 注入 AND p.id=patientId 收窄
- recomputeForPatient 改真·单患者:只 selectHits 该患者 + 只 upsert 这一个 plan
- loadTablesForPatient 6 张源表 Promise.all 并行(跨洋 RTT 6×→1×)

本地验证:刷新 success、单患者 hits 正确、无回归。

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
parent fb38f2de
...@@ -44,13 +44,51 @@ export class PlanEngineService { ...@@ -44,13 +44,51 @@ export class PlanEngineService {
clinicId?: string; clinicId?: string;
patientId: string; patientId: string;
}): Promise<{ plansCreated: number }> { }): Promise<{ plansCreated: number }> {
// v1 简化:单 patient 走全量过滤路径(scenario selector 自带 patient 过滤的成本可接受); // 真·单 patient:scenario SQL 带 patientId 收窄(只扫该患者),只 upsert 这一个 plan。
// W3+ 加 patientId 入参深入 scenario plugin 真正只跑该 patient // (旧版走 runAllForHost 全租户扫 + 写全员 plan,详情页单刷被拖到分钟级 → 现 O(1))
const r = await this.runAllForHost({ const scope: ScenarioScope = {
hostId: input.hostId, hostId: input.hostId,
tenantId: input.tenantId, tenantId: input.tenantId,
now: new Date(),
patientId: input.patientId,
};
const hits: ScenarioHitWithKey[] = [];
for (const sc of this.scenarios) {
const scHits = await sc.selectHits(scope);
for (const h of scHits) hits.push({ ...h, scenarioKey: sc.key });
}
// 无命中 → 不动(与全量行为一致:不主动关闭旧 plan,stale 清理是独立议题)
if (hits.length === 0) return { plansCreated: 0 };
const log = await this.prisma.planGenerationLog.create({
data: {
hostId: input.hostId,
tenantId: input.tenantId,
patientId: input.patientId,
triggeredBy: 'patient_refresh',
status: 'running',
scenarioRunCount: this.scenarios.length,
},
}); });
return { plansCreated: r.plansCreated }; try {
const result = await this.upsertPlan({ scope, patientId: input.patientId, hits });
const created = result === 'created' || result === 'superseded' ? 1 : 0;
await this.prisma.planGenerationLog.update({
where: { id: log.id },
data: { status: 'success', plansCreated: created, endedAt: new Date() },
});
return { plansCreated: created };
} catch (err) {
await this.prisma.planGenerationLog.update({
where: { id: log.id },
data: {
status: 'failed',
errorMessage: err instanceof Error ? err.message : String(err),
endedAt: new Date(),
},
});
throw err;
}
} }
/** /**
......
...@@ -18,6 +18,9 @@ export interface ScenarioScope { ...@@ -18,6 +18,9 @@ export interface ScenarioScope {
hostId: string; hostId: string;
tenantId: string; tenantId: string;
now: Date; now: Date;
/// 可选:只评估单个 patient(详情页"刷新"单刷场景)。
/// 设了 → selectHits SQL 加 `AND p.id = patientId`,从全租户扫降为单患者扫(O(1))。
patientId?: string;
} }
export interface ScenarioHit { export interface ScenarioHit {
......
import { Injectable, Logger } from '@nestjs/common'; import { Injectable, Logger } from '@nestjs/common';
import { Prisma } from '@prisma/client';
import { import {
PlanScenario, PlanScenario,
DiagnosisTreatmentMap, DiagnosisTreatmentMap,
...@@ -233,6 +234,11 @@ export class TreatmentInitiationRecallScenario implements PlanScenarioPlugin { ...@@ -233,6 +234,11 @@ export class TreatmentInitiationRecallScenario implements PlanScenarioPlugin {
.filter(([, cat]) => excludeCats.includes(cat)) .filter(([, cat]) => excludeCats.includes(cat))
.map(([text]) => text); .map(([text]) => text);
// 单 patient 收窄(详情页"刷新"):设了 scope.patientId → 只扫该患者,O(全租户)→O(1)
const patientFilter = scope.patientId
? Prisma.sql`AND p.id = ${scope.patientId}::uuid`
: Prisma.empty;
// ╔═════════════════════════════════════════════════════════════════════╗ // ╔═════════════════════════════════════════════════════════════════════╗
// ║ 召回 SQL 完整解读(initiation = 潜在治疗新链召回) ║ // ║ 召回 SQL 完整解读(initiation = 潜在治疗新链召回) ║
// ║ ║ // ║ ║
...@@ -312,6 +318,7 @@ export class TreatmentInitiationRecallScenario implements PlanScenarioPlugin { ...@@ -312,6 +318,7 @@ export class TreatmentInitiationRecallScenario implements PlanScenarioPlugin {
JOIN patient_facts sig ON sig.patient_id = p.id JOIN patient_facts sig ON sig.patient_id = p.id
WHERE p.host_id = ${scope.hostId}::uuid -- ① 隔离闸 WHERE p.host_id = ${scope.hostId}::uuid -- ① 隔离闸
AND p.tenant_id = ${scope.tenantId} -- ① 隔离闸 AND p.tenant_id = ${scope.tenantId} -- ① 隔离闸
${patientFilter} -- 单刷收窄(可空)
AND p.active = true -- ② 合规闸 AND p.active = true -- ② 合规闸
AND pp.do_not_contact = false -- ② 合规闸 AND pp.do_not_contact = false -- ② 合规闸
AND pp.deceased = false -- ② 合规闸 AND pp.deceased = false -- ② 合规闸
......
...@@ -256,22 +256,20 @@ export class ClickHouseSourceService { ...@@ -256,22 +256,20 @@ export class ClickHouseSourceService {
const brandEsc = scope.brand.replace(/'/g, "''"); const brandEsc = scope.brand.replace(/'/g, "''");
try { try {
for (const [tableName, sql] of Object.entries(source.queries)) { // 并行查 N 张源表(单患者数据量极小,瓶颈是跨洋 RTT)。
const patientSql = this.injectPatientFilter(sql, pidEsc, brandEsc); // 串行 6 表 ≈ 6×RTT;并行 ≈ 1×RTT。对齐 loadAllTables/loadTablesForCohort 的 Promise.all 口径。
this.logger.log( await Promise.all(
`[clickhouse·patient] query "${tableName}" — ${patientSql.slice(0, 140).replace(/\s+/g, ' ')}...`, Object.entries(source.queries).map(async ([tableName, sql]) => {
); const patientSql = this.injectPatientFilter(sql, pidEsc, brandEsc);
const started = Date.now(); const started = Date.now();
const result = await client.query({ const result = await client.query({ query: patientSql, format: 'JSONEachRow' });
query: patientSql, const rows = (await result.json()) as unknown[];
format: 'JSONEachRow', tables[tableName] = rows;
}); this.logger.log(
const rows = (await result.json()) as unknown[]; `[clickhouse·patient] "${tableName}" → ${rows.length} 行,${Date.now() - started} ms`,
tables[tableName] = rows; );
this.logger.log( }),
`[clickhouse·patient] "${tableName}" → ${rows.length} 行,${Date.now() - started} ms`, );
);
}
} finally { } finally {
await client.close(); await client.close();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment