@drover/storage

The StorageAdapter interface, plus in-memory and libSQL implementations.

StorageAdapter

ts
/**
 * Persistence contract covering runs, per-run events, checkpoints, and
 * pending confirmations. Every operation returns an Effect typed with
 * StorageError.
 */
interface StorageAdapter {
  readonly id: string;
  createRun(row: RunRow): Effect<void, StorageError>;
  updateRun(id, patch): Effect<void, StorageError>;
  // Events carry a per-run monotonic seq; implementations must preserve order within a run.
  appendEvent(event: EventRow): Effect<void, StorageError>;
  saveCheckpoint(cp: CheckpointRow): Effect<void, StorageError>;
  // NOTE(review): null presumably means no run is stored under this id — confirm.
  loadRun(id): Effect<RunRow | null, StorageError>;
  // Resolves to the checkpoint with the highest seq for the given run.
  loadLatestCheckpoint(runId): Effect<CheckpointRow | null, StorageError>;
  listEvents(runId): Effect<readonly EventRow[], StorageError>;
  // NOTE(review): filter is presumably a RunListFilter — confirm.
  listRuns(filter?): Effect<readonly RunRow[], StorageError>;
  // The two pending-confirmation methods are forward-compat for
  // confirm-gate-as-control-plane; v0 implementations can no-op them.
  createPendingConfirmation(row): Effect<void, StorageError>;
  resolvePendingConfirmation(runId, toolUseId, result, resolvedAt): Effect<void, StorageError>;
  close(): Effect<void, StorageError>;
}

Row shapes

ts
/** Stored record for a single run. */
interface RunRow {
  id: string;
  // NOTE(review): presumably omitted for root runs — confirm.
  parentRunId?: string;
  agentId: string;
  specHash: string;
  status: "running" | "success" | "quota" | "cancelled" | "error" | "paused";
  input: unknown;
  output?: unknown;
  error?: { tag: string; message: string };
  // NOTE(review): numeric timestamps; the unit is not stated here — confirm.
  startedAt: number;
  endedAt?: number;
  tokensIn: number;
  tokensOut: number;
  costUsd: number;
  meta?: Record<string, unknown>;
}

/** Stored record for one HarnessEvent belonging to a run. */
interface EventRow {
  runId: string;
  seq: number;          // monotonic per-run
  ts: number;
  // Typed as the `kind` member of HarnessEvent, the same type that payload holds.
  kind: HarnessEvent["kind"];
  payload: HarnessEvent;
}

/** Stored checkpoint for a run; the latest checkpoint is the one with the highest seq. */
interface CheckpointRow {
  runId: string;
  seq: number;
  messages: unknown;    // pi-agent-core AgentMessage[]
  usage: Usage;
  // NOTE(review): presumably identifiers of tool calls made so far — confirm.
  toolCalls: readonly string[];
  retriesUsed: number;
  ts: number;
}

createMemoryStorage

ts
/** Returns a Map-backed, single-process StorageAdapter, intended for tests and short-lived processes. */
function createMemoryStorage(): StorageAdapter;

Map-backed and confined to a single process. Intended for tests and short-lived processes.

createLibsqlStorage

ts
/**
 * Resolves to a libSQL-backed StorageAdapter.
 * Migrations run idempotently on the first read/write.
 */
function createLibsqlStorage(opts: {
  url: string;          // ":memory:" | "file:..." | "libsql://..."
  authToken?: string;
}): Promise<StorageAdapter>;

Migrations run idempotently on the first read/write. Tables: runs, run_events, run_checkpoints, pending_confirmations, plus a drover_migrations bookkeeping table.

RunListFilter

ts
/** Criteria for listing runs; every field is optional. */
interface RunListFilter {
  status?: readonly RunRow["status"][];
  agentId?: string;
  parentRunId?: string | null;  // null = no parent (root runs only)
  // NOTE(review): presumably compared against RunRow.startedAt — confirm.
  startedAfter?: number;
  limit?: number;
  offset?: number;
}

Concurrency contract

  • Events written with appendEvent carry a seq that is monotonic within each run. Implementations must preserve event order within a run.
  • Concurrent runs are isolated from one another by runId.
  • loadLatestCheckpoint(runId) returns the checkpoint with the highest seq for that run.
  • createPendingConfirmation / resolvePendingConfirmation exist for forward compatibility with confirm-gate-as-control-plane. v0 implementations may implement them as no-ops.

Type to search…

↑↓ navigate open esc close