Skip to content

Commit 4e76fe2

Browse files
committed
refactor(ddp-streamer): backpressure, deterministic logout, metrics, fan-out tests
Internal refactor of ee/apps/ddp-streamer keeping the wire protocol identical (DDP-over-WS, EJSON, login via Account.login, method fallback to MeteorService). Heartbeat & backpressure: - Split heartbeat into idleTimer / pongTimer. The pongTimer is armed only when a server PING is sent and is cleared exclusively by an inbound PONG; other inbound traffic no longer extends the deadline (a broken client that keeps sending data but never replies to PING is now disconnected deterministically). - Drop slow consumers: when ws.bufferedAmount exceeds MAX_BUFFERED_BYTES (default 4 MiB, env override DDP_MAX_BUFFERED_BYTES), close with code 1013. Fan-out path: - Encapsulate the ws private _sender.sendFrame call behind RawSender so the upgrade path to a different ws/uWS implementation is local. Apply the same bufferedAmount guard at the fan-out level. - Extract fanOutText from Stream.sendToManySubscriptions so the loop is testable without instantiating the full Streamer base class. Lifecycle: - Replace setTimeout(1ms) on logout with setImmediate; ws frame ordering guarantees the result/updated frames reach the wire before the close frame. - Decode binary frames as UTF-8 in Server.parse instead of throwing 500 (some proxies/wrappers deliver UTF-8 JSON in binary frames). Observability: - Register ddp_method_total{namespace,status}, ddp_close_total{code} and ddp_send_buffer_bytes{nodeID} (sampled every 5s). - Method labels are bucketed by the prefix before ':' or '.' to keep Prometheus cardinality bounded. - Replace console.error/warn with @rocket.chat/logger in Client.ts and Streamer.ts. Tests: - New specs: Client.spec, Streamer.spec, RawSender.spec. - Server.spec extended to cover parse() and metric increments. - Suite grows from 6 to 32 tests; coverage from ~10% to 63% (RawSender 94%, Streamer 75%, Server 65%, Client 60%).
1 parent aed3c5d commit 4e76fe2

11 files changed

Lines changed: 719 additions & 76 deletions

File tree

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
import { EventEmitter } from 'events';
2+
import type { IncomingMessage } from 'http';
3+
4+
import type WebSocket from 'ws';
5+
6+
import { Client } from './Client';
7+
import { DDP_EVENTS, MAX_BUFFERED_BYTES, TIMEOUT, WS_ERRORS } from './constants';
8+
9+
jest.mock('./Server', () => ({ SERVER_ID: '{"msg":"server_id","server_id":"0"}' }));
10+
11+
jest.mock('./configureServer', () => {
12+
const realEvents: typeof import('events') = jest.requireActual('events');
13+
class FakeServer extends realEvents.EventEmitter {
14+
id = 'fake-server';
15+
16+
serialize = (obj: unknown) => JSON.stringify(obj);
17+
18+
parse = (data: any) => JSON.parse(data.toString());
19+
20+
call = jest.fn().mockResolvedValue(undefined);
21+
22+
subscribe = jest.fn().mockResolvedValue(undefined);
23+
}
24+
return { server: new FakeServer(), events: new realEvents.EventEmitter() };
25+
});
26+
27+
jest.mock('@rocket.chat/core-services', () => ({
28+
Presence: { updateConnection: jest.fn().mockResolvedValue(undefined) },
29+
}));
30+
31+
class FakeWebSocket extends EventEmitter {
32+
public bufferedAmount = 0;
33+
34+
public readyState = 1;
35+
36+
public send = jest.fn();
37+
38+
public close = jest.fn((_code?: number, _reason?: string) => {
39+
this.readyState = 3;
40+
});
41+
}
42+
43+
const makeReq = (): IncomingMessage => ({ socket: { remoteAddress: '127.0.0.1' }, headers: {} }) as unknown as IncomingMessage;
44+
45+
const asWs = (ws: FakeWebSocket): WebSocket => ws as unknown as WebSocket;
46+
47+
describe('Client.send backpressure', () => {
48+
let ws: FakeWebSocket;
49+
let client: any;
50+
51+
beforeEach(() => {
52+
jest.useFakeTimers();
53+
ws = new FakeWebSocket();
54+
client = new Client(asWs(ws), false, makeReq());
55+
ws.send.mockClear();
56+
ws.close.mockClear();
57+
});
58+
59+
afterEach(() => {
60+
jest.clearAllTimers();
61+
jest.useRealTimers();
62+
});
63+
64+
it('closes with code 1013 when bufferedAmount exceeds the threshold', () => {
65+
ws.bufferedAmount = MAX_BUFFERED_BYTES + 1;
66+
client.send('hello');
67+
expect(ws.close).toHaveBeenCalledWith(WS_ERRORS.SLOW_CONSUMER, expect.any(String));
68+
expect(ws.send).not.toHaveBeenCalled();
69+
});
70+
71+
it('forwards to ws.send when bufferedAmount is below the threshold', () => {
72+
ws.bufferedAmount = 1024;
73+
client.send('hello');
74+
expect(ws.send).toHaveBeenCalledWith('hello');
75+
expect(ws.close).not.toHaveBeenCalled();
76+
});
77+
78+
it('encodes payloads for meteor clients with the SockJS array wrapper', () => {
79+
const meteorWs = new FakeWebSocket();
80+
const meteorClient = new Client(asWs(meteorWs), true, makeReq());
81+
meteorWs.send.mockClear();
82+
meteorClient.send('payload');
83+
expect(meteorWs.send).toHaveBeenCalledWith('a["payload"]');
84+
});
85+
});
86+
87+
describe('Client heartbeat', () => {
88+
let ws: FakeWebSocket;
89+
90+
const handshake = async (): Promise<void> => {
91+
ws.emit('message', Buffer.from(JSON.stringify({ msg: DDP_EVENTS.CONNECT, version: '1' })), false);
92+
await flushAsync();
93+
};
94+
95+
const flushAsync = async (): Promise<void> => {
96+
// `Client.handler` is async — yield twice so server.parse, the internal emit, and
97+
// the once('message') listener all complete before the test inspects state.
98+
await Promise.resolve();
99+
await Promise.resolve();
100+
};
101+
102+
beforeEach(() => {
103+
jest.useFakeTimers();
104+
ws = new FakeWebSocket();
105+
// Client wires itself into the ws via constructor side effects; we don't need the ref.
106+
void new Client(asWs(ws), false, makeReq());
107+
ws.send.mockClear();
108+
ws.close.mockClear();
109+
});
110+
111+
afterEach(() => {
112+
jest.clearAllTimers();
113+
jest.useRealTimers();
114+
});
115+
116+
const lastSent = (): string | undefined => ws.send.mock.calls.at(-1)?.[0];
117+
118+
it('sends PING after the idle window expires', () => {
119+
jest.advanceTimersByTime(TIMEOUT);
120+
expect(lastSent()).toContain(`"${DDP_EVENTS.PING}"`);
121+
});
122+
123+
it('closes with code 4000 when no PONG arrives within the pong window', () => {
124+
jest.advanceTimersByTime(TIMEOUT); // PING sent, pong window starts
125+
jest.advanceTimersByTime(TIMEOUT); // pong window expires
126+
expect(ws.close).toHaveBeenCalledWith(WS_ERRORS.TIMEOUT, expect.any(String));
127+
});
128+
129+
it('does not extend the pong window when non-PONG traffic arrives', async () => {
130+
await handshake();
131+
ws.close.mockClear();
132+
jest.advanceTimersByTime(TIMEOUT); // PING sent
133+
ws.emit('message', Buffer.from(JSON.stringify({ msg: DDP_EVENTS.METHOD, method: 'foo', id: '1' })), false);
134+
await flushAsync();
135+
jest.advanceTimersByTime(TIMEOUT - 1);
136+
expect(ws.close).not.toHaveBeenCalled();
137+
jest.advanceTimersByTime(2);
138+
expect(ws.close).toHaveBeenCalledWith(WS_ERRORS.TIMEOUT, expect.any(String));
139+
});
140+
141+
it('cancels the pong window and resumes the idle cycle when PONG arrives', async () => {
142+
await handshake();
143+
ws.close.mockClear();
144+
jest.advanceTimersByTime(TIMEOUT); // PING #1 sent, pong window armed
145+
ws.send.mockClear();
146+
ws.emit('message', Buffer.from(JSON.stringify({ msg: DDP_EVENTS.PONG })), false);
147+
await flushAsync();
148+
// Past the original pong deadline — we should still be alive.
149+
jest.advanceTimersByTime(TIMEOUT);
150+
expect(ws.close).not.toHaveBeenCalled();
151+
// And the next idle window should have produced a fresh PING.
152+
expect(lastSent()).toContain(`"${DDP_EVENTS.PING}"`);
153+
});
154+
});

ee/apps/ddp-streamer/src/Client.ts

Lines changed: 57 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,18 @@ import type { IncomingMessage } from 'http';
33

44
import { Presence } from '@rocket.chat/core-services';
55
import type { ISocketConnection } from '@rocket.chat/core-typings';
6+
import { Logger } from '@rocket.chat/logger';
67
import { throttle } from 'underscore';
78
import { v1 as uuidv1 } from 'uuid';
89
import type WebSocket from 'ws';
910

1011
import { SERVER_ID } from './Server';
1112
import { server } from './configureServer';
12-
import { DDP_EVENTS, WS_ERRORS, WS_ERRORS_MESSAGES, TIMEOUT } from './constants';
13+
import { DDP_EVENTS, WS_ERRORS, WS_ERRORS_MESSAGES, TIMEOUT, MAX_BUFFERED_BYTES } from './constants';
1314
import type { IPacket } from './types/IPacket';
1415

16+
const logger = new Logger('DDP-Streamer-Client');
17+
1518
// TODO why localhost not as 127.0.0.1?
1619
// based on Meteor's implementation (link)
1720
const getClientAddress = (req: IncomingMessage): string | undefined => {
@@ -61,7 +64,9 @@ export const clientMap = new WeakMap<WebSocket, Client>();
6164
export class Client extends EventEmitter {
6265
private chain = Promise.resolve();
6366

64-
protected timeout: NodeJS.Timeout;
67+
protected idleTimer?: NodeJS.Timeout;
68+
69+
protected pongTimer?: NodeJS.Timeout;
6570

6671
public readonly session = uuidv1();
6772

@@ -75,11 +80,13 @@ export class Client extends EventEmitter {
7580

7681
public userToken?: string;
7782

83+
public closeCode?: number;
84+
7885
private updatePresence = throttle(
7986
() => {
8087
if (this.userId) {
8188
void Presence.updateConnection(this.userId, this.connection.id).catch((err) => {
82-
console.error('Error updating connection presence:', err);
89+
logger.error({ msg: 'Error updating connection presence', err });
8390
});
8491
}
8592
},
@@ -104,17 +111,18 @@ export class Client extends EventEmitter {
104111
httpHeaders: req.headers,
105112
};
106113

107-
this.renewTimeout(TIMEOUT / 1000);
114+
this.armIdleTimer();
108115
this.ws.on('message', this.handler);
109-
this.ws.on('close', (...args) => {
116+
this.ws.on('close', (code: number, ...rest: unknown[]) => {
117+
this.closeCode = code;
110118
server.emit(DDP_EVENTS.DISCONNECTED, this);
111-
this.emit('close', ...args);
119+
this.emit('close', code, ...rest);
112120
this.subscriptions.clear();
113-
clearTimeout(this.timeout);
121+
this.clearTimers();
114122
});
115123

116124
this.ws.on('error', (err) => {
117-
console.error('Unexpected error:', err);
125+
logger.error({ msg: 'Unexpected WebSocket error', err });
118126
this.ws.close(WS_ERRORS.CLOSE_PROTOCOL_ERROR, WS_ERRORS_MESSAGES.CLOSE_PROTOCOL_ERROR);
119127
});
120128

@@ -124,7 +132,9 @@ export class Client extends EventEmitter {
124132

125133
server.emit(DDP_EVENTS.CONNECTED, this);
126134

127-
this.ws.on('message', () => this.renewTimeout(TIMEOUT));
135+
// Any inbound traffic resets the idle window. The pong timer, when armed, is only
136+
// cleared by an actual PONG (handled in `process`).
137+
this.ws.on('message', () => this.armIdleTimer());
128138

129139
this.once('message', ({ msg }) => {
130140
if (msg !== DDP_EVENTS.CONNECT) {
@@ -158,6 +168,9 @@ export class Client extends EventEmitter {
158168
case DDP_EVENTS.PING:
159169
this.pong(packet.id);
160170
break;
171+
case DDP_EVENTS.PONG:
172+
this.handlePong();
173+
break;
161174
case DDP_EVENTS.METHOD:
162175
if (!packet.method) {
163176
return this.ws.close(WS_ERRORS.CLOSE_PROTOCOL_ERROR);
@@ -203,12 +216,35 @@ export class Client extends EventEmitter {
203216

204217
handleIdle = (): void => {
205218
this.ping();
206-
this.timeout = setTimeout(this.closeTimeout, TIMEOUT);
219+
// Only the actual PONG clears this; other inbound messages do NOT.
220+
clearTimeout(this.pongTimer);
221+
this.pongTimer = setTimeout(this.closeTimeout, TIMEOUT);
207222
};
208223

209-
renewTimeout(timeout = TIMEOUT): void {
210-
clearTimeout(this.timeout);
211-
this.timeout = setTimeout(this.handleIdle, timeout);
224+
handlePong(): void {
225+
clearTimeout(this.pongTimer);
226+
this.pongTimer = undefined;
227+
// Resume the normal idle cycle. The 'message' listener that runs alongside us is
228+
// guarded by `pongTimer`, so it skipped re-arming while we were awaiting PONG.
229+
this.armIdleTimer();
230+
}
231+
232+
armIdleTimer(timeout = TIMEOUT): void {
233+
// Liveness during the PING/PONG window is governed strictly by `pongTimer`.
234+
// Other inbound traffic must NOT extend the deadline — that would mask a broken
235+
// client that keeps sending data but never replies to PING.
236+
if (this.pongTimer) {
237+
return;
238+
}
239+
clearTimeout(this.idleTimer);
240+
this.idleTimer = setTimeout(this.handleIdle, timeout);
241+
}
242+
243+
clearTimers(): void {
244+
clearTimeout(this.idleTimer);
245+
clearTimeout(this.pongTimer);
246+
this.idleTimer = undefined;
247+
this.pongTimer = undefined;
212248
}
213249

214250
handler = async (payload: WebSocket.Data, isBinary: boolean): Promise<void> => {
@@ -221,7 +257,7 @@ export class Client extends EventEmitter {
221257
}
222258
this.process(packet.msg, packet);
223259
} catch (err) {
224-
console.error(err);
260+
logger.error({ msg: 'Failed to handle inbound DDP frame', err });
225261
return this.ws.close(WS_ERRORS.UNSUPPORTED_DATA, WS_ERRORS_MESSAGES.UNSUPPORTED_DATA);
226262
}
227263
};
@@ -234,6 +270,13 @@ export class Client extends EventEmitter {
234270
}
235271

236272
send(payload: string): void {
273+
// Drop slow consumers deterministically: if the OS-level write buffer is past the
274+
// configured threshold the client cannot keep up. Closing here prevents heap blow-up
275+
// on the streamer pod when a single peer stalls.
276+
if (this.ws.bufferedAmount > MAX_BUFFERED_BYTES) {
277+
this.ws.close(WS_ERRORS.SLOW_CONSUMER, WS_ERRORS_MESSAGES.SLOW_CONSUMER);
278+
return;
279+
}
237280
return this.ws.send(this.encodePayload(payload));
238281
}
239282
}

ee/apps/ddp-streamer/src/DDPStreamer.ts

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,27 @@ export class DDPStreamer extends ServiceClass {
113113
description: 'Users logged by streamer',
114114
});
115115

116+
metrics.register({
117+
name: 'ddp_method_total',
118+
type: 'counter',
119+
labelNames: ['namespace', 'status'],
120+
description: 'DDP method calls by namespace and outcome',
121+
});
122+
123+
metrics.register({
124+
name: 'ddp_close_total',
125+
type: 'counter',
126+
labelNames: ['code'],
127+
description: 'DDP WebSocket close events by code',
128+
});
129+
130+
metrics.register({
131+
name: 'ddp_send_buffer_bytes',
132+
type: 'gauge',
133+
labelNames: ['nodeID'],
134+
description: 'Maximum WebSocket send buffer size across connected clients (bytes)',
135+
});
136+
116137
server.setMetrics(metrics);
117138

118139
server.on(DDP_EVENTS.CONNECTED, () => {
@@ -123,13 +144,29 @@ export class DDPStreamer extends ServiceClass {
123144
metrics.increment('users_logged', { nodeID }, 1);
124145
});
125146

126-
server.on(DDP_EVENTS.DISCONNECTED, ({ userId }) => {
147+
server.on(DDP_EVENTS.DISCONNECTED, ({ userId, closeCode }: Client) => {
127148
metrics.decrement('users_connected', { nodeID }, 1);
128149
if (userId) {
129150
metrics.decrement('users_logged', { nodeID }, 1);
130151
}
152+
metrics.increment('ddp_close_total', { code: String(closeCode ?? 'unknown') }, 1);
131153
});
132154

155+
// Periodically sample the maximum send-buffer depth across all live connections.
156+
// Exposes when a single peer is starting to fall behind before we hit the 1013 kick.
157+
const sampleSendBuffer = (): void => {
158+
let max = 0;
159+
this.wss?.clients.forEach((ws) => {
160+
if (ws.bufferedAmount > max) {
161+
max = ws.bufferedAmount;
162+
}
163+
});
164+
metrics.set('ddp_send_buffer_bytes', max, { nodeID });
165+
};
166+
const sampleInterval = setInterval(sampleSendBuffer, 5000);
167+
// `unref` avoids holding the event loop open during shutdown.
168+
sampleInterval.unref?.();
169+
133170
async function sendUserData(client: Client, userId: string) {
134171
// TODO figure out what fields to send. maybe to to export function getBaseUserFields to a package
135172
const loggedUser = await Users.findOneById(userId, {

0 commit comments

Comments
 (0)