Codecs
Codecs define how data is serialized for storage and deserialized on retrieval. Each codec specifies an encoding format and content type.
json_codec
```ts
function json_codec<T>(schema: ZodLike<T>): Codec<T>
```
Creates a JSON codec with Zod schema validation. Data is validated on decode (read), ensuring type safety when retrieving stored data.
Usage
```ts
import { json_codec, define_store } from '@f0rbit/corpus'
import { z } from 'zod'

const UserSchema = z.object({
  name: z.string(),
  email: z.string().email(),
  age: z.number().min(0),
})

const users = define_store('users', json_codec(UserSchema))
```
Zod Compatibility
Works with both Zod 3.x and 4.x through structural typing:
```ts
// Any object with a parse method works
type ZodLike<T> = { parse: (data: unknown) => T }
```
Validation Example
```ts
// `store` is the built store for this definition (e.g. corpus.stores.users)

// Valid data: works fine
await store.put({ name: 'Alice', email: 'alice@example.com', age: 30 })

// Invalid data on retrieval: decode_error
// (if the stored data doesn't match the schema)
// `version` identifies a previously stored snapshot
const result = await store.get(version)
if (!result.ok && result.error.kind === 'decode_error') {
  console.error('Data validation failed:', result.error.cause)
}
```
Content Type
application/json
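Decode-time validation can be illustrated with a minimal sketch of a JSON codec built on the structural `ZodLike<T>` type. This is not the library's source. `json_codec_sketch`, `UserParser`, and the local `Codec` shape are hypothetical. This sketch also throws on invalid data, whereas the real store reports a `decode_error` result. Only `parse` is required, so a hand-written parser stands in for Zod here.

```typescript
// Structural stand-in for a Zod schema: anything with a parse method.
type ZodLike<T> = { parse: (data: unknown) => T }

// Minimal local mirror of the documented Codec<T> (stream methods omitted).
type Codec<T> = {
  content_type: string
  encode: (value: T) => Promise<Uint8Array>
  decode: (bytes: Uint8Array) => Promise<T>
}

// Hypothetical sketch: serialize with JSON.stringify, validate only on decode.
function json_codec_sketch<T>(schema: ZodLike<T>): Codec<T> {
  return {
    content_type: 'application/json',
    encode: async (value) => new TextEncoder().encode(JSON.stringify(value)),
    decode: async (bytes) => schema.parse(JSON.parse(new TextDecoder().decode(bytes))),
  }
}

// A hand-written parser (no Zod) satisfies ZodLike<T> just as well.
type User = { name: string; age: number }
const UserParser: ZodLike<User> = {
  parse: (data) => {
    const d = (data ?? {}) as { name?: unknown; age?: unknown }
    if (typeof d.name !== 'string' || typeof d.age !== 'number' || d.age < 0) {
      throw new Error('invalid user')
    }
    return { name: d.name, age: d.age }
  },
}

const users_codec = json_codec_sketch(UserParser)
```

Encoding `{ name: 'Bob', age: -1 }` succeeds in this sketch; the problem only surfaces when those bytes are decoded.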
text_codec
```ts
function text_codec(): Codec<string>
```
Creates a plain text codec using UTF-8 encoding. No validation is performed.
Usage
```ts
import { text_codec, define_store } from '@f0rbit/corpus'

const logs = define_store('logs', text_codec())
const notes = define_store('notes', text_codec())
```
Example
```ts
import { create_corpus, create_memory_backend, define_store, text_codec } from '@f0rbit/corpus'

const corpus = create_corpus()
  .with_backend(create_memory_backend())
  .with_store(define_store('logs', text_codec()))
  .build()

await corpus.stores.logs.put('2024-01-15 10:30:00 - Server started')
await corpus.stores.logs.put('2024-01-15 10:30:05 - Connected to database')

const latest = await corpus.stores.logs.get_latest()
if (latest.ok) {
  console.log(latest.value.data) // "2024-01-15 10:30:05 - Connected to database"
}
```
Content Type
text/plain
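The streamability summary lists `text_codec()` with both stream methods. Below is a minimal sketch of what that can look like using the standard `TextDecoderStream`. `text_codec_sketch` and `read_all` are hypothetical, not the library's implementation. The streaming decoder matters because a chunk boundary can split a multi-byte UTF-8 character, and `TextDecoderStream` holds the partial sequence until the rest arrives.

```typescript
// Local mirror of the documented Codec<T> shape (ContentType simplified to string).
type Codec<T> = {
  content_type: string
  encode: (value: T) => Promise<Uint8Array>
  decode: (bytes: Uint8Array) => Promise<T>
  encode_stream?: (value: T) => ReadableStream<Uint8Array>
  decode_stream?: (bytes: ReadableStream<Uint8Array>) => ReadableStream<T>
}

// Hypothetical sketch of a UTF-8 text codec with both stream methods.
function text_codec_sketch(): Codec<string> {
  return {
    content_type: 'text/plain',
    encode: async (value) => new TextEncoder().encode(value),
    decode: async (bytes) => new TextDecoder().decode(bytes),
    // One-chunk stream holding the UTF-8 bytes of the whole value.
    encode_stream: (value) =>
      new ReadableStream<Uint8Array>({
        start(controller) {
          controller.enqueue(new TextEncoder().encode(value))
          controller.close()
        },
      }),
    // TextDecoderStream carries incomplete UTF-8 sequences across chunk boundaries.
    decode_stream: (bytes) => bytes.pipeThrough(new TextDecoderStream()),
  }
}

// Collect every chunk of a stream into an array.
async function read_all<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader()
  const chunks: T[] = []
  for (;;) {
    const { done, value } = await reader.read()
    if (done) return chunks
    chunks.push(value)
  }
}
```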
binary_codec
```ts
function binary_codec(): Codec<Uint8Array>
```
Creates a pass-through codec for raw binary data. No transformation is applied; data is stored and retrieved as-is.
Usage
```ts
import { binary_codec, define_store } from '@f0rbit/corpus'

const images = define_store('images', binary_codec())
const documents = define_store('documents', binary_codec())
```
Example
```ts
import { create_corpus, create_file_backend, define_store, binary_codec } from '@f0rbit/corpus'

const corpus = create_corpus()
  .with_backend(create_file_backend({ base_path: './data' }))
  .with_store(define_store('images', binary_codec()))
  .build()

// Store an image (Bun runtime)
const imageBytes = await Bun.file('photo.png').bytes()
await corpus.stores.images.put(new Uint8Array(imageBytes))

// Retrieve and save
const result = await corpus.stores.images.get_latest()
if (result.ok) {
  await Bun.write('output.png', result.value.data)
}
```
Content Type
application/octet-stream
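With no transformation, a pass-through codec reduces to identity functions. Here is a minimal sketch, assuming the `Codec<T>` shape documented under Types. `binary_codec_sketch` is hypothetical and makes no claim about whether the real codec copies its input.

```typescript
// Local mirror of the documented Codec<T> shape (ContentType simplified to string).
type Codec<T> = {
  content_type: string
  encode: (value: T) => Promise<Uint8Array>
  decode: (bytes: Uint8Array) => Promise<T>
  encode_stream?: (value: T) => ReadableStream<Uint8Array>
  decode_stream?: (bytes: ReadableStream<Uint8Array>) => ReadableStream<T>
}

// Hypothetical sketch of a pass-through codec: bytes in, same bytes out.
function binary_codec_sketch(): Codec<Uint8Array> {
  return {
    content_type: 'application/octet-stream',
    encode: async (value) => value,
    decode: async (bytes) => bytes,
    // The whole value as a single chunk.
    encode_stream: (value) =>
      new ReadableStream<Uint8Array>({
        start(controller) {
          controller.enqueue(value)
          controller.close()
        },
      }),
    // Incoming chunks are already in decoded form, so the stream is returned as-is.
    decode_stream: (bytes) => bytes,
  }
}
```

Because this sketch copies nothing, the decoded array is the very same object that was encoded.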
compose
```ts
function compose<T>(head: Codec<T>, ...layers: BytesCodec[]): Codec<T>
```
Chain a head codec with N byte-transforming layers (gzip, encrypt, base64, …). Encode runs left-to-right; decode runs right-to-left.
The leftmost codec defines the value type T. Subsequent layers are BytesCodec (i.e. Codec<Uint8Array>). The composed codec inherits the head’s content_type — wrapper layers don’t change semantic type.
Usage
```ts
import { compose, json_codec, gzip_codec, define_store } from '@f0rbit/corpus'
import { z } from 'zod'

const EventSchema = z.object({ id: z.string(), payload: z.unknown() })

const codec = compose(json_codec(EventSchema), gzip_codec())
const events = define_store('events', codec)
```
Streamability inference
compose() does not return a special StreamableCodec<T>; it conditionally includes encode_stream / decode_stream on the result. Each one is present if and only if every layer (head + all wrappers) exposes that same stream method:
```ts
compose(text_codec(), gzip_codec()).decode_stream         // defined
compose(json_codec(s), gzip_codec()).decode_stream        // undefined: json_codec lacks decode_stream
compose(text_codec(), encrypt_codec(key)).decode_stream   // undefined: encrypt_codec omits decode_stream
```
This propagates to the Store<T> type: adding a non-streamable layer disables Store.put_stream / SnapshotHandle.stream for that store.
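The ordering and inference rules can be sketched in a few lines. This hypothetical `compose_sketch` is not the library's source. It mirrors the documented `Codec<T>` shape locally and wires only the buffered paths plus `decode_stream`. Encode-side stream chaining is left out because `encode_stream` takes a whole value rather than a stream, and this page does not describe how compose chains it. The two toy byte layers are invented for the demo; because they do not commute, a round trip only succeeds if decode undoes them in reverse order.

```typescript
// Local mirror of the documented Codec<T> shape (ContentType simplified to string).
type Codec<T> = {
  content_type: string
  encode: (value: T) => Promise<Uint8Array>
  decode: (bytes: Uint8Array) => Promise<T>
  encode_stream?: (value: T) => ReadableStream<Uint8Array>
  decode_stream?: (bytes: ReadableStream<Uint8Array>) => ReadableStream<T>
}
type BytesCodec = Codec<Uint8Array>

// Hypothetical sketch of compose(): buffered paths plus decode_stream inference.
function compose_sketch<T>(head: Codec<T>, ...layers: BytesCodec[]): Codec<T> {
  const composed: Codec<T> = {
    // Wrapper layers don't change the semantic type, so the head's content_type wins.
    content_type: head.content_type,
    // Encode: head first, then each layer left-to-right.
    encode: async (value) => {
      let bytes = await head.encode(value)
      for (const layer of layers) bytes = await layer.encode(bytes)
      return bytes
    },
    // Decode: peel the layers off right-to-left, then let the head decode.
    decode: async (bytes) => {
      for (const layer of [...layers].reverse()) bytes = await layer.decode(bytes)
      return head.decode(bytes)
    },
  }
  // Attach decode_stream only when the head AND every layer provide one.
  const head_decode_stream = head.decode_stream
  if (head_decode_stream && layers.every((layer) => layer.decode_stream !== undefined)) {
    composed.decode_stream = (stream) => {
      let current = stream
      for (const layer of [...layers].reverse()) current = layer.decode_stream!(current)
      return head_decode_stream(current)
    }
  }
  return composed
}

// Toy UTF-8 head codec for the demo.
const text: Codec<string> = {
  content_type: 'text/plain',
  encode: async (value) => new TextEncoder().encode(value),
  decode: async (bytes) => new TextDecoder().decode(bytes),
  decode_stream: (stream) => stream.pipeThrough(new TextDecoderStream()),
}

// XOR with 0x0f is its own inverse and works chunk-by-chunk, so it can stream.
const xor_layer: BytesCodec = {
  content_type: 'application/octet-stream',
  encode: async (bytes) => bytes.map((b) => b ^ 0x0f),
  decode: async (bytes) => bytes.map((b) => b ^ 0x0f),
  decode_stream: (stream) =>
    stream.pipeThrough(
      new TransformStream<Uint8Array, Uint8Array>({
        transform: (chunk, controller) => controller.enqueue(chunk.map((b) => b ^ 0x0f)),
      }),
    ),
}

// Adds 1 to each byte (Uint8Array wraps at 256); buffered only, no decode_stream.
const add_one_layer: BytesCodec = {
  content_type: 'application/octet-stream',
  encode: async (bytes) => bytes.map((b) => b + 1),
  decode: async (bytes) => bytes.map((b) => b - 1),
}
```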
gzip_codec
```ts
function gzip_codec(): BytesCodec
```
A BytesCodec that compresses on encode and decompresses on decode via the standard CompressionStream / DecompressionStream APIs (Workers, Bun, Node 18+; no polyfill).
Use it as a layer in compose(...): the head codec defines the value type, and gzip_codec() transforms the bytes produced by the upstream encoder.
Usage
```ts
import { compose, json_codec, gzip_codec, define_store } from '@f0rbit/corpus'
import { z } from 'zod'
const EventSchema = z.object({ id: z.string(), payload: z.unknown() })
const codec = compose(json_codec(EventSchema), gzip_codec())
const events = define_store('events', codec)
```
gzip_codec() provides both encode_stream and decode_stream, so a compose(text_codec(), gzip_codec()) chain is fully streamable. With json_codec as the head, only the buffered (non-streaming) path is available, because json_codec itself ships neither stream method.
Content Type
application/gzip (overridden by the head codec’s content_type once composed)
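Here is one way to build a layer like this on the standard APIs. `gzip_codec_sketch`, `single_chunk`, and `run_through` are hypothetical names, and the local `Codec` type mirrors the one documented under Types. The buffered `encode`/`decode` drain the same streams that back `encode_stream`/`decode_stream`.

```typescript
// Local mirror of the documented Codec<T> shape (ContentType simplified to string).
type Codec<T> = {
  content_type: string
  encode: (value: T) => Promise<Uint8Array>
  decode: (bytes: Uint8Array) => Promise<T>
  encode_stream?: (value: T) => ReadableStream<Uint8Array>
  decode_stream?: (bytes: ReadableStream<Uint8Array>) => ReadableStream<T>
}
type BytesCodec = Codec<Uint8Array>

// Wrap one buffer in a single-chunk ReadableStream.
function single_chunk(bytes: Uint8Array): ReadableStream<Uint8Array> {
  return new ReadableStream<Uint8Array>({
    start(controller) {
      controller.enqueue(bytes)
      controller.close()
    },
  })
}

// Run a whole buffer through a (De)CompressionStream and collect the output bytes.
async function run_through(
  bytes: Uint8Array,
  transform: CompressionStream | DecompressionStream,
): Promise<Uint8Array> {
  const output = single_chunk(bytes).pipeThrough(transform)
  return new Uint8Array(await new Response(output).arrayBuffer())
}

// Hypothetical sketch of a gzip BytesCodec on the standard compression streams.
function gzip_codec_sketch(): BytesCodec {
  return {
    content_type: 'application/gzip',
    encode: (bytes) => run_through(bytes, new CompressionStream('gzip')),
    decode: (bytes) => run_through(bytes, new DecompressionStream('gzip')),
    encode_stream: (bytes) => single_chunk(bytes).pipeThrough(new CompressionStream('gzip')),
    decode_stream: (stream) => stream.pipeThrough(new DecompressionStream('gzip')),
  }
}
```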
encrypt_codec
```ts
function encrypt_codec(key: CryptoKey): BytesCodec
```
A BytesCodec that encrypts on encode and decrypts on decode via WebCrypto AES-GCM. A fresh 12-byte IV is generated per encode call and prepended to the ciphertext, so each encoded value decodes on its own with no caller-side IV management.
Usage
```ts
import { compose, json_codec, encrypt_codec, define_store } from '@f0rbit/corpus'
import { z } from 'zod'

// Example schema; substitute your own
const SecretSchema = z.object({ token: z.string() })

const key = await crypto.subtle.generateKey(
  { name: 'AES-GCM', length: 256 },
  true,
  ['encrypt', 'decrypt'],
)

const codec = compose(json_codec(SecretSchema), encrypt_codec(key))
const secrets = define_store('secrets', codec)
```
Streaming notes
- encode_stream is present (a single-chunk wrapper around encode). True chunked encryption is not implemented: AES-GCM authenticates the whole ciphertext via a tag at the end, and standard WebCrypto exposes only the one-shot form.
- decode_stream is intentionally omitted. AES-GCM must verify the auth tag before any plaintext is safe to release; yielding chunks early would leak unauthenticated data. Including encrypt_codec() in a composition disables streaming decode for the whole pipeline, by design.
Deduplication tradeoff
A random IV per encode means identical plaintexts produce different ciphertexts each time. Content-hash deduplication operates on the encoded bytes, so a compose(json_codec(S), encrypt_codec(key)) pipeline cannot dedup identical plaintexts.
This is the correct security tradeoff. Opt-in deterministic encryption is a footgun — it leaks plaintext equality through ciphertext equality. If you need dedup with encryption, layer encryption on top of an already-content-addressed system rather than asking the encryption layer to be deterministic.
Content Type
application/octet-stream (overridden by the head codec’s content_type once composed)
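A small WebCrypto sketch makes both the IV-prefix layout and the deduplication consequence visible. `encrypt_codec_sketch` is hypothetical, not the library's code. It assumes the 12-byte IV is simply prepended, as described above, and uses WebCrypto's default 128-bit tag. Encoding the same plaintext twice yields different bytes, which is why content-hash dedup cannot match them.

```typescript
// Minimal local mirror of the documented Codec<T> shape.
type Codec<T> = {
  content_type: string
  encode: (value: T) => Promise<Uint8Array>
  decode: (bytes: Uint8Array) => Promise<T>
  encode_stream?: (value: T) => ReadableStream<Uint8Array>
  decode_stream?: (bytes: ReadableStream<Uint8Array>) => ReadableStream<T>
}
type BytesCodec = Codec<Uint8Array>

const IV_LENGTH = 12 // 96-bit IV, the recommended size for AES-GCM

// Hypothetical sketch: AES-GCM with a fresh random IV prepended to each ciphertext.
function encrypt_codec_sketch(key: CryptoKey): BytesCodec {
  return {
    content_type: 'application/octet-stream',
    encode: async (plaintext) => {
      const iv = crypto.getRandomValues(new Uint8Array(IV_LENGTH))
      // WebCrypto returns the ciphertext with the auth tag appended.
      const sealed = new Uint8Array(await crypto.subtle.encrypt({ name: 'AES-GCM', iv }, key, plaintext))
      const out = new Uint8Array(IV_LENGTH + sealed.length)
      out.set(iv, 0)
      out.set(sealed, IV_LENGTH)
      return out
    },
    decode: async (bytes) => {
      const iv = bytes.slice(0, IV_LENGTH)
      const sealed = bytes.slice(IV_LENGTH)
      // Rejects if the tag does not verify, so no unauthenticated plaintext is returned.
      return new Uint8Array(await crypto.subtle.decrypt({ name: 'AES-GCM', iv }, key, sealed))
    },
  }
}
```

The sketch has no `decode_stream`, for the reason given under Streaming notes: `crypto.subtle.decrypt` only resolves after the whole tag has been checked.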
Streamability summary
| Codec | encode_stream | decode_stream |
|---|---|---|
| json_codec(schema) | no | no |
| text_codec() | yes | yes |
| binary_codec() | yes | yes |
| gzip_codec() | yes | yes |
| encrypt_codec(key) | yes | no |
A composed codec carries a stream method only when every layer carries it. See Streaming for how streamability surfaces through Store.get_handle / Store.put_stream.
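The rule is a plain logical AND, so the table's flags are enough to predict any composition. The sketch below encodes the table rows as data. `composed_flags` is a hypothetical helper and does not inspect real codec objects.

```typescript
type StreamFlags = { encode_stream: boolean; decode_stream: boolean }

// One row per codec, copied from the streamability summary table.
const FLAGS = {
  json_codec: { encode_stream: false, decode_stream: false },
  text_codec: { encode_stream: true, decode_stream: true },
  binary_codec: { encode_stream: true, decode_stream: true },
  gzip_codec: { encode_stream: true, decode_stream: true },
  encrypt_codec: { encode_stream: true, decode_stream: false },
} satisfies Record<string, StreamFlags>

type CodecName = keyof typeof FLAGS

// A composed codec has a stream method only if every layer has it.
function composed_flags(head: CodecName, ...layers: CodecName[]): StreamFlags {
  const rows: StreamFlags[] = [head, ...layers].map((name) => FLAGS[name])
  return {
    encode_stream: rows.every((row) => row.encode_stream),
    decode_stream: rows.every((row) => row.decode_stream),
  }
}
```

For instance, compose(text_codec(), encrypt_codec(key)) keeps encode_stream but loses decode_stream, which matches the compose examples above.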
Custom Codecs
Create your own codec by implementing the Codec<T> interface:
```ts
type Codec<T> = {
  content_type: ContentType
  encode: (value: T) => Promise<Uint8Array>
  decode: (bytes: Uint8Array) => Promise<T>
  encode_stream?: (value: T) => ReadableStream<Uint8Array>
  decode_stream?: (bytes: ReadableStream<Uint8Array>) => ReadableStream<T>
}
```
encode and decode are async (since 0.4.0). The two stream methods are optional: ship them when the format chunks naturally (text, raw bytes, gzip), and omit them when chunking is unsafe or incoherent (Zod-validated JSON, AEAD decode).
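As an example of a custom byte layer, here is a sketch of the base64 layer mentioned as a typical compose() wrapper. `base64_codec_sketch` is hypothetical and not part of `@f0rbit/corpus`. It uses the global `btoa`/`atob` (available in browsers, Workers, Bun, and Node 16+). It keeps to the buffered path because base64 maps 3 input bytes to 4 output characters, so a correct streaming version would have to carry leftover bytes between chunks.

```typescript
// Local mirror of the documented Codec<T> shape (ContentType simplified to string).
type Codec<T> = {
  content_type: string
  encode: (value: T) => Promise<Uint8Array>
  decode: (bytes: Uint8Array) => Promise<T>
  encode_stream?: (value: T) => ReadableStream<Uint8Array>
  decode_stream?: (bytes: ReadableStream<Uint8Array>) => ReadableStream<T>
}
type BytesCodec = Codec<Uint8Array>

// Hypothetical sketch of a base64 byte layer (buffered only, no stream methods).
function base64_codec_sketch(): BytesCodec {
  return {
    content_type: 'text/plain',
    encode: async (bytes) => {
      // btoa expects a "binary string": one char per byte, code points 0-255.
      let binary = ''
      for (const b of bytes) binary += String.fromCharCode(b)
      return new TextEncoder().encode(btoa(binary))
    },
    decode: async (bytes) => {
      const binary = atob(new TextDecoder().decode(bytes))
      return Uint8Array.from(binary, (c) => c.charCodeAt(0))
    },
  }
}
```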
MessagePack Example
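```ts
import { encode, decode } from '@msgpack/msgpack'
import type { Codec } from '@f0rbit/corpus'

// Structural schema type, same shape as ZodLike<T>
type Parser<T> = { parse: (data: unknown) => T }

function msgpack_codec<T>(schema: Parser<T>): Codec<T> {
  return {
    content_type: 'application/msgpack',
    encode: async (value) => encode(value),
    // Validate after unpacking, mirroring json_codec's decode-time check
    decode: async (bytes) => schema.parse(decode(bytes)),
  }
}
```
XML Example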
```ts
function xml_codec(): Codec<string> {
  return {
    content_type: 'text/xml',
    encode: async (value) => new TextEncoder().encode(value),
    decode: async (bytes) => new TextDecoder().decode(bytes),
  }
}
```
Types
Codec
```ts
type Codec<T> = {
  content_type: ContentType
  encode: (value: T) => Promise<Uint8Array>
  decode: (bytes: Uint8Array) => Promise<T>
  encode_stream?: (value: T) => ReadableStream<Uint8Array>
  decode_stream?: (bytes: ReadableStream<Uint8Array>) => ReadableStream<T>
}
```

| Property | Type | Description |
|---|---|---|
| content_type | ContentType | MIME type for the encoded data |
| encode | (T) => Promise<Uint8Array> | Serialize value to bytes (async since 0.4.0) |
| decode | (Uint8Array) => Promise<T> | Deserialize bytes to value (async since 0.4.0) |
| encode_stream | (T) => ReadableStream<Uint8Array> | Optional. Chunked encode; required for Store.put_stream. |
| decode_stream | (ReadableStream<Uint8Array>) => ReadableStream<T> | Optional. Chunked decode; required for SnapshotHandle.stream(). |
BytesCodec
```ts
type BytesCodec = Codec<Uint8Array>
```
A codec layer that transforms bytes → bytes (gzip, encrypt, base64, …). It is the parameter type for compose() layers: the head codec defines T, and subsequent layers are byte transformers.
ContentType
```ts
type ContentType =
  | "application/json"
  | "text/plain"
  | "text/xml"
  | "image/png"
  | "image/jpeg"
  | "application/octet-stream"
  | (string & {}) // Any other MIME type
```
Comparison
| Codec | Type | Validation | Use Case |
|---|---|---|---|
| json_codec | Structured data | Zod schema | Most application data |
| text_codec | Strings | None | Logs, notes, markup |
| binary_codec | Raw bytes | None | Files, images, blobs |
| gzip_codec | Bytes layer | None | Wrap any codec with gzip compression via compose() |
| encrypt_codec | Bytes layer | Auth-tag (GCM) | Wrap any codec with AES-GCM encryption via compose() |