Observations
Observations let you extract structured facts from your versioned documents and query them efficiently, while automatically tracking which document version each fact came from.
Why Observations?
When building applications on top of versioned documents, you often need to extract structured data from them, such as sentiment scores or named entities.
Without observations, you have two bad options:
- Embed facts in documents - You can't query them efficiently across documents
- Store facts in separate tables - You lose lineage tracking and must manually invalidate facts when documents change
Observations solve this by making extracted facts a first-class concept with:
- Source pointers - Every fact knows exactly where it came from (document, version, even JSONPath)
- Automatic staleness - When a source document is updated, observations from the old version are excluded from queries by default
- Type-safe schemas - Define observation types with Zod and get full TypeScript inference
- Efficient queries - Filter by type, source, date range, and more
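To make the type-safety point concrete, here is a dependency-free sketch of how a typed definition can drive inference for the content you store. It is not corpus's implementation: the real define_observation_type takes a Zod schema, whereas the hypothetical define_type and put helpers below use a plain type-guard function in its place.

```typescript
// Hypothetical, dependency-free model of typed observation definitions.
// corpus uses Zod schemas; a plain type-guard function stands in here.
type ObsTypeDef<T> = {
  name: string
  validate: (value: unknown) => value is T
}

function define_type<T>(
  name: string,
  validate: (value: unknown) => value is T,
): ObsTypeDef<T> {
  return { name, validate }
}

type SentimentContent = { subject: string; score: number }

const Sentiment = define_type<SentimentContent>(
  'sentiment',
  (v): v is SentimentContent => {
    if (typeof v !== 'object' || v === null) return false
    const c = v as Record<string, unknown>
    const score = c.score
    return (
      typeof c.subject === 'string' &&
      typeof score === 'number' &&
      score >= -1 &&
      score <= 1
    )
  },
)

// T is inferred from the definition, so content is checked against
// SentimentContent at compile time; validate() re-checks it at runtime.
function put<T>(def: ObsTypeDef<T>, content: T): { type: string; content: T } {
  if (!def.validate(content)) throw new Error(`invalid ${def.name} content`)
  return { type: def.name, content }
}

const obs = put(Sentiment, { subject: 'Apple', score: 0.8 })
```

The runtime check still matters alongside the static type: a call such as put(Sentiment, { subject: 'x', score: 2 }) type-checks but throws, which is the kind of value that arrives from model output or parsed JSON.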
Architecture Overview
┌──────────────────────────────────────────────────────────────────┐
│ Corpus                                                           │
├──────────────────────────────────────────────────────────────────┤
│ Stores (versioned documents)     │ Observations (facts)          │
│ ┌─────────────────────────────┐  │  ┌──────────────────────────┐ │
│ │ articles                    │  │  │ sentiment                │ │
│ │ └─ v1: "Article text..."    │◄─┼──│ source: articles:v1      │ │
│ │ └─ v2: "Updated text..."    │  │  │ content: {score: 0.8}    │ │
│ └─────────────────────────────┘  │  └──────────────────────────┘ │
│ ┌─────────────────────────────┐  │  ┌──────────────────────────┐ │
│ │ transcripts                 │  │  │ entity                   │ │
│ │ └─ v1: {speeches: [...]}    │◄─┼──│ source: transcripts:v1   │ │
│ └─────────────────────────────┘  │  │ path: $.speeches[0]      │ │
│                                  │  │ content: {name: "..."}   │ │
│                                  │  └──────────────────────────┘ │
└──────────────────────────────────────────────────────────────────┘
Each observation contains:
- type - Discriminator (e.g., “sentiment”, “entity”)
- source - Pointer to the document version it came from
- content - The structured fact data (validated by Zod schema)
- metadata - Confidence score, timestamps, derivation lineage
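The diagram's entity observation points into a nested value with path: $.speeches[0]. As a rough illustration of what such a pointer selects, the sketch below resolves a small JSONPath subset ($, .key and [index]) and then applies an optional character span. The helpers resolve_path and apply_span are hypothetical, and treating span.end as exclusive is an assumption; in practice, corpus.resolve_pointer() does the lookup for you.

```typescript
// Hypothetical helpers illustrating what a pointer's path and span select.
// Supports only a small JSONPath subset: "$", ".key" and "[index]".
type Span = { start: number; end: number }

function resolve_path(doc: unknown, path: string = '$'): unknown {
  if (!path.startsWith('$')) throw new Error(`path must start with $: ${path}`)
  const rest = path.slice(1)
  // Split the remainder into ".key" and "[0]" segments
  const segments = rest.match(/\.[A-Za-z_$][\w$]*|\[\d+\]/g) ?? []
  if (segments.join('') !== rest) throw new Error(`unsupported path: ${path}`)

  let current: unknown = doc
  for (const segment of segments) {
    if (current === null || typeof current !== 'object') return undefined
    const key = segment.startsWith('[')
      ? Number(segment.slice(1, -1)) // array index
      : segment.slice(1) // object key
    current = (current as Record<string | number, unknown>)[key]
  }
  return current
}

// Apply an optional span to a resolved string (end treated as exclusive)
function apply_span(value: unknown, span?: Span): unknown {
  if (span === undefined) return value
  if (typeof value !== 'string') throw new Error('span requires a string value')
  return value.slice(span.start, span.end)
}

const transcript = {
  speeches: [{ speaker: 'Ada', text: 'Hello, world' }],
}
const text = resolve_path(transcript, '$.speeches[0].text')
const excerpt = apply_span(text, { start: 0, end: 5 })
```

Here text is 'Hello, world' and excerpt is 'Hello'; a path that walks off the document, such as '$.missing[0]', yields undefined rather than throwing.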
Example Use Case: Article Sentiment Analysis
Imagine you’re building a news aggregator that analyzes sentiment in articles:
import { z } from 'zod'
import {
  create_corpus,
  create_memory_backend,
  define_observation_type,
  define_store,
  json_codec,
} from '@f0rbit/corpus'

const SentimentObservation = define_observation_type(
  'sentiment',
  z.object({
    subject: z.string(),
    score: z.number().min(-1).max(1),
    keywords: z.array(z.string()),
  })
)
const ArticleSchema = z.object({
  title: z.string(),
  body: z.string(),
  published_at: z.string(),
})
const corpus = create_corpus()
  .with_backend(create_memory_backend())
  .with_store(define_store('articles', json_codec(ArticleSchema)))
  .with_observations([SentimentObservation])
  .build()
// Store the article
const article = {
  title: 'Tech Giants Report Strong Earnings',
  body: 'Apple and Microsoft both exceeded expectations...',
  published_at: '2024-07-15T10:00:00Z',
}
const result = await corpus.stores.articles.put(article)
if (!result.ok) throw new Error('Failed to store article')
const version = result.value.version
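// Run your sentiment analysis
// analyzeSentiment is your own ML model (not part of corpus); here it is assumed
// to return objects with subject, score, keywords and confidence fields
const sentiments = analyzeSentiment(article)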
// Store observations pointing back to the article
for (const s of sentiments) {
  await corpus.observations.put(SentimentObservation, {
    source: {
      store_id: 'articles',
      version: version,
    },
    content: {
      subject: s.subject,
      score: s.score,
      keywords: s.keywords,
    },
    confidence: s.confidence,
    observed_at: new Date(article.published_at),
  })
}
// Get all positive sentiment observations from the last week
for await (const obs of corpus.observations.query({
  type: 'sentiment',
  after: new Date('2024-07-08'),
})) {
  if (obs.content.score > 0.5) {
    console.log(`Positive sentiment about ${obs.content.subject}`)
    console.log(`  Score: ${obs.content.score}`)
    console.log(`  Keywords: ${obs.content.keywords.join(', ')}`)
    console.log(`  Source: ${obs.source.store_id}:${obs.source.version}`)
  }
}
// Article gets updated with corrections
const updatedArticle = {
  ...article,
  body: 'Apple exceeded expectations, Microsoft met expectations...',
}
await corpus.stores.articles.put(updatedArticle)
// Old observations are automatically marked stale!
// This query only returns observations from the LATEST version:
for await (const obs of corpus.observations.query({ type: 'sentiment' })) {
  // Only observations from the new version appear here
  // (none yet, until you run the analysis on updatedArticle)
}
// To include historical observations:
for await (const obs of corpus.observations.query({
  type: 'sentiment',
  include_stale: true,
})) {
  // All observations, including from old versions
}
Core Concepts
SnapshotPointer
A pointer to a specific location within a versioned document:
type SnapshotPointer = {
  store_id: string // Which store
  version: string  // Which version
  path?: string    // JSONPath to nested value (e.g., "$.items[0].text")
  span?: {         // Character range within the resolved value
    start: number
    end: number
  }
}
Use corpus.create_pointer() to create pointers:
// Point to entire document
const docPointer = corpus.create_pointer('articles', 'v123')
// Point to nested value
const nestedPointer = corpus.create_pointer('transcripts', 'v456', '$.speeches[2]')
// Resolve a pointer back to its value
const result = await corpus.resolve_pointer(nestedPointer)
if (result.ok) {
  console.log('Resolved value:', result.value)
}
Observation Types
Define observation types with Zod schemas for type safety:
import { z } from 'zod'
import { define_observation_type } from '@f0rbit/corpus'
// Simple type
const NoteObservation = define_observation_type(
  'note',
  z.object({
    text: z.string(),
  })
)
// Complex type with validation
const EntityObservation = define_observation_type(
  'entity',
  z.object({
    name: z.string().min(1),
    type: z.enum(['person', 'organization', 'location']),
    mentions: z.number().int().positive(),
    confidence: z.number().min(0).max(1).optional(),
  })
)
// The content is fully typed
// (pointer is any SnapshotPointer, e.g. one returned by corpus.create_pointer())
await corpus.observations.put(EntityObservation, {
  source: pointer,
  content: {
    name: 'Acme Corp',
    type: 'organization', // TypeScript enforces valid enum
    mentions: 5,
  },
})
Staleness Detection
Observations automatically track whether their source document has been superseded:
// Check if a specific observation's source is stale
const isStale = await corpus.observations.is_stale(observation.source)
// Check via the corpus API
const isSuperseded = await corpus.is_superseded(pointer)
How it works:
- When you store a new version of a document, it becomes the “latest” version
- Observations pointing to older versions are considered “stale”
- By default, query() excludes stale observations
- Use include_stale: true to include them for historical analysis
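The staleness rule above can be modeled in a few lines. This is a hypothetical sketch, not the library's code: it assumes each store tracks a single latest version id (as in the article example) and treats sources from stores with no recorded version as current.

```typescript
// Hypothetical model of the default staleness rule (not the library's code).
// Assumes each store tracks one latest version id.
type Pointer = { store_id: string; version: string }
type Obs = { id: string; type: string; source: Pointer }

function is_stale(latest: Map<string, string>, source: Pointer): boolean {
  const current = latest.get(source.store_id)
  // Stale once the store's latest version differs from the one observed
  return current !== undefined && current !== source.version
}

function query(
  observations: Obs[],
  latest: Map<string, string>,
  opts: { type?: string; include_stale?: boolean } = {},
): Obs[] {
  return observations.filter(
    (o) =>
      (opts.type === undefined || o.type === opts.type) &&
      (opts.include_stale === true || !is_stale(latest, o.source)),
  )
}

// v1 of the article gets an observation...
const latest = new Map<string, string>([['articles', 'v1']])
const observations: Obs[] = [
  { id: 'a', type: 'sentiment', source: { store_id: 'articles', version: 'v1' } },
]
// ...then a new version is stored and re-analyzed
latest.set('articles', 'v2')
observations.push({ id: 'b', type: 'sentiment', source: { store_id: 'articles', version: 'v2' } })

const current_ids = query(observations, latest).map((o) => o.id)
const all_ids = query(observations, latest, { include_stale: true }).map((o) => o.id)
```

Nothing is rewritten when a new version arrives; staleness is computed at query time by comparing each observation's source against the latest version, which is why the old observations remain available through include_stale.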
API Reference
ObservationsClient
The corpus.observations property provides these methods:
interface ObservationsClient {
  // Create an observation
  put<T>(
    type: ObservationTypeDef<T>,
    opts: {
      source: SnapshotPointer
      content: T
      confidence?: number
      observed_at?: Date
      derived_from?: SnapshotPointer[]
    }
  ): Promise<Result<Observation<T>, CorpusError>>

  // Get by ID
  get(id: string): Promise<Result<Observation, CorpusError>>

  // Query with filters
  query(opts?: ObservationQueryOpts): AsyncIterable<Observation>

  // Query metadata only (excludes content, more efficient)
  query_meta(opts?: ObservationQueryOpts): AsyncIterable<ObservationMeta>

  // Delete by ID
  delete(id: string): Promise<Result<void, CorpusError>>

  // Delete all observations from a source
  delete_by_source(source: SnapshotPointer): Promise<Result<number, CorpusError>>

  // Check if a source is stale
  is_stale(pointer: SnapshotPointer): Promise<boolean>
}
Query Options
type ObservationQueryOpts = {
  type?: string | string[]  // Filter by observation type(s)
  source_store?: string     // Filter by source store
  source_version?: string   // Filter by exact source version
  source_prefix?: string    // Filter by source version prefix
  after?: Date              // observed_at after this date
  before?: Date             // observed_at before this date
  created_after?: Date      // created_at after this date
  created_before?: Date     // created_at before this date
  include_stale?: boolean   // Include stale observations (default: false)
  limit?: number            // Max results to return
}
Observation Type
type Observation<T = unknown> = {
  id: string                       // Auto-generated unique ID
  type: string                     // Type discriminator
  source: SnapshotPointer          // Where this came from
  content: T                       // The structured data
  confidence?: number              // 0-1 confidence score
  observed_at?: Date               // When the fact occurred (domain time)
  created_at: Date                 // When the observation was created
  derived_from?: SnapshotPointer[] // Optional provenance chain
}
Re-processing Workflows
When you need to re-analyze documents (e.g., with an improved model), you have two options:
Option A: Delete old observations, then re-process
// old_version, updated_data, extract_facts and MyType are placeholders for your own values
// Delete old observations before re-processing
await corpus.observations.delete_by_source({
  store_id: 'articles',
  version: old_version,
})
// Store updated document
const result = await corpus.stores.articles.put(updated_data)
if (!result.ok) throw new Error('Failed to store updated document')
const new_version = result.value.version
// Create new observations
for (const fact of extract_facts(updated_data)) {
  await corpus.observations.put(MyType, {
    source: { store_id: 'articles', version: new_version },
    content: fact,
  })
}
Option B: Keep old observations and let staleness filtering hide them
// Just store updated document and create new observations
const result = await corpus.stores.articles.put(updated_data)
if (!result.ok) throw new Error('Failed to store updated document')
const new_version = result.value.version
for (const fact of extract_facts(updated_data)) {
  await corpus.observations.put(MyType, {
    source: { store_id: 'articles', version: new_version },
    content: fact,
  })
}
// Old observations remain but are filtered out by default
// Use include_stale: true for historical analysis
When to use each:
- Option A - When you want to save storage space and don’t need historical data
- Option B - When you want to keep a full audit trail of all extractions
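The trade-off can be seen in a small in-memory model. In this hypothetical sketch, reprocess_delete stands in for Option A (assuming delete_by_source matches on store_id and version) and reprocess_keep for Option B; both leave the same observations visible to a default query, but only Option B retains the old ones.

```typescript
// Hypothetical in-memory model of the two re-processing options.
type Pointer = { store_id: string; version: string }
type Obs = { id: string; source: Pointer }

const same_source = (a: Pointer, b: Pointer): boolean =>
  a.store_id === b.store_id && a.version === b.version

// Option A: remove observations from the old source, then add the new ones
function reprocess_delete(existing: Obs[], old_source: Pointer, fresh: Obs[]): Obs[] {
  return [...existing.filter((o) => !same_source(o.source, old_source)), ...fresh]
}

// Option B: append the new observations; old ones are kept for auditing
function reprocess_keep(existing: Obs[], fresh: Obs[]): Obs[] {
  return [...existing, ...fresh]
}

// What a default query sees: observations whose source is still the latest version
function visible(all: Obs[], latest: Map<string, string>): string[] {
  return all
    .filter((o) => {
      const current = latest.get(o.source.store_id)
      return current === undefined || current === o.source.version
    })
    .map((o) => o.id)
}

const v1: Pointer = { store_id: 'articles', version: 'v1' }
const v2: Pointer = { store_id: 'articles', version: 'v2' }
const existing: Obs[] = [{ id: 'old', source: v1 }]
const fresh: Obs[] = [{ id: 'new', source: v2 }]
const latest = new Map<string, string>([['articles', 'v2']])

const option_a = reprocess_delete(existing, v1, fresh)
const option_b = reprocess_keep(existing, fresh)
```

Both visible(option_a, latest) and visible(option_b, latest) return ['new']; the only difference is that option_b still holds the 'old' observation for historical queries, at the cost of storing it.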
Backend Support
Observations are supported on all backends:
| Backend | Storage | Best For |
|---|---|---|
| Memory | In-memory Map | Testing, prototyping |
| File | JSON file | Local development |
| Cloudflare | D1 (SQLite) | Production |
| Layered | Composite | Caching, replication |
Error Handling
Observation operations return Result<T, CorpusError> types:
const result = await corpus.observations.put(SentimentObservation, {
  source: pointer,
  content: { subject: 'test', score: 2.0, keywords: [] }, // Invalid: score > 1
})
if (!result.ok) {
  switch (result.error.kind) {
    case 'validation_error':
      console.log('Invalid content:', result.error.message)
      break
    case 'observation_not_found':
      console.log('Observation not found:', result.error.id)
      break
    case 'storage_error':
      console.log('Backend error:', result.error.operation)
      break
  }
}
Testing with Observations
Use the memory backend for fast, isolated tests:
import { describe, it, expect, beforeEach } from 'bun:test'
import { create_corpus, create_memory_backend, define_store, json_codec } from '@f0rbit/corpus'
// ArticleSchema, SentimentObservation and testArticle are your own definitions
// (e.g. ArticleSchema and SentimentObservation from the example above)

// Builds a fresh corpus; its return type is used to type the test variable
const make_corpus = () =>
  create_corpus()
    .with_backend(create_memory_backend())
    .with_store(define_store('articles', json_codec(ArticleSchema)))
    .with_observations([SentimentObservation])
    .build()

describe('sentiment analysis', () => {
  let corpus: ReturnType<typeof make_corpus>

  beforeEach(() => {
    // Fresh memory backend for each test
    corpus = make_corpus()
  })

  it('extracts sentiment from articles', async () => {
    // Store article
    const result = await corpus.stores.articles.put(testArticle)
    expect(result.ok).toBe(true)
    if (!result.ok) return

    // Create observation
    const obsResult = await corpus.observations.put(SentimentObservation, {
      source: { store_id: 'articles', version: result.value.version },
      content: { subject: 'test', score: 0.8, keywords: ['good'] },
    })
    expect(obsResult.ok).toBe(true)

    // Query observations
    const observations = []
    for await (const obs of corpus.observations.query({ type: 'sentiment' })) {
      observations.push(obs)
    }
    expect(observations).toHaveLength(1)
    expect(observations[0].content.score).toBe(0.8)
  })
})