
Kafka → Parquet pipeline

Read a Kafka topic, run a streaming aggregation, write the result to Parquet.

Preview: The Kafka source and Parquet sink are implemented. End-to-end certification depends on the durability profile and on which source/sink combination is certified; see Connectors.

Requirements

  • Enable the kafka Cargo feature. For Python, build with maturin develop --features kafka.
  • Use the single-node-durable or distributed-durable profile for at-least-once delivery.

Declare with SQL DDL

CREATE SOURCE orders_raw
TYPE KAFKA
OPTIONS (
  'brokers'           = 'broker1:9092,broker2:9092',
  'topic'             = 'orders',
  'group.id'          = 'krishiv-orders',
  'auto.offset.reset' = 'latest',
  'format'            = 'json'
)
WITH SCHEMA (
  order_id   BIGINT   NOT NULL,
  customer   VARCHAR,
  amount     DOUBLE,
  event_time TIMESTAMP
);

CREATE SINK per_minute_totals
TYPE PARQUET
OPTIONS (
  'path'        = 's3://my-bucket/out/per_minute/',
  'compression' = 'zstd'
);

START PIPELINE orders_raw TO per_minute_totals
AS
SELECT
  tumble_start(event_time, INTERVAL '1 minute') AS window_start,
  customer,
  SUM(amount) AS total
FROM orders_raw
GROUP BY tumble_start(event_time, INTERVAL '1 minute'), customer;
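
To make the query above concrete, here is a minimal plain-Python sketch of what a one-minute tumbling-window sum per customer computes. It does not use Krishiv. The helper names tumble_start and per_minute_totals are illustrative, and the sketch assumes windows are aligned to the Unix epoch, timestamps are timezone-aware, and amounts are non-null. None of these assumptions is confirmed by this page.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumble_start(ts: datetime, size: timedelta) -> datetime:
    """Start of the fixed-size window containing ts (epoch alignment is assumed)."""
    return ts - (ts - EPOCH) % size


def per_minute_totals(orders):
    """Sum `amount` per (window_start, customer), like the GROUP BY in the query."""
    totals = defaultdict(float)
    for order in orders:
        window_start = tumble_start(order["event_time"], timedelta(minutes=1))
        totals[(window_start, order["customer"])] += order["amount"]
    return dict(totals)


# Hypothetical records shaped like the orders_raw schema.
sample_orders = [
    {"order_id": 1, "customer": "alice", "amount": 10.0,
     "event_time": datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc)},
    {"order_id": 2, "customer": "alice", "amount": 5.0,
     "event_time": datetime(2024, 1, 1, 12, 0, 40, tzinfo=timezone.utc)},
    {"order_id": 3, "customer": "bob", "amount": 7.5,
     "event_time": datetime(2024, 1, 1, 12, 0, 59, tzinfo=timezone.utc)},
    {"order_id": 4, "customer": "alice", "amount": 2.0,
     "event_time": datetime(2024, 1, 1, 12, 1, 0, tzinfo=timezone.utc)},
]

for (window_start, customer), total in sorted(per_minute_totals(sample_orders).items()):
    print(window_start.isoformat(), customer, total)
```

Each event falls into exactly one window, so orders 1 and 2 land in the same 12:00 group for alice, while order 4, at exactly 12:01:00, opens the next window. A streaming engine additionally has to decide when a window is complete and how to treat late events, which this batch sketch leaves out.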

Python equivalent

import krishiv as ks

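session = ks.Session.local()  # single-node daemon

# `schema` must describe the same columns as the WITH SCHEMA clause in the DDL
# (order_id, customer, amount, event_time); building it is not shown here.
session.register_kafka_source(
    "orders_raw", schema,
    brokers="broker1:9092,broker2:9092",
    topic="orders",
    group="krishiv-orders",
)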

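# `window` and `sum` are assumed to be Krishiv expression helpers (not Python's
# built-in sum); their import is not shown on this page.
(session.read_stream()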
    .format("kafka")
    .option("topic", "orders")
    .load()
    .group_by(window("event_time", "1 minute"), "customer")
    .agg(sum("amount").alias("total"))
    .write_stream()
    .format("parquet")
    .option("path", "s3://my-bucket/out/per_minute/")
    .option("checkpoint", "s3://my-bucket/checkpoints/per_minute/")
    .start())

See also