Commit f86e39a5 by luoqi

feat(assistant): P2 — "外部 agent" 模拟器(模型自主调 MCP 工具 + SSE 流式)

独立 assistant 模块(不复用 AiCall 单发框架),实现 model-driven tool-calling agent loop:
- McpClientService:极简 HTTP MCP 客户端(raw JSON-RPC),助手作为外部 agent 真连 PAC 自己的
  /mcp(dogfood),转发用户 Bearer → 工具在该用户 tenant scope 执行。
- AssistantService:动态拉 MCP 工具 → 包成 AI SDK tool(execute=回调 MCP)→ streamText
  多步循环(模型自主决定调哪些工具,stopWhen=stepCountIs(8))→ provider 可切换
  (deepseek/gemini/qwen,复用 AiProviderService)。
- AssistantController:POST /assistant/chat(JWT)→ fullStream 部件转 SSE
  (text / tool_call / tool_result / error / done),供前端流式渲染 + 工具调用可视化。

本地端到端验证(:3101):问"孙柯画像+召回计划"→ 模型自主 find_patient→get_patient_overview,
流式产出基于真实数据的答案(价值分群/画像/召回 3 条),手机号掩码。

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
parent cb405aef
......@@ -19,6 +19,7 @@ import { AiModule } from './modules/ai/ai.module';
import { RealtimeCoachModule } from './modules/realtime-coach/realtime-coach.module';
import { AdminModule } from './modules/admin/admin.module';
import { McpModule } from './modules/mcp/mcp.module';
import { AssistantModule } from './modules/assistant/assistant.module';
import { QueuesModule } from './queues/queues.module';
import { QueuesBullBoardModule } from './queues/bull-board.module';
import { JwtAuthGuard } from './common/guards/jwt-auth.guard';
......@@ -51,6 +52,7 @@ import { HealthController } from './health.controller';
AiModule,
RealtimeCoachModule,
McpModule,
AssistantModule,
AdminModule,
PlanAggregateModule,
],
......
import { Body, Controller, Post, Req, Res } from '@nestjs/common';
import type { Request, Response } from 'express';
import { ApiBearerAuth, ApiOperation, ApiTags } from '@nestjs/swagger';
import type { ModelMessage } from 'ai';
import { AssistantService } from './assistant.service';
interface ChatBody {
messages: ModelMessage[];
model?: string;
}
/**
* AssistantController — "外部 agent" 模拟器聊天端点(SSE 流式)。
*
* POST /pac/v1/assistant/chat —— 普通 JWT 鉴权(全局 guard);转发用户 token 给 MCP,
* 工具在该用户 tenant scope 下执行。把 streamText 的 fullStream 部件转成 SSE 事件给前端:
* { type: 'text', text } 文本增量
* { type: 'tool_call', tool, args } 模型发起工具调用(前端可视化"调了哪个 PAC 工具")
* { type: 'tool_result', tool, result }
* { type: 'error', error } / { type: 'done' }
*/
@ApiTags('assistant')
@ApiBearerAuth('accessToken')
@Controller('assistant')
export class AssistantController {
constructor(private readonly assistant: AssistantService) {}
@Post('chat')
@ApiOperation({ summary: '助手对话(SSE)— 模型自主调 PAC MCP 工具' })
async chat(@Req() req: Request, @Res() res: Response, @Body() body: ChatBody): Promise<void> {
const token = (req.headers['authorization'] ?? '').replace(/^Bearer\s+/i, '');
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache, no-transform');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders?.();
const send = (event: Record<string, unknown>): void => {
res.write(`data: ${JSON.stringify(event)}\n\n`);
};
const ac = new AbortController();
req.on('close', () => ac.abort());
try {
const result = await this.assistant.chat({
userToken: token,
modelId: body.model,
messages: body.messages ?? [],
abortSignal: ac.signal,
});
for await (const part of result.fullStream) {
const p = part as Record<string, unknown> & { type: string };
switch (p.type) {
case 'text-delta':
case 'text':
send({ type: 'text', text: (p.text as string) ?? (p.delta as string) ?? '' });
break;
case 'tool-call':
send({ type: 'tool_call', tool: p.toolName, args: p.input });
break;
case 'tool-result':
send({ type: 'tool_result', tool: p.toolName, result: p.output });
break;
case 'error':
send({ type: 'error', error: String(p.error) });
break;
default:
break;
}
}
send({ type: 'done' });
} catch (err) {
send({ type: 'error', error: err instanceof Error ? err.message : String(err) });
} finally {
res.end();
}
}
}
import { Module } from '@nestjs/common';
import { AiModule } from '../ai/ai.module';
import { AssistantController } from './assistant.controller';
import { AssistantService } from './assistant.service';
import { McpClientService } from './mcp-client.service';
/**
* AssistantModule — "外部 agent" 模拟器(独立设计,不复用 AiCall 单发框架)。
*
* 复用 AiModule 的 AiProviderService(provider 可切换);McpClientService 真连 PAC MCP 端点。
*/
@Module({
imports: [AiModule],
controllers: [AssistantController],
providers: [AssistantService, McpClientService],
})
export class AssistantModule {}
import { Injectable, Logger } from '@nestjs/common';
import { streamText, tool, jsonSchema, stepCountIs, type ModelMessage, type ToolSet } from 'ai';
import { AiProviderService } from '../ai/core/ai-provider.service';
import { McpClientService } from './mcp-client.service';
const SYSTEM_PROMPT = `你是 PAC(疗效保障 / 患者分析中心)的患者召回助手,正在模拟"外部 agent"通过工具访问 PAC。
你有一组工具可以查询患者数据。使用建议:
- 用户给患者姓名/手机/患者号时,先用 find_patient 拿到 patientId,再继续。
- 要患者全景就用 get_patient_overview(一次给画像+事实+召回计划);要细节再用 get_persona / get_facts / get_recall_plan。
- 问"现在该联系谁 / 召回池"用 list_recall_queue。
铁律:只依据工具返回的真实数据回答,绝不编造;工具没返回的就说"没有该信息"。手机号只显示掩码。
用中文,简洁专业。回答时可简述你查了哪些数据。`;
export interface AssistantChatInput {
userToken: string;
modelId?: string;
messages: ModelMessage[];
abortSignal?: AbortSignal;
}
/**
* AssistantService — 独立的"外部 agent"模拟器(不复用 AiCall 单发框架)。
*
* 模型自主决定调哪些工具(model-driven tool-calling,非强制工作流):
* 动态从 MCP 拉工具 → 包成 AI SDK tool(execute = 回调 MCP)→ streamText 多步循环
* (模型出 tool-call → 执行 → 喂回 → 直到产出最终文本)→ 流式返回。
* provider 可切换(deepseek / gemini / qwen),复用 AiProviderService 配置。
*/
@Injectable()
export class AssistantService {
private readonly logger = new Logger(AssistantService.name);
constructor(
private readonly provider: AiProviderService,
private readonly mcp: McpClientService,
) {}
// 显式收窄返回类型为控制器实际消费的最小形状,避开 streamText 返回类型引用 ai 内部
// 未导出的 Output 类型导致的 .d.ts 命名失败(TS4053)。
async chat(input: AssistantChatInput): Promise<{ fullStream: AsyncIterable<unknown> }> {
// 1. 动态拉 MCP 工具 → 转成 AI SDK tools(execute 回调真 MCP 调用,带用户 token)
const mcpTools = await this.mcp.listTools(input.userToken);
const tools: ToolSet = {};
for (const t of mcpTools) {
tools[t.name] = tool({
description: t.description ?? t.name,
inputSchema: jsonSchema(t.inputSchema),
execute: async (args: unknown) => this.mcp.callTool(input.userToken, t.name, args),
});
}
this.logger.log(
`assistant chat: model=${input.modelId ?? 'deepseek'} tools=${Object.keys(tools).join(',')}`,
);
// 2. resolve provider(可切换)+ streamText 跑 model-driven tool-calling 循环
const { model } = this.provider.resolve(input.modelId ?? 'deepseek');
return streamText({
model,
system: SYSTEM_PROMPT,
messages: input.messages,
tools,
stopWhen: stepCountIs(8), // 防失控:最多 8 步工具循环
abortSignal: input.abortSignal,
});
}
}
import { Injectable, Logger } from '@nestjs/common';
export interface McpToolDef {
name: string;
description?: string;
inputSchema: Record<string, unknown>;
}
/**
* McpClientService — 极简 HTTP MCP 客户端(raw JSON-RPC over Streamable HTTP)。
*
* 助手作为"外部 agent"通过此客户端**真连** PAC 自己的 MCP 端点(dogfood 验证契约)。
* 转发调用方(登录用户)的 Bearer token → MCP 工具在该用户的 tenant scope 下执行。
*
* 无状态:PAC MCP server enableJsonResponse + 无 session,tools/list / tools/call 可直接 POST,无需握手。
*/
@Injectable()
export class McpClientService {
private readonly logger = new Logger(McpClientService.name);
private readonly url =
process.env.PAC_MCP_URL ?? `http://127.0.0.1:${process.env.PORT ?? '3001'}/pac/v1/mcp`;
private async rpc(token: string, method: string, params?: unknown): Promise<Record<string, unknown>> {
const res = await fetch(this.url, {
method: 'POST',
headers: {
'content-type': 'application/json',
accept: 'application/json, text/event-stream',
authorization: `Bearer ${token}`,
},
body: JSON.stringify({ jsonrpc: '2.0', id: Date.now(), method, params }),
});
const text = await res.text();
const msg = parseRpc(text);
if (msg.error) {
throw new Error(`MCP ${method} 失败: ${(msg.error as { message?: string }).message ?? 'unknown'}`);
}
return (msg.result ?? {}) as Record<string, unknown>;
}
async listTools(token: string): Promise<McpToolDef[]> {
const r = await this.rpc(token, 'tools/list');
return (r.tools as McpToolDef[]) ?? [];
}
/** 调工具 → 返回纯文本结果(MCP content[].text 拼接);isError 时抛出供 agent 看到。 */
async callTool(token: string, name: string, args: unknown): Promise<string> {
const r = await this.rpc(token, 'tools/call', { name, arguments: args ?? {} });
const content = (r.content as Array<{ type: string; text?: string }>) ?? [];
const text = content.map((c) => c.text ?? '').join('\n');
if (r.isError) throw new Error(text || `工具 ${name} 返回错误`);
return text;
}
}
/** 解析 MCP 响应:plain JSON 或 SSE 帧(data: {...})。 */
function parseRpc(text: string): Record<string, unknown> {
const t = text.trim();
if (t.startsWith('{')) return JSON.parse(t) as Record<string, unknown>;
const dataLine = t.split('\n').find((l) => l.startsWith('data:'));
if (!dataLine) throw new Error(`无法解析 MCP 响应: ${t.slice(0, 120)}`);
return JSON.parse(dataLine.slice('data:'.length).trim()) as Record<string, unknown>;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment