URL: /drover/reference/runtime

---
title: "@drover/runtime"
description: Worker pool + lease queue + programmatic RunApi.
---

## `QueueAdapter`

```ts
interface QueueAdapter {
  readonly id: string;
  enqueue(input: EnqueueInput): Effect<QueueJob, QueueError>;
  claim(workerId, leaseDurationMs): Effect<ClaimedJob | null, QueueError>;
  heartbeat(jobId, workerId, leaseDurationMs): Effect<void, QueueError>;
  // Lease-guarded. Returns `true` when the write applied; `false` when
  // the caller no longer holds the lease (another worker took over via
  // reclaim — drop the result, don't retry).
  complete(jobId, workerId, runId, status): Effect<boolean, QueueError>;
  fail(jobId, workerId, error, retry): Effect<boolean, QueueError>;
  cancel(jobId): Effect<void, QueueError>;
  cancelRequested(jobId): Effect<boolean, QueueError>;
  reclaimStale(now): Effect<number, QueueError>;
  get(jobId): Effect<QueueJob | null, QueueError>;
  list(filter?): Effect<readonly QueueJob[], QueueError>;
  close(): Effect<void, QueueError>;
}
```
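
The lease guard on `complete` and `fail` can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the adapter's implementation: `LeaseTable` and its methods are hypothetical, and it returns plain values where the real adapter returns an `Effect`.

```ts
// Hypothetical in-memory model of a lease-guarded write.
// The real QueueAdapter returns Effect values; plain values are used here.
type Lease = { workerId: string; expiresAt: number };

class LeaseTable {
  private leases = new Map<string, Lease>();
  private results = new Map<string, string>();

  // Grant (or take over) the lease on a job. Expiry checks are omitted
  // for brevity: this simply overwrites the current holder.
  claim(jobId: string, workerId: string, now: number, leaseDurationMs: number): void {
    this.leases.set(jobId, { workerId, expiresAt: now + leaseDurationMs });
  }

  // Apply the write only if `workerId` still holds the lease.
  complete(jobId: string, workerId: string, status: string): boolean {
    const lease = this.leases.get(jobId);
    if (!lease || lease.workerId !== workerId) return false; // lease lost
    this.results.set(jobId, status);
    this.leases.delete(jobId);
    return true;
  }

  result(jobId: string): string | undefined {
    return this.results.get(jobId);
  }
}

const table = new LeaseTable();
table.claim("job-1", "worker-a", 0, 30_000);
// worker-a stalls; after its lease lapses, worker-b takes the job over.
table.claim("job-1", "worker-b", 40_000, 30_000);

// worker-a finishes late: the write is rejected, so it drops its result.
const staleWrite = table.complete("job-1", "worker-a", "done");
// worker-b holds the lease: the write applies.
const liveWrite = table.complete("job-1", "worker-b", "done");
```

A caller that receives `false` should treat the job as someone else's and discard its own result rather than retry the write.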

## `QueueJob`

```ts
interface QueueJob {
  id: string;
  agentId: string;
  input: unknown;
  meta?: Readonly<Record<string, unknown>>;
  priority: number;
  scheduledFor: number | null;
  status: "queued" | "leased" | "done" | "failed" | "cancelled";
  leaseWorker: string | null;
  leaseExpiresAt: number | null;
  attempts: number;
  maxAttempts: number;
  createdAt: number;
  runId: string | null;
  resultStatus: RunStatus | null;
  resultError: { tag: string; message: string } | null;
}
```
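
A few predicates over `QueueJob` fields may help when reading job rows. The sketch below is illustrative only: the helper names are hypothetical, and it assumes that timestamps are epoch milliseconds, that `done`, `failed`, and `cancelled` are terminal, and that a job may be retried while `attempts` is below `maxAttempts`.

```ts
// Stand-in for the QueueJob fields used here; see the full interface above.
type JobStatus = "queued" | "leased" | "done" | "failed" | "cancelled";

interface JobLeaseView {
  status: JobStatus;
  leaseExpiresAt: number | null;
  attempts: number;
  maxAttempts: number;
}

// Assumption: these three statuses are terminal.
function isTerminal(status: JobStatus): boolean {
  return status === "done" || status === "failed" || status === "cancelled";
}

// Assumption: leaseExpiresAt is epoch ms; a leased job past that time is stale.
function isLeaseStale(job: JobLeaseView, now: number): boolean {
  return job.status === "leased" && job.leaseExpiresAt !== null && job.leaseExpiresAt <= now;
}

// Assumption: a job may be retried while attempts < maxAttempts.
function hasAttemptsLeft(job: JobLeaseView): boolean {
  return job.attempts < job.maxAttempts;
}

const leased: JobLeaseView = {
  status: "leased",
  leaseExpiresAt: 30_000,
  attempts: 1,
  maxAttempts: 3,
};
```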

## `createMemoryQueue` / `createLibsqlQueue`

```ts
function createMemoryQueue(): QueueAdapter;
function createLibsqlQueue(opts: {
  url: string;
  authToken?: string;
  migrationsTable?: string;
}): Promise<QueueAdapter>;
```

The libsql adapter claims jobs atomically with `UPDATE ... RETURNING`.
Multiple processes sharing the same database coordinate through file locks.
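
To make the atomic claim concrete, here is roughly what a single-statement claim could look like. This is a sketch, not the adapter's actual query: the `jobs` table, its column names (chosen to mirror `QueueJob`), the named parameters, and the ordering (higher `priority` first, then oldest first) are all assumptions.

```ts
// Hypothetical claim statement. Table name, column names, and ordering are
// assumptions for illustration; the adapter's real schema may differ.
const CLAIM_SQL = `
UPDATE jobs
SET status = 'leased',
    lease_worker = :workerId,
    lease_expires_at = :now + :leaseDurationMs
WHERE id = (
  -- Pick one runnable job: queued and not scheduled for the future.
  SELECT id FROM jobs
  WHERE status = 'queued'
    AND (scheduled_for IS NULL OR scheduled_for <= :now)
  ORDER BY priority DESC, created_at ASC
  LIMIT 1
)
RETURNING *;
`;
```

Because selection and update happen in one statement, a worker either gets the updated row back or gets no row at all, which maps onto the `ClaimedJob | null` result of `claim`.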

## `createWorkerPool`

```ts
function createWorkerPool(
  deps: WorkerPoolDeps,
  opts?: WorkerPoolOptions,
): WorkerPoolHandle;

interface WorkerPoolDeps {
  queue: QueueAdapter;
  storage: StorageAdapter;
  registry: AgentRegistry;
  sandboxFor: (job: QueueJob) => SandboxAdapter | Promise<SandboxAdapter>;
  onEvent?: (job: QueueJob, event: HarnessEvent) => void;
  plugins?: readonly HarnessPlugin[];
}

interface WorkerPoolOptions {
  workerIdPrefix?: string;
  concurrency?: number;            // default 1
  pollIntervalMs?: number;         // default 200
  leaseDurationMs?: number;        // default 30_000
  heartbeatIntervalMs?: number;    // default leaseDurationMs/3
  reclaimIntervalMs?: number;      // default 5_000
  retryOnFailure?: boolean;        // default true
}

interface WorkerPoolHandle {
  start(): void;
  stop(): Promise<void>;
  stats(): { running: number; processed: number; failed: number };
}
```

Workers heartbeat at `heartbeatIntervalMs` while a run is in flight.
The pool also runs `reclaimStale` every `reclaimIntervalMs` to recover
expired leases from crashed workers.
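
The defaults listed in `WorkerPoolOptions` can be restated as a small resolver, which makes the relationship between lease duration and heartbeat interval explicit. `resolveOptions` and `ResolvedOptions` are hypothetical names used for illustration, and `workerIdPrefix` is left out because no default is documented for it.

```ts
interface WorkerPoolOptions {
  workerIdPrefix?: string;
  concurrency?: number;
  pollIntervalMs?: number;
  leaseDurationMs?: number;
  heartbeatIntervalMs?: number;
  reclaimIntervalMs?: number;
  retryOnFailure?: boolean;
}

// Hypothetical shape: every option with a documented default, filled in.
type ResolvedOptions = Required<Omit<WorkerPoolOptions, "workerIdPrefix">>;

// Apply the documented defaults. The heartbeat default is derived from the
// (possibly overridden) lease duration, not from the 30_000 constant.
function resolveOptions(opts: WorkerPoolOptions = {}): ResolvedOptions {
  const leaseDurationMs = opts.leaseDurationMs ?? 30_000;
  return {
    concurrency: opts.concurrency ?? 1,
    pollIntervalMs: opts.pollIntervalMs ?? 200,
    leaseDurationMs,
    heartbeatIntervalMs: opts.heartbeatIntervalMs ?? leaseDurationMs / 3,
    reclaimIntervalMs: opts.reclaimIntervalMs ?? 5_000,
    retryOnFailure: opts.retryOnFailure ?? true,
  };
}

const defaults = resolveOptions();
const longLease = resolveOptions({ leaseDurationMs: 90_000 });
```

With the defaults, a worker heartbeats every 10 seconds against a 30-second lease. If you override `heartbeatIntervalMs`, keeping it well below `leaseDurationMs` is likely the safer choice, since a lease that expires between heartbeats becomes eligible for `reclaimStale`.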

`stop()` aborts in-flight runs, then resolves once every worker has drained.

## `createRunApi`

```ts
function createRunApi(deps: RunApiDeps): RunApi;

interface RunApiDeps {
  queue: QueueAdapter;
  storage: StorageAdapter;
  registry: AgentRegistry;
  sandboxFor: (job: QueueJob) => SandboxAdapter | Promise<SandboxAdapter>;
  // Must mirror your WorkerPoolDeps.plugins. They're hashed into the
  // effective spec at run start; resume reuses the same hash. Omit
  // here and a plugin-injected paused run won't resume.
  plugins?: readonly HarnessPlugin[];
}

interface RunApi {
  enqueue(input: EnqueueInput): Promise<QueueJob>;
  get(jobId): Promise<{ job: QueueJob | null; run: RunRow | null }>;
  cancel(jobId): Promise<void>;
  resume(jobId): Promise<RunResult<unknown>>;
  waitFor(jobId, opts?: { pollIntervalMs?: number; timeoutMs?: number }):
    Promise<{ job: QueueJob; run: RunRow | null }>;
  list(filter?): Promise<readonly QueueJob[]>;
}
```
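
One way to satisfy the comment on `RunApiDeps.plugins` is to define the plugin list once and hand the same array to both dependency objects. The sketch below uses stand-in types: the `name` field on `HarnessPlugin` and the `auditPlugin` value are assumptions for illustration, and the other dependencies are elided.

```ts
// Stand-in type; the real HarnessPlugin comes from the library.
interface HarnessPlugin {
  readonly name: string; // assumed field, for illustration only
}

// Hypothetical plugin.
const auditPlugin: HarnessPlugin = { name: "audit" };

// Define the list once...
const plugins: readonly HarnessPlugin[] = [auditPlugin];

// ...and pass the same reference to both dependency objects, so the run
// started by the pool and the later resume see an identical plugin set.
const workerPoolDeps = { /* queue, storage, registry, sandboxFor, */ plugins };
const runApiDeps = { /* queue, storage, registry, sandboxFor, */ plugins };
```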

`resume` runs inline: the caller awaits the `RunResult` directly.
Pause/resume is a control-plane operation, not a queue job.

`waitFor` polls the job at `pollIntervalMs`. For a low-latency UI, query
storage events directly through your viewer instead.
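
The latency cost of polling is easier to see in a sketch of such a loop. This is not the library's implementation: `pollUntilSettled`, the injected `getJob` function, the 200 ms fallback interval, and the choice of which statuses count as settled are all assumptions.

```ts
type JobStatus = "queued" | "leased" | "done" | "failed" | "cancelled";
interface JobView { id: string; status: JobStatus }

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical poll loop: fetch the job, return once it has left the
// queued/leased states, otherwise sleep and try again until the deadline.
async function pollUntilSettled(
  getJob: (jobId: string) => Promise<JobView | null>,
  jobId: string,
  opts: { pollIntervalMs?: number; timeoutMs?: number } = {},
): Promise<JobView> {
  const pollIntervalMs = opts.pollIntervalMs ?? 200; // assumed fallback
  const deadline = opts.timeoutMs === undefined ? Infinity : Date.now() + opts.timeoutMs;
  for (;;) {
    const job = await getJob(jobId);
    if (job && job.status !== "queued" && job.status !== "leased") return job;
    if (Date.now() >= deadline) throw new Error(`timed out waiting for job ${jobId}`);
    await sleep(pollIntervalMs);
  }
}

// Fake lookup for the example: reports "leased" twice, then "done".
let calls = 0;
const fakeGet = async (id: string): Promise<JobView> => ({
  id,
  status: ++calls < 3 ? "leased" : "done",
});

const settled = pollUntilSettled(fakeGet, "job-1", { pollIntervalMs: 1, timeoutMs: 1_000 });
```

In a loop like this, a caller can learn of completion up to one `pollIntervalMs` after the job settles, which is why an event-driven read suits interactive views better.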
