Experimental

IncrementalFlow

The programmatic API for incremental view maintenance: sources, views, ticks, checkpoints, watches.

Where Incremental Views are the SQL face of IVM, IncrementalFlow is the Rust face. It gives you programmatic control over sources, views, ticks, checkpoints, and watches. Use it when you need a custom tick trigger, a non-SQL source, or a watch that pushes deltas to another system.

IncrementalFlow: a delta in the source triggers a recompute along the dirty path only. Downstream views stay consistent with the source of truth, and a checkpoint pins a known-good state.

Shape of a flow

```rust
use krishiv_ivm::IncrementalFlow;

// Runs inside an async fn that returns a Result; `orders_schema` and
// `delta_batch` are built elsewhere.
let mut flow = IncrementalFlow::new();

// 1. Declare one or more sources (record-batched inputs)
flow.register_source("orders", orders_schema)?;

// 2. Declare views as SQL; the flow orders them topologically
flow.register_view("order_totals",
    "SELECT customer_id, SUM(amount) AS total FROM orders GROUP BY customer_id")?;
flow.register_view("top_customers",
    "SELECT * FROM order_totals ORDER BY total DESC LIMIT 10")?;

// 3. Drive ticks; each tick returns a StepSummary
let summary = flow.tick("orders", delta_batch).await?;
println!("rows_in={} rows_out={} duration_ms={}",
    summary.rows_in, summary.rows_out, summary.duration_ms);

// 4. Read snapshots (current materialised state) or watch for deltas
let snapshot = flow.snapshot("top_customers").await?;
let mut rx = flow.watch_output("top_customers")?;
while let Some(delta) = rx.recv().await { /* … */ }
```

Step and StepSummary

Each tick(source, DeltaBatch) call:

  1. Integrates the delta into the source's running snapshot (via apply_delta).
  2. Walks the view DAG in topological order.
  3. Re-runs the SQL of each affected view and diffs the full result against the view's previous output, producing a true DeltaBatch.
  4. Emits the non-empty deltas to watches and downstream views.

StepSummary reports rows_in (rows received this tick), rows_out (delta rows emitted), and duration_ms.
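The four steps can be modelled in plain Rust. This is a minimal, std-only sketch, not the krishiv_ivm implementation: it assumes a single source of (customer_id, amount) rows and a single GROUP BY view, so the DAG walk collapses to one recompute. `Flow`, `tick` and this `StepSummary` are hypothetical stand-ins; only the three field names come from the text above.

```rust
use std::collections::HashMap;
use std::time::Instant;

/// A source row, (customer_id, amount); hypothetical stand-in for a RecordBatch row.
pub type Row = (u32, i64);

/// Hypothetical mirror of the three StepSummary fields described above.
#[derive(Debug)]
pub struct StepSummary {
    pub rows_in: usize,
    pub rows_out: usize,
    pub duration_ms: u128,
}

/// One source plus one view: SELECT customer_id, SUM(amount) ... GROUP BY customer_id.
#[derive(Default)]
pub struct Flow {
    source: HashMap<Row, i64>,      // running snapshot: row -> multiplicity
    prev_totals: HashMap<u32, i64>, // view's previous output: customer_id -> total
}

impl Flow {
    /// Full recomputation of the GROUP BY view from the source snapshot.
    fn compute_totals(&self) -> HashMap<u32, i64> {
        let mut totals = HashMap::new();
        for (&(customer, amount), &mult) in &self.source {
            *totals.entry(customer).or_insert(0) += amount * mult;
        }
        totals
    }

    /// `delta` is a list of (row, weight) pairs; returns the view's output delta.
    pub fn tick(&mut self, delta: &[(Row, i64)]) -> (Vec<((u32, i64), i64)>, StepSummary) {
        let start = Instant::now();
        // 1. Integrate the delta into the source snapshot.
        for &(row, w) in delta {
            let now_zero = {
                let m = self.source.entry(row).or_insert(0);
                *m += w;
                *m == 0
            };
            if now_zero {
                self.source.remove(&row);
            }
        }
        // 2-3. One view only, so the "DAG walk" is a single recompute,
        // followed by a diff of the full result against the previous output.
        let totals = self.compute_totals();
        let mut out = Vec::new();
        for (&c, &t) in &self.prev_totals {
            if totals.get(&c) != Some(&t) {
                out.push(((c, t), -1)); // retract the old row
            }
        }
        for (&c, &t) in &totals {
            if self.prev_totals.get(&c) != Some(&t) {
                out.push(((c, t), 1)); // insert the new row
            }
        }
        self.prev_totals = totals;
        // 4. `out` holds only changed rows; an empty Vec means nothing to emit.
        let summary = StepSummary {
            rows_in: delta.len(),
            rows_out: out.len(),
            duration_ms: start.elapsed().as_millis(),
        };
        (out, summary)
    }
}
```

In this sketch, because step 3 recomputes the whole view before diffing, the emitted delta is exact for any query shape; the price is a full re-evaluation of every view that has to run.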

DeltaBatch (the data type)

DeltaBatch is a RecordBatch with one extra Int64 column named _weight (value +1 for inserts, -1 for retractions, 0 for cancelled-by-update). The runtime treats weights ≠ 0 as "row presence" with multiplicity; 0 weights are dropped before emission.
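The weight rules can be illustrated with a small consolidation helper. This is a sketch, assuming rows are plain strings rather than Arrow RecordBatch rows and that an update is encoded as a -1/+1 pair; `consolidate` is a hypothetical name. It sums the _weight values of identical rows and drops rows whose net weight is 0, so an insert cancelled by a retraction in the same batch disappears before emission.

```rust
use std::collections::BTreeMap;

/// A row paired with its `_weight`, modelling one line of a DeltaBatch.
/// Row contents are a plain String here purely for illustration.
pub type WeightedRow = (String, i64);

/// Sum weights per distinct row and drop rows whose net weight is 0.
/// BTreeMap keeps the output sorted, which makes results deterministic.
pub fn consolidate(rows: &[WeightedRow]) -> Vec<WeightedRow> {
    let mut net: BTreeMap<&str, i64> = BTreeMap::new();
    for (row, w) in rows {
        *net.entry(row.as_str()).or_insert(0) += *w;
    }
    net.into_iter()
        .filter(|&(_, w)| w != 0) // weight 0: the row is absent, do not emit
        .map(|(row, w)| (row.to_string(), w))
        .collect()
}
```

For example, an update of `alice,10` to `alice,12` stays as a retraction plus an insertion, while a `bob,5` that is inserted and retracted in the same batch nets to 0 and is dropped.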

Dirty-bit scheduling

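When the planner sees that a view's SQL references no dirty source or upstream view, it skips that view for the tick. Per-tick work is therefore confined to the part of the DAG the change actually touches: views whose inputs did not change cost nothing, although each dirty view is still re-evaluated and diffed in full (step 3 above).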
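A sketch of dirty-bit scheduling over a view DAG, using Kahn's algorithm for the topological order. The function name `dirty_views` and the map-of-inputs representation are assumptions for illustration; the planner's real data structures are not described here. A view is marked dirty when any of its inputs (a source or an upstream view) is dirty, and every other view is skipped.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// `inputs` maps each view to its direct inputs (source names or view names).
/// Returns the views to recompute this tick, in topological order.
pub fn dirty_views(
    inputs: &HashMap<&str, Vec<&str>>,
    dirty_sources: &HashSet<&str>,
) -> Vec<String> {
    // In-degree counts only edges from other views; sources are always "ready".
    let mut indegree: HashMap<&str, usize> = HashMap::new();
    let mut dependents: HashMap<&str, Vec<&str>> = HashMap::new();
    for (&view, deps) in inputs {
        let view_deps = deps.iter().filter(|d| inputs.contains_key(*d)).count();
        indegree.insert(view, view_deps);
        for &d in deps {
            if inputs.contains_key(d) {
                dependents.entry(d).or_default().push(view);
            }
        }
    }

    // Kahn's algorithm: start from views whose inputs are all sources.
    let mut ready: VecDeque<&str> = indegree
        .iter()
        .filter(|&(_, &n)| n == 0)
        .map(|(&v, _)| v)
        .collect();
    let mut dirty: HashSet<&str> = dirty_sources.clone();
    let mut order = Vec::new();

    while let Some(view) = ready.pop_front() {
        // Dirty bit: recompute only if at least one input changed.
        if inputs[view].iter().any(|d| dirty.contains(d)) {
            dirty.insert(view);
            order.push(view.to_string());
        }
        for &next in dependents.get(view).map(|v| v.as_slice()).unwrap_or(&[]) {
            let n = indegree.get_mut(next).unwrap();
            *n -= 1;
            if *n == 0 {
                ready.push_back(next);
            }
        }
    }
    order
}
```

With `order_totals` reading `orders`, `top_customers` reading `order_totals`, and an unrelated `refunds_by_day` reading `refunds`, a tick on `orders` recomputes only the first two views.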

Content-addressed dedup

Dedup is opt-in per source. Krishiv hashes each incoming batch and coalesces identical back-to-back batches (same content). The set of seen hashes is capped at DEDUP_SEEN_CAPACITY = 10 000 000 entries; beyond that, older entries are dropped silently (no error is raised) but counted via krishiv_dedup_dropped_total.
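A minimal sketch of a bounded, content-addressed dedup filter. It assumes the cap applies to a FIFO window of recently seen hashes, so a batch is coalesced if an identical batch is still inside that window; `Dedup`, `admit` and `dropped_total` are hypothetical names, with `dropped_total` standing in for the krishiv_dedup_dropped_total counter. It uses std's 64-bit DefaultHasher for brevity; a production system would likely pick a stronger content hash to make collisions negligible.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashSet, VecDeque};
use std::hash::{Hash, Hasher};

/// Hypothetical per-source deduplicator with a bounded seen-set.
pub struct Dedup {
    capacity: usize,
    seen: HashSet<u64>,    // content hashes currently remembered
    order: VecDeque<u64>,  // insertion order, oldest at the front
    pub dropped_total: u64, // stand-in for krishiv_dedup_dropped_total
}

impl Dedup {
    pub fn new(capacity: usize) -> Self {
        Dedup { capacity, seen: HashSet::new(), order: VecDeque::new(), dropped_total: 0 }
    }

    /// Returns true if the batch should be processed, false if an identical
    /// batch is still in the seen window (the batch is coalesced).
    pub fn admit<T: Hash>(&mut self, batch: &T) -> bool {
        let mut h = DefaultHasher::new();
        batch.hash(&mut h);
        let key = h.finish();
        if self.seen.contains(&key) {
            return false;
        }
        // At capacity: forget the oldest hash silently, but count it.
        if self.order.len() >= self.capacity {
            if let Some(oldest) = self.order.pop_front() {
                self.seen.remove(&oldest);
                self.dropped_total += 1;
            }
        }
        self.seen.insert(key);
        self.order.push_back(key);
        true
    }
}
```

The trade-off the cap implies is visible here: once a hash is evicted, a later copy of that batch is admitted again.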

Checkpoint and restore

```rust
flow.checkpoint("s3://bucket/krishiv/ivm/").await?;
// … time passes, possibly failures …
flow.restore("s3://bucket/krishiv/ivm/").await?;
// the next tick resumes from the restored state
```

Delta checkpoints (per source) serialise only the delta since the last checkpoint. The full state is rebuilt by replaying deltas on restore.
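Delta checkpointing with replay-on-restore can be sketched in memory. `CheckpointLog`, `SourceState` and their methods are hypothetical, and the "storage" is a Vec rather than an object-store path. Each checkpoint appends only the deltas received since the previous one, and restore rebuilds the state by replaying every stored delta in order; deltas that arrived after the last checkpoint are not recovered.

```rust
use std::collections::HashMap;

/// One delta: (row, weight) pairs.
pub type Delta = Vec<(String, i64)>;

/// Hypothetical checkpoint store: one segment per checkpoint call.
#[derive(Default)]
pub struct CheckpointLog {
    segments: Vec<Vec<Delta>>,
}

#[derive(Default)]
pub struct SourceState {
    rows: HashMap<String, i64>, // row -> multiplicity
    pending: Vec<Delta>,        // deltas not yet checkpointed
}

impl SourceState {
    /// Integrate a delta and remember it for the next delta checkpoint.
    pub fn apply(&mut self, delta: Delta) {
        for (row, w) in &delta {
            *self.rows.entry(row.clone()).or_insert(0) += *w;
        }
        self.rows.retain(|_, m| *m != 0);
        self.pending.push(delta);
    }

    /// Delta checkpoint: persist only what arrived since the last checkpoint.
    pub fn checkpoint(&mut self, log: &mut CheckpointLog) {
        log.segments.push(std::mem::take(&mut self.pending));
    }

    /// Restore: rebuild the full state by replaying every stored delta in order.
    pub fn restore(log: &CheckpointLog) -> SourceState {
        let mut state = SourceState::default();
        for delta in log.segments.iter().flatten() {
            state.apply(delta.clone());
        }
        state.pending.clear(); // replayed deltas are already durable
        state
    }

    pub fn weight(&self, row: &str) -> i64 {
        self.rows.get(row).copied().unwrap_or(0)
    }
}
```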

Full checkpoints (for migrations)

checkpoint_full(path) writes the entire materialised state, not just the delta. Use it for cross-version migrations, where the delta format may have changed.

Coordinator-authoritative remote ticks

When a flow runs in distributed mode, force_diff_based() makes a remote tick bit-identical to a central tick. The coordinator acquires a per-job step_lock, parallelises the step across executors for partitioned views, and waits for all shards to finish before publishing. On failure it re-feeds the pending delta rather than re-computing from the source.
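The coordinator protocol (per-job lock, parallel shard execution, publish only after all shards finish, re-feed the pending delta on failure) can be sketched with std threads. Everything here is hypothetical: `Coordinator`, `ShardFn`, the retry limit `max_attempts`, and the use of i64 values in place of DeltaBatch rows. force_diff_based() itself is not modelled.

```rust
use std::sync::Mutex;
use std::thread;

/// Hypothetical shard worker: computes one shard's output from the pending
/// input delta, or fails (e.g. the executor was lost).
pub type ShardFn = fn(usize, &[i64]) -> Result<Vec<i64>, String>;

pub struct Coordinator {
    step_lock: Mutex<()>, // at most one step per job at a time
    shards: usize,
    max_attempts: usize,
}

impl Coordinator {
    pub fn new(shards: usize, max_attempts: usize) -> Self {
        Coordinator { step_lock: Mutex::new(()), shards, max_attempts }
    }

    pub fn step(&self, pending: &[i64], work: ShardFn) -> Result<Vec<i64>, String> {
        let _guard = self.step_lock.lock().unwrap();
        let mut last_err = String::from("no attempts made");
        for _ in 0..self.max_attempts {
            // Fan out to every shard in parallel and wait for all of them.
            let results: Vec<Result<Vec<i64>, String>> = thread::scope(|s| {
                let handles: Vec<_> = (0..self.shards)
                    .map(|shard| s.spawn(move || work(shard, pending)))
                    .collect();
                handles.into_iter().map(|h| h.join().unwrap()).collect()
            });
            // Publish only if every shard succeeded; otherwise retry with the
            // same pending delta instead of recomputing from the source.
            match results.into_iter().collect::<Result<Vec<Vec<i64>>, String>>() {
                Ok(parts) => return Ok(parts.into_iter().flatten().collect()),
                Err(e) => last_err = e,
            }
        }
        Err(last_err)
    }
}
```

Holding the lock for the whole step means a second step for the same job cannot interleave with a retry, which is what lets a retried step reuse the pending delta unchanged.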

Partitioned flows

PartitionedIncrementalFlow shards the source data by a partition key (typically a hash of the primary key). Each shard is an independent flow, and the coordinator merges their deltas. Use it for very high-volume sources, or when the materialised state does not fit in one executor's memory.
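Hash partitioning by key can be sketched as follows, assuming each delta row carries its primary key. `shard_of`, `partition` and `merge` are hypothetical helpers. Because every row with a given key hashes to the same shard, shards own disjoint key sets and, in this simplified model, the coordinator can merge their outputs by concatenation. std's DefaultHasher is not guaranteed to be stable across Rust releases, so all executors must agree on the hash function.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Pick a shard for a row by hashing its primary key.
pub fn shard_of<K: Hash>(key: &K, shards: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % shards as u64) as usize
}

/// Split a weighted delta into per-shard deltas; all rows with the same key
/// (inserts and retractions alike) land on the same shard.
pub fn partition<K: Hash + Clone>(delta: &[(K, i64)], shards: usize) -> Vec<Vec<(K, i64)>> {
    let mut out = vec![Vec::new(); shards];
    for (key, w) in delta {
        out[shard_of(key, shards)].push((key.clone(), *w));
    }
    out
}

/// Merge per-shard output deltas; concatenation is enough because the
/// shards hold disjoint keys.
pub fn merge<K>(parts: Vec<Vec<(K, i64)>>) -> Vec<(K, i64)> {
    parts.into_iter().flatten().collect()
}
```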

Vector views

You can register a view whose body is a vector-store query (e.g. "k-nearest neighbours to this embedding"). The view's materialised state is a list of point IDs and their payloads. IvmVectorSink::spawn_vector_view wires a flow to a vector store, and VectorViewSpec configures the distance metric and index parameters.
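To make "k-nearest neighbours" concrete, here is a brute-force sketch of the query a vector view might materialise. `Metric`, `knn` and the two metrics shown are assumptions standing in for whatever VectorViewSpec offers; a real vector store would answer this from an index rather than scanning every point, and the view would keep payloads alongside the ids. Cosine distance here assumes non-zero vectors.

```rust
/// Hypothetical distance metrics; not the actual VectorViewSpec options.
#[derive(Clone, Copy)]
pub enum Metric {
    Euclidean,
    Cosine,
}

fn distance(metric: Metric, a: &[f32], b: &[f32]) -> f32 {
    match metric {
        Metric::Euclidean => a.iter().zip(b).map(|(x, y)| (x - y) * (x - y)).sum::<f32>().sqrt(),
        Metric::Cosine => {
            let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
            let na = a.iter().map(|x| x * x).sum::<f32>().sqrt();
            let nb = b.iter().map(|x| x * x).sum::<f32>().sqrt();
            1.0 - dot / (na * nb) // 0 for identical directions
        }
    }
}

/// Brute-force k-NN: ids of the k points closest to `query`,
/// ties broken by id so the materialised list is deterministic.
pub fn knn(points: &[(u64, Vec<f32>)], query: &[f32], k: usize, metric: Metric) -> Vec<u64> {
    let mut scored: Vec<(f32, u64)> = points
        .iter()
        .map(|(id, v)| (distance(metric, v, query), *id))
        .collect();
    scored.sort_by(|a, b| a.0.total_cmp(&b.0).then(a.1.cmp(&b.1)));
    scored.into_iter().take(k).map(|(_, id)| id).collect()
}
```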

See also