Your First Pipeline

This page walks through building a minimal incremental computation graph in OCaml. By the end, you will understand leaves, map nodes, fold nodes, stabilization, and output observation.

Core Concepts in 60 Seconds

Ripple’s computation model is a directed acyclic graph (DAG) of nodes:

  • Leaf nodes hold input values that you set externally.
  • Compute nodes (map, map2, fold_array, incr_fold_array) derive values from their parents.
  • Stabilization propagates changes from dirty leaves through the graph, recomputing only the nodes whose inputs changed.

For example, a VWAP pipeline:

    [leaf: price]     [leaf: size]
         |                 |
         v                 v
    [map: validate]   [map: validate]
              \          /
               v        v
           [map2: compute_vwap]
                   |
                   v
              [output: vwap]

Creating a Graph

Every graph requires a now function for timestamping stabilization cycles. In tests, use Effect_intf.Test.now for deterministic time. In production, use Time_ns.now.

open! Core

let () =
  (* Create graph with deterministic time for reproducibility *)
  let g = Ripple_graph.Graph.create
    ~now:Ripple_kernel.Effect_intf.Test.now
  in
  ignore g
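
The reason for passing now explicitly can be sketched without Ripple. The snippet below is a minimal illustration using only the OCaml standard library; fake_time, set_time, now, and run_cycle are hypothetical names, not Ripple's API. It shows a settable clock being injected into a consumer so that timestamps are fully controlled by the test.

```ocaml
(* A settable fake clock: [now] reads the current fake time. *)
let fake_time = ref 0.0
let set_time t = fake_time := t
let now () = !fake_time

(* Hypothetical consumer: stamps each cycle using the injected clock
   and prepends the timestamp to a log. *)
let run_cycle ~now log = now () :: log

let log =
  set_time 1.0;
  let log = run_cycle ~now [] in
  set_time 2.5;
  run_cycle ~now log

let () = List.iter (Printf.printf "cycle at t=%.1f\n") (List.rev log)
```

Because the consumer never reads the system clock itself, two runs with the same set_time calls produce identical timestamps.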

Adding Leaf Nodes

Leaves are the entry points for data. Each leaf requires an initial value and an equality function for cutoff optimization (see Cutoff Optimization).

(* A leaf holding a float value *)
let price = Ripple_graph.Graph.add_leaf g
  ~initial:100.0
  ~equal:Float.equal
in

(* A leaf holding an int value *)
let volume = Ripple_graph.Graph.add_leaf g
  ~initial:0
  ~equal:Int.equal
in

add_leaf returns an 'a var – a typed handle for setting the value later. The type parameter 'a ensures you cannot set a float leaf with an int.
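
To see how a type parameter and an equality function work together, here is a toy stand-in for a typed leaf handle. It uses only the standard library. The record layout, the writes counter, and the assumption that setting an equal value is a no-op are illustrative choices, not Ripple's actual definition.

```ocaml
(* Hypothetical stand-in for a typed leaf handle. *)
type 'a var = {
  mutable value : 'a;
  equal : 'a -> 'a -> bool;
  mutable writes : int;  (* number of sets that actually changed the value *)
}

let add_leaf ~initial ~equal = { value = initial; equal; writes = 0 }

(* Assumption: a set with an equal value is skipped (cutoff at the leaf). *)
let set_leaf v x =
  if not (v.equal v.value x) then begin
    v.value <- x;
    v.writes <- v.writes + 1
  end

let price = add_leaf ~initial:100.0 ~equal:Float.equal
let volume = add_leaf ~initial:0 ~equal:Int.equal

let () =
  set_leaf price 150.0;  (* float into a float var: accepted *)
  set_leaf price 150.0;  (* equal to the current value: skipped *)
  set_leaf volume 10
  (* [set_leaf price 10] would be rejected by the type checker. *)
```

After these calls price.writes is 1, because the second set is cut off by Float.equal.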

Deriving Compute Nodes

map: Transform a Single Parent

(* Convert var to incr for use as a dependency *)
let price_incr = Ripple_graph.Graph.var_to_incr price in

(* Double the price *)
let doubled = Ripple_graph.Graph.map g price_incr
  ~f:(fun p -> p *. 2.0)
  ~equal:Float.equal
in

map creates a node at height parent.height + 1, adds a dependency edge, and computes an initial value immediately.

map2: Combine Two Parents

let volume_incr = Ripple_graph.Graph.var_to_incr volume in

let notional = Ripple_graph.Graph.map2 g price_incr volume_incr
  ~f:(fun p v -> p *. Float.of_int v)
  ~equal:Float.equal
in

fold_array: Aggregate Many Parents

(* Sum across multiple price leaves *)
let leaves = Array.init 5 ~f:(fun i ->
  Ripple_graph.Graph.add_leaf g
    ~initial:(Float.of_int (i + 1))
    ~equal:Float.equal)
in
let incrs = Array.map leaves
  ~f:Ripple_graph.Graph.var_to_incr
in
let total = Ripple_graph.Graph.fold_array g incrs
  ~init:0.0
  ~f:(fun acc v -> acc +. v)
  ~equal:Float.equal
in

Important: fold_array is O(N) per stabilization – it re-folds all parents every time any parent changes. For large parent sets, use incr_fold_array instead.

incr_fold_array: O(1) Incremental Aggregation

let total_incr = Ripple_graph.Graph.incr_fold_array g incrs
  ~init:0.0
  ~add:(fun acc v -> acc +. v)
  ~remove:(fun acc v -> acc -. v)
  ~equal:Float.equal
in

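incr_fold_array tracks which parents changed and, for each changed parent, applies only remove with its old value followed by add with its new value. The cost is O(1) per changed parent rather than O(N) in the total number of parents, provided remove is the exact inverse of add. This is the key primitive for VWAP: when one symbol’s price changes, the portfolio sum updates in O(1), not O(N).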
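
The difference between the two strategies can be sketched with plain arrays. This is a toy model using only the standard library; full_fold and incr_update are hypothetical helpers, not part of Ripple.

```ocaml
(* Toy comparison of the two strategies; all names here are hypothetical. *)
let values = [| 1.0; 2.0; 3.0; 4.0; 5.0 |]

(* Full re-fold: visits every parent value, O(N). *)
let full_fold values = Array.fold_left (fun acc v -> acc +. v) 0.0 values

(* Incremental update: one remove and one add for the changed parent, O(1). *)
let incr_update ~add ~remove acc ~old_value ~new_value =
  add (remove acc old_value) new_value

let total_before = full_fold values

(* Change the third parent from 3.0 to 10.0. *)
let old_value = values.(2)
let () = values.(2) <- 10.0

let total_incremental =
  incr_update ~add:( +. ) ~remove:( -. ) total_before ~old_value ~new_value:10.0

let total_refolded = full_fold values

let () =
  Printf.printf "incremental=%.1f refolded=%.1f\n"
    total_incremental total_refolded
(* prints: incremental=22.0 refolded=22.0 *)
```

With floating-point sums, repeated subtract-and-add can accumulate rounding error over many updates, so an occasional full re-fold may be worth considering for long-running aggregates.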

Stabilization

Setting a leaf does not propagate anything by itself; changes reach derived nodes only when you call stabilize. This is the core operation: it processes all dirty nodes in topological order (a min-heap keyed by height) and returns a list of changed outputs.

(* Set a leaf value *)
Ripple_graph.Graph.set_leaf price 150.0;

(* Propagate changes through the graph *)
let _changed_outputs = Ripple_graph.Graph.stabilize g in

(* Read the current value of any node *)
let current_doubled = Ripple_graph.Graph.watch g doubled in
printf "doubled = %.1f\n" current_doubled;
(* Output: doubled = 300.0 *)

Key properties of stabilize:

  • Idempotent: calling stabilize when no nodes are dirty does zero work.
  • Deterministic: the same inputs always produce the same outputs (provided now is deterministic).
  • Selective: only recomputes nodes whose inputs actually changed.
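
These properties can be reproduced in a toy stabilizer. The sketch below uses only the standard library and is not Ripple's implementation: it sorts a node list by height instead of using a min-heap, handles floats only, and counts only derived nodes (leaves are not counted). The names node, derive, and the height rule for multiple parents (one more than the tallest parent) are assumptions made for illustration.

```ocaml
(* Toy stabilizer; a sketch only, Ripple's internals may differ. *)
type node = {
  height : int;
  mutable value : float;
  mutable dirty : bool;
  mutable children : node list;
  parents : node list;
  f : float list -> float;  (* recompute from parent values; unused for leaves *)
}

let leaf v =
  { height = 0; value = v; dirty = false; children = []; parents = [];
    f = (fun _ -> v) }

(* A derived node sits one level above its tallest parent. *)
let derive parents f =
  let height = 1 + List.fold_left (fun h p -> max h p.height) 0 parents in
  let value = f (List.map (fun p -> p.value) parents) in
  let n = { height; value; dirty = false; children = []; parents; f } in
  List.iter (fun p -> p.children <- n :: p.children) parents;
  n

let set_leaf l v =
  if not (Float.equal l.value v) then begin
    l.value <- v;
    List.iter (fun c -> c.dirty <- true) l.children
  end

(* Visit nodes in ascending height; return how many were recomputed.
   Children are marked dirty only when the parent's value changed (cutoff). *)
let stabilize nodes =
  let by_height = List.sort (fun a b -> compare a.height b.height) nodes in
  List.fold_left (fun count n ->
    if not n.dirty then count
    else begin
      n.dirty <- false;
      let v = n.f (List.map (fun p -> p.value) n.parents) in
      if not (Float.equal v n.value) then begin
        n.value <- v;
        List.iter (fun c -> c.dirty <- true) n.children
      end;
      count + 1
    end) 0 by_height

let price = leaf 100.0
let volume = leaf 1000.0
let doubled = derive [price] (function [p] -> p *. 2.0 | _ -> nan)
let notional = derive [price; volume] (function [p; v] -> p *. v | _ -> nan)
let capped = derive [doubled] (function [d] -> Float.min d 250.0 | _ -> nan)
let shown = derive [capped] (function [c] -> c +. 1.0 | _ -> nan)
let all = [price; volume; doubled; notional; capped; shown]

let () = set_leaf price 150.0
let n1 = stabilize all  (* doubled, notional, capped, shown *)
let n2 = stabilize all  (* nothing dirty: no work *)
let () = set_leaf price 200.0
let n3 = stabilize all  (* capped stays at 250.0, so shown is skipped *)
let () = Printf.printf "%d %d %d\n" n1 n2 n3
(* prints: 4 0 3 *)
```

Because every child has a strictly greater height than its parents, processing in ascending height guarantees a node is recomputed only after all of its dirty parents.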

Watching Output Values

watch reads the current value of any 'a incr node:

let value : float = Ripple_graph.Graph.watch g notional in

This is a direct array lookup – O(1), no allocation.

Complete Example

open! Core

let () =
  Ripple_kernel.Effect_intf.Test.set_time Time_ns.epoch;
  let g = Ripple_graph.Graph.create
    ~now:Ripple_kernel.Effect_intf.Test.now
  in
  (* Build graph: price -> doubled, volume -> notional *)
  let price = Ripple_graph.Graph.add_leaf g
    ~initial:100.0 ~equal:Float.equal in
  let volume = Ripple_graph.Graph.add_leaf g
    ~initial:1000 ~equal:Int.equal in
  let p = Ripple_graph.Graph.var_to_incr price in
  let v = Ripple_graph.Graph.var_to_incr volume in
  let doubled = Ripple_graph.Graph.map g p
    ~f:(fun x -> x *. 2.0) ~equal:Float.equal in
  let notional = Ripple_graph.Graph.map2 g p v
    ~f:(fun p v -> p *. Float.of_int v) ~equal:Float.equal in
  (* Initial stabilize *)
  let _ = Ripple_graph.Graph.stabilize g in
  printf "doubled=%.1f notional=%.1f\n"
    (Ripple_graph.Graph.watch g doubled)
    (Ripple_graph.Graph.watch g notional);
  (* Update price only; the volume leaf is not recomputed *)
  Ripple_graph.Graph.set_leaf price 150.0;
  let _ = Ripple_graph.Graph.stabilize g in
  printf "doubled=%.1f notional=%.1f recomputed=%d\n"
    (Ripple_graph.Graph.watch g doubled)
    (Ripple_graph.Graph.watch g notional)
    (Ripple_graph.Graph.last_recompute_count g)

Expected output:

doubled=200.0 notional=100000.0
doubled=300.0 notional=150000.0 recomputed=3

The second stabilization recomputes 3 nodes: the price leaf (changed), doubled (depends on price), and notional (depends on price). The volume leaf and any nodes that depend only on volume are untouched.

What’s Next