URL: /drover/concepts/events-and-streams

---
title: Events & streams
description: The HarnessEvent discriminated union the harness emits.
---

drover normalises pi-agent-core's events into a single discriminated
union. Consumers iterate `RunHandle.events: AsyncIterable<HarnessEvent>`;
plugins observe via `onEvent`.

## The union

| kind | when | payload |
| --- | --- | --- |
| `run_start` | first thing emitted | `agentId`, `specHash` |
| `input_validated` | input passed `inputSchema` | — |
| `turn_start` | each pi turn | `turn` (1-based) |
| `llm_call` | model dispatch | `modelName`, `reasoning?` |
| `thinking` | model emits thinking block | `text` |
| `assistant_text` | model emits final text per turn | `text` |
| `tool_call_start` | pi dispatches a tool | `toolUseId`, `toolName`, `input` |
| `tool_call_end` | tool returns | `toolUseId`, `toolName`, `result`, `durationMs` |
| `usage` | per-call accounting | `usage: { inputTokens, outputTokens, costUsd? }` |
| `compaction` | history compacted | `beforeTokens`, `afterTokens`, `collapsedRange` |
| `subagent_start` | child spawn | `childRunId`, `agentId` |
| `subagent_end` | child completes | `childRunId`, `status` |
| `output_validated` | output passed `outputSchema` | — |
| `output_retry` | output failed; retrying | `attempt`, `reason` |
| `run_end` | terminal | `status` |
| `error` | tagged error | `tag`, `message` |

Every event has `runId`, `ts`. Turn-scoped events also have `turn`.

## Pairing

Tool calls always come in pairs — `tool_call_start` followed by
`tool_call_end`. Pair by `toolUseId`. If a run aborts mid-tool, the
start is recorded but the end may be missing.

## Subagent observability

When the harness invokes the auto-injected `task` tool to spawn a
child, the parent stream gets:

```
tool_call_start (task)
subagent_start  { childRunId, agentId }
... (child runs; events NOT mirrored)
subagent_end    { childRunId, status }
tool_call_end   (task)
```

The child's own events flow through ITS harness instance — accessible
via storage (`listEvents(childRunId)`) or by attaching observers to
the child spec's `plugins`.

## Consuming

```ts
const handle = runAgent(spec, input);

for await (const e of handle.events) {
  switch (e.kind) {
    case "tool_call_start":
      console.log(`→ ${e.toolName}`);
      break;
    case "tool_call_end":
      if (e.result.isError) console.log(`  ✗ ${e.result.content}`);
      break;
    case "usage":
      console.log(`  ${e.usage.inputTokens}/${e.usage.outputTokens}`);
      break;
  }
}
```

The Promise + AsyncIterable both eventually settle. If you never
iterate `events`, the queue grows unbounded (single-run scale, fine for
short runs); for high-volume scenarios use the runtime layer instead.

## Persistence

When `storage` is wired, every event lands in the `run_events` table in
seq order. Replay = `listEvents(runId)` + iterate.
