Available

Pipeline Builder

Fluent Rust API for sources, views, sinks, CDC, and data-quality expectations.

The Pipeline DSL is the canonical way to compose a multi-stage data flow in Rust. PipelineBuilder is the fluent entry point; Pipeline is the validated, runnable plan.

A pipeline is a chain of typed operations. Each step returns a new builder; nothing runs until you build the pipeline and call pipeline.run(...).
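To make the deferred-execution idea concrete, here is a minimal, self-contained sketch of a fluent builder that only records steps. The names SketchBuilder, SketchPlan, Step, and example_plan are hypothetical stand-ins for illustration and are not the krishiv_api types.

```rust
// Hypothetical stand-in for a fluent builder; not the krishiv_api types.
#[derive(Debug, Clone, PartialEq)]
enum Step {
    Source(String),
    View { name: String, sql: String },
    Sink(String),
}

#[derive(Debug, Default)]
struct SketchBuilder {
    steps: Vec<Step>,
}

// The built plan: just the recorded steps, ready to be executed later.
#[derive(Debug)]
struct SketchPlan {
    steps: Vec<Step>,
}

impl SketchBuilder {
    fn new() -> Self {
        Self::default()
    }

    // Each method takes the builder by value and returns it with one more
    // recorded step. No data is read or written here.
    fn source(mut self, name: &str) -> Self {
        self.steps.push(Step::Source(name.to_string()));
        self
    }

    fn view(mut self, name: &str, sql: &str) -> Self {
        self.steps.push(Step::View {
            name: name.to_string(),
            sql: sql.to_string(),
        });
        self
    }

    fn sink(mut self, view: &str) -> Self {
        self.steps.push(Step::Sink(view.to_string()));
        self
    }

    // build() turns the recorded steps into a plan; it still executes nothing.
    fn build(self) -> Result<SketchPlan, String> {
        if self.steps.is_empty() {
            return Err("empty pipeline".to_string());
        }
        Ok(SketchPlan { steps: self.steps })
    }
}

fn example_plan() -> Result<SketchPlan, String> {
    SketchBuilder::new()
        .source("orders")
        .view("totals", "SELECT user_id, SUM(amount) AS total FROM orders GROUP BY user_id")
        .sink("totals")
        .build()
}
```

Because the builder only accumulates a description, the whole plan can be checked before any source is touched.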

Shape of a pipeline

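// OnViolation and RunPolicy are used below; RunPolicy is assumed to be exported from krishiv_api.
use krishiv_api::{OnViolation, PipelineBuilder, RunPolicy};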

let pipeline = PipelineBuilder::new("orders_to_totals")
    .source("orders", source_cdc)             // CDC change stream
    .source("users", source_memory)            // in-memory reference data
    .view("enriched",
          "SELECT o.*, u.tier FROM orders o JOIN users u ON o.user_id = u.id",
          /* materialized = */ true)
    .view("totals",
          "SELECT user_id, SUM(amount) AS total FROM enriched GROUP BY user_id",
          /* materialized = */ true)
    .flow("top", "SELECT * FROM totals ORDER BY total DESC LIMIT 100")  // fan-in via UNION ALL
    .sink_memory("enriched", vec![])         // also tee to memory for tests
    .sink("totals", sink_iceberg)              // primary sink
    .expect("totals", "non_negative_total",
            "total >= 0", OnViolation::Drop)
    .build()?;

let errors = pipeline.validate();            // Vec<ValidationError>, empty on success
assert!(errors.is_empty());
let report = pipeline.run(RunPolicy::Coalesce).await?;

Pipeline modes

PipelineMode is auto-inferred from the source kind:

| Source | Mode | Behavior |
|---|---|---|
| source_cdc(...) | Ivm | Each tick produces a DeltaBatch; views are updated incrementally. |
| source_memory(...) | Batch | The whole batch is run once; run blocks until the result is emitted. |
| Any other | Stream | Continuous query with the configured trigger. |

Override with .mode(PipelineMode::Ivm) when the inference is wrong (e.g. a Kafka source in a non-IVM pipeline).
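The inference rule and the override can be modelled in a few lines. This is a sketch under stated assumptions: SourceKind, infer_mode, and effective_mode are hypothetical names, and the real PipelineMode may carry more variants or data.

```rust
// Hypothetical model of the inference table; not the krishiv_api types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PipelineMode {
    Ivm,
    Batch,
    Stream,
}

// Assumed classification of a source; the real builder may track this differently.
#[derive(Debug, Clone, Copy)]
enum SourceKind {
    Cdc,
    Memory,
    Other,
}

// One row of the table per match arm.
fn infer_mode(kind: SourceKind) -> PipelineMode {
    match kind {
        SourceKind::Cdc => PipelineMode::Ivm,
        SourceKind::Memory => PipelineMode::Batch,
        SourceKind::Other => PipelineMode::Stream,
    }
}

// An explicit request (the .mode(...) call) wins over the inferred mode.
fn effective_mode(kind: SourceKind, requested: Option<PipelineMode>) -> PipelineMode {
    requested.unwrap_or_else(|| infer_mode(kind))
}
```

The sketch handles a single source only; how the real builder resolves a pipeline that mixes source kinds is not covered here.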

Sources

| Method | What it does |
|---|---|
| .source(name, Ingest) | Bounded source: file, in-memory batches, or a SQL subquery. |
| .source_cdc(name, Vec<CdcChange>) | CDC source: insert / delete / update(before, after) records. |
| .source_memory(name, Vec<RecordBatch>) | In-memory batches (tests and one-shots). |
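The three CDC record shapes in the table (insert, delete, and update with before and after images) can be illustrated by applying them to keyed state. This is a sketch only: Row, Change, apply, and total are hypothetical, and the real CdcChange type may be shaped differently.

```rust
use std::collections::HashMap;

// Hypothetical row and change types; the real CdcChange may differ.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    id: u32,
    amount: i64,
}

#[derive(Debug, Clone)]
enum Change {
    Insert(Row),
    Delete(Row),
    Update { before: Row, after: Row },
}

// Apply a batch of changes to state keyed by row id.
fn apply(state: &mut HashMap<u32, Row>, changes: &[Change]) {
    for change in changes {
        match change {
            Change::Insert(row) => {
                state.insert(row.id, row.clone());
            }
            Change::Delete(row) => {
                state.remove(&row.id);
            }
            // An update retracts the before image and asserts the after image.
            Change::Update { before, after } => {
                state.remove(&before.id);
                state.insert(after.id, after.clone());
            }
        }
    }
}

// A derived aggregate, recomputed here from state for simplicity.
fn total(state: &HashMap<u32, Row>) -> i64 {
    state.values().map(|row| row.amount).sum()
}
```

An incremental engine would update the aggregate from the deltas directly instead of rescanning state; the sketch recomputes it only to keep the example short.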

For streaming sources (Kafka, Kinesis, Pulsar), build the flow with the lower-level DataStreamWriter, or use Pipeline::run in stream mode against a registered source.

Views

| Method | What it does |
|---|---|
| .view(name, sql, materialized) | Register a view. materialized = true materialises it; false inlines the query on read. |
| .temp_view(name, sql) | A non-materialised view (always inlined). For ad-hoc helpers. |
| .flow(target, sql) | Append a view to another view's UNION ALL. Use for fan-in pipelines. |
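As a rough illustration of fan-in, the sketch below collects the queries appended to a target and joins them with UNION ALL. FlowRegistry and its methods are hypothetical and only model the idea; they say nothing about how the library stores or plans flows.

```rust
use std::collections::BTreeMap;

// Hypothetical registry: target view name -> queries appended to it.
#[derive(Default)]
struct FlowRegistry {
    flows: BTreeMap<String, Vec<String>>,
}

impl FlowRegistry {
    // Append one more branch to the target's fan-in.
    fn flow(&mut self, target: &str, sql: &str) {
        self.flows
            .entry(target.to_string())
            .or_default()
            .push(sql.to_string());
    }

    // The query the target would evaluate: all branches combined with UNION ALL.
    fn resolved_sql(&self, target: &str) -> Option<String> {
        self.flows
            .get(target)
            .map(|branches| branches.join("\nUNION ALL\n"))
    }
}
```

Branches must produce compatible column lists for the union to be valid; the sketch does not check that.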

Sinks

| Method | What it does |
|---|---|
| .sink(view, Egress) | Send a view's output to a sink (Parquet, Iceberg, Kafka, etc.). |
| .sink_memory(view, Arc<Mutex<Vec<RecordBatch>>>) | Send to an in-memory buffer (tests, sampling). |

One pipeline can fan out the same view to multiple sinks.
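The shared-buffer shape of the in-memory sink is what makes fan-out easy to test: the caller keeps one clone of the Arc and inspects it after the run. A minimal sketch using only the standard library, with Batch as a hypothetical stand-in for RecordBatch and fan_out as an illustrative helper:

```rust
use std::sync::{Arc, Mutex};

// Stand-in for an Arrow RecordBatch, to keep the sketch dependency-free.
type Batch = Vec<i64>;
// Same shape as the buffer type taken by the in-memory sink.
type MemorySink = Arc<Mutex<Vec<Batch>>>;

// Deliver one batch of a view's output to every attached sink.
fn fan_out(batch: &Batch, sinks: &[MemorySink]) {
    for sink in sinks {
        sink.lock()
            .expect("sink mutex poisoned")
            .push(batch.clone());
    }
}
```

Each sink receives its own copy of the batch, so a slow or failing consumer of one buffer does not affect what the other buffer holds.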

Expectations (DLT-style data quality)

Attach a quality gate to a view:

use krishiv_api::{Expectation, OnViolation};

let exp = Expectation::new("totals", "non_negative_total", "total >= 0", OnViolation::Drop);
let pipeline = builder.expect("totals", "non_negative_total", "total >= 0", OnViolation::Drop).build()?;

OnViolation::Drop removes the offending rows; OnViolation::Fail fails the run. Both outcomes are counted, in krishiv_dataquality_dropped_total and krishiv_dataquality_failed_total respectively.
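The difference between the two actions can be sketched without the library. Everything here is hypothetical: the local OnViolation enum mirrors the documented variants, the predicate is a Rust closure instead of a SQL expression, and Counters assumes that each violating row increments a counter, which the documentation does not specify.

```rust
// Local mirror of the documented variants; not the krishiv_api type.
#[derive(Debug, Clone, Copy, PartialEq)]
enum OnViolation {
    Drop,
    Fail,
}

// Assumed counting granularity: one increment per violating row.
#[derive(Debug, Default, PartialEq)]
struct Counters {
    dropped: u64,
    failed: u64,
}

fn enforce<T>(
    rows: Vec<T>,
    predicate: impl Fn(&T) -> bool,
    on_violation: OnViolation,
    counters: &mut Counters,
) -> Result<Vec<T>, String> {
    match on_violation {
        // Drop: keep only rows that satisfy the predicate and count the rest.
        OnViolation::Drop => {
            let before = rows.len();
            let kept: Vec<T> = rows.into_iter().filter(|row| predicate(row)).collect();
            counters.dropped += (before - kept.len()) as u64;
            Ok(kept)
        }
        // Fail: any violating row aborts the run with an error.
        OnViolation::Fail => {
            let violations = rows.iter().filter(|row| !predicate(*row)).count();
            if violations > 0 {
                counters.failed += violations as u64;
                return Err(format!("{violations} row(s) violated the expectation"));
            }
            Ok(rows)
        }
    }
}
```

Drop trades completeness for availability, while Fail stops bad data from reaching a sink at the cost of halting the run.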

RunPolicy

Controls how the runtime schedules ticks:

  • RunPolicy::Coalesce — batch up source updates that arrive within a window. Default for IVM and stream modes.
  • RunPolicy::Immediate — tick on every source update. Lowest latency, highest overhead.
  • RunPolicy::Batched(interval) — tick on a wall-clock interval. Use for backfills.

Pass the policy to pipeline.run(policy) to start the pipeline, or to pipeline.refresh(policy) to force a full-state rebuild from sources.
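To compare the three policies, the sketch below groups a sorted list of update arrival times (in milliseconds) into ticks. It is a model, not the scheduler: Policy and plan_ticks are hypothetical, the coalescing window is an assumed parameter (the documented RunPolicy::Coalesce takes none), and intervals with no updates are simply omitted.

```rust
// Hypothetical model of the three policies; not the krishiv_api RunPolicy.
#[derive(Debug, Clone, Copy)]
enum Policy {
    Immediate,
    Coalesce { window_ms: u64 },   // window length is an assumed parameter
    Batched { interval_ms: u64 },  // must be non-zero
}

// Group arrival times into ticks. `arrivals_ms` must be sorted ascending.
fn plan_ticks(policy: Policy, arrivals_ms: &[u64]) -> Vec<Vec<u64>> {
    let mut ticks: Vec<Vec<u64>> = Vec::new();
    for &t in arrivals_ms {
        // Decide whether this update joins the most recent tick.
        let joins_last = match policy {
            Policy::Immediate => false,
            // Same tick if it falls inside the window opened by the tick's first update.
            Policy::Coalesce { window_ms } => ticks
                .last()
                .map_or(false, |group| t - group[0] < window_ms),
            // Same tick if it falls in the same wall-clock interval slot.
            Policy::Batched { interval_ms } => ticks
                .last()
                .map_or(false, |group| group[0] / interval_ms == t / interval_ms),
        };
        if joins_last {
            ticks.last_mut().expect("joins_last implies a tick").push(t);
        } else {
            ticks.push(vec![t]);
        }
    }
    ticks
}
```

With the same arrivals, Immediate yields the most ticks and the other two policies yield fewer, which is the latency-versus-overhead trade-off described in the list.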

Validation

Before run, Pipeline::validate() checks:

  • All referenced sources exist.
  • All view SQL parses.
  • All sink connectors are available given the active Cargo features.
  • The inferred mode matches the requested mode.

validate() returns a Vec<ValidationError>, which is empty on success. Wire this into CI for pre-deployment checks.
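Returning a list instead of stopping at the first problem lets a CI job report everything at once. The sketch below models that style for two of the checks. All names are hypothetical: ValidationError here is a local enum, PlanSpec and ViewSpec are stand-ins for the real plan, and an empty-SQL test stands in for real SQL parsing.

```rust
use std::collections::HashSet;

// Local stand-in; the real ValidationError may have different variants.
#[derive(Debug, PartialEq)]
enum ValidationError {
    UnknownSource { view: String, source: String },
    EmptySql { view: String },
}

// A view, with the names it reads from listed explicitly for the sketch.
struct ViewSpec {
    name: String,
    sql: String,
    reads: Vec<String>,
}

struct PlanSpec {
    sources: Vec<String>,
    views: Vec<ViewSpec>,
}

// Collect every problem found; an empty vector means the plan passed.
fn validate(plan: &PlanSpec) -> Vec<ValidationError> {
    let mut errors = Vec::new();
    let mut known: HashSet<&str> = plan.sources.iter().map(String::as_str).collect();
    for view in &plan.views {
        // Stand-in for "all view SQL parses".
        if view.sql.trim().is_empty() {
            errors.push(ValidationError::EmptySql { view: view.name.clone() });
        }
        // Every input must be a source or a previously declared view.
        for input in &view.reads {
            if !known.contains(input.as_str()) {
                errors.push(ValidationError::UnknownSource {
                    view: view.name.clone(),
                    source: input.clone(),
                });
            }
        }
        known.insert(view.name.as_str());
    }
    errors
}
```

In a CI step, a non-empty result would typically be printed and turned into a non-zero exit code.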

See also