Observations
Observations let you extract structured facts from your versioned documents and query them efficiently, while automatically tracking which document version each fact came from.
Why Observations?
When building applications on top of versioned documents, you often need to extract structured data from them, such as sentiment scores or named entities.
Without observations, you have two bad options:
- Embed facts in documents - Can’t query efficiently across documents
- Store in separate tables - Lose lineage tracking, must manually invalidate when documents change
Observations solve this by making extracted facts a first-class concept with:
- Source pointers — Every fact knows exactly where it came from (document, version, even JSONPath)
- Automatic staleness — When a source document is updated, observations from the old version are automatically filtered out
- Type-safe schemas — Define observation types with Zod, get full TypeScript inference
- Efficient queries — Filter by type, source, date range, and more
Architecture Overview
```text
┌───────────────────────────────────────────────────────────────────┐
│                              Corpus                               │
├───────────────────────────────────────────────────────────────────┤
│  Stores (versioned documents)     │  Observations (facts)         │
│  ┌─────────────────────────────┐  │  ┌──────────────────────────┐ │
│  │ articles                    │  │  │ sentiment                │ │
│  │  └─ v1: "Article text..."   │◄─┼──│ source: articles:v1      │ │
│  │  └─ v2: "Updated text..."   │  │  │ content: {score: 0.8}    │ │
│  └─────────────────────────────┘  │  └──────────────────────────┘ │
│  ┌─────────────────────────────┐  │  ┌──────────────────────────┐ │
│  │ transcripts                 │  │  │ entity                   │ │
│  │  └─ v1: {speeches: [...]}   │◄─┼──│ source: transcripts:v1   │ │
│  └─────────────────────────────┘  │  │ path: $.speeches[0]      │ │
│                                   │  │ content: {name: "..."}   │ │
│                                   │  └──────────────────────────┘ │
└───────────────────────────────────────────────────────────────────┘
```

Each observation contains:
- type - Discriminator (e.g., “sentiment”, “entity”)
- source - Pointer to the document version it came from
- content - The structured fact data (validated by Zod schema)
- metadata - Confidence score, timestamps, derivation lineage
Example Use Case: Article Sentiment Analysis
Imagine you’re building a news aggregator that analyzes sentiment in articles:
```typescript
import { z } from 'zod'
import { define_observation_type } from '@f0rbit/corpus'

const SentimentObservation = define_observation_type(
  'sentiment',
  z.object({
    subject: z.string(),
    score: z.number().min(-1).max(1),
    keywords: z.array(z.string()),
  })
)
```

```typescript
import {
  create_corpus,
  create_memory_backend,
  define_store,
  json_codec,
} from '@f0rbit/corpus'

const ArticleSchema = z.object({
  title: z.string(),
  body: z.string(),
  published_at: z.string(),
})

const corpus = create_corpus()
  .with_backend(create_memory_backend())
  .with_store(define_store('articles', json_codec(ArticleSchema)))
  .with_observations([SentimentObservation])
  .build()
```

```typescript
// Store the article
const article = {
  title: 'Tech Giants Report Strong Earnings',
  body: 'Apple and Microsoft both exceeded expectations...',
  published_at: '2024-07-15T10:00:00Z',
}

const result = await corpus.stores.articles.put(article)
if (!result.ok) throw new Error('Failed to store article')

const version = result.value.version
```
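The next step calls analyzeSentiment. It stands in for your own model and is not part of @f0rbit/corpus. To run the walkthrough end to end, you can put a toy keyword scorer like the one below in that slot. Everything in it is a hypothetical placeholder: the word lists, the subject list, and the confidence values. It only returns the shape the observation loop expects: subject, a score in [-1, 1], keywords, and confidence.

```typescript
// Toy stand-in for analyzeSentiment: a keyword scorer, NOT a real model.
type ArticleInput = { title: string; body: string; published_at: string }

type SentimentResult = {
  subject: string
  score: number // clamped to [-1, 1] so it passes the SentimentObservation schema
  keywords: string[]
  confidence: number // 0-1
}

// Hypothetical word and subject lists; a real pipeline would use a trained model and NER.
const POSITIVE_WORDS = ['strong', 'exceeded', 'beat', 'growth']
const NEGATIVE_WORDS = ['weak', 'missed', 'decline', 'loss']
const KNOWN_SUBJECTS = ['Apple', 'Microsoft']

function analyzeSentiment(article: ArticleInput): SentimentResult[] {
  const text = `${article.title} ${article.body}`
  const words: string[] = text.toLowerCase().match(/[a-z]+/g) ?? []
  const keywords = words.filter((w) => POSITIVE_WORDS.includes(w) || NEGATIVE_WORDS.includes(w))
  // +1 per positive word, -1 per negative word, then scaled and clamped to [-1, 1]
  const raw = keywords.reduce((sum, w) => sum + (POSITIVE_WORDS.includes(w) ? 1 : -1), 0)
  const score = Math.max(-1, Math.min(1, raw / 2))
  // Document-level score: every subject mentioned in the text gets the same score
  return KNOWN_SUBJECTS.filter((subject) => text.includes(subject)).map((subject) => ({
    subject,
    score,
    keywords,
    confidence: keywords.length > 0 ? 0.6 : 0.1,
  }))
}
```

For the example article, this yields one result each for Apple and Microsoft. Both have score 1, because "strong" and "exceeded" are both positive, and both have keywords ["strong", "exceeded"].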
```typescript
// Run your sentiment analysis
const sentiments = analyzeSentiment(article) // Your ML model

// Store observations pointing back to the article
for (const s of sentiments) {
  await corpus.observations.put(SentimentObservation, {
    source: {
      store_id: 'articles',
      version: version,
    },
    content: {
      subject: s.subject,
      score: s.score,
      keywords: s.keywords,
    },
    confidence: s.confidence,
    observed_at: new Date(article.published_at),
  })
}
```

```typescript
// Get all positive sentiment observations from the last week
for await (const obs of corpus.observations.query({
  type: 'sentiment',
  after: new Date('2024-07-08'),
})) {
  if (obs.content.score > 0.5) {
    console.log(`Positive sentiment about ${obs.content.subject}`)
    console.log(` Score: ${obs.content.score}`)
    console.log(` Keywords: ${obs.content.keywords.join(', ')}`)
    console.log(` Source: ${obs.source.store_id}:${obs.source.version}`)
  }
}
```

```typescript
// Article gets updated with corrections
const updatedArticle = {
  ...article,
  body: 'Apple exceeded expectations, Microsoft met expectations...',
}

await corpus.stores.articles.put(updatedArticle)

// Observations on the old version are now considered stale.
// This query only returns observations from the LATEST version:
for await (const obs of corpus.observations.query({ type: 'sentiment' })) {
  // Only observations on the new version appear here
  // (none until you re-run the analysis on updatedArticle)
}

// To include historical observations:
for await (const obs of corpus.observations.query({
  type: 'sentiment',
  include_stale: true,
})) {
  // All observations, including from old versions
}
```

Core Concepts
SnapshotPointer
A pointer to a specific location within a versioned document:
```typescript
type SnapshotPointer = {
  store_id: string // Which store
  version: string // Which version
  path?: string // JSONPath to nested value (e.g., "$.items[0].text")
  span?: {
    // Character range within the resolved value
    start: number
    end: number
  }
}
```

Use corpus.create_pointer() to create pointers:
```typescript
// Point to entire document
const docPointer = corpus.create_pointer('articles', 'v123')

// Point to nested value
const nestedPointer = corpus.create_pointer('transcripts', 'v456', '$.speeches[2]')
```
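Here is a toy resolver in plain TypeScript that shows what a path and a span select. It is a hypothetical sketch, not how corpus.resolve_pointer works. It supports only `$`, `.key`, and `[index]` segments, with no wildcards or filters. It also assumes span is a half-open [start, end) character range, as with String.prototype.slice. The document does not say whether end is inclusive.

```typescript
// Hypothetical toy resolver, NOT corpus.resolve_pointer.
// Supports only "$", ".key" and "[index]" segments.
type CharSpan = { start: number; end: number }

function resolve_path(doc: unknown, path: string): unknown {
  if (!path.startsWith('$')) throw new Error(`path must start with "$": ${path}`)
  const rest = path.slice(1)
  const segments: string[] = rest.match(/\.[A-Za-z_][A-Za-z0-9_]*|\[\d+\]/g) ?? []
  // Reject anything the regex skipped, e.g. wildcards or filter expressions
  if (segments.join('') !== rest) throw new Error(`unsupported path: ${path}`)

  let current: unknown = doc
  for (const seg of segments) {
    if (current === null || typeof current !== 'object') return undefined
    // ".key" -> property name, "[2]" -> numeric index
    const key = seg.startsWith('.') ? seg.slice(1) : Number(seg.slice(1, -1))
    current = (current as Record<string | number, unknown>)[key]
  }
  return current
}

// Assumes span is half-open [start, end), like String.prototype.slice
function apply_span(value: unknown, span?: CharSpan): unknown {
  if (span === undefined) return value
  if (typeof value !== 'string') throw new Error('span needs a string value')
  return value.slice(span.start, span.end)
}
```

For example, `$.speeches[1].text` walks into the `speeches` array, takes its second element, and reads `text`. A span of `{ start: 0, end: 5 }` on `"Hello world"` then selects `"Hello"`.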
```typescript
// Resolve a pointer back to its value
const result = await corpus.resolve_pointer(nestedPointer)
if (result.ok) {
  console.log('Resolved value:', result.value)
}
```

Observation Types
Define observation types with Zod schemas for type safety:
```typescript
import { z } from 'zod'
import { define_observation_type } from '@f0rbit/corpus'

// Simple type
const NoteObservation = define_observation_type(
  'note',
  z.object({
    text: z.string(),
  })
)

// Complex type with validation
const EntityObservation = define_observation_type(
  'entity',
  z.object({
    name: z.string().min(1),
    type: z.enum(['person', 'organization', 'location']),
    mentions: z.number().int().positive(),
    confidence: z.number().min(0).max(1).optional(),
  })
)
```
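If you are new to Zod, the EntityObservation schema accepts roughly what the hand-written type guard below accepts. This is an illustration only. The library validates content with the Zod schema itself, and Zod reports detailed errors rather than a bare boolean. The name is_entity_content is hypothetical.

```typescript
// Hypothetical hand-written equivalent of the EntityObservation schema, for illustration.
const ENTITY_TYPES = ['person', 'organization', 'location'] as const
type EntityKind = (typeof ENTITY_TYPES)[number]

type EntityContent = {
  name: string
  type: EntityKind
  mentions: number
  confidence?: number
}

function is_entity_content(input: unknown): input is EntityContent {
  if (typeof input !== 'object' || input === null) return false
  const { name, type, mentions, confidence } = input as Record<string, unknown>
  // z.string().min(1): a non-empty string
  if (typeof name !== 'string' || name.length < 1) return false
  // z.enum([...]): one of the listed literals
  if (typeof type !== 'string' || !(ENTITY_TYPES as readonly string[]).includes(type)) return false
  // z.number().int().positive(): an integer greater than zero
  if (typeof mentions !== 'number' || !Number.isInteger(mentions) || mentions <= 0) return false
  // z.number().min(0).max(1).optional(): absent, or a number in [0, 1]
  if (confidence !== undefined) {
    if (typeof confidence !== 'number' || Number.isNaN(confidence) || confidence < 0 || confidence > 1) {
      return false
    }
  }
  return true
}
```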
```typescript
// The content is fully typed
// (`pointer` is any SnapshotPointer, e.g. one returned by corpus.create_pointer())
await corpus.observations.put(EntityObservation, {
  source: pointer,
  content: {
    name: 'Acme Corp',
    type: 'organization', // TypeScript enforces valid enum
    mentions: 5,
  },
})
```

Staleness Detection
Observations automatically track whether their source document has been superseded:
```typescript
// Check if a specific observation's source is stale
const isStale = await corpus.observations.is_stale(observation.source)

// Check via the corpus API
const isSuperseded = await corpus.is_superseded(pointer)
```

How it works:
- When you store a new version of a document, it becomes the “latest” version
- Observations pointing to older versions are considered “stale”
- By default, query() excludes stale observations
- Use include_stale: true to include them for historical analysis
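The rules above fit in a few lines of code. What follows is a hypothetical in-memory sketch, not the library's implementation. MiniObservations, record_version, and add are illustrative names. The sketch assumes, as the architecture diagram suggests, that each store holds one version history. Under that assumption, a pointer is stale when its version is not the latest one written to its store. The sketch implements only the type, include_stale, and limit query options.

```typescript
// Hypothetical in-memory model of staleness, NOT the library's implementation.
type SourcePointer = { store_id: string; version: string }
type StoredObservation = { id: string; type: string; source: SourcePointer }
type MiniQueryOpts = { type?: string | string[]; include_stale?: boolean; limit?: number }

class MiniObservations {
  // Latest version written to each store (assumes one version history per store)
  private latest = new Map<string, string>()
  private items: StoredObservation[] = []

  // Call whenever a new document version is stored
  record_version(store_id: string, version: string): void {
    this.latest.set(store_id, version)
  }

  add(obs: StoredObservation): void {
    this.items.push(obs)
  }

  // Stale = a newer version of the source store exists; unknown stores are never stale
  is_stale(pointer: SourcePointer): boolean {
    const latest = this.latest.get(pointer.store_id)
    return latest !== undefined && latest !== pointer.version
  }

  query(opts: MiniQueryOpts = {}): StoredObservation[] {
    const types =
      opts.type === undefined ? undefined : Array.isArray(opts.type) ? opts.type : [opts.type]
    const matches = this.items.filter((obs) => {
      if (types !== undefined && !types.includes(obs.type)) return false
      // Stale observations are excluded unless include_stale is set (default: false)
      if (!opts.include_stale && this.is_stale(obs.source)) return false
      return true
    })
    return opts.limit === undefined ? matches : matches.slice(0, opts.limit)
  }
}
```

In this model, staleness is derived from the latest version at query time rather than written onto each observation. Storing a new version therefore needs no extra work on existing observations.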
API Reference
ObservationsClient
The corpus.observations property provides these methods:
```typescript
interface ObservationsClient {
  // Create an observation
  put<T>(
    type: ObservationTypeDef<T>,
    opts: {
      source: SnapshotPointer
      content: T
      confidence?: number
      observed_at?: Date
      derived_from?: SnapshotPointer[]
    }
  ): Promise<Result<Observation<T>, CorpusError>>

  // Get by ID
  get(id: string): Promise<Result<Observation, CorpusError>>

  // Query with filters
  query(opts?: ObservationQueryOpts): AsyncIterable<Observation>

  // Query metadata only (excludes content, more efficient)
  query_meta(opts?: ObservationQueryOpts): AsyncIterable<ObservationMeta>

  // Delete by ID
  delete(id: string): Promise<Result<void, CorpusError>>

  // Delete all observations from a source
  delete_by_source(source: SnapshotPointer): Promise<Result<number, CorpusError>>

  // Check if a source is stale
  is_stale(pointer: SnapshotPointer): Promise<boolean>
}
```

Query Options
```typescript
type ObservationQueryOpts = {
  type?: string | string[] // Filter by observation type(s)
  source_store?: string // Filter by source store
  source_version?: string // Filter by exact source version
  source_prefix?: string // Filter by source version prefix
  after?: Date // observed_at after this date
  before?: Date // observed_at before this date
  created_after?: Date // created_at after this date
  created_before?: Date // created_at before this date
  include_stale?: boolean // Include stale observations (default: false)
  limit?: number // Max results to return
}
```

Observation Type
```typescript
type Observation<T = unknown> = {
  id: string // Auto-generated unique ID
  type: string // Type discriminator
  source: SnapshotPointer // Where this came from
  content: T // The structured data
  confidence?: number // 0-1 confidence score
  observed_at?: Date // When the fact occurred (domain time)
  created_at: Date // When the observation was created
  derived_from?: SnapshotPointer[] // Optional provenance chain
}
```

Provenance Chains with derived_from
Observations can link back to their source observations via the derived_from field. This is essential for tracking AI pipeline provenance—which model produced which annotation from what input.
Use Case: Multi-Hop Annotation Pipeline
Imagine a document processing pipeline:
```text
Raw text → Named Entity Recognition → Entity Disambiguation → Final Annotation
```

Each stage produces observations. By linking them with derived_from, you can walk back the full chain:
```typescript
// Stage 1: Extract raw entities from text
const entities = await corpus.observations.put(EntityExtractionObservation, {
  source: { store_id: 'documents', version: docVersion },
  content: { entities: ['Apple', 'Microsoft'] },
})
if (!entities.ok) throw new Error('Stage 1 failed')

// Stage 2: Disambiguate entities (linking back to stage 1)
const disambiguation = await corpus.observations.put(DisambiguationObservation, {
  source: { store_id: 'documents', version: docVersion },
  content: {
    resolved_entities: [{ name: 'Apple Inc.', wikidata_id: 'Q312' }],
  },
  derived_from: [entities.value.source], // Link to stage 1
})
if (!disambiguation.ok) throw new Error('Stage 2 failed')

// Stage 3: Final annotation (linking back to stage 2)
const final = await corpus.observations.put(FinalAnnotationObservation, {
  source: { store_id: 'documents', version: docVersion },
  content: { labels: ['company', 'tech'] },
  derived_from: [disambiguation.value.source], // Link to stage 2
})
```

Later, you can trace back the full provenance:
```typescript
async function trace_provenance(obs: Observation): Promise<Observation[]> {
  const chain = [obs]

  let current = obs
  while (current.derived_from && current.derived_from.length > 0) {
    // Follows only the first entry of derived_from at each step
    const source = current.derived_from[0]
    // source_to_id is your own helper that maps a pointer to an observation ID
    // (assuming you store source IDs)
    const result = await corpus.observations.get(source_to_id(source))
    if (!result.ok) break
    chain.push(result.value)
    current = result.value
  }

  return chain
}
```
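```typescript
if (!final.ok) throw new Error('Stage 3 failed')
const provenance = await trace_provenance(final.value)

// The chain starts at the final observation, so reverse it to print pipeline order
console.log('Pipeline:', [...provenance].reverse().map((p) => p.type).join(' → '))
// Output: entity_extraction → disambiguation → final_annotation
```

Practical benefits: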
- Model auditing — See which model versions produced which annotations
- Error recovery — If stage 3 fails, you still have stage 2 results to work with
- Explainability — Show users the full computation chain that led to a decision
- Reprocessing — Replace one stage without re-running the entire pipeline
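trace_provenance above follows only the first entry of derived_from at each step. It would also loop forever if two observations ever referenced each other. When a stage has several inputs, a breadth-first walk with a visited set reaches every ancestor it can find and always terminates. The sketch below is hypothetical and self-contained. Like trace_provenance, it assumes derived_from entries can be mapped to observation IDs, and it models that by storing IDs directly in a Map. The names ProvNode and trace_all are illustrative.

```typescript
// Hypothetical sketch: derived_from holds observation IDs here, standing in
// for the pointer-to-ID mapping that trace_provenance leaves to source_to_id.
type ProvNode = { id: string; type: string; derived_from?: string[] }

// Breadth-first walk from an observation to all of its reachable ancestors.
function trace_all(start: ProvNode, by_id: Map<string, ProvNode>): ProvNode[] {
  const order: ProvNode[] = []
  const seen = new Set<string>()
  const queue: ProvNode[] = [start]
  while (queue.length > 0) {
    const node = queue.shift()!
    if (seen.has(node.id)) continue // shared ancestor or cycle: visit each node once
    seen.add(node.id)
    order.push(node)
    for (const parent_id of node.derived_from ?? []) {
      const parent = by_id.get(parent_id)
      if (parent !== undefined) queue.push(parent) // unknown IDs (e.g. deleted) are skipped
    }
  }
  return order
}
```

Results come back nearest first: the starting observation, then its direct inputs, then theirs. Reverse the array to print stages in pipeline order.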
Re-processing Workflows
When you need to re-analyze documents (e.g., with an improved model), you have two options:
Option A: Delete old observations, then re-process

```typescript
// Delete old observations before re-processing
await corpus.observations.delete_by_source({
  store_id: 'articles',
  version: old_version,
})

// Store updated document
const result = await corpus.stores.articles.put(updated_data)
if (!result.ok) throw new Error('Failed to store document')
const new_version = result.value.version

// Create new observations
for (const fact of extract_facts(updated_data)) {
  await corpus.observations.put(MyType, {
    source: { store_id: 'articles', version: new_version },
    content: fact,
  })
}
```

Option B: Keep old observations as stale history

```typescript
// Just store updated document and create new observations
const result = await corpus.stores.articles.put(updated_data)
if (!result.ok) throw new Error('Failed to store document')
const new_version = result.value.version

for (const fact of extract_facts(updated_data)) {
  await corpus.observations.put(MyType, {
    source: { store_id: 'articles', version: new_version },
    content: fact,
  })
}

// Old observations remain but are filtered out by default
// Use include_stale: true for historical analysis
```

When to use each:
- Option A - When you want to save storage space and don’t need historical data
- Option B - When you want to keep a full audit trail of all extractions
Backend Support
Observations are supported on all backends:
| Backend | Storage | Best For |
|---|---|---|
| Memory | In-memory Map | Testing, prototyping |
| File | JSON file | Local development |
| Cloudflare | D1 (SQLite) | Production |
| Layered | Composite | Caching, replication |
Error Handling
Observation operations return Result<T, CorpusError> types:
```typescript
const result = await corpus.observations.put(SentimentObservation, {
  source: pointer,
  content: { subject: 'test', score: 2.0, keywords: [] }, // Invalid: score > 1
})

if (!result.ok) {
  switch (result.error.kind) {
    case 'validation_error':
      console.log('Invalid content:', result.error.message)
      break
    case 'observation_not_found':
      console.log('Observation not found:', result.error.id)
      break
    case 'storage_error':
      console.log('Backend error:', result.error.operation)
      break
  }
}
```

Testing with Observations
Use the memory backend for fast, isolated tests:
```typescript
import { describe, it, expect, beforeEach } from 'bun:test'
import {
  create_corpus,
  create_memory_backend,
  define_store,
  json_codec,
} from '@f0rbit/corpus'
// ArticleSchema and SentimentObservation are defined as in the examples above;
// testArticle is any valid article fixture.

// Builds a fresh corpus on an in-memory backend
const make_corpus = () =>
  create_corpus()
    .with_backend(create_memory_backend())
    .with_store(define_store('articles', json_codec(ArticleSchema)))
    .with_observations([SentimentObservation])
    .build()

describe('sentiment analysis', () => {
  let corpus: ReturnType<typeof make_corpus>

  beforeEach(() => {
    // Fresh backend for each test
    corpus = make_corpus()
  })

  it('extracts sentiment from articles', async () => {
    // Store article
    const result = await corpus.stores.articles.put(testArticle)
    expect(result.ok).toBe(true)
    if (!result.ok) return

    // Create observation
    const obsResult = await corpus.observations.put(SentimentObservation, {
      source: { store_id: 'articles', version: result.value.version },
      content: { subject: 'test', score: 0.8, keywords: ['good'] },
    })
    expect(obsResult.ok).toBe(true)

    // Query observations
    const observations = []
    for await (const obs of corpus.observations.query({ type: 'sentiment' })) {
      observations.push(obs)
    }
    expect(observations).toHaveLength(1)
    expect(observations[0].content.score).toBe(0.8)
  })
})
```