Iceberg upsert with MERGE INTO
Merge a stream of changes into an Iceberg table using copy-on-write.
Preview:
MERGE INTO on Iceberg uses copy-on-write. Merge-on-read and distributed atomic commit certification are ongoing. The target table must be registered under a KrishivCatalog.
Prerequisites
- Enable the iceberg Cargo/Python feature.
- Register a REST catalog (see Iceberg connector).
- The target table must exist in the catalog.
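In Iceberg's copy-on-write mode, a MERGE never edits a data file in place. Every file that contains an affected row is rewritten in full, and the commit publishes a new snapshot that references the rewritten files plus the untouched originals. The sketch below is a minimal model of that idea using plain Python tuples. The names Row, DataFile, Snapshot, and copy_on_write_update are hypothetical and are not Krishiv or Iceberg APIs.

```python
# Hypothetical model, not Krishiv's or Iceberg's real classes:
# a data file is an immutable tuple of (product_id, quantity) rows,
# and a snapshot is an immutable tuple of data files.
Row = tuple[int, int]
DataFile = tuple[Row, ...]
Snapshot = tuple[DataFile, ...]


def copy_on_write_update(snapshot: Snapshot, key: int, new_quantity: int) -> Snapshot:
    """Return a new snapshot in which rows with product_id == key get new_quantity.

    Any file containing the key is rewritten in full; other files are reused.
    The input snapshot is never modified.
    """
    new_files: list[DataFile] = []
    for data_file in snapshot:
        if any(pid == key for pid, _ in data_file):
            # Rewrite the whole file, substituting the changed row.
            new_files.append(
                tuple((pid, new_quantity if pid == key else qty) for pid, qty in data_file)
            )
        else:
            # Unaffected files are shared by reference between the two snapshots.
            new_files.append(data_file)
    return tuple(new_files)


old = (((1, 10), (2, 20)), ((3, 30),))
new = copy_on_write_update(old, 1, 99)
print(new)
```

The old snapshot stays readable after the update, which is what lets readers keep using the previous version while a commit is in flight; the cost is that changing one row rewrites its entire file.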
SQL
MERGE INTO my_catalog.warehouse.inventory AS tgt
USING incoming_stock AS src
ON tgt.product_id = src.product_id
WHEN MATCHED AND src.quantity = 0 THEN DELETE
WHEN MATCHED THEN UPDATE SET tgt.quantity = tgt.quantity + src.quantity
WHEN NOT MATCHED THEN INSERT (product_id, quantity)
VALUES (src.product_id, src.quantity);
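The statement above applies one clause per source row: a matched row with src.quantity = 0 is deleted, any other matched row has its quantity increased by the source value, and an unmatched source row is inserted. The sketch below models those semantics on plain dicts (product_id to quantity) so the outcome for a given input is easy to check. The function merge_inventory is hypothetical; it illustrates the SQL's logic and is not how Krishiv executes MERGE.

```python
def merge_inventory(target: dict[int, int], source: list[tuple[int, int]]) -> dict[int, int]:
    """Model of the MERGE INTO example: returns a new dict, leaving target untouched."""
    # Standard MERGE fails when one target row is matched by several source rows.
    # This sketch is stricter and rejects any duplicate source key.
    seen: set[int] = set()
    for pid, _ in source:
        if pid in seen:
            raise ValueError(f"duplicate product_id {pid} in source")
        seen.add(pid)

    result = dict(target)  # copy, mirroring a new snapshot rather than an in-place edit
    for pid, qty in source:
        if pid in target:
            if qty == 0:
                del result[pid]  # WHEN MATCHED AND src.quantity = 0 THEN DELETE
            else:
                result[pid] = target[pid] + qty  # WHEN MATCHED THEN UPDATE SET ...
        else:
            result[pid] = qty  # WHEN NOT MATCHED THEN INSERT
    return result


print(merge_inventory({1: 10, 2: 20, 3: 5}, [(1, 0), (2, 7), (4, 3)]))
# {2: 27, 3: 5, 4: 3}
```

Note that the DELETE condition tests the source quantity, not the resulting total: a matched row whose updated quantity reaches zero is kept.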
Python (in-process MemoryLakehouseTable for tests)
import krishiv as ks
import pyarrow as pa
session = ks.Session.embedded()
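schema = pa.schema([("product_id", pa.int64()), ("quantity", pa.int64())])
table = ks.MemoryLakehouseTable(schema)
# pa.record_batch takes one array per column, not a list of row tuples
table.append(pa.record_batch(
    [pa.array([1, 2], pa.int64()), pa.array([10, 20], pa.int64())],
    schema=schema,
))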
# Set quantity = 99 on rows matching the predicate product_id = 1
table.update_where("product_id = 1", {"quantity": 99})
print(table.snapshot_rows()) # 2
For production Iceberg tables, use the SQL MERGE INTO form above. MemoryLakehouseTable is for tests and local exploration only.