Skip to content

Observations API

Observations let you extract structured facts from versioned documents with automatic staleness detection. For a comprehensive guide with examples, see the Observations Guide.

define_observation_type

function define_observation_type<T>(
name: string,
schema: ZodType<T>
): ObservationTypeDef<T>

Creates a typed observation definition for use with observations.put().

Parameters

ParameterTypeDescription
namestringUnique identifier for this observation type
schemaZodType<T>Zod schema for validating observation content

Example

import { z } from 'zod'
import { define_observation_type } from '@f0rbit/corpus'
const SentimentObservation = define_observation_type(
'sentiment',
z.object({
subject: z.string(),
score: z.number().min(-1).max(1),
keywords: z.array(z.string()),
})
)

ObservationsClient

The corpus.observations property provides methods for managing observations.

interface ObservationsClient {
put<T>(type: ObservationTypeDef<T>, opts: ObservationPutOpts<T>): Promise<Result<Observation<T>, CorpusError>>
get(id: string): Promise<Result<Observation, CorpusError>>
query(opts?: ObservationQueryOpts): AsyncIterable<Observation>
query_meta(opts?: ObservationQueryOpts): AsyncIterable<ObservationMeta>
delete(id: string): Promise<Result<void, CorpusError>>
delete_by_source(source: SnapshotPointer): Promise<Result<number, CorpusError>>
is_stale(pointer: SnapshotPointer): Promise<boolean>
}

Methods

MethodDescription
put(type, opts)Create a new observation
get(id)Get an observation by ID
query(opts?)Query observations with filters
query_meta(opts?)Query metadata only (more efficient)
delete(id)Delete an observation by ID
delete_by_source(source)Delete all observations from a source
is_stale(pointer)Check if a source is stale

Types

SnapshotPointer

Universal address to versioned content within a corpus store.

type SnapshotPointer = {
store_id: string // Which store
version: string // Which version
path?: string // JSONPath to nested value (e.g., "$.items[0].text")
span?: { // Character range within the resolved value
start: number
end: number
}
}

Observation

A stored observation record linking structured facts to versioned content.

type Observation<T = unknown> = {
id: string // Auto-generated unique ID
type: string // Type discriminator
source: SnapshotPointer // Where this came from
content: T // The structured data
confidence?: number // 0-1 confidence score
observed_at?: Date // When the fact occurred (domain time)
created_at: Date // When the observation was created
derived_from?: SnapshotPointer[] // Optional provenance chain
}

ObservationMeta

Observation metadata without content payload. Used for efficient listing operations.

type ObservationMeta = Omit<Observation<never>, 'content'>

ObservationTypeDef

Definition for a typed observation schema.

type ObservationTypeDef<T> = {
readonly name: string
readonly schema: ZodType<T>
}

ObservationQueryOpts

Query options for filtering observations.

type ObservationQueryOpts = {
type?: string | string[] // Filter by observation type(s)
source_store?: string // Filter by source store
source_version?: string // Filter by exact source version
source_prefix?: string // Filter by source version prefix
after?: Date // observed_at after this date
before?: Date // observed_at before this date
created_after?: Date // created_at after this date
created_before?: Date // created_at before this date
include_stale?: boolean // Include stale observations (default: false)
limit?: number // Max results to return
cursor?: string // Pagination cursor
}

ObservationPutOpts

Options for creating a new observation.

type ObservationPutOpts<T> = {
source: SnapshotPointer // Required: what this observation is about
content: T // Required: the typed observation data
confidence?: number // Optional: 0-1 confidence score
observed_at?: Date // Optional: when the fact occurred
derived_from?: SnapshotPointer[] // Optional: provenance chain
}

InferObservationContent

Utility type to extract the content type from an ObservationTypeDef.

type InferObservationContent<T> = T extends ObservationTypeDef<infer C> ? C : never
const entity_mention = define_observation_type('entity_mention', EntitySchema)
type EntityMention = InferObservationContent<typeof entity_mention>
// => { entity: string; entity_type: 'person' | 'organization' | ... }

Error Handling

Observation operations return Result<T, CorpusError> types. Observation-specific errors:

type ObservationError =
| { kind: 'validation_error'; message: string }
| { kind: 'observation_not_found'; id: string }
| { kind: 'storage_error'; cause: Error; operation: string }

Example

const result = await corpus.observations.put(SentimentObservation, {
source: pointer,
content: { subject: 'test', score: 2.0, keywords: [] }, // Invalid: score > 1
})
if (!result.ok) {
switch (result.error.kind) {
case 'validation_error':
console.log('Invalid content:', result.error.message)
break
case 'observation_not_found':
console.log('Observation not found:', result.error.id)
break
case 'storage_error':
console.log('Backend error:', result.error.operation)
break
}
}

See Also