Skip to content

Transactions

corpus.transaction(async tx => …) runs a callback whose mutations are buffered and committed together — atomically when the backend supports it, sequentially with best-effort rollback when it does not.

Why transactions

Single-store writes are well-served by store.put(). Each call is content-addressed and idempotent on the data side, so retries are safe; metadata writes go through one backend call.

What that primitive doesn’t cover is linkage between stores, where two writes have to land or neither should:

  • A timeline event plus a timeline_index entry that references the event’s version.
  • A documents snapshot plus an observation annotating it.

If the second write fails after the first commits, downstream readers see inconsistent state — an index that lags real events, or an observation whose source resolves to not_found. corpus.transaction() is the primitive that closes that gap.

API

const result = await corpus.transaction(async (tx) => {
const event = await tx.put(corpus.stores.timeline, payload, { tags: ['evt'] })
if (!event.ok) return event
const idx = await tx.put(corpus.stores.timeline_index, build_index(payload), {
parents: [{ store_id: 'timeline', version: event.value.version, role: 'source' }],
})
if (!idx.ok) return idx
return ok({ event_version: event.value.version })
})
if (!result.ok) {
// result.error.kind: 'transaction_aborted' | 'partial_commit' | 'invalid_config' | ...
return result
}
// result.value: { value: { event_version: string }, commits: SnapshotMeta[], observations: Observation[] }

The handle exposes:

type TransactionHandle = {
put: <T>(store: Store<T>, data: T, opts?: PutOpts) => Promise<Result<SnapshotMeta, CorpusError>>
get: <T>(store: Store<T>, version: string) => Promise<Result<Snapshot<T>, CorpusError>>
delete: <T>(store: Store<T>, version: string) => Promise<Result<void, CorpusError>>
observe: <T>(type: ObservationTypeDef<T>, opts: ObservationPutOpts<T>) => Promise<Result<Observation<T>, CorpusError>>
observation_delete: (id: string) => Promise<Result<void, CorpusError>>
}

The body must return a Result. Returning err() aborts the transaction with transaction_aborted (reason returned_err). A thrown exception is caught and wrapped as transaction_aborted (reason threw). Any successful body return triggers a commit phase against the backend.

The transaction result wraps the body’s return value alongside an ordered log of writes:

type TransactionResult<R> = {
value: R // whatever the body returned via ok(...)
commits: SnapshotMeta[] // every tx.put, in put-order
observations: Observation[] // every tx.observe, in observe-order
}

Read-your-writes

tx.get() checks the in-tx buffer before falling through to the backend. A tx.put followed by a tx.get for that version returns the buffered snapshot without a backend round-trip.

tx.delete writes a tombstone into the buffer, so a subsequent tx.get for that same (store_id, version) returns not_found even if the version is still committed in the live backend (the tombstone wins until the tx commits or aborts). This keeps in-tx semantics consistent with what the post-commit world would look like.

Reads from outside the transaction see no in-flight state — they always read the live backend.

Failure modes

Three error kinds are specific to transactions:

KindWhen it fires
transaction_abortedBody returned err(), threw, or the backend’s apply_batch rejected the commit. The reason field discriminates: 'returned_err', 'threw', or 'apply_batch_failed'.
partial_commitSequential fallback path could not fully roll back after a mid-batch failure. Only fires for backends without apply_batch, or when a non-undoable op (e.g. meta_delete) was already applied.
concurrent_modificationReserved for forward-compatibility. Currently no built-in backend produces this — unique-constraint violations on Cloudflare D1 surface as transaction_aborted with reason: 'apply_batch_failed'. Documented so consumer switches stay exhaustive once it lands.

Switch on reason to react to the abort cause:

const result = await corpus.transaction(async (tx) => {
// ...
})
if (!result.ok) {
switch (result.error.kind) {
case 'transaction_aborted':
switch (result.error.reason) {
case 'returned_err':
// body chose to abort — inspect cause for the original CorpusError
break
case 'threw':
// body threw — cause is the original Error
break
case 'apply_batch_failed':
// commit phase rejected — backend-specific cause
break
}
break
case 'partial_commit':
console.error(
`partial commit: ${result.error.ops_completed} of ${result.error.ops_completed + result.error.ops_failed} committed`,
)
break
case 'invalid_config':
// e.g. nested transaction, unregistered store
break
}
}

Per-backend guarantees

BackendAtomicityNotes
MemoryFullSnapshot the in-memory maps, apply ops synchronously, restore on throw. Genuinely atomic — no await interleaves between snapshot and restore.
FilePer-store metadata atomic via renameCross-file batch is not atomic. A mid-flight crash can land some renames but not others, surfacing as partial_commit. Recoverable via recover().
CloudflareD1 metadata atomic via db.batch; R2 orphan-tolerantAll metadata + observations within a transaction commit through a single SQLite transaction. R2 has no multi-object atomic API; failures after R2 puts but before D1 commit leave content-addressed orphan blobs (recoverable via a future corpus.gc()).
LayeredForwarded to bottom write layerCache layers above the bottom write layer are skipped on transactional commits. Reads after commit hit the bottom and lazily populate the cache.

The file backend writes data first (idempotent under content addressing), then metadata, so a mid-flight crash produces orphan blobs but never dangling pointers.

Concurrency

corpus is not a transactional database. There is no isolation between concurrent transactions, no locking, and no optimistic concurrency control.

  • Concurrent corpus.transaction() calls on the same Corpus instance are guarded by an in-process flag — the second one returns invalid_config with message nested transactions are not supported.
  • Concurrent calls across separate Corpus instances pointed at the same backend race. Each transaction commits or aborts as a unit; corpus does not serialise them. Last writer wins for memory and file; D1’s SQLite engine may serialise at the connection level on Cloudflare, but corpus does not promise it.

If you need cross-instance isolation, layer a coordinator (e.g. a queue, a per-key mutex, or an external lock service) above corpus.

Crash recovery (file backend)

The file backend stages writes under <base>/.tx-<uuid>/ and renames them into the live tree on commit. If the process crashes mid-commit, the staging directory is left behind. Clean it up at startup:

import { recover, create_file_backend } from '@f0rbit/corpus/file'
await recover('./data/corpus')
const backend = create_file_backend({ base_path: './data/corpus' })

recover() scans for .tx-* directories and removes them. It does not roll forward — a partial commit stays partial; the staging dir just gets cleaned up.

Worked example: timeline + index

Write an event and its index entry atomically. The index references the event’s version, so the body has to read the event’s meta before the second put.

import { ok, err } from '@f0rbit/corpus'
type TimelineEvent = { kind: string; payload: unknown; ts: string }
type IndexEntry = { date: string; event_version: string }
const result = await corpus.transaction(async (tx) => {
const event = await tx.put(corpus.stores.timeline, evt, { tags: ['evt'] })
if (!event.ok) return event
const entry: IndexEntry = { date: evt.ts.slice(0, 10), event_version: event.value.version }
const idx = await tx.put(corpus.stores.timeline_index, entry, {
parents: [{ store_id: 'timeline', version: event.value.version, role: 'source' }],
})
if (!idx.ok) return idx
return ok({ event_version: event.value.version, idx_version: idx.value.version })
})

If the index put fails, the event put is rolled back too — the timeline never advances without its matching index entry.

Worked example: snapshot + observation

Write a snapshot and an annotation against it in the same commit. Useful for AI pipeline provenance, where the observation only makes sense if its source is durably stored.

import { ok } from '@f0rbit/corpus'
const result = await corpus.transaction(async (tx) => {
const doc = await tx.put(corpus.stores.documents, raw_text, { tags: ['ingest'] })
if (!doc.ok) return doc
const obs = await tx.observe(entity_mention, {
source: { store_id: 'documents', version: doc.value.version, path: '$.text' },
content: { entity: 'Climate Policy', entity_type: 'topic' },
confidence: 0.95,
})
if (!obs.ok) return obs
return ok({ doc_version: doc.value.version, obs_id: obs.value.id })
})

If the observation write fails, the document write is rolled back. There is no way for an observation to land pointing at a not_found source.

Limitations

  • No nested transactions. Calling corpus.transaction() inside a body returns invalid_config.
  • No timeout option. Wrap the call in Promise.race if you need one.
  • No optimistic concurrency control. A version_check op is not provided. If two transactions write the same (store_id, version), last writer wins on memory/file and surfaces apply_batch_failed on Cloudflare D1 unique-constraint violations.
  • Layered cache asymmetry. Transactional writes skip cache layers and land at the bottom write layer; the cache fills lazily on subsequent reads. Mixing transactional and non-transactional writes against the same layered backend produces an asymmetric cache state. See Extending Backends for the apply_batch hook contract.
  • No retry helper. Consumers manage retries themselves — corpus does not loop on transaction_aborted.

See Also