Runtime
Worker pool + lease queue + RunApi. Turns drover into a server.
@drover/runtime is the opt-in piece that adds a durable job queue,
worker pool with crash recovery, and a programmatic API. Without it
drover is a library; with it, drover hosts a durable agent-runner.
Pieces
| piece | purpose |
|---|---|
| `QueueAdapter` | enqueue / claim / heartbeat / complete / fail / cancel / reclaimStale |
| `createMemoryQueue` | in-process, single-instance, tests |
| `createLibsqlQueue` | durable, multi-process, production |
| `WorkerPool` | N workers polling the queue, executing runs via `runAgent`, heartbeating, recovering crashes |
| `RunApi` | programmatic enqueue / get / cancel / resume / waitFor |
Hello, runtime
import { Type } from "@sinclair/typebox";
import { defineAgent } from "@drover/core";
import { staticRegistry } from "@drover/facade";
import {
  createLibsqlQueue,
  createRunApi,
  createWorkerPool,
} from "@drover/runtime";
import { createLibsqlStorage } from "@drover/storage";
import { createNoneSandbox } from "@drover/sandbox";
const echo = defineAgent({
  id: "echo",
  systemPrompt: "Reply with {ok: true} as JSON.",
  inputSchema: Type.Object({ msg: Type.String() }),
  outputSchema: Type.Object({ ok: Type.Boolean() }),
  model: "cheap",
  tools: [],
  quota: { maxTurns: 2 },
});
const queue = await createLibsqlQueue({ url: "file:./var/queue.db" });
const storage = await createLibsqlStorage({ url: "file:./var/runs.db" });
const registry = staticRegistry({ echo });
const sandboxFor = (): ReturnType<typeof createNoneSandbox> => createNoneSandbox();
// Worker side (long-running process)
const pool = createWorkerPool(
  { queue, storage, registry, sandboxFor },
  { concurrency: 4 },
);
pool.start();
// Producer side (HTTP handler, CLI, another agent)
const api = createRunApi({ queue, storage, registry, sandboxFor });
const job = await api.enqueue({
  id: crypto.randomUUID(),
  agentId: "echo",
  input: { msg: "hi" },
});
// Wait inline if you want a synchronous-feeling call
const final = await api.waitFor(job.id, { timeoutMs: 60_000 });
console.log(final.run?.output);
Atomic claim
createLibsqlQueue.claim uses one statement:
UPDATE queue_jobs
SET status = 'leased', lease_worker = ?, lease_expires_at = ?
WHERE id = (
  SELECT id FROM queue_jobs
  WHERE status = 'queued'
    AND (scheduled_for IS NULL OR scheduled_for <= ?)
  ORDER BY priority DESC, created_at ASC
  LIMIT 1
)
RETURNING *;
`UPDATE ... RETURNING` is atomic across workers: at most one worker
leases a given job. Point N pools at the same file:./var/queue.db and
they coordinate via the file lock. No application-side mutex.
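The selection rule in that statement can be sketched as an in-memory model. This is not the libsql implementation, only an illustration of which job a claim picks. The `Job` shape, the `claimNext` name, and setting the lease expiry to `now + leaseDurationMs` are assumptions made for the example.

```typescript
// In-memory model of the claim rule above. Hypothetical types and names;
// the real queue does this in a single SQL statement.
type Job = {
  id: string;
  status: "queued" | "leased";
  priority: number;
  createdAt: number; // epoch ms
  scheduledFor: number | null; // epoch ms, or null for "as soon as possible"
  leaseWorker: string | null;
  leaseExpiresAt: number | null;
};

function claimNext(
  jobs: Job[],
  workerId: string,
  now: number,
  leaseDurationMs: number,
): Job | null {
  const candidates = jobs
    // WHERE status = 'queued' AND (scheduled_for IS NULL OR scheduled_for <= now)
    .filter(
      (j) =>
        j.status === "queued" &&
        (j.scheduledFor === null || j.scheduledFor <= now),
    )
    // ORDER BY priority DESC, created_at ASC
    .sort((a, b) => b.priority - a.priority || a.createdAt - b.createdAt);

  const next = candidates[0];
  if (!next) return null; // nothing claimable right now

  // SET status = 'leased', lease_worker = ?, lease_expires_at = ?
  next.status = "leased";
  next.leaseWorker = workerId;
  next.leaseExpiresAt = now + leaseDurationMs; // assumed lease formula
  return next;
}
```

Higher priority wins, ties go to the older job, and a job whose `scheduledFor` lies in the future is skipped regardless of priority.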
Crash recovery
Workers heartbeat every leaseDurationMs / 3 (default 10s). If a worker
dies mid-job:
- Heartbeats stop.
- `lease_expires_at` rolls into the past.
- The pool’s `reclaimStale` loop runs every `reclaimIntervalMs` (default 5s). Jobs whose lease has expired flip back to `status = 'queued'` with `attempts += 1`.
- Another worker claims the job on its next polling iteration.
`maxAttempts` (default 1, per-job) caps the recovery loop. Once `attempts`
reaches `maxAttempts`, the job moves to `failed`.
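A minimal sketch of one reclaim pass, under stated assumptions: the `LeasedJob` shape and `reclaimStalePass` name are hypothetical, and the example assumes the `maxAttempts` check happens at reclaim time, right after `attempts` is incremented. The real queue may order these steps differently.

```typescript
// One pass of a reclaim loop over an in-memory job list.
// Hypothetical model, not the @drover/runtime implementation.
type LeasedJob = {
  id: string;
  status: "queued" | "leased" | "failed";
  attempts: number;
  maxAttempts: number;
  leaseExpiresAt: number | null; // epoch ms
};

// Returns the ids of jobs whose expired lease was handled in this pass.
function reclaimStalePass(jobs: LeasedJob[], now: number): string[] {
  const touched: string[] = [];
  for (const job of jobs) {
    if (job.status !== "leased" || job.leaseExpiresAt === null) continue;
    if (job.leaseExpiresAt > now) continue; // lease still live: worker is heartbeating

    job.attempts += 1;
    job.leaseExpiresAt = null;
    // Assumption: the cap is enforced here, once attempts reaches maxAttempts.
    job.status = job.attempts >= job.maxAttempts ? "failed" : "queued";
    touched.push(job.id);
  }
  return touched;
}
```

Under this reading, the default `maxAttempts` of 1 means a job whose worker crashed is failed rather than retried; raising it per job opts that job into retries.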
Cancellation
await api.cancel(jobId);
Effect:
- `queue.cancel(jobId)` flips `cancel_requested = 1`.
- If the job is still `queued`, it’s marked `cancelled` immediately.
- If a worker is running it, the worker’s `cancelPoll` notices on its next tick (≤ heartbeat interval) and aborts the inner run via the `AbortController` propagated into pi-agent-core.
- The worker calls `queue.fail(jobId, ..., retry=false)`; there is no auto-retry on caller cancellations.
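The poll-then-abort hand-off above can be sketched with a plain `AbortController`. The names `cancelPollTick` and `runTurns` are hypothetical, and the example is synchronous to keep it short; in a real worker the cancel check would be an asynchronous read against the queue, and the loop would be the agent run itself.

```typescript
// Sketch of cooperative cancellation. Hypothetical helper names;
// only AbortController / AbortSignal are real platform APIs here.
type Outcome =
  | { status: "completed"; turns: number }
  | { status: "cancelled"; turns: number };

// One tick of the cancel poll: if the flag is set, abort the run.
function cancelPollTick(
  isCancelRequested: () => boolean,
  controller: AbortController,
): void {
  if (!controller.signal.aborted && isCancelRequested()) controller.abort();
}

// Stand-in for the inner run: checks the signal before every turn.
function runTurns(
  maxTurns: number,
  signal: AbortSignal,
  onTurn: (turn: number) => void,
): Outcome {
  for (let turn = 1; turn <= maxTurns; turn++) {
    if (signal.aborted) return { status: "cancelled", turns: turn - 1 };
    onTurn(turn);
  }
  return { status: "completed", turns: maxTurns };
}
```

The run only stops at points where it checks the signal, which is why cancellation latency is bounded by the poll interval plus the time to reach the next check.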
Heartbeat tuning
| knob | default | tradeoff |
|---|---|---|
| `leaseDurationMs` | 30s | longer = fewer reclaims when workers slow down; shorter = faster crash recovery |
| `heartbeatIntervalMs` | `leaseDurationMs / 3` | shorter = more DB writes; longer = closer to lease expiry, risk of false reclaim |
| `reclaimIntervalMs` | 5s | shorter = faster recovery; longer = lower polling overhead |
For a long-running agent (say a multi-minute build), bump leaseDurationMs
to comfortably exceed the longest expected pause inside the run.
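To make the tradeoffs concrete, here is a small calculator for the timings these knobs imply. It is a sketch, not part of the runtime: `resolveTiming` is a hypothetical helper, and it assumes each heartbeat pushes the lease expiry to `now + leaseDurationMs`. The defaults are the ones from the table above.

```typescript
// Derive effective timings from the three knobs. Hypothetical helper.
type TimingKnobs = {
  leaseDurationMs?: number;
  heartbeatIntervalMs?: number;
  reclaimIntervalMs?: number;
};

function resolveTiming(knobs: TimingKnobs = {}) {
  const leaseDurationMs = knobs.leaseDurationMs ?? 30_000;
  const heartbeatIntervalMs = knobs.heartbeatIntervalMs ?? leaseDurationMs / 3;
  const reclaimIntervalMs = knobs.reclaimIntervalMs ?? 5_000;
  return {
    leaseDurationMs,
    heartbeatIntervalMs,
    reclaimIntervalMs,
    // Heartbeat ticks that fall strictly inside one lease window, i.e. how
    // many consecutive heartbeats can be missed before the lease lapses.
    missedHeartbeatsTolerated: Math.floor(
      (leaseDurationMs - 1) / heartbeatIntervalMs,
    ),
    // Worst case: the worker dies right after a heartbeat, and the lease
    // expires just after a reclaim pass has run.
    worstCaseRequeueMs: leaseDurationMs + reclaimIntervalMs,
  };
}
```

With the defaults this gives two tolerated missed heartbeats and a worst-case requeue delay of 35 seconds. Raising `leaseDurationMs` to ten minutes for a long build raises that delay to just over ten minutes, which is the cost side of the tradeoff.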
Horizontal scale
Each worker pool is a Bun.spawn-able process. Multiple pools, multiple
machines, one libsql file (Turso for cross-machine):
createLibsqlQueue({ url: "libsql://your-org.turso.io", authToken: "..." });
createLibsqlStorage({ url: "libsql://your-org.turso.io", authToken: "..." });
Pool A on machine 1 with `concurrency: 8`, pool B on machine 2 with
`concurrency: 8`: 16 workers total, no app-side coordination needed.
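Since every pool must point at the same database, one way to keep machines in sync is to build the connection options from the environment. The sketch below is illustrative only: the `DROVER_DB_URL` and `DROVER_DB_AUTH_TOKEN` variable names and the `libsqlOptionsFromEnv` helper are hypothetical, and requiring a token for `libsql://` URLs is an assumption about a typical hosted setup.

```typescript
// Build { url, authToken } from environment variables.
// Hypothetical helper and variable names; not part of drover.
type LibsqlOptions = { url: string; authToken?: string };

function libsqlOptionsFromEnv(
  env: Record<string, string | undefined>,
): LibsqlOptions {
  // Fall back to the local file used in the single-machine example.
  const url = env.DROVER_DB_URL ?? "file:./var/queue.db";

  if (url.startsWith("libsql://")) {
    // Assumption: remote databases need a token, so fail fast without one.
    const authToken = env.DROVER_DB_AUTH_TOKEN;
    if (!authToken) {
      throw new Error("DROVER_DB_AUTH_TOKEN is required for libsql:// URLs");
    }
    return { url, authToken };
  }
  return { url };
}

// On each machine, something like:
//   createLibsqlQueue(libsqlOptionsFromEnv(process.env));
```

Failing at startup when the token is missing is a deliberate choice here: a pool that silently falls back to a local file would run but never see the shared jobs.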
What’s missing in v0
- HTTP wrappers (REST or RPC). The programmatic `RunApi` is the public surface; an HTTP layer can wrap it without drover knowing.
- SSE event streaming. `api.waitFor` polls; for live event feeds today, go through `storage.listEvents(runId)` after a poll.
- Cron / scheduled triggers. `EnqueueInput.scheduledFor` is honoured by the queue (jobs aren’t claimable until then), but cron-style “every Monday” needs an external scheduler.
These are tractable to add; none of them requires redesigning the queue / pool primitives.
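As an illustration of the external-scheduler gap, the “every Monday” case can be reduced to computing the next occurrence and passing it as `scheduledFor`. This is a sketch under assumptions: `nextMondayUtc` and `weeklyEnqueueInput` are hypothetical helpers, and `scheduledFor` is assumed to accept epoch milliseconds, which the text above does not specify.

```typescript
// Compute the next Monday 00:00 UTC strictly after `from`.
function nextMondayUtc(from: Date): Date {
  const day = from.getUTCDay(); // 0 = Sunday, 1 = Monday, ... 6 = Saturday
  // Days until the next Monday; a Monday maps to 7, not 0.
  const daysAhead = (1 - day + 7) % 7 || 7;
  return new Date(
    Date.UTC(
      from.getUTCFullYear(),
      from.getUTCMonth(),
      from.getUTCDate() + daysAhead, // Date.UTC normalises month overflow
    ),
  );
}

// Build an enqueue payload for the next weekly run. Hypothetical helper;
// the field names follow the enqueue example earlier on this page.
function weeklyEnqueueInput(agentId: string, input: unknown, now: Date) {
  const when = nextMondayUtc(now);
  return {
    // Deterministic id, so the caller can detect a run it already scheduled.
    id: `${agentId}-${when.toISOString()}`,
    agentId,
    input,
    scheduledFor: when.getTime(), // assumed to be epoch ms
  };
}
```

An external timer (system cron, a small loop, a hosted scheduler) would call `api.enqueue(weeklyEnqueueInput(...))` once per week; the queue then holds the job until it becomes claimable.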