diff --git a/src/components/TerminalView.tsx b/src/components/TerminalView.tsx index f723d8b16..619641eb8 100644 --- a/src/components/TerminalView.tsx +++ b/src/components/TerminalView.tsx @@ -89,7 +89,11 @@ import { type TerminalRuntime, } from '@/components/terminal/terminal-runtime' import { createLayoutScheduler } from '@/components/terminal/layout-scheduler' -import { createTerminalWriteQueue, type TerminalWriteQueue } from '@/components/terminal/terminal-write-queue' +import { + createTerminalWriteQueue, + type TerminalWriteQueue, + type TerminalWriteQueueOptions, +} from '@/components/terminal/terminal-write-queue' import { nanoid } from 'nanoid' import { cn } from '@/lib/utils' import { Terminal } from '@xterm/xterm' @@ -992,11 +996,11 @@ function TerminalView({ tabId, paneId, paneContent, hidden }: TerminalViewProps) } }, [suppressNetworkEffects, ws]) - const enqueueTerminalWrite = useCallback((data: string, onWritten?: () => void) => { + const enqueueTerminalWrite = useCallback((data: string, onWritten?: () => void, options?: TerminalWriteQueueOptions) => { if (!data) return const queue = writeQueueRef.current if (queue) { - queue.enqueue(data, onWritten) + queue.enqueue(data, onWritten, options) return } const term = termRef.current @@ -1069,6 +1073,7 @@ function TerminalView({ tabId, paneId, paneContent, hidden }: TerminalViewProps) tid: string | undefined, allowReplies: boolean, onRendered?: () => void, + writeOptions?: TerminalWriteQueueOptions, ) => { const startup = extractTerminalStartupProbes(raw, startupProbeStateRef.current, { foreground: resolvedThemeRef.current.foreground, @@ -1098,7 +1103,7 @@ function TerminalView({ tabId, paneId, paneContent, hidden }: TerminalViewProps) } if (cleaned) { - enqueueTerminalWrite(cleaned, onRendered) + enqueueTerminalWrite(cleaned, onRendered, writeOptions) } else { onRendered?.() } @@ -2172,7 +2177,14 @@ function TerminalView({ tabId, paneId, paneContent, hidden }: TerminalViewProps) markAttachComplete() } } - handleTerminalOutput(raw, mode, tid, !frameOverlapsReplay, completeRenderedFrame) + handleTerminalOutput( + raw, + mode, + tid, + !frameOverlapsReplay, + completeRenderedFrame, + { mode: frameOverlapsReplay ? 'replay' : 'live' }, + ) if (completedAttachOnFrame && frameOverlapsReplay) { resetStartupProbeParser({ discardReplayRemainder: true }) } diff --git a/src/components/terminal/terminal-write-queue.ts b/src/components/terminal/terminal-write-queue.ts index 9081fa7d9..d963f1e15 100644 --- a/src/components/terminal/terminal-write-queue.ts +++ b/src/components/terminal/terminal-write-queue.ts @@ -1,32 +1,76 @@ export type TerminalWriteQueue = { - enqueue: (data: string, onWritten?: () => void) => void - enqueueTask: (task: () => void) => void + enqueue: (data: string, onWritten?: () => void, options?: TerminalWriteQueueOptions) => void + enqueueTask: (task: () => void, options?: TerminalWriteQueueOptions) => void clear: () => void } +export type TerminalWriteQueueMode = 'live' | 'replay' + +export type TerminalWriteQueueOptions = { + mode?: TerminalWriteQueueMode +} + type TerminalWriteQueueArgs = { write: (data: string, onWritten?: () => void) => void onDrain?: () => void budgetMs?: number + replayBudgetMs?: number now?: () => number requestFrame?: (cb: FrameRequestCallback) => number cancelFrame?: (id: number) => void } +type WriteQueueItem = { + kind: 'write' + mode: TerminalWriteQueueMode + data: string + callbacks: Array<() => void> +} + +type TaskQueueItem = { + kind: 'task' + mode: TerminalWriteQueueMode + task: () => void +} + +type QueueItem = WriteQueueItem | TaskQueueItem + +const DEFAULT_REPLAY_BUDGET_MS = 32 +const MAX_COALESCED_REPLAY_WRITE_LENGTH = 256 * 1024 + export function createTerminalWriteQueue(args: TerminalWriteQueueArgs): TerminalWriteQueue { - const queue: Array<() => void> = [] + const queue: QueueItem[] = [] const budgetMs = args.budgetMs ?? 8 + const replayBudgetMs = args.replayBudgetMs ?? Math.max(DEFAULT_REPLAY_BUDGET_MS, budgetMs) const now = args.now ?? (() => performance.now()) const requestFrame = args.requestFrame ?? ((cb) => requestAnimationFrame(cb)) const cancelFrame = args.cancelFrame ?? ((id) => cancelAnimationFrame(id)) let rafId: number | null = null let scheduled = false + const budgetForMode = (mode: TerminalWriteQueueMode | undefined) => ( + mode === 'replay' ? replayBudgetMs : budgetMs + ) + + const runItem = (item: QueueItem) => { + if (item.kind === 'task') { + item.task() + return + } + + const onWritten = item.callbacks.length > 0 + ? () => { + for (const callback of item.callbacks) callback() + } + : undefined + args.write(item.data, onWritten) + } + const flush = () => { - const deadline = now() + budgetMs + const deadline = now() + budgetForMode(queue[0]?.mode) while (queue.length > 0 && now() <= deadline) { const next = queue.shift() - next?.() + if (next) runItem(next) } if (queue.length > 0) { scheduleFlush() @@ -46,13 +90,26 @@ export function createTerminalWriteQueue(args: TerminalWriteQueueArgs): Terminal } return { - enqueue(data, onWritten) { + enqueue(data, onWritten, options) { if (!data) return - queue.push(() => args.write(data, onWritten)) + const mode = options?.mode ?? 'live' + const callbacks = onWritten ? [onWritten] : [] + const previous = queue[queue.length - 1] + if ( + mode === 'replay' + && previous?.kind === 'write' + && previous.mode === 'replay' + && previous.data.length + data.length <= MAX_COALESCED_REPLAY_WRITE_LENGTH + ) { + previous.data += data + previous.callbacks.push(...callbacks) + } else { + queue.push({ kind: 'write', mode, data, callbacks }) + } scheduleFlush() }, - enqueueTask(task) { - queue.push(task) + enqueueTask(task, options) { + queue.push({ kind: 'task', mode: options?.mode ?? 'live', task }) scheduleFlush() }, clear() { diff --git a/test/e2e/terminal-create-attach-ordering.test.tsx b/test/e2e/terminal-create-attach-ordering.test.tsx index 2caaabaa2..1490f198e 100644 --- a/test/e2e/terminal-create-attach-ordering.test.tsx +++ b/test/e2e/terminal-create-attach-ordering.test.tsx @@ -354,6 +354,85 @@ describe('terminal create/attach ordering (e2e)', () => { expect(writes).toContain('hidden-r8') }) + it('coalesces attach replay writes that arrive before the frame drain', async () => { + const rafCallbacks: FrameRequestCallback[] = [] + vi.mocked(window.requestAnimationFrame).mockImplementation((cb: FrameRequestCallback) => { + rafCallbacks.push(cb) + return rafCallbacks.length + }) + const store = createStore({ + status: 'running', + requestId: 'req-order-coalesce', + terminalId: 'term-order-coalesce', + }) + + render( + + , + ) + + await waitFor(() => { + expect(lastSent('terminal.attach', 'term-order-coalesce')).toMatchObject({ + type: 'terminal.attach', + terminalId: 'term-order-coalesce', + attachRequestId: expect.any(String), + }) + }) + rafCallbacks.length = 0 + + const attach = lastSent('terminal.attach', 'term-order-coalesce') + wsHarness.emit({ + type: 'terminal.attach.ready', + terminalId: 'term-order-coalesce', + headSeq: 3, + replayFromSeq: 1, + replayToSeq: 3, + attachRequestId: attach.attachRequestId, + }) + wsHarness.emit({ + type: 'terminal.output', + terminalId: 'term-order-coalesce', + seqStart: 1, + seqEnd: 1, + data: 'replay-1', + attachRequestId: attach.attachRequestId, + }) + wsHarness.emit({ + type: 'terminal.output', + terminalId: 'term-order-coalesce', + seqStart: 2, + seqEnd: 2, + data: 'replay-2', + attachRequestId: attach.attachRequestId, + }) + wsHarness.emit({ + type: 'terminal.output', + terminalId: 'term-order-coalesce', + seqStart: 3, + seqEnd: 3, + data: 'replay-3', + attachRequestId: attach.attachRequestId, + }) + + expect(terminalInstances[0].write).not.toHaveBeenCalled() + + act(() => { + while (rafCallbacks.length > 0) { + rafCallbacks.shift()?.(16) + } + }) + + expect(terminalInstances[0].write.mock.calls.map(([data]) => String(data))).toEqual([ + 'replay-1replay-2replay-3', + ]) + }) + it('reconnect path drops stale frames from the old attach generation and accepts the new one only after ready', async () => { const store = createStore({ status: 'running', diff --git a/test/unit/client/components/terminal/terminal-write-queue.test.ts b/test/unit/client/components/terminal/terminal-write-queue.test.ts index d94355468..de38572a4 100644 --- a/test/unit/client/components/terminal/terminal-write-queue.test.ts +++ b/test/unit/client/components/terminal/terminal-write-queue.test.ts @@ -98,4 +98,67 @@ describe('createTerminalWriteQueue', () => { expect(writes).toEqual(['A', 'B', 'C']) expect(rafCallbacks).toHaveLength(0) }) + + it('coalesces adjacent writes and preserves write callbacks', () => { + const writes: string[] = [] + const callbacks: string[] = [] + const rafCallbacks: FrameRequestCallback[] = [] + + const queue = createTerminalWriteQueue({ + write: (chunk, onWritten) => { + writes.push(chunk) + onWritten?.() + }, + requestFrame: (cb) => { + rafCallbacks.push(cb) + return rafCallbacks.length + }, + cancelFrame: () => {}, + }) + + queue.enqueue('A', () => callbacks.push('A'), { mode: 'replay' }) + queue.enqueue('B', () => callbacks.push('B'), { mode: 'replay' }) + queue.enqueue('C', undefined, { mode: 'replay' }) + + rafCallbacks.shift()?.(16) + + expect(writes).toEqual(['ABC']) + expect(callbacks).toEqual(['A', 'B']) + }) + + it('uses the replay budget when draining replay work', () => { + const tasks: string[] = [] + const rafCallbacks: FrameRequestCallback[] = [] + let nowMs = 0 + + const queue = createTerminalWriteQueue({ + write: () => {}, + requestFrame: (cb) => { + rafCallbacks.push(cb) + return rafCallbacks.length + }, + cancelFrame: () => {}, + now: () => nowMs, + budgetMs: 4, + replayBudgetMs: 12, + }) + + queue.enqueueTask(() => { + tasks.push('A') + nowMs += 5 + }, { mode: 'replay' }) + queue.enqueueTask(() => { + tasks.push('B') + nowMs += 5 + }, { mode: 'replay' }) + queue.enqueueTask(() => { + tasks.push('C') + nowMs += 5 + }, { mode: 'replay' }) + + rafCallbacks.shift()?.(16) + + expect(tasks).toEqual(['A', 'B', 'C']) + expect(rafCallbacks).toHaveLength(0) + }) })