Commit bcf26264 by luoqi

fix(sync): cold-import Prisma 热路径加瞬时错误重试

首次全量在 batch 183 因 prisma.patientTransaction.findMany() 报
"Response from the Engine was empty"(Prisma 引擎 socket 瞬时抖动)整轮 abort。
上次只给 ClickHouse 查询加了重试,Prisma 侧未覆盖。

补 withDbRetry 指数退避(0.5/1.5/4.5s),包裹 cold-import 写主循环里
会 re-throw 而中断整轮的 DB 调用:createMany / findMany(回查 tx)/
buildPatientIndex / ensurePatientStub / patient+profile upsert。
只重试已知瞬时类错误(empty engine / 连接池 / ECONNRESET / server closed /
too many connections),P2002 等确定性错误立即抛交 caller。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
parent 368b2a9e
......@@ -751,19 +751,27 @@ export class ColdImportService {
};
try {
const patient = await this.prisma.patient.upsert({
where: {
hostId_tenantId_externalId: { hostId, tenantId, externalId },
},
create: { hostId, tenantId, externalId, ...patientData },
update: patientData,
});
const patient = await this.withDbRetry(
() =>
this.prisma.patient.upsert({
where: {
hostId_tenantId_externalId: { hostId, tenantId, externalId },
},
create: { hostId, tenantId, externalId, ...patientData },
update: patientData,
}),
'patient upsert',
);
// 副表 1:1 upsert
await this.prisma.patientProfile.upsert({
where: { patientId: patient.id },
create: { patientId: patient.id, ...profileData },
update: profileData,
});
await this.withDbRetry(
() =>
this.prisma.patientProfile.upsert({
where: { patientId: patient.id },
create: { patientId: patient.id, ...profileData },
update: profileData,
}),
'patientProfile upsert',
);
stats.patientsUpserted++;
} catch (err) {
stats.failed++;
......@@ -905,6 +913,40 @@ export class ColdImportService {
}
/**
* 带重试的 Prisma 操作 — Postgres/Prisma 引擎偶发瞬时错误:
* "Response from the Engine was empty"(引擎 socket 抖动)/ 连接池超时 / ECONNRESET / server closed。
* 一次全量(260 批 × 多次 SQL)里一次抖动就会整轮 abort(processSubject 主循环 re-throw),
* 故对热路径 DB 读/写加指数退避重试(默认 3 次:0.5s / 1.5s / 4.5s)。
* 只重试已知瞬时类错误;唯一约束(P2002)等确定性错误不匹配 → 立即抛(交 caller 处理)。
*/
private async withDbRetry<T>(
op: () => Promise<T>,
label: string,
maxAttempts = 3,
): Promise<T> {
let lastErr: unknown;
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return await op();
} catch (err) {
lastErr = err;
const msg = err instanceof Error ? err.message : String(err);
const transient =
/Response from the Engine was empty|Can't reach database|connection pool|connection closed|timed out|timeout|ECONNRESET|socket hang up|Connection terminated|server closed the connection|reset by peer|too many connections/i.test(
msg,
);
if (!transient || attempt === maxAttempts) throw err;
const backoffMs = 500 * Math.pow(3, attempt - 1); // 500 / 1500 / 4500
this.logger.warn(
`[db] "${label}" 第 ${attempt}/${maxAttempts} 次失败(瞬时:${msg.slice(0, 80)}),${backoffMs}ms 后重试`,
);
await new Promise((r) => setTimeout(r, backoffMs));
}
}
throw lastErr;
}
/**
* PR4 批写一批 transactions + 衍生 facts。
*
* 流程:
......@@ -933,10 +975,14 @@ export class ColdImportService {
const data = buffer.map((b) => b.tx);
let createdCount = 0;
try {
const r = await this.prisma.patientTransaction.createMany({
data,
skipDuplicates: true,
});
const r = await this.withDbRetry(
() =>
this.prisma.patientTransaction.createMany({
data,
skipDuplicates: true,
}),
'createMany tx',
);
createdCount = r.count;
} catch (err) {
// createMany 整批失败(罕见 — schema 校验等)→ 降级 per-row
......@@ -956,25 +1002,29 @@ export class ColdImportService {
if (eventIds.length === 0) {
return; // 全 buffer 都没 source_event_id?理论不该发生
}
const txRows = await this.prisma.patientTransaction.findMany({
where: {
hostId,
tenantId,
sourceEventId: { in: eventIds },
},
select: {
id: true,
hostId: true,
tenantId: true,
patientId: true,
action: true,
subjectType: true,
subjectId: true,
occurredAt: true,
clinicId: true,
sourceEventId: true,
},
});
const txRows = await this.withDbRetry(
() =>
this.prisma.patientTransaction.findMany({
where: {
hostId,
tenantId,
sourceEventId: { in: eventIds },
},
select: {
id: true,
hostId: true,
tenantId: true,
patientId: true,
action: true,
subjectType: true,
subjectId: true,
occurredAt: true,
clinicId: true,
sourceEventId: true,
},
}),
'findMany tx by sourceEventId',
);
const txBySourceEventId = new Map(txRows.map((t) => [t.sourceEventId!, t]));
// 3. 构建 batch items,跑 parserPipeline.runForBatch
......@@ -1094,10 +1144,14 @@ export class ColdImportService {
hostId: string,
tenantId: string,
): Promise<Map<string, string>> {
const rows = await this.prisma.patient.findMany({
where: { hostId, tenantId },
select: { id: true, externalId: true },
});
const rows = await this.withDbRetry(
() =>
this.prisma.patient.findMany({
where: { hostId, tenantId },
select: { id: true, externalId: true },
}),
'buildPatientIndex',
);
return new Map(rows.map((p) => [p.externalId, p.id]));
}
......@@ -1110,16 +1164,24 @@ export class ColdImportService {
tenantId: string,
externalId: string,
): Promise<string> {
const patient = await this.prisma.patient.upsert({
where: { hostId_tenantId_externalId: { hostId, tenantId, externalId } },
create: { hostId, tenantId, externalId, active: true },
update: {}, // 有则 noop(姓名等真实主档 upsert 走 processPatients 路径)
});
await this.prisma.patientProfile.upsert({
where: { patientId: patient.id },
create: { patientId: patient.id, doNotContact: false, deceased: false, tags: [] },
update: {},
});
const patient = await this.withDbRetry(
() =>
this.prisma.patient.upsert({
where: { hostId_tenantId_externalId: { hostId, tenantId, externalId } },
create: { hostId, tenantId, externalId, active: true },
update: {}, // 有则 noop(姓名等真实主档 upsert 走 processPatients 路径)
}),
'ensurePatientStub patient',
);
await this.withDbRetry(
() =>
this.prisma.patientProfile.upsert({
where: { patientId: patient.id },
create: { patientId: patient.id, doNotContact: false, deceased: false, tags: [] },
update: {},
}),
'ensurePatientStub profile',
);
return patient.id;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment