Wire Protocol

Ripple uses a binary wire protocol for cross-worker communication. Every message is wrapped in an envelope with integrity checking, schema identification, and sequence numbering. This page covers the envelope format, message types, handshake protocol, and error detection.

Message Envelope Format

Every wire message follows this layout:

 Offset  Size    Field
 ──────  ────    ─────
 0       4       Magic number (0x52495050 = "RIPP")
 4       1       Protocol version (currently 1)
 5       1       Message type (uint8)
 6       1       Flags (uint8, reserved)
 7       8       Sequence number (int64 LE)
 15      8       Timestamp (nanoseconds since epoch, int64 LE)
 23      32      Schema fingerprint (zero-padded ASCII)
 55      4       Payload length (int32 LE)
 59      N       Payload (bin_prot encoded)
 59+N    4       CRC-32C (Castagnoli, int32 LE)

Total overhead per message: 63 bytes (header + CRC). The payload is variable-length, encoded with bin_prot.
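As a rough illustration of the layout above, the following sketch fills in the 59-byte header and payload using only the OCaml standard library. The function name `encode_without_crc`, its labelled arguments, and the choice to write the magic as the ASCII bytes "RIPP" in stream order are assumptions for illustration, not Ripple's actual API. The trailing 4 CRC bytes are left zeroed.

```ocaml
(* Sketch only: offsets follow the layout table; all names are hypothetical. *)
let header_size = 59
let crc_size = 4

(* Build header + payload; the last 4 bytes (CRC slot) stay zero so the
   caller can compute and store the checksum afterwards. *)
let encode_without_crc ~msg_type ~seq ~timestamp_ns ~fingerprint ~payload =
  let n = String.length payload in
  if String.length fingerprint > 32 then invalid_arg "fingerprint too long";
  let buf = Bytes.make (header_size + n + crc_size) '\000' in
  Bytes.blit_string "RIPP" 0 buf 0 4;        (* magic, 0x52495050 (assumed byte order) *)
  Bytes.set_uint8 buf 4 1;                   (* protocol version *)
  Bytes.set_uint8 buf 5 msg_type;            (* message type tag *)
  Bytes.set_uint8 buf 6 0;                   (* flags, reserved *)
  Bytes.set_int64_le buf 7 seq;              (* sequence number *)
  Bytes.set_int64_le buf 15 timestamp_ns;    (* event timestamp *)
  (* The buffer is pre-zeroed, so a short fingerprint is zero-padded. *)
  Bytes.blit_string fingerprint 0 buf 23 (String.length fingerprint);
  Bytes.set_int32_le buf 55 (Int32.of_int n);
  Bytes.blit_string payload 0 buf 59 n;
  buf
```

A 3-byte payload therefore yields a 66-byte message: 59 bytes of header, 3 of payload, and 4 reserved for the CRC.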

Field Details

Magic number: 0x52495050 (“RIPP” in ASCII). Allows detection of protocol mismatch or corrupt stream start.

Protocol version: Currently 1. Incremented for breaking wire format changes. The receiver must reject messages with an unsupported version.

Message type: Single byte identifying the payload variant. See the message types table below.

Flags: Reserved for future use (e.g., compression bit, priority). Currently always 0.

Sequence number: Monotonically increasing per sender. Used for:

  • Duplicate detection (idempotent delivery)
  • Gap detection (missing messages)
  • Ordering verification

Timestamp: Nanoseconds since the Unix epoch. This is event time, not wall-clock time, and is used for watermark propagation.

Schema fingerprint: 32-byte zero-padded string. Identifies the schema of the payload. The receiver compares this against its local schema registry to verify compatibility before deserialization.

Payload length: 32-bit little-endian integer. Allows the receiver to read exactly the right number of bytes.

CRC-32C: Castagnoli polynomial (0x1EDC6F41). Covers all bytes from offset 0 through the end of the payload. A single bit flip in any field is detected before deserialization.
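The field rules above suggest a header check that runs before any payload handling. The sketch below is one possible shape for it, written against the OCaml standard library; the `header` record, the `decode_header` name, and the error strings are hypothetical, and the magic is assumed to appear as the ASCII bytes "RIPP".

```ocaml
(* Sketch only: a hypothetical parsed-header record. *)
type header =
  { msg_type : int
  ; seq : int64
  ; timestamp_ns : int64
  ; fingerprint : string
  ; payload_len : int
  }

let decode_header s =
  if String.length s < 59 then Error "short header"
  else if String.sub s 0 4 <> "RIPP" then Error "bad magic"
  else if Char.code s.[4] <> 1 then Error "unsupported version"
  else
    let payload_len = Int32.to_int (String.get_int32_le s 55) in
    if payload_len < 0 then Error "negative payload length"
    else
      (* Strip the zero padding from the 32-byte fingerprint field. *)
      let raw = String.sub s 23 32 in
      let fingerprint =
        match String.index_opt raw '\000' with
        | Some i -> String.sub raw 0 i
        | None -> raw
      in
      Ok { msg_type = Char.code s.[5]
         ; seq = String.get_int64_le s 7
         ; timestamp_ns = String.get_int64_le s 15
         ; fingerprint
         ; payload_len
         }
```

Rejecting on magic and version first keeps a mismatched peer from being interpreted as a valid stream with a nonsensical payload length.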

CRC-32C Integrity

The CRC uses the Castagnoli polynomial, which is hardware-accelerated on modern x86 CPUs (SSE 4.2 crc32 instruction). The implementation uses a lookup table for portability:

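(* CRC-32C over [s]. [table] is the precomputed 256-entry lookup table
   for the Castagnoli polynomial. *)
let compute_string s =
  let crc = ref 0xFFFFFFFFl in                 (* initial value: all ones *)
  String.iter s ~f:(fun c ->
    let byte = Char.to_int c in
    (* The low 8 bits of (crc xor byte) select the table entry. *)
    let index = Int32.to_int_trunc
      (Int32.( land ) (Int32.( lxor ) !crc
        (Int32.of_int_trunc byte)) 0xFFl) in
    crc := Int32.( lxor )
      (Int32.shift_right_logical !crc 8) table.(index));
  Int32.( lxor ) !crc 0xFFFFFFFFl              (* final xor *)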

On decode, the receiver:

  1. Computes CRC-32C over all bytes except the last 4
  2. Compares against the stored CRC
  3. Rejects the message on mismatch (no deserialization attempted)

This catches corruption from network bit flips, partial writes, and truncated messages before any bin_prot parsing occurs.
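The decode steps can be sketched end to end as follows. This version uses the plain OCaml standard library rather than the library style of the snippet above, builds the lookup table from the reflected form of the Castagnoli polynomial (0x82F63B78), and uses hypothetical names (`crc32c_sub`, `verify`) and error tags.

```ocaml
(* Reflected (LSB-first) form of the Castagnoli polynomial 0x1EDC6F41. *)
let poly = 0x82F63B78l

(* 256-entry lookup table, built once at module initialisation. *)
let table =
  Array.init 256 (fun i ->
    let c = ref (Int32.of_int i) in
    for _ = 1 to 8 do
      c :=
        (if Int32.logand !c 1l <> 0l
         then Int32.logxor (Int32.shift_right_logical !c 1) poly
         else Int32.shift_right_logical !c 1)
    done;
    !c)

(* CRC-32C over the [len] bytes of [s] starting at [pos]. *)
let crc32c_sub s ~pos ~len =
  let crc = ref 0xFFFFFFFFl in
  for i = pos to pos + len - 1 do
    let byte = Int32.of_int (Char.code s.[i]) in
    let idx = Int32.to_int (Int32.logand (Int32.logxor !crc byte) 0xFFl) in
    crc := Int32.logxor (Int32.shift_right_logical !crc 8) table.(idx)
  done;
  Int32.logxor !crc 0xFFFFFFFFl

(* Check the trailing CRC before any payload parsing is attempted.
   63 = 59-byte header + 4-byte CRC, the smallest possible message. *)
let verify msg =
  let n = String.length msg in
  if n < 63 then Error `Truncated
  else
    let stored = String.get_int32_le msg (n - 4) in
    let computed = crc32c_sub msg ~pos:0 ~len:(n - 4) in
    if Int32.equal stored computed then Ok () else Error `Crc_mismatch
```

The standard CRC-32C check value for the ASCII string "123456789" is 0xE3069283, which is a convenient sanity test for any table-driven implementation.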

Message Types

 Tag  Type                Direction              Purpose
 ───  ────                ─────────              ───────
 0    Handshake           bidirectional          Connection establishment
 1    Delta               producer -> consumer   Data change (hot path)
 2    Checkpoint_request  coordinator -> worker  Trigger checkpoint
 3    Checkpoint_ack      worker -> coordinator  Checkpoint complete
 4    Heartbeat           worker -> coordinator  Liveness signal
 5    Schema_negotiation  bidirectional          Schema compatibility response
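The tag values above map naturally onto a variant type. The sketch below shows one way to convert between the tag byte and a variant; the type and function names are illustrative assumptions rather than Ripple's actual definitions.

```ocaml
(* Sketch only: one constructor per row of the message types table. *)
type msg_type =
  | Handshake
  | Delta
  | Checkpoint_request
  | Checkpoint_ack
  | Heartbeat
  | Schema_negotiation

let to_tag = function
  | Handshake -> 0
  | Delta -> 1
  | Checkpoint_request -> 2
  | Checkpoint_ack -> 3
  | Heartbeat -> 4
  | Schema_negotiation -> 5

(* Unknown tags yield [None] so the caller can reject the message
   instead of raising. *)
let of_tag = function
  | 0 -> Some Handshake
  | 1 -> Some Delta
  | 2 -> Some Checkpoint_request
  | 3 -> Some Checkpoint_ack
  | 4 -> Some Heartbeat
  | 5 -> Some Schema_negotiation
  | _ -> None
```

Returning an option from `of_tag` keeps the decision about unknown tags (drop, log, or disconnect) with the caller.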

Handshake

type handshake =
  { source_worker_id : string
  ; output_schemas : (string * Schema.t) list
  ; protocol_version : int
  }

Sent on connection establishment. The receiver:

  1. Checks protocol version compatibility
  2. Checks schema compatibility for each declared output
  3. Responds with Schema_negotiation (compatible/incompatible + error details)
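The receiver-side steps might look like the sketch below. The `negotiation` record mirrors the ok/errors fields shown in the handshake diagrams, but its exact shape is an assumption, and `check_schema` is a hypothetical stand-in for the real schema compatibility check, passed in as a function so the example stays self-contained.

```ocaml
(* Sketch only: hypothetical shape of the Schema_negotiation reply. *)
type negotiation =
  { ok : bool
  ; errors : string list
  }

let supported_version = 1

(* [check_schema name schema] is assumed to return [Ok ()] when the declared
   output is compatible and [Error reason] otherwise. *)
let validate_handshake ~check_schema ~protocol_version ~output_schemas =
  if protocol_version <> supported_version then
    { ok = false
    ; errors =
        [ Printf.sprintf "unsupported protocol version %d" protocol_version ]
    }
  else
    let errors =
      List.filter_map
        (fun (name, schema) ->
          match check_schema name schema with
          | Ok () -> None
          | Error reason -> Some (name ^ ": " ^ reason))
        output_schemas
    in
    { ok = (errors = []); errors }
```

Collecting every schema error, rather than stopping at the first, lets the sender see all incompatible outputs in one reply.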

Delta (Hot Path)

type delta_msg =
  { source_output : string       (* which output produced this delta *)
  ; key : string                 (* bin_prot encoded key *)
  ; delta_bytes : string         (* bin_prot encoded Delta.t *)
  ; sequence_no : int
  ; watermark_ns : Int64.t option
  }

The delta message is the most frequent message type. It carries:

  • The name of the output that changed
  • The key (typically a symbol) identifying which entry changed
  • The serialized delta (Set, Patch, or Remove)
  • A sequence number for idempotent delivery
  • An optional watermark update piggy-backed on the data message

Heartbeat

type heartbeat =
  { worker_id : string
  ; timestamp_ns : Int64.t
  ; load : float       (* 0.0 to 1.0 *)
  }

Sent periodically (default every 5 seconds). The coordinator uses heartbeats for liveness detection and load-aware partition assignment.
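A minimal sketch of how a coordinator could use these fields follows. The 5-second interval comes from the text; the rule that a worker is dead after three missed intervals, the comparison of the heartbeat timestamp against the coordinator's clock, and the names `is_alive` and `least_loaded` are all assumptions for illustration.

```ocaml
type heartbeat =
  { worker_id : string
  ; timestamp_ns : int64
  ; load : float            (* 0.0 to 1.0 *)
  }

let interval_ns = 5_000_000_000L   (* default heartbeat period: 5 s *)
let missed_before_dead = 3L        (* assumption, not from the source *)

(* A worker counts as alive if its latest heartbeat is at most
   [missed_before_dead] intervals old. *)
let is_alive ~now_ns hb =
  let age = Int64.sub now_ns hb.timestamp_ns in
  Int64.compare age (Int64.mul interval_ns missed_before_dead) <= 0

(* Pick the live worker with the lowest reported load, if any.
   Ties keep the earlier entry in the list. *)
let least_loaded ~now_ns heartbeats =
  List.fold_left
    (fun best hb ->
      if not (is_alive ~now_ns hb) then best
      else
        match best with
        | Some b when b.load <= hb.load -> best
        | _ -> Some hb)
    None heartbeats
```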

Handshake Protocol

Connection establishment follows a 3-step protocol:

Worker-A                              Worker-B
   │                                     │
   │─── Handshake(schemas=[vwap/v2]) ──>│
   │                                     │
   │    check_backward(reader=v2,        │
   │                   writer=v2)        │
   │                                     │
   │<── Schema_negotiation(ok=true) ────│
   │                                     │
   │─── Delta(key=AAPL, seq=1) ────────>│
   │─── Delta(key=GOOG, seq=2) ────────>│
   │    ...                              │

If schema negotiation fails:

Worker-A                              Worker-B
   │                                     │
   │─── Handshake(schemas=[vwap/v3]) ──>│
   │                                     │
   │    check_backward(reader=v2,        │
   │                   writer=v3) FAIL   │
   │                                     │
   │<── Schema_negotiation(ok=false,     │
   │     errors=["missing field x"]) ───│
   │                                     │
   │    [connection rejected]            │

Sequence Numbering

Each worker maintains a monotonically increasing sequence counter per output. The receiver tracks the last-seen sequence number per source:

let set_remote remote ~sequence_no value =
  if sequence_no <= remote.sequence_no then
    false  (* Already seen -- idempotent discard *)
  else begin
    (* Apply the update *)
    remote.sequence_no <- sequence_no;
    true
  end

This provides:

  • Idempotent delivery: duplicate messages (from at-least-once transport) are silently discarded
  • Gap detection: if the sequence jumps from 42 to 44, message 43 was lost and a warning is logged
  • Ordering: within a single source, deltas are applied in sequence order
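The three properties can be made explicit by classifying each incoming sequence number against the last one seen. This is a sketch under the assumption that sequence numbers advance by exactly one per message; the `outcome` type and `classify` function are hypothetical names.

```ocaml
type outcome =
  | Duplicate      (* already seen: discard silently *)
  | In_order       (* exactly the next expected number *)
  | Gap of int     (* number of messages that appear to be missing *)

let classify ~last_seen ~incoming =
  if incoming <= last_seen then Duplicate
  else if incoming = last_seen + 1 then In_order
  else Gap (incoming - last_seen - 1)
```

For the example above, `classify ~last_seen:42 ~incoming:44` evaluates to `Gap 1`. A caller would typically still apply the update and log a warning.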

Encoding

Payload encoding uses bin_prot for performance. The bin_prot format has these properties:

  • Length prefixes on variable-size values (strings, lists, arrays)
  • Variable-length integers stored in little-endian byte order (compact for small values, and matching the native byte order of x86)
  • No schema embedded in the payload (schema is identified by fingerprint in the header)

For debugging and human-readable inspection, the sexp format is available as a fallback. The debug tap CLI command decodes delta messages and prints them as s-expressions.