Pipeline Builder
Fluent Rust API for sources, views, sinks, CDC, and data-quality expectations.
The Pipeline DSL is the canonical way to compose a multi-stage data flow in Rust. PipelineBuilder is the fluent entry point; Pipeline is the validated, runnable plan.
Shape of a pipeline
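```rust
use std::sync::{Arc, Mutex};

// RunPolicy is assumed to be exported from krishiv_api alongside the builder.
use krishiv_api::{OnViolation, PipelineBuilder, RunPolicy};

// Assumed to exist already: `order_changes: Vec<CdcChange>`,
// `user_batches: Vec<RecordBatch>`, and `sink_iceberg` (an Egress).
let enriched_buf = Arc::new(Mutex::new(Vec::new())); // test buffer for "enriched"

let pipeline = PipelineBuilder::new("orders_to_totals")
    .source_cdc("orders", order_changes)   // CDC change stream
    .source_memory("users", user_batches)  // in-memory reference data
    .view("enriched",
          "SELECT o.*, u.tier FROM orders o JOIN users u ON o.user_id = u.id",
          /* materialized = */ true)
    .view("totals",
          "SELECT user_id, SUM(amount) AS total FROM enriched GROUP BY user_id",
          /* materialized = */ true)
    // Fan-in: append this query as a branch of the UNION ALL behind `top`.
    .flow("top", "SELECT * FROM totals ORDER BY total DESC LIMIT 100")
    .sink_memory("enriched", Arc::clone(&enriched_buf)) // also tee to memory for tests
    .sink("totals", sink_iceberg)                        // primary sink
    .expect("totals", "non_negative_total",
            "total >= 0", OnViolation::Drop)
    .build()?;

// validate() returns Vec<ValidationError>; an empty list means the plan is valid.
let errors = pipeline.validate();
assert!(errors.is_empty());

let report = pipeline.run(RunPolicy::Coalesce).await?;
```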
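PipelineBuilder gives a fluent chain that ends in build(), which yields either a plan or an error. One common way to get that shape in Rust is a builder whose methods take `self` by value and return it. A minimal, self-contained sketch of that pattern, not of krishiv_api itself; `Builder`, `Plan`, `Stage`, `BuildError`, and the duplicate-name check in `build()` are all hypothetical:

```rust
use std::collections::HashSet;

// Hypothetical model of a fluent builder and its plan; not krishiv_api code.

#[derive(Debug, Clone, PartialEq)]
enum Stage {
    Source { name: String },
    View { name: String, sql: String, materialized: bool },
}

#[derive(Debug, PartialEq)]
enum BuildError {
    DuplicateName(String),
}

/// Plan produced by `build()`; it no longer exposes the fluent methods.
#[derive(Debug)]
struct Plan {
    name: String,
    stages: Vec<Stage>,
}

/// Fluent builder: each method takes `self` by value and returns it.
struct Builder {
    name: String,
    stages: Vec<Stage>,
}

impl Builder {
    fn new(name: &str) -> Self {
        Builder { name: name.to_string(), stages: Vec::new() }
    }

    fn source(mut self, name: &str) -> Self {
        self.stages.push(Stage::Source { name: name.to_string() });
        self
    }

    fn view(mut self, name: &str, sql: &str, materialized: bool) -> Self {
        self.stages.push(Stage::View {
            name: name.to_string(),
            sql: sql.to_string(),
            materialized,
        });
        self
    }

    /// Consumes the builder. The duplicate-name check is a hypothetical
    /// stand-in for whatever checks the real `build()` performs.
    fn build(self) -> Result<Plan, BuildError> {
        let mut seen = HashSet::new();
        for stage in &self.stages {
            let name = match stage {
                Stage::Source { name } | Stage::View { name, .. } => name,
            };
            if !seen.insert(name.clone()) {
                return Err(BuildError::DuplicateName(name.clone()));
            }
        }
        Ok(Plan { name: self.name, stages: self.stages })
    }
}

fn main() {
    let plan = Builder::new("orders_to_totals")
        .source("orders")
        .view("totals", "SELECT user_id, SUM(amount) AS total FROM orders GROUP BY user_id", true)
        .build()
        .expect("stage names are unique");
    println!("{}: {} stages", plan.name, plan.stages.len()); // orders_to_totals: 2 stages
}
```

Consuming `self` keeps the chain free of intermediate bindings; the trade-off is that stages added conditionally need a rebinding such as `let b = if cond { b.source("x") } else { b };`.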
Pipeline modes
PipelineMode is auto-inferred from the source kind:
| Source | Mode | Behavior |
|---|---|---|
| `source_cdc(...)` | `Ivm` | Each tick produces a `DeltaBatch`; views are updated incrementally. |
| `source_memory(...)` | `Batch` | The whole batch runs once; `run` blocks until the result is emitted. |
| Any other | `Stream` | Continuous query with the configured trigger. |
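Override the inferred mode with `.mode(PipelineMode::Ivm)` (or another `PipelineMode`) when inference picks the wrong one, for example a Kafka source, which infers `Stream`, in a pipeline whose views should be maintained incrementally.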
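The table boils down to a per-source mapping plus an optional override. Here is a self-contained sketch of that rule. It assumes one source kind per decision and does not cover how pipelines that mix source kinds (such as CDC plus in-memory) are resolved. `SourceKind`, `Mode`, `infer_mode`, and `effective_mode` are hypothetical names:

```rust
// Hypothetical model of mode inference; not the krishiv_api implementation.

#[derive(Debug, Clone, Copy, PartialEq)]
enum SourceKind {
    Cdc,    // registered with `.source_cdc(...)`
    Memory, // registered with `.source_memory(...)`
    Other,  // e.g. a Kafka or file source registered with `.source(...)`
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Mode {
    Ivm,
    Batch,
    Stream,
}

/// Mirrors the table row by row.
fn infer_mode(kind: SourceKind) -> Mode {
    match kind {
        SourceKind::Cdc => Mode::Ivm,
        SourceKind::Memory => Mode::Batch,
        SourceKind::Other => Mode::Stream,
    }
}

/// In this sketch an explicit mode (the `.mode(...)` call) replaces the inferred one.
fn effective_mode(kind: SourceKind, override_mode: Option<Mode>) -> Mode {
    override_mode.unwrap_or_else(|| infer_mode(kind))
}

fn main() {
    // A Kafka-like source infers Stream; forcing Ivm replaces that.
    println!("{:?}", effective_mode(SourceKind::Other, None)); // Stream
    println!("{:?}", effective_mode(SourceKind::Other, Some(Mode::Ivm))); // Ivm
}
```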
Sources
| Method | What it does |
|---|---|
| `.source(name, Ingest)` | Bounded source: a file, in-memory batches, or a SQL subquery. |
| `.source_cdc(name, Vec<CdcChange>)` | CDC source: insert, delete, and update(before, after) records. |
| `.source_memory(name, Vec<RecordBatch>)` | In-memory batches, for tests and one-shot runs. |
For streaming sources (Kafka, Kinesis, Pulsar), either build the flow with the lower-level DataStreamWriter or register the source and use Pipeline::run in stream mode.
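In Ivm mode, a CDC source can update an aggregate without rescanning its input. Below is a minimal sketch of that bookkeeping for the `totals` view (`SUM(amount) GROUP BY user_id`). It assumes a hypothetical `Change` enum in place of `CdcChange` and `(user_id, amount)` tuples in place of `RecordBatch` rows. The engine's actual `DeltaBatch` handling is not modeled:

```rust
use std::collections::HashMap;

// Hypothetical model of incremental view maintenance; not krishiv_api code.

/// Stand-in for a CDC record over (user_id, amount) rows.
#[derive(Debug, Clone, Copy)]
enum Change {
    Insert((u64, i64)),
    Delete((u64, i64)),
    Update { before: (u64, i64), after: (u64, i64) },
}

/// Incrementally maintained `SUM(amount) GROUP BY user_id`.
#[derive(Default, Debug)]
struct Totals {
    // user_id -> (running SUM(amount), number of contributing rows)
    by_user: HashMap<u64, (i64, i64)>,
}

impl Totals {
    fn add(&mut self, (user, amount): (u64, i64)) {
        let entry = self.by_user.entry(user).or_insert((0, 0));
        entry.0 += amount;
        entry.1 += 1;
    }

    fn retract(&mut self, (user, amount): (u64, i64)) {
        let entry = self.by_user.entry(user).or_insert((0, 0));
        entry.0 -= amount;
        entry.1 -= 1;
        // Drop the group once no rows contribute, as GROUP BY would.
        if entry.1 == 0 {
            self.by_user.remove(&user);
        }
    }

    /// Applies one tick's changes. An update retracts `before` and adds `after`.
    fn apply(&mut self, changes: &[Change]) {
        for change in changes {
            match *change {
                Change::Insert(row) => self.add(row),
                Change::Delete(row) => self.retract(row),
                Change::Update { before, after } => {
                    self.retract(before);
                    self.add(after);
                }
            }
        }
    }

    fn total(&self, user: u64) -> Option<i64> {
        self.by_user.get(&user).map(|&(sum, _)| sum)
    }
}

fn main() {
    let mut totals = Totals::default();
    totals.apply(&[Change::Insert((1, 50)), Change::Insert((1, 20)), Change::Insert((2, 5))]);
    totals.apply(&[Change::Update { before: (1, 20), after: (1, 30) }, Change::Delete((2, 5))]);
    println!("{:?} {:?}", totals.total(1), totals.total(2)); // Some(80) None
}
```

The per-group row count is what lets a group disappear after its last row is deleted, so the maintained result matches a full recompute.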
Views
| Method | What it does |
|---|---|
| `.view(name, sql, materialized)` | Register a view. `materialized = true` materialises it; `false` inlines the query wherever the view is read. |
| `.temp_view(name, sql)` | A non-materialised view (always inlined), for ad-hoc helpers. |
| `.flow(target, sql)` | Append a query as one more branch of the UNION ALL that defines `target`. Use for fan-in pipelines. |
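The three view flavours differ in when their query runs and how flow branches combine. This self-contained model uses hypothetical `ViewDef` and `FlowTarget` types, with closures over `i64` rows in place of SQL. A materialised view evaluates once and then serves cached rows. An inlined view re-evaluates on every read. A flow target concatenates its branches and keeps duplicates, as UNION ALL does. Cache invalidation when inputs change is deliberately left out:

```rust
use std::cell::{Cell, RefCell};

// Hypothetical model of view evaluation; not the krishiv_api implementation.

type Rows = Vec<i64>;

struct ViewDef {
    query: Box<dyn Fn() -> Rows>,
    materialized: bool,
    cache: RefCell<Option<Rows>>,
    evaluations: Cell<usize>, // how many times the query actually ran
}

impl ViewDef {
    fn new(query: impl Fn() -> Rows + 'static, materialized: bool) -> Self {
        ViewDef {
            query: Box::new(query),
            materialized,
            cache: RefCell::new(None),
            evaluations: Cell::new(0),
        }
    }

    fn read(&self) -> Rows {
        // Materialised: serve the cached result if one exists.
        if self.materialized {
            if let Some(rows) = self.cache.borrow().as_ref() {
                return rows.clone();
            }
        }
        // Inlined (or first materialised read): run the query.
        self.evaluations.set(self.evaluations.get() + 1);
        let rows = (self.query)();
        if self.materialized {
            *self.cache.borrow_mut() = Some(rows.clone());
        }
        rows
    }
}

/// A flow target: each `flow` call appends one UNION ALL branch.
#[derive(Default)]
struct FlowTarget {
    branches: Vec<Box<dyn Fn() -> Rows>>,
}

impl FlowTarget {
    fn flow(&mut self, branch: impl Fn() -> Rows + 'static) {
        self.branches.push(Box::new(branch));
    }

    /// UNION ALL: concatenate every branch, keeping duplicates.
    fn read(&self) -> Rows {
        self.branches.iter().flat_map(|branch| branch()).collect()
    }
}

fn main() {
    let mat = ViewDef::new(|| vec![1, 2, 3], true);
    let inl = ViewDef::new(|| vec![1, 2, 3], false);
    for _ in 0..3 {
        mat.read();
        inl.read();
    }
    println!("materialised ran {}x, inlined ran {}x", mat.evaluations.get(), inl.evaluations.get()); // 1x, 3x

    let mut top = FlowTarget::default();
    top.flow(|| vec![1, 2]);
    top.flow(|| vec![2, 3]);
    println!("{:?}", top.read()); // [1, 2, 2, 3]
}
```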
Sinks
| Method | What it does |
|---|---|
| `.sink(view, Egress)` | Send a view's output to a sink (Parquet, Iceberg, Kafka, etc.). |
| `.sink_memory(view, Arc<Mutex<Vec<RecordBatch>>>)` | Send a view's output to an in-memory buffer (tests, sampling). |
One pipeline can fan out the same view to multiple sinks.
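Because the memory sink takes an `Arc<Mutex<Vec<RecordBatch>>>`, the caller can keep its own handle to the buffer and inspect what the pipeline wrote. This self-contained sketch shows that ownership pattern together with fan-out. It assumes a hypothetical `Sink` trait, `std::sync::Mutex`, and `Vec<i64>` batches in place of Arrow `RecordBatch`:

```rust
use std::sync::{Arc, Mutex};

// Hypothetical model of sink fan-out; not the krishiv_api Egress types.

type Batch = Vec<i64>;

trait Sink {
    fn write(&self, batch: &Batch);
}

/// In-memory sink: appends to a buffer shared with the caller.
struct MemorySink {
    buffer: Arc<Mutex<Vec<Batch>>>,
}

impl Sink for MemorySink {
    fn write(&self, batch: &Batch) {
        self.buffer.lock().unwrap().push(batch.clone());
    }
}

/// Stand-in for a durable sink; it only counts rows.
struct CountingSink {
    rows: Arc<Mutex<usize>>,
}

impl Sink for CountingSink {
    fn write(&self, batch: &Batch) {
        *self.rows.lock().unwrap() += batch.len();
    }
}

/// Fan-out: every batch of one view goes to every attached sink.
fn fan_out(batch: &Batch, sinks: &[Box<dyn Sink>]) {
    for sink in sinks {
        sink.write(batch);
    }
}

fn main() {
    let captured: Arc<Mutex<Vec<Batch>>> = Arc::new(Mutex::new(Vec::new()));
    let rows = Arc::new(Mutex::new(0usize));
    let sinks: Vec<Box<dyn Sink>> = vec![
        Box::new(MemorySink { buffer: Arc::clone(&captured) }),
        Box::new(CountingSink { rows: Arc::clone(&rows) }),
    ];

    fan_out(&vec![1, 2, 3], &sinks);
    fan_out(&vec![4], &sinks);

    // The caller's handles observe what the sinks received.
    println!("captured {} batches", captured.lock().unwrap().len()); // 2
    println!("counted {} rows", *rows.lock().unwrap()); // 4
}
```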
Expectations (DLT-style data quality)
Attach a quality gate to a view:
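```rust
use krishiv_api::{Expectation, OnViolation};

// The gate as a standalone value (same arguments as the builder call below).
let _exp = Expectation::new("totals", "non_negative_total", "total >= 0", OnViolation::Drop);

// `builder` is a PipelineBuilder that already registers the "totals" view.
let pipeline = builder
    .expect("totals", "non_negative_total", "total >= 0", OnViolation::Drop)
    .build()?;
```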
OnViolation::Drop removes the offending rows; OnViolation::Fail fails the run. Violations are counted in the krishiv_dataquality_dropped_total and krishiv_dataquality_failed_total counters.
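The two policies differ in whether violating rows are filtered out or abort the run. This self-contained sketch shows those semantics for the `total >= 0` gate. It assumes a hypothetical `Policy` enum, a Rust closure in place of the SQL predicate, and plain counters in place of the metrics. Here both counters count violating rows, which is an assumption about what the real counters measure:

```rust
// Hypothetical model of expectation handling; not the krishiv_api implementation.

#[derive(Debug, Clone, Copy, PartialEq)]
enum Policy {
    Drop,
    Fail,
}

#[derive(Debug, Default)]
struct Counters {
    dropped: u64, // stands in for krishiv_dataquality_dropped_total
    failed: u64,  // stands in for krishiv_dataquality_failed_total
}

#[derive(Debug, PartialEq)]
struct ExpectationFailed {
    expectation: String,
    violations: usize,
}

/// Applies one expectation to a batch of (user_id, total) rows.
fn apply_expectation(
    name: &str,
    rows: Vec<(u64, i64)>,
    predicate: impl Fn(&(u64, i64)) -> bool,
    policy: Policy,
    counters: &mut Counters,
) -> Result<Vec<(u64, i64)>, ExpectationFailed> {
    let (kept, violating): (Vec<_>, Vec<_>) = rows.into_iter().partition(|row| predicate(row));
    if violating.is_empty() {
        return Ok(kept);
    }
    match policy {
        // Drop: keep the passing rows and count the rest.
        Policy::Drop => {
            counters.dropped += violating.len() as u64;
            Ok(kept)
        }
        // Fail: reject the whole batch.
        Policy::Fail => {
            counters.failed += violating.len() as u64;
            Err(ExpectationFailed { expectation: name.to_string(), violations: violating.len() })
        }
    }
}

fn main() {
    let mut counters = Counters::default();
    let rows: Vec<(u64, i64)> = vec![(1, 80), (2, -5), (3, 0)];
    let non_negative = |&(_, total): &(u64, i64)| total >= 0;

    let kept = apply_expectation("non_negative_total", rows.clone(), non_negative, Policy::Drop, &mut counters);
    println!("{:?}", kept); // Ok([(1, 80), (3, 0)])

    let failed = apply_expectation("non_negative_total", rows, non_negative, Policy::Fail, &mut counters);
    println!("{}", failed.is_err()); // true
}
```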
RunPolicy
Controls how the runtime schedules ticks:
- RunPolicy::Coalesce: batch source updates that arrive within a window into one tick. Default for IVM and stream modes.
- RunPolicy::Immediate: tick on every source update. Lowest latency, highest overhead.
- RunPolicy::Batched(interval): tick on a wall-clock interval. Use for backfills.
Pass the policy to pipeline.run(policy) to start the pipeline, or to pipeline.refresh(policy) to force a full-state rebuild from the sources.
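The three policies differ in how source updates are grouped into ticks. This self-contained model groups sorted update arrival times, given in milliseconds. It uses a hypothetical `TickPolicy` enum in which `Coalesce` carries an explicit window. The real `RunPolicy::Coalesce` takes no argument, and its window is not specified here. In the model, `Batched` ticks at fixed multiples of the interval:

```rust
// Hypothetical model of tick scheduling; not the krishiv_api runtime.

#[derive(Debug, Clone, Copy)]
enum TickPolicy {
    /// Group updates arriving within `window_ms` of the first update in the group.
    Coalesce { window_ms: u64 },
    /// One tick per update.
    Immediate,
    /// Group updates by fixed wall-clock interval (`interval_ms` must be non-zero).
    Batched { interval_ms: u64 },
}

/// Splits sorted arrival timestamps into the updates each tick would process.
/// Intervals with no updates produce no entry in this model.
fn plan_ticks(arrivals_ms: &[u64], policy: TickPolicy) -> Vec<Vec<u64>> {
    let mut ticks: Vec<Vec<u64>> = Vec::new();
    for &t in arrivals_ms {
        let starts_new_tick = match (policy, ticks.last()) {
            (_, None) => true,
            (TickPolicy::Immediate, Some(_)) => true,
            (TickPolicy::Coalesce { window_ms }, Some(current)) => {
                t.saturating_sub(current[0]) > window_ms
            }
            (TickPolicy::Batched { interval_ms }, Some(current)) => {
                t / interval_ms != current[0] / interval_ms
            }
        };
        if starts_new_tick {
            ticks.push(vec![t]);
        } else {
            ticks.last_mut().unwrap().push(t);
        }
    }
    ticks
}

fn main() {
    let arrivals = [0, 5, 30, 100, 120];
    println!("{:?}", plan_ticks(&arrivals, TickPolicy::Immediate).len()); // 5
    println!("{:?}", plan_ticks(&arrivals, TickPolicy::Coalesce { window_ms: 50 })); // [[0, 5, 30], [100, 120]]
    println!("{:?}", plan_ticks(&arrivals, TickPolicy::Batched { interval_ms: 25 })); // [[0, 5], [30], [100, 120]]
}
```

The model makes the latency trade-off visible: fewer ticks mean less per-tick overhead, but an update may wait until its group closes before it is processed.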
Validation
Before running a pipeline, call Pipeline::validate(). It checks:
- All referenced sources exist.
- All view SQL parses.
- All sink connectors are available given the active Cargo features.
- The inferred mode matches the requested mode.
It returns a `Vec<ValidationError>`, which is empty on success. Wire this into CI as a pre-deployment check.
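Because validate() reports problems as a list, it maps naturally onto a CI gate. This self-contained sketch covers two of the checks and the exit-code mapping. It assumes a hypothetical `PlanSpec` in place of `Pipeline` and an `Issue` enum in place of `ValidationError`. SQL parsing and the mode check are not modeled:

```rust
use std::collections::HashSet;
use std::process::ExitCode;

// Hypothetical model of pre-deployment validation; not krishiv_api code.

#[derive(Debug, PartialEq)]
enum Issue {
    UnknownInput { view: &'static str, input: &'static str },
    ConnectorUnavailable { connector: &'static str },
}

/// Simplified description of a built pipeline.
struct PlanSpec {
    sources: Vec<&'static str>,
    /// (view name, names it reads from), in registration order
    view_inputs: Vec<(&'static str, Vec<&'static str>)>,
    sink_connectors: Vec<&'static str>,
    /// Connectors compiled in, e.g. via Cargo features
    enabled_connectors: Vec<&'static str>,
}

/// Collects every issue instead of stopping at the first one.
fn validate(plan: &PlanSpec) -> Vec<Issue> {
    let mut issues = Vec::new();
    // A view may read from a source or from a previously registered view.
    let mut known: HashSet<&'static str> = plan.sources.iter().copied().collect();
    for (view, inputs) in &plan.view_inputs {
        for input in inputs {
            if !known.contains(input) {
                issues.push(Issue::UnknownInput { view: *view, input: *input });
            }
        }
        known.insert(*view);
    }
    for connector in &plan.sink_connectors {
        if !plan.enabled_connectors.contains(connector) {
            issues.push(Issue::ConnectorUnavailable { connector: *connector });
        }
    }
    issues
}

/// CI gate: print each issue and fail the job if there are any.
fn exit_code(issues: &[Issue]) -> ExitCode {
    for issue in issues {
        eprintln!("validation error: {issue:?}");
    }
    if issues.is_empty() { ExitCode::SUCCESS } else { ExitCode::FAILURE }
}

fn main() -> ExitCode {
    let plan = PlanSpec {
        sources: vec!["orders", "users"],
        view_inputs: vec![("enriched", vec!["orders", "users"]), ("totals", vec!["enriched"])],
        sink_connectors: vec!["iceberg"],
        enabled_connectors: vec!["iceberg", "parquet"],
    };
    exit_code(&validate(&plan))
}
```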