Available

CDC Routing

Bridging Kafka CDC topics into lakehouse sinks with offset tracking.

The CDC router bridges a Kafka CDC topic into a lakehouse table that supports merge / upsert. It expects Debezium-style records: an op column with values c/u/d/r, plus before and after payloads.

Topology

Kafka CDC topic → krishiv CDC router → Iceberg / Delta / Hudi (with two-phase commit)
                            │
                            └─ DLQ (rows that don't match the schema)

Configuration

use krishiv_connectors::cdc::CdcRouter;

let router = CdcRouter::builder()
    .source_kafka("broker:9092", "orders.cdc", "krishiv-cdc")
    .sink_iceberg("catalog.uri", "warehouse", "orders")
    .key_columns(&["order_id"])
    .dlq_parquet("./dlq/")
    .build()?;

Supported operations

CDC op                Sink action
c (create)            INSERT
u (update)            MERGE (matched by key_columns)
d (delete)            DELETE
r (read, snapshot)    Ignored (no-op)
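
The mapping in the table above can be sketched as a small lookup function. This is an illustrative sketch only: the SinkAction enum and sink_action_for function are hypothetical names, not part of the krishiv API, and the handling of unrecognized op values (returning None) is an assumption.

```rust
/// Sink action chosen for a CDC record.
/// Hypothetical type for illustration; not a krishiv type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SinkAction {
    Insert,
    Merge,
    Delete,
    Ignore,
}

/// Map a Debezium-style `op` value to the sink action from the table.
/// Returns `None` for an op value the table does not list (assumption).
pub fn sink_action_for(op: &str) -> Option<SinkAction> {
    match op {
        "c" => Some(SinkAction::Insert), // create
        "u" => Some(SinkAction::Merge),  // update, matched by key columns
        "d" => Some(SinkAction::Delete), // delete
        "r" => Some(SinkAction::Ignore), // snapshot read, no-op
        _ => None,
    }
}
```

In this sketch an unlisted op surfaces as None so the caller can decide what to do with it, for example by sending the record to the DLQ.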

Offset tracking

Per-partition Kafka offsets are captured in the checkpoint. On restart, the router resumes each partition from its last committed offset. Exactly-once delivery requires the destination sink to support two-phase commit (2PC); otherwise the router falls back to at-least-once.
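
To make the resume behaviour concrete, here is a minimal sketch of per-partition offset bookkeeping. It is not krishiv's implementation: OffsetCheckpoint, Delivery, and delivery_mode are hypothetical names, and the sketch assumes the checkpoint stores the last offset written to the sink, so reading resumes one past it.

```rust
use std::collections::BTreeMap;

/// Hypothetical checkpoint: last offset durably written to the sink,
/// keyed by Kafka partition id.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct OffsetCheckpoint {
    committed: BTreeMap<i32, i64>,
}

impl OffsetCheckpoint {
    /// Record a committed offset. Offsets only move forward, so a stale
    /// (smaller) offset for the same partition is ignored.
    pub fn commit(&mut self, partition: i32, offset: i64) {
        let entry = self.committed.entry(partition).or_insert(offset);
        if offset > *entry {
            *entry = offset;
        }
    }

    /// Offset to read next after a restart, or `None` if the partition
    /// has never been committed.
    pub fn resume_offset(&self, partition: i32) -> Option<i64> {
        self.committed.get(&partition).map(|last| last + 1)
    }
}

/// Delivery guarantee the router can offer for a given sink.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Delivery {
    ExactlyOnce,
    AtLeastOnce,
}

/// Exactly-once needs a sink with two-phase commit; otherwise fall back.
pub fn delivery_mode(sink_supports_2pc: bool) -> Delivery {
    if sink_supports_2pc {
        Delivery::ExactlyOnce
    } else {
        Delivery::AtLeastOnce
    }
}
```

With an at-least-once sink, records between the last checkpoint and the crash may be replayed after a restart, which is why the upsert path keyed on key_columns matters there.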

Schema mapping

The router expects the CDC payload to have at least:

Column    Type                Description
op        utf8                one of c, u, d, r
before    struct (or null)    the row before the change
after     struct (or null)    the row after the change
ts_ms     int64               source timestamp

Configure the schema field names with .with_before_field("before"), .with_after_field("after"), .with_op_field("op"), and .with_ts_field("ts_ms").

DLQ for malformed records

Records that don't parse (missing op, missing before/after, schema mismatch) are routed to the DLQ sink with an explanation in the _dlq_reason column. Counts are reported in krishiv_cdc_dlq_total{topic, reason}.
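
The kind of check that sends a record to the DLQ might look like the sketch below. Everything here is an assumption made for illustration: the CdcRecord struct, the validate function, the reason strings, and the rule for which op requires which payload (c, u, and r need after; d needs before) are not taken from krishiv. Payloads are modelled as optional strings to keep the example self-contained.

```rust
/// Simplified CDC record (hypothetical); real payloads would be structs.
pub struct CdcRecord {
    pub op: Option<String>,
    pub before: Option<String>,
    pub after: Option<String>,
}

/// Return `Ok(())` for a routable record, or `Err(reason)` where `reason`
/// is the text that would be written to `_dlq_reason`.
pub fn validate(record: &CdcRecord) -> Result<(), &'static str> {
    // A record without an op cannot be routed at all.
    let op = record.op.as_deref().ok_or("missing op")?;
    match op {
        // Creates, updates, and snapshot reads need the new row image.
        "c" | "u" | "r" if record.after.is_none() => Err("missing after"),
        // Deletes need the old row image to identify the key.
        "d" if record.before.is_none() => Err("missing before"),
        "c" | "u" | "d" | "r" => Ok(()),
        _ => Err("unknown op"),
    }
}
```

Returning a short, fixed reason string keeps the cardinality of the reason label on the DLQ counter bounded.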
