Windowing & Watermarks

Ripple processes event-time streams using temporal windows. Windows group events by time range, and watermarks track completeness to determine when window results can be emitted. This page covers window assigners, watermark tracking, trigger policies, and late event handling.

Window Types

A window is a half-open time interval [start_ns, end_ns):

type t =
  { id : string
  ; start_ns : Int64.t   (* inclusive *)
  ; end_ns : Int64.t     (* exclusive *)
  }

Events are assigned to windows by their event time, not processing time. This is a settled design decision: all computation in Ripple uses event time. Processing time is for logging only.

Window Assigners

Tumbling Windows

Non-overlapping, fixed-size windows. Every event belongs to exactly one window.

  |──── 60s ────|──── 60s ────|──── 60s ────|
  |  window-0   |  window-60  |  window-120 |
  0            60            120           180  (seconds)
let config = Tumbling { size = Time_ns.Span.of_sec 60.0 }

Assignment is O(1): window_start = event_time - (event_time mod size).
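The formula can be sketched in plain OCaml. This assumes times and sizes are `int64` nanoseconds and uses only the Stdlib rather than Core. The helper names `tumbling_window_start` and `tumbling_window` are hypothetical. `Int64.rem` takes the sign of the dividend, so the sketch shifts negative remainders to keep pre-epoch times aligned.

```ocaml
(* Hypothetical helper: start of the tumbling window containing [event_ns].
   All values are int64 nanoseconds; [size_ns] must be positive. *)
let tumbling_window_start ~size_ns event_ns =
  let r = Int64.rem event_ns size_ns in
  (* Int64.rem follows the sign of the dividend, so move negative
     remainders into [0, size_ns) to keep windows aligned. *)
  let r = if Int64.compare r 0L < 0 then Int64.add r size_ns else r in
  Int64.sub event_ns r

(* The window is the half-open interval [start, start + size). *)
let tumbling_window ~size_ns event_ns =
  let start = tumbling_window_start ~size_ns event_ns in
  (start, Int64.add start size_ns)

let () =
  let size_ns = 60_000_000_000L in   (* 60 s *)
  let event_ns = 75_000_000_000L in  (* event at t = 75 s *)
  let start, stop = tumbling_window ~size_ns event_ns in
  (* prints [60000000000, 120000000000) *)
  Printf.printf "[%Ld, %Ld)\n" start stop
```

An event at exactly 60 s falls into the window starting at 60 s, not the one ending there, because `end_ns` is exclusive.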

Sliding Windows

Overlapping windows. An event may belong to multiple windows.

  |──── 60s ────────────────|
       |──── 60s ────────────────|
            |──── 60s ────────────────|
  0    15   30   45   60   75   90      (seconds, slide=15s)
let config = Sliding
  { size = Time_ns.Span.of_sec 60.0
  ; slide = Time_ns.Span.of_sec 15.0
  }

Assignment returns multiple windows. When size is a multiple of slide, each event belongs to size / slide windows; with size = 60s and slide = 15s, that is 60 / 15 = 4 windows.
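The sketch below enumerates the sliding windows that contain an event. It assumes window starts are aligned to multiples of `slide`, as in the diagram (0, 15, 30, ...). `sliding_windows` is a hypothetical name, and the usage example uses small integers in place of nanoseconds for readability.

```ocaml
(* Hypothetical helper: every sliding window [start, start + size_ns) that
   contains [event_ns], earliest first. Window starts are assumed to be
   aligned to multiples of [slide_ns]. size_ns and slide_ns must be positive. *)
let sliding_windows ~size_ns ~slide_ns event_ns =
  (* Latest aligned start <= event_ns (floor to a multiple of slide). *)
  let r = Int64.rem event_ns slide_ns in
  let r = if Int64.compare r 0L < 0 then Int64.add r slide_ns else r in
  let last_start = Int64.sub event_ns r in
  (* Step back one slide at a time while the window still covers the event;
     prepending keeps the result in ascending order of start. *)
  let rec collect start acc =
    if Int64.compare (Int64.add start size_ns) event_ns > 0 then
      collect (Int64.sub start slide_ns) ((start, Int64.add start size_ns) :: acc)
    else acc
  in
  collect last_start []

let () =
  (* size = 60, slide = 15, event at 50: belongs to 4 windows *)
  sliding_windows ~size_ns:60L ~slide_ns:15L 50L
  |> List.iter (fun (s, e) -> Printf.printf "[%Ld, %Ld)\n" s e)
```

The loop runs about size / slide times, so assignment is O(size / slide) rather than O(1).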

Session Windows

Dynamic, gap-based windows. A session stays open while events keep arriving. Once no event arrives for the gap duration, the session closes, and the next event starts a new one.

  |─ session ─|   gap   |── session ──|   gap   |─ session ─|
  e1 e2 e3          e4 e5 e6 e7             e8 e9
let config = Session { gap = Time_ns.Span.of_sec 30.0 }

Session windows are the most complex: each event initially creates a new window, and the window manager merges overlapping sessions.
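A minimal sketch of the merge step follows. It assumes an event at time t first gets the proto-window [t, t + gap), and that only strictly overlapping half-open windows are merged. The source does not specify either detail, and the names `proto_window`, `merge_sessions`, and `sessions` are hypothetical.

```ocaml
(* Hypothetical sketch of session-window merging. Assumption: each event at
   time t starts as its own proto-window [t, t + gap); overlapping half-open
   windows are then merged into one session. Times are int64. *)
type window = { start_ns : int64; end_ns : int64 }

let proto_window ~gap_ns t = { start_ns = t; end_ns = Int64.add t gap_ns }

(* Sort by start, then extend the current session while the next window
   starts before the current one ends; otherwise open a new session. *)
let merge_sessions windows =
  let sorted = List.sort (fun a b -> Int64.compare a.start_ns b.start_ns) windows in
  let rec go current acc = function
    | [] -> List.rev (current :: acc)
    | w :: rest ->
      if Int64.compare w.start_ns current.end_ns < 0 then
        let new_end =
          if Int64.compare w.end_ns current.end_ns > 0 then w.end_ns
          else current.end_ns
        in
        go { current with end_ns = new_end } acc rest
      else go w (current :: acc) rest
  in
  match sorted with
  | [] -> []
  | first :: rest -> go first [] rest

let sessions ~gap_ns events = merge_sessions (List.map (proto_window ~gap_ns) events)
```

With a gap of 30, events at 0, 10, 20, 100, 110, and 200 produce three sessions: [0, 50), [100, 140), and [200, 230).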

Watermark Tracking

A watermark W(t) guarantees that all events with event time <= t have been processed by the emitting node. Watermarks drive window triggering and late event classification.

The Tracker

type tracker =
  { mutable sources : (string, Int64.t) Hashtbl.Poly.t
  ; mutable current_watermark_ns : Int64.t
  ; mutable advance_count : int
  }

The tracker maintains a per-source watermark and computes the overall watermark as the minimum across all sources:

  Source A watermark: 100
  Source B watermark: 50
  Source C watermark: 200
  ─────────────────────
  Overall watermark:  50    (= min of all sources)

The overall watermark can only advance when the slowest source advances. This is because we cannot guarantee completeness beyond the slowest source.

Monotonicity

Watermarks are monotonically non-decreasing. An attempt to advance a source’s watermark backward is silently ignored:

let advance tracker ~source ~timestamp_ns =
  let prev = Hashtbl.find tracker.sources source
    |> Option.value ~default:0L in
  if Int64.( > ) timestamp_ns prev then begin
    Hashtbl.set tracker.sources ~key:source ~data:timestamp_ns;
    (* Recompute overall minimum *)
    ...
  end
  else None  (* backward movement ignored *)

This ensures that watermarks never retract, which would invalidate window triggering decisions.
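The elided recomputation can be sketched as a self-contained tracker. The sketch makes four assumptions:

- A Stdlib `Hashtbl` stands in for Core's `Hashtbl.Poly`.
- Sources are registered up front at 0, so a silent source holds the overall watermark back.
- The hypothetical `register` helper performs that registration.
- `advance` returns `Some w` only when the overall watermark moves to `w`.

```ocaml
(* Stdlib-only sketch of the per-source watermark tracker. *)
type tracker = {
  sources : (string, int64) Hashtbl.t;
  mutable current_watermark_ns : int64;
  mutable advance_count : int;
}

let create () =
  { sources = Hashtbl.create 8; current_watermark_ns = 0L; advance_count = 0 }

(* Hypothetical: make a source known at watermark 0 so it blocks progress. *)
let register tracker ~source =
  if not (Hashtbl.mem tracker.sources source) then
    Hashtbl.replace tracker.sources source 0L

(* Overall watermark = minimum across all known sources. *)
let min_over_sources tracker =
  Hashtbl.fold
    (fun _ wm acc ->
      match acc with
      | None -> Some wm
      | Some m -> Some (if Int64.compare wm m < 0 then wm else m))
    tracker.sources None

let advance tracker ~source ~timestamp_ns =
  let prev = Option.value (Hashtbl.find_opt tracker.sources source) ~default:0L in
  if Int64.compare timestamp_ns prev > 0 then begin
    Hashtbl.replace tracker.sources source timestamp_ns;
    match min_over_sources tracker with
    | Some overall when Int64.compare overall tracker.current_watermark_ns > 0 ->
      (* The overall watermark only ever moves forward. *)
      tracker.current_watermark_ns <- overall;
      tracker.advance_count <- tracker.advance_count + 1;
      Some overall
    | _ -> None
  end
  else None (* backward or repeated timestamps are ignored *)
```

Replaying the example above (A to 100, B to 50, C to 200) leaves the overall watermark at 50. A later attempt to move B back to 40 changes nothing.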

Multi-Source Example

Time   Source A        Source B        Overall   Window [0, 100) triggered?
─────────────────────────────────────────────────────────────────────────────
t0     advance to 100  (still at 0)    0         No (B still at 0)
t1     -               advance to 50   50        No (min(100, 50) = 50)
t2     -               advance to 100  100       Yes (min(100, 100) = 100)
t3     advance to 200  -               100       Already fired; no change (min(200, 100) = 100)
t4     -               advance to 150  150       Already fired; [100, 200) not yet (150 < 200)
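The table can be checked with a small immutable replay. It assumes both sources start at 0 and that a window fires once the watermark is >= `end_ns`. Per-source watermarks live in an association list, and the helper names are hypothetical.

```ocaml
(* Overall watermark = minimum over the per-source watermarks. *)
let overall sources =
  List.fold_left (fun acc (_, wm) -> min acc wm) Int64.max_int sources

(* Monotonic per-source update: keep the larger of old and new. *)
let advance sources (source, ts) =
  let prev = try List.assoc source sources with Not_found -> 0L in
  (source, max prev ts) :: List.remove_assoc source sources

(* On_watermark firing condition for a half-open window. *)
let fired ~watermark (_start, end_ns) = Int64.compare watermark end_ns >= 0

(* t0 .. t4 from the table. *)
let steps = [ ("A", 100L); ("B", 50L); ("B", 100L); ("A", 200L); ("B", 150L) ]

(* Overall watermark after each step, in order. *)
let history =
  let init = [ ("A", 0L); ("B", 0L) ] in
  List.rev
    (snd
       (List.fold_left
          (fun (sources, acc) step ->
            let sources = advance sources step in
            (sources, overall sources :: acc))
          (init, []) steps))

let () =
  List.iteri
    (fun i w ->
      Printf.printf "t%d overall=%Ld [0,100) fired=%b [100,200) fired=%b\n" i w
        (fired ~watermark:w (0L, 100L))
        (fired ~watermark:w (100L, 200L)))
    history
```

The replay yields overall watermarks 0, 50, 100, 100, 150. Window [0, 100) fires from t2 onward, and [100, 200) never fires in this sequence because 150 < 200.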

Trigger Policies

A trigger policy determines when to emit window results:

module Trigger = struct
  type t =
    | On_watermark           (* emit when watermark >= window.end_ns *)
    | On_every_element       (* emit after every input event *)
    | On_interval of Time_ns.Span.t  (* emit periodically *)
    | On_count of int        (* emit every N elements *)
end

On_watermark (default)

The window fires when the watermark reaches or passes window.end_ns:

let triggerable_windows tracker ~windows =
  List.filter windows ~f:(fun w ->
    Int64.( >= ) tracker.current_watermark_ns w.end_ns)

This is the primary trigger for Ripple. It fires once per window, apart from the re-fires caused by mildly late events (see Retractions). It is deterministic because it depends only on event time.
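`triggerable_windows` returns every window whose end the watermark has passed, so something must remember which windows already fired. The sketch below assumes fired windows are recorded by `id` in a caller-owned `Hashtbl`. The source does not say how Ripple does this, and `fire_ready` is a hypothetical name.

```ocaml
(* Mirrors the window type above, with Stdlib int64 fields. *)
type window = { id : string; start_ns : int64; end_ns : int64 }

(* Windows whose end the watermark has reached and that have not fired yet.
   Each returned window is recorded in [fired] so it is not returned again. *)
let fire_ready fired ~watermark_ns windows =
  let ready =
    List.filter
      (fun w -> Int64.compare watermark_ns w.end_ns >= 0 && not (Hashtbl.mem fired w.id))
      windows
  in
  List.iter (fun w -> Hashtbl.replace fired w.id ()) ready;
  ready
```

As the watermark moves through 99, 100, 150, and 200, windows [0, 100) and [100, 200) are each returned exactly once, at 100 and 200 respectively.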

On_every_element

Fires after every event. Useful for real-time dashboards where you want to see partial window results update live. Generates high output volume.

On_interval

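Fires at a fixed processing-time interval (the Time_ns.Span.t argument). It combines the freshness of On_every_element with a bounded output rate. Unlike On_watermark, its firing times depend on wall-clock time, so they are not deterministic.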

On_count

Fires every N input elements. Useful for micro-batching.
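The four policies reduce to one firing decision. The sketch makes these assumptions:

- Spans are `int64` nanoseconds instead of `Time_ns.Span.t`.
- The caller tracks how many elements arrived since the last emission and when that emission happened in processing time.
- The `state` record and `should_fire` function are hypothetical.

```ocaml
type trigger =
  | On_watermark
  | On_every_element
  | On_interval of int64      (* processing-time period, ns *)
  | On_count of int

type state = {
  watermark_ns : int64;       (* current event-time watermark *)
  window_end_ns : int64;      (* end of the window under consideration *)
  elements_since_fire : int;  (* inputs seen since the last emission *)
  now_ns : int64;             (* current processing time *)
  last_fire_ns : int64;       (* processing time of the last emission *)
}

let should_fire trigger s =
  match trigger with
  | On_watermark -> Int64.compare s.watermark_ns s.window_end_ns >= 0
  | On_every_element -> s.elements_since_fire > 0
  | On_interval period -> Int64.compare (Int64.sub s.now_ns s.last_fire_ns) period >= 0
  | On_count n -> s.elements_since_fire >= n
```

Only the `On_watermark` branch reads event time. The `On_interval` branch reads processing time, which is why its output timing varies between runs.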

Late Event Classification

Events that arrive after the watermark has advanced past their event time are classified as late:

type classification =
  | On_time      (* event_time >= watermark *)
  | Mildly_late  (* event_time < watermark but within allowed_lateness *)
  | Very_late    (* event_time < watermark - allowed_lateness *)
  | Future       (* event_time > watermark + 60s tolerance *)

Allowed Lateness

The allowed_lateness parameter (default: 60 seconds) defines a grace period for late events:

  watermark at T=200
  allowed_lateness = 60

  Event at T=210: On_time (210 >= 200)
  Event at T=195: Mildly_late (195 < 200, but 195 >= 200 - 60 = 140)
  Event at T=130: Very_late (130 < 140)
  Event at T=280: Future (280 > 200 + 60 = 260, using the 60s future tolerance)

  • On_time: processed normally.
  • Mildly_late: processed, but the window may need to re-fire (retraction + correction).
  • Very_late: dropped and counted in the ripple_window_very_late_events_total metric.
  • Future: likely a clock skew issue. Logged as a warning.
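The classification rules translate into a short chain of comparisons. This sketch assumes all times share one `int64` unit (seconds here, to match the example). It also passes the 60s future tolerance as a parameter instead of hard-coding it. `classify` is a hypothetical name.

```ocaml
type classification = On_time | Mildly_late | Very_late | Future

(* Check Future first so that far-ahead events are not counted as on time. *)
let classify ~watermark ~allowed_lateness ~future_tolerance event_time =
  if Int64.compare event_time (Int64.add watermark future_tolerance) > 0 then Future
  else if Int64.compare event_time watermark >= 0 then On_time
  else if Int64.compare event_time (Int64.sub watermark allowed_lateness) >= 0 then Mildly_late
  else Very_late
```

With watermark 200 and both parameters at 60, this reproduces the example: 210 is On_time, 195 is Mildly_late, 130 is Very_late, and 280 is Future. The boundaries fall at 140, which is still Mildly_late, and 260, which is still On_time.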

Retractions

When a mildly late event arrives and updates a window that has already fired, Ripple emits a retraction followed by a corrected result:

module Retraction = struct
  type 'a t =
    | Emit of 'a       (* new or corrected result *)
    | Retract of 'a    (* withdraw previous result *)
end

This ensures downstream consumers can correct their state.
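The sketch below shows how a re-fired window could produce this output and how a downstream consumer could apply it. `refire` and `apply` are hypothetical helpers. Emitting nothing when the corrected result equals the previous one is an assumption, not documented Ripple behavior.

```ocaml
module Retraction = struct
  type 'a t =
    | Emit of 'a     (* new or corrected result *)
    | Retract of 'a  (* withdraw previous result *)
end

(* Messages to send when a window (re-)fires. *)
let refire ~previous ~corrected =
  match previous with
  | None -> [ Retraction.Emit corrected ]
  | Some old when old = corrected -> []  (* assumption: unchanged result, no output *)
  | Some old -> [ Retraction.Retract old; Retraction.Emit corrected ]

(* A consumer keeping a running total subtracts retracted values
   and adds emitted ones. *)
let apply total = function
  | Retraction.Emit v -> total + v
  | Retraction.Retract v -> total - v
```

If a window first emits 10 and a mildly late event later corrects it to 12, the consumer sees `Emit 10`, `Retract 10`, `Emit 12`, and ends at a total of 12.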

Interaction with Checkpointing

Window state is implicitly checkpointed through the graph’s leaf values. Each window’s accumulated state lives in the incremental graph as a leaf or fold node. When the checkpoint captures leaf values, it captures window state.

On recovery:

  1. Leaf values are restored (including window accumulations)
  2. The watermark tracker is reset to the checkpoint’s watermark
  3. Replay resumes from the checkpoint’s input offsets
  4. Events already in the checkpointed window state are re-applied idempotently
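The four recovery steps can be sketched under heavy assumptions, none of which come from Ripple:

- The input is an array of `(event_id, value)` pairs indexed by offset.
- A window's state is a sum plus the set of event ids already folded into it.
- Idempotent re-application is achieved by skipping known ids.
- All type and function names are hypothetical.

```ocaml
type window_state = { mutable sum : int; applied : (int, unit) Hashtbl.t }

type checkpoint = {
  leaf_sum : int;           (* captured leaf value (window accumulation) *)
  leaf_applied : int list;  (* event ids folded into that value *)
  watermark_ns : int64;     (* tracker watermark at checkpoint time *)
  input_offset : int;       (* offset where replay resumes *)
}

(* Re-applying an event already in the state is a no-op. *)
let apply st (id, v) =
  if not (Hashtbl.mem st.applied id) then begin
    Hashtbl.replace st.applied id ();
    st.sum <- st.sum + v
  end

let recover cp log =
  (* 1. Restore leaf values, including the window accumulation. *)
  let st = { sum = cp.leaf_sum; applied = Hashtbl.create 16 } in
  List.iter (fun id -> Hashtbl.replace st.applied id ()) cp.leaf_applied;
  (* 2. Reset the watermark to the checkpointed value. *)
  let watermark_ns = cp.watermark_ns in
  (* 3 and 4. Replay from the checkpoint's offset; events already in the
     restored state are skipped, so replay is idempotent. *)
  Array.iteri (fun off ev -> if off >= cp.input_offset then apply st ev) log;
  (st, watermark_ns)
```

Suppose the checkpoint already contains events 1 and 2 but records offset 1. Replay then revisits event 2 without double-counting it and ends at the same sum as an uninterrupted run.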