Streaming Joins
Stream-table temporal joins, stream-stream interval joins, and regular joins on bounded input.
Streaming joins in Krishiv come in three flavors. Pick one based on what you are joining: a slowly-changing dimension table (temporal join), two correlated event streams (interval join), or a batch lookup over bounded input (regular join).
Stream-table temporal join (as-of)
Join a stream against a versioned table where each row of the table has a validity window. The engine picks the version of the dimension row that was current at the time of the stream event. This is the SQL FOR SYSTEM_TIME AS OF pattern.
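```rust
use krishiv_api::{Session, temporal_join, Result};

#[tokio::main]
async fn main() -> Result<()> {
    let session = Session::embedded().await?;

    // `schema` describes the layout of the Kafka records; its construction is omitted here.
    let orders = session
        .read_kafka("orders", schema, "broker:9092", "orders", "app")?
        .with_event_time("event_time")?;

    let enriched = temporal_join(
        orders,
        session.table("users")?, // versioned dimension table
        "event_time",            // stream-side event-time column used for the as-of lookup
        &["user_id"],            // join key(s)
        false,                   // inner = false ⇒ left outer join
    )?;
    Ok(())
}
```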
Python equivalent: stream_table_join(stream, table, "event_time", ["user_id"], inner=False).
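The as-of rule can be illustrated with a small in-memory model. This sketch is hypothetical: `VersionedTable`, `UserVersion`, `as_of`, and `enrich` are illustrative names, not part of krishiv_api, and it assumes each version stays valid from its `valid_from` timestamp until the next version of the same key begins. The lookup picks the latest version whose start is at or before the event time, and `inner` mirrors the flag above: a left outer join keeps unmatched events, an inner join drops them.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical dimension row (one version of a user).
#[derive(Debug, Clone, PartialEq)]
pub struct UserVersion {
    pub name: String,
}

/// Hypothetical versioned table: join key -> (valid_from_ms -> row version).
/// Assumption: a version stays valid until the next version of the same key starts.
#[derive(Default)]
pub struct VersionedTable {
    versions: HashMap<u64, BTreeMap<i64, UserVersion>>,
}

impl VersionedTable {
    /// Record a new version of `user_id` that becomes valid at `valid_from_ms`.
    pub fn upsert(&mut self, user_id: u64, valid_from_ms: i64, row: UserVersion) {
        self.versions.entry(user_id).or_default().insert(valid_from_ms, row);
    }

    /// As-of lookup: the version with the greatest `valid_from_ms <= event_time_ms`.
    pub fn as_of(&self, user_id: u64, event_time_ms: i64) -> Option<&UserVersion> {
        self.versions
            .get(&user_id)?
            .range(..=event_time_ms)
            .next_back()
            .map(|(_, row)| row)
    }
}

/// Enrich one stream event. `inner == false` keeps unmatched events with `None`
/// (left outer); `inner == true` drops them by returning `None`.
pub fn enrich(
    table: &VersionedTable,
    user_id: u64,
    event_time_ms: i64,
    inner: bool,
) -> Option<(u64, Option<UserVersion>)> {
    let matched = table.as_of(user_id, event_time_ms).cloned();
    if inner && matched.is_none() {
        None
    } else {
        Some((user_id, matched))
    }
}
```

Keeping each key's versions in a `BTreeMap` ordered by start time turns the as-of lookup into a single reverse range scan instead of a linear search.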
Stream-stream interval join
Join two streams whose events are correlated in time: for each event on the left, find events on the right with the same key whose event time falls within a window relative to the left event's time. Typical uses are click→impression joins, fraud detection, and correlating sensor events.
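```rust
use krishiv_api::{interval_join, IntervalJoinSpec, Result};

#[tokio::main]
async fn main() -> Result<()> {
    // `clicks` and `impressions` are event-time streams (e.g. built with
    // `read_kafka(...)?.with_event_time(...)?` as in the temporal join example);
    // their construction is omitted here.
    let joined = interval_join(
        clicks,
        impressions,
        IntervalJoinSpec {
            lower_bound_ms: -10_000,     // impression may be up to 10 s before the click
            upper_bound_ms: 30_000,      // ...or up to 30 s after it
            key_column: "ad_id".into(),  // both sides must match on this column
            max_buffer_per_side: 10_000, // per-key buffer cap on each side
        },
    )?;
    Ok(())
}
```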
Per-key state is bounded by max_buffer_per_side: once a key's buffer on one side is full, the oldest events are dropped first and counted via the late-event handler.
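The matching rule and the per-side buffer cap can be sketched as follows. This is a simplified, hypothetical model: `Event`, `IntervalJoin`, `on_left`, and `on_right` are illustrative names, not Krishiv's `PerKeyIntervalJoin`. A right event matches a left event with the same key when its time lies in `[left + lower_bound_ms, left + upper_bound_ms]`, and a full buffer evicts its earliest-arrived event and increments a counter. A real engine would presumably also prune state as the watermark advances; that is omitted here.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical event: join key plus event time in milliseconds.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Event {
    pub key: u64,
    pub ts_ms: i64,
}

/// Simplified per-key interval join. A right event `r` matches a left event `l`
/// with the same key when `l.ts_ms + lower <= r.ts_ms <= l.ts_ms + upper`.
pub struct IntervalJoin {
    lower_bound_ms: i64,
    upper_bound_ms: i64,
    max_buffer_per_side: usize,
    left: HashMap<u64, VecDeque<Event>>,
    right: HashMap<u64, VecDeque<Event>>,
    /// Events evicted because a per-key buffer was full.
    pub dropped: u64,
}

impl IntervalJoin {
    pub fn new(lower_bound_ms: i64, upper_bound_ms: i64, max_buffer_per_side: usize) -> Self {
        assert!(max_buffer_per_side > 0, "buffer must hold at least one event");
        Self {
            lower_bound_ms,
            upper_bound_ms,
            max_buffer_per_side,
            left: HashMap::new(),
            right: HashMap::new(),
            dropped: 0,
        }
    }

    /// Append `e` to its key's buffer, evicting the earliest-arrived event if full.
    fn buffer(side: &mut HashMap<u64, VecDeque<Event>>, e: Event, cap: usize, dropped: &mut u64) {
        let q = side.entry(e.key).or_default();
        if q.len() >= cap {
            q.pop_front();
            *dropped += 1;
        }
        q.push_back(e);
    }

    /// New left (e.g. click) event: match against buffered right events, then buffer it.
    pub fn on_left(&mut self, l: Event) -> Vec<(Event, Event)> {
        let (lo, hi) = (self.lower_bound_ms, self.upper_bound_ms);
        let matches = self
            .right
            .get(&l.key)
            .map(|q| {
                q.iter()
                    .filter(|r| r.ts_ms >= l.ts_ms + lo && r.ts_ms <= l.ts_ms + hi)
                    .map(|r| (l, *r))
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        Self::buffer(&mut self.left, l, self.max_buffer_per_side, &mut self.dropped);
        matches
    }

    /// New right (e.g. impression) event: the same check, seen from the other side.
    pub fn on_right(&mut self, r: Event) -> Vec<(Event, Event)> {
        let (lo, hi) = (self.lower_bound_ms, self.upper_bound_ms);
        let matches = self
            .left
            .get(&r.key)
            .map(|q| {
                q.iter()
                    .filter(|l| r.ts_ms >= l.ts_ms + lo && r.ts_ms <= l.ts_ms + hi)
                    .map(|l| (*l, r))
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        Self::buffer(&mut self.right, r, self.max_buffer_per_side, &mut self.dropped);
        matches
    }
}
```

With `lower_bound_ms = -10_000` and `upper_bound_ms = 30_000`, a click at 100 s matches impressions for the same ad between 90 s and 130 s, whichever side arrives first.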
Regular join on streaming input
If both inputs are bounded (e.g. two Parquet files), use the standard DataFrame join; no special streaming semantics apply. If one or both inputs are unbounded, you must give the planner a window: call .tumbling_window(...) first, then .join(...). The join then runs as a windowed join, with state kept per window.
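One way to picture a windowed join on unbounded input is the sketch below: it assigns each event to a tumbling window, keeps join state per window, and emits a window's matches once the watermark passes the window's end. It is a hypothetical model (`window_start`, `TumblingWindowJoin`, `push_left`, `push_right`, and `on_watermark` are illustrative names, not Krishiv's API) and assumes a watermark of `W` means no more events with timestamps below `W` will arrive; late-event handling is left out.

```rust
use std::collections::{BTreeMap, HashMap};

/// Start of the tumbling window that contains `ts_ms` (div_euclid handles negatives).
pub fn window_start(ts_ms: i64, size_ms: i64) -> i64 {
    ts_ms.div_euclid(size_ms) * size_ms
}

/// Per-window state: buffered rows from each side, grouped by join key.
#[derive(Default)]
struct WindowState {
    left: HashMap<u64, Vec<String>>,
    right: HashMap<u64, Vec<String>>,
}

/// Hypothetical windowed inner join over tumbling windows of `size_ms`.
pub struct TumblingWindowJoin {
    size_ms: i64,
    windows: BTreeMap<i64, WindowState>, // window start -> state, ordered by start
}

impl TumblingWindowJoin {
    pub fn new(size_ms: i64) -> Self {
        Self { size_ms, windows: BTreeMap::new() }
    }

    pub fn push_left(&mut self, key: u64, ts_ms: i64, row: &str) {
        let w = self.windows.entry(window_start(ts_ms, self.size_ms)).or_default();
        w.left.entry(key).or_default().push(row.to_string());
    }

    pub fn push_right(&mut self, key: u64, ts_ms: i64, row: &str) {
        let w = self.windows.entry(window_start(ts_ms, self.size_ms)).or_default();
        w.right.entry(key).or_default().push(row.to_string());
    }

    /// Close every window whose end (`start + size_ms`) is <= `watermark_ms`:
    /// emit (window start, key, left row, right row) per match, then drop its state.
    pub fn on_watermark(&mut self, watermark_ms: i64) -> Vec<(i64, u64, String, String)> {
        let mut out = Vec::new();
        while let Some(start) = self.windows.first_key_value().map(|(&s, _)| s) {
            if start + self.size_ms > watermark_ms {
                break; // the earliest window is still open, so all later ones are too
            }
            let state = self.windows.remove(&start).expect("key was just observed");
            for (key, lefts) in &state.left {
                if let Some(rights) = state.right.get(key) {
                    for l in lefts {
                        for r in rights {
                            out.push((start, *key, l.clone(), r.clone()));
                        }
                    }
                }
            }
        }
        out
    }
}
```

Ordering windows by start time lets the watermark close them from the front and free their state in one pass.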
State and backpressure
| Join | State backend | Backpressure |
|---|---|---|
| Temporal | VersionedTableState per join key | Drops late events past TTL; counts via LateEventHandler. |
| Interval | PerKeyIntervalJoin with max_buffer_per_side | Oldest events dropped first when buffer full. |
| Windowed | Same as windowed aggregation: RocksDB per-key accumulators | Watermark-driven emission. |
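The temporal-join row can be read as an admission check on the stream side. This sketch assumes that "late past TTL" means an event time older than `watermark - ttl`; `LateEventCounter` and `admit` are hypothetical stand-ins for the engine's `LateEventHandler`, whose actual interface is not shown on this page.

```rust
/// Hypothetical stand-in for a late-event handler: it only counts drops.
#[derive(Debug, Default)]
pub struct LateEventCounter {
    pub dropped: u64,
}

/// Admission check for the stream side of a temporal join.
/// Assumption: an event is "late past TTL" when `event_time < watermark - ttl`.
/// Returns `true` if the event should be joined, `false` if it was dropped.
pub fn admit(event_time_ms: i64, watermark_ms: i64, ttl_ms: i64, late: &mut LateEventCounter) -> bool {
    if event_time_ms < watermark_ms.saturating_sub(ttl_ms) {
        late.dropped += 1; // counted, not joined
        false
    } else {
        true
    }
}
```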
If the dimension table has no FOR SYSTEM_TIME AS OF version column, use a regular join with a watermark(...) on the stream side.