Skip to content

Commit 15ad5ab

Browse files
committed
fix: release asyncio.Lock before streaming to prevent orphan on client disconnect
Narrow the lock scope in ContextWebSocket.execute() so that the lock is only held during the prepare+send phase, not during result streaming. Previously, the lock was held for the entire generator lifetime including the _wait_for_result() streaming loop. When a client disconnected (e.g. SDK timeout), Starlette abandoned the generator while it was blocked at `await queue.get()`. The lock stayed held until the kernel finished internally, blocking all subsequent executions on the same context and causing cascading timeouts. The fix moves the streaming phase (Phase B) outside the `async with self._lock` block. This is safe because results are routed by unique message_id in _process_message() — no shared state is accessed during streaming. A try/finally ensures the execution entry is cleaned up even if the generator is abandoned. Fixes #213
1 parent 6d703a4 commit 15ad5ab

1 file changed

Lines changed: 15 additions & 4 deletions

File tree

template/server/messaging.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,14 @@ async def execute(
294294
if self._ws is None:
295295
raise Exception("WebSocket not connected")
296296

297+
message_id = str(uuid.uuid4())
298+
299+
# Phase A: prepare and send to kernel (under lock)
300+
# The lock serializes the prepare+send phase to protect shared state
301+
# (_global_env_vars init, _cleanup_task lifecycle, WebSocket sends).
302+
# It is released before streaming results so that a client disconnect
303+
# during streaming does not orphan the lock and block subsequent
304+
# executions. See https://github.com/e2b-dev/code-interpreter/issues/213
297305
async with self._lock:
298306
# Wait for any pending cleanup task to complete
299307
if self._cleanup_task and not self._cleanup_task.done():
@@ -327,7 +335,6 @@ async def execute(
327335
)
328336
complete_code = f"{indented_env_code}\n{complete_code}"
329337

330-
message_id = str(uuid.uuid4())
331338
execution = Execution()
332339
self._executions[message_id] = execution
333340

@@ -362,11 +369,15 @@ async def execute(
362369
)
363370
await execution.queue.put(UnexpectedEndOfExecution())
364371

365-
# Stream the results
372+
# Phase B: stream results (no lock held)
373+
# Results are routed by unique message_id in _process_message(),
374+
# so no shared state is accessed during streaming.
375+
try:
366376
async for item in self._wait_for_result(message_id):
367377
yield item
368-
369-
del self._executions[message_id]
378+
finally:
379+
# Clean up execution entry even if generator is abandoned
380+
self._executions.pop(message_id, None)
370381

371382
# Clean up env vars in a separate request after the main code has run
372383
if env_vars:

0 commit comments

Comments
 (0)