ProductDocumentationExamplesBlogRoadmapGitHubGet Started
Available

Stateful Process Functions

Per-record processing with keyed state, timers, and multi-stream coordination.

When windowed aggregation is the wrong shape — for example, "per-user dedup with custom logic", "per-card fraud scoring with two event types", or "broadcast a rules table to all partitions" — you want a stateful process function. Krishiv provides three flavors.

ProcessFunction (single stream)

The simplest model. on_event is called per record, with access to per-key state and timers.

use krishiv_api::{apply_process_function, ValueState, ProcessContext, ProcessFunction};

struct Counter { state: ValueState<i64> }

impl ProcessFunction for Counter {
    fn on_event(&mut self, _key: &[u8], batch: &RecordBatch, _row: usize, ctx: &mut ProcessContext) {
        let n = self.state.get_json().unwrap_or(0);
        self.state.set_json(n + batch.num_rows() as i64);
        ctx.emit(batch.clone());
    }
    fn on_timer(&mut self, _key: &[u8], _fire_ms: i64, _ctx: &mut ProcessContext) {}
}

let out = apply_process_function(stream, "user_id", Counter { state: ValueState::new("count") }, Default::default())?;

Python: apply_process_function(stream, "user_id", my_fn, ValueState("count")).

Timers

Two timer kinds per ProcessFunction:

KindAPIWhen it fires
Event-timectx.register_event_time_timer(fire_time_ms)When the watermark crosses the fire time.
Processing-timectx.register_processing_time_timer(fire_time_ms)When wall clock crosses the fire time.

Timers are per-key. When fired, the runtime calls on_timer on the same ProcessFunction instance. The function can then emit, modify state, and register new timers.

ConnectedStreams (two streams, one function)

Co-process function: receive events from two keyed streams, share state. Useful for click→impression correlation that needs to share a "this user is suspicious" flag.

use krishiv_api::{connect_streams, CoProcessFunction, ProcessContext, ValueState};

struct FraudScorer { flagged: ValueState<bool> }

impl CoProcessFunction for FraudScorer {
    fn on_event(&mut self, _key: &[u8], batch: &RecordBatch, _row: usize, ctx: &mut ProcessContext) {
        let flagged = self.flagged.get_json().unwrap_or(false);
        if flagged { ctx.emit(batch.clone()); }
    }
}

let out = connect_streams(left, right, FraudScorer { flagged: ValueState::new("flagged") })?;

BroadcastState (rules shared across partitions)

When every parallel instance of a function needs the same read-only state (e.g. a small rules table) use BroadcastState. Updates are propagated to all parallel instances. Reads are local.

use krishiv_api::{broadcast_stream, BroadcastState, BroadcastProcessFunction};

struct ApplyRules { rules: BroadcastState<i32, String> }
impl BroadcastProcessFunction for ApplyRules {
    fn on_broadcast(&mut self, _key: &[u8], batch: &RecordBatch, _row: usize, ctx: &mut ProcessContext) {
        if let Some(rule) = self.rules.get(&42) {
            // apply rule
        }
        ctx.emit(batch.clone());
    }
}
let out = broadcast_stream(stream, &rules_descriptor, ApplyRules { rules: BroadcastState::new("rules") })?;

State primitives

All process functions take a state descriptor. Krishiv ships five kinds:

KindAPIUse
ValueState<T>state.get_json() / set_json(v) / clear()Single value per key.
ListState<T>state.add_json(v) / get() / clear()Append-only list per key.
MapState<K,V>state.put_json(k, v) / get(k) / remove(k)Keyed map per outer key.
ReducingState<T>state.add(v) / get() / merge(v)Monoid-reducible accumulator per key.
BroadcastState<K,V>shared across all parallel instancesRead-mostly lookup tables.

State backends: RocksDbStateBackend (default in single-node-durable / distributed-durable) or in-memory (default in dev-local). See State.

See also