Available

Data Quality & Dead Letter

Per-source data quality rules, drop / fail actions, and the dead-letter sink.

Data quality in Krishiv is configured per source and enforced on the write path, between the operator and the sink. Depending on the configured action, rows that violate a rule are dropped, cause the query to fail, or are routed to a dead-letter queue (DLQ).

Configuration

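use krishiv_connectors::quality::{
    DataQualityConfig, DataQualityRule, DeadLetterSink, QualityAction,
};

let cfg = DataQualityConfig::new()
    // Reject rows where user_id is null.
    .rule(DataQualityRule::not_null("user_id"))
    // Reject rows where amount falls outside [0, 1_000_000].
    .rule(DataQualityRule::range("amount", 0.0, 1_000_000.0))
    // Reject rows where email is not two non-empty parts separated by a single "@".
    .rule(DataQualityRule::regex("email", r"^[^@]+@[^@]+$"))
    // Action applied to rows that violate a rule.
    .on_violation(QualityAction::Drop)
    .with_dead_letter(DeadLetterSink::parquet("./dlq/"));

// IcebergSink must be in scope; its constructor arguments are elided here.
let sink = IcebergSink::new(...).with_quality(cfg);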

Rule kinds

| Rule | What it checks |
|------|----------------|
| not_null(col) | Column is non-null for every row in the batch. |
| range(col, min, max) | Column is in [min, max] for every row. |
| regex(col, pattern) | Column matches the regex. |
| enum(col, &[v1, v2]) | Column is one of the listed values. |
| custom(col, fn(batch) -> Mask) | Custom boolean mask per row. |
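
To make the per-row semantics of the rule kinds above concrete, here is a minimal sketch of how a rule might reduce a batch to a boolean mask. Everything in it is hypothetical: `Row`, `Rule`, `mask`, and the `status` column are stand-ins invented for illustration, not Krishiv types, and the real engine presumably works on columnar batches rather than row structs.

```rust
/// Hypothetical row type used only for this illustration (not a Krishiv type).
#[derive(Debug, Clone, PartialEq)]
struct Row {
    user_id: Option<u64>,
    amount: f64,
    status: String,
}

/// Simplified rules; each variant mirrors one rule kind from the table.
enum Rule {
    NotNullUserId,
    AmountRange { min: f64, max: f64 },
    StatusEnum(Vec<&'static str>),
}

impl Rule {
    /// Returns true when the row satisfies the rule.
    fn check(&self, row: &Row) -> bool {
        match self {
            Rule::NotNullUserId => row.user_id.is_some(),
            // Inclusive bounds, matching the [min, max] notation.
            Rule::AmountRange { min, max } => row.amount >= *min && row.amount <= *max,
            Rule::StatusEnum(allowed) => allowed.iter().any(|v| *v == row.status.as_str()),
        }
    }
}

/// Evaluates one rule over a batch and returns a per-row mask (true = row passes).
fn mask(rule: &Rule, batch: &[Row]) -> Vec<bool> {
    batch.iter().map(|row| rule.check(row)).collect()
}
```

Under these assumptions, a batch of one valid row and one row with a null `user_id` would yield the mask `[true, false]` for `Rule::NotNullUserId`; the rows marked `false` are the ones the configured action then handles.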

Actions

| Action | Behavior |
|--------|----------|
| Drop | Remove the offending rows from the batch and continue. |
| Fail | Fail the query with DataQualityError. The CLI exits with a non-zero status. |
| DeadLetter | Send the offending rows to a separate sink (e.g. Parquet) and continue with the clean rows. |
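
The three behaviors above can be sketched as a single function that partitions a batch by a pass/fail mask and then decides what happens to the offending rows. This is an illustration only: `Action`, `Outcome`, and `apply_action` are hypothetical names, and a plain `String` stands in for the error rather than the real `DataQualityError`.

```rust
/// Hypothetical mirror of the three actions; not the Krishiv `QualityAction` type.
#[derive(Debug, Clone, Copy)]
enum Action {
    Drop,
    Fail,
    DeadLetter,
}

/// Result of applying an action to one batch.
#[derive(Debug, PartialEq)]
struct Outcome<T> {
    clean: Vec<T>,       // rows that continue to the main sink
    dead_letter: Vec<T>, // rows routed to the dead-letter sink
}

/// Splits `batch` using `passes` (true = row is valid) and applies `action`
/// to the offending rows. A String stands in for the real error type.
fn apply_action<T>(batch: Vec<T>, passes: &[bool], action: Action) -> Result<Outcome<T>, String> {
    assert_eq!(batch.len(), passes.len(), "mask must have one entry per row");

    let mut clean = Vec::new();
    let mut offending = Vec::new();
    for (row, ok) in batch.into_iter().zip(passes.iter().copied()) {
        if ok {
            clean.push(row);
        } else {
            offending.push(row);
        }
    }

    match action {
        // Offending rows are discarded; the query continues.
        Action::Drop => Ok(Outcome { clean, dead_letter: Vec::new() }),
        // Nothing to fail on when every row passed.
        Action::Fail if offending.is_empty() => Ok(Outcome { clean, dead_letter: Vec::new() }),
        // Any offending row aborts the whole query.
        Action::Fail => Err(format!("data quality violation in {} row(s)", offending.len())),
        // Offending rows are kept, but on a separate path.
        Action::DeadLetter => Ok(Outcome { clean, dead_letter: offending }),
    }
}
```

In this sketch, Drop and DeadLetter differ only in whether the offending rows survive somewhere: both let the clean rows through, while Fail returns an error as soon as any row is rejected.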

Dead-letter sink

A dead-letter sink is itself a Krishiv sink. It writes to a separate file, table, or topic:

DeadLetterSink::parquet("./dlq/")
DeadLetterSink::iceberg("dlq_catalog", "dlq.dlq_table")
DeadLetterSink::kafka("broker:9092", "dlq.orders")

Each dead-letter record carries an extra string column, _dlq_reason, that names the rule that failed.
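
As a rough illustration of what tagging a rejected row with its reason could look like, the sketch below attaches the name of the first failing rule to each rejected row. The types `DeadLetterRecord` and `NamedRule`, the function `split_with_reason`, the "first failing rule wins" policy, and the reason format (for example `range(amount)`) are all assumptions; the source does not specify how the `_dlq_reason` value is formatted or which rule is reported when several fail.

```rust
/// Hypothetical dead-letter record: the original row plus a reason.
#[derive(Debug, PartialEq)]
struct DeadLetterRecord<T> {
    row: T,
    dlq_reason: String, // stands in for the `_dlq_reason` column
}

/// A named check. The name format is an assumption made for this sketch.
struct NamedRule<T> {
    name: &'static str,
    check: fn(&T) -> bool,
}

/// Splits a batch into clean rows and dead-letter records, tagging each
/// rejected row with the name of the first rule it failed.
fn split_with_reason<T>(
    batch: Vec<T>,
    rules: &[NamedRule<T>],
) -> (Vec<T>, Vec<DeadLetterRecord<T>>) {
    let mut clean = Vec::new();
    let mut dead = Vec::new();
    for row in batch {
        // Find the first rule whose check returns false for this row.
        let failed = rules.iter().find(|rule| !(rule.check)(&row));
        match failed {
            Some(rule) => dead.push(DeadLetterRecord {
                row,
                dlq_reason: rule.name.to_string(),
            }),
            None => clean.push(row),
        }
    }
    (clean, dead)
}
```

Keeping the reason next to the row means the dead-letter output can later be grouped by failed rule without re-running the checks.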

Metrics

Per rule and per source, Krishiv exposes:

  • krishiv_dataquality_dropped_total{source, rule}
  • krishiv_dataquality_failed_total{source, rule}
  • krishiv_dataquality_dlq_total{source, rule, sink}
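
To show how the `source` and `rule` labels partition these counts, here is a small self-contained sketch of a labeled counter. It is not Krishiv's metrics code: `DroppedCounter` is a hypothetical type, and rendering in Prometheus-style text is an assumption suggested by the metric naming, not something the source confirms.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory counter keyed by (source, rule).
#[derive(Default)]
struct DroppedCounter {
    counts: BTreeMap<(String, String), u64>,
}

impl DroppedCounter {
    /// Adds `rows` to the series identified by the (source, rule) label pair.
    fn inc(&mut self, source: &str, rule: &str, rows: u64) {
        *self
            .counts
            .entry((source.to_string(), rule.to_string()))
            .or_insert(0) += rows;
    }

    /// Renders one line per label pair in Prometheus-style text
    /// (an assumed output format), sorted by source and then rule.
    fn render(&self) -> Vec<String> {
        self.counts
            .iter()
            .map(|((source, rule), n)| {
                format!(
                    "krishiv_dataquality_dropped_total{{source=\"{}\",rule=\"{}\"}} {}",
                    source, rule, n
                )
            })
            .collect()
    }
}
```

Each distinct (source, rule) pair becomes its own series, so a rule that rejects rows on two sources is counted separately for each source.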
