Two-Phase Commit & Delivery Guarantees
Atomic sink writes, the writer–commit–ack protocol, and the certified delivery-guarantee matrix.
Two-phase commit (2PC) is what turns at-least-once delivery into exactly-once delivery. Krishiv implements the standard prepare → commit protocol, coordinated by the checkpoint barrier.
The protocol
- The coordinator injects a checkpoint barrier into every task.
- Each task drains its current output, writes to its sink via `prepare`, and acknowledges the barrier with the staged commit handle.
- Once every task has acked, the coordinator commits the epoch: every sink's `commit` is called. The checkpoint is final only after every commit succeeds.
- If any task fails before ack, the coordinator aborts the epoch: every prepared commit is aborted. No partial state is left visible.
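The steps above can be sketched in a few lines of Rust. This is a minimal illustration, not Krishiv's actual API: the `TwoPhaseSink` trait, the `MemorySink` type, and the `run_epoch` function are hypothetical names, and `commit` is treated as infallible to keep the sketch short.

```rust
use std::collections::HashMap;

/// Hypothetical sink interface; the real Krishiv trait may differ.
trait TwoPhaseSink {
    /// Stage the records for `epoch` without making them visible.
    fn prepare(&mut self, epoch: u64, records: Vec<String>) -> Result<u64, String>;
    /// Make the records staged under `handle` visible.
    fn commit(&mut self, handle: u64);
    /// Discard the records staged under `handle`.
    fn abort(&mut self, handle: u64);
}

/// In-memory sink used only for this illustration.
#[derive(Default)]
struct MemorySink {
    staged: HashMap<u64, Vec<String>>,
    visible: Vec<String>,
    fail_on_prepare: bool, // simulates a task failing before it acks
}

impl TwoPhaseSink for MemorySink {
    fn prepare(&mut self, epoch: u64, records: Vec<String>) -> Result<u64, String> {
        if self.fail_on_prepare {
            return Err(format!("task failed before ack in epoch {}", epoch));
        }
        self.staged.insert(epoch, records);
        Ok(epoch) // in this sketch the epoch doubles as the staged commit handle
    }

    fn commit(&mut self, handle: u64) {
        if let Some(records) = self.staged.remove(&handle) {
            self.visible.extend(records);
        }
    }

    fn abort(&mut self, handle: u64) {
        self.staged.remove(&handle);
    }
}

/// Runs one epoch: prepare on every sink, then commit all or abort all.
/// Returns true if the epoch committed.
fn run_epoch(epoch: u64, sinks: &mut [MemorySink], outputs: Vec<Vec<String>>) -> bool {
    let mut handles: Vec<(usize, u64)> = Vec::new();
    let mut failed = false;

    // Phase 1: every task stages its output and acks with a handle.
    for (i, (sink, records)) in sinks.iter_mut().zip(outputs).enumerate() {
        match sink.prepare(epoch, records) {
            Ok(handle) => handles.push((i, handle)),
            Err(_) => {
                failed = true;
                break;
            }
        }
    }

    // Phase 2: commit everything, or abort whatever was already prepared.
    for (i, handle) in handles {
        if failed {
            sinks[i].abort(handle);
        } else {
            sinks[i].commit(handle);
        }
    }
    !failed
}
```

Calling `run_epoch` with one sink whose `fail_on_prepare` flag is set leaves the `visible` vector of every other sink unchanged, which is the "no partial state" property described above.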
Two-phase sink implementations
| Sink | Notes |
|---|---|
| LocalParquetTwoPhaseCommitSink | Writes to a temp dir, then renames on commit. Crash-safe on local disk. |
| InMemoryTwoPhaseCommitSink | For tests. |
| IcebergNativeTwoPhaseCommit (iceberg) | Writes a new Iceberg snapshot, then renames the metadata pointer on commit. |
| HudiTwoPhaseCommitSink (lakehouse) | Writes a Hudi commit, then publishes the timeline on commit. |
| LocalDeltaTwoPhaseCommitSink (lakehouse) | Writes a Delta log entry, then renames the checkpoint on commit. |
| Kafka transactional sink (kafka) | Uses the Kafka EOS API: initTransactions → beginTransaction → sendOffsetsToTransaction → commitTransaction. |
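Several of the sinks in the table share a write-then-rename pattern. The sketch below shows that pattern using only the Rust standard library. It is an assumption-laden illustration, not the code of `LocalParquetTwoPhaseCommitSink`: the `StagedFile` type and the `prepare`, `commit`, and `abort` functions are hypothetical, and it stages a sibling file rather than a temp dir so the rename stays on one filesystem.

```rust
use std::fs;
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Hypothetical staged file: written to `<target>.staging`, moved into place on commit.
struct StagedFile {
    staging: PathBuf,
    target: PathBuf,
}

/// Phase 1: write the bytes to a staging path next to the final destination.
fn prepare(target: &Path, bytes: &[u8]) -> io::Result<StagedFile> {
    let mut staging = target.as_os_str().to_owned();
    staging.push(".staging");
    let staging = PathBuf::from(staging);

    let mut file = fs::File::create(&staging)?;
    file.write_all(bytes)?;
    file.sync_all()?; // flush to disk before acknowledging the barrier

    Ok(StagedFile {
        staging,
        target: target.to_path_buf(),
    })
}

/// Phase 2: publish the staged file with a single rename.
fn commit(staged: StagedFile) -> io::Result<()> {
    fs::rename(&staged.staging, &staged.target)
}

/// Abort: remove the staged file so nothing becomes visible.
fn abort(staged: StagedFile) -> io::Result<()> {
    fs::remove_file(&staged.staging)
}
```

Readers that only look at the final path never observe a half-written file: until `commit` runs, the data exists only under the staging name, and after `abort` it is gone.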
When you get exactly-once
Exactly-once delivery requires all of:
- Source supports offset commit (e.g. Kafka).
- Sink supports 2PC (or is transactional in the source's sense).
- Durability profile is `distributed-durable`.
- Coordinator uses fenced commits (epoch tokens).
See the Connector Certification matrix for the certified combinations.
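The fenced-commit requirement can be illustrated with a small sketch. It assumes the common fencing-token reading of "epoch tokens": each coordinator incarnation holds a monotonically increasing token, and the commit target rejects any token that is no longer current. The `FencedLog` and `CommitError` types are hypothetical and are not taken from Krishiv.

```rust
/// Hypothetical commit log that rejects commits carrying a stale epoch token.
struct FencedLog {
    current_token: u64,
    committed: Vec<(u64, String)>,
}

#[derive(Debug, PartialEq)]
enum CommitError {
    /// The presented token was superseded by a newer coordinator.
    Fenced { presented: u64, current: u64 },
}

impl FencedLog {
    fn new() -> Self {
        FencedLog {
            current_token: 0,
            committed: Vec::new(),
        }
    }

    /// A newly started coordinator takes a strictly larger token,
    /// which fences off every earlier coordinator.
    fn acquire_token(&mut self) -> u64 {
        self.current_token += 1;
        self.current_token
    }

    /// Accept the commit only if it carries the current token.
    fn commit(&mut self, token: u64, payload: &str) -> Result<(), CommitError> {
        if token != self.current_token {
            return Err(CommitError::Fenced {
                presented: token,
                current: self.current_token,
            });
        }
        self.committed.push((token, payload.to_string()));
        Ok(())
    }
}
```

With this check in place, a coordinator that was presumed dead but is still running cannot commit an epoch after its replacement has taken over, so the same epoch is never committed twice by competing coordinators.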
Data quality (DLT-style)
Pair 2PC with a DataQualityConfig to drop or fail records that violate expectations:
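```rust
// DeadLetterSink is assumed to live in the same module as the other quality types.
use krishiv_connectors::quality::{
    DataQualityConfig, DataQualityRule, DeadLetterSink, QualityAction,
};

let cfg = DataQualityConfig::new()
    .rule(DataQualityRule::not_null("user_id"))               // user_id must be present
    .rule(DataQualityRule::range("amount", 0.0, 1_000_000.0)) // bounds on amount
    .on_violation(QualityAction::Drop)                        // drop violating records
    .with_dead_letter(DeadLetterSink::parquet("./dlq/"));     // dead-letter output as Parquet
```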
See also
- Connector Certification — the source of truth for which combinations are supported
- Iceberg
- Kafka