Events & streams
The HarnessEvent discriminated union the harness emits.
drover normalises pi-agent-core’s events into a single discriminated
union. Consumers iterate RunHandle.events: AsyncIterable<HarnessEvent>;
plugins observe via onEvent.
The union
| kind | when | payload |
|---|---|---|
| run_start | first thing emitted | agentId, specHash |
| input_validated | input passed inputSchema | (none) |
| turn_start | each pi turn | turn (1-based) |
| llm_call | model dispatch | modelName, reasoning? |
| thinking | model emits thinking block | text |
| assistant_text | model emits final text per turn | text |
| tool_call_start | pi dispatches a tool | toolUseId, toolName, input |
| tool_call_end | tool returns | toolUseId, toolName, result, durationMs |
| usage | per-call accounting | usage: { inputTokens, outputTokens, costUsd? } |
| compaction | history compacted | beforeTokens, afterTokens, collapsedRange |
| subagent_start | child spawn | childRunId, agentId |
| subagent_end | child completes | childRunId, status |
| output_validated | output passed outputSchema | (none) |
| output_retry | output failed; retrying | attempt, reason |
| run_end | terminal | status |
| error | tagged error | tag, message |
Every event carries runId and ts. Turn-scoped events also carry turn.
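As a rough illustration of how the table maps onto types, the sketch below models a handful of the kinds as a TypeScript discriminated union. It is not drover's actual type definition: the name HarnessEventSketch, the string and number types chosen for runId, ts and status, and the describeEvent helper are all assumptions made for the example.

```typescript
// Fields the docs say every event carries. Their concrete types are assumed.
interface EventBase {
  runId: string;
  ts: number;
}

// A subset of the kinds from the table; the rest would follow the same pattern.
type HarnessEventSketch =
  | (EventBase & { kind: "run_start"; agentId: string; specHash: string })
  | (EventBase & { kind: "turn_start"; turn: number })
  | (EventBase & { kind: "assistant_text"; text: string })
  | (EventBase & { kind: "output_retry"; attempt: number; reason: string })
  | (EventBase & { kind: "run_end"; status: string })
  | (EventBase & { kind: "error"; tag: string; message: string });

// Switching on `kind` narrows the payload. The `never` assignment in the
// default branch makes the compiler flag any kind that is added to the union
// but not handled here.
function describeEvent(e: HarnessEventSketch): string {
  switch (e.kind) {
    case "run_start":
      return `run ${e.runId} started for agent ${e.agentId}`;
    case "turn_start":
      return `turn ${e.turn} started`;
    case "assistant_text":
      return `assistant: ${e.text}`;
    case "output_retry":
      return `output retry ${e.attempt}: ${e.reason}`;
    case "run_end":
      return `run ended with status ${e.status}`;
    case "error":
      return `error [${e.tag}]: ${e.message}`;
    default: {
      const unreachable: never = e;
      return unreachable;
    }
  }
}
```

The exhaustiveness check is the main reason to prefer a switch on kind over a chain of if statements when handling many event kinds.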
Pairing
Tool calls come in pairs: tool_call_start followed by tool_call_end.
Pair them by toolUseId. The one exception is a run that aborts mid-tool:
the start is recorded but the end may be missing.
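One way to apply this pairing rule is sketched below: starts are held in a map keyed by toolUseId until the matching end arrives, and whatever remains at the end is treated as unfinished. The ToolCallStart and ToolCallEnd shapes follow the payload table, but the unknown types for input and result, and the pairToolCalls helper itself, are assumptions for illustration.

```typescript
interface ToolCallStart {
  kind: "tool_call_start";
  toolUseId: string;
  toolName: string;
  input: unknown;
}

interface ToolCallEnd {
  kind: "tool_call_end";
  toolUseId: string;
  toolName: string;
  result: unknown;
  durationMs: number;
}

type ToolEvent = ToolCallStart | ToolCallEnd;

interface ToolCallPair {
  start: ToolCallStart;
  end: ToolCallEnd;
}

// Matches each end to its start by toolUseId. Starts that never see an end
// (for example after a mid-tool abort) are returned separately.
function pairToolCalls(events: readonly ToolEvent[]): {
  pairs: ToolCallPair[];
  unfinished: ToolCallStart[];
} {
  const open = new Map<string, ToolCallStart>();
  const pairs: ToolCallPair[] = [];
  for (const e of events) {
    if (e.kind === "tool_call_start") {
      open.set(e.toolUseId, e);
    } else {
      const start = open.get(e.toolUseId);
      if (start !== undefined) {
        pairs.push({ start, end: e });
        open.delete(e.toolUseId);
      }
    }
  }
  return { pairs, unfinished: Array.from(open.values()) };
}
```

Keying on toolUseId rather than on arrival order keeps the pairing correct even if several tool calls are in flight at once.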
Subagent observability
When the harness invokes the auto-injected task tool to spawn a
child, the parent stream gets:
    tool_call_start (task)
    subagent_start { childRunId, agentId }
    ... (child runs; events NOT mirrored)
    subagent_end { childRunId, status }
    tool_call_end (task)
The child’s own events flow through its own harness instance, not the
parent’s. They are accessible via storage (listEvents(childRunId)) or by
attaching observers to the child spec’s plugins.
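Because the parent stream only carries the subagent_start and subagent_end markers, a consumer that wants the child's events first has to collect the child run IDs. A minimal sketch of that step follows; the SubagentEvent shapes mirror the payload table, while the ChildRun record, the string type for status, and the collectChildRuns helper are assumptions for the example.

```typescript
type SubagentEvent =
  | { kind: "subagent_start"; childRunId: string; agentId: string }
  | { kind: "subagent_end"; childRunId: string; status: string };

interface ChildRun {
  childRunId: string;
  agentId: string;
  // Stays undefined while the child is still running (no subagent_end seen).
  status: string | undefined;
}

// Builds one record per child from the parent's subagent markers.
function collectChildRuns(events: readonly SubagentEvent[]): ChildRun[] {
  const byId = new Map<string, ChildRun>();
  for (const e of events) {
    if (e.kind === "subagent_start") {
      byId.set(e.childRunId, {
        childRunId: e.childRunId,
        agentId: e.agentId,
        status: undefined,
      });
    } else {
      const child = byId.get(e.childRunId);
      if (child !== undefined) child.status = e.status;
    }
  }
  return Array.from(byId.values());
}
```

Each childRunId collected this way can then be handed to listEvents(childRunId) to read the child's own stream from storage.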
Consuming
    const handle = runAgent(spec, input);
    for await (const e of handle.events) {
      switch (e.kind) {
        case "tool_call_start":
          // Log the name of each tool as it is dispatched.
          console.log(`→ ${e.toolName}`);
          break;
        case "tool_call_end":
          // Only surface tool results that came back as errors.
          if (e.result.isError) console.log(` ✗ ${e.result.content}`);
          break;
        case "usage":
          // Print input/output token counts for each call.
          console.log(` ${e.usage.inputTokens}/${e.usage.outputTokens}`);
          break;
      }
    }

Both the Promise and the AsyncIterable eventually settle. If you never
iterate events, the queue grows without bound. It is limited only by the
size of a single run, which is fine for short runs; for high-volume
scenarios use the runtime layer instead.
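A common reason to iterate the stream is to total the usage events, which also keeps the queue drained. The sketch below assumes the usage payload shape from the table; the StreamEvent stand-in type and the addUsage and totalUsage helpers are hypothetical names introduced for the example, and treating a missing costUsd as zero is a design choice of this sketch.

```typescript
interface Usage {
  inputTokens: number;
  outputTokens: number;
  costUsd?: number;
}

// Stand-in for the full event union: only the kinds this example needs.
type StreamEvent =
  | { kind: "usage"; usage: Usage }
  | { kind: "run_end"; status: string };

// Pure accumulation step. A missing costUsd counts as zero here.
function addUsage(total: Usage, next: Usage): Usage {
  return {
    inputTokens: total.inputTokens + next.inputTokens,
    outputTokens: total.outputTokens + next.outputTokens,
    costUsd: (total.costUsd ?? 0) + (next.costUsd ?? 0),
  };
}

// Iterates the stream to completion, summing every usage event it sees.
async function totalUsage(events: AsyncIterable<StreamEvent>): Promise<Usage> {
  let total: Usage = { inputTokens: 0, outputTokens: 0, costUsd: 0 };
  for await (const e of events) {
    if (e.kind === "usage") total = addUsage(total, e.usage);
  }
  return total;
}
```

With a run handle in scope, this would be called as `await totalUsage(handle.events)`, assuming the real event type is compatible with the stand-in above.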
Persistence
When storage is wired, every event lands in the run_events table in
seq order. To replay a run, call listEvents(runId) and iterate the result.
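As an illustration of what a replay might compute, the sketch below folds a list of stored events into a per-kind count and a final status. It assumes the events have already been fetched with listEvents(runId) and arrive as an array in seq order; the StoredEvent shape, the ReplaySummary record, and the summarizeReplay helper are hypothetical, and the real return type of listEvents may differ.

```typescript
// Assumed minimal shape of a stored event. Only run_end carries a status.
interface StoredEvent {
  kind: string;
  status?: string;
}

interface ReplaySummary {
  countsByKind: Record<string, number>;
  // Undefined when no run_end event was stored (the run did not finish).
  finalStatus: string | undefined;
}

// Walks the events in order, counting each kind and remembering the status
// of the terminal run_end event if one is present.
function summarizeReplay(events: readonly StoredEvent[]): ReplaySummary {
  const countsByKind: Record<string, number> = {};
  let finalStatus: string | undefined = undefined;
  for (const e of events) {
    countsByKind[e.kind] = (countsByKind[e.kind] ?? 0) + 1;
    if (e.kind === "run_end") finalStatus = e.status;
  }
  return { countsByKind, finalStatus };
}
```

Since replay is just iteration over stored events, any reducer written against the live stream can usually be reused against storage with little change.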