Kafka → Parquet pipeline
Read a Kafka topic, run a streaming aggregation, write the result to Parquet.
Preview: The Kafka source and Parquet sink are implemented, but end-to-end certification depends on the durability profile and on the certified source/sink combination. See Connectors.
Requirements
- Enable the kafka Cargo feature (or maturin develop --features kafka for Python).
- Use the single-node-durable or distributed-durable profile for at-least-once delivery.
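At-least-once delivery means a failure never loses records, but some records may reach the sink more than once, because the pipeline replays from the last committed source position. The plain-Python simulation below illustrates only that contract; it is not krishiv code, and run and crash_after are hypothetical names chosen for this sketch.

```python
from typing import List, Optional


def run(log: List[str], committed: int, sink: List[str],
        crash_after: Optional[int] = None) -> int:
    """Consume log[committed:] into sink and return the offset to commit.

    The offset is committed only after the whole batch has reached the sink,
    so a crash mid-batch leaves the old offset in place and the batch is
    replayed on restart: records can be duplicated but never skipped.
    """
    for i in range(committed, len(log)):
        sink.append(log[i])          # write to the sink first
        if crash_after is not None and i == crash_after:
            return committed         # simulated crash: no new offset is committed
    return len(log)                  # batch done: commit the end offset


# First attempt crashes after writing offset 2; the restart replays from offset 0.
log = ["a", "b", "c", "d"]
sink: List[str] = []
committed = run(log, 0, sink, crash_after=2)
committed = run(log, committed, sink)
```

Every record in log ends up in sink at least once; "a" through "c" appear twice because they were written before the crash but their offsets were never committed.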
Declare with SQL DDL
CREATE SOURCE orders_raw
TYPE KAFKA
OPTIONS (
    'brokers'           = 'broker1:9092,broker2:9092',
    'topic'             = 'orders',
    'group.id'          = 'krishiv-orders',
    'auto.offset.reset' = 'latest',
    'format'            = 'json'
)
WITH SCHEMA (
    order_id   BIGINT NOT NULL,
    customer   VARCHAR,
    amount     DOUBLE,
    event_time TIMESTAMP
);

CREATE SINK per_minute_totals
TYPE PARQUET
OPTIONS (
    'path'        = 's3://my-bucket/out/per_minute/',
    'compression' = 'zstd'
);

START PIPELINE orders_raw TO per_minute_totals
AS
SELECT
    tumble_start(event_time, INTERVAL '1 minute') AS window_start,
    customer,
    SUM(amount) AS total
FROM orders_raw
GROUP BY tumble_start(event_time, INTERVAL '1 minute'), customer;
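To make the query's semantics concrete, here is a batch sketch in plain Python that does not depend on krishiv. It decodes JSON message values into the orders_raw columns (enforcing NOT NULL on order_id), assigns each row to a one-minute tumbling window the way tumble_start(event_time, INTERVAL '1 minute') is described above, and sums amount per (window_start, customer). The helper names, the ISO-8601 encoding of event_time, epoch-aligned windows, and skipping rows with a null event_time or amount are all assumptions of this sketch; a streaming engine would compute the same grouping incrementally rather than over a finite list.

```python
import json
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, Iterable, Optional, Tuple

EPOCH = datetime(1970, 1, 1)


def decode_order(raw: bytes) -> Dict[str, object]:
    """Map one JSON message value onto the orders_raw columns (assumed encoding)."""
    obj = json.loads(raw)
    if obj.get("order_id") is None:                  # order_id BIGINT NOT NULL
        raise ValueError("order_id must not be null")
    ts = obj.get("event_time")
    return {
        "order_id": int(obj["order_id"]),           # BIGINT
        "customer": obj.get("customer"),             # VARCHAR, nullable
        "amount": obj.get("amount"),                 # DOUBLE, nullable
        # TIMESTAMP, assumed to arrive as an ISO-8601 string
        "event_time": datetime.fromisoformat(ts) if ts is not None else None,
    }


def tumble_start(ts: datetime, size: timedelta) -> datetime:
    """Floor ts to the start of its fixed-size window, aligned to the Unix epoch."""
    return ts - (ts - EPOCH) % size


def per_minute_totals(
    messages: Iterable[bytes],
) -> Dict[Tuple[datetime, Optional[str]], float]:
    """Batch equivalent of SELECT ... GROUP BY tumble_start(...), customer."""
    totals: Dict[Tuple[datetime, Optional[str]], float] = defaultdict(float)
    for raw in messages:
        row = decode_order(raw)
        if row["event_time"] is None or row["amount"] is None:
            continue  # simplification: skip rows that cannot be windowed or summed
        key = (tumble_start(row["event_time"], timedelta(minutes=1)), row["customer"])
        totals[key] += float(row["amount"])          # SUM(amount)
    return dict(totals)


messages = [
    b'{"order_id": 1, "customer": "acme", "amount": 10.0, "event_time": "2024-01-01T10:00:05"}',
    b'{"order_id": 2, "customer": "acme", "amount": 2.5, "event_time": "2024-01-01T10:00:59"}',
    b'{"order_id": 3, "customer": "acme", "amount": 4.0, "event_time": "2024-01-01T10:01:00"}',
    b'{"order_id": 4, "customer": "globex", "amount": 7.0, "event_time": "2024-01-01T10:00:30"}',
]
result = per_minute_totals(messages)
```

Windows in this sketch are half-open: 10:00:59 falls in the 10:00 window, while 10:01:00 opens the next one.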
Python equivalent
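import krishiv as ks

# Assumed to be defined or imported beforehand (not shown on this page):
# `schema` (the orders_raw schema) and the `window` and `sum` column functions.

session = ks.Session.local()  # single-node daemon

# Register the topic as a named source, mirroring CREATE SOURCE above.
session.register_kafka_source(
    "orders_raw", schema,
    brokers="broker1:9092,broker2:9092",
    topic="orders",
    group="krishiv-orders",
)

# Per-minute totals per customer, written to Parquet with a checkpoint location.
(session.read_stream()
    .format("kafka")
    .option("topic", "orders")
    .load()
    .group_by(window("event_time", "1 minute"), "customer")
    .agg(sum("amount").alias("total"))
    .write_stream()
    .format("parquet")
    .option("path", "s3://my-bucket/out/per_minute/")
    .option("checkpoint", "s3://my-bucket/checkpoints/per_minute/")
    .start())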
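The checkpoint option gives the pipeline a durable location for its recovery state. As a rough mental model only (this page does not describe krishiv's checkpoint format), a checkpoint records the consumed Kafka offsets together with the partial window totals so both can be restored consistently after a restart. The sketch below writes such a snapshot to a local file with write-then-rename, so a crash never leaves a half-written checkpoint; save_checkpoint, load_checkpoint, and the JSON layout are hypothetical.

```python
import json
import os
import tempfile
from typing import Any, Dict


def save_checkpoint(path: str, offsets: Dict[str, int], state: Dict[str, float]) -> None:
    """Persist source offsets and aggregate state together, atomically."""
    directory = os.path.dirname(os.path.abspath(path))
    # Create the temp file next to the target so the rename stays on one filesystem.
    fd, tmp = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump({"offsets": offsets, "state": state}, f)
        f.flush()
        os.fsync(f.fileno())          # make sure the bytes are on disk before renaming
    os.replace(tmp, path)             # readers see the old or the new snapshot, never a partial one


def load_checkpoint(path: str) -> Dict[str, Any]:
    """Return the last snapshot, or an empty one if no checkpoint exists yet."""
    if not os.path.exists(path):
        return {"offsets": {}, "state": {}}
    with open(path) as f:
        return json.load(f)
```

On POSIX filesystems os.replace is atomic when source and destination are on the same filesystem, which is why the temporary file is created in the checkpoint's own directory. Object stores such as S3 have no equivalent rename, so a real engine needs a different commit protocol there.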
See also
- Kafka Connector — full option reference
- Parquet & Object Store
- Checkpointing