diff --git a/server/terminal-stream/replay-ring.ts b/server/terminal-stream/replay-ring.ts index 85ae3f33..f5ef2304 100644 --- a/server/terminal-stream/replay-ring.ts +++ b/server/terminal-stream/replay-ring.ts @@ -112,7 +112,16 @@ export class ReplayRing { const frame = this.frames[i] if (frame.seqStart > normalizedToSeq) break if (frame.bytes > budget && frames.length > 0) break - frames.push(frame) + + const previous = frames[frames.length - 1] + if (previous && frame.seqStart === previous.seqEnd + 1) { + previous.seqEnd = frame.seqEnd + previous.data += frame.data + previous.bytes += frame.bytes + previous.at = frame.at + } else { + frames.push({ ...frame }) + } budget -= frame.bytes if (budget <= 0) break } diff --git a/src/components/terminal/terminal-write-queue.ts b/src/components/terminal/terminal-write-queue.ts index d963f1e1..b66e7555 100644 --- a/src/components/terminal/terminal-write-queue.ts +++ b/src/components/terminal/terminal-write-queue.ts @@ -14,7 +14,6 @@ type TerminalWriteQueueArgs = { write: (data: string, onWritten?: () => void) => void onDrain?: () => void budgetMs?: number - replayBudgetMs?: number now?: () => number requestFrame?: (cb: FrameRequestCallback) => number cancelFrame?: (id: number) => void @@ -35,23 +34,17 @@ type TaskQueueItem = { type QueueItem = WriteQueueItem | TaskQueueItem -const DEFAULT_REPLAY_BUDGET_MS = 32 const MAX_COALESCED_REPLAY_WRITE_LENGTH = 256 * 1024 export function createTerminalWriteQueue(args: TerminalWriteQueueArgs): TerminalWriteQueue { const queue: QueueItem[] = [] const budgetMs = args.budgetMs ?? 8 - const replayBudgetMs = args.replayBudgetMs ?? Math.max(DEFAULT_REPLAY_BUDGET_MS, budgetMs) const now = args.now ?? (() => performance.now()) const requestFrame = args.requestFrame ?? ((cb) => requestAnimationFrame(cb)) const cancelFrame = args.cancelFrame ?? ((id) => cancelAnimationFrame(id)) let rafId: number | null = null let scheduled = false - const budgetForMode = (mode: TerminalWriteQueueMode | undefined) => ( - mode === 'replay' ? replayBudgetMs : budgetMs - ) - const runItem = (item: QueueItem) => { if (item.kind === 'task') { item.task() @@ -67,7 +60,7 @@ export function createTerminalWriteQueue(args: TerminalWriteQueueArgs): Terminal } const flush = () => { - const deadline = now() + budgetForMode(queue[0]?.mode) + const deadline = now() + budgetMs while (queue.length > 0 && now() <= deadline) { const next = queue.shift() if (next) runItem(next) diff --git a/test/server/ws-terminal-stream-v2-replay.test.ts b/test/server/ws-terminal-stream-v2-replay.test.ts index b5206adb..18701a32 100644 --- a/test/server/ws-terminal-stream-v2-replay.test.ts +++ b/test/server/ws-terminal-stream-v2-replay.test.ts @@ -408,9 +408,10 @@ describe('terminal stream v2 replay', () => { expect(ready.replayFromSeq).toBe(3) expect(ready.replayToSeq).toBe(4) expect(ready.attachRequestId).toBe(attachRequestId) - expect(replayed.length).toBe(2) + expect(replayed.length).toBe(1) expect(replayed[0]?.seqStart).toBe(3) - expect(replayed[1]?.seqEnd).toBe(4) + expect(replayed[0]?.seqEnd).toBe(4) + expect(replayed.map((frame) => frame.data).join('')).toBe('threefour') expect(replayed.every((frame) => frame.seqStart > 2)).toBe(true) expect(replayed.every((frame) => frame.attachRequestId === attachRequestId)).toBe(true) @@ -508,11 +509,11 @@ describe('terminal stream v2 replay', () => { await close1() const { ws: ws2, close: close2 } = await createAuthenticatedConnection(port) - const received: Array<{ type: string; seqStart?: number; seqEnd?: number }> = [] + const received: Array<{ type: string; seqStart?: number; seqEnd?: number; data?: string }> = [] const listener = (data: WebSocket.Data) => { const msg = JSON.parse(data.toString()) if (msg.terminalId !== terminalId) return - received.push({ type: msg.type, seqStart: msg.seqStart, seqEnd: msg.seqEnd }) + received.push({ type: msg.type, seqStart: msg.seqStart, seqEnd: msg.seqEnd, data: msg.data }) } ws2.on('message', listener) @@ -536,13 +537,10 @@ describe('terminal stream v2 replay', () => { expect(received[0]?.type).toBe('terminal.attach.ready') const replayed = received.filter((msg) => msg.type === 'terminal.output') - expect(replayed).toHaveLength(7) + expect(replayed).toHaveLength(1) expect(replayed[0]?.seqStart).toBe(6) expect(replayed[replayed.length - 1]?.seqEnd).toBe(12) - for (let i = 0; i < replayed.length; i += 1) { - expect(replayed[i]?.seqStart).toBe(6 + i) - expect(replayed[i]?.seqEnd).toBe(6 + i) - } + expect(replayed.map((msg) => msg.data ?? '').join('')).toBe('f6|f7|f8|f9|f10|f11|f12|') expect(received.some((msg) => msg.type === 'terminals.changed')).toBe(false) await close2() diff --git a/test/unit/client/components/terminal/terminal-write-queue.test.ts b/test/unit/client/components/terminal/terminal-write-queue.test.ts index de38572a..99045c94 100644 --- a/test/unit/client/components/terminal/terminal-write-queue.test.ts +++ b/test/unit/client/components/terminal/terminal-write-queue.test.ts @@ -126,7 +126,7 @@ describe('createTerminalWriteQueue', () => { expect(callbacks).toEqual(['A', 'B']) }) - it('uses the replay budget when draining replay work', () => { + it('keeps replay work on the normal frame budget', () => { const tasks: string[] = [] const rafCallbacks: FrameRequestCallback[] = [] let nowMs = 0 @@ -140,7 +140,6 @@ describe('createTerminalWriteQueue', () => { cancelFrame: () => {}, now: () => nowMs, budgetMs: 4, - replayBudgetMs: 12, }) queue.enqueueTask(() => { @@ -158,6 +157,16 @@ describe('createTerminalWriteQueue', () => { rafCallbacks.shift()?.(16) + expect(tasks).toEqual(['A']) + expect(rafCallbacks).toHaveLength(1) + + rafCallbacks.shift()?.(32) + + expect(tasks).toEqual(['A', 'B']) + expect(rafCallbacks).toHaveLength(1) + + rafCallbacks.shift()?.(48) + expect(tasks).toEqual(['A', 'B', 'C']) expect(rafCallbacks).toHaveLength(0) }) diff --git a/test/unit/server/terminal-stream/replay-ring.test.ts b/test/unit/server/terminal-stream/replay-ring.test.ts index ac89e5a1..d2486a47 100644 --- a/test/unit/server/terminal-stream/replay-ring.test.ts +++ b/test/unit/server/terminal-stream/replay-ring.test.ts @@ -53,7 +53,7 @@ describe('ReplayRing', () => { expect(replay.frames[1].seqEnd).toBe(3) }) - it('returns bounded replay batches without materializing the full replay window', () => { + it('returns coalesced bounded replay batches without materializing the full replay window', () => { const ring = new ReplayRing(1024) ring.append('aa') ring.append('bb') @@ -61,11 +61,48 @@ describe('ReplayRing', () => { ring.append('dd') const firstBatch = ring.replayBatchSince(0, 4, 4) - expect(firstBatch.frames.map((f) => f.data)).toEqual(['aa', 'bb']) + expect(firstBatch.frames).toHaveLength(1) + expect(firstBatch.frames[0]).toMatchObject({ + seqStart: 1, + seqEnd: 2, + data: 'aabb', + bytes: 4, + }) expect(firstBatch.missedFromSeq).toBeUndefined() const secondBatch = ring.replayBatchSince(firstBatch.frames.at(-1)?.seqEnd, 4, 4) - expect(secondBatch.frames.map((f) => f.data)).toEqual(['cc', 'dd']) + expect(secondBatch.frames).toHaveLength(1) + expect(secondBatch.frames[0]).toMatchObject({ + seqStart: 3, + seqEnd: 4, + data: 'ccdd', + bytes: 4, + }) + }) + + it('splits coalesced replay batches at the byte budget', () => { + const ring = new ReplayRing(1024) + ring.append('aaa') + ring.append('bbb') + ring.append('ccc') + + const firstBatch = ring.replayBatchSince(0, 6, 3) + expect(firstBatch.frames).toHaveLength(1) + expect(firstBatch.frames[0]).toMatchObject({ + seqStart: 1, + seqEnd: 2, + data: 'aaabbb', + bytes: 6, + }) + + const secondBatch = ring.replayBatchSince(firstBatch.frames[0].seqEnd, 6, 3) + expect(secondBatch.frames).toHaveLength(1) + expect(secondBatch.frames[0]).toMatchObject({ + seqStart: 3, + seqEnd: 3, + data: 'ccc', + bytes: 3, + }) }) it('reports replay miss when requested sequence is older than tail', () => { diff --git a/test/unit/server/ws-handler-backpressure.test.ts b/test/unit/server/ws-handler-backpressure.test.ts index 7efbf2d7..3bfd0c8c 100644 --- a/test/unit/server/ws-handler-backpressure.test.ts +++ b/test/unit/server/ws-handler-backpressure.test.ts @@ -461,6 +461,40 @@ describe('TerminalStreamBroker catastrophic bufferedAmount handling', () => { broker.close() }) + it('coalesces contiguous replay frames before sending terminal.output payloads', async () => { + const registry = new FakeBrokerRegistry() + const broker = new TerminalStreamBroker(registry as any, vi.fn()) + registry.createTerminal('term-replay-coalesced') + + for (let i = 1; i <= 1000; i += 1) { + registry.emit('terminal.output.raw', { + terminalId: 'term-replay-coalesced', + data: `f${i};`, + at: Date.now(), + }) + } + + const wsReplay = createMockWs() + await broker.attach(wsReplay as any, 'term-replay-coalesced', 'transport_reconnect', 80, 24, 0, 'replay-attach') + vi.advanceTimersByTime(5) + + const outputs = wsReplay.send.mock.calls + .map(([raw]) => (typeof raw === 'string' ? JSON.parse(raw) : raw)) + .filter((payload) => payload?.type === 'terminal.output') + + expect(outputs).toHaveLength(1) + expect(outputs[0]).toMatchObject({ + attachRequestId: 'replay-attach', + seqStart: 1, + seqEnd: 1000, + }) + expect(outputs[0].data).toContain('f1;') + expect(outputs[0].data).toContain('f1000;') + expect(Buffer.byteLength(outputs[0].data, 'utf8')).toBeLessThanOrEqual(MAX_REALTIME_MESSAGE_BYTES) + + broker.close() + }) + it('drains foreground replay batches without the background pacing delay', async () => { const registry = new FakeBrokerRegistry() const broker = new TerminalStreamBroker(registry as any, vi.fn())