Your first Krishiv pipeline
A 15-minute end-to-end tutorial: read a file, run a query, write a result.
This tutorial takes you from zero to a working pipeline that reads a CSV file, runs a SQL aggregation, and writes the result back to Parquet. Pick Python or Rust — both are first-class.
Prerequisites
- Python: Python 3.10+, then pip install --no-build-isolation krishiv (or build from source: maturin develop --manifest-path crates/krishiv-python/Cargo.toml).
- Rust: Rust 1.80+ and the just runner.
- ~50 lines of code, no external services.
Step 1 — Create the input
Create data/orders.csv with the following content (the path is referenced in later steps):
order_id,customer_id,amount,event_time
1,c1,42.50,1700000000
2,c2,15.00,1700000060
3,c1,9.99,1700000120
4,c3,120.00,1700000180
5,c2,7.50,1700000240
6,c1,300.00,1700000300
7,c3,55.25,1700000360
8,c2,12.40,1700000420
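If you would rather generate the input than paste it, the following is a minimal sketch that writes the same eight rows using only the Python standard library. The helper name write_orders is hypothetical and not part of Krishiv; it simply creates the data/ directory if needed and writes the file.

```python
from pathlib import Path

# Rows copied verbatim from the listing above.
ORDERS_CSV = """\
order_id,customer_id,amount,event_time
1,c1,42.50,1700000000
2,c2,15.00,1700000060
3,c1,9.99,1700000120
4,c3,120.00,1700000180
5,c2,7.50,1700000240
6,c1,300.00,1700000300
7,c3,55.25,1700000360
8,c2,12.40,1700000420
"""


def write_orders(path: str = "data/orders.csv") -> Path:
    """Write the sample orders to `path`, creating parent directories first.

    Hypothetical helper for this tutorial; not a Krishiv API.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(ORDERS_CSV, encoding="utf-8")
    return target


if __name__ == "__main__":
    print(f"Wrote {write_orders()}")
```

Run it from the project root so that the relative path data/orders.csv matches the one used in the later steps.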
Step 2a — Python pipeline
Create pipeline.py:
import krishiv as ks
from krishiv.functions import col, sum, desc
# 1. Create an in-process session (no daemon, no cluster).
session = ks.Session.embedded()
# 2. Register the CSV as a SQL table.
session.register_csv("orders", "data/orders.csv")
# 3. Run a SQL aggregation.
top_customers = session.sql("""
SELECT customer_id, SUM(amount) AS total
FROM orders
GROUP BY customer_id
ORDER BY total DESC
LIMIT 5
""")
# 4. Print the result.
print("Top customers by total spend:")
top_customers.show()
# 5. Write the result to a Parquet file.
top_customers.write_parquet("out/top_customers.parquet")
print("Wrote out/top_customers.parquet")
Run it:
python pipeline.py
Expected output (numbers match the CSV above):
Top customers by total spend:
+--------------+-------+
| customer_id | total |
+--------------+-------+
| c1 | 352.49|
| c3 | 175.25|
| c2 | 34.90|
+--------------+-------+
Wrote out/top_customers.parquet
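To see where these totals come from, here is a small cross-check that recomputes them from the CSV rows without Krishiv. It is a sketch using only the standard library; the function name totals_by_customer is made up for this example, and Decimal is used so the sums are exact rather than subject to floating-point rounding.

```python
from collections import defaultdict
from decimal import Decimal

# (customer_id, amount) pairs taken from data/orders.csv in Step 1.
ORDERS = [
    ("c1", "42.50"),
    ("c2", "15.00"),
    ("c1", "9.99"),
    ("c3", "120.00"),
    ("c2", "7.50"),
    ("c1", "300.00"),
    ("c3", "55.25"),
    ("c2", "12.40"),
]


def totals_by_customer(orders):
    """Sum amounts per customer and sort by total, largest first."""
    totals = defaultdict(Decimal)  # Decimal() starts each customer at 0
    for customer_id, amount in orders:
        totals[customer_id] += Decimal(amount)
    # Mirrors ORDER BY total DESC in the SQL query.
    return sorted(totals.items(), key=lambda item: item[1], reverse=True)


for customer_id, total in totals_by_customer(ORDERS):
    print(customer_id, total)
# c1 352.49
# c3 175.25
# c2 34.90
```

With only three distinct customers in the input, LIMIT 5 has no effect here, which is why the table shows three rows.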
Step 2b — Rust pipeline
Create src/main.rs:
use krishiv_api::{col, desc, sum, Session};

#[tokio::main]
async fn main() -> krishiv_api::Result<()> {
    let session = Session::embedded().await?;
    session.register_csv("orders", "data/orders.csv").await?;
    let df = session.sql("
        SELECT customer_id, SUM(amount) AS total
        FROM orders
        GROUP BY customer_id
        ORDER BY total DESC
        LIMIT 5
    ").await?;
    println!("Top customers by total spend:");
    df.show().await?;
    df.write_parquet("out/top_customers.parquet", None).await?;
    println!("Wrote out/top_customers.parquet");
    Ok(())
}
Run it:
cargo run
Step 3 — Inspect the result
Read the Parquet file you just wrote and confirm the schema. This snippet needs pyarrow installed, plus pandas for the to_pandas() call:
import pyarrow.parquet as pq
table = pq.read_table("out/top_customers.parquet")
print(table.schema)
print(table.to_pandas())
Step 4 — Modify the query
Try changing the query. Some ideas:
- Add a WHERE amount > 20 filter.
- Add COUNT(*) AS n_orders alongside SUM(amount).
- Replace the SQL string with the DataFrame API (see the DataFrame 101 recipe).
- Read a Parquet file instead of CSV (see Parquet & Object Store).
- Run the same code against the single-node daemon for restart-durable state (see Single-node deployment).
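As a rough preview of the first two ideas combined, the sketch below runs the modified query against the same eight rows. It uses Python's built-in sqlite3 module as a stand-in engine, not Krishiv or DataFusion, so treat it only as a way to reason about the expected rows; SQL dialects and float formatting can differ. The preview function and the table setup are illustrative assumptions.

```python
import sqlite3

# Same rows as data/orders.csv from Step 1.
ORDERS = [
    (1, "c1", 42.50, 1700000000),
    (2, "c2", 15.00, 1700000060),
    (3, "c1", 9.99, 1700000120),
    (4, "c3", 120.00, 1700000180),
    (5, "c2", 7.50, 1700000240),
    (6, "c1", 300.00, 1700000300),
    (7, "c3", 55.25, 1700000360),
    (8, "c2", 12.40, 1700000420),
]

# The tutorial query with a WHERE filter and an extra COUNT(*) column.
QUERY = """
SELECT customer_id, SUM(amount) AS total, COUNT(*) AS n_orders
FROM orders
WHERE amount > 20
GROUP BY customer_id
ORDER BY total DESC
LIMIT 5
"""


def preview(query: str = QUERY):
    """Run `query` on an in-memory SQLite copy of the orders table."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE orders ("
            "order_id INTEGER, customer_id TEXT, amount REAL, event_time INTEGER)"
        )
        conn.executemany("INSERT INTO orders VALUES (?, ?, ?, ?)", ORDERS)
        return conn.execute(query).fetchall()
    finally:
        conn.close()


for row in preview():
    print(row)
# ('c1', 342.5, 2)
# ('c3', 175.25, 2)
```

Customer c2 drops out because none of its orders exceeds 20. To try this in the tutorial pipeline, paste the same SQL text into the session.sql(...) call in pipeline.py.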
What you learned
- Session.embedded() creates an in-process engine: no daemon, no cluster.
- register_csv / register_parquet expose a file as a SQL table.
- SQL is parsed and planned by DataFusion and executed by the same runtime as DataFrame and Stream.
- Results are Arrow RecordBatch values, and writes go back through Arrow.
Next steps
- Recipes: task-oriented examples (tumbling windows, Iceberg upserts, Kafka pipelines, etc.)
- How Krishiv executes a query: the journey from session.sql(...) to result
- Feature Maturity: what is production-ready today