Product | Documentation | Examples | Blog | Roadmap | GitHub | Get Started
Available

Tumbling window aggregation

Group a stream into fixed-size, non-overlapping time windows.

Tumbling windows partition a stream into fixed-size buckets aligned to the epoch. Use them for "events per minute", "errors per hour", etc.

SQL

-- Count events per 1-minute tumbling window over event_time.
SELECT
  tumble_start(event_time, INTERVAL '1 minute') AS window_start,
  tumble_end(event_time,   INTERVAL '1 minute') AS window_end,
  COUNT(*) AS events
FROM events
-- Group by the same window expressions that are selected above,
-- so each output row corresponds to one window.
GROUP BY tumble_start(event_time, INTERVAL '1 minute'),
         tumble_end(event_time,   INTERVAL '1 minute');

Python (Stream API)

# Keyed 1-minute tumbling-window aggregation over an in-memory stream.
import pyarrow as pa

import krishiv as ks
# NOTE: this import shadows Python's built-in `sum` for the rest of the script.
from krishiv.functions import count, sum, col

session = ks.Session.embedded()
schema = ...  # PyArrow schema: event_time timestamp, user_id utf8, amount float64
stream, sender = session.memory_stream(schema)

windowed = (stream
    .watermark("event_time", 5_000)       # 5s allowed lateness
    .key_by("user_id")
    .tumbling_window(60_000)              # 1-minute windows
    .agg([count(col("*")).alias("events"),
          sum(col("amount")).alias("total")]))

# Push events and collect results
sender.send(pa.record_batch([...]))  # replace [...] with arrays matching `schema`
print(windowed.try_next())  # next windowed aggregate

Rust (Stream API)

use krishiv_api::{col, count, sum, Session};

// Builds a keyed tumbling-window aggregation over an in-memory stream.
//
// NOTE(review): `schema` is not declared anywhere in this snippet; it must be
// defined before `memory_stream` is called. Presumably it describes the
// `event_time`, `user_id`, and `amount` columns referenced below — confirm.
#[tokio::main]
async fn main() -> krishiv_api::Result<()> {
    let session = Session::embedded().await?;
    // NOTE(review): `sender` is never used in this snippet; presumably it is
    // the handle for pushing batches into `stream` — confirm.
    let (stream, sender) = session.memory_stream(schema)?;

    // NOTE(review): `windowed` is built but never read here, so no results
    // are collected by this example as written.
    let windowed = stream
        // Presumably milliseconds (5 s of allowed lateness) — confirm the unit.
        .watermark("event_time", 5_000)?
        .key_by("user_id")?
        // Presumably milliseconds (1-minute windows) — confirm the unit.
        .tumbling_window(60_000)
        // Aggregates per key and window: a row count and the sum of `amount`.
        .agg(vec![count(col("*")), sum(col("amount"))]);
    Ok(())
}

See also