Windowing & Watermarks
Ripple processes event-time streams using temporal windows. Windows group events by time range, and watermarks track completeness to determine when window results can be emitted. This page covers window assigners, watermark tracking, trigger policies, and late event handling.
Window Types
A window is a half-open time interval [start_ns, end_ns):
type t =
{ id : string
; start_ns : Int64.t (* inclusive *)
; end_ns : Int64.t (* exclusive *)
}
Events are assigned to windows by their event time, not processing time. This is a settled design decision: all computation in Ripple uses event time. Processing time is for logging only.
Window Assigners
Tumbling Windows
Non-overlapping, fixed-size windows. Every event belongs to exactly one window.
|──── 60s ────|──── 60s ────|──── 60s ────|
|  window-0   |  window-60  | window-120  |
0             60            120           180   (seconds)
let config = Tumbling { size = Time_ns.Span.of_sec 60.0 }
Assignment is O(1): window_start = event_time - (event_time mod size).
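As a minimal sketch, the tumbling rule can be written in plain OCaml. The sketch uses the stdlib `Int64` module instead of Core, mirrors the window record from the top of this page, and assumes non-negative event times. The function name `tumbling_window` and the `"window-<start>"` id format (chosen to match the diagram labels) are hypothetical.

```ocaml
(* Hypothetical sketch: tumbling-window assignment, OCaml stdlib only. *)
type window = { id : string; start_ns : int64; end_ns : int64 }

(* The window containing [t] starts at t - (t mod size).
   Assumes t >= 0: Int64.rem keeps the sign of its first argument. *)
let tumbling_window ~size_ns t =
  let start_ns = Int64.sub t (Int64.rem t size_ns) in
  { id = Printf.sprintf "window-%Ld" start_ns;  (* hypothetical id format *)
    start_ns;                                   (* inclusive *)
    end_ns = Int64.add start_ns size_ns }       (* exclusive *)
```

For example, with `size_ns = 60L` (seconds used as the unit for readability), an event at 75 lands in `window-60`, covering [60, 120). An event exactly at 60 also lands there, because start_ns is inclusive.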
Sliding Windows
Overlapping windows. An event may belong to multiple windows.
|───────── 60s ─────────|
      |───────── 60s ─────────|
            |───────── 60s ─────────|
0     15    30    45    60    75    90   (seconds, slide = 15s)
let config = Sliding
{ size = Time_ns.Span.of_sec 60.0
; slide = Time_ns.Span.of_sec 15.0
}
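Assignment returns multiple windows. When size is a multiple of slide, each event belongs to size / slide windows. With the configuration above (60s / 15s = 4), each event belongs to 4 windows.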
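A minimal stdlib-only sketch of sliding assignment, assuming window starts are multiples of the slide and that size is a multiple of slide. The function name `sliding_windows` and the `(start, end)` pair representation are hypothetical. Like many stream processors, this sketch also returns windows whose start is negative for events near time 0, so every event gets exactly size / slide windows. Whether Ripple does the same is not stated here.

```ocaml
(* Hypothetical sketch: sliding-window assignment, OCaml stdlib only.
   Returns every [start, start + size) with start a multiple of [slide_ns]
   and start <= t < start + size, ordered by start.
   Assumes t >= 0 and size_ns mod slide_ns = 0. *)
let sliding_windows ~size_ns ~slide_ns t =
  (* Latest window start at or before t. *)
  let last_start = Int64.sub t (Int64.rem t slide_ns) in
  (* Walk backwards one slide at a time until a window ends at or before t. *)
  let rec collect start acc =
    if Int64.add start size_ns <= t then acc
    else collect (Int64.sub start slide_ns) ((start, Int64.add start size_ns) :: acc)
  in
  collect last_start []
```

With the 60s/15s configuration, an event at 70 belongs to [15, 75), [30, 90), [45, 105), and [60, 120). It does not belong to [0, 60), because end_ns is exclusive.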
Session Windows
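Dynamic, gap-based windows. A session stays open as long as each event arrives within the gap duration of the previous one. Once the gap elapses with no events, the session closes, and the next event starts a new session.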
|─ session ─| gap |── session ──| gap |─ session ─|
e1 e2 e3 e4 e5 e6 e7 e8 e9
let config = Session { gap = Time_ns.Span.of_sec 30.0 }
Session windows are the most complex: each event initially creates a new window, and the window manager merges overlapping sessions.
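The create-then-merge approach can be sketched with the OCaml stdlib. The sketch assumes each event at time t opens a proto-session [t, t + gap), and that proto-sessions whose half-open intervals overlap are merged. How Ripple handles the exact boundary (an event arriving precisely one gap later) is not specified here, so strict overlap is an assumption. The function name `sessions` is hypothetical.

```ocaml
(* Hypothetical sketch of session-window merging, OCaml stdlib only.
   Assumption: each event at time t creates the proto-session [t, t + gap);
   overlapping proto-sessions are merged into one session. *)
let sessions ~gap_ns event_times =
  (* One proto-window per event, sorted by start time. *)
  let protos =
    List.sort compare (List.map (fun t -> (t, Int64.add t gap_ns)) event_times)
  in
  let merge acc (s, e) =
    match acc with
    | (cs, ce) :: rest when s < ce -> (cs, max ce e) :: rest  (* overlaps: extend current session *)
    | _ -> (s, e) :: acc                                       (* gap elapsed: new session *)
  in
  List.rev (List.fold_left merge [] protos)
```

With a gap of 30 and events at 0, 10, 20, 100, 110, and 200 (in any arrival order), the result is three sessions: [0, 50), [100, 140), and [200, 230). Each session ends one gap after its last event.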
Watermark Tracking
A watermark W(t) guarantees that all events with event time <= t have been processed by the emitting node. Watermarks drive window triggering and late event classification.
The Tracker
type tracker =
{ mutable sources : (string, Int64.t) Hashtbl.Poly.t
; mutable current_watermark_ns : Int64.t
; mutable advance_count : int
}
The tracker maintains a per-source watermark and computes the overall watermark as the minimum across all sources:
Source A watermark: 100
Source B watermark: 50
Source C watermark: 200
─────────────────────
Overall watermark: 50 (= min of all sources)
The overall watermark can only advance when the slowest source advances. This is because we cannot guarantee completeness beyond the slowest source.
Monotonicity
Watermarks are monotonically non-decreasing. An attempt to advance a source’s watermark backward is silently ignored:
let advance tracker ~source ~timestamp_ns =
let prev = Hashtbl.find tracker.sources source
|> Option.value ~default:0L in
if Int64.( > ) timestamp_ns prev then begin
Hashtbl.set tracker.sources ~key:source ~data:timestamp_ns;
(* Recompute overall minimum *)
...
end
else None (* backward movement ignored *)
This ensures that watermarks never retract, which would invalidate window triggering decisions.
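The elided recompute step above can be filled in with a stdlib-only sketch that combines the per-source minimum with the monotonicity check. It assumes that sources are registered at 0 up front, so an idle source holds back the overall watermark. It also assumes that `advance` returns `Some w` only when the overall watermark moves to `w`. Both are assumptions, as are the helper names `create` and `overall`.

```ocaml
(* Hypothetical stdlib-only sketch of the watermark tracker. *)
type tracker = {
  sources : (string, int64) Hashtbl.t;  (* per-source watermark *)
  mutable current_watermark_ns : int64; (* overall watermark *)
}

let create source_names =
  let sources = Hashtbl.create 8 in
  (* Assumption: register every source at 0 so a silent source blocks progress. *)
  List.iter (fun s -> Hashtbl.replace sources s 0L) source_names;
  { sources; current_watermark_ns = 0L }

(* Overall watermark = minimum across all sources
   (Int64.max_int if no source is registered). *)
let overall t = Hashtbl.fold (fun _ w acc -> min w acc) t.sources Int64.max_int

let advance t ~source ~timestamp_ns =
  let prev = Option.value (Hashtbl.find_opt t.sources source) ~default:0L in
  if timestamp_ns > prev then begin
    Hashtbl.replace t.sources source timestamp_ns;
    let w = overall t in
    if w > t.current_watermark_ns then begin
      t.current_watermark_ns <- w;
      Some w                      (* overall watermark advanced *)
    end else None                 (* slowest source unchanged *)
  end else None                   (* backward or repeated: ignored *)
```

Feeding this sketch the sequence from the Multi-Source Example yields `None`, `Some 50L`, `Some 100L`, `None`, `Some 150L`. A later backward call such as B to 120 returns `None` and leaves the watermark at 150.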
Multi-Source Example
Time  Source A        Source B        Overall  Window [0, 100) triggered?
──────────────────────────────────────────────────────────────────────────────
t0    advance to 100  -               0        No (B still at 0)
t1    -               advance to 50   50       No (min(100, 50) = 50)
t2    -               advance to 100  100      Yes! (min(100, 100) = 100)
t3    advance to 200  -               100      No change (min(200, 100) = 100)
t4    -               advance to 150  150      No change; [100, 200) still waits (150 < 200)
Trigger Policies
A trigger policy determines when to emit window results:
module Trigger = struct
type t =
| On_watermark (* emit when watermark >= window.end_ns *)
| On_every_element (* emit after every input event *)
| On_interval of Time_ns.Span.t (* emit periodically *)
| On_count of int (* emit every N elements *)
end
On_watermark (default)
The window fires when the watermark advances past window.end_ns:
let triggerable_windows tracker ~windows =
List.filter windows ~f:(fun w ->
Int64.( >= ) tracker.current_watermark_ns w.end_ns)
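This is the primary trigger for Ripple. It is deterministic because it depends only on event time, and it fires each window once, when the watermark reaches end_ns. A mildly late event can still cause a re-fire (see Retractions).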
On_every_element
Fires after every event. Useful for real-time dashboards where you want to see partial window results update live. Because it emits once per input event, it generates high output volume.
On_interval
Fires once per configured span of processing time (the Time_ns.Span.t argument). It gives near-live updates like On_every_element while keeping the output rate bounded.
On_count
Fires every N input elements. Useful for micro-batching.
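The four policies can be compared side by side in a stdlib-only sketch of a per-window "should this fire now?" check. `Time_ns.Span.t` is replaced by an int64 nanosecond count so the block runs without Core. The `window_state` record, its `fired` flag (used to make On_watermark fire once), and the function name `should_fire` are all hypothetical bookkeeping, not Ripple's API.

```ocaml
(* Hypothetical sketch of trigger decisions, OCaml stdlib only. *)
type trigger =
  | On_watermark
  | On_every_element
  | On_interval of int64      (* processing-time span, in ns *)
  | On_count of int

(* Assumed per-window bookkeeping for this sketch. *)
type window_state = {
  end_ns : int64;
  fired : bool;               (* has the watermark trigger already fired? *)
  elements_since_emit : int;  (* inputs since the last emission *)
  last_emit_proc_ns : int64;  (* processing time of the last emission *)
}

let should_fire trigger st ~watermark_ns ~now_proc_ns =
  match trigger with
  | On_watermark -> (not st.fired) && watermark_ns >= st.end_ns  (* once, event time *)
  | On_every_element -> st.elements_since_emit > 0
  | On_interval span -> Int64.sub now_proc_ns st.last_emit_proc_ns >= span
  | On_count n -> st.elements_since_emit >= n
```

Only the On_watermark branch looks at event time. On_interval reads processing time, which is why its output is not deterministic across replays.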
Late Event Classification
Each event is classified by comparing its event time with the current watermark. Events that arrive after the watermark has already advanced past their event time are late:
type classification =
| On_time (* watermark <= event_time <= watermark + 60s *)
| Mildly_late (* watermark - allowed_lateness <= event_time < watermark *)
| Very_late (* event_time < watermark - allowed_lateness *)
| Future (* event_time > watermark + 60s tolerance *)
Allowed Lateness
The allowed_lateness parameter (default: 60 seconds) defines a grace period for late events:
watermark at T=200
allowed_lateness = 60
Event at T=195: Mildly_late (195 < 200, but 195 >= 200 - 60 = 140)
Event at T=130: Very_late (130 < 140)
Event at T=280: Future (280 > 200 + 60 = 260)
- On_time: processed normally.
- Mildly_late: processed, but the window may need to re-fire (retraction + correction).
- Very_late: dropped and counted in the ripple_window_very_late_events_total metric.
- Future: likely a clock skew issue. Logged as a warning.
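The classification rules and the worked example can be checked with a stdlib-only sketch. The future tolerance is passed as its own parameter (`future_tolerance`, a hypothetical name), since the description above fixes it at 60s separately from allowed_lateness. The function name `classify` is also hypothetical.

```ocaml
(* Hypothetical sketch of late-event classification, OCaml stdlib only. *)
type classification = On_time | Mildly_late | Very_late | Future

let classify ~watermark ~allowed_lateness ~future_tolerance event_time =
  if event_time > Int64.add watermark future_tolerance then Future  (* likely clock skew *)
  else if event_time >= watermark then On_time
  else if event_time >= Int64.sub watermark allowed_lateness then Mildly_late
  else Very_late                                                    (* dropped *)
```

With `watermark = 200`, `allowed_lateness = 60`, and `future_tolerance = 60`, events at 195, 130, and 280 classify as Mildly_late, Very_late, and Future, matching the example. The boundary event at 140 is still Mildly_late.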
Retractions
When a mildly late event arrives and updates a window that has already fired, Ripple emits a retraction followed by a corrected result:
module Retraction = struct
type 'a t =
| Emit of 'a (* new or corrected result *)
| Retract of 'a (* withdraw previous result *)
end
This ensures downstream consumers can correct their state.
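A minimal stdlib-only sketch of how a re-fired window could use the two constructors, and of how a downstream consumer undoes a stale result. It assumes the last emitted result for each window is remembered, and that an unchanged result emits nothing. The names `fire` and `apply` and the running-total consumer are hypothetical.

```ocaml
(* Hypothetical sketch of retraction-based correction, OCaml stdlib only. *)
type 'a retraction = Emit of 'a | Retract of 'a

(* Output for one firing, given the previously emitted result (if any). *)
let fire ~previous ~current =
  match previous with
  | None -> [ Emit current ]                   (* first firing *)
  | Some old when old = current -> []          (* assumption: no-op if unchanged *)
  | Some old -> [ Retract old; Emit current ]  (* withdraw, then correct *)

(* A downstream consumer keeping a running total across window results. *)
let apply total = function
  | Emit v -> total + v
  | Retract v -> total - v
```

If a window first fires with a count of 10 and a mildly late event raises it to 11, the consumer sees `Emit 10`, then `Retract 10; Emit 11`, and its total ends at 11 rather than 21.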
Interaction with Checkpointing
Window state is implicitly checkpointed through the graph’s leaf values. Each window’s accumulated state lives in the incremental graph as a leaf or fold node. When the checkpoint captures leaf values, it captures window state.
On recovery:
- Leaf values are restored (including window accumulations)
- The watermark tracker is reset to the checkpoint’s watermark
- Replay resumes from the checkpoint’s input offsets
- Events already in the checkpointed window state are re-applied idempotently