Available

Two-Phase Commit Pipeline

Build an exactly-once pipeline: Kafka → Krishiv → Iceberg with two-phase commit.

This recipe demonstrates the guarantee end to end: ingest a Kafka topic, run a windowed aggregation, and write the result to Iceberg with exactly-once delivery.

Prerequisites

  • Kafka cluster with a topic that has multiple partitions and a stable schema.
  • Iceberg REST catalog (Nessie, Polaris, or Glue).
  • Object store for both Kafka (if not local) and Iceberg.
  • Build with cargo build --release --features 'kafka iceberg' -p krishiv.

Step 1 — Environment

export KRISHIV_COORDINATOR=https://coord.internal:50051
export KRISHIV_COORDINATOR_BEARER_TOKEN=...
export KRISHIV_DURABILITY_PROFILE=distributed-durable
export KRISHIV_SHUFFLE_OBJECT_STORE_URI=s3://my-bucket/shuffle/
export KRISHIV_ICEBERG_CATALOG_URI=http://catalog:8181
export KRISHIV_ICEBERG_WAREHOUSE=s3://my-bucket/warehouse/
export OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
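
A missing variable is easier to diagnose before the pipeline starts than after it fails mid-run. The following is a minimal, stdlib-only sketch of a preflight check. The variable names come from the exports above, but the choice of which ones to treat as required, and the helper names missing_settings and missing_from_process_env, are assumptions made for illustration; they are not part of Krishiv.

```rust
/// Variables from Step 1 that this sketch treats as required (an assumption).
const REQUIRED: [&str; 5] = [
    "KRISHIV_COORDINATOR",
    "KRISHIV_DURABILITY_PROFILE",
    "KRISHIV_SHUFFLE_OBJECT_STORE_URI",
    "KRISHIV_ICEBERG_CATALOG_URI",
    "KRISHIV_ICEBERG_WAREHOUSE",
];

/// Returns the names of required variables that are unset or blank.
/// `lookup` abstracts over the environment so the check can be tested.
fn missing_settings<F>(lookup: F) -> Vec<&'static str>
where
    F: Fn(&str) -> Option<String>,
{
    REQUIRED
        .iter()
        .copied()
        .filter(|name| lookup(*name).map_or(true, |v| v.trim().is_empty()))
        .collect()
}

/// Runs the same check against the real process environment.
fn missing_from_process_env() -> Vec<&'static str> {
    missing_settings(|name| std::env::var(name).ok())
}
```

Calling missing_from_process_env() at the top of main and exiting when the result is non-empty turns a late connection error into an early, named one.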

Step 2 — Topology

orders-topic  ──►  Kafka source  ──►  windowed aggregation  ──►  Iceberg sink (2PC)
                            │                                            │
                            └─►  checkpoint (Kafka offset)  ◄───────────┘
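
The feedback edge in the diagram is the heart of two-phase commit: sink output and the Kafka offset become durable together or not at all. The toy in-memory model below sketches that contract. It is not Krishiv's implementation; the Pipeline and Staged types and their methods are hypothetical names, and a real sink has to make the publish step atomic across a catalog and a checkpoint store.

```rust
/// Output prepared in phase 1, not yet visible to readers.
#[derive(Debug)]
struct Staged {
    epoch: u64,
    rows: Vec<i64>,
    end_offset: u64,
}

/// Toy model of sink state plus checkpoint state (hypothetical).
#[derive(Debug, Default)]
struct Pipeline {
    visible: Vec<i64>,      // rows readers can see (the "table")
    staged: Option<Staged>, // prepared but uncommitted output
    committed_offset: u64,  // source offset in the last durable checkpoint
    last_epoch: u64,        // highest committed epoch; 0 means none
}

impl Pipeline {
    /// Phase 1: stage the batch. Nothing becomes visible yet.
    fn prepare(&mut self, epoch: u64, rows: Vec<i64>, end_offset: u64) {
        self.staged = Some(Staged { epoch, rows, end_offset });
    }

    /// Phase 2: publish the staged rows and advance the offset together.
    /// Returns true only if this call published; repeating it is a no-op.
    fn commit(&mut self, epoch: u64) -> bool {
        if epoch <= self.last_epoch {
            return false; // already committed: idempotent
        }
        match self.staged.take() {
            Some(s) if s.epoch == epoch => {
                self.visible.extend(s.rows);
                self.committed_offset = s.end_offset;
                self.last_epoch = epoch;
                true
            }
            other => {
                self.staged = other; // wrong epoch or nothing staged
                false
            }
        }
    }

    /// Crash recovery: drop staged output and return the replay offset.
    fn recover(&mut self) -> u64 {
        self.staged = None;
        self.committed_offset
    }
}
```

A crash between prepare and commit leaves visible and committed_offset untouched, so replaying from the value returned by recover produces the batch once and only once.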

Step 3 — The pipeline

use krishiv_api::{Session, col, count, sum};
use krishiv_connectors::iceberg::IcebergSink;
use std::sync::Arc;

#[tokio::main]
async fn main() -> krishiv_api::Result<()> {
    // Distributed: must point at a coordinator
    let session = Session::connect("https://coord.internal:50051").await?;
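    // orders_schema() is not shown here; it is assumed to be a user-defined
    // helper that returns the schema of the orders topic.
    session.register_kafka_source(
        "orders",
        orders_schema(),
        "broker:9092",
        "orders",
        "krishiv-app",
    )?;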

    // Build the streaming query
    let per_minute = session
        .table("orders")?
        .to_streaming()
        .with_event_time("event_time")
        .watermark("event_time", 5_000)  // 5 s allowed lateness
        .tumbling_window(60_000)         // 1-minute windows
        .agg(vec![count(col("*")), sum(col("amount"))]);

    // Two-phase commit Iceberg sink
    let sink = Arc::new(
        IcebergSink::new("http://catalog:8181", "s3://my-bucket/warehouse", "orders_per_minute")
            .with_two_phase_commit()
    );

    // Start the query
    let query = per_minute
        .write_stream()
        .output_mode(krishiv_api::OutputMode::Append)
        .trigger(krishiv_api::Trigger::ProcessingTime(5_000))
        .format("iceberg")
        .option("sink", sink)
        .option("checkpoint.location", "s3://my-bucket/ckpt/orders_per_minute/")
        .start().await?;

    // Wait for it
    query.await_termination().await?;
    Ok(())
}
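
The arithmetic behind the 60_000 ms tumbling window and the 5_000 ms watermark can be sketched with plain functions. This assumes the common streaming convention that windows are aligned to the Unix epoch and that a window closes once the watermark reaches its end; the source does not state whether Krishiv follows exactly these rules, and the function names are illustrative.

```rust
/// Start of the tumbling window that contains `event_time_ms`.
/// Assumes windows are aligned to the Unix epoch.
fn window_start(event_time_ms: i64, size_ms: i64) -> i64 {
    // rem_euclid keeps the result correct for negative timestamps too.
    event_time_ms - event_time_ms.rem_euclid(size_ms)
}

/// Watermark: the largest event time seen so far minus the allowed lateness.
fn watermark(max_event_time_ms: i64, lateness_ms: i64) -> i64 {
    max_event_time_ms - lateness_ms
}

/// A window [start, start + size) is treated as closed, and its aggregate
/// can be emitted, once the watermark reaches the window's end.
fn window_closed(start_ms: i64, size_ms: i64, watermark_ms: i64) -> bool {
    watermark_ms >= start_ms + size_ms
}
```

Under these assumptions, the first one-minute window is emitted only after an event with event_time of at least 65_000 ms arrives, which is why results trail the input by roughly the allowed lateness.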

Step 4 — Verify exactly-once

  1. Push the same Kafka batch twice (replay it). The Iceberg table should not gain duplicate rows.
  2. Kill the coordinator mid-pipeline (or simulate the failure with a chaos test). The pending Iceberg snapshot is not renamed, and the Kafka offset is not committed. On restart, the batch is replayed from the last committed checkpoint and the snapshot is committed exactly once.
  3. Inspect the Iceberg snapshot lineage with SELECT snapshot_id, parent_snapshot_id, summary FROM orders_per_minute.snapshots; Each commit should produce a single child snapshot, with no double-parent edges.
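
The lineage check in the last step can be automated once the query result is exported as (snapshot_id, parent_snapshot_id) pairs. The sketch below reads the check as "no snapshot is the parent of more than one child", which is one interpretation of the rule above; the forked_parents helper and the tuple layout are assumptions for illustration.

```rust
use std::collections::HashMap;

/// Takes (snapshot_id, parent_snapshot_id) rows; the root has no parent.
/// Returns, in ascending order, every parent id that has more than one
/// child. An empty result means the lineage is a single linear chain.
fn forked_parents(rows: &[(i64, Option<i64>)]) -> Vec<i64> {
    let mut children: HashMap<i64, usize> = HashMap::new();
    for (_, parent) in rows {
        if let Some(p) = parent {
            *children.entry(*p).or_insert(0) += 1;
        }
    }
    let mut forks: Vec<i64> = children
        .into_iter()
        .filter(|(_, n)| *n > 1)
        .map(|(p, _)| p)
        .collect();
    forks.sort_unstable();
    forks
}
```

A non-empty result after the replay or crash test would suggest that the same batch was committed twice on top of the same parent snapshot.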

See also