Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Checkpointing

Checkpoints are the foundation of Ripple’s fault tolerance. A checkpoint captures the complete state of a worker’s incremental graph at a consistent point in time, enabling recovery after crashes without replaying the entire input history. This page covers the checkpoint type, atomicity guarantees, pluggable stores, the restore protocol, and the effectively-once boundary.

The Checkpoint Type

type node_snapshot =
  { node_id : int
  ; height : int
  ; value_bytes : string   (* bin_prot or sexp serialized *)
  }

type t =
  { checkpoint_id : string            (* "ckpt-w1-42" *)
  ; worker_id : string
  ; partition_id : string
  ; timestamp_ns : Int64.t
  ; epoch : int                       (* monotonically increasing counter *)
  ; node_snapshots : node_snapshot list
  ; input_offsets : (string * Int64.t) list  (* source -> offset *)
  ; node_count : int
  }

Key design decision: only leaf node values are checkpointed. Compute nodes (map, fold, incr_fold) are not stored – they are recomputed by running stabilize after restoring the leaf values. This reduces checkpoint size by ~60% and avoids serializing closures.

The input_offsets field records the Kafka consumer offset for each input source at the time of the checkpoint. This is the replay starting point: on recovery, the consumer seeks to these offsets and replays forward.

Atomic Writes

Checkpoint writes are atomic. A crash mid-write must never leave a corrupt checkpoint visible to the restore protocol.

Local Disk: temp + rename

let write ~dir ckpt =
  let data = Bin_prot.Utils.bin_dump bin_writer_t ckpt
    |> Bigstring.to_string in
  let final_path = dir ^/ ckpt.checkpoint_id ^ ".ckpt" in
  let temp_path = final_path ^ ".tmp" in
  (* Write to temp file *)
  Out_channel.write_all temp_path ~data;
  (* Atomic rename -- crash before rename leaves only temp *)
  Core_unix.rename ~src:temp_path ~dst:final_path

If the process crashes during write_all, only the .tmp file exists. On recovery, .tmp files are ignored. If the process crashes after rename, the checkpoint is complete and valid.

S3: Single-Object PUT

S3 provides strong read-after-write consistency for individual objects. A single PutObject is atomic from the reader’s perspective – the object is either fully visible or not visible at all.

s3://ripple-checkpoints/prod/ckpt-w0-42.ckpt

There is no need for temp files or two-phase commit on S3.

Pluggable Stores

All checkpoint stores implement the same module signature:

module type S = sig
  type t

  val write : t -> Checkpoint.t -> unit Or_error.t
  val read : t -> checkpoint_id:string -> Checkpoint.t Or_error.t
  val latest : t -> worker_id:string -> Checkpoint.t option
  val list : t -> worker_id:string -> Checkpoint.t list
  val delete : t -> checkpoint_id:string -> unit Or_error.t
end

Three implementations are provided:

StoreUse CaseAtomicity Mechanism
In_memoryUnit testsHash table (trivially atomic)
Local_diskDevelopmenttemp file + rename
S3ProductionSingle-object PUT

Creating a Store

(* Testing *)
let store = Store.In_memory.create ()

(* Development *)
let store = Store.Local_disk.create ~dir:"/var/lib/ripple/checkpoints"

(* Production *)
let store = Store.S3.create
  ~bucket:"ripple-checkpoints"
  ~prefix:"prod"
  ~region:"us-east-1"

Snapshot and Restore

Taking a Checkpoint

let ckpt = Checkpoint.snapshot_graph
  ~graph:g
  ~worker_id:"w0"
  ~partition_id:"p0"
  ~epoch:42
  ~input_offsets:[("trades", 1000000L)]
  ~now
in
let _ = Store.write store ckpt

snapshot_graph calls Graph.snapshot_leaf_values which iterates over all nodes and serializes those with registered sexp_of_value functions. Only Leaf nodes with serialization support are included.

Restoring from Checkpoint

The restore protocol is a 4-step process:

1. Load latest checkpoint for this worker
2. Rebuild the graph structure (same code as initial construction)
3. Restore leaf values from checkpoint
4. Stabilize to recompute all derived nodes
5. Resume input processing from checkpoint's input offsets
(* Step 1: Load checkpoint *)
let ckpt = Store.latest store ~worker_id:"w0"
  |> Option.value_exn in

(* Step 2: Rebuild graph (same code as normal startup) *)
let g = build_vwap_graph ~symbols ~now in

(* Step 3: Restore leaf values *)
let _ = Checkpoint.restore_graph ~graph:g ~checkpoint:ckpt in

(* Step 4: Stabilize *)
let _ = Graph.stabilize g in

(* Step 5: Seek input to checkpoint offset *)
let offset = List.Assoc.find_exn ckpt.input_offsets
  ~equal:String.equal "trades" in
Kafka_consumer.seek ~offset

After step 4, the graph is in exactly the same state it was in when the checkpoint was taken. Derived nodes are recomputed from the restored leaf values, producing identical results (by the determinism guarantee).

Checkpoint Interval

The default checkpoint interval is 10 seconds, configured in the coordinator:

checkpoint_interval_sec = 10

This interval was chosen based on benchmark B-05:

  • At 100K events/sec, a 10-second interval means ~1M events between checkpoints
  • Worst-case replay of 1M events at 250ns/stabilization = ~0.25 seconds
  • Well within the 30-second recovery target

The coordinator triggers checkpoints cluster-wide:

let checkpoint_due t ~now_ns =
  let elapsed = Int64.(-) now_ns t.last_checkpoint_trigger_ns in
  if Int64.compare elapsed t.checkpoint_interval_ns >= 0 then begin
    t.last_checkpoint_trigger_ns <- now_ns;
    true
  end else false

The Effectively-Once Boundary

The checkpoint forms the boundary of effectively-once processing:

  Events before checkpoint:  guaranteed processed (in checkpoint state)
  Events during checkpoint:  included or excluded (depends on timing)
  Events after checkpoint:   replayed on recovery (idempotent application)

  ──────────┬──────────────┬──────────────────────>
            │  checkpoint  │
            │  offset: 42  │
            │              │
  applied   │  boundary    │  replay (idempotent)

The combination of:

  1. Checkpointed state (leaf values at offset 42)
  2. Replay from offset 42 (at-least-once delivery)
  3. Idempotent delta application (sequence-number dedup)

yields effectively-once semantics. Each event’s effect appears exactly once in the output, even though some events may be processed more than once during replay.

Checkpoint Size

For a 2,000-symbol VWAP pipeline:

  • 2,000 leaf nodes with vwap_state values
  • Each vwap_state serializes to ~50 bytes (sexp)
  • Plus metadata overhead
  • Total: ~120 KB per checkpoint

At a 10-second interval, this is ~12 KB/sec of checkpoint I/O. S3 PUT latency for a 120 KB object is typically 10-50ms, well within budget.