Two-Phase Commit Pipeline
Build an exactly-once pipeline: Kafka → Krishiv → Iceberg with two-phase commit.
This recipe exercises exactly-once delivery end to end: it ingests a Kafka topic, runs a windowed aggregation, and writes the results to Iceberg.
Prerequisites
- Kafka cluster with a topic that has multiple partitions and a stable schema.
- Iceberg REST catalog (Nessie, Polaris, or Glue).
- An object store (S3 in the examples below) for the Iceberg warehouse, shuffle data, and checkpoints, and for Kafka as well if the cluster is not local.
- Krishiv built with the Kafka and Iceberg features enabled: `cargo build --release --features 'kafka iceberg' -p krishiv`
Step 1 — Environment
```shell
export KRISHIV_COORDINATOR=https://coord.internal:50051
export KRISHIV_COORDINATOR_BEARER_TOKEN=...
export KRISHIV_DURABILITY_PROFILE=distributed-durable
export KRISHIV_SHUFFLE_OBJECT_STORE_URI=s3://my-bucket/shuffle/
export KRISHIV_ICEBERG_CATALOG_URI=http://catalog:8181
export KRISHIV_ICEBERG_WAREHOUSE=s3://my-bucket/warehouse/
export OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
```
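Before connecting, it can help to fail fast when one of these variables is missing. The following is a minimal sketch, not part of Krishiv: `PipelineEnv` and `load_env` are hypothetical names, and only the variable names come from the list above. Taking a `lookup` closure instead of calling `std::env::var` directly keeps the check testable.

```rust
/// Settings read from the Step 1 environment variables (hypothetical helper).
#[derive(Debug, PartialEq)]
pub struct PipelineEnv {
    pub coordinator: String,
    pub durability_profile: String,
    pub shuffle_uri: String,
    pub catalog_uri: String,
    pub warehouse: String,
}

/// Reads every required variable through `lookup` and reports all missing
/// (or empty) ones at once. The bearer token is checked but not stored.
pub fn load_env<F>(lookup: F) -> Result<PipelineEnv, Vec<&'static str>>
where
    F: Fn(&str) -> Option<String>,
{
    let mut missing = Vec::new();
    // Fetch one variable; record its name if it is unset or empty.
    let mut get = |key: &'static str| -> String {
        match lookup(key) {
            Some(value) if !value.is_empty() => value,
            _ => {
                missing.push(key);
                String::new()
            }
        }
    };
    let coordinator = get("KRISHIV_COORDINATOR");
    let _token = get("KRISHIV_COORDINATOR_BEARER_TOKEN");
    let durability_profile = get("KRISHIV_DURABILITY_PROFILE");
    let shuffle_uri = get("KRISHIV_SHUFFLE_OBJECT_STORE_URI");
    let catalog_uri = get("KRISHIV_ICEBERG_CATALOG_URI");
    let warehouse = get("KRISHIV_ICEBERG_WAREHOUSE");
    if missing.is_empty() {
        Ok(PipelineEnv { coordinator, durability_profile, shuffle_uri, catalog_uri, warehouse })
    } else {
        Err(missing)
    }
}
```

In a binary, call it as `load_env(|k| std::env::var(k).ok())`. The OTLP endpoint is left out because tracing is optional for the pipeline itself.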
Step 2 — Topology
```
orders-topic ──► Kafka source ──► windowed aggregation ──► Iceberg sink (2PC)
                      │                                            │
                      └─► checkpoint (Kafka offset) ◄──────────────┘
```
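The diagram leaves the ordering between checkpoint and commit implicit. The sketch below models one plausible ordering, the checkpoint-then-commit scheme used by Flink-style two-phase-commit sinks, in plain Rust with no Krishiv or Iceberg dependencies; Krishiv's real protocol may differ. `Checkpoint`, `Table`, `Crash`, `recover` and `run_epoch` are hypothetical, the "Iceberg table" is a `Vec<i64>`, and an epoch id stands in for the marker a real sink would keep in its snapshot metadata.

```rust
use std::collections::BTreeSet;

/// Kafka position plus any pre-committed epoch, written atomically per checkpoint.
#[derive(Clone, Debug, Default)]
pub struct Checkpoint {
    pub next_offset: usize,               // first Kafka offset not yet covered by a checkpoint
    pub next_epoch: u64,                  // id the next batch will commit under
    pub pending: Option<(u64, Vec<i64>)>, // phase-1 output awaiting phase 2
}

/// Stand-in for the Iceberg table: rows readers can see, plus committed epoch ids.
#[derive(Debug, Default)]
pub struct Table {
    pub rows: Vec<i64>,
    committed_epochs: BTreeSet<u64>,
}

impl Table {
    /// Phase 2: publish rows for `epoch`. Committing the same epoch again is a no-op.
    pub fn commit(&mut self, epoch: u64, rows: &[i64]) {
        if self.committed_epochs.insert(epoch) {
            self.rows.extend_from_slice(rows);
        }
    }
}

/// Where the simulated coordinator failure happens.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Crash {
    BeforeCheckpoint,
    AfterCheckpoint,
}

/// Restart path: finish phase 2 for an epoch the last checkpoint pre-committed.
pub fn recover(ckpt: &mut Checkpoint, table: &mut Table) {
    if let Some((epoch, rows)) = ckpt.pending.take() {
        table.commit(epoch, &rows);
    }
}

/// Runs one micro-batch of at most `batch` records, optionally crashing at `crash`.
pub fn run_epoch(topic: &[i64], batch: usize, ckpt: &mut Checkpoint, table: &mut Table, crash: Option<Crash>) {
    recover(ckpt, table);
    let start = ckpt.next_offset;
    let end = (start + batch).min(topic.len());
    let epoch = ckpt.next_epoch;
    // Phase 1: write the output (here, copy the records) without making it visible.
    let staged = topic[start..end].to_vec();
    if crash == Some(Crash::BeforeCheckpoint) {
        return; // staged output is orphaned; offsets unchanged, so the batch is replayed
    }
    // Checkpoint: record the new offset and the pending epoch together.
    *ckpt = Checkpoint { next_offset: end, next_epoch: epoch + 1, pending: Some((epoch, staged)) };
    if crash == Some(Crash::AfterCheckpoint) {
        return; // the next recover() commits the pending epoch
    }
    // Phase 2: publish.
    recover(ckpt, table);
}
```

A crash before the checkpoint leaves the offsets untouched, so the batch is simply re-read; a crash after it leaves a pending epoch that `recover` commits on restart. The per-epoch check in `Table::commit` is what stops a replayed or retried commit from appending the same rows twice.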
Step 3 — The pipeline
```rust
use krishiv_api::{Session, col, count, sum};
use krishiv_connectors::iceberg::IcebergSink;
use std::sync::Arc;

#[tokio::main]
async fn main() -> krishiv_api::Result<()> {
    // Distributed mode: the session must point at a coordinator
    // (the same address as KRISHIV_COORDINATOR in Step 1).
    let session = Session::connect("https://coord.internal:50051").await?;

    // Register the Kafka topic as a table. `orders_schema()` is not shown here;
    // it must return the schema of the records in the `orders` topic.
    session.register_kafka_source(
        "orders",      // table name used by session.table() below
        orders_schema(),
        "broker:9092", // bootstrap servers
        "orders",      // Kafka topic
        "krishiv-app", // consumer group id
    )?;

    // Build the streaming query: 1-minute tumbling windows over event time
    let per_minute = session
        .table("orders")?
        .to_streaming()
        .with_event_time("event_time")
        .watermark("event_time", 5_000) // 5 s allowed lateness
        .tumbling_window(60_000)        // 1-minute windows
        .agg(vec![count(col("*")), sum(col("amount"))]);

    // Two-phase-commit Iceberg sink writing to the `orders_per_minute` table
    let sink = Arc::new(
        IcebergSink::new("http://catalog:8181", "s3://my-bucket/warehouse", "orders_per_minute")
            .with_two_phase_commit(),
    );

    // Start the query: Append output mode, a 5 s processing-time trigger,
    // and a checkpoint location on the object store
    let query = per_minute
        .write_stream()
        .output_mode(krishiv_api::OutputMode::Append)
        .trigger(krishiv_api::Trigger::ProcessingTime(5_000))
        .format("iceberg")
        .option("sink", sink)
        .option("checkpoint.location", "s3://my-bucket/ckpt/orders_per_minute/")
        .start()
        .await?;

    // Block until the query stops or fails
    query.await_termination().await?;
    Ok(())
}
```
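Assuming Krishiv follows the usual Append-mode semantics for windowed aggregations (as in Spark Structured Streaming), a 1-minute window is written only once the watermark, taken here as the latest event time seen minus the 5 s allowed lateness, has passed the window's end. The plain-Rust sketch below models that rule. `TumblingAgg` is hypothetical, amounts are assumed to be integer cents, and the watermark advances per event for brevity, whereas an engine typically advances it once per trigger.

```rust
use std::collections::BTreeMap;

const WINDOW_MS: i64 = 60_000; // matches tumbling_window(60_000)
const LATENESS_MS: i64 = 5_000; // matches watermark("event_time", 5_000)

/// Hypothetical per-window state: window start (ms) -> (count, sum of amount).
pub struct TumblingAgg {
    open: BTreeMap<i64, (u64, i64)>,
    watermark: i64,
}

impl TumblingAgg {
    pub fn new() -> Self {
        Self { open: BTreeMap::new(), watermark: i64::MIN }
    }

    /// Adds one event. Returns false (and drops it) if its window was already emitted.
    pub fn add(&mut self, event_time: i64, amount: i64) -> bool {
        let start = event_time.div_euclid(WINDOW_MS) * WINDOW_MS;
        if start + WINDOW_MS <= self.watermark {
            return false;
        }
        let state = self.open.entry(start).or_insert((0, 0));
        state.0 += 1;
        state.1 += amount;
        // Watermark = max event time seen so far minus the allowed lateness.
        self.watermark = self.watermark.max(event_time - LATENESS_MS);
        true
    }

    /// Append mode: remove and return every window whose end <= watermark,
    /// as (window_start, count, sum). Each window is emitted exactly once.
    pub fn emit_closed(&mut self) -> Vec<(i64, u64, i64)> {
        let closed: Vec<i64> = self
            .open
            .keys()
            .copied()
            .take_while(|&start| start + WINDOW_MS <= self.watermark)
            .collect();
        closed
            .into_iter()
            .map(|start| {
                let (count, sum) = self.open.remove(&start).expect("key was just collected");
                (start, count, sum)
            })
            .collect()
    }
}
```

For example, after events at 1 s, 59 s and 61 s the watermark is 56 s, so nothing is emitted; an event at 66 s moves it to 61 s and the window starting at 0 is emitted with its final count and sum. Emitting each window only once is what makes Append mode compatible with an append-only sink.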
Step 4 — Verify exactly-once
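- Replay a batch that has already been processed (re-read the same offsets from Kafka). The Iceberg table should not gain duplicate rows. Re-producing the same messages to the topic is not a replay: Kafka assigns the copies new offsets, so the pipeline correctly treats them as new records.
- Kill the coordinator mid-pipeline (or simulate this with a chaos test). The pending Iceberg snapshot is not committed, and neither is the Kafka offset. On restart, the batch is replayed from the last committed checkpoint and its snapshot is committed exactly once.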
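- Inspect the Iceberg snapshot lineage (shown here as Spark SQL against Iceberg's `snapshots` metadata table):

  ```sql
  SELECT snapshot_id, parent_id, summary FROM orders_per_minute.snapshots;
  ```

  Each commit should add exactly one child snapshot to the previous one, so the lineage forms a single chain: no snapshot should appear as the parent of two others.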
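The lineage check can be automated once the (snapshot ID, parent snapshot ID) pairs returned by the lineage query have been fetched. This plain-Rust sketch, built around a hypothetical `forked_parents` function, reports every snapshot that has more than one child. It assumes the table has no branches, since a branch legitimately forks the lineage, and it does not require a single root, because expiring old snapshots can leave the oldest retained snapshot pointing at a parent that no longer exists.

```rust
use std::collections::HashMap;

/// One row of the lineage query: (snapshot_id, parent snapshot id).
pub type SnapshotRow = (i64, Option<i64>);

/// Returns, sorted, the snapshot ids that are the parent of more than one snapshot.
/// An empty result means every commit appended one child to the previous snapshot.
pub fn forked_parents(rows: &[SnapshotRow]) -> Vec<i64> {
    // Count children per parent id.
    let mut children: HashMap<i64, usize> = HashMap::new();
    for &(_, parent) in rows {
        if let Some(p) = parent {
            *children.entry(p).or_insert(0) += 1;
        }
    }
    let mut forks: Vec<i64> = children
        .into_iter()
        .filter(|&(_, n)| n > 1)
        .map(|(p, _)| p)
        .collect();
    forks.sort_unstable();
    forks
}
```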