
The Graph Engine

The graph engine is the most performance-critical component in Ripple. It manages the incremental computation graph, tracks dirty nodes, and executes the stabilization algorithm. This page covers the three key mechanisms: the dirty heap, height-ordered propagation, and cutoff optimization.

Architecture

  ┌──────────────────────────────────────────────────────┐
  │                    Graph                             │
  │                                                      │
  │  ┌──────────────────────┐  ┌──────────────────────┐  │
  │  │   nodes : node array │  │  dirty_heap : Heap.t │  │
  │  │   (contiguous memory)│  │  (min-heap by height)│  │
  │  └──────────────────────┘  └──────────────────────┘  │
  │                                                      │
  │  node_count : int                                    │
  │  last_recompute_count : int                          │
  │  last_stabilization_ns : Int63.t                     │
  │  now : unit -> Time_ns.t                             │
  │  output_infos : (int, output_info) Hashtbl.t         │
  └──────────────────────────────────────────────────────┘

Nodes are stored in a flat array for cache locality. The graph grows the array by 2x when capacity is exceeded. All node access during stabilization is by integer index – no pointer chasing, no hash lookups on the hot path.
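The growth-by-doubling can be sketched as follows. This is a toy illustration: the names `growable`, `ensure_capacity`, and `append` are assumptions for the example, not Ripple's actual API, and the real engine stores node records rather than plain values.

```ocaml
(* Toy sketch: a flat array that doubles its capacity when full.
   [dummy] fills unused slots so the array can be pre-sized. *)
type 'a growable = { mutable data : 'a array; mutable len : int }

let ensure_capacity g ~dummy =
  if g.len >= Array.length g.data then begin
    (* Double capacity and copy the existing elements over. *)
    let grown = Array.make (max 1 (2 * Array.length g.data)) dummy in
    Array.blit g.data 0 grown 0 g.len;
    g.data <- grown
  end

let append g x ~dummy =
  ensure_capacity g ~dummy;
  g.data.(g.len) <- x;
  g.len <- g.len + 1
```

Doubling keeps appends amortized O(1), and because the backing store stays one contiguous block, index-based access remains cache-friendly after every resize.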

Node Types

Each node has a recompute_fn variant that determines how its value is derived:

Variant     Parents   Complexity   Use case
Leaf        0         O(1)         External input
Map         1         O(1)         Transform a single value
Map2        2         O(1)         Combine two values
Fold        N         O(N)         Aggregate (re-folds all parents)
Incr_fold   N         O(changed)   Incremental aggregate

The Dirty Heap

The dirty heap is an array-based binary min-heap keyed on (height, node_id). It determines the order in which dirty nodes are processed during stabilization.

Heap ordering: (height, node_id) — lexicographic
  - Lower height first (process dependencies before dependents)
  - Lower node_id as tiebreaker (deterministic ordering)

Implementation properties:

  • Array-based: no pointer indirection, cache-friendly iteration
  • Pre-allocated: capacity grows only at graph construction time, never during stabilization
  • O(1) membership check: in_heap boolean array indexed by node_id
  • O(log n) push/pop: standard heap sift-up/sift-down
  • Duplicate rejection: push checks in_heap before inserting

type t =
  { mutable entries : entry array
  ; mutable size : int
  ; mutable in_heap : bool array  (* O(1) membership by node_id *)
  ; mutable capacity : int
  }

let push t ~height ~node_id =
  if not t.in_heap.(node_id) then begin
    t.entries.(t.size) <- { height; node_id };
    t.in_heap.(node_id) <- true;
    t.size <- t.size + 1;
    sift_up t (t.size - 1)
  end
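For completeness, the sift routines referenced by push might look like the following: a self-contained, 0-indexed sketch including `push` and `pop_min`, ordered lexicographically on (height, node_id). The real implementation may differ in detail.

```ocaml
type entry = { height : int; node_id : int }

type t =
  { mutable entries : entry array
  ; mutable size : int
  ; mutable in_heap : bool array  (* O(1) membership by node_id *)
  }

(* Lexicographic order: lower height first, lower node_id as tiebreaker. *)
let less a b =
  a.height < b.height || (a.height = b.height && a.node_id < b.node_id)

let swap t i j =
  let tmp = t.entries.(i) in
  t.entries.(i) <- t.entries.(j);
  t.entries.(j) <- tmp

let rec sift_up t i =
  let parent = (i - 1) / 2 in
  if i > 0 && less t.entries.(i) t.entries.(parent) then begin
    swap t i parent;
    sift_up t parent
  end

let rec sift_down t i =
  let l = (2 * i) + 1 and r = (2 * i) + 2 in
  let smallest = ref i in
  if l < t.size && less t.entries.(l) t.entries.(!smallest) then smallest := l;
  if r < t.size && less t.entries.(r) t.entries.(!smallest) then smallest := r;
  if !smallest <> i then begin
    swap t i !smallest;
    sift_down t !smallest
  end

let push t ~height ~node_id =
  if not t.in_heap.(node_id) then begin  (* duplicate rejection *)
    t.entries.(t.size) <- { height; node_id };
    t.in_heap.(node_id) <- true;
    t.size <- t.size + 1;
    sift_up t (t.size - 1)
  end

let pop_min t =
  let top = t.entries.(0) in
  t.in_heap.(top.node_id) <- false;
  t.size <- t.size - 1;
  t.entries.(0) <- t.entries.(t.size);
  sift_down t 0;
  top
```

Popping drains entries in (height, node_id) order, which is exactly the processing order the stabilization loop relies on.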

This design was chosen over a dirty-flag scan (O(N)) after benchmark B-05 proved that linear scanning is untenable at 10K+ nodes. The heap gives O(R log R) where R is the number of dirty nodes.

Stabilization Algorithm

The stabilization loop is the hot path. Here is the algorithm in pseudocode:

stabilize(graph):
    recomputed = 0
    changed_outputs = []
    t_start = now()

    while heap is not empty:
        entry = heap.pop_min()
        node = nodes[entry.node_id]

        if not node.is_dirty:
            continue    // already processed via earlier propagation

        node.is_dirty = false
        old_value = node.value (or node.old_value for leaves)

        // Recompute based on node type
        new_value = match node.recompute:
            | Leaf        -> node.value  (already set by set_leaf)
            | Map(p, f)   -> f(nodes[p].value)
            | Map2(a,b,f) -> f(nodes[a].value, nodes[b].value)
            | Fold(ps,f,init) -> fold f init [nodes[p].value for p in ps]
            | Incr_fold   -> ... (see below)

        // Cutoff check: did the value actually change?
        if not cutoff_fn(old_value, new_value):
            node.value = new_value
            recomputed++

            // Propagate dirty to dependents
            for dep_id in node.dependents:
                dep = nodes[dep_id]
                if not dep.is_dirty:
                    dep.is_dirty = true
                    heap.push(dep.height, dep.id)

            // Record output changes
            if node.is_output:
                changed_outputs.add(node, old_value, new_value)

    t_end = now()
    graph.last_recompute_count = recomputed
    graph.last_stabilization_ns = t_end - t_start
    return changed_outputs
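The loop above can be exercised end to end with a toy graph. This is an illustrative sketch under simplifying assumptions: a sorted association list stands in for the dirty heap, values are plain ints, cutoff is structural equality, and a changed leaf is handled by dirtying its dependents directly.

```ocaml
(* Toy node: height, current value, dirty flag, recompute closure,
   and the ids of dependent nodes. *)
type node =
  { height : int
  ; mutable value : int
  ; mutable is_dirty : bool
  ; recompute : node array -> int  (* reads parents' values *)
  ; dependents : int list
  }

(* Process dirty nodes in (height, id) order; returns the recompute count. *)
let stabilize nodes dirty_ids =
  let recomputed = ref 0 in
  List.iter (fun i -> nodes.(i).is_dirty <- true) dirty_ids;
  let queue =
    ref (List.sort_uniq compare
           (List.map (fun i -> (nodes.(i).height, i)) dirty_ids))
  in
  while !queue <> [] do
    let (_, id) = List.hd !queue in
    queue := List.tl !queue;
    let n = nodes.(id) in
    if n.is_dirty then begin
      n.is_dirty <- false;
      let new_value = n.recompute nodes in
      if new_value <> n.value then begin  (* cutoff: equal -> stop here *)
        n.value <- new_value;
        incr recomputed;
        (* Propagate dirtiness to dependents, keeping the queue sorted. *)
        List.iter
          (fun d ->
             let dep = nodes.(d) in
             if not dep.is_dirty then begin
               dep.is_dirty <- true;
               queue := List.sort compare ((dep.height, d) :: !queue)
             end)
          n.dependents
      end
    end
  done;
  !recomputed
```

Height order guarantees each node is recomputed at most once per stabilization: by the time a node is popped, all of its (lower-height) dependencies are already up to date.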

Invariants Maintained

The stabilization algorithm preserves four critical invariants:

ID    Invariant                                       Meaning
I-1   node.height > height of all dependencies       Topological ordering is valid
I-2   dirty_heap contains only dirty nodes           No spurious recomputation
I-3   After stabilize, no node is dirty              Full convergence
I-5   Node value consistent with recompute function  Output correctness

Cutoff Optimization

Cutoff is the mechanism that stops propagation when a node’s recomputed value equals its previous value. This is critical for preventing phantom updates.

type 'a t =
  | Phys_equal                     (* Default: physical equality *)
  | Equal of ('a -> 'a -> bool)    (* Custom equality predicate *)
  | Always                         (* Always propagate (cutoff disabled) *)
  | Never                          (* Never propagate: constant node *)
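Evaluating a cutoff against the old and new values might look like this. The dispatcher name `should_cutoff` is an assumption for the example; the convention shown (returning true means "cut off", i.e. do not dirty dependents) matches the pseudocode's `if not cutoff_fn(...)` check.

```ocaml
(* Mirror of the cutoff variants above, for a self-contained example. *)
type 'a cutoff =
  | Phys_equal
  | Equal of ('a -> 'a -> bool)
  | Always
  | Never

(* [true] means "cut off": the new value counts as unchanged,
   so dependents are not dirtied. *)
let should_cutoff cutoff ~old_value ~new_value =
  match cutoff with
  | Phys_equal -> old_value == new_value
  | Equal eq -> eq old_value new_value
  | Always -> false  (* always propagate *)
  | Never -> true    (* never propagate: constant node *)
```

A custom `Equal` is useful for float-valued nodes, where a tolerance predicate can suppress propagation of sub-epsilon jitter.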

Example where cutoff prevents unnecessary work:

[leaf: raw_price]  value: 200.0
      |
      v
[map: clamp]       f: min(x, 100.0)  value: 100.0
      |
      v
[map: format]      f: sprintf "%.2f" x  value: "100.00"

If raw_price changes from 200.0 to 250.0, clamp recomputes to min(250, 100) = 100.0. Since 100.0 equals the old value, cutoff fires and format is never recomputed. This saves an allocation (string formatting) on the hot path.
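The chain above can be simulated directly. This is an illustrative sketch, not Ripple's API: each node is a ref plus an inline recompute, and a counter shows that the format step is skipped when the clamp value is unchanged.

```ocaml
(* Toy two-stage chain: raw_price -> clamp -> format. *)
let format_count = ref 0  (* counts recomputations of the format node *)

let raw_price = ref 200.0
let clamp_value = ref (min !raw_price 100.0)
let format_value = ref (Printf.sprintf "%.2f" !clamp_value)

let stabilize () =
  let old_clamp = !clamp_value in
  let new_clamp = min !raw_price 100.0 in
  (* Cutoff check: if clamp's value is unchanged, stop propagation
     and never touch the (allocating) string-formatting node. *)
  if new_clamp <> old_clamp then begin
    clamp_value := new_clamp;
    incr format_count;
    format_value := Printf.sprintf "%.2f" new_clamp
  end
```

Raising `raw_price` from 200.0 to 250.0 leaves the clamp at 100.0, so `format_count` stays at zero; only a drop below the clamp threshold forces a reformat.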

Incr_fold: O(1) Incremental Aggregation

Incr_fold is the key innovation for large fan-in aggregation. Instead of re-folding all parents on every stabilization, it tracks which parents changed and applies only incremental updates:

stabilize Incr_fold node:
    acc = old_value
    for each changed_parent_index:
        pid = parents[index]
        old_parent_value = parent_values[index]    // snapshot from last cycle
        new_parent_value = nodes[pid].value
        acc = remove(acc, old_parent_value)
        acc = add(acc, new_parent_value)
        parent_values[index] = new_parent_value    // update snapshot
    changed_parent_indices = []                     // reset for next cycle
    return acc

During dirty propagation, when a parent of an Incr_fold node changes, the parent’s index is recorded in changed_parent_indices. This way, the fold only processes changed parents.

For the VWAP use case with 2,000 symbols:

  • Fold: recomputes sum across all 2,000 prices = O(2000)
  • Incr_fold: removes old price, adds new price = O(1)
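The remove/add delta at the heart of Incr_fold can be sketched for a running sum. Names here are illustrative; for a sum, `remove` is subtraction and `add` is addition, applied only at the changed parent's index.

```ocaml
(* Toy Incr_fold state: a snapshot of parent values plus the
   accumulated sum. [update] applies the O(1) delta for one parent. *)
let parent_values = [| 10.0; 20.0; 30.0 |]
let sum = ref (Array.fold_left ( +. ) 0.0 parent_values)

let update ~index ~new_value =
  let old_value = parent_values.(index) in
  sum := !sum -. old_value +. new_value;   (* remove old, add new *)
  parent_values.(index) <- new_value       (* refresh the snapshot *)
```

Note the technique requires the fold operation to be invertible (or to support removal, as with sums and counts); a plain `max`, for instance, cannot be maintained this way.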

Memory Layout

Nodes are stored in a contiguous array. Each node is approximately 200 bytes:

node (approx 200 bytes):
  id              : int          (8 bytes)
  height          : int          (8 bytes)
  value           : Obj.t        (8 bytes, pointer to heap-allocated value)
  is_dirty        : bool         (1 byte, padded to 8)
  is_output       : bool         (1 byte, padded to 8)
  old_value       : Obj.t        (8 bytes)
  recompute       : recompute_fn (varies, ~40-80 bytes)
  cutoff_fn       : closure      (8 bytes)
  dependents      : int list     (16+ bytes)
  sexp_of_value   : option       (8 bytes)
  value_of_sexp   : option       (8 bytes)

At 2,000 nodes (the max per worker), the node array occupies ~400 KB – well within L2 cache on modern CPUs. The dirty heap adds another ~80 KB. Total working set for the graph engine: under 500 KB.