State Types
ValueState, ListState, MapState, ReducingState, BroadcastState — APIs and idioms.
Five state primitives. All are per-key, named, and backed by a state backend. Stored values go through serde: any type that is serde::Serialize + DeserializeOwned qualifies by default via the blanket StateValue trait.
ValueState<T>
One value per key. The most common primitive.
use krishiv_api::ValueState;
let state = ValueState::<i64>::new("seen_count");
// in on_event
let n = state.get_json().unwrap_or(0);
state.set_json(n + 1);
Python: state = ValueState("seen_count"); state.set_json(n); n = state.get_json().
ListState<T>
Append-only list per key. Good for "all events for this user in the last hour" or "rolling 1000 prices for this symbol".
| Method | Effect |
|---|---|
| add_json(v) / add(v) | Append v to the list. |
| get() / get_json() -> Vec<T> | Read the list (a copy). |
| length() / is_empty() | Element count / emptiness check. |
| clear() | Drop the list. |
If you need bounded retention, wrap the backend with a TTL or trim the list in on_timer.
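A trim step could look roughly like the sketch below. It works on a plain Vec rather than on the krishiv_api types, because the method table lists no partial-removal call; with a real ListState the same effect would presumably take a read, a clear(), and a re-append of the surviving elements. The helper name trim_to_last and the cap are hypothetical.

```rust
/// Keep only the newest `max_len` elements of an append-only list.
/// `trim_to_last` is a hypothetical helper, not part of krishiv_api.
fn trim_to_last<T>(list: &mut Vec<T>, max_len: usize) {
    if list.len() > max_len {
        let excess = list.len() - max_len;
        // Oldest entries sit at the front because the list is append-only.
        list.drain(..excess);
    }
}
```

Calling this from a timer callback with the copy returned by get() bounds the list at max_len entries per key, at the cost of rewriting the list on each trim.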
MapState<K, V>
Keyed map per outer key. Good for "user has these active sessions" or "this device has these attributes".
| Method | Effect |
|---|---|
| put_json(k, v) / put(k, v) | Set k -> v. |
| get(k) / get_json(k) -> Option<V> | Read or None. |
| contains(k) -> bool | Existence check. |
| keys() / values() / entries() | Iterate. |
| remove(k) / clear() | Delete one or all. |
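The two levels of keying are easy to mix up, so here is a small model of the semantics using only std types. MapStateModel is a hypothetical stand-in, not the krishiv_api type, and the String/u64 types are chosen purely for illustration: the outer key is the stream key, the inner key is the map key passed to put and get.

```rust
use std::collections::HashMap;

/// Stand-in for MapState<K, V>: one inner map per outer (stream) key.
/// This only models the semantics; it is not the krishiv_api type.
#[derive(Default)]
struct MapStateModel {
    per_key: HashMap<String, HashMap<String, u64>>,
}

impl MapStateModel {
    /// put(k, v): set k -> v inside the map that belongs to `outer`.
    fn put(&mut self, outer: &str, k: &str, v: u64) {
        self.per_key
            .entry(outer.to_string())
            .or_default()
            .insert(k.to_string(), v);
    }

    /// get(k): read the value or None; other outer keys are never visible.
    fn get(&self, outer: &str, k: &str) -> Option<u64> {
        self.per_key.get(outer)?.get(k).copied()
    }

    /// remove(k): delete a single entry for this outer key only.
    fn remove(&mut self, outer: &str, k: &str) {
        if let Some(inner) = self.per_key.get_mut(outer) {
            inner.remove(k);
        }
    }
}
```

In this model, removing "session-a" for one user leaves the entry with the same inner key for every other user untouched.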
ReducingState<T>
A monoid-reducible accumulator per key. Useful for "running max", "running quantile sketch", or "running HyperLogLog".
use krishiv_api::ReducingState;
let max_amt = ReducingState::<i64>::new("max_amount");
// in on_event
let prev = max_amt.get().unwrap_or(i64::MIN);
max_amt.merge(batch_amount.max(prev));
The merge operation is a user-supplied associative function. The engine never inspects the value, so the accumulator is opaque from its point of view: the state backend just stores and forwards bytes.
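To make the role of the merge function concrete, the following sketch models a reducing accumulator with std types only. Reducer and running_max are hypothetical names, not krishiv_api items, and the model keeps the value in memory instead of handing bytes to a backend.

```rust
/// Stand-in for ReducingState<T>: an optional accumulator plus a
/// user-supplied merge function. Not the krishiv_api type.
struct Reducer<T, F: Fn(T, T) -> T> {
    acc: Option<T>,
    merge_fn: F,
}

impl<T: Copy, F: Fn(T, T) -> T> Reducer<T, F> {
    fn new(merge_fn: F) -> Self {
        Reducer { acc: None, merge_fn }
    }

    /// Fold one more value into the accumulator.
    fn merge(&mut self, v: T) {
        self.acc = Some(match self.acc {
            Some(prev) => (self.merge_fn)(prev, v),
            None => v,
        });
    }

    fn get(&self) -> Option<T> {
        self.acc
    }
}

/// Running max over a slice, one merge per element.
fn running_max(values: &[i64]) -> Option<i64> {
    let mut r = Reducer::new(|a: i64, b: i64| a.max(b));
    for &v in values {
        r.merge(v);
    }
    r.get()
}
```

Associativity matters because it lets values be grouped and folded in any batching without changing the result. max is also commutative, so here the arrival order does not matter either.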
BroadcastState<K, V>
Shared across all parallel instances of a function. Updates are propagated to every instance; reads are local. Use for small, read-mostly lookup tables (rules, features, geo data).
use krishiv_api::{BroadcastState, BroadcastStateDescriptor};
let rules = BroadcastStateDescriptor::<i32, String>::new("rules");
let state = BroadcastState::<i32, String>::new("rules");
let v = state.get(&42); // local read
Size limit: keep broadcast state under 10 MB per descriptor. Anything larger belongs in a real table source.
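The "updates propagate, reads are local" behaviour can be pictured with the toy model below. BroadcastModel is a hypothetical illustration built from std collections; it says nothing about how the engine actually ships updates between instances.

```rust
use std::collections::HashMap;

/// Model of broadcast state: every parallel instance holds a full local copy.
/// A sketch of the semantics only; not the krishiv_api implementation.
struct BroadcastModel {
    replicas: Vec<HashMap<i32, String>>,
}

impl BroadcastModel {
    fn new(parallelism: usize) -> Self {
        BroadcastModel {
            replicas: vec![HashMap::new(); parallelism],
        }
    }

    /// An update is applied to every instance's copy.
    fn put(&mut self, k: i32, v: &str) {
        for replica in &mut self.replicas {
            replica.insert(k, v.to_string());
        }
    }

    /// A read touches only the calling instance's copy.
    fn get(&self, instance: usize, k: i32) -> Option<&String> {
        self.replicas[instance].get(&k)
    }
}
```

Because each instance stores the whole table, memory use grows with parallelism, which is one reason to keep broadcast state small.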
TTL with TtlStateBackend
Wrap any state backend in a TtlStateBackend to evict keys after a configurable time, measured in either wall-clock time or event time:
use krishiv_api::{RocksDbStateBackend, TtlConfig, TtlStateBackend};
use std::sync::Arc;
// Open the durable backend, then wrap it with a 60 s TTL (value in milliseconds).
let inner = Arc::new(RocksDbStateBackend::open("/var/krishiv/state")?);
let ttl = TtlStateBackend::new(inner, TtlConfig::new(60_000));
// Event-time mode: once a watermark is set, expiry is evaluated in event time,
// not against the wall clock.
ttl.set_watermark(current_watermark_ms);
Wire format: each state value is stored as [8-byte LE expires_at_ms][raw bytes]. On read, expired entries are treated as absent; they are physically removed on the next write. Because each entry carries its own absolute expiry and, in event-time mode, expiry is evaluated in event time, TTL stays safe when the event-time watermark lags the wall clock.
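The layout described above can be sketched with std only. The function names are hypothetical, the timestamp is assumed to be an unsigned 64-bit millisecond count, and the expiry test (now_ms >= expires_at_ms) is an assumption about the boundary case rather than documented behaviour.

```rust
/// Prefix `payload` with its absolute expiry time as 8 little-endian bytes.
fn encode(expires_at_ms: u64, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(8 + payload.len());
    out.extend_from_slice(&expires_at_ms.to_le_bytes());
    out.extend_from_slice(payload);
    out
}

/// Split a stored value into (expires_at_ms, payload); None if it is too short.
fn decode(stored: &[u8]) -> Option<(u64, &[u8])> {
    if stored.len() < 8 {
        return None;
    }
    let mut head = [0u8; 8];
    head.copy_from_slice(&stored[..8]);
    Some((u64::from_le_bytes(head), &stored[8..]))
}

/// Return the payload only if it is still live at `now_ms`.
/// `now_ms` is the wall clock, or the watermark in event-time mode.
/// Assumption: an entry counts as expired once now_ms reaches expires_at_ms.
fn read_live(stored: &[u8], now_ms: u64) -> Option<&[u8]> {
    let (expires_at_ms, payload) = decode(stored)?;
    if now_ms >= expires_at_ms {
        None
    } else {
        Some(payload)
    }
}
```

Passing the watermark as now_ms is what keeps an entry readable while event time is still behind its expiry, even if the wall clock has already moved past it.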
Backends
| Backend | Use | Persistence |
|---|---|---|
| InMemoryStateBackend | Tests, dev-local | None |
| RocksDbStateBackend | Production (single-node-durable and up) | Local filesystem, crash-safe |
| TtlStateBackend<B> | Wrapper for any backend to add TTL semantics | Same as inner |