Vector Sinks
Vector database connectors for embedding search (LanceDB, Pinecone, Weaviate, Qdrant, pgvector).
Krishiv ships five vector-store sinks (LanceDB, pgvector, Qdrant, Pinecone, Weaviate) plus an in-memory sink for tests. All of them implement a common interface, so you can swap targets without changing pipeline code.
Common interface
| Method | Purpose |
|---|---|
| `upsert_batch(batch)` | Write a batch of points (id, vector, payload). |
| `query_nearest(vector, k) -> Vec<ScoredChunk>` | k-NN search. Returns scored chunks with their payloads. |
| `delete_by_ids(ids)` | Delete points by id. |
| `sink_name() -> &'static str` | Name used in metrics labels and CLI output. |
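To make the contract concrete, here is a minimal Python sketch of the four methods as a `typing.Protocol`, with a brute-force in-memory implementation. It is an illustration under assumptions, not Krishiv's API: `VectorSink`, `ScoredChunk`, `ToyInMemorySink` and `index_and_search` are hypothetical names, batches are plain `(id, vector, payload)` tuples rather than a `RecordBatch`, and similarity is assumed to be cosine (this page does not say which metric each backend uses).

```python
import math
from dataclasses import dataclass
from typing import Any, Dict, List, Protocol, Sequence, Tuple, Union

PointId = Union[str, int]
Point = Tuple[PointId, List[float], Dict[str, Any]]  # (id, vector, payload)


@dataclass
class ScoredChunk:
    # Hypothetical Python mirror of ScoredChunk: id, similarity score, payload.
    id: PointId
    score: float
    payload: Dict[str, Any]


class VectorSink(Protocol):
    # Hypothetical Python view of the shared sink interface.
    def upsert_batch(self, batch: Sequence[Point]) -> None: ...
    def query_nearest(self, vector: List[float], k: int) -> List[ScoredChunk]: ...
    def delete_by_ids(self, ids: Sequence[PointId]) -> None: ...
    def sink_name(self) -> str: ...


def cosine(a: List[float], b: List[float]) -> float:
    # Assumed metric: cosine similarity (higher means closer).
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class ToyInMemorySink:
    """Brute-force stand-in for testing; not Krishiv's InMemoryVectorSink."""

    def __init__(self) -> None:
        self._points: Dict[PointId, Tuple[List[float], Dict[str, Any]]] = {}

    def upsert_batch(self, batch: Sequence[Point]) -> None:
        for point_id, vector, payload in batch:
            # Upsert: insert new ids, overwrite existing ones.
            self._points[point_id] = (list(vector), dict(payload))

    def query_nearest(self, vector: List[float], k: int) -> List[ScoredChunk]:
        # Score every stored point, then keep the k highest (exact k-NN).
        scored = [ScoredChunk(pid, cosine(vector, v), p) for pid, (v, p) in self._points.items()]
        scored.sort(key=lambda c: c.score, reverse=True)
        return scored[:k]

    def delete_by_ids(self, ids: Sequence[PointId]) -> None:
        for pid in ids:
            self._points.pop(pid, None)  # ignore ids that are not present

    def sink_name(self) -> str:
        return "toy-in-memory"


def index_and_search(sink: VectorSink, rows: Sequence[Point], query: List[float], k: int) -> List[ScoredChunk]:
    # Pipeline code depends only on the interface, so any sink can be passed in.
    sink.upsert_batch(rows)
    return sink.query_nearest(query, k)
```

Because `index_and_search` only calls interface methods, replacing `ToyInMemorySink` with another implementation leaves it unchanged, which is the point of a shared interface.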
Backends
| Backend | Feature | Status |
|---|---|---|
| `InMemoryVectorSink` | (always) | Preview (for tests and prototypes). |
| `LanceDbSink::open(path, table)` | `vector-sinks` | Preview (local, file-backed). |
| `PgvectorSink::connect(conn_str, table)` | `vector-sinks` + `pgvector` | Experimental. |
| `QdrantSink::connect(url, collection)` | `vector-sinks` + `qdrant` | Experimental. |
| `PineconeSink::new(api_key, index)` | `vector-sinks` | Preview. |
| `WeaviateSink::connect(url, class)` | `vector-sinks` | Preview. |
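Several backends share the `vector-sinks` feature and two need an extra one, so a small helper can work out which features to enable for a given set of backends. This sketch only transcribes the table above; it assumes the Feature column names Cargo features of the crate, and `BACKEND_FEATURES` and `features_for` are hypothetical names.

```python
from typing import Dict, FrozenSet, Iterable, List

# Transcribed from the Backends table (assumption: these are Cargo features).
BACKEND_FEATURES: Dict[str, FrozenSet[str]] = {
    "InMemoryVectorSink": frozenset(),  # always available
    "LanceDbSink": frozenset({"vector-sinks"}),
    "PgvectorSink": frozenset({"vector-sinks", "pgvector"}),
    "QdrantSink": frozenset({"vector-sinks", "qdrant"}),
    "PineconeSink": frozenset({"vector-sinks"}),
    "WeaviateSink": frozenset({"vector-sinks"}),
}


def features_for(backends: Iterable[str]) -> List[str]:
    """Return the sorted union of features needed by the given backends."""
    needed = set()
    for name in backends:
        if name not in BACKEND_FEATURES:
            raise ValueError(f"unknown backend: {name}")
        needed |= BACKEND_FEATURES[name]
    return sorted(needed)
```

For example, `features_for(["QdrantSink", "PgvectorSink"])` returns `['pgvector', 'qdrant', 'vector-sinks']`.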
Data shape
All sinks expect an Arrow `RecordBatch` with at least these columns:
| Column | Type |
|---|---|
| `id` | `utf8` or `int64` |
| `vector` | `list<float32>` (or fixed-size list) |
| `payload` | `struct<…>` (backend-specific fields) |
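Before converting rows into a `RecordBatch`, a caller may want to check them against the table above. The sketch below validates plain-Python rows (dicts) and returns a list of problems. Assumptions: `check_rows` is a hypothetical helper, every vector in a batch must have the same dimension `dim` (as a fixed-size list implies), and float32 range and backend-specific payload fields are not checked.

```python
import math
from typing import Any, List, Mapping, Sequence

REQUIRED = ("id", "vector", "payload")


def check_rows(rows: Sequence[Mapping[str, Any]], dim: int) -> List[str]:
    """Return human-readable problems; an empty list means the rows match the table."""
    problems: List[str] = []
    id_kinds = set()  # which id column type ("utf8" / "int64") the rows imply
    for i, row in enumerate(rows):
        missing = set(REQUIRED).difference(row)
        if missing:
            problems.append(f"row {i}: missing columns {sorted(missing)}")
            continue
        pid = row["id"]
        # bool is a subclass of int in Python, so rule it out explicitly.
        if isinstance(pid, str):
            id_kinds.add("utf8")
        elif isinstance(pid, int) and not isinstance(pid, bool) and -2**63 <= pid < 2**63:
            id_kinds.add("int64")
        else:
            problems.append(f"row {i}: id must be a str or an int64-range int")
        vec = row["vector"]
        if not isinstance(vec, (list, tuple)) or len(vec) != dim:
            problems.append(f"row {i}: vector must be a list of {dim} numbers")
        elif not all(isinstance(x, (int, float)) and not isinstance(x, bool) and math.isfinite(x) for x in vec):
            problems.append(f"row {i}: vector contains non-numeric or non-finite values")
        if not isinstance(row["payload"], Mapping):
            problems.append(f"row {i}: payload must be a mapping (becomes a struct column)")
    # A column has a single type, so ids cannot mix strings and integers.
    if len(id_kinds) > 1:
        problems.append("id column mixes utf8 and int64 values")
    return problems
```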
Schema validation is the caller's responsibility. The helper `point_id_from_doc_epoch` turns a timestamped id into a deterministic `u64`.
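This page does not describe how `point_id_from_doc_epoch` derives its `u64`. As an illustration only, here is one way to map a document id plus an epoch to a deterministic 64-bit id: hash both with BLAKE2b truncated to 8 bytes. The two-argument signature, the hash choice and the name `point_id_from_doc_epoch_sketch` are assumptions. Python's built-in `hash()` is avoided because string hashing is randomized per process, so it would not be deterministic across runs.

```python
import hashlib


def point_id_from_doc_epoch_sketch(doc_id: str, epoch: int) -> int:
    """Hypothetical stand-in; NOT Krishiv's point_id_from_doc_epoch."""
    # The epoch is a fixed-width 8-byte suffix, so the byte encoding of
    # (doc_id, epoch) is unambiguous.
    key = doc_id.encode("utf-8") + epoch.to_bytes(8, "big", signed=True)
    digest = hashlib.blake2b(key, digest_size=8).digest()
    return int.from_bytes(digest, "big")  # always in [0, 2**64)
```

The same `(doc_id, epoch)` pair always yields the same id, so re-running a pipeline upserts over existing points instead of duplicating them.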
Python
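```python
import krishiv as ks

session = ks.Session.embedded()
# Local, file-backed LanceDB table "embeddings" under ./vectors.
sink = ks.LanceDbSink.open("./vectors", "embeddings")

# Wrap the builder chain in parentheses so it can span lines,
# then stream the query's rows into the vector sink.
query = (
    session.sql("SELECT id, vector, payload FROM embeddings")
    .write_stream()
    .format("vector")
    .option("sink", sink)
    .start()
)
```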