Available

Stateful process function

Run a per-key process function with ValueState.

Process functions run per record with access to keyed state. Use them for running counts, de-duplication, pattern tracking, and per-key enrichment.
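To illustrate the de-duplication use case, here is a minimal pure-Python sketch of the idea. It does not use krishiv. `ToyValueState` and `dedupe_by_key` are hypothetical names that model one state slot per key. The sketch keeps a per-key "seen" flag and emits only the first record for each key.

```python
from typing import Any, Dict, Iterable, List


# Hypothetical stand-in for a keyed ValueState: one slot per key.
# This models the concept only; it is not krishiv's ValueState API.
class ToyValueState:
    def __init__(self) -> None:
        self._values: Dict[Any, Any] = {}
        self.current_key: Any = None

    def get(self) -> Any:
        # Returns None when nothing has been stored for the current key.
        return self._values.get(self.current_key)

    def set(self, value: Any) -> None:
        self._values[self.current_key] = value


def dedupe_by_key(records: Iterable[dict], key_field: str) -> List[dict]:
    """Emit only the first record seen for each key."""
    state = ToyValueState()
    out: List[dict] = []
    for record in records:
        state.current_key = record[key_field]  # scope state to this record's key
        if state.get() is None:                # first record for this key
            state.set(True)                    # remember that the key was seen
            out.append(record)
    return out


events = [
    {"user_id": "a", "event": "click"},
    {"user_id": "b", "event": "view"},
    {"user_id": "a", "event": "click"},
]
print(dedupe_by_key(events, "user_id"))
# [{'user_id': 'a', 'event': 'click'}, {'user_id': 'b', 'event': 'view'}]
```

Because the flag lives in per-key state, the second record for `"a"` is dropped while `"b"` is unaffected.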

Python

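import krishiv as ks
from krishiv import apply_process_function, ProcessContext, ValueState

# `state` is the ValueState for the key currently being processed.
def count_per_key(ctx: ProcessContext, batch, state: ValueState):
    seen = state.get_json() or 0   # fall back to 0 when nothing is stored yet for this key
    seen += batch.num_rows         # add this batch's row count to the running total
    state.set_json(seen)           # persist the updated count for this key
    ctx.emit(batch)                # pass the batch through unchanged

session = ks.Session.embedded()
# `schema` must be defined beforehand and describe the stream's records,
# including the user_id column used as the key below.
stream, sender = session.memory_stream(schema)

keyed = stream.key_by("user_id")
out = apply_process_function(keyed, count_per_key, ValueState("seen"))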

State is scoped to the key selected with key_by, so in this example each user_id keeps its own count. ValueState stores one value per key. MapState and ListState are also available; see Python State.
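The per-key scoping can be modeled in plain Python. This is a toy sketch, not krishiv's implementation. `KeyedValueState`, `run_keyed`, and `count_rows` are hypothetical names. It also assumes, for illustration only, that each batch is split by key before the function is called. A list of rows stands in for a batch, so `len(rows)` plays the role of `batch.num_rows`.

```python
from typing import Any, Callable, Dict, List, Optional


# Toy model of key-scoped ValueState (hypothetical; not krishiv's implementation).
class KeyedValueState:
    def __init__(self, name: str) -> None:
        self.name = name
        self._store: Dict[Any, Any] = {}
        self._key: Any = None

    def set_current_key(self, key: Any) -> None:
        self._key = key

    def get(self) -> Optional[Any]:
        return self._store.get(self._key)

    def set(self, value: Any) -> None:
        self._store[self._key] = value


def run_keyed(batches: List[List[dict]], key_field: str,
              fn: Callable[[List[dict], KeyedValueState], None],
              state: KeyedValueState) -> None:
    """Split each batch by key and call fn once per key, with state scoped to that key."""
    for batch in batches:
        groups: Dict[Any, List[dict]] = {}
        for row in batch:
            groups.setdefault(row[key_field], []).append(row)
        for key, rows in groups.items():
            state.set_current_key(key)
            fn(rows, state)


def count_rows(rows: List[dict], state: KeyedValueState) -> None:
    # Mirrors count_per_key: read the running total, add this group's rows, write it back.
    seen = state.get() or 0
    state.set(seen + len(rows))


seen = KeyedValueState("seen")
run_keyed(
    [[{"user_id": "a"}, {"user_id": "b"}, {"user_id": "a"}], [{"user_id": "a"}]],
    "user_id",
    count_rows,
    seen,
)
seen.set_current_key("a")
print(seen.get())  # 3
seen.set_current_key("b")
print(seen.get())  # 1
```

Key `"a"` accumulates across both batches (2 + 1), while key `"b"` has its own independent total.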

State durability

State is backed by krishiv-state. It is held in memory under dev-local and in RocksDB under single-node-durable and distributed-durable. On restart, state is restored from checkpoint.
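The checkpoint-and-restore cycle can be pictured with a small pure-Python model. It is not krishiv-state or RocksDB. `ToyStateBackend` and its methods are hypothetical, and JSON is used only as a convenient snapshot format. The model shows that a restored backend contains exactly the per-key values captured at checkpoint time. It does not show how krishiv handles updates made after the last checkpoint, which this page does not describe.

```python
import json
from typing import Any, Dict


# Toy keyed store standing in for a state backend (hypothetical; not krishiv-state).
class ToyStateBackend:
    def __init__(self) -> None:
        # Keys are strings here because JSON object keys are always strings.
        self.values: Dict[str, Any] = {}

    def checkpoint(self) -> str:
        """Serialize all per-key values into a snapshot."""
        return json.dumps(self.values, sort_keys=True)

    @classmethod
    def restore(cls, snapshot: str) -> "ToyStateBackend":
        """Rebuild a backend from a snapshot, as would happen on restart."""
        backend = cls()
        backend.values = json.loads(snapshot)
        return backend


backend = ToyStateBackend()
backend.values["a"] = 3
backend.values["b"] = 1
snapshot = backend.checkpoint()

backend.values["a"] = 5  # change made after the checkpoint; not part of the snapshot
restored = ToyStateBackend.restore(snapshot)
print(restored.values)  # {'a': 3, 'b': 1}
```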