
Savepoints and Schema Migration

User-triggered snapshots, restore, and migrating state across versions.

Checkpoints are coordinator-driven and automatic. Savepoints are user-triggered, named, and listable, which makes them useful for "snapshot before I deploy the new model" workflows. Schema migration is the mechanism for evolving the shape of state values across releases.

A savepoint is a point-in-time snapshot of all keyed state and source offsets. The same savepoint can be restored to a new cluster, with a new state schema, or at a different parallelism.

Savepoints

CLI:

# Take a savepoint with a label
krishiv savepoint --job my-pipeline --label before-v2

# List savepoints for a job
krishiv checkpoints list --job my-pipeline

# Restore from a savepoint (coordinator restarts the job from that epoch)
krishiv restore --job my-pipeline --epoch 42

# Delete a savepoint
# (the savepoint directory is removed from object storage)

Each savepoint records:

Field              Description
format_version     SAVEPOINT_FORMAT_VERSION = 1
savepoint_id       Unique ID; usually the epoch number.
label              User-supplied string.
job_id             The job it belongs to.
epoch              The checkpoint epoch this savepoint aliases.
operator_versions  Map of operator UID → behavior version. Used for migration.
created_at_secs    Unix timestamp.

Savepoints are stored at {base_dir}/{job_id}/savepoints/{savepoint_id}/meta.json.
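As a rough illustration of the record and path layout above, here is a minimal sketch. The struct name `SavepointMeta`, the function `meta_path`, and all field types are assumptions made for this example; only the field names and the path template come from the text above. The sketch also assumes a local filesystem path, whereas the real base directory may be an object-storage URI.

```rust
use std::collections::BTreeMap;
use std::path::PathBuf;

/// Hypothetical mirror of the savepoint fields listed above.
/// Field types are guesses; the real definition may differ.
#[derive(Debug, Clone, PartialEq)]
pub struct SavepointMeta {
    pub format_version: u32,
    pub savepoint_id: String,
    pub label: String,
    pub job_id: String,
    pub epoch: u64,
    /// Operator UID -> behavior version.
    pub operator_versions: BTreeMap<String, u32>,
    pub created_at_secs: u64,
}

/// Builds {base_dir}/{job_id}/savepoints/{savepoint_id}/meta.json.
pub fn meta_path(base_dir: &str, job_id: &str, savepoint_id: &str) -> PathBuf {
    PathBuf::from(base_dir)
        .join(job_id)
        .join("savepoints")
        .join(savepoint_id)
        .join("meta.json")
}
```

Calling `meta_path("base", "my-pipeline", "42")` yields `base/my-pipeline/savepoints/42/meta.json` on a Unix-style filesystem.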

State schema migration

If you change the type or encoding of a state value across releases, you need a migration. Register one for each operator + version pair:

use krishiv_api::{register_state_migration, state_migration};

register_state_migration("fraud_score", 1, 2, state_migration::<OldScore, NewScore>(
    |old| NewScore {
        value: old.value * 1.5,    // reweight
        model_version: 2,
    }
));

When the coordinator loads a checkpoint whose recorded operator version is older than the running operator's, the registered migration is applied. Two helpers are available:

Function                                                                 Use
migrate_snapshot(value_migrator, &[(from, to)])                          Value-only migration.
migrate_snapshot_with_keys(key_migrator, value_migrator, &[(from, to)])  Both key and value migration (e.g. when the key encoding changes).
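To make the version-pair idea concrete, the following sketch shows one way a chain of per-version migrations could be looked up and applied until a target version is reached. It is not the Krishiv API: `MigrationRegistry`, `Migrator`, and the use of a plain `f64` as the state value are all hypothetical simplifications.

```rust
use std::collections::HashMap;

/// Hypothetical value migrator: maps a value from one version to the next.
pub type Migrator = fn(f64) -> f64;

/// Hypothetical registry keyed by (operator UID, from_version).
#[derive(Default)]
pub struct MigrationRegistry {
    steps: HashMap<(String, u32), (u32, Migrator)>,
}

impl MigrationRegistry {
    /// Registers a migration step; versions must strictly increase.
    pub fn register(&mut self, operator: &str, from: u32, to: u32, f: Migrator) {
        assert!(to > from, "migrations must move to a newer version");
        self.steps.insert((operator.to_string(), from), (to, f));
    }

    /// Applies registered steps in order, starting at `version`.
    /// Returns None if a step is missing or the chain skips past `target`.
    pub fn migrate(
        &self,
        operator: &str,
        mut version: u32,
        target: u32,
        mut value: f64,
    ) -> Option<f64> {
        while version < target {
            let (next, f) = self.steps.get(&(operator.to_string(), version))?;
            value = f(value);
            version = *next;
        }
        if version == target {
            Some(value)
        } else {
            None
        }
    }
}
```

With steps registered for 1 → 2 and 2 → 3, a value stored at version 1 passes through both functions in order. A value already at the target version is returned unchanged.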

Incremental checkpointing

For RocksDB-backed state, full checkpoints can be expensive. Krishiv supports incremental checkpointing via RocksDbIncrementalCheckpointer:

  • Each checkpoint only writes SST files that changed since the last one.
  • Metadata is tracked in an SstEpochManifest.
  • Restoration rebuilds the state directory by layering SSTs from manifest entries.

Enable it by setting the checkpoint storage URI to one with a RocksDbIncremental hint, or by passing an explicit IncrementalCheckpointer in the session config.
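The bookkeeping behind incremental checkpoints can be sketched as follows. This is an illustration under stated assumptions, not the behavior of `RocksDbIncrementalCheckpointer` or `SstEpochManifest`: the `Manifest` type, its methods, and the `sst_set` helper are hypothetical, and the sketch assumes SST file names are unique and never rewritten.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical manifest. For each epoch it tracks the SST files first
/// uploaded in that epoch and the full set of files live at that epoch.
#[derive(Default)]
pub struct Manifest {
    uploaded: BTreeMap<u64, BTreeSet<String>>,
    live: BTreeMap<u64, BTreeSet<String>>,
}

impl Manifest {
    /// Records a checkpoint and returns only the files that still need
    /// uploading, i.e. those no earlier checkpoint has already stored.
    pub fn checkpoint(&mut self, epoch: u64, current: &BTreeSet<String>) -> BTreeSet<String> {
        let known: BTreeSet<&String> = self.uploaded.values().flatten().collect();
        let delta: BTreeSet<String> = current
            .iter()
            .filter(|f| !known.contains(f))
            .cloned()
            .collect();
        self.uploaded.insert(epoch, delta.clone());
        self.live.insert(epoch, current.clone());
        delta
    }

    /// For a restore to `epoch`, maps each live file to the epoch whose
    /// upload contains it. Returns None for an unknown epoch.
    pub fn restore_plan(&self, epoch: u64) -> Option<BTreeMap<String, u64>> {
        let live = self.live.get(&epoch)?;
        let mut plan = BTreeMap::new();
        for (e, files) in self.uploaded.range(..=epoch) {
            for f in files {
                if live.contains(f) {
                    plan.insert(f.clone(), *e);
                }
            }
        }
        Some(plan)
    }
}

/// Small helper for building file sets in examples.
pub fn sst_set(names: &[&str]) -> BTreeSet<String> {
    names.iter().map(|s| s.to_string()).collect()
}
```

If epoch 1 holds `a.sst` and `b.sst` and epoch 2 holds `b.sst` and `c.sst`, the second checkpoint uploads only `c.sst`. Restoring epoch 2 pulls `b.sst` from epoch 1 and `c.sst` from epoch 2.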

Rescaling (key-group redistribution)

When you change the parallelism of a job, state keys are remapped via KeyGroupRescaler:

  • The rescaler computes which existing key-groups map to which new ones.
  • Each rescaled key is rewritten with EntryRouting.
  • A RescaleChecksum is computed to verify that no key is lost or duplicated.

Rescaling happens automatically on restore when the new parallelism differs from the parallelism recorded in the checkpoint.
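The redistribution steps above can be illustrated with a small sketch. It does not reproduce `KeyGroupRescaler`, `EntryRouting`, or `RescaleChecksum`. Instead it assumes a common scheme: keys hash into a fixed number of key-groups, key-groups are assigned to subtasks in contiguous ranges, and an order-independent checksum (key count plus a wrapping sum of key hashes) is compared before and after. All function names here are hypothetical, and FNV-1a is used only to keep the example deterministic.

```rust
/// FNV-1a hash, chosen here only for determinism in the example.
fn fnv1a(bytes: &[u8]) -> u64 {
    let mut h: u64 = 0xcbf29ce484222325;
    for b in bytes {
        h ^= *b as u64;
        h = h.wrapping_mul(0x100000001b3);
    }
    h
}

/// Maps a key to one of `max_key_groups` key-groups (fixed for the job).
pub fn key_group(key: &[u8], max_key_groups: u32) -> u32 {
    (fnv1a(key) % max_key_groups as u64) as u32
}

/// Assigns a key-group to a subtask using contiguous ranges.
pub fn owner(key_group: u32, max_key_groups: u32, parallelism: u32) -> u32 {
    ((key_group as u64 * parallelism as u64) / max_key_groups as u64) as u32
}

/// Moves every key to the subtask that owns its key-group
/// under the new parallelism.
pub fn rescale(old: &[Vec<String>], max_key_groups: u32, new_parallelism: u32) -> Vec<Vec<String>> {
    let mut new_parts: Vec<Vec<String>> = vec![Vec::new(); new_parallelism as usize];
    for key in old.iter().flatten() {
        let kg = key_group(key.as_bytes(), max_key_groups);
        let target = owner(kg, max_key_groups, new_parallelism) as usize;
        new_parts[target].push(key.clone());
    }
    new_parts
}

/// Order-independent checksum: (key count, wrapping sum of key hashes).
/// A lost or duplicated key changes the count and, in practice, the sum.
pub fn checksum(parts: &[Vec<String>]) -> (u64, u64) {
    parts
        .iter()
        .flatten()
        .fold((0u64, 0u64), |(n, sum), k| {
            (n + 1, sum.wrapping_add(fnv1a(k.as_bytes())))
        })
}
```

Comparing `checksum(&old)` with `checksum(&rescale(&old, 128, 4))` should give equal pairs, since rescaling only moves keys between subtasks. A count plus sum is used instead of XOR because a key duplicated an even number of times would cancel out under XOR.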
