Data Quality & Dead Letter
Per-source data quality rules, drop / fail actions, and the dead-letter sink.
Data quality in Krishiv is configured per source and runs on the write path, between the operator and the sink. Depending on the configured action, a rule violation drops the offending rows, fails the query, or routes the offending rows to a dead-letter queue (DLQ).
Configuration
use krishiv_connectors::quality::{DataQualityConfig, DataQualityRule, QualityAction, DeadLetterSink};

// Build the per-source quality config: three rules, one action, one dead-letter sink.
let cfg = DataQualityConfig::new()
    .rule(DataQualityRule::not_null("user_id"))
    .rule(DataQualityRule::range("amount", 0.0, 1_000_000.0))
    .rule(DataQualityRule::regex("email", r"^[^@]+@[^@]+$"))
    .on_violation(QualityAction::Drop)
    .with_dead_letter(DeadLetterSink::parquet("./dlq/"));

// Attach the config to the sink so the rules run on the write path.
let sink = IcebergSink::new(...).with_quality(cfg);
Rule kinds
| Rule | What it checks |
|---|---|
| not_null(col) | Column is non-null for every row in the batch. |
| range(col, min, max) | Column is in [min, max] for every row. |
| regex(col, pattern) | Column matches the regex. |
| enum(col, &[v1, v2]) | Column is one of the listed values. |
| custom(col, fn(batch) -> Mask) | Custom boolean mask per row. |
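
To make the per-row semantics of the table concrete, here is a minimal standalone sketch of how not_null and range could each produce a boolean mask and how several masks might combine. It does not use Krishiv's types: the slice-of-Option column model, the function names, and the choice to treat nulls as failing the range rule are all assumptions made for illustration.

```rust
// Standalone model of per-row rule evaluation. These functions are
// illustrative only and are not part of Krishiv's API.

/// not_null: a row passes when its value is present.
fn not_null(col: &[Option<f64>]) -> Vec<bool> {
    col.iter().map(|v| v.is_some()).collect()
}

/// range: a row passes when its value lies in the closed interval [min, max].
/// Nulls are treated as failing here, which is an assumption of this sketch.
fn range(col: &[Option<f64>], min: f64, max: f64) -> Vec<bool> {
    col.iter()
        .map(|v| matches!(v, Some(x) if *x >= min && *x <= max))
        .collect()
}

/// Combine rule masks: a row is clean only if every rule passes for it.
/// All masks are expected to have the same length (one entry per row).
fn all_pass(masks: &[Vec<bool>]) -> Vec<bool> {
    let rows = masks.first().map_or(0, |m| m.len());
    (0..rows).map(|i| masks.iter().all(|m| m[i])).collect()
}
```

A custom rule would fit the same shape: any function that returns one boolean per row can be combined with the built-in masks through all_pass.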
Actions
| Action | Behavior |
|---|---|
| Drop | Remove the offending rows from the batch and continue. |
| Fail | Fail the query with DataQualityError. The CLI exits with a non-zero status. |
| DeadLetter | Send the offending rows to a separate sink (e.g. Parquet) and continue with the clean rows. |
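
The three actions differ only in what happens to the rows that fail a rule. The sketch below models that split over a plain Vec and a pass/fail mask. The Action, Outcome, and apply_action names are hypothetical stand-ins rather than Krishiv's types, and the String error stands in for DataQualityError.

```rust
// Illustrative model of the three violation actions; not Krishiv's API.

#[derive(Debug, Clone, Copy, PartialEq)]
enum Action {
    Drop,
    Fail,
    DeadLetter,
}

/// Rows that continue to the main sink, plus rows routed to the dead-letter sink.
#[derive(Debug, PartialEq)]
struct Outcome<T> {
    clean: Vec<T>,
    dead_letter: Vec<T>,
}

/// Split `rows` by `mask` (true = row passed every rule), then apply the action.
fn apply_action<T>(rows: Vec<T>, mask: &[bool], action: Action) -> Result<Outcome<T>, String> {
    let mut clean = Vec::new();
    let mut rejected = Vec::new();
    for (row, &ok) in rows.into_iter().zip(mask) {
        if ok {
            clean.push(row);
        } else {
            rejected.push(row);
        }
    }
    match action {
        // Drop: discard the rejected rows and keep going.
        Action::Drop => Ok(Outcome { clean, dead_letter: Vec::new() }),
        // Fail: any rejected row aborts the whole batch.
        Action::Fail if !rejected.is_empty() => {
            Err(format!("{} row(s) violated a rule", rejected.len()))
        }
        Action::Fail => Ok(Outcome { clean, dead_letter: Vec::new() }),
        // DeadLetter: keep the clean rows and hand the rest to the DLQ.
        Action::DeadLetter => Ok(Outcome { clean, dead_letter: rejected }),
    }
}
```

In this model Drop and DeadLetter always succeed, while Fail succeeds only when no row is rejected.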
Dead-letter sink
A dead-letter sink is itself a Krishiv sink. It writes to a separate file, table, or topic:
DeadLetterSink::parquet("./dlq/")
DeadLetterSink::iceberg("dlq_catalog", "dlq.dlq_table")
DeadLetterSink::kafka("broker:9092", "dlq.orders")
Each dead-letter record carries an extra _dlq_reason string column with the rule that failed.
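
As a rough picture of what a dead-letter record looks like, the sketch below copies a rejected row and adds the _dlq_reason column. The map-based Row type and the to_dlq_record helper are hypothetical, and the reason string format used in the test ("regex(email)") is an assumption, since the exact text Krishiv writes is not specified here.

```rust
use std::collections::BTreeMap;

/// A row modelled as column name -> string value (illustrative only).
type Row = BTreeMap<String, String>;

/// Copy a rejected row and attach the name of the failed rule under
/// the `_dlq_reason` column. The original row is left unchanged.
fn to_dlq_record(row: &Row, failed_rule: &str) -> Row {
    let mut record = row.clone();
    record.insert("_dlq_reason".to_string(), failed_rule.to_string());
    record
}
```

Keeping the original columns intact alongside the reason means a dead-letter record can be corrected and replayed without consulting the source again.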
Metrics
Per rule and per source, Krishiv exposes:
krishiv_dataquality_dropped_total{source, rule}
krishiv_dataquality_failed_total{source, rule}
krishiv_dataquality_dlq_total{source, rule, sink}
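
The label sets above mean each counter is tracked separately for every (source, rule) combination. A minimal sketch of that bookkeeping follows, assuming Prometheus-style labeled counters. The LabeledCounter type and the rendered text format are assumptions for illustration and do not describe how Krishiv implements or exports its metrics.

```rust
use std::collections::BTreeMap;

/// A counter keyed by (source, rule) label values. Illustrative only.
#[derive(Default)]
struct LabeledCounter {
    counts: BTreeMap<(String, String), u64>,
}

impl LabeledCounter {
    /// Add `n` to the counter for this label pair, creating it at zero if new.
    fn add(&mut self, source: &str, rule: &str, n: u64) {
        *self
            .counts
            .entry((source.to_string(), rule.to_string()))
            .or_insert(0) += n;
    }

    /// Current value for a label pair; unseen pairs read as zero.
    fn get(&self, source: &str, rule: &str) -> u64 {
        self.counts
            .get(&(source.to_string(), rule.to_string()))
            .copied()
            .unwrap_or(0)
    }

    /// Render one line per label pair, sorted by (source, rule).
    fn render(&self, metric: &str) -> Vec<String> {
        self.counts
            .iter()
            .map(|((source, rule), n)| {
                format!("{}{{source=\"{}\",rule=\"{}\"}} {}", metric, source, rule, n)
            })
            .collect()
    }
}
```

Because the counts are per rule, a spike in a single rule's counter points directly at the check that started rejecting rows.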