Coordinator

The coordinator is a stateless cluster management service that handles partition assignment, worker registration, failure detection, and checkpoint scheduling. This page covers the consistent hash ring, partition assignment algorithm, heartbeat liveness detection, and rebalancing protocol.

Design Principles

The coordinator is intentionally stateless. All durable state lives in the checkpoint store (S3). The coordinator can be restarted at any time without data loss – it reconstructs its view of the cluster from worker heartbeats.

In Kubernetes, the coordinator runs as a Deployment with 2 replicas for high availability. Both replicas are active and can independently manage the cluster. Workers register with whichever replica responds first.

Consistent Hash Ring

Partition-to-worker assignment uses a consistent hash ring with virtual nodes:

module Hash_ring = struct
  type t =
    { mutable ring : (int * string) list
    ; virtual_nodes : int   (* default: 128 *)
    }

  let add_worker t ~worker_id =
    let new_nodes = List.init t.virtual_nodes ~f:(fun i ->
      let key = sprintf "%s-vnode-%d" worker_id i in
      (hash_key key, worker_id))
    in
    t.ring <- List.sort (t.ring @ new_nodes)
      ~compare:(fun (a, _) (b, _) -> Int.compare a b)

  let assign_partition t ~partition_id =
    let h = hash_key (sprintf "partition-%d" partition_id) in
    (* Find first ring entry >= h (clockwise lookup) *)
    match List.find t.ring ~f:(fun (hash, _) -> hash >= h) with
    | Some (_, worker_id) -> Some worker_id
    | None ->
      (* Wrap around to first entry *)
      match t.ring with
      | (_, worker_id) :: _ -> Some worker_id
      | [] -> None
end

Why Consistent Hashing

When a worker is added or removed, consistent hashing minimizes the number of partitions that need to be reassigned. With 128 virtual nodes per worker and 128 partitions:

  Event                     Partitions reassigned   Without consistent hashing
  Add 1 worker (3 -> 4)     ~32 (1/4 of total)      ~96 (3/4 of total)
  Remove 1 worker (4 -> 3)  ~32                     ~96

Fewer reassigned partitions means less state to migrate during rebalancing: each moved partition requires a checkpoint load and input replay on its new worker.
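The minimal-movement property can be checked directly. The sketch below is not the project's code: it rebuilds the ring with only the OCaml standard library, using Hashtbl.hash as a stand-in for hash_key. The key invariant is that adding a worker may only move partitions to the new worker, never between survivors.

```ocaml
(* Sketch only: stdlib re-implementation of the ring, with
   Hashtbl.hash standing in for hash_key. *)
let virtual_nodes = 128
let total_partitions = 128

let build_ring workers =
  List.concat_map
    (fun w ->
       List.init virtual_nodes (fun i ->
           (Hashtbl.hash (Printf.sprintf "%s-vnode-%d" w i), w)))
    workers
  (* Sorting (hash, worker) pairs gives a deterministic total order,
     even on the rare hash tie. *)
  |> List.sort compare

let assign ring pid =
  let h = Hashtbl.hash (Printf.sprintf "partition-%d" pid) in
  match List.find_opt (fun (hash, _) -> hash >= h) ring with
  | Some (_, w) -> w
  | None -> snd (List.hd ring) (* wrap around to the first entry *)

let () =
  let before = build_ring [ "w0"; "w1"; "w2" ] in
  let after = build_ring [ "w0"; "w1"; "w2"; "w3" ] in
  let moved = ref 0 in
  for pid = 0 to total_partitions - 1 do
    if assign before pid <> assign after pid then begin
      incr moved;
      (* A changed partition can only have moved TO the new worker,
         never between the surviving workers. *)
      assert (assign after pid = "w3")
    end
  done;
  Printf.printf "reassigned %d of %d partitions\n" !moved total_partitions
```

The exact count printed depends on the hash function, but it clusters around the 1/4 figure in the table above.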

Virtual Nodes

Each physical worker maps to 128 virtual nodes on the ring. This ensures even distribution: without virtual nodes, a small number of physical workers could create hotspots where some workers receive far more partitions than others.

Ring (simplified, 8 virtual nodes per worker):

  0 ──── w0 ── w1 ── w0 ── w2 ── w1 ── w0 ── w2 ── w1 ──── MAX
         vn0   vn0   vn1   vn0   vn1   vn2   vn1   vn2

Worker Registration

Workers register with the coordinator on startup:

let register_worker t ~worker_id ~now_ns =
  Hashtbl.set t.workers ~key:worker_id ~data:
    { worker_id
    ; partitions = []
    ; last_heartbeat_ns = now_ns
    ; load = 0.0
    ; state = `Active
    };
  Hash_ring.add_worker t.hash_ring ~worker_id;
  t.assignment_epoch <- t.assignment_epoch + 1

Registration:

  1. Adds the worker to the worker registry
  2. Inserts 128 virtual nodes into the hash ring
  3. Increments the assignment epoch (triggers rebalance)

The worker then queries its partition assignment:

let get_assignment t ~worker_id =
  let partitions = List.filter_map
    (List.init t.total_partitions ~f:Fn.id)
    ~f:(fun pid ->
      match Hash_ring.assign_partition t.hash_ring ~partition_id:pid with
      | Some wid when String.equal wid worker_id -> Some pid
      | _ -> None)
  in
  { worker_id; partitions; epoch = t.assignment_epoch }

Heartbeat Liveness Detection

Workers send heartbeats every 5 seconds (configurable). The heartbeat carries:

type heartbeat =
  { worker_id : string
  ; timestamp_ns : Int64.t
  ; load : float       (* 0.0 to 1.0 *)
  }

The coordinator updates the worker’s last heartbeat timestamp and load metric:

let heartbeat t ~worker_id ~now_ns ~load =
  match Hashtbl.find t.workers worker_id with
  | None -> ()  (* Unknown worker -- ignore *)
  | Some status ->
    Hashtbl.set t.workers ~key:worker_id
      ~data:{ status with
              last_heartbeat_ns = now_ns
            ; load
            ; state = `Active }

Failure Detection

The coordinator periodically scans for workers whose last heartbeat exceeds the timeout (default: 30 seconds):

let detect_failures t ~now_ns =
  let dead_workers =
    Hashtbl.fold t.workers ~init:[] ~f:(fun ~key:wid ~data:status acc ->
      let elapsed = Int64.(-) now_ns status.last_heartbeat_ns in
      if Int64.compare elapsed t.heartbeat_timeout_ns > 0 then
        (wid, status) :: acc
      else acc)
  in
  List.iter dead_workers ~f:(fun (wid, status) ->
    Hashtbl.set t.workers ~key:wid
      ~data:{ status with state = `Dead });
  List.map dead_workers ~f:fst

Worker state transitions:

                  heartbeat received
  Active ──────────────────────────────> Active
    │
    │  timeout exceeded
    v
  Suspected ──── confirmed ───> Dead ───> Removed

Rebalancing on Failure

When a worker is detected as dead, the coordinator removes it and triggers rebalancing:

let remove_worker t ~worker_id =
  Hashtbl.remove t.workers worker_id;
  Hash_ring.remove_worker t.hash_ring ~worker_id;
  t.assignment_epoch <- t.assignment_epoch + 1

The remaining workers detect the epoch change and query for their updated assignments. The consistent hash ring ensures that only partitions previously assigned to the dead worker are redistributed.
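On the worker side, detecting a rebalance reduces to comparing epochs against the last assignment it applied. A minimal sketch follows; the fake coordinator, fetch_assignment, and the record layout are illustrative assumptions, not the real RPC surface:

```ocaml
(* Sketch only: a worker polling for an epoch bump. fetch_assignment
   stands in for the real coordinator RPC. *)
type assignment = { partitions : int list; epoch : int }

(* Fake coordinator state: bumps to epoch 3 once "w2" is removed. *)
let coordinator_epoch = ref 2

let fetch_assignment ~worker_id:_ =
  if !coordinator_epoch >= 3
  then { partitions = [ 0; 1; 2; 3; 8; 9 ]; epoch = 3 }
  else { partitions = [ 0; 1; 2; 3 ]; epoch = 2 }

(* Returns the new assignment if the epoch advanced, else None. *)
let check_for_rebalance ~current ~worker_id =
  let latest = fetch_assignment ~worker_id in
  if latest.epoch > current.epoch then Some latest else None

let () =
  let current = fetch_assignment ~worker_id:"w0" in
  assert (check_for_rebalance ~current ~worker_id:"w0" = None);
  coordinator_epoch := 3;  (* coordinator removed w2, bumped the epoch *)
  match check_for_rebalance ~current ~worker_id:"w0" with
  | Some next ->
    (* A real worker would now load checkpoints for the gained
       partitions (8, 9) before resuming them. *)
    Printf.printf "epoch %d -> %d, now %d partitions\n"
      current.epoch next.epoch (List.length next.partitions)
  | None -> assert false
```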

Rebalancing Flow

t=0s:   3 workers, 12 partitions
        w0: [0,1,2,3]  w1: [4,5,6,7]  w2: [8,9,10,11]

t=30s:  w2 heartbeat timeout
        coordinator: detect_failures -> ["w2"]
        coordinator: remove_worker "w2"
        epoch: 2 -> 3

t=31s:  w0 polls assignment (epoch 3)
        w0: [0,1,2,3,8,9]    (gained 8,9 from w2)
        w1: [4,5,6,7,10,11]  (gained 10,11 from w2)

t=32s:  w0 loads checkpoint for partitions 8,9 from S3
        w0 replays input from checkpoint offsets
        w0 resumes processing partitions 8,9

Checkpoint Triggers

The coordinator tracks checkpoint timing and signals workers when a checkpoint is due:

let checkpoint_due t ~now_ns =
  let elapsed = Int64.(-) now_ns t.last_checkpoint_trigger_ns in
  if Int64.compare elapsed t.checkpoint_interval_ns >= 0 then begin
    t.last_checkpoint_trigger_ns <- now_ns;
    true
  end else false

Default interval: 10 seconds.

When checkpoint_due returns true, the coordinator sends Checkpoint_request messages to all active workers. Each worker:

  1. Drains its current batch
  2. Writes a checkpoint to the store
  3. Responds with Checkpoint_ack

The coordinator does not wait for all acks before proceeding. Checkpoints are per-worker, not cluster-wide barriers. This avoids the stop-the-world pause of Chandy-Lamport-style distributed snapshots.
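A sketch of the fan-out under those rules: requests go to every active worker, acks are recorded as they arrive, and nothing blocks on stragglers. The send callback and ack table here are illustrative assumptions, not the coordinator's actual transport:

```ocaml
(* Sketch only: fire-and-forget checkpoint fan-out. [send] stands in
   for the real transport; ack bookkeeping never blocks the trigger. *)
type ack_state = Pending | Acked

let broadcast_checkpoint ~active_workers ~send =
  let acks = Hashtbl.create (max 1 (List.length active_workers)) in
  List.iter
    (fun wid ->
       Hashtbl.replace acks wid Pending;
       send wid `Checkpoint_request)
    active_workers;
  acks  (* caller marks workers Acked as Checkpoint_ack messages arrive *)

let () =
  let sent = ref [] in
  let acks =
    broadcast_checkpoint
      ~active_workers:[ "w0"; "w1" ]
      ~send:(fun wid _msg -> sent := wid :: !sent)
  in
  (* Both requests went out immediately... *)
  assert (List.length !sent = 2);
  (* ...and a late ack is just a table update, not a barrier. *)
  Hashtbl.replace acks "w0" Acked;
  assert (Hashtbl.find acks "w1" = Pending)
```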

Cluster Status

The coordinator exposes a cluster status summary:

let cluster_status t =
  let active = Hashtbl.count t.workers ~f:(fun s ->
    match s.state with `Active -> true | _ -> false) in
  (active, Hashtbl.length t.workers, t.total_partitions, t.assignment_epoch)

This is consumed by ripple-cli status and the coordinator’s /health endpoint.

Configuration

  Parameter                Default  Description
  total_partitions         128      Number of data partitions
  heartbeat_timeout_sec    30       Seconds before a worker is declared dead
  checkpoint_interval_sec  10       Seconds between checkpoint triggers
  virtual_nodes            128      Virtual nodes per worker on the hash ring
  max_keys_per_partition   2,000    Maximum symbols per partition (B-05 mitigation)