Preview

Iceberg upsert with MERGE INTO

Merge a stream of changes into an Iceberg table using copy-on-write.

Preview: MERGE INTO on Iceberg currently uses copy-on-write only. Work on merge-on-read and on distributed atomic commit certification is ongoing. The target table must be registered under a KrishivCatalog.
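As a rough illustration of what copy-on-write means here, the sketch below models a table snapshot as a tuple of immutable data files. It is a conceptual toy in plain Python, not Krishiv or Iceberg code: the DataFile class, the copy_on_write_update function, and the file names are all hypothetical. An update rewrites every file that contains a matching row and carries the other files over unchanged.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DataFile:
    """Hypothetical immutable data file: a name plus (product_id, quantity) rows."""
    name: str
    rows: tuple


def copy_on_write_update(snapshot, product_id, new_quantity):
    """Return a new snapshot with one product's quantity replaced.

    Files containing a matching row are rewritten in full under a new name.
    Files without a match are reused as-is. Nothing is modified in place.
    """
    new_files = []
    for f in snapshot:
        if any(pid == product_id for pid, _ in f.rows):
            rewritten = tuple(
                (pid, new_quantity if pid == product_id else qty)
                for pid, qty in f.rows
            )
            # The "-v2" suffix is an arbitrary naming choice for this sketch.
            new_files.append(DataFile(f.name.replace(".parquet", "-v2.parquet"), rewritten))
        else:
            new_files.append(f)
    return tuple(new_files)


old = (
    DataFile("a.parquet", ((1, 10), (2, 20))),
    DataFile("b.parquet", ((3, 30),)),
)
new = copy_on_write_update(old, product_id=1, new_quantity=99)

print([f.name for f in new])  # ['a-v2.parquet', 'b.parquet']
print(new[0].rows)            # ((1, 99), (2, 20))
print(old[0].rows)            # ((1, 10), (2, 20))
```

The old snapshot stays readable after the update, which is the property that lets readers keep a consistent view while a writer prepares the next snapshot. The cost is that a single changed row forces a rewrite of its whole file.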

Prerequisites

  • Enable the iceberg Cargo/Python feature.
  • Register a REST catalog (see Iceberg connector).
  • The target table must exist in the catalog.

SQL

MERGE INTO my_catalog.warehouse.inventory AS tgt
USING incoming_stock AS src
ON tgt.product_id = src.product_id
WHEN MATCHED AND src.quantity = 0 THEN DELETE
WHEN MATCHED THEN UPDATE SET tgt.quantity = tgt.quantity + src.quantity
WHEN NOT MATCHED THEN INSERT (product_id, quantity)
VALUES (src.product_id, src.quantity);
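To make the clause order concrete, here is a minimal plain-Python sketch of the row-level outcome the statement above describes. It does not call Krishiv or Iceberg: merge_inventory is a hypothetical helper, tables are modeled as dicts keyed by product_id, and it assumes each product_id appears at most once in the source and that the first matching WHEN clause wins, as in standard SQL MERGE.

```python
def merge_inventory(target, source):
    """Apply the three WHEN clauses to dicts of {product_id: quantity}.

    Returns a new dict; neither input is modified.
    """
    result = dict(target)
    for product_id, quantity in source.items():
        if product_id in result:
            if quantity == 0:
                # WHEN MATCHED AND src.quantity = 0 THEN DELETE
                del result[product_id]
            else:
                # WHEN MATCHED THEN UPDATE SET quantity = tgt.quantity + src.quantity
                result[product_id] += quantity
        else:
            # WHEN NOT MATCHED THEN INSERT (product_id, quantity)
            result[product_id] = quantity
    return result


inventory = {1: 10, 2: 20, 3: 5}
incoming_stock = {1: 5, 2: 0, 4: 7}

merged = merge_inventory(inventory, incoming_stock)
print(merged)  # {1: 15, 3: 5, 4: 7}
```

Product 1 is matched and incremented, product 2 is matched with a zero quantity and deleted, product 3 has no incoming row and is left alone, and product 4 is unmatched and inserted. Note that an unmatched source row with quantity 0 would still be inserted, because the zero check only applies to the MATCHED branch.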

Python (in-process MemoryLakehouseTable for tests)

import krishiv as ks
import pyarrow as pa

session = ks.Session.embedded()
table = ks.MemoryLakehouseTable(pa.schema([("product_id", pa.int64()), ("quantity", pa.int64())]))
# pa.record_batch expects one array per column, not one tuple per row
table.append(pa.record_batch([pa.array([1, 2]), pa.array([10, 20])], schema=table.schema()))

# Set quantity to 99 for the row with product_id 1
table.update_where("product_id = 1", {"quantity": 99})
print(table.snapshot_rows())  # 2 (the update changes a value, not the row count)

For production Iceberg tables, use the SQL MERGE INTO form above. MemoryLakehouseTable is for tests and local exploration only.