Observability

Ripple provides built-in observability through four mechanisms: metrics (Prometheus-compatible), distributed tracing (W3C Trace Context), graph introspection (sexp and DOT export), and rule-based alerting. This page covers each subsystem.

Metrics

Metrics are pre-registered at module initialization time. There are three metric types:

Counter

Monotonically increasing integer. Never decreases (except on process restart).

module Counter = struct
  type t = { name : string; mutable value : int }
  let incr t = t.value <- t.value + 1
  let incr_by t n = t.value <- t.value + n
end

Gauge

Float value that can go up or down. Represents current state.

module Gauge = struct
  type t = { name : string; mutable value : float }
  let set t v = t.value <- v
end

Histogram

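Tracks the distribution of observed values using a fixed set of buckets. Because the number of buckets is fixed, each observe operation is O(1) regardless of how many values have been recorded.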

module Histogram = struct
  type t =
    { name : string
    ; buckets : float array
    ; mutable counts : int array
    ; mutable sum : float
    ; mutable count : int
    }
end

Default buckets: [0.001; 0.005; 0.01; 0.05; 0.1; 0.5; 1.0; 5.0; 10.0; 50.0; 100.0]
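
The record above does not show how an observation is recorded, so here is a minimal sketch of one way it could work. The names `create` and `observe`, the extra overflow slot, and the "first upper bound greater than or equal to the value" rule are assumptions for illustration, not Ripple's actual implementation.

```ocaml
(* Sketch only: [create] and [observe] are hypothetical names. *)
type histogram =
  { name : string
  ; buckets : float array        (* upper bounds, ascending *)
  ; mutable counts : int array   (* one slot per bucket plus an overflow slot *)
  ; mutable sum : float
  ; mutable count : int
  }

let default_buckets =
  [| 0.001; 0.005; 0.01; 0.05; 0.1; 0.5; 1.0; 5.0; 10.0; 50.0; 100.0 |]

let create ?(buckets = default_buckets) name =
  { name
  ; buckets
  ; counts = Array.make (Array.length buckets + 1) 0
  ; sum = 0.0
  ; count = 0
  }

(* Find the first bucket whose upper bound is >= v. The scan is bounded by
   the fixed bucket count, so its cost does not grow with the number of
   observations. Values above every bound land in the overflow slot. *)
let observe h v =
  let n = Array.length h.buckets in
  let rec find i = if i >= n || v <= h.buckets.(i) then i else find (i + 1) in
  let i = find 0 in
  h.counts.(i) <- h.counts.(i) + 1;
  h.sum <- h.sum +. v;
  h.count <- h.count + 1
```

With the default buckets, observing 0.003 increments the second slot (upper bound 0.005), and observing 200.0 increments the overflow slot.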

Pre-Registered Metrics

| Metric | Type | Description |
| --- | --- | --- |
| ripple_graph_stabilization_ns | Histogram | Stabilization cycle duration in nanoseconds |
| ripple_graph_recompute_count | Histogram | Nodes recomputed per stabilization |
| ripple_graph_node_count | Gauge | Total nodes in the graph |
| ripple_graph_dirty_count | Gauge | Currently dirty nodes |
| ripple_graph_stabilizations_total | Counter | Total stabilization cycles |
| ripple_graph_cutoff_hits_total | Counter | Times cutoff prevented propagation |
| ripple_transport_deltas_sent_total | Counter | Deltas sent to other workers |
| ripple_transport_deltas_received_total | Counter | Deltas received from other workers |
| ripple_transport_delta_bytes_total | Counter | Total delta bytes transmitted |
| ripple_transport_rpc_latency_ns | Histogram | RPC round-trip latency |
| ripple_transport_backpressure_level | Gauge | Current backpressure (0.0 to 1.0) |
| ripple_window_active_windows | Gauge | Open window count |
| ripple_window_late_events_total | Counter | Mildly late events received |
| ripple_window_very_late_events_total | Counter | Very late events (dropped) |
| ripple_window_retractions_total | Counter | Window result retractions |
| ripple_system_event_lag_ns | Histogram | Event time lag (event time vs wall clock) |
| ripple_system_heap_words | Gauge | OCaml heap size in words |

Prometheus Export

All metrics are exported in Prometheus exposition format via the /metrics HTTP endpoint on port 9102:

# TYPE ripple_graph_stabilizations_total counter
ripple_graph_stabilizations_total 1042

# TYPE ripple_graph_node_count gauge
ripple_graph_node_count 4001.000000

# TYPE ripple_graph_stabilization_ns histogram
ripple_graph_stabilization_ns_count 1042
ripple_graph_stabilization_ns_sum 284532.000000

Because every metric is pre-registered, the export function needs no dynamic registry lookup. It appends each known metric to a buffer in turn:

let to_prometheus () =
  let buf = Buffer.create 1024 in
  add_counter Graph.stabilizations_total;
  add_gauge Graph.node_count;
  add_histogram Graph.stabilization_time_ns;
  (* ... all metrics ... *)
  Buffer.contents buf
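
The helpers called in the export function are not shown on this page. As a rough sketch, they might look like the following. The record types, the field names, the explicit `buf` argument, and the `render` wrapper are all assumptions made so the example is self-contained; the output format follows the sample exposition text shown earlier.

```ocaml
(* Sketch only: these types and helper signatures are assumptions modelled
   on the Counter and Gauge definitions, not Ripple's actual code. *)
type counter = { c_name : string; mutable c_value : int }
type gauge = { g_name : string; mutable g_value : float }

(* Append a "# TYPE" line and one sample line for a counter. *)
let add_counter buf (c : counter) =
  Printf.bprintf buf "# TYPE %s counter\n%s %d\n\n" c.c_name c.c_name c.c_value

(* Gauges are floats; "%f" prints six decimal places, e.g. 4001.000000. *)
let add_gauge buf (g : gauge) =
  Printf.bprintf buf "# TYPE %s gauge\n%s %f\n\n" g.g_name g.g_name g.g_value

(* Hypothetical wrapper: render a list of counters, then a list of gauges. *)
let render counters gauges =
  let buf = Buffer.create 1024 in
  List.iter (add_counter buf) counters;
  List.iter (add_gauge buf) gauges;
  Buffer.contents buf
```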

W3C Distributed Tracing

Trace context follows the W3C Trace Context specification:

type context =
  { trace_id : string     (* 32 hex chars, 128-bit *)
  ; span_id : string      (* 16 hex chars, 64-bit *)
  ; parent_id : string option
  ; sampled : bool
  ; baggage : (string * string) list
  }

Trace Propagation

Trace context propagates across worker boundaries via delta messages. Each stabilization cycle is a span:

Worker-A                              Worker-B
  span: "stabilize"                     span: "stabilize"
  trace_id: abc123...                   trace_id: abc123...  (same)
  span_id: def456...                    span_id: 789ghi...
  parent_id: None                       parent_id: def456... (links to A)
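
The parent/child relationship in the diagram can be sketched as a small function that derives Worker-B's context from the one received from Worker-A. The names `random_hex`, `new_root`, and `child_of` are hypothetical, and the stdlib `Random` module is used only to keep the sketch short; it is not a suitable ID source for production.

```ocaml
(* Sketch only: [random_hex], [new_root] and [child_of] are hypothetical. *)
type context =
  { trace_id : string     (* 32 hex chars, 128-bit *)
  ; span_id : string      (* 16 hex chars, 64-bit *)
  ; parent_id : string option
  ; sampled : bool
  ; baggage : (string * string) list
  }

(* [n] random bytes rendered as 2n lowercase hex characters. *)
let random_hex n =
  String.concat ""
    (List.init n (fun _ -> Printf.sprintf "%02x" (Random.int 256)))

(* A root span has fresh IDs and no parent. *)
let new_root ?(sampled = true) () =
  { trace_id = random_hex 16
  ; span_id = random_hex 8
  ; parent_id = None
  ; sampled
  ; baggage = []
  }

(* A child keeps the trace ID, sampling decision and baggage, gets a fresh
   span ID, and records the parent's span ID as its parent. *)
let child_of parent =
  { parent with span_id = random_hex 8; parent_id = Some parent.span_id }
```

Here Worker-A would call `new_root` for its stabilization span, and Worker-B would call `child_of` on the context carried by the incoming delta message.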

W3C Traceparent Header

Format: {version}-{trace_id}-{span_id}-{flags}
Example: 00-17380abc6f63f3cdaf093916e327df84-88b8fa2222fd7ba5-01

The tracing module both parses and generates this header. Generation looks like:

let to_traceparent ctx =
  let flags = if ctx.sampled then "01" else "00" in
  sprintf "00-%s-%s-%s" ctx.trace_id ctx.span_id flags
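
Only the generating direction is shown above. A minimal sketch of the parsing direction follows, assuming version `00` only and lowercase hex fields as the W3C format requires. The function name `of_traceparent` and the choice to leave `parent_id` and `baggage` empty are assumptions; the parsed `span_id` is the sender's span, which a receiver would normally treat as the parent of its own new span.

```ocaml
type context =
  { trace_id : string
  ; span_id : string
  ; parent_id : string option
  ; sampled : bool
  ; baggage : (string * string) list
  }

let to_traceparent ctx =
  let flags = if ctx.sampled then "01" else "00" in
  Printf.sprintf "00-%s-%s-%s" ctx.trace_id ctx.span_id flags

(* True when every character is a lowercase hex digit. *)
let is_lower_hex s =
  let ok = ref true in
  String.iter (function '0' .. '9' | 'a' .. 'f' -> () | _ -> ok := false) s;
  !ok

(* Hypothetical inverse of [to_traceparent]. Returns None on any malformed
   field, an unsupported version, or an all-zero ID. *)
let of_traceparent header =
  match String.split_on_char '-' header with
  | [ "00"; trace_id; span_id; flags ]
    when String.length trace_id = 32
         && String.length span_id = 16
         && String.length flags = 2
         && is_lower_hex trace_id
         && is_lower_hex span_id
         && is_lower_hex flags
         && trace_id <> String.make 32 '0'
         && span_id <> String.make 16 '0' ->
    (* Bit 0 of the flags byte is the sampled flag. *)
    let sampled = int_of_string ("0x" ^ flags) land 1 = 1 in
    Some { trace_id; span_id; parent_id = None; sampled; baggage = [] }
  | _ -> None
```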

Adaptive Sampling

At high throughput, tracing every event is prohibitively expensive. The adaptive sampler adjusts the sampling rate to hit a target traces-per-second:

module Sampler = struct
  type t =
    { target_per_sec : int
    ; mutable rate : float          (* 0.0 to 1.0 *)
    ; mutable traces_this_second : int
    ; mutable last_adjust_ns : Int64.t
    ; min_rate : float
    ; max_rate : float
    }
end

Every second, the sampler computes:

new_rate = old_rate * (target / actual_traces)
new_rate = clamp(new_rate, min_rate, max_rate)

Default: target_per_sec = 100, min_rate = 0.001, max_rate = 1.0.
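
The update rule can be sketched as follows. This is an illustration under stated assumptions, not the actual Sampler code: the function names are hypothetical, the `last_adjust_ns` bookkeeping is omitted (the caller is assumed to invoke `adjust` once per second), the initial rate is assumed to be `max_rate`, and a second with zero traces is assumed to reset the rate to `max_rate` to avoid dividing by zero.

```ocaml
(* Sketch only: field set trimmed and function names hypothetical. *)
type sampler =
  { target_per_sec : int
  ; mutable rate : float          (* 0.0 to 1.0 *)
  ; mutable traces_this_second : int
  ; min_rate : float
  ; max_rate : float
  }

let create ?(target_per_sec = 100) ?(min_rate = 0.001) ?(max_rate = 1.0) () =
  { target_per_sec; rate = max_rate; traces_this_second = 0; min_rate; max_rate }

let clamp ~lo ~hi x = Float.max lo (Float.min hi x)

(* new_rate = old_rate * (target / actual_traces), then clamped. *)
let adjust t =
  let proposed =
    if t.traces_this_second = 0 then t.max_rate (* assumption: idle second *)
    else
      t.rate *. float_of_int t.target_per_sec
      /. float_of_int t.traces_this_second
  in
  t.rate <- clamp ~lo:t.min_rate ~hi:t.max_rate proposed;
  t.traces_this_second <- 0
```

For example, starting at rate 1.0 with 400 traces recorded in one second and a target of 100, the new rate is 1.0 * (100 / 400) = 0.25.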

Graph Introspection

The introspection module provides runtime snapshots of the computation graph:

type node_info =
  { id : int
  ; height : int
  ; is_dirty : bool
  ; is_output : bool
  ; kind : string         (* "leaf", "map", "fold", etc. *)
  ; dependent_ids : int list
  ; value_sexp : Sexp.t option
  }

type graph_stats =
  { total_nodes : int
  ; dirty_nodes : int
  ; output_nodes : int
  ; max_height : int
  ; last_recompute_count : int
  }
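
To show how the two records relate, here is a minimal sketch that derives `graph_stats` from a list of `node_info` values. The function `stats_of_nodes` is hypothetical, `value_sexp` is left out so the example needs only the standard library, and `last_recompute_count` is passed in because it cannot be derived from a node list.

```ocaml
(* Sketch only: [stats_of_nodes] is a hypothetical helper. *)
type node_info =
  { id : int
  ; height : int
  ; is_dirty : bool
  ; is_output : bool
  ; kind : string
  ; dependent_ids : int list
  }

type graph_stats =
  { total_nodes : int
  ; dirty_nodes : int
  ; output_nodes : int
  ; max_height : int
  ; last_recompute_count : int
  }

let stats_of_nodes ~last_recompute_count nodes =
  (* Count the nodes satisfying predicate [p]. *)
  let count p = List.length (List.filter p nodes) in
  { total_nodes = List.length nodes
  ; dirty_nodes = count (fun n -> n.is_dirty)
  ; output_nodes = count (fun n -> n.is_output)
  ; max_height = List.fold_left (fun acc n -> max acc n.height) 0 nodes
  ; last_recompute_count
  }
```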

S-expression Export

let snap = Introspect.snapshot graph in
let sexp = Introspect.to_sexp snap in
print_s sexp

Useful for expect tests that verify graph structure:

((nodes
  ((id 0)(height 0)(is_dirty false)(kind leaf)(dependent_ids (2)))
  ((id 1)(height 0)(is_dirty false)(kind leaf)(dependent_ids (2)))
  ((id 2)(height 1)(is_dirty false)(kind map2)(dependent_ids ())))
 (stats (total_nodes 3)(dirty_nodes 0)(output_nodes 0)(max_height 1)))

DOT Export

let dot = Introspect.to_dot snap in
Out_channel.write_all "graph.dot" ~data:dot

Produces Graphviz DOT format:

digraph ripple {
  rankdir=TB;
  node [fontname="monospace" fontsize=10];

  n0 [label="leaf_0" shape=ellipse];
  n1 [label="leaf_1" shape=ellipse];
  n2 [label="map2_2" shape=box color=blue style=bold];

  n0 -> n2;
  n1 -> n2;

  // nodes=3 dirty=0 outputs=1 max_height=1
}

Render with: dot -Tpng graph.dot -o graph.png
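
For readers curious how such output might be produced, here is a minimal sketch of a DOT renderer over a trimmed-down node record. It is not `Introspect.to_dot` itself: the `node` type is hypothetical, the styling rule (output nodes as bold blue boxes, everything else as ellipses) is an assumption inferred from the sample above, and the font settings and trailing stats comment are omitted.

```ocaml
(* Sketch only: a hypothetical stand-in for the real introspection types. *)
type node =
  { id : int
  ; kind : string
  ; is_output : bool
  ; dependent_ids : int list
  }

let to_dot nodes =
  let buf = Buffer.create 256 in
  Buffer.add_string buf "digraph ripple {\n  rankdir=TB;\n";
  (* One declaration per node, labelled "<kind>_<id>". *)
  List.iter
    (fun n ->
      let style =
        if n.is_output then "shape=box color=blue style=bold"
        else "shape=ellipse"
      in
      Printf.bprintf buf "  n%d [label=\"%s_%d\" %s];\n" n.id n.kind n.id style)
    nodes;
  (* One edge from each node to each of its dependents. *)
  List.iter
    (fun n ->
      List.iter
        (fun d -> Printf.bprintf buf "  n%d -> n%d;\n" n.id d)
        n.dependent_ids)
    nodes;
  Buffer.add_string buf "}\n";
  Buffer.contents buf
```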

Alert Rules

Built-in alerts fire on operational conditions:

| Condition | Default Threshold | Severity |
| --- | --- | --- |
| Stabilization slow | > 10ms | Warning |
| Backpressure high | > 80% | Critical |
| Late events excessive | > 1% of total | Warning |
| Checkpoint stale | > 20s (2x interval) | Critical |
| Worker unresponsive | > 30s no heartbeat | Critical |
| Schema mismatch | any | Critical |
| Data loss detected | any | Critical |

Alerts are evaluated by the Alert.Manager:

let mgr = Alert.Manager.create () in
Alert.Manager.on_alert mgr ~f:(fun alert ->
  Log.error_s [%message "ALERT" (alert : Alert.t)]);
Alert.Manager.check_stabilization mgr
  ~stabilization_ns:15_000_000L
  ~worker_id:"w0" ~now_ns

All alert evaluation is local to the worker. The coordinator aggregates alerts across the cluster via heartbeat responses.
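
As an illustration of how a local threshold check might be structured, here is a minimal sketch covering two rows of the table. The `alert` record, the alert names, and the pure functions returning an option are assumptions made for this example; they are not the `Alert.Manager` API, which uses callbacks as shown above.

```ocaml
(* Sketch only: types and function shapes are hypothetical. *)
type severity = Warning | Critical

type alert =
  { name : string
  ; severity : severity
  ; worker_id : string
  ; detail : string
  }

(* Default thresholds taken from the table: 10ms and 80%. *)
let stabilization_threshold_ns = 10_000_000L
let backpressure_threshold = 0.8

let check_stabilization ~stabilization_ns ~worker_id =
  if Int64.compare stabilization_ns stabilization_threshold_ns > 0 then
    Some
      { name = "stabilization_slow"
      ; severity = Warning
      ; worker_id
      ; detail =
          Printf.sprintf "%Ld ns > %Ld ns" stabilization_ns
            stabilization_threshold_ns
      }
  else None

let check_backpressure ~level ~worker_id =
  if level > backpressure_threshold then
    Some
      { name = "backpressure_high"
      ; severity = Critical
      ; worker_id
      ; detail = Printf.sprintf "%.2f > %.2f" level backpressure_threshold
      }
  else None
```

With these defaults, the 15_000_000 ns stabilization in the example above exceeds the 10ms threshold and would yield a Warning.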