Timers
Event-time and processing-time timers for stateful functions and incremental view maintenance (IVM).
Timers let a stateful function do work at a specific time, instead of in response to an event. They are the canonical way to implement "do X after N seconds of inactivity" or "emit the result of this window at 12:00:00".
Timer kinds
| Kind | Fires when | API |
|---|---|---|
| Event-time | The job's watermark crosses the timer's fire time. Order is preserved per key. | ctx.register_event_time_timer(fire_ms) |
| Processing-time | The wall clock crosses the timer's fire time. Order is not guaranteed across keys. | ctx.register_processing_time_timer(fire_ms) |
Event-time vs processing-time
Use event-time timers when the semantics of your function are tied to data (windowing, late-event detection, key-timeout). Use processing-time timers for wall-clock-driven work (heartbeats, periodic flushes, SLA deadlines).
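The difference between the two kinds comes down to which clock drives the timer queue. The sketch below is a self-contained toy model, not Krishiv's implementation: `TimerQueue`, `register`, `advance`, and `wall_clock_ms` are hypothetical names, and it assumes a timer fires once the driving clock is at or past its fire time.

```rust
use std::collections::BTreeSet;
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical stand-in for an engine's timer queue (not the Krishiv API).
/// Entries are ordered by fire time, then by key.
#[derive(Default)]
struct TimerQueue {
    pending: BTreeSet<(i64, String)>,
}

impl TimerQueue {
    /// Register a timer for `key` at `fire_ms`. Exact duplicates collapse into one.
    fn register(&mut self, key: &str, fire_ms: i64) {
        self.pending.insert((fire_ms, key.to_string()));
    }

    /// Move the driving clock to `now_ms` and return every timer that is due
    /// (fire time <= now_ms), in fire-time order. Assumption: "crosses" means
    /// the clock has reached or passed the fire time.
    fn advance(&mut self, now_ms: i64) -> Vec<(i64, String)> {
        // split_off returns the entries >= bound and leaves the rest in place.
        let bound = (now_ms.saturating_add(1), String::new());
        let still_pending = self.pending.split_off(&bound);
        let due = std::mem::replace(&mut self.pending, still_pending);
        due.into_iter().collect()
    }
}

/// Current wall-clock time in milliseconds since the Unix epoch.
fn wall_clock_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before the Unix epoch")
        .as_millis() as i64
}

fn main() {
    // Event time: the clock is the watermark, which only moves as data arrives.
    let mut event_timers = TimerQueue::default();
    event_timers.register("user-a", 1_000);
    event_timers.register("user-b", 2_000);
    println!("watermark 999  -> {:?}", event_timers.advance(999));
    println!("watermark 1500 -> {:?}", event_timers.advance(1_500));

    // Processing time: the same queue, driven by the wall clock instead.
    let mut wall_timers = TimerQueue::default();
    wall_timers.register("heartbeat", wall_clock_ms());
    println!("wall clock     -> {:?}", wall_timers.advance(wall_clock_ms()));
}
```

In this model an event-time timer can stay pending indefinitely if no new data advances the watermark, which is why wall-clock-driven work such as heartbeats fits processing-time timers better.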
API
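use krishiv_api::{ProcessContext, TimerKind};

fn on_event(&mut self, _key: &[u8], batch: &RecordBatch, _row: usize, ctx: &mut ProcessContext) {
    // Register an event-time timer derived from the batch's "event_time" column.
    // The concrete array type is elided (`<...>`); the argument must be the fire time in milliseconds.
    ctx.register_event_time_timer(batch.column_by_name("event_time").unwrap().as_any().downcast_ref::<...>());
}

fn on_timer(&mut self, _key: &[u8], fire_time_ms: i64, ctx: &mut ProcessContext) {
    // Called when a timer registered for this key fires: do periodic work here.
}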
Timers are per-key. When a timer fires, the engine calls on_timer on the same ProcessFunction instance with the same key. From there you can emit output, modify state, and register new timers.
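As an illustration of the per-key callback pattern, here is a minimal sketch of an inactivity timeout. It does not use the real `krishiv_api` types: `MockContext`, its `emit` method, `IdleTimeout`, and `TIMEOUT_MS` are hypothetical stand-ins, and `on_event` takes the event time directly instead of a `RecordBatch` so the example compiles on its own.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the engine context. It only records what the
/// function asked for; `emit` is an assumed method, not a documented API.
#[derive(Default)]
struct MockContext {
    registered: Vec<i64>,
    emitted: Vec<String>,
}

impl MockContext {
    fn register_event_time_timer(&mut self, fire_ms: i64) {
        self.registered.push(fire_ms);
    }
    fn emit(&mut self, message: String) {
        self.emitted.push(message);
    }
}

/// Illustrative timeout: report a key after 30 s without events.
const TIMEOUT_MS: i64 = 30_000;

#[derive(Default)]
struct IdleTimeout {
    /// Per-key state: the largest event time seen so far.
    last_seen_ms: HashMap<Vec<u8>, i64>,
}

impl IdleTimeout {
    /// Record the event and arm a timer TIMEOUT_MS after the latest event time.
    fn on_event(&mut self, key: &[u8], event_time_ms: i64, ctx: &mut MockContext) {
        let last = self.last_seen_ms.entry(key.to_vec()).or_insert(event_time_ms);
        *last = (*last).max(event_time_ms);
        ctx.register_event_time_timer(*last + TIMEOUT_MS);
    }

    /// Earlier timers are never cancelled here, so a firing timer is acted on
    /// only if it matches the deadline implied by the most recent event.
    fn on_timer(&mut self, key: &[u8], fire_time_ms: i64, ctx: &mut MockContext) {
        if let Some(last) = self.last_seen_ms.get(key).copied() {
            if fire_time_ms == last + TIMEOUT_MS {
                let name = String::from_utf8_lossy(key);
                ctx.emit(format!("{} idle since {} ms", name, last));
                self.last_seen_ms.remove(key);
            }
        }
    }
}

fn main() {
    let mut func = IdleTimeout::default();
    let mut ctx = MockContext::default();
    func.on_event(b"user-a", 1_000, &mut ctx);
    func.on_event(b"user-a", 5_000, &mut ctx);
    func.on_timer(b"user-a", 31_000, &mut ctx); // stale: superseded by the second event
    func.on_timer(b"user-a", 35_000, &mut ctx); // live: no event since 5_000
    println!("registered: {:?}", ctx.registered);
    println!("emitted:    {:?}", ctx.emitted);
}
```

Checking the fire time against state, instead of cancelling old timers, keeps the sketch independent of whether the engine offers a way to delete a timer, which the text above does not specify.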
Persistence
Timers are persisted as part of the operator state. On restart, timers that have not yet fired are re-armed. Processing-time timers use the engine's checkpoint time as their anchor — they don't re-fire past events that happened during downtime.
Timers in IVM
IVM views can use WatermarkTracker and LatenessSpec to declaratively express timing without explicit timers. The LATENESS clause in CREATE INCREMENTAL VIEW sets a per-source late-event tolerance:
CREATE INCREMENTAL VIEW order_totals AS
SELECT customer_id, SUM(amount) AS total
FROM orders
GROUP BY customer_id
LATENESS event_time INTERVAL '10' SECOND;
This is sugar for a watermark with a 10 s allowed lateness. See Incremental Views.
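To make the lateness arithmetic concrete, the sketch below models one common interpretation: the watermark trails the largest event time seen by the allowed lateness, and an event older than the watermark is rejected as late. `LatenessTracker` and its methods are hypothetical and are not the `WatermarkTracker` or `LatenessSpec` types mentioned above; Krishiv's actual rule may differ.

```rust
/// Hypothetical model of a watermark with a fixed allowed lateness.
struct LatenessTracker {
    allowed_lateness_ms: i64,
    max_event_time_ms: Option<i64>,
}

impl LatenessTracker {
    fn new(allowed_lateness_ms: i64) -> Self {
        Self { allowed_lateness_ms, max_event_time_ms: None }
    }

    /// Watermark = largest event time seen minus the allowed lateness.
    /// None until the first event arrives.
    fn watermark_ms(&self) -> Option<i64> {
        self.max_event_time_ms.map(|t| t - self.allowed_lateness_ms)
    }

    /// Returns true if the event is accepted, false if it is older than the
    /// current watermark. Accepted events may advance the watermark.
    fn observe(&mut self, event_time_ms: i64) -> bool {
        if let Some(wm) = self.watermark_ms() {
            if event_time_ms < wm {
                return false;
            }
        }
        let new_max = self
            .max_event_time_ms
            .map_or(event_time_ms, |m| m.max(event_time_ms));
        self.max_event_time_ms = Some(new_max);
        true
    }
}

fn main() {
    // Mirrors LATENESS ... INTERVAL '10' SECOND as 10_000 ms.
    let mut tracker = LatenessTracker::new(10_000);
    for event_time_ms in [100_000, 95_000, 89_999, 120_000] {
        let accepted = tracker.observe(event_time_ms);
        println!(
            "event at {} ms: accepted = {}, watermark = {:?}",
            event_time_ms,
            accepted,
            tracker.watermark_ms()
        );
    }
}
```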
Timer services
For functions that are not part of a streaming query but still need timers (e.g. background cleanup tasks), Krishiv provides two timer services:
- InMemoryTimerService: fast, ephemeral.
- ProcessingTimeTimerService: backed by a state store so timers survive restart.
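The practical difference between the two services is what happens to pending timers across a restart. The following toy model illustrates that trade-off only; `FakeStateStore`, `EphemeralTimers`, and `DurableTimers` are hypothetical names and do not reflect the real `InMemoryTimerService` or `ProcessingTimeTimerService` APIs.

```rust
use std::collections::BTreeSet;

/// A pending timer: (fire time in ms, task name).
type Timer = (i64, String);

/// Hypothetical stand-in for a durable state store that outlives a service instance.
#[derive(Default)]
struct FakeStateStore {
    timers: BTreeSet<Timer>,
}

/// Ephemeral service: timers live only inside this struct.
#[derive(Default)]
struct EphemeralTimers {
    pending: BTreeSet<Timer>,
}

impl EphemeralTimers {
    fn register(&mut self, fire_ms: i64, name: &str) {
        self.pending.insert((fire_ms, name.to_string()));
    }
    fn pending_count(&self) -> usize {
        self.pending.len()
    }
}

/// Store-backed service: every registration is written through to the store.
struct DurableTimers<'a> {
    store: &'a mut FakeStateStore,
}

impl<'a> DurableTimers<'a> {
    /// Opening the service over an existing store picks up its pending timers.
    fn open(store: &'a mut FakeStateStore) -> Self {
        Self { store }
    }
    fn register(&mut self, fire_ms: i64, name: &str) {
        self.store.timers.insert((fire_ms, name.to_string()));
    }
    fn pending_count(&self) -> usize {
        self.store.timers.len()
    }
}

fn main() {
    // Ephemeral: a "restart" (a fresh instance) starts with no timers.
    let mut ephemeral = EphemeralTimers::default();
    ephemeral.register(5_000, "cleanup");
    drop(ephemeral);
    let restarted = EphemeralTimers::default();
    println!("ephemeral after restart: {}", restarted.pending_count());

    // Durable: the store survives, so a reopened service still sees the timer.
    let mut store = FakeStateStore::default();
    {
        let mut durable = DurableTimers::open(&mut store);
        durable.register(5_000, "cleanup");
    }
    let reopened = DurableTimers::open(&mut store);
    println!("durable after restart:   {}", reopened.pending_count());
}
```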