
Extending Backends

If the built-in backends (memory, file, Cloudflare) don’t fit your use case, you can build a custom backend by implementing the adapter interfaces and wrapping them with the provided factory functions.

Overview

A backend is built from three layers:

  1. Adapter interfaces: your implementation of metadata and data storage (thin, single-method operations)
  2. Wrapper factories: create_metadata_client() and create_data_client(), which add error handling, event emission, and Result wrapping
  3. Backend composition: combining the clients into a Backend object

This layering minimizes boilerplate: you implement the bare storage logic; the wrappers handle error wrapping, deduplication, and event emission.

The Backend Interface

For reference, here’s what you’re assembling into:

type Backend = {
  metadata: MetadataClient
  data: DataClient
  observations?: ObservationsClient
  on_event?: EventHandler
  apply_batch?: (ops: BatchOp[]) => Promise<Result<void, CorpusError>>
}

See Backend Types for the full MetadataClient and DataClient interfaces. The optional apply_batch hook is covered in Optional apply_batch hook below.

Implementing MetadataStorage

Metadata storage persists the metadata for each versioned snapshot. The snapshot's bytes live in data storage, not here:

type MetadataStorage = {
  get: (store_id: string, version: string) => Promise<SnapshotMeta | null>
  put: (meta: SnapshotMeta) => Promise<void>
  delete: (store_id: string, version: string) => Promise<void>
  list: (store_id: string) => AsyncIterable<SnapshotMeta>
  find_by_hash: (store_id: string, hash: string) => Promise<SnapshotMeta | null>
}

Contract

  • get: Return the metadata row, or null if not found. Don’t throw.
  • put: Store or update the metadata. Return void or throw if I/O fails.
  • delete: Remove metadata. No error if already absent.
  • list: Yield all metadata rows for a store (order doesn’t matter). No pagination—the wrapper handles that.
  • find_by_hash: Find any version with this content hash (used for deduplication). Return the first match or null.
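To make the contract concrete, here is a minimal in-memory adapter that could also serve as a test fixture. It is a sketch. create_memory_metadata_adapter is a hypothetical name, not a library export. SnapshotMeta is redeclared locally with only the fields that the Postgres example below writes; the real type may carry more.

```typescript
// Hypothetical subset of SnapshotMeta, mirroring the Postgres example's columns.
type SnapshotMeta = {
  store_id: string
  version: string
  content_hash: string
  data_key: string
  created_at: string
  size_bytes: number
  parents: string[]
}

type MetadataStorage = {
  get: (store_id: string, version: string) => Promise<SnapshotMeta | null>
  put: (meta: SnapshotMeta) => Promise<void>
  delete: (store_id: string, version: string) => Promise<void>
  list: (store_id: string) => AsyncIterable<SnapshotMeta>
  find_by_hash: (store_id: string, hash: string) => Promise<SnapshotMeta | null>
}

export function create_memory_metadata_adapter(): MetadataStorage {
  // One inner map per store, keyed by version.
  const stores = new Map<string, Map<string, SnapshotMeta>>()
  const rows_for = (store_id: string): Map<string, SnapshotMeta> => {
    let rows = stores.get(store_id)
    if (!rows) {
      rows = new Map()
      stores.set(store_id, rows)
    }
    return rows
  }
  return {
    // A missing row resolves to null instead of throwing.
    async get(store_id, version) {
      return stores.get(store_id)?.get(version) ?? null
    },
    // Insert or overwrite (upsert semantics).
    async put(meta) {
      rows_for(meta.store_id).set(meta.version, meta)
    },
    // Map.delete is already a no-op for absent keys.
    async delete(store_id, version) {
      stores.get(store_id)?.delete(version)
    },
    // Yield every row for the store; order is unspecified.
    async *list(store_id) {
      for (const meta of stores.get(store_id)?.values() ?? []) yield meta
    },
    // First version with a matching content hash, used for deduplication.
    async find_by_hash(store_id, hash) {
      for (const meta of stores.get(store_id)?.values() ?? []) {
        if (meta.content_hash === hash) return meta
      }
      return null
    },
  }
}
```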

Example: Postgres Metadata Adapter

import type { Pool } from 'pg'
import type { MetadataStorage, SnapshotMeta } from '@f0rbit/corpus'

// Assumes a corpus_snapshots table whose columns match the SnapshotMeta fields.
// Rows are cast as-is; map them explicitly if your column types differ
// (for example, parents stored as text rather than jsonb).
export function create_postgres_metadata_adapter(db: Pool): MetadataStorage {
  return {
    async get(store_id, version) {
      const result = await db.query(
        'SELECT * FROM corpus_snapshots WHERE store_id = $1 AND version = $2',
        [store_id, version]
      )
      // Not found resolves to null rather than throwing.
      if (result.rows.length === 0) return null
      return result.rows[0] as SnapshotMeta
    },
    async put(meta) {
      // Upsert keyed on (store_id, version); the DO UPDATE SET column list is elided.
      await db.query(
        `INSERT INTO corpus_snapshots (store_id, version, content_hash, data_key, created_at, size_bytes, parents)
         VALUES ($1, $2, $3, $4, $5, $6, $7)
         ON CONFLICT (store_id, version) DO UPDATE SET ...`,
        [meta.store_id, meta.version, meta.content_hash, meta.data_key, meta.created_at, meta.size_bytes, JSON.stringify(meta.parents)]
      )
    },
    async delete(store_id, version) {
      // Deleting a missing row affects zero rows and does not error.
      await db.query(
        'DELETE FROM corpus_snapshots WHERE store_id = $1 AND version = $2',
        [store_id, version]
      )
    },
    async *list(store_id) {
      // Ordering is optional; newest-first is just a convenient default.
      const result = await db.query(
        'SELECT * FROM corpus_snapshots WHERE store_id = $1 ORDER BY created_at DESC',
        [store_id]
      )
      for (const row of result.rows) {
        yield row as SnapshotMeta
      }
    },
    async find_by_hash(store_id, hash) {
      const result = await db.query(
        'SELECT * FROM corpus_snapshots WHERE store_id = $1 AND content_hash = $2 LIMIT 1',
        [store_id, hash]
      )
      if (result.rows.length === 0) return null
      return result.rows[0] as SnapshotMeta
    },
  }
}

Implementing DataStorage

Data storage handles binary blob persistence. Since 0.4.0, get returns a lazy handle so backends with native streaming (R2, Bun.file()) can avoid buffering large objects:

type DataStorage = {
  get: (data_key: string) => Promise<DataStorageHandle | null>
  put: (data_key: string, data: Uint8Array) => Promise<void>
  delete: (data_key: string) => Promise<void>
  exists: (data_key: string) => Promise<boolean>
}
type DataStorageHandle = {
  bytes: () => Promise<Uint8Array>
  stream?: () => ReadableStream<Uint8Array>
  size?: number
}

Contract

  • get: Return a DataStorageHandle, or null if not found. Don’t throw.
  • put: Store the bytes. Overwrite if already present. Throw on I/O failure.
  • delete: Remove the blob. No error if already absent.
  • exists: Return whether the key exists (used for deduplication checks).

The handle’s bytes() is mandatory; stream() is optional. If you don’t provide a native stream, create_data_client falls back to a one-chunk wrapper around bytes(). Implement stream() when the underlying storage supports it natively (R2 object.body, Bun.file().stream()).
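The fallback described above can be sketched as follows, assuming only the DataStorageHandle shape from this page. handle_to_stream and memory_handle are hypothetical helpers for illustration, not exports of @f0rbit/corpus. The actual fallback in create_data_client may differ in detail.

```typescript
type DataStorageHandle = {
  bytes: () => Promise<Uint8Array>
  stream?: () => ReadableStream<Uint8Array>
  size?: number
}

// Prefer the native stream; otherwise emit the whole bytes() buffer as one chunk.
export function handle_to_stream(handle: DataStorageHandle): ReadableStream<Uint8Array> {
  if (handle.stream) return handle.stream()
  return new ReadableStream<Uint8Array>({
    async start(controller) {
      try {
        controller.enqueue(await handle.bytes())
        controller.close()
      } catch (cause) {
        // Surface read failures to the stream consumer.
        controller.error(cause)
      }
    },
  })
}

// A bytes-only handle over an in-memory buffer (no native stream).
export function memory_handle(buf: Uint8Array): DataStorageHandle {
  return { bytes: async () => buf, size: buf.byteLength }
}
```

A bytes-only handle such as memory_handle still satisfies the contract. It simply reaches readers as a single chunk.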

Example Sketch: R2-Compatible Storage

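import type { DataStorage } from '@f0rbit/corpus'

// R2Bucket is the Workers runtime binding type (from @cloudflare/workers-types).
export async function create_r2_data_adapter(bucket: R2Bucket): Promise<DataStorage> {
  return {
    async get(data_key) {
      const obj = await bucket.get(data_key)
      // A missing object is "not found", not an error.
      if (!obj) return null
      // obj.body can be read only once, so a consumer should call either
      // bytes() or stream() on a given handle, not both.
      return {
        bytes: async () => new Uint8Array(await obj.arrayBuffer()),
        stream: () => obj.body as ReadableStream<Uint8Array>,
        size: obj.size,
      }
    },
    async put(data_key, data) {
      await bucket.put(data_key, data)
    },
    async delete(data_key) {
      await bucket.delete(data_key)
    },
    async exists(data_key) {
      // head() fetches object metadata without the body.
      const obj = await bucket.head(data_key)
      return obj !== null
    },
  }
}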

Migration: 0.4.0 — DataStorage.get returns DataStorageHandle

Before 0.4.0, DataStorage.get returned Promise<Uint8Array | null>. From 0.4.0 it returns Promise<DataStorageHandle | null>. This was driven by the new streaming read surface (Store.get_handle) — backends with native streams (R2, Bun.file().stream()) can now flow data through to consumers without buffering.

For backends whose storage only exposes a bytes getter, the wrap_bytes_storage helper keeps the migration to a single line:

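import { create_data_client, wrap_bytes_storage } from '@f0rbit/corpus/backend/base'
// my_bytes_storage: your existing adapter whose get returns Promise<Uint8Array | null>
// before (pre-0.4.0):
// const data = create_data_client(my_bytes_storage, emit)
// after (0.4.0): wrap it so get returns a DataStorageHandle
const data = create_data_client(wrap_bytes_storage(my_bytes_storage), emit)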

wrap_bytes_storage produces a DataStorageHandle whose bytes() returns the stored bytes and whose stream() wraps them in a one-chunk ReadableStream. Existing semantics are preserved — bytes() callers see no change, and stream() callers get a single-chunk read instead of a true streaming read. Upgrade to a native stream() only when the underlying storage offers one.
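The behaviour described for wrap_bytes_storage can be approximated in a few lines. This is a sketch, not the library's source. wrap_bytes_storage_sketch and the BytesStorage type, which models the pre-0.4.0 adapter shape, are hypothetical names introduced here.

```typescript
type DataStorageHandle = {
  bytes: () => Promise<Uint8Array>
  stream?: () => ReadableStream<Uint8Array>
  size?: number
}
type DataStorage = {
  get: (data_key: string) => Promise<DataStorageHandle | null>
  put: (data_key: string, data: Uint8Array) => Promise<void>
  delete: (data_key: string) => Promise<void>
  exists: (data_key: string) => Promise<boolean>
}
// Pre-0.4.0 adapter shape: get resolves to raw bytes.
type BytesStorage = Omit<DataStorage, 'get'> & {
  get: (data_key: string) => Promise<Uint8Array | null>
}

export function wrap_bytes_storage_sketch(inner: BytesStorage): DataStorage {
  return {
    async get(data_key) {
      const buf = await inner.get(data_key)
      if (buf === null) return null
      return {
        bytes: async () => buf,
        // One chunk containing the whole buffer, then end of stream.
        stream: () =>
          new ReadableStream<Uint8Array>({
            start(controller) {
              controller.enqueue(buf)
              controller.close()
            },
          }),
      }
    },
    // The other methods pass straight through.
    put: (data_key, data) => inner.put(data_key, data),
    delete: (data_key) => inner.delete(data_key),
    exists: (data_key) => inner.exists(data_key),
  }
}
```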

The consumer-facing DataHandle (the shape returned by DataClient.get) is unchanged — only the adapter contract moved.

Wrapping Adapters into Clients

Use the factory functions from backend/base.ts to wrap adapters and get error handling and event emission for free:

import { create_metadata_client, create_data_client } from '@f0rbit/corpus/backend/base'
import type { Backend, DataStorage, EventHandler, MetadataStorage } from '@f0rbit/corpus'

export function create_custom_backend(
  metaAdapter: MetadataStorage,
  dataAdapter: DataStorage,
  on_event?: EventHandler
): Backend {
  // The wrappers always receive a callable; default to a no-op handler.
  const emit = on_event ?? (() => {})
  return {
    metadata: create_metadata_client(metaAdapter, emit),
    data: create_data_client(dataAdapter, emit),
    on_event,
  }
}

What you get from the wrapper:

  • Error wrapping: Catches throws and converts them to Result<T, CorpusError> with kind: 'storage_error'
  • Event emission: Emits meta_get, meta_put, data_get, data_put, etc.
  • Deduplication tracking: deduplicated: true on events when content hash matches
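The wrapping pattern behind those guarantees can be sketched as a small helper. Everything here is an illustrative assumption:

  • guarded is a hypothetical name, not a library export.
  • The local Result, CorpusError, and event types are simplified stand-ins.
  • The operation field and the 'error' event type are invented.
  • Deduplication tracking is left out.

```typescript
// Simplified local stand-ins; the library's Result, CorpusError and event types are richer.
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E }
type CorpusError = { kind: 'storage_error'; operation: string; cause: Error }
type CorpusEvent = { type: string; [key: string]: unknown }
type EventHandler = (event: CorpusEvent) => void

// Run one adapter call, emit an event named after the operation on success,
// and convert any throw into a storage_error Result.
export async function guarded<T>(
  operation: string,
  emit: EventHandler,
  run: () => Promise<T>
): Promise<Result<T, CorpusError>> {
  try {
    const value = await run()
    emit({ type: operation })
    return { ok: true, value }
  } catch (thrown) {
    const cause = thrown instanceof Error ? thrown : new Error(String(thrown))
    const error: CorpusError = { kind: 'storage_error', operation, cause }
    emit({ type: 'error', operation, error }) // hypothetical error event
    return { ok: false, error }
  }
}
```

A metadata get would then look something like guarded('meta_get', emit, () => adapter.get(store_id, version)). The adapter stays a plain async function, and only the wrapper knows about Result.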

Optional apply_batch hook

corpus.transaction() (see Transactions) buffers ops inside the body and submits them in one shot at commit time. Backends opt into atomic application by exposing an apply_batch method. For backends without one, corpus.transaction() falls back to a sequential, best-effort path.

Signature

type Backend = {
  // ...
  apply_batch?: (ops: BatchOp[]) => Promise<Result<void, CorpusError>>
}

BatchOp is a discriminated union covering every mutation a transaction can produce:

type BatchOp =
  | { type: 'meta_put'; meta: SnapshotMeta }
  | { type: 'meta_delete'; store_id: string; version: string }
  | { type: 'data_put'; data_key: string; bytes: Uint8Array }
  | { type: 'observation_put'; row: ObservationRow }
  | { type: 'observation_delete'; id: string }

Ops arrive in the order the body emitted them. The implementation is free to reorder internally (e.g. write all data blobs before metadata) but the post-commit state must match a serial application of the original order.
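One permissible reordering applies to backends on non-atomic storage. Such a backend might write all blobs before any metadata, so that a reader never sees a snapshot whose data_key points at a missing blob. The sketch below does this as a stable partition. data_first is a hypothetical helper, and the op types are redeclared locally. The sketch assumes, as the op shapes suggest, that data_put touches only blob storage and the other ops never do. Under that assumption, moving blob writes ahead cannot change the post-commit state.

```typescript
// Local copies of the op shapes; SnapshotMeta and ObservationRow are left opaque.
type SnapshotMeta = { store_id: string; version: string; [key: string]: unknown }
type ObservationRow = { id: string; [key: string]: unknown }
type BatchOp =
  | { type: 'meta_put'; meta: SnapshotMeta }
  | { type: 'meta_delete'; store_id: string; version: string }
  | { type: 'data_put'; data_key: string; bytes: Uint8Array }
  | { type: 'observation_put'; row: ObservationRow }
  | { type: 'observation_delete'; id: string }

// Stable partition: blob writes first, everything else after, each group keeping
// its original relative order so same-target ops still apply in serial order.
export function data_first(ops: BatchOp[]): BatchOp[] {
  const data = ops.filter((op) => op.type === 'data_put')
  const rest = ops.filter((op) => op.type !== 'data_put')
  return [...data, ...rest]
}
```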

Contract

apply_batch must be all-or-nothing from a reader’s perspective:

  • On success, return ok(undefined). Every op in the batch is visible to subsequent reads.
  • On failure, return err({ kind: 'transaction_aborted', reason: 'apply_batch_failed', cause }). No op in the batch should be visible to subsequent reads. If your storage cannot guarantee this (e.g. it is not natively transactional and you have already written some ops), surface partial_commit honestly with ops_completed / ops_failed counts:
    return err({
      kind: 'partial_commit',
      ops_completed: 3,
      ops_failed: 2,
      cause,
    })

Throwing from inside apply_batch is a contract violation — wrap external calls and translate failures to Result.
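For storage with no native transactions, the honest-reporting rule might look like the loop below. It is a sketch under these assumptions:

  • apply_without_transactions and apply_one are hypothetical.
  • The error types are local stand-ins.
  • Each individual op is itself atomic.
  • ops_failed counts the failing op plus every op never attempted. This is an interpretation of that field, not a documented definition.

```typescript
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E }
// Local stand-ins for the two failure shapes described above.
type BatchError =
  | { kind: 'transaction_aborted'; reason: 'apply_batch_failed'; cause: Error }
  | { kind: 'partial_commit'; ops_completed: number; ops_failed: number; cause: Error }

const to_error = (thrown: unknown): Error =>
  thrown instanceof Error ? thrown : new Error(String(thrown))

// Apply ops one at a time; never throw, report every failure as a Result.
export async function apply_without_transactions<Op>(
  ops: Op[],
  apply_one: (op: Op) => Promise<void>
): Promise<Result<void, BatchError>> {
  for (let i = 0; i < ops.length; i++) {
    try {
      await apply_one(ops[i])
    } catch (thrown) {
      const cause = to_error(thrown)
      // Nothing has been written yet, so a clean abort is still accurate.
      if (i === 0) {
        return { ok: false, error: { kind: 'transaction_aborted', reason: 'apply_batch_failed', cause } }
      }
      // Ops 0..i-1 are already visible: report how far the batch got.
      return {
        ok: false,
        error: { kind: 'partial_commit', ops_completed: i, ops_failed: ops.length - i, cause },
      }
    }
  }
  return { ok: true, value: undefined }
}
```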

Fallback semantics when apply_batch is absent

corpus.transaction() runs each op against the live backend in order. On the first failure, it issues compensating deletes for every op already applied, in reverse order:

  • meta_put → metadata.delete
  • data_put → data.delete (best-effort; absence is fine because data is content-addressed)
  • observation_put → observations.delete
  • meta_delete / observation_delete → not undoable; surfaces partial_commit if any have already been applied

If a compensation also fails, the transaction returns partial_commit. Otherwise it returns transaction_aborted with reason: 'apply_batch_failed'.
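The compensation loop can be sketched as an undo stack. This version is hypothetical: run_with_compensation and Step are invented names, and the error type is simplified and omits the ops_completed / ops_failed counts. Each applied step is remembered. On failure, the steps are undone newest first. A step with no undo, such as a meta_delete, forces a partial_commit result, as does a compensation that itself fails.

```typescript
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E }
// Simplified stand-in for the CorpusError variants named on this page.
type FallbackError =
  | { kind: 'transaction_aborted'; reason: 'apply_batch_failed'; cause: unknown }
  | { kind: 'partial_commit'; cause: unknown }

// One buffered op: how to apply it and, when possible, how to undo it.
// undo === null marks a non-undoable op such as meta_delete.
type Step = { apply: () => Promise<void>; undo: (() => Promise<void>) | null }

export async function run_with_compensation(steps: Step[]): Promise<Result<void, FallbackError>> {
  const applied: Step[] = []
  for (const step of steps) {
    try {
      await step.apply()
      applied.push(step)
    } catch (cause) {
      let clean = true
      // Undo newest first, mirroring the reverse-order rule above.
      for (const done of applied.reverse()) {
        if (done.undo === null) {
          clean = false // cannot revert this op; the batch is partially visible
          continue
        }
        try {
          await done.undo()
        } catch {
          clean = false // a failed compensation also leaves partial state
        }
      }
      const error: FallbackError = clean
        ? { kind: 'transaction_aborted', reason: 'apply_batch_failed', cause }
        : { kind: 'partial_commit', cause }
      return { ok: false, error }
    }
  }
  return { ok: true, value: undefined }
}
```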

This is intentionally best-effort. Backends with non-trivial atomicity needs (anything that touches multiple stores, observations, or external services) should ship apply_batch. The fallback is a safety net for simple custom backends. Consumers that mix observations into transactions need a real apply_batch. The fallback returns invalid_config for observation_put / observation_delete ops rather than risk silently diverging the buffered observation id from a re-minted server-side id.

Worked example: Postgres adapter

A Postgres-backed backend using a pooled pg client with explicit BEGIN / COMMIT / ROLLBACK:

import type { Pool, PoolClient } from 'pg'
import type { BatchOp, Backend, CorpusError, Result } from '@f0rbit/corpus'
import { ok, err } from '@f0rbit/corpus'

export function create_postgres_backend(pool: Pool): Backend {
  // ...metadata, data, observations, on_event wired up as elsewhere...
  // meta_insert_sql, meta_insert_params, meta_delete_sql, data_insert_sql,
  // obs_insert_sql, obs_insert_params and obs_delete_sql stand in for your schema's statements.

  // apply_batch must never throw, so every failure path goes through this.
  const aborted = (cause: unknown): Result<void, CorpusError> =>
    err({
      kind: 'transaction_aborted',
      reason: 'apply_batch_failed',
      cause: cause instanceof Error ? cause : new Error(String(cause)),
    })

  async function apply_batch(ops: BatchOp[]): Promise<Result<void, CorpusError>> {
    let client: PoolClient
    try {
      client = await pool.connect()
    } catch (cause) {
      // Acquiring a connection can fail too; translate instead of throwing.
      return aborted(cause)
    }
    try {
      await client.query('BEGIN')
      for (const op of ops) {
        switch (op.type) {
          case 'meta_put':
            await client.query(meta_insert_sql, meta_insert_params(op.meta))
            break
          case 'meta_delete':
            await client.query(meta_delete_sql, [op.store_id, op.version])
            break
          case 'data_put':
            await client.query(data_insert_sql, [op.data_key, op.bytes])
            break
          case 'observation_put':
            await client.query(obs_insert_sql, obs_insert_params(op.row))
            break
          case 'observation_delete':
            await client.query(obs_delete_sql, [op.id])
            break
        }
      }
      await client.query('COMMIT')
      return ok(undefined)
    } catch (cause) {
      // After ROLLBACK none of the batch is visible to readers.
      await client.query('ROLLBACK').catch(() => {})
      return aborted(cause)
    } finally {
      client.release()
    }
  }

  return { metadata, data, observations, on_event, apply_batch }
}

Postgres is the easy case: BEGIN / COMMIT / ROLLBACK give true atomicity for free. The harder cases — multi-bucket object storage, cross-database writes — generally don’t have that primitive and have to surface partial_commit honestly.

Observations Support (Optional)

Observations are stored separately from metadata/data. If you want to support observations, implement an observations adapter:

type ObservationsStorage = {
  put_row: (row: ObservationRow) => Promise<Result<ObservationRow, CorpusError>>
  get_row: (id: string) => Promise<Result<ObservationRow | null, CorpusError>>
  query_rows: (opts?: StorageQueryOpts) => AsyncIterable<ObservationRow>
  delete_row: (id: string) => Promise<Result<boolean, CorpusError>>
  delete_by_source: (store_id: string, version: string, path?: string) => Promise<Result<number, CorpusError>>
}

Required Methods

You must implement all methods above. Here’s a minimal outline:

  • put_row: Store an observation row (with JSON content). Return the row.
  • get_row: Retrieve a row by ID, or null.
  • query_rows: Yield all rows matching filters (type, source_store, date ranges, etc.). Return as async iterable.
  • delete_row: Remove a row. Return true if deleted, false if not found.
  • delete_by_source: Remove all rows pointing to a specific source (store + version). Return count deleted.
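A minimal in-memory implementation shows how the five methods fit together. It is a sketch. The ObservationRow fields, the StorageQueryOpts filters, and create_memory_observations_storage are hypothetical stand-ins, since the real types are defined elsewhere in the library.

```typescript
// Hypothetical stand-ins; the real ObservationRow, StorageQueryOpts and CorpusError differ.
type ObservationRow = {
  id: string
  type: string
  source_store: string
  source_version: string
  source_path?: string
  content: string // JSON-encoded payload
  created_at: string // ISO-8601 UTC in one fixed format, so string comparison orders by time
}
type StorageQueryOpts = { type?: string; source_store?: string; after?: string; before?: string }
type CorpusError = { kind: 'storage_error'; cause: Error }
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E }

type ObservationsStorage = {
  put_row: (row: ObservationRow) => Promise<Result<ObservationRow, CorpusError>>
  get_row: (id: string) => Promise<Result<ObservationRow | null, CorpusError>>
  query_rows: (opts?: StorageQueryOpts) => AsyncIterable<ObservationRow>
  delete_row: (id: string) => Promise<Result<boolean, CorpusError>>
  delete_by_source: (store_id: string, version: string, path?: string) => Promise<Result<number, CorpusError>>
}

function ok<T>(value: T): Result<T, CorpusError> {
  return { ok: true, value }
}

export function create_memory_observations_storage(): ObservationsStorage {
  const rows = new Map<string, ObservationRow>()
  // Every filter is optional; an unset filter matches everything.
  const matches = (row: ObservationRow, opts: StorageQueryOpts): boolean =>
    (opts.type === undefined || row.type === opts.type) &&
    (opts.source_store === undefined || row.source_store === opts.source_store) &&
    (opts.after === undefined || row.created_at > opts.after) &&
    (opts.before === undefined || row.created_at < opts.before)
  return {
    async put_row(row) {
      rows.set(row.id, row)
      return ok(row)
    },
    async get_row(id) {
      return ok(rows.get(id) ?? null)
    },
    async *query_rows(opts = {}) {
      for (const row of rows.values()) {
        if (matches(row, opts)) yield row
      }
    },
    async delete_row(id) {
      // Map.delete reports whether the key was present.
      return ok(rows.delete(id))
    },
    async delete_by_source(store_id, version, path) {
      let count = 0
      // Deleting entries while iterating a Map is safe in JavaScript.
      for (const [id, row] of rows) {
        const same_source =
          row.source_store === store_id &&
          row.source_version === version &&
          (path === undefined || row.source_path === path)
        if (same_source) {
          rows.delete(id)
          count++
        }
      }
      return ok(count)
    },
  }
}
```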

Optional Optimizations

The wrapper (create_observations_storage) can fall back to per-row operations if you don’t provide optimized batch methods. However, providing these improves performance:

type ObservationsAdapter = ObservationsStorage & {
  // Optional: batch query with filters (called by wrapper if present)
  query_rows_optimized?: (opts: StorageQueryOpts) => AsyncIterable<ObservationRow>
  // Optional: efficiently delete by source prefix (called if present)
  delete_by_source_prefix?: (store_id: string, version_prefix: string) => Promise<Result<number, CorpusError>>
}
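The choice between an optional fast path and the required method can be pictured as a simple dispatch. This hypothetical pick_query uses simplified local types and only illustrates the idea. This page does not show how create_observations_storage actually decides.

```typescript
// Simplified local types for illustration.
type Row = { id: string }
type QueryOpts = Record<string, unknown>
type Adapter = {
  query_rows: (opts?: QueryOpts) => AsyncIterable<Row>
  query_rows_optimized?: (opts: QueryOpts) => AsyncIterable<Row>
}

// Prefer the optional method when the adapter provides it,
// otherwise fall back to the required query_rows.
export function pick_query(adapter: Adapter): (opts: QueryOpts) => AsyncIterable<Row> {
  const optimized = adapter.query_rows_optimized
  if (optimized) return (opts) => optimized.call(adapter, opts)
  return (opts) => adapter.query_rows(opts)
}
```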

Wrapping into Client

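import { create_observations_client, create_observations_storage } from '@f0rbit/corpus'
// adapter: your ObservationsStorage implementation (optionally with the methods above)
// metadata_client: the client returned by create_metadata_client
const storage = create_observations_storage(adapter)
const client = create_observations_client(storage, metadata_client)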

Example: Full Custom Backend

Here’s a skeleton for a custom backend combining all pieces:

import { create_metadata_client, create_data_client } from '@f0rbit/corpus/backend/base'
import { create_observations_client, create_observations_storage } from '@f0rbit/corpus'
import type { Backend, MetadataStorage, DataStorage, EventHandler, ObservationsStorage } from '@f0rbit/corpus'

export interface CustomBackendConfig {
  metaAdapter: MetadataStorage
  dataAdapter: DataStorage
  obsAdapter?: ObservationsStorage
  on_event?: EventHandler
}

export function create_custom_backend(config: CustomBackendConfig): Backend {
  const emit = config.on_event ?? (() => {})
  // Build the metadata client once and share it with the observations client.
  const metadata = create_metadata_client(config.metaAdapter, emit)
  return {
    metadata,
    data: create_data_client(config.dataAdapter, emit),
    observations: config.obsAdapter
      ? create_observations_client(create_observations_storage(config.obsAdapter), metadata)
      : undefined,
    on_event: config.on_event,
  }
}

Error Handling

Adapters should:

  • Not throw for not-found cases; return null instead
  • Throw on I/O failures (network, disk, permissions); the wrapper converts to Result with kind: 'storage_error'
  • Keep error messages descriptive for debugging

The wrapper automatically logs errors via the on_event handler if provided.
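For a directory-backed store, those rules might look like the sketch below. read_blob is a hypothetical bytes getter. It treats only ENOENT as not found. It wraps every other failure in an Error that names the path, and passes the original error through the standard cause option.

```typescript
import { readFile } from 'node:fs/promises'
import { join } from 'node:path'

// Hypothetical bytes getter for a directory-backed adapter.
export async function read_blob(root: string, data_key: string): Promise<Uint8Array | null> {
  const path = join(root, data_key)
  try {
    return new Uint8Array(await readFile(path))
  } catch (thrown) {
    // A missing file is the not-found case: return null, do not throw.
    if ((thrown as { code?: unknown } | null)?.code === 'ENOENT') return null
    // Anything else (permissions, EISDIR, disk errors) is a real I/O failure:
    // rethrow with the path so the resulting storage_error is easy to trace.
    throw new Error(`read_blob: failed to read ${path}`, { cause: thrown })
  }
}
```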

Testing Custom Backends

Use the integration test suite to validate your backend:

import { runBackendContractTests } from '@f0rbit/corpus'
// describe comes from your test runner; config and cleanup are your own.
describe('custom backend', () => {
  runBackendContractTests(
    'my-custom-backend',
    async () => create_custom_backend(config),
    async (backend) => {
      // Cleanup function (optional)
      await cleanup()
    }
  )
})
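To smoke-test a single data adapter before running the full suite, a hand-rolled check can encode the DataStorage contract bullets from earlier on this page. check_data_contract is a hypothetical helper, not part of @f0rbit/corpus, and it is no substitute for runBackendContractTests.

```typescript
type DataStorageHandle = {
  bytes: () => Promise<Uint8Array>
  stream?: () => ReadableStream<Uint8Array>
  size?: number
}
type DataStorage = {
  get: (data_key: string) => Promise<DataStorageHandle | null>
  put: (data_key: string, data: Uint8Array) => Promise<void>
  delete: (data_key: string) => Promise<void>
  exists: (data_key: string) => Promise<boolean>
}

// Throws on the first contract violation it finds.
export async function check_data_contract(storage: DataStorage, key = 'contract-check'): Promise<void> {
  const fail = (msg: string): never => {
    throw new Error(`DataStorage contract: ${msg}`)
  }
  if ((await storage.get(key)) !== null) fail('get on a missing key must resolve to null')
  if (await storage.exists(key)) fail('exists on a missing key must be false')
  await storage.put(key, new Uint8Array([1]))
  await storage.put(key, new Uint8Array([2, 3])) // overwriting must succeed
  const handle = await storage.get(key)
  const bytes = handle ? await handle.bytes() : fail('get after put returned null')
  if (bytes.length !== 2 || bytes[0] !== 2) fail('put must overwrite existing bytes')
  if (!(await storage.exists(key))) fail('exists after put must be true')
  await storage.delete(key)
  await storage.delete(key) // deleting an absent key must not throw
  if ((await storage.get(key)) !== null) fail('get after delete must resolve to null')
}
```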

See Also