Transactions
corpus.transaction(async tx => …) runs a callback whose mutations are buffered and committed together — atomically when the backend supports it, sequentially with best-effort rollback when it does not.
Why transactions
Single-store writes are well-served by store.put(). Each call is content-addressed and idempotent on the data side, so retries are safe; metadata writes go through one backend call.
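To see why a retried `store.put()` is safe on the data side, here is a minimal sketch of a content-addressed store. It assumes the version key is derived only from the serialized data. `fnv1a` is a hypothetical, non-cryptographic stand-in for whatever digest corpus actually uses. The only point is that the same bytes always map to the same key.

```typescript
// Hypothetical stand-in for a real content digest (e.g. SHA-256):
// 32-bit FNV-1a over the UTF-16 code units of the input string.
function fnv1a(input: string): string {
  let hash = 0x811c9dc5
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193) >>> 0
  }
  return hash.toString(16).padStart(8, '0')
}

// Minimal content-addressed blob store: the key depends on the data alone.
class ContentStore {
  readonly blobs = new Map<string, string>()

  put(data: unknown): string {
    const body = JSON.stringify(data)
    const key = fnv1a(body)
    // Writing identical bytes again overwrites the blob with the same content,
    // so a retried put changes nothing on the data side.
    this.blobs.set(key, body)
    return key
  }
}

const store = new ContentStore()
const first = store.put({ kind: 'evt', ts: '2024-01-01T00:00:00Z' })
const retry = store.put({ kind: 'evt', ts: '2024-01-01T00:00:00Z' })
console.log(first === retry, store.blobs.size) // true 1
```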
What that primitive doesn’t cover is linkage between stores, where two writes have to land or neither should:
- A `timeline` event plus a `timeline_index` entry that references the event's version.
- A `documents` snapshot plus an `observation` annotating it.
If the second write fails after the first commits, downstream readers see inconsistent state — an index that lags real events, or an observation whose source resolves to not_found. corpus.transaction() is the primitive that closes that gap.
API
```ts
import { ok } from '@f0rbit/corpus'

// `payload` and `build_index` stand in for your own event data and index builder.
const result = await corpus.transaction(async (tx) => {
  const event = await tx.put(corpus.stores.timeline, payload, { tags: ['evt'] })
  if (!event.ok) return event

  const idx = await tx.put(corpus.stores.timeline_index, build_index(payload), {
    parents: [{ store_id: 'timeline', version: event.value.version, role: 'source' }],
  })
  if (!idx.ok) return idx

  return ok({ event_version: event.value.version })
})

if (!result.ok) {
  // result.error.kind: 'transaction_aborted' | 'partial_commit' | 'invalid_config' | ...
  return result
}

// result.value: { value: { event_version: string }, commits: SnapshotMeta[], observations: Observation[] }
```

The handle exposes:
```ts
type TransactionHandle = {
  put: <T>(store: Store<T>, data: T, opts?: PutOpts) => Promise<Result<SnapshotMeta, CorpusError>>
  get: <T>(store: Store<T>, version: string) => Promise<Result<Snapshot<T>, CorpusError>>
  delete: <T>(store: Store<T>, version: string) => Promise<Result<void, CorpusError>>
  observe: <T>(type: ObservationTypeDef<T>, opts: ObservationPutOpts<T>) => Promise<Result<Observation<T>, CorpusError>>
  observation_delete: (id: string) => Promise<Result<void, CorpusError>>
}
```

The body must return a `Result`. Returning `err()` aborts the transaction with `transaction_aborted` (reason `returned_err`). A thrown exception is caught and wrapped as `transaction_aborted` (reason `threw`). Returning `ok(...)` triggers a commit phase against the backend.
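The three body outcomes can be modelled in a few lines. This is a hypothetical sketch, not corpus's implementation. `Result`, `ok`, `err`, `abort` and `run_body` are defined locally, and the backend commit phase is reduced to a callback that reports success or failure.

```typescript
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E }
const ok = <T>(value: T): Result<T, never> => ({ ok: true, value })
const err = <E>(error: E): Result<never, E> => ({ ok: false, error })

type AbortReason = 'returned_err' | 'threw' | 'apply_batch_failed'
type TxError = { kind: 'transaction_aborted'; reason: AbortReason; cause: unknown }

const abort = (reason: AbortReason, cause: unknown): Result<never, TxError> =>
  ({ ok: false, error: { kind: 'transaction_aborted', reason, cause } })

// Hypothetical driver: run the body, and only commit if it returned ok(...).
async function run_body<R>(
  body: () => Promise<Result<R, unknown>>,
  commit: () => Promise<boolean>, // stand-in for the backend commit phase
): Promise<Result<R, TxError>> {
  let outcome: Result<R, unknown>
  try {
    outcome = await body()
  } catch (e) {
    // A thrown exception never reaches the commit phase.
    return abort('threw', e)
  }
  if (!outcome.ok) {
    // The body chose to abort; keep its error as the cause.
    return abort('returned_err', outcome.error)
  }
  if (!(await commit())) {
    return abort('apply_batch_failed', null)
  }
  return ok(outcome.value)
}
```

Only the `ok(...)` path ever calls `commit`. That is why aborting from the body is cheap: buffered writes are simply discarded.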
The transaction result wraps the body’s return value alongside an ordered log of writes:
```ts
type TransactionResult<R> = {
  value: R                    // whatever the body returned via ok(...)
  commits: SnapshotMeta[]     // every tx.put, in put-order
  observations: Observation[] // every tx.observe, in observe-order
}
```

Read-your-writes
tx.get() checks the in-tx buffer before falling through to the backend. A tx.put followed by a tx.get for that version returns the buffered snapshot without a backend round-trip.
tx.delete writes a tombstone into the buffer, so a subsequent tx.get for that same (store_id, version) returns not_found even if the version is still committed in the live backend (the tombstone wins until the tx commits or aborts). This keeps in-tx semantics consistent with what the post-commit world would look like.
Reads from outside the transaction see no in-flight state — they always read the live backend.
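Here is a minimal sketch of that lookup order. It assumes the buffer is keyed by `(store_id, version)` and that the live backend is a plain `Map`. `TxBuffer` and its methods are hypothetical. They illustrate the semantics described above, not corpus's internals.

```typescript
type Entry<T> = { kind: 'put'; data: T } | { kind: 'tombstone' }
type Lookup<T> = { ok: true; value: T } | { ok: false; error: { kind: 'not_found' } }

const buffer_key = (store_id: string, version: string) => `${store_id}:${version}`

class TxBuffer<T> {
  private readonly buffer = new Map<string, Entry<T>>()
  private readonly live: Map<string, T>

  constructor(live: Map<string, T>) {
    this.live = live
  }

  put(store_id: string, version: string, data: T): void {
    this.buffer.set(buffer_key(store_id, version), { kind: 'put', data })
  }

  delete(store_id: string, version: string): void {
    // A tombstone shadows the live value until the transaction ends.
    this.buffer.set(buffer_key(store_id, version), { kind: 'tombstone' })
  }

  get(store_id: string, version: string): Lookup<T> {
    const buffered = this.buffer.get(buffer_key(store_id, version))
    if (buffered !== undefined) {
      // The buffer always wins, whether it holds data or a tombstone.
      return buffered.kind === 'put'
        ? { ok: true, value: buffered.data }
        : { ok: false, error: { kind: 'not_found' } }
    }
    // Nothing buffered: fall through to the live backend.
    const live = this.live.get(buffer_key(store_id, version))
    return live === undefined
      ? { ok: false, error: { kind: 'not_found' } }
      : { ok: true, value: live }
  }
}
```

Because `delete` only touches the buffer, the live map is unchanged until commit. That is exactly what outside readers observe.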
Failure modes
Three error kinds are specific to transactions:
| Kind | When it fires |
|---|---|
| `transaction_aborted` | Body returned `err()`, threw, or the backend's `apply_batch` rejected the commit. The `reason` field discriminates: `'returned_err'`, `'threw'`, or `'apply_batch_failed'`. |
| `partial_commit` | The sequential fallback path could not fully roll back after a mid-batch failure. Only fires for backends without `apply_batch`, or when a non-undoable op (e.g. `meta_delete`) was already applied. |
| `concurrent_modification` | Reserved for forward-compatibility. No built-in backend currently produces this; unique-constraint violations on Cloudflare D1 surface as `transaction_aborted` with `reason: 'apply_batch_failed'`. Documented so consumer switches stay exhaustive once it lands. |
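The sequential fallback behind `partial_commit` can be sketched as follows. The sketch assumes each op carries an optional `undo` callback. Ops without one stand in for something like `meta_delete` and cannot be reverted. `Op`, `apply_sequential` and the error shapes are hypothetical. They are loosely modelled on the `ops_completed` / `ops_failed` fields that `partial_commit` reports. Reporting a clean rollback as `transaction_aborted` with reason `apply_batch_failed` is also an assumption.

```typescript
type Op = {
  apply: () => boolean  // true on success
  undo?: () => boolean  // absent for non-undoable ops
}

type SeqResult =
  | { ok: true }
  | { ok: false; error: { kind: 'transaction_aborted'; reason: 'apply_batch_failed' } }
  | { ok: false; error: { kind: 'partial_commit'; ops_completed: number; ops_failed: number } }

function apply_sequential(ops: Op[]): SeqResult {
  // Apply ops one by one, stopping at the first failure.
  let applied = 0
  for (const op of ops) {
    if (!op.apply()) break
    applied++
  }
  if (applied === ops.length) return { ok: true }

  // Best-effort rollback: undo the applied ops in reverse order.
  let clean = true
  for (const op of ops.slice(0, applied).reverse()) {
    const undo = op.undo
    // A missing or failing undo leaves that op applied.
    if (undo === undefined || !undo()) clean = false
  }
  if (clean) {
    return { ok: false, error: { kind: 'transaction_aborted', reason: 'apply_batch_failed' } }
  }
  return {
    ok: false,
    error: { kind: 'partial_commit', ops_completed: applied, ops_failed: ops.length - applied },
  }
}
```

A backend with a native `apply_batch` avoids this path entirely, which is why `partial_commit` is limited to the fallback.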
Switch on reason to react to the abort cause:
```ts
const result = await corpus.transaction(async (tx) => {
  // ...
})

if (!result.ok) {
  switch (result.error.kind) {
    case 'transaction_aborted':
      switch (result.error.reason) {
        case 'returned_err':
          // body chose to abort: inspect cause for the original CorpusError
          break
        case 'threw':
          // body threw: cause is the original Error
          break
        case 'apply_batch_failed':
          // commit phase rejected: backend-specific cause
          break
      }
      break
    case 'partial_commit':
      console.error(
        `partial commit: ${result.error.ops_completed} of ${result.error.ops_completed + result.error.ops_failed} committed`,
      )
      break
    case 'concurrent_modification':
      // reserved: no built-in backend produces this yet
      break
    case 'invalid_config':
      // e.g. nested transaction, unregistered store
      break
  }
}
```

Per-backend guarantees
| Backend | Atomicity | Notes |
|---|---|---|
| Memory | Full | Snapshot the in-memory maps, apply ops synchronously, restore on throw. Genuinely atomic — no await interleaves between snapshot and restore. |
| File | Per-store metadata atomic via rename | Cross-file batch is not atomic. A mid-flight crash can land some renames but not others, surfacing as partial_commit. Recoverable via recover(). |
| Cloudflare | D1 metadata atomic via db.batch; R2 orphan-tolerant | All metadata + observations within a transaction commit through a single SQLite transaction. R2 has no multi-object atomic API; failures after R2 puts but before D1 commit leave content-addressed orphan blobs (recoverable via a future corpus.gc()). |
| Layered | Forwarded to bottom write layer | Cache layers above the bottom write layer are skipped on transactional commits. Reads after commit hit the bottom and lazily populate the cache. |
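The Memory row's strategy can be sketched like this. The sketch assumes the backend state is a pair of `Map`s and each batched op is a synchronous function that may throw. `MemoryState`, `BatchOp` and `apply_batch_atomic` are hypothetical names. Nothing between the snapshot and the restore awaits, so no other code can observe the half-applied state.

```typescript
type MemoryState = {
  meta: Map<string, string>
  data: Map<string, string>
}

type BatchOp = (state: MemoryState) => void // may throw

function apply_batch_atomic(state: MemoryState, ops: BatchOp[]): boolean {
  // Shallow copies: stored values are treated as immutable.
  const meta_snapshot = new Map(state.meta)
  const data_snapshot = new Map(state.data)
  try {
    for (const op of ops) op(state) // synchronous, so no await can interleave
    return true
  } catch {
    // Restore in place so existing references to the maps stay valid.
    state.meta.clear()
    meta_snapshot.forEach((v, k) => state.meta.set(k, v))
    state.data.clear()
    data_snapshot.forEach((v, k) => state.data.set(k, v))
    return false
  }
}
```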
The file backend writes data first (idempotent under content addressing), then metadata, so a mid-flight crash produces orphan blobs but never dangling pointers.
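Here is a small in-memory model of that ordering. A hypothetical `crash_after_data` flag stands in for the process dying between the two writes. The model checks the invariant the paragraph describes: every metadata entry points at a blob that exists, while an interrupted commit may leave a blob that nothing references. `FileModel`, `commit_file_write`, `dangling_pointers` and `orphan_blobs` are illustrative names only.

```typescript
type FileModel = {
  blobs: Map<string, string> // content hash -> data
  meta: Map<string, string>  // version -> content hash
}

// Hypothetical commit: data first, then metadata. Throws to simulate a crash.
function commit_file_write(
  model: FileModel, version: string, hash: string, data: string, crash_after_data: boolean,
): void {
  model.blobs.set(hash, data) // idempotent under content addressing
  if (crash_after_data) throw new Error('simulated crash')
  model.meta.set(version, hash) // the pointer is written only once its target exists
}

// Versions whose metadata points at a missing blob.
function dangling_pointers(model: FileModel): string[] {
  return Array.from(model.meta)
    .filter(([, hash]) => !model.blobs.has(hash))
    .map(([version]) => version)
}

// Blobs that no metadata entry references.
function orphan_blobs(model: FileModel): string[] {
  const referenced = new Set(Array.from(model.meta.values()))
  return Array.from(model.blobs.keys()).filter((hash) => !referenced.has(hash))
}
```

Reversing the two writes would flip the failure mode: a crash would then leave metadata pointing at a missing blob, which readers would see as a broken reference rather than harmless garbage.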
Concurrency
corpus is not a transactional database. There is no isolation between concurrent transactions, no locking, and no optimistic concurrency control.
- Concurrent `corpus.transaction()` calls on the same `Corpus` instance are guarded by an in-process flag: the second one returns `invalid_config` with message `nested transactions are not supported`.
- Concurrent calls across separate `Corpus` instances pointed at the same backend race. Each transaction commits or aborts as a unit; corpus does not serialise them. Last writer wins for memory and file; D1's SQLite engine may serialise at the connection level on Cloudflare, but corpus does not promise it.
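The single-instance guard amounts to a boolean that is checked on entry and cleared on exit. The sketch below uses a hypothetical `GuardedCorpus` class. The error message matches the one quoted above; everything else is illustrative.

```typescript
type GuardError = { kind: 'invalid_config'; message: string }
type GuardResult<R> = { ok: true; value: R } | { ok: false; error: GuardError }

class GuardedCorpus {
  private in_transaction = false

  async transaction<R>(body: () => Promise<R>): Promise<GuardResult<R>> {
    // Rejects both nested calls and overlapping calls on the same instance.
    if (this.in_transaction) {
      return { ok: false, error: { kind: 'invalid_config', message: 'nested transactions are not supported' } }
    }
    this.in_transaction = true
    try {
      return { ok: true, value: await body() }
    } finally {
      // Cleared even if the body throws, so the instance stays usable.
      this.in_transaction = false
    }
  }
}
```

The flag lives on the instance, so it cannot see transactions started through a different `Corpus` object. That is why cross-instance calls race.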
If you need cross-instance isolation, layer a coordinator (e.g. a queue, a per-key mutex, or an external lock service) above corpus.
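Within a single process, one such coordinator is a per-key promise chain. `KeyedMutex` is hypothetical and not part of corpus. It serialises callers that share a key (for example a store and version) and lets different keys proceed concurrently. Several `Corpus` instances in one process could share it. It does nothing across processes, where an external lock service is still needed.

```typescript
class KeyedMutex {
  private readonly tails = new Map<string, Promise<void>>()

  async run<R>(key: string, fn: () => Promise<R>): Promise<R> {
    const previous = this.tails.get(key) ?? Promise.resolve()
    let release: () => void = () => {}
    const current = new Promise<void>((resolve) => { release = resolve })
    // Later callers for this key wait on our release.
    const tail = previous.then(() => current)
    this.tails.set(key, tail)
    await previous // wait for earlier holders of this key
    try {
      return await fn()
    } finally {
      release()
      // Drop the entry if nobody queued behind us, so the map does not grow forever.
      if (this.tails.get(key) === tail) this.tails.delete(key)
    }
  }
}
```

A usage pattern would wrap each call as `mutex.run(key, () => corpus.transaction(...))`. The key choice decides which transactions are serialised against each other.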
Crash recovery (file backend)
The file backend stages writes under <base>/.tx-<uuid>/ and renames them into the live tree on commit. If the process crashes mid-commit, the staging directory is left behind. Clean it up at startup:
```ts
import { recover, create_file_backend } from '@f0rbit/corpus/file'

// Clean up leftover staging directories before opening the backend.
await recover('./data/corpus')
const backend = create_file_backend({ base_path: './data/corpus' })
```

`recover()` scans for `.tx-*` directories and removes them. It does not roll forward: a partial commit stays partial, and only the staging directory is cleaned up.
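The staging-and-rename flow, and why cleanup cannot restore consistency, can be modelled with a `Map` standing in for the filesystem. Everything here is hypothetical. Paths are map keys, `crash_after` simulates dying after a number of renames, and `recover_model` only mirrors the documented behaviour of `recover()`.

```typescript
type Tree = Map<string, string> // path -> contents

function stage_and_commit(tree: Tree, tx_id: string, files: Record<string, string>, crash_after = Infinity): void {
  const staging = `.tx-${tx_id}/`
  const names = Object.keys(files)
  // 1. Stage every file under the transaction's directory.
  for (const name of names) tree.set(staging + name, files[name])
  // 2. "Rename" staged files into the live tree one at a time.
  let renamed = 0
  for (const name of names) {
    if (renamed === crash_after) throw new Error('simulated crash')
    tree.set(name, tree.get(staging + name) as string)
    tree.delete(staging + name)
    renamed++
  }
}

// Delete leftover staging entries; never roll forward.
function recover_model(tree: Tree): number {
  let removed = 0
  for (const path of Array.from(tree.keys())) {
    if (path.startsWith('.tx-')) {
      tree.delete(path)
      removed++
    }
  }
  return removed
}
```

After a crash between renames, `recover_model` removes the staged leftovers, but the live tree keeps whatever renames had already landed.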
Worked example: timeline + index
Write an event and its index entry atomically. The index references the event’s version, so the body has to read the event’s meta before the second put.
```ts
import { ok } from '@f0rbit/corpus'

type TimelineEvent = { kind: string; payload: unknown; ts: string }
type IndexEntry = { date: string; event_version: string }

// `evt` is the TimelineEvent being recorded.
const result = await corpus.transaction(async (tx) => {
  const event = await tx.put(corpus.stores.timeline, evt, { tags: ['evt'] })
  if (!event.ok) return event

  const entry: IndexEntry = { date: evt.ts.slice(0, 10), event_version: event.value.version }
  const idx = await tx.put(corpus.stores.timeline_index, entry, {
    parents: [{ store_id: 'timeline', version: event.value.version, role: 'source' }],
  })
  if (!idx.ok) return idx

  return ok({ event_version: event.value.version, idx_version: idx.value.version })
})
```

If the index put fails, the event put is rolled back too, so the timeline never advances without its matching index entry.
Worked example: snapshot + observation
Write a snapshot and an annotation against it in the same commit. Useful for AI pipeline provenance, where the observation only makes sense if its source is durably stored.
```ts
import { ok } from '@f0rbit/corpus'

// `raw_text` is the ingested document; `entity_mention` is an observation type defined elsewhere.
const result = await corpus.transaction(async (tx) => {
  const doc = await tx.put(corpus.stores.documents, raw_text, { tags: ['ingest'] })
  if (!doc.ok) return doc

  const obs = await tx.observe(entity_mention, {
    source: { store_id: 'documents', version: doc.value.version, path: '$.text' },
    content: { entity: 'Climate Policy', entity_type: 'topic' },
    confidence: 0.95,
  })
  if (!obs.ok) return obs

  return ok({ doc_version: doc.value.version, obs_id: obs.value.id })
})
```

If the observation write fails, the document write is rolled back. There is no way for an observation to land pointing at a `not_found` source.
Limitations
- No nested transactions. Calling `corpus.transaction()` inside a body returns `invalid_config`.
- No timeout option. Wrap the call in `Promise.race` if you need one.
- No optimistic concurrency control. A `version_check` op is not provided. If two transactions write the same `(store_id, version)`, last writer wins on memory and file; on Cloudflare, a D1 unique-constraint violation surfaces as `transaction_aborted` with reason `apply_batch_failed`.
- Layered cache asymmetry. Transactional writes skip cache layers and land at the bottom write layer; the cache fills lazily on subsequent reads. Mixing transactional and non-transactional writes against the same layered backend produces an asymmetric cache state. See Extending Backends for the `apply_batch` hook contract.
- No retry helper. Consumers manage retries themselves; corpus does not loop on `transaction_aborted`.
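Both the missing timeout and the missing retry helper can be filled in user code. The sketch below assumes the transaction call resolves to a `Result` whose error has a `kind` and, for aborts, a `reason`. `with_timeout`, `retry_on_abort`, `CorpusLikeResult` and the `'timeout'` error kind are hypothetical helpers, not corpus exports. `Promise.race` only stops *waiting*: the losing transaction keeps running and may still commit after the timeout fires.

```typescript
type CorpusLikeError = { kind: string; reason?: string }
type CorpusLikeResult<T> = { ok: true; value: T } | { ok: false; error: CorpusLikeError }

// Resolve with a 'timeout' error if `run` has not settled within `ms`.
// The underlying call is NOT cancelled; it may still commit later.
async function with_timeout<T>(run: () => Promise<CorpusLikeResult<T>>, ms: number): Promise<CorpusLikeResult<T>> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<CorpusLikeResult<T>>((resolve) => {
    timer = setTimeout(() => resolve({ ok: false, error: { kind: 'timeout' } }), ms)
  })
  try {
    return await Promise.race([run(), timeout])
  } finally {
    clearTimeout(timer) // do not keep the process alive once settled
  }
}

// Retry only commit-phase aborts; a body that returned err() or threw
// would most likely fail the same way again.
async function retry_on_abort<T>(run: () => Promise<CorpusLikeResult<T>>, attempts: number): Promise<CorpusLikeResult<T>> {
  let last = await run()
  for (
    let i = 1;
    i < attempts && !last.ok && last.error.kind === 'transaction_aborted' && last.error.reason === 'apply_batch_failed';
    i++
  ) {
    last = await run()
  }
  return last
}
```

Retrying only on `apply_batch_failed` is a design choice of this sketch. Widen or narrow the condition to match which aborts your bodies can safely repeat.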