Streaming

Streaming surfaces the underlying ReadableStream<Uint8Array> of a stored snapshot through the Store API. It avoids buffering large payloads (multi-GB media on R2, multi-MB log dumps) into worker memory before the consumer can touch a single byte.

The streaming surface is opt-in: Store.get and Store.put are unchanged and still materialise the full value. Streaming methods (get_handle, get_latest_handle, put_stream) only exist when the underlying codec supports them, and the type system enforces this — calling put_stream on a Store<User> backed by json_codec is a TypeScript error.

Why streaming matters

  • Cloudflare R2 returns a ReadableStream natively. Without streaming, every read buffers the full object into memory before decoding.
  • The file backend uses Bun.file().stream() (256 KB chunks) when configured to stream, which is useful for large local artefacts.
  • Streaming writes let you pipe an HTTP body, file, or sub-process stdout straight into corpus without materialising the upstream first.

Streamability rules

A value is streamable when it can be carved into discrete chunks. v1 supports two shapes:

type StreamableValue = string | Uint8Array

Structured values (e.g. User validated by Zod) cannot stream — the codec needs the full document to validate.
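
To make "carved into discrete chunks" concrete, here is a minimal sketch. Only the StreamableValue type comes from the text above; is_streamable and chunk_text are hypothetical helpers written for illustration and are not part of the documented API.

```typescript
type StreamableValue = string | Uint8Array

// Hypothetical runtime check that mirrors the StreamableValue type.
function is_streamable(value: unknown): value is StreamableValue {
  return typeof value === 'string' || value instanceof Uint8Array
}

// Hypothetical helper: split a string into fixed-size pieces (size must be > 0).
// Joining the pieces in order restores the original value, which is the
// property that makes a value safe to hand over chunk by chunk.
function chunk_text(text: string, size: number): string[] {
  const chunks: string[] = []
  for (let i = 0; i < text.length; i += size) {
    chunks.push(text.slice(i, i + size))
  }
  return chunks
}

const log = 'chunk one\nchunk two\n'
const pieces = chunk_text(log, 8)
const round_trips = pieces.join('') === log // true

// A structured value has no meaningful partial form, so it is not streamable.
const user = { name: 'Ada' }
const user_streams = is_streamable(user) // false
```

A validated object such as User has no equivalent of joining the pieces back together, which is why it stays on the non-streaming path.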

A codec is streamable when it ships the optional encode_stream / decode_stream methods on the Codec<T> interface. Built-in codec coverage:

Codec                encode_stream   decode_stream
json_codec(schema)   no              no
text_codec()         yes             yes
binary_codec()       yes             yes
gzip_codec()         yes             yes
encrypt_codec(key)   yes             no

encrypt_codec intentionally omits decode_stream — AES-GCM has to verify the auth tag before any plaintext is safe to release. See Codecs.
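
The hazard behind that omission can be seen outside the library. The sketch below uses Node's node:crypto module, which is an assumption for illustration and not necessarily what encrypt_codec uses internally. It shows that a chunk-wise AES-GCM decrypt hands back bytes before the auth tag has been checked, and that tampering is only detected at the very end.

```typescript
import { createCipheriv, createDecipheriv, randomBytes } from 'node:crypto'

const key = randomBytes(32) // AES-256 key
const iv = randomBytes(12)  // 96-bit nonce, the usual size for GCM

// Encrypt a short message and keep the authentication tag.
const cipher = createCipheriv('aes-256-gcm', key, iv)
const ciphertext = Buffer.concat([cipher.update('secret log line', 'utf8'), cipher.final()])
const tag = cipher.getAuthTag()

// Flip one bit to simulate tampering in storage or transit.
const tampered = Buffer.from(ciphertext)
tampered.writeUInt8(tampered.readUInt8(0) ^ 0x01, 0)

const decipher = createDecipheriv('aes-256-gcm', key, iv)
decipher.setAuthTag(tag)

// update() returns output before the tag has been verified.
const unverified = decipher.update(tampered)

// The tag is only checked in final(), which throws on a mismatch.
let authenticated = true
try {
  decipher.final()
} catch {
  authenticated = false
}
```

A streaming decoder would already have released the unverified bytes downstream by the time the final check fails, which is consistent with the choice to buffer and verify first.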

Reading: get_handle / get_latest_handle

Store.get_handle(version) and Store.get_latest_handle() return a SnapshotHandle<T> instead of an eagerly materialised value. The handle exposes three accessors:

type SnapshotHandle<T> = {
  value: () => Promise<Result<T, CorpusError>>
  bytes: () => Promise<Result<Uint8Array, CorpusError>>
  stream: T extends StreamableValue
    ? () => Promise<Result<ReadableStream<T>, CorpusError>>
    : never
}

Method     Returns                                  Notes
value()    Decoded T                                Always available. Equivalent to store.get(v).value.data.
bytes()    The literal stored bytes (post-encode)   Always available. Includes any wrapper layers (gzip, encrypt).
stream()   ReadableStream<T> of decoded chunks      Only present when the codec has decode_stream.

bytes() returns the raw payload as written to the backend — gzipped if the codec was composed with gzip_codec(), encrypted if composed with encrypt_codec(). Asking for “decoded bytes” before decode runs is incoherent for a Codec<User>; call value() instead.
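
The difference between stored bytes and the decoded value can be illustrated without the library. This sketch uses Node's node:zlib as a stand-in for a gzip-composed text codec; the variable names are hypothetical and the library's own gzip layer may differ in detail.

```typescript
import { gzipSync, gunzipSync } from 'node:zlib'

const original = 'chunk one\nchunk two\n'

// Stand-in for what a gzip-wrapped text store would write to the backend.
// This is the kind of payload bytes() is described as returning.
const stored_bytes = gzipSync(Buffer.from(original, 'utf8'))

// Every gzip stream starts with the magic bytes 0x1f 0x8b.
const looks_gzipped = stored_bytes[0] === 0x1f && stored_bytes[1] === 0x8b

// Stand-in for value(): undo the wrapper layer, then decode the text.
const decoded = gunzipSync(stored_bytes).toString('utf8')
const matches_original = decoded === original // true
```

In other words, bytes() suits copying or checksumming a snapshot as stored, while value() is the accessor to use when the content itself is needed.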

Example

import { create_corpus, create_memory_backend, define_store, text_codec } from '@f0rbit/corpus'

const corpus = create_corpus()
  .with_backend(create_memory_backend())
  .with_store(define_store('logs', text_codec()))
  .build()

// Store a snapshot, then fetch a handle instead of the materialised value.
const meta = await corpus.stores.logs.put('chunk one\nchunk two\n')
if (!meta.ok) throw meta.error

const handle_result = await corpus.stores.logs.get_latest_handle()
if (!handle_result.ok) throw handle_result.error
const { handle } = handle_result.value

// stream() is available because text_codec() ships decode_stream.
const stream_result = await handle.stream()
if (!stream_result.ok) throw stream_result.error

// Read decoded chunks as they arrive.
const reader = stream_result.value.getReader()
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  process.stdout.write(value)
}
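
Chunks from a text stream do not necessarily end on line boundaries, so log consumers often need to re-split them. The following is a minimal sketch of that step; read_lines and demo are hypothetical helpers built only on the standard Web Streams API, and the in-memory source stands in for the stream returned by handle.stream().

```typescript
// Hypothetical helper: yields complete lines from a stream of text chunks,
// holding back any trailing partial line until the next chunk arrives.
async function* read_lines(stream: ReadableStream<string>): AsyncGenerator<string> {
  const reader = stream.getReader()
  let buffered = ''
  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) break
      buffered += value
      const lines = buffered.split('\n')
      buffered = lines.pop() ?? '' // last element is an incomplete line (or '')
      for (const line of lines) yield line
    }
    if (buffered.length > 0) yield buffered // final line without a newline
  } finally {
    reader.releaseLock()
  }
}

// Usage with an in-memory source whose chunks split a line in the middle.
async function demo(): Promise<string[]> {
  const source = new ReadableStream<string>({
    start(controller) {
      for (const piece of ['chunk o', 'ne\nchunk', ' two\n']) controller.enqueue(piece)
      controller.close()
    },
  })
  const lines: string[] = []
  for await (const line of read_lines(source)) lines.push(line)
  return lines // ['chunk one', 'chunk two']
}
```

Only one partial line is held in memory at a time, so the approach keeps the memory profile that streaming is meant to provide.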

Writing: put_stream

Store.put_stream(stream, opts?) accepts a ReadableStream<T> and stores the concatenation of its chunks as a single snapshot. It is symmetric with get_handle: the method is only present when the codec exposes encode_stream.

import { create_corpus, create_memory_backend, define_store, binary_codec } from '@f0rbit/corpus'

const corpus = create_corpus()
  .with_backend(create_memory_backend())
  .with_store(define_store('uploads', binary_codec()))
  .build()

// Pipe the HTTP response body straight into the store without buffering it first.
const response = await fetch('https://example.com/big.bin')
if (!response.body) throw new Error('no body')
const meta = await corpus.stores.uploads.put_stream(response.body)
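
Not every upstream source is already a ReadableStream. As a sketch under stated assumptions, the hypothetical stream_from adapter below wraps any async iterable (for example, chunks produced by a sub-process reader) using only the standard Web Streams API. The names stream_from, produce_chunks and total_bytes are illustrative and not part of the library.

```typescript
// Hypothetical adapter: wraps any async iterable in a ReadableStream.
function stream_from<T>(source: AsyncIterable<T>): ReadableStream<T> {
  const iterator = source[Symbol.asyncIterator]()
  return new ReadableStream<T>({
    // pull() runs when the stream wants more data, so the source is not
    // drained ahead of the consumer beyond the stream's small internal queue.
    async pull(controller) {
      const result = await iterator.next()
      if (result.done) controller.close()
      else controller.enqueue(result.value)
    },
    // Propagate cancellation so the source can release its resources.
    async cancel() {
      await iterator.return?.()
    },
  })
}

// Example source: an async generator standing in for an upload or process output.
async function* produce_chunks(): AsyncGenerator<Uint8Array> {
  yield new Uint8Array([1, 2, 3])
  yield new Uint8Array([4, 5])
}

// Stand-in consumer: drains a byte stream and reports its total size.
async function total_bytes(stream: ReadableStream<Uint8Array>): Promise<number> {
  const reader = stream.getReader()
  let total = 0
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    total += value.byteLength
  }
  return total
}

const body = stream_from(produce_chunks())
// With the store from the example above, `body` is the kind of value that
// would be passed to put_stream in place of response.body.
const size_promise = total_bytes(body) // resolves to 5
```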

Compile-time guarantees

SnapshotHandle.stream and Store.put_stream are conditional types. When T is not string | Uint8Array, both fields resolve to never and calling them is a type error. No runtime branch needed — the checker rejects misuse.

const users = define_store('users', json_codec(UserSchema))
// Assumes `corpus` was built with .with_store(users) and `some_stream` is any ReadableStream.

// @ts-expect-error: put_stream is `never` for json-backed stores.
await corpus.stores.users.put_stream(some_stream)

const handle_result = await corpus.stores.users.get_latest_handle()
if (handle_result.ok) {
  // @ts-expect-error: stream() is `never` for json-backed stores.
  await handle_result.value.handle.stream()
}

The same gating applies to compositions through compose() — adding a non-streamable layer to a codec drops the stream method off the returned codec, which propagates to the Store type.
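
The gating mechanism described in this section can be reproduced in isolation. This is a simplified sketch, not the library's actual type definitions: StreamAccessor, IsNever and User are hypothetical names, and the accessor signature is reduced to the minimum needed to show the conditional type.

```typescript
type StreamableValue = string | Uint8Array

// Hypothetical, simplified stand-in for a gated method type:
// a callable for streamable T, `never` for everything else.
type StreamAccessor<T> = T extends StreamableValue ? () => ReadableStream<T> : never

type User = { name: string }

type TextAccessor = StreamAccessor<string> // () => ReadableStream<string>
type UserAccessor = StreamAccessor<User>   // never

// Compile-time check: resolves to `true` only when T is exactly `never`.
// The tuple wrapper stops the conditional from distributing over `never`.
type IsNever<T> = [T] extends [never] ? true : false

// These assignments only type-check if the gating behaves as described.
const user_is_gated: IsNever<UserAccessor> = true
const text_is_gated: IsNever<TextAccessor> = false
```

Because a property typed as never has no call signature, any attempt to invoke it is rejected by the checker, which matches the @ts-expect-error lines in the example above.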

Composition and streamability

Streamability composes structurally. compose(text_codec(), gzip_codec()) keeps both encode_stream and decode_stream because every layer has them. compose(text_codec(), encrypt_codec(key)) drops decode_stream because encrypt_codec doesn’t ship it. The type system reflects this — see Codecs: compose.
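
As a rough runtime analogue of that rule, the sketch below composes two layers and keeps a stream method only when both layers provide it. Layer and compose_layers are hypothetical; the method names encode_stream and decode_stream come from the text, but their signatures here are assumptions, and the real compose() also tracks this at the type level.

```typescript
type ByteStream = ReadableStream<Uint8Array>

// Hypothetical layer shape with optional stream methods (signatures assumed).
type Layer = {
  name: string
  encode_stream?: (stream: ByteStream) => ByteStream
  decode_stream?: (stream: ByteStream) => ByteStream
}

// Hypothetical compose: a stream method survives only if every layer has it.
function compose_layers(inner: Layer, outer: Layer): Layer {
  const composed: Layer = { name: `${inner.name}+${outer.name}` }

  const inner_encode = inner.encode_stream
  const outer_encode = outer.encode_stream
  if (inner_encode && outer_encode) {
    // Encoding applies the inner layer first, then the outer wrapper.
    composed.encode_stream = (stream) => outer_encode(inner_encode(stream))
  }

  const inner_decode = inner.decode_stream
  const outer_decode = outer.decode_stream
  if (inner_decode && outer_decode) {
    // Decoding unwraps in the opposite order.
    composed.decode_stream = (stream) => inner_decode(outer_decode(stream))
  }

  return composed
}

// Pass-through transforms stand in for real encoders and decoders.
const passthrough = (stream: ByteStream): ByteStream => stream

const text: Layer = { name: 'text', encode_stream: passthrough, decode_stream: passthrough }
const gzip: Layer = { name: 'gzip', encode_stream: passthrough, decode_stream: passthrough }
const encrypt: Layer = { name: 'encrypt', encode_stream: passthrough } // no decode_stream

const text_gzip = compose_layers(text, gzip)       // keeps both stream methods
const text_encrypt = compose_layers(text, encrypt) // keeps encode_stream only
```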

See Also

  • Codecs: compose(), built-in codecs, streamability table
  • Core Types: SnapshotHandle<T>, Store<T>, StreamableValue
  • Extending Backends: DataStorageHandle for custom backends