Runtime

Worker pool + lease queue + RunApi. Turns drover into a server.

@drover/runtime is the opt-in piece that adds a durable job queue, worker pool with crash recovery, and a programmatic API. Without it drover is a library; with it, drover hosts a durable agent-runner.

Pieces

| piece | purpose |
| --- | --- |
| QueueAdapter | enqueue / claim / heartbeat / complete / fail / cancel / reclaimStale |
| createMemoryQueue | in-process, single-instance, tests |
| createLibsqlQueue | durable, multi-process, production |
| WorkerPool | N workers polling the queue, executing runs via runAgent, heartbeating, recovering crashes |
| RunApi | programmatic enqueue / get / cancel / resume / waitFor |

Hello, runtime

```ts
import { Type } from "@sinclair/typebox";
import { defineAgent } from "@drover/core";
import { staticRegistry } from "@drover/facade";
import {
  createLibsqlQueue,
  createRunApi,
  createWorkerPool,
} from "@drover/runtime";
import { createLibsqlStorage } from "@drover/storage";
import { createNoneSandbox } from "@drover/sandbox";

const echo = defineAgent({
  id: "echo",
  systemPrompt: "Reply with {ok: true} as JSON.",
  inputSchema: Type.Object({ msg: Type.String() }),
  outputSchema: Type.Object({ ok: Type.Boolean() }),
  model: "cheap",
  tools: [],
  quota: { maxTurns: 2 },
});

const queue = await createLibsqlQueue({ url: "file:./var/queue.db" });
const storage = await createLibsqlStorage({ url: "file:./var/runs.db" });
const registry = staticRegistry({ echo });
const sandboxFor = (): ReturnType<typeof createNoneSandbox> => createNoneSandbox();

// Worker side (long-running process)
const pool = createWorkerPool(
  { queue, storage, registry, sandboxFor },
  { concurrency: 4 },
);
pool.start();

// Producer side (HTTP handler, CLI, another agent)
const api = createRunApi({ queue, storage, registry, sandboxFor });
const job = await api.enqueue({
  id: crypto.randomUUID(),
  agentId: "echo",
  input: { msg: "hi" },
});

// Wait inline if you want a synchronous-feeling call
const final = await api.waitFor(job.id, { timeoutMs: 60_000 });
console.log(final.run?.output);
```

Atomic claim

The queue returned by createLibsqlQueue implements claim as a single statement:

```sql
UPDATE queue_jobs
SET status = 'leased', lease_worker = ?, lease_expires_at = ?
WHERE id = (
  SELECT id FROM queue_jobs
    WHERE status = 'queued'
      AND (scheduled_for IS NULL OR scheduled_for <= ?)
    ORDER BY priority DESC, created_at ASC
    LIMIT 1
)
RETURNING *;
```

UPDATE ... RETURNING is atomic across workers — at most one worker leases a given job. Point N pools at the same file:./var/queue.db and they coordinate via the file lock. No application-side mutex.
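To make the selection rule concrete, here is a minimal in-memory sketch of what the statement above picks. It is a model for reading the query, not drover's implementation: the Job shape, the claim function, and makeJob are hypothetical names invented for this example, and timestamps are assumed to be epoch milliseconds. In a single JavaScript process the mutation is trivially atomic; the SQL statement is what provides the same guarantee across processes.

```typescript
// Hypothetical in-memory model of the claim query (not drover's code).
type Status = "queued" | "leased";

interface Job {
  id: string;
  status: Status;
  priority: number;
  createdAt: number; // epoch ms (assumed unit)
  scheduledFor: number | null; // null means "claimable right away"
  leaseWorker: string | null;
  leaseExpiresAt: number | null;
}

function claim(
  jobs: Job[],
  workerId: string,
  now: number,
  leaseDurationMs: number,
): Job | null {
  // Mirrors the subquery's WHERE clause.
  const candidates = jobs.filter(
    (j) =>
      j.status === "queued" &&
      (j.scheduledFor === null || j.scheduledFor <= now),
  );
  // Mirrors ORDER BY priority DESC, created_at ASC.
  candidates.sort(
    (a, b) => b.priority - a.priority || a.createdAt - b.createdAt,
  );
  const next = candidates[0];
  if (next === undefined) return null; // nothing claimable
  // Mirrors the UPDATE ... SET clause.
  next.status = "leased";
  next.leaseWorker = workerId;
  next.leaseExpiresAt = now + leaseDurationMs;
  return next;
}

// Small helper to build test jobs.
function makeJob(
  id: string,
  priority: number,
  createdAt: number,
  scheduledFor: number | null = null,
): Job {
  return {
    id,
    status: "queued",
    priority,
    createdAt,
    scheduledFor,
    leaseWorker: null,
    leaseExpiresAt: null,
  };
}

const jobs: Job[] = [
  makeJob("low-old", 0, 100),
  makeJob("high-new", 5, 300),
  makeJob("high-old", 5, 200),
  makeJob("future", 9, 50, 10_000), // highest priority, but not due yet
];

const first = claim(jobs, "worker-a", 1_000, 30_000);
const second = claim(jobs, "worker-b", 1_000, 30_000);
console.log(first?.id, second?.id);
```

Two workers calling claim back to back receive different jobs, and the high-priority job scheduled for the future is skipped until its scheduled_for time has passed.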

Crash recovery

Workers heartbeat every leaseDurationMs / 3 (default 10s). If a worker dies mid-job:

  1. Heartbeats stop.
  2. lease_expires_at rolls into the past.
  3. The pool’s reclaimStale loop runs every reclaimIntervalMs (default 5s). Jobs whose lease has expired flip back to status = 'queued' with attempts += 1.
  4. Another worker claims the job on its next poll iteration.

maxAttempts (default 1, per-job) caps the recovery loop. After `attempts >= maxAttempts`, the job moves to `failed`.
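The reclaim step can be sketched as a pure function over job records. This is an illustrative model only: the LeasedJob shape and this reclaimStale function are hypothetical, and two details are assumptions rather than documented behaviour, namely that attempts is incremented at reclaim time and that a lease counts as expired once lease_expires_at is less than or equal to the current time.

```typescript
// Hypothetical model of a reclaim pass (not drover's code).
type JobStatus = "queued" | "leased" | "failed";

interface LeasedJob {
  id: string;
  status: JobStatus;
  attempts: number;
  maxAttempts: number;
  leaseExpiresAt: number | null; // epoch ms (assumed unit)
}

// Returns the ids of jobs whose expired lease was handled in this pass.
function reclaimStale(jobs: LeasedJob[], now: number): string[] {
  const reclaimed: string[] = [];
  for (const j of jobs) {
    const expired =
      j.status === "leased" &&
      j.leaseExpiresAt !== null &&
      j.leaseExpiresAt <= now; // boundary choice is an assumption
    if (!expired) continue;
    j.attempts += 1;
    j.leaseExpiresAt = null;
    // Out of attempts: fail the job. Otherwise make it claimable again.
    j.status = j.attempts >= j.maxAttempts ? "failed" : "queued";
    reclaimed.push(j.id);
  }
  return reclaimed;
}

function leased(
  id: string,
  maxAttempts: number,
  leaseExpiresAt: number,
): LeasedJob {
  return { id, status: "leased", attempts: 0, maxAttempts, leaseExpiresAt };
}

const retryable = leased("retryable", 3, 900); // lease already expired
const oneShot = leased("one-shot", 1, 900); // default maxAttempts of 1
const healthy = leased("healthy", 3, 5_000); // still heartbeating

const reclaimedIds = reclaimStale([retryable, oneShot, healthy], 1_000);
console.log(reclaimedIds, retryable.status, oneShot.status, healthy.status);
```

Under these assumptions, a job left at the default maxAttempts of 1 is not retried after a crash: the first reclaim already brings attempts up to the cap.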

Cancellation

```ts
await api.cancel(jobId);
```

Effect:

  • queue.cancel(jobId) flips cancel_requested = 1.
  • If the job is still queued, it’s marked cancelled immediately.
  • If a worker is running it, the worker’s cancelPoll notices on its next tick (≤ heartbeat interval) and aborts the inner run via the AbortController propagated into pi-agent-core.
  • The worker calls queue.fail(jobId, ..., retry=false) — no auto-retry on caller cancellations.
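The flow in the list above can be sketched with a plain AbortController. The CancellableJob shape and the requestCancel and cancelPollTick functions are hypothetical stand-ins for the queue and worker internals; only AbortController itself is a real platform API. The tick is written as a synchronous function so it can be driven by any timer.

```typescript
// Hypothetical model of the cancellation flow (not drover's code).
interface CancellableJob {
  status: "queued" | "leased" | "cancelled";
  cancelRequested: boolean;
}

function newJob(status: "queued" | "leased"): CancellableJob {
  return { status, cancelRequested: false };
}

// Queue side: record the request; a job nobody is running can be
// marked cancelled straight away.
function requestCancel(job: CancellableJob): void {
  job.cancelRequested = true;
  if (job.status === "queued") job.status = "cancelled";
}

// Worker side: one tick of the cancel poll. A real worker would call this
// from a timer and pass controller.signal into the running agent.
function cancelPollTick(
  job: CancellableJob,
  controller: AbortController,
): void {
  if (job.cancelRequested && !controller.signal.aborted) {
    controller.abort();
  }
}

// A queued job is cancelled immediately.
const waiting = newJob("queued");
requestCancel(waiting);

// A running job is only aborted once the worker's poll notices the flag.
const running = newJob("leased");
const controller = new AbortController();
requestCancel(running);
const abortedBeforeTick = controller.signal.aborted;
cancelPollTick(running, controller);
const abortedAfterTick = controller.signal.aborted;

console.log(waiting.status, abortedBeforeTick, abortedAfterTick);
```

The gap between requestCancel and the next tick is the cancellation latency, which is why it is bounded by the polling interval.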

Heartbeat tuning

| knob | default | tradeoff |
| --- | --- | --- |
| leaseDurationMs | 30s | longer = fewer reclaims when workers slow down; shorter = faster crash recovery |
| heartbeatIntervalMs | leaseDurationMs / 3 | shorter = more DB writes; longer = closer to lease expiry, risk of false reclaim |
| reclaimIntervalMs | 5s | shorter = faster recovery; longer = lower polling overhead |

For a long-running agent (say a multi-minute build), bump leaseDurationMs to comfortably exceed the longest expected pause inside the run.
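A rough way to reason about these knobs is to compute how long a crashed job can sit before it is requeued. The helper below is a back-of-the-envelope sketch, not part of drover: recoveryWindow is a hypothetical name, and it assumes each heartbeat resets the lease to expire leaseDurationMs after the heartbeat, and that the reclaim loop fires at a fixed reclaimIntervalMs.

```typescript
// Hypothetical helper for estimating crash-recovery latency.
interface LeaseTiming {
  leaseDurationMs: number;
  heartbeatIntervalMs?: number; // defaults to leaseDurationMs / 3
  reclaimIntervalMs: number;
}

interface RecoveryWindow {
  heartbeatIntervalMs: number;
  bestCaseMs: number;
  worstCaseMs: number;
}

function recoveryWindow(t: LeaseTiming): RecoveryWindow {
  const heartbeatIntervalMs = t.heartbeatIntervalMs ?? t.leaseDurationMs / 3;
  return {
    heartbeatIntervalMs,
    // Crash just before the next heartbeat was due, and the reclaim loop
    // happens to run the moment the lease expires.
    bestCaseMs: t.leaseDurationMs - heartbeatIntervalMs,
    // Crash right after a heartbeat, and the reclaim loop has just run
    // when the lease expires.
    worstCaseMs: t.leaseDurationMs + t.reclaimIntervalMs,
  };
}

// Defaults from the table above.
const defaults = recoveryWindow({
  leaseDurationMs: 30_000,
  reclaimIntervalMs: 5_000,
});

// A five-minute lease for a long build.
const longBuild = recoveryWindow({
  leaseDurationMs: 300_000,
  reclaimIntervalMs: 5_000,
});

console.log(defaults, longBuild);
```

The same arithmetic shows the cost of a long lease: a crashed worker's job stays invisible to other workers for roughly the lease duration.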

Horizontal scale

Each worker pool can run as its own process (for example, launched with Bun.spawn). Multiple pools on multiple machines can share one libsql database: a local file on a single machine, or Turso across machines:

```ts
createLibsqlQueue({ url: "libsql://your-org.turso.io", authToken: "..." });
createLibsqlStorage({ url: "libsql://your-org.turso.io", authToken: "..." });
```

Pool A on machine 1 with concurrency: 8, pool B on machine 2 with concurrency: 8 — 16 workers total, no app-side coordination needed.
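One way to keep every pool pointed at the same database is to derive the connection options from the environment, so the same worker entry point runs unchanged on each machine. This is a small sketch under assumptions: the libsqlOptionsFromEnv helper and the DROVER_DB_URL and DROVER_DB_AUTH_TOKEN variable names are made up for illustration; only the url and authToken option names come from the example above.

```typescript
// Hypothetical helper; the environment variable names are illustrative.
interface LibsqlOptions {
  url: string;
  authToken?: string;
}

function libsqlOptionsFromEnv(
  env: Record<string, string | undefined>,
): LibsqlOptions {
  // Fall back to a local file for single-machine development.
  const url = env["DROVER_DB_URL"] ?? "file:./var/queue.db";
  const authToken = env["DROVER_DB_AUTH_TOKEN"];
  // Only include authToken when one is actually configured.
  return authToken ? { url, authToken } : { url };
}

const local = libsqlOptionsFromEnv({});
const shared = libsqlOptionsFromEnv({
  DROVER_DB_URL: "libsql://your-org.turso.io",
  DROVER_DB_AUTH_TOKEN: "example-token",
});

console.log(local, shared);
```

The resulting object would then be passed to createLibsqlQueue and createLibsqlStorage in each worker process.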

What’s missing in v0

  • HTTP wrappers (REST or RPC). The programmatic RunApi is the public surface; an HTTP layer can wrap it without drover knowing.
  • SSE event streaming. api.waitFor polls; for a live event feed today, poll and then read new events through storage.listEvents(runId).
  • Cron / scheduled triggers. EnqueueInput.scheduledFor is honoured by the queue (jobs aren’t claimable until then), but cron-style “every Monday” needs an external scheduler.

These are tractable to add — none of them require redesigning the queue / pool primitives.
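As an illustration of the external-scheduler idea, the sketch below computes the next occurrence of a weekday in UTC, which a small cron-like loop could pass along as scheduledFor when enqueueing. The nextWeekdayUtc function is hypothetical and not part of drover, and whether scheduledFor expects a Date, an epoch value, or an ISO string is an assumption to check against EnqueueInput.

```typescript
// Hypothetical helper for an external "every Monday" scheduler.
// weekday uses the Date.prototype.getUTCDay convention: 0 = Sunday ... 6 = Saturday.
function nextWeekdayUtc(now: Date, weekday: number, hourUtc = 0): Date {
  // Start from today's date at the requested UTC hour.
  const candidate = new Date(
    Date.UTC(
      now.getUTCFullYear(),
      now.getUTCMonth(),
      now.getUTCDate(),
      hourUtc,
    ),
  );
  // Move forward to the requested weekday (0 days if it is today).
  const daysAhead = (weekday - candidate.getUTCDay() + 7) % 7;
  candidate.setUTCDate(candidate.getUTCDate() + daysAhead);
  // If that slot is now or already past, take next week's slot instead.
  if (candidate.getTime() <= now.getTime()) {
    candidate.setUTCDate(candidate.getUTCDate() + 7);
  }
  return candidate;
}

const MONDAY = 1;

// From a Wednesday: the coming Monday at 09:00 UTC.
const fromWednesday = nextWeekdayUtc(
  new Date("2024-01-03T12:00:00Z"),
  MONDAY,
  9,
);

// From Monday after 09:00 UTC: the following Monday.
const fromLateMonday = nextWeekdayUtc(
  new Date("2024-01-08T10:00:00Z"),
  MONDAY,
  9,
);

console.log(fromWednesday.toISOString(), fromLateMonday.toISOString());
```

A scheduler built on this would enqueue one job per cycle with scheduledFor set to the computed time, then compute the next slot after that job is enqueued.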
