
Stream-Table Temporal Join

Enrich a Kafka stream with the version of a slowly changing Iceberg dimension table that was valid at each event's time.

The classic "click enrichment" pattern: a fast stream of events is joined against a slowly changing dimension table, and each event is enriched with the version of the dimension row that was valid at that event's time.

When to use

You have a high-volume event stream (clicks, orders, telemetry) that needs enrichment from a slowly changing reference dataset (users, products, geo, feature flags). Recomputing the dimension for every event is wasteful, and so is scanning the whole dimension on every join. A temporal join instead lets each event pick the version of the dimension that was current at its event time.

Step 1 — Create the versioned dimension

-- Dimension with a validity window. validity_start is set on each write;
-- validity_end is NULL (open-ended) for the current version.
CREATE TABLE users (
  user_id     BIGINT NOT NULL,
  tier        VARCHAR,
  country     VARCHAR,
  validity_start TIMESTAMP NOT NULL,
  validity_end   TIMESTAMP
);
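The write rule in the comments above (each write sets validity_start, and the newest version stays open-ended) is essentially a type-2 slowly changing dimension. The in-memory Rust sketch below shows how a write could close the previous open version at the new version's start time. The `UserVersion` struct, the `write_version` helper, and the choice of i64 milliseconds for timestamps are all illustrative assumptions, not Krishiv's storage or write path.

```rust
/// One row of a hypothetical in-memory copy of the `users` dimension.
/// Timestamps are modelled as i64 milliseconds (an assumption for this sketch).
#[derive(Debug, Clone, PartialEq)]
struct UserVersion {
    user_id: i64,
    tier: String,
    country: String,
    validity_start: i64,
    validity_end: Option<i64>, // None = current (open-ended) version
}

/// Append a new version for `user_id`, first closing the currently open
/// version for that key (if any) at the new version's `validity_start`.
fn write_version(rows: &mut Vec<UserVersion>, user_id: i64, tier: &str, country: &str, at: i64) {
    for row in rows.iter_mut() {
        if row.user_id == user_id && row.validity_end.is_none() {
            row.validity_end = Some(at);
        }
    }
    rows.push(UserVersion {
        user_id,
        tier: tier.to_string(),
        country: country.to_string(),
        validity_start: at,
        validity_end: None,
    });
}

fn main() {
    let mut users = Vec::new();
    write_version(&mut users, 42, "free", "DE", 1_000);
    write_version(&mut users, 42, "pro", "DE", 5_000);
    // The first version is now closed at 5_000; the second is open-ended.
    assert_eq!(users[0].validity_end, Some(5_000));
    assert_eq!(users[1].validity_end, None);
    println!("{:?}", users);
}
```

Because the previous version ends exactly where the next one starts, the versions for a key never overlap, which is what lets a lookup pick a single row per event.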

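Krishiv's temporal join uses the validity_start and validity_end columns. For each event, the planner picks the version with a matching key where validity_start <= event_time AND (validity_end IS NULL OR event_time < validity_end), so a validity window includes its start and excludes its end.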
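The version-selection predicate can be illustrated with a small Rust sketch over one key's history. The `Version` struct and the `version_at` function are hypothetical; a linear scan is used for clarity, whereas a real implementation would more likely search versions sorted by validity_start.

```rust
/// One version of a dimension row for a single key (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
struct Version {
    tier: &'static str,
    validity_start: i64,       // inclusive
    validity_end: Option<i64>, // exclusive; None = still current
}

/// Return the version valid at `event_time`, using the predicate
/// validity_start <= event_time AND (validity_end IS NULL OR event_time < validity_end).
fn version_at(versions: &[Version], event_time: i64) -> Option<&Version> {
    versions.iter().find(|v| {
        v.validity_start <= event_time
            && v.validity_end.map_or(true, |end| event_time < end)
    })
}

fn main() {
    let history = vec![
        Version { tier: "free", validity_start: 1_000, validity_end: Some(5_000) },
        Version { tier: "pro", validity_start: 5_000, validity_end: None },
    ];
    assert_eq!(version_at(&history, 500), None); // before the first version
    assert_eq!(version_at(&history, 4_999).unwrap().tier, "free");
    assert_eq!(version_at(&history, 5_000).unwrap().tier, "pro"); // end is exclusive
    assert_eq!(version_at(&history, 9_999).unwrap().tier, "pro"); // open-ended
}
```

An event exactly at a version boundary resolves to the newer version, because the older window's end is exclusive.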

Step 2 — Register the stream

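let session = Session::embedded().await?;
session.register_kafka_source(
    "clicks",        // table name used later by session.table("clicks")
    clicks_schema,   // schema of the click events (defined elsewhere)
    "broker:9092",   // Kafka broker address
    "clicks",        // Kafka topic
    "krishiv-app",   // consumer group id (assumed)
)?;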

Step 3 — Join

use krishiv_api::temporal_join;

let enriched = temporal_join(
    session.table("clicks")?.with_event_time("event_time")?, // stream side
    session.table("users")?,                                 // versioned dimension
    "event_time",                                            // time used for the as-of lookup
    &["user_id"],                                            // join key(s)
    /* inner = */ false,  // left outer
)?;
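The effect of the `inner` flag can be shown with a std-only Rust sketch of a temporal join over in-memory data. Everything here (`Click`, `UserVersion`, `temporal_join_sketch`) is hypothetical and only mirrors the semantics described on this page: with `inner == false` (left outer) an unmatched click is kept with no dimension row, and with `inner == true` it is dropped.

```rust
use std::collections::HashMap;

#[derive(Debug)]
struct Click { user_id: i64, event_time: i64, url: &'static str }

#[derive(Debug)]
struct UserVersion { tier: &'static str, validity_start: i64, validity_end: Option<i64> }

/// Hypothetical in-memory temporal join keyed on user_id.
/// Left outer (inner == false) keeps unmatched clicks with `None`;
/// inner (inner == true) drops them.
fn temporal_join_sketch<'a>(
    clicks: &'a [Click],
    users: &'a HashMap<i64, Vec<UserVersion>>,
    inner: bool,
) -> Vec<(&'a Click, Option<&'a UserVersion>)> {
    clicks
        .iter()
        .map(|c| {
            // Pick the version whose validity window contains the click's event time.
            let matched = users.get(&c.user_id).and_then(|versions| {
                versions.iter().find(|v| {
                    v.validity_start <= c.event_time
                        && v.validity_end.map_or(true, |end| c.event_time < end)
                })
            });
            (c, matched)
        })
        .filter(|(_, m)| !inner || m.is_some())
        .collect()
}

fn main() {
    let mut users = HashMap::new();
    users.insert(7, vec![
        UserVersion { tier: "free", validity_start: 0, validity_end: Some(100) },
        UserVersion { tier: "pro", validity_start: 100, validity_end: None },
    ]);
    let clicks = [
        Click { user_id: 7, event_time: 50, url: "/a" },
        Click { user_id: 7, event_time: 150, url: "/b" },
        Click { user_id: 9, event_time: 60, url: "/c" }, // no such user
    ];
    let left = temporal_join_sketch(&clicks, &users, false);
    let inner = temporal_join_sketch(&clicks, &users, true);
    for (c, u) in &left {
        println!("{} {} {:?}", c.url, c.event_time, u.map(|v| v.tier));
    }
    assert_eq!(left.len(), 3);
    assert_eq!(inner.len(), 2);
}
```

The same click for user 7 gets "free" at time 50 and "pro" at time 150, which is the point of joining as of event time rather than against the latest row.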

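Or use SQL; the planner rewrites a FOR SYSTEM_TIME AS OF join into a temporal join automatically. Note that a plain JOIN is an inner join, so clicks with no matching user version are dropped, unlike the left outer example above: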

SELECT c.event_time, c.url, u.tier, u.country
FROM clicks c
JOIN users u FOR SYSTEM_TIME AS OF c.event_time
  ON c.user_id = u.user_id;

State and watermark

Each executor keeps a VersionedTableState for the dimension, which retains at most max_versions_per_key versions per key (default 8). Set a watermark on the stream so that late events resolve to a stable dimension version:

let stream = session.table("clicks")?
    .with_event_time("event_time")?
    .watermark("event_time", 5_000)?;  // 5 s allowed lateness
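A common way to interpret a 5 s allowed lateness is a bounded-out-of-orderness watermark: the watermark trails the largest event time seen so far by the allowed lateness, and an event behind the watermark counts as late. The Rust sketch below assumes that rule, millisecond timestamps, and a strict `<` comparison; the `Watermark` type is hypothetical, and Krishiv's exact semantics may differ.

```rust
/// Hypothetical bounded-out-of-orderness watermark tracker (times in ms).
struct Watermark {
    allowed_lateness_ms: i64,
    max_event_time: Option<i64>,
}

impl Watermark {
    fn new(allowed_lateness_ms: i64) -> Self {
        Self { allowed_lateness_ms, max_event_time: None }
    }

    /// Current watermark: largest event time seen so far minus the allowed lateness.
    fn current(&self) -> Option<i64> {
        self.max_event_time.map(|t| t - self.allowed_lateness_ms)
    }

    /// Observe an event; returns true if it arrived behind the watermark (late).
    fn observe(&mut self, event_time: i64) -> bool {
        let late = self.current().map_or(false, |wm| event_time < wm);
        self.max_event_time = Some(self.max_event_time.map_or(event_time, |m| m.max(event_time)));
        late
    }
}

fn main() {
    let mut wm = Watermark::new(5_000);
    assert!(!wm.observe(10_000)); // watermark advances to 5_000
    assert!(!wm.observe(6_000));  // 6_000 >= 5_000: on time
    assert!(wm.observe(4_000));   // 4_000 < 5_000: late
    assert_eq!(wm.current(), Some(5_000));
}
```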

Performance

  • Per-key state grows as the dimension evolves, up to max_versions_per_key versions; expect roughly 100 B per version per key.
  • For very wide dimensions, project only the dimension columns you need: SELECT c.*, u.tier, u.country FROM ... instead of SELECT *.
  • If the dimension fits in memory, set KRISHIV_DIMENSION_CACHE=1 to skip the version lookup.
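The state figures above can be combined into a rough sizing estimate: keys × versions per key × ~100 B. The Rust sketch below pairs that arithmetic with a hypothetical per-key store that caps retained versions by evicting the oldest one. `BoundedVersions` and the oldest-first eviction policy are assumptions for illustration, not a description of VersionedTableState. Under that policy, an event older than a key's oldest retained version would find no match.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical per-key version store that keeps at most `max_versions_per_key`
/// versions, evicting the oldest when a new one arrives. Each version is reduced
/// to (validity_start, tier), and versions are assumed to arrive in start order.
struct BoundedVersions {
    max_versions_per_key: usize,
    by_key: HashMap<i64, VecDeque<(i64, String)>>,
}

impl BoundedVersions {
    fn new(max_versions_per_key: usize) -> Self {
        Self { max_versions_per_key, by_key: HashMap::new() }
    }

    fn push(&mut self, key: i64, validity_start: i64, tier: &str) {
        let cap = self.max_versions_per_key;
        let versions = self.by_key.entry(key).or_default();
        versions.push_back((validity_start, tier.to_string()));
        while versions.len() > cap {
            versions.pop_front(); // drop the oldest retained version
        }
    }

    fn version_count(&self) -> usize {
        self.by_key.values().map(|v| v.len()).sum()
    }
}

/// Rough state size: keys x versions per key x bytes per version.
fn estimated_bytes(keys: u64, versions_per_key: u64, bytes_per_version: u64) -> u64 {
    keys * versions_per_key * bytes_per_version
}

fn main() {
    let mut state = BoundedVersions::new(8);
    for i in 0..20 {
        state.push(1, i * 1_000, "tier");
    }
    assert_eq!(state.version_count(), 8); // capped at max_versions_per_key

    // 1 million keys at the default cap of 8 versions, ~100 B each: about 800 MB.
    let bytes = estimated_bytes(1_000_000, 8, 100);
    assert_eq!(bytes, 800_000_000);
    println!("~{} MB", bytes / 1_000_000);
}
```

The cap bounds the worst case, so the estimate with max_versions_per_key is an upper bound; keys that change rarely hold fewer versions.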

See also