Stateful process function
Run a per-key process function with ValueState.
Process functions run per record with access to keyed state. Use them for running counts, de-duplication, pattern tracking, and per-key enrichment.
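As a library-free illustration of one of these uses, de-duplication, the sketch below models keyed state as a plain dictionary and emits only the first record seen for each key. `KeyedValueStore` and `dedupe` are hypothetical names, not part of krishiv. Records are assumed to be `(event_id, user)` tuples keyed by `event_id`, and the store is reused across calls to show that state outlives a single batch.

```python
from typing import Any, Dict, Iterable, List, Tuple


class KeyedValueStore:
    """Hypothetical stand-in for keyed state: one value per key (not the krishiv API)."""

    def __init__(self) -> None:
        self._values: Dict[Any, Any] = {}

    def get(self, key: Any) -> Any:
        # Returns None for a key that has never been set.
        return self._values.get(key)

    def set(self, key: Any, value: Any) -> None:
        self._values[key] = value


def dedupe(records: Iterable[Tuple[str, str]], store: KeyedValueStore) -> List[Tuple[str, str]]:
    """Emit only the first record for each key; later duplicates are dropped."""
    out: List[Tuple[str, str]] = []
    for key, payload in records:
        if store.get(key):      # key already seen: skip the duplicate
            continue
        store.set(key, True)    # remember this key for future batches
        out.append((key, payload))
    return out


store = KeyedValueStore()
print(dedupe([("e1", "alice"), ("e2", "bob"), ("e1", "alice")], store))
# [('e1', 'alice'), ('e2', 'bob')]
print(dedupe([("e2", "bob"), ("e3", "carol")], store))
# [('e3', 'carol')]
```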
Python
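import krishiv as ks
from krishiv import apply_process_function, ProcessContext, ValueState

def count_per_key(ctx: ProcessContext, batch, state: ValueState):
    # `state` holds the running count for the current key.
    # A key with no stored value yet falls back to 0.
    seen = state.get_json() or 0
    seen += batch.num_rows      # add this batch's records to the count
    state.set_json(seen)        # persist the updated count for this key
    ctx.emit(batch)             # pass the batch through unchanged

session = ks.Session.embedded()
# `schema` is the stream's schema (defined elsewhere); it must include user_id.
stream, sender = session.memory_stream(schema)
keyed = stream.key_by("user_id")    # state is scoped per user_id
out = apply_process_function(keyed, count_per_key, ValueState("seen"))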
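To see what `count_per_key` computes without a running session, the sketch below drives the same function body with hypothetical stubs (`StubValueState`, `StubBatch`, `StubContext`, `run_keyed`) that mimic only the calls the example makes. These are not krishiv classes. The sketch assumes each call receives a batch for a single key and that `get_json` returns `None` until a value has been set.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple


class StubValueState:
    """Hypothetical stub: stores one JSON value, None until first set."""

    def __init__(self) -> None:
        self._raw: Optional[str] = None

    def get_json(self) -> Any:
        return None if self._raw is None else json.loads(self._raw)

    def set_json(self, value: Any) -> None:
        self._raw = json.dumps(value)


@dataclass
class StubBatch:
    num_rows: int


@dataclass
class StubContext:
    emitted: List[StubBatch] = field(default_factory=list)

    def emit(self, batch: StubBatch) -> None:
        self.emitted.append(batch)


def count_per_key(ctx, batch, state):
    # Same body as the krishiv example above, minus the krishiv type hints.
    seen = state.get_json() or 0
    seen += batch.num_rows
    state.set_json(seen)
    ctx.emit(batch)


def run_keyed(batches: List[Tuple[str, StubBatch]]) -> Dict[str, int]:
    """Give each key its own state object, modeling how key_by scopes state."""
    states: Dict[str, StubValueState] = {}
    ctx = StubContext()
    for key, batch in batches:
        state = states.setdefault(key, StubValueState())
        count_per_key(ctx, batch, state)
    return {key: s.get_json() for key, s in states.items()}


print(run_keyed([("u1", StubBatch(3)), ("u2", StubBatch(1)), ("u1", StubBatch(2))]))
# {'u1': 5, 'u2': 1}
```

Because `u1` and `u2` each get their own state object, their counts never mix: `u1` accumulates 3 + 2 across two batches while `u2` stays at 1.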
State is keyed by the key_by partition: each distinct key (here, each user_id) has its own state, and ValueState stores one value per key. MapState and ListState are also available; see Python State.
State durability
State is backed by krishiv-state: in-memory under dev-local, and RocksDB under single-node-durable and distributed-durable. On restart, state is restored from a checkpoint.
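The restore-on-restart idea can be modeled in a few lines: snapshot every key's value, discard the live state, and rebuild it from the snapshot. This is a hypothetical illustration that uses a JSON string as the checkpoint (`KeyedState` is an illustrative name); it does not reflect how krishiv-state or RocksDB actually store or format checkpoints.

```python
import json
from typing import Any, Dict


class KeyedState:
    """Hypothetical model of per-key state with checkpoint/restore (not krishiv-state)."""

    def __init__(self) -> None:
        self.values: Dict[str, Any] = {}

    def checkpoint(self) -> str:
        # Serialize a copy of every key's current value.
        return json.dumps(self.values, sort_keys=True)

    @classmethod
    def restore(cls, snapshot: str) -> "KeyedState":
        # Rebuild state from a previously taken snapshot.
        state = cls()
        state.values = json.loads(snapshot)
        return state


state = KeyedState()
state.values["u1"] = 5
state.values["u2"] = 1
snap = state.checkpoint()

state.values["u1"] = 9   # update made after the checkpoint
del state                # simulate a crash: the live state is gone

recovered = KeyedState.restore(snap)
print(recovered.values)  # {'u1': 5, 'u2': 1}
```

The update to `u1` made after the checkpoint is not part of the snapshot, so the recovered value is the checkpointed 5.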