Streaming
Streaming surfaces the underlying ReadableStream<Uint8Array> of a stored snapshot through the Store API. It avoids buffering large payloads (multi-GB media on R2, multi-MB log dumps) into worker memory before the consumer can touch a single byte.
The streaming surface is opt-in: Store.get and Store.put are unchanged and still materialise the full value. Streaming methods (get_handle, get_latest_handle, put_stream) only exist when the underlying codec supports them, and the type system enforces this — calling put_stream on a Store<User> backed by json_codec is a TypeScript error.
Why streaming matters
- Cloudflare R2 returns a ReadableStream natively. Without streaming, every read buffers the full object into memory before decoding.
- File backend uses Bun.file().stream() (256 KB chunks) when configured to stream, which is useful for large local artefacts.
- Streaming writes let you pipe an HTTP body, file, or sub-process stdout straight into corpus without materialising the upstream first.
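The memory argument above can be illustrated with the standard Web Streams API alone. This is a minimal sketch, not corpus code: make_chunked_stream and total_bytes are hypothetical helpers, and the chunk sizes are arbitrary. The consumer only ever holds one chunk at a time, whatever the total size of the stream.

```typescript
// Hypothetical helper: lazily yields `count` chunks of `size` bytes each.
function make_chunked_stream(count: number, size: number): ReadableStream<Uint8Array> {
  let sent = 0
  return new ReadableStream<Uint8Array>({
    pull(controller) {
      if (sent >= count) {
        controller.close()
        return
      }
      controller.enqueue(new Uint8Array(size))
      sent += 1
    },
  })
}

// Hypothetical helper: walks the stream chunk by chunk and sums the sizes.
// Only the current chunk is referenced at any point; nothing is accumulated.
async function total_bytes(stream: ReadableStream<Uint8Array>): Promise<number> {
  const reader = stream.getReader()
  let total = 0
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    total += value.byteLength
  }
  return total
}
```

A buffered read would instead need to allocate count × size bytes before the first byte could be inspected.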
Streamability rules
A value is streamable when it can be carved into discrete chunks. v1 supports two shapes:
```typescript
type StreamableValue = string | Uint8Array
```

Structured values (e.g. User validated by Zod) cannot stream, because the codec needs the full document to validate.
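At runtime the same distinction can be expressed as a type guard. The sketch below is illustrative only: is_streamable_value is a hypothetical helper, not part of the corpus API, and the library itself enforces the rule at the type level rather than with a runtime check.

```typescript
type StreamableValue = string | Uint8Array

// Hypothetical guard: true for the two chunkable shapes, false for structured values.
function is_streamable_value(value: unknown): value is StreamableValue {
  return typeof value === 'string' || value instanceof Uint8Array
}

const samples: unknown[] = ['log line\n', new Uint8Array([1, 2, 3]), { name: 'Ada' }]
for (const sample of samples) {
  console.log(is_streamable_value(sample))
}
```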
A codec is streamable when it ships the optional encode_stream / decode_stream methods on the Codec<T> interface. Built-in codec coverage:
| Codec | encode_stream | decode_stream |
|---|---|---|
| json_codec(schema) | – | – |
| text_codec() | yes | yes |
| binary_codec() | yes | yes |
| gzip_codec() | yes | yes |
| encrypt_codec(key) | yes | – |
encrypt_codec intentionally omits decode_stream — AES-GCM has to verify the auth tag before any plaintext is safe to release. See Codecs.
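The auth-tag constraint can be seen with Web Crypto directly. This is a minimal sketch that does not use encrypt_codec (its internals are not shown here); tampered_ciphertext_is_rejected is a hypothetical name, and the import from node:crypto is an assumption made so the snippet runs on Node and Bun. A single flipped bit makes the whole decrypt call fail, so no prefix of the plaintext can be trusted until the full ciphertext and tag have been checked.

```typescript
import { webcrypto } from 'node:crypto'

// Hypothetical demo: encrypt, corrupt one bit, and report whether decrypt rejects.
async function tampered_ciphertext_is_rejected(): Promise<boolean> {
  const key = await webcrypto.subtle.generateKey(
    { name: 'AES-GCM', length: 256 },
    false,
    ['encrypt', 'decrypt'],
  )
  const iv = webcrypto.getRandomValues(new Uint8Array(12))
  const plaintext = new TextEncoder().encode('chunk one\nchunk two\n')

  // The output is the ciphertext followed by the 16-byte authentication tag.
  const sealed = new Uint8Array(
    await webcrypto.subtle.encrypt({ name: 'AES-GCM', iv }, key, plaintext),
  )

  // Flip the lowest bit of the first ciphertext byte.
  sealed[0] ^= 0x01

  try {
    await webcrypto.subtle.decrypt({ name: 'AES-GCM', iv }, key, sealed)
    return false // decrypt accepted tampered data (should not happen)
  } catch {
    return true // tag verification failed; no plaintext was released
  }
}
```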
Reading: get_handle / get_latest_handle
Store.get_handle(version) and Store.get_latest_handle() return a SnapshotHandle<T> instead of an eagerly materialised value. The handle exposes three accessors:
```typescript
type SnapshotHandle<T> = {
  value: () => Promise<Result<T, CorpusError>>
  bytes: () => Promise<Result<Uint8Array, CorpusError>>
  stream: T extends StreamableValue
    ? () => Promise<Result<ReadableStream<T>, CorpusError>>
    : never
}
```

| Method | Returns | Notes |
|---|---|---|
| value() | Decoded T | Always available. Equivalent to store.get(v).value.data. |
| bytes() | The literal stored bytes (post-encode) | Always available. Includes any wrapper layers (gzip, encrypt). |
| stream() | ReadableStream<T> of decoded chunks | Only present when the codec has decode_stream. |
bytes() returns the raw payload as written to the backend — gzipped if the codec was composed with gzip_codec(), encrypted if composed with encrypt_codec(). Asking for “decoded bytes” before decode runs is incoherent for a Codec<User>; call value() instead.
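The difference between stored bytes and the decoded value can be sketched without corpus at all. The snippet below stands in for a gzip-wrapped text store using node:zlib; this is an assumption chosen for illustration, and says nothing about how gzip_codec is actually implemented. The variable and function names are hypothetical.

```typescript
import { gzipSync, gunzipSync } from 'node:zlib'

// What value() would hand back for a text store.
const decoded = 'chunk one\nchunk two\n'

// What bytes() would hand back if a gzip layer wrapped the text: the compressed payload.
const stored = new Uint8Array(gzipSync(new TextEncoder().encode(decoded)))

// Gzip payloads start with the two magic bytes 0x1f 0x8b.
function starts_with_gzip_magic(bytes: Uint8Array): boolean {
  return bytes[0] === 0x1f && bytes[1] === 0x8b
}

// Undoing the wrapper layer recovers the decoded value.
const round_tripped = new TextDecoder().decode(gunzipSync(stored))

console.log(starts_with_gzip_magic(stored), round_tripped === decoded)
```

Consumers that only need to copy or checksum a snapshot can work on the stored bytes; anything that needs the content should go through value() or stream().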
Example
```typescript
import { create_corpus, create_memory_backend, define_store, text_codec } from '@f0rbit/corpus'

const corpus = create_corpus()
  .with_backend(create_memory_backend())
  .with_store(define_store('logs', text_codec()))
  .build()

const meta = await corpus.stores.logs.put('chunk one\nchunk two\n')
if (!meta.ok) throw meta.error

const handle_result = await corpus.stores.logs.get_latest_handle()
if (!handle_result.ok) throw handle_result.error

const { handle } = handle_result.value
const stream_result = await handle.stream()
if (!stream_result.ok) throw stream_result.error

const reader = stream_result.value.getReader()
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  process.stdout.write(value)
}
```

Writing: put_stream
Store.put_stream(stream, opts?) accepts a ReadableStream<T> and stores its concatenation as a single snapshot. It mirrors get_handle: the method is only present when the codec exposes encode_stream.
```typescript
import { create_corpus, create_memory_backend, define_store, binary_codec } from '@f0rbit/corpus'

const corpus = create_corpus()
  .with_backend(create_memory_backend())
  .with_store(define_store('uploads', binary_codec()))
  .build()

const response = await fetch('https://example.com/big.bin')
if (!response.body) throw new Error('no body')

const meta = await corpus.stores.uploads.put_stream(response.body)
```

Compile-time guarantees
SnapshotHandle.stream and Store.put_stream are conditional types. When T is not string | Uint8Array, both fields resolve to never and calling them is a type error. No runtime branch needed — the checker rejects misuse.
```typescript
const users = define_store('users', json_codec(UserSchema))

// @ts-expect-error: put_stream is `never` for json-backed stores.
await corpus.stores.users.put_stream(some_stream)

const handle_result = await corpus.stores.users.get_latest_handle()
if (handle_result.ok) {
  // @ts-expect-error: stream() is `never` for json-backed stores.
  await handle_result.value.handle.stream()
}
```

The same gating applies to compositions through compose(): adding a non-streamable layer to a codec drops the stream method off the returned codec, which propagates to the Store type.
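The conditional-type gating can be reproduced in isolation. The sketch below is a simplified stand-in, not the library's definition: HandleSketch, make_text_handle, misuse and User are hypothetical names, and the Result wrapper and promises are left out to keep the type logic visible.

```typescript
type StreamableValue = string | Uint8Array

// Simplified stand-in for SnapshotHandle<T>.
type HandleSketch<T> = {
  value: () => T
  // Resolves to a function for streamable T, and to `never` otherwise.
  stream: T extends StreamableValue ? () => ReadableStream<T> : never
}

type User = { name: string }

// For T = string the stream accessor exists and can be implemented.
function make_text_handle(text: string): HandleSketch<string> {
  return {
    value: () => text,
    stream: () =>
      new ReadableStream<string>({
        start(controller) {
          controller.enqueue(text)
          controller.close()
        },
      }),
  }
}

// Never called: it exists only so the type checker can reject the misuse.
function misuse(handle: HandleSketch<User>): void {
  // @ts-expect-error: stream is `never` for User, so it has no call signature.
  handle.stream()
}
```

Because the rejection happens in the checker, the runtime object needs no flag or branch to say whether streaming is supported.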
Composition and streamability
Streamability composes structurally. compose(text_codec(), gzip_codec()) keeps both encode_stream and decode_stream because every layer has them. compose(text_codec(), encrypt_codec(key)) drops decode_stream because encrypt_codec doesn’t ship it. The type system reflects this — see Codecs: compose.
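The structural rule can be sketched at the value level. This is an illustration under stated assumptions, not the real compose(): LayerSketch and compose_sketch are hypothetical, every layer is modelled as a bytes-to-bytes transform, and the sample layers use identity transforms purely as placeholders. The composed result carries a stream method only when every layer does.

```typescript
type ByteStream = ReadableStream<Uint8Array>

// Hypothetical layer shape: both stream methods are optional.
type LayerSketch = {
  name: string
  encode_stream?: (input: ByteStream) => ByteStream
  decode_stream?: (input: ByteStream) => ByteStream
}

// Hypothetical compose: keeps a stream method only if all layers provide it.
function compose_sketch(...layers: LayerSketch[]): LayerSketch {
  const composed: LayerSketch = { name: layers.map((l) => l.name).join('+') }
  if (layers.every((l) => l.encode_stream !== undefined)) {
    // Encoding runs through the layers first to last.
    composed.encode_stream = (input) => layers.reduce((acc, l) => l.encode_stream!(acc), input)
  }
  if (layers.every((l) => l.decode_stream !== undefined)) {
    // Decoding unwinds the layers last to first.
    composed.decode_stream = (input) => layers.reduceRight((acc, l) => l.decode_stream!(acc), input)
  }
  return composed
}

// Placeholder transforms; real layers would encode, compress or encrypt.
const identity = (input: ByteStream): ByteStream => input
const text_layer: LayerSketch = { name: 'text', encode_stream: identity, decode_stream: identity }
const gzip_layer: LayerSketch = { name: 'gzip', encode_stream: identity, decode_stream: identity }
const encrypt_layer: LayerSketch = { name: 'encrypt', encode_stream: identity }

const text_gzip = compose_sketch(text_layer, gzip_layer)
const text_encrypt = compose_sketch(text_layer, encrypt_layer)
```

The library expresses the same rule in its types, so the missing method is caught by the checker rather than discovered as undefined at runtime.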
See Also
- Codecs: compose(), built-in codecs, streamability table
- Core Types: SnapshotHandle<T>, Store<T>, StreamableValue
- Extending Backends: DataStorageHandle for custom backends