ObserveRTC: @observertc/observer-js

In one line: feed it WebRTC getStats() snapshots, and get back a live, queryable model of every call plus a single typed event stream to react to.

observer-js is a server-side Node.js library for monitoring WebRTC sessions. A WebRTC application (typically an SFU or a signaling/stats backend) feeds it ClientSample objects — periodic snapshots of each participant's RTCPeerConnection.getStats() output plus application events — and observer-js maintains a live, in-memory model of every call, participant, peer connection, and media stream, derives per-interval and cumulative metrics, and emits a single, unified stream of typed events the application can react to.

What you can do with it:

  • Monitor calls live — a queryable in-memory tree of every call, client, peer connection, track, codec, ICE candidate and data channel, each holding current and cumulative metrics.
  • React on one event bus — subscribe once on the Observer; every payload carries its full ancestry (call → client → peer connection → stat), so you never walk the tree to subscribe.
  • Get derived metrics for free — counter-reset-safe per-tick deltas, bitrates, jitter, RTT, fraction-lost, remote-RTP (RTCP) correlation, and TURN/TCP usage from the selected candidate pair.
  • Correlate across an SFU — link a publisher's outbound track to every subscriber's inbound track (RemoteTrackResolver), and observe mediasoup routers/transports/producers/consumers on the server side.
  • Detect server-only problems — cross-client Detectors raise call-issues for conditions no single client can see (e.g. everyone in a call degrading at once).
  • Persist every sample — per-client sinks (JSONL file, in-memory, or your own) for archival, streaming, and offline replay.
  • Drop it in safely — warn-don't-throw, a pluggable logger, dual ESM + CommonJS, and no media-stack dependency in the core.

Status: 1.0.0-beta. The API described here is current and intended to be implemented against directly. This document is written to be self-sufficient: an engineer (or an AI agent) should be able to integrate the library, or develop it further, from this file alone. A companion doc, docs/logging.md, covers logging integration in depth.

Packaging: server-side, Node.js ≥ 22, shipped as a dual ESM + CommonJS build — so it works whether your project uses import (ESM) or require() (CommonJS). Everything — including the built-in file sink — is exported from the single @observertc/observer-js entry.


Table of contents

  1. Installation
  2. Quick start
  3. Data flow
  4. Entity hierarchy
  5. Ingestion: accept(), context & lifecycle
  6. Update policies
  7. The event bus ← the core of the API
  8. API reference
  9. Schema types (ClientSample)
  10. Detectors (server-side extension point)
  11. Remote track resolution (mediasoup / SFU)
  12. Mediasoup router observation
  13. Sinks (per-client sample persistence)
  14. Logging
  15. Error-handling philosophy
  16. Development & extension guide

Installation

npm install @observertc/observer-js
# or
yarn add @observertc/observer-js

Server-side, Node.js ≥ 22, dual ESM + CommonJS. The package ships both module formats, so it works the same whether your project is ESM or CommonJS — your import line is unchanged either way:

import { Observer, ClientSample, createJsonlFileSinkFactory } from '@observertc/observer-js';

In an ESM project this resolves to the .mjs build; in a CommonJS project (where TypeScript compiles your import down to require()) it resolves to the .js build. Everything is exported from the single @observertc/observer-js entry. Written in TypeScript; ships type declarations for both formats (dist/index.d.ts for require, dist/index.d.mts for import). Runtime dependencies: @bufbuild/protobuf, events, uuid. The library does not bundle a logger or any transport — see Logging.

ClientSample and friends are re-exported from this package, and are also published as the shared schema in @observertc/schemas; samples produced on the client (e.g. by @observertc/client-monitor-js) conform to the same shape.


Quick start

import { Observer, ClientSample } from '@observertc/observer-js';

// 1. Create an observer.
const observer = new Observer({
  // when the observer aggregates call/client metrics:
  updatePolicy: 'update-when-all-call-updated',
  // default policy applied to calls created automatically by accept():
  defaultCallUpdatePolicy: 'update-on-any-client-updated',
  // optional auto-teardown:
  closeCallIfEmptyForMs: 20_000,
  closeClientIfIdleForMs: 60_000,
});

// 2. Subscribe on the single bus. Every payload is an object with the ancestry.
observer.on('call-added', ({ observedCall }) => {
  console.log('new call', observedCall.callId);
});

observer.on('client-issue', ({ observedClient, issue }) => {
  console.warn(`[${observedClient.clientId}] ${issue.type}`, issue.payload);
});

observer.on('peer-connection-updated', ({ observedClient, observedPeerConnection }) => {
  console.log(observedClient.clientId, 'RTT(ms):', observedPeerConnection.currentRttInMs);
});

observer.on('sample-rejected', ({ reason, sample }) => {
  console.warn('dropped a sample:', reason);
});

// 3. Feed samples. `context` (optional) is transient per-accept data, carried to the
//    `*-updated` events this accept triggers (never written to appData).
function onClientStats(sample: ClientSample) {
  observer.accept(sample, { studioVersion: '1.2.3' });
}

// 4. Tear down.
process.on('SIGINT', () => observer.close());

Data flow

client getStats()  ──►  ClientSample  ──►  observer.accept(sample, ctx?)
                                               │
              ┌────────────────────────────────┘
              ▼
   get-or-create ObservedCall ──► get-or-create ObservedClient ──► client.accept(sample, ctx)
                                                                        │
                                              per peerConnections[] in the sample
                                                                        ▼
                                              get-or-create ObservedPeerConnection
                                              .accept(pcSample, ctx) updates all sub-stats,
                                              derives deltas/bitrates/RTT, correlates remote RTP
                                                                        │
                          metrics roll up: PeerConnection → Client → Call → Observer
                                                                        │
                                          events emitted on the Observer bus  ──►  your handlers
  • A sample must have callId and clientId (the library sets them, or the app does). If either is missing, the sample is dropped and sample-rejected is emitted.
  • Sub-entities that stop appearing in samples are garbage-collected via a "visited" mark-and-sweep on each ObservedPeerConnection.accept(), emitting the corresponding *-removed events.
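
The "visited" mark-and-sweep mentioned above can be pictured with a minimal sketch. The `Tracked` type and the `markAndSweep` function are hypothetical stand-ins written for this illustration, not the library's internals; the sketch only assumes that each entity carries a boolean mark that is set when it appears in a sample and cleared after the sweep.

```typescript
// Hypothetical stand-in for a sub-stat entity; not the library's class.
interface Tracked {
  id: string;
  visited: boolean;
}

// Marks every entity present in this sample, then sweeps the ones not seen.
// Returns the removed ids so a caller could emit *-removed events for them.
function markAndSweep(entities: Map<string, Tracked>, idsInSample: string[]): string[] {
  for (const id of idsInSample) {
    const existing = entities.get(id);
    if (existing) {
      existing.visited = true;
    } else {
      entities.set(id, { id, visited: true }); // a real implementation would emit *-added here
    }
  }
  const removed: string[] = [];
  entities.forEach((entity, id) => {
    if (entity.visited) {
      entity.visited = false; // clear the mark for the next tick
    } else {
      entities.delete(id); // not present in this sample: sweep it
      removed.push(id);
    }
  });
  return removed;
}

const inboundRtps = new Map<string, Tracked>();
const firstSweep = markAndSweep(inboundRtps, ['a', 'b']); // both are new, nothing removed
const secondSweep = markAndSweep(inboundRtps, ['b']);     // 'a' stopped appearing
```

After the second call only `b` remains in the map, and `secondSweep` names the entity that disappeared.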

Entity hierarchy

Class | Created by | Keyed on its parent as | Holds
Observer | new Observer(config?) | (root) | observedCalls: Map<string, ObservedCall>, global counters, the event bus
ObservedCall | observer.createObservedCall(settings) / lazily by accept | observedCalls | observedClients: Map<string, ObservedClient>, call-wide metrics, detectors, scoreCalculator
ObservedClient | call.createObservedClient(settings) / lazily | observedClients | observedPeerConnections: Map<string, ObservedPeerConnection>, per-client metrics
ObservedPeerConnection | lazily, from sample.peerConnections[] | observedPeerConnections | the 15 sub-stat maps below, transport/RTT/bitrate metrics
Sub-stats | lazily, from the PeerConnectionSample | maps on the PC | individual WebRTC stat objects

ObservedPeerConnection sub-stat maps (all public readonly):

observedCertificates, observedCodecs, observedDataChannels,
observedIceCandidates, observedIceCandidatesPair, observedIceTransports,
observedInboundRtps, observedInboundTracks, observedMediaPlayouts,
observedMediaSources, observedOutboundRtps, observedOutboundTracks,
observedPeerConnectionTransports, observedRemoteInboundRtps, observedRemoteOutboundRtps

Each sub-stat class (ObservedInboundRtp, ObservedOutboundRtp, ObservedInboundTrack, ObservedOutboundTrack, ObservedDataChannel, ObservedIceCandidate, ObservedIceCandidatePair, ObservedIceTransport, ObservedCertificate, ObservedCodec, ObservedMediaSource, ObservedMediaPlayout, ObservedPeerConnectionTransport, ObservedRemoteInboundRtp, ObservedRemoteOutboundRtp) mirrors the corresponding stat fields from the schema plus derived fields (deltas, bitrates).
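
To make the "derived fields (deltas, bitrates)" concrete, here is a minimal sketch of how a counter-reset-safe delta and a bitrate could be computed from two consecutive cumulative counters. The function names and the reset policy (treat the new value as the delta when the counter goes backwards, and report 0 on the first tick) are assumptions made for illustration; the source does not spell out the library's exact rule.

```typescript
// Returns the increase of a cumulative counter between two ticks.
// Assumed policy: if the counter went backwards (for example because the stat
// object was recreated), the current value is used as the delta so the result
// is never negative.
function counterDelta(previous: number | undefined, current: number): number {
  if (previous === undefined) return 0;   // first tick: nothing to compare against
  if (current < previous) return current; // counter reset
  return current - previous;
}

// Converts a byte delta over an elapsed interval into bits per second.
function bitrateBps(deltaBytes: number, elapsedMs: number): number {
  if (elapsedMs <= 0) return 0;
  return (deltaBytes * 8 * 1000) / elapsedMs;
}

const deltaBytes = counterDelta(10_000, 22_500);    // 12_500 bytes since the last tick
const deltaAfterReset = counterDelta(22_500, 300);  // 300, not -22_200
const bitrate = bitrateBps(deltaBytes, 1_000);      // 100_000 bits per second
```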


Ingestion: accept(), context & lifecycle

observer.accept(sample, context?)

The single entry point. It:

  1. drops + emits sample-rejected if the observer is closed;
  2. runs the sample through the global accept-middleware chain (see below);
  3. (chain terminal) drops + emits sample-rejected if callId/clientId is missing;
  4. gets or lazily creates the ObservedCall and ObservedClient (their appData comes from the configured factories, never from context);
  5. delegates to client.accept(sample, context), which fans out to each ObservedPeerConnection.accept(pcSample, context).

Accept middlewares (global pre-dispatch hook)

observer.addAcceptMiddleware(...) registers middlewares run on every sample inside accept(), in order, before the sample is dispatched to any call or client. Each middleware gets a { sample, context } payload; it can inspect or mutate the sample (set/normalize callId/clientId, enrich, redact) or the context, then call next(payload) to continue. Not calling next drops the sample — nothing is created and no event fires. A throwing middleware is caught and warns (the sample is dropped), never crashing accept().

import { Observer, AcceptMiddleware } from '@observertc/observer-js';

const observer = new Observer();

// hypothetical blocklist of client ids maintained by the application
const blocked = new Set<string>();

// derive callId/clientId from the app's own attachment, before dispatch
const route: AcceptMiddleware = (payload, next) => {
  const { sample } = payload;
  sample.callId ??= sample.attachments?.roomId as string;
  sample.clientId ??= sample.attachments?.peerId as string;
  next(payload);   // pass the whole payload on so the context is preserved
};

// drop samples from a blocklisted client (never dispatched)
const filter: AcceptMiddleware = (payload, next) => {
  const { clientId } = payload.sample;
  if (clientId !== undefined && blocked.has(clientId)) return;   // no next() => dropped
  next(payload);
};

observer.addAcceptMiddleware(route, filter);
// observer.removeAcceptMiddleware(route);

This is a lightweight global injection point, distinct from the larger (not-yet-built) ClientSampleProcessor pipeline in the roadmap. When no middleware is registered, accept() dispatches directly with no overhead.
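
For readers who want a mental model of the `next(payload)` contract, the following sketch shows one way such a chain could be dispatched. It is not the library's implementation: `Sample`, `Payload`, `Middleware` and `runChain` are local stand-ins defined only for this example.

```typescript
// Local stand-ins; the real types are exported by the library.
type Sample = { callId?: string; clientId?: string; attachments?: Record<string, unknown> };
type Payload = { sample: Sample; context?: Record<string, unknown> };
type Middleware = (payload: Payload, next: (payload: Payload) => void) => void;

// Runs middlewares in order. `terminal` is reached only if every middleware calls next().
function runChain(middlewares: Middleware[], payload: Payload, terminal: (p: Payload) => void): void {
  const dispatch = (index: number, current: Payload): void => {
    const middleware = middlewares[index];
    if (!middleware) {
      terminal(current); // end of the chain: hand the sample to the dispatcher
      return;
    }
    try {
      middleware(current, (nextPayload) => dispatch(index + 1, nextPayload));
    } catch (err) {
      // anything thrown at or below this step is logged and the sample is dropped
      console.warn('middleware threw, sample dropped', err);
    }
  };
  dispatch(0, payload);
}

const dispatched: Payload[] = [];
const setCallId: Middleware = (p, next) => {
  if (p.sample.callId === undefined) p.sample.callId = String(p.sample.attachments?.roomId);
  next(p);
};
const dropAll: Middleware = () => { /* no next() => dropped */ };
const thrower: Middleware = () => { throw new Error('boom'); };

runChain([setCallId], { sample: { attachments: { roomId: 'room-1' } } }, (p) => { dispatched.push(p); });
runChain([setCallId, dropAll], { sample: {} }, (p) => { dispatched.push(p); });
runChain([thrower], { sample: {} }, (p) => { dispatched.push(p); });
```

Only the first call reaches the terminal step; the second is dropped silently and the third is dropped with a warning.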

context (the AcceptContext)

type AcceptContext = Record<string, unknown>;

A single, optional, free-form object threaded down the whole accept chain (Observer → Client → PeerConnection). It is transient request-scoped data — temporary or contextual information the application wants available while an update is processed.

context is never written to appData and is not stored on any entity. The two are deliberately distinct:

  • appData — application-assigned extra info that identifies/decorates an entity, fixed at creation (via settings.appData or the createCallAppData / createClientAppData factories), or assigned by the app on the *-added events. The library never changes it.
  • context — passed per accept(), may differ on every call, and is carried straight through to the *-updated events that the accept() triggers, then discarded.

client-updated and peer-connection-updated carry the exact context of that sample; call-updated carries the context of the client accept() that drove the call update (absent for interval- or teardown-driven call updates). When no context is given, the field is absent.

Get-or-create helpers

If you want to create/configure entities yourself before/without samples:

const call = observer.getOrCreateObservedCall({ callId, appData });        // ObservedCall | undefined
const client = call?.getOrCreateObservedClient({ clientId, appData });     // ObservedClient | undefined

These return undefined (and warn) when the parent is closed; createObservedCall / createObservedClient return the existing instance (and warn) if the id already exists.

Automatic teardown

  • closeClientIfIdleForMs — a client with no sample for this long auto-closes.
  • closeCallIfEmptyForMs — a call with zero clients for this long auto-closes.
  • Closing cascades down (call → clients → peer connections → sub-stats), unsubscribing listeners and emitting the *-closed / *-removed events.
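
The idle rule behind closeClientIfIdleForMs can be expressed as a small pure function. This is only a sketch of the rule as described above; `findIdleClients` and the `lastSampleAtMs` map are hypothetical, and the source does not say how or when the library performs the check internally.

```typescript
// Hypothetical helper: lists the clients whose last sample is at least
// `closeClientIfIdleForMs` old at time `nowMs`.
function findIdleClients(
  lastSampleAtMs: Map<string, number>,
  nowMs: number,
  closeClientIfIdleForMs: number,
): string[] {
  const idle: string[] = [];
  lastSampleAtMs.forEach((lastAt, clientId) => {
    if (nowMs - lastAt >= closeClientIfIdleForMs) idle.push(clientId);
  });
  return idle;
}

const lastSeen = new Map<string, number>([
  ['alice', 1_000],  // last sample 60 s before "now"
  ['bob', 58_000],   // last sample 3 s before "now"
]);
const idleClients = findIdleClients(lastSeen, 61_000, 60_000);
```

With these inputs only `alice` is reported as idle.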

Update policies

"Update" means recompute aggregated metrics and emit the *-updated event at that level. Both the observer and each call have a configurable trigger. Updates are event-driven — there is no built-in timer. An app that wants a fixed cadence can call observer.update() / call.update() from its own setInterval. With 'none', nothing auto-updates — the level updates only when the application calls the public update() itself.

Observer-level (ObserverConfig.updatePolicy, default update-when-all-call-updated):

Policy | Triggers observer.update() when…
update-on-any-call-updated | any call updates
update-when-all-call-updated | every call has updated since the last observer update
none | never automatically; only when the app calls observer.update()

Call-level (ObservedCallSettings.updatePolicy, defaulted from ObserverConfig.defaultCallUpdatePolicy):

Policy | Triggers call.update() when…
update-on-any-client-updated | any client in the call updates
update-when-all-client-updated | every client has updated since the last call update
none | never automatically; only when the app calls call.update()
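
The difference between the "any" and "all" policies is easiest to see in code. The sketch below models the call-level decision with a hypothetical `UpdateGate` class that is not part of the library; it assumes the "all" policy starts a new round once every current client has reported.

```typescript
type CallUpdatePolicy = 'update-on-any-client-updated' | 'update-when-all-client-updated' | 'none';

// Hypothetical helper: decides whether a call-level update is due
// after a given client has updated.
class UpdateGate {
  private readonly policy: CallUpdatePolicy;
  private readonly updatedSinceLast = new Set<string>();

  constructor(policy: CallUpdatePolicy) {
    this.policy = policy;
  }

  // Records that `clientId` updated; returns true if the call should update now.
  onClientUpdated(clientId: string, allClientIds: string[]): boolean {
    if (this.policy === 'none') return false;
    if (this.policy === 'update-on-any-client-updated') return true;
    this.updatedSinceLast.add(clientId);
    const everyoneReported = allClientIds.every((id) => this.updatedSinceLast.has(id));
    if (everyoneReported) this.updatedSinceLast.clear(); // start a new round
    return everyoneReported;
  }
}

const clients = ['a', 'b'];
const allGate = new UpdateGate('update-when-all-client-updated');
const afterA = allGate.onClientUpdated('a', clients);      // false: b has not reported yet
const afterB = allGate.onClientUpdated('b', clients);      // true: everyone reported
const afterAAgain = allGate.onClientUpdated('a', clients); // false: a new round began
const anyGate = new UpdateGate('update-on-any-client-updated');
const anyResult = anyGate.onClientUpdated('a', clients);   // true on every client update
```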

The event bus

This is the primary API. Subscribe on the Observer instance — it is the single emitter for the entire hierarchy. The ObservedCall / ObservedClient / ObservedPeerConnection objects are themselves EventEmitters too, but those local events are reserved for internal lifecycle/teardown wiring (see Local lifecycle events); application code should use the Observer bus.

Payload shape: ancestry + subject

Every Observer event delivers exactly one argument: a payload object. The payload always contains the ancestry from the observer down to the entity that raised it, plus any event-specific subject:

type ObserverEventBase            = { observer: Observer };
type ObservedCallScope            = ObserverEventBase            & { observedCall: ObservedCall };
type ObservedClientScope          = ObservedCallScope            & { observedClient: ObservedClient };
type ObservedPeerConnectionScope  = ObservedClientScope          & { observedPeerConnection: ObservedPeerConnection };

So a peer-connection-level event hands you the observer, call, client, and peer connection:

observer.on('inbound-rtp-added', ({ observer, observedCall, observedClient, observedPeerConnection, observedInboundRtp }) => {
  // all five are present and correctly typed
});

observer.on/off/once/emit are fully typed against the event map — the handler argument is inferred per event name.
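
The idea of inferring the handler argument from the event name can be illustrated with a small, self-contained typed bus. Everything here (`TypedBus`, the trimmed `EventMap`, the scope types) is a hypothetical stand-in written for this example and does not reproduce the library's event map or emitter.

```typescript
// Hypothetical, trimmed-down scopes and event map.
type CallScope = { observedCall: { callId: string } };
type ClientScope = CallScope & { observedClient: { clientId: string } };
type EventMap = {
  'call-added': CallScope;
  'client-issue': ClientScope & { issue: { type: string; payload?: string } };
};

class TypedBus<M> {
  private readonly handlers = new Map<string, Array<(payload: unknown) => void>>();

  // The payload type of `handler` is looked up from the event name.
  on<K extends keyof M & string>(event: K, handler: (payload: M[K]) => void): this {
    const list = this.handlers.get(event) ?? [];
    list.push(handler as (payload: unknown) => void);
    this.handlers.set(event, list);
    return this;
  }

  emit<K extends keyof M & string>(event: K, payload: M[K]): void {
    (this.handlers.get(event) ?? []).forEach((handler) => handler(payload));
  }
}

const bus = new TypedBus<EventMap>();
const seen: string[] = [];

// `observedCall`, `observedClient` and `issue` are all typed without annotations.
bus.on('client-issue', ({ observedCall, observedClient, issue }) => {
  seen.push(`${observedCall.callId}/${observedClient.clientId}: ${issue.type}`);
});

bus.emit('client-issue', {
  observedCall: { callId: 'call-1' },
  observedClient: { clientId: 'client-1' },
  issue: { type: 'freeze' },
});
```

Passing a payload that lacks `observedClient` to `emit('client-issue', …)` would be a compile-time error in this sketch, which is the property the typed event map is meant to provide.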

Event catalogue

All payloads include the ancestry for their level (above). The Extra column lists the additional field(s) on top of that scope.

Observer level — scope { observer }

Event | Extra payload | Fires when
observer-updated | | observer.update() ran (per the observer update policy)
observer-closed | | observer.close()
sample-rejected | { reason: 'observer-closed' | 'missing-callId' | 'missing-clientId', sample: ClientSample } | a sample was dropped by accept()

Mediasoup level — scope { observer, observedMediasoupRouter }

Event | Extra | Fires when
mediasoup-router-added | | observer.createObservedMediasoupRouter(...) registered a router
mediasoup-router-matched-with-call | { observedCall } | the router was matched to a call, either explicitly via callId or by WebRTC-transport ↔ peer-connection correlation. Emitted once per distinct call; the observer stores nothing, and your handler decides what to do
mediasoup-router-removed | | the underlying mediasoup router closed (its router.observer close fired)

See Mediasoup router observation for the full design and examples.

Call level — scope { observer, observedCall }

Event | Extra | Fires when
call-added | | a call is created
call-updated | { context?: AcceptContext } | call.update() ran
call-closed | | the call closed
call-empty | | last client left the call
call-not-empty | | first client joined a previously-empty call
call-issue | { issue: ClientIssue } | call.addIssue(...) (server-side detector finding)

Client level — scope { observer, observedCall, observedClient }

Event | Extra | Fires when
client-added | | a client is created
client-sink-created | { sink: ClientSampleSink } | a per-client sink was created (only when createClientSink returns one); fires right after client-added
client-updated | { sample: ClientSample, elapsedTimeInMs: number, context?: AcceptContext } | the client processed a sample
client-closed | | the client closed
client-joined | | first CLIENT_JOINED event seen
client-left | | CLIENT_LEFT seen (or inferred on close)
client-rejoined | { timestamp: number } | a later CLIENT_JOINED after an earlier join
client-issue | { issue: ClientIssue } | a client-reported issue arrived, or client.addIssue(...)
client-metadata | { metaData: ClientMetaData } | a client meta item arrived
client-extension-stats | { extensionStats: ExtensionStat } | an app-defined extension stat arrived
client-event | { event: ClientEvent } | any client event was processed

Peer-connection level — scope { observer, observedCall, observedClient, observedPeerConnection }

Event | Extra | Notes
peer-connection-added / peer-connection-closed | | lifecycle of the PC
peer-connection-updated | { context?: AcceptContext } | the PC processed a sample
ice-connection-state-changed / ice-gathering-state-changed / connection-state-changed | { state: string } | driven by client events
selected-candidate-pair-changed | | declared; not currently emitted
inbound-track-added / -updated / -removed / -muted / -unmuted | { observedInboundTrack } |
outbound-track-added / -updated / -removed / -muted / -unmuted | { observedOutboundTrack } |
inbound-rtp-added / -updated / -removed | { observedInboundRtp } | -updated fires every tick
outbound-rtp-added / -updated / -removed | { observedOutboundRtp } | -updated fires every tick
remote-inbound-rtp-added / -updated / -removed | { observedRemoteInboundRtp } |
remote-outbound-rtp-added / -updated / -removed | { observedRemoteOutboundRtp } |
data-channel-added / -updated / -removed | { observedDataChannel } |
ice-candidate-added / -updated / -removed | { observedIceCandidate } |
ice-candidate-pair-added / -updated / -removed | { observedIceCandidatePair } |
ice-transport-added / -updated / -removed | { observedIceTransport } |
codec-added / -updated / -removed | { observedCodec } |
media-source-added / -updated / -removed | { observedMediaSource } |
media-playout-added / -updated / -removed | { observedMediaPlayout } |
peer-connection-transport-added / -updated / -removed | { observedPeerConnectionTransport } |
certificate-added / -updated / -removed | { observedCertificate } |

Volume note. The *-updated sub-stat events fire on every peer-connection accept() (i.e. per sample, per stream). For high-throughput servers, subscribe only to what you need, or read fields off the entities on client-updated / call-updated instead.

Local lifecycle events

These remain on the individual entities (not the bus), for teardown/coordination. You can listen to them, but prefer the bus equivalents above for application logic.

Entity | Local events
ObservedCall | update, newclient, empty, not-empty, close
ObservedClient | update (sample, elapsedTimeInMs), close, joined, left
ObservedPeerConnection | removed-inbound-track, removed-outbound-track, close

API reference

Observer

new Observer<AppData>(config?: ObserverConfig<AppData>)

type ObserverConfig<AppData = Record<string, unknown>> = {
  updatePolicy?: 'update-on-any-call-updated' | 'update-when-all-call-updated' | 'none';
  defaultCallUpdatePolicy?: ObservedCallSettings['updatePolicy'];
  appData?: AppData;
  closeClientIfIdleForMs?: number;
  closeCallIfEmptyForMs?: number;
  // appData factories: run when an entity is created without explicit appData
  // (incl. lazily by accept()). appData is application-owned; accept `context` never touches it.
  createCallAppData?: (p: { callId: string; observer: Observer }) => Record<string, unknown>;
  createClientAppData?: (p: { clientId: string; observedCall: ObservedCall }) => Record<string, unknown>;
  // sink factory: produces a per-client sink that receives every accepted sample (see Sinks).
  createClientSink?: (p: { clientId: string; observedCall: ObservedCall }) => ClientSampleSink | undefined;
  // track-resolver factory: produces a call's RemoteTrackResolver (see Remote track resolution).
  createTrackResolver?: (observedCall: ObservedCall) => RemoteTrackResolver | undefined;
};

appData factories. Instead of pre-creating a call/client (or assigning on call-added / client-added) just to enrich its appData, register a factory once. It runs in the entity's constructor whenever it's created without an explicit settings.appData — including the lazy creation inside accept(). The client factory receives the already-created parent observedCall, so it can derive fields from it. appData is application-owned and is never modified by the accept() context.

const observer = new Observer({
  createCallAppData:   ({ callId })                 => ({ callId, startedAt: Date.now(), region: 'eu' }),
  createClientAppData: ({ clientId, observedCall }) => ({ clientId, region: observedCall.appData.region }),
});

Key members:

  • accept(sample: ClientSample, context?: AcceptContext): void
  • addAcceptMiddleware(...mw: AcceptMiddleware[]): this / removeAcceptMiddleware(...mw): this — global pre-dispatch sample hooks (see Accept middlewares)
  • getObservedCall<T>(callId): ObservedCall<T> | undefined
  • createObservedCall<T>(settings): ObservedCall<T> | undefined
  • getOrCreateObservedCall<T>(settings): ObservedCall<T> | undefined
  • update(): void — force an aggregation/observer-updated tick
  • close(): void
  • readonly observedCalls: Map<string, ObservedCall>
  • readonly observedTURN: ObservedTURN
  • get appData(), get numberOfCalls()
  • counters: numberOfClients, numberOfClientsUsingTurn, numberOfInboundRtpStreams, numberOfOutboundRtpStreams, numberOfDataChannels, numberOfPeerConnections, totalAddedCall, totalRemovedCall, closed
  • on/off/once/emit typed against the event map

ObservedCall

type ObservedCallSettings<AppData = Record<string, unknown>> = {
  updatePolicy?: 'update-on-any-client-updated' | 'update-when-all-client-updated' | 'none';
  callId: string;
  appData?: AppData;
  closeCallIfEmptyForMs?: number;
};

Key members:

  • readonly callId: string, appData: AppData
  • readonly observedClients: Map<string, ObservedClient>, get numberOfClients()
  • getObservedClient<T>(clientId), createObservedClient<T>(settings), getOrCreateObservedClient<T>(settings) (all … | undefined)
  • addIssue(issue: ClientIssue): void — raise a call-level issue → emits call-issue
  • readonly detectors: Detectors — server-side detector registry (empty by default; see Detectors)
  • scoreCalculator: ScoreCalculator, get score(), readonly calculatedScore
  • remoteTrackResolver?: RemoteTrackResolver — set from ObserverConfig.createTrackResolver at call creation (see Remote track resolution)
  • aggregates: numberOfIssues, numberOfPeerConnections, numberOfInboundRtpStreams, numberOfOutboundRtpStreams, numberOfDataChannels, maxNumberOfClients, clientsUsedTurn: Set<string>, startedAt?, endedAt?, closedAt?, closed
  • update(), close()

ObservedClient

type ObservedClientSettings<AppData = Record<string, unknown>> = {
  clientId: string;
  appData?: AppData;
  closeClientIfIdleForMs?: number;
};

Key members:

  • readonly clientId: string, appData: AppData, readonly call: ObservedCall
  • readonly observedPeerConnections: Map<string, ObservedPeerConnection>
  • readonly sink?: ClientSampleSink — the per-client sink (see Sinks), if createClientSink is configured; listen on it for close/error
  • Injection API (queue app data to be merged into the next sample processing): injectEvent(ClientEvent), injectIssue(ClientIssue), injectMetaData(ClientMetaData), injectExtensionStat(ExtensionStat), injectAttachment(key, value)
  • Direct add API (process immediately): addIssue(ClientIssue), addMetadata(ClientMetaData), addExtensionStats(ExtensionStat)
  • Metrics (current/derived): currentAvgRttInMs?, currentMinRttInMs?, currentMaxRttInMs?, receivingAudioBitrate, receivingVideoBitrate, sendingAudioBitrate, sendingVideoBitrate, usingTURN, usingTCP, availableIncomingBitrate, availableOutgoingBitrate
  • Counts: numberOfInboundRtpStreams, numberOfOutboundRtpStreams, numberOfInbundTracks, numberOfOutboundTracks, numberOfDataChannels, numberOfPeerConnections
  • Per-tick deltas: deltaReceivedAudioBytes, deltaSentAudioBytes, … (see source for the full set)
  • Lifecycle: joinedAt?, leftAt?, closedAt?, closed, get score()
  • Metadata: browser?, engine?, platform?, operationSystem?, mediaDevices, mediaConstraints
  • accept(sample, context?), close()

ObservedPeerConnection

Key members:

  • readonly peerConnectionId: string, readonly client: ObservedClient, appData?
  • The 15 observed* sub-stat Maps (listed above), plus array getters: codecs, inboundRtps, outboundRtps, remoteInboundRtps, remoteOutboundRtps, mediaSources, mediaPlayouts, dataChannels, peerConnectionTransports, iceTransports, iceCandidates, iceCandidatePairs, certificates, selectedIceCandidatePairs, selectedIceCandiadtePairForTurn
  • State: connectionState?, iceConnectionState?, iceGatheringState?, usingTURN, usingTCP
  • Metrics: currentRttInMs?, currentJitter?, availableIncomingBitrate, availableOutgoingBitrate, sending/receiving bitrates, packet rates, and total* / delta* byte/packet counters
  • accept(pcSample, context?), close(), get score()

Remote-RTP correlation (derived). During accept(), receiver/sender reports are linked to the local streams by remoteId (fallback SSRC) and surfaced as fields:

  • on ObservedOutboundRtp: remoteRttInMs?, remoteFractionLost?, remoteJitter?, remotePacketsLost?
  • on ObservedInboundRtp: remoteRttInMs?, remoteBytesSent?, remotePacketsSent?, remoteTimestamp?

These are reset each tick and only set when the matching remote report is present.
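
As an illustration of the "remoteId (fallback SSRC)" linking, the sketch below correlates receiver reports with outbound streams. The types and the `correlate` function are local stand-ins, not the library's classes. It assumes, following the W3C stats dictionaries, that an outbound stream's `remoteId` names the matching remote-inbound report and that `roundTripTime` is expressed in seconds.

```typescript
// Local stand-ins carrying only the fields this sketch needs.
type OutboundRtp = {
  id: string;
  ssrc: number;
  remoteId?: string;
  remoteRttInMs?: number;
  remoteFractionLost?: number;
};
type RemoteInboundRtp = { id: string; ssrc: number; roundTripTime?: number; fractionLost?: number };

function correlate(outbounds: OutboundRtp[], remotes: RemoteInboundRtp[]): void {
  const byId = new Map(remotes.map((r) => [r.id, r] as const));
  const bySsrc = new Map(remotes.map((r) => [r.ssrc, r] as const));
  for (const outbound of outbounds) {
    // reset each tick so stale values never linger
    delete outbound.remoteRttInMs;
    delete outbound.remoteFractionLost;
    // prefer the explicit remoteId link, fall back to matching on SSRC
    const linked = outbound.remoteId !== undefined ? byId.get(outbound.remoteId) : undefined;
    const report = linked ?? bySsrc.get(outbound.ssrc);
    if (!report) continue;
    if (report.roundTripTime !== undefined) outbound.remoteRttInMs = report.roundTripTime * 1000;
    if (report.fractionLost !== undefined) outbound.remoteFractionLost = report.fractionLost;
  }
}

const audio: OutboundRtp = { id: 'out-1', ssrc: 111, remoteId: 'rin-1' };
const video: OutboundRtp = { id: 'out-2', ssrc: 222 };                      // no remoteId: SSRC fallback
const orphan: OutboundRtp = { id: 'out-3', ssrc: 333, remoteRttInMs: 80 };  // stale value, no report
correlate([audio, video, orphan], [
  { id: 'rin-1', ssrc: 111, roundTripTime: 0.25, fractionLost: 0.5 },
  { id: 'rin-2', ssrc: 222, roundTripTime: 0.5 },
]);
```

After the call, `audio` and `video` carry fresh remote fields, while the stale value on `orphan` has been cleared because no matching report was present.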


Schema types (ClientSample)

The shape of an accepted sample (re-exported from this package; identical to @observertc/schemas). Only the top level is shown — each stat object mirrors the standard WebRTC getStats() dictionaries plus a few extensions.

type ClientSample = {
  timestamp: number;          // client wall-clock (ms epoch)
  callId?: string;            // set by you or the library
  clientId?: string;          // set by you or the library
  score?: number;             // optional client-computed score (0..5)
  attachments?: Record<string, unknown>;
  peerConnections?: PeerConnectionSample[];
  clientEvents?: ClientEvent[];
  clientIssues?: ClientIssue[];
  clientMetaItems?: ClientMetaData[];
  extensionStats?: ExtensionStat[];
};

type PeerConnectionSample = {
  peerConnectionId: string;
  attachments?: Record<string, unknown>;   // e.g. { direction: 'send'|'recv', producerId, consumerId, label }
  score?: number;
  inboundTracks?; outboundTracks?;
  codecs?;
  inboundRtps?; remoteInboundRtps?;
  outboundRtps?; remoteOutboundRtps?;
  mediaSources?; mediaPlayouts?;
  peerConnectionTransports?; dataChannels?;
  iceTransports?; iceCandidates?; iceCandidatePairs?;
  certificates?;
};

type ClientEvent     = { type: string; payload?: string; timestamp?: number; /* +ids */ };
type ClientIssue     = { type: string; payload?: string; timestamp?: number };   // also used for call-issue
type ClientMetaData  = { type: string; payload?: string; timestamp?: number; /* +ids */ };
type ExtensionStat   = { type: string; payload?: string };

payload fields are JSON strings; the library parses the ones it understands.
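
Because payloads arrive as JSON strings, application handlers usually need to parse them defensively. The helper below is a minimal sketch of that, not a library function: `parsePayload` and `ClientEventLike` are names invented for this example, and the payload shape in the usage lines is hypothetical.

```typescript
type ClientEventLike = { type: string; payload?: string; timestamp?: number };

// Parses a JSON-string payload. Returns undefined when the payload is absent
// or malformed instead of throwing, so one bad item cannot break a handler.
// The cast to T is unchecked: validate the shape if the source is untrusted.
function parsePayload<T = unknown>(item: ClientEventLike): T | undefined {
  if (item.payload === undefined) return undefined;
  try {
    return JSON.parse(item.payload) as T;
  } catch {
    return undefined;
  }
}

const parsed = parsePayload<{ label: string }>({ type: 'MEDIA_DEVICE', payload: '{"label":"camera"}' });
const malformed = parsePayload({ type: 'MEDIA_DEVICE', payload: '{not json' });
const absent = parsePayload({ type: 'CLIENT_JOINED' });
```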

ClientEventTypes (enum of known event.type values): CLIENT_JOINED, CLIENT_LEFT, PEER_CONNECTION_OPENED/CLOSED/STATE_CHANGED, MEDIA_TRACK_ADDED/REMOVED/MUTED/UNMUTED/RESUMED, ICE_GATHERING_STATE_CHANGED, ICE_CONNECTION_STATE_CHANGED, DATA_CHANNEL_OPEN/CLOSED/ERROR, NEGOTIATION_NEEDED, SIGNALING_STATE_CHANGE, ICE_CANDIDATE, ICE_CANDIDATE_ERROR, and the mediasoup set PRODUCER_* / CONSUMER_* / DATA_PRODUCER_* / DATA_CONSUMER_*.

ClientMetaTypes (enum of known meta type values): MEDIA_CONSTRAINT, MEDIA_DEVICE, MEDIA_DEVICES_SUPPORTED_CONSTRAINTS, USER_MEDIA_ERROR, LOCAL_SDP, OPERATION_SYSTEM, ENGINE, PLATFORM, BROWSER.

Worked example: a real ClientSample

Two consecutive samples from one participant ("Guest" in room qq0iwfnd) of an edumeet/mediasoup call show what actually flows through accept(): a rich join snapshot, then lean steady-state ticks.

Sample 1 — the join snapshot. Carries the one-off lifecycle clientEvents and device clientMetaItems alongside the first stats. (Abbreviated; ids and times are from the real log.)

{
  "timestamp": 1780572332518,
  "callId":   "d3dbf2f5-79be-4cb8-9d43-fb404f07ef27",
  "clientId": "c926983c-4468-4046-ae8c-a9cabe1a1868",
  "score": 0,                                          // no quality measured yet on the join tick
  "attachments": { "displayName": "Guest", "roomId": "qq0iwfnd", "actualSessionId": "d3dbf2f5-…" },

  "clientEvents": [                                     // chronological lifecycle (12 in the real sample)
    { "type": "CLIENT_JOINED",                 "timestamp": 1780572324515 },
    { "type": "PEER_CONNECTION_OPENED",        "timestamp": 1780572326790 },   // pc=b81c8d9d (media)
    { "type": "ICE_GATHERING_STATE_CHANGED",   "timestamp": 1780572326811 },   // → gathering
    { "type": "PEER_CONNECTION_STATE_CHANGED", "timestamp": 1780572326812 },   // → connecting
    { "type": "PRODUCER_ADDED",                "timestamp": 1780572326821 },   // producer=1abdaf82 (audio)
    { "type": "MEDIA_TRACK_ADDED",             "timestamp": 1780572326821 },   // track=36ae42df  (audio)
    { "type": "PEER_CONNECTION_STATE_CHANGED", "timestamp": 1780572326827 },   // → connected
    { "type": "PRODUCER_ADDED",                "timestamp": 1780572326837 },   // producer=ba06a35b (video)
    { "type": "DATA_PRODUCER_CREATED",         "timestamp": 1780572326853 }
  ],

  "clientMetaItems": [                                  // environment & devices, one-off (10 in the real sample)
    { "type": "USER_AGENT_DATA", "payload": "{…Chrome 148 / macOS…}" },
    { "type": "MEDIA_DEVICE",    "payload": "{…\"BRIO 4K Stream Edition\"…}" }
    // …mic / camera / speaker devices…
  ],

  "peerConnections": [
    {
      "peerConnectionId": "b81c8d9d-…",                // the media PC — Guest publishes to the SFU
      "outboundRtps":      [ /* audio + video */ ],
      "outboundTracks":    [ /* mic + camera: label, settings, capabilities */ ],
      "remoteInboundRtps": [ /* RTCP feedback from the SFU */ ],
      "codecs": [ /**/ ], "iceTransports": [ /**/ ],
      "iceCandidatePairs": [ /**/ ], "dataChannels": [ /**/ ]
    },
    { "peerConnectionId": "8635acb7-…", "peerConnectionTransports": [ /**/ ] }  // signaling-only PC
  ]
}

What accept() does with it, in order — each step emits on the bus with full ancestry:

  1. lazily creates the ObservedCall → call-added;
  2. creates the ObservedClient → client-added, then client-joined (from CLIENT_JOINED);
  3. creates an ObservedPeerConnection per entry → peer-connection-added (×2 here);
  4. creates an ObservedOutboundTrack per track → outbound-track-added, plus the matching outbound-rtp-added;
  5. replays the device list as client-metadata events and the lifecycle items as client-event; and finally client-updated for the whole tick.

attachments.roomId lands on observedClient.attachments (read it on client-updated, not at creation — see Ingestion).

Sample 2 — a steady-state tick (~8 s later): same callId / clientId, no new clientEvents or clientMetaItems, just refreshed peerConnections stats. Each PC now scores 5 and the aggregate client score is 4.74 — a healthy call. This is the shape of nearly every sample: each tick refreshes metrics and fires the *-updated events, while the heavy join snapshot happens only once.
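
As a rough illustration of the two sample shapes described above, the sketch below classifies a sample by whether it carries lifecycle or metadata items, or only refreshed stats. `SampleLike`, `SampleKind` and `classifySample` are hypothetical names for this sketch and are not part of observer-js; only the field names `clientEvents`, `clientMetaItems` and `peerConnections` come from the sample shown earlier.

```typescript
// Hypothetical, minimal view of a ClientSample: only the three arrays this sketch reads.
interface SampleLike {
  clientEvents?: unknown[];
  clientMetaItems?: unknown[];
  peerConnections?: unknown[];
}

type SampleKind = 'carries-lifecycle' | 'stats-only' | 'empty';

// Classify a sample by what it contains. A join snapshot normally lands in
// 'carries-lifecycle'; a steady-state tick lands in 'stats-only'.
function classifySample(sample: SampleLike): SampleKind {
  const lifecycleItems =
    (sample.clientEvents?.length ?? 0) + (sample.clientMetaItems?.length ?? 0);
  if (lifecycleItems > 0) return 'carries-lifecycle';
  return (sample.peerConnections?.length ?? 0) > 0 ? 'stats-only' : 'empty';
}
```

A classifier like this can be handy when sizing storage or sampling logs, since the heavy items appear only on a small fraction of samples.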


Detectors (server-side extension point)

observer-js deliberately ships no built-in detectors. Per-client signals — packet loss, jitter, RTT, freezes, etc. — are already detectable on the client and arrive on samples as clientIssues (surfaced via client-issue). Server-side detection should focus on what only the server can see by correlating data across the clients of a call.

The hook lives on ObservedCall:

import { Observer, type Detector } from '@observertc/observer-js';   // Detector is an interface, so import it as a type

class MyCrossClientDetector implements Detector {
  readonly name = 'my-detector';
  constructor(private readonly call /* : ObservedCall */) {}
  update() {                                   // called on every call.update()
    // …inspect this.call.observedClients across participants…
    if (/* condition only visible server-side */ false) {
      this.call.addIssue({ type: this.name, payload: JSON.stringify({ /* … */ }), timestamp: Date.now() });
      // → emitted on the bus as 'call-issue'
    }
  }
}

const observer = new Observer();
observer.on('call-added', ({ observedCall }) => {
  observedCall.detectors.add(new MyCrossClientDetector(observedCall));
});
observer.on('call-issue', ({ observedCall, issue }) => { /* react */ });

Detector interface and the registry:

interface Detector { readonly name: string; update(): void; }

class Detectors {
  add(d: Detector): void;
  remove(d: Detector): void;
  clear(): void;
  update(): void;          // called by ObservedCall.update(); guards each detector in try/catch
  get listOfNames(): string[];
}
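
To make the "condition only visible server-side" placeholder more concrete, here is a minimal sketch of the decision logic a cross-client detector might run inside update(). It is deliberately independent of the library: `ClientLike`, `CallIssueLike`, `detectCallWideDegradation`, the `score` field and the default threshold are all assumptions made for illustration, not observer-js API.

```typescript
// Hypothetical shapes: only what this sketch needs.
interface ClientLike {
  clientId: string;
  score?: number; // assumed: an aggregate quality score, undefined until computed
}

interface CallIssueLike {
  type: string;
  payload: string;
  timestamp: number;
}

// Returns an issue when every scored client is below `threshold`;
// returns undefined when the call looks healthy or too few clients have a score.
function detectCallWideDegradation(
  clients: Iterable<ClientLike>,
  threshold = 2.5,
  minClients = 2,
  now: number = Date.now(),
): CallIssueLike | undefined {
  const scored = Array.from(clients).filter((c) => typeof c.score === 'number');
  if (scored.length < minClients) return undefined;
  const allDegraded = scored.every((c) => (c.score as number) < threshold);
  if (!allDegraded) return undefined;
  return {
    type: 'call-wide-degradation',
    payload: JSON.stringify({ clientIds: scored.map((c) => c.clientId), threshold }),
    timestamp: now,
  };
}
```

Inside a real Detector, the returned issue would be passed to addIssue on the call. Requiring at least two scored clients keeps a single bad connection from being reported as a call-wide problem.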

Remote track resolution (mediasoup / SFU)

In an SFU, one participant's outbound track is delivered to other participants as inbound tracks (one publisher → many subscribers). Correlation is opt-in per observer: set ObserverConfig.createTrackResolver, a factory invoked when each call is created that returns the call's RemoteTrackResolver (or undefined for none).

RemoteTrackResolver is a generic, strategy-driven class. It subscribes to the bus (filtered to its call) and links tracks by publisher id, the link key, maintaining the links directly on the tracks: inboundTrack.remoteOutboundTrack and outboundTrack.remoteInboundTracks: Set<ObservedInboundTrack>.

import { Observer, createDefaultMediasoupRemoteTrackResolverFactory } from '@observertc/observer-js';

const observer = new Observer({
  createTrackResolver: createDefaultMediasoupRemoteTrackResolverFactory(),
});

// later, given tracks (links are kept up to date as tracks come and go):
const source    = inboundTrack.remoteOutboundTrack;       // the publishing ObservedOutboundTrack
const receivers = [ ...outboundTrack.remoteInboundTracks ]; // the subscribing ObservedInboundTrack[]

Two built-in factories ship: createDefaultMediasoupRemoteTrackResolverFactory() (publisher = attachments.producerId, subscriber = attachments.consumerId) and createP2pRemoteTrackResolverFactory() (matches by RTP SSRC, preserved end-to-end in p2p).

For any other topology, build a RemoteTrackResolver with your own key resolvers — the publisher id is just whatever links a subscribed track to the published one:

import { Observer, RemoteTrackResolver } from '@observertc/observer-js';

const observer = new Observer({
  createTrackResolver: (observedCall) => new RemoteTrackResolver(observedCall, {
    resolveOutboundTrackPublisherId: (out) => out.attachments?.mediaId as string | undefined,
    resolveInboundTrackPublisherId:  (inb) => inb.attachments?.mediaId as string | undefined,
    resolveInboundTrackSubscriberId: (inb) => inb.attachments?.subId  as string | undefined, // optional
  }),
});

For the mediasoup factory, the application puts producerId / consumerId (and optionally direction, label) into the track attachments.
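
The linking idea itself (one published track, many subscribed tracks, joined on a shared publisher id) can be sketched without the library. The class below is an illustration only: `PublisherIdLinker`, `OutboundTrackLike` and `InboundTrackLike` are hypothetical names, and the real RemoteTrackResolver may handle ordering and removal differently.

```typescript
// Hypothetical track shapes carrying an already-resolved publisher id.
interface OutboundTrackLike {
  trackId: string;
  publisherId?: string;
  remoteInboundTracks: Set<InboundTrackLike>;
}
interface InboundTrackLike {
  trackId: string;
  publisherId?: string;
  remoteOutboundTrack?: OutboundTrackLike;
}

class PublisherIdLinker {
  private readonly outboundByPublisher = new Map<string, OutboundTrackLike>();
  // Inbound tracks seen before their publisher's outbound track.
  private readonly waitingInbound = new Map<string, Set<InboundTrackLike>>();

  addOutbound(out: OutboundTrackLike): void {
    if (!out.publisherId) return;
    this.outboundByPublisher.set(out.publisherId, out);
    const waiting = this.waitingInbound.get(out.publisherId);
    if (!waiting) return;
    waiting.forEach((inb) => this.link(inb, out));
    this.waitingInbound.delete(out.publisherId);
  }

  addInbound(inb: InboundTrackLike): void {
    if (!inb.publisherId) return;
    const out = this.outboundByPublisher.get(inb.publisherId);
    if (out) {
      this.link(inb, out);
      return;
    }
    const waiting = this.waitingInbound.get(inb.publisherId) ?? new Set<InboundTrackLike>();
    waiting.add(inb);
    this.waitingInbound.set(inb.publisherId, waiting);
  }

  removeInbound(inb: InboundTrackLike): void {
    inb.remoteOutboundTrack?.remoteInboundTracks.delete(inb);
    inb.remoteOutboundTrack = undefined;
    if (inb.publisherId) this.waitingInbound.get(inb.publisherId)?.delete(inb);
  }

  private link(inb: InboundTrackLike, out: OutboundTrackLike): void {
    inb.remoteOutboundTrack = out;
    out.remoteInboundTracks.add(inb);
  }
}
```

Parking unmatched inbound tracks matters because a subscriber's sample can arrive before the publisher's, so arrival order must not decide whether a link is made.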


Mediasoup router observation

Everything above is built from the client-reported ClientSample. When you run a mediasoup SFU you also have the server's own ground truth — its routers, transports, producers, consumers and data channels, with exact lifetimes and state transitions. ObservedMediasoupRouter captures that server-side view into a MediasoupRouterSample, completely independent of the client sample pipeline.

The concept

You hand the observer a live mediasoup Router; it attaches to mediasoup's own observer API and, from then on, passively records the router's topology and lifecycle — with no polling and no changes to your media code:

  • new transports (webrtc / plain / pipe / direct), their selected tuple, and ICE state transitions;
  • producers (codec, SSRCs/RIDs, pause/resume) and consumers (pause/resume, producerPaused/producerResumed);
  • data producers and data consumers;
  • createdAt / closedAt for every entity above.

All of it accumulates on observedMediasoupRouter.sample (a MediasoupRouterSample — see src/schema/MediasoupRouter.ts). This is a Sample, not a Report: it mirrors the naming of ClientSample and is yours to snapshot, persist, or correlate.

Matching a router to a call — by event, not by storage

A router belongs to one or more calls, but the observer does not store the router (or its sample) on the ObservedCall. Instead, when a match is found it emits mediasoup-router-matched-with-call and steps back — your application decides what the pairing means. Stamp the callId into the router's appData, build your own index, attach the sample to the call in your database — whatever fits your system. The library stays unopinionated and loosely coupled.

There are two ways a match is discovered, controlled by the settings you pass to createObservedMediasoupRouter:

  • Explicit (callId) — you already know the call. If that call currently exists, mediasoup-router-matched-with-call fires immediately.
  • Implicit (bindCallByWebRtcTransportId: true) — let the observer discover it. As peer connections are observed (peer-connection-added), the observer checks whether the peer connection's id is one of the router's WebRTC transport ids. On a hit, it's a match. It emits once per distinct call, and keeps watching so additional calls sharing the router can still match later. The internal listener is removed automatically when the router closes or the observer closes.
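
The implicit rule above (a peer connection id that equals one of the router's WebRTC transport ids, reported once per distinct call) can be sketched as a small standalone class. `RouterCallMatcher` and its method names are hypothetical and only model the described behaviour; they are not the library's internals.

```typescript
class RouterCallMatcher {
  private readonly matchedCallIds = new Set<string>();

  constructor(
    private readonly webrtcTransportIds: ReadonlySet<string>,
    private readonly onMatch: (callId: string) => void,
  ) {}

  // Call this for every observed peer connection.
  // Returns true only when a new call was matched by this invocation.
  onPeerConnectionAdded(callId: string, peerConnectionId: string): boolean {
    if (!this.webrtcTransportIds.has(peerConnectionId)) return false; // not on this router
    if (this.matchedCallIds.has(callId)) return false;                // already reported
    this.matchedCallIds.add(callId);
    this.onMatch(callId);
    return true;
  }
}
```

Because the matcher keeps watching after the first hit, a second call that shares the router is still reported, while further peer connections of an already matched call stay silent.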

When the underlying mediasoup router closes, its close propagates to ObservedMediasoupRouter, which emits mediasoup-router-removed. That is your cue to do whatever cleanup or persistence you want with the now-final sample — again, the observer itself keeps nothing.

Options — observer.createObservedMediasoupRouter(settings)

| Field | Type | Required | Meaning |
| --- | --- | --- | --- |
| router | mediasoup.types.Router | yes | the live router to observe; the observer attaches to router.observer |
| routerId | string | yes | your id for the router (the sample also carries router.id) |
| appData | Record<string, unknown> | no | application-owned bag on the ObservedMediasoupRouter (e.g. where you record the matched callId) |
| attachments | Record<string, unknown> | no | free-form data copied onto sample.attachments |
| callId | string | no | explicit match: emit mediasoup-router-matched-with-call now if this call exists |
| bindCallByWebRtcTransportId | boolean | no | implicit match: discover the call(s) by correlating WebRTC transport ids with peer-connection ids |

Returns the ObservedMediasoupRouter, or undefined if the observer is closed. If a router with the same id is already registered, the existing instance is returned instead. Both cases log a warning.

Useful members on the returned object: .sample (the MediasoupRouterSample), .appData, .attachments (getter over sample.attachments), .webrtcTransportIds: Set<string>, .id, .close().

Example

import { Observer } from '@observertc/observer-js';
import type { ObservedMediasoupRouterScope, ObservedCallScope } from '@observertc/observer-js';

const observer = new Observer();

// 1) Feed client samples as usual so the observer knows about calls & peer connections.
//    (e.g. transport-layer: observer.accept(clientSample, context))

// 2) Observe the SFU side. Let the observer discover which call this router serves.
const router = /* your mediasoup router */ undefined as any;
const observedRouter = observer.createObservedMediasoupRouter({
  router,
  routerId: router.id,
  bindCallByWebRtcTransportId: true, // discover the call by peer-connection correlation
  appData: {},                        // we'll record the matched callId here
});

// 3) The observer found a call for the router — WE decide what to do with the pairing.
observer.on('mediasoup-router-matched-with-call',
  ({ observedMediasoupRouter, observedCall }: ObservedMediasoupRouterScope & ObservedCallScope) => {
    // e.g. remember the association on the router's appData…
    observedMediasoupRouter.appData.callId = observedCall.callId;
    // …or attach the live server sample to the call in your own store:
    myStore.linkRouterToCall(observedCall.callId, observedMediasoupRouter.sample);
  },
);

// 4) The router closed — WE decide what to persist/forward with the final sample.
observer.on('mediasoup-router-removed', ({ observedMediasoupRouter }: ObservedMediasoupRouterScope) => {
  const callId = observedMediasoupRouter.appData.callId as string | undefined;
  myStore.saveRouterSample(callId, observedMediasoupRouter.sample);
});

// (optional) react to the router being registered at all:
observer.on('mediasoup-router-added', ({ observedMediasoupRouter }) => {
  console.log('observing router', observedMediasoupRouter.id);
});

If you already know the call, skip discovery and match explicitly:

observer.createObservedMediasoupRouter({ router, routerId: router.id, callId });
// → `mediasoup-router-matched-with-call` fires immediately if `callId` is a known call

Why event-driven instead of storing on the call

  • Loose coupling. The call model stays about client telemetry; the SFU view lives on its own object and is associated only if and how you choose.
  • You own the association. One router may serve multiple calls, a call may be served by multiple routers, and the right place to keep that mapping is application-specific — so the observer hands you the match and the final sample and gets out of the way.
  • No silent accumulation. Nothing is appended to ObservedCall, so there is no hidden growth or lifetime you have to reason about; the router sample lives exactly as long as you keep a reference.

Sinks (per-client sample persistence)

A sink receives the samples a client accepts — for archival, streaming, or later offline replay. Each ObservedClient gets its own sink, produced by the ObserverConfig.createClientSink factory when the client is created (return undefined for no sink). The client pushes every accepted sample to its sink, and end()s it on close.

The ClientSampleSink base class

ClientSampleSink is an abstract base class (a typed EventEmitter). You create a sink by subclassing it and implementing write and end. It is object-mode: write receives the ClientSample object, so each sink decides how (or whether) to serialize it — JSON line, protobuf, a remote POST body, an in-memory push, etc.

import type { ClientSample } from '@observertc/observer-js';

// Shape of the exported base class, shown here for reference:

abstract class ClientSampleSink /* extends EventEmitter */ {
  abstract write(sample: ClientSample): boolean;   // accept one sample; `false` = backpressure
  abstract end(): void;                            // flush; emit `close` when the destination is ready

  // typed events (inherited): the listener signature is inferred from the event name
  on(event: 'close' | 'finish' | 'drain', listener: () => void): this;
  on(event: 'error', listener: (err: Error) => void): this;
  // ...and the matching `once` / `off` / `emit`
}

| Event | Meaning |
| --- | --- |
| close | the destination is fully written and closed (e.g. a file flushed and its fd closed), i.e. "ready" |
| error | the destination failed |
| finish | end() was processed and queued data flushed (before close) |
| drain | the buffer drained after backpressure; safe to write more |

The library calls write(sample) synchronously per accepted sample (it is not awaited), end()s the sink when the client closes, and attaches an error listener so a failing sink can't crash the process (it also catches throws from write/end). The application — which created the sink — listens for close (destination ready) and error. Because write isn't awaited in the accept() hot path, backpressure and batching are the sink's concern.
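
Since backpressure and batching are left to the sink, a custom sink typically wraps some bounded buffer. The sketch below shows one way to mirror the write-returns-false and drain contract, kept free of any library or Node.js dependency; `BoundedSampleBuffer`, `highWaterMark` and `onDrain` are hypothetical names chosen for this illustration.

```typescript
class BoundedSampleBuffer<T> {
  private items: T[] = [];
  private saturated = false;

  constructor(
    private readonly highWaterMark: number,
    private readonly onDrain: () => void = () => {},
  ) {}

  // Always stores the item, and returns false once the buffer has reached
  // the high-water mark, signalling the producer to slow down.
  write(item: T): boolean {
    this.items.push(item);
    if (this.items.length >= this.highWaterMark) this.saturated = true;
    return !this.saturated;
  }

  // Hands the current batch to the caller and, if the buffer had been
  // saturated, reports that it is safe to write again.
  flush(): T[] {
    const batch = this.items;
    this.items = [];
    if (this.saturated) {
      this.saturated = false;
      this.onDrain();
    }
    return batch;
  }
}
```

A sink's write() could delegate to this buffer and return its result, while a timer or size trigger calls flush(), ships the batch, and emits drain from the callback.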

Built-in sinks

import { Observer, createJsonlFileSinkFactory } from '@observertc/observer-js';

const observer = new Observer({
  // one ./stats/<callId>__<clientId>.jsonl per client
  createClientSink: createJsonlFileSinkFactory({ directory: './stats' }),
});

// React when a sink is created for a client:
observer.on('client-sink-created', ({ observedClient, sink }) => {
  sink.on('close', () => {
    // the file is fully flushed and its fd closed — ready to upload, move, etc.
  });
});

| Export | Signature | Notes |
| --- | --- | --- |
| createJsonlFileSinkFactory | ({ directory, flags?, getFileName?, serializeSample? }) => ClientSampleSinkFactory | per-client JSONL files; path defaults to ${callId}__${clientId}.jsonl under directory (which must exist) |
| createJsonlFileSink | ({ path, flags?, serializeSample? }) => ClientSampleSink | a single JSONL file; wraps fs.WriteStream and re-emits its close/finish/drain/error |
| JsonlFileSink | class extends ClientSampleSink | the underlying class; exposes readonly path so a close handler knows which file is ready |
| createInMemorySink / InMemorySink | (samples?: ClientSample[]) => InMemorySink | collects the accepted sample objects into .samples: ClientSample[]; emits close on end() |

serializeSample?: (sample: ClientSample) => string overrides the default JSON.stringify for the JSONL sinks (e.g. to redact or reshape before writing).
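
As an example of such an override, the sketch below drops selected top-level or nested keys before serializing. It is a minimal illustration: `serializeRedacted` and `SampleLike` are hypothetical names, the choice of `clientMetaItems` as the redacted key is only an example, and whether the sink appends the line terminator itself is not stated here, so the function returns the JSON text without a trailing newline.

```typescript
// Hypothetical loose sample type; the real ClientSample type lives in the library.
type SampleLike = Record<string, unknown>;

// Serialize a sample to a single JSON line, omitting every property whose
// key is listed in `redactKeys` (at any nesting depth).
function serializeRedacted(
  sample: SampleLike,
  redactKeys: ReadonlySet<string> = new Set(['clientMetaItems']),
): string {
  return JSON.stringify(sample, (key: string, value: unknown) =>
    redactKeys.has(key) ? undefined : value,
  );
}
```

A function with this shape could be passed as serializeSample, for instance wrapped as (sample) => serializeRedacted(sample) if the parameter types need adapting.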

Reading sink-specific info (e.g. the file path)

The bus hands you the sink as the base ClientSampleSink. To read information specific to a sink type — for a file sink, where it was written — narrow with instanceof and read the sink's public fields. JsonlFileSink exposes path:

import { JsonlFileSink } from '@observertc/observer-js';

observer.on('client-sink-created', ({ observedClient, sink }) => {
  if (sink instanceof JsonlFileSink) {
    const { path } = sink;                          // the file this client's samples go to
    sink.once('close', () => uploadFile(path));     // close = flushed & fd closed → ready
  }
});

The general pattern: each concrete sink exposes whatever it wants as public readonly fields, and consumers narrow (instanceof YourSink) to read them. Your own sinks do the same.

Writing your own sink

Subclass ClientSampleSink and emit the lifecycle events yourself — for any non-file destination (a remote endpoint, a message queue, an object store, …):

import { Observer, ClientSampleSink, type ClientSample, type ClientSampleSinkFactory } from '@observertc/observer-js';

class HttpSink extends ClientSampleSink {
  private buffer: ClientSample[] = [];
  constructor(private readonly url: string) { super(); }

  write(sample: ClientSample): boolean {
    this.buffer.push(sample);                  // batch; decide your own backpressure
    return true;
  }
  end(): void {
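    fetch(this.url, { method: 'POST', body: JSON.stringify(this.buffer) })
      .then((res) => {
        // treat a non-2xx response as a failed destination, not a successful close
        if (!res.ok) throw new Error(`upload failed: HTTP ${res.status}`);
        this.emit('close');                    // signal "destination ready"
      })
      .catch((err) => this.emit('error', err));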
  }
}

const createClientSink: ClientSampleSinkFactory = ({ clientId, observedCall }) =>
  new HttpSink(`https://stats.example.com/${observedCall.callId}/${clientId}`);

const observer = new Observer({ createClientSink });

observedClient.sink exposes the created sink (it is undefined when the factory returned none); the client-sink-created event delivers it on the bus with full ancestry. ClientSampleSinkFactory is (p: { clientId: string; observedCall: ObservedCall }) => ClientSampleSink | undefined.

Logging

observer-js logs through a single, swappable sink. Out of the box it writes debug and above to console (verbose — install your own sink for production). Funnel everything into your logger:

import { setObserverLogger, type ObserverLogger } from '@observertc/observer-js';

setObserverLogger({
  trace: (m, ...a) => myLogger.trace(`[${m}]`, ...a),
  debug: (m, ...a) => myLogger.debug(`[${m}]`, ...a),
  info:  (m, ...a) => myLogger.info(`[${m}]`, ...a),
  warn:  (m, ...a) => myLogger.warn(`[${m}]`, ...a),
  error: (m, ...a) => myLogger.error(`[${m}]`, ...a),
});

createLogger(moduleName) is also exported for your own modules. See docs/logging.md for pino / winston / console recipes, level filtering, per-module routing, and full silencing.
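
Level filtering can also be done by wrapping the logger object before handing it over. The following is a minimal sketch under the assumption that the logger is a plain object with the five methods shown above; `LoggerLike`, `LogFn` and `withMinLevel` are hypothetical names, and the exact ObserverLogger type is not reproduced here.

```typescript
type LogFn = (first: string, ...args: unknown[]) => void;

// Assumed to mirror the five-method shape passed to setObserverLogger above.
interface LoggerLike {
  trace: LogFn;
  debug: LogFn;
  info: LogFn;
  warn: LogFn;
  error: LogFn;
}

const LEVELS = ['trace', 'debug', 'info', 'warn', 'error'] as const;
type Level = (typeof LEVELS)[number];

// Returns a logger that forwards only messages at `minLevel` or above.
function withMinLevel(target: LoggerLike, minLevel: Level): LoggerLike {
  const min = LEVELS.indexOf(minLevel);
  const noop: LogFn = () => {};
  const pick = (level: Level): LogFn =>
    LEVELS.indexOf(level) >= min ? (first, ...args) => target[level](first, ...args) : noop;
  return {
    trace: pick('trace'),
    debug: pick('debug'),
    info: pick('info'),
    warn: pick('warn'),
    error: pick('error'),
  };
}
```

The wrapped object could then be passed wherever the unfiltered logger was, for example withMinLevel(myLoggerAdapter, 'info') in production.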


Error-handling philosophy

The library warns and degrades; it does not throw on operational problems:

  • createObservedCall / createObservedClient on a closed parent → warn + return undefined.
  • Duplicate id → warn + return the existing instance.
  • accept() on a closed client → warn + no-op.
  • Sample missing callId/clientId, or observer closed → sample-rejected event.
  • A throwing accept-middleware → warn + drop that sample (never crashes accept()).

Therefore create* and getOrCreate* return T | undefined; guard the result. The Middleware utility's internal invariants (e.g. calling next() twice) throw, but those throws are caught by accept() and surfaced as a warning.
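
The "invariant throws, caller catches and warns" arrangement can be illustrated with a tiny middleware chain. This is a generic sketch and not the library's Middleware utility: `runChain`, `safeRun` and the `warn` callback are hypothetical, and the real accept() may differ in what it does after a failure.

```typescript
type Next = () => void;
type Middleware<T> = (input: T, next: Next) => void;

// Runs middlewares in order; calling next() twice in one middleware throws.
function runChain<T>(middlewares: Middleware<T>[], input: T, final: (input: T) => void): void {
  const dispatch = (index: number): void => {
    if (index === middlewares.length) {
      final(input);
      return;
    }
    let called = false;
    middlewares[index](input, () => {
      if (called) throw new Error('next() called twice');
      called = true;
      dispatch(index + 1);
    });
  };
  dispatch(0);
}

// Wraps the chain so a throwing middleware produces a warning instead of a crash.
// Returns false when the chain threw.
function safeRun<T>(
  middlewares: Middleware<T>[],
  input: T,
  final: (input: T) => void,
  warn: (message: string, error: unknown) => void,
): boolean {
  try {
    runChain(middlewares, input, final);
    return true;
  } catch (error) {
    warn('middleware failed, sample not processed further', error);
    return false;
  }
}
```

Note that in this sketch a second next() throws only after the first call has already run the rest of the chain, so the warning covers the misuse rather than undoing earlier work.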


Development & extension guide

yarn install
yarn build       # tsup → dist/ (dual ESM .mjs + CJS .js, single entry, .d.ts/.d.mts + sourcemaps)
yarn lint        # eslint -c .eslintrc.json "src/**/*.ts"
yarn typecheck   # tsc --noEmit
yarn test        # jest

The build is driven by tsup (config in tsup.config.ts): a single entry (src/index.ts), dual ESM + CommonJS output to dist/ (index.mjs / index.js) with .d.mts / .d.ts types and sourcemaps, targeting Node 20. CI (.github/workflows/ci.yml) runs lint + typecheck + build + test on every push/PR.

Project layout (src/): Observer.ts, ObservedCall.ts, ObservedClient.ts, ObservedPeerConnection.ts, the Observed* sub-stat classes, ObserverEvents.ts (the typed event map + scope types), detectors/ (Detector, Detectors), scores/, updaters/ (update-policy strategies), utils/ (remote-track resolvers), common/ (logger, utils, Middleware), schema/ (sample/event/meta types), and sinks/ (the ClientSampleSink base + JsonlFileSink / InMemorySink, re-exported from the package root).

Conventions to follow when developing further:

  • Single event bus. New consumer-facing events go in ObserverEvents.ts with an object payload [<Scope> & { …subject }], and are emitted via the component's _notify(type, { ...this.eventScope, …subject }). Each component has a precomputed eventScope field and a thin _notify wrapper around the right emitter. Keep purely internal coordination as local EventEmitter events (and remember to off them on close).
  • Warn, don't throw on operational/edge conditions; return undefined where a value can't be produced.
  • Counter-reset-safe deltas. When computing a delta from a cumulative counter, never emit a negative value (guard curr >= prev), to survive counter resets / SSRC reuse.
  • Explicit accumulation. The per-sample metric accumulation in accept() is intentionally explicit and not abstracted — match that style.
  • Detectors are server-side. Add cross-client detectors on ObservedCall.detectors; don't re-implement client-detectable signals.
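
The counter-reset-safe delta convention above can be sketched as a pair of small pure functions. The names `safeDelta` and `bitrateBps` are hypothetical, and returning 0 on a reset is an assumption of this sketch (the convention only requires that the value is never negative).

```typescript
// Delta between two readings of a cumulative counter.
// Returns 0 when either reading is missing or when the counter went
// backwards (counter reset, SSRC reuse), so the result is never negative.
function safeDelta(curr: number | undefined, prev: number | undefined): number {
  if (curr === undefined || prev === undefined) return 0;
  return curr >= prev ? curr - prev : 0;
}

// Converts a byte delta over an interval into bits per second.
// Returns 0 for a non-positive interval to avoid division by zero.
function bitrateBps(deltaBytes: number, elapsedMs: number): number {
  return elapsedMs > 0 ? (deltaBytes * 8 * 1000) / elapsedMs : 0;
}
```

For example, bytesSent going from 1000 to 1500 yields a delta of 500, while a drop from 1000 to 200 yields 0 rather than -800.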

Recipes:

  • Add an event: add the key + payload to ObserverEvents; in the owning component call this._notify('my-event', { ...this.eventScope, subject }).
  • Add a per-stream metric: add the field to the relevant Observed*Rtp/track class, populate it in its update() (reset at the top of update() if it's per-tick), and read it from a *-updated handler.
  • Add a detector: implement Detector, register it on call-added via observedCall.detectors.add(...), surface findings with observedCall.addIssue(...).

License

Apache-2.0. Part of the ObserverTC ecosystem.
