Queue + worker

Run agents as jobs. Producer enqueues, worker pool drains, RunApi observes.

Producer-side enqueue is decoupled from consumer-side execution; the pool can run on a different machine.

Boot both sides

server.ts
ts
import { Type } from "@sinclair/typebox";
import { defineAgent } from "@drover/core";
import { staticRegistry } from "@drover/facade";
import {
  createLibsqlQueue,
  createRunApi,
  createWorkerPool,
} from "@drover/runtime";
import { createLibsqlStorage } from "@drover/storage";
import { createNoneSandbox } from "@drover/sandbox";

// 1. Define your agents
const echo = defineAgent({
  id: "echo",
  systemPrompt: "Reply with {received, ok: true} as JSON.",
  inputSchema: Type.Object({ n: Type.Integer() }),
  outputSchema: Type.Object({
    received: Type.Integer(),
    ok: Type.Boolean(),
  }),
  model: "cheap",
  tools: [],
  quota: { maxTurns: 2 },
});

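// 2. Wire shared infra (URLs fall back to local files when the env vars are unset)
const queue = await createLibsqlQueue({
  url: process.env.DROVER_QUEUE_URL ?? "file:./var/queue.db",
});
const storage = await createLibsqlStorage({
  url: process.env.DROVER_STORAGE_URL ?? "file:./var/runs.db",
});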
const registry = staticRegistry({ echo });
const sandboxFor = () => createNoneSandbox();

// 3. Start workers
const pool = createWorkerPool(
  { queue, storage, registry, sandboxFor },
  { concurrency: 4, leaseDurationMs: 30_000 },
);
pool.start();

// 4. Expose the producer API (you'd wrap this in HTTP routes)
const api = createRunApi({ queue, storage, registry, sandboxFor });

export { api, pool };
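
Step 4 leaves the HTTP layer to you. One way to wrap the producer API is a single fetch-style handler, sketched below. The route paths, the `RunApiLike` interface, and `createRunHandler` are illustrative assumptions, not part of @drover/runtime. Only `enqueue`, `waitFor`, and `cancel` appear in this guide, and their exact types are guessed here.

```ts
import { randomUUID } from "node:crypto";

// Hypothetical structural type covering only the three methods this guide uses.
// The real RunApi from @drover/runtime may have different signatures.
interface RunApiLike {
  enqueue(job: { id: string; agentId: string; input: unknown }): Promise<{ id: string }>;
  waitFor(
    id: string,
    opts?: { timeoutMs?: number },
  ): Promise<{ job: { status: string }; run?: { output?: unknown } }>;
  cancel(id: string): Promise<unknown>;
}

// Small helper that serializes a JSON response with the given status.
const json = (body: unknown, status = 200): Response =>
  new Response(JSON.stringify(body), {
    status,
    headers: { "content-type": "application/json" },
  });

// Returns a fetch-style handler. The routes are made up for this sketch:
//   POST   /runs      body: { agentId, input }  -> 202 { id }
//   GET    /runs/:id                            -> 200 { status, output }
//   DELETE /runs/:id                            -> 202 { id }
export function createRunHandler(api: RunApiLike) {
  return async (req: Request): Promise<Response> => {
    const { pathname } = new URL(req.url);
    const match = /^\/runs(?:\/([^/]+))?$/.exec(pathname);
    if (!match) return json({ error: "not found" }, 404);
    const id = match[1];

    if (req.method === "POST" && !id) {
      const body = (await req.json()) as { agentId: string; input: unknown };
      const job = await api.enqueue({
        id: randomUUID(),
        agentId: body.agentId,
        input: body.input,
      });
      return json({ id: job.id }, 202);
    }
    if (req.method === "GET" && id) {
      // Holds the request open until the job finishes or waitFor gives up.
      const final = await api.waitFor(id, { timeoutMs: 60_000 });
      return json({ status: final.job.status, output: final.run?.output ?? null });
    }
    if (req.method === "DELETE" && id) {
      await api.cancel(id);
      return json({ id }, 202);
    }
    return json({ error: "method not allowed" }, 405);
  };
}
```

Under Bun, a handler of this shape could be mounted with `Bun.serve({ fetch: createRunHandler(api) })`. Input validation and authentication are left out of the sketch.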

Enqueue + wait

ts
const job = await api.enqueue({
  id: crypto.randomUUID(),
  agentId: "echo",
  input: { n: 42 },
  priority: 10,            // higher = sooner
  maxAttempts: 3,
});

console.log("queued:", job.id);

// Synchronous-feeling wait — polls under the hood
const final = await api.waitFor(job.id, { timeoutMs: 60_000 });
console.log("status:", final.job.status);     // "done"
console.log("output:", final.run?.output);     // { received: 42, ok: true }
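
The comment above says `waitFor` polls under the hood. The loop that implies might look like the following sketch. `pollUntilTerminal`, the `getJob` callback, and the default interval are assumptions for illustration, not the library's internals; "done" and "failed" are the only final statuses this guide shows.

```ts
// Minimal view of a job for this sketch.
interface JobSnapshot {
  id: string;
  status: string;
}

// The two final statuses that appear in this guide.
const TERMINAL = new Set(["done", "failed"]);

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Polls getJob until the job reaches a terminal status or the timeout elapses.
// getJob is a hypothetical lookup; in practice it would read from the queue DB.
export async function pollUntilTerminal(
  getJob: (id: string) => Promise<JobSnapshot | undefined>,
  id: string,
  { timeoutMs = 60_000, intervalMs = 250 } = {},
): Promise<JobSnapshot> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const job = await getJob(id);
    if (job && TERMINAL.has(job.status)) return job;
    if (Date.now() >= deadline) {
      throw new Error(`timed out after ${timeoutMs}ms waiting for job ${id}`);
    }
    // Never sleep past the deadline, so the last check happens on time.
    await sleep(Math.min(intervalMs, Math.max(0, deadline - Date.now())));
  }
}
```

A fixed interval keeps the sketch short. A real implementation might back off between polls to reduce load on a shared database.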

Cancel mid-flight

ts
const id = crypto.randomUUID();
await api.enqueue({ id, agentId: "echo", input: { n: 0 } });
// later, maybe from a different process pointed at the same DB:
await api.cancel(id);

const final = await api.waitFor(id);
console.log(final.job.status);  // "failed" — cancel marks failure (no retry)

Horizontal scale

Spin up multiple workers, point them at the same DB:

bash
# machine 1
DROVER_QUEUE_URL=libsql://your-org.turso.io DROVER_STORAGE_URL=... bun server.ts

# machine 2
DROVER_QUEUE_URL=libsql://your-org.turso.io DROVER_STORAGE_URL=... bun server.ts

Workers coordinate through libsql’s atomic UPDATE ... RETURNING, so a job is claimed by only one worker at a time. The reclaimStale loop on each pool recovers jobs whose leases expired because a sibling worker crashed.
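
To make the claim and reclaim semantics concrete, here is an in-memory model of them. Everything in it is an assumption for illustration: the table and column names in the SQL comment, the `Job` shape, and the three functions are not drover's actual schema or API. The point is only that picking a job and marking it as leased happen in one step, and that a lease which is not renewed eventually returns the job to the queue.

```ts
// A guess at the kind of statement a libsql-backed claim might run.
// Select and update happen in one statement, so two workers cannot
// both receive the same row:
//
//   UPDATE jobs
//      SET status = 'running', worker_id = :worker, lease_expires_at = :now + :lease
//    WHERE id = (SELECT id FROM jobs WHERE status = 'queued'
//                 ORDER BY priority DESC LIMIT 1)
//   RETURNING *;

interface Job {
  id: string;
  priority: number;
  status: "queued" | "running" | "done" | "failed";
  workerId?: string;
  leaseExpiresAt?: number; // epoch milliseconds
}

// Claims the highest-priority queued job and stamps it with a lease.
export function claim(
  jobs: Job[],
  workerId: string,
  leaseDurationMs: number,
  now: number,
): Job | undefined {
  const next = jobs
    .filter((j) => j.status === "queued")
    .sort((a, b) => b.priority - a.priority)[0];
  if (!next) return undefined;
  next.status = "running";
  next.workerId = workerId;
  next.leaseExpiresAt = now + leaseDurationMs;
  return next;
}

// Re-queues running jobs whose lease has expired; returns how many were reclaimed.
export function reclaimStale(jobs: Job[], now: number): number {
  let reclaimed = 0;
  for (const j of jobs) {
    if (j.status === "running" && (j.leaseExpiresAt ?? 0) <= now) {
      j.status = "queued";
      j.workerId = undefined;
      j.leaseExpiresAt = undefined;
      reclaimed++;
    }
  }
  return reclaimed;
}

// Lease-guarded completion: returns false if this worker no longer holds the job.
export function completeIfLeaseHeld(jobs: Job[], jobId: string, workerId: string): boolean {
  const job = jobs.find((j) => j.id === jobId);
  if (!job || job.status !== "running" || job.workerId !== workerId) return false;
  job.status = "done";
  return true;
}
```

In this model a worker that crashes simply stops renewing its lease. Once `leaseExpiresAt` passes, any pool's reclaim pass puts the job back in the queue, and a late result from the original worker is rejected by the lease guard.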

What the worker does per job

  1. queue.claim(workerId, leaseDurationMs) → atomic
  2. Look up agentId in the registry → AgentSpec
  3. runAgent(spec, job.input, { storage, sandbox, agentRegistry }) with a per-job AbortController wired to the queue’s cancel signal
  4. Heartbeat every leaseDurationMs/3 while in flight
  5. On completion, the worker makes one of the following lease-guarded calls; each returns false and silently drops the result if the worker lost its lease to reclaimStale:
    • success → queue.complete(jobId, workerId, runId, status)
    • error → queue.fail(jobId, workerId, error, retryOnFailure) (re-queues if attempts < max)
    • cancelled → queue.fail(jobId, workerId, ..., retry=false)
  6. Loop.
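
The steps above can be sketched as one function that handles a single claimed job. `QueueLike`, `RunFn`, and `runOneJob` are hypothetical names, and `heartbeat` is an assumed method: the guide names `claim`, `complete`, and `fail` but does not say how heartbeats or the cancel signal are delivered. This sketch assumes the heartbeat call resolves to false once the job has been cancelled or the lease is gone.

```ts
interface ClaimedJob {
  id: string;
  agentId: string;
  input: unknown;
}

// Hypothetical queue surface. claim/complete/fail follow the call shapes
// listed above; heartbeat is an assumed name and signature.
interface QueueLike {
  claim(workerId: string, leaseDurationMs: number): Promise<ClaimedJob | undefined>;
  heartbeat(jobId: string, workerId: string, leaseDurationMs: number): Promise<boolean>;
  complete(jobId: string, workerId: string, runId: string, status: string): Promise<boolean>;
  fail(jobId: string, workerId: string, error: string, retry: boolean): Promise<boolean>;
}

// Stand-in for "look up the agent and call runAgent" (steps 2 and 3).
type RunFn = (job: ClaimedJob, signal: AbortSignal) => Promise<{ runId: string }>;

type Outcome = "idle" | "done" | "failed" | "cancelled" | "lease-lost";

export async function runOneJob(
  queue: QueueLike,
  run: RunFn,
  workerId: string,
  leaseDurationMs: number,
): Promise<Outcome> {
  // Step 1: claim a job, or report that the queue is empty.
  const job = await queue.claim(workerId, leaseDurationMs);
  if (!job) return "idle";

  // Step 3: per-job AbortController; step 4: heartbeat at a third of the lease.
  const controller = new AbortController();
  const heartbeat = setInterval(async () => {
    // A heartbeat error is treated as transient; only an explicit false aborts.
    const alive = await queue.heartbeat(job.id, workerId, leaseDurationMs).catch(() => true);
    if (!alive) controller.abort();
  }, leaseDurationMs / 3);

  let outcome: { runId: string } | { error: unknown };
  try {
    outcome = await run(job, controller.signal);
  } catch (error) {
    outcome = { error };
  } finally {
    clearInterval(heartbeat);
  }

  // Step 5: report the result; a false return means the lease was lost.
  if ("runId" in outcome) {
    const kept = await queue.complete(job.id, workerId, outcome.runId, "done");
    return kept ? "done" : "lease-lost";
  }
  const cancelled = controller.signal.aborted;
  const kept = await queue.fail(job.id, workerId, String(outcome.error), !cancelled);
  if (!kept) return "lease-lost";
  return cancelled ? "cancelled" : "failed";
}
```

A pool with `concurrency: 4` would then amount to four such loops calling `runOneJob` repeatedly, pausing briefly whenever it returns "idle".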

Inspect with the eval viewer

bash
cd apps/eval-viewer
DROVER_STORAGE_URL=file:../../var/runs.db bun run dev

Navigate to #/storage for the run list, then click into any run to see its event timeline.
