Coordinator
The coordinator is a stateless cluster management service that handles partition assignment, worker registration, failure detection, and checkpoint scheduling. This page covers the consistent hash ring, partition assignment algorithm, heartbeat liveness detection, and rebalancing protocol.
Design Principles
The coordinator is intentionally stateless. All durable state lives in the checkpoint store (S3). The coordinator can be restarted at any time without data loss – it reconstructs its view of the cluster from worker heartbeats.
In Kubernetes, the coordinator runs as a Deployment with 2 replicas for high availability. Both replicas are active and can independently manage the cluster. Workers register with whichever replica responds first.
Consistent Hash Ring
Partition-to-worker assignment uses a consistent hash ring with virtual nodes:
module Hash_ring = struct
type t =
{ mutable ring : (int * string) list
; virtual_nodes : int (* default: 128 *)
}
let add_worker t ~worker_id =
let new_nodes = List.init t.virtual_nodes ~f:(fun i ->
let key = sprintf "%s-vnode-%d" worker_id i in
(hash_key key, worker_id))
in
t.ring <- List.sort (t.ring @ new_nodes)
~compare:(fun (a, _) (b, _) -> Int.compare a b)
let assign_partition t ~partition_id =
let h = hash_key (sprintf "partition-%d" partition_id) in
(* Find first ring entry >= h (clockwise lookup) *)
match List.find t.ring ~f:(fun (hash, _) -> hash >= h) with
| Some (_, worker_id) -> Some worker_id
| None ->
(* Wrap around to first entry *)
match t.ring with
| (_, worker_id) :: _ -> Some worker_id
| [] -> None
end
Why Consistent Hashing
When a worker is added or removed, consistent hashing minimizes the number of partitions that need to be reassigned. With 128 virtual nodes per worker and 128 partitions:
| Event | Partitions reassigned | Without consistent hashing |
|---|---|---|
| Add 1 worker (3 -> 4) | ~32 (1/4 of total) | ~96 (3/4 of total) |
| Remove 1 worker (4 -> 3) | ~32 | ~96 |
Fewer reassigned partitions means less checkpoint state to load from S3 and less input to replay during rebalancing.
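The reassignment behavior is easy to check with a small simulation. This is a sketch, not the production code: it is stdlib-only (no Core), uses OCaml's polymorphic `Hashtbl.hash` in place of the real `hash_key`, and the worker names `w0`..`w3` are illustrative.

```ocaml
(* Sketch: count how many partitions move when a fourth worker joins.
   Assumes Hashtbl.hash as the ring hash; stdlib-only. *)
let vnodes = 128
let partitions = 128

let hash_key s = Hashtbl.hash s

let build_ring workers =
  workers
  |> List.concat_map (fun w ->
         List.init vnodes (fun i ->
             (hash_key (Printf.sprintf "%s-vnode-%d" w i), w)))
  |> List.sort (fun (a, _) (b, _) -> compare a b)

let assign ring pid =
  let h = hash_key (Printf.sprintf "partition-%d" pid) in
  match List.find_opt (fun (hash, _) -> hash >= h) ring with
  | Some (_, w) -> w
  | None -> snd (List.hd ring) (* wrap around *)

let moved =
  let ring3 = build_ring [ "w0"; "w1"; "w2" ] in
  let ring4 = build_ring [ "w0"; "w1"; "w2"; "w3" ] in
  List.init partitions Fun.id
  |> List.filter (fun pid -> assign ring3 pid <> assign ring4 pid)

let () =
  (* Consistent-hashing guarantee: a partition's owner changes only when one
     of the new worker's vnodes becomes its successor, so every moved
     partition lands on the new worker. *)
  let ring4 = build_ring [ "w0"; "w1"; "w2"; "w3" ] in
  assert (List.for_all (fun pid -> assign ring4 pid = "w3") moved);
  Printf.printf "moved %d of %d partitions\n" (List.length moved) partitions
```

The exact count depends on the hash function, but it stays near 1/4 of the total, and the assertion shows the key property: partitions only ever move *to* the added worker, never between surviving workers.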
Virtual Nodes
Each physical worker maps to 128 virtual nodes on the ring. Without virtual nodes, each worker would occupy a single point, so with only a few workers the arcs between points, and therefore the partition counts per worker, could be heavily skewed. Spreading each worker across 128 points evens out the distribution.
Ring (simplified, 8 virtual nodes per worker):
0 ──── w0 ── w1 ── w0 ── w2 ── w1 ── w0 ── w2 ── w1 ──── MAX
vn0 vn0 vn1 vn0 vn1 vn2 vn1 vn2
Worker Registration
Workers register with the coordinator on startup:
let register_worker t ~worker_id ~now_ns =
Hashtbl.set t.workers ~key:worker_id ~data:
{ worker_id
; partitions = []
; last_heartbeat_ns = now_ns
; load = 0.0
; state = `Active
};
Hash_ring.add_worker t.hash_ring ~worker_id;
t.assignment_epoch <- t.assignment_epoch + 1
Registration:
- Adds the worker to the worker registry
- Inserts 128 virtual nodes into the hash ring
- Increments the assignment epoch (triggers rebalance)
The worker then queries its partition assignment:
let get_assignment t ~worker_id =
let partitions = List.filter_map
(List.init t.total_partitions ~f:Fn.id)
~f:(fun pid ->
match Hash_ring.assign_partition t.hash_ring ~partition_id:pid with
| Some wid when String.equal wid worker_id -> Some pid
| _ -> None)
in
{ worker_id; partitions; epoch = t.assignment_epoch }
Heartbeat Liveness Detection
Workers send heartbeats every 5 seconds (configurable). The heartbeat carries:
type heartbeat =
{ worker_id : string
; timestamp_ns : Int64.t
; load : float (* 0.0 to 1.0 *)
}
The coordinator updates the worker’s last heartbeat timestamp and load metric:
let heartbeat t ~worker_id ~now_ns ~load =
match Hashtbl.find t.workers worker_id with
| None -> () (* Unknown worker -- ignore *)
| Some status ->
Hashtbl.set t.workers ~key:worker_id
~data:{ status with
last_heartbeat_ns = now_ns
; load
; state = `Active }
Failure Detection
The coordinator periodically scans for workers whose last heartbeat exceeds the timeout (default: 30 seconds):
let detect_failures t ~now_ns =
let dead_workers =
Hashtbl.fold t.workers ~init:[] ~f:(fun ~key:wid ~data:status acc ->
let elapsed = Int64.(-) now_ns status.last_heartbeat_ns in
if Int64.compare elapsed t.heartbeat_timeout_ns > 0 then
(wid, status) :: acc
else acc)
in
List.iter dead_workers ~f:(fun (wid, status) ->
Hashtbl.set t.workers ~key:wid
~data:{ status with state = `Dead });
List.map dead_workers ~f:fst
Worker state transitions:
heartbeat received
Active ──────────────────────────────> Active
│
│ timeout exceeded
v
Suspected ──── confirmed ───> Dead ───> Removed
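Note that detect_failures above promotes a worker straight from Active to Dead once the timeout is exceeded. A two-phase transition function matching the diagram might look like the following sketch, where the Suspected threshold at half the timeout is an assumption (the confirmation policy is not specified by the code shown):

```ocaml
(* Sketch: two-phase failure detection matching the state diagram.
   The half-timeout Suspected threshold is an illustrative assumption. *)
type state = Active | Suspected | Dead

let next_state ~elapsed_ns ~timeout_ns state =
  match state with
  (* Past half the timeout: mark suspected, keep partitions assigned. *)
  | Active when Int64.compare elapsed_ns (Int64.div timeout_ns 2L) > 0 ->
    Suspected
  (* Past the full timeout while suspected: confirm dead. *)
  | Suspected when Int64.compare elapsed_ns timeout_ns > 0 -> Dead
  | s -> s
```

Run periodically by the scan loop, this requires two consecutive missed-heartbeat scans before a worker is declared Dead and its partitions are redistributed.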
Rebalancing on Failure
When a worker is detected as dead, the coordinator removes it and triggers rebalancing:
let remove_worker t ~worker_id =
Hashtbl.remove t.workers worker_id;
Hash_ring.remove_worker t.hash_ring ~worker_id;
t.assignment_epoch <- t.assignment_epoch + 1
The remaining workers detect the epoch change and query for their updated assignments. The consistent hash ring ensures that only partitions previously assigned to the dead worker are redistributed.
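The worker-side reaction to an epoch change can be sketched as follows. `fetch` stands in for the get_assignment RPC shown earlier, and `on_change` is a hypothetical hook that loads checkpoints for newly gained partitions; neither name appears in the real worker code.

```ocaml
(* Sketch: worker polls its assignment and reacts only when the epoch
   advances. `fetch` and `on_change` are hypothetical hooks. *)
type assignment = { partitions : int list; epoch : int }

let poll_assignment ~known_epoch ~fetch ~on_change =
  let a = fetch () in
  if a.epoch > !known_epoch then begin
    known_epoch := a.epoch;
    (* New epoch: reload state for the (possibly changed) partition set. *)
    on_change a.partitions
  end
```

Because the epoch only ever increases, a stale response from one coordinator replica is harmlessly ignored, which is what lets both replicas serve assignments independently.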
Rebalancing Flow
t=0s: 3 workers, 12 partitions
w0: [0,1,2,3] w1: [4,5,6,7] w2: [8,9,10,11]
t=30s: w2 heartbeat timeout
coordinator: detect_failures -> ["w2"]
coordinator: remove_worker "w2"
epoch: 2 -> 3
t=31s: w0 polls assignment (epoch 3)
w0: [0,1,2,3,8,9] (gained 8,9 from w2)
w1: [4,5,6,7,10,11] (gained 10,11 from w2)
t=32s: w0 loads checkpoint for partitions 8,9 from S3
w0 replays input from checkpoint offsets
w0 resumes processing partitions 8,9
Checkpoint Triggers
The coordinator tracks checkpoint timing and signals workers when a checkpoint is due:
let checkpoint_due t ~now_ns =
let elapsed = Int64.(-) now_ns t.last_checkpoint_trigger_ns in
if Int64.compare elapsed t.checkpoint_interval_ns >= 0 then begin
t.last_checkpoint_trigger_ns <- now_ns;
true
end else false
Default interval: 10 seconds.
When checkpoint_due returns true, the coordinator sends Checkpoint_request messages to all active workers. Each worker:
- Drains its current batch
- Writes a checkpoint to the store
- Responds with Checkpoint_ack
The coordinator does not wait for all acks before proceeding. Checkpoints are per-worker, not cluster-wide barriers. This avoids the stop-the-world pause of Chandy-Lamport-style distributed snapshots.
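The worker-side handling of a Checkpoint_request can be sketched as three sequential steps. The hook names (`drain_batch`, `write_checkpoint`, `send_ack`) are hypothetical, chosen to mirror the bullet list above:

```ocaml
(* Sketch of the worker-side Checkpoint_request handler. The three hooks
   are hypothetical names for the steps listed above. *)
let on_checkpoint_request ~drain_batch ~write_checkpoint ~send_ack =
  drain_batch ();      (* finish the in-flight batch first *)
  write_checkpoint (); (* persist operator state and input offsets *)
  send_ack ()          (* Checkpoint_ack; the coordinator does not block on it *)
```

Since the coordinator never waits on the ack, a slow worker delays only its own checkpoint, not the cluster's.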
Cluster Status
The coordinator exposes a cluster status summary:
let cluster_status t =
let active = Hashtbl.count t.workers ~f:(fun s ->
match s.state with `Active -> true | _ -> false) in
(active, Hashtbl.length t.workers, t.total_partitions, t.assignment_epoch)
This is consumed by ripple-cli status and the coordinator’s /health endpoint.
Configuration
| Parameter | Default | Description |
|---|---|---|
| total_partitions | 128 | Number of data partitions |
| heartbeat_timeout_sec | 30 | Seconds before a worker is declared dead |
| checkpoint_interval_sec | 10 | Seconds between checkpoint triggers |
| virtual_nodes | 128 | Virtual nodes per worker on hash ring |
| max_keys_per_partition | 2,000 | Maximum symbols per partition (B-05 mitigation) |
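The table above maps naturally onto a record of defaults. The record shape below is a sketch (field names follow the table; the real config type may differ):

```ocaml
(* Sketch: the configuration table as an OCaml record with the documented
   defaults. Field names mirror the table; the actual type is assumed. *)
type config =
  { total_partitions : int
  ; heartbeat_timeout_sec : int
  ; checkpoint_interval_sec : int
  ; virtual_nodes : int
  ; max_keys_per_partition : int
  }

let default_config =
  { total_partitions = 128
  ; heartbeat_timeout_sec = 30
  ; checkpoint_interval_sec = 10
  ; virtual_nodes = 128
  ; max_keys_per_partition = 2_000
  }
```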