Tumbling window aggregation
Group a stream into fixed-size, non-overlapping time windows.
Tumbling windows partition a stream into fixed-size, non-overlapping buckets aligned to the epoch, so every event falls into exactly one window. Use them for metrics such as "events per minute" or "errors per hour".
SQL
-- Count the events that fall into each fixed 1-minute tumbling window,
-- reporting each window's start and end bounds alongside the count.
SELECT
tumble_start(event_time, INTERVAL '1 minute') AS window_start,
tumble_end(event_time, INTERVAL '1 minute') AS window_end,
COUNT(*) AS events
FROM events
-- Both window bounds are selected without aggregation, so both expressions
-- (same column, same interval) are repeated here in GROUP BY.
GROUP BY tumble_start(event_time, INTERVAL '1 minute'),
tumble_end(event_time, INTERVAL '1 minute');
Python (Stream API)
import datetime as dt

import krishiv as ks
import pyarrow as pa
from krishiv.functions import count, sum, col  # note: this `sum` shadows the builtin

session = ks.Session.embedded()

# Input schema. Timestamps use millisecond precision, matching the
# millisecond arguments passed to watermark() and tumbling_window() below.
schema = pa.schema([
    ("event_time", pa.timestamp("ms")),
    ("user_id", pa.utf8()),
    ("amount", pa.float64()),
])
stream, sender = session.memory_stream(schema)

windowed = (stream
    .watermark("event_time", 5000)   # 5s allowed lateness
    .key_by("user_id")
    .tumbling_window(60_000)         # 1-minute windows
    .agg([count(col("*")).alias("events"),
          sum(col("amount")).alias("total")]))

# Push a batch of events (columns must follow `schema`), then poll for results.
sender.send(pa.RecordBatch.from_pydict(
    {
        "event_time": [dt.datetime(2024, 1, 1, 12, 0, 5),
                       dt.datetime(2024, 1, 1, 12, 0, 42)],
        "user_id": ["alice", "alice"],
        "amount": [10.0, 2.5],
    },
    schema=schema,
))
print(windowed.try_next())  # next windowed aggregate
Rust (Stream API)
use krishiv_api::{col, count, sum, Session};
#[tokio::main]
async fn main() -> krishiv_api::Result<()> {
let session = Session::embedded().await?;
let (stream, sender) = session.memory_stream(schema)?;
let windowed = stream
.watermark("event_time", 5_000)?
.key_by("user_id")?
.tumbling_window(60_000)
.agg(vec![count(col("*")), sum(col("amount"))]);
Ok(())
}
See also
- Window Functions — `TUMBLE`, `HOP`, and `SESSION` helpers in SQL
- Python Stream & Windows
- Rust Stream & KeyedStream