# Checkpointing
Checkpoints are the foundation of Ripple’s fault tolerance. A checkpoint captures the complete state of a worker’s incremental graph at a consistent point in time, enabling recovery after crashes without replaying the entire input history. This page covers the checkpoint type, atomicity guarantees, pluggable stores, the restore protocol, and the effectively-once boundary.
## The Checkpoint Type
```ocaml
type node_snapshot =
  { node_id : int
  ; height : int
  ; value_bytes : string (* bin_prot or sexp serialized *)
  }

type t =
  { checkpoint_id : string (* e.g. "ckpt-w1-42" *)
  ; worker_id : string
  ; partition_id : string
  ; timestamp_ns : Int64.t
  ; epoch : int (* monotonically increasing counter *)
  ; node_snapshots : node_snapshot list
  ; input_offsets : (string * Int64.t) list (* source -> offset *)
  ; node_count : int
  }
```
Key design decision: only leaf node values are checkpointed. Compute nodes (`map`, `fold`, `incr_fold`) are not stored; they are recomputed by running `stabilize` after the leaf values are restored. This reduces checkpoint size by ~60% and avoids serializing closures.
The input_offsets field records the Kafka consumer offset for each input source at the time of the checkpoint. This is the replay starting point: on recovery, the consumer seeks to these offsets and replays forward.
## Atomic Writes
Checkpoint writes are atomic. A crash mid-write must never leave a corrupt checkpoint visible to the restore protocol.
### Local Disk: temp + rename
```ocaml
let write ~dir ckpt =
  let data =
    Bin_prot.Utils.bin_dump bin_writer_t ckpt |> Bigstring.to_string
  in
  let final_path = dir ^/ ckpt.checkpoint_id ^ ".ckpt" in
  let temp_path = final_path ^ ".tmp" in
  (* Write to temp file *)
  Out_channel.write_all temp_path ~data;
  (* Atomic rename -- crash before rename leaves only temp *)
  Core_unix.rename ~src:temp_path ~dst:final_path
```
If the process crashes during `write_all`, only the `.tmp` file exists; `.tmp` files are ignored on recovery. If the process crashes after the `rename`, the checkpoint is complete and valid.
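One consequence is that orphaned `.tmp` files can accumulate across crashes. A startup sweep is enough to clean them up. This is a sketch using Core / Core_unix; the function name is illustrative:

```ocaml
(* Sketch: remove temp files left behind by a crash mid-write.
   Completed .ckpt files are never touched. *)
let sweep_temp_files ~dir =
  Sys_unix.readdir dir
  |> Array.iter ~f:(fun name ->
       if String.is_suffix name ~suffix:".tmp"
       then Sys_unix.remove (dir ^/ name))
```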
### S3: Single-Object PUT
S3 provides strong read-after-write consistency for individual objects. A single `PutObject` is atomic from the reader's perspective: the object is either fully visible or not visible at all.
```
s3://ripple-checkpoints/prod/ckpt-w0-42.ckpt
```
There is no need for temp files or two-phase commit on S3.
## Pluggable Stores
All checkpoint stores implement the same module signature:
```ocaml
module type S = sig
  type t

  val write : t -> Checkpoint.t -> unit Or_error.t
  val read : t -> checkpoint_id:string -> Checkpoint.t Or_error.t
  val latest : t -> worker_id:string -> Checkpoint.t option
  val list : t -> worker_id:string -> Checkpoint.t list
  val delete : t -> checkpoint_id:string -> unit Or_error.t
end
```
Three implementations are provided:
| Store | Use Case | Atomicity Mechanism |
|---|---|---|
| `In_memory` | Unit tests | Hash table (trivially atomic) |
| `Local_disk` | Development | Temp file + rename |
| `S3` | Production | Single-object PUT |
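As an illustration of the signature, an in-memory store can be little more than a hash table keyed by checkpoint id. The sketch below assumes Core and the `Checkpoint.t` record shown earlier; it is not necessarily Ripple's actual implementation:

```ocaml
open Core

module In_memory = struct
  type t = Checkpoint.t Hashtbl.M(String).t

  let create () : t = Hashtbl.create (module String)

  let write t ckpt =
    Hashtbl.set t ~key:ckpt.Checkpoint.checkpoint_id ~data:ckpt;
    Ok ()

  let read t ~checkpoint_id =
    match Hashtbl.find t checkpoint_id with
    | Some ckpt -> Ok ckpt
    | None -> Or_error.error_string ("no such checkpoint: " ^ checkpoint_id)

  let list t ~worker_id =
    Hashtbl.data t
    |> List.filter ~f:(fun c ->
         String.equal c.Checkpoint.worker_id worker_id)

  let latest t ~worker_id =
    list t ~worker_id
    |> List.max_elt ~compare:(fun a b ->
         Int.compare a.Checkpoint.epoch b.Checkpoint.epoch)

  let delete t ~checkpoint_id =
    Hashtbl.remove t checkpoint_id;
    Ok ()
end
```

Because every operation is a single hash-table mutation, writes are trivially atomic, which is all the unit-test store needs.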
### Creating a Store
```ocaml
(* Testing *)
let store = Store.In_memory.create ()

(* Development *)
let store = Store.Local_disk.create ~dir:"/var/lib/ripple/checkpoints"

(* Production *)
let store =
  Store.S3.create
    ~bucket:"ripple-checkpoints"
    ~prefix:"prod"
    ~region:"us-east-1"
```
## Snapshot and Restore
### Taking a Checkpoint
```ocaml
let ckpt =
  Checkpoint.snapshot_graph
    ~graph:g
    ~worker_id:"w0"
    ~partition_id:"p0"
    ~epoch:42
    ~input_offsets:[ ("trades", 1_000_000L) ]
    ~now
in
let _ = Store.write store ckpt
```
`snapshot_graph` calls `Graph.snapshot_leaf_values`, which iterates over all nodes and serializes those with registered `sexp_of_value` functions. Only `Leaf` nodes with serialization support are included.
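The iteration can be pictured roughly as follows; the `any_node` representation here is invented for illustration and is not Ripple's internal node type:

```ocaml
(* Hypothetical node representation, for illustration only. *)
type any_node =
  { id : int
  ; height : int
  ; kind : [ `Leaf | `Compute ]
  ; sexp_of_value : (unit -> Sexp.t) option (* registered serializer *)
  }

let snapshot_leaf_values (nodes : any_node list) : node_snapshot list =
  List.filter_map nodes ~f:(fun n ->
    match n.kind, n.sexp_of_value with
    | `Leaf, Some to_sexp ->
      (* Only leaves with a registered serializer are captured. *)
      Some
        { node_id = n.id
        ; height = n.height
        ; value_bytes = Sexp.to_string (to_sexp ())
        }
    | _ -> None)
```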
### Restoring from Checkpoint
The restore protocol has five steps:
1. Load latest checkpoint for this worker
2. Rebuild the graph structure (same code as initial construction)
3. Restore leaf values from checkpoint
4. Stabilize to recompute all derived nodes
5. Resume input processing from checkpoint's input offsets
```ocaml
(* Step 1: Load the latest checkpoint *)
let ckpt = Store.latest store ~worker_id:"w0" |> Option.value_exn in

(* Step 2: Rebuild graph (same code as normal startup) *)
let g = build_vwap_graph ~symbols ~now in

(* Step 3: Restore leaf values *)
let _ = Checkpoint.restore_graph ~graph:g ~checkpoint:ckpt in

(* Step 4: Stabilize to recompute derived nodes *)
let _ = Graph.stabilize g in

(* Step 5: Seek input to the checkpoint offset *)
let offset =
  List.Assoc.find_exn ckpt.input_offsets ~equal:String.equal "trades"
in
Kafka_consumer.seek ~offset
```
After step 4, the graph is in exactly the same state it was in when the checkpoint was taken. Derived nodes are recomputed from the restored leaf values, producing identical results (by the determinism guarantee).
## Checkpoint Interval
The default checkpoint interval is 10 seconds, configured in the coordinator:
```
checkpoint_interval_sec = 10
```
This interval was chosen based on benchmark B-05:
- At 100K events/sec, a 10-second interval means ~1M events between checkpoints
- Worst-case replay of 1M events at 250ns/stabilization = ~0.25 seconds
- Well within the 30-second recovery target
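The budget can be restated as arithmetic (numbers taken from the bullets above):

```ocaml
(* Benchmark B-05 numbers, restated. *)
let events_per_sec = 100_000.
let interval_sec = 10.
let per_event_ns = 250.

(* ~1,000,000 events between checkpoints *)
let events_between = events_per_sec *. interval_sec

(* worst case: 1e6 events * 250 ns = 0.25 s, well under the 30 s target *)
let worst_case_replay_sec = events_between *. per_event_ns /. 1e9
```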
The coordinator triggers checkpoints cluster-wide:
```ocaml
let checkpoint_due t ~now_ns =
  let elapsed = Int64.( - ) now_ns t.last_checkpoint_trigger_ns in
  if Int64.compare elapsed t.checkpoint_interval_ns >= 0
  then begin
    t.last_checkpoint_trigger_ns <- now_ns;
    true
  end
  else false
```
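In use, the coordinator calls this from its periodic tick. The broadcast function below is a hypothetical name for whatever RPC fans the checkpoint request out to workers:

```ocaml
(* Illustrative tick handler; [broadcast_checkpoint_request] is not
   part of the documented API. *)
let on_tick t ~now_ns =
  if checkpoint_due t ~now_ns
  then broadcast_checkpoint_request t
```

Because `checkpoint_due` resets `last_checkpoint_trigger_ns` when it fires, a late tick triggers at most one checkpoint rather than a burst of catch-up checkpoints.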
## The Effectively-Once Boundary
The checkpoint forms the boundary of effectively-once processing:
- Events before the checkpoint: guaranteed processed (captured in checkpoint state)
- Events during the checkpoint: included or excluded, depending on timing
- Events after the checkpoint: replayed on recovery (idempotent application)

```
──────────┬──────────────┬──────────────────────>
          │  checkpoint  │
          │  offset: 42  │
          │              │
 applied  │   boundary   │  replay (idempotent)
```
The combination of:
- Checkpointed state (leaf values at offset 42)
- Replay from offset 42 (at-least-once delivery)
- Idempotent delta application (sequence-number dedup)
yields effectively-once semantics. Each event’s effect appears exactly once in the output, even though some events may be processed more than once during replay.
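Sequence-number dedup can be sketched as follows; the `last_seq` table and `apply` function are illustrative names, not the documented API:

```ocaml
(* Drop deltas whose sequence number was already applied. *)
let apply_delta t ~source ~seq delta =
  let last =
    Hashtbl.find t.last_seq source
    |> Option.value ~default:Int64.min_value
  in
  if Int64.(seq > last)
  then begin
    apply t delta; (* first delivery: take effect *)
    Hashtbl.set t.last_seq ~key:source ~data:seq
  end
  (* replayed duplicates (seq <= last) fall through and are dropped *)
```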
## Checkpoint Size
For a 2,000-symbol VWAP pipeline:
- 2,000 leaf nodes with `vwap_state` values
- Each `vwap_state` serializes to ~50 bytes (sexp), plus metadata overhead
- Total: ~120 KB per checkpoint
At a 10-second interval, this is ~12 KB/sec of checkpoint I/O. S3 PUT latency for a 120 KB object is typically 10-50ms, well within budget.
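The figures above come from straightforward multiplication:

```ocaml
(* ~100 KB of payload; per-node and header metadata bring the
   checkpoint to roughly 120 KB. *)
let payload_bytes = 2_000 * 50

(* ~12 KB/s of sustained checkpoint I/O at a 10 s interval *)
let io_bytes_per_sec = 120_000 / 10
```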