IncrementalFlow
The programmatic API for incremental view maintenance: sources, views, ticks, checkpoints, watches.
Where Incremental Views are the SQL face of IVM, IncrementalFlow is the Rust face. Use it when you need a custom tick trigger, a non-SQL source, or a watch that pushes deltas to another system.
Shape of a flow
use krishiv_ivm::IncrementalFlow;
let mut flow = IncrementalFlow::new();
// 1. Declare one or more sources (record-batched inputs)
flow.register_source("orders", orders_schema)?;
// 2. Declare views as SQL; the flow orders them topologically
flow.register_view("order_totals",
    "SELECT customer_id, SUM(amount) AS total FROM orders GROUP BY customer_id")?;
flow.register_view("top_customers",
    "SELECT * FROM order_totals ORDER BY total DESC LIMIT 10")?;
// 3. Drive ticks; each tick returns a StepSummary
let summary = flow.tick("orders", delta_batch).await?;
println!("rows_in={} rows_out={} duration_ms={}",
    summary.rows_in, summary.rows_out, summary.duration_ms);
// 4. Read snapshots (current materialised state) or watch for deltas
let snapshot = flow.snapshot("top_customers").await?;
let mut rx = flow.watch_output("top_customers")?;
while let Some(delta) = rx.recv().await { /* … */ }
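Step 2 above notes that the flow orders views topologically. As a rough illustration of what that ordering means, here is a minimal, standard-library-only sketch of Kahn's algorithm. The `topo_order` function and the explicit `deps` map are hypothetical: how Krishiv actually extracts dependencies from SQL and orders views is not described here.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Order views so that every view comes after the views it reads from.
/// `deps` maps a view name to the names it reads (sources or other views);
/// names that are not keys of `deps` are treated as sources.
/// Returns None if the views form a cycle.
fn topo_order(deps: &BTreeMap<String, Vec<String>>) -> Option<Vec<String>> {
    // Per view: how many upstream views have not been emitted yet.
    let mut waiting: BTreeMap<&str, usize> = BTreeMap::new();
    for (view, inputs) in deps {
        let n = inputs
            .iter()
            .filter(|i| deps.contains_key(i.as_str()))
            .count();
        waiting.insert(view.as_str(), n);
    }

    // Views that read only from sources can run first.
    let mut ready: VecDeque<&str> = waiting
        .iter()
        .filter(|(_, n)| **n == 0)
        .map(|(v, _)| *v)
        .collect();

    let mut order = Vec::with_capacity(deps.len());
    while let Some(done) = ready.pop_front() {
        order.push(done.to_string());
        // Release every view that was waiting on the one just emitted.
        for (view, inputs) in deps {
            let hits = inputs.iter().filter(|i| i.as_str() == done).count();
            if hits == 0 {
                continue;
            }
            let n = waiting
                .get_mut(view.as_str())
                .expect("every view has an entry");
            *n -= hits;
            if *n == 0 {
                ready.push_back(view.as_str());
            }
        }
    }

    // If some views were never released, they depend on each other in a cycle.
    if order.len() == deps.len() {
        Some(order)
    } else {
        None
    }
}
```

With the two views registered above, `order_totals` (which reads the `orders` source) would come before `top_customers` (which reads `order_totals`).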
Step and StepSummary
Each tick(source, DeltaBatch) call:
- Integrates the delta into the source's running snapshot (via apply_delta).
- Walks the view DAG in topological order.
- Diffs each view's full SQL result against its previous output to produce a true DeltaBatch.
- Emits the non-empty deltas to watches and downstream views.
StepSummary reports rows_in (rows received this tick), rows_out (delta rows emitted), and duration_ms.
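The diff step can be pictured as a multiset difference between the previous and the new result of a view. The sketch below is an illustration only: it models a row as a `String` and a delta as `(row, weight)` pairs, and `diff_results` is a hypothetical helper, not a Krishiv function.

```rust
use std::collections::HashMap;

/// A row is modelled as a plain string; a real batch would hold typed columns.
type Row = String;

/// Diff a view's new full result against its previous output.
/// Returns (row, weight) pairs: a positive weight means the row was inserted
/// that many times, a negative weight means it was retracted that many times.
/// Rows whose multiplicity did not change are omitted.
fn diff_results(prev: &[Row], next: &[Row]) -> Vec<(Row, i64)> {
    let mut weights: HashMap<Row, i64> = HashMap::new();
    // Every occurrence in the new result counts +1 ...
    for r in next {
        *weights.entry(r.clone()).or_insert(0) += 1;
    }
    // ... and every occurrence in the previous result counts -1.
    for r in prev {
        *weights.entry(r.clone()).or_insert(0) -= 1;
    }
    let mut delta: Vec<(Row, i64)> = weights
        .into_iter()
        .filter(|(_, w)| *w != 0)
        .collect();
    delta.sort(); // deterministic order for display and testing
    delta
}
```

An empty return value corresponds to a view whose output did not change, which is the case where nothing would be emitted to watches or downstream views.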
DeltaBatch (the data type)
DeltaBatch is a RecordBatch with one extra Int64 column named _weight: +1 for an insert, -1 for a retraction, and 0 for a row cancelled by an update. The runtime treats a non-zero weight as row presence with multiplicity; rows with weight 0 are dropped before emission.
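To make the weight semantics concrete, the following sketch sums weights per row and drops rows whose net weight is zero. It assumes that a zero weight arises when an insert and a retraction of the same row cancel out, which is one reading of "cancelled-by-update". Rows are modelled as strings, and `consolidate` is a hypothetical name.

```rust
use std::collections::BTreeMap;

/// Sum the weights of identical rows and drop rows whose net weight is zero.
/// Input and output are (row, weight) pairs standing in for a batch with a
/// `_weight` column.
fn consolidate(rows: &[(&str, i64)]) -> Vec<(String, i64)> {
    let mut net: BTreeMap<String, i64> = BTreeMap::new();
    for (row, w) in rows {
        *net.entry((*row).to_string()).or_insert(0) += *w;
    }
    // Only rows with a non-zero net weight are emitted.
    net.into_iter().filter(|(_, w)| *w != 0).collect()
}
```

For example, an insert and a retraction of the same row net to zero and disappear, while two inserts of the same row survive as a single entry with weight 2.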
Dirty-bit scheduling
When the planner sees that a view's SQL references neither a dirty source nor a dirty upstream view, it skips that view for the tick. This keeps the per-tick cost proportional to the size of the change, not the size of the data.
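A minimal sketch of the skipping rule, assuming the views are already in topological order and that each view's inputs are known by name. The `View` struct, the `view` helper, `views_to_run`, and the `active_users` / `logins` names used in the example are hypothetical.

```rust
use std::collections::HashSet;

/// A view and the names (sources or upstream views) its SQL reads from.
struct View {
    name: String,
    inputs: Vec<String>,
}

/// Convenience constructor for the sketch.
fn view(name: &str, inputs: &[&str]) -> View {
    View {
        name: name.to_string(),
        inputs: inputs.iter().map(|s| s.to_string()).collect(),
    }
}

/// Given views in topological order and the source that received a delta,
/// return the names of the views that have to run this tick.
/// A view runs only if at least one of its inputs is dirty; once it runs,
/// it is conservatively marked dirty for the views downstream of it.
fn views_to_run(views: &[View], dirty_source: &str) -> Vec<String> {
    let mut dirty: HashSet<String> = HashSet::new();
    dirty.insert(dirty_source.to_string());
    let mut run = Vec::new();
    for v in views {
        if v.inputs.iter().any(|i| dirty.contains(i)) {
            run.push(v.name.clone());
            dirty.insert(v.name.clone());
        }
    }
    run
}
```

With views `order_totals` (reads `orders`), `top_customers` (reads `order_totals`) and `active_users` (reads `logins`), a tick on `orders` would run the first two and skip `active_users`. A tighter variant could mark a view dirty only when its delta turned out non-empty.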
Content-addressed dedup
Deduplication is opt-in per source. Krishiv hashes each incoming batch and coalesces identical back-to-back batches (same content). The set of seen hashes is capped at DEDUP_SEEN_CAPACITY = 10 000 000 entries; older entries are dropped silently and counted via krishiv_dedup_dropped_total.
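The general idea of a capped, content-addressed seen-set can be sketched with the standard library alone. Everything here is an assumption made for illustration: the `Deduper` type, the FIFO eviction policy, the use of `DefaultHasher` (a 64-bit, non-cryptographic hash), and the fact that the sketch rejects any batch still in the window rather than only back-to-back repeats.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashSet, VecDeque};
use std::hash::{Hash, Hasher};

/// Remembers the hashes of recently seen batches, up to `capacity` entries.
struct Deduper {
    capacity: usize,      // assumed to be at least 1
    seen: HashSet<u64>,   // hashes currently remembered
    order: VecDeque<u64>, // same hashes in arrival order, oldest at the front
    dropped: u64,         // how many hashes were evicted to respect the cap
}

impl Deduper {
    fn new(capacity: usize) -> Self {
        Deduper {
            capacity,
            seen: HashSet::new(),
            order: VecDeque::new(),
            dropped: 0,
        }
    }

    /// Returns true if the batch should be processed, false if its content
    /// hash is still remembered and the batch can be skipped.
    fn admit(&mut self, batch: &[u8]) -> bool {
        let mut hasher = DefaultHasher::new();
        batch.hash(&mut hasher);
        let digest = hasher.finish();

        if self.seen.contains(&digest) {
            return false;
        }
        // Make room by forgetting the oldest hash once the cap is reached.
        if self.order.len() == self.capacity {
            if let Some(oldest) = self.order.pop_front() {
                self.seen.remove(&oldest);
                self.dropped += 1;
            }
        }
        self.seen.insert(digest);
        self.order.push_back(digest);
        true
    }
}
```

The `dropped` counter plays the role that a metric such as krishiv_dedup_dropped_total plays in the text: once an entry has been evicted, a later copy of that batch is no longer recognised as a duplicate.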
Checkpoint and restore
flow.checkpoint("s3://bucket/krishiv/ivm/").await?;
// … time passes, possibly failures …
flow.restore("s3://bucket/krishiv/ivm/").await?;
// the next tick resumes from the restored state
Delta checkpoints (per source) serialise only the delta since the last checkpoint. The full state is rebuilt by replaying deltas on restore.
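Rebuilding state by replay can be illustrated as folding a log of weighted deltas into a multiset. This is a sketch under simplifying assumptions: rows are strings, the state is a map from row to multiplicity, and `apply_weights` and `replay` are hypothetical names rather than the library's functions or on-disk format.

```rust
use std::collections::BTreeMap;

/// One checkpointed delta: (row, weight) pairs.
type Delta = Vec<(String, i64)>;

/// Fold one delta into the state, removing rows whose multiplicity hits zero.
fn apply_weights(state: &mut BTreeMap<String, i64>, delta: &Delta) {
    for (row, w) in delta {
        let n = state.entry(row.clone()).or_insert(0);
        *n += *w;
        let now_zero = *n == 0;
        if now_zero {
            state.remove(row);
        }
    }
}

/// Rebuild the full state by replaying every delta in checkpoint order.
fn replay(log: &[Delta]) -> BTreeMap<String, i64> {
    let mut state = BTreeMap::new();
    for delta in log {
        apply_weights(&mut state, delta);
    }
    state
}
```

Under this model, replaying the same log always yields the same state, so restore cost grows with the number and size of the checkpointed deltas.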
Full checkpoints (for migrations)
checkpoint_full(path) writes the entire materialised state, not just the delta. Use it for cross-version migrations where the delta format may have changed.
Coordinator-authoritative remote ticks
When a flow runs in distributed mode, force_diff_based() makes a remote tick bit-identical to a central tick. The coordinator acquires a per-job step_lock, parallelises the step across executors for partitioned views, and waits for all shards to finish before publishing. On failure it re-feeds the pending delta rather than re-computing from the source.
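The lock-then-publish pattern and the re-feed on failure can be sketched in a few lines. This is a single-process illustration, not the distributed protocol: the `Coordinator` struct, its fields, and `run_shard` are hypothetical, and the shards run one after another here rather than in parallel on executors.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Hypothetical coordinator state for one job.
struct Coordinator {
    step_lock: Mutex<()>,               // serialises steps for the job
    pending: Mutex<VecDeque<Vec<i64>>>, // deltas waiting to be applied
    published: Mutex<Vec<Vec<i64>>>,    // per-shard outputs of finished steps
}

impl Coordinator {
    fn new() -> Self {
        Coordinator {
            step_lock: Mutex::new(()),
            pending: Mutex::new(VecDeque::new()),
            published: Mutex::new(Vec::new()),
        }
    }

    /// Queue a delta for the next step.
    fn feed(&self, delta: Vec<i64>) {
        self.pending.lock().unwrap().push_back(delta);
    }

    /// Run one step: take the next pending delta, run it on every shard,
    /// and publish only if all shards succeeded. On failure the same delta
    /// is put back at the front of the queue so the next step retries it.
    fn step<F>(&self, shards: usize, run_shard: F) -> Result<(), String>
    where
        F: Fn(usize, &[i64]) -> Result<i64, String>,
    {
        let _guard = self.step_lock.lock().unwrap();
        let next = self.pending.lock().unwrap().pop_front();
        let delta = match next {
            Some(d) => d,
            None => return Ok(()), // nothing to do
        };
        // Collecting into a Result stops at the first failing shard.
        let results: Result<Vec<i64>, String> = (0..shards)
            .map(|s| run_shard(s, delta.as_slice()))
            .collect();
        match results {
            Ok(outputs) => {
                self.published.lock().unwrap().push(outputs);
                Ok(())
            }
            Err(e) => {
                self.pending.lock().unwrap().push_front(delta);
                Err(e)
            }
        }
    }
}
```

Because nothing is published unless every shard returns `Ok`, a failed step leaves the published output untouched and the delta ready for a retry.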
Partitioned flows
PartitionedIncrementalFlow shards the source data by a partition key (typically a hash of the primary key). Each shard is an independent flow; the coordinator merges the deltas. Use it for very high-volume sources or when the materialised state does not fit in one executor's memory.
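Hash partitioning and merging can be sketched as follows. The functions `shard_for`, `partition`, and `merge` are hypothetical, rows are `(key, weight)` pairs, and `DefaultHasher` stands in for whatever hash the runtime uses.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Pick a shard for a partition key; the same key always maps to the same shard.
/// `shards` must be greater than zero.
fn shard_for(key: &str, shards: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % shards as u64) as usize
}

/// Split (key, weight) rows into one bucket per shard.
fn partition(rows: &[(String, i64)], shards: usize) -> Vec<Vec<(String, i64)>> {
    let mut buckets: Vec<Vec<(String, i64)>> = vec![Vec::new(); shards];
    for (key, w) in rows {
        buckets[shard_for(key, shards)].push((key.clone(), *w));
    }
    buckets
}

/// Merge the per-shard deltas back into one delta, sorted for determinism.
fn merge(per_shard: Vec<Vec<(String, i64)>>) -> Vec<(String, i64)> {
    let mut all: Vec<(String, i64)> = per_shard.into_iter().flatten().collect();
    all.sort();
    all
}
```

Because all rows for a given key land on one shard, a per-key aggregate can be maintained by that shard alone and the merge is a plain concatenation.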
Vector views
You can register a view whose body is a vector-store query (e.g. "k-nearest neighbours to this embedding"). The view's materialised state is a list of point ids + payloads. IvmVectorSink::spawn_vector_view wires a flow to a vector store; VectorViewSpec configures the distance metric and index parameters.
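To show what "a list of point ids + payloads" looks like for a k-nearest-neighbour view, here is a brute-force sketch. It does not use IvmVectorSink or VectorViewSpec; the `Point` struct, the squared Euclidean metric, and `knn` are assumptions chosen for illustration, and a real vector store would use an index instead of a full scan.

```rust
/// A stored point: id, embedding, and an opaque payload.
struct Point {
    id: u64,
    embedding: Vec<f32>,
    payload: String,
}

/// Squared Euclidean distance between two embeddings of equal length.
fn squared_l2(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).map(|(x, y)| (x - y) * (x - y)).sum()
}

/// Return the ids and payloads of the `k` points closest to `query`,
/// nearest first. This is the shape of state a k-NN view would materialise.
fn knn(points: &[Point], query: &[f32], k: usize) -> Vec<(u64, String)> {
    let mut scored: Vec<(f32, &Point)> = points
        .iter()
        .map(|p| (squared_l2(&p.embedding, query), p))
        .collect();
    // total_cmp gives a total order on f32, so the sort cannot panic on NaN.
    scored.sort_by(|a, b| a.0.total_cmp(&b.0));
    scored
        .into_iter()
        .take(k)
        .map(|(_, p)| (p.id, p.payload.clone()))
        .collect()
}
```

When a point is inserted or removed, the view's delta would be the difference between the old and the new top-k list.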