---
title: Runtime
description: Worker pool + lease queue + RunApi. Turns drover into a server.
---

`@drover/runtime` is the opt-in package that adds a durable job queue,
a worker pool with crash recovery, and a programmatic API. Without it,
drover is a library; with it, drover hosts a durable agent runner.

## Pieces

| piece | purpose |
| --- | --- |
| `QueueAdapter` | enqueue / claim / heartbeat / complete / fail / cancel / reclaimStale |
| `createMemoryQueue` | in-process, single-instance, tests |
| `createLibsqlQueue` | durable, multi-process, production |
| `WorkerPool` | N workers polling the queue, executing runs via `runAgent`, heartbeating, recovering crashes |
| `RunApi` | programmatic enqueue / get / cancel / resume / waitFor |

## Hello, runtime

```ts
import { Type } from "@sinclair/typebox";
import { defineAgent } from "@drover/core";
import { staticRegistry } from "@drover/facade";
import {
  createLibsqlQueue,
  createRunApi,
  createWorkerPool,
} from "@drover/runtime";
import { createLibsqlStorage } from "@drover/storage";
import { createNoneSandbox } from "@drover/sandbox";

const echo = defineAgent({
  id: "echo",
  systemPrompt: "Reply with {ok: true} as JSON.",
  inputSchema: Type.Object({ msg: Type.String() }),
  outputSchema: Type.Object({ ok: Type.Boolean() }),
  model: "cheap",
  tools: [],
  quota: { maxTurns: 2 },
});

const queue = await createLibsqlQueue({ url: "file:./var/queue.db" });
const storage = await createLibsqlStorage({ url: "file:./var/runs.db" });
const registry = staticRegistry({ echo });
const sandboxFor = (): ReturnType<typeof createNoneSandbox> => createNoneSandbox();

// Worker side (long-running process)
const pool = createWorkerPool(
  { queue, storage, registry, sandboxFor },
  { concurrency: 4 },
);
pool.start();

// Producer side (HTTP handler, CLI, another agent)
const api = createRunApi({ queue, storage, registry, sandboxFor });
const job = await api.enqueue({
  id: crypto.randomUUID(),
  agentId: "echo",
  input: { msg: "hi" },
});

// Wait inline if you want a synchronous-feeling call
const final = await api.waitFor(job.id, { timeoutMs: 60_000 });
console.log(final.run?.output);
```

## Atomic claim

The `claim` method of the queue returned by `createLibsqlQueue` is a single statement:

```sql
UPDATE queue_jobs
SET status = 'leased', lease_worker = ?, lease_expires_at = ?
WHERE id = (
  SELECT id FROM queue_jobs
    WHERE status = 'queued'
      AND (scheduled_for IS NULL OR scheduled_for <= ?)
    ORDER BY priority DESC, created_at ASC
    LIMIT 1
)
RETURNING *;
```

Because the select-and-lease happens in one `UPDATE ... RETURNING`
statement, it is atomic across workers: at most one worker leases a given
job. Point N pools at the same `file:./var/queue.db` and they coordinate
through SQLite's file lock, with no application-side mutex.
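To make the selection order concrete, here is an in-memory model of which job that statement picks. It is a sketch for reasoning about ordering, not the libsql adapter: the `QueueJob` shape and the `claim` function are hypothetical, timestamps are assumed to be epoch milliseconds, and the model relies on JavaScript's single-threaded execution where the real adapter relies on SQLite.

```typescript
// Hypothetical row shape mirroring the queue_jobs columns used in the SQL.
// Timestamps are assumed to be epoch milliseconds.
type JobStatus = "queued" | "leased" | "completed" | "failed" | "cancelled";

interface QueueJob {
  id: string;
  status: JobStatus;
  priority: number;
  createdAt: number;
  scheduledFor: number | null;
  leaseWorker: string | null;
  leaseExpiresAt: number | null;
}

// Mirrors the claim statement: pick the highest-priority, oldest queued job
// that is due, lease it to `workerId`, and return it (null if nothing is due).
function claim(
  jobs: QueueJob[],
  workerId: string,
  now: number,
  leaseDurationMs: number,
): QueueJob | null {
  const due = jobs.filter(
    (j) => j.status === "queued" && (j.scheduledFor === null || j.scheduledFor <= now),
  );
  // ORDER BY priority DESC, created_at ASC
  due.sort((a, b) => b.priority - a.priority || a.createdAt - b.createdAt);
  const job = due[0];
  if (job === undefined) return null;
  // SET status = 'leased', lease_worker = ?, lease_expires_at = ?
  job.status = "leased";
  job.leaseWorker = workerId;
  job.leaseExpiresAt = now + leaseDurationMs;
  return job;
}
```

Two consecutive calls never return the same job, because the first call already moved it out of `queued`; the SQL version gets the same guarantee from running as one statement.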

## Crash recovery

Workers heartbeat every `leaseDurationMs / 3` (default 10s). If a worker
dies mid-job:

1. Heartbeats stop.
2. `lease_expires_at` rolls into the past.
3. The pool's `reclaimStale` loop runs every `reclaimIntervalMs` (default
   5s). Jobs whose lease has expired flip back to `status = 'queued'`
   with `attempts += 1`.
4. Another worker claims the job on its next poll.

`maxAttempts` (per job, default 1) caps the recovery loop: once
`attempts >= maxAttempts`, the job moves to `failed` instead of back to the queue.
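Steps 3 and 4, together with the `maxAttempts` cap, can be read as a single pass over leased jobs. The sketch below assumes that `attempts` starts at 0 and is bumped once per expired lease, and that the cap is checked at reclaim time; the `LeasedJob` shape and this `reclaimStale` function are illustrative, not drover's schema or implementation.

```typescript
// Hypothetical leased-job shape holding only the fields reclaim needs.
interface LeasedJob {
  id: string;
  status: "queued" | "leased" | "failed";
  attempts: number;
  maxAttempts: number;
  leaseExpiresAt: number | null; // epoch ms
}

// One reclaim pass: every job whose lease has expired gets attempts += 1 and
// either returns to the queue or, once the cap is reached, moves to failed.
// Returns the affected ids so the caller can log them.
function reclaimStale(
  jobs: LeasedJob[],
  now: number,
): { requeued: string[]; failed: string[] } {
  const requeued: string[] = [];
  const failed: string[] = [];
  for (const job of jobs) {
    const expired =
      job.status === "leased" && job.leaseExpiresAt !== null && job.leaseExpiresAt <= now;
    if (!expired) continue;
    job.attempts += 1;
    job.leaseExpiresAt = null;
    if (job.attempts >= job.maxAttempts) {
      job.status = "failed";
      failed.push(job.id);
    } else {
      job.status = "queued";
      requeued.push(job.id);
    }
  }
  return { requeued, failed };
}
```

Under these assumptions, the default `maxAttempts` of 1 means a job whose worker crashes is failed rather than retried, so raise it per job when a rerun is safe.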

## Cancellation

```ts
await api.cancel(jobId);
```

What happens:
- `queue.cancel(jobId)` sets `cancel_requested = 1`.
- If the job is still `queued`, it is marked `cancelled` immediately.
- If a worker is running it, the worker's `cancelPoll` notices on its
  next tick (within one heartbeat interval) and aborts the inner run
  through the `AbortController` propagated into pi-agent-core.
- The worker then calls `queue.fail(jobId, ..., retry=false)`, so caller
  cancellations are never auto-retried.
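The worker side of cancellation can be modelled as a timer that checks a flag and aborts an `AbortController`, whose signal the inner run observes. In this sketch `CancelCheck` stands in for whatever reads `cancel_requested` from the queue; `cancelTick` and `startCancelPoll` are hypothetical helpers, not drover exports.

```typescript
// Hypothetical: reads the job's cancel_requested flag (e.g. from the queue).
type CancelCheck = () => boolean;

// One tick of the poll: abort the run once cancellation has been requested.
// Returns true only on the tick that actually triggered the abort.
function cancelTick(check: CancelCheck, controller: AbortController): boolean {
  if (controller.signal.aborted || !check()) return false;
  controller.abort(new Error("cancelled by caller"));
  return true;
}

// Runs cancelTick every `intervalMs` (at most the heartbeat interval) and
// returns a function that stops the timer, e.g. when the run finishes.
function startCancelPoll(
  check: CancelCheck,
  controller: AbortController,
  intervalMs: number,
): () => void {
  const timer = setInterval(() => {
    if (cancelTick(check, controller)) clearInterval(timer);
  }, intervalMs);
  return () => clearInterval(timer);
}
```

Keeping the tick separate from the timer makes the abort decision testable without waiting on real time.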

## Heartbeat tuning

| knob | default | tradeoff |
| --- | --- | --- |
| `leaseDurationMs` | 30s | longer = fewer reclaims when workers slow down; shorter = faster crash recovery |
| `heartbeatIntervalMs` | `leaseDurationMs / 3` (10s) | shorter = more DB writes; longer = less margin before lease expiry, higher risk of a false reclaim |
| `reclaimIntervalMs` | 5s | shorter = faster recovery; longer = lower polling overhead |

For a long-running agent (say a multi-minute build), bump `leaseDurationMs`
to comfortably exceed the longest expected pause inside the run.
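The three knobs combine into a worst-case delay between a worker crashing and its job becoming claimable again. The helper below assumes that each heartbeat pushes `lease_expires_at` to `now + leaseDurationMs` and that the crash happens right after a heartbeat; under those assumptions the defaults give 30s + 5s = 35s. `TimingConfig` and `checkTiming` are illustrative names, not part of `@drover/runtime`.

```typescript
interface TimingConfig {
  leaseDurationMs: number;
  heartbeatIntervalMs?: number; // defaults to leaseDurationMs / 3
  reclaimIntervalMs?: number; // defaults to 5_000
}

interface TimingReport {
  heartbeatIntervalMs: number;
  reclaimIntervalMs: number;
  // Longest time a crashed worker's job can stay leased before reclaim requeues it.
  worstCaseRecoveryMs: number;
  warnings: string[];
}

function checkTiming(cfg: TimingConfig): TimingReport {
  const heartbeatIntervalMs = cfg.heartbeatIntervalMs ?? cfg.leaseDurationMs / 3;
  const reclaimIntervalMs = cfg.reclaimIntervalMs ?? 5_000;
  const warnings: string[] = [];
  if (heartbeatIntervalMs >= cfg.leaseDurationMs) {
    // A lease that can expire between two heartbeats invites false reclaims.
    warnings.push("heartbeatIntervalMs should be well below leaseDurationMs");
  }
  return {
    heartbeatIntervalMs,
    reclaimIntervalMs,
    // Lease runs out after the last heartbeat, then up to one reclaim interval passes.
    worstCaseRecoveryMs: cfg.leaseDurationMs + reclaimIntervalMs,
    warnings,
  };
}
```

For the multi-minute build case, `checkTiming({ leaseDurationMs: 300_000 })` shows the cost of the longer lease: recovery after a crash can take up to 305s.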

## Horizontal scale

Each worker pool runs as its own process (one you can start with
`Bun.spawn`, for example). Run multiple pools on multiple machines against
one libsql database, using Turso for cross-machine access:

```ts
const queue = await createLibsqlQueue({ url: "libsql://your-org.turso.io", authToken: "..." });
const storage = await createLibsqlStorage({ url: "libsql://your-org.turso.io", authToken: "..." });
```

Pool A on machine 1 with `concurrency: 8` and pool B on machine 2 with
`concurrency: 8` give 16 workers in total, with no application-side coordination.
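In practice each pool process would read its connection settings from the environment rather than hard-coding them. A minimal sketch, assuming hypothetical `DROVER_DB_URL`, `DROVER_DB_AUTH_TOKEN`, and `DROVER_CONCURRENCY` variables (drover does not define these) and a hypothetical `poolConfigFromEnv` helper:

```typescript
// Hypothetical env var names; pick whatever fits your deployment.
interface PoolEnvConfig {
  db: { url: string; authToken?: string };
  concurrency: number;
}

function poolConfigFromEnv(env: Record<string, string | undefined>): PoolEnvConfig {
  const url = env["DROVER_DB_URL"];
  if (!url) throw new Error("DROVER_DB_URL is required");
  const authToken = env["DROVER_DB_AUTH_TOKEN"];
  // Turso-hosted databases require an auth token; a local file URL does not.
  if (url.includes(".turso.io") && !authToken) {
    throw new Error("DROVER_DB_AUTH_TOKEN is required for a Turso URL");
  }
  // Same default concurrency as the hello-runtime example.
  const concurrency = Number(env["DROVER_CONCURRENCY"] ?? "4");
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new Error(`invalid DROVER_CONCURRENCY: ${env["DROVER_CONCURRENCY"]}`);
  }
  return { db: authToken ? { url, authToken } : { url }, concurrency };
}
```

A pool process would then pass `poolConfigFromEnv(process.env).db` to both `createLibsqlQueue` and `createLibsqlStorage`, and its `concurrency` to `createWorkerPool`, so every machine runs the same code with different environment values.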

## What's missing in v0

- HTTP wrappers (REST or RPC). The programmatic `RunApi` is the public
  surface; an HTTP layer can wrap it without drover knowing.
- SSE event streaming. `api.waitFor` polls; to follow events today,
  call `storage.listEvents(runId)` after each poll.
- Cron / scheduled triggers. `EnqueueInput.scheduledFor` is honoured by
  the queue (jobs aren't claimable until then), but cron-style "every
  Monday" needs an external scheduler.
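Until cron support lands, an external scheduler can compute the next run time itself and enqueue ahead of time with `scheduledFor`. The helper below computes the next Monday at a given UTC hour; it is a sketch, and `nextMondayUtc` is a hypothetical name. The type `EnqueueInput.scheduledFor` expects is not stated here, so convert the returned `Date` accordingly.

```typescript
// Returns the first Monday strictly after `from`, at `hourUtc`:00 UTC.
function nextMondayUtc(from: Date, hourUtc: number): Date {
  const next = new Date(
    Date.UTC(from.getUTCFullYear(), from.getUTCMonth(), from.getUTCDate(), hourUtc),
  );
  // getUTCDay(): 0 = Sunday, 1 = Monday, ..., 6 = Saturday
  const daysUntilMonday = (8 - next.getUTCDay()) % 7;
  next.setUTCDate(next.getUTCDate() + daysUntilMonday);
  // Already Monday but the hour has passed (or is now): move to next week.
  if (next.getTime() <= from.getTime()) next.setUTCDate(next.getUTCDate() + 7);
  return next;
}
```

A system cron entry or similar scheduler would call this after each run and pass the result to `api.enqueue` as `scheduledFor`, so the queue keeps the job unclaimable until then.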

These are tractable to add: none of them requires redesigning the
queue or pool primitives.
