Windows and Watermarks
Tumbling, sliding, and session windows; watermark strategies; late-event handling.
Krishiv supports the three standard window types. All are defined as part of the operator runtime in krishiv-dataflow and have SQL equivalents in krishiv-sql for the GROUP BY form.
Window types
| Type | SQL helper | Stream API | Description |
|---|---|---|---|
| Tumbling | tumble_start(ts, interval) / tumble_end(ts, interval) | .tumbling_window(size_ms) | Fixed-size non-overlapping windows aligned to the epoch. |
| Sliding (Hop) | hop_start(ts, slide, size) / hop_end(ts, slide, size) | .sliding_window(size_ms, slide_ms) | Overlapping windows; each row may belong to multiple. Bounded only in this release. |
| Session | session_start(ts, gap) / session_end(ts, gap) | .session_window(gap_ms) | Windows that close after an inactivity gap. Bounded only in this release. |
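The window types above differ mainly in how an event time maps to window bounds. The following is a minimal, standalone sketch of that mapping and is not Krishiv's implementation. It assumes half-open windows `[start, end)`, millisecond timestamps, and sliding windows aligned to the slide interval. The function names are illustrative, and the session sketch reports the first and last event time of each session, whereas the engine may report the end differently (for example, with the gap added).

```rust
/// Start of the tumbling window containing `ts_ms`, aligned to the epoch.
fn tumble_start(ts_ms: i64, size_ms: i64) -> i64 {
    // rem_euclid keeps the alignment correct for negative timestamps too.
    ts_ms - ts_ms.rem_euclid(size_ms)
}

/// All sliding windows [start, start + size_ms) that contain `ts_ms`,
/// earliest first. Assumes window starts are multiples of `slide_ms`.
fn sliding_windows(ts_ms: i64, size_ms: i64, slide_ms: i64) -> Vec<(i64, i64)> {
    let mut windows = Vec::new();
    // Latest window start at or before the event.
    let mut start = ts_ms - ts_ms.rem_euclid(slide_ms);
    // Walk backwards while the window still covers the event.
    while start > ts_ms - size_ms {
        windows.push((start, start + size_ms));
        start -= slide_ms;
    }
    windows.reverse();
    windows
}

/// Group already-sorted event times into sessions. A new session starts
/// when the gap to the previous event exceeds `gap_ms`.
/// Each session is returned as (first_event_ms, last_event_ms).
fn sessions(sorted_ts_ms: &[i64], gap_ms: i64) -> Vec<(i64, i64)> {
    let mut out: Vec<(i64, i64)> = Vec::new();
    for &ts in sorted_ts_ms {
        if let Some(last) = out.last_mut() {
            if ts - last.1 <= gap_ms {
                last.1 = ts; // still inside the current session
                continue;
            }
        }
        out.push((ts, ts)); // inactivity gap exceeded: open a new session
    }
    out
}
```

With a 60-second size and a 30-second slide, an event at 125 000 ms falls into two sliding windows but only one tumbling window, which is why sliding windows multiply the rows seen by the aggregation.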
SQL example
SELECT
  tumble_start(event_time, INTERVAL '1 minute') AS window_start,
  tumble_end(event_time, INTERVAL '1 minute') AS window_end,
  COUNT(*) AS events
FROM events
GROUP BY
  tumble_start(event_time, INTERVAL '1 minute'),
  tumble_end(event_time, INTERVAL '1 minute');
Stream API example (Rust)
use krishiv_api::{Session, Result, col, count, sum};

#[tokio::main]
async fn main() -> Result<()> {
    let session = Session::embedded().await?;
    // `schema` (the stream's schema) is assumed to be defined elsewhere.
    let (stream, sender) = session.memory_stream(schema)?;

    let per_minute = stream
        .watermark("event_time", 5_000)? // 5 s watermark lag (allowed lateness)
        .key_by("user_id")?
        .tumbling_window(60_000) // 1-minute windows
        .agg(vec![count(col("*")), sum(col("amount"))]);

    // `batch` (a record batch matching `schema`) is assumed to be defined elsewhere.
    sender.send(batch)?;
    let next_window = per_minute.collect_with_aggs(vec![count(col("*"))]).await?;
    Ok(())
}
Watermarks
A watermark W is the operator runtime's assertion that it has seen all events with event time ≤ W. It can be set in three places:
- Per column, per stream via .watermark("event_time", lag_ms) or with_watermark(WatermarkSpec::fixed_lag_ms(...)).
- Across multiple sources via MultiSourceWatermarkSpec::new().source("a", spec).source("b", spec). The effective watermark is the minimum across all sources (with a configurable idle timeout).
- Via SQL with SET watermark.lag = INTERVAL '5' SECOND (session-scoped, applies to the next query in the same session).
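The arithmetic behind a fixed-lag watermark and the multi-source minimum can be sketched in a few lines. This is a standalone illustration, not Krishiv's code. It assumes that a fixed-lag watermark is the highest event time seen so far minus the lag, and that a source which has not yet reported holds the effective watermark back. The type and function names are hypothetical, and the idle timeout is not modelled.

```rust
/// Fixed-lag watermark: highest event time seen so far minus `lag_ms`.
/// (Assumed semantics; the engine's definition may differ in detail.)
struct FixedLagWatermark {
    lag_ms: i64,
    max_event_time_ms: Option<i64>,
}

impl FixedLagWatermark {
    fn new(lag_ms: i64) -> Self {
        Self { lag_ms, max_event_time_ms: None }
    }

    /// Record an event time and return the watermark after seeing it.
    /// Out-of-order events never move the watermark backwards.
    fn observe(&mut self, event_time_ms: i64) -> i64 {
        let max = self
            .max_event_time_ms
            .map_or(event_time_ms, |m| m.max(event_time_ms));
        self.max_event_time_ms = Some(max);
        max - self.lag_ms
    }
}

/// Effective watermark across sources: the minimum of the per-source
/// watermarks. Returns None if there are no sources or if any source
/// has not produced a watermark yet.
fn effective_watermark(per_source: &[Option<i64>]) -> Option<i64> {
    let mut min: Option<i64> = None;
    for wm in per_source {
        let wm = (*wm)?; // a silent source holds everything back
        min = Some(min.map_or(wm, |m| m.min(wm)));
    }
    min
}
```

Taking the minimum means the slowest source decides when windows close, which is why an idle timeout matters in practice: without one, a source that stops sending would stall every downstream window.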
Late events
An event is late when its event time is earlier than the current watermark. The default behavior is to drop it. The table lists the default handler and the two opt-ins:
| Strategy | API | Use |
|---|---|---|
| CountingLateEventHandler (default) | counts in watermark.record_late_drop() | metrics / alerting |
| Custom handler | implement trait LateEventHandler and pass via with_late_event_handler(handler) | route to side output, or count by key |
| Event-time TTL | set TtlStateBackend.set_watermark(...) and StateTtlConfig with ttl_ms | bound state growth for late-arriving keys |
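The difference between counting late events and routing them to a side output can be sketched as follows. The trait here is a local stand-in defined only for this snippet. It is not the signature of Krishiv's LateEventHandler, and `LateHandler`, `CountingHandler`, `SideOutputHandler`, and `admit` are all hypothetical names.

```rust
/// Local stand-in for a late-event hook (hypothetical signature).
trait LateHandler {
    fn on_late(&mut self, key: &str, event_time_ms: i64, watermark_ms: i64);
}

/// Mirrors the default strategy: drop the event and count the drop.
#[derive(Default)]
struct CountingHandler {
    dropped: u64,
}

impl LateHandler for CountingHandler {
    fn on_late(&mut self, _key: &str, _event_time_ms: i64, _watermark_ms: i64) {
        self.dropped += 1;
    }
}

/// Custom strategy: keep late events in a side output for later handling.
#[derive(Default)]
struct SideOutputHandler {
    side_output: Vec<(String, i64)>,
}

impl LateHandler for SideOutputHandler {
    fn on_late(&mut self, key: &str, event_time_ms: i64, _watermark_ms: i64) {
        self.side_output.push((key.to_string(), event_time_ms));
    }
}

/// Returns true when the event is on time. An event is late only when its
/// event time is strictly earlier than the watermark; late events are
/// handed to the handler and excluded from the window.
fn admit<H: LateHandler>(
    handler: &mut H,
    key: &str,
    event_time_ms: i64,
    watermark_ms: i64,
) -> bool {
    if event_time_ms < watermark_ms {
        handler.on_late(key, event_time_ms, watermark_ms);
        false
    } else {
        true
    }
}
```

An event whose time equals the watermark is still admitted, matching the strict "<" in the definition of lateness.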
Stateful vs stateless
Tumbling and sliding windows are state-backed: their accumulators persist to RocksDB (under the single-node-durable or distributed-durable durability profile) so they survive restart. In dev-local mode they use in-memory state backed by a checkpoint file.