Observability
Ripple provides built-in observability through four mechanisms: metrics (Prometheus-compatible), distributed tracing (W3C Trace Context), graph introspection (sexp and DOT export), and rule-based alerting. This page covers each subsystem.
Metrics
Metrics are pre-registered at module initialization time. There are three metric types:
Counter
Monotonically increasing integer. Never decreases (except on process restart).
module Counter = struct
  type t = { name : string; mutable value : int }

  let incr t = t.value <- t.value + 1
  let incr_by t n = t.value <- t.value + n
end
Gauge
Float value that can go up or down. Represents current state.
module Gauge = struct
  type t = { name : string; mutable value : float }

  let set t v = t.value <- v
end
Histogram
Tracks distribution of observed values using fixed buckets. O(1) observe operation.
module Histogram = struct
  type t =
    { name : string
    ; buckets : float array
    ; mutable counts : int array
    ; mutable sum : float
    ; mutable count : int
    }
end
Default buckets: [0.001; 0.005; 0.01; 0.05; 0.1; 0.5; 1.0; 5.0; 10.0; 50.0; 100.0]
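The observe operation is O(1) because the bucket array has a fixed, small length, so the per-observation scan is bounded by a constant. A minimal sketch of how observing a value might work, assuming the `Histogram.t` above (the `create` and `observe` names are illustrative, and this version increments buckets cumulatively, Prometheus-style):

```ocaml
module Histogram = struct
  type t =
    { name : string
    ; buckets : float array          (* upper bounds, ascending *)
    ; mutable counts : int array     (* one slot per bucket *)
    ; mutable sum : float
    ; mutable count : int
    }

  (* Hypothetical constructor: one zeroed count slot per bucket bound. *)
  let create name buckets =
    { name
    ; buckets
    ; counts = Array.make (Array.length buckets) 0
    ; sum = 0.0
    ; count = 0
    }

  (* Hypothetical observe: bump every bucket whose bound covers [v].
     The bucket array is fixed-size, so this is O(1) per observation. *)
  let observe t v =
    Array.iteri
      (fun i bound -> if v <= bound then t.counts.(i) <- t.counts.(i) + 1)
      t.buckets;
    t.sum <- t.sum +. v;
    t.count <- t.count + 1
end
```

Storing cumulative counts makes export trivial, since the Prometheus exposition format expects cumulative `le` buckets.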
Pre-Registered Metrics
| Metric | Type | Description |
|---|---|---|
| ripple_graph_stabilization_ns | Histogram | Stabilization cycle duration in nanoseconds |
| ripple_graph_recompute_count | Histogram | Nodes recomputed per stabilization |
| ripple_graph_node_count | Gauge | Total nodes in the graph |
| ripple_graph_dirty_count | Gauge | Currently dirty nodes |
| ripple_graph_stabilizations_total | Counter | Total stabilization cycles |
| ripple_graph_cutoff_hits_total | Counter | Times cutoff prevented propagation |
| ripple_transport_deltas_sent_total | Counter | Deltas sent to other workers |
| ripple_transport_deltas_received_total | Counter | Deltas received from other workers |
| ripple_transport_delta_bytes_total | Counter | Total delta bytes transmitted |
| ripple_transport_rpc_latency_ns | Histogram | RPC round-trip latency |
| ripple_transport_backpressure_level | Gauge | Current backpressure (0.0 to 1.0) |
| ripple_window_active_windows | Gauge | Open window count |
| ripple_window_late_events_total | Counter | Mildly late events received |
| ripple_window_very_late_events_total | Counter | Very late events (dropped) |
| ripple_window_retractions_total | Counter | Window result retractions |
| ripple_system_event_lag_ns | Histogram | Event time lag (event time vs wall clock) |
| ripple_system_heap_words | Gauge | OCaml heap size in words |
Prometheus Export
All metrics are exported in Prometheus exposition format via the /metrics HTTP endpoint on port 9102:
# TYPE ripple_graph_stabilizations_total counter
ripple_graph_stabilizations_total 1042
# TYPE ripple_graph_node_count gauge
ripple_graph_node_count 4001.000000
# TYPE ripple_graph_stabilization_ns histogram
ripple_graph_stabilization_ns_bucket{le="+Inf"} 1042
ripple_graph_stabilization_ns_count 1042
ripple_graph_stabilization_ns_sum 284532.000000
The export function is a simple traversal of all registered metrics:
let to_prometheus () =
  let buf = Buffer.create 1024 in
  add_counter buf Graph.stabilizations_total;
  add_gauge buf Graph.node_count;
  add_histogram buf Graph.stabilization_time_ns;
  (* ... all registered metrics ... *)
  Buffer.contents buf
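The `add_counter` and `add_gauge` helpers are not shown above. A minimal sketch of what they might do, assuming they append exposition-format lines directly to the buffer (the record types and field names here are illustrative stand-ins, not Ripple's real ones):

```ocaml
(* Hypothetical metric records, standing in for Ripple's Counter.t / Gauge.t. *)
type counter = { c_name : string; mutable c_value : int }
type gauge = { g_name : string; mutable g_value : float }

(* Emit a "# TYPE" comment followed by the sample line, as in the
   Prometheus exposition format shown above. *)
let add_counter buf (c : counter) =
  Buffer.add_string buf (Printf.sprintf "# TYPE %s counter\n" c.c_name);
  Buffer.add_string buf (Printf.sprintf "%s %d\n" c.c_name c.c_value)

let add_gauge buf (g : gauge) =
  Buffer.add_string buf (Printf.sprintf "# TYPE %s gauge\n" g.g_name);
  Buffer.add_string buf (Printf.sprintf "%s %f\n" g.g_name g.g_value)
```

`%f` prints six decimal places, matching the `4001.000000` gauge sample in the example output.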
W3C Distributed Tracing
Trace context follows the W3C Trace Context specification:
type context =
  { trace_id : string (* 32 hex chars, 128-bit *)
  ; span_id : string (* 16 hex chars, 64-bit *)
  ; parent_id : string option
  ; sampled : bool
  ; baggage : (string * string) list
  }
Trace Propagation
Trace context propagates across worker boundaries via delta messages. Each stabilization cycle is a span:
Worker-A Worker-B
span: "stabilize" span: "stabilize"
trace_id: abc123... trace_id: abc123... (same)
span_id: def456... span_id: 789ghi...
parent_id: None parent_id: def456... (links to A)
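Deriving Worker-B's span from an incoming context amounts to keeping the trace id, minting a fresh span id, and recording the sender's span id as the parent. A sketch, assuming the `context` type above (the `child` function and its `fresh_span_id` argument are illustrative; a real implementation would draw the span id from 8 random bytes, hex-encoded):

```ocaml
type context =
  { trace_id : string           (* 32 hex chars, shared across the trace *)
  ; span_id : string            (* 16 hex chars, unique per span *)
  ; parent_id : string option
  ; sampled : bool
  ; baggage : (string * string) list
  }

(* Hypothetical: build the receiving worker's span from an incoming context.
   Everything is inherited except the span id, and the sender's span id
   becomes the parent link. *)
let child ~fresh_span_id (incoming : context) =
  { incoming with
    span_id = fresh_span_id
  ; parent_id = Some incoming.span_id
  }
```

This is exactly the relationship in the diagram: both workers share `trace_id`, and Worker-B's `parent_id` points at Worker-A's `span_id`.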
W3C Traceparent Header
Format: {version}-{trace_id}-{span_id}-{flags}
Example: 00-17380abc6f63f3cdaf093916e327df84-88b8fa2222fd7ba5-01
Parsed and generated by:
let to_traceparent ctx =
  let flags = if ctx.sampled then "01" else "00" in
  Printf.sprintf "00-%s-%s-%s" ctx.trace_id ctx.span_id flags
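A tolerant parser for the same format could look like the following sketch (the `of_traceparent` name and tuple result are illustrative; a real parser would also validate that the ids are hex and non-zero, per the W3C spec):

```ocaml
(* Hypothetical parser for a version-00 traceparent header.
   Returns (trace_id, span_id, sampled) on success, None on any malformed
   input: wrong field count, wrong version, or wrong id lengths. *)
let of_traceparent s =
  match String.split_on_char '-' s with
  | [ "00"; trace_id; span_id; flags ]
    when String.length trace_id = 32 && String.length span_id = 16 ->
    Some (trace_id, span_id, String.equal flags "01")
  | _ -> None
```

On the example header above, this yields the 128-bit trace id, the 64-bit span id, and `sampled = true` from the `01` flags byte.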
Adaptive Sampling
At high throughput, tracing every event is prohibitively expensive. The adaptive sampler adjusts the sampling rate to hit a target traces-per-second:
module Sampler = struct
  type t =
    { target_per_sec : int
    ; mutable rate : float (* 0.0 to 1.0 *)
    ; mutable traces_this_second : int
    ; mutable last_adjust_ns : Int64.t
    ; min_rate : float
    ; max_rate : float
    }
end
Every second, the sampler computes:
new_rate = old_rate * (target / actual_traces)
new_rate = clamp(new_rate, min_rate, max_rate)
Default: target_per_sec = 100, min_rate = 0.001, max_rate = 1.0.
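The once-per-second adjustment can be sketched as a simple multiplicative controller over the `Sampler.t` above (the `adjust` name is illustrative; the real module's entry points may differ):

```ocaml
module Sampler = struct
  type t =
    { target_per_sec : int
    ; mutable rate : float            (* 0.0 to 1.0 *)
    ; mutable traces_this_second : int
    ; mutable last_adjust_ns : Int64.t
    ; min_rate : float
    ; max_rate : float
    }

  let clamp lo hi v = Float.max lo (Float.min hi v)

  (* Hypothetical per-second adjustment: scale the rate by target/actual,
     clamp into [min_rate, max_rate], then reset the per-second counter.
     [max actual 1] avoids dividing by zero on an idle second. *)
  let adjust t ~now_ns =
    let actual = max t.traces_this_second 1 in
    t.rate <-
      clamp t.min_rate t.max_rate
        (t.rate *. float_of_int t.target_per_sec /. float_of_int actual);
    t.traces_this_second <- 0;
    t.last_adjust_ns <- now_ns
end
```

With the defaults, a second that produced 200 traces against a target of 100 halves the rate; a quiet second pushes the rate back up toward `max_rate`.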
Graph Introspection
The introspection module provides runtime snapshots of the computation graph:
type node_info =
  { id : int
  ; height : int
  ; is_dirty : bool
  ; is_output : bool
  ; kind : string (* "leaf", "map", "fold", etc. *)
  ; dependent_ids : int list
  ; value_sexp : Sexp.t option
  }

type graph_stats =
  { total_nodes : int
  ; dirty_nodes : int
  ; output_nodes : int
  ; max_height : int
  ; last_recompute_count : int
  }
S-expression Export
let snap = Introspect.snapshot graph in
let sexp = Introspect.to_sexp snap in
print_s sexp
Useful for expect tests that verify graph structure:
((nodes
  ((id 0)(height 0)(is_dirty false)(kind leaf)(dependent_ids (2)))
  ((id 1)(height 0)(is_dirty false)(kind leaf)(dependent_ids (2)))
  ((id 2)(height 1)(is_dirty false)(kind map2)(dependent_ids ())))
 (stats (total_nodes 3)(dirty_nodes 0)(output_nodes 0)(max_height 1)))
DOT Export
let dot = Introspect.to_dot snap in
Out_channel.write_all "graph.dot" ~data:dot
Produces Graphviz DOT format:
digraph ripple {
  rankdir=TB;
  node [fontname="monospace" fontsize=10];
  n0 [label="leaf_0" shape=ellipse];
  n1 [label="leaf_1" shape=ellipse];
  n2 [label="map2_2" shape=box color=blue style=bold];
  n0 -> n2;
  n1 -> n2;
  // nodes=3 dirty=0 outputs=1 max_height=1
}
Render with: dot -Tpng graph.dot -o graph.png
Alert Rules
Built-in alerts fire on operational conditions:
| Condition | Default Threshold | Severity |
|---|---|---|
| Stabilization slow | > 10ms | Warning |
| Backpressure high | > 80% | Critical |
| Late events excessive | > 1% of total | Warning |
| Checkpoint stale | > 20s (2x interval) | Critical |
| Worker unresponsive | > 30s no heartbeat | Critical |
| Schema mismatch | any | Critical |
| Data loss detected | any | Critical |
Alerts are evaluated by the Alert.Manager:
let mgr = Alert.Manager.create () in
Alert.Manager.on_alert mgr ~f:(fun alert ->
  Log.error_s [%message "ALERT" (alert : Alert.t)]);
Alert.Manager.check_stabilization mgr
  ~stabilization_ns:15_000_000L
  ~worker_id:"w0" ~now_ns
All alert evaluation is local to the worker. The coordinator aggregates alerts across the cluster via heartbeat responses.
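A check like `check_stabilization` reduces to a comparison against the threshold table. A standalone sketch of the idea (the `severity` type, threshold constant, and result shape here are illustrative, not the real `Alert` API):

```ocaml
(* Hypothetical severity levels, mirroring the alert table. *)
type severity = Warning | Critical

(* "Stabilization slow" default threshold: 10 ms, in nanoseconds. *)
let stabilization_threshold_ns = 10_000_000L

(* Return an alert when a stabilization cycle exceeds the threshold,
   tagged with the worker that reported it; None otherwise. *)
let check_stabilization ~stabilization_ns ~worker_id =
  if Int64.compare stabilization_ns stabilization_threshold_ns > 0 then
    Some
      ( Warning
      , Printf.sprintf "worker %s: slow stabilization (%Ld ns)" worker_id
          stabilization_ns )
  else None
```

The 15 ms cycle in the usage example above would fire this Warning; anything at or under 10 ms would not.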