Stateful Process Functions
Per-record processing with keyed state, timers, and multi-stream coordination.
When windowed aggregation is the wrong shape — for example, "per-user dedup with custom logic", "per-card fraud scoring with two event types", or "broadcast a rules table to all partitions" — you want a stateful process function. Krishiv provides three flavors.
ProcessFunction (single stream)
The simplest model. on_event is called per record, with access to per-key state and timers.
use arrow::record_batch::RecordBatch; // assumption: RecordBatch is the Arrow type
use krishiv_api::{apply_process_function, ProcessContext, ProcessFunction, ValueState};

struct Counter { state: ValueState<i64> }

impl ProcessFunction for Counter {
    // Called once per record; the state handle is scoped to the record's key.
    fn on_event(&mut self, _key: &[u8], batch: &RecordBatch, _row: usize, ctx: &mut ProcessContext) {
        let n = self.state.get_json().unwrap_or(0);
        // Count one record per call, since on_event is invoked per record.
        self.state.set_json(n + 1);
        ctx.emit(batch.clone());
    }

    // No timers are registered in this example, so on_timer does nothing.
    fn on_timer(&mut self, _key: &[u8], _fire_ms: i64, _ctx: &mut ProcessContext) {}
}

let out = apply_process_function(stream, "user_id", Counter { state: ValueState::new("count") }, Default::default())?;
Python: apply_process_function(stream, "user_id", my_fn, ValueState("count")).
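To make "per-key state" concrete, here is a minimal sketch that imitates the model with the standard library only. It does not use Krishiv: `KeyedCounter`, its `on_event`, and `run_counter` are hypothetical stand-ins, and the `HashMap` plays the role that a keyed `ValueState<i64>` would play in the runtime.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a keyed ValueState<i64>: one slot per key.
#[derive(Default)]
struct KeyedCounter {
    slots: HashMap<Vec<u8>, i64>,
}

impl KeyedCounter {
    /// Called once per record; returns the updated count for that record's key.
    fn on_event(&mut self, key: &[u8]) -> i64 {
        let n = self.slots.entry(key.to_vec()).or_insert(0);
        *n += 1;
        *n
    }
}

/// Feeds a small stream of keys through the counter and returns
/// the count observed after each record.
fn run_counter(keys: &[&str]) -> Vec<i64> {
    let mut counter = KeyedCounter::default();
    keys.iter().map(|k| counter.on_event(k.as_bytes())).collect()
}
```

Calling `run_counter(&["alice", "bob", "alice"])` yields `[1, 1, 2]`: each key advances only its own counter, which is the isolation that keyed state provides.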
Timers
Two timer kinds per ProcessFunction:
| Kind | API | When it fires |
|---|---|---|
| Event-time | ctx.register_event_time_timer(fire_time_ms) | When the watermark crosses the fire time. |
| Processing-time | ctx.register_processing_time_timer(fire_time_ms) | When the wall clock crosses the fire time. |
Timers are per-key. When a timer fires, the runtime calls on_timer on the same ProcessFunction instance, which can then emit records, modify state, and register new timers.
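The event-time behaviour in the table can be sketched with an ordered set of pending timers. This is an illustrative model, not Krishiv's implementation: `TimerService` and `advance_watermark` are hypothetical names, and the sketch assumes a timer fires as soon as the watermark is greater than or equal to its fire time.

```rust
use std::collections::BTreeSet;

/// Hypothetical model of per-key event-time timers.
#[derive(Default)]
struct TimerService {
    // Ordered by fire time first, so the earliest timer is always at the front.
    pending: BTreeSet<(i64, String)>,
}

impl TimerService {
    /// Registers a timer for `key`; registering the same (time, key) pair twice is a no-op.
    fn register_event_time_timer(&mut self, key: &str, fire_time_ms: i64) {
        self.pending.insert((fire_time_ms, key.to_string()));
    }

    /// Advances the watermark and returns the timers that fire, in fire-time order.
    /// Assumption: a timer fires once the watermark is >= its fire time.
    fn advance_watermark(&mut self, watermark_ms: i64) -> Vec<(i64, String)> {
        let mut fired = Vec::new();
        while let Some(first) = self.pending.iter().next().cloned() {
            if first.0 > watermark_ms {
                break; // every remaining timer is still in the future
            }
            self.pending.remove(&first);
            fired.push(first);
        }
        fired
    }
}
```

In a real runtime each fired entry would become an on_timer call for that key. A processing-time timer can be modelled the same way by advancing on the wall clock instead of the watermark.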
ConnectedStreams (two streams, one function)
A co-process function receives events from two keyed streams and shares state between them. This is useful for cases such as click→impression correlation, where both inputs need to see the same "this user is suspicious" flag.
use arrow::record_batch::RecordBatch; // assumption: RecordBatch is the Arrow type
use krishiv_api::{connect_streams, CoProcessFunction, ProcessContext, ValueState};

struct FraudScorer { flagged: ValueState<bool> }

impl CoProcessFunction for FraudScorer {
    // Emits a record only if its key has already been flagged.
    // Setting the flag (self.flagged.set_json(true)) is omitted from this example.
    fn on_event(&mut self, _key: &[u8], batch: &RecordBatch, _row: usize, ctx: &mut ProcessContext) {
        let flagged = self.flagged.get_json().unwrap_or(false);
        if flagged { ctx.emit(batch.clone()); }
    }
}

let out = connect_streams(left, right, FraudScorer { flagged: ValueState::new("flagged") })?;
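The shared-flag idea can be illustrated without the runtime. In the sketch below, which uses only the standard library, the `Input` enum and the `score` function are hypothetical: one input sets a per-user flag, the other is filtered by it, and both see the same per-key state.

```rust
use std::collections::HashMap;

/// Events from the two connected inputs (hypothetical shapes for illustration).
enum Input<'a> {
    /// Left input: marks a user as suspicious.
    Flag { user: &'a str },
    /// Right input: a transaction that is emitted only for flagged users.
    Txn { user: &'a str, amount: u32 },
}

/// Processes the interleaved inputs and returns the transactions that were emitted.
fn score(inputs: &[Input]) -> Vec<(String, u32)> {
    // Per-key state visible to both inputs, standing in for ValueState<bool>.
    let mut flagged: HashMap<String, bool> = HashMap::new();
    let mut out = Vec::new();
    for input in inputs {
        match input {
            Input::Flag { user } => {
                flagged.insert(user.to_string(), true);
            }
            Input::Txn { user, amount } => {
                // Unflagged users default to false, so their records are dropped.
                if flagged.get(*user).copied().unwrap_or(false) {
                    out.push((user.to_string(), *amount));
                }
            }
        }
    }
    out
}
```

Arrival order matters in this model: a transaction that arrives before its user is flagged is dropped, which is one reason co-process logic is often paired with timers or buffering.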
BroadcastState (rules shared across partitions)
When every parallel instance of a function needs the same read-mostly state (e.g. a small rules table), use BroadcastState. Updates are propagated to all parallel instances; reads are served from the local copy.
use arrow::record_batch::RecordBatch; // assumption: RecordBatch is the Arrow type
use krishiv_api::{broadcast_stream, BroadcastProcessFunction, BroadcastState, ProcessContext};

struct ApplyRules { rules: BroadcastState<i32, String> }

impl BroadcastProcessFunction for ApplyRules {
    fn on_broadcast(&mut self, _key: &[u8], batch: &RecordBatch, _row: usize, ctx: &mut ProcessContext) {
        // Look up rule 42 in the local copy of the broadcast state.
        if let Some(rule) = self.rules.get(&42) {
            // apply rule
        }
        ctx.emit(batch.clone());
    }
}

// rules_descriptor is assumed to be defined elsewhere.
let out = broadcast_stream(stream, &rules_descriptor, ApplyRules { rules: BroadcastState::new("rules") })?;
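The "updates go everywhere, reads stay local" contract can be modelled in a few lines. This is a sketch under stated assumptions rather than Krishiv's replication mechanism: `BroadcastOperator`, `Instance`, `broadcast`, and `lookup` are hypothetical names, and the parallel instances are represented as entries in a `Vec`.

```rust
use std::collections::HashMap;

/// One parallel instance holding its own local copy of the rules table.
#[derive(Default)]
struct Instance {
    rules: HashMap<i32, String>,
}

/// Hypothetical operator made of several parallel instances.
struct BroadcastOperator {
    instances: Vec<Instance>,
}

impl BroadcastOperator {
    fn new(parallelism: usize) -> Self {
        let instances = (0..parallelism).map(|_| Instance::default()).collect();
        Self { instances }
    }

    /// A broadcast update is applied to every instance's copy.
    fn broadcast(&mut self, rule_id: i32, rule: &str) {
        for instance in &mut self.instances {
            instance.rules.insert(rule_id, rule.to_string());
        }
    }

    /// A read touches only the local copy of the instance handling the record.
    fn lookup(&self, instance: usize, rule_id: i32) -> Option<&str> {
        self.instances[instance].rules.get(&rule_id).map(String::as_str)
    }
}
```

After `op.broadcast(42, "block")`, `op.lookup(i, 42)` returns `Some("block")` for every instance index `i`, with no cross-instance communication on the read path.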
State primitives
All process functions take a state descriptor. Krishiv ships five kinds:
| Kind | API | Use |
|---|---|---|
| ValueState<T> | state.get_json() / set_json(v) / clear() | Single value per key. |
| ListState<T> | state.add_json(v) / get() / clear() | Append-only list per key. |
| MapState<K,V> | state.put_json(k, v) / get(k) / remove(k) | Keyed map per outer key. |
| ReducingState<T> | state.add(v) / get() / merge(v) | Monoid-reducible accumulator per key. |
| BroadcastState<K,V> | shared across all parallel instances | Read-mostly lookup tables. |
State backends: RocksDbStateBackend (default in single-node-durable / distributed-durable) or in-memory (default in dev-local). See State.
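As an illustration of the ReducingState row in the table above, the following sketch shows what a reducible accumulator for a single key might look like. `Reducing` and `max_seen` are hypothetical and written against the standard library only. The combine function is assumed to be associative, so values can be folded in one at a time.

```rust
/// Hypothetical model of a reducing accumulator for one key.
struct Reducing<T, F: Fn(&T, &T) -> T> {
    acc: Option<T>,
    combine: F,
}

impl<T, F: Fn(&T, &T) -> T> Reducing<T, F> {
    fn new(combine: F) -> Self {
        Self { acc: None, combine }
    }

    /// Folds `v` into the accumulator; the first value is stored as-is.
    fn add(&mut self, v: T) {
        self.acc = Some(match self.acc.take() {
            Some(current) => (self.combine)(&current, &v),
            None => v,
        });
    }

    /// Returns the reduced value, or None if nothing has been added yet.
    fn get(&self) -> Option<&T> {
        self.acc.as_ref()
    }
}

/// Example reduction: keep the maximum value seen so far.
fn max_seen(values: &[i64]) -> Option<i64> {
    let mut state = Reducing::new(|a: &i64, b: &i64| (*a).max(*b));
    for v in values {
        state.add(*v);
    }
    state.get().copied()
}
```

Only the reduced value is stored, never the individual inputs, so the state size stays constant per key. That is the main reason to prefer a reducing accumulator over an append-only list when the final aggregate is all that is needed.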