API Reference

This reference lists the Python surface exposed by transports.

Session

class transports.Session[source]

Bases: object

host(model: Any) int[source]

Host a model: register its schema, store its value in the core, and watch it. Returns id.

ids() List[int][source]

The ids of all hosted models.

on_patch(fn: Callable[[int, dict], None]) None[source]

Register a callback invoked as fn(model_id, patch) for each emitted patch.

flush() List[Tuple[int, dict]][source]

Diff every dirty model against the core and emit the minimal patches. Returns them.

update(mid: int) List[Tuple[int, dict]][source]

Force a diff+emit for one hosted model and flush.

Automatic emission needs bigbrother to observe the model; models without a __dict__ (msgspec.Struct and other __slots__ types) can’t be watched, so mutate them and then call update(id) explicitly.

drain() List[Tuple[int, dict]][source]

Flush, then return and clear the accumulated outbox.

snapshot(mid: int) dict[source]

{“type_name”:.., “rev”:.., “value”:..} for a hosted model (flushes pending changes).

value(mid: int) dict[source]

The current core Value of a hosted model.

apply_patch(mid: int, patch: dict) bool[source]

Apply an authoritative remote patch to a hosted/mirrored model (adopts the patch’s rev).

Also refreshes the hosted Python object so the caller’s own reference stays in sync — the update is made under observation suppression so it doesn’t echo back as a new patch.

submit(mid: int, patch: dict) dict | None[source]

Apply a client-proposed patch as the server: the server owns rev.

The proposal’s ops are applied to the hosted value, the server’s rev is bumped (not the client’s guess), the hosted Python object is refreshed, and the authoritative patch (with the server rev) is returned to broadcast to every connection. None if the id is unknown.

since(mid: int, since_rev: int) List[dict] | None[source]

Patches to advance a mirror from since_rev to current, or None if the log can’t bridge the gap (the next needed patch was evicted) — then the caller should send a fresh snapshot instead.

schema(type_name: str) dict | None[source]

Model bridge

transports.to_value(model: Any) Any[source]

A pydantic / dataclass / msgspec model as the core Value (a tagged Map).

transports.from_value(value: Any, cls: Type[M]) M[source]

Reconstruct a model of type cls from a core Value.

transports.schema_of(cls: type) dict[source]

Derive a core Schema (type_name + fields) from a pydantic / dataclass / msgspec class.

transports.schema_to_ts(schema: dict) str[source]

Render a core Schema as a TypeScript interface declaration.

Connections

class transports.Server(session: Session, *, default_codec: str = 'json')[source]

Bases: object

Serves a Session to connected clients: sends a snapshot on connect, broadcasts patches, and relays a client’s patches to the other clients (a hub). Transport-agnostic — its methods return the messages to send; an adapter such as ws_endpoint performs the I/O.

Each connection has its own negotiated codec, so outbound messages are encoded per connection.

open(conn: Any, codec: str | None = None, since: Dict[int, int] | None = None) List[str | bytes][source]

Register a connection; return the messages that bring it up to date.

Fresh connect (since=None) → a snapshot per model. Resume (since={mid: last_rev}) → only the patches each model emitted after last_rev, falling back to a snapshot for any model whose replay log can’t bridge the gap. So a reconnecting client replays the delta, not the whole model.

recv(conn: Any, data: str | bytes) Dict[Any, List[str | bytes]][source]

Handle an inbound message (text or binary frame); returns messages to send, keyed by conn.

A client patch is a proposal: the server applies it, bumps its own authoritative rev, and echoes the resulting patch to every connection (including the origin), each in that connection’s codec. Models are server-authoritative — a client’s mirror updates when this echo arrives, not optimistically.

flush() Dict[Any, List[str | bytes]][source]

Drain the session and return the patch messages to broadcast, encoded per connection.

close(conn: Any) None[source]
class transports.Client(codec: str = 'json')[source]

Bases: object

Mirrors a remote Session — applies snapshot/patch messages to a local copy of each model.

Read values with value(id) or materialize them with model(id, cls). Drive it with a live connection via connect(url), or feed it messages directly with recv(data). The codec (“json”, “msgpack”, or “cbor”) frames outbound edits and decodes inbound frames.

recv(data: str | bytes) None[source]

Apply an inbound snapshot or patch message (text or binary frame) to the local mirror.

value(mid: int) Any[source]

The current mirrored core Value of a model.

model(mid: int, cls: Type[M]) M[source]

Materialize the mirrored model as an instance of cls.

ids() List[int][source]
edit(mid: int, new_value: Any) str | bytes[source]

Propose an edit to a mirrored model; returns the patch frame to send (encoded in this codec).

Models are server-authoritative: the edit is a proposal, and the local mirror updates only when the server echoes the authoritative patch back (via recv), not optimistically. This keeps rev owned by the server and avoids client/server rev divergence.

async connect(url: str) None[source]

Connect to a transports server and mirror it until the connection closes (one connection).

async run(url: str, *, authority: str = 'server', retry: float = 1.0) None[source]

Connect and mirror, reconnecting whenever the connection drops — so the client survives a server restart or a network blip. authority decides reconciliation on each (re)connect:

  • "server" (default): the server is canonical; the client adopts its state (resuming from ?since= when it can, else a fresh snapshot). This is the “refetch on refresh” behavior.

  • "client": the client is canonical; after the server’s snapshot it pushes its last-known state back as an edit, so a server that came back stale or empty is rectified from the client. With a CRDT model the push merges (newer stamps win); otherwise it overwrites.

Runs until cancelled. The choice of where the authoritative state lives is yours — pair this with the server-side durability hooks (Hub.on_shared_write) as your use case needs.

async connect_sse(url: str) None[source]

Mirror a transports server over Server-Sent Events (receive-only) until the stream closes.

SSE is a one-way server→client channel, so this only receives snapshots and patches; use connect() (WebSocket) when the client also needs to send edits.

transports.ws_endpoint(server: Broadcaster)[source]

Build a Starlette WebSocket endpoint that serves server (a Server or Hub).

The connection’s codec is read from a ?codec= query param, falling back to the broadcaster’s default_codec. Wire it into an app, e.g. WebSocketRoute("/ws", ws_endpoint(server)), and run autosync(server) as a background task to stream server-side model changes to clients.

transports.sse_endpoint(server: Broadcaster)[source]

Build a Starlette endpoint that streams a Server/Hub to a client over SSE.

Wire it into an app as a normal route (Route("/sse", sse_endpoint(server))) and run autosync(server) as a background task to stream server-side model changes to connected clients.

transports.serve_comm(server: Broadcaster, comm: Any, codec: str = 'json') _CommConn[source]

Wire a comm to a Server/Hub: send the opening snapshots and relay inbound messages.

The comm is registered as a connection; its messages (msg["content"]["data"]) are fed to server.recv and any resulting messages are sent back over the relevant connections. Returns the connection handle. Call sync(server) after host-side mutations to push changes.

transports.serve_anywidget(server: Broadcaster, widget: Any, codec: str = 'json') _WidgetConn[source]

Wire a widget to a Server/Hub. Returns the connection handle; call sync(server) to push.

The widget’s custom messages drive the protocol: {"ready": true} triggers the opening snapshots, and any {"wire": <wire>} is relayed to server.recv (a client’s edits), with results sent back over the relevant widgets.

async transports.autosync(server: Broadcaster, interval: float = 0.01) None[source]

Background task: periodically flush and broadcast patches to all connections.

Run exactly one of these per Server/Hub (not per connection), so a single drain feeds every client. The async counterpart of sync — use this for socket backends (WebSocket/SSE) driven by an event loop, and sync for the synchronous ones (Jupyter comm/anywidget).

transports.sync(server: Broadcaster) None[source]

Drain host-side changes and deliver the patches over every connection, synchronously.

The manual counterpart of autosync, for backends driven by a synchronous loop (a Jupyter comm or anywidget): call it after mutating hosted models — e.g. at the end of a cell, or from a kernel timer. Each connection handle exposes send(wire).

Multi-tenancy and sharing

transports.READ = 'read'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

transports.WRITE = 'write'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

class transports.Hub(key: Callable[[Any], Any], *, default_codec: str = 'json')[source]

Bases: object

Route connections to per-tenant Session objects and fan shared data structures to subscribers.

Construct with key, a function mapping a connection handle to its tenant key. Register shared models with share() and connect tenants to them with subscribe(). Like Server, the methods return the messages to send keyed by connection; an adapter such as ws_endpoint(hub) performs I/O.

tenant(key: Any) Session[source]

Get (or create) the Session holding a tenant’s private models.

share(model_or_value: ~typing.Any, type_name: str | None = None, *, merge: ~typing.Any = <class 'transports.hub.LastWriteWins'>, replay: bool = False, rev: int = 0, merge_state: dict | None = None) int[source]

Register a shared data structure; returns its shared id.

Pass a model instance (pydantic/dataclass/msgspec) to capture its value and type name, or a core Value dict together with type_name. merge is a MergeStrategy subclass (each shared model gets its own instance) or an instance to reuse. replay=True keeps a bounded patch log so a joining worker can catch up by delta (see since_shared()) instead of a full snapshot. To restore a model from a durable checkpoint on startup, pass rev and merge_state (from a prior snapshot_shared()) alongside the saved value.

subscribe(tenant_key: Any, sid: int, mode: str = 'read') None[source]

Subscribe a tenant to a shared model with READ or WRITE access.

open(conn: Any, codec: str | None = None, since: Dict[int, int] | None = None) List[str | bytes][source]

Register a connection; return the messages to bring it up to date — its tenant’s private models and its subscribed shared models. With since={mid: last_rev} a reconnecting client resumes its private models from the delta (shared models re-snapshot — a shared replay log is a follow-on).

recv(conn: Any, data: str | bytes) Dict[Any, List[str | bytes]][source]

Handle an inbound patch; returns messages to send, keyed by connection.

A patch to a private model is applied as the server (the tenant’s session owns rev) and the authoritative patch is broadcast to all of that tenant’s connections. A patch to a shared model (from a WRITE subscriber) is merged into the authoritative value and broadcast to every subscriber connection. Both paths are server-authoritative (origin included).

flush() Dict[Any, List[str | bytes]][source]

Drain every tenant session and any host-side shared writes; route the patches per tenant/subscription.

set_shared(sid: int, new_value_or_model: Any) None[source]

Write to a shared model from the host side; the change is broadcast on the next sync/autosync.

apply_shared(sid: int, patch: dict, origin: Any) None[source]

Merge a shared-model write that happened on another worker (delivered over a backplane), and queue the resulting authoritative fan for this worker’s subscribers on the next flush. Do not re-publish — the originating worker already broadcast it. When the model’s MergeStrategy is a CRDT this is convergent: applying the same set of writes in any order yields the same value, so concurrent edits from clients on different workers reconcile identically everywhere.

on_shared_write(callback: Callable | None) None[source]

Register a callback fired after each authoritative shared write, with (sid, type_name, value, rev, patch, merge_state). transports stores nothing durably; persist these (the value+rev+merge_state, or append the patch) to make a model survive a full-cluster restart, and restore with share(value=…, rev=…, merge_state=…). Gate on a single writer (e.g. the relay’s leader) if you don’t want every worker persisting the same change.

snapshot_shared(sid: int) dict[source]

The full transferable/persistable state of a shared model: value, rev and the merge clock (merge_state). Used by the relay to catch up a joining worker, and by users to checkpoint for durability.

since_shared(sid: int, since_rev: int) List[dict] | None[source]

Patches after since_rev for a delta catch-up, or None if it is outside the kept log (the caller should fall back to a snapshot). Requires share(replay=True). Mirrors Session.since.

apply_snapshot_shared(sid: int, value: dict, rev: int, merge_state: dict | None) None[source]

Adopt a peer’s snapshot of a shared model: set value/rev and restore the merge clock, so later merges respect the transferred causal stamps.

apply_delta_shared(sid: int, patches: List[dict], rev: int, merge_state: dict | None) None[source]

Catch up by applying replay patches onto the current (restored) value, then restoring the merge clock — cheaper than a snapshot when a recent checkpoint is held.

close(conn: Any) None[source]
class transports.MergeStrategy[source]

Bases: object

How a write to a shared model is reconciled into its authoritative value.

merge(current, patch, origin) returns the new core Value. Implementations may be stateful; pass the class (not an instance) to Hub.share(merge=…) so each shared model gets its own instance and its own state.

restore(state: dict) None[source]

Adopt metadata produced by state() (catch-up or durable restore). Default: ignore.

state() dict[source]

The strategy’s own metadata (e.g. a CRDT clock), JSON-serializable, so a model’s full state can be transferred to a joining worker or persisted by the user. Stateless strategies return {}.

class transports.LastWriteWins[source]

Bases: MergeStrategy

Apply each write in arrival order (today’s Store semantics). Order-dependent.

class transports.LwwMapCrdt[source]

Bases: MergeStrategy

Conflict-free per-top-level-key last-writer-wins register map.

Each top-level map key carries a logical stamp (patch rev, origin); a key’s write is accepted only if its stamp is at least the stored one. Two consequences: concurrent edits to different keys both survive, and conflicting edits to the same key converge to the same value regardless of the order the hub happens to receive them in (the stamp is intrinsic to the write, not its arrival order). Nested or list ops fall back to a direct apply, stamped by their top-level key.

restore(state: dict) None[source]

Adopt metadata produced by state() (catch-up or durable restore). Default: ignore.

state() dict[source]

The strategy’s own metadata (e.g. a CRDT clock), JSON-serializable, so a model’s full state can be transferred to a joining worker or persisted by the user. Stateless strategies return {}.

Protocol helpers

transports.protocol.normalize_codec(name: str | None) str[source]

Map a codec name or content-type to a canonical name (JSON, MSGPACK, CBOR, or a registered custom content type).

transports.protocol.snapshot_msg(model_id: int, type_name: str, rev: int, value: Any) str[source]
transports.protocol.patch_msg(model_id: int, patch: dict) str[source]
transports.protocol.encode(msg_json: str, codec: str = 'json') str | bytes[source]

Encode a JSON message string into the wire form for codec.

Returns the string unchanged for JSON, MessagePack bytes for the msgpack codec, or whatever a registered custom codec produces — so the caller sends a text or binary frame accordingly.

transports.protocol.decode(data: str | bytes, codec: str | None = None) dict[source]

Parse an inbound frame to a message dict.

Pass the connection’s codec to select the decoder (required for custom codecs). With no codec the built-ins are inferred from the frame type (str=JSON, bytes=msgpack).

The codec registry functions (encode_as, decode_as, register_codec, unregister_codec, registered_codecs) are re-exported at the top level and listed under Core below.

Core

transports.diff(old, new)

Diff two JSON-encoded models, returning the JSON-encoded patch.

transports.apply(value, patch)

Apply a JSON-encoded patch to a JSON-encoded model, returning the JSON-encoded result.

transports.encode(value)

Encode a JSON-encoded model to codec bytes.

transports.decode(data)

Decode codec bytes back to a JSON-encoded model string.

transports.encode_as(value_json: str, content_type: str) bytes[source]

Encode a model Value (JSON string) to bytes with the named codec (built-in or registered).

transports.decode_as(data: str | bytes, content_type: str) str[source]

Decode bytes back to a model Value (JSON string) with the named codec (built-in or registered).

transports.json_to_msgpack(json)

Convert an arbitrary JSON document to MessagePack bytes (for encoding whole protocol messages).

transports.msgpack_to_json(data)

Convert MessagePack bytes back to a JSON document.

transports.register_codec(content_type: str, encode: Callable[[Any], str | bytes], decode: Callable[[str | bytes], Any]) None[source]

Register a custom wire codec under content_type.

encode turns a JSON-able object (a protocol message or a model Value) into wire bytes (or a str); decode is its inverse. Once registered, content_type works anywhere a codec name is accepted — Client(codec=content_type), a ?codec= query param, or encode_as / decode_as. Register a matching implementation in every binding that needs it (the JS binding has its own registerCodec). The built-in json / msgpack codecs cannot be overridden.

transports.unregister_codec(content_type: str) None[source]

Remove a previously registered custom codec.

transports.registered_codecs() Tuple[str, ...][source]

The content types of the currently registered custom codecs.

class transports.Store

Bases: object

In-process model store: host / mutate → patch / apply / snapshot.

apply(id, patch_json)

Apply a JSON patch to a mirrored model; returns whether the id was known.

host(type_name, value_json)

Host a model from its JSON; returns the assigned id.

mutate(id, value_json)

Replace a hosted model from JSON; returns the JSON patch (or None if id unknown).

snapshot(id)

{“type_name”:..,”rev”:..,”value”:..} for a hosted model, or None.