Agent Runtime
The heart of the Agent runtime is the subscribeEmbeddedPiSession() function -- it manages the interaction loop with the AI model, handles streaming output, tool calls, and context maintenance.
Main Entry Point
Source: src/agents/pi-embedded-subscribe.ts
// src/agents/pi-embedded-subscribe.ts (simplified)
async function subscribeEmbeddedPiSession(
params: SubscribeEmbeddedPiSessionParams
): Promise<void> {
// State management
const assistantTexts: string[] = [];
const toolMetas: ToolMeta[] = [];
let blockReplyBreak = false;
// Main interaction loop
// 1. Send messages to model
// 2. Receive streaming deltas
// 3. Handle tool calls
// 4. Collect and deliver responses
}Parameter Types
Source: src/agents/pi-embedded-subscribe.types.ts
// src/agents/pi-embedded-subscribe.types.ts (simplified)
interface SubscribeEmbeddedPiSessionParams {
session: AgentSession; // Session context
runId: string; // Unique run identifier
verboseLevel: number; // Logging verbosity
reasoningMode: "on" | "off" | "stream"; // Reasoning visibility
toolResultFormat: "markdown" | "plain"; // Tool result formatting
// Callbacks
onToolResult: (result: ToolResult) => void;
onBlockReply: (text: string, mediaUrls?: string[]) => void;
onReasoningStream: (text: string) => void;
onBlockReplyFlush: () => void;
onPartialReply: (text: string) => void;
}State Management
The runtime maintains the following internal state:
| State | Description |
|---|---|
assistantTexts | Accumulated reply text array |
toolMetas | Collected tool call metadata |
blockReplyBreak | Whether to break between reply blocks |
reasoningMode | Reasoning mode (on/off/stream) |
deltaBuffer | Streaming delta buffer |
blockState | Current block state (thinking/final) |
Block State Machine
Agent replies may contain "thinking" blocks (reasoning) and "final" blocks (output). The Block State Machine tracks which state the stream is currently in:
EmbeddedBlockChunker
Source: src/agents/pi-embedded-subscribe.ts
The Block Chunker is responsible for merging fragmented deltas into meaningful text blocks:
// EmbeddedBlockChunker (simplified)
class EmbeddedBlockChunker {
private buffer: string = "";
private timer: Timer | null = null;
// Minimum characters before emitting
private minChars: number;
// Idle time (ms) before flushing
private idleMs: number;
push(delta: string): void {
this.buffer += delta;
this.resetTimer();
if (this.buffer.length >= this.minChars) {
this.emit();
}
}
private resetTimer(): void {
// Reset idle timer
// Flush on idle timeout
}
private emit(): void {
// Emit accumulated buffer via onBlockReply
this.buffer = "";
}
}Merging strategy:
- Emit immediately when the buffer reaches
minChars - Force emit after
idleMstimeout (no new deltas arriving) - Force flush before tool calls to maintain clear boundaries
Message Normalization
Source: src/agents/pi-embedded-helpers.ts
Messages are normalized before being sent to the model:
// src/agents/pi-embedded-helpers.ts (simplified)
// Message normalization for comparison
// Messaging tool duplicate detection
// Session message sanitization
// Tool call ID handling via ToolCallIdMode
// Context window guardsKey helper capabilities:
- Message normalization -- Unify message format for comparison
- Duplicate detection --
isMessagingToolDuplicateNormalized()detects duplicate tool calls - Context window guards -- Prevent exceeding the model's context length limit
Summary
subscribeEmbeddedPiSession()is the core entry point for Agent execution- The runtime maintains internal state including text accumulation, tool metadata, and block state
- Block State Machine distinguishes between thinking and final output
- EmbeddedBlockChunker merges fragmented deltas into meaningful text blocks
- Helper functions handle message normalization, duplicate detection, and context window management
Next: Tool Calls