
Your first Krishiv pipeline

A 15-minute end-to-end tutorial: read a file, run a query, write a result.

This tutorial takes you from zero to a working pipeline that reads a CSV file, runs a SQL aggregation, and writes the result to a Parquet file. Pick Python or Rust; both are first-class.

Prerequisites

  • Python: Python 3.10+, then pip install --no-build-isolation krishiv (or build from source: maturin develop --manifest-path crates/krishiv-python/Cargo.toml).
  • Rust: Rust 1.80+ and the just command runner.
  • ~50 lines of code, no external services.

Step 1 — Create the input

Create data/orders.csv with the following content (later steps reference this path). Also create an empty out/ directory next to data/ to hold the Parquet output:

order_id,customer_id,amount,event_time
1,c1,42.50,1700000000
2,c2,15.00,1700000060
3,c1,9.99,1700000120
4,c3,120.00,1700000180
5,c2,7.50,1700000240
6,c1,300.00,1700000300
7,c3,55.25,1700000360
8,c2,12.40,1700000420
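
If you would rather not copy the rows by hand, the sketch below writes the same file with Python's standard csv module. The helper names orders_csv_text and write_orders_csv are illustrative only; they are not part of Krishiv.

```python
import csv
import io
from pathlib import Path

HEADER = ["order_id", "customer_id", "amount", "event_time"]
# Amounts are kept as strings so "42.50" is written exactly as shown above.
ROWS = [
    (1, "c1", "42.50", 1700000000),
    (2, "c2", "15.00", 1700000060),
    (3, "c1", "9.99", 1700000120),
    (4, "c3", "120.00", 1700000180),
    (5, "c2", "7.50", 1700000240),
    (6, "c1", "300.00", 1700000300),
    (7, "c3", "55.25", 1700000360),
    (8, "c2", "12.40", 1700000420),
]


def orders_csv_text() -> str:
    """Render the header plus the eight sample rows as CSV with \\n line endings."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(HEADER)
    writer.writerows(ROWS)
    return buf.getvalue()


def write_orders_csv(path: Path = Path("data/orders.csv")) -> Path:
    """Write the sample data to `path`, creating the parent directory if needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # newline="" stops Python from translating "\n" into "\r\n" on Windows.
    with path.open("w", newline="", encoding="utf-8") as f:
        f.write(orders_csv_text())
    return path
```

Call write_orders_csv() once from the project root before moving on to Step 2.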

Step 2a — Python pipeline

Create pipeline.py:

import krishiv as ks

# 1. Create an in-process session (no daemon, no cluster).
session = ks.Session.embedded()

# 2. Register the CSV as a SQL table.
session.register_csv("orders", "data/orders.csv")

# 3. Run a SQL aggregation.
top_customers = session.sql("""
    SELECT customer_id, SUM(amount) AS total
    FROM orders
    GROUP BY customer_id
    ORDER BY total DESC
    LIMIT 5
""")

# 4. Print the result.
print("Top customers by total spend:")
top_customers.show()

# 5. Write the result to a Parquet file.
top_customers.write_parquet("out/top_customers.parquet")
print("Wrote out/top_customers.parquet")

Run it:

python pipeline.py

Expected output (numbers match the CSV above):

Top customers by total spend:
+--------------+-------+
| customer_id  | total |
+--------------+-------+
| c1           | 352.49|
| c3           | 175.25|
| c2           |  34.90|
+--------------+-------+

Wrote out/top_customers.parquet
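
To see what each clause of the query contributes, here is a plain-Python sketch that mirrors it as four stages (scan, aggregate, sort, limit) over the same eight rows. It only illustrates the query's semantics and reproduces the expected totals; it is not how Krishiv or DataFusion executes the plan. decimal.Decimal is used so the sums come out exact.

```python
import csv
import io
from collections import defaultdict
from decimal import Decimal

ORDERS_CSV = """order_id,customer_id,amount,event_time
1,c1,42.50,1700000000
2,c2,15.00,1700000060
3,c1,9.99,1700000120
4,c3,120.00,1700000180
5,c2,7.50,1700000240
6,c1,300.00,1700000300
7,c3,55.25,1700000360
8,c2,12.40,1700000420
"""


def scan(text):
    """FROM orders: yield one dict per CSV row."""
    yield from csv.DictReader(io.StringIO(text))


def aggregate(rows):
    """GROUP BY customer_id, computing SUM(amount) AS total."""
    totals = defaultdict(Decimal)  # Decimal() starts every group at 0
    for row in rows:
        totals[row["customer_id"]] += Decimal(row["amount"])
    return list(totals.items())


def sort_desc(groups):
    """ORDER BY total DESC."""
    return sorted(groups, key=lambda kv: kv[1], reverse=True)


def limit(groups, n):
    """LIMIT n."""
    return groups[:n]


top_customers = limit(sort_desc(aggregate(scan(ORDERS_CSV))), 5)
for customer_id, total in top_customers:
    print(f"{customer_id}  {total}")
```

Reading the nested call from the inside out gives the same order in which the SQL clauses take effect: rows are scanned, grouped and summed, sorted, and only then truncated by LIMIT.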

Step 2b — Rust pipeline

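In a Cargo project (for example, one created with cargo new), create src/main.rs. The project must depend on the krishiv_api crate and on tokio with the macros and rt-multi-thread features enabled, which #[tokio::main] requires: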

use krishiv_api::Session;

#[tokio::main]
async fn main() -> krishiv_api::Result<()> {
    let session = Session::embedded().await?;
    session.register_csv("orders", "data/orders.csv").await?;

    let df = session.sql("
        SELECT customer_id, SUM(amount) AS total
        FROM orders
        GROUP BY customer_id
        ORDER BY total DESC
        LIMIT 5
    ").await?;

    println!("Top customers by total spend:");
    df.show().await?;
    df.write_parquet("out/top_customers.parquet", None).await?;
    println!("Wrote out/top_customers.parquet");
    Ok(())
}

Run it:

cargo run

Step 3 — Inspect the result

Read the Parquet file you just wrote and confirm the schema (this requires pyarrow, and to_pandas() also requires pandas):

import pyarrow.parquet as pq
table = pq.read_table("out/top_customers.parquet")
print(table.schema)
print(table.to_pandas())
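
If pyarrow is not installed, a quick standard-library sanity check is to look for the 4-byte magic number PAR1, which the Parquet format places at both the start and the end of every file. This sketch only confirms that the file is framed like Parquet; it does not read the schema or validate the contents. The function names are illustrative.

```python
import io
from pathlib import Path

PARQUET_MAGIC = b"PAR1"  # Parquet files begin and end with these four bytes.


def has_parquet_framing(f) -> bool:
    """Check a seekable binary file object for the Parquet header and footer magic."""
    size = f.seek(0, io.SEEK_END)
    # Head magic + 4-byte footer length + tail magic need at least 12 bytes.
    if size < 12:
        return False
    f.seek(0)
    head = f.read(4)
    f.seek(-4, io.SEEK_END)  # jump to 4 bytes before the end of the file
    tail = f.read(4)
    return head == PARQUET_MAGIC and tail == PARQUET_MAGIC


def looks_like_parquet(path) -> bool:
    """Open `path` in binary mode and check its Parquet framing."""
    with Path(path).open("rb") as f:
        return has_parquet_framing(f)
```

For example, looks_like_parquet("out/top_customers.parquet") should return True after Step 2, while the input CSV returns False.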

Step 4 — Modify the query

Try changing the query. Some ideas:

  • Add a WHERE amount > 20 filter.
  • Add COUNT(*) AS n_orders alongside SUM(amount).
  • Replace the SQL string with the DataFrame API (see the DataFrame 101 recipe).
  • Read a Parquet file instead of CSV (see Parquet & Object Store).
  • Run the same code against the single-node daemon for restart-durable state (see Single-node deployment).
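
Before editing the pipeline, you can preview what the first two ideas should return by running the modified query against the same eight rows in Python's built-in sqlite3 module. SQLite is only a stand-in engine for checking the expected numbers here; it is not part of Krishiv, and SQL dialect details can differ between SQLite and DataFusion.

```python
import sqlite3

ORDERS = [
    (1, "c1", 42.50, 1700000000),
    (2, "c2", 15.00, 1700000060),
    (3, "c1", 9.99, 1700000120),
    (4, "c3", 120.00, 1700000180),
    (5, "c2", 7.50, 1700000240),
    (6, "c1", 300.00, 1700000300),
    (7, "c3", 55.25, 1700000360),
    (8, "c2", 12.40, 1700000420),
]

# The tutorial query with a WHERE filter and a COUNT(*) column added.
MODIFIED_QUERY = """
    SELECT customer_id, SUM(amount) AS total, COUNT(*) AS n_orders
    FROM orders
    WHERE amount > 20
    GROUP BY customer_id
    ORDER BY total DESC
    LIMIT 5
"""


def preview(query: str) -> list[tuple]:
    """Load the sample orders into an in-memory SQLite table and run `query`."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE orders ("
            "order_id INTEGER, customer_id TEXT, amount REAL, event_time INTEGER)"
        )
        conn.executemany("INSERT INTO orders VALUES (?, ?, ?, ?)", ORDERS)
        return conn.execute(query).fetchall()
    finally:
        conn.close()


for row in preview(MODIFIED_QUERY):
    print(row)
```

Customer c2 drops out of the result entirely, because none of its orders exceeds 20.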

What you learned

  • Session.embedded() creates an in-process engine — no daemon, no cluster.
  • register_csv / register_parquet expose a file as a SQL table.
  • SQL is parsed and planned by DataFusion and executed by the same runtime as DataFrame and Stream.
  • Results are Arrow RecordBatch values, and writes go back through Arrow.
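
A RecordBatch stores a result column by column: a schema plus one equal-length array per field. The hypothetical ToyRecordBatch class below only illustrates that layout with plain Python lists; it is not the pyarrow or arrow-rs API (in pyarrow the real type is pyarrow.RecordBatch), and the type names in its schema are placeholders rather than the schema Step 3 prints.

```python
from dataclasses import dataclass


@dataclass
class ToyRecordBatch:
    """Hypothetical column-oriented batch: field names plus one list per column."""

    schema: list[tuple[str, str]]  # (field name, placeholder type name) pairs
    columns: list[list]

    def __post_init__(self):
        if len(self.schema) != len(self.columns):
            raise ValueError("need exactly one column per schema field")
        if len({len(c) for c in self.columns}) > 1:
            raise ValueError("all columns must have the same length")

    @property
    def num_rows(self) -> int:
        return len(self.columns[0]) if self.columns else 0

    def rows(self) -> list[tuple]:
        """Transpose the columns back into row tuples, e.g. for printing."""
        return list(zip(*self.columns))


batch = ToyRecordBatch(
    schema=[("customer_id", "utf8"), ("total", "float64")],
    columns=[["c1", "c3", "c2"], [352.49, 175.25, 34.90]],
)
print(batch.num_rows)
print(batch.rows())
```

Keeping each column contiguous is what lets an engine scan, filter, and aggregate one field at a time, and it is why show() and the pyarrow check in Step 3 see the same columns.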

Next steps