
Windows and Watermarks

Tumbling, sliding, and session windows; watermark strategies; late-event handling.

Krishiv supports the three standard window types. All three are implemented in the operator runtime (krishiv-dataflow), and each has a SQL equivalent in krishiv-sql for use in GROUP BY clauses.

Windows group an unbounded stream into finite buckets. A watermark marks the event time up to which the stream is considered complete; any event that arrives afterwards with an earlier event time is late.

Window types

| Type | SQL helper | Stream API | Description |
|---|---|---|---|
| Tumbling | tumble_start(ts, interval) / tumble_end(ts, interval) | .tumbling_window(size_ms) | Fixed-size, non-overlapping windows aligned to the epoch. |
| Sliding (Hop) | hop_start(ts, slide, size) / hop_end(ts, slide, size) | .sliding_window(size_ms, slide_ms) | Overlapping windows; each row may belong to multiple windows. Bounded only in this release. |
| Session | session_start(ts, gap) / session_end(ts, gap) | .session_window(gap_ms) | Windows that close after an inactivity gap. Bounded only in this release. |
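To make the table concrete, the following std-only Rust sketch shows how a single event timestamp maps to windows of each type. It is not the krishiv-dataflow implementation: the `Window` type and the three functions are hypothetical, windows are assumed to be half-open `[start, end)` in epoch milliseconds with positive sizes, and sessions use one common definition in which each event opens `[ts, ts + gap)` and overlapping windows merge.

```rust
// Hypothetical window-assignment helpers; not the krishiv-dataflow API.
// Times are event-time milliseconds since the Unix epoch; sizes must be > 0.

/// A half-open event-time window [start, end).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Window {
    start: i64,
    end: i64,
}

/// Tumbling: each timestamp falls in exactly one epoch-aligned window.
fn tumbling_window(ts: i64, size_ms: i64) -> Window {
    // rem_euclid keeps the alignment correct for timestamps before the epoch.
    let start = ts - ts.rem_euclid(size_ms);
    Window { start, end: start + size_ms }
}

/// Sliding (hop): every window of length `size_ms` whose start is a multiple
/// of `slide_ms` and that contains `ts`, in ascending start order.
fn sliding_windows(ts: i64, size_ms: i64, slide_ms: i64) -> Vec<Window> {
    let mut windows = Vec::new();
    // Begin at the latest window starting at or before `ts`, then walk back
    // while the window still covers `ts`.
    let mut start = ts - ts.rem_euclid(slide_ms);
    while start + size_ms > ts {
        windows.push(Window { start, end: start + size_ms });
        start -= slide_ms;
    }
    windows.reverse();
    windows
}

/// Session: each event opens [ts, ts + gap_ms); overlapping windows merge,
/// so a session closes only after `gap_ms` without events.
fn session_windows(mut timestamps: Vec<i64>, gap_ms: i64) -> Vec<Window> {
    timestamps.sort_unstable();
    let mut sessions: Vec<Window> = Vec::new();
    for ts in timestamps {
        let inside_open_session = matches!(sessions.last(), Some(w) if ts < w.end);
        if inside_open_session {
            // Timestamps are sorted, so this event pushes the session end out.
            sessions.last_mut().unwrap().end = ts + gap_ms;
        } else {
            sessions.push(Window { start: ts, end: ts + gap_ms });
        }
    }
    sessions
}

fn main() {
    // 1-minute tumbling window containing t = 65 s.
    println!("{:?}", tumbling_window(65_000, 60_000));
    // 1-minute windows sliding every 30 s: t = 65 s lies in two of them.
    println!("{:?}", sliding_windows(65_000, 60_000, 30_000));
    // 5 s inactivity gap: the first three events form one session.
    println!("{:?}", session_windows(vec![0, 1_000, 10_000, 2_000], 5_000));
}
```

In this sketch, a sliding window whose `slide_ms` equals `size_ms` degenerates to a tumbling window.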

SQL example

SELECT
  tumble_start(event_time, INTERVAL '1 minute') AS window_start,
  tumble_end(event_time,   INTERVAL '1 minute') AS window_end,
  COUNT(*) AS events
FROM events
GROUP BY
  tumble_start(event_time, INTERVAL '1 minute'),
  tumble_end(event_time,   INTERVAL '1 minute');
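As a rough model of what this query computes, here is a std-only Rust equivalent that counts events per 1-minute tumbling window over an in-memory batch. `tumble_start` and `count_per_window` are hypothetical stand-ins, not krishiv-sql functions; they assume epoch-aligned, half-open windows where `tumble_end` is `tumble_start` plus the interval, and they ignore watermarks (a bounded, batch-style evaluation).

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins for tumble_start/tumble_end; not krishiv-sql.

/// Floors an event time (ms since the epoch) to its window start.
fn tumble_start(ts_ms: i64, size_ms: i64) -> i64 {
    ts_ms - ts_ms.rem_euclid(size_ms)
}

/// COUNT(*) grouped by (tumble_start, tumble_end), returned as
/// (window_start, window_end, events) in window order.
fn count_per_window(event_times: &[i64], size_ms: i64) -> Vec<(i64, i64, u64)> {
    let mut counts: BTreeMap<i64, u64> = BTreeMap::new();
    for &ts in event_times {
        *counts.entry(tumble_start(ts, size_ms)).or_insert(0) += 1;
    }
    counts
        .into_iter()
        .map(|(start, n)| (start, start + size_ms, n)) // tumble_end = start + size
        .collect()
}

fn main() {
    const MINUTE_MS: i64 = 60_000;
    let event_times = [5_000, 59_999, 60_000, 125_000];
    for (start, end, events) in count_per_window(&event_times, MINUTE_MS) {
        println!("[{start}, {end}) -> {events}");
    }
}
```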

Stream API example (Rust)

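use krishiv_api::{Session, Result, col, count, sum};

#[tokio::main]
async fn main() -> Result<()> {
    // Create an embedded session.
    let session = Session::embedded().await?;
    // `schema` (the input schema) and `batch` (rows matching it) are not
    // defined in this snippet; create them before running the example.
    let (stream, sender) = session.memory_stream(schema)?;

    // Per-user event count and total amount in 1-minute tumbling windows.
    let per_minute = stream
        .watermark("event_time", 5_000)?           // 5 s allowed lateness
        .key_by("user_id")?
        .tumbling_window(60_000)                  // 1-minute windows
        .agg(vec![count(col("*")), sum(col("amount"))]);

    // Send a batch into the stream, then collect the next window's results.
    sender.send(batch)?;
    let next_window = per_minute.collect_with_aggs(vec![count(col("*"))]).await?;
    Ok(())
}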
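The pipeline in the example above can be modeled with the std-only Rust sketch below: a per-user tumbling aggregation (count and sum of `amount`) that tracks a fixed-lag watermark, emits a window once the watermark reaches the window's end, and drops and counts events older than the watermark. `Event`, `TumblingAgg`, and that firing rule are assumptions made for illustration; they are not Krishiv's operator or its exact emission policy.

```rust
use std::collections::BTreeMap;

// Hypothetical model of the keyed tumbling pipeline; not Krishiv's operator.

/// Hypothetical event type; field names mirror the example above.
struct Event {
    user_id: &'static str,
    event_time: i64, // ms since the epoch
    amount: i64,
}

/// (window_start, user_id, count, sum_of_amount) for one closed window.
type Output = (i64, &'static str, u64, i64);

struct TumblingAgg {
    size_ms: i64,
    lag_ms: i64,
    max_event_time: i64,
    /// Open accumulators keyed by (window_start, user_id), ordered by start.
    open: BTreeMap<(i64, &'static str), (u64, i64)>,
    late_drops: u64,
}

impl TumblingAgg {
    fn new(size_ms: i64, lag_ms: i64) -> Self {
        Self { size_ms, lag_ms, max_event_time: i64::MIN, open: BTreeMap::new(), late_drops: 0 }
    }

    /// Watermark = highest event time seen minus the allowed lateness.
    fn watermark(&self) -> i64 {
        self.max_event_time.saturating_sub(self.lag_ms)
    }

    /// Ingests one event and returns the windows closed by the watermark advance.
    fn push(&mut self, e: Event) -> Vec<Output> {
        if e.event_time < self.watermark() {
            self.late_drops += 1; // late: drop it, but count it
            return Vec::new();
        }
        let start = e.event_time - e.event_time.rem_euclid(self.size_ms);
        let acc = self.open.entry((start, e.user_id)).or_insert((0, 0));
        acc.0 += 1;
        acc.1 += e.amount;
        self.max_event_time = self.max_event_time.max(e.event_time);

        // [start, start + size) is complete once the watermark reaches its end.
        let wm = self.watermark();
        let size = self.size_ms;
        let closed: Vec<(i64, &'static str)> =
            self.open.keys().copied().take_while(|&(s, _)| s + size <= wm).collect();
        closed
            .into_iter()
            .map(|key| {
                let (count, sum) = self.open.remove(&key).unwrap();
                (key.0, key.1, count, sum)
            })
            .collect()
    }
}

fn main() {
    // 1-minute windows with 5 s allowed lateness, as in the example above.
    let mut agg = TumblingAgg::new(60_000, 5_000);
    let events = vec![
        Event { user_id: "alice", event_time: 10_000, amount: 5 },
        Event { user_id: "bob", event_time: 30_000, amount: 7 },
        Event { user_id: "alice", event_time: 50_000, amount: 3 },
        Event { user_id: "alice", event_time: 66_000, amount: 1 }, // watermark -> 61 s
        Event { user_id: "bob", event_time: 40_000, amount: 9 },   // late: 40 s < 61 s
    ];
    for e in events {
        for (start, user, count, sum) in agg.push(e) {
            println!("window {start} {user}: count={count} sum={sum}");
        }
    }
    println!("late drops: {}", agg.late_drops);
}
```

Firing at watermark ≥ end (rather than end - 1) is a conservative choice here: with the page's rule that events with event time < W are late, any event that still belongs to an already-fired window is guaranteed to be classified as late instead of silently changing emitted output.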

Watermarks

A watermark W is the operator runtime's assertion "I have seen all events with event time ≤ W." There are three places to set it:

  1. Per-column, per-stream via .watermark("event_time", lag_ms) or with_watermark(WatermarkSpec::fixed_lag_ms(...)).
  2. Across multiple sources via MultiSourceWatermarkSpec::new().source("a", spec).source("b", spec). The effective watermark is the minimum across all sources, subject to a configurable idle timeout.
  3. Via SQL with SET watermark.lag = INTERVAL '5' SECOND (session-scoped, applies to the next query in the same session).
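Under the usual reading, a fixed-lag watermark is the highest event time seen so far minus the lag, and a multi-source watermark is the minimum over sources. The std-only Rust sketch below models both, plus an idle timeout. It is hypothetical: `MultiSourceWatermark` is not `MultiSourceWatermarkSpec`, and two behaviors are assumptions borrowed from common stream processors rather than documented Krishiv semantics: a source with no events for `idle_timeout_ms` of processing time stops holding the watermark back, and the emitted watermark never decreases.

```rust
use std::collections::HashMap;

// Hypothetical multi-source watermark tracker; not MultiSourceWatermarkSpec.

/// What the tracker remembers about one source.
struct SourceState {
    max_event_time: i64, // highest event time seen (ms)
    last_active_ms: u64, // processing time of the source's latest event
}

struct MultiSourceWatermark {
    lag_ms: i64,
    idle_timeout_ms: u64,
    sources: HashMap<String, SourceState>,
    emitted: Option<i64>, // last watermark handed downstream
}

impl MultiSourceWatermark {
    fn new(lag_ms: i64, idle_timeout_ms: u64) -> Self {
        Self { lag_ms, idle_timeout_ms, sources: HashMap::new(), emitted: None }
    }

    /// Records an event with event time `ts` from `source`, seen at `now_ms`.
    fn observe(&mut self, source: &str, ts: i64, now_ms: u64) {
        let s = self.sources.entry(source.to_string()).or_insert(SourceState {
            max_event_time: i64::MIN,
            last_active_ms: now_ms,
        });
        s.max_event_time = s.max_event_time.max(ts);
        s.last_active_ms = now_ms;
    }

    /// Minimum fixed-lag watermark over non-idle sources, never moving backwards.
    /// Returns None until some active source has produced an event.
    fn advance(&mut self, now_ms: u64) -> Option<i64> {
        let candidate = self
            .sources
            .values()
            // Assumption: a source idle for idle_timeout_ms no longer holds the watermark back.
            .filter(|s| now_ms.saturating_sub(s.last_active_ms) < self.idle_timeout_ms)
            .map(|s| s.max_event_time.saturating_sub(self.lag_ms))
            .min();
        if let Some(c) = candidate {
            self.emitted = Some(self.emitted.map_or(c, |prev| prev.max(c)));
        }
        self.emitted
    }
}

fn main() {
    let mut wm = MultiSourceWatermark::new(5_000, 10_000); // 5 s lag, 10 s idle timeout
    wm.observe("a", 100_000, 0);
    wm.observe("b", 80_000, 0);
    println!("{:?}", wm.advance(1_000)); // Some(75000): "b" holds it back
    wm.observe("a", 130_000, 12_000);
    println!("{:?}", wm.advance(12_000)); // Some(125000): "b" is now idle
}
```

The trade-off of an idle timeout is visible here: if source "b" resumes with older event times, its events fall behind the watermark and are treated as late.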

Late events

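An event whose event time is earlier than the current watermark (event time < W) is late. By default, late events are dropped, and CountingLateEventHandler counts each drop. There are two opt-ins: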

| Strategy | API | Use |
|---|---|---|
| CountingLateEventHandler (default) | Counts late drops via watermark.record_late_drop() | Metrics / alerting |
| Custom handler | Implement trait LateEventHandler and pass it via with_late_event_handler(handler) | Route to a side output, or count by key |
| Event-time TTL | Set TtlStateBackend.set_watermark(...) and StateTtlConfig with ttl_ms | Bound state growth for late-arriving keys |
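The strategies in the table can be sketched in std-only Rust. `LateHandler`, `CountingHandler`, `SideOutput`, `route`, and `TtlState` are hypothetical names modeled loosely on the table's rows; the real `LateEventHandler` signature and the `TtlStateBackend`/`StateTtlConfig` APIs are not shown on this page, so nothing here should be read as Krishiv's API. Lateness follows this page's rule (event time < watermark), and TTL expiry assumes an entry is evicted once the watermark passes its last update by more than `ttl_ms`.

```rust
use std::collections::HashMap;

/// Hypothetical handler interface, loosely modeled on LateEventHandler.
trait LateHandler<E> {
    fn on_late(&mut self, event: E, watermark: i64);
}

/// Default-style strategy: drop the event, count the drop.
#[derive(Default)]
struct CountingHandler {
    dropped: u64,
}

impl<E> LateHandler<E> for CountingHandler {
    fn on_late(&mut self, _event: E, _watermark: i64) {
        self.dropped += 1;
    }
}

/// Custom strategy: keep late (key, event_time) pairs as a side output, counted per key.
#[derive(Default)]
struct SideOutput {
    events: Vec<(String, i64)>,
    per_key: HashMap<String, u64>,
}

impl LateHandler<(String, i64)> for SideOutput {
    fn on_late(&mut self, event: (String, i64), _watermark: i64) {
        *self.per_key.entry(event.0.clone()).or_insert(0) += 1;
        self.events.push(event);
    }
}

/// Passes on-time events through; hands late ones (event_time < watermark) to `handler`.
fn route<E, H: LateHandler<E>>(event: E, event_time: i64, watermark: i64, handler: &mut H) -> Option<E> {
    if event_time < watermark {
        handler.on_late(event, watermark);
        None
    } else {
        Some(event)
    }
}

/// Hypothetical event-time TTL: entries idle for more than `ttl_ms` of event time expire.
struct TtlState {
    ttl_ms: i64,
    entries: HashMap<String, (i64, u64)>, // key -> (last event time, count)
}

impl TtlState {
    fn new(ttl_ms: i64) -> Self {
        Self { ttl_ms, entries: HashMap::new() }
    }

    fn upsert(&mut self, key: &str, event_time: i64) {
        let e = self.entries.entry(key.to_string()).or_insert((event_time, 0));
        e.0 = e.0.max(event_time);
        e.1 += 1;
    }

    /// Applies a new watermark and returns how many entries expired.
    fn advance_watermark(&mut self, watermark: i64) -> usize {
        let ttl = self.ttl_ms;
        let before = self.entries.len();
        self.entries.retain(|_, v| watermark.saturating_sub(v.0) <= ttl);
        before - self.entries.len()
    }
}

fn main() {
    let mut counting = CountingHandler::default();
    let mut side = SideOutput::default();
    let watermark = 1_000;
    route(("u1".to_string(), 900), 900, watermark, &mut counting); // late: counted, dropped
    route(("u2".to_string(), 950), 950, watermark, &mut side); // late: kept in side output
    println!("dropped={} side={:?}", counting.dropped, side.events);

    let mut state = TtlState::new(60_000);
    state.upsert("a", 0);
    state.upsert("b", 50_000);
    println!("expired={}", state.advance_watermark(60_001)); // only "a" is > 60 s stale
}
```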

Stateful vs stateless

Tumbling and sliding windows are state-backed: under the single-node-durable or distributed-durable durability profile, their accumulators are persisted to RocksDB so they survive a restart. In dev-local mode, they use in-memory state backed by a checkpoint file.
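The dev-local pattern of in-memory state backed by a checkpoint file can be pictured with the std-only Rust sketch below, which snapshots tumbling-window accumulators to a file and restores them as a restarted process would. The line format, file names, and write-then-rename step are assumptions for illustration, not Krishiv's checkpoint format, and RocksDB persistence in the durable profiles is not modeled.

```rust
use std::collections::BTreeMap;
use std::fs;
use std::io;
use std::path::Path;

// Hypothetical checkpoint format for illustration; not Krishiv's.

/// Window accumulators: (window_start, key) -> (count, sum).
type Accumulators = BTreeMap<(i64, String), (u64, i64)>;

fn bad_line(line: &str) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, format!("bad checkpoint line: {line}"))
}

/// Writes one "window_start,key,count,sum" line per accumulator (keys are assumed
/// to contain no commas). Writing a temp file and renaming it over the old
/// checkpoint means a crash mid-write leaves the previous checkpoint intact.
fn checkpoint(state: &Accumulators, path: &Path) -> io::Result<()> {
    let mut out = String::new();
    for ((start, key), (count, sum)) in state {
        out.push_str(&format!("{start},{key},{count},{sum}\n"));
    }
    let tmp = path.with_extension("tmp");
    fs::write(&tmp, out)?;
    fs::rename(&tmp, path)
}

/// Rebuilds the accumulators from a file written by `checkpoint`.
fn restore(path: &Path) -> io::Result<Accumulators> {
    let mut state = Accumulators::new();
    for line in fs::read_to_string(path)?.lines() {
        let fields: Vec<&str> = line.split(',').collect();
        let [start, key, count, sum] = fields.as_slice() else {
            return Err(bad_line(line));
        };
        let start: i64 = start.parse().map_err(|_| bad_line(line))?;
        let count: u64 = count.parse().map_err(|_| bad_line(line))?;
        let sum: i64 = sum.parse().map_err(|_| bad_line(line))?;
        state.insert((start, key.to_string()), (count, sum));
    }
    Ok(state)
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("window-state.ckpt");
    let mut state = Accumulators::new();
    state.insert((0, "alice".to_string()), (2, 8));
    state.insert((60_000, "bob".to_string()), (1, 7));
    checkpoint(&state, &path)?;
    let restored = restore(&path)?; // what a restarted process would load
    assert_eq!(restored, state);
    fs::remove_file(&path)?;
    Ok(())
}
```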

See also