ProductDocumentationExamplesBlogRoadmapGitHubGet Started
Available

State Types

ValueState, ListState, MapState, ReducingState, BroadcastState — APIs and idioms.

Five state primitives. All are per-key, named, and back by a state backend. They are serde::Serialize + DeserializeOwned by default (the blanket StateValue trait).

Keyed state comes in four shapes: a single value, an append-only list, a per-key map, and a reducer that combines updates.

ValueState<T>

One value per key. The most common primitive.

use krishiv_api::ValueState;

let state = ValueState::<i64>::new("seen_count");

// in on_event
let n = state.get_json().unwrap_or(0);
state.set_json(n + 1);

Python: state = ValueState("seen_count"); state.set_json(n); n = state.get_json().

ListState<T>

Append-only list per key. Good for "all events for this user in the last hour" or "rolling 1000 prices for this symbol".

MethodEffect
add_json(v) / add(v)Append v to the list.
get() / get_json() -> Vec<T>Read the list (a copy).
length() / is_empty()Count.
clear()Drop the list.

If you need bounded retention, wrap with a TTL or trim in on_timer.

MapState<K, V>

Keyed map per outer key. Good for "user has these active sessions" or "this device has these attributes".

MethodEffect
put_json(k, v) / put(k, v)Set k -> v.
get(k) / get_json(k) -> Option<V>Read or None.
contains(k) -> boolExistence check.
keys() / values() / entries()Iterate.
remove(k) / clear()Delete one or all.

ReducingState<T>

A monoid-reducible accumulator per key. Useful for "running max", "running quantile sketch", or "running HyperLogLog".

use krishiv_api::ReducingState;

let max_amt = ReducingState::<i64>::new("max_amount");

// in on_event
let prev = max_amt.get().unwrap_or(i64::MIN);
max_amt.merge(batch_amount.max(prev));

The merge op is a user-supplied associative function; the engine never inspects the value. This means the accumulator is opaque from the engine's point of view — the state backend just stores and forwards bytes.

BroadcastState<K, V>

Shared across all parallel instances of a function. Updates are propagated to every instance; reads are local. Use for small, read-mostly lookup tables (rules, features, geo data).

use krishiv_api::{BroadcastState, BroadcastStateDescriptor};

let rules = BroadcastStateDescriptor::<i32, String>::new("rules");
let state = BroadcastState::<i32, String>::new("rules");
let v = state.get(&42); // local read

Size limit: keep broadcast state < 10 MB per descriptor. Larger belongs in a real table source.

TTL with TtlStateBackend

Wrap any state backend in a TTL to evict keys after a configurable time, either wall-clock or event-time:

use krishiv_api::{TtlConfig, TtlStateBackend, RocksDbStateBackend};
use std::sync::Arc;
use std::time::Duration;

let inner = Arc::new(RocksDbStateBackend::open("/var/krishiv/state")?);
let ttl = TtlStateBackend::new(inner, TtlConfig::new(60_000)); // 60 s
// event-time mode: when a watermark is set, expiry is in event time, not wall clock
ttl.set_watermark(current_watermark_ms);

Wire format: each state value is stored as [8-byte LE expires_at_ms][raw bytes]. On read, expired entries are treated as absent and removed on the next write. This is what makes TTL safe when the event-time watermark lags the wall clock.

Backends

BackendUsePersistence
InMemoryStateBackendTests, dev-localNone
RocksDbStateBackendProduction (single-node-durable and up)Local filesystem, crash-safe
TtlStateBackend<B>Wrapper for any backend to add TTL semanticsSame as inner

See also