
Ripple

Distributed, type-safe, incremental stream processing in OCaml.

Ripple extends the core insight of self-adjusting computation — when an input changes, only the affected subgraph recomputes — from a single process to a cluster of cooperating nodes.

What makes it different

Most stream processors recompute the entire operator graph when an input arrives. Ripple doesn’t. A min-heap tracks which nodes are dirty. An incremental fold updates the running total by subtracting the old value and adding the new. The result: 250ns per stabilization at 10,000 symbols, regardless of graph size.

At a glance

| Metric | Value |
|---|---|
| Stabilization (10K symbols) | 250 ns |
| bin_prot serde roundtrip | 82 ns |
| Schema compatibility check | 128 ns |
| VWAP pipeline throughput | 2.16M events/sec |
| 6M event replay recovery | 2.1 seconds |
| Heap growth over 1M events | 0.1% |
| Crash recovery correctness | 100/100 random crash points |

The stack

16 libraries, 3 binaries, ~6,200 lines of OCaml. Built on Jane Street’s Core, Async, and bin_prot.

lib/
├── kernel/          Effect injection, domain types
├── graph/           Heap-based stabilization, incremental fold
├── schema/          Type-safe schemas, delta algebra, compatibility
├── wire/            bin_prot protocol with CRC-32C integrity
├── transport/       Sequence-ordered delta buffer
├── checkpoint/      Snapshot/restore, pluggable stores
├── window/          Tumbling, sliding, session windows
├── time_series/     Aggregators (count, sum, mean, vwap, stddev)
├── topology/        Pipeline composition
├── observability/   Prometheus metrics, W3C tracing, introspection
├── coordinator/     Consistent hashing, partition assignment
├── worker/          Lifecycle state machine
├── rpc/             Async RPC delta transport
├── connector/       File, Kafka interfaces
└── ripple/          Top-level facade


Installation

This page covers how to set up a development environment for Ripple, including OCaml toolchain installation, dependency management, and build verification.

Prerequisites

Ripple requires OCaml 5.1.0 or later and the following system-level dependencies:

| Dependency | Version | Purpose |
|---|---|---|
| OCaml | >= 5.1.0 | Language runtime (effects, optimized GC) |
| opam | >= 2.1 | Package manager |
| dune | >= 3.0 | Build system |
| librdkafka | >= 2.0 | Kafka client library (for connectors) |
| libssl | >= 1.1 | TLS for Async RPC |
| pkg-config | any | Build dependency resolution |

Installing the OCaml Toolchain

If you do not already have opam installed:

# Ubuntu/Debian
sudo apt-get install -y opam librdkafka-dev libssl-dev pkg-config

# macOS (Homebrew)
brew install opam librdkafka openssl pkg-config

# Initialize opam (first time only)
opam init --auto-setup
eval $(opam env)

# Create a switch with OCaml 5.3
opam switch create ripple 5.3.0
eval $(opam env)

Installing Ripple Dependencies

All OCaml dependencies are declared in ripple.opam and pinned to Jane Street v0.17 releases. Install them in one step:

cd ripple
opam install . --deps-only --yes

The full dependency list:

| Package | Version | Role |
|---|---|---|
| core | >= 0.17.0 | Jane Street standard library replacement |
| async | >= 0.17.0 | Cooperative concurrency (Deferred.t, RPC) |
| bin_prot | >= 0.17.0 | Binary serialization (wire protocol) |
| sexplib0 | >= 0.17.0 | S-expression serialization (schemas, checkpoints) |
| ppx_jane | >= 0.17.0 | Meta-ppx including sexp, bin_io, compare, hash |
| ppx_expect | >= 0.17.0 | Inline expect tests |
| ppx_bench | >= 0.17.0 | Inline micro-benchmarks |
| ppx_inline_test | >= 0.17.0 | Inline test runner |
| core_bench | >= 0.17.0 | Benchmarking framework |

Building

# Full build
dune build

# Or equivalently:
make build

The build produces three binaries:

| Binary | Path | Purpose |
|---|---|---|
| ripple-vwap-demo | bin/vwap_demo/main.exe | Demo pipeline |
| ripple-worker | bin/worker/main.exe | Production worker process |
| ripple-cli | bin/cli/main.exe | Command-line management tool |

Running Tests

# Run all inline expect tests
dune runtest

# Or:
make test

Every .ml file in lib/ contains inline %expect_test blocks. These are the primary test mechanism – there are no separate test files for unit tests. Integration tests live in test/.

Installing Git Hooks

Ripple ships a pre-commit hook that runs the test suite and rejects commits that fail:

make install-hooks

This installs hooks/pre-commit which runs dune runtest before allowing a commit. If any expect test fails, the commit is rejected.

Running Benchmarks

# Run the full OCaml benchmark suite (core_bench)
make bench

# This executes bench/run_benchmarks.ml which includes:
#   B-01: Incremental stabilization throughput (1K and 10K symbols)
#   B-02: bin_prot serialization roundtrip
#   B-03: Delta diff performance
#   B-05: Replay recovery (2K and 10K symbols)
#   B-06: Schema validation and fingerprinting

Verifying the Installation

A quick smoke test to confirm everything works:

# Build, test, and benchmark in one command
make check

# Run the VWAP demo with synthetic data
make demo

The demo should produce CSV output to stdout and statistics to stderr, including throughput in events/sec and the final portfolio VWAP total.

Project Layout

ripple/
├── lib/
│   ├── kernel/        # Core types: Trade, Vwap, Effect_intf
│   ├── graph/         # Incremental graph engine, dirty_heap, cutoff
│   ├── schema/        # Schema, Delta, Compat
│   ├── wire/          # Envelope, Message, CRC-32C
│   ├── checkpoint/    # Checkpoint, Store (In_memory, Local_disk, S3)
│   ├── window/        # Window assigners, Watermark tracker
│   ├── observability/ # Metrics, Trace, Introspect, Alert
│   ├── coordinator/   # Hash ring, partition assignment
│   ├── worker/        # Worker lifecycle state machine
│   ├── connector/     # File source/sink, Kafka connector
│   ├── topology/      # Pipeline topology builder
│   └── ripple/        # Top-level Ripple module (facade)
├── bin/
│   ├── vwap_demo/     # VWAP demo binary
│   ├── worker/        # Worker binary
│   └── cli/           # CLI binary
├── bench/             # core_bench benchmarks
├── test/              # Integration and property tests
├── infra/             # Docker, Compose, Kubernetes manifests
└── book/              # This documentation (mdBook)

Your First Pipeline

This page walks through building a minimal incremental computation graph in OCaml. By the end, you will understand leaves, map nodes, fold nodes, stabilization, and output observation.

Core Concepts in 60 Seconds

Ripple’s computation model is a directed acyclic graph (DAG) of nodes:

  • Leaf nodes hold input values that you set externally.
  • Compute nodes (map, map2, fold, incr_fold) derive values from their parents.
  • Stabilization propagates changes from dirty leaves through the graph, recomputing only what changed.

    [leaf: price]     [leaf: size]
         |                 |
         v                 v
    [map: validate]   [map: validate]
              \          /
               v        v
           [map2: compute_vwap]
                   |
                   v
              [output: vwap]

Creating a Graph

Every graph requires a now function for timestamping stabilization cycles. In tests, use Effect_intf.Test.now for deterministic time. In production, use Time_ns.now.

open! Core

let () =
  (* Create graph with deterministic time for reproducibility *)
  let g = Ripple_graph.Graph.create
    ~now:Ripple_kernel.Effect_intf.Test.now
  in
  ignore g

Adding Leaf Nodes

Leaves are the entry points for data. Each leaf requires an initial value and an equality function for cutoff optimization (see Cutoff Optimization).

(* A leaf holding a float value *)
let price = Ripple_graph.Graph.add_leaf g
  ~initial:100.0
  ~equal:Float.equal
in

(* A leaf holding an int value *)
let volume = Ripple_graph.Graph.add_leaf g
  ~initial:0
  ~equal:Int.equal
in

add_leaf returns an 'a var – a typed handle for setting the value later. The type parameter 'a ensures you cannot set a float leaf with an int.

Deriving Compute Nodes

map: Transform a Single Parent

(* Convert var to incr for use as a dependency *)
let price_incr = Ripple_graph.Graph.var_to_incr price in

(* Double the price *)
let doubled = Ripple_graph.Graph.map g price_incr
  ~f:(fun p -> p *. 2.0)
  ~equal:Float.equal
in

map creates a node at height parent.height + 1, adds a dependency edge, and computes an initial value immediately.

map2: Combine Two Parents

let volume_incr = Ripple_graph.Graph.var_to_incr volume in

let notional = Ripple_graph.Graph.map2 g price_incr volume_incr
  ~f:(fun p v -> p *. Float.of_int v)
  ~equal:Float.equal
in

fold_array: Aggregate Many Parents

(* Sum across multiple price leaves *)
let leaves = Array.init 5 ~f:(fun i ->
  Ripple_graph.Graph.add_leaf g
    ~initial:(Float.of_int (i + 1))
    ~equal:Float.equal)
in
let incrs = Array.map leaves
  ~f:Ripple_graph.Graph.var_to_incr
in
let total = Ripple_graph.Graph.fold_array g incrs
  ~init:0.0
  ~f:(fun acc v -> acc +. v)
  ~equal:Float.equal
in

Important: fold_array is O(N) per stabilization – it re-folds all parents every time any parent changes. For large parent sets, use incr_fold_array instead.

incr_fold_array: O(1) Incremental Aggregation

let total_incr = Ripple_graph.Graph.incr_fold_array g incrs
  ~init:0.0
  ~add:(fun acc v -> acc +. v)
  ~remove:(fun acc v -> acc -. v)
  ~equal:Float.equal
in

incr_fold_array tracks which parents changed and applies only remove(old_value) + add(new_value) for each changed parent. This is the key primitive for VWAP: when one symbol’s price changes, the portfolio sum updates in O(1), not O(N).

Stabilization

Nothing happens until you call stabilize. This is the core operation – it processes all dirty nodes in topological order (min-heap sorted by height) and returns a list of changed outputs.

(* Set a leaf value *)
Ripple_graph.Graph.set_leaf price 150.0;

(* Propagate changes through the graph *)
let _changed_outputs = Ripple_graph.Graph.stabilize g in

(* Read the current value of any node *)
let current_doubled = Ripple_graph.Graph.watch g doubled in
printf "doubled = %.1f\n" current_doubled;
(* Output: doubled = 300.0 *)

Key properties of stabilize:

  • Idempotent: calling stabilize when no nodes are dirty does zero work.
  • Deterministic: same inputs always produce same outputs (provided now is deterministic).
  • Selective: only recomputes nodes whose inputs actually changed.

Watching Output Values

watch reads the current value of any 'a incr node:

let value : float = Ripple_graph.Graph.watch g notional in

This is a direct array lookup – O(1), no allocation.

Complete Example

open! Core

let () =
  Ripple_kernel.Effect_intf.Test.set_time Time_ns.epoch;
  let g = Ripple_graph.Graph.create
    ~now:Ripple_kernel.Effect_intf.Test.now
  in
  (* Build graph: price -> doubled, volume -> notional *)
  let price = Ripple_graph.Graph.add_leaf g
    ~initial:100.0 ~equal:Float.equal in
  let volume = Ripple_graph.Graph.add_leaf g
    ~initial:1000 ~equal:Int.equal in
  let p = Ripple_graph.Graph.var_to_incr price in
  let v = Ripple_graph.Graph.var_to_incr volume in
  let doubled = Ripple_graph.Graph.map g p
    ~f:(fun x -> x *. 2.0) ~equal:Float.equal in
  let notional = Ripple_graph.Graph.map2 g p v
    ~f:(fun p v -> p *. Float.of_int v) ~equal:Float.equal in
  (* Initial stabilize *)
  let _ = Ripple_graph.Graph.stabilize g in
  printf "doubled=%.1f notional=%.1f\n"
    (Ripple_graph.Graph.watch g doubled)
    (Ripple_graph.Graph.watch g notional);
  (* Update price only — volume's branch is untouched *)
  Ripple_graph.Graph.set_leaf price 150.0;
  let _ = Ripple_graph.Graph.stabilize g in
  printf "doubled=%.1f notional=%.1f recomputed=%d\n"
    (Ripple_graph.Graph.watch g doubled)
    (Ripple_graph.Graph.watch g notional)
    (Ripple_graph.Graph.last_recompute_count g)

Expected output:

doubled=200.0 notional=100000.0
doubled=300.0 notional=150000.0 recomputed=3

The second stabilization recomputes 3 nodes: the price leaf (changed), doubled (depends on price), and notional (depends on price). The volume leaf and any nodes that depend only on volume are untouched.

What’s Next

Running the VWAP Demo

The VWAP (Volume-Weighted Average Price) demo is a complete end-to-end pipeline that reads trade events, partitions by symbol, computes incremental VWAP with tumbling windows, and writes results to stdout. It is the proof that Ripple’s core claim holds: a typed, incremental, distributed pipeline defined in ~50 lines of OCaml that processes 100K+ events/sec.

Quick Start

# Generate and process 100K synthetic trades
make demo

# Or directly:
dune exec ripple-vwap-demo -- --synthetic 100000

Input Modes

The demo binary accepts three input modes:

Synthetic Mode (default)

Generates N trade events across 100 symbols with deterministic prices and sizes:

ripple-vwap-demo --synthetic 100000

Each synthetic event is spaced 1ms apart in event time. Symbols are named SYM0000 through SYM0099. Prices range from 100.0 to 110.0 with 0.1 increments.

File Mode

Reads trade events from a CSV file:

ripple-vwap-demo --file trades.csv

Stdin Mode

Reads trade events from standard input, one per line:

echo "AAPL,150.0,100,1000000000,XNAS" | ripple-vwap-demo --stdin

Or pipe from another process:

cat trades.csv | ripple-vwap-demo --stdin

CSV Format

Input records are comma-separated with 5 fields, no header:

symbol,price,size,timestamp_ns,venue

| Field | Type | Description |
|---|---|---|
| symbol | string | Ticker symbol (e.g., AAPL, SYM0042) |
| price | float | Trade price |
| size | int | Number of shares |
| timestamp_ns | int64 | Nanoseconds since epoch (event time) |
| venue | string | Exchange venue (e.g., XNAS, XNYS) |

Example input:

AAPL,150.00,100,1709000000000000000,XNAS
AAPL,150.25,200,1709000001000000000,XNAS
GOOG,175.50,50,1709000001500000000,XNYS
AAPL,150.10,150,1709000002000000000,XNAS

Lines starting with # are treated as comments and skipped. Empty lines are also skipped.
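For readers writing their own feed adapters, the parsing rules above (five comma-separated fields, `#` comments, blank lines skipped) can be sketched in plain OCaml. The `trade` record and `parse_line` function below are illustrative only, not the demo's actual parser:

```ocaml
(* Hypothetical trade record mirroring the 5-field CSV layout. *)
type trade =
  { symbol : string
  ; price : float
  ; size : int
  ; timestamp_ns : int64
  ; venue : string
  }

(* Parse one input line.  Returns None for comments, blank lines,
   and malformed rows, matching the demo's skip behavior. *)
let parse_line line =
  let line = String.trim line in
  if line = "" || line.[0] = '#' then None
  else
    match String.split_on_char ',' line with
    | [ symbol; price; size; ts; venue ] ->
      (try
         Some
           { symbol
           ; price = float_of_string price
           ; size = int_of_string size
           ; timestamp_ns = Int64.of_string ts
           ; venue
           }
       with _ -> None)
    | _ -> None
```

A malformed row (wrong field count or a non-numeric price) yields `None` rather than an exception, so a bad line never aborts the stream.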

Output Format

Output is CSV written to stdout with 4 fields:

symbol,vwap,total_volume,trade_count

Example output:

AAPL,150.1222,450,3
GOOG,175.5000,50,1

Diagnostic information is written to stderr, so you can separate data from metadata:

ripple-vwap-demo --synthetic 10000 > output.csv 2> stats.txt

Pipeline Architecture

The demo constructs the following computation graph:

  [leaf: SYM0000 state]  [leaf: SYM0001 state]  ...  [leaf: SYM0099 state]
          |                       |                          |
          v                       v                          v
  [map: vwap_price]       [map: vwap_price]       ...  [map: vwap_price]
          |                       |                          |
          +----------+------------+--------------------------+
                     |
                     v
            [incr_fold: portfolio_total]   <-- O(1) per update
                     |
                     v
                 [output]

Each symbol has a leaf node holding a vwap_state record:

type vwap_state =
  { total_value : float    (* price * size cumulative *)
  ; total_volume : int     (* shares cumulative *)
  ; trade_count : int
  ; last_price : float
  }

When a trade arrives for symbol X:

  1. The leaf for X is updated with the new cumulative state
  2. stabilize fires, recomputing only X’s map node and the incr_fold
  3. The portfolio total updates in O(1) via remove(old_vwap) + add(new_vwap)

This means processing a single trade touches exactly 3 nodes regardless of how many symbols exist in the graph.

Processing Flow

Events are processed in batches of 1,000:

for each batch of 1000 events:
    for each event in batch:
        look up the symbol's leaf
        update vwap_state with new trade
        set_leaf with updated state
        advance watermark
    stabilize graph (once per batch)
    emit changed symbol VWAPs to stdout

Batching amortizes the stabilization cost. With 1,000-event batches and 100 symbols, a typical stabilization touches ~100 map nodes + 1 fold node.

Expected Output Statistics

When running --synthetic 100000:

--- Ripple VWAP Demo Results ---
Events processed:  100000
Symbols:           100
Graph nodes:       301
Stabilizations:    100
Elapsed:           0.XXXs
Throughput:        XXXXX events/sec
Watermark:         100999000000 ns
Portfolio total:   XXXX.XX
Output records:    XXXXX

  • Graph nodes = 301: 100 leaves + 100 map nodes + 1 incr_fold node = 201, plus the 100 var_to_incr adapter nodes.
  • Stabilizations = 100: 100,000 events / 1,000 batch size = 100 stabilization cycles.
  • Throughput: typically 200K-500K events/sec on modern hardware, well above the 100K target.

Running with Real Market Data

To feed real trade data, convert your market data feed to the CSV format above. If you have TAQ data or FIX logs:

# Convert FIX log to Ripple CSV format
your-fix-converter --output trades.csv

# Process
ripple-vwap-demo --file trades.csv > vwap_results.csv

The only requirement is that timestamp_ns values be monotonically non-decreasing per symbol for correct watermark tracking. Out-of-order events are processed but classified as late by the watermark tracker.
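The watermark rule can be illustrated with a minimal stand-alone tracker. This is a sketch of the classification logic only (per-symbol tracking omitted), not Ripple's actual Watermark module:

```ocaml
(* Minimal watermark tracker: the watermark is the highest event
   timestamp seen so far; anything behind it is classified late. *)
type watermark = { mutable high : int64 }

let create () = { high = Int64.min_int }

(* Observe one event.  Returns `Late if the event arrives behind the
   current watermark, `On_time otherwise; advances the watermark. *)
let observe w ~timestamp_ns =
  if Int64.compare timestamp_ns w.high < 0 then `Late
  else begin
    w.high <- timestamp_ns;
    `On_time
  end
```

Note that late events are still observed, matching the behavior described above: they are processed, just flagged.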

Incremental Computation

Incremental computation is the foundational idea behind Ripple. This page explains what it is, why it makes Ripple fundamentally faster than re-computation-based systems, and how it compares to existing stream processing frameworks.

The Core Insight

Most data processing systems react to a change by recomputing everything from scratch. If you have 10,000 symbols and one price changes, a naive system recomputes all 10,000 VWAPs.

Incremental computation takes a different approach: recompute only what is affected by the change. If one price changes, recompute that symbol’s VWAP and any downstream aggregation – nothing else.

This is the difference between O(N) and O(R), where R is the number of nodes reachable from the changed input. In the typical case of a single trade arriving, R = 3 (leaf + map + fold), regardless of whether N is 1,000 or 100,000.

Self-Adjusting Computation

Ripple is built on the theory of self-adjusting computation, originally developed by Umut Acar and later implemented in Jane Street’s Incremental library. The key properties:

  1. Memoization of intermediate results. Every node in the computation graph caches its last computed value. A node is only recomputed if at least one of its inputs has changed.

  2. Change propagation, not re-execution. When an input changes, the system identifies the minimal set of nodes that need updating and processes them in topological order.

  3. Cutoff optimization. Even when a node is recomputed, if its new value equals its old value (by the user-provided equality function), propagation stops. This prevents phantom updates from rippling through the graph.
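These three properties can be seen in a toy example. The `cell` type below is an illustrative sketch, not Ripple's node representation: it caches its last value (memoization) and stops propagation when the user-supplied equality says nothing changed (cutoff):

```ocaml
(* A toy derived cell illustrating memoization + cutoff: it caches its
   last value and reports whether a recomputation actually propagated. *)
type 'a cell =
  { mutable value : 'a
  ; equal : 'a -> 'a -> bool
  ; mutable propagated : int   (* recomputations that survived cutoff *)
  }

(* Feed a freshly computed candidate value into the cell.  Returns
   true when the value changed (downstream must recompute) and false
   when the cutoff fired (propagation stops here). *)
let recompute c candidate =
  if c.equal c.value candidate then false
  else begin
    c.value <- candidate;
    c.propagated <- c.propagated + 1;
    true
  end

(* Example: a clamp node.  Raising the input from 200.0 to 250.0 still
   clamps to 100.0, so the cutoff fires and nothing downstream runs. *)
let clamp x = min x 100.0
let clamped = { value = clamp 200.0; equal = Float.equal; propagated = 0 }
```

Here `recompute clamped (clamp 250.0)` returns false: the clamped value is still 100.0, so no phantom update ripples downstream.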

Comparison with Stream Processing Frameworks

Flink

Flink processes events through a DAG of operators. Each operator receives an event, processes it, and emits results downstream. This is a push-based dataflow model.

| Aspect | Flink | Ripple |
|---|---|---|
| Model | Push-based dataflow | Self-adjusting computation |
| Recomputation | Per-event through full operator chain | Only affected subgraph |
| State | Per-operator keyed state (RocksDB) | Node values in array (in-memory) |
| Checkpointing | Chandy-Lamport barriers | Snapshot leaf values + input offsets |
| Language | Java/Scala | OCaml |
| Latency floor | Network + serialization overhead | Array index + function call |

Flink’s strength is horizontal scalability across commodity clusters. Ripple’s strength is per-node efficiency – sub-microsecond stabilization for the common case.

Kafka Streams

Kafka Streams is a client library (no separate cluster) that processes records from Kafka topics. It uses a topology of processors connected by internal topics.

| Aspect | Kafka Streams | Ripple |
|---|---|---|
| Deployment | Embedded in application | Dedicated worker processes |
| State | RocksDB state stores | In-memory incremental graph |
| Windowing | Time-based with wall-clock triggers | Event-time with watermarks |
| Exactly-once | Kafka transactions | Idempotent processing + dedup |
| Recomputation | Full reprocess on state restore | Checkpoint leaf values, recompute derived |

Materialize / Differential Dataflow

Materialize (built on Timely/Differential Dataflow) is the closest conceptual relative to Ripple. Both maintain materialized views that update incrementally.

| Aspect | Materialize | Ripple |
|---|---|---|
| Foundation | Differential Dataflow (Rust) | Incremental (OCaml) |
| Interface | SQL | OCaml module signatures |
| Change representation | (data, time, diff) triples | Delta type (Set/Patch/Remove) |
| Deployment | Standalone database | Embeddable library + workers |

Ripple deliberately targets the same niche but within the Jane Street OCaml ecosystem, where interop with Core, Async, bin_prot, and ppx_* is essential.

Why O(R) Matters

Consider a VWAP pipeline with 2,000 symbols (the maximum per worker):

Graph structure:
  2000 leaf nodes (one per symbol)
  2000 map nodes (vwap_price extraction)
  1 incr_fold node (portfolio total)
  ─────────────────────────
  4001 total nodes

When a single trade arrives:

| Approach | Nodes touched | Time (measured) |
|---|---|---|
| Full recompute | 4,001 | ~27 us |
| Incremental (Ripple) | 3 | ~250 ns |

The incremental approach is 100x faster for the single-change case. This difference compounds: at 100K events/sec, the full-recompute approach spends 2.7 seconds per second on computation alone, while the incremental approach spends 25 milliseconds.

The Stabilization Cycle

Every change in Ripple follows the same pattern:

1. set_leaf(var, new_value)    -- marks leaf dirty, pushes to heap
2. stabilize(graph)            -- processes dirty heap in height order
3. watch(graph, node)          -- reads current value (O(1) array lookup)

Between steps 1 and 2, you can set multiple leaves. Stabilization processes all of them in a single pass, which is more efficient than stabilizing after each individual change.

set_leaf(price_AAPL, 150.0)    -- AAPL leaf -> dirty
set_leaf(price_GOOG, 2800.0)   -- GOOG leaf -> dirty
set_leaf(price_MSFT, 420.0)    -- MSFT leaf -> dirty
stabilize(g)                    -- processes 3 leaves + their dependents
                                -- in one pass, height-ordered

This batching property is exploited by the VWAP demo, which processes 1,000 trades before stabilizing.

Deterministic Replay

Because the incremental graph is a pure function from inputs to outputs (given a deterministic clock), Ripple supports deterministic replay: load a checkpoint, replay the input log from the checkpoint’s offset, and arrive at exactly the same state as before the crash.

This property requires that no module in Ripple calls Time_ns.now() or Random.int directly. All non-determinism flows through the EFFECT module interface (see Effect Injection).

The Graph Engine

The graph engine is the most performance-critical component in Ripple. It manages the incremental computation graph, tracks dirty nodes, and executes the stabilization algorithm. This page covers the three key mechanisms: the dirty heap, height-ordered propagation, and cutoff optimization.

Architecture

  ┌──────────────────────────────────────────────────────┐
  │                    Graph                             │
  │                                                      │
  │  ┌──────────────────────┐  ┌──────────────────────┐  │
  │  │   nodes : node array │  │  dirty_heap : Heap.t │  │
  │  │   (contiguous memory)│  │  (min-heap by height)│  │
  │  └──────────────────────┘  └──────────────────────┘  │
  │                                                      │
  │  node_count : int                                    │
  │  last_recompute_count : int                          │
  │  last_stabilization_ns : Int63.t                     │
  │  now : unit -> Time_ns.t                             │
  │  output_infos : (int, output_info) Hashtbl.t         │
  └──────────────────────────────────────────────────────┘

Nodes are stored in a flat array for cache locality. The graph grows the array by 2x when capacity is exceeded. All node access during stabilization is by integer index – no pointer chasing, no hash lookups on the hot path.

Node Types

Each node has a recompute_fn variant that determines how its value is derived:

| Variant | Parents | Complexity | Use Case |
|---|---|---|---|
| Leaf | 0 | O(1) | External input |
| Map | 1 | O(1) | Transform single value |
| Map2 | 2 | O(1) | Combine two values |
| Fold | N | O(N) | Aggregate (re-folds all) |
| Incr_fold | N | O(changed) | Incremental aggregate |

The Dirty Heap

The dirty heap is an array-based binary min-heap keyed on (height, node_id). It determines the order in which dirty nodes are processed during stabilization.

Heap ordering: (height, node_id) — lexicographic
  - Lower height first (process dependencies before dependents)
  - Lower node_id as tiebreaker (deterministic ordering)

Implementation properties:

  • Array-based: no pointer indirection, cache-friendly iteration
  • Pre-allocated: capacity grows only at graph construction time, never during stabilization
  • O(1) membership check: in_heap boolean array indexed by node_id
  • O(log n) push/pop: standard heap sift-up/sift-down
  • Duplicate rejection: push checks in_heap before inserting

type t =
  { mutable entries : entry array
  ; mutable size : int
  ; mutable in_heap : bool array  (* O(1) membership by node_id *)
  ; mutable capacity : int
  }

let push t ~height ~node_id =
  if not t.in_heap.(node_id) then begin
    t.size <- t.size + 1;
    t.entries.(t.size) <- { height; node_id };
    t.in_heap.(node_id) <- true;
    sift_up t t.size
  end

This design was chosen over a dirty-flag scan (O(N)) after benchmark B-05 proved that linear scanning is untenable at 10K+ nodes. The heap gives O(R log R) where R is the number of dirty nodes.
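For reference, the same design can be written as a small self-contained heap. This sketch is 0-indexed and allocation-naive, unlike the pre-allocated production version, but it preserves the lexicographic (height, node_id) ordering and the in_heap duplicate rejection:

```ocaml
(* Array-based binary min-heap keyed on (height, node_id), with an
   in_heap bitmap for O(1) duplicate rejection.  Illustrative only. *)
type entry = { height : int; node_id : int }

type t =
  { entries : entry array
  ; mutable size : int
  ; in_heap : bool array   (* indexed by node_id *)
  }

let create ~max_nodes =
  { entries = Array.make max_nodes { height = 0; node_id = 0 }
  ; size = 0
  ; in_heap = Array.make max_nodes false
  }

(* Lexicographic order: lower height first, node_id breaks ties. *)
let less a b =
  a.height < b.height || (a.height = b.height && a.node_id < b.node_id)

let swap t i j =
  let tmp = t.entries.(i) in
  t.entries.(i) <- t.entries.(j);
  t.entries.(j) <- tmp

let rec sift_up t i =
  let parent = (i - 1) / 2 in
  if i > 0 && less t.entries.(i) t.entries.(parent) then begin
    swap t i parent;
    sift_up t parent
  end

let rec sift_down t i =
  let l = (2 * i) + 1 and r = (2 * i) + 2 in
  let smallest = ref i in
  if l < t.size && less t.entries.(l) t.entries.(!smallest) then smallest := l;
  if r < t.size && less t.entries.(r) t.entries.(!smallest) then smallest := r;
  if !smallest <> i then begin
    swap t i !smallest;
    sift_down t !smallest
  end

(* Duplicate rejection: a node already in the heap is not re-pushed. *)
let push t ~height ~node_id =
  if not t.in_heap.(node_id) then begin
    t.entries.(t.size) <- { height; node_id };
    t.in_heap.(node_id) <- true;
    t.size <- t.size + 1;
    sift_up t (t.size - 1)
  end

let pop_min t =
  if t.size = 0 then None
  else begin
    let min = t.entries.(0) in
    t.size <- t.size - 1;
    t.entries.(0) <- t.entries.(t.size);
    t.in_heap.(min.node_id) <- false;
    sift_down t 0;
    Some min
  end
```

Popping entries in this order yields exactly the height-ordered processing the stabilization loop relies on: a node is never popped before all of its lower-height dependencies.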

Stabilization Algorithm

The stabilization loop is the hot path. Here is the algorithm in pseudocode:

stabilize(graph):
    recomputed = 0
    changed_outputs = []
    t_start = now()

    while heap is not empty:
        entry = heap.pop_min()
        node = nodes[entry.node_id]

        if not node.is_dirty:
            continue    // already processed via earlier propagation

        node.is_dirty = false
        old_value = node.value (or node.old_value for leaves)

        // Recompute based on node type
        new_value = match node.recompute:
            | Leaf        -> node.value  (already set by set_leaf)
            | Map(p, f)   -> f(nodes[p].value)
            | Map2(a,b,f) -> f(nodes[a].value, nodes[b].value)
            | Fold(ps,f,init) -> fold f init [nodes[p].value for p in ps]
            | Incr_fold   -> ... (see below)

        // Cutoff check: did the value actually change?
        if not cutoff_fn(old_value, new_value):
            node.value = new_value
            recomputed++

            // Propagate dirty to dependents
            for dep_id in node.dependents:
                dep = nodes[dep_id]
                if not dep.is_dirty:
                    dep.is_dirty = true
                    heap.push(dep.height, dep.id)

            // Record output changes
            if node.is_output:
                changed_outputs.add(node, old_value, new_value)

    t_end = now()
    graph.last_recompute_count = recomputed
    return changed_outputs

Invariants Maintained

The stabilization algorithm preserves four critical invariants:

| ID | Invariant | Meaning |
|---|---|---|
| I-1 | node.height > height of all dependencies | Topological ordering is valid |
| I-2 | dirty_heap contains only dirty nodes | No spurious recomputation |
| I-3 | After stabilize, no node is dirty | Full convergence |
| I-5 | Node value consistent with recompute function | Output correctness |

Cutoff Optimization

Cutoff is the mechanism that stops propagation when a node’s recomputed value equals its previous value. This is critical for preventing phantom updates.

type 'a t =
  | Phys_equal    (* Default: physical equality *)
  | Equal of ('a -> 'a -> bool)  (* Custom equality *)
  | Always        (* Always propagate *)
  | Never         (* Never propagate — constant node *)

Example where cutoff prevents unnecessary work:

[leaf: raw_price]  value: 200.0
      |
      v
[map: clamp]       f: min(x, 100.0)  value: 100.0
      |
      v
[map: format]      f: sprintf "%.2f" x  value: "100.00"

If raw_price changes from 200.0 to 250.0, clamp recomputes to min(250, 100) = 100.0. Since 100.0 equals the old value, cutoff fires and format is never recomputed. This saves an allocation (string formatting) on the hot path.

Incr_fold: O(1) Incremental Aggregation

Incr_fold is the key innovation for large fan-in aggregation. Instead of re-folding all parents on every stabilization, it tracks which parents changed and applies only incremental updates:

stabilize Incr_fold node:
    acc = old_value
    for each changed_parent_index:
        pid = parents[index]
        old_parent_value = parent_values[index]    // snapshot from last cycle
        new_parent_value = nodes[pid].value
        acc = remove(acc, old_parent_value)
        acc = add(acc, new_parent_value)
        parent_values[index] = new_parent_value    // update snapshot
    changed_parent_indices = []                     // reset for next cycle
    return acc

During dirty propagation, when a parent of an Incr_fold node changes, the parent’s index is recorded in changed_parent_indices. This way, the fold only processes changed parents.

For the VWAP use case with 2,000 symbols:

  • Fold: recomputes sum across all 2,000 prices = O(2000)
  • Incr_fold: removes old price, adds new price = O(1)
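Stripped of the graph machinery, the incremental update is just arithmetic on a running accumulator. A minimal stand-alone sketch (illustrative, not the real Incr_fold node):

```ocaml
(* Incremental sum over a fixed set of parent values: updating one
   slot costs O(1) instead of an O(N) re-fold of every parent. *)
type t =
  { values : float array   (* snapshot of each parent's last value *)
  ; mutable acc : float    (* running total *)
  }

let create values =
  { values; acc = Array.fold_left ( +. ) 0.0 values }

(* remove(old) + add(new): the running total is patched, not rebuilt. *)
let update t i new_value =
  t.acc <- t.acc -. t.values.(i) +. new_value;
  t.values.(i) <- new_value
```

Note that for floating-point accumulators this trades a small amount of numerical drift for the O(1) update; with exact types (ints, counts) the patched total is identical to a full re-fold.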

Memory Layout

Nodes are stored in a contiguous array. Each node is approximately 200 bytes:

node (approx 200 bytes):
  id              : int          (8 bytes)
  height          : int          (8 bytes)
  value           : Obj.t        (8 bytes, pointer to heap-allocated value)
  is_dirty        : bool         (1 byte, padded to 8)
  is_output       : bool         (1 byte, padded to 8)
  old_value       : Obj.t        (8 bytes)
  recompute       : recompute_fn (varies, ~40-80 bytes)
  cutoff_fn       : closure      (8 bytes)
  dependents      : int list     (16+ bytes)
  sexp_of_value   : option       (8 bytes)
  value_of_sexp   : option       (8 bytes)

At 2,000 nodes (the max per worker), the node array occupies ~400 KB – well within L2 cache on modern CPUs. The dirty heap adds another ~80 KB. Total working set for the graph engine: under 500 KB.

Delta Algebra

Deltas are the fundamental unit of change in Ripple. Rather than transmitting complete values on every update, Ripple computes and transmits the minimal difference between the old and new value. This page covers the delta type, its algebraic laws, and why those laws matter for effectively-once processing.

The Delta Type

type patch =
  | Field_set of { field_name : string; value : Sexp.t }
  | Field_remove of { field_name : string }
  | Map_set of { key : Sexp.t; value : Sexp.t }
  | Map_remove of { key : Sexp.t }

type 'a t =
  | Set of 'a           (* Replace the entire value *)
  | Patch of patch list  (* Update specific fields *)
  | Remove               (* Delete the keyed entry *)

Set

Replaces the entire value. Used for initial sync, small records (below the patch threshold), or when most fields have changed.

Patch

A list of field-level updates. Each patch operation is absolute (not relative): Field_set { field_name = "price"; value = Atom "150.0" } sets the price to 150.0 regardless of what the previous price was. This is what makes patches idempotent.

Remove

Deletes the keyed entry. Used when a symbol is removed from the partition or a window is garbage-collected.

Patch.t: Absolute Field-Level Updates

A critical design decision: patches are absolute, not relative. Field_set { field_name = "price"; value = 150.0 } means “set price to 150.0”, not “add 10.0 to price”. This distinction is essential:

  • Absolute patches are idempotent: applying the same patch twice produces the same result.
  • Relative patches are not: adding 10.0 twice gives a different result than adding it once.

For effectively-once semantics (at-least-once delivery + idempotent processing), idempotency of patches is non-negotiable.

The Six Algebraic Laws

The compose function combines two sequential deltas into one. It satisfies six algebraic laws:

Law 1: Associativity

compose(a, compose(b, c)) = compose(compose(a, b), c)

Composition order does not matter when grouping. This allows the transport layer to batch and merge deltas without worrying about evaluation order.

Law 2: Right Absorption (Set)

compose(d, Set(v)) = Set(v)

Any delta followed by a Set is equivalent to just the Set. The Set completely replaces whatever came before. This is because Set is an absolute operation – it establishes a new baseline regardless of history.

Law 3: Right Annihilation (Remove)

compose(d, Remove) = Remove

Any delta followed by Remove is equivalent to just Remove. If the value is going to be deleted, it does not matter what preceded it.

Law 4: Patch Merge (Last Writer Wins)

compose(Patch(p1), Patch(p2)) = Patch(merge(p1, p2))

When composing two patch lists, fields in p2 override fields in p1 with the same name. Fields present only in p1 or only in p2 are preserved.

(* Example: composing two patches *)
let p1 = Patch [Field_set { field_name = "price"; value = Atom "100" }]
let p2 = Patch [Field_set { field_name = "price"; value = Atom "200" }
               ;Field_set { field_name = "size";  value = Atom "50"  }]

compose p1 p2
(* = Patch [Field_set "price" "200"; Field_set "size" "50"] *)
(* p2's price wins; p2's size is added *)

Law 5: Compatibility

apply(compose(d1, d2), v) = apply(d2, apply(d1, v))

Composing two deltas and applying the result is equivalent to applying them sequentially. This is the law that makes delta compression safe: the transport layer can compose multiple deltas into one without changing the outcome.

Law 6: Idempotency of Apply

apply(d, apply(d, v)) = apply(d, v)

Applying a delta twice produces the same result as applying it once. This is the foundation of effectively-once semantics. If a delta is delivered twice (at-least-once delivery), the second application is a no-op.
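Taken together, laws 2-4 pin down what compose must do case by case. A self-contained sketch over a simplified delta type (assoc-list records, string field values; this is an illustration, not Ripple's implementation):

```ocaml
(* Simplified delta over assoc-list "records"; illustrative only. *)
type value = (string * string) list          (* field name -> contents *)

type delta =
  | Set of value
  | Patch of (string * string) list          (* absolute field updates *)
  | Remove

let set_field v (name, x) =
  (name, x) :: List.filter (fun (n, _) -> n <> name) v

let apply d v =
  match d with
  | Set v' -> v'
  | Patch p -> List.fold_left set_field v p
  | Remove -> []

(* Law 4: fields in p2 win; fields only in p1 are preserved. *)
let merge p1 p2 =
  p2 @ List.filter (fun (n, _) -> not (List.mem_assoc n p2)) p1

let compose d1 d2 =
  match d1, d2 with
  | _, Set v -> Set v                          (* law 2: Set absorbs history *)
  | _, Remove -> Remove                        (* law 3: Remove annihilates *)
  | Patch p1, Patch p2 -> Patch (merge p1 p2)  (* law 4 *)
  | Set v, Patch p -> Set (apply (Patch p) v)
  | Remove, Patch p -> Set (apply (Patch p) [])
```

With these definitions, law 5 (apply the composition vs. apply sequentially) and law 6 (idempotency of apply) both hold, because every patch is an absolute overwrite.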

Why Idempotency Matters

Ripple provides effectively-once semantics: at-least-once delivery combined with idempotent processing. The guarantee chain:

1. Kafka provides at-least-once delivery (consumer may re-read after crash)
2. Deltas are idempotent (apply(d, apply(d, v)) = apply(d, v))
3. Remote nodes track sequence numbers (duplicate detection)
4. Result: each event's effect is applied exactly once

    Effectively-once = at-least-once delivery
                     + idempotent processing
                     + exactly-once state

Without idempotent deltas, re-delivery after a crash would corrupt the computation. With idempotent deltas, re-delivery is harmless.

The Diff Algorithm

The diff function computes the minimal delta between two values:

let diff ~sexp_of ~equal ~patch_threshold ~old ~new_ =
  if equal old new_ then
    Patch []  (* No change *)
  else
    let old_sexp = sexp_of old in
    let new_sexp = sexp_of new_ in
    match old_sexp, new_sexp with
    | List old_fields, List new_fields ->
      if List.length new_fields <= patch_threshold then
        Set new_  (* Small record: Set is cheaper than Patch *)
      else
        (* Compare field by field, emit only changed fields *)
        Patch (changed_fields old_fields new_fields)
    | _ ->
      Set new_  (* Non-record: full replacement *)

The patch_threshold parameter controls when to use Patch vs Set. For a 3-field record, the overhead of field-level diffing exceeds the savings – just send the whole value. For a 20-field record where 1 field changed, Patch sends ~90% less data.

Benchmark B-03 measured an 8.9x byte reduction using Patch vs Set on a 20-field record with 2 fields changed. This directly translates to less network bandwidth and lower serialization cost on the wire.

The Roundtrip Property

The most important property for correctness:

apply(diff(old, new), old) = Ok new

For any two values, computing the diff and applying it to the old value must produce the new value. This is tested as an inline expect test in delta.ml and should be verified by property-based tests for any new type added to the system.
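The property can be demonstrated with a toy diff/apply pair over assoc-list "records" with a fixed field set (string values; illustrative only, not delta.ml):

```ocaml
(* Toy roundtrip: diff keeps only changed fields; apply overwrites them. *)
let diff ~old ~new_ =
  (* Keep only fields whose value changed. *)
  List.filter
    (fun (n, v) -> List.assoc_opt n old <> Some v)
    new_

let apply patch old =
  (* Absolute overwrite of patched fields; others pass through. *)
  List.map
    (fun (n, v) ->
      match List.assoc_opt n patch with
      | Some v' -> (n, v')
      | None -> (n, v))
    old

let old = [ ("price", "100"); ("size", "10"); ("venue", "X") ]
let new_ = [ ("price", "150"); ("size", "10"); ("venue", "X") ]
(* diff ~old ~new_ = [("price", "150")]; applying it to old yields new_ *)
```

A property-based test would generate random old/new pairs and assert the same equation.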

Application to the Wire Protocol

On the wire, deltas are serialized as s-expressions for debugging or as bin_prot for performance:

Delta message (wire):
  source_output: "vwap"
  key: "AAPL"
  delta_bytes: <bin_prot encoded Delta.t>
  sequence_no: 42
  watermark_ns: 1709000000000000000

The receiver applies the delta to its local remote node, which triggers incremental recomputation in the downstream graph. The sequence number ensures idempotent delivery – if sequence 42 has already been applied, the delta is silently discarded.

Schemas & Compatibility

Ripple uses first-class, serializable type descriptors to ensure that producers and consumers agree on the shape of data crossing partition boundaries. This page covers the Schema.t type, ppx-driven derivation, version management, compatibility rules, and fingerprinting.

The Schema.t Type

A schema is a runtime representation of an OCaml record type:

module Version : sig
  type t [@@deriving sexp, bin_io, compare, hash]
  val of_int_exn : int -> t   (* must be positive *)
  val to_int : t -> int
end

type field_type =
  | String | Int | Int64 | Float | Bool | Sexp_opaque
  | Option of field_type
  | List of field_type
  | Record of field list
  | Variant of variant_case list
  | Ref of string              (* cross-reference to another schema *)

and field =
  { name : string
  ; typ : field_type
  ; optional : bool
  ; doc : string option
  }

and variant_case =
  { case_name : string
  ; case_args : field_type list
  }

type t =
  { name : string
  ; version : Version.t
  ; fields : field list
  ; doc : string option
  }

Schemas are themselves serializable ([@@deriving sexp, bin_io, compare, hash]), which means they can be transmitted over the wire during the handshake protocol and stored alongside checkpoints.

ppx Derivation

In production, schemas are derived from OCaml types via ppx, not written by hand:

type trade =
  { symbol : string
  ; price : float
  ; size : int
  ; timestamp_ns : Int64.t
  ; venue : string
  }
[@@deriving sexp, bin_io, compare, hash, schema]

The [@@deriving schema] annotation (via ppx_schema, proposed) generates a val schema : Schema.t value at compile time. This ensures the schema always matches the type definition – no drift between code and metadata.

Until ppx_schema is implemented, schemas are constructed manually:

let trade_schema : Schema.t =
  { name = "trade"
  ; version = Version.of_int_exn 1
  ; fields =
      [ { name = "symbol"; typ = String; optional = false; doc = None }
      ; { name = "price"; typ = Float; optional = false; doc = None }
      ; { name = "size"; typ = Int; optional = false; doc = None }
      ; { name = "timestamp_ns"; typ = Int64; optional = false; doc = None }
      ; { name = "venue"; typ = String; optional = false; doc = None }
      ]
  ; doc = Some "Trade event from exchange"
  }

Version Management

Versions are positive integers. Each breaking change increments the version. The version number is carried in every schema and checked during connection handshake.

Rules:

  • Version 1 is the initial schema.
  • Adding an optional field does not require a version bump (backward and forward compatible).
  • Adding a required field requires a version bump (breaks backward compatibility).
  • Removing a field requires a version bump.
  • Changing a field type requires a version bump (except widening conversions like Int -> Int64).

Compatibility Rules

Ripple enforces three levels of compatibility, following the same framework as Apache Avro and Confluent Schema Registry:

Backward Compatibility

A new reader can read data written by an old writer.

check_backward ~reader:v2 ~writer:v1

Rules:

  • Every required field in the reader must exist in the writer.
  • Field types must be compatible (same type, or a widening conversion).
  • New optional fields in the reader are allowed (reader uses default).
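These rules can be sketched as a small checker. A toy version (fields reduced to name plus optionality; type widening elided, names hypothetical):

```ocaml
(* Backward check: every required reader field must exist in the writer. *)
type field = { name : string; optional : bool }

let check_backward ~(reader : field list) ~(writer : field list) =
  let missing =
    List.filter
      (fun r ->
        (not r.optional)
        && not (List.exists (fun w -> w.name = r.name) writer))
      reader
  in
  match missing with
  | [] -> Ok ()
  | _ ->
    Error
      (List.map (fun f -> "reader requires missing field: " ^ f.name) missing)
```

A forward check is the same test with the roles swapped, and a full check is the conjunction of both.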

Forward Compatibility

An old reader can read data written by a new writer.

check_forward ~reader:v1 ~writer:v2

Rules:

  • Every required field in the reader must still exist in the writer.
  • The writer may add new fields that the old reader ignores.
  • The writer must not remove fields that the old reader requires.

Full Compatibility

Both backward and forward compatibility hold simultaneously.

check_full ~old:v1 ~new_:v2

Full compatibility is required for zero-downtime rolling upgrades: during a rolling deploy, some workers run v1 and others run v2. Both must be able to read each other’s output.

Compatibility Decision Table

 Change                      Backward  Forward  Full
 ──────                      ────────  ───────  ────
 Add optional field          Yes       Yes      Yes
 Add required field          No        Yes      No
 Remove optional field       Yes       No       No
 Remove required field       No        No       No
 Widen type (Int -> Int64)   Yes       No       No
 Narrow type (Int64 -> Int)  No        No       No
 Rename field                No        No       No

Incompatibility Reporting

When a compatibility check fails, the result includes structured error details:

type incompatibility =
  { path : string list        (* e.g., ["trade"; "venue"] *)
  ; kind : incompatibility_kind
  ; detail : string
  }

type result =
  | Compatible
  | Incompatible of incompatibility list

This allows the CLI and monitoring system to report exactly which fields caused the incompatibility.

Fingerprinting

Every schema has a deterministic fingerprint computed from its canonical s-expression form:

let fingerprint t =
  let sexp = to_canonical_sexp t in
  let canonical = Sexp.to_string sexp in
  sprintf "%016Lx" (Int64.of_int (String.hash canonical))

The canonical form sorts fields alphabetically before hashing, so field declaration order does not affect the fingerprint:

(* These two schemas have the same fingerprint *)
let s1 = { fields = [b_field; a_field]; ... }
let s2 = { fields = [a_field; b_field]; ... }
fingerprint s1 = fingerprint s2  (* true *)
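The order-independence comes entirely from the sort. A toy version of the idea (fields reduced to name/type pairs, Hashtbl.hash standing in for the real hash; not Ripple's code):

```ocaml
(* Toy fingerprint: sort fields by name, then hash the canonical string. *)
let fingerprint fields =
  let canonical =
    List.sort (fun (a, _) (b, _) -> compare a b) fields
    |> List.map (fun (n, t) -> n ^ ":" ^ t)
    |> String.concat ";"
  in
  Printf.sprintf "%016x" (Hashtbl.hash canonical)
```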

Fingerprints are used in:

  1. Wire protocol headers: every delta message carries the schema fingerprint of its payload. The receiver verifies the fingerprint matches its local schema before deserializing.
  2. Checkpoint metadata: the checkpoint records which schema version was active when the snapshot was taken.
  3. Connection handshake: workers exchange schema fingerprints to verify compatibility before streaming deltas.

Fingerprint Stability Property

fingerprint(s) = fingerprint(t_of_sexp(sexp_of_t(s)))

A schema’s fingerprint is stable across serialization roundtrips. This is tested in the inline expect tests and is essential for cross-process fingerprint comparison.

Effect Injection

Ripple requires deterministic replay for crash recovery: given the same input sequence from a checkpoint, the system must produce exactly the same output. This means all non-determinism must be injectable through a single interface. This page explains the EFFECT module type, its two implementations, and why this design is mandatory.

The Problem

Consider a node that timestamps its output:

(* BAD: direct call to Time_ns.now *)
let f input =
  { data = compute input; timestamp = Time_ns.now () }

This function is non-deterministic. During replay after a crash, Time_ns.now() returns a different value than during the original computation. The replayed state diverges from the original, violating the determinism invariant.

The same problem applies to:

  • Random number generation (used for sampling, jitter)
  • Network I/O (reading from sockets)
  • File I/O (reading configuration)
  • System calls (getpid, hostname)
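The fix in every case is the same: take the effect as a parameter. A minimal self-contained sketch of the timestamping example (names are illustrative, not Ripple's API):

```ocaml
type 'a stamped = { data : 'a; timestamp_ns : int64 }

(* GOOD: the clock is injected, so replay can supply the recorded time. *)
let stamp ~now data = { data; timestamp_ns = now () }

(* In production, [now] reads the real clock; in tests and replay it is
   a controllable fake. *)
let fake_clock t = fun () -> t

let x = stamp ~now:(fake_clock 1_000_000L) "trade"
(* x.timestamp_ns is exactly 1_000_000L on every run *)
```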

The EFFECT Module Type

All non-determinism flows through a single module signature:

module type S = sig
  val now : unit -> Time_ns.t
  val random_int : int -> int
end

Every component that needs time or randomness takes now or random_int as a parameter rather than calling Time_ns.now or Random.int directly.

The graph engine is parameterized by now:

let create ~now =
  { nodes = Array.create ~len:1024 (Obj.magic ())
  ; node_count = 0
  ; dirty_heap = Dirty_heap.create ~capacity:1024
  ; ...
  ; now    (* injected, not called directly *)
  }

The tracing system is parameterized by random_int:

let create_root ~random_int =
  { trace_id = gen_trace_id ~random_int
  ; span_id = gen_span_id ~random_int
  ; ...
  }

Two Implementations

Live: Production

module Live : S = struct
  let now = Time_ns.now
  let random_int = Random.int
end

Used by the worker binary and CLI. Provides real wall-clock time and pseudorandom numbers.

Test: Deterministic Simulation

module Test : sig
  include S
  val advance_time : Time_ns.Span.t -> unit
  val set_time : Time_ns.t -> unit
  val seed_random : int -> unit
end = struct
  let current_time = ref Time_ns.epoch
  let rng = ref (Random.State.make [| 42 |])

  let now () = !current_time
  let random_int bound = Random.State.int !rng bound

  let advance_time span =
    current_time := Time_ns.add !current_time span
  let set_time t = current_time := t
  let seed_random seed = rng := Random.State.make [| seed |]
end

Used by all tests, benchmarks, and the deterministic simulation harness. Time only advances when explicitly stepped. Random sequences are reproducible from a seed.

Usage Patterns

In Tests

let%expect_test "stabilization timing is deterministic" =
  Test.set_time Time_ns.epoch;
  let g = Graph.create ~now:Test.now in
  (* ... build graph ... *)
  Test.advance_time (Time_ns.Span.of_ms 1.0);
  let _ = Graph.stabilize g in
  (* last_stabilization_ns is exactly 1_000_000 ns *)

In the Worker Binary

let run ~worker_id ~partition_id =
  let now = Time_ns.now in  (* Live time *)
  let worker = Worker.create ~worker_id ~partition_id ~now in
  ...

In Deterministic Simulation

let simulate ~seed ~ticks =
  Test.seed_random seed;
  Test.set_time Time_ns.epoch;
  let g = Graph.create ~now:Test.now in
  for _ = 1 to ticks do
    Test.advance_time (Time_ns.Span.of_us 100.0);
    (* inject events, stabilize, check invariants *)
  done

Why This Design Is Mandatory

Deterministic Replay

The checkpoint-and-replay recovery protocol depends on determinism:

1. Load checkpoint (leaf values + input offsets)
2. Rebuild graph structure
3. Restore leaf values from checkpoint
4. Replay input log from checkpoint's offset
5. Result: same graph state as before crash

Step 5 only holds if every computation produces the same result given the same inputs. If any node calls Time_ns.now() directly, the replayed state diverges at that node and all its descendants.

Simulation Testing

The deterministic simulation harness (inspired by TigerBeetle’s approach) runs millions of simulated operations with injected failures. Each simulation is parameterized by a seed. When a bug is found, the seed reproduces the exact failure sequence.

This is impossible if the system has any direct sources of non-determinism.

The Rule

From the codebase:

“No module in Ripple may call Time_ns.now(), Random.int, or perform direct I/O. All such operations go through the EFFECT interface. Violations break deterministic replay and are considered bugs.”

This rule is enforced by code review, not by the type system (OCaml does not have an effect system that prevents calling Time_ns.now). A future direction would be to use OCaml 5 effects to enforce this at the type level.

Extending the Interface

When adding new sources of non-determinism, extend the S signature:

module type S = sig
  val now : unit -> Time_ns.t
  val random_int : int -> int
  (* Future additions: *)
  (* val hostname : unit -> string *)
  (* val getpid : unit -> int *)
end

Both Live and Test must be updated. The Test implementation must return deterministic values controllable by the test harness.

System Overview

Ripple is a distributed incremental stream processing system. This page describes the overall topology, the components within each worker node, the cluster-level services, and how data flows through the system.

System Topology

                     ┌─────────────────────┐
                     │    Coordinator       │
                     │  (stateless, HA)     │
                     │                      │
                     │  - Hash ring         │
                     │  - Partition assign  │
                     │  - Heartbeat detect  │
                     │  - Checkpoint trigger│
                     └────────┬─────────────┘
                              │ gRPC :9200
              ┌───────────────┼───────────────┐
              │               │               │
    ┌─────────v──────┐ ┌──────v────────┐ ┌────v──────────┐
    │   Worker-0     │ │   Worker-1    │ │   Worker-2    │
    │   partitions   │ │   partitions  │ │   partitions  │
    │   [0,1,2,3]    │ │   [4,5,6,7]   │ │   [8,9,10,11] │
    │                │ │               │ │               │
    │ ┌────────────┐ │ │               │ │               │
    │ │ Incr Graph │ │ │      ...      │ │      ...      │
    │ └────────────┘ │ │               │ │               │
    └───┬────────┬───┘ └───────────────┘ └───────────────┘
        │        │
   ┌────v──┐  ┌──v────┐
   │ Kafka │  │ MinIO │
   │(input)│  │(ckpt) │
   └───────┘  └───────┘

   Ports per worker:
     :9100 — HTTP health (/health, /ready)
     :9101 — Async RPC (delta exchange)
     :9102 — Prometheus metrics (/metrics)

Per-Node Components

Each worker contains the following components:

Incremental Graph Engine

The core computation engine. Holds the node array, dirty heap, and stabilization loop. Each worker has exactly one graph instance processing its assigned partitions.

See: The Graph Engine

Connector Layer

Sources and sinks that connect the graph to external systems:

 Connector        Role
 ─────────        ────
 Kafka_connector  Reads trade events from Kafka topics
 File_source      Reads from CSV files or stdin (demo/testing)
 File_sink        Writes CSV output to files or stdout

Delta Transport

Handles cross-partition communication:

 Component     Role
 ─────────     ────
 Delta_buffer  Buffers incoming deltas, deduplicates by sequence number
 Delta_rpc     Async RPC client/server for delta exchange between workers
 Remote_node   Graph leaf that receives values from a remote worker

Checkpoint Manager

Manages periodic snapshots of graph state:

 Component   Role
 ─────────   ────
 Checkpoint  Serializable snapshot (leaf values + input offsets)
 Store       Pluggable backend (In_memory, Local_disk, S3)

Window Manager

Tracks event-time windows and watermarks:

 Component  Role
 ─────────  ────
 Window     Time interval with assigner (tumbling/sliding/session)
 Watermark  Tracks completeness across sources (min of all)

Observability

 Component   Role
 ─────────   ────
 Metrics     Counters, gauges, histograms (Prometheus-compatible)
 Trace       W3C distributed tracing with adaptive sampling
 Introspect  Graph snapshot to sexp or DOT format
 Alert       Rule-based alert evaluation and notification

Cluster-Level Services

Coordinator

The coordinator is a stateless service that manages the cluster topology:

  • Hash Ring: consistent hashing for partition-to-worker assignment
  • Worker Registration: workers register on startup, receive partition assignments
  • Heartbeat Detection: detects failed workers via heartbeat timeout (default 30s)
  • Checkpoint Triggers: coordinates cluster-wide checkpoint at configurable intervals (default 10s)
  • Rebalancing: reassigns partitions when workers join or leave

The coordinator runs as a Kubernetes Deployment with 2 replicas for HA. It is stateless – all persistent state lives in the checkpoint store (S3).

Kafka / Redpanda

The input event stream. Ripple reads from Kafka-compatible brokers (Redpanda recommended for development). Topics are partitioned, and Ripple’s partitioning aligns with Kafka partition IDs for locality.

MinIO / S3

Checkpoint storage. Checkpoints are written atomically (single-object PUT for S3, temp+rename for local disk) to ensure crash safety.

Data Flow

 Kafka topic: "trades"
       │
       │  (partitioned by symbol hash)
       │
       v
 ┌─ Worker-0 ───────────────────────────────────────────┐
 │                                                       │
 │  1. Read batch from Kafka partition                   │
 │  2. For each event:                                   │
 │     a. Assign to window (tumbling 60s)                │
 │     b. Update leaf node (set_leaf)                    │
 │     c. Advance watermark                              │
 │  3. Stabilize graph                                   │
 │  4. Collect changed outputs                           │
 │  5. Emit deltas to downstream workers (Async RPC)     │
 │  6. Write output to Kafka topic: "vwap-output"        │
 │                                                       │
 │  Periodically:                                        │
 │  - Checkpoint graph state to S3                       │
 │  - Send heartbeat to coordinator                      │
 │  - Export metrics to Prometheus                       │
 └───────────────────────────────────────────────────────┘

Cross-Partition Data Flow

When a worker’s output is consumed by another worker (e.g., a second-level aggregation), the delta transport handles the communication:

 Worker-0 (partitions 0-3)         Worker-1 (partitions 4-7)
 ┌──────────────────────┐          ┌──────────────────────┐
 │  [graph]             │          │  [graph]             │
 │    |                 │          │    |                 │
 │    v                 │          │    v                 │
 │  [output: vwap_p0]   │──delta──>│  [remote: vwap_p0]   │
 │                      │   RPC    │    |                 │
 └──────────────────────┘          │    v                 │
                                   │  [fold: global_vwap] │
                                   └──────────────────────┘

The remote node (add_remote) behaves identically to a local leaf from the graph’s perspective. The delta transport applies incoming deltas and sets the remote node’s value, which triggers incremental recomputation in the downstream graph.

Pipeline Topology

Pipelines are defined in OCaml code, not YAML:

let pipeline =
  Topology.create ~name:"vwap-60s"
  |> Topology.source ~name:"trades" ~topic:"raw-trades"
  |> Topology.partition ~name:"by-symbol" ~key_name:"symbol"
  |> Topology.window ~name:"1min"
       ~config:(Tumbling { size = Time_ns.Span.of_sec 60.0 })
  |> Topology.fold ~name:"vwap" ~f_name:"Vwap.compute"
  |> Topology.sink ~name:"output" ~topic:"vwap-results"

The topology builder validates the pipeline at construction time (source must be first, sink must be last, no duplicate names) and produces a Topology.t that the coordinator uses for partition assignment and deployment planning.
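The construction-time checks described above can be sketched as a small validation pass (a toy version; stage kinds are reduced to a tag, and all names here are hypothetical):

```ocaml
(* Validate stage ordering and name uniqueness for a pipeline. *)
type kind = Source | Transform | Sink
type stage = { name : string; kind : kind }

let validate stages =
  match stages with
  | [] -> Error "empty pipeline"
  | first :: _ ->
    let last = List.nth stages (List.length stages - 1) in
    if first.kind <> Source then Error "source must be first"
    else if last.kind <> Sink then Error "sink must be last"
    else
      let names = List.map (fun s -> s.name) stages in
      if List.length (List.sort_uniq compare names) <> List.length names
      then Error "duplicate stage names"
      else Ok stages
```

Because the pipeline is ordinary OCaml code, such errors surface when the topology value is built, not at deploy time.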

Wire Protocol

Ripple uses a binary wire protocol for cross-worker communication. Every message is wrapped in an envelope with integrity checking, schema identification, and sequence numbering. This page covers the envelope format, message types, handshake protocol, and error detection.

Message Envelope Format

Every wire message follows this layout:

 Offset  Size    Field
 ──────  ────    ─────
 0       4       Magic number (0x52495050 = "RIPP")
 4       1       Protocol version (currently 1)
 5       1       Message type (uint8)
 6       1       Flags (uint8, reserved)
 7       8       Sequence number (int64 LE)
 15      8       Timestamp (nanoseconds since epoch, int64 LE)
 23      32      Schema fingerprint (zero-padded ASCII)
 55      4       Payload length (int32 LE)
 59      N       Payload (bin_prot encoded)
 59+N    4       CRC-32C (Castagnoli, int32 LE)

Total overhead per message: 63 bytes (header + CRC). The payload is variable-length, encoded with bin_prot.

Field Details

Magic number: 0x52495050 (“RIPP” in ASCII). Allows detection of protocol mismatch or corrupt stream start.

Protocol version: Currently 1. Incremented for breaking wire format changes. The receiver must reject messages with an unsupported version.

Message type: Single byte identifying the payload variant. See the message types table below.

Flags: Reserved for future use (e.g., compression bit, priority). Currently always 0.

Sequence number: Monotonically increasing per sender. Used for:

  • Duplicate detection (idempotent delivery)
  • Gap detection (missing messages)
  • Ordering verification

Timestamp: Nanoseconds since Unix epoch. Event time, not wall-clock time. Used for watermark propagation.

Schema fingerprint: 32-byte zero-padded string. Identifies the schema of the payload. The receiver compares this against its local schema registry to verify compatibility before deserialization.

Payload length: 32-bit little-endian integer. Allows the receiver to read exactly the right number of bytes.

CRC-32C: Castagnoli polynomial (0x1EDC6F41). Covers all bytes from offset 0 through the end of the payload. A single bit flip in any field is detected before deserialization.
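To make the layout concrete, here is a toy encoder for the 59-byte header (an illustration of the table above, not Ripple's encoder; the CRC and payload are appended separately, and the magic is written as the ASCII bytes "RIPP"):

```ocaml
(* Write [len] bytes of [v] little-endian into [bytes] at [pos]. *)
let put_le bytes ~pos ~len v =
  for i = 0 to len - 1 do
    Bytes.set bytes (pos + i)
      (Char.chr
         (Int64.to_int
            (Int64.logand (Int64.shift_right_logical v (8 * i)) 0xFFL)))
  done

let encode_header ~msg_type ~seq ~ts_ns ~fingerprint ~payload_len =
  let b = Bytes.make 59 '\000' in
  Bytes.blit_string "RIPP" 0 b 0 4;            (* offset 0: magic *)
  Bytes.set b 4 '\001';                        (* offset 4: protocol version *)
  Bytes.set b 5 (Char.chr msg_type);           (* offset 5: message type *)
  (* offset 6: flags, already zero *)
  put_le b ~pos:7 ~len:8 seq;                  (* offset 7: sequence number *)
  put_le b ~pos:15 ~len:8 ts_ns;               (* offset 15: timestamp *)
  Bytes.blit_string fingerprint 0 b 23         (* offset 23: fingerprint *)
    (min 32 (String.length fingerprint));      (* zero-padded to 32 bytes *)
  put_le b ~pos:55 ~len:4 (Int64.of_int payload_len);
  b
```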

CRC-32C Integrity

The CRC uses the Castagnoli polynomial, which is hardware-accelerated on modern x86 CPUs (SSE 4.2 crc32 instruction). The implementation uses a lookup table for portability:

let compute_string s =
  let crc = ref 0xFFFFFFFFl in
  String.iter s ~f:(fun c ->
    let byte = Char.to_int c in
    let index = Int32.to_int_trunc
      (Int32.( land ) (Int32.( lxor ) !crc
        (Int32.of_int_trunc byte)) 0xFFl) in
    crc := Int32.( lxor )
      (Int32.shift_right_logical !crc 8) table.(index));
  Int32.( lxor ) !crc 0xFFFFFFFFl

On decode, the receiver:

  1. Computes CRC-32C over all bytes except the last 4
  2. Compares against the stored CRC
  3. Rejects the message on mismatch (no deserialization attempted)

This catches corruption from network bit flips, partial writes, and truncated messages before any bin_prot parsing occurs.
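The decode-side check can be sketched as follows (a toy framing helper; crc32c stands in for the table-driven function above and, for simplicity, returns the 4-byte checksum as a string):

```ocaml
(* Verify the trailing CRC before any deserialization is attempted. *)
let verify_frame ~crc32c (frame : string) =
  let len = String.length frame in
  if len < 4 then Error "frame too short"
  else
    let body = String.sub frame 0 (len - 4) in
    let stored = String.sub frame (len - 4) 4 in
    if crc32c body = stored then Ok body else Error "CRC mismatch"
```

Only a frame that passes this check is handed to the bin_prot reader.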

Message Types

 Tag  Type                Direction              Purpose
 ───  ────                ─────────              ───────
 0    Handshake           bidirectional          Connection establishment
 1    Delta               producer -> consumer   Data change (hot path)
 2    Checkpoint_request  coordinator -> worker  Trigger checkpoint
 3    Checkpoint_ack      worker -> coordinator  Checkpoint complete
 4    Heartbeat           worker -> coordinator  Liveness signal
 5    Schema_negotiation  bidirectional          Schema compatibility response

Handshake

type handshake =
  { source_worker_id : string
  ; output_schemas : (string * Schema.t) list
  ; protocol_version : int
  }

Sent on connection establishment. The receiver validates:

  1. Protocol version compatibility
  2. Schema compatibility for each declared output
  3. Responds with Schema_negotiation (compatible/incompatible + error details)

Delta (Hot Path)

type delta_msg =
  { source_output : string       (* which output produced this delta *)
  ; key : string                 (* bin_prot encoded key *)
  ; delta_bytes : string         (* bin_prot encoded Delta.t *)
  ; sequence_no : int
  ; watermark_ns : Int64.t option
  }

The delta message is the most frequent message type. It carries:

  • The name of the output that changed
  • The key (typically a symbol) identifying which entry changed
  • The serialized delta (Set, Patch, or Remove)
  • A sequence number for idempotent delivery
  • An optional watermark update piggy-backed on the data message

Heartbeat

type heartbeat =
  { worker_id : string
  ; timestamp_ns : Int64.t
  ; load : float       (* 0.0 to 1.0 *)
  }

Sent periodically (default every 5 seconds). The coordinator uses heartbeats for liveness detection and load-aware partition assignment.

Handshake Protocol

Connection establishment follows a 3-step protocol:

Worker-A                              Worker-B
   │                                     │
   │─── Handshake(schemas=[vwap/v2]) ──>│
   │                                     │
   │    check_backward(reader=v2,        │
   │                   writer=v2)        │
   │                                     │
   │<── Schema_negotiation(ok=true) ────│
   │                                     │
   │─── Delta(key=AAPL, seq=1) ────────>│
   │─── Delta(key=GOOG, seq=2) ────────>│
   │    ...                              │

If schema negotiation fails:

Worker-A                              Worker-B
   │                                     │
   │─── Handshake(schemas=[vwap/v3]) ──>│
   │                                     │
   │    check_backward(reader=v2,        │
   │                   writer=v3) FAIL   │
   │                                     │
   │<── Schema_negotiation(ok=false,     │
   │     errors=["missing field x"]) ───│
   │                                     │
   │    [connection rejected]            │

Sequence Numbering

Each worker maintains a monotonically increasing sequence counter per output. The receiver tracks the last-seen sequence number per source:

let set_remote remote ~sequence_no value =
  if sequence_no <= remote.sequence_no then
    false  (* Already seen -- idempotent discard *)
  else begin
    (* Apply the update *)
    remote.sequence_no <- sequence_no;
    true
  end

This provides:

  • Idempotent delivery: duplicate messages (from at-least-once transport) are silently discarded
  • Gap detection: if sequence jumps from 42 to 44, sequence 43 was lost (log warning)
  • Ordering: within a single source, deltas are applied in sequence order
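The gap-detection logic layered on the same counter might look like this (a sketch, not Ripple's code; the policy on a gap is to log and continue):

```ocaml
(* Classify an incoming sequence number against the last one seen. *)
let check_gap ~last_seen ~incoming =
  if incoming <= last_seen then `Duplicate
  else if incoming = last_seen + 1 then `In_order
  else `Gap (last_seen + 1, incoming - 1)  (* inclusive missing range *)
```

For example, seeing sequence 44 after 42 reports that sequence 43 was lost.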

Encoding

Payload encoding uses bin_prot for performance. The bin_prot format is:

  • Self-describing length prefixes
  • Native-endian integers (fast, but not portable across architectures)
  • No schema embedded in the payload (schema is identified by fingerprint in the header)

For debugging and human-readable inspection, the sexp format is available as a fallback. The debug tap CLI command decodes delta messages and prints them as s-expressions.

Checkpointing

Checkpoints are the foundation of Ripple’s fault tolerance. A checkpoint captures the complete state of a worker’s incremental graph at a consistent point in time, enabling recovery after crashes without replaying the entire input history. This page covers the checkpoint type, atomicity guarantees, pluggable stores, the restore protocol, and the effectively-once boundary.

The Checkpoint Type

type node_snapshot =
  { node_id : int
  ; height : int
  ; value_bytes : string   (* bin_prot or sexp serialized *)
  }

type t =
  { checkpoint_id : string            (* "ckpt-w1-42" *)
  ; worker_id : string
  ; partition_id : string
  ; timestamp_ns : Int64.t
  ; epoch : int                       (* monotonically increasing counter *)
  ; node_snapshots : node_snapshot list
  ; input_offsets : (string * Int64.t) list  (* source -> offset *)
  ; node_count : int
  }

Key design decision: only leaf node values are checkpointed. Compute nodes (map, fold, incr_fold) are not stored – they are recomputed by running stabilize after restoring the leaf values. This reduces checkpoint size by ~60% and avoids serializing closures.

The input_offsets field records the Kafka consumer offset for each input source at the time of the checkpoint. This is the replay starting point: on recovery, the consumer seeks to these offsets and replays forward.

Atomic Writes

Checkpoint writes are atomic. A crash mid-write must never leave a corrupt checkpoint visible to the restore protocol.

Local Disk: temp + rename

let write ~dir ckpt =
  let data = Bin_prot.Utils.bin_dump bin_writer_t ckpt
    |> Bigstring.to_string in
  let final_path = dir ^/ ckpt.checkpoint_id ^ ".ckpt" in
  let temp_path = final_path ^ ".tmp" in
  (* Write to temp file *)
  Out_channel.write_all temp_path ~data;
  (* Atomic rename -- crash before rename leaves only temp *)
  Core_unix.rename ~src:temp_path ~dst:final_path

If the process crashes during write_all, only the .tmp file exists. On recovery, .tmp files are ignored. If the process crashes after rename, the checkpoint is complete and valid.

S3: Single-Object PUT

S3 provides strong read-after-write consistency for individual objects. A single PutObject is atomic from the reader’s perspective – the object is either fully visible or not visible at all.

s3://ripple-checkpoints/prod/ckpt-w0-42.ckpt

There is no need for temp files or two-phase commit on S3.

Pluggable Stores

All checkpoint stores implement the same module signature:

module type S = sig
  type t

  val write : t -> Checkpoint.t -> unit Or_error.t
  val read : t -> checkpoint_id:string -> Checkpoint.t Or_error.t
  val latest : t -> worker_id:string -> Checkpoint.t option
  val list : t -> worker_id:string -> Checkpoint.t list
  val delete : t -> checkpoint_id:string -> unit Or_error.t
end

Three implementations are provided:

Store        Use Case      Atomicity Mechanism
In_memory    Unit tests    Hash table (trivially atomic)
Local_disk   Development   temp file + rename
S3           Production    Single-object PUT

Creating a Store

(* Testing *)
let store = Store.In_memory.create ()

(* Development *)
let store = Store.Local_disk.create ~dir:"/var/lib/ripple/checkpoints"

(* Production *)
let store = Store.S3.create
  ~bucket:"ripple-checkpoints"
  ~prefix:"prod"
  ~region:"us-east-1"

Snapshot and Restore

Taking a Checkpoint

let ckpt = Checkpoint.snapshot_graph
  ~graph:g
  ~worker_id:"w0"
  ~partition_id:"p0"
  ~epoch:42
  ~input_offsets:[("trades", 1000000L)]
  ~now
in
let _ = Store.write store ckpt

snapshot_graph calls Graph.snapshot_leaf_values which iterates over all nodes and serializes those with registered sexp_of_value functions. Only Leaf nodes with serialization support are included.

Restoring from Checkpoint

The restore protocol is a five-step process:

1. Load latest checkpoint for this worker
2. Rebuild the graph structure (same code as initial construction)
3. Restore leaf values from checkpoint
4. Stabilize to recompute all derived nodes
5. Resume input processing from checkpoint's input offsets

(* Step 1: Load checkpoint *)
let ckpt = Store.latest store ~worker_id:"w0"
  |> Option.value_exn in

(* Step 2: Rebuild graph (same code as normal startup) *)
let g = build_vwap_graph ~symbols ~now in

(* Step 3: Restore leaf values *)
let _ = Checkpoint.restore_graph ~graph:g ~checkpoint:ckpt in

(* Step 4: Stabilize *)
let _ = Graph.stabilize g in

(* Step 5: Seek input to checkpoint offset *)
let offset = List.Assoc.find_exn ckpt.input_offsets
  ~equal:String.equal "trades" in
Kafka_consumer.seek ~offset

After step 4, the graph is in exactly the same state it was in when the checkpoint was taken. Derived nodes are recomputed from the restored leaf values, producing identical results (by the determinism guarantee).

Checkpoint Interval

The default checkpoint interval is 10 seconds, configured in the coordinator:

checkpoint_interval_sec = 10

This interval was chosen based on benchmark B-05:

  • At 100K events/sec, a 10-second interval means ~1M events between checkpoints
  • Worst-case replay of 1M events at 250ns/stabilization = ~0.25 seconds
  • Well within the 30-second recovery target
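The arithmetic behind this choice can be checked directly (the 100K events/sec rate and 250 ns/event cost are the benchmark figures quoted above; this is a back-of-envelope sketch, not code from the tree):

```ocaml
(* Replay-budget estimate from the B-05 figures quoted above. *)
let events_per_sec = 100_000
let interval_sec = 10
let cost_ns_per_event = 250.

(* ~1M events accumulate between checkpoints *)
let events_between_checkpoints = events_per_sec * interval_sec

(* Worst-case replay: every accumulated event re-stabilized *)
let worst_case_replay_sec =
  float_of_int events_between_checkpoints *. cost_ns_per_event *. 1e-9
(* ~0.25 s, comfortably inside the 30-second recovery target *)
```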

The coordinator triggers checkpoints cluster-wide:

let checkpoint_due t ~now_ns =
  let elapsed = Int64.(-) now_ns t.last_checkpoint_trigger_ns in
  if Int64.compare elapsed t.checkpoint_interval_ns >= 0 then begin
    t.last_checkpoint_trigger_ns <- now_ns;
    true
  end else false

The Effectively-Once Boundary

The checkpoint forms the boundary of effectively-once processing:

  Events before checkpoint:  guaranteed processed (in checkpoint state)
  Events during checkpoint:  included or excluded (depends on timing)
  Events after checkpoint:   replayed on recovery (idempotent application)

  ──────────┬──────────────┬──────────────────────>
            │  checkpoint  │
            │  offset: 42  │
            │              │
  applied   │  boundary    │  replay (idempotent)

The combination of:

  1. Checkpointed state (leaf values at offset 42)
  2. Replay from offset 42 (at-least-once delivery)
  3. Idempotent delta application (sequence-number dedup)

yields effectively-once semantics. Each event’s effect appears exactly once in the output, even though some events may be processed more than once during replay.
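The third ingredient, sequence-number dedup, can be sketched in a few lines of stdlib OCaml (an assumed shape for illustration; the real transport keys this per sender):

```ocaml
(* Sequence-number dedup sketch: a delta's effect is applied at most once,
   so replaying it during recovery is a harmless no-op. *)
let apply_once seen ~seq ~apply =
  if Hashtbl.mem seen seq then false   (* duplicate from replay: skip *)
  else begin
    Hashtbl.add seen seq ();
    apply ();
    true
  end
```

Replaying a delta whose sequence number is already in `seen` changes nothing, which is exactly what makes at-least-once delivery safe to layer under effectively-once semantics.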

Checkpoint Size

For a 2,000-symbol VWAP pipeline:

  • 2,000 leaf nodes with vwap_state values
  • Each vwap_state serializes to ~50 bytes (sexp)
  • Plus metadata overhead
  • Total: ~120 KB per checkpoint

At a 10-second interval, this is ~12 KB/sec of checkpoint I/O. S3 PUT latency for a 120 KB object is typically 10-50ms, well within budget.

Windowing & Watermarks

Ripple processes event-time streams using temporal windows. Windows group events by time range, and watermarks track completeness to determine when window results can be emitted. This page covers window assigners, watermark tracking, trigger policies, and late event handling.

Window Types

A window is a half-open time interval [start_ns, end_ns):

type t =
  { id : string
  ; start_ns : Int64.t   (* inclusive *)
  ; end_ns : Int64.t     (* exclusive *)
  }

Events are assigned to windows by their event time, not processing time. This is a settled design decision: all computation in Ripple uses event time. Processing time is for logging only.

Window Assigners

Tumbling Windows

Non-overlapping, fixed-size windows. Every event belongs to exactly one window.

  |──── 60s ────|──── 60s ────|──── 60s ────|
  |  window-0   |  window-60  |  window-120 |
  0            60            120           180  (seconds)

let config = Tumbling { size = Time_ns.Span.of_sec 60.0 }

Assignment is O(1): window_start = event_time - (event_time mod size).
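The O(1) assignment is plain modular arithmetic (stdlib sketch over Int64 nanoseconds; the real assigner works on Time_ns values):

```ocaml
(* O(1) tumbling-window assignment: round the event time down to the
   nearest multiple of the window size. *)
let tumbling_window_start ~size_ns ~event_ns =
  Int64.sub event_ns (Int64.rem event_ns size_ns)
```

An event at t=125s with 60-second windows lands in the window starting at 120s; an event exactly on a boundary starts a new window (the interval is half-open).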

Sliding Windows

Overlapping windows. An event may belong to multiple windows.

  |──── 60s ────────────────|
       |──── 60s ────────────────|
            |──── 60s ────────────────|
  0    15   30   45   60   75   90      (seconds, slide=15s)

let config = Sliding
  { size = Time_ns.Span.of_sec 60.0
  ; slide = Time_ns.Span.of_sec 15.0
  }

Assignment returns multiple windows. For size / slide = 4, each event belongs to 4 windows.
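Multi-window assignment can be sketched with the same arithmetic (stdlib-only sketch; the real assigner returns window values rather than raw start times):

```ocaml
(* Start times of every sliding window containing the event, oldest first.
   A window [start, start + size) contains the event iff
   start <= event < start + size and start is a multiple of slide. *)
let assign_sliding ~size_ns ~slide_ns ~event_ns =
  let last_start = Int64.sub event_ns (Int64.rem event_ns slide_ns) in
  let rec collect start acc =
    if Int64.compare start 0L >= 0
       && Int64.compare (Int64.add start size_ns) event_ns > 0
    then collect (Int64.sub start slide_ns) (start :: acc)
    else acc
  in
  collect last_start []
```

With size 60 and slide 15, an event at t=50 belongs to the four windows starting at 0, 15, 30, and 45, matching the size / slide = 4 rule above.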

Session Windows

Dynamic, gap-based windows. A new window starts when no events arrive within the gap duration.

  |─ session ─|   gap   |── session ──|   gap   |─ session ─|
  e1 e2 e3          e4 e5 e6 e7             e8 e9

let config = Session { gap = Time_ns.Span.of_sec 30.0 }

Session windows are the most complex: each event initially creates a new window, and the window manager merges overlapping sessions.

Watermark Tracking

A watermark W(t) guarantees that all events with event time <= t have been processed by the emitting node. Watermarks drive window triggering and late event classification.

The Tracker

type tracker =
  { mutable sources : (string, Int64.t) Hashtbl.Poly.t
  ; mutable current_watermark_ns : Int64.t
  ; mutable advance_count : int
  }

The tracker maintains a per-source watermark and computes the overall watermark as the minimum across all sources:

  Source A watermark: 100
  Source B watermark: 50
  Source C watermark: 200
  ─────────────────────
  Overall watermark:  50    (= min of all sources)

The overall watermark can only advance when the slowest source advances. This is because we cannot guarantee completeness beyond the slowest source.
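The minimum computation itself is a one-line fold (stdlib sketch over an association list; the tracker stores sources in a hash table):

```ocaml
(* Overall watermark = minimum across per-source watermarks.
   None when no source has reported yet. *)
let overall_watermark = function
  | [] -> None
  | (_, w0) :: rest ->
    Some (List.fold_left (fun acc (_, w) -> Int64.min acc w) w0 rest)
```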

Monotonicity

Watermarks are monotonically non-decreasing. An attempt to advance a source’s watermark backward is silently ignored:

let advance tracker ~source ~timestamp_ns =
  let prev = Hashtbl.find tracker.sources source
    |> Option.value ~default:0L in
  if Int64.( > ) timestamp_ns prev then begin
    Hashtbl.set tracker.sources ~key:source ~data:timestamp_ns;
    (* Recompute overall minimum; returns Some of the new overall watermark *)
    ...
  end
  else None  (* backward movement ignored *)

This ensures that watermarks never retract, which would invalidate window triggering decisions.

Multi-Source Example

Time   Source A   Source B   Overall   Window [0, 100) triggered?
──────────────────────────────────────────────────────────────────
t0     advance    -          -         No (B still at 0)
       to 100
t1     -          advance    50        No (min(100, 50) = 50)
                  to 50
t2     -          advance    100       Yes! (min(100, 100) = 100)
                  to 100
t3     advance    -          100       No change (min(200, 100))
       to 200
t4     -          advance    150       No ([100, 200) needs watermark >= 200)
                  to 150

Trigger Policies

A trigger policy determines when to emit window results:

module Trigger = struct
  type t =
    | On_watermark           (* emit when watermark >= window.end_ns *)
    | On_every_element       (* emit after every input event *)
    | On_interval of Time_ns.Span.t  (* emit periodically *)
    | On_count of int        (* emit every N elements *)
end

On_watermark (default)

The window fires when the watermark advances past window.end_ns:

let triggerable_windows tracker ~windows =
  List.filter windows ~f:(fun w ->
    Int64.( >= ) tracker.current_watermark_ns w.end_ns)

This is the primary trigger for Ripple. It fires once per window and is deterministic (event-time based).

On_every_element

Fires after every event. Useful for real-time dashboards where you want to see partial window results update live. Generates high output volume.

On_interval

Fires every N seconds of processing time. Combines the freshness of On_every_element with bounded output rate.

On_count

Fires every N input elements. Useful for micro-batching.

Late Event Classification

Events that arrive after the watermark has advanced past their event time are classified as late:

type classification =
  | On_time      (* event_time >= watermark *)
  | Mildly_late  (* event_time < watermark but within allowed_lateness *)
  | Very_late    (* event_time < watermark - allowed_lateness *)
  | Future       (* event_time > watermark + 60s tolerance *)

Allowed Lateness

The allowed_lateness parameter (default: 60 seconds) defines a grace period for late events:

  watermark at T=200
  allowed_lateness = 60

  Event at T=195: Mildly_late (195 < 200, but 195 >= 200 - 60 = 140)
  Event at T=130: Very_late (130 < 140)
  Event at T=280: Future (280 > 200 + 60 = 260)

  • On_time: processed normally.
  • Mildly_late: processed, but the window may need to re-fire (retraction + correction).
  • Very_late: dropped and counted in the ripple_window_very_late_events_total metric.
  • Future: likely a clock skew issue. Logged as a warning.
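These thresholds can be written out as a small stdlib-only sketch (units are arbitrary here; the real code works over Time_ns, and the 60-second values are the defaults quoted above):

```ocaml
type classification = On_time | Mildly_late | Very_late | Future

(* Classify an event against the current watermark. Checks run from the
   outermost boundary inward: future tolerance, then on-time, then the
   allowed-lateness grace period. *)
let classify ~watermark_ns ~allowed_lateness_ns ~future_tolerance_ns ~event_ns =
  if Int64.compare event_ns (Int64.add watermark_ns future_tolerance_ns) > 0
  then Future
  else if Int64.compare event_ns watermark_ns >= 0 then On_time
  else if Int64.compare event_ns (Int64.sub watermark_ns allowed_lateness_ns) >= 0
  then Mildly_late
  else Very_late
```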

Retractions

When a mildly late event arrives and updates a window that has already fired, Ripple emits a retraction followed by a corrected result:

module Retraction = struct
  type 'a t =
    | Emit of 'a       (* new or corrected result *)
    | Retract of 'a    (* withdraw previous result *)
end

This ensures downstream consumers can correct their state.
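What "correct their state" means in practice: a downstream consumer subtracts on Retract and adds on Emit. A minimal sketch for a running sum (an illustrative consumer, not code from the tree):

```ocaml
(* Emit/Retract pair as described above, applied to a running sum. *)
type 'a update = Emit of 'a | Retract of 'a

let apply_update total = function
  | Emit v -> total +. v
  | Retract v -> total -. v
```

A corrected window result arrives as `[Retract old; Emit corrected]`, so folding the stream leaves the consumer's total as if only the corrected value had ever been emitted.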

Interaction with Checkpointing

Window state is implicitly checkpointed through the graph’s leaf values. Each window’s accumulated state lives in the incremental graph as a leaf or fold node. When the checkpoint captures leaf values, it captures window state.

On recovery:

  1. Leaf values are restored (including window accumulations)
  2. The watermark tracker is reset to the checkpoint’s watermark
  3. Replay resumes from the checkpoint’s input offsets
  4. Events already in the checkpointed window state are re-applied idempotently

Observability

Ripple provides built-in observability through four mechanisms: metrics (Prometheus-compatible), distributed tracing (W3C Trace Context), graph introspection (sexp and DOT export), and rule-based alerting. This page covers each subsystem.

Metrics

Metrics are pre-registered at module initialization time. There are three metric types:

Counter

Monotonically increasing integer. Never decreases (except on process restart).

module Counter = struct
  type t = { name : string; mutable value : int }
  let incr t = t.value <- t.value + 1
  let incr_by t n = t.value <- t.value + n
end

Gauge

Float value that can go up or down. Represents current state.

module Gauge = struct
  type t = { name : string; mutable value : float }
  let set t v = t.value <- v
end

Histogram

Tracks distribution of observed values using fixed buckets. O(1) observe operation.

module Histogram = struct
  type t =
    { name : string
    ; buckets : float array
    ; mutable counts : int array
    ; mutable sum : float
    ; mutable count : int
    }
end

Default buckets: [0.001; 0.005; 0.01; 0.05; 0.1; 0.5; 1.0; 5.0; 10.0; 50.0; 100.0]
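The observe path can be sketched with plain stdlib types (the bucket scan is bounded by the fixed bucket count, which is what makes observe O(1) in the number of samples):

```ocaml
(* Fixed-bucket histogram sketch. counts has one slot per bucket bound
   plus a final overflow slot for values above the last bound. *)
type histogram =
  { buckets : float array            (* ascending upper bounds *)
  ; counts : int array
  ; mutable sum : float
  ; mutable count : int
  }

let create buckets =
  { buckets
  ; counts = Array.make (Array.length buckets + 1) 0
  ; sum = 0.0
  ; count = 0
  }

let observe h v =
  h.sum <- h.sum +. v;
  h.count <- h.count + 1;
  let n = Array.length h.buckets in
  (* first bucket whose upper bound covers v, else the overflow slot *)
  let rec bucket i = if i >= n || v <= h.buckets.(i) then i else bucket (i + 1) in
  let i = bucket 0 in
  h.counts.(i) <- h.counts.(i) + 1
```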

Pre-Registered Metrics

Metric                                    Type        Description
ripple_graph_stabilization_ns             Histogram   Stabilization cycle duration in nanoseconds
ripple_graph_recompute_count              Histogram   Nodes recomputed per stabilization
ripple_graph_node_count                   Gauge       Total nodes in the graph
ripple_graph_dirty_count                  Gauge       Currently dirty nodes
ripple_graph_stabilizations_total         Counter     Total stabilization cycles
ripple_graph_cutoff_hits_total            Counter     Times cutoff prevented propagation
ripple_transport_deltas_sent_total        Counter     Deltas sent to other workers
ripple_transport_deltas_received_total    Counter     Deltas received from other workers
ripple_transport_delta_bytes_total        Counter     Total delta bytes transmitted
ripple_transport_rpc_latency_ns           Histogram   RPC round-trip latency
ripple_transport_backpressure_level       Gauge       Current backpressure (0.0 to 1.0)
ripple_window_active_windows              Gauge       Open window count
ripple_window_late_events_total           Counter     Mildly late events received
ripple_window_very_late_events_total      Counter     Very late events (dropped)
ripple_window_retractions_total           Counter     Window result retractions
ripple_system_event_lag_ns                Histogram   Event time lag (event time vs wall clock)
ripple_system_heap_words                  Gauge       OCaml heap size in words

Prometheus Export

All metrics are exported in Prometheus exposition format via the /metrics HTTP endpoint on port 9102:

# TYPE ripple_graph_stabilizations_total counter
ripple_graph_stabilizations_total 1042

# TYPE ripple_graph_node_count gauge
ripple_graph_node_count 4001.000000

# TYPE ripple_graph_stabilization_ns histogram
ripple_graph_stabilization_ns_count 1042
ripple_graph_stabilization_ns_sum 284532.000000

The export function is a simple traversal of all registered metrics:

let to_prometheus () =
  let buf = Buffer.create 1024 in
  (* add_counter / add_gauge / add_histogram append formatted lines to buf *)
  add_counter Graph.stabilizations_total;
  add_gauge Graph.node_count;
  add_histogram Graph.stabilization_time_ns;
  (* ... all metrics ... *)
  Buffer.contents buf
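The per-metric formatting is mechanical; a stdlib sketch of the gauge case (an assumed helper, shown for illustration) reproduces the six-decimal rendering in the sample output above:

```ocaml
(* One Prometheus exposition block for a gauge. %f prints six decimals,
   matching the sample /metrics output above. *)
let gauge_lines ~name ~value =
  Printf.sprintf "# TYPE %s gauge\n%s %f\n" name name value
```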

W3C Distributed Tracing

Trace context follows the W3C Trace Context specification:

type context =
  { trace_id : string     (* 32 hex chars, 128-bit *)
  ; span_id : string      (* 16 hex chars, 64-bit *)
  ; parent_id : string option
  ; sampled : bool
  ; baggage : (string * string) list
  }

Trace Propagation

Trace context propagates across worker boundaries via delta messages. Each stabilization cycle is a span:

Worker-A                              Worker-B
  span: "stabilize"                     span: "stabilize"
  trace_id: abc123...                   trace_id: abc123...  (same)
  span_id: def456...                    span_id: 789ghi...
  parent_id: None                       parent_id: def456... (links to A)

W3C Traceparent Header

Format: {version}-{trace_id}-{span_id}-{flags}
Example: 00-17380abc6f63f3cdaf093916e327df84-88b8fa2222fd7ba5-01

Parsed and generated by:

let to_traceparent ctx =
  let flags = if ctx.sampled then "01" else "00" in
  sprintf "00-%s-%s-%s" ctx.trace_id ctx.span_id flags
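The inverse direction can be sketched with stdlib string splitting (a hypothetical helper; the source above only shows `to_traceparent`). Per the W3C format, the trace ID is 32 hex characters and the span ID is 16; this sketch accepts version 00 only:

```ocaml
(* Parse a traceparent header into (trace_id, span_id, sampled).
   Rejects anything that is not a well-formed version-00 header. *)
let of_traceparent s =
  match String.split_on_char '-' s with
  | [ "00"; trace_id; span_id; flags ]
    when String.length trace_id = 32 && String.length span_id = 16 ->
    Some (trace_id, span_id, String.equal flags "01")
  | _ -> None
```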

Adaptive Sampling

At high throughput, tracing every event is prohibitively expensive. The adaptive sampler adjusts the sampling rate to hit a target traces-per-second:

module Sampler = struct
  type t =
    { target_per_sec : int
    ; mutable rate : float          (* 0.0 to 1.0 *)
    ; mutable traces_this_second : int
    ; mutable last_adjust_ns : Int64.t
    ; min_rate : float
    ; max_rate : float
    }
end

Every second, the sampler computes:

new_rate = old_rate * (target / actual_traces)
new_rate = clamp(new_rate, min_rate, max_rate)

Default: target_per_sec = 100, min_rate = 0.001, max_rate = 1.0.
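The adjustment step is a pure function of the observed rate (a sketch of the formula above, not the tree's code):

```ocaml
(* Per-second sampling-rate adjustment: scale toward the target trace
   rate, clamped to [min_rate, max_rate]. If no traces were seen at all,
   open the tap fully so the sampler can re-measure. *)
let adjust ~rate ~target ~actual ~min_rate ~max_rate =
  let proposed =
    if actual = 0 then max_rate
    else rate *. float_of_int target /. float_of_int actual
  in
  min max_rate (max min_rate proposed)
```

For example, sampling at rate 1.0 while observing 400 traces/sec against a target of 100 drops the rate to 0.25 on the next adjustment.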

Graph Introspection

The introspection module provides runtime snapshots of the computation graph:

type node_info =
  { id : int
  ; height : int
  ; is_dirty : bool
  ; is_output : bool
  ; kind : string         (* "leaf", "map", "fold", etc. *)
  ; dependent_ids : int list
  ; value_sexp : Sexp.t option
  }

type graph_stats =
  { total_nodes : int
  ; dirty_nodes : int
  ; output_nodes : int
  ; max_height : int
  ; last_recompute_count : int
  }

S-expression Export

let snap = Introspect.snapshot graph in
let sexp = Introspect.to_sexp snap in
print_s sexp

Useful for expect tests that verify graph structure:

((nodes
  ((id 0)(height 0)(is_dirty false)(kind leaf)(dependent_ids (2)))
  ((id 1)(height 0)(is_dirty false)(kind leaf)(dependent_ids (2)))
  ((id 2)(height 1)(is_dirty false)(kind map2)(dependent_ids ())))
 (stats (total_nodes 3)(dirty_nodes 0)(output_nodes 0)(max_height 1)))

DOT Export

let dot = Introspect.to_dot snap in
Out_channel.write_all "graph.dot" ~data:dot

Produces Graphviz DOT format:

digraph ripple {
  rankdir=TB;
  node [fontname="monospace" fontsize=10];

  n0 [label="leaf_0" shape=ellipse];
  n1 [label="leaf_1" shape=ellipse];
  n2 [label="map2_2" shape=box color=blue style=bold];

  n0 -> n2;
  n1 -> n2;

  // nodes=3 dirty=0 outputs=1 max_height=1
}

Render with: dot -Tpng graph.dot -o graph.png

Alert Rules

Built-in alerts fire on operational conditions:

Condition               Default Threshold      Severity
Stabilization slow      > 10ms                 Warning
Backpressure high       > 80%                  Critical
Late events excessive   > 1% of total          Warning
Checkpoint stale        > 20s (2x interval)    Critical
Worker unresponsive     > 30s no heartbeat     Critical
Schema mismatch         any                    Critical
Data loss detected      any                    Critical

Alerts are evaluated by the Alert.Manager:

let mgr = Alert.Manager.create () in
Alert.Manager.on_alert mgr ~f:(fun alert ->
  Log.error_s [%message "ALERT" (alert : Alert.t)]);
Alert.Manager.check_stabilization mgr
  ~stabilization_ns:15_000_000L
  ~worker_id:"w0" ~now_ns

All alert evaluation is local to the worker. The coordinator aggregates alerts across the cluster via heartbeat responses.

Coordinator

The coordinator is a stateless cluster management service that handles partition assignment, worker registration, failure detection, and checkpoint scheduling. This page covers the consistent hash ring, partition assignment algorithm, heartbeat liveness detection, and rebalancing protocol.

Design Principles

The coordinator is intentionally stateless. All durable state lives in the checkpoint store (S3). The coordinator can be restarted at any time without data loss – it reconstructs its view of the cluster from worker heartbeats.

In Kubernetes, the coordinator runs as a Deployment with 2 replicas for high availability. Both replicas are active and can independently manage the cluster. Workers register with whichever replica responds first.

Consistent Hash Ring

Partition-to-worker assignment uses a consistent hash ring with virtual nodes:

module Hash_ring = struct
  type t =
    { mutable ring : (int * string) list
    ; virtual_nodes : int   (* default: 128 *)
    }

  let add_worker t ~worker_id =
    let new_nodes = List.init t.virtual_nodes ~f:(fun i ->
      let key = sprintf "%s-vnode-%d" worker_id i in
      (hash_key key, worker_id))
    in
    t.ring <- List.sort (t.ring @ new_nodes)
      ~compare:(fun (a, _) (b, _) -> Int.compare a b)

  let assign_partition t ~partition_id =
    let h = hash_key (sprintf "partition-%d" partition_id) in
    (* Find first ring entry >= h (clockwise lookup) *)
    match List.find t.ring ~f:(fun (hash, _) -> hash >= h) with
    | Some (_, worker_id) -> Some worker_id
    | None ->
      (* Wrap around to first entry *)
      match t.ring with
      | (_, worker_id) :: _ -> Some worker_id
      | [] -> None
end

Why Consistent Hashing

When a worker is added or removed, consistent hashing minimizes the number of partitions that need to be reassigned. With 128 virtual nodes per worker and 128 partitions:

Event                       Partitions reassigned   Without consistent hashing
Add 1 worker (3 -> 4)       ~32 (1/4 of total)      ~96 (3/4 of total)
Remove 1 worker (4 -> 3)    ~32                     ~96

This minimizes the amount of state that needs to be migrated during rebalancing.
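The minimality claim is checkable with a stdlib-only ring (Hashtbl.hash stands in for the real hash function, and the worker names are illustrative): when a worker joins, every partition either keeps its old owner or moves to the new worker, never between old workers.

```ocaml
(* Stdlib sketch of the ring above: vnodes per worker, sorted by hash,
   clockwise successor lookup with wrap-around. *)
let build_ring workers ~vnodes =
  List.concat_map
    (fun w ->
      List.init vnodes (fun i ->
        (Hashtbl.hash (Printf.sprintf "%s-vnode-%d" w i), w)))
    workers
  |> List.sort compare

let assign ring ~partition_id =
  let h = Hashtbl.hash (Printf.sprintf "partition-%d" partition_id) in
  match List.find_opt (fun (hash, _) -> hash >= h) ring with
  | Some (_, w) -> w
  | None -> snd (List.hd ring)   (* wrap around to the first entry *)
```

Adding a worker only inserts new ring entries, so a partition's clockwise successor either stays the same or becomes one of the new worker's virtual nodes; that is exactly the reassignment bound in the table above.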

Virtual Nodes

Each physical worker maps to 128 virtual nodes on the ring. This ensures even distribution: without virtual nodes, a small number of physical workers could create hotspots where some workers receive far more partitions than others.

Ring (simplified, 8 virtual nodes per worker):

  0 ──── w0 ── w1 ── w0 ── w2 ── w1 ── w0 ── w2 ── w1 ──── MAX
         vn0   vn0   vn1   vn0   vn1   vn2   vn1   vn2

Worker Registration

Workers register with the coordinator on startup:

let register_worker t ~worker_id ~now_ns =
  Hashtbl.set t.workers ~key:worker_id ~data:
    { worker_id
    ; partitions = []
    ; last_heartbeat_ns = now_ns
    ; load = 0.0
    ; state = `Active
    };
  Hash_ring.add_worker t.hash_ring ~worker_id;
  t.assignment_epoch <- t.assignment_epoch + 1

Registration:

  1. Adds the worker to the worker registry
  2. Inserts 128 virtual nodes into the hash ring
  3. Increments the assignment epoch (triggers rebalance)

The worker then queries its partition assignment:

let get_assignment t ~worker_id =
  let partitions = List.filter_map
    (List.init t.total_partitions ~f:Fn.id)
    ~f:(fun pid ->
      match Hash_ring.assign_partition t.hash_ring ~partition_id:pid with
      | Some wid when String.equal wid worker_id -> Some pid
      | _ -> None)
  in
  { worker_id; partitions; epoch = t.assignment_epoch }

Heartbeat Liveness Detection

Workers send heartbeats every 5 seconds (configurable). The heartbeat carries:

type heartbeat =
  { worker_id : string
  ; timestamp_ns : Int64.t
  ; load : float       (* 0.0 to 1.0 *)
  }

The coordinator updates the worker’s last heartbeat timestamp and load metric:

let heartbeat t ~worker_id ~now_ns ~load =
  match Hashtbl.find t.workers worker_id with
  | None -> ()  (* Unknown worker -- ignore *)
  | Some status ->
    Hashtbl.set t.workers ~key:worker_id
      ~data:{ status with
              last_heartbeat_ns = now_ns
            ; load
            ; state = `Active }

Failure Detection

The coordinator periodically scans for workers whose last heartbeat exceeds the timeout (default: 30 seconds):

let detect_failures t ~now_ns =
  let dead_workers =
    Hashtbl.fold t.workers ~init:[] ~f:(fun ~key:wid ~data:status acc ->
      let elapsed = Int64.(-) now_ns status.last_heartbeat_ns in
      if Int64.compare elapsed t.heartbeat_timeout_ns > 0 then
        (wid, status) :: acc
      else acc)
  in
  List.iter dead_workers ~f:(fun (wid, status) ->
    Hashtbl.set t.workers ~key:wid
      ~data:{ status with state = `Dead });
  List.map dead_workers ~f:fst

Worker state transitions:

                  heartbeat received
  Active ──────────────────────────────> Active
    │
    │  timeout exceeded
    v
  Suspected ──── confirmed ───> Dead ───> Removed

Rebalancing on Failure

When a worker is detected as dead, the coordinator removes it and triggers rebalancing:

let remove_worker t ~worker_id =
  Hashtbl.remove t.workers worker_id;
  Hash_ring.remove_worker t.hash_ring ~worker_id;
  t.assignment_epoch <- t.assignment_epoch + 1

The remaining workers detect the epoch change and query for their updated assignments. The consistent hash ring ensures that only partitions previously assigned to the dead worker are redistributed.

Rebalancing Flow

t=0s:   3 workers, 12 partitions
        w0: [0,1,2,3]  w1: [4,5,6,7]  w2: [8,9,10,11]

t=30s:  w2 heartbeat timeout
        coordinator: detect_failures -> ["w2"]
        coordinator: remove_worker "w2"
        epoch: 2 -> 3

t=31s:  w0 polls assignment (epoch 3)
        w0: [0,1,2,3,8,9]    (gained 8,9 from w2)
        w1: [4,5,6,7,10,11]  (gained 10,11 from w2)

t=32s:  w0 loads checkpoint for partitions 8,9 from S3
        w0 replays input from checkpoint offsets
        w0 resumes processing partitions 8,9

Checkpoint Triggers

The coordinator tracks checkpoint timing and signals workers when a checkpoint is due:

let checkpoint_due t ~now_ns =
  let elapsed = Int64.(-) now_ns t.last_checkpoint_trigger_ns in
  if Int64.compare elapsed t.checkpoint_interval_ns >= 0 then begin
    t.last_checkpoint_trigger_ns <- now_ns;
    true
  end else false

Default interval: 10 seconds.

When checkpoint_due returns true, the coordinator sends Checkpoint_request messages to all active workers. Each worker:

  1. Drains its current batch
  2. Writes a checkpoint to the store
  3. Responds with Checkpoint_ack

The coordinator does not wait for all acks before proceeding. Checkpoints are per-worker, not cluster-wide barriers. This avoids the stop-the-world pause of Chandy-Lamport-style distributed snapshots.

Cluster Status

The coordinator exposes a cluster status summary:

let cluster_status t =
  let active = Hashtbl.count t.workers ~f:(fun s ->
    match s.state with `Active -> true | _ -> false) in
  (active, Hashtbl.length t.workers, t.total_partitions, t.assignment_epoch)

This is consumed by ripple-cli status and the coordinator’s /health endpoint.

Configuration

Parameter                 Default   Description
total_partitions          128       Number of data partitions
heartbeat_timeout_sec     30        Seconds before a worker is declared dead
checkpoint_interval_sec   10        Seconds between checkpoint triggers
virtual_nodes             128       Virtual nodes per worker on hash ring
max_keys_per_partition    2,000     Maximum symbols per partition (B-05 mitigation)

CLI Reference

ripple-cli is the command-line management tool for Ripple clusters. It provides cluster status, graph inspection, schema listing, checkpoint management, and debug utilities.

Installation

dune build
# Binary at: _build/default/bin/cli/main.exe
# Or via make:
make cli  # runs 'ripple-cli info'

Command Overview

ripple-cli <subcommand>

Subcommands:
  status       Show cluster status
  inspect      Inspect cluster state (graph, schemas, metrics)
  checkpoint   Manage checkpoints
  debug        Debug tools (tap delta streams)
  info         Show version info

info

Display version and build information.

$ ripple-cli info
Ripple v0.1.0-dev
OCaml 5.3.0
Build: incremental graph + heap-based stabilization
Benchmarks: B-01=250ns B-02=82ns B-05=2.1s B-06=128ns

status

Show the overall cluster health.

$ ripple-cli status --coordinator localhost --port 9200
Ripple Cluster Status
----------------------------------------------------
Coordinator:  localhost:9200
Format:       text
Status:       [connected]
Workers:      3/3 active
Partitions:   128 assigned
Epoch:        7

Flags

Flag            Default     Description
--coordinator   localhost   Coordinator hostname
--port          9200        Coordinator gRPC port
--format        text        Output format: text, json, or sexp

inspect graph

Inspect the computation graph of a worker. Outputs the graph structure in sexp format (default), DOT, or JSON.

$ ripple-cli inspect graph --worker w0 --format dot
digraph ripple {
  rankdir=TB;
  node [shape=box];
  leaf_0 [label="leaf: AAPL" shape=ellipse];
  leaf_1 [label="leaf: GOOG" shape=ellipse];
  map_0 [label="map: identity"];
  map_1 [label="map: identity"];
  fold_0 [label="fold: sum" shape=diamond];
  leaf_0 -> map_0;
  leaf_1 -> map_1;
  map_0 -> fold_0;
  map_1 -> fold_0;
}

Pipe to Graphviz for visualization:

ripple-cli inspect graph --worker w0 --format dot | dot -Tpng -o graph.png

Flags

Flag       Default   Description
--worker   all       Worker ID to inspect
--format   sexp      Output format: sexp, json, or dot

inspect schemas

List all registered schemas in the cluster.

$ ripple-cli inspect schemas --verbose
Registered Schemas
----------------------------------------------------
  trade/v1  fp=0a1b2c3d4e5f6789  fields=5
    symbol        : String    (required)
    price         : Float     (required)
    size          : Int       (required)
    timestamp_ns  : Int64     (required)
    venue         : String    (required)

  vwap/v1   fp=9876543210fedcba  fields=4
    symbol        : String    (required)
    vwap          : Float     (required)
    total_volume  : Int       (required)
    trade_count   : Int       (required)

Flags

Flag        Description
--verbose   Show field details for each schema

inspect metrics

Dump the current metrics in Prometheus exposition format.

$ ripple-cli inspect metrics
# TYPE ripple_graph_stabilizations_total counter
ripple_graph_stabilizations_total 0

# TYPE ripple_graph_cutoff_hits_total counter
ripple_graph_cutoff_hits_total 0

# TYPE ripple_graph_node_count gauge
ripple_graph_node_count 0.000000

# TYPE ripple_graph_stabilization_ns histogram
ripple_graph_stabilization_ns_count 0
ripple_graph_stabilization_ns_sum 0.000000
...

This is the same format served by the /metrics endpoint on port 9102. Useful for debugging when Prometheus scraping is not configured.

checkpoint list

List checkpoints for a specific worker.

$ ripple-cli checkpoint list --worker w0 --dir /var/lib/ripple/checkpoints
Checkpoints for worker w0 (dir=/var/lib/ripple/checkpoints)
  Latest: epoch=42 nodes=2000 offsets=1

Flags

Flag       Default                       Description
--worker   (required)                    Worker ID
--dir      /var/lib/ripple/checkpoints   Checkpoint directory

debug tap

Tap the live delta stream from a worker’s output. Streams deltas to stdout as they occur, decoded as s-expressions for human readability.

$ ripple-cli debug tap --output vwap --worker w0
Tapping output: vwap
[Connected to w0:9101]

seq=1 key=AAPL delta=(Patch ((Field_set (field_name price) (value 150.25))))
seq=2 key=GOOG delta=(Set ((symbol GOOG)(vwap 2801.5)(total_volume 500)(trade_count 3)))
seq=3 key=AAPL delta=(Patch ((Field_set (field_name price) (value 150.30))))
...
Press Ctrl+C to stop

Flags

Flag       Default      Description
--output   (required)   Output name to tap
--worker   any          Specific worker to tap

Exit Codes

Code   Meaning
0      Success
1      Invalid arguments or unknown subcommand
2      Connection failure (coordinator unreachable)
3      Schema incompatibility detected

Shell Completion

For bash:

eval "$(ripple-cli complete bash)"

For zsh:

eval "$(ripple-cli complete zsh)"

The CLI is built with Jane Street’s Command module, which provides automatic help text, flag parsing, and subcommand grouping.

Worker Configuration

This page covers the Ripple worker process: its lifecycle state machine, health endpoints, configuration parameters, and operational behavior.

Worker Lifecycle

Workers progress through seven well-defined states. The state machine prevents invalid transitions at runtime.

                  ┌──────────────┐
                  │   Starting   │
                  │ (boot, load  │
                  │  checkpoint) │
                  └──┬───┬───┬───┘
                     │   │   │
              ok     │   │   │  checkpoint found
                     │   │   │
        ┌────────────┘   │   └────────────┐
        v                │                v
  ┌───────────┐          │         ┌─────────────┐
  │  Active   │<─────────┘         │ Recovering  │
  │(processing│          ok        │(replaying   │
  │ events)   │<───────────────────│ from ckpt)  │
  └──┬──┬──┬──┘                    └──────┬──────┘
     │  │  │                              │
     │  │  │  drain request               │ failure
     │  │  v                              v
     │  │  ┌──────────┐            ┌──────────┐
     │  │  │ Draining │            │  Failed  │
     │  │  │(finish   │            │(crashed) │
     │  │  │ batch,   │            └──────────┘
     │  │  │ write    │                  ^
     │  │  │ ckpt)    │                  │
     │  │  └──┬───┬───┘          failure │
     │  │     │   │                      │
     │  │     │   └──────────────────────┘
     │  │     v
     │  │  resume
     │  │  (back to Active)
     │  │
     │  │  stop request
     │  v
     │  ┌───────────┐
     │  │ Stopping  │
     │  │(graceful  │
     │  │ shutdown) │
     │  └─────┬─────┘
     │        │
     │        v
     │  ┌───────────┐
     │  │  Stopped  │
     │  │(clean     │
     │  │ shutdown) │
     │  └───────────┘
     │
     └──── failure ──> Failed

State Definitions

State        Description                                               Duration
Starting     Booting, loading configuration, checking for checkpoint   < 5s
Active       Processing events from input sources                      Indefinite
Draining     Completing current batch, writing checkpoint              < 10s
Stopping     Graceful shutdown in progress                             < 30s
Stopped      Shut down cleanly                                         Terminal
Failed       Crashed or unrecoverable error                            Until restart
Recovering   Loading checkpoint and replaying events                   < 30s (target)

Valid Transitions

let valid_transition ~from ~to_ =
  match from, to_ with
  | Starting, Active -> true       (* successful boot *)
  | Starting, Failed -> true       (* boot failure *)
  | Starting, Recovering -> true   (* checkpoint found, replay needed *)
  | Active, Draining -> true       (* checkpoint trigger or rebalance *)
  | Active, Failed -> true         (* crash during processing *)
  | Active, Stopping -> true       (* graceful shutdown *)
  | Draining, Active -> true       (* drain complete, resume *)
  | Draining, Failed -> true       (* crash during drain *)
  | Draining, Stopping -> true     (* shutdown during drain *)
  | Stopping, Stopped -> true      (* shutdown complete *)
  | Stopping, Failed -> true       (* crash during shutdown *)
  | Failed, Recovering -> true     (* restart initiated *)
  | Recovering, Active -> true     (* replay complete *)
  | Recovering, Failed -> true     (* replay failure *)
  | _ -> false                     (* all other transitions are invalid *)

Invalid transitions return Or_error.t with a diagnostic message:

let result = Worker.transition w ~to_:Stopped in
(* Error: "Invalid state transition" (worker_id w1) (from Starting) (to_ Stopped) *)

Health Endpoints

Each worker serves HTTP endpoints for Kubernetes probes and monitoring:

Port 9100: Health and Readiness

| Endpoint | Method | Probe Type | Response |
|---|---|---|---|
| /health | GET | Liveness | 200 OK if process is running, 503 otherwise |
| /ready | GET | Readiness | 200 READY if state is Active, 503 NOT READY otherwise |

The liveness probe tells Kubernetes “is the process alive?” – it returns 200 as long as the HTTP server is responsive, regardless of worker state. The readiness probe tells Kubernetes “should traffic be routed here?” – it returns 200 only when the worker is Active and processing events.
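The distinction can be sketched as a pure decision function. This is an illustration only, with hypothetical names; the real handlers live in the worker's HTTP server:

```ocaml
(* Sketch: map worker state to probe responses. Type and function names
   are illustrative, not Ripple's actual API. *)
type state = Starting | Active | Draining | Stopping | Stopped | Failed | Recovering

(* Liveness: 200 whenever the HTTP server can respond at all. *)
let health_response (_ : state) = (200, "OK")

(* Readiness: 200 only while Active; every other state sheds traffic. *)
let ready_response = function
  | Active -> (200, "READY")
  | _ -> (503, "NOT READY")
```

Note that a worker in `Draining` or `Recovering` is still alive (liveness passes) but not ready, so Kubernetes keeps the pod and simply stops routing to it.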

Port 9102: Metrics

| Endpoint | Method | Response |
|---|---|---|
| /metrics | GET | Prometheus exposition format |

Returns all registered metrics (see Observability).

Port 9101: RPC

Async RPC port for delta exchange between workers. This is a binary protocol, not HTTP.

Configuration

Command-Line Flags

ripple-worker \
  --worker-id w0 \
  --partition-id p0 \
  --coordinator localhost:9200 \
  --rpc-port 9101 \
  --health-port 9100 \
  --metrics-port 9102
| Flag | Default | Description |
|---|---|---|
| --worker-id | w0 | Unique worker identifier |
| --partition-id | p0 | Partition assignment (may be overridden by coordinator) |
| --coordinator | localhost:9200 | Coordinator gRPC address |
| --rpc-port | 9101 | Async RPC port for delta exchange |
| --health-port | 9100 | HTTP port for health and readiness probes |
| --metrics-port | 9102 | HTTP port for Prometheus metrics |

Environment Variables

In Kubernetes, configuration is typically injected via environment variables:

| Variable | Description |
|---|---|
| RIPPLE_WORKER_ID | Worker identifier (from StatefulSet pod name) |
| RIPPLE_COORDINATOR_HOST | Coordinator service hostname |
| RIPPLE_COORDINATOR_PORT | Coordinator gRPC port |
| RIPPLE_CHECKPOINT_BUCKET | S3 bucket for checkpoint storage |
| RIPPLE_KAFKA_BROKERS | Comma-separated Kafka broker addresses |

S-expression Config File

For complex configuration, use an s-expression config file (/etc/ripple/ripple.sexp):

((cluster
  ((name prod)
   (coordinator
    ((host ripple-coordinator.ripple.svc.cluster.local)
     (port 9200)))
   (checkpoint_store
    ((backend S3)
     (bucket ripple-checkpoints)
     (prefix prod)))
   (defaults
    ((num_partitions 128)
     (max_keys_per_partition 2000)
     (checkpoint_interval_sec 10)
     (heartbeat_interval_sec 5)
     (failure_detection_timeout_sec 30)
     (allowed_lateness_sec 60)
     (backpressure
      ((warn_threshold 0.6)
       (slow_threshold 0.8)
       (pause_threshold 0.95))))))))

Heartbeat Loop

The worker sends heartbeats to the coordinator every 5 seconds:

let rec heartbeat_loop () =
  let%bind () = Clock.after (Time_float.Span.of_sec 5.0) in
  let gc = Gc.stat () in
  Metrics.Gauge.set Metrics.System.heap_words
    (Float.of_int gc.heap_words);
  Log.Global.debug_s [%message "heartbeat"
    (worker_id : string)
    ~events:(worker_state.events_processed : int)];
  heartbeat_loop ()

Each heartbeat also updates the GC heap size metric, providing continuous visibility into memory usage.

Graceful Shutdown

When the worker receives SIGTERM (Kubernetes pod termination):

  1. Transition to Stopping state
  2. Readiness probe starts returning 503 NOT READY
  3. Drain current batch (finish processing, do not start new batch)
  4. Write final checkpoint
  5. Close RPC connections
  6. Transition to Stopped
  7. Exit process

The Kubernetes terminationGracePeriodSeconds is set to 30 seconds to allow time for the drain and checkpoint write.
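The drain sequence above can be sketched as a single function. The callback names here are hypothetical; the real worker performs these steps asynchronously under Async:

```ocaml
(* Hedged sketch of the SIGTERM path: transition, drain, checkpoint, close. *)
type state = Active | Stopping | Stopped

let shutdown ~drain_batch ~write_checkpoint ~close_rpc = function
  | Active ->
      let _state = Stopping in (* step 1: readiness now returns 503 *)
      drain_batch ();          (* step 3: finish the in-flight batch *)
      write_checkpoint ();     (* step 4: persist final state *)
      close_rpc ();            (* step 5: tear down peer connections *)
      Stopped                  (* step 6: clean terminal state *)
  | s -> s                     (* shutdown from other states is a no-op here *)
```

The ordering matters: the checkpoint is written only after the batch drains, so the checkpoint always reflects a batch boundary.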

Docker & Kubernetes

This page covers the deployment infrastructure for Ripple: the Docker image, docker compose development stack, and Kubernetes production manifests.

Dockerfile

Ripple uses a multi-stage build for minimal production images:

# Stage 1: Build
FROM ocaml/opam:ubuntu-22.04-ocaml-5.3 AS builder

RUN sudo apt-get update && sudo apt-get install -y \
    librdkafka-dev \
    libssl-dev \
    pkg-config \
    && sudo rm -rf /var/lib/apt/lists/*

WORKDIR /home/opam/ripple
COPY --chown=opam:opam ripple.opam dune-project ./
RUN opam install . --deps-only --yes

COPY --chown=opam:opam . .
RUN eval $(opam env) && dune build bin/worker/main.exe

# Stage 2: Runtime
FROM ubuntu:22.04 AS runtime

RUN apt-get update && apt-get install -y \
    librdkafka1 \
    ca-certificates \
    && rm -rf /var/lib/apt/lists/*

COPY --from=builder \
  /home/opam/ripple/_build/default/bin/worker/main.exe \
  /usr/local/bin/ripple-worker

# Health (9100), RPC (9101), metrics (9102).
# Dockerfile comments must be on their own line; inline "# ..." after an
# instruction is parsed as part of the argument.
EXPOSE 9100 9101 9102

ENTRYPOINT ["/usr/local/bin/ripple-worker"]

The opam install . --deps-only layer is cached separately from the source copy, so dependency changes rebuild the dependency layer but source-only changes skip it.

Build the image:

docker build -f infra/docker/Dockerfile.worker -t ripple/worker:latest .

Docker Compose (Development)

The development stack runs Redpanda (Kafka-compatible) and MinIO (S3-compatible) alongside Ripple workers:

services:
  # Redpanda: Kafka-compatible broker, no ZooKeeper
  redpanda:
    image: docker.redpanda.com/redpandadata/redpanda:v24.1.1
    command:
      - redpanda start
      - --smp 1
      - --memory 512M
      - --overprovisioned
      - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
      - --advertise-kafka-addr internal://redpanda:9092,external://localhost:19092
    ports:
      - "19092:19092"   # Kafka API
      - "18082:18082"   # HTTP Proxy
    healthcheck:
      test: ["CMD", "rpk", "cluster", "health"]
      interval: 5s

  # MinIO: S3-compatible checkpoint storage
  minio:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: ripple
      MINIO_ROOT_PASSWORD: ripplepass
    ports:
      - "9000:9000"     # S3 API
      - "9001:9001"     # Web console
    volumes:
      - minio-data:/data

  # Init job: create Kafka topics
  create-infra:
    image: docker.redpanda.com/redpandadata/redpanda:v24.1.1
    depends_on:
      redpanda: { condition: service_healthy }
    entrypoint: >
      bash -c "
        rpk topic create trades --brokers redpanda:9092 --partitions 8 &&
        rpk topic create vwap-output --brokers redpanda:9092 --partitions 8
      "

  # Init job: create S3 bucket
  create-bucket:
    image: minio/mc:latest
    depends_on:
      minio: { condition: service_healthy }
    entrypoint: >
      bash -c "
        mc alias set local http://minio:9000 ripple ripplepass &&
        mc mb local/ripple-checkpoints --ignore-existing
      "

volumes:
  minio-data:

Usage

cd infra/compose

# Start infrastructure
docker compose up -d

# Wait for health checks
docker compose ps

# Run integration test
./run-integration-test.sh

# Teardown
docker compose down -v

Accessing Services

| Service | URL | Purpose |
|---|---|---|
| Kafka API | localhost:19092 | Produce/consume trades |
| MinIO Console | http://localhost:9001 | Browse checkpoint bucket |
| MinIO S3 API | http://localhost:9000 | S3-compatible endpoint |

Kubernetes (Production)

Namespace

apiVersion: v1
kind: Namespace
metadata:
  name: ripple
  labels:
    app: ripple

ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: ripple-config
  namespace: ripple
data:
  checkpoint_bucket: "s3://ripple-checkpoints/prod"
  kafka_brokers: "kafka-0.kafka.svc:9092,kafka-1.kafka.svc:9092"
  ripple.sexp: |
    ((cluster
      ((name prod)
       (defaults
        ((num_partitions 128)
         (max_keys_per_partition 2000)
         (checkpoint_interval_sec 10)
         (heartbeat_interval_sec 5)
         (failure_detection_timeout_sec 30))))))

Worker StatefulSet

Workers use a StatefulSet (not Deployment) because they need:

  • Stable network identity for partition assignment
  • Stable storage for local checkpoint cache
  • Ordered, graceful scaling
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: ripple-worker
  namespace: ripple
spec:
  serviceName: ripple-worker
  replicas: 10
  podManagementPolicy: Parallel
  template:
    metadata:
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9102"
    spec:
      terminationGracePeriodSeconds: 30
      containers:
        - name: worker
          image: ripple/worker:latest
          ports:
            - { name: health, containerPort: 9100 }
            - { name: rpc,    containerPort: 9101 }
            - { name: metrics, containerPort: 9102 }
          env:
            - name: RIPPLE_WORKER_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
          resources:
            requests: { cpu: "1", memory: "512Mi" }
            limits:   { cpu: "2", memory: "1Gi" }
          livenessProbe:
            httpGet: { path: /health, port: health }
            initialDelaySeconds: 10
            periodSeconds: 10
          readinessProbe:
            httpGet: { path: /ready, port: health }
            initialDelaySeconds: 5
            periodSeconds: 5
          volumeMounts:
            - name: checkpoint-cache
              mountPath: /var/lib/ripple/checkpoints
  volumeClaimTemplates:
    - metadata:
        name: checkpoint-cache
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 10Gi

Key configuration:

  • podManagementPolicy: Parallel – workers start simultaneously, no sequential ordering needed
  • terminationGracePeriodSeconds: 30 – allows time for drain + checkpoint on shutdown
  • Worker ID is derived from the pod name (ripple-worker-0, ripple-worker-1, etc.)
  • Local checkpoint cache on PVC for fast recovery without S3 round-trip

Coordinator Deployment

The coordinator is stateless, so it uses a Deployment (not StatefulSet):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: ripple-coordinator
  namespace: ripple
spec:
  replicas: 2   # HA -- active/standby
  template:
    spec:
      containers:
        - name: coordinator
          image: ripple/coordinator:latest
          ports:
            - { name: grpc,    containerPort: 9200 }
            - { name: health,  containerPort: 9201 }
            - { name: metrics, containerPort: 9202 }
          resources:
            requests: { cpu: "500m", memory: "256Mi" }
            limits:   { cpu: "1",    memory: "512Mi" }

Headless Service

apiVersion: v1
kind: Service
metadata:
  name: ripple-worker
  namespace: ripple
spec:
  clusterIP: None   # Headless for StatefulSet DNS
  selector:
    app: ripple
    component: worker
  ports:
    - { name: rpc, port: 9101 }
    - { name: metrics, port: 9102 }

The headless service gives each worker a stable DNS name: ripple-worker-0.ripple-worker.ripple.svc.cluster.local.

Scaling

Horizontal Scaling

# Scale workers
kubectl scale statefulset ripple-worker --replicas=20 -n ripple

The coordinator detects new workers via heartbeat registration and rebalances partitions automatically via the consistent hash ring.
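The assignment idea behind the ring can be sketched in a few lines. This is illustrative only (hypothetical helpers, `Hashtbl.hash` standing in for the real hash), not the coordinator's code:

```ocaml
(* Each worker contributes several virtual points on the ring; a partition is
   owned by the first point at or clockwise after its hash. Adding a worker
   only reassigns partitions that fall near its new points. *)
let ring_points workers ~vnodes =
  workers
  |> List.concat_map (fun w ->
         List.init vnodes (fun i -> (Hashtbl.hash (w, i), w)))
  |> List.sort compare

let assign ring partition =
  let h = Hashtbl.hash partition in
  match List.find_opt (fun (point, _) -> point >= h) ring with
  | Some (_, w) -> w
  | None -> snd (List.hd ring) (* wrap around past the largest point *)
```

With enough virtual nodes per worker, partitions spread roughly evenly, and scaling from 10 to 20 workers moves only about half the partitions.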

Resource Guidelines

| Component | CPU Request | Memory Request | Rationale |
|---|---|---|---|
| Worker | 1 core | 512 Mi | Graph engine is CPU-bound, 500KB working set |
| Coordinator | 500m | 256 Mi | Lightweight, mostly heartbeat tracking |

Workers are CPU-bound (stabilization loop). Memory usage is predictable: ~200 bytes/node * 4,001 nodes = ~800 KB for the graph, plus input buffers and GC overhead.

Monitoring

This page covers how to monitor a Ripple cluster: Prometheus scraping configuration, the full list of pre-registered metrics, recommended alert thresholds, and structured logging.

Prometheus Scraping

Every Ripple worker and coordinator exposes metrics on an HTTP endpoint:

| Component | Port | Path | Content-Type |
|---|---|---|---|
| Worker | 9102 | /metrics | text/plain; version=0.0.4 |
| Coordinator | 9202 | /metrics | text/plain; version=0.0.4 |

Kubernetes Auto-Discovery

Workers and coordinators have Prometheus annotations in their pod templates:

metadata:
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9102"
    prometheus.io/path: "/metrics"

With Prometheus Kubernetes service discovery, no additional scrape configuration is needed. Prometheus will automatically discover and scrape all Ripple pods.

Manual Scrape Configuration

If not using Kubernetes service discovery:

scrape_configs:
  - job_name: 'ripple-workers'
    static_configs:
      - targets:
        - 'worker-0:9102'
        - 'worker-1:9102'
        - 'worker-2:9102'
    scrape_interval: 5s

  - job_name: 'ripple-coordinator'
    static_configs:
      - targets:
        - 'coordinator-0:9202'
    scrape_interval: 10s

Pre-Registered Metrics

Graph Engine Metrics

| Metric | Type | Labels | Description |
|---|---|---|---|
| ripple_graph_stabilizations_total | Counter | worker | Total stabilization cycles executed |
| ripple_graph_cutoff_hits_total | Counter | worker | Times cutoff prevented unnecessary propagation |
| ripple_graph_node_count | Gauge | worker | Total nodes in the computation graph |
| ripple_graph_dirty_count | Gauge | worker | Nodes currently marked dirty |
| ripple_graph_stabilization_ns | Histogram | worker | Stabilization cycle duration (nanoseconds) |
| ripple_graph_recompute_count | Histogram | worker | Nodes recomputed per stabilization |

Key metric: ripple_graph_stabilization_ns. The p99 of this histogram should stay below 10,000 ns (10 us). If it exceeds this, the worker is processing too many symbols per partition.

Transport Metrics

| Metric | Type | Labels | Description |
|---|---|---|---|
| ripple_transport_deltas_sent_total | Counter | worker, output | Deltas sent to downstream workers |
| ripple_transport_deltas_received_total | Counter | worker, source | Deltas received from upstream workers |
| ripple_transport_delta_bytes_total | Counter | worker | Total bytes transmitted as deltas |
| ripple_transport_rpc_latency_ns | Histogram | worker | RPC round-trip latency (nanoseconds) |
| ripple_transport_backpressure_level | Gauge | worker | Current backpressure (0.0 to 1.0) |

Key metric: ripple_transport_backpressure_level. Values above 0.8 trigger a Critical alert. Sustained values above 0.95 cause the worker to pause input consumption.

Window Metrics

| Metric | Type | Labels | Description |
|---|---|---|---|
| ripple_window_active_windows | Gauge | worker | Number of open (not yet triggered) windows |
| ripple_window_late_events_total | Counter | worker | Mildly late events (within allowed lateness) |
| ripple_window_very_late_events_total | Counter | worker | Very late events (dropped) |
| ripple_window_retractions_total | Counter | worker | Window result retractions issued |

Key metric: ripple_window_very_late_events_total. A non-zero rate indicates either clock skew in the data source or insufficient allowed lateness.

System Metrics

| Metric | Type | Labels | Description |
|---|---|---|---|
| ripple_system_event_lag_ns | Histogram | worker | Event time lag (now - event_time) |
| ripple_system_heap_words | Gauge | worker | OCaml GC heap size in words |

Key metric: ripple_system_heap_words. If this grows continuously, there is a memory leak. A healthy worker should show < 0.1% heap growth per checkpoint interval.

Alert Thresholds

groups:
  - name: ripple
    rules:
      - alert: StabilizationSlow
        expr: histogram_quantile(0.99, rate(ripple_graph_stabilization_ns_bucket[5m])) > 10000000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Stabilization p99 > 10ms on {{ $labels.worker }}"

      - alert: BackpressureHigh
        expr: ripple_transport_backpressure_level > 0.8
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Backpressure at {{ $value | humanizePercentage }} on {{ $labels.worker }}"

      - alert: LateEventsExcessive
        expr: >
          rate(ripple_window_very_late_events_total[5m])
          / rate(ripple_graph_stabilizations_total[5m]) > 0.01
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Very late events exceed 1% on {{ $labels.worker }}"

      - alert: HeapGrowth
        expr: >
          deriv(ripple_system_heap_words[10m]) > 10000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Continuous heap growth on {{ $labels.worker }}"

      - alert: WorkerDown
        expr: up{job="ripple-workers"} == 0
        for: 30s
        labels:
          severity: critical
        annotations:
          summary: "Worker {{ $labels.instance }} is down"

      - alert: NoStabilizations
        expr: >
          rate(ripple_graph_stabilizations_total[5m]) == 0
          and up{job="ripple-workers"} == 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Worker {{ $labels.worker }} has not stabilized in 5 minutes"

Threshold Summary

| Condition | Threshold | Severity | Action |
|---|---|---|---|
| Stabilization p99 > 10ms | > 10_000_000 ns | Warning | Check partition size, consider splitting |
| Backpressure > 80% | > 0.8 | Critical | Scale out workers or increase parallelism |
| Backpressure > 95% | > 0.95 | Critical | Worker pauses input, immediate action needed |
| Very late events > 1% | > 0.01 ratio | Warning | Increase allowed_lateness_sec or fix source clocks |
| Checkpoint age > 20s | > 2x interval | Critical | Check checkpoint store (S3) availability |
| Worker heartbeat > 30s | > 30s | Critical | Worker is dead, coordinator will rebalance |
| Heap growth > 0.1%/interval | deriv > 10000 | Warning | Possible memory leak, investigate with GC stats |

Structured Logging

All Ripple log entries are machine-parseable s-expressions:

type entry =
  { level : level          (* Debug | Info | Warn | Error *)
  ; timestamp_ns : Int64.t
  ; worker_id : string option
  ; component : string     (* "graph", "transport", "checkpoint", etc. *)
  ; message : string
  ; context : (string * string) list
  }
[@@deriving sexp]

Example log output:

((level Info)(timestamp_ns 1709000000000000000)(worker_id (w0))(component graph)(message stabilized)(context ((nodes 4001)(recomputed 3)(dirty 0))))

Log Levels

| Level | Usage |
|---|---|
| Debug | Per-stabilization diagnostics, heartbeat details |
| Info | Worker lifecycle transitions, checkpoint writes |
| Warn | Late events, slow stabilization, clock skew |
| Error | Schema mismatch, checkpoint failure, crash recovery |

Parsing Logs

Because logs are s-expressions, they can be parsed with any sexp library or with standard Unix tools:

# Extract all Error-level entries
grep '(level Error)' ripple.log | sexp query '(field message)'

# Count stabilizations per worker
grep '(message stabilized)' ripple.log | grep -o '(worker_id [^)]*' | sort | uniq -c

In production, ship logs to a centralized system (ELK, Loki, Datadog) and parse the sexp structure for indexing and alerting.
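As a toy illustration, the flat `(key value)` pairs shown above can be pulled out with nothing but the standard library. A real consumer would parse the sexp properly (e.g. with Sexplib); this sketch handles only flat atoms, not nested fields like `(context ...)`:

```ocaml
(* Find the first occurrence of a substring, returning the index just past it. *)
let find_sub s pat =
  let n = String.length s and m = String.length pat in
  let rec go i =
    if i + m > n then None
    else if String.sub s i m = pat then Some (i + m)
    else go (i + 1)
  in
  go 0

(* Extract the atom following "(key " up to the next ')'. *)
let field line key =
  match find_sub line ("(" ^ key ^ " ") with
  | None -> None
  | Some start ->
      let stop = String.index_from line start ')' in
      Some (String.sub line start (stop - start))
```

For example, `field log_line "message"` on the stabilization entry above yields `Some "stabilized"`.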

Benchmarks

Ripple’s performance claims are backed by a benchmark suite that runs as part of the build process. No claim without measurement. This page covers the benchmark results, how to run them, the pre-commit gate, and the regression policy.

Benchmark Results

| ID | Benchmark | Target | Measured | Status |
|---|---|---|---|---|
| B-01 | Incremental stabilization (1K symbols, single change) | <= 10 us | ~250 ns | PASS |
| B-01 | Incremental stabilization (10K symbols, single change) | <= 3 us | ~250 ns | PASS |
| B-02 | Trade sexp roundtrip | >= 500K/sec | ~12M/sec | PASS |
| B-02 | Trade bin_prot roundtrip | >= 500K/sec | ~12M/sec | PASS |
| B-03 | Delta diff (1 field changed on Trade) | measure | ~200 ns | PASS |
| B-05 | Replay 100K events (2K symbols) | extrapolate <= 30s for 6M | ~2.1s | PASS |
| B-05 | Replay 100K events (10K symbols) | extrapolate <= 30s for 6M | varies | CONDITIONAL |
| B-06 | Schema compatibility check | <= 10 us | ~130 ns | PASS |
| B-06 | Schema fingerprint | measure | ~100 ns | PASS |

Reading the Results

  • Target: the performance requirement from the architecture design.
  • Measured: actual measurement on the test hardware.
  • PASS: measured value meets or exceeds the target.
  • CONDITIONAL: meets target under some conditions but not all. Requires mitigation (e.g., limiting symbols per worker to 2,000).
  • FAIL: does not meet target. Blocks any RFC that depends on this benchmark.

How to Run

Full Benchmark Suite

# Using make
make bench

# Direct invocation
dune exec bench/run_benchmarks.exe

This runs all benchmarks via Jane Street’s core_bench framework, which performs multiple iterations with warm-up to produce statistically meaningful results.

Individual Benchmarks

core_bench supports filtering by benchmark name:

# Run only B-01 benchmarks
dune exec bench/run_benchmarks.exe -- -name 'B-01'

# Run only B-06 benchmarks
dune exec bench/run_benchmarks.exe -- -name 'B-06'

# Show detailed statistics
dune exec bench/run_benchmarks.exe -- -quota 10 -ci-absolute

Benchmark Configuration

| Flag | Default | Description |
|---|---|---|
| -quota | 10 | Seconds per benchmark (more = more stable results) |
| -ci-absolute | off | Show confidence intervals |
| -name | all | Filter by benchmark name regex |
| -save | off | Save results to file for comparison |

Benchmark Descriptions

B-01: Incremental Stabilization Throughput

Measures the time to stabilize a VWAP graph after changing a single leaf. This is the most important benchmark – it validates that incremental computation is O(R), not O(N).

(* Setup: build graph with N symbol leaves + map + incr_fold *)
let _, leaves, _ = Test_graph.build_vwap_graph ~n_symbols:1000 in
let leaf = leaves.(500) in

(* Benchmark: change one leaf, stabilize *)
Bench.Test.create ~name:"B-01: stabilize 1K symbols" (fun () ->
  Graph.set_leaf leaf (next_value ());
  let _ = Graph.stabilize leaf.graph in
  ())

At 1K symbols, stabilization touches 3 nodes (leaf + map + incr_fold), taking ~250 ns regardless of graph size. This confirms O(R) behavior.

B-02: Serialization Throughput

Measures sexp and bin_prot roundtrip speed for the Trade type. Both must exceed 500K roundtrips/sec.

(* bin_prot roundtrip *)
Bench.Test.create ~name:"B-02: Trade bin_prot roundtrip" (fun () ->
  let _pos = Trade.bin_write_t buf ~pos:0 trade in
  let _trade = Trade.bin_read_t buf ~pos_ref:(ref 0) in
  ())

bin_prot achieves ~12M roundtrips/sec – 24x above the target. This confirms that serialization is not a bottleneck.

B-03: Delta Diff

Measures the cost of computing a delta between two Trade values differing in one field. Validates that field-level diffing is practical at high throughput.

B-05: Replay Recovery

Simulates crash recovery: replay 100K events through a stabilize loop. Extrapolates to 6M events to check the 30-second recovery target.

At 2K symbols (the mitigated limit), 100K events complete in ~2.1 seconds. Extrapolating to 6M: 2.1s * 60 = ~126s. This exceeds the 30-second target for 6M events, but with the 10-second checkpoint interval, worst-case replay is ~1M events = ~21 seconds, which passes.
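The arithmetic behind these numbers can be checked directly, using the measured values from the table above:

```ocaml
(* Measured: 100K events replay in ~2.1 s at 2K symbols. *)
let events_per_sec = 100_000. /. 2.1

(* A full 6M-event replay at that rate takes ~126 s: over the 30 s target. *)
let full_replay_sec = 6_000_000. /. events_per_sec

(* But a 10 s checkpoint interval bounds worst-case replay to ~1M events,
   which comes in at ~21 s: within the target. *)
let worst_case_sec = 1_000_000. /. events_per_sec
```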

At 10K symbols, performance degrades due to larger graph traversal. This is why the architecture mandates <= 2,000 symbols per worker.

B-06: Schema Validation

Measures backward compatibility checking between two schema versions. Must complete in < 10 us.

At ~130 ns, schema validation is 77x faster than the target.

Pre-Commit Hook

The pre-commit hook runs dune runtest which executes all inline expect tests. This includes tests that verify benchmark-relevant properties (e.g., selective recomputation, cutoff behavior, idempotent stabilize).

# Install the hook
make install-hooks

# Hook runs automatically on git commit:
# 1. dune runtest (all expect tests)
# 2. If any test fails, commit is rejected

The hook does not run the full benchmark suite (that would be too slow for every commit). Full benchmarks are run in CI and before releases.

Regression Gate

A benchmark moving from PASS to FAIL is a blocker for any RFC that depends on that benchmark:

| Status Transition | Action |
|---|---|
| UNTESTED -> PASS | First measurement, record baseline |
| UNTESTED -> FAIL | Design issue, must be addressed before RFC proceeds |
| PASS -> PASS | Normal, no action |
| PASS -> REGRESSED | Investigate regression. If confirmed, blocks dependent RFCs |
| FAIL -> MITIGATED | Architectural change addresses root cause (e.g., B-05 mitigation: limit to 2K symbols) |
| CONDITIONAL -> PASS | Additional validation confirms full compliance |

Detecting Regressions

Compare current results against the baseline:

# Save baseline
dune exec bench/run_benchmarks.exe -- -save baseline.bench

# After changes, compare
dune exec bench/run_benchmarks.exe -- -save current.bench
# Manual comparison of results

Regressions of more than 2x from the baseline require investigation. Common causes:

  • Accidental allocation on the hot path
  • Hash table lookup replacing array index
  • Additional indirection in recompute function

Why It’s Fast

Ripple achieves sub-microsecond stabilization through three key optimizations: heap-based dirty propagation, incremental fold, and cutoff. This page explains each optimization, the memory layout that supports them, and the zero-allocation discipline on the hot path.

The Three Key Optimizations

1. Heap-Based Dirty Propagation

Problem: When a leaf changes, which nodes need recomputation?

Naive approach (dirty flags): Scan all N nodes, check if dirty, recompute if so. This is O(N) per stabilization regardless of how many nodes actually changed.

Ripple’s approach (min-heap): Push dirty nodes onto a min-heap keyed by (height, node_id). Pop nodes in order and recompute. This is O(R log R) where R is the number of dirty nodes.

Benchmark B-01 proved this matters:
  Linear scan at 10K nodes: ~27 us per stabilization
  Heap-based at 10K nodes:  ~250 ns per stabilization
  Speedup: 100x

The heap gives O(R log R), but in the common case R is very small (1 leaf + 2-3 dependents), so the practical complexity is O(1) with a small constant.
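The ordering the heap provides can be demonstrated with a toy array-backed min-heap over `(height, node_id)` pairs. This is illustrative only; Ripple's heap is pre-allocated and tracks membership in a side array:

```ocaml
(* Toy binary min-heap; polymorphic compare orders (height, id) pairs
   lexicographically, so lower heights pop first. *)
type heap = { mutable a : (int * int) array; mutable len : int }

let create () = { a = Array.make 16 (0, 0); len = 0 }

let swap h i j = let t = h.a.(i) in h.a.(i) <- h.a.(j); h.a.(j) <- t

let push h x =
  if h.len = Array.length h.a then begin
    let b = Array.make (2 * h.len) (0, 0) in   (* grow by 2x, as the graph does *)
    Array.blit h.a 0 b 0 h.len; h.a <- b
  end;
  h.a.(h.len) <- x; h.len <- h.len + 1;
  let i = ref (h.len - 1) in
  while !i > 0 && h.a.((!i - 1) / 2) > h.a.(!i) do
    swap h ((!i - 1) / 2) !i; i := (!i - 1) / 2
  done

let pop_min h =
  if h.len = 0 then None
  else begin
    let m = h.a.(0) in
    h.len <- h.len - 1;
    h.a.(0) <- h.a.(h.len);
    let i = ref 0 and continue = ref true in
    while !continue do
      let l = (2 * !i) + 1 and r = (2 * !i) + 2 in
      let s = ref !i in
      if l < h.len && h.a.(l) < h.a.(!s) then s := l;
      if r < h.len && h.a.(r) < h.a.(!s) then s := r;
      if !s = !i then continue := false else (swap h !i !s; i := !s)
    done;
    Some m
  end
```

Popping in `(height, node_id)` order guarantees a node recomputes only after everything beneath it has settled, which is what makes a single pass over the dirty set sufficient.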

2. Incremental Fold (Incr_fold)

Problem: A portfolio VWAP aggregates 2,000 symbol prices. When one price changes, how do we update the sum?

Naive fold: Re-fold all 2,000 prices: sum = fold (+) 0 [p0; p1; ...; p1999]. This is O(N) per stabilization.

Incr_fold: Track which parents changed. For each changed parent, remove the old value and add the new value:

acc = old_sum
acc = acc - old_price_AAPL     (remove)
acc = acc + new_price_AAPL     (add)
return acc

This is O(changed_parents), which is O(1) for the single-trade case.

Performance comparison (2,000 symbols, 1 change):
  Fold:      O(2000) = ~5 us
  Incr_fold: O(1)    = ~50 ns
  Speedup:   100x

The Incr_fold node maintains a snapshot of all parent values from the previous cycle. During dirty propagation, when a parent of an Incr_fold is about to be marked dirty, the parent’s index is recorded. During recomputation, only those recorded indices are processed.
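In miniature, the subtract-then-add update looks like this (names are hypothetical; the real Incr_fold takes user-supplied add/remove closures over an arbitrary accumulator):

```ocaml
(* O(1) per changed parent: remove the stale contribution, add the fresh one. *)
let incr_update ~old_sum ~old_value ~new_value =
  old_sum -. old_value +. new_value

(* The naive alternative re-folds every parent: O(N) per stabilization. *)
let full_fold prices = Array.fold_left ( +. ) 0. prices
```

For floating-point accumulators this trades a little numerical drift for the O(1) update; a production fold can periodically re-sum from scratch to bound the error.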

3. Cutoff Optimization

Problem: A node recomputes but produces the same value as before. Should its dependents recompute?

Without cutoff: Yes – every recomputation propagates to all dependents, creating phantom updates.

With cutoff: No – if the new value equals the old value (by the user-provided equality function), propagation stops.

[leaf: raw_price=250]
       |
       v
[map: clamp(100)]  recomputes: min(250, 100) = 100
       |            old value was 100
       |            cutoff: 100 == 100, STOP
       v
[map: format]       NOT recomputed (saved ~100ns + string allocation)

Cutoff is especially effective for:

  • Threshold/clamp functions
  • Boolean predicates (changes in input that don’t flip the predicate)
  • Rounding functions
  • Any function with a small output domain

In the VWAP pipeline, cutoff prevents propagation when a trade arrives but the VWAP (weighted average) does not change meaningfully due to rounding.
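A minimal sketch of the cutoff decision (the names here are illustrative, not Ripple's API):

```ocaml
(* Recompute the node, then consult the user equality before waking
   dependents; when values match, propagation stops at this node. *)
let recompute_with_cutoff ~equal ~old_value ~compute ~propagate input =
  let v = compute input in
  if equal v old_value then old_value   (* cutoff: dependents stay clean *)
  else (propagate v; v)
```

The clamp example above falls out directly: with `compute = (fun x -> min x 100)`, raw inputs of 250 and 300 both produce 100, so the second input never reaches the formatting node.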

Memory Layout

Node Array

Nodes are stored in a contiguous array, not a linked structure:

graph.nodes:
  ┌──────┬──────┬──────┬──────┬──────┬──────┐
  │ n[0] │ n[1] │ n[2] │ n[3] │ n[4] │ ...  │
  └──────┴──────┴──────┴──────┴──────┴──────┘
  ^                                          ^
  sequential memory                          grows by 2x

Array access is O(1) by node ID: graph.nodes.(node_id). No hash table lookup, no pointer chase.

Dirty Heap

The heap is also array-based:

dirty_heap.entries:
  ┌─────────┬─────────┬─────────┬─────────┐
  │ (0, n3) │ (1, n7) │ (1, n9) │ unused  │
  └─────────┴─────────┴─────────┴─────────┘
  index 1    index 2    index 3    (1-indexed)

dirty_heap.in_heap:
  ┌───┬───┬───┬───┬───┬───┬───┬───┬───┬───┐
  │ F │ F │ F │ T │ F │ F │ F │ T │ F │ T │
  └───┴───┴───┴───┴───┴───┴───┴───┴───┴───┘
  n0   n1   n2   n3   n4   n5   n6   n7   n8   n9

in_heap provides O(1) membership check by node_id. This prevents duplicate insertion and is essential for the deduplication during dirty propagation.

Cache Friendliness

With 2,000 nodes at ~200 bytes each:

  • Node array: 400 KB (fits in L2 cache on most CPUs)
  • Dirty heap: ~80 KB (fits in L1 cache)
  • Total working set: < 500 KB

During stabilization, the access pattern is:

  1. Pop from heap (sequential access to heap array)
  2. Read node from node array (indexed access, likely cached)
  3. Read parent values from node array (indexed access)
  4. Write new value to node array
  5. Push dependents to heap

Steps 2-4 access a small number of nodes (typically 3-5) that are likely in the same cache line or adjacent lines. There is no pointer chasing through a linked data structure.

Zero Allocation on the Hot Path

The stabilization loop allocates nothing during the common case:

| Operation | Allocation? | Why |
|---|---|---|
| Dirty_heap.pop_min | No | Returns struct from pre-allocated array |
| Dirty_heap.push | No | Writes to pre-allocated array slot |
| Node recompute (Map) | Depends on f | User function may allocate |
| Cutoff check | No | Calls user equality function |
| Set dependent dirty | No | Sets boolean flag + heap push |

The heap is pre-allocated to capacity. The node array is pre-allocated and grows only at construction time. The in_heap array is pre-allocated.

Allocation can occur in user-provided functions (f in Map, add/remove in Incr_fold). For numeric operations (float arithmetic), OCaml represents them as unboxed doubles – no allocation. For string operations or record construction, allocation occurs but is amortized by the GC.

Measuring Hot Path Allocations

Use OCaml’s Gc.stat() to verify zero allocation:

let before = Gc.stat () in
let _ = Graph.stabilize g in
let after = Gc.stat () in
let allocs = after.minor_words -. before.minor_words in
Printf.printf "words allocated: %.0f\n" allocs;
(* For numeric-only stabilization: 0 *)

Performance Summary

| Operation | Time | Complexity |
|---|---|---|
| set_leaf | ~10 ns | O(1) |
| stabilize (1 change, numeric) | ~250 ns | O(R log R), R typically 3 |
| watch | ~5 ns | O(1) array index |
| Incr_fold update | ~50 ns | O(changed_parents) |
| Cutoff check (float equality) | ~2 ns | O(1) |
| Dirty heap push | ~20 ns | O(log N) |
| Dirty heap pop | ~20 ns | O(log N) |

End-to-end for a single trade through the VWAP pipeline:

  • set_leaf: 10 ns
  • stabilize (leaf + map + incr_fold): 250 ns
  • watch: 5 ns
  • Total: ~265 ns per trade

At 265 ns/trade, the theoretical maximum throughput is ~3.8 million trades/sec on a single core.
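The budget above, checked directly:

```ocaml
(* set_leaf + stabilize + watch, in nanoseconds per trade. *)
let per_trade_ns = 10. +. 250. +. 5.

(* Single-core ceiling: one second divided by the per-trade cost. *)
let max_trades_per_sec = 1e9 /. per_trade_ns
```

The measured VWAP pipeline throughput of 2.16M events/sec sits comfortably under this ~3.8M/sec ceiling, with the gap going to serialization, windowing, and I/O.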

Memory Model

This page documents Ripple’s memory usage characteristics: per-node memory, the arena-style layout, GC behavior, heap growth bounds, and the node lifecycle.

Node Memory Model

Each node in the incremental graph occupies approximately 200 bytes:

Field                    Size      Notes
─────────────────────────────────────────────────────────
id : int                 8 bytes   Node identifier
height : int             8 bytes   Topological height
value : Obj.t            8 bytes   Pointer to heap-allocated value
is_dirty : bool          8 bytes   Immediate stored in a full word
is_output : bool         8 bytes   Immediate stored in a full word
old_value : Obj.t        8 bytes   Previous value for delta generation
recompute : recompute_fn 40-80 B   Variant with closure(s)
cutoff_fn : closure      8 bytes   Pointer to equality function
dependents : int list    16+ B     Cons cells (typically 1-3 elements)
sexp_of_value : option   8 bytes   None or Some(closure)
value_of_sexp : option   8 bytes   None or Some(closure)
─────────────────────────────────────────────────────────
Total (typical)          ~200 bytes

The exact size varies by recompute variant:

  • Leaf: smallest (~160 bytes) – no parent references
  • Map: ~180 bytes – one parent ID + closure
  • Map2: ~200 bytes – two parent IDs + closure
  • Fold: ~200 bytes – parent array + closure + init value
  • Incr_fold: ~280 bytes – parent array + two closures + parent_values snapshot + parent_index hashtable + changed indices list

Arena-Style Layout

Nodes are stored in a contiguous array that grows by 2x when full:

Initial allocation: 1024 nodes
  ┌─────┬─────┬─────┬─────┬─────┬──────────────────────┐
  │ n0  │ n1  │ n2  │ ... │ n200│ <unused capacity>     │
  └─────┴─────┴─────┴─────┴─────┴──────────────────────┘
  ^                              ^                       ^
  0                            node_count=201         len=1024

After growth (triggered when node_count reaches capacity):
  ┌─────┬─────┬─────┬──────────────────────────────────┐
  │ n0  │ n1  │ ... │ <unused capacity>                 │
  └─────┴─────┴─────┴──────────────────────────────────┘
  ^                                                     ^
  0                                                   len=2048

Growth is done via Array.blit – the old array’s contents are copied to the new array. This happens only during graph construction, never during stabilization.
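A minimal, self-contained sketch of this growth policy (the names `arena`, `grow`, and `add` are hypothetical; the real arena in `lib/graph/` stores full node records):

```ocaml
type 'a arena = {
  mutable slots : 'a array;  (* contiguous backing array *)
  mutable count : int;       (* number of occupied slots *)
  dummy : 'a;                (* filler value for unused capacity *)
}

let create dummy = { slots = Array.make 1024 dummy; count = 0; dummy }

(* Double the capacity and copy the old contents with Array.blit. *)
let grow a =
  let bigger = Array.make (2 * Array.length a.slots) a.dummy in
  Array.blit a.slots 0 bigger 0 a.count;
  a.slots <- bigger

let add a x =
  if a.count = Array.length a.slots then grow a;
  a.slots.(a.count) <- x;
  a.count <- a.count + 1
```

Because `grow` only runs when `count` reaches capacity, and nodes are only added during graph construction, no copy ever happens on the stabilization hot path.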

Memory Usage by Graph Size

| Symbols | Total Nodes | Node Array | Dirty Heap | Total   |
|---------|-------------|------------|------------|---------|
| 100     | 301         | 60 KB      | 10 KB      | ~70 KB  |
| 500     | 1,501       | 300 KB     | 48 KB      | ~350 KB |
| 1,000   | 3,001       | 600 KB     | 96 KB      | ~700 KB |
| 2,000   | 6,001       | 1.2 MB     | 192 KB     | ~1.4 MB |
| 10,000  | 30,001      | 6 MB       | 960 KB     | ~7 MB   |

For the recommended maximum of 2,000 symbols per worker, the graph engine’s total memory footprint is approximately 1.4 MB. This easily fits in L2 cache on modern CPUs.
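The node counts in the table follow a simple model. A back-of-envelope check, assuming (as the table implies) three nodes per symbol plus one shared output node at ~200 bytes each:

```ocaml
(* Assumed model: 3 nodes per symbol + 1 shared output node, ~200 B/node. *)
let total_nodes symbols = (3 * symbols) + 1
let node_array_bytes symbols = total_nodes symbols * 200

let () =
  Printf.printf "%d symbols -> %d nodes, %d bytes\n"
    2_000 (total_nodes 2_000) (node_array_bytes 2_000)
(* prints "2000 symbols -> 6001 nodes, 1200200 bytes" *)
```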

GC Behavior

Minor Heap Pressure

The stabilization loop’s hot path is designed to avoid minor heap allocation:

  • Heap push/pop operates on pre-allocated arrays
  • Node field updates are mutable in-place mutations
  • Numeric recompute functions (float arithmetic) use unboxed doubles

Allocation occurs only in:

  1. User-provided functions that create new values (e.g., record construction)
  2. output_change list construction (one per changed output per stabilization)
  3. dependents list cons when adding edges (graph construction only)

Major Heap Stability

A healthy Ripple worker exhibits stable major heap behavior:

Heap words over time (typical):

  Words
  1.5M ┤                                    ── stable plateau
       │  ┌──────────────────────────────────
  1.0M ┤  │
       │  │  graph construction
  0.5M ┤  │
       │──┘
  0.0M ┤
       └──────────────────────────────────────
       0s   5s   10s   15s   20s   25s   30s

The heap grows rapidly during graph construction (first 1-2 seconds), then stabilizes. During steady-state processing, the major heap should not grow significantly.

0.1% Heap Growth Bound

The architecture requires that major heap growth during steady-state processing is less than 0.1% per checkpoint interval (10 seconds). This is monitored by the ripple_system_heap_words gauge.

If heap growth exceeds this bound, it indicates one of:

  • A memory leak in a user-provided recompute function
  • Unbounded accumulation in a fold node
  • Growing dependents lists (should not happen after construction)
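The bound itself is a one-line ratio check. A hypothetical sketch of what the monitor computes (`growth_ok` is illustrative, not part of Ripple's API):

```ocaml
(* True when heap growth over one checkpoint interval stays under 0.1%. *)
let growth_ok ~prev_heap_words ~curr_heap_words =
  let growth =
    float_of_int (curr_heap_words - prev_heap_words)
    /. float_of_int prev_heap_words
  in
  growth < 0.001
```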

Monitoring GC

(* In the heartbeat loop *)
let gc = Gc.stat () in
Metrics.Gauge.set Metrics.System.heap_words
  (Float.of_int gc.heap_words);

Key Gc.stat fields to watch:

| Field             | Healthy Range               | Alarm             |
|-------------------|-----------------------------|-------------------|
| heap_words        | Stable after construction   | Continuous growth |
| minor_collections | Proportional to event rate  | >> event rate     |
| major_collections | < 1 per checkpoint interval | > 10 per interval |
| compactions       | 0 in steady state           | Any compaction    |

Node Lifecycle

Nodes in Ripple follow a four-phase lifecycle:

 Created ──> Active ──> Stale ──> Collected
   │           │          │          │
   │ add_node  │ normal   │ remove   │ GC reclaims
   │           │ ops      │ from     │
   │           │          │ graph    │

Created

A node is created by add_node (called internally by add_leaf, map, fold, etc.). The node is placed in the next available slot in the node array. Height is computed from parent heights.

Active

The node participates in stabilization. Its value is recomputed when its inputs change. It may be pushed onto the dirty heap and popped during stabilization.

Stale

When a node is no longer needed (e.g., a window is garbage-collected), it is marked stale. In the current implementation, stale nodes are not removed from the array – they remain in place but are not referenced by any active computation path.

This is a deliberate trade-off: removing nodes from the middle of the array would require reindexing all dependent references, which is expensive. Instead, stale nodes are left in place and their slots are effectively leaked until the graph is reconstructed (which happens on recovery from checkpoint).

Collected

Stale nodes are reclaimed when:

  1. The worker shuts down (all memory freed)
  2. The graph is reconstructed from a checkpoint (new array, stale nodes not restored)

Memory Implications

For long-running workers that never restart (and therefore never rebuild the graph from a checkpoint), stale nodes accumulate at a predictable rate:

  • Window GC: each expired window generates ~3 stale nodes (leaf + map + fold contribution)
  • With 60-second tumbling windows: ~3 nodes become stale per window per symbol every 60 seconds
  • At 2,000 symbols: ~6,000 stale nodes per minute = ~1.2 MB/minute
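These figures can be reproduced from the page's own assumptions (three stale nodes per symbol per window expiry, ~200 bytes per node):

```ocaml
(* Stale-node accumulation for 60-second tumbling windows. *)
let stale_nodes_per_minute ~symbols = 3 * symbols
let stale_bytes_per_minute ~symbols = stale_nodes_per_minute ~symbols * 200

let () =
  Printf.printf "%d symbols: %d stale nodes, ~%.1f MB per minute\n"
    2_000
    (stale_nodes_per_minute ~symbols:2_000)
    (float_of_int (stale_bytes_per_minute ~symbols:2_000) /. 1e6)
```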

The checkpoint interval (10 seconds) limits how long stale nodes accumulate. On checkpoint restore, only active nodes are reconstructed.

For deployments where stale accumulation is a concern, the recommended mitigation is periodic checkpoint-and-restart: take a checkpoint, restart the worker, and restore from the checkpoint. This compacts the graph and eliminates all stale nodes.

Value Storage

Node values are stored as Obj.t (type-erased) internally, with typed accessors at the API boundary:

(* Internal storage: untyped *)
node.value <- Obj.repr (my_float_value : float)

(* API boundary: typed *)
let watch (type a) g (incr : a incr) : a =
  let node = g.nodes.(incr.incr_node_id) in
  (Obj.obj node.value : a)

This allows heterogeneous node values in a single array without boxing each value in a variant. The type safety is maintained by the 'a var and 'a incr phantom types – you cannot read a float node as an int because the type parameter prevents it at compile time.
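A self-contained illustration of the pattern (deliberately simplified: `handle` stands in for Ripple's `'a incr`, and the array holds bare `Obj.t` values rather than full node records):

```ocaml
(* The phantom parameter 'a records the stored value's type; it has no
   runtime representation. *)
type 'a handle = Handle of int

(* One untyped array holds values of different types side by side. *)
let cells : Obj.t array = Array.make 8 (Obj.repr 0)

let set (Handle id : 'a handle) (x : 'a) = cells.(id) <- Obj.repr x
let get (type a) (Handle id : a handle) : a = Obj.obj cells.(id)

let price : float handle = Handle 0
let count : int handle = Handle 1

let () = set price 101.5
let () = set count 42
(* get price : float and get count : int -- reading price as an int
   is rejected at compile time by the phantom parameter. *)
```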

Integer values are stored as tagged immediates by the OCaml runtime, so Obj.repr 42 does not allocate – it produces an immediate word directly. Floats are boxed in the general case, but the compiler keeps intermediate float arithmetic in unboxed doubles, so a numeric recompute allocates at most the final boxed result written into the node.