Schemas & Compatibility

Ripple uses first-class, serializable type descriptors to ensure that producers and consumers agree on the shape of data crossing partition boundaries. This page covers the Schema.t type, ppx-driven derivation, version management, compatibility rules, and fingerprinting.

The Schema.t Type

A schema is a runtime representation of an OCaml record type:

module Version : sig
  type t [@@deriving sexp, bin_io, compare, hash]
  val of_int_exn : int -> t   (* must be positive *)
  val to_int : t -> int
end

type field_type =
  | String | Int | Int64 | Float | Bool | Sexp_opaque
  | Option of field_type
  | List of field_type
  | Record of field list
  | Variant of variant_case list
  | Ref of string              (* cross-reference to another schema *)

and field =
  { name : string
  ; typ : field_type
  ; optional : bool
  ; doc : string option
  }

and variant_case =
  { case_name : string
  ; case_args : field_type list
  }

type t =
  { name : string
  ; version : Version.t
  ; fields : field list
  ; doc : string option
  }

Schemas are themselves serializable ([@@deriving sexp, bin_io, compare, hash]), which means they can be transmitted over the wire during the handshake protocol and stored alongside checkpoints.

ppx Derivation

In production, schemas are derived from OCaml types via ppx, not written by hand:

type trade =
  { symbol : string
  ; price : float
  ; size : int
  ; timestamp_ns : Int64.t
  ; venue : string
  }
[@@deriving sexp, bin_io, compare, hash, schema]

The [@@deriving schema] annotation (via ppx_schema, proposed) generates a val schema : Schema.t value at compile time. This ensures the schema always matches the type definition – no drift between code and metadata.

Until ppx_schema is implemented, schemas are constructed manually:

let trade_schema : Schema.t =
  { name = "trade"
  ; version = Version.of_int_exn 1
  ; fields =
      [ { name = "symbol"; typ = String; optional = false; doc = None }
      ; { name = "price"; typ = Float; optional = false; doc = None }
      ; { name = "size"; typ = Int; optional = false; doc = None }
      ; { name = "timestamp_ns"; typ = Int64; optional = false; doc = None }
      ; { name = "venue"; typ = String; optional = false; doc = None }
      ]
  ; doc = Some "Trade event from exchange"
  }

Version Management

Versions are positive integers. Each breaking change increments the version. The version number is carried in every schema and checked during the connection handshake.

Rules:

  • Version 1 is the initial schema.
  • Adding an optional field does not require a version bump (backward and forward compatible).
  • Adding a required field requires a version bump (breaks backward compatibility).
  • Removing a field requires a version bump.
  • Changing a field type requires a version bump (except widening conversions like Int -> Int64).
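The rules above can be illustrated with a minimal sketch. Here version is a plain int (Ripple's Version.t is abstract) and the schema type is reduced to names and optionality; the names v1_plus and v2 are illustrative, not part of Ripple's API:

```ocaml
(* Minimal model: adding an optional field keeps the version;
   adding a required field forces a bump. *)
type field = { name : string; optional : bool }
type schema = { version : int; fields : field list }

let v1 = { version = 1; fields = [ { name = "symbol"; optional = false } ] }

(* Optional addition: backward and forward compatible, same version. *)
let v1_plus =
  { v1 with fields = v1.fields @ [ { name = "venue"; optional = true } ] }

(* Required addition: breaks backward compatibility, so bump. *)
let v2 =
  { version = v1.version + 1
  ; fields = v1.fields @ [ { name = "size"; optional = false } ]
  }

let () = assert (v1_plus.version = 1 && v2.version = 2)
```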

Compatibility Rules

Ripple enforces three levels of compatibility, following the same framework as Apache Avro and Confluent Schema Registry:

Backward Compatibility

A new reader can read data written by an old writer.

check_backward ~reader:v2 ~writer:v1

Rules:

  • Every required field in the reader must exist in the writer.
  • Field types must be compatible (same type, or a widening conversion).
  • New optional fields in the reader are allowed (reader uses default).
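The three rules above translate directly into a field-by-field check. The following is a self-contained sketch, not Ripple's implementation: the simplified field type, the widens helper, and the test schemas are all assumptions based only on the rules stated on this page.

```ocaml
(* Simplified field representation for the sketch. *)
type field_type = String | Int | Int64 | Float | Bool
type field = { name : string; typ : field_type; optional : bool }

(* The only widening conversion named on this page: Int -> Int64. *)
let widens ~from ~into =
  match from, into with
  | Int, Int64 -> true
  | a, b -> a = b

(* Every required reader field must exist in the writer with a
   compatible type; new optional reader fields are allowed. *)
let check_backward ~(reader : field list) ~(writer : field list) =
  List.for_all
    (fun r ->
      match List.find_opt (fun w -> w.name = r.name) writer with
      | Some w -> widens ~from:w.typ ~into:r.typ
      | None -> r.optional)
    reader

let v1 = [ { name = "symbol"; typ = String; optional = false } ]

let v2 =
  [ { name = "symbol"; typ = String; optional = false }
  ; { name = "venue"; typ = String; optional = true }
  ]

(* The new reader's extra field is optional, so v2 can read v1 data. *)
let () = assert (check_backward ~reader:v2 ~writer:v1)
```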

Forward Compatibility

An old reader can read data written by a new writer.

check_forward ~reader:v1 ~writer:v2

Rules:

  • Every required field in the reader must still exist in the writer.
  • The writer may add new fields that the old reader ignores.
  • The writer must not remove fields that the old reader requires.
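Forward compatibility uses the same reader-reads-writer predicate, only with the old schema in the reader role. A minimal sketch over field names alone (the types and helper names here are assumptions, per the rules above):

```ocaml
(* Field-name-only model: a reader can consume a writer's output if
   every required reader field is present in the writer. *)
type f = { name : string; optional : bool }

let can_read ~(reader : f list) ~(writer : f list) =
  List.for_all
    (fun r -> r.optional || List.exists (fun w -> w.name = r.name) writer)
    reader

(* Forward check: old schema as reader, new schema as writer. *)
let check_forward ~reader ~writer = can_read ~reader ~writer

let v1 = [ { name = "symbol"; optional = false } ]
let v2 = v1 @ [ { name = "venue"; optional = false } ]

(* The writer added a required field; the old reader simply ignores it. *)
let () = assert (check_forward ~reader:v1 ~writer:v2)
```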

Full Compatibility

Both backward and forward compatibility hold simultaneously.

check_full ~old:v1 ~new_:v2

Full compatibility is required for zero-downtime rolling upgrades: during a rolling deploy, some workers run v1 and others run v2. Both must be able to read each other’s output.
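Full compatibility is simply the conjunction of the two directional checks. A self-contained sketch over field names only (the helper can_read and the schemas below are illustrative assumptions):

```ocaml
type f = { name : string; optional : bool }

let can_read ~(reader : f list) ~(writer : f list) =
  List.for_all
    (fun r -> r.optional || List.exists (fun w -> w.name = r.name) writer)
    reader

(* Full compatibility: each schema can read the other's output. *)
let check_full ~old ~new_ =
  can_read ~reader:new_ ~writer:old && can_read ~reader:old ~writer:new_

let v1 = [ { name = "symbol"; optional = false } ]
let v2 = v1 @ [ { name = "venue"; optional = true } ]

(* Adding an optional field is the one change that stays fully
   compatible, so v1 and v2 workers can coexist in a rolling deploy. *)
let () = assert (check_full ~old:v1 ~new_:v2)
```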

Compatibility Decision Table

Change                        Backward   Forward   Full
Add optional field            Yes        Yes       Yes
Add required field            No         Yes       No
Remove optional field         Yes        No        No
Remove required field         No         No        No
Widen type (Int -> Int64)     Yes        No        No
Narrow type (Int64 -> Int)    No         No        No
Rename field                  No         No        No

Incompatibility Reporting

When a compatibility check fails, the result includes structured error details:

type incompatibility =
  { path : string list        (* e.g., ["trade"; "venue"] *)
  ; kind : incompatibility_kind
  ; detail : string
  }

type result =
  | Compatible
  | Incompatible of incompatibility list

This allows the CLI and monitoring system to report exactly which fields caused the incompatibility.
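For example, a CLI reporter over this result type might look like the following sketch. The incompatibility_kind cases shown are assumptions (the page does not enumerate them), and report is a hypothetical helper:

```ocaml
type incompatibility_kind = Missing_field | Type_mismatch

type incompatibility =
  { path : string list
  ; kind : incompatibility_kind
  ; detail : string
  }

type result = Compatible | Incompatible of incompatibility list

(* Render one line per offending field, using the path to name it. *)
let report = function
  | Compatible -> "compatible"
  | Incompatible errs ->
      errs
      |> List.map (fun e -> String.concat "." e.path ^ ": " ^ e.detail)
      |> String.concat "\n"

let () =
  print_string
    (report
       (Incompatible
          [ { path = [ "trade"; "venue" ]
            ; kind = Missing_field
            ; detail = "required field missing in writer"
            }
          ]))
(* prints "trade.venue: required field missing in writer" *)
```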

Fingerprinting

Every schema has a deterministic fingerprint computed from its canonical s-expression form:

let fingerprint t =
  let sexp = to_canonical_sexp t in
  let canonical = Sexp.to_string sexp in
  sprintf "%016Lx" (Int64.of_int (String.hash canonical))

The canonical form sorts fields alphabetically before hashing, so field declaration order does not affect the fingerprint:

(* These two schemas have the same fingerprint *)
let s1 = { fields = [b_field; a_field]; ... }
let s2 = { fields = [a_field; b_field]; ... }
fingerprint s1 = fingerprint s2  (* true *)
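The order-independence property can be demonstrated with a small self-contained sketch; canonicalize here is a stand-in for to_canonical_sexp (not shown on this page), and Stdlib's Hashtbl.hash stands in for the real hash:

```ocaml
type field = { name : string; typ : string }

(* Sort fields by name, then render deterministically, so that
   declaration order cannot influence the hash input. *)
let canonicalize fields =
  fields
  |> List.sort (fun a b -> compare a.name b.name)
  |> List.map (fun f -> Printf.sprintf "(%s %s)" f.name f.typ)
  |> String.concat " "

let fingerprint fields = Hashtbl.hash (canonicalize fields)

let a_field = { name = "a"; typ = "Int" }
let b_field = { name = "b"; typ = "String" }

(* Same fields, different declaration order: same fingerprint. *)
let () = assert (fingerprint [ b_field; a_field ] = fingerprint [ a_field; b_field ])
```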

Fingerprints are used in:

  1. Wire protocol headers: every delta message carries the schema fingerprint of its payload. The receiver verifies the fingerprint matches its local schema before deserializing.
  2. Checkpoint metadata: the checkpoint records which schema version was active when the snapshot was taken.
  3. Connection handshake: workers exchange schema fingerprints to verify compatibility before streaming deltas.
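The receive-side check in (1) amounts to a string comparison before any deserialization work. A hedged sketch (verify_payload and its parameters are illustrative names, not Ripple's wire-protocol API):

```ocaml
(* Compare the fingerprint carried in the message header against the
   local schema's fingerprint; only deserialize on a match. *)
let verify_payload ~local_fingerprint ~header_fingerprint payload deserialize =
  if String.equal local_fingerprint header_fingerprint
  then Ok (deserialize payload)
  else Error "schema fingerprint mismatch"

let () =
  assert (
    verify_payload
      ~local_fingerprint:"00000000deadbeef"
      ~header_fingerprint:"00000000deadbeef"
      "raw-bytes"
      (fun s -> s)
    = Ok "raw-bytes")
```

Rejecting before deserializing keeps a schema mismatch from surfacing as a confusing decode error deep inside bin_io.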

Fingerprint Stability Property

fingerprint(s) = fingerprint(t_of_sexp(sexp_of_t(s)))

A schema’s fingerprint is stable across serialization roundtrips. This is tested in the inline expect tests and is essential for cross-process fingerprint comparison.