Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion server/terminal-stream/replay-ring.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,16 @@ export class ReplayRing {
const frame = this.frames[i]
if (frame.seqStart > normalizedToSeq) break
if (frame.bytes > budget && frames.length > 0) break
frames.push(frame)

const previous = frames[frames.length - 1]
if (previous && frame.seqStart === previous.seqEnd + 1) {
previous.seqEnd = frame.seqEnd
previous.data += frame.data
previous.bytes += frame.bytes
previous.at = frame.at
} else {
frames.push({ ...frame })
}
budget -= frame.bytes
if (budget <= 0) break
}
Expand Down
9 changes: 1 addition & 8 deletions src/components/terminal/terminal-write-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ type TerminalWriteQueueArgs = {
write: (data: string, onWritten?: () => void) => void
onDrain?: () => void
budgetMs?: number
replayBudgetMs?: number
now?: () => number
requestFrame?: (cb: FrameRequestCallback) => number
cancelFrame?: (id: number) => void
Expand All @@ -35,23 +34,17 @@ type TaskQueueItem = {

type QueueItem = WriteQueueItem | TaskQueueItem

const DEFAULT_REPLAY_BUDGET_MS = 32
const MAX_COALESCED_REPLAY_WRITE_LENGTH = 256 * 1024

export function createTerminalWriteQueue(args: TerminalWriteQueueArgs): TerminalWriteQueue {
const queue: QueueItem[] = []
const budgetMs = args.budgetMs ?? 8
const replayBudgetMs = args.replayBudgetMs ?? Math.max(DEFAULT_REPLAY_BUDGET_MS, budgetMs)
const now = args.now ?? (() => performance.now())
const requestFrame = args.requestFrame ?? ((cb) => requestAnimationFrame(cb))
const cancelFrame = args.cancelFrame ?? ((id) => cancelAnimationFrame(id))
let rafId: number | null = null
let scheduled = false

const budgetForMode = (mode: TerminalWriteQueueMode | undefined) => (
mode === 'replay' ? replayBudgetMs : budgetMs
)

const runItem = (item: QueueItem) => {
if (item.kind === 'task') {
item.task()
Expand All @@ -67,7 +60,7 @@ export function createTerminalWriteQueue(args: TerminalWriteQueueArgs): Terminal
}

const flush = () => {
const deadline = now() + budgetForMode(queue[0]?.mode)
const deadline = now() + budgetMs
while (queue.length > 0 && now() <= deadline) {
const next = queue.shift()
if (next) runItem(next)
Expand Down
16 changes: 7 additions & 9 deletions test/server/ws-terminal-stream-v2-replay.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,10 @@ describe('terminal stream v2 replay', () => {
expect(ready.replayFromSeq).toBe(3)
expect(ready.replayToSeq).toBe(4)
expect(ready.attachRequestId).toBe(attachRequestId)
expect(replayed.length).toBe(2)
expect(replayed.length).toBe(1)
expect(replayed[0]?.seqStart).toBe(3)
expect(replayed[1]?.seqEnd).toBe(4)
expect(replayed[0]?.seqEnd).toBe(4)
expect(replayed.map((frame) => frame.data).join('')).toBe('threefour')
expect(replayed.every((frame) => frame.seqStart > 2)).toBe(true)
expect(replayed.every((frame) => frame.attachRequestId === attachRequestId)).toBe(true)

Expand Down Expand Up @@ -508,11 +509,11 @@ describe('terminal stream v2 replay', () => {
await close1()

const { ws: ws2, close: close2 } = await createAuthenticatedConnection(port)
const received: Array<{ type: string; seqStart?: number; seqEnd?: number }> = []
const received: Array<{ type: string; seqStart?: number; seqEnd?: number; data?: string }> = []
const listener = (data: WebSocket.Data) => {
const msg = JSON.parse(data.toString())
if (msg.terminalId !== terminalId) return
received.push({ type: msg.type, seqStart: msg.seqStart, seqEnd: msg.seqEnd })
received.push({ type: msg.type, seqStart: msg.seqStart, seqEnd: msg.seqEnd, data: msg.data })
}
ws2.on('message', listener)

Expand All @@ -536,13 +537,10 @@ describe('terminal stream v2 replay', () => {
expect(received[0]?.type).toBe('terminal.attach.ready')

const replayed = received.filter((msg) => msg.type === 'terminal.output')
expect(replayed).toHaveLength(7)
expect(replayed).toHaveLength(1)
expect(replayed[0]?.seqStart).toBe(6)
expect(replayed[replayed.length - 1]?.seqEnd).toBe(12)
for (let i = 0; i < replayed.length; i += 1) {
expect(replayed[i]?.seqStart).toBe(6 + i)
expect(replayed[i]?.seqEnd).toBe(6 + i)
}
expect(replayed.map((msg) => msg.data ?? '').join('')).toBe('f6|f7|f8|f9|f10|f11|f12|')
expect(received.some((msg) => msg.type === 'terminals.changed')).toBe(false)

await close2()
Expand Down
13 changes: 11 additions & 2 deletions test/unit/client/components/terminal/terminal-write-queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ describe('createTerminalWriteQueue', () => {
expect(callbacks).toEqual(['A', 'B'])
})

it('uses the replay budget when draining replay work', () => {
it('keeps replay work on the normal frame budget', () => {
const tasks: string[] = []
const rafCallbacks: FrameRequestCallback[] = []
let nowMs = 0
Expand All @@ -140,7 +140,6 @@ describe('createTerminalWriteQueue', () => {
cancelFrame: () => {},
now: () => nowMs,
budgetMs: 4,
replayBudgetMs: 12,
})

queue.enqueueTask(() => {
Expand All @@ -158,6 +157,16 @@ describe('createTerminalWriteQueue', () => {

rafCallbacks.shift()?.(16)

expect(tasks).toEqual(['A'])
expect(rafCallbacks).toHaveLength(1)

rafCallbacks.shift()?.(32)

expect(tasks).toEqual(['A', 'B'])
expect(rafCallbacks).toHaveLength(1)

rafCallbacks.shift()?.(48)

expect(tasks).toEqual(['A', 'B', 'C'])
expect(rafCallbacks).toHaveLength(0)
})
Expand Down
43 changes: 40 additions & 3 deletions test/unit/server/terminal-stream/replay-ring.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,56 @@ describe('ReplayRing', () => {
expect(replay.frames[1].seqEnd).toBe(3)
})

it('returns bounded replay batches without materializing the full replay window', () => {
it('returns coalesced bounded replay batches without materializing the full replay window', () => {
const ring = new ReplayRing(1024)
ring.append('aa')
ring.append('bb')
ring.append('cc')
ring.append('dd')

const firstBatch = ring.replayBatchSince(0, 4, 4)
expect(firstBatch.frames.map((f) => f.data)).toEqual(['aa', 'bb'])
expect(firstBatch.frames).toHaveLength(1)
expect(firstBatch.frames[0]).toMatchObject({
seqStart: 1,
seqEnd: 2,
data: 'aabb',
bytes: 4,
})
expect(firstBatch.missedFromSeq).toBeUndefined()

const secondBatch = ring.replayBatchSince(firstBatch.frames.at(-1)?.seqEnd, 4, 4)
expect(secondBatch.frames.map((f) => f.data)).toEqual(['cc', 'dd'])
expect(secondBatch.frames).toHaveLength(1)
expect(secondBatch.frames[0]).toMatchObject({
seqStart: 3,
seqEnd: 4,
data: 'ccdd',
bytes: 4,
})
})

it('splits coalesced replay batches at the byte budget', () => {
const ring = new ReplayRing(1024)
ring.append('aaa')
ring.append('bbb')
ring.append('ccc')

const firstBatch = ring.replayBatchSince(0, 6, 3)
expect(firstBatch.frames).toHaveLength(1)
expect(firstBatch.frames[0]).toMatchObject({
seqStart: 1,
seqEnd: 2,
data: 'aaabbb',
bytes: 6,
})

const secondBatch = ring.replayBatchSince(firstBatch.frames[0].seqEnd, 6, 3)
expect(secondBatch.frames).toHaveLength(1)
expect(secondBatch.frames[0]).toMatchObject({
seqStart: 3,
seqEnd: 3,
data: 'ccc',
bytes: 3,
})
})

it('reports replay miss when requested sequence is older than tail', () => {
Expand Down
34 changes: 34 additions & 0 deletions test/unit/server/ws-handler-backpressure.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,40 @@ describe('TerminalStreamBroker catastrophic bufferedAmount handling', () => {
broker.close()
})

it('coalesces contiguous replay frames before sending terminal.output payloads', async () => {
const registry = new FakeBrokerRegistry()
const broker = new TerminalStreamBroker(registry as any, vi.fn())
registry.createTerminal('term-replay-coalesced')

for (let i = 1; i <= 1000; i += 1) {
registry.emit('terminal.output.raw', {
terminalId: 'term-replay-coalesced',
data: `f${i};`,
at: Date.now(),
})
}

const wsReplay = createMockWs()
await broker.attach(wsReplay as any, 'term-replay-coalesced', 'transport_reconnect', 80, 24, 0, 'replay-attach')
vi.advanceTimersByTime(5)

const outputs = wsReplay.send.mock.calls
.map(([raw]) => (typeof raw === 'string' ? JSON.parse(raw) : raw))
.filter((payload) => payload?.type === 'terminal.output')

expect(outputs).toHaveLength(1)
expect(outputs[0]).toMatchObject({
attachRequestId: 'replay-attach',
seqStart: 1,
seqEnd: 1000,
})
expect(outputs[0].data).toContain('f1;')
expect(outputs[0].data).toContain('f1000;')
expect(Buffer.byteLength(outputs[0].data, 'utf8')).toBeLessThanOrEqual(MAX_REALTIME_MESSAGE_BYTES)

broker.close()
})

it('drains foreground replay batches without the background pacing delay', async () => {
const registry = new FakeBrokerRegistry()
const broker = new TerminalStreamBroker(registry as any, vi.fn())
Expand Down
Loading