Your First Pipeline
This page walks through building a minimal incremental computation graph in OCaml. By the end, you will understand leaves, map nodes, fold nodes, stabilization, and output observation.
Core Concepts in 60 Seconds
Ripple’s computation model is a directed acyclic graph (DAG) of nodes:
- Leaf nodes hold input values that you set externally.
- Compute nodes (map, map2, fold_array, incr_fold_array) derive values from their parents.
- Stabilization propagates changes from dirty leaves through the graph, recomputing only what changed.
[leaf: price]      [leaf: size]
      |                  |
      v                  v
[map: validate]    [map: validate]
       \                /
        v              v
      [map2: compute_vwap]
               |
               v
        [output: vwap]
Creating a Graph
Every graph requires a now function for timestamping stabilization cycles. In tests, use Effect_intf.Test.now for deterministic time. In production, use Time_ns.now.
open! Core
let () =
  (* Create graph with deterministic time for reproducibility *)
  let g = Ripple_graph.Graph.create
    ~now:Ripple_kernel.Effect_intf.Test.now
  in
  ignore g
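To see why an injected now function makes test runs reproducible, here is a minimal stdlib-only sketch of a settable clock. The Test_clock module, the stamp_cycle helper, and the int64 nanosecond representation are hypothetical stand-ins for illustration; they are not Ripple's Effect_intf.Test implementation.

```ocaml
(* Hypothetical test clock: time only moves when the test says so. *)
module Test_clock = struct
  let current = ref 0L                       (* nanoseconds, arbitrary origin *)
  let set_time t = current := t
  let advance ns = current := Int64.add !current ns
  let now () = !current
end

(* Code under test receives the clock as a plain function argument. *)
let stamp_cycle ~now cycle = (cycle, now ())

let () =
  Test_clock.set_time 1_000L;
  let a = stamp_cycle ~now:Test_clock.now 1 in
  Test_clock.advance 500L;
  let b = stamp_cycle ~now:Test_clock.now 2 in
  Printf.printf "cycle %d at %Ld, cycle %d at %Ld\n"
    (fst a) (snd a) (fst b) (snd b)
```

Because the clock is passed in rather than read globally, the same code can run against a fixed clock in tests and a wall clock in production.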
Adding Leaf Nodes
Leaves are the entry points for data. Each leaf requires an initial value and an equality function for cutoff optimization (see Cutoff Optimization).
(* A leaf holding a float value *)
let price = Ripple_graph.Graph.add_leaf g
  ~initial:100.0
  ~equal:Float.equal
in
(* A leaf holding an int value *)
let volume = Ripple_graph.Graph.add_leaf g
  ~initial:0
  ~equal:Int.equal
in
add_leaf returns an 'a var – a typed handle for setting the value later. The type parameter 'a ensures you cannot set a float leaf with an int.
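The role of the equality function can be illustrated with a small stdlib-only model. This is a sketch under the assumption that cutoff is checked when a leaf is set; the leaf record, make_leaf, and set are hypothetical names, not Ripple's internals.

```ocaml
(* Hypothetical leaf: a value, its equality test, and a dirty flag. *)
type 'a leaf = {
  mutable value : 'a;
  equal : 'a -> 'a -> bool;
  mutable dirty : bool;
}

let make_leaf ~initial ~equal = { value = initial; equal; dirty = false }

(* Setting an equal value is a no-op, so nothing downstream gets scheduled. *)
let set leaf v =
  if not (leaf.equal leaf.value v) then begin
    leaf.value <- v;
    leaf.dirty <- true
  end

let () =
  let price = make_leaf ~initial:100.0 ~equal:Float.equal in
  set price 100.0;
  Printf.printf "after same value: dirty=%b\n" price.dirty;
  set price 150.0;
  Printf.printf "after new value:  dirty=%b\n" price.dirty
```

In this model, writing `set price 1` would be rejected by the type checker, which mirrors the guarantee the 'a parameter gives on a typed handle.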
Deriving Compute Nodes
map: Transform a Single Parent
(* Convert var to incr for use as a dependency *)
let price_incr = Ripple_graph.Graph.var_to_incr price in
(* Double the price *)
let doubled = Ripple_graph.Graph.map g price_incr
  ~f:(fun p -> p *. 2.0)
  ~equal:Float.equal
in
map creates a node at height parent.height + 1, adds a dependency edge, and computes an initial value immediately.
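The height rule is what lets a scheduler process parents before children. The sketch below models only that bookkeeping with stdlib OCaml. The node record is hypothetical, and two details are assumptions rather than documented Ripple behavior: leaves sit at height 0, and a two-parent node sits one level above its taller parent.

```ocaml
(* Hypothetical node record: just enough to show height bookkeeping. *)
type node = { name : string; height : int; parents : node list }

let leaf name = { name; height = 0; parents = [] }

(* One parent: the child sits exactly one level above it. *)
let map name parent =
  { name; height = parent.height + 1; parents = [ parent ] }

(* Assumption: two parents, one level above the taller of the two. *)
let map2 name a b =
  { name; height = 1 + max a.height b.height; parents = [ a; b ] }

(* The invariant a height-ordered scheduler relies on. *)
let parents_are_lower n =
  List.for_all (fun p -> p.height < n.height) n.parents

let () =
  let price = leaf "price" in
  let volume = leaf "volume" in
  let doubled = map "doubled" price in
  let notional = map2 "notional" doubled volume in
  List.iter
    (fun n -> Printf.printf "%s: height %d\n" n.name n.height)
    [ price; volume; doubled; notional ]
```

Since every parent is strictly lower than its child, visiting dirty nodes in ascending height never reads a stale parent value.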
map2: Combine Two Parents
let volume_incr = Ripple_graph.Graph.var_to_incr volume in
let notional = Ripple_graph.Graph.map2 g price_incr volume_incr
  ~f:(fun p v -> p *. Float.of_int v)
  ~equal:Float.equal
in
fold_array: Aggregate Many Parents
(* Sum across multiple price leaves *)
let leaves = Array.init 5 ~f:(fun i ->
  Ripple_graph.Graph.add_leaf g
    ~initial:(Float.of_int (i + 1))
    ~equal:Float.equal)
in
let incrs = Array.map leaves
  ~f:Ripple_graph.Graph.var_to_incr
in
let total = Ripple_graph.Graph.fold_array g incrs
  ~init:0.0
  ~f:(fun acc v -> acc +. v)
  ~equal:Float.equal
in
Important: fold_array is O(N) per stabilization – it re-folds all parents every time any parent changes. For large parent sets, use incr_fold_array instead.
incr_fold_array: O(1) Incremental Aggregation
let total_incr = Ripple_graph.Graph.incr_fold_array g incrs
  ~init:0.0
  ~add:(fun acc v -> acc +. v)
  ~remove:(fun acc v -> acc -. v)
  ~equal:Float.equal
in
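incr_fold_array tracks which parents changed and, for each one, applies remove to its old value and then add to its new value. This is the key primitive for VWAP: when one symbol’s price changes, the portfolio sum updates in O(1), not O(N). The cost is proportional to the number of changed parents, and remove must be the exact inverse of add for the result to match a full re-fold.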
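The difference between the two folds can be shown without the library. The following stdlib-only sketch compares a full re-fold with a remove-then-add update for a single changed parent; refold and update_sum are hypothetical helper names, not part of Ripple.

```ocaml
(* Full re-fold: touches every parent, O(N) per update. *)
let refold values = Array.fold_left ( +. ) 0.0 values

(* Incremental update: undo the old contribution, then apply the new one. *)
let update_sum ~add ~remove acc ~old_value ~new_value =
  add (remove acc old_value) new_value

let () =
  let values = [| 1.0; 2.0; 3.0; 4.0; 5.0 |] in
  let total = refold values in
  (* The parent at index 2 changes from 3.0 to 10.0. *)
  let old_value = values.(2) in
  values.(2) <- 10.0;
  let total' =
    update_sum ~add:( +. ) ~remove:( -. ) total ~old_value ~new_value:10.0
  in
  Printf.printf "incremental=%.1f refold=%.1f\n" total' (refold values)
  (* prints: incremental=22.0 refold=22.0 *)
```

With floating-point sums, many remove and add steps can accumulate rounding error that a fresh re-fold would not have, so the two results are only guaranteed to agree exactly when the arithmetic is exact, as it is for these small values.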
Stabilization
Setting a leaf does not propagate anything by itself; derived values stay stale until you call stabilize. This is the core operation: it processes all dirty nodes in topological order (a min-heap sorted by height) and returns a list of changed outputs.
(* Set a leaf value *)
Ripple_graph.Graph.set_leaf price 150.0;
(* Propagate changes through the graph *)
let _changed_outputs = Ripple_graph.Graph.stabilize g in
(* Read the current value of any node *)
let current_doubled = Ripple_graph.Graph.watch g doubled in
printf "doubled = %.1f\n" current_doubled;
(* Output: doubled = 300.0 *)
Key properties of stabilize:
- Idempotent: calling stabilize when no nodes are dirty does zero work.
- Deterministic: same inputs always produce same outputs (provided now is deterministic).
- Selective: only recomputes nodes whose inputs actually changed.
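These properties can be observed in a toy model. The sketch below is a float-only, stdlib-only stand-in and not Ripple's engine: every name in it is hypothetical, a list kept sorted by height takes the place of the min-heap, and unlike Ripple's recompute count it does not count the leaf itself.

```ocaml
type node = {
  height : int;
  mutable value : float;
  compute : unit -> float;            (* reads the parents' current values *)
  mutable children : node list;
  mutable dirty : bool;
}

(* Dirty nodes awaiting recomputation, kept sorted by ascending height. *)
let queue : node list ref = ref []

let mark n =
  if not n.dirty then begin
    n.dirty <- true;
    queue := List.merge (fun a b -> compare a.height b.height) [ n ] !queue
  end

let leaf v =
  { height = 0; value = v; compute = (fun () -> v);
    children = []; dirty = false }

let map parent f =
  let n = { height = parent.height + 1; value = f parent.value;
            compute = (fun () -> f parent.value);
            children = []; dirty = false } in
  parent.children <- n :: parent.children;
  n

let map2 a b f =
  let n = { height = 1 + max a.height b.height; value = f a.value b.value;
            compute = (fun () -> f a.value b.value);
            children = []; dirty = false } in
  a.children <- n :: a.children;
  b.children <- n :: b.children;
  n

(* Setting an unchanged value schedules nothing. *)
let set_leaf l v =
  if not (Float.equal l.value v) then begin
    l.value <- v;
    List.iter mark l.children
  end

(* Recompute dirty nodes lowest-first; returns how many were recomputed. *)
let stabilize () =
  let rec loop count =
    match !queue with
    | [] -> count
    | n :: rest ->
      queue := rest;
      n.dirty <- false;
      let v = n.compute () in
      if not (Float.equal n.value v) then begin
        n.value <- v;
        List.iter mark n.children     (* skipped when the value is unchanged *)
      end;
      loop (count + 1)
  in
  loop 0

let () =
  let price = leaf 100.0 in
  let volume = leaf 10.0 in
  let doubled = map price (fun p -> p *. 2.0) in
  let notional = map2 doubled volume ( *. ) in
  set_leaf price 150.0;
  let first = stabilize () in
  let second = stabilize () in
  Printf.printf "notional=%.1f first=%d second=%d\n"
    notional.value first second
  (* prints: notional=3000.0 first=2 second=0 *)
```

The second call returns 0 because the queue is already empty, which is the idempotence property in miniature; the volume leaf is never visited because nothing marked it.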
Watching Output Values
watch reads the current value of any 'a incr node:
let value : float = Ripple_graph.Graph.watch g notional in
This is a direct array lookup – O(1), no allocation.
Complete Example
open! Core
let () =
  Ripple_kernel.Effect_intf.Test.set_time Time_ns.epoch;
  let g = Ripple_graph.Graph.create
    ~now:Ripple_kernel.Effect_intf.Test.now
  in
  (* Build graph: price -> doubled; price and volume -> notional *)
  let price = Ripple_graph.Graph.add_leaf g
    ~initial:100.0 ~equal:Float.equal in
  let volume = Ripple_graph.Graph.add_leaf g
    ~initial:1000 ~equal:Int.equal in
  let p = Ripple_graph.Graph.var_to_incr price in
  let v = Ripple_graph.Graph.var_to_incr volume in
  let doubled = Ripple_graph.Graph.map g p
    ~f:(fun x -> x *. 2.0) ~equal:Float.equal in
  let notional = Ripple_graph.Graph.map2 g p v
    ~f:(fun p v -> p *. Float.of_int v) ~equal:Float.equal in
  (* Initial stabilize *)
  let _ = Ripple_graph.Graph.stabilize g in
  printf "doubled=%.1f notional=%.1f\n"
    (Ripple_graph.Graph.watch g doubled)
    (Ripple_graph.Graph.watch g notional);
  (* Update price only; the volume leaf stays clean *)
  Ripple_graph.Graph.set_leaf price 150.0;
  let _ = Ripple_graph.Graph.stabilize g in
  printf "doubled=%.1f notional=%.1f recomputed=%d\n"
    (Ripple_graph.Graph.watch g doubled)
    (Ripple_graph.Graph.watch g notional)
    (Ripple_graph.Graph.last_recompute_count g)
Expected output:
doubled=200.0 notional=100000.0
doubled=300.0 notional=150000.0 recomputed=3
The second stabilization recomputes 3 nodes: the price leaf (changed), doubled (depends on price), and notional (depends on price). The volume leaf and any nodes that depend only on volume are untouched.
What’s Next
- VWAP Demo – see a full pipeline processing 100K events
- Incremental Computation – understand why this approach is fundamentally faster
- The Graph Engine – how the dirty heap and cutoff optimization work