Queries and Lifecycle
DataStreamWriter, StreamingQuery, output modes, triggers, listeners, cancellation.
A streaming query in Krishiv is a long-running job owned by the coordinator (or by the local cluster in single-node mode). You start one through the DataStreamWriter, and you observe and stop it through the StreamingQuery handle that start() returns.
DataStreamWriter
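    // OutputMode, Trigger, count, and col are used below. Importing them from
    // the krishiv_api crate root is an assumption; adjust the paths as needed.
    use krishiv_api::{col, count, OutputMode, Session, Trigger};
    use std::time::Duration;

    #[tokio::main]
    async fn main() -> krishiv_api::Result<()> {
        let session = Session::embedded().await?;

        // Turn a Parquet source into a stream aggregated over tumbling windows.
        let streaming = session
            .read_parquet("data/orders.parquet").await?
            .to_streaming()
            .with_event_time("event_time")
            .tumbling_window(60_000)
            .agg(vec![count(col("*"))]);

        let query = streaming
            .write_stream()
            .output_mode(OutputMode::Append)          // Append | Update | Complete
            .trigger(Trigger::ProcessingTime(5_000))  // 5-s micro-batches
            .format("parquet")                        // kafka | parquet | iceberg | memory | console
            .option("path", "out/per_minute/")
            .option("checkpoint.location", "ckpt/")
            .start().await?;

        // Hold the handle while the query runs: dropping it stops the query.
        query.await_termination_timeout(Duration::from_secs(60)).await?;
        Ok(())
    }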
Output modes
| Mode | What is emitted | Use |
|---|---|---|
| Append (default) | Only new rows. Triggers downstream sinks to commit only the new data. | Idempotent sinks (Parquet, Iceberg); the safe default. |
| Update | Inserts and updates keyed by primary key. State-backed, requires OutputMode::Update support in the sink. | Materialized result tables, dedup pipelines. |
| Complete | The full result table for each trigger. | Small aggregate tables where the sink can rewrite. |
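
The difference between the three modes is easiest to see on a small keyed result table. The sketch below is a simplified model, not Krishiv code: OutputMode here is a local stand-in enum, rows_to_emit is a hypothetical helper, and "new row" is assumed to mean "key that was absent before the trigger".

```rust
use std::collections::BTreeMap;

/// Local stand-in for the three output modes above (not the Krishiv type).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum OutputMode {
    Append,
    Update,
    Complete,
}

/// Rows handed to the sink for one trigger.
/// `before` and `after` are the keyed result table before and after the trigger.
pub fn rows_to_emit(
    mode: OutputMode,
    before: &BTreeMap<String, u64>,
    after: &BTreeMap<String, u64>,
) -> Vec<(String, u64)> {
    after
        .iter()
        .filter(|(key, value)| match mode {
            // Append: only keys that did not exist before this trigger.
            OutputMode::Append => !before.contains_key(*key),
            // Update: new keys plus keys whose value changed.
            OutputMode::Update => before.get(*key) != Some(*value),
            // Complete: the whole result table, every trigger.
            OutputMode::Complete => true,
        })
        .map(|(key, value)| (key.clone(), *value))
        .collect()
}
```

In this model, a table that goes from {a: 1, b: 2} to {a: 1, b: 3, c: 1} emits only c under Append, b and c under Update, and all three rows under Complete.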
Triggers
| Trigger | Behavior | Latency / use |
|---|---|---|
| Once | Process all available data, then stop. No checkpoints. | One-shot backfill. |
| AvailableNow | Process all currently available data in batched micro-triggers, then stop. Checkpoints between triggers. | Backfill that survives restart. |
| ProcessingTime(n) | Trigger every n milliseconds. Checkpointed. | Seconds-scale latency. Default for production. |
| Continuous(n) | Run as a true streaming pipeline; checkpoint every n ms. | Sub-second latency. Higher coordinator overhead. |
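
Why "survives restart" matters for a backfill can be shown with a little arithmetic. The following is a rough model under stated assumptions, not Krishiv's scheduler: BoundedTrigger is a local stand-in, batch_rows is a hypothetical knob (the table does not say how micro-triggers are sized), and a checkpoint is assumed to be written after every completed micro-batch.

```rust
/// Local stand-ins for the two bounded triggers above (not the Krishiv types).
#[derive(Debug, Clone, Copy)]
pub enum BoundedTrigger {
    /// One pass over everything, no checkpoints.
    Once,
    /// Micro-batches of `batch_rows` rows, with a checkpoint after each one.
    AvailableNow { batch_rows: u64 },
}

/// Row offset a restarted query resumes from if the first run crashed
/// after processing `crashed_at` rows.
pub fn resume_offset(trigger: BoundedTrigger, crashed_at: u64) -> u64 {
    match trigger {
        // No checkpoint was written, so the restart begins at row 0.
        BoundedTrigger::Once => 0,
        // Resume at the last completed micro-batch boundary.
        BoundedTrigger::AvailableNow { batch_rows } => {
            let batch_rows = batch_rows.max(1); // guard against a zero batch size
            (crashed_at / batch_rows) * batch_rows
        }
    }
}

/// Total rows processed across the crashed run and the restarted run.
pub fn rows_processed_with_one_crash(trigger: BoundedTrigger, total: u64, crashed_at: u64) -> u64 {
    let crashed_at = crashed_at.min(total);
    crashed_at + (total - resume_offset(trigger, crashed_at))
}
```

With 1,000 rows and a crash at row 750, the Once model processes 1,750 rows in total, while AvailableNow with 200-row micro-batches resumes at row 600 and processes 1,150.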
Sinks (formats)
.format(name) takes a string and dispatches to the matching sink:
| Format | Notes |
|---|---|
| memory | Writes to an in-process Vec<RecordBatch>. Retrieve the batches with query.memory_batches(). |
| console | Prints to stdout. Useful for debugging. |
| parquet | Local file or S3/ADLS/GCS path. Checkpointed. |
| kafka | Topic + bootstrap servers. Use with_kafka_transactional(...) for exactly-once. |
| iceberg | Catalog URI + warehouse. Two-phase commit for exactly-once. |
| foreach_batch | Pass a closure that gets called with each micro-batch. |
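
Because the sink is chosen by a string, a typo can only be caught at run time. A minimal sketch of that kind of dispatch follows; SinkKind and parse_format are hypothetical names, matching is assumed to be exact and case-sensitive, and foreach_batch is left out because the table describes it as taking a closure rather than a name.

```rust
/// Local stand-in for the string-named sinks in the table above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SinkKind {
    Memory,
    Console,
    Parquet,
    Kafka,
    Iceberg,
}

/// Map a format name to a sink kind, rejecting unknown names up front.
pub fn parse_format(name: &str) -> Result<SinkKind, String> {
    match name {
        "memory" => Ok(SinkKind::Memory),
        "console" => Ok(SinkKind::Console),
        "parquet" => Ok(SinkKind::Parquet),
        "kafka" => Ok(SinkKind::Kafka),
        "iceberg" => Ok(SinkKind::Iceberg),
        other => Err(format!("unknown sink format: {:?}", other)),
    }
}
```

Validating the name before any data is read means a misspelled format fails when the query is started instead of partway through the first micro-batch.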
StreamingQuery lifecycle
    use std::time::Duration;

    // `handle` is a configured DataStreamWriter; this fragment runs inside an
    // async fn that returns krishiv_api::Result.
    let query = handle.start().await?;

    // observe
    println!("id={} state={:?}", query.id(), query.status().state);
    if let Some(p) = query.last_progress() {
        println!(
            "trigger={:?} input_rows={} output_rows={}",
            p.trigger, p.input_rows, p.output_rows
        );
    }

    // wait with timeout
    query.await_termination_timeout(Duration::from_secs(60)).await?;

    // stop
    query.stop();
Dropping the StreamingQuery handle also stops the query, so keep the handle alive for as long as the query should run.
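
Stop-on-drop is a common Rust pattern and can be sketched with the standard library alone. This is an illustration of the idea, not how Krishiv implements it: QueryHandle is a hypothetical type, and the "query" is just a background thread counting micro-batches.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for a query handle (not the Krishiv StreamingQuery).
pub struct QueryHandle {
    stop_flag: Arc<AtomicBool>,
    worker: Option<JoinHandle<u64>>,
}

impl QueryHandle {
    /// Start a background loop that runs "micro-batches" until asked to stop.
    pub fn start() -> Self {
        let stop_flag = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&stop_flag);
        let worker = thread::spawn(move || {
            let mut batches = 0u64;
            while !flag.load(Ordering::Acquire) {
                batches += 1;
                thread::sleep(Duration::from_millis(1));
            }
            batches
        });
        QueryHandle { stop_flag, worker: Some(worker) }
    }

    /// Request a stop and wait for the worker to exit. Returns the number of
    /// batches run, or None if the query was already stopped.
    pub fn stop(&mut self) -> Option<u64> {
        self.stop_flag.store(true, Ordering::Release);
        self.worker.take().and_then(|w| w.join().ok())
    }
}

impl Drop for QueryHandle {
    fn drop(&mut self) {
        // Same path as an explicit stop, so a forgotten handle cannot leak the worker.
        let _ = self.stop();
    }
}
```

The practical consequence is the same as in the text above: a handle bound to a temporary or dropped at the end of a function takes its query down with it.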
StreamingQueryManager and listeners
For long-lived services that own many queries, use StreamingQueryManager to register listeners:
    use std::sync::Arc;

    struct MyListener;

    impl StreamingQueryListener for MyListener {
        // Log each termination together with the event's exception field.
        fn on_query_terminated(&self, e: &QueryTerminatedEvent) {
            log::error!("query {} terminated: {:?}", e.query_id, e.exception);
        }
    }

    let mgr = StreamingQueryManager::new();
    mgr.add_listener(Arc::new(MyListener));
Query events include the last StreamingQueryProgress so you can write a clean shutdown handler.
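
The listener pattern itself can be sketched without Krishiv. Everything below is hypothetical and only mirrors the shape described above: TerminatedEvent, QueryListener, ListenerRegistry, and FailureRecorder are illustrative names, and the real event types may carry different fields.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical event; the real QueryTerminatedEvent may differ.
#[derive(Debug, Clone)]
pub struct TerminatedEvent {
    pub query_id: u64,
    pub error: Option<String>,
    /// Output row count from the last progress report, if any.
    pub last_output_rows: Option<u64>,
}

pub trait QueryListener: Send + Sync {
    fn on_query_terminated(&self, event: &TerminatedEvent);
}

#[derive(Default)]
pub struct ListenerRegistry {
    listeners: Mutex<Vec<Arc<dyn QueryListener>>>,
}

impl ListenerRegistry {
    pub fn add_listener(&self, listener: Arc<dyn QueryListener>) {
        self.listeners.lock().unwrap().push(listener);
    }

    /// Deliver an event to every registered listener.
    pub fn notify_terminated(&self, event: &TerminatedEvent) {
        // Clone the list so listeners run without the registry lock held.
        let snapshot: Vec<Arc<dyn QueryListener>> = self.listeners.lock().unwrap().clone();
        for listener in snapshot {
            listener.on_query_terminated(event);
        }
    }
}

/// Example listener that records the ids of queries that ended with an error.
#[derive(Default)]
pub struct FailureRecorder {
    pub failed: Mutex<Vec<u64>>,
}

impl QueryListener for FailureRecorder {
    fn on_query_terminated(&self, event: &TerminatedEvent) {
        if event.error.is_some() {
            self.failed.lock().unwrap().push(event.query_id);
        }
    }
}
```

Separating failed terminations from clean ones inside the listener keeps a shutdown handler from treating every stop as an error.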
Cancellation and timeout
- Per-query timeout: session.sql_with_timeout("...", 30_000) (ms).
- Cancel a query: query.stop() or session.operation_registry().cancel(op_id).
- Coordinator-driven cancel via gRPC: TaskCancellationRequest (per krishiv-proto).
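
Timeout and cancel-by-id both come down to cooperative checks between units of work. The sketch below shows one way that could look using only the standard library; OperationRegistry, CancelToken, Outcome, and run_steps are hypothetical stand-ins and say nothing about how Krishiv's registry is built.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Shared flag a running operation polls between steps.
#[derive(Clone)]
pub struct CancelToken(Arc<AtomicBool>);

impl CancelToken {
    pub fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::Acquire)
    }
}

/// Hypothetical registry mapping operation ids to cancel tokens.
#[derive(Default)]
pub struct OperationRegistry {
    next_id: AtomicU64,
    tokens: Mutex<HashMap<u64, CancelToken>>,
}

impl OperationRegistry {
    /// Register a new operation and return its id and token.
    pub fn register(&self) -> (u64, CancelToken) {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let token = CancelToken(Arc::new(AtomicBool::new(false)));
        self.tokens.lock().unwrap().insert(id, token.clone());
        (id, token)
    }

    /// Cancel by id. Returns false if the id is unknown or already cancelled.
    pub fn cancel(&self, op_id: u64) -> bool {
        match self.tokens.lock().unwrap().remove(&op_id) {
            Some(token) => {
                token.0.store(true, Ordering::Release);
                true
            }
            None => false,
        }
    }
}

#[derive(Debug, PartialEq)]
pub enum Outcome {
    Finished,
    Cancelled,
    TimedOut,
}

/// Run `steps` units of work, checking the token and the deadline before each one.
pub fn run_steps(steps: u32, timeout: Duration, token: &CancelToken, mut step: impl FnMut(u32)) -> Outcome {
    let deadline = Instant::now() + timeout;
    for i in 0..steps {
        if token.is_cancelled() {
            return Outcome::Cancelled;
        }
        if Instant::now() >= deadline {
            return Outcome::TimedOut;
        }
        step(i);
    }
    Outcome::Finished
}
```

Checks happen only between steps, so in this model a cancel or timeout takes effect at the next step boundary rather than immediately.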