Available

Two-Phase Commit & Delivery Guarantees

Atomic sink writes, the writer–commit–ack protocol, and the certified delivery-guarantee matrix.

Two-phase commit (2PC) is what upgrades at-least-once delivery to exactly-once. Krishiv implements the standard prepare → commit protocol, with each round coordinated by the checkpoint barrier.

The protocol

  1. The coordinator injects a checkpoint barrier into every task.
  2. Each task drains its current output, stages it in its sink via prepare, and acknowledges the barrier with the staged commit handle.
  3. Once every task has acked, the coordinator commits the epoch: every sink's commit is called. The checkpoint is final only after every commit succeeds.
  4. If any task fails before ack, the coordinator aborts the epoch: every prepared commit is aborted. No partial state is left visible.
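
The four steps above can be sketched in plain Rust. This is a minimal illustration under stated assumptions, not Krishiv's implementation: the `TwoPhaseSink` trait, `MemorySink`, and `run_epoch` are hypothetical names introduced here, and the checkpoint barrier is reduced to a single-threaded loop over tasks.

```rust
use std::collections::HashMap;

/// Hypothetical sink interface for this sketch (not Krishiv's actual trait).
trait TwoPhaseSink {
    /// Phase 1: stage the epoch's output and return a commit handle.
    fn prepare(&mut self, epoch: u64, records: Vec<String>) -> Result<u64, String>;
    /// Phase 2: make the staged output visible.
    fn commit(&mut self, handle: u64) -> Result<(), String>;
    /// Discard staged output so nothing partial becomes visible.
    fn abort(&mut self, handle: u64);
}

/// In-memory sink used only to demonstrate the protocol.
#[derive(Default)]
struct MemorySink {
    staged: HashMap<u64, Vec<String>>,
    visible: Vec<String>,
    fail_prepare: bool, // test hook: simulate a task failing before ack
}

impl TwoPhaseSink for MemorySink {
    fn prepare(&mut self, epoch: u64, records: Vec<String>) -> Result<u64, String> {
        if self.fail_prepare {
            return Err(format!("prepare failed for epoch {epoch}"));
        }
        self.staged.insert(epoch, records);
        Ok(epoch) // the epoch number doubles as the handle in this sketch
    }

    fn commit(&mut self, handle: u64) -> Result<(), String> {
        let records = self.staged.remove(&handle).ok_or("unknown commit handle")?;
        self.visible.extend(records);
        Ok(())
    }

    fn abort(&mut self, handle: u64) {
        self.staged.remove(&handle);
    }
}

/// Runs one epoch: prepare on every task, then commit all, or abort all
/// if any prepare fails. `outputs[i]` is the drained output of task `i`.
fn run_epoch<S: TwoPhaseSink>(
    epoch: u64,
    sinks: &mut [S],
    outputs: Vec<Vec<String>>,
) -> Result<(), String> {
    assert_eq!(sinks.len(), outputs.len(), "one output batch per task");
    let mut handles = Vec::with_capacity(sinks.len());
    for (i, batch) in outputs.into_iter().enumerate() {
        match sinks[i].prepare(epoch, batch) {
            Ok(handle) => handles.push(handle), // task i has acked
            Err(e) => {
                // A task failed before ack: abort everything prepared so far.
                for (j, handle) in handles.iter().enumerate() {
                    sinks[j].abort(*handle);
                }
                return Err(e);
            }
        }
    }
    // Every task acked: commit each sink. The epoch is final only if all succeed.
    for (i, handle) in handles.into_iter().enumerate() {
        sinks[i].commit(handle)?;
    }
    Ok(())
}
```

Calling `run_epoch(1, sinks.as_mut_slice(), outputs)` with two healthy sinks makes both batches visible. Setting `fail_prepare` on one of them leaves every sink's `visible` list untouched and its `staged` map empty. The sketch does not handle a failure partway through the commit phase; the source does not describe how that case is recovered.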

Two-phase sink implementations

| Sink | Notes |
|---|---|
| LocalParquetTwoPhaseCommitSink | Writes to a temp dir, then renames on commit. Crash-safe on local disk. |
| InMemoryTwoPhaseCommitSink | For tests. |
| IcebergNativeTwoPhaseCommit (iceberg) | Writes a new Iceberg snapshot, then renames the metadata pointer on commit. |
| HudiTwoPhaseCommitSink (lakehouse) | Writes a Hudi commit, then publishes the timeline on commit. |
| LocalDeltaTwoPhaseCommitSink (lakehouse) | Writes a Delta log entry, then renames the checkpoint on commit. |
| Kafka transactional sink (kafka) | Uses the Kafka EOS API: initTransactions → beginTransaction → sendOffsetsToTransaction → commitTransaction. |
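
Several of these sinks share the same shape: write somewhere invisible, then publish with a rename. The sketch below shows that pattern using only the Rust standard library. It is an illustration, not the code of any sink listed above; the `prepare`, `commit`, and `abort` functions, the `_staging` directory, and the file naming are all assumptions made for the example.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Phase 1: write the epoch's bytes under a staging directory inside `root`.
/// Returns the staged path, which serves as the commit handle.
fn prepare(root: &Path, epoch: u64, data: &[u8]) -> io::Result<PathBuf> {
    let staging = root.join("_staging"); // hypothetical layout
    fs::create_dir_all(&staging)?;
    let staged = staging.join(format!("epoch-{epoch}.part"));
    let mut file = File::create(&staged)?;
    file.write_all(data)?;
    file.sync_all()?; // flush to disk before acknowledging
    Ok(staged)
}

/// Phase 2: publish by renaming the staged file to its final name.
/// Staging lives under `root`, so both paths are on the same filesystem.
fn commit(root: &Path, epoch: u64, staged: &Path) -> io::Result<PathBuf> {
    let dst = root.join(format!("epoch-{epoch}.dat"));
    fs::rename(staged, &dst)?;
    Ok(dst)
}

/// Abort: remove the staged file. A missing file is treated as success,
/// so abort can be retried safely.
fn abort(staged: &Path) -> io::Result<()> {
    match fs::remove_file(staged) {
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
        other => other,
    }
}
```

Readers that only list files matching the final name never observe a half-written epoch, because the file appears under that name only once the rename happens. A production sink would likely also sync the parent directory after the rename; that step is omitted here.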

When you get exactly-once

Exactly-once delivery requires all of:

  1. Source supports offset commit (e.g. Kafka).
  2. Sink supports 2PC (or is transactional in the source's sense).
  3. Durability profile is distributed-durable.
  4. Coordinator uses fenced commits (epoch tokens).

See the Connector Certification matrix for the certified combinations.
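
The fourth requirement, fenced commits, guards against a stale coordinator committing after a newer one has taken over. The source does not describe how Krishiv implements its epoch tokens, so the following is only a generic sketch of token-based fencing; `FencedLog`, `CommitError`, and the numeric token scheme are hypothetical.

```rust
/// Reason a commit was refused in this sketch.
#[derive(Debug, PartialEq)]
enum CommitError {
    /// The caller's token is older than one the sink has already seen.
    Fenced { seen: u64, got: u64 },
}

/// A sink-side log that remembers the highest coordinator token it has seen.
struct FencedLog {
    highest_token: u64,
    committed: Vec<(u64, String)>,
}

impl FencedLog {
    fn new() -> Self {
        FencedLog { highest_token: 0, committed: Vec::new() }
    }

    /// Accepts the commit only if `token` is at least as new as every
    /// token seen so far; otherwise the caller is a fenced-off coordinator.
    fn commit(&mut self, token: u64, payload: &str) -> Result<(), CommitError> {
        if token < self.highest_token {
            return Err(CommitError::Fenced { seen: self.highest_token, got: token });
        }
        self.highest_token = token;
        self.committed.push((token, payload.to_string()));
        Ok(())
    }
}
```

If a coordinator holding token 1 commits, is replaced by one holding token 2, and then wakes up and retries, its commit is rejected with `Fenced { seen: 2, got: 1 }` and the log keeps only the entries written by valid coordinators.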

Data quality (DLT-style)

Pair 2PC with a DataQualityConfig to drop or fail records that violate expectations:

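// Assumption: DeadLetterSink is exported from the same module; adjust the path if it lives elsewhere.
use krishiv_connectors::quality::{DataQualityConfig, DataQualityRule, DeadLetterSink, QualityAction};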

let cfg = DataQualityConfig::new()
    .rule(DataQualityRule::not_null("user_id"))
    .rule(DataQualityRule::range("amount", 0.0, 1_000_000.0))
    .on_violation(QualityAction::Drop)
    .with_dead_letter(DeadLetterSink::parquet("./dlq/"));

See also