Python Client to Remote Cluster
Connect from a Python notebook or script to a remote Krishiv coordinator.
The Python bindings talk to a remote coordinator over two channels: Flight SQL for the data plane and gRPC for the control plane. Setup takes one import and one connect call.
Install
pip install --no-build-isolation krishiv
Or build and install from a source checkout, inside an activated virtualenv or conda environment:
maturin develop --manifest-path crates/krishiv-python/Cargo.toml
Environment
export KRISHIV_COORDINATOR=https://coord.example.com:50051
export KRISHIV_COORDINATOR_BEARER_TOKEN=...   # or pass api_key= to Session.connect instead
export KRISHIV_OIDC_AUDIENCE=krishiv-prod # if using OIDC
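Before calling Session.from_env(), it can help to confirm that these variables are actually visible to the Python process, because a notebook kernel can start with a different environment than the shell you exported them in. The sketch below is a hypothetical helper, check_krishiv_env, written with only the standard library; it is not part of the krishiv package and does not reproduce how from_env() parses these variables.

```python
import os
from typing import Mapping


def check_krishiv_env(env: Mapping[str, str] = os.environ) -> dict:
    """Report which Krishiv client variables are set, without exposing secrets.

    Hypothetical pre-flight check; not part of the krishiv package.
    """
    url = env.get("KRISHIV_COORDINATOR", "")
    if not url:
        raise RuntimeError("KRISHIV_COORDINATOR is not set")
    return {
        "coordinator": url,
        # Record only whether a token exists; never print the token itself.
        "has_bearer_token": bool(env.get("KRISHIV_COORDINATOR_BEARER_TOKEN")),
        # None when OIDC is not in use.
        "oidc_audience": env.get("KRISHIV_OIDC_AUDIENCE"),
    }
```

Calling check_krishiv_env() in the first notebook cell fails fast with a clear message if the coordinator URL never reached the kernel.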
Connect
import krishiv as ks
# Reads KRISHIV_COORDINATOR and KRISHIV_COORDINATOR_BEARER_TOKEN from the environment
session = ks.Session.from_env()
# Or pass the settings explicitly
session = ks.Session.connect(
    "https://coord.example.com:50051",           # Flight SQL endpoint (data plane)
    grpc_url="https://coord.example.com:50051",  # gRPC endpoint (control plane)
    target_parallelism=8,
    state_ttl_ms=3_600_000,                      # 1 hour state TTL
)
# Or authenticate with an API key (server side: KRISHIV_API_KEYS=key1=user,...)
session = ks.Session.connect("https://coord.example.com:50051", api_key="key1")
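Durations such as state_ttl_ms are given in milliseconds (3_600_000 is one hour). A small conversion helper keeps those literals readable. to_ms is a hypothetical name used only for illustration, and the one-minute value assumes that tumbling_window, used in the streaming example, also takes milliseconds.

```python
from datetime import timedelta


def to_ms(delta: timedelta) -> int:
    """Convert a timedelta to whole milliseconds (sub-millisecond parts are dropped)."""
    # timedelta // timedelta performs floor division and returns an int.
    return delta // timedelta(milliseconds=1)


STATE_TTL_MS = to_ms(timedelta(hours=1))  # the state_ttl_ms value used above
WINDOW_MS = to_ms(timedelta(minutes=1))   # assumed unit for a one-minute tumbling window
```

For example, state_ttl_ms=to_ms(timedelta(hours=1)) passes the same value as the literal 3_600_000.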
Run a query
df = session.sql("SELECT customer_id, SUM(amount) AS total FROM orders GROUP BY customer_id")
print(df.collect().pretty())
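When validating a remote query against a small sample, a plain-Python reference computation is a handy cross-check. The hypothetical function sum_by_customer computes the same grouped sum as the query above over an in-memory list of (customer_id, amount) pairs. It says nothing about how Krishiv plans or executes the query, and it ignores SQL NULL semantics.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def sum_by_customer(rows: Iterable[Tuple[str, float]]) -> Dict[str, float]:
    """Plain-Python equivalent of
    SELECT customer_id, SUM(amount) AS total FROM orders GROUP BY customer_id
    for a small sample without NULL amounts."""
    totals: Dict[str, float] = defaultdict(float)
    for customer_id, amount in rows:
        totals[customer_id] += amount
    return dict(totals)


sample = [("c1", 120.0), ("c2", 40.0), ("c1", 30.0)]
print(sum_by_customer(sample))  # {'c1': 150.0, 'c2': 40.0}
```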
Register a Parquet source
session.register_parquet("orders", "s3://my-bucket/data/orders/")
df = session.sql("SELECT * FROM orders WHERE amount > 100")
df.write_parquet("s3://my-bucket/out/big_orders/")
Incremental view
ivm = session.ivm("order_totals")
ivm.register_view("order_totals", "SELECT customer_id, SUM(amount) AS total FROM orders GROUP BY customer_id")
import pyarrow as pa
batch = pa.record_batch(...)  # a pyarrow RecordBatch of newly arrived "orders" rows
ivm.tick("orders", batch)
print(ivm.snapshot("order_totals").to_pandas())
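The point of an incremental view is that each tick folds only the new batch into the stored result instead of recomputing over all of orders. For an insert-only SUM ... GROUP BY view, that amounts to a per-key running total, as in the toy model below. SumByKeyView is hypothetical and uses plain tuples instead of Arrow batches; it is not a description of Krishiv's IVM engine, and it does not handle deletes, updates, or state TTL.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


class SumByKeyView:
    """Toy, insert-only model of an incrementally maintained
    SELECT customer_id, SUM(amount) ... GROUP BY customer_id view."""

    def __init__(self) -> None:
        self._totals: Dict[str, float] = defaultdict(float)

    def tick(self, delta: Iterable[Tuple[str, float]]) -> None:
        # Fold only the new rows into the existing totals; no full rescan.
        for customer_id, amount in delta:
            self._totals[customer_id] += amount

    def snapshot(self) -> Dict[str, float]:
        # Return a copy so callers cannot mutate the maintained state.
        return dict(self._totals)


view = SumByKeyView()
view.tick([("c1", 120.0), ("c2", 40.0)])
view.tick([("c1", 30.0)])
print(view.snapshot())  # {'c1': 150.0, 'c2': 40.0}
```

After any sequence of ticks, the snapshot equals what a full GROUP BY over all rows seen so far would return, which is the invariant an incremental view has to preserve.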
Explain
print(session.sql("SELECT * FROM orders JOIN users ON orders.user_id = users.id").explain())
Streaming from a topic
# schema describes the topic's records and must include the event_time column
session.register_kafka_source("clicks", schema, "broker:9092", "clicks", "krishiv-app")
(session.table("clicks").to_streaming()
    .with_event_time("event_time")
    .tumbling_window(60_000)          # 1-minute windows, in milliseconds
    .agg([ks.count(ks.col("*"))])
    .write_stream()
    .format("iceberg")
    .option("table", "clicks_per_minute")
    .start())
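A tumbling window splits event time into fixed, non-overlapping intervals, so each event lands in exactly one window, the one starting at ts - ts % size. The sketch below counts events per one-minute window in plain Python to show that assignment rule. It assumes, as the example above suggests, that tumbling_window takes its size in milliseconds and buckets on the column named by with_event_time. tumbling_counts is a hypothetical helper; watermarks, late data, and the Iceberg sink are not modeled.

```python
from collections import Counter
from typing import Dict, Iterable


def tumbling_counts(event_times_ms: Iterable[int], window_ms: int = 60_000) -> Dict[int, int]:
    """Count events per tumbling window, keyed by window start in epoch ms.

    Each timestamp falls in exactly one window [start, start + window_ms).
    """
    counts: Counter = Counter()
    for ts in event_times_ms:
        window_start = ts - ts % window_ms  # floor to the window boundary
        counts[window_start] += 1
    return dict(sorted(counts.items()))


events = [0, 59_999, 60_000, 61_500, 125_000]
print(tumbling_counts(events))  # {0: 2, 60000: 2, 120000: 1}
```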