CDC to Iceberg
Stream Debezium-format change events from Kafka into an Iceberg table with exactly-once commits.
The CDC router bridges a Debezium-style Kafka topic (the de facto standard format for MySQL, Postgres, and MongoDB change streams) into an Iceberg table. The destination is updated with MERGE semantics keyed by the row's primary key, so inserts, updates, and deletes all land on the correct row.
Topology
MySQL ──► Debezium ──► Kafka (orders.cdc) ──► Krishiv CDC router ──► Iceberg (orders)
                                                      │
                                                      └─► DLQ
Step 1 — Iceberg target
-- Create the table once (e.g. via Spark or the Iceberg CLI)
CREATE TABLE orders (
    order_id BIGINT,
    user_id  BIGINT,
    amount   DOUBLE,
    status   STRING,    -- Spark SQL rejects VARCHAR without a length; Iceberg stores it as string either way
    ts       TIMESTAMP
) USING iceberg
PARTITIONED BY (days(ts));
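As a rough illustration of what `PARTITIONED BY (days(ts))` means for incoming rows: Iceberg's day transform maps a timestamp to whole days since 1970-01-01, and Iceberg timestamps carry microsecond precision. The sketch below computes that value by hand. The function name `days_partition` is hypothetical and is not part of Iceberg or Krishiv; flooring for pre-1970 values is assumed here.

```rust
/// Microseconds in one day (Iceberg timestamps are stored in microseconds).
const MICROS_PER_DAY: i64 = 86_400_000_000;

/// Days since 1970-01-01 for a timestamp given in epoch microseconds.
/// `div_euclid` floors toward negative infinity, so a pre-1970 timestamp
/// lands in the day before the epoch rather than in day 0.
fn days_partition(ts_micros: i64) -> i64 {
    ts_micros.div_euclid(MICROS_PER_DAY)
}
```

Rows whose `ts` falls on the same calendar day (UTC) share a partition value, which is why CDC updates to recent orders tend to touch only a few partitions.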
Register it in Krishiv via a catalog:
export KRISHIV_ICEBERG_CATALOG_URI=http://catalog:8181
export KRISHIV_ICEBERG_WAREHOUSE=s3://my-bucket/warehouse
Step 2 — Configure the CDC router
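use krishiv_connectors::cdc::CdcRouter;
use krishiv_connectors::iceberg::IcebergSink;
// Assumption: the catalog settings exported in Step 1 are read from the environment.
let catalog_uri = std::env::var("KRISHIV_ICEBERG_CATALOG_URI")?;
let warehouse = std::env::var("KRISHIV_ICEBERG_WAREHOUSE")?;
// Iceberg sink for the `orders` table, with two-phase commit enabled.
let sink = IcebergSink::new(&catalog_uri, &warehouse, "orders")
    .with_two_phase_commit();
// Route `orders.cdc` into the sink, merging on `order_id`; unparseable records go to ./dlq/.
let router = CdcRouter::builder()
    .source_kafka("broker:9092", "orders.cdc", "krishiv-cdc")
    .sink(sink)
    .key_columns(&["order_id"])
    .dlq_parquet("./dlq/")
    .build()?;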
Step 3 — Run
router.run().await?;
Each Kafka record carries a Debezium envelope: op (one of c/u/d/r), before, after, and ts_ms. Krishiv rewrites them to SQL operations:
| Debezium op | Iceberg action |
|---|---|
| c (create) | INSERT |
| u (update) | MERGE (matched by key_columns) |
| d (delete) | DELETE |
| r (read) | no-op |
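The mapping above can be sketched in plain Rust against an in-memory table keyed by `order_id`. This is a simplified model for illustration, not Krishiv's implementation: the `Order`, `Envelope`, and `Action` types and the `to_action` and `apply` functions are all hypothetical, and the envelope is reduced to `op`, `before`, and `after`.

```rust
use std::collections::BTreeMap;

/// Simplified stand-in for a row of the `orders` table (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
struct Order {
    order_id: i64,
    status: String,
}

/// A Debezium-style envelope reduced to the fields the mapping needs.
#[derive(Debug, Clone)]
struct Envelope {
    op: char,
    before: Option<Order>,
    after: Option<Order>,
}

/// The table actions from the mapping table.
#[derive(Debug, PartialEq)]
enum Action {
    Insert(Order),
    Merge(Order),
    Delete(i64),
    NoOp,
}

/// Translate one envelope into an action, or return a reason string on failure.
fn to_action(ev: &Envelope) -> Result<Action, String> {
    match ev.op {
        'c' => ev.after.clone().map(Action::Insert)
            .ok_or_else(|| "create without `after`".to_string()),
        'u' => ev.after.clone().map(Action::Merge)
            .ok_or_else(|| "update without `after`".to_string()),
        // A delete carries the old row image in `before`; only the key is needed.
        'd' => ev.before.as_ref().map(|row| Action::Delete(row.order_id))
            .ok_or_else(|| "delete without `before`".to_string()),
        'r' => Ok(Action::NoOp),
        other => Err(format!("unknown op: {}", other)),
    }
}

/// Apply an action to an in-memory table keyed by `order_id`.
fn apply(table: &mut BTreeMap<i64, Order>, action: Action) {
    match action {
        // In this toy model both land as a keyed upsert.
        Action::Insert(row) | Action::Merge(row) => {
            table.insert(row.order_id, row);
        }
        Action::Delete(id) => {
            table.remove(&id);
        }
        Action::NoOp => {}
    }
}
```

Feeding a `c`, then a `u`, then a `d` for the same `order_id` through `to_action` and `apply` leaves the table empty, which is the behavior the key-based MERGE is meant to guarantee.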
Step 4 — Verify exactly-once
With the iceberg feature built and the coordinator running with distributed-durable, the CDC router uses two-phase commit. If a failure occurs mid-commit, the Iceberg snapshot is not renamed and the source Kafka offset is not committed, so neither side advances. On restart, the router replays from the last committed offset, and the same operations produce the same snapshot.
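The replay argument can be checked with a toy model. The sketch below is not Krishiv's coordinator protocol; it only assumes what the paragraph states: changes are staged first, and the snapshot and the source offset become durable together or not at all. The `Committed` struct, the `Change` alias, and `run_batch` are hypothetical names.

```rust
use std::collections::BTreeMap;

/// Durable state in this toy model: the published rows and the source
/// offset that was committed together with them.
#[derive(Debug, Clone, PartialEq, Default)]
struct Committed {
    rows: BTreeMap<i64, String>,
    next_offset: usize,
}

/// One change: `Some(status)` upserts the key, `None` deletes it.
type Change = (i64, Option<String>);

/// Process up to `batch` records from `log`, starting at the committed offset.
/// Returns `true` if the batch was published, `false` if it "crashed".
fn run_batch(
    state: &mut Committed,
    log: &[Change],
    batch: usize,
    crash_before_publish: bool,
) -> bool {
    let start = state.next_offset;
    let end = (start + batch).min(log.len());

    // Phase 1 (prepare): work on a staged copy, never on the published state.
    let mut staged = state.clone();
    for (key, value) in &log[start..end] {
        match value {
            Some(v) => {
                staged.rows.insert(*key, v.clone());
            }
            None => {
                staged.rows.remove(key);
            }
        }
    }
    staged.next_offset = end;

    if crash_before_publish {
        // The staged copy is dropped; rows and offset stay as they were.
        return false;
    }

    // Phase 2 (commit): rows and offset become visible in a single step.
    *state = staged;
    true
}
```

Running a batch with `crash_before_publish = true` and then again with `false` ends in the same `Committed` value as a single clean run, because the replay starts from the unchanged offset and applies the same changes.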
Step 5 — Watch the DLQ
Records that don't parse (for example, a missing op or a schema mismatch) are written to ./dlq/ as Parquet. Each row carries a _dlq_reason string column. Counts are reported in krishiv_cdc_dlq_total{topic, reason}.
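To make the two failure reasons named above concrete, here is a minimal sketch of classifying a record and counting it per (topic, reason), mirroring the labels of the metric. Everything in it is an assumption for illustration: the `DlqReason` enum, the label strings `missing_op` and `schema_mismatch`, and the `RawRecord` shape are hypothetical and may not match the values Krishiv actually writes to `_dlq_reason`.

```rust
use std::collections::BTreeMap;

/// Failure reasons taken from the examples in the text; label strings are hypothetical.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DlqReason {
    MissingOp,
    SchemaMismatch,
}

impl DlqReason {
    /// String form, as it might appear in a `_dlq_reason` column or a metric label.
    fn label(self) -> &'static str {
        match self {
            DlqReason::MissingOp => "missing_op",
            DlqReason::SchemaMismatch => "schema_mismatch",
        }
    }
}

/// A record reduced to what the check needs: the `op` field and the
/// column names present in its row image.
struct RawRecord {
    op: Option<&'static str>,
    after_columns: Vec<&'static str>,
}

/// Return `Some(reason)` when the record should be dead-lettered.
fn classify(rec: &RawRecord, expected_columns: &[&str]) -> Option<DlqReason> {
    if rec.op.is_none() {
        return Some(DlqReason::MissingOp);
    }
    // Any column the target table does not know counts as a schema mismatch.
    if rec.after_columns.iter().any(|col| !expected_columns.contains(col)) {
        return Some(DlqReason::SchemaMismatch);
    }
    None
}

/// Increment the counter for one (topic, reason) pair.
fn record_dlq(
    counts: &mut BTreeMap<(String, &'static str), u64>,
    topic: &str,
    reason: DlqReason,
) {
    *counts.entry((topic.to_string(), reason.label())).or_insert(0) += 1;
}
```

Keeping the reason as a small closed enum keeps the number of distinct `reason` label values bounded, which matters when the counter is scraped as a labeled metric.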