CDC Routing
Bridging Kafka CDC topics into lakehouse sinks with offset tracking.
The CDC router bridges a Kafka CDC topic (Debezium-style: an op column with values c/u/d/r plus before/after payloads) into a lakehouse table that supports merge / upsert.
Topology
Kafka CDC topic → krishiv CDC router → Iceberg / Delta / Hudi (with two-phase commit)
                          │
                          └─ DLQ (rows that don't match the schema)
Configuration
use krishiv_connectors::cdc::CdcRouter;

let router = CdcRouter::builder()
    .source_kafka("broker:9092", "orders.cdc", "krishiv-cdc")
    .sink_iceberg("catalog.uri", "warehouse", "orders")
    .key_columns(&["order_id"])
    .dlq_parquet("./dlq/")
    .build()?;
Supported operations
| CDC op | Sink action |
|---|---|
| c (create) | INSERT |
| u (update) | MERGE (matched by key_columns) |
| d (delete) | DELETE |
| r (read, snapshot) | Ignored (no-op) |
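
As a rough illustration of the table above, the dispatch can be sketched as a single match on the op code. `SinkAction` and `sink_action` are hypothetical names used only for this example, not part of the krishiv API, and treating an unknown code as `None` is an assumption.

```rust
/// Sink-side action for one CDC record.
/// Illustrative type only, not part of the krishiv API.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SinkAction {
    Insert,
    Merge,
    Delete,
    Skip,
}

/// Map a Debezium-style `op` code to the sink action listed in the table.
/// Returns `None` for an unknown code (an assumption of this sketch), which
/// a caller would typically treat as a malformed record.
pub fn sink_action(op: &str) -> Option<SinkAction> {
    match op {
        "c" => Some(SinkAction::Insert),
        "u" => Some(SinkAction::Merge),
        "d" => Some(SinkAction::Delete),
        "r" => Some(SinkAction::Skip),
        _ => None,
    }
}
```

Returning an `Option` rather than panicking keeps the decision about unknown codes with the caller.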
Offset tracking
Per-partition Kafka offsets are captured in the checkpoint. On restart, the router resumes from the last committed offset. Exactly-once delivery requires the destination sink to support two-phase commit (2PC); otherwise the router falls back to at-least-once.
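
The bookkeeping described here can be sketched as a per-partition map plus a rule for the delivery guarantee. `OffsetCheckpoint`, `Delivery`, and `delivery_for` are hypothetical names for illustration. The sketch assumes the checkpoint stores the offset of the last record committed to the sink, so reading resumes one past it; the router's actual convention may differ.

```rust
use std::collections::HashMap;

/// Delivery guarantee the router can offer for a given sink.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Delivery {
    ExactlyOnce,
    AtLeastOnce,
}

/// Exactly-once only when the sink supports two-phase commit.
pub fn delivery_for(sink_supports_2pc: bool) -> Delivery {
    if sink_supports_2pc {
        Delivery::ExactlyOnce
    } else {
        Delivery::AtLeastOnce
    }
}

/// Hypothetical checkpoint: last committed offset per Kafka partition.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct OffsetCheckpoint {
    committed: HashMap<i32, i64>,
}

impl OffsetCheckpoint {
    /// Record that everything up to and including `offset` reached the sink.
    /// A stale (smaller) offset never moves the checkpoint backwards.
    pub fn commit(&mut self, partition: i32, offset: i64) {
        let entry = self.committed.entry(partition).or_insert(offset);
        *entry = (*entry).max(offset);
    }

    /// Offset to resume from after a restart: one past the last committed
    /// record (assumption of this sketch), or `None` if nothing was committed.
    pub fn resume_from(&self, partition: i32) -> Option<i64> {
        self.committed.get(&partition).map(|offset| offset + 1)
    }
}
```

With at-least-once delivery, records between the last checkpoint and the crash may be replayed, which is why the sink's merge by key_columns matters for idempotence.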
Schema mapping
The router expects the CDC payload to have at least:
| Column | Type | Description |
|---|---|---|
| op | utf8 | One of c, u, d, r |
| before | struct (or null) | The row before the change |
| after | struct (or null) | The row after the change |
| ts_ms | int64 | Source timestamp |
Configure the field names with .with_before_field("before"), .with_after_field("after"), .with_op_field("op"), and .with_ts_field("ts_ms").
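
To make the expected payload shape concrete, here is a minimal in-memory model of one record. `CdcEnvelope` and `Row` are hypothetical stand-ins rather than krishiv types, and rows are simplified to string maps. It relies on the Debezium convention that a delete carries its row in before and leaves after null.

```rust
use std::collections::BTreeMap;

/// Stand-in for a struct column: column name -> value rendered as text.
pub type Row = BTreeMap<String, String>;

/// Hypothetical in-memory view of one CDC record with the four fields above.
#[derive(Debug, Clone, PartialEq)]
pub struct CdcEnvelope {
    pub op: String,
    pub before: Option<Row>,
    pub after: Option<Row>,
    pub ts_ms: i64,
}

impl CdcEnvelope {
    /// Row image that carries the key: `before` for deletes, `after` otherwise.
    pub fn row_image(&self) -> Option<&Row> {
        match self.op.as_str() {
            "d" => self.before.as_ref(),
            _ => self.after.as_ref(),
        }
    }

    /// Collect the values of `key_columns` from the row image, or `None`
    /// if the image or any key column is absent.
    pub fn key(&self, key_columns: &[&str]) -> Option<Vec<String>> {
        let row = self.row_image()?;
        key_columns.iter().map(|c| row.get(*c).cloned()).collect()
    }
}
```

For example, a delete event for order 42 yields the key `["42"]` from its before image, which is what a keyed DELETE in the sink would match on.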
DLQ for malformed records
Records that don't parse (missing op, missing before/after, schema mismatch) are routed to the DLQ sink with an explanation in the _dlq_reason column. DLQ counts are exposed as the metric krishiv_cdc_dlq_total{topic, reason}.
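
One way to picture the classification is a check that either accepts a record or returns a reason for the DLQ. This is a sketch under assumptions: `DlqReason`, `RawRecord`, `classify`, and the reason strings are all hypothetical, and filing an unknown op code under schema mismatch is a choice made here, not documented behavior.

```rust
/// Why a record was sent to the DLQ (hypothetical categories for this sketch).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum DlqReason {
    MissingOp,
    MissingRowImage,
    SchemaMismatch,
}

impl DlqReason {
    /// Hypothetical text for `_dlq_reason` and the `reason` metric label.
    pub fn as_str(self) -> &'static str {
        match self {
            DlqReason::MissingOp => "missing_op",
            DlqReason::MissingRowImage => "missing_before_after",
            DlqReason::SchemaMismatch => "schema_mismatch",
        }
    }
}

/// Minimal view of a parsed record: only what the check below needs.
pub struct RawRecord<'a> {
    pub op: Option<&'a str>,
    pub has_before: bool,
    pub has_after: bool,
}

/// Accept the record, or say why it belongs in the DLQ.
pub fn classify(rec: &RawRecord<'_>) -> Result<(), DlqReason> {
    let op = rec.op.ok_or(DlqReason::MissingOp)?;
    match op {
        // Creates, updates, and snapshot reads need the `after` image.
        "c" | "u" | "r" if rec.has_after => Ok(()),
        // Deletes need the `before` image.
        "d" if rec.has_before => Ok(()),
        // Known op, but the image it needs is absent.
        "c" | "u" | "r" | "d" => Err(DlqReason::MissingRowImage),
        // Unknown op code: treated as a schema mismatch in this sketch.
        _ => Err(DlqReason::SchemaMismatch),
    }
}
```

Keeping the reason as an enum with a fixed set of strings bounds the cardinality of the reason label on the counter.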