Skip to content

Testing Patterns

The memory backend makes testing Corpus applications straightforward. No mocking is required — each test runs against a real in-memory backend.

Basic Test Setup

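Create a fresh corpus for each test to ensure isolation. The later snippets on this page reuse the `users` store and, unless they build their own, the `corpus` variable defined here, so they are meant to live inside this `describe` block: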

import { describe, test, expect, beforeEach } from 'bun:test'
import { z } from 'zod'
import {
  create_corpus,
  create_memory_backend,
  define_store,
  json_codec,
  type Corpus,
  type CorpusEvent,
  type SnapshotMeta,
  type Store,
} from '@f0rbit/corpus'
/** Zod schema describing the user records used throughout these examples. */
const UserSchema = z.object({
  id: z.string(),
  name: z.string(),
  email: z.string().email(),
})

// JSON codec built from the schema above.
const userCodec = json_codec(UserSchema)

/** Definition of the `users` store that each test corpus registers. */
const users = define_store('users', userCodec)
describe('UserStore', () => {
  // Reassigned in `beforeEach`, so every test starts from a newly built corpus.
  let corpus: Corpus<{ users: Store<z.infer<typeof UserSchema>> }>

  beforeEach(() => {
    const builder = create_corpus()
      .with_backend(create_memory_backend())
      .with_store(users)
    corpus = builder.build()
  })

  test('stores and retrieves user', async () => {
    const alice = { id: '1', name: 'Alice', email: 'alice@example.com' }

    // Write a snapshot; the early return lets TypeScript treat `stored` as a success below.
    const stored = await corpus.stores.users.put(alice)
    expect(stored.ok).toBe(true)
    if (!stored.ok) return

    // Read the snapshot back using the version reported by the put.
    const fetched = await corpus.stores.users.get(stored.value.version)
    expect(fetched.ok).toBe(true)
    if (!fetched.ok) return

    expect(fetched.value.data).toEqual(alice)
  })
})

Testing Lineage

Test parent-child relationships between snapshots:

test('tracks parent-child lineage', async () => {
  // First snapshot: this one becomes the parent.
  const original = await corpus.stores.users.put({
    id: '1',
    name: 'Alice v1',
    email: 'alice@example.com',
  })
  expect(original.ok).toBe(true)
  if (!original.ok) return

  // Second snapshot, written with an explicit reference back to the first.
  const parentRef = { store_id: 'users', version: original.value.version }
  const revised = await corpus.stores.users.put(
    { id: '1', name: 'Alice v2', email: 'alice@example.com' },
    { parents: [parentRef] },
  )
  expect(revised.ok).toBe(true)
  if (!revised.ok) return

  // The returned metadata should list exactly that one parent.
  const recordedParents = revised.value.parents
  expect(recordedParents).toHaveLength(1)
  expect(recordedParents[0].version).toBe(original.value.version)
})

Testing Deduplication

Verify that identical content shares storage:

test('deduplicates identical content', async () => {
  const payload = { id: '1', name: 'Alice', email: 'alice@example.com' }

  // Put the exact same payload twice, one after the other.
  const putA = await corpus.stores.users.put(payload)
  const putB = await corpus.stores.users.put(payload)
  expect(putA.ok && putB.ok).toBe(true)
  if (!(putA.ok && putB.ok)) return

  const metaA = putA.value
  const metaB = putB.value

  // Each put is recorded as its own version...
  expect(metaA.version).not.toBe(metaB.version)
  // ...while the content hash is identical...
  expect(metaA.content_hash).toBe(metaB.content_hash)
  // ...and both point at the same data key (shared storage).
  expect(metaA.data_key).toBe(metaB.data_key)
})

Testing Event Handlers

Capture and verify events using the on_event option:

test('emits events on operations', async () => {
  // Every event passed to `on_event` is appended here for later inspection.
  const events: CorpusEvent[] = []
  const backend = create_memory_backend({
    on_event: (event) => events.push(event),
  })
  const corpus = create_corpus().with_backend(backend).with_store(users).build()

  await corpus.stores.users.put({
    id: '1',
    name: 'Alice',
    email: 'alice@example.com',
  })

  // A single put should have produced exactly one 'snapshot_put' event for the users store.
  const snapshotPuts = events.filter((event) => event.type === 'snapshot_put')
  expect(snapshotPuts).toHaveLength(1)
  expect(snapshotPuts[0].store_id).toBe('users')
})

Testing Error Cases

Test that errors are properly returned:

test('returns not_found for missing version', async () => {
  // Ask for a version that was never written.
  const missingVersion = 'nonexistent'
  const lookup = await corpus.stores.users.get(missingVersion)

  expect(lookup.ok).toBe(false)
  if (lookup.ok) return

  // The error names the failure kind and echoes the requested version.
  const failure = lookup.error
  expect(failure.kind).toBe('not_found')
  expect(failure.version).toBe(missingVersion)
})
test('returns decode_error for invalid data', async () => {
  // Write malformed bytes directly through `corpus.data`, skipping the store's `put`.
  const dataKey = 'users/bad-hash'
  const malformed = new TextEncoder().encode('not json')
  await corpus.data.put(dataKey, malformed)

  // Register metadata pointing at those bytes so the store can look the version up.
  await corpus.metadata.put({
    store_id: 'users',
    version: 'bad-version',
    parents: [],
    created_at: new Date(),
    content_hash: 'bad-hash',
    content_type: 'application/json',
    size_bytes: malformed.byteLength, // 'not json' encodes to 8 bytes
    data_key: dataKey,
  })

  // Reading the version back through the store should fail with a decode error.
  const outcome = await corpus.stores.users.get('bad-version')
  expect(outcome.ok).toBe(false)
  if (outcome.ok) return
  expect(outcome.error.kind).toBe('decode_error')
})

Testing Listing and Filtering

Test the `list()` method with filters such as `tags` and `limit`:

test('lists snapshots with filters', async () => {
  // Seed two snapshots with different tags so the tag filter has something to exclude.
  await corpus.stores.users.put(
    { id: '1', name: 'Alice', email: 'alice@example.com' },
    { tags: ['active'] }
  )
  await corpus.stores.users.put(
    { id: '2', name: 'Bob', email: 'bob@example.com' },
    { tags: ['inactive'] }
  )

  // List with tag filter: only the snapshot tagged 'active' is expected.
  const active: SnapshotMeta[] = []
  for await (const meta of corpus.stores.users.list({ tags: ['active'] })) {
    active.push(meta)
  }
  expect(active).toHaveLength(1)
  expect(active[0].tags).toContain('active')

  // List with limit: two snapshots were written, but only one should be yielded.
  const limited: SnapshotMeta[] = []
  for await (const meta of corpus.stores.users.list({ limit: 1 })) {
    limited.push(meta)
  }
  expect(limited).toHaveLength(1)
})

Test Factory Pattern

Create a reusable test factory for complex setups:

/**
 * Builds a self-contained corpus for a single test: a memory backend that
 * records every event it reports, plus the `users` and `posts` stores.
 *
 * NOTE(review): `PostSchema` is not defined in this snippet; it is assumed to
 * be a zod schema declared next to `UserSchema` — confirm before copying.
 *
 * @returns the built corpus, the array that captured events are pushed into,
 *          and a `seedUsers` helper for bulk-inserting users
 */
function createTestCorpus() {
  const events: CorpusEvent[] = []
  const corpus = create_corpus()
    .with_backend(create_memory_backend({
      on_event: (e) => events.push(e)
    }))
    .with_store(users)
    .with_store(define_store('posts', json_codec(PostSchema)))
    .build()

  return {
    corpus,
    events,
    /**
     * Puts `count` generated users one at a time.
     *
     * @param count number of users to insert
     * @returns the versions of the inserted snapshots, in insertion order
     * @throws Error if any put fails, so a broken seed surfaces at its cause
     *         instead of as a shorter-than-expected version list
     */
    async seedUsers(count: number) {
      const versions: string[] = []
      for (let i = 0; i < count; i++) {
        const result = await corpus.stores.users.put({
          id: String(i),
          name: `User ${i}`,
          email: `user${i}@example.com`,
        })
        if (!result.ok) {
          throw new Error(
            `seedUsers: put failed for user ${i}: ${JSON.stringify(result.error)}`
          )
        }
        versions.push(result.value.version)
      }
      return versions
    },
  }
}
test('complex scenario with seeded data', async () => {
  // Take only what this test uses; the factory also returns `corpus` for tests that need it.
  const { events, seedUsers } = createTestCorpus()

  const versions = await seedUsers(5)
  expect(versions).toHaveLength(5)

  // Each seeded user is expected to have produced one 'snapshot_put' event.
  expect(events.filter(e => e.type === 'snapshot_put')).toHaveLength(5)
})