Status: Available

Queries and Lifecycle

DataStreamWriter, StreamingQuery, output modes, triggers, listeners, cancellation.

A streaming query in Krishiv is a long-running job owned by the coordinator (or by the local cluster in single-node mode). The DataStreamWriter is the way you start one, and the StreamingQuery handle is how you observe and stop it.

DataStreamWriter

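use std::time::Duration;

// The import paths for col, count, OutputMode and Trigger are assumed;
// adjust them to wherever krishiv_api exports these names.
use krishiv_api::{col, count, OutputMode, Session, Trigger};

#[tokio::main]
async fn main() -> krishiv_api::Result<()> {
    let session = Session::embedded().await?;

    // Per-minute order counts: 60_000 ms tumbling windows on event_time.
    let streaming = session
        .read_parquet("data/orders.parquet").await?
        .to_streaming()
        .with_event_time("event_time")
        .tumbling_window(60_000)
        .agg(vec![count(col("*"))]);

    let query = streaming
        .write_stream()
        .output_mode(OutputMode::Append)              // Append | Update | Complete
        .trigger(Trigger::ProcessingTime(5_000))     // 5-second micro-batches
        .format("parquet")                          // kafka | parquet | iceberg | memory | console
        .option("path", "out/per_minute/")
        .option("checkpoint.location", "ckpt/")
        .start().await?;

    // Dropping the handle stops the query, so keep it alive:
    // here the query runs for up to 60 s before main returns.
    query.await_termination_timeout(Duration::from_secs(60)).await?;

    Ok(())
}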
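How .tumbling_window(60_000) assigns rows to windows is not spelled out on this page. Assuming epoch-aligned, half-open windows [start, start + size) (a common convention, not confirmed for Krishiv), the bucketing behind a per-minute count can be sketched in plain Rust. window_start and count_per_window are hypothetical helpers, not part of krishiv_api.

```rust
use std::collections::BTreeMap;

/// Start (ms) of the tumbling window containing `event_time_ms`.
/// Assumes epoch-aligned, half-open windows [start, start + size).
fn window_start(event_time_ms: i64, window_size_ms: i64) -> i64 {
    // rem_euclid keeps the remainder non-negative, so pre-epoch times
    // still map to the window that starts at or before them.
    event_time_ms - event_time_ms.rem_euclid(window_size_ms)
}

/// Count events per window over a finite set of event times,
/// roughly what `.tumbling_window(60_000).agg(count(*))` computes.
fn count_per_window(event_times_ms: &[i64], window_size_ms: i64) -> BTreeMap<i64, u64> {
    let mut counts = BTreeMap::new();
    for &t in event_times_ms {
        *counts.entry(window_start(t, window_size_ms)).or_insert(0) += 1;
    }
    counts
}
```

For example, events at 0 ms and 30_000 ms share window 0, while an event at exactly 60_000 ms opens the next window because the upper bound is exclusive.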

Output modes

| Mode | What is emitted | Use |
|---|---|---|
| Append (default) | Only new rows; downstream sinks commit only the data added since the last trigger. | Idempotent sinks (Parquet, Iceberg). The safe default. |
| Update | Inserted and updated rows, keyed by primary key. State-backed; the sink must support OutputMode::Update. | Materialized result tables, dedup pipelines. |
| Complete | The full result table on every trigger. | Small aggregate tables that the sink can rewrite in full. |
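To make the three modes concrete, here is a conceptual model of what each one would hand to a sink for a single trigger of a windowed count. It is not Krishiv's implementation and rests on assumptions: the result table is keyed by window start, Append emits a row only once its window has closed and its value can no longer change, and Update emits rows that are new or changed since the previous trigger. Mode and rows_to_emit are hypothetical names.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    Append,
    Update,
    Complete,
}

/// Rows a sink would receive for one trigger.
/// `prev` / `curr`: result table (window start -> count) before and after the trigger.
/// `newly_closed`: windows finalized by this trigger (assumed to come from the watermark).
fn rows_to_emit(
    mode: Mode,
    prev: &BTreeMap<i64, u64>,
    curr: &BTreeMap<i64, u64>,
    newly_closed: &[i64],
) -> Vec<(i64, u64)> {
    match mode {
        // Final rows only, each emitted exactly once.
        Mode::Append => newly_closed
            .iter()
            .filter_map(|k| curr.get(k).map(|v| (*k, *v)))
            .collect(),
        // New keys plus keys whose value changed since the last trigger.
        Mode::Update => curr
            .iter()
            .filter(|(k, v)| prev.get(*k) != Some(*v))
            .map(|(k, v)| (*k, *v))
            .collect(),
        // The whole table, every trigger.
        Mode::Complete => curr.iter().map(|(k, v)| (*k, *v)).collect(),
    }
}
```

In this model Update re-emits window 0 every time its count changes, while Append waits until the window closes and never revises a row it has already emitted, which is what append-only file and table sinks need.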

Triggers

| Trigger | Behavior | Latency / typical use |
|---|---|---|
| Once | Process all available data, then stop. No checkpoints. | One-shot backfill. |
| AvailableNow | Process all currently available data in batched micro-triggers, then stop. Checkpoints between triggers. | Backfill that survives a restart. |
| ProcessingTime(n) | Trigger every n milliseconds. Checkpointed. | Seconds-scale latency. Default for production. |
| Continuous(n) | Run as a true streaming pipeline; checkpoint every n ms. | Sub-second latency, at the cost of higher coordinator overhead. |
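The fixed-interval and AvailableNow triggers can be modelled with a little arithmetic. The sketch below assumes that ProcessingTime(n) follows the familiar fixed-interval micro-batch policy (wait for the next interval boundary, or start immediately if a batch overran it) and that AvailableNow splits the backlog present at start-up into bounded offset ranges, checkpointing after each. Both are assumptions about Krishiv's scheduler, and next_trigger_ms, schedule and plan_available_now are hypothetical helpers.

```rust
/// Start time (ms) of the next micro-batch under a fixed-interval trigger:
/// the next interval boundary, or immediately if the batch overran it.
fn next_trigger_ms(batch_start_ms: u64, batch_end_ms: u64, interval_ms: u64) -> u64 {
    (batch_start_ms + interval_ms).max(batch_end_ms)
}

/// Start times for batches with the given durations, beginning at t = 0.
fn schedule(durations_ms: &[u64], interval_ms: u64) -> Vec<u64> {
    let mut starts = Vec::with_capacity(durations_ms.len());
    let mut start = 0;
    for &d in durations_ms {
        starts.push(start);
        start = next_trigger_ms(start, start + d, interval_ms);
    }
    starts
}

/// AvailableNow-style plan: split the offsets available at start-up,
/// [start, end), into half-open ranges of at most `max_per_batch` offsets.
/// Each range's end is the offset that would be checkpointed after it.
fn plan_available_now(start: u64, end: u64, max_per_batch: u64) -> Vec<(u64, u64)> {
    assert!(max_per_batch > 0, "batch size must be positive");
    let mut batches = Vec::new();
    let mut lo = start;
    while lo < end {
        let hi = (lo + max_per_batch).min(end);
        batches.push((lo, hi));
        lo = hi;
    }
    batches
}
```

With a 5 s interval and batch durations of 1 s, 7 s and 2 s, the batches start at 0 s, 5 s and 12 s: the second batch overran its interval, so the third starts as soon as it finishes rather than at 10 s.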

Sinks (formats)

.format(name) takes the sink name as a string and dispatches to the matching sink:

| Format | Notes |
|---|---|
| memory | Writes to an in-process Vec<RecordBatch>; retrieve the batches with query.memory_batches(). |
| console | Prints to stdout. Useful for debugging. |
| parquet | Local file or S3/ADLS/GCS path. Checkpointed. |
| kafka | Topic + bootstrap servers. Use with_kafka_transactional(...) for exactly-once. |
| iceberg | Catalog URI + warehouse. Two-phase commit for exactly-once. |
| foreach_batch | Pass a closure that is called with each micro-batch. |
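The string dispatch behind .format(name) can be pictured as a lookup from a name to a sink object. This is a plain-Rust sketch, not krishiv_api: Batch stands in for an Arrow RecordBatch, and Sink, MemorySink, ConsoleSink, ForeachBatchSink and sink_for are hypothetical names. The memory sink shares its buffer through an Arc<Mutex<...>>, which in this sketch plays roughly the role of query.memory_batches().

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for an Arrow RecordBatch in this sketch.
type Batch = Vec<i64>;

/// Hypothetical sink interface: one call per micro-batch.
trait Sink {
    fn write_batch(&mut self, batch_id: u64, batch: &Batch);
}

/// Appends every batch to a shared in-process buffer.
struct MemorySink {
    batches: Arc<Mutex<Vec<Batch>>>,
}

impl Sink for MemorySink {
    fn write_batch(&mut self, _batch_id: u64, batch: &Batch) {
        self.batches.lock().unwrap().push(batch.clone());
    }
}

/// Prints each batch to stdout.
struct ConsoleSink;

impl Sink for ConsoleSink {
    fn write_batch(&mut self, batch_id: u64, batch: &Batch) {
        println!("batch {batch_id}: {batch:?}");
    }
}

/// Calls a user closure with each micro-batch.
struct ForeachBatchSink<F: FnMut(u64, &Batch)>(F);

impl<F: FnMut(u64, &Batch)> Sink for ForeachBatchSink<F> {
    fn write_batch(&mut self, batch_id: u64, batch: &Batch) {
        (self.0)(batch_id, batch);
    }
}

/// Dispatch on the format string. Only closure-free sinks are reachable by
/// name here; a foreach_batch sink is built directly from its closure.
fn sink_for(format: &str, memory: Arc<Mutex<Vec<Batch>>>) -> Result<Box<dyn Sink>, String> {
    match format {
        "memory" => Ok(Box::new(MemorySink { batches: memory })),
        "console" => Ok(Box::new(ConsoleSink)),
        other => Err(format!("format not covered by this sketch: {other}")),
    }
}
```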

StreamingQuery lifecycle

use std::time::Duration;

// `writer` is a configured DataStreamWriter, as in the example above.
let query = writer.start().await?;

// observe
println!("id={} state={:?}", query.id(), query.status().state);
if let Some(p) = query.last_progress() {
    println!("trigger={:?} input_rows={} output_rows={}", p.trigger, p.input_rows, p.output_rows);
}

// wait up to 60 s for the query to terminate
query.await_termination_timeout(Duration::from_secs(60)).await?;

// stop
query.stop();

When the StreamingQuery handle is dropped, the query is stopped automatically, so keep the handle alive for as long as the query should run.
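Stop-on-drop is ordinary Rust RAII. As an illustration (not Krishiv's implementation), a handle can own a stop flag and the worker's JoinHandle, and its Drop impl can set the flag and join the worker. QueryHandle is a hypothetical type whose background thread stands in for the micro-batch loop.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical query handle: owns a stop flag and the worker thread.
struct QueryHandle {
    stop_flag: Arc<AtomicBool>,
    batches_run: Arc<AtomicU64>,
    worker: Option<JoinHandle<()>>,
}

impl QueryHandle {
    fn start(interval: Duration) -> Self {
        let stop_flag = Arc::new(AtomicBool::new(false));
        let batches_run = Arc::new(AtomicU64::new(0));
        let (stop, batches) = (Arc::clone(&stop_flag), Arc::clone(&batches_run));
        let worker = thread::spawn(move || {
            while !stop.load(Ordering::Acquire) {
                batches.fetch_add(1, Ordering::Relaxed); // stand-in for one micro-batch
                thread::sleep(interval);
            }
        });
        QueryHandle { stop_flag, batches_run, worker: Some(worker) }
    }

    /// Signal the worker and wait for it to exit. Safe to call more than once.
    fn stop(&mut self) {
        self.stop_flag.store(true, Ordering::Release);
        if let Some(worker) = self.worker.take() {
            let _ = worker.join();
        }
    }
}

impl Drop for QueryHandle {
    // Dropping the handle stops the query, mirroring the behavior described above.
    fn drop(&mut self) {
        self.stop();
    }
}
```

The same mechanics explain a common pitfall: writing `let _ = writer.start().await?;` drops the returned handle at the end of that statement, whereas a named binding such as `let _query = ...` keeps it alive until the end of the scope.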

StreamingQueryManager and listeners

For long-lived services that own many queries, use StreamingQueryManager to register listeners:

use std::sync::Arc;

struct MyListener;

impl StreamingQueryListener for MyListener {
    // Log the terminated query together with its reported exception.
    fn on_query_terminated(&self, e: &QueryTerminatedEvent) {
        log::error!("query {} terminated: {:?}", e.query_id, e.exception);
    }
}

let mgr = StreamingQueryManager::new();
mgr.add_listener(Arc::new(MyListener));

Query events include the last StreamingQueryProgress so you can write a clean shutdown handler.
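A listener manager is essentially an observer registry. The plain-Rust sketch below does not use krishiv_api; it shows the pattern under assumed names (Listener, ListenerBus, TerminatedEvent, Progress, ShutdownRecorder). Listeners are registered as Arc<dyn Listener>, trait methods have no-op defaults so a listener implements only the events it cares about, and the terminated event carries the last progress so a shutdown handler can record where the query stopped.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical progress snapshot for one micro-batch.
#[derive(Debug, Clone, PartialEq)]
struct Progress {
    batch_id: u64,
    input_rows: u64,
    output_rows: u64,
}

/// Hypothetical terminated event carrying the last progress, if any.
#[derive(Debug, Clone)]
struct TerminatedEvent {
    query_id: String,
    error: Option<String>,
    last_progress: Option<Progress>,
}

/// Listener trait with no-op defaults: implement only the events you need.
trait Listener: Send + Sync {
    fn on_progress(&self, _query_id: &str, _progress: &Progress) {}
    fn on_terminated(&self, _event: &TerminatedEvent) {}
}

/// Registry that fans events out to every registered listener.
#[derive(Default)]
struct ListenerBus {
    listeners: Mutex<Vec<Arc<dyn Listener>>>,
}

impl ListenerBus {
    fn add_listener(&self, listener: Arc<dyn Listener>) {
        self.listeners.lock().unwrap().push(listener);
    }

    fn post_terminated(&self, event: &TerminatedEvent) {
        // Clone the list first so the lock is not held while listeners run.
        let snapshot: Vec<Arc<dyn Listener>> = self.listeners.lock().unwrap().clone();
        for listener in snapshot {
            listener.on_terminated(event);
        }
    }
}

/// Example listener: remembers the last batch a query reported before stopping.
#[derive(Default)]
struct ShutdownRecorder {
    last_batch: Mutex<Option<u64>>,
}

impl Listener for ShutdownRecorder {
    fn on_terminated(&self, event: &TerminatedEvent) {
        *self.last_batch.lock().unwrap() = event.last_progress.as_ref().map(|p| p.batch_id);
    }
}
```

Snapshotting the listener list before dispatch is a design choice: it keeps a slow or re-entrant listener (one that registers another listener, say) from blocking or deadlocking the bus.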

Cancellation and timeout

  • Per-query timeout: session.sql_with_timeout("...", 30_000), with the timeout in milliseconds.
  • Cancel a running query: query.stop() or session.operation_registry().cancel(op_id).
  • Coordinator-driven cancel via gRPC: TaskCancellationRequest, defined in krishiv-proto.
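One common way to implement timeouts and cancellation is cooperatively: the running operation checks a deadline and a shared cancel flag between units of work. The sketch below models that idea under assumptions. OperationRegistry, register, run_with_timeout and Outcome are hypothetical stand-ins that mirror the shape of session.operation_registry().cancel(op_id) and a millisecond timeout; they are not Krishiv's actual types.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq, Eq)]
enum Outcome {
    Completed(u64),
    Cancelled,
    TimedOut,
}

/// Hypothetical registry mapping operation ids to shared cancel flags.
#[derive(Default)]
struct OperationRegistry {
    next_id: AtomicU64,
    ops: Mutex<HashMap<u64, Arc<AtomicBool>>>,
}

impl OperationRegistry {
    fn register(&self) -> (u64, Arc<AtomicBool>) {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let flag = Arc::new(AtomicBool::new(false));
        self.ops.lock().unwrap().insert(id, Arc::clone(&flag));
        (id, flag)
    }

    /// Request cancellation; returns false if the id is unknown.
    fn cancel(&self, op_id: u64) -> bool {
        match self.ops.lock().unwrap().get(&op_id) {
            Some(flag) => {
                flag.store(true, Ordering::Release);
                true
            }
            None => false,
        }
    }
}

/// Run `steps` units of work, checking the cancel flag and the deadline
/// (`timeout_ms`, in milliseconds) before each one.
fn run_with_timeout(steps: u64, step_time: Duration, timeout_ms: u64, cancelled: &AtomicBool) -> Outcome {
    let deadline = Instant::now() + Duration::from_millis(timeout_ms);
    for _ in 0..steps {
        if cancelled.load(Ordering::Acquire) {
            return Outcome::Cancelled;
        }
        if Instant::now() >= deadline {
            return Outcome::TimedOut;
        }
        std::thread::sleep(step_time); // stand-in for one unit of query work
    }
    Outcome::Completed(steps)
}
```

In this model a cancel request or an expired deadline takes effect at the next check, so the reaction time is bounded by the length of one unit of work.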

See also