Utility Functions
The corpus library provides extended utilities for working with Result<T, E> types and managing concurrent async operations. These utilities enable explicit error handling without exceptions and controlled parallelism.
Result Utilities
All operations return a Result<T, E> type rather than throwing exceptions:
type Result<T, E = CorpusError> =
  | { ok: true; value: T }
  | { ok: false; error: E }
// Helper constructors
const ok = <T>(value: T): Result<T, never>
const err = <E>(error: E): Result<never, E>
Basic Result Operations
match(result, on_ok, on_err)
Pattern match on a Result, extracting the value with the appropriate handler. Guarantees exhaustive handling of both cases.
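To make the exhaustiveness claim concrete, here is a minimal self-contained sketch of how a function with this shape could work. It is a local stand-in written for illustration, not the library's source; the sample values and describe_result are hypothetical.

```typescript
// Sketch only: a local re-implementation for illustration, not the library source.
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E }

function match<T, E, R>(
  result: Result<T, E>,
  on_ok: (value: T) => R,
  on_err: (error: E) => R
): R {
  // Checking the `ok` discriminant narrows the union, so each branch
  // only sees the field that exists for that variant.
  return result.ok ? on_ok(result.value) : on_err(result.error)
}

// Hypothetical sample values, used only in this example.
const found: Result<{ id: number }, string> = { ok: true, value: { id: 7 } }
const missing: Result<{ id: number }, string> = { ok: false, error: 'not found' }

// Both handlers are required arguments, so neither case can be forgotten.
const describe_result = (r: Result<{ id: number }, string>): string =>
  match(r, user => `user #${user.id}`, error => `failed: ${error}`)
```

Because both handlers return the same type R, the call site gets a plain value back with no further branching.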
import { match } from '@f0rbit/corpus'
const result = await fetchUser(id)
const message = match(
  result,
  user => `Hello, ${user.name}!`,
  error => `Failed: ${error.message}`
)
Signature:
function match<T, E, R>(
  result: Result<T, E>,
  on_ok: (value: T) => R,
  on_err: (error: E) => R
): R
unwrap_or(result, default_value)
Extract the success value, returning a default if the Result is an error. Useful for providing fallback values without branching.
import { unwrap_or } from '@f0rbit/corpus'
const users = unwrap_or(await fetchUsers(), [])
const count = unwrap_or(await getCount(), 0)
Signature:
function unwrap_or<T, E>(result: Result<T, E>, default_value: T): T
unwrap(result) / unwrap_err(result)
Forcefully extract the value or error, throwing if the Result is in the opposite state. Use only when you’re certain of the Result’s state, or in tests.
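A minimal sketch of the throwing behavior described here, assuming a plain Error is thrown; the message text and the exact error type the library uses are assumptions, and the sample values are hypothetical.

```typescript
// Sketch only: local stand-ins for illustration, not the library source.
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E }

function unwrap<T, E>(result: Result<T, E>): T {
  if (!result.ok) {
    // Assumption: the message format is made up for this sketch.
    throw new Error(`unwrap called on an error Result: ${JSON.stringify(result.error)}`)
  }
  return result.value
}

function unwrap_err<T, E>(result: Result<T, E>): E {
  if (result.ok) {
    throw new Error('unwrap_err called on an ok Result')
  }
  return result.error
}

// Hypothetical values standing in for the outcome of a createUser call.
const created: Result<string, { kind: string }> = { ok: true, value: 'Alice' }
const rejected: Result<string, { kind: string }> = {
  ok: false,
  error: { kind: 'validation_error' }
}
```

Calling unwrap(rejected) throws, which is why these helpers fit tests better than production paths.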
import { unwrap, unwrap_err } from '@f0rbit/corpus'
// In tests where you expect success
const user = unwrap(await createUser(data))
expect(user.name).toBe('Alice')
// In tests where you expect failure
const error = unwrap_err(await createUser(invalidData))
expect(error.kind).toBe('validation_error')
Signatures:
function unwrap<T, E>(result: Result<T, E>): T // throws if error
function unwrap_err<T, E>(result: Result<T, E>): E // throws if ok
Exception Handling
try_catch(fn, on_error)
Execute a synchronous function and convert any thrown exception to a Result. Bridges exception-throwing APIs into the Result world.
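The bridging idea can be sketched in a few lines. This is an illustrative local version under the signature documented in this section, not the library's source; ParseError and parse_json are hypothetical names.

```typescript
// Sketch only: a local re-implementation for illustration.
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E }

function try_catch<T, E>(fn: () => T, on_error: (e: unknown) => E): Result<T, E> {
  try {
    return { ok: true, value: fn() }
  } catch (e) {
    // Anything can be thrown in JavaScript, so the handler receives `unknown`
    // and decides how to turn it into a typed error.
    return { ok: false, error: on_error(e) }
  }
}

// Hypothetical error shape and wrapper around JSON.parse.
type ParseError = { kind: 'parse_error'; message: string }

const parse_json = (input: string): Result<unknown, ParseError> =>
  try_catch(
    () => JSON.parse(input) as unknown,
    e => ({
      kind: 'parse_error' as const,
      message: e instanceof Error ? e.message : String(e)
    })
  )

const good = parse_json('{"a": 1}')
const bad = parse_json('{not json')
```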
import { try_catch, format_error } from '@f0rbit/corpus'
const result = try_catch(
  () => JSON.parse(input),
  e => ({ kind: 'parse_error', message: format_error(e) })
)
if (result.ok) {
  console.log(result.value)
}
Signature:
function try_catch<T, E>(
  fn: () => T,
  on_error: (e: unknown) => E
): Result<T, E>
try_catch_async(fn, on_error)
Async version of try_catch. Wraps a Promise-returning function and converts rejections to a Result.
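A minimal sketch of the async variant, again a local stand-in rather than the library's code. The load_rows function is a hypothetical operation that can be made to fail on demand.

```typescript
// Sketch only: a local re-implementation for illustration.
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E }

async function try_catch_async<T, E>(
  fn: () => Promise<T>,
  on_error: (e: unknown) => E
): Promise<Result<T, E>> {
  try {
    // Awaiting inside the try block is what routes a rejection into catch.
    return { ok: true, value: await fn() }
  } catch (e) {
    return { ok: false, error: on_error(e) }
  }
}

// Hypothetical flaky operation standing in for a database or network call.
const load_rows = async (fail: boolean): Promise<string[]> => {
  if (fail) throw new Error('connection refused')
  return ['row-1', 'row-2']
}

const run_query = (fail: boolean) =>
  try_catch_async(
    () => load_rows(fail),
    e => ({ kind: 'database_error' as const, cause: e })
  )
```

The returned promise itself never rejects in this sketch; failures always arrive as the error variant.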
import { try_catch_async } from '@f0rbit/corpus'
const result = await try_catch_async(
  () => db.query('SELECT * FROM users'),
  e => ({ kind: 'database_error', cause: e })
)
Signature:
function try_catch_async<T, E>(
  fn: () => Promise<T>,
  on_error: (e: unknown) => E
): Promise<Result<T, E>>
Fetch Wrapper
fetch_result(input, init, on_error, parse_body?)
A fetch wrapper that returns a Result instead of throwing. Handles both network errors and non-2xx HTTP responses.
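The two failure modes can be illustrated without a network. The sketch below is an assumption-heavy stand-in: it takes a fetch-like function as a parameter instead of calling fetch directly, and ResponseLike, fetch_result_sketch, and the fake fetchers are all hypothetical names. How the real library treats errors thrown while parsing the body is not stated here, so the sketch simply lets them propagate.

```typescript
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E }

type FetchError =
  | { type: 'network'; cause: unknown }
  | { type: 'http'; status: number; status_text: string }

// Hypothetical structural stand-in for the parts of a Response used below.
type ResponseLike = {
  ok: boolean
  status: number
  statusText: string
  json(): Promise<unknown>
}

async function fetch_result_sketch<T, E>(
  do_fetch: () => Promise<ResponseLike>,
  on_error: (e: FetchError) => E,
  parse_body: (response: ResponseLike) => Promise<T>
): Promise<Result<T, E>> {
  let response: ResponseLike
  try {
    response = await do_fetch()
  } catch (cause) {
    // fetch rejects on network-level failures (DNS, refused connection, ...).
    return { ok: false, error: on_error({ type: 'network', cause }) }
  }
  if (!response.ok) {
    // fetch resolves normally for 4xx/5xx, so the status is checked by hand.
    return {
      ok: false,
      error: on_error({ type: 'http', status: response.status, status_text: response.statusText })
    }
  }
  return { ok: true, value: await parse_body(response) }
}

const describe_error = (e: FetchError): string =>
  e.type === 'http' ? `HTTP ${e.status}` : 'Network error'

// Fake fetchers so the example runs offline.
const fake_200 = async (): Promise<ResponseLike> =>
  ({ ok: true, status: 200, statusText: 'OK', json: async () => ({ users: 2 }) })
const fake_404 = async (): Promise<ResponseLike> =>
  ({ ok: false, status: 404, statusText: 'Not Found', json: async () => null })
const fake_offline = async (): Promise<ResponseLike> => {
  throw new TypeError('fetch failed')
}
```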
import { fetch_result, type FetchError } from '@f0rbit/corpus'
const result = await fetch_result(
  'https://api.example.com/users',
  { headers: { Authorization: `Bearer ${token}` } },
  e => e.type === 'http' ? `HTTP ${e.status}` : 'Network error'
)
if (result.ok) {
  console.log(result.value) // Parsed JSON body
}
Error types:
type FetchError =
  | { type: 'network'; cause: unknown }
  | { type: 'http'; status: number; status_text: string }
Custom body parsing:
// Parse as text instead of JSON
const textResult = await fetch_result(
  url,
  undefined,
  formatError,
  response => response.text()
)
// Parse with Zod validation
const validatedResult = await fetch_result(
  url,
  undefined,
  formatError,
  async response => UserSchema.parse(await response.json())
)
Signature:
function fetch_result<T, E>(
  input: string | URL | Request,
  init: RequestInit | undefined,
  on_error: (e: FetchError) => E,
  parse_body?: (response: Response) => Promise<T>
): Promise<Result<T, E>>
Result Pipelines
pipe(initial)
Create a composable pipeline for chaining Result operations. All operations are lazy—nothing executes until you call .result() or .unwrap_or().
import { pipe, ok } from '@f0rbit/corpus'
const avatar = await pipe(fetchUser(id))
  .flat_map(user => fetchProfile(user.profile_id))
  .map(profile => profile.avatar_url)
  .unwrap_or('/default-avatar.png')
Pipeline methods:
| Method | Description |
|---|---|
| .map(fn) | Transform the success value |
| .map_async(fn) | Transform with an async function |
| .flat_map(fn) | Chain with another Result-returning operation |
| .map_err(fn) | Transform the error value |
| .tap(fn) | Execute side effect on success (logging, metrics) |
| .tap_err(fn) | Execute side effect on error |
| .unwrap_or(default) | Extract value with fallback (terminal) |
| .result() | Get the underlying Result (terminal) |
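The laziness described above can be sketched with a class that stores a thunk and only calls it in terminal methods. LazyPipe is a hypothetical miniature covering four of the methods in the table; it is not the library's Pipe implementation, and whether the real pipeline caches results between terminal calls is not something this sketch tries to model.

```typescript
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E }

// Hypothetical miniature: each method wraps the previous thunk in a new one,
// and nothing runs until result() or unwrap_or() calls the outermost thunk.
class LazyPipe<T, E> {
  private readonly run: () => Promise<Result<T, E>>

  constructor(run: () => Promise<Result<T, E>>) {
    this.run = run
  }

  map<U>(fn: (value: T) => U): LazyPipe<U, E> {
    return new LazyPipe<U, E>(async (): Promise<Result<U, E>> => {
      const r = await this.run()
      if (!r.ok) return r // errors pass through untouched
      return { ok: true, value: fn(r.value) }
    })
  }

  flat_map<U>(fn: (value: T) => Result<U, E> | Promise<Result<U, E>>): LazyPipe<U, E> {
    return new LazyPipe<U, E>(async (): Promise<Result<U, E>> => {
      const r = await this.run()
      if (!r.ok) return r
      return fn(r.value)
    })
  }

  // Terminal: runs the whole chain (this sketch re-runs it on every call).
  result(): Promise<Result<T, E>> {
    return this.run()
  }

  // Terminal: runs the chain and substitutes a fallback on error.
  async unwrap_or(fallback: T): Promise<T> {
    const r = await this.run()
    return r.ok ? r.value : fallback
  }
}

// Count how often the source actually executes.
let source_calls = 0
const source = new LazyPipe<number, string>(async (): Promise<Result<number, string>> => {
  source_calls += 1
  return { ok: true, value: 21 }
})

// Building the chain does not execute the source.
const doubled = source.map(n => n * 2)
```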
Starting a pipeline:
// From a sync Result
const doubled = await pipe(ok(21))
  .map(n => n * 2)
  .result()
// { ok: true, value: 42 }

// From Promise<Result>
const user = await pipe(fetchUser(id))
  .flat_map(u => fetchPermissions(u.id))
  .result()

// Wrap throwing code
const data = await pipe.try(
  () => riskyAsyncOperation(),
  e => ({ kind: 'failed', cause: e })
)
  .map(processData)
  .result()

// Wrap fetch call
const users = await pipe.fetch<User[], string>(
  '/api/users',
  { headers: { Authorization: token } },
  e => e.type === 'http' ? `Error ${e.status}` : 'Network failure'
)
  .map(users => users.filter(u => u.active))
  .unwrap_or([])
Static constructors:
// Start with Ok value
pipe.ok(42) // Pipe<number, never>
// Start with Err value
pipe.err({ kind: 'not_found' }) // Pipe<never, { kind: string }>
// Start by wrapping a throwing async function
pipe.try(fn, on_error) // Pipe<T, E>
// Start with a fetch call
pipe.fetch(url, init, on_error) // Pipe<T, E>
Chaining operations:
const result = await pipe(getConfig())
  .map(config => config.apiUrl)
  .flat_map(url => fetch_result(url, undefined, formatError))
  .map(data => data.items)
  .tap(items => console.log(`Fetched ${items.length} items`))
  .tap_err(err => metrics.increment('api_error'))
  .map_err(e => ({ kind: 'api_error', cause: e }))
  .result()
Utility Converters
to_nullable(result)
Convert a Result to its value or null. Useful for “fetch single resource” patterns where not-found is expected.
import { to_nullable } from '@f0rbit/corpus'
const user = to_nullable(await store.get(userId))
if (!user) {
  return <NotFound />
}
return <Profile user={user} />
to_fallback(result, fallback)
Convert a Result to its value or a fallback. Useful for list endpoints where empty array is acceptable.
import { to_fallback } from '@f0rbit/corpus'
const items = to_fallback(await store.list(), [])
null_on(result, predicate) / fallback_on(result, predicate, fallback)
Return null/fallback only for expected errors (matching the predicate), otherwise rethrow the error. Use for 404-as-null patterns where other errors should propagate.
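The difference from to_nullable is easiest to see side by side. In this sketch (local stand-ins, not the library source) the unexpected error value is thrown as-is; whether the library throws the raw error or wraps it is an assumption. StoreError and lookup are hypothetical.

```typescript
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E }

// Swallows every error: any failure becomes null.
function to_nullable<T, E>(result: Result<T, E>): T | null {
  return result.ok ? result.value : null
}

// Only errors accepted by the predicate become the fallback.
function fallback_on<T, E>(
  result: Result<T, E>,
  predicate: (error: E) => boolean,
  fallback: T
): T {
  if (result.ok) return result.value
  if (predicate(result.error)) return fallback
  // Assumption: unexpected errors are rethrown unchanged.
  throw result.error
}

function null_on<T, E>(result: Result<T, E>, predicate: (error: E) => boolean): T | null {
  return fallback_on<T | null, E>(result, predicate, null)
}

// Hypothetical store with one expected and one unexpected failure mode.
type StoreError = { kind: 'not_found' } | { kind: 'storage_error'; cause: string }

const lookup = (id: string): Result<string, StoreError> => {
  if (id === 'alice') return { ok: true, value: 'Alice' }
  if (id === 'ghost') return { ok: false, error: { kind: 'not_found' } }
  return { ok: false, error: { kind: 'storage_error', cause: 'disk unavailable' } }
}

const is_not_found = (e: StoreError): boolean => e.kind === 'not_found'
```

With these definitions, to_nullable(lookup('broken')) quietly returns null, while null_on(lookup('broken'), is_not_found) throws the storage error so it cannot be mistaken for a missing record.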
import { null_on, fallback_on } from '@f0rbit/corpus'
// Returns null for not_found, throws for other errors
const user = null_on(
  await store.get(id),
  e => e.kind === 'not_found'
)
// Returns 0 for not_found, throws for storage_error
const count = fallback_on(
  await store.count(),
  e => e.kind === 'not_found',
  0
)
format_error(e)
Format an unknown error to a string. Handles Error objects, strings, and anything else via String(e).
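The three cases named above map onto three checks. This is a sketch under the assumption that Error objects are reduced to their message property; the library may format them differently (for example by including the error name).

```typescript
// Sketch only: a local stand-in, not the library source.
function format_error(e: unknown): string {
  // Assumption: Error objects are represented by their message.
  if (e instanceof Error) return e.message
  if (typeof e === 'string') return e
  // Everything else (numbers, null, objects, ...) goes through String().
  return String(e)
}

const from_error = format_error(new Error('boom'))
const from_string = format_error('plain text')
const from_number = format_error(404)
const from_null = format_error(null)
```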
import { format_error, try_catch } from '@f0rbit/corpus'
const result = try_catch(
  () => JSON.parse(input),
  e => ({ kind: 'parse_error', message: format_error(e) })
)
Object Utilities
DeepPartial<T>
A utility type that recursively makes all properties of T optional. Useful for configuration objects and partial updates.
import { type DeepPartial } from '@f0rbit/corpus'
type Config = {
  api: { url: string; timeout: number }
  debug: boolean
}
type PartialConfig = DeepPartial<Config>
// { api?: { url?: string; timeout?: number }; debug?: boolean }
merge_deep(base, overrides)
Deep merge two objects, with overrides taking precedence. Only merges plain objects—arrays and null values are replaced entirely.
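The merge rule (recurse into plain objects, replace everything else) can be sketched as follows. This is an illustrative version, not the library's code: the DeepPartial definition, the prototype-based is_plain_object check, and the name merge_deep_sketch are all assumptions made for the example.

```typescript
// Assumed definition of DeepPartial for this sketch.
type DeepPartial<T> = {
  [K in keyof T]?: T[K] extends object ? DeepPartial<T[K]> : T[K]
}
type PlainObject = Record<string, unknown>

// Treats only object literals (and null-prototype objects) as plain;
// arrays, null, Dates, and class instances are not.
function is_plain_object(value: unknown): value is PlainObject {
  if (typeof value !== 'object' || value === null) return false
  const proto: unknown = Object.getPrototypeOf(value)
  return proto === Object.prototype || proto === null
}

function merge_deep_sketch<T extends PlainObject>(base: T, overrides: DeepPartial<T>): T {
  const merged: PlainObject = { ...base } // shallow copy; base is not mutated
  for (const [key, value] of Object.entries(overrides)) {
    const current = merged[key]
    merged[key] =
      is_plain_object(current) && is_plain_object(value)
        ? merge_deep_sketch(current, value) // both sides plain: merge recursively
        : value // arrays, null, and primitives replace wholesale
  }
  return merged as T
}

const merged_config = merge_deep_sketch(
  { api: { url: 'http://localhost', timeout: 5000 }, tags: ['a', 'b'], debug: false },
  { api: { timeout: 10000 }, tags: ['c'] }
)
```

Here api is merged key by key, so url survives, while the tags array is replaced rather than concatenated.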
import { merge_deep } from '@f0rbit/corpus'
const config = merge_deep(
  { api: { url: 'http://localhost', timeout: 5000 }, debug: false },
  { api: { timeout: 10000 } }
)
// { api: { url: 'http://localhost', timeout: 10000 }, debug: false }
Signature:
function merge_deep<T extends Record<string, unknown>>(
  base: T,
  overrides: DeepPartial<T>
): T
Array Utilities
at(array, index)
Safely access an array element by index, returning a Result instead of potentially undefined. Provides explicit error information for out-of-bounds access.
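A sketch of how bounds-checked access with these error shapes could look, covering at together with the first and last helpers documented later in this section. These are local stand-ins; in particular, how the library treats negative or fractional indices is not stated, so rejecting them is an assumption.

```typescript
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E }
type IndexError = { kind: 'index_out_of_bounds'; index: number; length: number }
type EmptyError = { kind: 'empty_array' }

function at<T>(array: readonly T[], index: number): Result<T, IndexError> {
  // Assumption: only integer indices in [0, length) count as in bounds.
  if (!Number.isInteger(index) || index < 0 || index >= array.length) {
    return { ok: false, error: { kind: 'index_out_of_bounds', index, length: array.length } }
  }
  return { ok: true, value: array[index] as T }
}

function first<T>(array: readonly T[]): Result<T, EmptyError> {
  if (array.length === 0) return { ok: false, error: { kind: 'empty_array' } }
  return { ok: true, value: array[0] as T }
}

function last<T>(array: readonly T[]): Result<T, EmptyError> {
  if (array.length === 0) return { ok: false, error: { kind: 'empty_array' } }
  return { ok: true, value: array[array.length - 1] as T }
}

const letters = ['a', 'b', 'c']
const second = at(letters, 1)
const tenth = at(letters, 10)
```

Returning the index and length in the error makes an out-of-bounds failure self-describing, which plain undefined is not.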
import { at } from '@f0rbit/corpus'
const items = ['a', 'b', 'c']
const second = at(items, 1)
// { ok: true, value: 'b' }
const tenth = at(items, 10)
// { ok: false, error: { kind: 'index_out_of_bounds', index: 10, length: 3 } }
Signature:
function at<T>(
  array: readonly T[],
  index: number
): Result<T, { kind: 'index_out_of_bounds'; index: number; length: number }>
first(array)
Safely get the first element of an array, returning a Result that indicates if the array was empty.
import { first } from '@f0rbit/corpus'
const head = first([1, 2, 3])
// { ok: true, value: 1 }
const empty = first([])
// { ok: false, error: { kind: 'empty_array' } }
Signature:
function first<T>(
  array: readonly T[]
): Result<T, { kind: 'empty_array' }>
last(array)
Safely get the last element of an array, returning a Result that indicates if the array was empty.
import { last } from '@f0rbit/corpus'
const tail = last([1, 2, 3])
// { ok: true, value: 3 }
const empty = last([])
// { ok: false, error: { kind: 'empty_array' } }
Signature:
function last<T>(
  array: readonly T[]
): Result<T, { kind: 'empty_array' }>
Composable Patterns
The real power of these utilities emerges when you compose them together. The Result-returning array functions (first, last, at) combine elegantly with the converter utilities (to_nullable, to_fallback) to eliminate verbose .ok checks and ternaries.
Converting Empty Arrays to Null
The most common pattern: get the first/last element, or null if the array is empty.
import { first, last, to_nullable } from '@f0rbit/corpus'
const head = to_nullable(first(items)) // T | null
const tail = to_nullable(last(items)) // T | null

// Without composition - manual .ok checks everywhere
const result = first(items)
const head = result.ok ? result.value : null
const result2 = last(items)
const tail = result2.ok ? result2.value : null
Providing Fallback Values
When you want a default value instead of null:
import { first, last, to_fallback } from '@f0rbit/corpus'
const primary = to_fallback(first(colors), 'gray')
const latest = to_fallback(last(events), defaultEvent)

// Without composition
const result = first(colors)
const primary = result.ok ? result.value : 'gray'
const result2 = last(events)
const latest = result2.ok ? result2.value : defaultEvent
Real-World Parsing Patterns
These patterns shine in string parsing scenarios:
import { first, last, to_fallback } from '@f0rbit/corpus'
// Extract username from "user/repo" format, fallback to original string
const username = to_fallback(first(input.split('/')), input)
// Get file extension, default to empty string
// (a name without a dot yields the whole name, since split returns a single segment)
const extension = to_fallback(last(filename.split('.')), '')
// Extract first path segment
const basePath = to_fallback(first(path.split('/').filter(Boolean)), '/')
Index Access with Fallbacks
import { at, to_nullable, to_fallback } from '@f0rbit/corpus'
// Get specific element or null
const third = to_nullable(at(items, 2))
// Get element with default
const selected = to_fallback(at(options, selectedIndex), options[0])
Combining with Pipe
For more complex transformations, combine with pipe:
import { pipe, first, ok } from '@f0rbit/corpus'
// Transform the first element, or use a default
const firstName = await pipe(ok(users))
  .flat_map(first)
  .map(user => user.name.toUpperCase())
  .unwrap_or('ANONYMOUS')
// Chain multiple operations
const primaryEmail = await pipe(ok(contacts))
  .flat_map(first)
  .map(contact => contact.emails)
  .flat_map(first)
  .unwrap_or('no-email@example.com')
Why This Matters
This functional composition approach:
- Eliminates boilerplate: No manual .ok checks or ternary expressions
- Reads naturally: to_nullable(first(arr)) clearly expresses “first element or null”
- Composes freely: Each utility does one thing, combining into powerful patterns
- Type-safe: TypeScript infers the correct types through the composition
Concurrency Utilities
Semaphore
A classic semaphore for controlling concurrent operations. Limits the number of concurrent async operations by requiring callers to acquire a permit before proceeding.
import { Semaphore } from '@f0rbit/corpus'
const semaphore = new Semaphore(3) // Allow 3 concurrent operations
async function rateLimitedFetch(url: string) {
  await semaphore.acquire()
  try {
    return await fetch(url)
  } finally {
    semaphore.release()
  }
}
// Only 3 fetches will run concurrently
await Promise.all(urls.map(rateLimitedFetch))
Methods:
| Method | Description |
|---|---|
| acquire() | Wait for a permit. Resolves immediately if available, otherwise waits. |
| release() | Release a permit, allowing the next waiting operation to proceed. |
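The acquire/release contract in the table can be sketched with a permit counter and a FIFO queue of waiting resolvers. SimpleSemaphore and max_observed_concurrency are hypothetical names; the library's internal bookkeeping and wake-up order may differ.

```typescript
// Sketch only: a counting semaphore for async code, not the library's class.
class SimpleSemaphore {
  private permits: number
  private readonly waiters: Array<() => void> = []

  constructor(permits: number) {
    this.permits = permits
  }

  acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits -= 1
      return Promise.resolve()
    }
    // No permit free: park the caller until release() hands one over.
    return new Promise<void>(resolve => {
      this.waiters.push(resolve)
    })
  }

  release(): void {
    const next = this.waiters.shift()
    if (next) {
      next() // pass the permit straight to the oldest waiter
    } else {
      this.permits += 1
    }
  }
}

// Runs `tasks` short jobs through the semaphore and reports the highest
// number that were ever inside the critical section at the same time.
async function max_observed_concurrency(limit: number, tasks: number): Promise<number> {
  const semaphore = new SimpleSemaphore(limit)
  let running = 0
  let peak = 0
  const task = async (): Promise<void> => {
    await semaphore.acquire()
    try {
      running += 1
      peak = Math.max(peak, running)
      await new Promise<void>(resolve => setTimeout(resolve, 5))
    } finally {
      running -= 1
      semaphore.release() // always release, even if the job throws
    }
  }
  await Promise.all(Array.from({ length: tasks }, () => task()))
  return peak
}
```

Handing the permit directly to a waiter in release(), instead of incrementing and letting waiters race, keeps the count from ever exceeding the limit.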
parallel_map(items, mapper, concurrency)
Map over an array with controlled concurrency. Unlike Promise.all over a mapped array, which starts every operation at once, this caps how many run at a time while preserving result order.
import { parallel_map } from '@f0rbit/corpus'
// Process 100 items, but only 5 at a time
const results = await parallel_map(
  urls,
  async (url, index) => {
    console.log(`Fetching ${index + 1}/${urls.length}`)
    return fetch(url).then(r => r.json())
  },
  5
)
Signature:
function parallel_map<T, R>(
  items: T[],
  mapper: (item: T, index: number) => Promise<R>,
  concurrency: number
): Promise<R[]>
Key behaviors:
- Results are returned in the same order as inputs
- If any mapper throws, the entire operation rejects
- Uses Semaphore internally to limit concurrency
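The ordering and rejection behaviors listed above can be reproduced in a short sketch. Note that it deliberately swaps in a different technique from the one the library is said to use: a fixed pool of worker loops pulling from a shared index, rather than a semaphore. parallel_map_sketch and run_demo are hypothetical names.

```typescript
// Sketch only: worker-pool variant, not the library's semaphore-based code.
async function parallel_map_sketch<T, R>(
  items: readonly T[],
  mapper: (item: T, index: number) => Promise<R>,
  concurrency: number
): Promise<R[]> {
  const results = new Array<R>(items.length)
  let next_index = 0

  // Each worker claims the next unprocessed index until none remain.
  const worker = async (): Promise<void> => {
    while (next_index < items.length) {
      const index = next_index
      next_index += 1
      // Writing by index keeps output order equal to input order,
      // regardless of which mapper call finishes first.
      results[index] = await mapper(items[index] as T, index)
    }
  }

  const worker_count = Math.max(1, Math.min(concurrency, items.length))
  // If any mapper rejects, Promise.all rejects and so does this function.
  await Promise.all(Array.from({ length: worker_count }, () => worker()))
  return results
}

// Demo: delays are chosen so items finish out of order.
async function run_demo(): Promise<{ results: number[]; peak: number }> {
  let in_flight = 0
  let peak = 0
  const results = await parallel_map_sketch(
    [30, 5, 20, 1],
    async ms => {
      in_flight += 1
      peak = Math.max(peak, in_flight)
      await new Promise<void>(resolve => setTimeout(resolve, ms))
      in_flight -= 1
      return ms * 2
    },
    2
  )
  return { results, peak }
}
```

One caveat of this sketch: after a rejection the other workers keep running in the background until they finish their current item and drain the remaining ones.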
Usage Examples
Example 1: API Client with Error Handling
Combine pipe, fetch_result, and try_catch for a robust API client:
import { pipe, fetch_result, try_catch, type FetchError } from '@f0rbit/corpus'
import { z } from 'zod'
const UserSchema = z.object({
  id: z.string(),
  name: z.string(),
  email: z.string().email()
})
type User = z.infer<typeof UserSchema>
type ApiError =
  | { kind: 'network' }
  | { kind: 'http'; status: number }
  | { kind: 'validation'; message: string }
const toApiError = (e: FetchError): ApiError =>
  e.type === 'network'
    ? { kind: 'network' }
    : { kind: 'http', status: e.status }
async function getUser(id: string): Promise<User | null> {
  return pipe.fetch<unknown, ApiError>(
    `/api/users/${id}`,
    undefined,
    toApiError
  )
    .flat_map(data => {
      const parsed = try_catch(
        () => UserSchema.parse(data),
        e => ({ kind: 'validation' as const, message: String(e) })
      )
      return Promise.resolve(parsed)
    })
    .tap(user => console.log(`Loaded user: ${user.name}`))
    .tap_err(err => console.error(`Failed to load user ${id}:`, err.kind))
    .unwrap_or(null as unknown as User)
    .then(u => u ?? null)
}
Example 2: Rate-Limited Batch Processing
Use parallel_map with AI APIs that have rate limits:
import { parallel_map, try_catch_async, ok, err, type Result } from '@f0rbit/corpus'
type SummaryError = { kind: 'ai_error'; cause: unknown }
async function summarizeDocument(doc: string): Promise<Result<string, SummaryError>> {
  return try_catch_async(
    async () => {
      const response = await ai.complete({
        model: 'claude-3-opus',
        messages: [{ role: 'user', content: `Summarize: ${doc}` }]
      })
      return response.content
    },
    cause => ({ kind: 'ai_error', cause })
  )
}
async function summarizeBatch(documents: string[]): Promise<string[]> {
  // Process 3 at a time to respect rate limits
  const results = await parallel_map(
    documents,
    async (doc, index) => {
      console.log(`Processing ${index + 1}/${documents.length}`)
      const result = await summarizeDocument(doc)
      if (!result.ok) {
        console.warn(`Failed to summarize doc ${index}:`, result.error)
        return '[Summary unavailable]'
      }
      return result.value
    },
    3
  )
  return results
}
Example 3: Graceful Degradation
Use null_on and to_fallback for graceful degradation:
import { null_on, to_fallback, pipe } from '@f0rbit/corpus'
async function renderDashboard(userId: string) {
  // User must exist - other errors should propagate
  const user = null_on(
    await userStore.get(userId),
    e => e.kind === 'not_found'
  )
  if (!user) {
    return <NotFound message="User not found" />
  }
  // These can gracefully degrade to empty/defaults.
  // to_fallback takes a resolved Result, so each promise is unwrapped with .then first.
  const [posts, notifications, preferences] = await Promise.all([
    postStore.listByUser(userId).then(r => to_fallback(r, [])),
    notificationStore.listUnread(userId).then(r => to_fallback(r, [])),
    pipe(preferencesStore.get(userId))
      .unwrap_or({ theme: 'light', notifications: true })
  ])
  return (
    <Dashboard
      user={user}
      posts={posts}
      notifications={notifications}
      preferences={preferences}
    />
  )
}