The Graph Engine
The graph engine is the most performance-critical component in Ripple. It manages the incremental computation graph, tracks dirty nodes, and executes the stabilization algorithm. This page covers the three key mechanisms: the dirty heap, height-ordered propagation, and cutoff optimization.
Architecture
┌──────────────────────────────────────────────────────┐
│ Graph │
│ │
│ ┌──────────────────────┐ ┌──────────────────────┐ │
│ │ nodes : node array │ │ dirty_heap : Heap.t │ │
│ │ (contiguous memory)│ │ (min-heap by height)│ │
│ └──────────────────────┘ └──────────────────────┘ │
│ │
│ node_count : int │
│ last_recompute_count : int │
│ last_stabilization_ns : Int63.t │
│ now : unit -> Time_ns.t │
│ output_infos : (int, output_info) Hashtbl.t │
└──────────────────────────────────────────────────────┘
Nodes are stored in a flat array for cache locality. The graph grows the array by 2x when capacity is exceeded. All node access during stabilization is by integer index – no pointer chasing, no hash lookups on the hot path.
Node Types
Each node has a recompute_fn variant that determines how its value is derived:
| Variant | Parents | Complexity | Use Case |
|---|---|---|---|
Leaf | 0 | O(1) | External input |
Map | 1 | O(1) | Transform single value |
Map2 | 2 | O(1) | Combine two values |
Fold | N | O(N) | Aggregate (re-folds all) |
Incr_fold | N | O(changed) | Incremental aggregate |
The Dirty Heap
The dirty heap is an array-based binary min-heap keyed on (height, node_id). It determines the order in which dirty nodes are processed during stabilization.
Heap ordering: (height, node_id) — lexicographic
- Lower height first (process dependencies before dependents)
- Lower node_id as tiebreaker (deterministic ordering)
Implementation properties:
- Array-based: no pointer indirection, cache-friendly iteration
- Pre-allocated: capacity grows only at graph construction time, never during stabilization
- O(1) membership check:
in_heapboolean array indexed bynode_id - O(log n) push/pop: standard heap sift-up/sift-down
- Duplicate rejection:
pushchecksin_heapbefore inserting
type t =
{ mutable entries : entry array
; mutable size : int
; mutable in_heap : bool array (* O(1) membership by node_id *)
; mutable capacity : int
}
let push t ~height ~node_id =
if not t.in_heap.(node_id) then begin
t.size <- t.size + 1;
t.entries.(t.size) <- { height; node_id };
t.in_heap.(node_id) <- true;
sift_up t t.size
end
This design was chosen over a dirty-flag scan (O(N)) after benchmark B-05 proved that linear scanning is untenable at 10K+ nodes. The heap gives O(R log R) where R is the number of dirty nodes.
Stabilization Algorithm
The stabilization loop is the hot path. Here is the algorithm in pseudocode:
stabilize(graph):
recomputed = 0
changed_outputs = []
t_start = now()
while heap is not empty:
entry = heap.pop_min()
node = nodes[entry.node_id]
if not node.is_dirty:
continue // already processed via earlier propagation
node.is_dirty = false
old_value = node.value (or node.old_value for leaves)
// Recompute based on node type
new_value = match node.recompute:
| Leaf -> node.value (already set by set_leaf)
| Map(p, f) -> f(nodes[p].value)
| Map2(a,b,f) -> f(nodes[a].value, nodes[b].value)
| Fold(ps,f,init) -> fold f init [nodes[p].value for p in ps]
| Incr_fold -> ... (see below)
// Cutoff check: did the value actually change?
if not cutoff_fn(old_value, new_value):
node.value = new_value
recomputed++
// Propagate dirty to dependents
for dep_id in node.dependents:
dep = nodes[dep_id]
if not dep.is_dirty:
dep.is_dirty = true
heap.push(dep.height, dep.id)
// Record output changes
if node.is_output:
changed_outputs.add(node, old_value, new_value)
t_end = now()
graph.last_recompute_count = recomputed
return changed_outputs
Invariants Maintained
The stabilization algorithm preserves four critical invariants:
| ID | Invariant | Meaning |
|---|---|---|
| I-1 | node.height > height of all dependencies | Topological ordering is valid |
| I-2 | dirty_heap contains only dirty nodes | No spurious recomputation |
| I-3 | After stabilize, no node is dirty | Full convergence |
| I-5 | Node value consistent with recompute function | Output correctness |
Cutoff Optimization
Cutoff is the mechanism that stops propagation when a node’s recomputed value equals its previous value. This is critical for preventing phantom updates.
type 'a t =
| Phys_equal (* Default: physical equality *)
| Equal of ('a -> 'a -> bool) (* Custom equality *)
| Always (* Always propagate *)
| Never (* Never propagate — constant node *)
Example where cutoff prevents unnecessary work:
[leaf: raw_price] value: 200.0
|
v
[map: clamp] f: min(x, 100.0) value: 100.0
|
v
[map: format] f: sprintf "%.2f" x value: "100.00"
If raw_price changes from 200.0 to 250.0, clamp recomputes to min(250, 100) = 100.0. Since 100.0 equals the old value, cutoff fires and format is never recomputed. This saves an allocation (string formatting) on the hot path.
Incr_fold: O(1) Incremental Aggregation
Incr_fold is the key innovation for large fan-in aggregation. Instead of re-folding all parents on every stabilization, it tracks which parents changed and applies only incremental updates:
stabilize Incr_fold node:
acc = old_value
for each changed_parent_index:
pid = parents[index]
old_parent_value = parent_values[index] // snapshot from last cycle
new_parent_value = nodes[pid].value
acc = remove(acc, old_parent_value)
acc = add(acc, new_parent_value)
parent_values[index] = new_parent_value // update snapshot
changed_parent_indices = [] // reset for next cycle
return acc
During dirty propagation, when a parent of an Incr_fold node changes, the parent’s index is recorded in changed_parent_indices. This way, the fold only processes changed parents.
For the VWAP use case with 2,000 symbols:
- Fold: recomputes sum across all 2,000 prices = O(2000)
- Incr_fold: removes old price, adds new price = O(1)
Memory Layout
Nodes are stored in a contiguous array. Each node is approximately 200 bytes:
node (approx 200 bytes):
id : int (8 bytes)
height : int (8 bytes)
value : Obj.t (8 bytes, pointer to heap-allocated value)
is_dirty : bool (1 byte, padded to 8)
is_output : bool (1 byte, padded to 8)
old_value : Obj.t (8 bytes)
recompute : recompute_fn (varies, ~40-80 bytes)
cutoff_fn : closure (8 bytes)
dependents : int list (16+ bytes)
sexp_of_value : option (8 bytes)
value_of_sexp : option (8 bytes)
At 2,000 nodes (the max per worker), the node array occupies ~400 KB – well within L2 cache on modern CPUs. The dirty heap adds another ~80 KB. Total working set for the graph engine: under 500 KB.