@drover/runtime
Worker pool + lease queue + programmatic RunApi.
QueueAdapter
interface QueueAdapter {
readonly id: string;
enqueue(input: EnqueueInput): Effect<QueueJob, QueueError>;
claim(workerId, leaseDurationMs): Effect<ClaimedJob | null, QueueError>;
heartbeat(jobId, workerId, leaseDurationMs): Effect<void, QueueError>;
// Lease-guarded. Returns `true` when the write applied; `false` when
// the caller no longer holds the lease (another worker took over via
// reclaim — drop the result, don't retry).
complete(jobId, workerId, runId, status): Effect<boolean, QueueError>;
fail(jobId, workerId, error, retry): Effect<boolean, QueueError>;
cancel(jobId): Effect<void, QueueError>;
cancelRequested(jobId): Effect<boolean, QueueError>;
reclaimStale(now): Effect<number, QueueError>;
get(jobId): Effect<QueueJob | null, QueueError>;
list(filter?): Effect<readonly QueueJob[], QueueError>;
close(): Effect<void, QueueError>;
}
QueueJob
interface QueueJob {
id: string;
agentId: string;
input: unknown;
meta?: Readonly<Record<string, unknown>>;
priority: number;
scheduledFor: number | null;
status: "queued" | "leased" | "done" | "failed" | "cancelled";
leaseWorker: string | null;
leaseExpiresAt: number | null;
attempts: number;
maxAttempts: number;
createdAt: number;
runId: string | null;
resultStatus: RunStatus | null;
resultError: { tag: string; message: string } | null;
}
createMemoryQueue / createLibsqlQueue
function createMemoryQueue(): QueueAdapter;
function createLibsqlQueue(opts: {
url: string;
authToken?: string;
migrationsTable?: string;
}): Promise<QueueAdapter>;
The libsql adapter uses UPDATE ... RETURNING to claim jobs atomically.
Multiple processes sharing the same DB coordinate via file locks.
createWorkerPool
function createWorkerPool(
deps: WorkerPoolDeps,
opts?: WorkerPoolOptions,
): WorkerPoolHandle;
interface WorkerPoolDeps {
queue: QueueAdapter;
storage: StorageAdapter;
registry: AgentRegistry;
sandboxFor: (job: QueueJob) => SandboxAdapter | Promise<SandboxAdapter>;
onEvent?: (job: QueueJob, event: HarnessEvent) => void;
plugins?: readonly HarnessPlugin[];
}
interface WorkerPoolOptions {
workerIdPrefix?: string;
concurrency?: number; // default 1
pollIntervalMs?: number; // default 200
leaseDurationMs?: number; // default 30_000
heartbeatIntervalMs?: number; // default leaseDurationMs/3
reclaimIntervalMs?: number; // default 5_000
retryOnFailure?: boolean; // default true
}
interface WorkerPoolHandle {
start(): void;
stop(): Promise<void>;
stats(): { running: number; processed: number; failed: number };
}
Workers heartbeat every heartbeatIntervalMs while a run is in flight.
The pool also runs reclaimStale every reclaimIntervalMs to recover
expired leases from crashed workers.
stop() aborts in-flight runs and awaits worker drain.
createRunApi
function createRunApi(deps: RunApiDeps): RunApi;
interface RunApiDeps {
queue: QueueAdapter;
storage: StorageAdapter;
registry: AgentRegistry;
sandboxFor: (job: QueueJob) => SandboxAdapter | Promise<SandboxAdapter>;
// Must mirror your WorkerPoolDeps.plugins. They're hashed into the
// effective spec at run start; resume reuses the same hash. Omit
// them here and a plugin-injected paused run won't resume.
plugins?: readonly HarnessPlugin[];
}
interface RunApi {
enqueue(input: EnqueueInput): Promise<QueueJob>;
get(jobId): Promise<{ job: QueueJob | null; run: RunRow | null }>;
cancel(jobId): Promise<void>;
resume(jobId): Promise<RunResult<unknown>>;
waitFor(jobId, opts?: { pollIntervalMs?: number; timeoutMs?: number }):
Promise<{ job: QueueJob; run: RunRow | null }>;
list(filter?): Promise<readonly QueueJob[]>;
}
resume runs inline — the caller awaits the RunResult. Pause/resume is
a control-plane operation, not a queue job.
waitFor polls; for low-latency UI use, query storage events
directly via your viewer.