In one line: feed it WebRTC
getStats()snapshots, and get back a live, queryable model of every call plus a single typed event stream to react to.
observer-js is a server-side Node.js library for monitoring WebRTC sessions. A WebRTC
application (typically an SFU or a signaling/stats backend) feeds it ClientSample objects —
periodic snapshots of each participant's RTCPeerConnection.getStats() output plus
application events — and observer-js maintains a live, in-memory model of every call,
participant, peer connection, and media stream, derives per-interval and cumulative metrics,
and emits a single, unified stream of typed events the application can react to.
What you can do with it:
- Monitor calls live — a queryable in-memory tree of every call, client, peer connection, track, codec, ICE candidate and data channel, each holding current and cumulative metrics.
- React on one event bus — subscribe once on the
Observer; every payload carries its full ancestry (call → client → peer connection → stat), so you never walk the tree to subscribe. - Get derived metrics for free — counter-reset-safe per-tick deltas, bitrates, jitter, RTT, fraction-lost, remote-RTP (RTCP) correlation, and TURN/TCP usage from the selected candidate pair.
- Correlate across an SFU — link a publisher's outbound track to every subscriber's inbound
track (
RemoteTrackResolver), and observe mediasoup routers/transports/producers/consumers on the server side. - Detect server-only problems — cross-client
Detectors raisecall-issues for conditions no single client can see (e.g. everyone in a call degrading at once). - Persist every sample — per-client sinks (JSONL file, in-memory, or your own) for archival, streaming, and offline replay.
- Drop it in safely — warn-don't-throw, a pluggable logger, dual ESM + CommonJS, and no media-stack dependency in the core.
Status:
1.0.0-beta. The API described here is current and intended to be implemented against directly. This document is written to be self-sufficient: an engineer (or an AI agent) should be able to integrate the library, or develop it further, from this file alone. A companion doc,docs/logging.md, covers logging integration in depth.
Packaging: server-side, Node.js ≥ 22, shipped as a dual ESM + CommonJS build — so it works whether your project uses
import(ESM) orrequire()(CommonJS). Everything — including the built-in file sink — is exported from the single@observertc/observer-jsentry.
- Installation
- Quick start
- Data flow
- Entity hierarchy
- Ingestion:
accept(), context & lifecycle - Update policies
- The event bus ← the core of the API
- API reference
- Schema types (
ClientSample) - Detectors (server-side extension point)
- Remote track resolution (mediasoup / SFU)
- Mediasoup router observation
- Sinks (per-client sample persistence)
- Logging
- Error-handling philosophy
- Development & extension guide
npm install @observertc/observer-js
# or
yarn add @observertc/observer-jsServer-side, Node.js ≥ 22, dual ESM + CommonJS. The package ships both module formats, so it works the same whether your project is ESM or CommonJS — your import line is unchanged either way:
import { Observer, ClientSample, createJsonlFileSinkFactory } from '@observertc/observer-js';In an ESM project this resolves to the .mjs build; in a CommonJS project (where TypeScript
compiles your import down to require()) it resolves to the .js build. Everything is exported
from the single @observertc/observer-js entry. Written in TypeScript; ships type declarations for
both formats (dist/index.d.ts for require, dist/index.d.mts for import). Runtime
dependencies: @bufbuild/protobuf, events, uuid. The library does not bundle a logger or
any transport — see Logging.
ClientSample and friends are re-exported from this package, and are also published as the
shared schema in @observertc/schemas; samples
produced on the client (e.g. by @observertc/client-monitor-js) conform to the same shape.
import { Observer, ClientSample } from '@observertc/observer-js';
// 1. Create an observer.
const observer = new Observer({
// when the observer aggregates call/client metrics:
updatePolicy: 'update-when-all-call-updated',
// default policy applied to calls created automatically by accept():
defaultCallUpdatePolicy: 'update-on-any-client-updated',
// optional auto-teardown:
closeCallIfEmptyForMs: 20_000,
closeClientIfIdleForMs: 60_000,
});
// 2. Subscribe on the single bus. Every payload is an object with the ancestry.
observer.on('call-added', ({ observedCall }) => {
console.log('new call', observedCall.callId);
});
observer.on('client-issue', ({ observedClient, issue }) => {
console.warn(`[${observedClient.clientId}] ${issue.type}`, issue.payload);
});
observer.on('peer-connection-updated', ({ observedClient, observedPeerConnection }) => {
console.log(observedClient.clientId, 'RTT(ms):', observedPeerConnection.currentRttInMs);
});
observer.on('sample-rejected', ({ reason, sample }) => {
console.warn('dropped a sample:', reason);
});
// 3. Feed samples. `context` (optional) is transient per-accept data, carried to the
// `*-updated` events this accept triggers (never written to appData).
function onClientStats(sample: ClientSample) {
observer.accept(sample, { studioVersion: '1.2.3' });
}
// 4. Tear down.
process.on('SIGINT', () => observer.close());client getStats() ──► ClientSample ──► observer.accept(sample, ctx?)
│
┌────────────────────────────────┘
▼
get-or-create ObservedCall ──► get-or-create ObservedClient ──► client.accept(sample, ctx)
│
per peerConnections[] in the sample
▼
get-or-create ObservedPeerConnection
.accept(pcSample, ctx) updates all sub-stats,
derives deltas/bitrates/RTT, correlates remote RTP
│
metrics roll up: PeerConnection → Client → Call → Observer
│
events emitted on the Observer bus ──► your handlers
- A sample must have
callIdandclientId(the library sets them, or the app does). If either is missing, the sample is dropped andsample-rejectedis emitted. - Sub-entities that stop appearing in samples are garbage-collected via a "visited"
mark-and-sweep on each
ObservedPeerConnection.accept(), emitting the corresponding*-removedevents.
| Class | Created by | Keyed on its parent as | Holds |
|---|---|---|---|
Observer |
new Observer(config?) |
— (root) | observedCalls: Map<string, ObservedCall>, global counters, the event bus |
ObservedCall |
observer.createObservedCall(settings) / lazily by accept |
observedCalls |
observedClients: Map<string, ObservedClient>, call-wide metrics, detectors, scoreCalculator |
ObservedClient |
call.createObservedClient(settings) / lazily |
observedClients |
observedPeerConnections: Map<string, ObservedPeerConnection>, per-client metrics |
ObservedPeerConnection |
lazily, from sample.peerConnections[] |
observedPeerConnections |
the 15 sub-stat maps below, transport/RTT/bitrate metrics |
| Sub-stats | lazily, from the PeerConnectionSample |
maps on the PC | individual WebRTC stat objects |
ObservedPeerConnection sub-stat maps (all public readonly):
observedCertificates, observedCodecs, observedDataChannels,
observedIceCandidates, observedIceCandidatesPair, observedIceTransports,
observedInboundRtps, observedInboundTracks, observedMediaPlayouts,
observedMediaSources, observedOutboundRtps, observedOutboundTracks,
observedPeerConnectionTransports, observedRemoteInboundRtps, observedRemoteOutboundRtps
Each sub-stat class (ObservedInboundRtp, ObservedOutboundRtp, ObservedInboundTrack,
ObservedOutboundTrack, ObservedDataChannel, ObservedIceCandidate,
ObservedIceCandidatePair, ObservedIceTransport, ObservedCertificate, ObservedCodec,
ObservedMediaSource, ObservedMediaPlayout, ObservedPeerConnectionTransport,
ObservedRemoteInboundRtp, ObservedRemoteOutboundRtp) mirrors the corresponding stat
fields from the schema plus derived fields (deltas, bitrates).
The single entry point. It:
- drops + emits
sample-rejectedif the observer is closed; - runs the sample through the global accept-middleware chain (see below);
- (chain terminal) drops + emits
sample-rejectedifcallId/clientIdis missing; - gets or lazily creates the
ObservedCallandObservedClient(theirappDatacomes from the configured factories, never fromcontext); - delegates to
client.accept(sample, context), which fans out to eachObservedPeerConnection.accept(pcSample, context).
observer.addAcceptMiddleware(...) registers middlewares run on every sample inside
accept(), in order, before the sample is dispatched to any call or client. Each middleware
gets a { sample, context } payload; it can inspect or mutate the sample (set/normalize
callId/clientId, enrich, redact) or the context, then call next(payload) to continue.
Not calling next drops the sample — nothing is created and no event fires. A throwing
middleware is caught and warns (the sample is dropped), never crashing accept().
import { Observer, AcceptMiddleware } from '@observertc/observer-js';
const observer = new Observer();
// derive callId/clientId from the app's own attachment, before dispatch
const route: AcceptMiddleware = ({ sample }, next) => {
sample.callId ??= sample.attachments?.roomId as string;
sample.clientId ??= sample.attachments?.peerId as string;
next({ sample });
};
// drop samples from a blocklisted client (never dispatched)
const filter: AcceptMiddleware = (payload, next) => {
if (blocked.has(payload.sample.clientId)) return; // no next() => dropped
next(payload);
};
observer.addAcceptMiddleware(route, filter);
// observer.removeAcceptMiddleware(route);This is a lightweight global injection point, distinct from the larger (not-yet-built)
ClientSampleProcessor pipeline in the roadmap. When no middleware is registered, accept()
dispatches directly with no overhead.
type AcceptContext = Record<string, unknown>;A single, optional, free-form object threaded down the whole accept chain
(Observer → Client → PeerConnection). It is transient request-scoped data — temporary or
contextual information the application wants available while an update is processed.
context is never written to appData and is not stored on any entity. The two are
deliberately distinct:
appData— application-assigned extra info that identifies/decorates an entity, fixed at creation (viasettings.appDataor thecreateCallAppData/createClientAppDatafactories), or assigned by the app on the*-addedevents. The library never changes it.context— passed peraccept(), may differ on every call, and is carried straight through to the*-updatedevents that theaccept()triggers, then discarded.
client-updated and peer-connection-updated carry the exact context of that sample;
call-updated carries the context of the client accept() that drove the call update (absent
for interval- or teardown-driven call updates). When no context is given, the field is absent.
If you want to create/configure entities yourself before/without samples:
const call = observer.getOrCreateObservedCall({ callId, appData }); // ObservedCall | undefined
const client = call?.getOrCreateObservedClient({ clientId, appData }); // ObservedClient | undefinedThese return undefined (and warn) when the parent is closed; createObservedCall/
createObservedClient return the existing instance (and warn) if the id already exists.
closeClientIfIdleForMs— a client with no sample for this long auto-closes.closeCallIfEmptyForMs— a call with zero clients for this long auto-closes.- Closing cascades down (call → clients → peer connections → sub-stats), unsubscribing
listeners and emitting the
*-closed/*-removedevents.
"Update" means recompute aggregated metrics and emit the *-updated event at that level.
Both the observer and each call have a configurable trigger. Updates are event-driven — there
is no built-in timer. An app that wants a fixed cadence can call observer.update() /
call.update() from its own setInterval. With 'none', nothing auto-updates — the level
updates only when the application calls the public update() itself.
Observer-level (ObserverConfig.updatePolicy, default update-when-all-call-updated):
| Policy | Triggers observer.update() when… |
|---|---|
update-on-any-call-updated |
any call updates |
update-when-all-call-updated |
every call has updated since the last observer update |
none |
never automatically — only when the app calls observer.update() |
Call-level (ObservedCallSettings.updatePolicy, defaulted from
ObserverConfig.defaultCallUpdatePolicy):
| Policy | Triggers call.update() when… |
|---|---|
update-on-any-client-updated |
any client in the call updates |
update-when-all-client-updated |
every client has updated since the last call update |
none |
never automatically — only when the app calls call.update() |
This is the primary API. Subscribe on the Observer instance — it is the single emitter
for the entire hierarchy. The ObservedCall / ObservedClient / ObservedPeerConnection
objects are themselves EventEmitters too, but those local events are reserved for internal
lifecycle/teardown wiring (see Local lifecycle events); application
code should use the Observer bus.
Every Observer event delivers exactly one argument: a payload object. The payload always contains the ancestry from the observer down to the entity that raised it, plus any event- specific subject:
type ObserverEventBase = { observer: Observer };
type ObservedCallScope = ObserverEventBase & { observedCall: ObservedCall };
type ObservedClientScope = ObservedCallScope & { observedClient: ObservedClient };
type ObservedPeerConnectionScope = ObservedClientScope & { observedPeerConnection: ObservedPeerConnection };So a peer-connection-level event hands you the observer, call, client, and peer connection:
observer.on('inbound-rtp-added', ({ observer, observedCall, observedClient, observedPeerConnection, observedInboundRtp }) => {
// all five are present and correctly typed
});observer.on/off/once/emit are fully typed against the event map — the handler argument is
inferred per event name.
All payloads include the ancestry for their level (above). The Extra column lists the additional field(s) on top of that scope.
| Event | Extra payload | Fires when |
|---|---|---|
observer-updated |
— | observer.update() ran (per the observer update policy) |
observer-closed |
— | observer.close() |
sample-rejected |
{ reason: 'observer-closed' | 'missing-callId' | 'missing-clientId', sample: ClientSample } |
a sample was dropped by accept() |
| Event | Extra | Fires when |
|---|---|---|
mediasoup-router-added |
— | observer.createObservedMediasoupRouter(...) registered a router |
mediasoup-router-matched-with-call |
{ observedCall } |
the router was matched to a call — explicitly via callId, or by WebRTC-transport ↔ peer-connection correlation. Emitted once per distinct call; the observer stores nothing — your handler decides what to do |
mediasoup-router-removed |
— | the underlying mediasoup router closed (its router.observer close fired) |
See Mediasoup router observation for the full design and examples.
| Event | Extra | Fires when |
|---|---|---|
call-added |
— | a call is created |
call-updated |
{ context?: AcceptContext } |
call.update() ran |
call-closed |
— | the call closed |
call-empty |
— | last client left the call |
call-not-empty |
— | first client joined a previously-empty call |
call-issue |
{ issue: ClientIssue } |
call.addIssue(...) (server-side detector finding) |
| Event | Extra | Fires when |
|---|---|---|
client-added |
— | a client is created |
client-sink-created |
{ sink: ClientSampleSink } |
a per-client sink was created (only when createClientSink returns one); fires right after client-added |
client-updated |
{ sample: ClientSample, elapsedTimeInMs: number, context?: AcceptContext } |
the client processed a sample |
client-closed |
— | the client closed |
client-joined |
— | first CLIENT_JOINED event seen |
client-left |
— | CLIENT_LEFT seen (or inferred on close) |
client-rejoined |
{ timestamp: number } |
a later CLIENT_JOINED after an earlier join |
client-issue |
{ issue: ClientIssue } |
a client-reported issue arrived, or client.addIssue(...) |
client-metadata |
{ metaData: ClientMetaData } |
a client meta item arrived |
client-extension-stats |
{ extensionStats: ExtensionStat } |
an app-defined extension stat arrived |
client-event |
{ event: ClientEvent } |
any client event was processed |
| Event | Extra | Notes |
|---|---|---|
peer-connection-added / peer-connection-closed |
— | lifecycle of the PC |
peer-connection-updated |
{ context?: AcceptContext } |
the PC processed a sample |
ice-connection-state-changed / ice-gathering-state-changed / connection-state-changed |
{ state: string } |
driven by client events |
selected-candidate-pair-changed |
— | declared; not currently emitted |
inbound-track-added / -updated / -removed / -muted / -unmuted |
{ observedInboundTrack } |
|
outbound-track-added / -updated / -removed / -muted / -unmuted |
{ observedOutboundTrack } |
|
inbound-rtp-added / -updated / -removed |
{ observedInboundRtp } |
-updated fires every tick |
outbound-rtp-added / -updated / -removed |
{ observedOutboundRtp } |
-updated fires every tick |
remote-inbound-rtp-added / -updated / -removed |
{ observedRemoteInboundRtp } |
|
remote-outbound-rtp-added / -updated / -removed |
{ observedRemoteOutboundRtp } |
|
data-channel-added / -updated / -removed |
{ observedDataChannel } |
|
ice-candidate-added / -updated / -removed |
{ observedIceCandidate } |
|
ice-candidate-pair-added / -updated / -removed |
{ observedIceCandidatePair } |
|
ice-transport-added / -updated / -removed |
{ observedIceTransport } |
|
codec-added / -updated / -removed |
{ observedCodec } |
|
media-source-added / -updated / -removed |
{ observedMediaSource } |
|
media-playout-added / -updated / -removed |
{ observedMediaPlayout } |
|
peer-connection-transport-added / -updated / -removed |
{ observedPeerConnectionTransport } |
|
certificate-added / -updated / -removed |
{ observedCertificate } |
Volume note. The
*-updatedsub-stat events fire on every peer-connectionaccept()(i.e. per sample, per stream). For high-throughput servers, subscribe only to what you need, or read fields off the entities onclient-updated/call-updatedinstead.
These remain on the individual entities (not the bus), for teardown/coordination. You can listen to them, but prefer the bus equivalents above for application logic.
| Entity | Local events |
|---|---|
ObservedCall |
update, newclient, empty, not-empty, close |
ObservedClient |
update (sample, elapsedTimeInMs), close, joined, left |
ObservedPeerConnection |
removed-inbound-track, removed-outbound-track, close |
new Observer<AppData>(config?: ObserverConfig<AppData>)
type ObserverConfig<AppData = Record<string, unknown>> = {
updatePolicy?: 'update-on-any-call-updated' | 'update-when-all-call-updated' | 'none';
defaultCallUpdatePolicy?: ObservedCallSettings['updatePolicy'];
appData?: AppData;
closeClientIfIdleForMs?: number;
closeCallIfEmptyForMs?: number;
// appData factories — run when an entity is created without explicit appData
// (incl. lazily by accept()). appData is application-owned; accept `context` never touches it.
createCallAppData?: (p: { callId: string; observer: Observer }) => Record<string, unknown>;
createClientAppData?: (p: { clientId: string; observedCall: ObservedCall }) => Record<string, unknown>;
// sink factory — produces a per-client sink that receives every accepted sample (see Sinks).
createClientSink?: (p: { clientId: string; observedCall: ObservedCall }) => ClientSampleSink | undefined;
// track-resolver factory — produces a call's RemoteTrackResolver (see Remote track resolution).
createTrackResolver?: (observedCall: ObservedCall) => RemoteTrackResolver | undefined;
};appData factories. Instead of pre-creating a call/client (or assigning on call-added /
client-added) just to enrich its appData, register a factory once. It runs in the entity's
constructor whenever it's created without an explicit settings.appData — including the lazy
creation inside accept(). The client factory receives the already-created parent
observedCall, so it can derive fields from it. appData is application-owned and is never
modified by the accept() context.
const observer = new Observer({
createCallAppData: ({ callId }) => ({ callId, startedAt: Date.now(), region: 'eu' }),
createClientAppData: ({ clientId, observedCall }) => ({ clientId, region: observedCall.appData.region }),
});Key members:
accept(sample: ClientSample, context?: AcceptContext): voidaddAcceptMiddleware(...mw: AcceptMiddleware[]): this/removeAcceptMiddleware(...mw): this— global pre-dispatch sample hooks (see Accept middlewares)getObservedCall<T>(callId): ObservedCall<T> | undefinedcreateObservedCall<T>(settings): ObservedCall<T> | undefinedgetOrCreateObservedCall<T>(settings): ObservedCall<T> | undefinedupdate(): void— force an aggregation/observer-updatedtickclose(): voidreadonly observedCalls: Map<string, ObservedCall>readonly observedTURN: ObservedTURNget appData(),get numberOfCalls()- counters:
numberOfClients,numberOfClientsUsingTurn,numberOfInboundRtpStreams,numberOfOutboundRtpStreams,numberOfDataChannels,numberOfPeerConnections,totalAddedCall,totalRemovedCall,closed on/off/once/emittyped against the event map
type ObservedCallSettings<AppData = Record<string, unknown>> = {
updatePolicy?: 'update-on-any-client-updated' | 'update-when-all-client-updated' | 'none';
callId: string;
appData?: AppData;
closeCallIfEmptyForMs?: number;
};Key members:
readonly callId: string,appData: AppDatareadonly observedClients: Map<string, ObservedClient>,get numberOfClients()getObservedClient<T>(clientId),createObservedClient<T>(settings),getOrCreateObservedClient<T>(settings)(all… | undefined)addIssue(issue: ClientIssue): void— raise a call-level issue → emitscall-issuereadonly detectors: Detectors— server-side detector registry (empty by default; see Detectors)scoreCalculator: ScoreCalculator,get score(),readonly calculatedScoreremoteTrackResolver?: RemoteTrackResolver— set fromObserverConfig.createTrackResolverat call creation (see Remote track resolution)- aggregates:
numberOfIssues,numberOfPeerConnections,numberOfInboundRtpStreams,numberOfOutboundRtpStreams,numberOfDataChannels,maxNumberOfClients,clientsUsedTurn: Set<string>,startedAt?,endedAt?,closedAt?,closed update(),close()
type ObservedClientSettings<AppData = Record<string, unknown>> = {
clientId: string;
appData?: AppData;
closeClientIfIdleForMs?: number;
};Key members:
readonly clientId: string,appData: AppData,readonly call: ObservedCallreadonly observedPeerConnections: Map<string, ObservedPeerConnection>readonly sink?: ClientSampleSink— the per-client sink (see Sinks), ifcreateClientSinkis configured; listen on it forclose/error- Injection API (queue app data to be merged into the next sample processing):
injectEvent(ClientEvent),injectIssue(ClientIssue),injectMetaData(ClientMetaData),injectExtensionStat(ExtensionStat),injectAttachment(key, value) - Direct add API (process immediately):
addIssue(ClientIssue),addMetadata(ClientMetaData),addExtensionStats(ExtensionStat) - Metrics (current/derived):
currentAvgRttInMs?,currentMinRttInMs?,currentMaxRttInMs?,receivingAudioBitrate,receivingVideoBitrate,sendingAudioBitrate,sendingVideoBitrate,usingTURN,usingTCP,availableIncomingBitrate,availableOutgoingBitrate - Counts:
numberOfInboundRtpStreams,numberOfOutboundRtpStreams,numberOfInbundTracks,numberOfOutboundTracks,numberOfDataChannels,numberOfPeerConnections - Per-tick deltas:
deltaReceivedAudioBytes,deltaSentAudioBytes, … (see source for the full set) - Lifecycle:
joinedAt?,leftAt?,closedAt?,closed,get score() - Metadata:
browser?,engine?,platform?,operationSystem?,mediaDevices,mediaConstraints accept(sample, context?),close()
Key members:
readonly peerConnectionId: string,readonly client: ObservedClient,appData?- The 15
observed*sub-statMaps (listed above), plus array getters:codecs,inboundRtps,outboundRtps,remoteInboundRtps,remoteOutboundRtps,mediaSources,mediaPlayouts,dataChannels,peerConnectionTransports,iceTransports,iceCandidates,iceCandidatePairs,certificates,selectedIceCandidatePairs,selectedIceCandiadtePairForTurn - State:
connectionState?,iceConnectionState?,iceGatheringState?,usingTURN,usingTCP - Metrics:
currentRttInMs?,currentJitter?,availableIncomingBitrate,availableOutgoingBitrate, sending/receiving bitrates, packet rates, andtotal*/delta*byte/packet counters accept(pcSample, context?),close(),get score()
Remote-RTP correlation (derived). During accept(), receiver/sender reports are linked
to the local streams by remoteId (fallback SSRC) and surfaced as fields:
- on
ObservedOutboundRtp:remoteRttInMs?,remoteFractionLost?,remoteJitter?,remotePacketsLost? - on
ObservedInboundRtp:remoteRttInMs?,remoteBytesSent?,remotePacketsSent?,remoteTimestamp?
These are reset each tick and only set when the matching remote report is present.
The shape of an accepted sample (re-exported from this package; identical to
@observertc/schemas). Only the top level is shown — each stat object mirrors the standard
WebRTC getStats() dictionaries plus a few extensions.
type ClientSample = {
timestamp: number; // client wall-clock (ms epoch)
callId?: string; // set by you or the library
clientId?: string; // set by you or the library
score?: number; // optional client-computed score (0..5)
attachments?: Record<string, unknown>;
peerConnections?: PeerConnectionSample[];
clientEvents?: ClientEvent[];
clientIssues?: ClientIssue[];
clientMetaItems?: ClientMetaData[];
extensionStats?: ExtensionStat[];
};
type PeerConnectionSample = {
peerConnectionId: string;
attachments?: Record<string, unknown>; // e.g. { direction: 'send'|'recv', producerId, consumerId, label }
score?: number;
inboundTracks?; outboundTracks?;
codecs?;
inboundRtps?; remoteInboundRtps?;
outboundRtps?; remoteOutboundRtps?;
mediaSources?; mediaPlayouts?;
peerConnectionTransports?; dataChannels?;
iceTransports?; iceCandidates?; iceCandidatePairs?;
certificates?;
};
type ClientEvent = { type: string; payload?: string; timestamp?: number; /* +ids */ };
type ClientIssue = { type: string; payload?: string; timestamp?: number }; // also used for call-issue
type ClientMetaData = { type: string; payload?: string; timestamp?: number; /* +ids */ };
type ExtensionStat = { type: string; payload?: string };payload fields are JSON strings; the library parses the ones it understands.
ClientEventTypes (enum of known event.type values): CLIENT_JOINED, CLIENT_LEFT,
PEER_CONNECTION_OPENED/CLOSED/STATE_CHANGED, MEDIA_TRACK_ADDED/REMOVED/MUTED/UNMUTED/RESUMED,
ICE_GATHERING_STATE_CHANGED, ICE_CONNECTION_STATE_CHANGED, DATA_CHANNEL_OPEN/CLOSED/ERROR,
NEGOTIATION_NEEDED, SIGNALING_STATE_CHANGE, ICE_CANDIDATE, ICE_CANDIDATE_ERROR, and the
mediasoup set PRODUCER_* / CONSUMER_* / DATA_PRODUCER_* / DATA_CONSUMER_*.
ClientMetaTypes (enum of known meta type values): MEDIA_CONSTRAINT, MEDIA_DEVICE,
MEDIA_DEVICES_SUPPORTED_CONSTRAINTS, USER_MEDIA_ERROR, LOCAL_SDP, OPERATION_SYSTEM,
ENGINE, PLATFORM, BROWSER.
Two consecutive samples from one participant ("Guest" in room qq0iwfnd) of an
edumeet/mediasoup call show what actually flows through accept(): a rich join snapshot,
then lean steady-state ticks.
Sample 1 — the join snapshot. Carries the one-off lifecycle clientEvents and device
clientMetaItems alongside the first stats. (Abbreviated; ids and times are from the real log.)
What accept() does with it, in order — each step emits on the bus with full ancestry:
- lazily creates the
ObservedCall→call-added; - creates the
ObservedClient→client-added, thenclient-joined(fromCLIENT_JOINED); - creates an
ObservedPeerConnectionper entry →peer-connection-added(×2 here); - creates an
ObservedOutboundTrackper track →outbound-track-added, plus the matchingoutbound-rtp-added; - replays the device list as
client-metadataevents and the lifecycle items asclient-event; and finallyclient-updatedfor the whole tick.
attachments.roomId lands on observedClient.attachments (read it on client-updated, not at
creation — see Ingestion).
Sample 2 — a steady-state tick (~8 s later): same callId / clientId, no new
clientEvents or clientMetaItems, just refreshed peerConnections stats. Each PC now scores 5
and the aggregate client score is 4.74 — a healthy call. This is the shape of nearly every
sample: each tick refreshes metrics and fires the *-updated events, while the heavy join
snapshot happens only once.
observer-js deliberately ships no built-in detectors. Per-client signals — packet loss,
jitter, RTT, freezes, etc. — are already detectable on the client and arrive on samples as
clientIssues (surfaced via client-issue). Server-side detection should focus on what only
the server can see by correlating data across the clients of a call.
The hook lives on ObservedCall:
import { Observer, Detector } from '@observertc/observer-js';
class MyCrossClientDetector implements Detector {
readonly name = 'my-detector';
constructor(private readonly call /* : ObservedCall */) {}
update() { // called on every call.update()
// …inspect this.call.observedClients across participants…
if (/* condition only visible server-side */ false) {
this.call.addIssue({ type: this.name, payload: JSON.stringify({ /* … */ }), timestamp: Date.now() });
// → emitted on the bus as 'call-issue'
}
}
}
const observer = new Observer();
observer.on('call-added', ({ observedCall }) => {
observedCall.detectors.add(new MyCrossClientDetector(observedCall));
});
observer.on('call-issue', ({ observedCall, issue }) => { /* react */ });Detector interface and the registry:
interface Detector { readonly name: string; update(): void; }
class Detectors {
add(d: Detector): void;
remove(d: Detector): void;
clear(): void;
update(): void; // called by ObservedCall.update(); guards each detector in try/catch
get listOfNames(): string[];
}In an SFU, one participant's outbound track is delivered to other participants as inbound
tracks (one publisher → many subscribers). Correlation is opt-in per observer: set
ObserverConfig.createTrackResolver, a factory invoked when each call is created that returns the
call's RemoteTrackResolver (or undefined for none).
RemoteTrackResolver is a generic, strategy-driven class. It subscribes to the bus (filtered to
its call) and links tracks by publisher id — the link key — maintaining the links directly on
the tracks: inboundTrack.remoteOutboundTrack and outboundTrack.remoteInboundTracks: Set.
import { Observer, createDefaultMediasoupRemoteTrackResolverFactory } from '@observertc/observer-js';
const observer = new Observer({
createTrackResolver: createDefaultMediasoupRemoteTrackResolverFactory(),
});
// later, given tracks (links are kept up to date as tracks come and go):
const source = inboundTrack.remoteOutboundTrack; // the publishing ObservedOutboundTrack
const receivers = [ ...outboundTrack.remoteInboundTracks ]; // the subscribing ObservedInboundTrack[]Two built-in factories ship: createDefaultMediasoupRemoteTrackResolverFactory() (publisher =
attachments.producerId, subscriber = attachments.consumerId) and
createP2pRemoteTrackResolverFactory() (matches by RTP SSRC, preserved end-to-end in p2p).
For any other topology, build a RemoteTrackResolver with your own key resolvers — the publisher
id is just whatever links a subscribed track to the published one:
import { Observer, RemoteTrackResolver } from '@observertc/observer-js';
const observer = new Observer({
createTrackResolver: (observedCall) => new RemoteTrackResolver(observedCall, {
resolveOutboundTrackPublisherId: (out) => out.attachments?.mediaId as string | undefined,
resolveInboundTrackPublisherId: (inb) => inb.attachments?.mediaId as string | undefined,
resolveInboundTrackSubscriberId: (inb) => inb.attachments?.subId as string | undefined, // optional
}),
});For the mediasoup factory, the application puts producerId / consumerId (and optionally
direction, label) into the track attachments.
Everything above is built from the client-reported ClientSample. When you run a
mediasoup SFU you also have the server's own ground truth — its
routers, transports, producers, consumers and data channels, with exact lifetimes and state
transitions. ObservedMediasoupRouter captures that server-side view into a
MediasoupRouterSample, completely independent of the client sample pipeline.
You hand the observer a live mediasoup Router; it attaches to mediasoup's own observer API and,
from then on, passively records the router's topology and lifecycle — with no polling and no
changes to your media code:
- new transports (
webrtc/plain/pipe/direct), their selectedtuple, and ICE state transitions; - producers (codec, SSRCs/RIDs,
pause/resume) and consumers (pause/resume,producerPaused/producerResumed); - data producers and data consumers;
createdAt/closedAtfor every entity above.
All of it accumulates on observedMediasoupRouter.sample (a MediasoupRouterSample — see
src/schema/MediasoupRouter.ts). This is a Sample, not a
Report: it mirrors the naming of ClientSample and is yours to snapshot, persist, or correlate.
A router belongs to one or more calls, but the observer does not store the router (or its sample)
on the ObservedCall. Instead, when a match is found it emits
mediasoup-router-matched-with-call and steps back — your application decides what the
pairing means. Stamp the callId into the router's appData, build your own index, attach the
sample to the call in your database — whatever fits your system. The library stays unopinionated and
loosely coupled.
There are two ways a match is discovered, controlled by the settings you pass to
createObservedMediasoupRouter:
- Explicit (
callId) — you already know the call. If that call currently exists,mediasoup-router-matched-with-callfires immediately. - Implicit (
bindCallByWebRtcTransportId: true) — let the observer discover it. As peer connections are observed (peer-connection-added), the observer checks whether the peer connection's id is one of the router's WebRTC transport ids. On a hit, it's a match. It emits once per distinct call, and keeps watching so additional calls sharing the router can still match later. The internal listener is removed automatically when the router closes or the observer closes.
When the underlying mediasoup router closes, its close propagates to ObservedMediasoupRouter,
which emits mediasoup-router-removed. That is your cue to do whatever cleanup or persistence
you want with the now-final sample — again, the observer itself keeps nothing.
| Field | Type | Required | Meaning |
|---|---|---|---|
router |
mediasoup.types.Router |
yes | the live router to observe; the observer attaches to router.observer |
routerId |
string |
yes | your id for the router (the sample also carries router.id) |
appData |
Record<string, unknown> |
no | application-owned bag on the ObservedMediasoupRouter (e.g. where you record the matched callId) |
attachments |
Record<string, unknown> |
no | free-form data copied onto sample.attachments |
callId |
string |
no | explicit match: emit mediasoup-router-matched-with-call now if this call exists |
bindCallByWebRtcTransportId |
boolean |
no | implicit match: discover the call(s) by correlating WebRTC transport ids with peer-connection ids |
Returns the ObservedMediasoupRouter, or undefined if the observer is closed (a router with the
same id returns the existing instance — both warn).
Useful members on the returned object: .sample (the MediasoupRouterSample), .appData,
.attachments (getter over sample.attachments), .webrtcTransportIds: Set<string>, .id,
.close().
import { Observer } from '@observertc/observer-js';
import type { ObservedMediasoupRouterScope, ObservedCallScope } from '@observertc/observer-js';
const observer = new Observer();
// 1) Feed client samples as usual so the observer knows about calls & peer connections.
// (e.g. transport-layer: observer.accept(clientSample, context))
// 2) Observe the SFU side. Let the observer discover which call this router serves.
const router = /* your mediasoup router */ undefined as any;
const observedRouter = observer.createObservedMediasoupRouter({
router,
routerId: router.id,
bindCallByWebRtcTransportId: true, // discover the call by peer-connection correlation
appData: {}, // we'll record the matched callId here
});
// 3) The observer found a call for the router — WE decide what to do with the pairing.
observer.on('mediasoup-router-matched-with-call',
({ observedMediasoupRouter, observedCall }: ObservedMediasoupRouterScope & ObservedCallScope) => {
// e.g. remember the association on the router's appData…
observedMediasoupRouter.appData.callId = observedCall.callId;
// …or attach the live server sample to the call in your own store:
myStore.linkRouterToCall(observedCall.callId, observedMediasoupRouter.sample);
},
);
// 4) The router closed — WE decide what to persist/forward with the final sample.
observer.on('mediasoup-router-removed', ({ observedMediasoupRouter }: ObservedMediasoupRouterScope) => {
const callId = observedMediasoupRouter.appData.callId as string | undefined;
myStore.saveRouterSample(callId, observedMediasoupRouter.sample);
});
// (optional) react to the router being registered at all:
observer.on('mediasoup-router-added', ({ observedMediasoupRouter }) => {
console.log('observing router', observedMediasoupRouter.id);
});If you already know the call, skip discovery and match explicitly:
observer.createObservedMediasoupRouter({ router, routerId: router.id, callId });
// → `mediasoup-router-matched-with-call` fires immediately if `callId` is a known call- Loose coupling. The call model stays about client telemetry; the SFU view lives on its own object and is associated only if and how you choose.
- You own the association. One router may serve multiple calls, a call may be served by multiple routers, and the right place to keep that mapping is application-specific — so the observer hands you the match and the final sample and gets out of the way.
- No silent accumulation. Nothing is appended to
ObservedCall, so there is no hidden growth or lifetime you have to reason about; the router sample lives exactly as long as you keep a reference.
A sink receives the samples a client accepts — for archival, streaming, or later offline
replay. Each ObservedClient gets its own sink, produced by the
ObserverConfig.createClientSink factory when the client is created (return undefined for no
sink). The client pushes every accepted sample to its sink, and end()s it on close.
ClientSampleSink is an abstract base class (a typed EventEmitter). You create a sink by
subclassing it and implementing write and end. It is object-mode: write receives
the ClientSample object, so each sink decides how (or whether) to serialize it — JSON line,
protobuf, a remote POST body, an in-memory push, etc.
import { ClientSampleSink, ClientSample } from '@observertc/observer-js';
abstract class ClientSampleSink /* extends EventEmitter */ {
abstract write(sample: ClientSample): boolean; // accept one sample; `false` = backpressure
abstract end(): void; // flush; emit `close` when the destination is ready
// typed events (inherited): the listener signature is inferred from the event name
on(event: 'close' | 'finish' | 'drain', listener: () => void): this;
on(event: 'error', listener: (err: Error) => void): this;
// ...and the matching `once` / `off` / `emit`
}| Event | Meaning |
|---|---|
close |
the destination is fully written and closed (e.g. a file flushed and its fd closed) — "ready" |
error |
the destination failed |
finish |
end() was processed and queued data flushed (before close) |
drain |
the buffer drained after backpressure; safe to write more |
The library calls write(sample) synchronously per accepted sample (it is not awaited),
end()s the sink when the client closes, and attaches an error listener so a failing sink
can't crash the process (it also catches throws from write/end). The application — which
created the sink — listens for close (destination ready) and error. Because write isn't
awaited in the accept() hot path, backpressure and batching are the sink's concern.
import { Observer, createJsonlFileSinkFactory } from '@observertc/observer-js';
const observer = new Observer({
// one ./stats/<callId>__<clientId>.jsonl per client
createClientSink: createJsonlFileSinkFactory({ directory: './stats' }),
});
// React when a sink is created for a client:
observer.on('client-sink-created', ({ observedClient, sink }) => {
sink.on('close', () => {
// the file is fully flushed and its fd closed — ready to upload, move, etc.
});
});| Export | Signature | Notes |
|---|---|---|
createJsonlFileSinkFactory |
({ directory, flags?, getFileName?, serializeSample? }) => ClientSampleSinkFactory |
per-client JSONL files; path defaults to ${callId}__${clientId}.jsonl under directory (which must exist) |
createJsonlFileSink |
({ path, flags?, serializeSample? }) => ClientSampleSink |
a single JSONL file; wraps fs.WriteStream and re-emits its close/finish/drain/error |
JsonlFileSink |
class extends ClientSampleSink |
the underlying class; exposes readonly path so a close handler knows which file is ready |
createInMemorySink / InMemorySink |
(samples?: ClientSample[]) => InMemorySink |
collects the accepted sample objects into .samples: ClientSample[]; emits close on end() |
serializeSample?: (sample: ClientSample) => string overrides the default JSON.stringify for
the JSONL sinks (e.g. to redact or reshape before writing).
The bus hands you the sink as the base ClientSampleSink. To read information specific to a sink
type — for a file sink, where it was written — narrow with instanceof and read the sink's
public fields. JsonlFileSink exposes path:
import { JsonlFileSink } from '@observertc/observer-js';
observer.on('client-sink-created', ({ observedClient, sink }) => {
if (sink instanceof JsonlFileSink) {
const { path } = sink; // the file this client's samples go to
sink.once('close', () => uploadFile(path)); // close = flushed & fd closed → ready
}
});The general pattern: each concrete sink exposes whatever it wants as public readonly fields, and
consumers narrow (instanceof YourSink) to read them. Your own sinks do the same.
Subclass ClientSampleSink and emit the lifecycle events yourself — for any non-file
destination (a remote endpoint, a message queue, an object store, …):
import { ClientSampleSink, ClientSample, ClientSampleSinkFactory } from '@observertc/observer-js';
class HttpSink extends ClientSampleSink {
private buffer: ClientSample[] = [];
constructor(private readonly url: string) { super(); }
write(sample: ClientSample): boolean {
this.buffer.push(sample); // batch; decide your own backpressure
return true;
}
end(): void {
fetch(this.url, { method: 'POST', body: JSON.stringify(this.buffer) })
.then(() => this.emit('close')) // signal "destination ready"
.catch((err) => this.emit('error', err));
}
}
const createClientSink: ClientSampleSinkFactory = ({ clientId, observedCall }) =>
new HttpSink(`https://stats.example.com/${observedCall.callId}/${clientId}`);
const observer = new Observer({ createClientSink });observedClient.sink? exposes the created sink; the client-sink-created event delivers it on
the bus with full ancestry. ClientSampleSinkFactory is
(p: { clientId: string; observedCall: ObservedCall }) => ClientSampleSink | undefined.
observer-js logs through a single, swappable sink. Out of the box it writes debug and
above to console (verbose — install your own sink for production). Funnel everything into your
logger:
import { setObserverLogger, type ObserverLogger } from '@observertc/observer-js';
setObserverLogger({
trace: (m, ...a) => myLogger.trace(`[${m}]`, ...a),
debug: (m, ...a) => myLogger.debug(`[${m}]`, ...a),
info: (m, ...a) => myLogger.info(`[${m}]`, ...a),
warn: (m, ...a) => myLogger.warn(`[${m}]`, ...a),
error: (m, ...a) => myLogger.error(`[${m}]`, ...a),
});createLogger(moduleName) is also exported for your own modules. See
docs/logging.md for pino / winston / console recipes, level
filtering, per-module routing, and full silencing.
The library warns and degrades; it does not throw on operational problems:
createObservedCall/createObservedClienton a closed parent → warn + returnundefined.- Duplicate id → warn + return the existing instance.
accept()on a closed client → warn + no-op.- Sample missing
callId/clientId, or observer closed →sample-rejectedevent. - A throwing accept-middleware → warn + drop that sample (never crashes
accept()).
Therefore create* and getOrCreate* return T | undefined; guard the result. The
Middleware utility's internal invariants (e.g. calling next() twice) throw, but those throws
are caught by accept() and surfaced as a warning.
yarn install
yarn build # tsup → dist/ (dual ESM .mjs + CJS .js, single entry, .d.ts/.d.mts + sourcemaps)
yarn lint # eslint -c .eslintrc.json "src/**/*.ts"
yarn typecheck # tsc --noEmit
yarn test # jestThe build is driven by tsup (config in tsup.config.ts): a single
entry (src/index.ts), dual ESM + CommonJS output to dist/ (index.mjs / index.js) with
.d.mts / .d.ts types and sourcemaps, targeting Node 20. CI (.github/workflows/ci.yml) runs
lint + typecheck + build + test on every push/PR.
Project layout (src/): Observer.ts, ObservedCall.ts, ObservedClient.ts,
ObservedPeerConnection.ts, the Observed* sub-stat classes, ObserverEvents.ts (the typed
event map + scope types), detectors/ (Detector, Detectors), scores/, updaters/
(update-policy strategies), utils/ (remote-track resolvers), common/ (logger, utils,
Middleware), schema/ (sample/event/meta types), and sinks/ (the ClientSampleSink base +
JsonlFileSink / InMemorySink, re-exported from the package root).
Conventions to follow when developing further:
- Single event bus. New consumer-facing events go in
ObserverEvents.tswith an object payload[<Scope> & { …subject }], and are emitted via the component's_notify(type, { ...this.eventScope, …subject }). Each component has a precomputedeventScopefield and a thin_notifywrapper around the right emitter. Keep purely internal coordination as local EventEmitter events (and remember tooffthem on close). - Warn, don't throw on operational/edge conditions; return
undefinedwhere a value can't be produced. - Counter-reset-safe deltas. When computing a delta from a cumulative counter, never emit a
negative value (guard
curr >= prev), to survive counter resets / SSRC reuse. - Explicit accumulation. The per-sample metric accumulation in
accept()is intentionally explicit and not abstracted — match that style. - Detectors are server-side. Add cross-client detectors on
ObservedCall.detectors; don't re-implement client-detectable signals.
Recipes:
- Add an event: add the key + payload to
ObserverEvents; in the owning component callthis._notify('my-event', { ...this.eventScope, subject }). - Add a per-stream metric: add the field to the relevant
Observed*Rtp/track class, populate it in itsupdate()(reset at the top ofupdate()if it's per-tick), and read it from a*-updatedhandler. - Add a detector: implement
Detector, register it oncall-addedviaobservedCall.detectors.add(...), surface findings withobservedCall.addIssue(...).
Apache-2.0. Part of the ObserverTC ecosystem.
{ "timestamp": 1780572332518, "callId": "d3dbf2f5-79be-4cb8-9d43-fb404f07ef27", "clientId": "c926983c-4468-4046-ae8c-a9cabe1a1868", "score": 0, // no quality measured yet on the join tick "attachments": { "displayName": "Guest", "roomId": "qq0iwfnd", "actualSessionId": "d3dbf2f5-…" }, "clientEvents": [ // chronological lifecycle (12 in the real sample) { "type": "CLIENT_JOINED", "timestamp": 1780572324515 }, { "type": "PEER_CONNECTION_OPENED", "timestamp": 1780572326790 }, // pc=b81c8d9d (media) { "type": "ICE_GATHERING_STATE_CHANGED", "timestamp": 1780572326811 }, // → gathering { "type": "PEER_CONNECTION_STATE_CHANGED", "timestamp": 1780572326812 }, // → connecting { "type": "PRODUCER_ADDED", "timestamp": 1780572326821 }, // producer=1abdaf82 (audio) { "type": "MEDIA_TRACK_ADDED", "timestamp": 1780572326821 }, // track=36ae42df (audio) { "type": "PEER_CONNECTION_STATE_CHANGED", "timestamp": 1780572326827 }, // → connected { "type": "PRODUCER_ADDED", "timestamp": 1780572326837 }, // producer=ba06a35b (video) { "type": "DATA_PRODUCER_CREATED", "timestamp": 1780572326853 } ], "clientMetaItems": [ // environment & devices, one-off (10 in the real sample) { "type": "USER_AGENT_DATA", "payload": "{…Chrome 148 / macOS…}" }, { "type": "MEDIA_DEVICE", "payload": "{…\"BRIO 4K Stream Edition\"…}" } // …mic / camera / speaker devices… ], "peerConnections": [ { "peerConnectionId": "b81c8d9d-…", // the media PC — Guest publishes to the SFU "outboundRtps": [ /* audio + video */ ], "outboundTracks": [ /* mic + camera: label, settings, capabilities */ ], "remoteInboundRtps": [ /* RTCP feedback from the SFU */ ], "codecs": [ /* … */ ], "iceTransports": [ /* … */ ], "iceCandidatePairs": [ /* … */ ], "dataChannels": [ /* … */ ] }, { "peerConnectionId": "8635acb7-…", "peerConnectionTransports": [ /* … */ ] } // signaling-only PC ] }