Worker Configuration

This page covers the Ripple worker process: its lifecycle state machine, health endpoints, configuration parameters, and operational behavior.

Worker Lifecycle

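Workers progress through seven well-defined states, and the state machine rejects invalid transitions at runtime. The diagram shows the main paths. The valid_transition function under Valid Transitions is the complete list; it also permits Starting → Failed, Draining → Stopping, Stopping → Failed, and Failed → Recovering.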

                  ┌──────────────┐
                  │   Starting   │
                  │ (boot, load  │
                  │  checkpoint) │
                  └──┬───┬───┬───┘
                     │   │   │
              ok     │   │   │  checkpoint found
                     │   │   │
        ┌────────────┘   │   └────────────┐
        v                │                v
  ┌───────────┐          │         ┌─────────────┐
  │  Active   │<─────────┘         │ Recovering  │
  │(processing│          ok        │(replaying   │
  │ events)   │<───────────────────│ from ckpt)  │
  └──┬──┬──┬──┘                    └──────┬──────┘
     │  │  │                              │
     │  │  │  drain request               │ failure
     │  │  v                              v
     │  │  ┌──────────┐            ┌──────────┐
     │  │  │ Draining │            │  Failed  │
     │  │  │(finish   │            │(crashed) │
     │  │  │ batch,   │            └──────────┘
     │  │  │ write    │                  ^
     │  │  │ ckpt)    │                  │
     │  │  └──┬───┬───┘          failure │
     │  │     │   │                      │
     │  │     │   └──────────────────────┘
     │  │     v
     │  │  resume
     │  │  (back to Active)
     │  │
     │  │  stop request
     │  v
     │  ┌───────────┐
     │  │ Stopping  │
     │  │(graceful  │
     │  │ shutdown) │
     │  └─────┬─────┘
     │        │
     │        v
     │  ┌───────────┐
     │  │  Stopped  │
     │  │(clean     │
     │  │ shutdown) │
     │  └───────────┘
     │
     └──── failure ──> Failed

State Definitions

| State      | Description                                               | Duration       |
|------------|-----------------------------------------------------------|----------------|
| Starting   | Booting, loading configuration, checking for checkpoint   | < 5s           |
| Active     | Processing events from input sources                      | Indefinite     |
| Draining   | Completing current batch, writing checkpoint              | < 10s          |
| Stopping   | Graceful shutdown in progress                             | < 30s          |
| Stopped    | Shut down cleanly                                         | Terminal       |
| Failed     | Crashed or unrecoverable error                            | Until restart  |
| Recovering | Loading checkpoint and replaying events                   | < 30s (target) |

Valid Transitions

(* Worker states. The type name [state] is assumed for this excerpt;
   the constructors are the seven states described above. *)
type state =
  | Starting | Active | Draining | Stopping | Stopped | Failed | Recovering

let valid_transition ~from ~to_ =
  match from, to_ with
  | Starting, Active -> true       (* successful boot *)
  | Starting, Failed -> true       (* boot failure *)
  | Starting, Recovering -> true   (* checkpoint found, replay needed *)
  | Active, Draining -> true       (* checkpoint trigger or rebalance *)
  | Active, Failed -> true         (* crash during processing *)
  | Active, Stopping -> true       (* graceful shutdown *)
  | Draining, Active -> true       (* drain complete, resume *)
  | Draining, Failed -> true       (* crash during drain *)
  | Draining, Stopping -> true     (* shutdown during drain *)
  | Stopping, Stopped -> true      (* shutdown complete *)
  | Stopping, Failed -> true       (* crash during shutdown *)
  | Failed, Recovering -> true     (* restart initiated *)
  | Recovering, Active -> true     (* replay complete *)
  | Recovering, Failed -> true     (* replay failure *)
  | _ -> false                     (* all other transitions are invalid *)

Worker.transition returns an Or_error.t, so an invalid transition produces an Error with a diagnostic message instead of changing the worker's state:

let result = Worker.transition w ~to_:Stopped in
(* Error: "Invalid state transition" (worker_id w1) (from Starting) (to_ Stopped) *)
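A minimal sketch of how a transition function might wrap valid_transition, assuming a mutable worker record with hypothetical id and state fields. To stay dependency-free it returns the standard library's (unit, string) result in place of Core's Or_error.t, and it formats the error text to mirror the diagnostic above; none of these names are the actual Ripple API.

```ocaml
(* Hypothetical sketch: [worker], [transition], and the error format are
   illustrative only. Uses stdlib [result] instead of Core's Or_error.t. *)
type state =
  | Starting | Active | Draining | Stopping | Stopped | Failed | Recovering

let string_of_state = function
  | Starting -> "Starting" | Active -> "Active" | Draining -> "Draining"
  | Stopping -> "Stopping" | Stopped -> "Stopped" | Failed -> "Failed"
  | Recovering -> "Recovering"

(* Same transition table as valid_transition, written with or-patterns. *)
let valid_transition ~from ~to_ =
  match from, to_ with
  | Starting, (Active | Failed | Recovering)
  | Active, (Draining | Failed | Stopping)
  | Draining, (Active | Failed | Stopping)
  | Stopping, (Stopped | Failed)
  | Failed, Recovering
  | Recovering, (Active | Failed) -> true
  | _ -> false

type worker = { id : string; mutable state : state }

(* Apply the transition only if it is valid; otherwise leave the state
   unchanged and return a diagnostic message. *)
let transition w ~to_ =
  if valid_transition ~from:w.state ~to_ then begin
    w.state <- to_;
    Ok ()
  end else
    Error
      (Printf.sprintf
         "Invalid state transition (worker_id %s) (from %s) (to_ %s)"
         w.id (string_of_state w.state) (string_of_state to_))
```

For example, calling transition on a worker { id = "w1"; state = Starting } with ~to_:Stopped leaves the state at Starting and returns Error "Invalid state transition (worker_id w1) (from Starting) (to_ Stopped)".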

Health Endpoints

Each worker serves HTTP endpoints for Kubernetes probes and monitoring:

Port 9100: Health and Readiness

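| Endpoint | Method | Probe type | Response                                                            |
|----------|--------|------------|---------------------------------------------------------------------|
| /health  | GET    | Liveness   | 200 OK while the HTTP server is responsive, regardless of worker state |
| /ready   | GET    | Readiness  | 200 READY if state is Active, 503 NOT READY otherwise               |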

The liveness probe answers “is the process alive?”: it returns 200 as long as the HTTP server is responsive, regardless of worker state. The readiness probe answers “should traffic be routed here?”: it returns 200 only when the worker is Active and processing events.
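Both probes reduce to small functions of the worker state. A sketch, assuming the HTTP layer only needs a status code and a body; the function names are hypothetical and no HTTP server library is shown.

```ocaml
(* Hypothetical probe handlers returning (status code, body). *)
type state =
  | Starting | Active | Draining | Stopping | Stopped | Failed | Recovering

(* Liveness: if this code runs, the HTTP server is responsive, so the
   answer does not depend on worker state. *)
let liveness_response (_ : state) = (200, "OK")

(* Readiness: only an Active worker should receive traffic. *)
let readiness_response = function
  | Active -> (200, "READY")
  | Starting | Draining | Stopping | Stopped | Failed | Recovering ->
    (503, "NOT READY")
```

Listing every non-Active state explicitly instead of using a wildcard means the compiler flags readiness_response if a new state is added later.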

Port 9102: Metrics

| Endpoint | Method | Response                     |
|----------|--------|------------------------------|
| /metrics | GET    | Prometheus exposition format |

Returns all registered metrics (see Observability).

Port 9101: RPC

Async RPC port for delta exchange between workers. This is a binary protocol, not HTTP.

Configuration

Command-Line Flags

ripple-worker \
  --worker-id w0 \
  --partition-id p0 \
  --coordinator localhost:9200 \
  --rpc-port 9101 \
  --health-port 9100 \
  --metrics-port 9102
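
| Flag           | Default        | Description                                                 |
|----------------|----------------|-------------------------------------------------------------|
| --worker-id    | w0             | Unique worker identifier                                    |
| --partition-id | p0             | Partition assignment (may be overridden by the coordinator) |
| --coordinator  | localhost:9200 | Coordinator gRPC address                                    |
| --rpc-port     | 9101           | Async RPC port for delta exchange                           |
| --health-port  | 9100           | HTTP port for health and readiness probes                   |
| --metrics-port | 9102           | HTTP port for Prometheus metrics                            |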
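A sketch of parsing these flags with the OCaml standard library's Arg module, using the defaults from the table. The actual ripple-worker binary may use a different command-line parser; the flags record and parse_flags function are hypothetical.

```ocaml
(* Hypothetical config record; field names mirror the flags above. *)
type flags = {
  worker_id : string;
  partition_id : string;
  coordinator : string;
  rpc_port : int;
  health_port : int;
  metrics_port : int;
}

let parse_flags (argv : string array) : flags =
  (* Defaults from the flag table. *)
  let worker_id = ref "w0" and partition_id = ref "p0"
  and coordinator = ref "localhost:9200"
  and rpc_port = ref 9101 and health_port = ref 9100
  and metrics_port = ref 9102 in
  let specs = [
    ("--worker-id", Arg.Set_string worker_id, " Unique worker identifier");
    ("--partition-id", Arg.Set_string partition_id, " Partition assignment");
    ("--coordinator", Arg.Set_string coordinator, " Coordinator gRPC address");
    ("--rpc-port", Arg.Set_int rpc_port, " Async RPC port for delta exchange");
    ("--health-port", Arg.Set_int health_port, " Health/readiness HTTP port");
    ("--metrics-port", Arg.Set_int metrics_port, " Prometheus metrics HTTP port");
  ] in
  (* ~current:(ref 0) treats argv.(0) as the program name and keeps this
     function independent of the global Arg.current counter. *)
  Arg.parse_argv ~current:(ref 0) argv specs
    (fun anon -> raise (Arg.Bad ("unexpected argument " ^ anon)))
    "ripple-worker [flags]";
  { worker_id = !worker_id; partition_id = !partition_id;
    coordinator = !coordinator; rpc_port = !rpc_port;
    health_port = !health_port; metrics_port = !metrics_port }
```

Flags that are not passed keep their table defaults: parse_flags [| "ripple-worker"; "--worker-id"; "w3" |] yields worker_id "w3" with every other field at its default.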

Environment Variables

In Kubernetes, configuration is typically injected via environment variables:

| Variable                 | Description                                       |
|--------------------------|---------------------------------------------------|
| RIPPLE_WORKER_ID         | Worker identifier (from the StatefulSet pod name) |
| RIPPLE_COORDINATOR_HOST  | Coordinator service hostname                      |
| RIPPLE_COORDINATOR_PORT  | Coordinator gRPC port                             |
| RIPPLE_CHECKPOINT_BUCKET | S3 bucket for checkpoint storage                  |
| RIPPLE_KAFKA_BROKERS     | Comma-separated Kafka broker addresses            |
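A sketch of turning these variables into settings, assuming an unset variable simply means "not configured here" (how they rank against flags and the config file is not stated on this page). The lookup function is a parameter so the sketch can be tested; inside the worker it would be Sys.getenv_opt. The record, the function names, and the fallback to port 9200 when only the host is set are all hypothetical.

```ocaml
(* Hypothetical environment-derived settings; [None] means "not set". *)
type env_config = {
  worker_id : string option;
  coordinator : string option;       (* "host:port" *)
  checkpoint_bucket : string option;
  kafka_brokers : string list;
}

(* Split a comma-separated broker list, trimming spaces and dropping
   empty entries such as the one left by a trailing comma. *)
let parse_brokers s =
  String.split_on_char ',' s
  |> List.map String.trim
  |> List.filter (fun b -> b <> "")

let env_config_of (lookup : string -> string option) : env_config =
  let coordinator =
    match lookup "RIPPLE_COORDINATOR_HOST", lookup "RIPPLE_COORDINATOR_PORT" with
    | Some host, Some port -> Some (host ^ ":" ^ port)
    (* Assumption: fall back to the --coordinator default port. *)
    | Some host, None -> Some (host ^ ":9200")
    | None, _ -> None
  in
  {
    worker_id = lookup "RIPPLE_WORKER_ID";
    coordinator;
    checkpoint_bucket = lookup "RIPPLE_CHECKPOINT_BUCKET";
    kafka_brokers =
      (match lookup "RIPPLE_KAFKA_BROKERS" with
       | Some s -> parse_brokers s
       | None -> []);
  }
```

In the worker process this would be called as env_config_of Sys.getenv_opt.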

S-expression Config File

For complex configuration, use an s-expression config file (/etc/ripple/ripple.sexp):

((cluster
  ((name prod)
   (coordinator
    ((host ripple-coordinator.ripple.svc.cluster.local)
     (port 9200)))
   (checkpoint_store
    ((backend S3)
     (bucket ripple-checkpoints)
     (prefix prod)))
   (defaults
    ((num_partitions 128)
     (max_keys_per_partition 2000)
     (checkpoint_interval_sec 10)
     (heartbeat_interval_sec 5)
     (failure_detection_timeout_sec 30)
     (allowed_lateness_sec 60)
     (backpressure
      ((warn_threshold 0.6)
       (slow_threshold 0.8)
       (pause_threshold 0.95))))))))
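The three backpressure thresholds read naturally as cut-offs on a fill ratio. A sketch, assuming each threshold is compared against the fraction (0 to 1) of a bounded input buffer that is in use; what Ripple actually measures is not stated here, and the record, level names, and validation rule are hypothetical.

```ocaml
(* Hypothetical mirror of the (backpressure ...) stanza. *)
type backpressure = {
  warn_threshold : float;
  slow_threshold : float;
  pause_threshold : float;
}

let default_backpressure =
  { warn_threshold = 0.6; slow_threshold = 0.8; pause_threshold = 0.95 }

type level = Normal | Warn | Slow | Pause

(* Sanity check assumed by this sketch: thresholds strictly increase
   and lie in (0, 1]. *)
let validate b =
  0.0 < b.warn_threshold
  && b.warn_threshold < b.slow_threshold
  && b.slow_threshold < b.pause_threshold
  && b.pause_threshold <= 1.0

(* Test the highest threshold first so each fill ratio maps to one level. *)
let level_of_fill b fill =
  if fill >= b.pause_threshold then Pause
  else if fill >= b.slow_threshold then Slow
  else if fill >= b.warn_threshold then Warn
  else Normal
```

With the defaults above, a buffer that is 85% full falls in the Slow band, and anything at or above 95% triggers Pause.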

Heartbeat Loop

The worker sends heartbeats to the coordinator every 5 seconds:

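open! Core
open! Async

(* [worker_id], [worker_state], and [Metrics] are defined elsewhere in
   the worker; only the loop itself is shown here. *)
let rec heartbeat_loop () =
  (* Hardcoded 5 s delay, equal to the heartbeat_interval_sec default. *)
  let%bind () = Clock.after (Time_float.Span.of_sec 5.0) in
  (* Publish the major heap size (in words) from the GC statistics. *)
  let gc = Gc.stat () in
  Metrics.Gauge.set Metrics.System.heap_words
    (Float.of_int gc.heap_words);
  (* The RPC that delivers the heartbeat to the coordinator is not shown. *)
  Log.Global.debug_s [%message "heartbeat"
    (worker_id : string)
    ~events:(worker_state.events_processed : int)];
  heartbeat_loop ()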

Each iteration also sets the heap_words gauge from Gc.stat (the major heap size, in words), so memory usage is sampled continuously at the heartbeat interval.
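The ripple.sexp defaults pair heartbeat_interval_sec 5 with failure_detection_timeout_sec 30, so the timeout spans six heartbeat intervals (30 / 5). A sketch of how a coordinator might apply that timeout, assuming it records when each worker's last heartbeat arrived; the coordinator's actual logic is not shown on this page, and the names below are hypothetical.

```ocaml
(* Hypothetical coordinator-side check; all times are in seconds. *)
let heartbeat_interval_sec = 5.0
let failure_detection_timeout_sec = 30.0

(* Heartbeat intervals covered by the timeout: 30 / 5 = 6. *)
let intervals_per_timeout =
  int_of_float (failure_detection_timeout_sec /. heartbeat_interval_sec)

(* A worker is suspected failed once no heartbeat has arrived for longer
   than the timeout. *)
let is_suspected ~now ~last_heartbeat =
  now -. last_heartbeat > failure_detection_timeout_sec
```

Making the timeout several intervals long tolerates a few delayed or lost heartbeats (for example, during a long GC pause) without declaring the worker failed.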

Graceful Shutdown

When the worker receives SIGTERM (Kubernetes pod termination):

  1. Transition to Stopping state
  2. Readiness probe starts returning 503 NOT READY
  3. Drain current batch (finish processing, do not start new batch)
  4. Write final checkpoint
  5. Close RPC connections
  6. Transition to Stopped
  7. Exit process

The Kubernetes terminationGracePeriodSeconds is set to 30 seconds to allow time for the drain and checkpoint write.
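The shutdown steps can be sketched as a function that takes the individual actions as hypothetical callbacks (drain_batch, write_checkpoint, close_rpc) and moves the worker from Stopping to Stopped, or to Failed if a step raises. Signal handling and the process exit are left out, and the real worker presumably runs these steps under Async rather than as plain synchronous calls.

```ocaml
(* Hypothetical sketch of the SIGTERM path; callbacks stand in for the
   worker's real drain, checkpoint, and RPC-teardown code. *)
type state =
  | Starting | Active | Draining | Stopping | Stopped | Failed | Recovering

type worker = { mutable state : state }

(* Readiness follows state: anything other than Active answers 503. *)
let ready w = w.state = Active

(* Assumes the worker is Active or Draining when shutdown begins. *)
let graceful_shutdown w ~drain_batch ~write_checkpoint ~close_rpc =
  (* Step 1; this also flips readiness to NOT READY (step 2). *)
  w.state <- Stopping;
  match
    (drain_batch ();       (* step 3: finish the current batch only *)
     write_checkpoint ();  (* step 4: final checkpoint *)
     close_rpc ())         (* step 5: close RPC connections *)
  with
  | () -> w.state <- Stopped  (* step 6; the caller then exits (step 7) *)
  | exception exn -> w.state <- Failed; raise exn
```

Failing into Failed rather than Stopped matches the Stopping → Failed transition, so a crash during shutdown is still visible as a crash.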