Queue + worker
Run agents as jobs. Producer enqueues, worker pool drains, RunApi observes.
Producer-side enqueue is decoupled from consumer-side execution; the
pool can run on a different machine.
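The three roles can be pictured with a toy, single-process model. None of this is the drover API. `ToyQueue`, `enqueue`, `claimNext`, `statusOf`, and `drainOnce` are hypothetical names. The only behavior taken from this page is that a higher `priority` runs sooner.

```typescript
// Toy, single-process model of the three roles; NOT the drover API.
// Producer: enqueue(). Worker: drainOnce(). Observer: statusOf().
type Status = "queued" | "running" | "done" | "failed";

interface ToyJob {
  id: string;
  priority: number; // higher = sooner, mirroring api.enqueue's priority
  input: number;
  status: Status;
  output?: number;
}

class ToyQueue {
  private jobs: ToyJob[] = [];

  // Producer side: record the job and return without running it.
  enqueue(id: string, input: number, priority = 0): void {
    this.jobs.push({ id, priority, input, status: "queued" });
  }

  // Worker side: mark the highest-priority queued job as running.
  // Ties go to the job that was enqueued first.
  claimNext(): ToyJob | undefined {
    let best: ToyJob | undefined = undefined;
    for (const job of this.jobs) {
      if (job.status === "queued" && (best === undefined || job.priority > best.priority)) {
        best = job;
      }
    }
    if (best !== undefined) best.status = "running";
    return best;
  }

  // Observer side: read state without affecting execution.
  statusOf(id: string): Status | undefined {
    for (const job of this.jobs) {
      if (job.id === id) return job.status;
    }
    return undefined;
  }
}

// One worker step: claim a job, run the handler, record the result.
// Returns the id of the job it processed, or undefined if none was queued.
function drainOnce(queue: ToyQueue, handler: (input: number) => number): string | undefined {
  const job = queue.claimNext();
  if (job === undefined) return undefined;
  try {
    job.output = handler(job.input);
    job.status = "done";
  } catch {
    job.status = "failed";
  }
  return job.id;
}

const queue = new ToyQueue();
queue.enqueue("low", 1, 0);
queue.enqueue("high", 2, 10);
const first = drainOnce(queue, (n) => n * 2); // processes "high"
```

`enqueue` only records the job, so the producer never waits on execution. In drover the two sides share a libsql database rather than an in-memory array. That shared database is what lets the pool run on another machine.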
Boot both sides
server.ts
```ts
import { Type } from "@sinclair/typebox";
import { defineAgent } from "@drover/core";
import { staticRegistry } from "@drover/facade";
import {
  createLibsqlQueue,
  createRunApi,
  createWorkerPool,
} from "@drover/runtime";
import { createLibsqlStorage } from "@drover/storage";
import { createNoneSandbox } from "@drover/sandbox";

// 1. Define your agents
const echo = defineAgent({
  id: "echo",
  systemPrompt: "Reply with {received, ok: true} as JSON.",
  inputSchema: Type.Object({ n: Type.Integer() }),
  outputSchema: Type.Object({
    received: Type.Integer(),
    ok: Type.Boolean(),
  }),
  model: "cheap",
  tools: [],
  quota: { maxTurns: 2 },
});

// 2. Wire shared infra. The DB URLs come from the environment so several
// machines can share one queue; local files are the default.
const queue = await createLibsqlQueue({
  url: process.env.DROVER_QUEUE_URL ?? "file:./var/queue.db",
});
const storage = await createLibsqlStorage({
  url: process.env.DROVER_STORAGE_URL ?? "file:./var/runs.db",
});
const registry = staticRegistry({ echo });
const sandboxFor = () => createNoneSandbox();

// 3. Start workers
const pool = createWorkerPool(
  { queue, storage, registry, sandboxFor },
  { concurrency: 4, leaseDurationMs: 30_000 },
);
pool.start();

// 4. Expose the producer API (you'd wrap this in HTTP routes)
const api = createRunApi({ queue, storage, registry, sandboxFor });

export { api, pool };
```

Enqueue + wait
```ts
const job = await api.enqueue({
  id: crypto.randomUUID(),
  agentId: "echo",
  input: { n: 42 },
  priority: 10, // higher = sooner
  maxAttempts: 3,
});
console.log("queued:", job.id);

// Synchronous-feeling wait; polls under the hood
const final = await api.waitFor(job.id, { timeoutMs: 60_000 });
console.log("status:", final.job.status); // "done"
console.log("output:", final.run?.output); // { received: 42, ok: true }
```

Cancel mid-flight
```ts
const id = crypto.randomUUID();
await api.enqueue({ id, agentId: "echo", input: { n: 0 } });

// later, maybe from a different process pointed at the same DB:
await api.cancel(id);

const final = await api.waitFor(id);
console.log(final.job.status); // "failed": cancel marks the job failed, with no retry
```

Horizontal scale
Spin up multiple workers and point them all at the same DB:

```sh
# machine 1
DROVER_QUEUE_URL=libsql://your-org.turso.io DROVER_STORAGE_URL=... bun server.ts
# machine 2
DROVER_QUEUE_URL=libsql://your-org.turso.io DROVER_STORAGE_URL=... bun server.ts
```

Workers coordinate via libsql’s atomic `UPDATE ... RETURNING`, so a job claimed by one worker cannot also be claimed by another. The `reclaimStale` loop on each pool recovers jobs whose lease expired because a sibling worker crashed.
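The following small in-memory model shows how leases let one pool pick up a crashed sibling's work. It is a sketch under stated assumptions. `LeaseTable` and its methods are hypothetical. Time is passed in as `now` so the run is deterministic. A single-threaded loop only imitates the atomicity that the real queue gets from one `UPDATE ... RETURNING` statement.

```typescript
// Hypothetical in-memory stand-in for a lease-based job table (not drover's).
// Time is passed in as `now` (ms) so the example is deterministic.
interface LeasedJob {
  id: string;
  state: "queued" | "running";
  owner: string | undefined;
  leaseExpiresAt: number | undefined;
}

class LeaseTable {
  private jobs: LeasedJob[] = [];

  add(id: string): void {
    this.jobs.push({ id, state: "queued", owner: undefined, leaseExpiresAt: undefined });
  }

  private find(id: string): LeasedJob | undefined {
    for (const job of this.jobs) if (job.id === id) return job;
    return undefined;
  }

  // Claim the first queued job and stamp a lease on it. A database does this
  // check-and-set in one atomic statement, so two workers can never both win
  // the same row; this single-threaded loop only imitates that.
  claim(workerId: string, leaseMs: number, now: number): string | undefined {
    for (const job of this.jobs) {
      if (job.state === "queued") {
        job.state = "running";
        job.owner = workerId;
        job.leaseExpiresAt = now + leaseMs;
        return job.id;
      }
    }
    return undefined;
  }

  // Extend the lease, but only for the worker that currently owns the job.
  heartbeat(id: string, workerId: string, leaseMs: number, now: number): boolean {
    const job = this.find(id);
    if (job === undefined || job.state !== "running" || job.owner !== workerId) return false;
    job.leaseExpiresAt = now + leaseMs;
    return true;
  }

  // Put running jobs whose lease has expired back in the queue. An expired
  // lease means the owner stopped heartbeating, e.g. because it crashed.
  reclaimStale(now: number): string[] {
    const reclaimed: string[] = [];
    for (const job of this.jobs) {
      if (job.state === "running" && job.leaseExpiresAt !== undefined && job.leaseExpiresAt <= now) {
        job.state = "queued";
        job.owner = undefined;
        job.leaseExpiresAt = undefined;
        reclaimed.push(job.id);
      }
    }
    return reclaimed;
  }

  ownerOf(id: string): string | undefined {
    return this.find(id)?.owner;
  }
}

const LEASE_MS = 30_000;
const table = new LeaseTable();
table.add("a");
table.add("b");
table.claim("w1", LEASE_MS, 0); // w1 takes "a", then crashes
table.claim("w2", LEASE_MS, 0); // w2 takes "b"
table.heartbeat("b", "w2", LEASE_MS, 20_000); // w2 is alive: "b" now expires at 50_000
const recovered = table.reclaimStale(30_000); // only "a" has expired
table.claim("w2", LEASE_MS, 30_000); // w2 picks "a" back up
```

Heartbeats separate a slow worker from a dead one. `w2` keeps extending its lease on `b`, so only `a` goes back to the queue. Once `w2` has claimed `a`, the old owner `w1` can no longer renew it.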
What the worker does per job
- `queue.claim(workerId, leaseDurationMs)` → atomically claims the next job
- Look up `agentId` in the registry → `AgentSpec`
- `runAgent(spec, job.input, { storage, sandbox, agentRegistry })` with a per-job `AbortController` wired to the queue’s cancel signal
- Heartbeat every `leaseDurationMs/3` while in flight
- On completion (all lease-guarded: each call returns `false` and silently drops the result if the worker lost its lease via `reclaimStale`):
  - success → `queue.complete(jobId, workerId, runId, status)`
  - error → `queue.fail(jobId, workerId, error, retryOnFailure)` (re-queues if attempts < max)
  - cancelled → `queue.fail(jobId, workerId, ..., retry=false)`
- Loop.
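The following is a minimal sketch of one pass through those steps. It assumes hypothetical `JobQueue`, `settle`, `attempt`, and `processJob` shapes rather than drover's real signatures. The agent call is injected as `run`, standing in for `runAgent`. `retryOnFailure` defaults to `true` here by assumption. `FakeQueue` is a test double that can simulate a lease lost to `reclaimStale`.

```typescript
// Hypothetical shapes loosely following the steps above; not drover's API.
interface Job {
  id: string;
  input: unknown;
}

interface JobQueue {
  heartbeat(jobId: string, workerId: string): boolean;
  // Lease-guarded: both return false (and drop the result) if workerId
  // no longer holds the job's lease.
  complete(jobId: string, workerId: string, runId: string, status: string): boolean;
  fail(jobId: string, workerId: string, error: string, retry: boolean): boolean;
}

type Outcome =
  | { kind: "success"; runId: string; status: string }
  | { kind: "error"; error: string }
  | { kind: "cancelled" };

type RunFn = (input: unknown, signal: AbortSignal) => Promise<{ runId: string; status: string }>;

// Map an outcome to the matching queue call. Cancellation is never retried.
function settle(q: JobQueue, jobId: string, workerId: string, outcome: Outcome, retryOnFailure: boolean): boolean {
  if (outcome.kind === "success") return q.complete(jobId, workerId, outcome.runId, outcome.status);
  if (outcome.kind === "error") return q.fail(jobId, workerId, outcome.error, retryOnFailure);
  return q.fail(jobId, workerId, "cancelled", false);
}

// Run the agent once; if the signal was aborted, a thrown error means "cancelled".
async function attempt(run: RunFn, input: unknown, signal: AbortSignal): Promise<Outcome> {
  try {
    const result = await run(input, signal);
    return { kind: "success", runId: result.runId, status: result.status };
  } catch (err) {
    return signal.aborted ? { kind: "cancelled" } : { kind: "error", error: String(err) };
  }
}

async function processJob(
  q: JobQueue,
  job: Job,
  workerId: string,
  leaseDurationMs: number,
  run: RunFn,
  cancelSignal: AbortSignal,
  retryOnFailure = true,
): Promise<boolean> {
  // Per-job controller, aborted when the queue's cancel signal fires.
  const controller = new AbortController();
  const onCancel = () => controller.abort();
  if (cancelSignal.aborted) controller.abort();
  cancelSignal.addEventListener("abort", onCancel);
  // Heartbeat at a third of the lease, so one missed beat does not expire it.
  const timer = setInterval(() => q.heartbeat(job.id, workerId), leaseDurationMs / 3);
  try {
    const outcome = await attempt(run, job.input, controller.signal);
    return settle(q, job.id, workerId, outcome, retryOnFailure);
  } finally {
    clearInterval(timer);
    cancelSignal.removeEventListener("abort", onCancel);
  }
}

// Test double: records calls and can simulate a lease lost to reclaimStale.
class FakeQueue implements JobQueue {
  calls: string[] = [];
  leaseLost = false;
  heartbeat(): boolean {
    return !this.leaseLost;
  }
  complete(jobId: string, _workerId: string, _runId: string, status: string): boolean {
    if (this.leaseLost) return false;
    this.calls.push(`complete:${jobId}:${status}`);
    return true;
  }
  fail(jobId: string, _workerId: string, _error: string, retry: boolean): boolean {
    if (this.leaseLost) return false;
    this.calls.push(`fail:${jobId}:retry=${retry}`);
    return true;
  }
}
```

Consider a `FakeQueue` whose `leaseLost` flag is set. `processJob` then resolves to `false` even when the agent succeeds, which is the lease-guarded drop described in the list.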
Inspect with the eval viewer
```sh
cd apps/eval-viewer
DROVER_STORAGE_URL=file:../../var/runs.db bun run dev
```

Navigate to `#/storage` for the run list, then click into any run to see its event timeline.