Available

Python Client to Remote Cluster

Connect from a Python notebook or script to a remote Krishiv coordinator.

The Python bindings talk to a remote coordinator over Flight SQL (data plane) and gRPC (control plane). Setup takes one import and one connect call.

Install

pip install --no-build-isolation krishiv

Or from a source checkout:

maturin develop --manifest-path crates/krishiv-python/Cargo.toml

Environment

export KRISHIV_COORDINATOR=https://coord.example.com:50051
export KRISHIV_COORDINATOR_BEARER_TOKEN=...   # or pass api_key= to Session.connect instead
export KRISHIV_OIDC_AUDIENCE=krishiv-prod     # if using OIDC
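The sketch below shows one way a client could turn these variables into connection settings, using only the Python standard library. It is an illustration, not what `Session.from_env()` actually does: `CoordinatorConfig` and `resolve_coordinator_config` are hypothetical names, and the TLS flag is assumed to follow from the `https` scheme.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional
from urllib.parse import urlparse


@dataclass
class CoordinatorConfig:
    # Hypothetical settings holder for illustration; not a krishiv type.
    host: str
    port: int
    use_tls: bool  # assumed: https:// means TLS
    bearer_token: Optional[str]
    oidc_audience: Optional[str]


def resolve_coordinator_config(env: Mapping[str, str] = os.environ) -> CoordinatorConfig:
    """Hypothetical helper: read the KRISHIV_* variables and split the coordinator URL."""
    url = env.get("KRISHIV_COORDINATOR")
    if not url:
        raise KeyError("KRISHIV_COORDINATOR is not set")
    parsed = urlparse(url)
    if parsed.hostname is None or parsed.port is None:
        raise ValueError(f"expected scheme://host:port, got {url!r}")
    return CoordinatorConfig(
        host=parsed.hostname,
        port=parsed.port,
        use_tls=parsed.scheme == "https",
        # Treat empty values the same as unset variables.
        bearer_token=env.get("KRISHIV_COORDINATOR_BEARER_TOKEN") or None,
        oidc_audience=env.get("KRISHIV_OIDC_AUDIENCE") or None,
    )


cfg = resolve_coordinator_config({
    "KRISHIV_COORDINATOR": "https://coord.example.com:50051",
    "KRISHIV_OIDC_AUDIENCE": "krishiv-prod",
})
print(cfg.host, cfg.port, cfg.use_tls)  # coord.example.com 50051 True
```

Passing an explicit mapping instead of relying on `os.environ` keeps the helper easy to test.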

Connect

import krishiv as ks

# Reads KRISHIV_COORDINATOR and KRISHIV_COORDINATOR_BEARER_TOKEN from the environment
session = ks.Session.from_env()

# Or pass the settings explicitly
session = ks.Session.connect(
    "https://coord.example.com:50051",           # Flight SQL endpoint (data plane)
    grpc_url="https://coord.example.com:50051",  # gRPC endpoint (control plane)
    target_parallelism=8,
    state_ttl_ms=3_600_000,  # 1 hour state TTL
)

# Or use API keys (server: KRISHIV_API_KEYS=key1=user,...)
session = ks.Session.connect("https://coord.example.com:50051", api_key="key1")
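On the server side, `KRISHIV_API_KEYS` maps each key to a user. Assuming the value is a comma-separated list of `key=user` pairs, as the example comment suggests, a parser and lookup could look like the sketch below. `parse_api_keys` and `authenticate` are hypothetical helpers, not part of Krishiv.

```python
from typing import Dict


def parse_api_keys(spec: str) -> Dict[str, str]:
    """Hypothetical parser for a KRISHIV_API_KEYS value such as "key1=alice,key2=bob".

    The comma-separated key=user format is inferred from the example above.
    """
    keys: Dict[str, str] = {}
    for entry in spec.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing comma
        key, sep, user = entry.partition("=")  # split on the first "=" only
        if not sep or not key or not user:
            raise ValueError(f"malformed API key entry: {entry!r}")
        if key in keys:
            raise ValueError(f"duplicate API key: {key!r}")
        keys[key] = user
    return keys


def authenticate(keys: Dict[str, str], presented: str) -> str:
    """Hypothetical lookup: return the user for a presented key or reject it."""
    user = keys.get(presented)
    if user is None:
        raise PermissionError("unknown API key")
    return user


keys = parse_api_keys("key1=alice,key2=bob")
print(authenticate(keys, "key1"))  # alice
```

Rejecting malformed or duplicate entries at startup surfaces configuration mistakes before any client connects.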

Run a query

df = session.sql("SELECT customer_id, SUM(amount) AS total FROM orders GROUP BY customer_id")
print(df.collect().pretty())

Register a Parquet source

session.register_parquet("orders", "s3://my-bucket/data/orders/")
df = session.sql("SELECT * FROM orders WHERE amount > 100")
df.write_parquet("s3://my-bucket/out/big_orders/")

Incremental view

import pyarrow as pa

ivm = session.ivm("order_totals")
ivm.register_view("order_totals", "SELECT customer_id, SUM(amount) AS total FROM orders GROUP BY customer_id")
batch = pa.record_batch(...)  # new rows for the orders table
ivm.tick("orders", batch)     # apply the batch to the registered view
print(ivm.snapshot("order_totals").to_pandas())
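Conceptually, an incremental view folds each batch of changes into stored state instead of rescanning `orders` on every tick. The toy class below sketches count-based maintenance of the `SUM ... GROUP BY` view in plain Python. It is not Krishiv's implementation: `OrderTotalsView` and `recompute` are hypothetical, and the sketch assumes each change arrives as a `(customer_id, amount, weight)` tuple with weight +1 for an insert and -1 for the delete of a previously inserted row.

```python
from typing import Dict, Iterable, List, Tuple

# (customer_id, amount, weight): weight is +1 for an inserted row, -1 for a deleted one.
Change = Tuple[str, float, int]


class OrderTotalsView:
    """Hypothetical toy view for
    SELECT customer_id, SUM(amount) AS total FROM orders GROUP BY customer_id
    """

    def __init__(self) -> None:
        self._sums: Dict[str, float] = {}
        self._counts: Dict[str, int] = {}

    def tick(self, changes: Iterable[Change]) -> None:
        # Work is proportional to the batch size, not to the size of `orders`.
        for customer_id, amount, weight in changes:
            self._sums[customer_id] = self._sums.get(customer_id, 0.0) + weight * amount
            self._counts[customer_id] = self._counts.get(customer_id, 0) + weight
            if self._counts[customer_id] == 0:
                # No rows left for this customer, so the group leaves the result.
                del self._sums[customer_id]
                del self._counts[customer_id]

    def snapshot(self) -> Dict[str, float]:
        return dict(self._sums)  # copy, so callers cannot mutate view state


def recompute(rows: List[Tuple[str, float]]) -> Dict[str, float]:
    """Full recomputation over the current rows, for checking the view."""
    totals: Dict[str, float] = {}
    for customer_id, amount in rows:
        totals[customer_id] = totals.get(customer_id, 0.0) + amount
    return totals


view = OrderTotalsView()
view.tick([("c1", 120.0, +1), ("c2", 35.5, +1)])
view.tick([("c1", 80.0, +1), ("c2", 35.5, -1)])  # add an order, delete c2's only order
print(view.snapshot())  # {'c1': 200.0}
```

Tracking a per-group row count is what lets the view drop a customer whose rows have all been deleted, matching what a fresh `GROUP BY` would return.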

Explain

print(session.sql("SELECT * FROM orders JOIN users ON orders.user_id = users.id").explain())

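Streaming from a Kafka topic

# `schema` describes the topic's records and must include the event_time column
session.register_kafka_source("clicks", schema, "broker:9092", "clicks", "krishiv-app")
(session.table("clicks").to_streaming()
       .with_event_time("event_time")
       .tumbling_window(60_000)  # 1-minute windows (milliseconds)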
       .agg([ks.count(ks.col("*"))])
       .write_stream()
       .format("iceberg")
       .option("table", "clicks_per_minute")
       .start())
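Tumbling windows split event time into fixed, non-overlapping buckets. The sketch below reproduces the per-window count in batch form, assuming `60_000` is a window size in milliseconds (consistent with `state_ttl_ms` and the `clicks_per_minute` table name) and that windows are aligned to the Unix epoch. It ignores watermarks and late events, which a real streaming job has to handle; `window_start` and `count_per_window` are hypothetical helpers.

```python
from collections import Counter
from typing import Dict, Iterable

WINDOW_MS = 60_000  # assumed: tumbling_window(60_000) means 60,000 ms = 1 minute


def window_start(event_time_ms: int, size_ms: int = WINDOW_MS) -> int:
    """Hypothetical helper: start of the epoch-aligned window containing event_time_ms."""
    return event_time_ms - event_time_ms % size_ms


def count_per_window(event_times_ms: Iterable[int], size_ms: int = WINDOW_MS) -> Dict[int, int]:
    """Batch equivalent of COUNT(*) per tumbling window (no watermarks, no late data)."""
    return dict(Counter(window_start(t, size_ms) for t in event_times_ms))


# Event times in ms since the epoch; 0 and 59_999 share the first window.
print(count_per_window([0, 59_999, 60_000, 125_000]))  # {0: 2, 60000: 1, 120000: 1}
```

Because every event maps to exactly one window start, each click is counted once, which is what distinguishes tumbling windows from overlapping (sliding) ones.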

See also