Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 38 additions & 13 deletions server/coding-cli/codex-app-server/launch-planner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,28 @@ type PlanCreateInput = {
approvalPolicy?: string
}

type CodexLaunchProxyOptions = {
upstreamWsUrl: string
requireCandidatePersistence?: boolean
}

type CodexLaunchProxy = Pick<
CodexRemoteProxy,
| 'start'
| 'close'
| 'markCandidatePersisted'
| 'onCandidate'
| 'onTurnStarted'
| 'onTurnCompleted'
| 'onRepairTrigger'
| 'onThreadLifecycle'
| 'onLifecycleLoss'
>

type CodexLaunchPlannerOptions = {
proxyFactory?: (options: CodexLaunchProxyOptions) => CodexLaunchProxy
}

function errorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error)
}
Expand All @@ -82,13 +104,18 @@ export class CodexLaunchPlanner {
private readonly activeSidecars = new Set<CodexLaunchSidecar>()
private readonly failedSidecarShutdowns = new Set<CodexLaunchSidecar>()
private readonly runtimeFactory: () => CodexRuntimeLike
private readonly proxyFactory: (options: CodexLaunchProxyOptions) => CodexLaunchProxy
private shutdownStarted = false
private shutdownPromise: Promise<void> | null = null

constructor(runtimeOrFactory: CodexRuntimeLike | (() => CodexRuntimeLike)) {
constructor(
runtimeOrFactory: CodexRuntimeLike | (() => CodexRuntimeLike),
options: CodexLaunchPlannerOptions = {},
) {
this.runtimeFactory = typeof runtimeOrFactory === 'function'
? runtimeOrFactory
: () => runtimeOrFactory
this.proxyFactory = options.proxyFactory ?? ((proxyOptions) => new CodexRemoteProxy(proxyOptions))
}

async planCreate(input: PlanCreateInput): Promise<CodexLaunchPlan> {
Expand All @@ -97,14 +124,14 @@ export class CodexLaunchPlanner {
this.assertAcceptingPlans()

const runtime = this.runtimeFactory()
let proxy: CodexRemoteProxy | undefined
let proxy: CodexLaunchProxy | undefined
const sidecar = this.createSidecar(runtime, () => proxy)
this.activeSidecars.add(sidecar)

try {
if (input.resumeSessionId) {
const ready = await runtime.ensureReady(input.cwd)
proxy = new CodexRemoteProxy({
proxy = this.proxyFactory({
upstreamWsUrl: ready.wsUrl,
requireCandidatePersistence: false,
})
Expand All @@ -120,7 +147,7 @@ export class CodexLaunchPlanner {
}

const ready = await runtime.ensureReady(input.cwd)
proxy = new CodexRemoteProxy({ upstreamWsUrl: ready.wsUrl })
proxy = this.proxyFactory({ upstreamWsUrl: ready.wsUrl })
const proxyReady = await proxy.start()
this.assertAcceptingPlans()

Expand Down Expand Up @@ -187,14 +214,10 @@ export class CodexLaunchPlanner {
}
}

private createSidecar(runtime: CodexRuntimeLike, getProxy: () => CodexRemoteProxy | undefined): CodexLaunchSidecar {
private createSidecar(runtime: CodexRuntimeLike, getProxy: () => CodexLaunchProxy | undefined): CodexLaunchSidecar {
let shutdownPromise: Promise<void> | null = null
let shutdownAttemptStarted = false
let shutdownSucceeded = false
let notifyShutdownStarted!: () => void
const shutdownStarted = new Promise<void>((resolve) => {
notifyShutdownStarted = resolve
})
const assertAdoptable = () => {
if (this.shutdownStarted || shutdownAttemptStarted) {
throw new Error('Codex launch sidecar is shutting down; it cannot be adopted.')
Expand Down Expand Up @@ -255,12 +278,14 @@ export class CodexLaunchPlanner {
}
if (!shutdownAttemptStarted) {
shutdownAttemptStarted = true
notifyShutdownStarted()
}
const attempt = Promise.resolve()
.then(async () => {
await getProxy()?.close()
await runtime.shutdown()
.then(() => {
const proxy = getProxy()
return waitForAllSettledOrThrow([
...(proxy ? [proxy.close()] : []),
runtime.shutdown(),
], 'Codex launch sidecar shutdown failed.')
})
.then(() => {
shutdownSucceeded = true
Expand Down
101 changes: 87 additions & 14 deletions test/unit/server/coding-cli/codex-app-server/launch-planner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,63 @@ class FakeRuntime {
}
}

type FakeProxyOptions = {
upstreamWsUrl: string
requireCandidatePersistence?: boolean
}

class FakeProxy {
private static nextPort = 54000

closeCalls = 0
closeBlocker?: Promise<void>
closeError?: Error
readonly wsUrl: string
readonly upstreamWsUrl: string
readonly requireCandidatePersistence: boolean

constructor(options: FakeProxyOptions) {
this.upstreamWsUrl = options.upstreamWsUrl
this.requireCandidatePersistence = options.requireCandidatePersistence ?? true
this.wsUrl = `ws://127.0.0.1:${FakeProxy.nextPort++}`
}

async start() {
return { wsUrl: this.wsUrl }
}

async close() {
this.closeCalls += 1
await this.closeBlocker
if (this.closeError) throw this.closeError
}

markCandidatePersisted() {}
onCandidate() { return () => undefined }
onTurnStarted() { return () => undefined }
onTurnCompleted() { return () => undefined }
onRepairTrigger() { return () => undefined }
onThreadLifecycle() { return () => undefined }
onLifecycleLoss() { return () => undefined }
}

function createPlanner(
runtimeOrFactory: ConstructorParameters<typeof CodexLaunchPlanner>[0],
proxies: FakeProxy[] = [],
) {
return new CodexLaunchPlanner(runtimeOrFactory, {
proxyFactory: (options: FakeProxyOptions) => {
const proxy = new FakeProxy(options)
proxies.push(proxy)
return proxy as any
},
})
}

describe('CodexLaunchPlanner', () => {
it('creates a distinct owned sidecar for each launch plan', async () => {
const runtimes: FakeRuntime[] = []
const planner = new CodexLaunchPlanner(() => {
const planner = createPlanner(() => {
const index = runtimes.length + 1
const runtime = new FakeRuntime(`ws://127.0.0.1:${43000 + index}`, `thread-${index}`)
runtimes.push(runtime)
Expand Down Expand Up @@ -113,7 +166,7 @@ describe('CodexLaunchPlanner', () => {
it('shuts down the owned sidecar when planning fails before adoption', async () => {
const runtime = new FakeRuntime('ws://127.0.0.1:43010', 'thread-fail')
runtime.ensureReadyError = new Error('start failed')
const planner = new CodexLaunchPlanner(() => runtime as any)
const planner = createPlanner(() => runtime as any)

await expect(planner.planCreate({ cwd: '/repo/fail' })).rejects.toThrow('start failed')

Expand All @@ -124,7 +177,7 @@ describe('CodexLaunchPlanner', () => {
const runtime = new FakeRuntime('ws://127.0.0.1:43022', 'thread-fail')
runtime.ensureReadyError = new Error('start failed')
runtime.shutdownError = new Error('verified runtime teardown failed')
const planner = new CodexLaunchPlanner(() => runtime as any)
const planner = createPlanner(() => runtime as any)

let rejection: unknown
try {
Expand All @@ -143,7 +196,7 @@ describe('CodexLaunchPlanner', () => {
const adoptedRuntime = new FakeRuntime('ws://127.0.0.1:43011', 'thread-adopted')
const pendingRuntime = new FakeRuntime('ws://127.0.0.1:43012', 'thread-pending')
const runtimes = [adoptedRuntime, pendingRuntime]
const planner = new CodexLaunchPlanner(() => runtimes.shift()! as any)
const planner = createPlanner(() => runtimes.shift()! as any)

const adopted = await planner.planCreate({ cwd: '/repo/adopted' })
const pending = await planner.planCreate({ cwd: '/repo/pending' })
Expand All @@ -162,7 +215,7 @@ describe('CodexLaunchPlanner', () => {
it('keeps a failed-adoption sidecar planner-owned so shutdown can clean it up', async () => {
const runtime = new FakeRuntime('ws://127.0.0.1:43013', 'thread-adopt-fails')
runtime.adoptError = new Error('no active owned Codex app-server sidecar')
const planner = new CodexLaunchPlanner(() => runtime as any)
const planner = createPlanner(() => runtime as any)

const plan = await planner.planCreate({ cwd: '/repo/adopt-fails' })
await expect(plan.sidecar.adopt({ terminalId: 'term-adopt-fails', generation: 1 }))
Expand All @@ -179,7 +232,7 @@ describe('CodexLaunchPlanner', () => {
const firstRuntime = new FakeRuntime('ws://127.0.0.1:43014', 'thread-before-shutdown')
firstRuntime.shutdownBlocker = shutdownGate.promise
const runtimes = [firstRuntime]
const planner = new CodexLaunchPlanner(() => {
const planner = createPlanner(() => {
const runtime = runtimes.shift()
if (!runtime) throw new Error('unexpected runtime allocation')
return runtime as any
Expand All @@ -201,7 +254,7 @@ describe('CodexLaunchPlanner', () => {
const runtime = new FakeRuntime('ws://127.0.0.1:43018', 'thread-after-shutdown')
const readinessGate = deferred()
runtime.ensureReadyBlocker = readinessGate.promise
const planner = new CodexLaunchPlanner(() => runtime as any)
const planner = createPlanner(() => runtime as any)

const plan = planner.planCreate({ cwd: '/repo/in-flight' })
await vi.waitFor(() => expect(runtime.ensureReadyCalls).toBe(1))
Expand All @@ -220,7 +273,7 @@ describe('CodexLaunchPlanner', () => {
const runtime = new FakeRuntime('ws://127.0.0.1:43019', 'thread-adopt-after-shutdown')
const shutdownGate = deferred()
runtime.shutdownBlocker = shutdownGate.promise
const planner = new CodexLaunchPlanner(() => runtime as any)
const planner = createPlanner(() => runtime as any)

const plan = await planner.planCreate({ cwd: '/repo/adopt-after-shutdown' })
const shutdown = planner.shutdown()
Expand All @@ -234,10 +287,30 @@ describe('CodexLaunchPlanner', () => {
await shutdown
})

it('starts runtime teardown even when proxy close is still pending', async () => {
const runtime = new FakeRuntime('ws://127.0.0.1:43025', 'thread-slow-proxy-close')
const proxyCloseGate = deferred()
const proxies: FakeProxy[] = []
const planner = createPlanner(() => runtime as any, proxies)

const plan = await planner.planCreate({ cwd: '/repo/slow-proxy-close' })
proxies[0].closeBlocker = proxyCloseGate.promise

const shutdown = planner.shutdown()

await vi.waitFor(() => expect(proxies[0].closeCalls).toBe(1))
await vi.waitFor(() => expect(runtime.shutdownCalls).toBe(1))
await expect(plan.sidecar.adopt({ terminalId: 'term-slow-proxy-close', generation: 1 }))
.rejects.toThrow(/shutting down/i)

proxyCloseGate.resolve()
await shutdown
})

it('keeps failed unadopted sidecar teardown planner-owned and joinable by planner shutdown', async () => {
const runtime = new FakeRuntime('ws://127.0.0.1:43015', 'thread-teardown-fails')
runtime.shutdownError = new Error('verified runtime teardown failed')
const planner = new CodexLaunchPlanner(() => runtime as any)
const planner = createPlanner(() => runtime as any)

const plan = await planner.planCreate({ cwd: '/repo/unadopted' })

Expand All @@ -249,7 +322,7 @@ describe('CodexLaunchPlanner', () => {
it('retries a failed planner-owned sidecar teardown on a later shutdown join', async () => {
const runtime = new FakeRuntime('ws://127.0.0.1:43023', 'thread-teardown-retry')
runtime.shutdownError = new Error('transient metadata cleanup failure')
const planner = new CodexLaunchPlanner(() => runtime as any)
const planner = createPlanner(() => runtime as any)

const plan = await planner.planCreate({ cwd: '/repo/unadopted-retry' })

Expand All @@ -264,7 +337,7 @@ describe('CodexLaunchPlanner', () => {

it('blocks new plans behind a failed planner-owned sidecar teardown until retry succeeds', async () => {
const runtimes: FakeRuntime[] = []
const planner = new CodexLaunchPlanner(() => {
const planner = createPlanner(() => {
const index = runtimes.length + 1
const runtime = new FakeRuntime(`ws://127.0.0.1:${43030 + index}`, `thread-${index}`)
runtimes.push(runtime)
Expand Down Expand Up @@ -298,7 +371,7 @@ describe('CodexLaunchPlanner', () => {
const slowShutdown = deferred()
secondRuntime.shutdownBlocker = slowShutdown.promise
const runtimes = [firstRuntime, secondRuntime]
const planner = new CodexLaunchPlanner(() => runtimes.shift()! as any)
const planner = createPlanner(() => runtimes.shift()! as any)

await planner.planCreate({ cwd: '/repo/fast-fails' })
await planner.planCreate({ cwd: '/repo/slow-shutdown' })
Expand Down Expand Up @@ -326,7 +399,7 @@ describe('CodexLaunchPlanner', () => {
undefined,
[[], ['other-thread'], ['thread-ready']],
)
const planner = new CodexLaunchPlanner(() => runtime as any)
const planner = createPlanner(() => runtime as any)

const plan = await planner.planCreate({ resumeSessionId: 'thread-ready' })

Expand All @@ -336,7 +409,7 @@ describe('CodexLaunchPlanner', () => {

it('passes resume cwd to sidecar readiness', async () => {
const runtime = new FakeRuntime('ws://127.0.0.1:43024', 'thread-ready', undefined, [['thread-ready']])
const planner = new CodexLaunchPlanner(() => runtime as any)
const planner = createPlanner(() => runtime as any)

await planner.planCreate({ resumeSessionId: 'thread-ready', cwd: '/repo/resume' })

Expand Down
Loading