Worker Configuration
This page covers the Ripple worker process: its lifecycle state machine, health endpoints, configuration parameters, and operational behavior.
Worker Lifecycle
Workers progress through seven well-defined states. The state machine prevents invalid transitions at runtime.
Starting (boot, load checkpoint)
    ├── ok ───────────────> Active
    └── checkpoint found ─> Recovering

Recovering (replaying from ckpt)
    ├── ok ───────────────> Active
    └── failure ──────────> Failed

Active (processing events)
    ├── drain request ────> Draining
    ├── stop request ─────> Stopping
    └── failure ──────────> Failed

Draining (finish batch, write ckpt)
    ├── resume ───────────> Active
    └── failure ──────────> Failed

Stopping (graceful shutdown)
    └─────────────────────> Stopped (clean shutdown)

Failed (crashed)
State Definitions
| State | Description | Duration |
|---|---|---|
| Starting | Booting, loading configuration, checking for checkpoint | < 5s |
| Active | Processing events from input sources | Indefinite |
| Draining | Completing current batch, writing checkpoint | < 10s |
| Stopping | Graceful shutdown in progress | < 30s |
| Stopped | Shut down cleanly | Terminal |
| Failed | Crashed or unrecoverable error | Until restart |
| Recovering | Loading checkpoint and replaying events | < 30s (target) |
Valid Transitions
let valid_transition ~from ~to_ =
  match from, to_ with
  | Starting, Active       -> true  (* successful boot *)
  | Starting, Failed       -> true  (* boot failure *)
  | Starting, Recovering   -> true  (* checkpoint found, replay needed *)
  | Active, Draining       -> true  (* checkpoint trigger or rebalance *)
  | Active, Failed         -> true  (* crash during processing *)
  | Active, Stopping       -> true  (* graceful shutdown *)
  | Draining, Active       -> true  (* drain complete, resume *)
  | Draining, Failed       -> true  (* crash during drain *)
  | Draining, Stopping     -> true  (* shutdown during drain *)
  | Stopping, Stopped      -> true  (* shutdown complete *)
  | Stopping, Failed       -> true  (* crash during shutdown *)
  | Failed, Recovering     -> true  (* restart initiated *)
  | Recovering, Active     -> true  (* replay complete *)
  | Recovering, Failed     -> true  (* replay failure *)
  | _                      -> false (* all other transitions are invalid *)
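Because the relation is a pure function, its properties can be checked by enumeration. The following is a minimal sketch using only the OCaml standard library. The `state` type, `all_states`, `successors`, `is_terminal`, and `valid_transition_count` are assumptions introduced for illustration (the source shows only the match arms), and the arms are condensed with or-patterns.

```ocaml
(* Assumed variant type; the page lists these seven states. *)
type state =
  | Starting | Active | Draining | Stopping | Stopped | Failed | Recovering

let all_states =
  [ Starting; Active; Draining; Stopping; Stopped; Failed; Recovering ]

(* Same relation as the table above, condensed with or-patterns. *)
let valid_transition ~from ~to_ =
  match from, to_ with
  | Starting, (Active | Failed | Recovering)
  | Active, (Draining | Failed | Stopping)
  | Draining, (Active | Failed | Stopping)
  | Stopping, (Stopped | Failed)
  | Failed, Recovering
  | Recovering, (Active | Failed) -> true
  | _ -> false

(* States reachable from [from] in a single step. *)
let successors from =
  List.filter (fun to_ -> valid_transition ~from ~to_) all_states

(* A state is terminal when no transition leaves it. *)
let is_terminal s = successors s = []

(* Total number of (from, to_) pairs the relation accepts. *)
let valid_transition_count =
  List.fold_left (fun n s -> n + List.length (successors s)) 0 all_states
```

Under these definitions the relation admits 14 transitions, Stopped is the only state with no outgoing transition, and Failed can only move to Recovering.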
Worker.transition returns an Or_error.t; an invalid transition produces an Error carrying a diagnostic message:
let result = Worker.transition w ~to_:Stopped in
(* Error: "Invalid state transition" (worker_id w1) (from Starting) (to_ Stopped) *)
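A guarded transition function might look roughly like the sketch below. It is not the Ripple implementation: it uses the standard library's `result` with a plain string instead of `Or_error.t`, and the `worker` record, `to_string`, and the exact message layout are hypothetical.

```ocaml
type state =
  | Starting | Active | Draining | Stopping | Stopped | Failed | Recovering

let to_string = function
  | Starting -> "Starting" | Active -> "Active" | Draining -> "Draining"
  | Stopping -> "Stopping" | Stopped -> "Stopped" | Failed -> "Failed"
  | Recovering -> "Recovering"

let valid_transition ~from ~to_ =
  match from, to_ with
  | Starting, (Active | Failed | Recovering)
  | Active, (Draining | Failed | Stopping)
  | Draining, (Active | Failed | Stopping)
  | Stopping, (Stopped | Failed)
  | Failed, Recovering
  | Recovering, (Active | Failed) -> true
  | _ -> false

(* Hypothetical worker record: just an id and the current state. *)
type worker = { worker_id : string; mutable state : state }

(* Apply the transition only if it is valid; otherwise leave the
   worker untouched and describe the rejected move. *)
let transition w ~to_ =
  if valid_transition ~from:w.state ~to_ then begin
    w.state <- to_;
    Ok ()
  end else
    Error
      (Printf.sprintf "Invalid state transition (worker_id %s) (from %s) (to_ %s)"
         w.worker_id (to_string w.state) (to_string to_))
```

The point of the design is that a rejected transition has no side effect: the caller gets an error value and the worker stays in its previous state.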
Health Endpoints
Each worker serves HTTP endpoints for Kubernetes probes and monitoring:
Port 9100: Health and Readiness
| Endpoint | Method | Probe Type | Response |
|---|---|---|---|
| /health | GET | Liveness | 200 OK if process is running, 503 otherwise |
| /ready | GET | Readiness | 200 READY if state is Active, 503 NOT READY otherwise |
The liveness probe tells Kubernetes “is the process alive?” – it returns 200 as long as the HTTP server is responsive, regardless of worker state. The readiness probe tells Kubernetes “should traffic be routed here?” – it returns 200 only when the worker is Active and processing events.
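The mapping from worker state to probe response can be sketched as two pure functions. This is an illustration only: the function names and the `(status, body)` tuple representation are assumptions, and the real handlers sit behind an HTTP server that is not shown here.

```ocaml
type state =
  | Starting | Active | Draining | Stopping | Stopped | Failed | Recovering

(* Liveness ignores the worker state: if the handler runs at all,
   the HTTP server is responsive. *)
let liveness (_ : state) = (200, "OK")

(* Readiness is 200 only in Active; every other state is listed
   explicitly so that adding a state forces a decision here. *)
let readiness = function
  | Active -> (200, "READY")
  | Starting | Draining | Stopping | Stopped | Failed | Recovering ->
    (503, "NOT READY")
```

One consequence of the rule as stated: a worker that enters Draining to write a checkpoint reports NOT READY until it returns to Active.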
Port 9102: Metrics
| Endpoint | Method | Response |
|---|---|---|
| /metrics | GET | Prometheus exposition format |
Returns all registered metrics (see Observability).
Port 9101: RPC
Async RPC port for delta exchange between workers. This is a binary protocol, not HTTP.
Configuration
Command-Line Flags
ripple-worker \
  --worker-id w0 \
  --partition-id p0 \
  --coordinator localhost:9200 \
  --rpc-port 9101 \
  --health-port 9100 \
  --metrics-port 9102
| Flag | Default | Description |
|---|---|---|
| --worker-id | w0 | Unique worker identifier |
| --partition-id | p0 | Partition assignment (may be overridden by coordinator) |
| --coordinator | localhost:9200 | Coordinator gRPC address |
| --rpc-port | 9101 | Async RPC port for delta exchange |
| --health-port | 9100 | HTTP port for health and readiness probes |
| --metrics-port | 9102 | HTTP port for Prometheus metrics |
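To make the defaults concrete, here is a sketch of parsing these flags with the standard library's `Arg` module. How ripple-worker actually parses its command line is not stated on this page, so the `flags` record and `parse_flags` are hypothetical; only the flag names and defaults come from the table.

```ocaml
type flags = {
  worker_id : string;
  partition_id : string;
  coordinator : string;
  rpc_port : int;
  health_port : int;
  metrics_port : int;
}

(* Parse [argv] (program name first), starting from the documented
   defaults. Raises [Arg.Bad] on unknown flags or stray arguments. *)
let parse_flags argv =
  let worker_id = ref "w0" in
  let partition_id = ref "p0" in
  let coordinator = ref "localhost:9200" in
  let rpc_port = ref 9101 in
  let health_port = ref 9100 in
  let metrics_port = ref 9102 in
  let spec = [
    ("--worker-id", Arg.Set_string worker_id, " Unique worker identifier");
    ("--partition-id", Arg.Set_string partition_id, " Partition assignment");
    ("--coordinator", Arg.Set_string coordinator, " Coordinator gRPC address");
    ("--rpc-port", Arg.Set_int rpc_port, " Async RPC port");
    ("--health-port", Arg.Set_int health_port, " Health/readiness HTTP port");
    ("--metrics-port", Arg.Set_int metrics_port, " Prometheus metrics HTTP port");
  ] in
  let reject_anon arg = raise (Arg.Bad ("unexpected argument: " ^ arg)) in
  Arg.parse_argv ~current:(ref 0) argv spec reject_anon "ripple-worker [flags]";
  { worker_id = !worker_id; partition_id = !partition_id;
    coordinator = !coordinator; rpc_port = !rpc_port;
    health_port = !health_port; metrics_port = !metrics_port }
```

Flags that are omitted keep their defaults, so `parse_flags [| "ripple-worker"; "--worker-id"; "w3" |]` yields a record with `worker_id = "w3"` and the three default ports.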
Environment Variables
In Kubernetes, configuration is typically injected via environment variables:
| Variable | Description |
|---|---|
| RIPPLE_WORKER_ID | Worker identifier (from StatefulSet pod name) |
| RIPPLE_COORDINATOR_HOST | Coordinator service hostname |
| RIPPLE_COORDINATOR_PORT | Coordinator gRPC port |
| RIPPLE_CHECKPOINT_BUCKET | S3 bucket for checkpoint storage |
| RIPPLE_KAFKA_BROKERS | Comma-separated Kafka broker addresses |
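As an illustration of consuming these variables, the sketch below builds the coordinator address and splits the broker list. The function names are hypothetical, and the fallback to localhost:9200 is an assumption that mirrors the command-line default; the page does not say how the worker behaves when a variable is unset. The `getenv` parameter exists only so the lookup can be replaced in tests.

```ocaml
(* Combine host and port variables into "host:port".
   Assumption: unset or unparsable values fall back to localhost:9200. *)
let coordinator_address ?(getenv = Sys.getenv_opt) () =
  let host =
    Option.value (getenv "RIPPLE_COORDINATOR_HOST") ~default:"localhost" in
  let port =
    match Option.bind (getenv "RIPPLE_COORDINATOR_PORT") int_of_string_opt with
    | Some p -> p
    | None -> 9200
  in
  Printf.sprintf "%s:%d" host port

(* Split the comma-separated broker list, trimming whitespace and
   dropping empty entries (for example, from a trailing comma). *)
let kafka_brokers ?(getenv = Sys.getenv_opt) () =
  match getenv "RIPPLE_KAFKA_BROKERS" with
  | None -> []
  | Some s ->
    String.split_on_char ',' s
    |> List.map String.trim
    |> List.filter (fun b -> b <> "")
```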
S-expression Config File
For complex configuration, use an s-expression config file (/etc/ripple/ripple.sexp):
((cluster
  ((name prod)
   (coordinator
    ((host ripple-coordinator.ripple.svc.cluster.local)
     (port 9200)))
   (checkpoint_store
    ((backend S3)
     (bucket ripple-checkpoints)
     (prefix prod)))
   (defaults
    ((num_partitions 128)
     (max_keys_per_partition 2000)
     (checkpoint_interval_sec 10)
     (heartbeat_interval_sec 5)
     (failure_detection_timeout_sec 30)
     (allowed_lateness_sec 60)
     (backpressure
      ((warn_threshold 0.6)
       (slow_threshold 0.8)
       (pause_threshold 0.95))))))))
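The three backpressure thresholds suggest a tiered response. The sketch below shows one plausible reading, under two assumptions that this page does not confirm: the thresholds are compared against a utilization fraction between 0 and 1 (for example, input queue fill level), and a threshold is considered crossed when the value is equal to or above it. The `level` and `thresholds` types and `classify` are hypothetical names.

```ocaml
type level = Normal | Warn | Slow | Pause

type thresholds = { warn : float; slow : float; pause : float }

(* Values taken from the backpressure block of the config above. *)
let defaults = { warn = 0.6; slow = 0.8; pause = 0.95 }

(* Check the most severe threshold first so that each utilization
   maps to exactly one level. *)
let classify t utilization =
  if utilization >= t.pause then Pause
  else if utilization >= t.slow then Slow
  else if utilization >= t.warn then Warn
  else Normal
```

With the default values, a utilization of 0.5 classifies as Normal and 0.85 as Slow.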
Heartbeat Loop
The worker sends heartbeats to the coordinator every 5 seconds:
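let rec heartbeat_loop () =
  (* Wait one heartbeat interval (5 s) before each beat. *)
  let%bind () = Clock.after (Time_float.Span.of_sec 5.0) in
  (* Publish the current heap size as a gauge. *)
  let gc = Gc.stat () in
  Metrics.Gauge.set Metrics.System.heap_words
    (Float.of_int gc.heap_words);
  (* worker_id and worker_state come from the enclosing scope. *)
  Log.Global.debug_s [%message "heartbeat"
    (worker_id : string)
    ~events:(worker_state.events_processed : int)];
  heartbeat_loop ()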
Each heartbeat also updates the GC heap size metric, providing continuous visibility into memory usage.
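The 5-second interval can be related to the failure_detection_timeout_sec value of 30 in the config file. The sketch below assumes, without confirmation from this page, that the coordinator treats a worker as failed once no heartbeat has arrived within that timeout; `is_suspected_failed` and `missed_heartbeats_allowed` are hypothetical names, and times are plain float seconds.

```ocaml
(* Defaults from the config file. *)
let heartbeat_interval_sec = 5.0
let failure_detection_timeout_sec = 30.0

(* How many consecutive heartbeats fit inside the timeout window. *)
let missed_heartbeats_allowed =
  int_of_float (failure_detection_timeout_sec /. heartbeat_interval_sec)

(* Assumed coordinator-side check: the worker is suspected failed
   once the silence exceeds the timeout. *)
let is_suspected_failed ~now ~last_heartbeat =
  now -. last_heartbeat > failure_detection_timeout_sec
```

Under these defaults the window covers six heartbeat intervals, so a single delayed or dropped heartbeat would not by itself trigger failure detection.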
Graceful Shutdown
When the worker receives SIGTERM (Kubernetes pod termination):
- Transition to Stopping state
- Readiness probe starts returning 503 NOT READY
- Drain current batch (finish processing, do not start new batch)
- Write final checkpoint
- Close RPC connections
- Transition to Stopped
- Exit process
The Kubernetes terminationGracePeriodSeconds is set to 30 seconds to allow time for the drain and checkpoint write.
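The shutdown sequence above can be sketched as an ordered list of steps that stops at the first failure. This is an illustration using only the standard library, not the worker's actual code: `run_shutdown`, the `outcome` type, and the representation of a step as a named `unit -> (unit, string) result` function are assumptions. A failed step maps to the Failed state, matching the Stopping to Failed transition in the state machine.

```ocaml
type outcome = Stopped | Failed of string

(* Run the steps in order. The first step that returns [Error]
   aborts the sequence, and later steps are not executed. *)
let run_shutdown steps =
  let rec go = function
    | [] -> Stopped
    | (name, step) :: rest ->
      (match step () with
       | Ok () -> go rest
       | Error msg -> Failed (name ^ ": " ^ msg))
  in
  go steps

(* Example wiring with placeholder steps that always succeed. *)
let example_steps = [
  ("drain_batch", fun () -> Ok ());
  ("write_checkpoint", fun () -> Ok ());
  ("close_rpc", fun () -> Ok ());
]

let example_outcome = run_shutdown example_steps
```

Keeping the steps as data makes the ordering explicit and makes it easy to check that a checkpoint failure prevents the worker from reporting a clean Stopped state.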