Available

CDC to Iceberg

Stream Debezium-format change events from Kafka into an Iceberg table with exactly-once commits.

The CDC router bridges a Debezium-style Kafka topic (the standard for MySQL, Postgres, MongoDB change streams) into an Iceberg table. The destination is updated with MERGE semantics keyed by the row's primary key, so inserts, updates, and deletes all land in the right place.

CDC to Iceberg: Postgres WAL → Debezium → Kafka → Krishiv MERGE / IVM → Iceberg snapshots. Every row in Iceberg is reproducible from the WAL.

Topology

MySQL  ──►  Debezium  ──►  Kafka (orders.cdc)  ──►  Krishiv CDC router  ──►  Iceberg (orders)
                                                                                   │
                                                                                   └─►  DLQ

Step 1 — Iceberg target

-- Create the table once (e.g. via Spark or the Iceberg CLI)
CREATE TABLE orders (
  order_id   BIGINT,
  user_id    BIGINT,
  amount     DOUBLE,
  status     STRING,
  ts         TIMESTAMP
) USING iceberg
PARTITIONED BY (days(ts));

Register it in Krishiv via a catalog:

export KRISHIV_ICEBERG_CATALOG_URI=http://catalog:8181
export KRISHIV_ICEBERG_WAREHOUSE=s3://my-bucket/warehouse
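
If your own Rust code needs these two values (Step 2 passes a catalog URI and a warehouse to the sink), it can help to fail fast when either one is missing. The following is a minimal sketch using only the standard library. `IcebergCatalogConfig`, `MissingVar`, and `load_config` are hypothetical names for illustration and are not part of Krishiv.

```rust
use std::fmt;

/// Hypothetical holder for the two settings exported above.
#[derive(Debug, Clone, PartialEq)]
pub struct IcebergCatalogConfig {
    pub catalog_uri: String,
    pub warehouse: String,
}

/// Error naming the variable that was missing or empty.
#[derive(Debug, PartialEq)]
pub struct MissingVar(pub &'static str);

impl fmt::Display for MissingVar {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "environment variable {} is not set", self.0)
    }
}

impl std::error::Error for MissingVar {}

/// Builds the config from any key lookup, so the logic can be exercised
/// without touching the real process environment.
pub fn load_config(
    lookup: impl Fn(&str) -> Option<String>,
) -> Result<IcebergCatalogConfig, MissingVar> {
    // Treat an empty or whitespace-only value the same as an unset one.
    let require = |name: &'static str| {
        lookup(name)
            .filter(|value| !value.trim().is_empty())
            .ok_or(MissingVar(name))
    };
    Ok(IcebergCatalogConfig {
        catalog_uri: require("KRISHIV_ICEBERG_CATALOG_URI")?,
        warehouse: require("KRISHIV_ICEBERG_WAREHOUSE")?,
    })
}

/// Convenience wrapper that reads the process environment.
pub fn load_config_from_env() -> Result<IcebergCatalogConfig, MissingVar> {
    load_config(|name| std::env::var(name).ok())
}
```

Calling `load_config_from_env()` at startup returns an error that names the missing variable, so a misconfiguration shows up before any Kafka records are consumed.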

Step 2 — Configure the CDC router

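use krishiv_connectors::cdc::CdcRouter;
use krishiv_connectors::iceberg::IcebergSink;

// Read the catalog settings exported in Step 1.
let catalog_uri = std::env::var("KRISHIV_ICEBERG_CATALOG_URI")?;
let warehouse = std::env::var("KRISHIV_ICEBERG_WAREHOUSE")?;

// Iceberg sink for the `orders` table, with two-phase commit enabled.
let sink = IcebergSink::new(&catalog_uri, &warehouse, "orders")
    .with_two_phase_commit();

let router = CdcRouter::builder()
    .source_kafka("broker:9092", "orders.cdc", "krishiv-cdc")
    .sink(sink)
    // Primary key used to match rows for MERGE.
    .key_columns(&["order_id"])
    // Unparseable records are written here as Parquet (see Step 5).
    .dlq_parquet("./dlq/")
    .build()?;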

Step 3 — Run

router.run().await?;

Each Kafka record carries a Debezium envelope: op (one of c/u/d/r), before, after, and ts_ms. Krishiv rewrites them to SQL operations:

Debezium op    Iceberg action
c (create)     INSERT
u (update)     MERGE (matched by key_columns)
d (delete)     DELETE
r (read)       no-op
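
To make the mapping concrete, here is a small in-memory model of it. It is a sketch under stated assumptions, not Krishiv's implementation: `Order`, `ChangeEvent`, `Action`, `action_for`, and `apply` are hypothetical names, a `BTreeMap` keyed by `order_id` stands in for the Iceberg table, and INSERT is modeled as a keyed upsert for simplicity.

```rust
use std::collections::BTreeMap;

/// Row shape following the `orders` table from Step 1 (`ts` omitted for brevity).
#[derive(Debug, Clone, PartialEq)]
pub struct Order {
    pub order_id: i64,
    pub user_id: i64,
    pub amount: f64,
    pub status: String,
}

/// Simplified stand-in for a parsed Debezium envelope.
#[derive(Debug, Clone, PartialEq)]
pub struct ChangeEvent {
    pub op: char,
    pub before: Option<Order>,
    pub after: Option<Order>,
}

#[derive(Debug, PartialEq)]
pub enum Action {
    Insert,
    Merge,
    Delete,
    NoOp,
}

/// Maps a Debezium op code to the action listed in the table above.
pub fn action_for(op: char) -> Option<Action> {
    match op {
        'c' => Some(Action::Insert),
        'u' => Some(Action::Merge),
        'd' => Some(Action::Delete),
        'r' => Some(Action::NoOp),
        _ => None,
    }
}

/// Applies one event to an in-memory table keyed by `order_id`.
pub fn apply(table: &mut BTreeMap<i64, Order>, ev: &ChangeEvent) -> Result<(), String> {
    match action_for(ev.op).ok_or_else(|| format!("unknown op {:?}", ev.op))? {
        // Assumption: both are modeled as an upsert on the key.
        Action::Insert | Action::Merge => {
            let row = ev.after.clone().ok_or("missing after image")?;
            table.insert(row.order_id, row);
        }
        // Deletes carry the old row in `before`; only its key is needed.
        Action::Delete => {
            let row = ev.before.as_ref().ok_or("missing before image")?;
            table.remove(&row.order_id);
        }
        Action::NoOp => {}
    }
    Ok(())
}
```

Because every write is keyed by `order_id`, applying a create, an update, and a delete for the same key leaves the table empty, while an `r` event leaves it untouched.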

Step 4 — Verify exactly-once

With the iceberg feature built and the coordinator running with distributed-durable, the CDC router uses two-phase commit. If a failure occurs mid-commit, the Iceberg snapshot is not renamed and the source Kafka offset is not committed. On restart, the router replays from the last committed offset, and the same operations produce the same snapshot.
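
The replay argument can be checked with a toy model. The sketch below is an assumption-laden simplification, not Krishiv's commit protocol: `Committed`, `Change`, and `run_once` are hypothetical names, and the table and the offset live in one struct so they can be published in a single assignment. In the real pipeline they live in two systems (Iceberg and Kafka), which is why two-phase commit is needed.

```rust
use std::collections::BTreeMap;

/// Keyed change: `Some(status)` upserts the row, `None` deletes it.
pub type Change = (i64, Option<String>);

/// Committed state: the table snapshot and the source offset move together.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Committed {
    pub table: BTreeMap<i64, String>,
    pub next_offset: usize,
}

/// Processes up to `batch` changes starting at the committed offset.
/// `crash_before_commit` simulates a failure between prepare and commit.
pub fn run_once(
    state: &mut Committed,
    log: &[Change],
    batch: usize,
    crash_before_commit: bool,
) -> Result<(), &'static str> {
    // Phase 1 (prepare): stage the changes on a private copy.
    let mut staged = state.clone();
    let start = state.next_offset.min(log.len());
    let end = (start + batch).min(log.len());
    for (key, value) in &log[start..end] {
        match value {
            Some(v) => {
                staged.table.insert(*key, v.clone());
            }
            None => {
                staged.table.remove(key);
            }
        }
    }
    staged.next_offset = end;

    if crash_before_commit {
        // Nothing was published: `state` still holds the old table and offset.
        return Err("simulated crash before commit");
    }

    // Phase 2 (commit): publish the snapshot and the offset together.
    *state = staged;
    Ok(())
}
```

A run that crashes leaves `state` unchanged, so the next call re-reads the same slice of the log. Because the changes are keyed, the replay ends in the same state as a run that never failed.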

Step 5 — Watch the DLQ

Records that fail to parse (for example, a missing op or a schema mismatch) are written to ./dlq/ as Parquet. Each row carries a _dlq_reason string column. Counts are reported in krishiv_cdc_dlq_total{topic, reason}.
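
As a rough illustration of how records might be classified and counted per topic and reason, consider the sketch below. Everything in it is hypothetical: `RawRecord`, `DlqReason`, `DlqCounter`, `classify`, and `route` are illustrative names, and the label strings `missing_op` and `schema_mismatch` are assumptions rather than the values Krishiv actually writes to `_dlq_reason`.

```rust
use std::collections::BTreeMap;

/// Hypothetical reasons, following the two examples named in the text.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DlqReason {
    MissingOp,
    SchemaMismatch,
}

impl DlqReason {
    /// Assumed label text; the real `_dlq_reason` values may differ.
    pub fn as_label(self) -> &'static str {
        match self {
            DlqReason::MissingOp => "missing_op",
            DlqReason::SchemaMismatch => "schema_mismatch",
        }
    }
}

/// Minimal view of a record after envelope extraction.
pub struct RawRecord<'a> {
    pub topic: &'a str,
    pub op: Option<&'a str>,
    pub columns: &'a [&'a str],
}

/// Counts dead-lettered records per (topic, reason) pair.
#[derive(Debug, Default)]
pub struct DlqCounter {
    counts: BTreeMap<(String, &'static str), u64>,
}

impl DlqCounter {
    pub fn record(&mut self, topic: &str, reason: DlqReason) {
        *self
            .counts
            .entry((topic.to_string(), reason.as_label()))
            .or_insert(0) += 1;
    }

    pub fn get(&self, topic: &str, reason: DlqReason) -> u64 {
        self.counts
            .get(&(topic.to_string(), reason.as_label()))
            .copied()
            .unwrap_or(0)
    }
}

/// Returns the reason a record should be dead-lettered, if any.
pub fn classify(rec: &RawRecord<'_>, expected_columns: &[&str]) -> Result<(), DlqReason> {
    if rec.op.is_none() {
        return Err(DlqReason::MissingOp);
    }
    if rec.columns != expected_columns {
        return Err(DlqReason::SchemaMismatch);
    }
    Ok(())
}

/// Returns true when the record may proceed to the sink; otherwise counts it.
pub fn route(rec: &RawRecord<'_>, expected_columns: &[&str], counter: &mut DlqCounter) -> bool {
    match classify(rec, expected_columns) {
        Ok(()) => true,
        Err(reason) => {
            counter.record(rec.topic, reason);
            false
        }
    }
}
```

Keeping the reason as a small closed set keeps the `reason` label low-cardinality, which is generally what you want for a counter that is broken down by label.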

See also