From 0e89ca351321030ae033554b185f624c24592a5f Mon Sep 17 00:00:00 2001 From: Codex Date: Wed, 10 Jun 2026 10:33:25 -0700 Subject: [PATCH] Ack duplicate Codex interrupts after completion --- .../codex-app-server/remote-proxy.ts | 71 +++++++++++++++++-- .../codex-app-server/remote-proxy.test.ts | 53 ++++++++++++++ 2 files changed, 120 insertions(+), 4 deletions(-) diff --git a/server/coding-cli/codex-app-server/remote-proxy.ts b/server/coding-cli/codex-app-server/remote-proxy.ts index ec6ccc9cd..e553d65c7 100644 --- a/server/coding-cli/codex-app-server/remote-proxy.ts +++ b/server/coding-cli/codex-app-server/remote-proxy.ts @@ -3,6 +3,7 @@ import { allocateLocalhostPort, type LoopbackServerEndpoint } from '../../local- import { CodexFsChangedNotificationSchema, CodexThreadLifecycleNotificationSchema, + CodexTurnInterruptParamsSchema, CodexTurnCompletedNotificationSchema, CodexTurnStartedNotificationSchema, type CodexThreadHandle, @@ -47,6 +48,7 @@ type CodexRemoteProxyOptions = { const DEFAULT_REQUEST_HOLD_TIMEOUT_MS = 5_000 const DEFAULT_CANDIDATE_CAPTURE_TIMEOUT_MS = 45_000 +const MAX_COMPLETED_TURN_KEYS = 256 export class CodexRemoteProxy { private readonly upstreamWsUrl: string @@ -67,6 +69,8 @@ export class CodexRemoteProxy { private readonly repairTriggerHandlers = new Set<(event: CodexRemoteProxyRepairTrigger) => void>() private readonly lifecycleHandlers = new Set<(event: CodexThreadLifecycleEvent) => void>() private readonly lifecycleLossHandlers = new Set<(event: CodexThreadLifecycleLossEvent) => void>() + private readonly activeTurnKeys = new Set() + private readonly completedTurnKeys = new Set() constructor(options: CodexRemoteProxyOptions) { this.upstreamWsUrl = options.upstreamWsUrl @@ -260,16 +264,31 @@ export class CodexRemoteProxy { const parsed = parseJson(raw) const method = parsed && typeof parsed === 'object' ? (parsed as Record).method : undefined const id = jsonRpcId(parsed) - if (id !== undefined && typeof method === 'string') { - connection.pendingMethods.set(id, method) - } if (typeof method === 'string') { log.debug({ proxyWsUrl: this.endpoint ? this.wsUrl : undefined, upstreamWsUrl: this.upstreamWsUrl, method, id, - }, 'Codex remote proxy forwarding client request') + }, 'Codex remote proxy received client request') + } + + const completedTurnInterrupt = this.completedTurnInterrupt(parsed) + if (id !== undefined && completedTurnInterrupt) { + log.info({ + proxyWsUrl: this.endpoint ? this.wsUrl : undefined, + upstreamWsUrl: this.upstreamWsUrl, + method, + id, + threadId: completedTurnInterrupt.threadId, + turnId: completedTurnInterrupt.turnId, + }, 'Codex remote proxy acknowledged interrupt for completed turn') + this.sendJsonRpcSuccess(connection.client, id, {}) + return + } + + if (id !== undefined && typeof method === 'string') { + connection.pendingMethods.set(id, method) } if (this.requireCandidatePersistence && method === 'turn/start' && !this.candidatePersisted) { @@ -348,12 +367,14 @@ export class CodexRemoteProxy { const turnStarted = CodexTurnStartedNotificationSchema.safeParse(parsed) if (turnStarted.success) { + this.recordTurnStarted(turnStarted.data.params) this.emitTurnEvent(this.turnStartedHandlers, turnStarted.data.params) return } const turnCompleted = CodexTurnCompletedNotificationSchema.safeParse(parsed) if (turnCompleted.success) { + this.recordTurnCompleted(turnCompleted.data.params) this.emitTurnEvent(this.turnCompletedHandlers, turnCompleted.data.params) return } @@ -430,6 +451,10 @@ export class CodexRemoteProxy { })) } + private sendJsonRpcSuccess(client: WebSocket, id: JsonRpcId, result: Record): void { + sendIfOpen(client, JSON.stringify({ id, result })) + } + private ensureCandidateCaptureTimer(): void { if (!this.requireCandidatePersistence) return if (this.candidatePersisted || this.candidateCaptureTimer) return @@ -467,6 +492,40 @@ export class CodexRemoteProxy { } } + private recordTurnStarted(params: { threadId: string; turnId?: string }): void { + if (typeof params.turnId !== 'string') return + const key = turnKey(params.threadId, params.turnId) + this.activeTurnKeys.add(key) + this.completedTurnKeys.delete(key) + } + + private recordTurnCompleted(params: { threadId: string; turnId?: string }): void { + if (typeof params.turnId !== 'string') return + const key = turnKey(params.threadId, params.turnId) + this.activeTurnKeys.delete(key) + this.rememberCompletedTurnKey(key) + } + + private rememberCompletedTurnKey(key: string): void { + this.completedTurnKeys.delete(key) + this.completedTurnKeys.add(key) + while (this.completedTurnKeys.size > MAX_COMPLETED_TURN_KEYS) { + const oldest = this.completedTurnKeys.values().next().value + if (typeof oldest !== 'string') return + this.completedTurnKeys.delete(oldest) + } + } + + private completedTurnInterrupt(parsed: unknown): { threadId: string; turnId: string } | undefined { + if (!parsed || typeof parsed !== 'object') return undefined + const message = parsed as Record + if (message.method !== 'turn/interrupt') return undefined + const params = CodexTurnInterruptParamsSchema.safeParse(message.params) + if (!params.success) return undefined + const key = turnKey(params.data.threadId, params.data.turnId) + return this.completedTurnKeys.has(key) && !this.activeTurnKeys.has(key) ? params.data : undefined + } + private emitRepairTrigger(event: CodexRemoteProxyRepairTrigger): void { for (const handler of this.repairTriggerHandlers) { handler(event) @@ -514,6 +573,10 @@ function sendIfOpen(socket: WebSocket, data: WebSocket.RawData | string): void { } } +function turnKey(threadId: string, turnId: string): string { + return `${threadId}\u0000${turnId}` +} + function normalizeCandidateThread(thread: unknown): CodexThreadHandle | undefined { if (!thread || typeof thread !== 'object') return undefined const candidate = thread as Record diff --git a/test/unit/server/coding-cli/codex-app-server/remote-proxy.test.ts b/test/unit/server/coding-cli/codex-app-server/remote-proxy.test.ts index 9daeac6be..c3e7dc081 100644 --- a/test/unit/server/coding-cli/codex-app-server/remote-proxy.test.ts +++ b/test/unit/server/coding-cli/codex-app-server/remote-proxy.test.ts @@ -81,6 +81,15 @@ function nextMessage(socket: WebSocket): Promise { }) } +function nextMessageWithin(socket: WebSocket, ms: number): Promise { + return Promise.race([ + nextMessage(socket), + delay(ms).then(() => { + throw new Error(`Timed out waiting ${ms}ms for websocket message.`) + }), + ]) +} + function nextMessageFrame(socket: WebSocket): Promise<{ message: any; isBinary: boolean }> { return new Promise((resolve) => { socket.once('message', (raw, isBinary) => resolve({ @@ -336,4 +345,48 @@ describe('CodexRemoteProxy', () => { params: { threadId: 'thread-1', turnId: 'turn-1', status: 'completed' }, }) }) + + it('acks duplicate turn/interrupt after the turn already completed', async () => { + const interruptRequests: unknown[] = [] + const upstream = await startUpstream((socket, message) => { + if (message.method !== 'turn/interrupt') return + interruptRequests.push(message) + if (interruptRequests.length !== 1) return + + socket.send(JSON.stringify({ id: message.id, result: {} })) + socket.send(JSON.stringify({ + method: 'thread/status/changed', + params: { threadId: 'thread-1', status: { type: 'idle' } }, + })) + socket.send(JSON.stringify({ + method: 'turn/completed', + params: { threadId: 'thread-1', turnId: 'turn-1' }, + })) + }) + const proxy = await startProxy(upstream.wsUrl, { + requireCandidatePersistence: false, + }) + const completed = new Promise((resolve) => { + proxy.onTurnCompleted((event) => resolve(event)) + }) + const tui = await connect(proxy.wsUrl) + + tui.send(JSON.stringify({ + id: 1, + method: 'turn/interrupt', + params: { threadId: 'thread-1', turnId: 'turn-1' }, + })) + await expect(nextMessageWithin(tui, 100)).resolves.toEqual({ id: 1, result: {} }) + await expect(completed).resolves.toMatchObject({ threadId: 'thread-1', turnId: 'turn-1' }) + + tui.send(JSON.stringify({ + id: 2, + method: 'turn/interrupt', + params: { threadId: 'thread-1', turnId: 'turn-1' }, + })) + + await expect(nextMessageWithin(tui, 50)).resolves.toEqual({ id: 2, result: {} }) + await delay(25) + expect(interruptRequests).toHaveLength(1) + }) })