
Observations

Observations let you extract structured facts from your versioned documents and query them efficiently, while automatically tracking which document version each fact came from.

Why Observations?

When building applications on top of versioned documents, you often need to extract structured data:

  • Sentiment analysis - extract emotional tone from articles
  • Named entities - identify people, places, organizations
  • Key facts - surface important data points
  • Annotations - user-added notes and highlights

Without observations, you have two bad options:

  1. Embed facts in documents - Can’t query efficiently across documents
  2. Store in separate tables - Lose lineage tracking, must manually invalidate when documents change

Observations solve this by making extracted facts a first-class concept with:

  • Source pointers — Every fact knows exactly where it came from (document, version, even JSONPath)
  • Automatic staleness — When a source document is updated, observations from the old version are automatically filtered out
  • Type-safe schemas — Define observation types with Zod, get full TypeScript inference
  • Efficient queries — Filter by type, source, date range, and more

Architecture Overview

┌────────────────────────────────────────────────────────────────┐
│                             Corpus                             │
├────────────────────────────────────────────────────────────────┤
│ Stores (versioned documents)    │ Observations (facts)         │
│ ┌─────────────────────────────┐ │ ┌──────────────────────────┐ │
│ │ articles                    │ │ │ sentiment                │ │
│ │  └─ v1: "Article text..."   │◄┼─│ source: articles:v1      │ │
│ │  └─ v2: "Updated text..."   │ │ │ content: {score: 0.8}    │ │
│ └─────────────────────────────┘ │ └──────────────────────────┘ │
│ ┌─────────────────────────────┐ │ ┌──────────────────────────┐ │
│ │ transcripts                 │ │ │ entity                   │ │
│ │  └─ v1: {speeches: [...]}   │◄┼─│ source: transcripts:v1   │ │
│ └─────────────────────────────┘ │ │ path: $.speeches[0]      │ │
│                                 │ │ content: {name: "..."}   │ │
│                                 │ └──────────────────────────┘ │
└────────────────────────────────────────────────────────────────┘

Each observation contains:

  • type - Discriminator (e.g., “sentiment”, “entity”)
  • source - Pointer to the document version it came from
  • content - The structured fact data (validated by Zod schema)
  • metadata - Confidence score, timestamps, derivation lineage

Example Use Case: Article Sentiment Analysis

Imagine you’re building a news aggregator that analyzes sentiment in articles:

1. Define your observation type

import { z } from 'zod'
import { define_observation_type } from '@f0rbit/corpus'

const SentimentObservation = define_observation_type(
  'sentiment',
  z.object({
    subject: z.string(),
    score: z.number().min(-1).max(1),
    keywords: z.array(z.string()),
  })
)
2. Create a corpus with observations

import {
  create_corpus,
  create_memory_backend,
  define_store,
  json_codec,
} from '@f0rbit/corpus'

const ArticleSchema = z.object({
  title: z.string(),
  body: z.string(),
  published_at: z.string(),
})

const corpus = create_corpus()
  .with_backend(create_memory_backend())
  .with_store(define_store('articles', json_codec(ArticleSchema)))
  .with_observations([SentimentObservation])
  .build()
3. Store an article and extract observations

// Store the article
const article = {
  title: 'Tech Giants Report Strong Earnings',
  body: 'Apple and Microsoft both exceeded expectations...',
  published_at: '2024-07-15T10:00:00Z',
}
const result = await corpus.stores.articles.put(article)
if (!result.ok) throw new Error('Failed to store article')
const version = result.value.version

// Run your sentiment analysis
const sentiments = analyzeSentiment(article) // Your ML model

// Store observations pointing back to the article
for (const s of sentiments) {
  await corpus.observations.put(SentimentObservation, {
    source: {
      store_id: 'articles',
      version: version,
    },
    content: {
      subject: s.subject,
      score: s.score,
      keywords: s.keywords,
    },
    confidence: s.confidence,
    observed_at: new Date(article.published_at),
  })
}
4. Query observations across all articles

// Get all positive sentiment observations from the last week
for await (const obs of corpus.observations.query({
  type: 'sentiment',
  after: new Date('2024-07-08'),
})) {
  if (obs.content.score > 0.5) {
    console.log(`Positive sentiment about ${obs.content.subject}`)
    console.log(`  Score: ${obs.content.score}`)
    console.log(`  Keywords: ${obs.content.keywords.join(', ')}`)
    console.log(`  Source: ${obs.source.store_id}:${obs.source.version}`)
  }
}
5. Handle article updates automatically

// Article gets updated with corrections
const updatedArticle = {
  ...article,
  body: 'Apple exceeded expectations, Microsoft met expectations...',
}
await corpus.stores.articles.put(updatedArticle)

// Old observations are automatically marked stale!
// This query only returns observations from the LATEST version:
for await (const obs of corpus.observations.query({ type: 'sentiment' })) {
  // Only observations from the new version appear here
}

// To include historical observations:
for await (const obs of corpus.observations.query({
  type: 'sentiment',
  include_stale: true,
})) {
  // All observations, including from old versions
}

Core Concepts

SnapshotPointer

A pointer to a specific location within a versioned document:

type SnapshotPointer = {
  store_id: string // Which store
  version: string  // Which version
  path?: string    // JSONPath to nested value (e.g., "$.items[0].text")
  span?: {         // Character range within the resolved value
    start: number
    end: number
  }
}

Use corpus.create_pointer() to create pointers:

// Point to entire document
const docPointer = corpus.create_pointer('articles', 'v123')

// Point to nested value
const nestedPointer = corpus.create_pointer('transcripts', 'v456', '$.speeches[2]')

// Resolve a pointer back to its value
const result = await corpus.resolve_pointer(nestedPointer)
if (result.ok) {
  console.log('Resolved value:', result.value)
}

Observation Types

Define observation types with Zod schemas for type safety:

import { z } from 'zod'
import { define_observation_type } from '@f0rbit/corpus'

// Simple type
const NoteObservation = define_observation_type(
  'note',
  z.object({
    text: z.string(),
  })
)

// Complex type with validation
const EntityObservation = define_observation_type(
  'entity',
  z.object({
    name: z.string().min(1),
    type: z.enum(['person', 'organization', 'location']),
    mentions: z.number().int().positive(),
    confidence: z.number().min(0).max(1).optional(),
  })
)

// The content is fully typed
await corpus.observations.put(EntityObservation, {
  source: pointer,
  content: {
    name: 'Acme Corp',
    type: 'organization', // TypeScript enforces valid enum values
    mentions: 5,
  },
})

Staleness Detection

Observations automatically track whether their source document has been superseded:

// Check if a specific observation's source is stale
const isStale = await corpus.observations.is_stale(observation.source)

// Check via the corpus API
const isSuperseded = await corpus.is_superseded(pointer)

How it works:

  1. When you store a new version of a document, it becomes the “latest” version
  2. Observations pointing to older versions are considered “stale”
  3. By default, query() excludes stale observations
  4. Use include_stale: true to include them for historical analysis
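The steps above can be sketched end to end. This assumes the corpus and article from the earlier example:

```typescript
// Store v1; while it is the latest version, pointers to it are not stale
const v1 = await corpus.stores.articles.put(article)
if (!v1.ok) throw new Error('put failed')
const pointer = corpus.create_pointer('articles', v1.value.version)
console.log(await corpus.observations.is_stale(pointer)) // false

// Storing a new version supersedes v1...
await corpus.stores.articles.put({ ...article, body: 'Corrected text...' })

// ...so the same pointer is now stale, and query() will skip any
// observations that reference it unless include_stale: true is set
console.log(await corpus.observations.is_stale(pointer)) // true
```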

API Reference

ObservationsClient

The corpus.observations property provides these methods:

interface ObservationsClient {
  // Create an observation
  put<T>(
    type: ObservationTypeDef<T>,
    opts: {
      source: SnapshotPointer
      content: T
      confidence?: number
      observed_at?: Date
      derived_from?: SnapshotPointer[]
    }
  ): Promise<Result<Observation<T>, CorpusError>>

  // Get by ID
  get(id: string): Promise<Result<Observation, CorpusError>>

  // Query with filters
  query(opts?: ObservationQueryOpts): AsyncIterable<Observation>

  // Query metadata only (excludes content; more efficient)
  query_meta(opts?: ObservationQueryOpts): AsyncIterable<ObservationMeta>

  // Delete by ID
  delete(id: string): Promise<Result<void, CorpusError>>

  // Delete all observations from a source
  delete_by_source(source: SnapshotPointer): Promise<Result<number, CorpusError>>

  // Check if a source is stale
  is_stale(pointer: SnapshotPointer): Promise<boolean>
}
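As an example of combining these methods, a cleanup pass might scan metadata cheaply with query_meta and then prune superseded extractions with delete_by_source. This is a sketch, not a prescribed workflow:

```typescript
// Walk sentiment observations (including stale ones) without loading
// content, and delete every observation whose source is superseded
for await (const meta of corpus.observations.query_meta({
  type: 'sentiment',
  include_stale: true,
})) {
  if (await corpus.observations.is_stale(meta.source)) {
    const deleted = await corpus.observations.delete_by_source(meta.source)
    if (deleted.ok) console.log(`Pruned ${deleted.value} observations`)
  }
}
```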

Query Options

type ObservationQueryOpts = {
  type?: string | string[]  // Filter by observation type(s)
  source_store?: string     // Filter by source store
  source_version?: string   // Filter by exact source version
  source_prefix?: string    // Filter by source version prefix
  after?: Date              // observed_at after this date
  before?: Date             // observed_at before this date
  created_after?: Date      // created_at after this date
  created_before?: Date     // created_at before this date
  include_stale?: boolean   // Include stale observations (default: false)
  limit?: number            // Max results to return
}
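These options combine in a single query. For example (a sketch that assumes the corpus from earlier sections):

```typescript
// Recent sentiment and entity observations from the articles store
for await (const obs of corpus.observations.query({
  type: ['sentiment', 'entity'],  // either type matches
  source_store: 'articles',       // only sources in this store
  after: new Date('2024-07-01'),  // observed_at must be after this date
  limit: 100,                     // stop after 100 results
})) {
  console.log(obs.type, obs.source.version)
}
```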

Observation Type

type Observation<T = unknown> = {
  id: string                       // Auto-generated unique ID
  type: string                     // Type discriminator
  source: SnapshotPointer          // Where this came from
  content: T                       // The structured data
  confidence?: number              // 0-1 confidence score
  observed_at?: Date               // When the fact occurred (domain time)
  created_at: Date                 // When the observation was created
  derived_from?: SnapshotPointer[] // Optional provenance chain
}

Re-processing Workflows

When you need to re-analyze documents (e.g., with an improved model), you have two options:

  • Option A: delete and replace - remove the old observations with delete_by_source(), then store the fresh extractions
  • Option B: append - store the new observations alongside the old ones, keeping every extraction on record

When to use each:

  • Option A - when you want to save storage space and don’t need historical data
  • Option B - when you want to keep a full audit trail of all extractions
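Both options can be sketched with the existing API. Here `version` and `reanalyzed` (the output of your re-run model) are illustrative assumptions:

```typescript
const source = corpus.create_pointer('articles', version)

// Option A: drop the old extractions for this source, then re-insert
await corpus.observations.delete_by_source(source)
for (const content of reanalyzed) {
  await corpus.observations.put(SentimentObservation, { source, content })
}

// Option B: insert the new extractions without deleting anything; the
// full history of extractions stays queryable with include_stale: true
for (const content of reanalyzed) {
  await corpus.observations.put(SentimentObservation, { source, content })
}
```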

Backend Support

Observations are supported on all backends:

  • Memory: in-memory Map. Best for testing and prototyping.
  • File: JSON file. Best for local development.
  • Cloudflare: D1 (SQLite). Best for production.
  • Layered: composite of other backends. Best for caching and replication.

Error Handling

Observation operations return Result<T, CorpusError> types:

const result = await corpus.observations.put(SentimentObservation, {
  source: pointer,
  content: { subject: 'test', score: 2.0, keywords: [] }, // Invalid: score > 1
})

if (!result.ok) {
  switch (result.error.kind) {
    case 'validation_error':
      console.log('Invalid content:', result.error.message)
      break
    case 'observation_not_found':
      console.log('Observation not found:', result.error.id)
      break
    case 'storage_error':
      console.log('Backend error:', result.error.operation)
      break
  }
}

Testing with Observations

Use the memory backend for fast, isolated tests:

import { describe, it, expect, beforeEach } from 'bun:test'
import {
  create_corpus,
  create_memory_backend,
  define_store,
  json_codec,
} from '@f0rbit/corpus'

describe('sentiment analysis', () => {
  let corpus: ReturnType<typeof create_corpus>

  beforeEach(() => {
    // Fresh backend for each test
    corpus = create_corpus()
      .with_backend(create_memory_backend())
      .with_store(define_store('articles', json_codec(ArticleSchema)))
      .with_observations([SentimentObservation])
      .build()
  })

  it('extracts sentiment from articles', async () => {
    // Store an article
    const result = await corpus.stores.articles.put(testArticle)
    expect(result.ok).toBe(true)

    // Create an observation
    const obsResult = await corpus.observations.put(SentimentObservation, {
      source: { store_id: 'articles', version: result.value.version },
      content: { subject: 'test', score: 0.8, keywords: ['good'] },
    })
    expect(obsResult.ok).toBe(true)

    // Query observations
    const observations = []
    for await (const obs of corpus.observations.query({ type: 'sentiment' })) {
      observations.push(obs)
    }
    expect(observations).toHaveLength(1)
    expect(observations[0].content.score).toBe(0.8)
  })
})

See Also