Available

Streaming Joins

Stream-table temporal joins, stream-stream interval joins, and regular joins on bounded input.

Krishiv supports three join shapes: stream-table temporal (as-of) joins, stream-stream interval (windowed) joins, and regular joins on streaming input. Pick by what you need: enrichment from a slowly changing dimension table, correlation of two event streams, or a plain batch lookup.

Stream-table temporal join (as-of)

Join a stream against a versioned table where each row of the table has a validity window. The engine picks the version of the dimension row that was current at the time of the stream event. This is the SQL FOR SYSTEM_TIME AS OF pattern.

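use krishiv_api::{Session, temporal_join, Result};

#[tokio::main]
async fn main() -> Result<()> {
    let session = Session::embedded().await?;
    // `schema` describes the orders topic and is assumed to be defined earlier.
    let enriched = temporal_join(
        // Stream side: Kafka source with `event_time` as its event-time column.
        session.read_kafka("orders", schema, "broker:9092", "orders", "app")?
            .with_event_time("event_time")?,
        session.table("users")?,  // versioned dimension
        "event_time",             // stream column used for the as-of lookup
        &["user_id"],             // join key(s)
        false,  // inner=false ⇒ left outer
    )?;
    // `enriched` can now be transformed further or written to a sink.
    Ok(())
}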

Python equivalent: stream_table_join(stream, table, "event_time", ["user_id"], inner=False).
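To make the as-of semantics concrete, here is a minimal, engine-independent sketch using only the Rust standard library. It is not Krishiv's implementation: `VersionedTable`, `upsert`, `as_of`, and `join_event` are hypothetical names invented for illustration, and each row version is assumed to be identified by the timestamp at which it became valid.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical versioned dimension table (illustration only): for each key,
/// row versions are indexed by the timestamp (ms) at which they became valid.
#[derive(Default)]
struct VersionedTable {
    versions: HashMap<u64, BTreeMap<i64, String>>,
}

impl VersionedTable {
    /// Record that `value` is the row for `key` from `valid_from_ms` onward.
    fn upsert(&mut self, key: u64, valid_from_ms: i64, value: &str) {
        self.versions
            .entry(key)
            .or_default()
            .insert(valid_from_ms, value.to_string());
    }

    /// Return the version that was current at `event_time_ms`: the entry with
    /// the greatest `valid_from_ms` that is <= the event time.
    fn as_of(&self, key: u64, event_time_ms: i64) -> Option<&str> {
        self.versions
            .get(&key)?
            .range(..=event_time_ms)
            .next_back()
            .map(|(_, value)| value.as_str())
    }
}

/// Join one stream event against the table. With `inner == false` an
/// unmatched event is kept with `None` on the table side (left outer);
/// with `inner == true` it is dropped.
fn join_event<'a>(
    table: &'a VersionedTable,
    user_id: u64,
    event_time_ms: i64,
    inner: bool,
) -> Option<(u64, i64, Option<&'a str>)> {
    match table.as_of(user_id, event_time_ms) {
        Some(row) => Some((user_id, event_time_ms, Some(row))),
        None if inner => None,
        None => Some((user_id, event_time_ms, None)),
    }
}
```

For example, if user 7 has versions valid from 100 ms and 200 ms, an event at 150 ms joins with the first version, an event at 200 ms joins with the second, and an event at 50 ms finds no version, so it survives only in the left outer case.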

Stream-stream interval join

Joins two streams by time-range correlation: for each event on the left, the engine finds events on the right with the same key whose event time falls within a bounded interval around it. Useful for click→impression joins, fraud detection, and correlating sensor events.

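// The import path of IntervalJoinSpec is assumed to match interval_join.
use krishiv_api::{interval_join, IntervalJoinSpec, Result};

#[tokio::main]
async fn main() -> Result<()> {
    // `clicks` and `impressions` are event-time streams assumed to be built
    // earlier, in the same way as the stream in the temporal join example.
    let joined = interval_join(
        clicks,
        impressions,
        IntervalJoinSpec {
            lower_bound_ms: -10_000,  // impression can be 10 s before click
            upper_bound_ms: 30_000,   // or 30 s after
            key_column: "ad_id".into(),
            max_buffer_per_side: 10_000,
        },
    )?;
    Ok(())
}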

Per-key state is bounded by max_buffer_per_side; when a buffer is full, the oldest events are dropped first and counted via the late-event handler.
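The matching rule and the bounded per-key buffers can be sketched in plain Rust as follows. This is an illustration under stated assumptions, not the engine's code: `Spec`, `Side`, `IntervalJoinState`, and `on_event` are hypothetical names, "oldest" is taken to mean earliest-buffered, and a simple counter stands in for the late-event handler.

```rust
use std::collections::{HashMap, VecDeque};

/// Illustrative mirror of the spec fields shown above (not the engine's type).
struct Spec {
    lower_bound_ms: i64,
    upper_bound_ms: i64,
    max_buffer_per_side: usize, // assumed to be >= 1
}

/// One buffered event: (event time in ms, payload).
type Event = (i64, String);

#[derive(Clone, Copy, PartialEq)]
enum Side {
    Left,
    Right,
}

#[derive(Default)]
struct IntervalJoinState {
    left: HashMap<String, VecDeque<Event>>,
    right: HashMap<String, VecDeque<Event>>,
    dropped: u64, // events evicted because a per-key buffer was full
}

impl IntervalJoinState {
    /// Process one event: emit (left, right) pairs against the opposite
    /// side's buffer for the same key, then buffer the event itself.
    fn on_event(&mut self, spec: &Spec, side: Side, key: &str, ev: Event) -> Vec<(Event, Event)> {
        let (own, other) = match side {
            Side::Left => (&mut self.left, &self.right),
            Side::Right => (&mut self.right, &self.left),
        };
        let matches: Vec<(Event, Event)> = other
            .get(key)
            .into_iter()
            .flatten()
            .filter_map(|o| {
                let (l, r) = if side == Side::Left { (&ev, o) } else { (o, &ev) };
                // A pair matches when right_time - left_time lies in [lower, upper].
                let delta = r.0 - l.0;
                (delta >= spec.lower_bound_ms && delta <= spec.upper_bound_ms)
                    .then(|| (l.clone(), r.clone()))
            })
            .collect();
        let buf = own.entry(key.to_string()).or_default();
        // Bounded state: evict the earliest-buffered event when the buffer is full.
        if buf.len() >= spec.max_buffer_per_side && buf.pop_front().is_some() {
            self.dropped += 1;
        }
        buf.push_back(ev);
        matches
    }
}
```

With bounds of -10 000 ms and 30 000 ms, a click at 100 000 ms matches an impression at 95 000 ms or 130 000 ms but not one at 140 000 ms. A production engine would also expire buffered events as the watermark advances; this sketch only models the size bound.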

Regular join on streaming input

If both inputs are bounded (e.g. two Parquet files), use the standard DataFrame join; no special streaming semantics apply. If one or both inputs are unbounded, you must give the planner a window: call .tumbling_window(...) first, then .join(...). The join then runs as a windowed join, with state kept per window.
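The effect of windowing before joining can be illustrated with a small standard-library sketch. It is a batch simulation under stated assumptions, not the planner's behavior: `window_start` and `windowed_join` are hypothetical helpers, rows are `(key, event_time_ms, payload)` tuples, and watermark-driven emission is not modeled.

```rust
use std::collections::HashMap;

/// Start of the tumbling window that contains `ts_ms`.
/// `div_euclid` keeps negative timestamps in the correct window.
fn window_start(ts_ms: i64, size_ms: i64) -> i64 {
    ts_ms.div_euclid(size_ms) * size_ms
}

/// Inner-join two sets of (key, event_time_ms, payload) rows per tumbling
/// window: rows match only when they share both the key and the window.
fn windowed_join(
    left: &[(u32, i64, &str)],
    right: &[(u32, i64, &str)],
    size_ms: i64,
) -> Vec<(i64, u32, String, String)> {
    // State is kept per (window, key), mirroring "state per window".
    let mut state: HashMap<(i64, u32), Vec<&str>> = HashMap::new();
    for &(key, ts, payload) in right {
        state
            .entry((window_start(ts, size_ms), key))
            .or_default()
            .push(payload);
    }

    let mut out = Vec::new();
    for &(key, ts, payload) in left {
        let w = window_start(ts, size_ms);
        if let Some(right_rows) = state.get(&(w, key)) {
            for r in right_rows {
                out.push((w, key, payload.to_string(), r.to_string()));
            }
        }
    }
    out
}
```

With 60-second windows, a left row at 5 000 ms and a right row at 59 000 ms with the same key join because both fall in the window starting at 0, while a left row at 61 000 ms does not, even though it is only 2 seconds away from that right row. This boundary effect is the main difference from an interval join.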

State and backpressure

| Join | State backend | Backpressure |
| --- | --- | --- |
| Temporal | VersionedTableState per join key | Drops late events past TTL; counts via LateEventHandler. |
| Interval | PerKeyIntervalJoin with max_buffer_per_side | Oldest events dropped first when buffer full. |
| Windowed | Same as windowed aggregation: RocksDB per-key accumulators | Watermark-driven emission. |

Note: All three joins require both sides to have an event-time column set. If the dimension table has no FOR SYSTEM_TIME AS OF version column, use a regular join with a watermark(...) on the stream side.
