Stream-Table Temporal Join
Enrich a Kafka stream with the version of a slowly-changing Iceberg dimension table that was current at each event's time.
The classic "click enrichment" pattern: a fast stream of events joined against a slowly-changing dimension table, with the correct historical version of the dimension row applied at the event's time.
When to use
You have a high-volume event stream (clicks, orders, telemetry) that needs enrichment from a slowly-changing reference dataset (users, products, geo, feature flags). Re-computing the dimension on every event is wasteful; reading the whole dimension on every join is wasteful too. Use a temporal join so each event picks the version of the dimension that was current at its event time.
Step 1 — Create the versioned dimension
-- Dimension with a validity window. validity_start is set on each write;
-- validity_end is NULL (open-ended) for the current version.
CREATE TABLE users (
    user_id BIGINT NOT NULL,
    tier VARCHAR,
    country VARCHAR,
    validity_start TIMESTAMP NOT NULL,
    validity_end TIMESTAMP
);
Krishiv's temporal join uses validity_start and validity_end columns. The planner picks the version with validity_start <= event_time AND (validity_end IS NULL OR event_time < validity_end).
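The selection rule above can be sketched in plain Rust. This is an illustration of the predicate only, not Krishiv's planner code: the UserVersion struct, the version_as_of function, and the use of epoch milliseconds for timestamps are all assumptions made for the example.

```rust
/// One version of a dimension row, loosely mirroring the users table.
/// Timestamps are epoch milliseconds here (an assumption for the sketch).
#[derive(Debug, Clone, PartialEq)]
pub struct UserVersion {
    pub user_id: i64,
    pub tier: String,
    pub validity_start: i64,
    /// None models a NULL validity_end, i.e. the current version.
    pub validity_end: Option<i64>,
}

/// Return the version whose half-open window [validity_start, validity_end)
/// contains `event_time`, i.e.
/// validity_start <= event_time AND (validity_end IS NULL OR event_time < validity_end).
pub fn version_as_of(versions: &[UserVersion], event_time: i64) -> Option<&UserVersion> {
    versions.iter().find(|v| {
        v.validity_start <= event_time
            && v.validity_end.map_or(true, |end| event_time < end)
    })
}
```

Because the window is half-open, an event that lands exactly on a version boundary resolves to the newer version, and an event earlier than the first validity_start matches nothing.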
Step 2 — Register the stream
let session = Session::embedded().await?;
session.register_kafka_source(
    "clicks",
    clicks_schema,
    "broker:9092",
    "clicks",
    "krishiv-app",
)?;
Step 3 — Join
use krishiv_api::temporal_join;

let enriched = temporal_join(
    session.table("clicks")?.with_event_time("event_time")?,
    session.table("users")?,
    "event_time",
    &["user_id"],
    /* inner = */ false, // left outer
)?;
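Or via SQL; the planner rewrites the FOR SYSTEM_TIME AS OF clause into a temporal join automatically. Note that a plain JOIN is an inner join, so unlike the left outer call above, clicks with no matching user version are dropped: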
SELECT c.event_time, c.url, u.tier, u.country
FROM clicks c
JOIN users u FOR SYSTEM_TIME AS OF c.event_time
ON c.user_id = u.user_id;
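To make the difference between the inner and left outer forms concrete, here is a minimal in-memory sketch of the join semantics. It is not how Krishiv executes the join; the Click and UserRow structs and the join_sketch function are hypothetical names introduced for this example, and timestamps are plain integers.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
pub struct Click {
    pub user_id: i64,
    pub event_time: i64,
}

#[derive(Debug, Clone, PartialEq)]
pub struct UserRow {
    pub tier: String,
    pub validity_start: i64,
    pub validity_end: Option<i64>, // None = current version
}

/// Join each click to the user version valid at the click's event time.
/// With `inner == false` (left outer), unmatched clicks are kept with None;
/// with `inner == true`, unmatched clicks are dropped.
pub fn join_sketch(
    clicks: &[Click],
    users: &HashMap<i64, Vec<UserRow>>,
    inner: bool,
) -> Vec<(Click, Option<UserRow>)> {
    let mut out = Vec::new();
    for click in clicks {
        // Look up the key, then pick the version whose window covers event_time.
        let matched = users.get(&click.user_id).and_then(|versions| {
            versions.iter().find(|v| {
                v.validity_start <= click.event_time
                    && v.validity_end.map_or(true, |end| click.event_time < end)
            })
        });
        match matched {
            Some(row) => out.push((click.clone(), Some(row.clone()))),
            None if !inner => out.push((click.clone(), None)),
            None => {} // inner join: drop the unmatched click
        }
    }
    out
}
```

With one known user and two clicks, one of them from an unknown user, the left outer form returns both clicks and the inner form returns only the matched one.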
State and watermark
Each executor keeps a VersionedTableState for the dimension. It's bounded by max_versions_per_key (default 8). Set a watermark on the stream so late events pick a stable dimension version:
let stream = session.table("clicks")?
    .with_event_time("event_time")?
    .watermark("event_time", 5_000)?; // 5_000 ms = 5 s allowed lateness
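The interaction between a per-key version cap and late events can be sketched as follows. This is a hypothetical stand-in, not the real VersionedTableState: the BoundedVersions type, its methods, the evict-oldest policy, and the assumption that versions arrive in validity_start order are all illustrative.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical per-key versioned state with a cap on retained versions.
pub struct BoundedVersions {
    max_versions_per_key: usize,
    /// Per key: (validity_start, value) pairs, oldest first.
    by_key: HashMap<i64, VecDeque<(i64, String)>>,
}

impl BoundedVersions {
    pub fn new(max_versions_per_key: usize) -> Self {
        Self { max_versions_per_key, by_key: HashMap::new() }
    }

    /// Append a new version and evict the oldest ones beyond the cap.
    /// Assumes versions for a key arrive in increasing validity_start order.
    pub fn upsert(&mut self, key: i64, validity_start: i64, value: String) {
        let versions = self.by_key.entry(key).or_default();
        versions.push_back((validity_start, value));
        while versions.len() > self.max_versions_per_key {
            versions.pop_front();
        }
    }

    /// Newest retained version with validity_start <= event_time, if any.
    pub fn as_of(&self, key: i64, event_time: i64) -> Option<&str> {
        self.by_key
            .get(&key)?
            .iter()
            .rev()
            .find(|(start, _)| *start <= event_time)
            .map(|(_, value)| value.as_str())
    }
}
```

In this sketch, an event that arrives late enough can ask for a version that has already been evicted and get no match, which is one reason to bound lateness with a watermark.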
Performance
- Per-key state grows as the dimension evolves; expect ~100 B per version per key.
- For very wide dimensions, project the join columns: SELECT c.*, u.tier, u.country FROM ... instead of SELECT *.
- If the dimension fits in memory, set KRISHIV_DIMENSION_CACHE=1 to skip the version lookup.
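For capacity planning, the ~100 B per version per key figure can be turned into a back-of-the-envelope estimate. The helper below is a hypothetical sketch; the 10 million key count in the note that follows is an example value, not a measured workload.

```rust
/// Rough upper bound on versioned state size in bytes:
/// keys * versions_per_key * bytes_per_version.
/// Saturating arithmetic avoids overflow on very large inputs.
pub fn estimated_state_bytes(keys: u64, versions_per_key: u64, bytes_per_version: u64) -> u64 {
    keys.saturating_mul(versions_per_key)
        .saturating_mul(bytes_per_version)
}
```

For instance, 10 million keys at the default cap of 8 versions and ~100 B per version gives estimated_state_bytes(10_000_000, 8, 100) = 8_000_000_000 bytes, roughly 8 GB in total before any per-entry overhead.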