Agent 运行时
Agent 运行时的核心是 subscribeEmbeddedPiSession() 函数——它管理与 AI 模型的交互循环,处理流式输出、工具调用和上下文维护。
主入口
Source: src/agents/pi-embedded-subscribe.ts
typescript
// src/agents/pi-embedded-subscribe.ts (simplified)
async function subscribeEmbeddedPiSession(
params: SubscribeEmbeddedPiSessionParams
): Promise<void> {
// State management
const assistantTexts: string[] = [];
const toolMetas: ToolMeta[] = [];
let blockReplyBreak = false;
// Main interaction loop
// 1. Send messages to model
// 2. Receive streaming deltas
// 3. Handle tool calls
// 4. Collect and deliver responses
}参数类型
Source: src/agents/pi-embedded-subscribe.types.ts
typescript
// src/agents/pi-embedded-subscribe.types.ts (simplified)
interface SubscribeEmbeddedPiSessionParams {
session: AgentSession; // Session context
runId: string; // Unique run identifier
verboseLevel: number; // Logging verbosity
reasoningMode: "on" | "off" | "stream"; // Reasoning visibility
toolResultFormat: "markdown" | "plain"; // Tool result formatting
// Callbacks
onToolResult: (result: ToolResult) => void;
onBlockReply: (text: string, mediaUrls?: string[]) => void;
onReasoningStream: (text: string) => void;
onBlockReplyFlush: () => void;
onPartialReply: (text: string) => void;
}状态管理
运行时维护以下状态:
| 状态 | 说明 |
|---|---|
assistantTexts | 累积的回复文本数组 |
toolMetas | 收集的工具调用元数据 |
blockReplyBreak | 是否需要在回复块之间断开 |
reasoningMode | 推理模式(on/off/stream) |
deltaBuffer | 流式 delta 缓冲 |
blockState | 当前块状态(thinking/final) |
Block State Machine
Agent 回复可能包含"思考"块(reasoning)和"最终"块(final output)。Block State Machine 跟踪当前处于哪种状态:
EmbeddedBlockChunker
Source: src/agents/pi-embedded-subscribe.ts
Block Chunker 负责将零碎的 delta 合并成有意义的文本块:
typescript
// EmbeddedBlockChunker (simplified)
class EmbeddedBlockChunker {
private buffer: string = "";
private timer: Timer | null = null;
// Minimum characters before emitting
private minChars: number;
// Idle time (ms) before flushing
private idleMs: number;
push(delta: string): void {
this.buffer += delta;
this.resetTimer();
if (this.buffer.length >= this.minChars) {
this.emit();
}
}
private resetTimer(): void {
// Reset idle timer
// Flush on idle timeout
}
private emit(): void {
// Emit accumulated buffer via onBlockReply
this.buffer = "";
}
}合并策略:
- 当缓冲区达到
minChars时立即发射 - 当
idleMs超时(没有新 delta)时强制发射 - 在工具调用前强制 flush,保持边界清晰
消息规范化
Source: src/agents/pi-embedded-helpers.ts
消息在发送给模型前会经过规范化:
typescript
// src/agents/pi-embedded-helpers.ts (simplified)
// Message normalization for comparison
// Messaging tool duplicate detection
// Session message sanitization
// Tool call ID handling via ToolCallIdMode
// Context window guards关键辅助功能:
- 消息规范化 — 统一消息格式用于比较
- 重复检测 —
isMessagingToolDuplicateNormalized()检测重复的工具调用 - 上下文窗口守卫 — 防止超出模型的上下文长度限制
小结
subscribeEmbeddedPiSession()是 Agent 执行的核心入口- 运行时维护文本累积、工具元数据、块状态等内部状态
- Block State Machine 区分 thinking 和 final 输出
- EmbeddedBlockChunker 合并零碎 delta 为有意义的文本块
- 辅助函数处理消息规范化、重复检测和上下文窗口管理
下一章:工具调用