Skip to content

Agent 运行时

Agent 运行时的核心是 subscribeEmbeddedPiSession() 函数——它管理与 AI 模型的交互循环,处理流式输出、工具调用和上下文维护。

主入口

Source: src/agents/pi-embedded-subscribe.ts

typescript
// src/agents/pi-embedded-subscribe.ts (simplified)
async function subscribeEmbeddedPiSession(
  params: SubscribeEmbeddedPiSessionParams
): Promise<void> {
  // State management
  const assistantTexts: string[] = [];
  const toolMetas: ToolMeta[] = [];
  let blockReplyBreak = false;

  // Main interaction loop
  // 1. Send messages to model
  // 2. Receive streaming deltas
  // 3. Handle tool calls
  // 4. Collect and deliver responses
}

参数类型

Source: src/agents/pi-embedded-subscribe.types.ts

typescript
// src/agents/pi-embedded-subscribe.types.ts (simplified)
interface SubscribeEmbeddedPiSessionParams {
  session: AgentSession;          // Session context
  runId: string;                  // Unique run identifier
  verboseLevel: number;           // Logging verbosity
  reasoningMode: "on" | "off" | "stream";  // Reasoning visibility

  toolResultFormat: "markdown" | "plain";  // Tool result formatting

  // Callbacks
  onToolResult: (result: ToolResult) => void;
  onBlockReply: (text: string, mediaUrls?: string[]) => void;
  onReasoningStream: (text: string) => void;
  onBlockReplyFlush: () => void;
  onPartialReply: (text: string) => void;
}

状态管理

运行时维护以下状态:

状态说明
assistantTexts累积的回复文本数组
toolMetas收集的工具调用元数据
blockReplyBreak是否需要在回复块之间断开
reasoningMode推理模式(on/off/stream)
deltaBuffer流式 delta 缓冲
blockState当前块状态(thinking/final)

Block State Machine

Agent 回复可能包含"思考"块(reasoning)和"最终"块(final output)。Block State Machine 跟踪当前处于哪种状态:

EmbeddedBlockChunker

Source: src/agents/pi-embedded-subscribe.ts

Block Chunker 负责将零碎的 delta 合并成有意义的文本块:

typescript
// EmbeddedBlockChunker (simplified)
class EmbeddedBlockChunker {
  private buffer: string = "";
  private timer: Timer | null = null;

  // Minimum characters before emitting
  private minChars: number;
  // Idle time (ms) before flushing
  private idleMs: number;

  push(delta: string): void {
    this.buffer += delta;
    this.resetTimer();

    if (this.buffer.length >= this.minChars) {
      this.emit();
    }
  }

  private resetTimer(): void {
    // Reset idle timer
    // Flush on idle timeout
  }

  private emit(): void {
    // Emit accumulated buffer via onBlockReply
    this.buffer = "";
  }
}

合并策略:

  • 当缓冲区达到 minChars 时立即发射
  • idleMs 超时(没有新 delta)时强制发射
  • 在工具调用前强制 flush,保持边界清晰

消息规范化

Source: src/agents/pi-embedded-helpers.ts

消息在发送给模型前会经过规范化:

typescript
// src/agents/pi-embedded-helpers.ts (simplified)
// Message normalization for comparison
// Messaging tool duplicate detection
// Session message sanitization
// Tool call ID handling via ToolCallIdMode
// Context window guards

关键辅助功能:

  • 消息规范化 — 统一消息格式用于比较
  • 重复检测isMessagingToolDuplicateNormalized() 检测重复的工具调用
  • 上下文窗口守卫 — 防止超出模型的上下文长度限制

小结

  • subscribeEmbeddedPiSession() 是 Agent 执行的核心入口
  • 运行时维护文本累积、工具元数据、块状态等内部状态
  • Block State Machine 区分 thinking 和 final 输出
  • EmbeddedBlockChunker 合并零碎 delta 为有意义的文本块
  • 辅助函数处理消息规范化、重复检测和上下文窗口管理

下一章:工具调用

OpenClaw 源码学习教程