Extending Backends
If the built-in backends (memory, file, Cloudflare) don’t fit your use case, you can build a custom backend by implementing the adapter interfaces and wrapping them with the provided factory functions.
Overview
A backend chains together three parts:
- Adapter interfaces: your implementation of metadata and data storage (thin, single-method operations)
- Wrapper factories: create_metadata_client() and create_data_client(), which add error handling, event emission, and Result wrapping
- Backend composition: combine the clients into a Backend object
This layering minimizes boilerplate: you implement the bare storage logic; the wrappers handle error wrapping, deduplication, and event emission.
The Backend Interface
For reference, here’s what you’re assembling into:
    type Backend = {
      metadata: MetadataClient
      data: DataClient
      observations?: ObservationsClient
      on_event?: EventHandler
      apply_batch?: (ops: BatchOp[]) => Promise<Result<void, CorpusError>>
    }

See Backend Types for the full MetadataClient and DataClient interfaces. The optional apply_batch hook is covered in Optional apply_batch hook below.
Implementing MetadataStorage
Metadata storage handles versioned snapshots (without data):
    type MetadataStorage = {
      get: (store_id: string, version: string) => Promise<SnapshotMeta | null>
      put: (meta: SnapshotMeta) => Promise<void>
      delete: (store_id: string, version: string) => Promise<void>
      list: (store_id: string) => AsyncIterable<SnapshotMeta>
      find_by_hash: (store_id: string, hash: string) => Promise<SnapshotMeta | null>
    }

Contract
- get: Return the metadata row, or null if not found. Don’t throw.
- put: Store or update the metadata. Return void or throw if I/O fails.
- delete: Remove metadata. No error if already absent.
- list: Yield all metadata rows for a store (order doesn’t matter). No pagination; the wrapper handles that.
- find_by_hash: Find any version with this content hash (used for deduplication). Return the first match or null.
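To make the contract above concrete, here is a minimal in-memory sketch. It is not the library’s memory backend: the SnapshotMeta shape is cut down to three fields for illustration (the real type has more), the MetadataStorage type is redeclared locally so the block stands alone, and create_map_metadata_adapter is a hypothetical name.

```typescript
// Hypothetical, reduced shape; the real SnapshotMeta carries more fields.
type SnapshotMeta = { store_id: string; version: string; content_hash: string }

type MetadataStorage = {
  get: (store_id: string, version: string) => Promise<SnapshotMeta | null>
  put: (meta: SnapshotMeta) => Promise<void>
  delete: (store_id: string, version: string) => Promise<void>
  list: (store_id: string) => AsyncIterable<SnapshotMeta>
  find_by_hash: (store_id: string, hash: string) => Promise<SnapshotMeta | null>
}

// Illustrative adapter backed by a Map (name is an assumption, not a library export).
function create_map_metadata_adapter(): MetadataStorage {
  const rows = new Map<string, SnapshotMeta>()
  // Composite key; JSON encoding avoids collisions between (store_id, version) pairs.
  const key_of = (store_id: string, version: string) => JSON.stringify([store_id, version])

  return {
    // Not found is a null return, never a throw.
    async get(store_id, version) {
      return rows.get(key_of(store_id, version)) ?? null
    },
    // Insert or overwrite.
    async put(meta) {
      rows.set(key_of(meta.store_id, meta.version), meta)
    },
    // Deleting an absent row is a no-op.
    async delete(store_id, version) {
      rows.delete(key_of(store_id, version))
    },
    // Yield every row for the store; no ordering or pagination.
    async *list(store_id) {
      for (const meta of rows.values()) {
        if (meta.store_id === store_id) yield meta
      }
    },
    // First row in the store with a matching content hash, or null.
    async find_by_hash(store_id, hash) {
      for (const meta of rows.values()) {
        if (meta.store_id === store_id && meta.content_hash === hash) return meta
      }
      return null
    },
  }
}
```

Each method touches storage once and does nothing else, which is the level of thinness the wrappers expect.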
Example: Postgres Metadata Adapter
    import type { Pool } from 'pg'
    import type { MetadataStorage, SnapshotMeta } from '@f0rbit/corpus'

    export function create_postgres_metadata_adapter(db: Pool): MetadataStorage {
      return {
        // Return the row, or null when no row matches.
        async get(store_id, version) {
          const result = await db.query(
            'SELECT * FROM corpus_snapshots WHERE store_id = $1 AND version = $2',
            [store_id, version]
          )
          if (result.rows.length === 0) return null
          return result.rows[0] as SnapshotMeta
        },

        // Upsert keyed on (store_id, version).
        async put(meta) {
          await db.query(
            `INSERT INTO corpus_snapshots (store_id, version, content_hash, data_key, created_at, size_bytes, parents)
             VALUES ($1, $2, $3, $4, $5, $6, $7)
             ON CONFLICT (store_id, version) DO UPDATE SET ...`,
            [meta.store_id, meta.version, meta.content_hash, meta.data_key, meta.created_at, meta.size_bytes, JSON.stringify(meta.parents)]
          )
        },

        // Deleting a missing row is not an error.
        async delete(store_id, version) {
          await db.query(
            'DELETE FROM corpus_snapshots WHERE store_id = $1 AND version = $2',
            [store_id, version]
          )
        },

        async *list(store_id) {
          const result = await db.query(
            'SELECT * FROM corpus_snapshots WHERE store_id = $1 ORDER BY created_at DESC',
            [store_id]
          )
          for (const row of result.rows) {
            yield row as SnapshotMeta
          }
        },

        // Any single row with this content hash is enough for deduplication.
        async find_by_hash(store_id, hash) {
          const result = await db.query(
            'SELECT * FROM corpus_snapshots WHERE store_id = $1 AND content_hash = $2 LIMIT 1',
            [store_id, hash]
          )
          if (result.rows.length === 0) return null
          return result.rows[0] as SnapshotMeta
        },
      }
    }

Implementing DataStorage
Data storage handles binary blob persistence. Since 0.4.0, get returns a lazy handle so backends with native streaming (R2, Bun.file()) can avoid buffering large objects:
    type DataStorage = {
      get: (data_key: string) => Promise<DataStorageHandle | null>
      put: (data_key: string, data: Uint8Array) => Promise<void>
      delete: (data_key: string) => Promise<void>
      exists: (data_key: string) => Promise<boolean>
    }

    type DataStorageHandle = {
      bytes: () => Promise<Uint8Array>
      stream?: () => ReadableStream<Uint8Array>
      size?: number
    }

Contract
- get: Return a DataStorageHandle, or null if not found. Don’t throw.
- put: Store the bytes. Overwrite if already present. Throw on I/O failure.
- delete: Remove the blob. No error if already absent.
- exists: Return whether the key exists (used for deduplication checks).
The handle’s bytes() is mandatory; stream() is optional. If you don’t provide a native stream, create_data_client falls back to a one-chunk wrapper around bytes(). Implement stream() when the underlying storage supports it natively (R2 object.body, Bun.file().stream()).
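As an illustration of the smallest adapter that satisfies this contract, the sketch below keeps blobs in a Map and returns handles with bytes() and size only, leaving stream() out. The two types are redeclared locally so the block is self-contained, and create_map_data_adapter is a hypothetical name rather than a library export.

```typescript
type DataStorageHandle = {
  bytes: () => Promise<Uint8Array>
  stream?: () => ReadableStream<Uint8Array>
  size?: number
}

type DataStorage = {
  get: (data_key: string) => Promise<DataStorageHandle | null>
  put: (data_key: string, data: Uint8Array) => Promise<void>
  delete: (data_key: string) => Promise<void>
  exists: (data_key: string) => Promise<boolean>
}

// Illustrative adapter backed by a Map (name is an assumption).
function create_map_data_adapter(): DataStorage {
  const blobs = new Map<string, Uint8Array>()

  return {
    // Missing keys yield null; present keys yield a lazy handle.
    async get(data_key) {
      const stored = blobs.get(data_key)
      if (stored === undefined) return null
      // No native stream here, so only bytes() and size are provided.
      return { bytes: async () => stored, size: stored.byteLength }
    },
    // Overwrites silently; the copy guards against later mutation by the caller.
    async put(data_key, data) {
      blobs.set(data_key, data.slice())
    },
    // Deleting an absent key is a no-op.
    async delete(data_key) {
      blobs.delete(data_key)
    },
    async exists(data_key) {
      return blobs.has(data_key)
    },
  }
}
```

Because the handle is lazy, nothing is read until a caller asks for bytes(); for a Map that makes no difference, but for a remote store it lets the adapter defer the download.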
Example Sketch: R2-Compatible Storage
    export async function create_r2_data_adapter(bucket: R2Bucket): Promise<DataStorage> {
      return {
        async get(data_key) {
          const obj = await bucket.get(data_key)
          if (!obj) return null
          return {
            bytes: async () => new Uint8Array(await obj.arrayBuffer()),
            stream: () => obj.body as ReadableStream<Uint8Array>,
            size: obj.size,
          }
        },

        async put(data_key, data) {
          await bucket.put(data_key, data)
        },

        async delete(data_key) {
          await bucket.delete(data_key)
        },

        async exists(data_key) {
          const obj = await bucket.head(data_key)
          return obj !== null
        },
      }
    }

Migration (0.4.0): DataStorage.get returns DataStorageHandle
Before 0.4.0, DataStorage.get returned Promise<Uint8Array | null>. From 0.4.0 it returns Promise<DataStorageHandle | null>. This was driven by the new streaming read surface (Store.get_handle) — backends with native streams (R2, Bun.file().stream()) can now flow data through to consumers without buffering.
For backends that genuinely only have a bytes-getter, use the wrap_bytes_storage helper to keep the migration to a single line:
    import { create_data_client, wrap_bytes_storage } from '@f0rbit/corpus/backend/base'

    // before (pre-0.4.0)
    const data = create_data_client(my_bytes_storage, emit)

    // after (0.4.0)
    const data = create_data_client(wrap_bytes_storage(my_bytes_storage), emit)

wrap_bytes_storage produces a DataStorageHandle whose bytes() returns the stored bytes and whose stream() wraps them in a one-chunk ReadableStream. Existing semantics are preserved: bytes() callers see no change, and stream() callers get a single-chunk read instead of a true streaming read. Upgrade to a native stream() only when the underlying storage offers one.
The consumer-facing DataHandle (the shape returned by DataClient.get) is unchanged — only the adapter contract moved.
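For intuition, a helper of this kind could look roughly like the sketch below. This is not the library’s wrap_bytes_storage source: wrap_bytes_storage_sketch, BytesOnlyStorage, and to_handle are hypothetical names, and the types are redeclared locally. It assumes a runtime with the standard ReadableStream global (Node 18+, Bun, Workers).

```typescript
type DataStorageHandle = {
  bytes: () => Promise<Uint8Array>
  stream?: () => ReadableStream<Uint8Array>
  size?: number
}

type DataStorage = {
  get: (data_key: string) => Promise<DataStorageHandle | null>
  put: (data_key: string, data: Uint8Array) => Promise<void>
  delete: (data_key: string) => Promise<void>
  exists: (data_key: string) => Promise<boolean>
}

// The pre-0.4.0 adapter shape: get returns raw bytes (hypothetical type name).
type BytesOnlyStorage = Omit<DataStorage, 'get'> & {
  get: (data_key: string) => Promise<Uint8Array | null>
}

// Turn already-buffered bytes into a handle with a one-chunk stream.
function to_handle(bytes: Uint8Array): DataStorageHandle {
  return {
    bytes: async () => bytes,
    stream: () =>
      new ReadableStream<Uint8Array>({
        start(controller) {
          controller.enqueue(bytes) // the single chunk
          controller.close()
        },
      }),
    size: bytes.byteLength,
  }
}

// Adapt a bytes-returning storage to the handle-returning contract.
function wrap_bytes_storage_sketch(inner: BytesOnlyStorage): DataStorage {
  return {
    async get(data_key) {
      const bytes = await inner.get(data_key)
      return bytes === null ? null : to_handle(bytes)
    },
    put: (data_key, data) => inner.put(data_key, data),
    delete: (data_key) => inner.delete(data_key),
    exists: (data_key) => inner.exists(data_key),
  }
}
```

Note that the whole blob is still buffered before the stream is created, so this shape only preserves the API, not the memory benefit of a native stream.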
Wrapping Adapters into Clients
Use the factory functions from backend/base.ts to wrap adapters and get error handling + event emission for free:
    import { create_metadata_client, create_data_client } from '@f0rbit/corpus/backend/base'
    import type { Backend, EventHandler } from '@f0rbit/corpus'

    export function create_custom_backend(
      metaAdapter: MetadataStorage,
      dataAdapter: DataStorage,
      on_event?: EventHandler
    ): Backend {
      const emit = on_event || (() => {})

      return {
        metadata: create_metadata_client(metaAdapter, emit),
        data: create_data_client(dataAdapter, emit),
        on_event,
      }
    }

What you get from the wrapper:
- Error wrapping: Catches throws and converts them to Result<T, CorpusError> with kind: 'storage_error'
- Event emission: Emits meta_get, meta_put, data_get, data_put, etc.
- Deduplication tracking: deduplicated: true on events when content hash matches
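The error-wrapping behaviour listed above can be pictured with a small sketch. This is not the code in backend/base.ts: wrap_storage_call, SketchEvent, and StorageErrorSketch are hypothetical names, and the Result shape ({ ok, value } / { ok, error }) and the event payload are assumptions made for illustration.

```typescript
// Assumed Result shape; check the library's own type before relying on it.
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E }

// Hypothetical error and event shapes, reduced to what the sketch needs.
type StorageErrorSketch = { kind: 'storage_error'; operation: string; cause: Error }
type SketchEvent = { type: string; ok: boolean }

// Run one adapter call, convert a throw into a Result, then emit an event.
async function wrap_storage_call<T>(
  operation: string,
  emit: (event: SketchEvent) => void,
  fn: () => Promise<T>
): Promise<Result<T, StorageErrorSketch>> {
  let result: Result<T, StorageErrorSketch>
  try {
    result = { ok: true, value: await fn() }
  } catch (thrown) {
    // Adapters may throw non-Error values; normalise them.
    const cause = thrown instanceof Error ? thrown : new Error(String(thrown))
    result = { ok: false, error: { kind: 'storage_error', operation, cause } }
  }
  // Emit outside the try block so a failing listener is not reported as a storage error.
  emit({ type: operation, ok: result.ok })
  return result
}
```

This is why adapters are allowed to throw on I/O failure: a layer of this kind sits between the adapter and every caller, so consumers only ever see a Result.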
Optional apply_batch hook
corpus.transaction() (see Transactions) buffers ops inside the body and submits them in one shot at commit time. Backends opt into atomic application by exporting an apply_batch method; backends that don’t fall back to a sequential best-effort path inside corpus.transaction().
Signature
    type Backend = {
      // ...
      apply_batch?: (ops: BatchOp[]) => Promise<Result<void, CorpusError>>
    }

BatchOp is a discriminated union covering every mutation a transaction can produce:

    type BatchOp =
      | { type: 'meta_put'; meta: SnapshotMeta }
      | { type: 'meta_delete'; store_id: string; version: string }
      | { type: 'data_put'; data_key: string; bytes: Uint8Array }
      | { type: 'observation_put'; row: ObservationRow }
      | { type: 'observation_delete'; id: string }

Ops arrive in the order the body emitted them. The implementation is free to reorder internally (e.g. write all data blobs before metadata) but the post-commit state must match a serial application of the original order.
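One way to do the "data blobs first" reordering mentioned above is a stable partition, sketched below. data_puts_first is a hypothetical helper, and the op type is reduced to its discriminant. The sketch assumes that moving data_put ops earlier cannot change the final state, which seems reasonable given that the union has no data-delete op, but that is an inference rather than a documented guarantee.

```typescript
// Only the discriminant matters for ordering, so the sketch accepts any tagged op.
type TaggedOp = { type: string }

// Stable partition: every data_put first, everything else after,
// with relative order preserved inside each group.
function data_puts_first<Op extends TaggedOp>(ops: readonly Op[]): Op[] {
  const data_puts = ops.filter((op) => op.type === 'data_put')
  const others = ops.filter((op) => op.type !== 'data_put')
  return [...data_puts, ...others]
}
```

Keeping the non-data ops in their original relative order matters: a meta_put followed by a meta_delete of the same version must not be swapped.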
Contract
apply_batch must be all-or-nothing from a reader’s perspective:
- On success, return ok(undefined). Every op in the batch is visible to subsequent reads.
- On failure, return err({ kind: 'transaction_aborted', reason: 'apply_batch_failed', cause }). No op in the batch should be visible to subsequent reads.

If your storage cannot guarantee this (e.g. the operation is not natively transactional and you’ve already written some ops), surface partial_commit honestly with ops_completed / ops_failed counts:

    return err({
      kind: 'partial_commit',
      ops_completed: 3,
      ops_failed: 2,
      cause,
    })

Throwing from inside apply_batch is a contract violation; wrap external calls and translate failures to Result.
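For storage without native transactions, the bookkeeping behind those counts might look like the sketch below. apply_batch_non_atomic and apply_one are hypothetical names, the Result shape is an assumption, and the error union is cut down to the two kinds discussed here. The sketch also assumes ops_failed counts every op that was not applied (the failing one plus the ones never attempted), which matches the 3 / 2 example for a five-op batch but is an interpretation.

```typescript
// Assumed Result shape, plus the two error kinds from the contract above.
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E }
type BatchErrorSketch =
  | { kind: 'transaction_aborted'; reason: 'apply_batch_failed'; cause: Error }
  | { kind: 'partial_commit'; ops_completed: number; ops_failed: number; cause: Error }

// Apply ops one by one against non-transactional storage and report honestly.
// `apply_one` stands in for whatever writes a single op; it may throw.
async function apply_batch_non_atomic<Op>(
  ops: readonly Op[],
  apply_one: (op: Op) => Promise<void>
): Promise<Result<void, BatchErrorSketch>> {
  let completed = 0
  try {
    for (const op of ops) {
      await apply_one(op)
      completed++
    }
    return { ok: true, value: undefined }
  } catch (thrown) {
    // Never let the throw escape: translate it into a Result.
    const cause = thrown instanceof Error ? thrown : new Error(String(thrown))
    if (completed === 0) {
      // Nothing was written, so readers see none of the batch.
      return { ok: false, error: { kind: 'transaction_aborted', reason: 'apply_batch_failed', cause } }
    }
    return {
      ok: false,
      error: { kind: 'partial_commit', ops_completed: completed, ops_failed: ops.length - completed, cause },
    }
  }
}
```

A real implementation would usually try to undo the completed ops before giving up; the sketch only shows how the two error kinds are told apart.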
Fallback semantics when apply_batch is absent
corpus.transaction() runs each op against the live backend in order. On the first failure, it issues compensating deletes for every op already applied, in reverse order:
- meta_put → metadata.delete
- data_put → data.delete (best-effort; absence is fine because data is content-addressed)
- observation_put → observations.delete
- meta_delete / observation_delete → non-undoable; surfaces partial_commit if any have already applied
If a compensation also fails, the transaction returns partial_commit. Otherwise it returns transaction_aborted with reason: 'apply_batch_failed'.
This is intentionally best-effort. Backends with non-trivial atomicity needs — anything that touches multiple stores, observations, or external services — should ship apply_batch. The fallback is a safety net for simple custom backends; consumers that mix observations into transactions need a real apply_batch (the fallback returns invalid_config for observation_put / observation_delete ops to avoid silently diverging the buffered observation id from a re-minted server-side id).
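The compensation logic described in this section can be sketched as follows. It is a simplified model, not the code inside corpus.transaction(): SketchStep, FallbackOutcome, and run_with_compensation are hypothetical names, each op is reduced to an apply function plus an optional undo, and the invalid_config case for observation ops is left out.

```typescript
// One buffered op: how to apply it and, when possible, how to undo it.
// A missing `undo` models the non-undoable ops (meta_delete, observation_delete).
type SketchStep = { apply: () => Promise<void>; undo?: () => Promise<void> }

type FallbackOutcome = 'committed' | 'transaction_aborted' | 'partial_commit'

async function run_with_compensation(steps: readonly SketchStep[]): Promise<FallbackOutcome> {
  const applied: SketchStep[] = []
  for (const step of steps) {
    try {
      await step.apply()
      applied.push(step)
    } catch {
      // First failure: compensate everything already applied, newest first.
      let fully_undone = true
      for (const done of [...applied].reverse()) {
        if (!done.undo) {
          fully_undone = false // non-undoable op already visible
          continue
        }
        try {
          await done.undo()
        } catch {
          fully_undone = false // compensation itself failed
        }
      }
      return fully_undone ? 'transaction_aborted' : 'partial_commit'
    }
  }
  return 'committed'
}
```

The sketch makes the weakness visible: between the first apply and the last undo, readers can observe a half-applied batch, which is why backends with real atomicity needs should provide apply_batch.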
Worked example: Postgres adapter
A Postgres-backed backend wrapping pg’s transaction helper:
    import type { Pool } from 'pg'
    import type { BatchOp, Backend, CorpusError, Result } from '@f0rbit/corpus'
    import { ok, err } from '@f0rbit/corpus'

    export function create_postgres_backend(pool: Pool): Backend {
      // ...metadata, data, observations, on_event wired up as elsewhere...
      // The *_sql strings and *_params helpers below are assumed to be defined elsewhere.

      async function apply_batch(ops: BatchOp[]): Promise<Result<void, CorpusError>> {
        const client = await pool.connect()
        try {
          await client.query('BEGIN')

          for (const op of ops) {
            switch (op.type) {
              case 'meta_put':
                await client.query(meta_insert_sql, meta_insert_params(op.meta))
                break
              case 'meta_delete':
                await client.query(meta_delete_sql, [op.store_id, op.version])
                break
              case 'data_put':
                await client.query(data_insert_sql, [op.data_key, op.bytes])
                break
              case 'observation_put':
                await client.query(obs_insert_sql, obs_insert_params(op.row))
                break
              case 'observation_delete':
                await client.query(obs_delete_sql, [op.id])
                break
            }
          }

          await client.query('COMMIT')
          return ok(undefined)
        } catch (cause) {
          // Roll back, ignoring rollback errors so the original cause is reported.
          await client.query('ROLLBACK').catch(() => {})
          return err({
            kind: 'transaction_aborted',
            reason: 'apply_batch_failed',
            cause: cause instanceof Error ? cause : new Error(String(cause)),
          })
        } finally {
          client.release()
        }
      }

      return { metadata, data, observations, on_event, apply_batch }
    }

Postgres is the easy case: BEGIN / COMMIT / ROLLBACK give true atomicity for free. The harder cases (multi-bucket object storage, cross-database writes) generally don’t have that primitive and have to surface partial_commit honestly.
Observations Support (Optional)
Observations are stored separately from metadata/data. If you want to support observations, implement an observations adapter:
    type ObservationsStorage = {
      put_row: (row: ObservationRow) => Promise<Result<ObservationRow, CorpusError>>
      get_row: (id: string) => Promise<Result<ObservationRow | null, CorpusError>>
      query_rows: (opts?: StorageQueryOpts) => AsyncIterable<ObservationRow>
      delete_row: (id: string) => Promise<Result<boolean, CorpusError>>
      delete_by_source: (store_id: string, version: string, path?: string) => Promise<Result<number, CorpusError>>
    }

Required Methods
You must implement all methods above. Here’s a minimal outline:
- put_row: Store an observation row (with JSON content). Return the row.
- get_row: Retrieve a row by ID, or null.
- query_rows: Yield all rows matching filters (type, source_store, date ranges, etc.). Return as async iterable.
- delete_row: Remove a row. Return true if deleted, false if not found.
- delete_by_source: Remove all rows pointing to a specific source (store + version). Return count deleted.
Optional Optimizations
The wrapper (create_observations_storage) can fall back to per-row operations if you don’t provide optimized batch methods. However, providing these improves performance:
    type ObservationsAdapter = ObservationsStorage & {
      // Optional: batch query with filters (called by wrapper if present)
      query_rows_optimized?: (opts: StorageQueryOpts) => AsyncIterable<ObservationRow>

      // Optional: efficiently delete by source prefix (called if present)
      delete_by_source_prefix?: (store_id: string, version_prefix: string) => Promise<Result<number, CorpusError>>
    }

Wrapping into Client

    import { create_observations_client, create_observations_storage } from '@f0rbit/corpus'

    const storage = create_observations_storage(adapter)
    const client = create_observations_client(storage, metadata_client)

Example: Full Custom Backend
Here’s a skeleton for a custom backend combining all pieces:
    import { create_metadata_client, create_data_client } from '@f0rbit/corpus/backend/base'
    import { create_observations_client, create_observations_storage } from '@f0rbit/corpus'
    import type { Backend, MetadataStorage, DataStorage, EventHandler, ObservationsStorage } from '@f0rbit/corpus'

    export interface CustomBackendConfig {
      metaAdapter: MetadataStorage
      dataAdapter: DataStorage
      obsAdapter?: ObservationsStorage
      on_event?: EventHandler
    }

    export function create_custom_backend(config: CustomBackendConfig): Backend {
      const emit = config.on_event || (() => {})

      return {
        metadata: create_metadata_client(config.metaAdapter, emit),
        data: create_data_client(config.dataAdapter, emit),
        observations: config.obsAdapter
          ? create_observations_client(
              create_observations_storage(config.obsAdapter),
              create_metadata_client(config.metaAdapter, emit)
            )
          : undefined,
        on_event: config.on_event,
      }
    }

Error Handling
Adapters should:
- Not throw for not-found cases; return null instead
- Throw on I/O failures (network, disk, permissions); the wrapper converts to Result with kind: 'storage_error'
- Keep error messages descriptive for debugging
The wrapper automatically reports errors through the on_event handler, if one is provided.
Testing Custom Backends
Use the integration test suite to validate your backend:
    import { runBackendContractTests } from '@f0rbit/corpus'

    describe('custom backend', () => {
      runBackendContractTests(
        'my-custom-backend',
        async () => create_custom_backend(config),
        async (backend) => {
          // Cleanup function (optional)
          await cleanup()
        }
      )
    })

See Also
- Backend Types — Full interface definitions
- Memory Backend — Simple reference implementation
- File Backend — Complete production implementation