From 5b4a1d4ec7f57949f6f0a8b077dc09ff09c7a23d Mon Sep 17 00:00:00 2001 From: John Pollock Date: Sun, 7 Jun 2026 08:33:30 -0500 Subject: [PATCH 1/4] pyisolate 0.10.3rc1: per-channel tensor transport + modern-Python event loop Serialize tensors using the per-channel transport mode in the RPC pre-passes so a shared-memory TensorRef is never emitted onto a JSON channel. This fixes KeyError('data') in torch-free sealed/conda workers when an IsolationToolkit shared-memory serializer is co-resident in the host process. Avoid asyncio.get_event_loop() RuntimeError/DeprecationWarning on Python >=3.12 by reusing the running or installed event loop and creating one only as a last resort. Add regression tests for both fixes and bump version to 0.10.3rc1. --- pyisolate/_internal/model_serialization.py | 15 +++- pyisolate/_internal/rpc_protocol.py | 23 ++++- pyisolate/_internal/rpc_serialization.py | 14 ++- pyproject.toml | 2 +- tests/test_event_channel.py | 10 +-- tests/test_rpc_contract.py | 85 +++++++++++++++++++ tests/test_tensor_shared_memory.py | 36 ++++++++ tests/test_tensor_transport_mode_isolation.py | 66 ++++++++++++++ 8 files changed, 237 insertions(+), 14 deletions(-) create mode 100644 tests/test_tensor_shared_memory.py create mode 100644 tests/test_tensor_transport_mode_isolation.py diff --git a/pyisolate/_internal/model_serialization.py b/pyisolate/_internal/model_serialization.py index 0922b1d..dc261ef 100644 --- a/pyisolate/_internal/model_serialization.py +++ b/pyisolate/_internal/model_serialization.py @@ -39,10 +39,13 @@ def _serialize_for_isolation_impl( if isinstance(handle, remote_handle_type): return handle - serializer = registry.get_serializer(type_name) - if serializer is not None: - return serializer(data) - + # Handle torch tensors BEFORE the registry's mode-bound "Tensor" serializer so the + # per-channel transport mode (JSONSocketTransport._tensor_transport) decides the wire + # format -- not the process-global registry mode. Otherwise a host running a + # shared_memory (share_torch) extension alongside a json (sealed/conda) extension emits + # a shared-memory TensorRef onto the json channel, which a torch-free sealed worker + # cannot decode (KeyError 'data'). Returning the tensor here defers encoding to the + # transport, which already serializes per channel via serialize_tensor(mode=...). if torch_module is not None and isinstance(data, torch_module.Tensor): if data.is_cuda: if _cuda_ipc_enabled: @@ -50,6 +53,10 @@ def _serialize_for_isolation_impl( return data.cpu() return data + serializer = registry.get_serializer(type_name) + if serializer is not None: + return serializer(data) + if isinstance(data, dict): return { k: _serialize_for_isolation_impl( diff --git a/pyisolate/_internal/rpc_protocol.py b/pyisolate/_internal/rpc_protocol.py index 22c1548..c2e43f0 100644 --- a/pyisolate/_internal/rpc_protocol.py +++ b/pyisolate/_internal/rpc_protocol.py @@ -18,6 +18,7 @@ import queue import threading import uuid +import warnings from collections.abc import Callable from typing import ( TYPE_CHECKING, @@ -162,7 +163,27 @@ def __init__( self.lock = threading.Lock() self.pending: dict[int, RPCPendingRequest] = {} - self.default_loop = asyncio.get_event_loop() + # Acquire the loop without raising when constructed outside a running loop. + # Python >=3.10 deprecated and >=3.12 removed implicit main-thread event loop + # creation, so an eager asyncio.get_event_loop() raised here in sync construction + # paths. Preserve the historical get_event_loop() semantics: prefer the running + # loop, then the thread's installed loop (set via asyncio.set_event_loop, e.g. a + # synchronous caller that constructs the RPC before running its own loop), and only + # create+install a new loop as a last resort. update_event_loop() may replace it. + try: + self.default_loop = asyncio.get_running_loop() + except RuntimeError: + # No running loop. Reuse the thread's installed loop if present, else install a + # fresh one. asyncio.get_event_loop() returns an installed loop, and only emits + # the "no current event loop" DeprecationWarning (Python >=3.12) when none is + # installed -- we handle creation explicitly, so that one warning is silenced. + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + try: + self.default_loop = asyncio.get_event_loop() + except RuntimeError: + self.default_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.default_loop) self._loop_lock = threading.Lock() # Protects default_loop updates self.callees: dict[str, object] = {} self.callbacks: dict[str, Any] = {} diff --git a/pyisolate/_internal/rpc_serialization.py b/pyisolate/_internal/rpc_serialization.py index 9e222c6..accc2e3 100644 --- a/pyisolate/_internal/rpc_serialization.py +++ b/pyisolate/_internal/rpc_serialization.py @@ -264,10 +264,14 @@ def _prepare_for_rpc_impl( torch_module: Any, ) -> Any: obj_type = type(obj) - serializer = _resolve_serializer_for_type(registry, obj_type) - if serializer is not None: - return serializer(obj) + # Handle torch tensors BEFORE the registry's mode-bound "Tensor" serializer so the + # per-channel transport mode (JSONSocketTransport._tensor_transport) decides the wire + # format -- not the process-global registry mode. Otherwise a host running a + # shared_memory (share_torch) extension alongside a json (sealed/conda) extension emits + # a shared-memory TensorRef onto the json channel, which a torch-free sealed worker + # cannot decode (KeyError 'data'). Returning the tensor here defers encoding to the + # transport, whose _json_default serializes per channel via serialize_tensor(mode=...). if torch_module is not None and isinstance(obj, torch_module.Tensor): if obj.is_cuda: if os.environ.get("PYISOLATE_ENABLE_CUDA_IPC") == "1": @@ -277,6 +281,10 @@ def _prepare_for_rpc_impl( return obj.cpu() return obj + serializer = _resolve_serializer_for_type(registry, obj_type) + if serializer is not None: + return serializer(obj) + if isinstance(obj, dict): return { k: _prepare_for_rpc_impl(v, registry=registry, torch_module=torch_module) for k, v in obj.items() diff --git a/pyproject.toml b/pyproject.toml index 91c1e49..e2ba1a8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "pyisolate" -version = "0.10.2" +version = "0.10.3rc1" description = "A Python library for dividing execution across multiple virtual environments" readme = "README.md" requires-python = ">=3.10" diff --git a/tests/test_event_channel.py b/tests/test_event_channel.py index 30c1dc4..2a22ccb 100644 --- a/tests/test_event_channel.py +++ b/tests/test_event_channel.py @@ -27,7 +27,7 @@ def handler(payload: Any) -> None: received.append(payload) bridge.register_handler("progress", handler) - asyncio.get_event_loop().run_until_complete(bridge.dispatch("progress", {"value": 5, "total": 10})) + asyncio.run(bridge.dispatch("progress", {"value": 5, "total": 10})) assert len(received) == 1 assert received[0] == {"value": 5, "total": 10} @@ -37,7 +37,7 @@ def test_emit_unregistered_event_raises(self) -> None: bridge = _EventBridge() with pytest.raises(ValueError, match="No handler registered for event 'unknown_event'"): - asyncio.get_event_loop().run_until_complete(bridge.dispatch("unknown_event", {})) + asyncio.run(bridge.dispatch("unknown_event", {})) def test_emit_event_rejects_non_json_payload(self) -> None: """emit_event with non-JSON-serializable payload raises immediately.""" @@ -62,7 +62,7 @@ async def async_handler(payload: Any) -> None: received.append(payload) bridge.register_handler("test", async_handler) - asyncio.get_event_loop().run_until_complete(bridge.dispatch("test", {"key": "value"})) + asyncio.run(bridge.dispatch("test", {"key": "value"})) assert received == [{"key": "value"}] @@ -75,8 +75,8 @@ def test_multiple_events_independent(self) -> None: bridge.register_handler("progress", lambda p: progress_calls.append(p)) bridge.register_handler("preview", lambda p: preview_calls.append(p)) - asyncio.get_event_loop().run_until_complete(bridge.dispatch("progress", {"value": 1})) - asyncio.get_event_loop().run_until_complete(bridge.dispatch("preview", {"image": "data"})) + asyncio.run(bridge.dispatch("progress", {"value": 1})) + asyncio.run(bridge.dispatch("preview", {"image": "data"})) assert progress_calls == [{"value": 1}] assert preview_calls == [{"image": "data"}] diff --git a/tests/test_rpc_contract.py b/tests/test_rpc_contract.py index 091bc4a..d2c092f 100644 --- a/tests/test_rpc_contract.py +++ b/tests/test_rpc_contract.py @@ -136,6 +136,91 @@ def test_singleton_survives_loop_recreation(self) -> None: elif not loop1.is_closed(): loop1.close() + def test_asyncrpc_constructs_without_current_event_loop(self) -> None: + """AsyncRPC must construct when no current event loop exists. + + The host launches extensions from a synchronous path (host._launch_with_uds), + constructing AsyncRPC outside any running loop. Python >=3.12 removed implicit + main-thread loop creation, so an eager asyncio.get_event_loop() in __init__ + raised "There is no current event loop". This guards that regression. + """ + import queue + + from pyisolate._internal.rpc_protocol import AsyncRPC + + try: + previous_loop = asyncio.get_event_loop_policy().get_event_loop() + except RuntimeError: + previous_loop = None + + asyncio.set_event_loop(None) + rpc = None + try: + rpc = AsyncRPC(recv_queue=cast(Any, queue.Queue()), send_queue=cast(Any, queue.Queue())) + assert isinstance(rpc.default_loop, asyncio.AbstractEventLoop) + assert not rpc.default_loop.is_closed() + finally: + created = rpc.default_loop if rpc is not None else None + asyncio.set_event_loop(previous_loop) + if created is not None and created is not previous_loop: + created.close() + + def test_asyncrpc_reuses_preset_thread_loop(self) -> None: + """AsyncRPC must adopt the thread's installed (set-but-not-running) loop. + + A synchronous caller may create a loop, install it via asyncio.set_event_loop(), + construct AsyncRPC, then drive that loop. __init__ must adopt the installed loop + (matching historical asyncio.get_event_loop() behavior) instead of creating a + separate loop that rpc.run()/dispatch would schedule on but nobody runs. + """ + import queue + + from pyisolate._internal.rpc_protocol import AsyncRPC + + try: + previous_loop = asyncio.get_event_loop_policy().get_event_loop() + except RuntimeError: + previous_loop = None + + installed = asyncio.new_event_loop() + asyncio.set_event_loop(installed) + try: + rpc = AsyncRPC(recv_queue=cast(Any, queue.Queue()), send_queue=cast(Any, queue.Queue())) + assert rpc.default_loop is installed + finally: + asyncio.set_event_loop(previous_loop) + installed.close() + + def test_asyncrpc_construction_emits_no_deprecation_warning(self) -> None: + """AsyncRPC construction must not leak a 'no current event loop' DeprecationWarning. + + The fix for the >=3.12 get_event_loop() crash must not itself emit the very + deprecation it works around. Treats DeprecationWarning as an error while + constructing with no installed loop. + """ + import queue + import warnings + + from pyisolate._internal.rpc_protocol import AsyncRPC + + try: + previous_loop = asyncio.get_event_loop_policy().get_event_loop() + except RuntimeError: + previous_loop = None + + asyncio.set_event_loop(None) + rpc = None + try: + with warnings.catch_warnings(): + warnings.simplefilter("error", DeprecationWarning) + rpc = AsyncRPC(recv_queue=cast(Any, queue.Queue()), send_queue=cast(Any, queue.Queue())) + assert isinstance(rpc.default_loop, asyncio.AbstractEventLoop) + finally: + created = rpc.default_loop if rpc is not None else None + asyncio.set_event_loop(previous_loop) + if created is not None and created is not previous_loop: + created.close() + def test_singleton_data_persists_across_loops(self) -> None: """Data stored in singleton persists across event loops.""" try: diff --git a/tests/test_tensor_shared_memory.py b/tests/test_tensor_shared_memory.py new file mode 100644 index 0000000..e25135a --- /dev/null +++ b/tests/test_tensor_shared_memory.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +import pytest + +torch = pytest.importorskip("torch") + +from pyisolate._internal.tensor_serializer import ( # noqa: E402 + _reset_shm_check, + deserialize_tensor, + serialize_tensor, +) + + +def test_cpu_torch_share_roundtrip_is_zero_copy() -> None: + """torch_share CPU transport must stay shared-memory, never a value copy. + + Runs single-process so it executes on Windows, where the multi-process + torch_share RPC tests are skipped (extension loading needs Unix sockets). + Guards the regression where a /dev/shm gate degrades CPU sharing to a + file-based value copy: a copy still passes ``torch.equal`` but breaks the + shared-storage contract that callers depend on for zero-copy transfer. + """ + _reset_shm_check() + original = torch.arange(25, dtype=torch.float32).reshape(5, 5) + + payload = serialize_tensor(original, mode="shared_memory") + + assert payload["__type__"] == "TensorRef", "CPU tensor degraded to a value copy" + assert payload["device"] == "cpu" + assert payload["strategy"] in ("file_system", "file_system_borrowed") + + rebuilt = deserialize_tensor(payload, mode="shared_memory") + assert torch.equal(rebuilt, original) + + original[0, 0] = 999.0 + assert float(rebuilt[0, 0]) == 999.0, "receiver did not observe sender mutation" diff --git a/tests/test_tensor_transport_mode_isolation.py b/tests/test_tensor_transport_mode_isolation.py new file mode 100644 index 0000000..86f4bd4 --- /dev/null +++ b/tests/test_tensor_transport_mode_isolation.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + from pyisolate._internal.serialization_registry import SerializerRegistry + + +def _shared_memory_registry() -> SerializerRegistry: + """A registry whose Tensor serializer is pinned to shared_memory, simulating a + co-resident share_torch extension that registered last on the process-global + SerializerRegistry singleton.""" + from pyisolate._internal.serialization_registry import SerializerRegistry + from pyisolate._internal.tensor_serializer import register_tensor_serializer + + registry = SerializerRegistry() + register_tensor_serializer(registry, mode="shared_memory") + return registry + + +def test_serialize_for_isolation_defers_tensor_to_per_channel_transport() -> None: + """serialize_for_isolation must not pre-encode torch tensors via the registry's + mode-bound "Tensor" serializer; the tensor must be deferred so the per-channel + transport decides the wire format. + + Regression: a host running a shared_memory (share_torch) extension alongside a json + (sealed/conda) extension pinned the global "Tensor" serializer to shared_memory + (last-writer-wins). serialize_for_isolation then emitted a shared-memory TensorRef + onto the json channel; the torch-free sealed worker raised KeyError('data') decoding + it and its RPC recv thread died ("Socket closed"). + """ + torch = pytest.importorskip("torch") + + from pyisolate._internal.model_serialization import _serialize_for_isolation_impl + from pyisolate._internal.remote_handle import RemoteObjectHandle + + out = _serialize_for_isolation_impl( + torch.zeros(2, 3), + registry=_shared_memory_registry(), + torch_module=torch, + remote_handle_type=RemoteObjectHandle, + ) + + assert isinstance(out, torch.Tensor) + assert not (isinstance(out, dict) and out.get("__type__") == "TensorRef") + + +def test_prepare_for_rpc_defers_tensor_to_per_channel_transport() -> None: + """_prepare_for_rpc_impl (the RPC argument pre-pass) must likewise defer torch tensors + instead of pre-encoding them with the global registry mode. This is the path that + serializes execute_node arguments; the per-channel transport must choose the format. + """ + torch = pytest.importorskip("torch") + + from pyisolate._internal.rpc_serialization import _prepare_for_rpc_impl + + out = _prepare_for_rpc_impl( + torch.zeros(2, 3), + registry=_shared_memory_registry(), + torch_module=torch, + ) + + assert isinstance(out, torch.Tensor) + assert not (isinstance(out, dict) and out.get("__type__") == "TensorRef") From c6a1bba021c8491e6c717c653685788f3ffd236c Mon Sep 17 00:00:00 2001 From: John Pollock Date: Sun, 7 Jun 2026 09:24:54 -0500 Subject: [PATCH 2/4] fix: bind RPC dispatch loop to the running loop in run() AsyncRPC may be constructed before the event loop that will service it exists (the synchronous host launch path), so __init__ can only install a placeholder loop on Python >=3.12. _recv_thread dispatches inbound calls via run_coroutine_threadsafe(default_loop), which executes only on a running loop -- a never-run placeholder would hang every inbound child-to-host call or event. run() now adopts the running loop before starting the dispatch threads, making the actually-running loop the authoritative dispatch target (matching the manual rebind tests already perform and the update_event_loop contract). Adds a regression test. --- pyisolate/_internal/rpc_protocol.py | 19 ++++++++++++- tests/test_rpc_contract.py | 43 +++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/pyisolate/_internal/rpc_protocol.py b/pyisolate/_internal/rpc_protocol.py index c2e43f0..9b515ae 100644 --- a/pyisolate/_internal/rpc_protocol.py +++ b/pyisolate/_internal/rpc_protocol.py @@ -169,7 +169,8 @@ def __init__( # paths. Preserve the historical get_event_loop() semantics: prefer the running # loop, then the thread's installed loop (set via asyncio.set_event_loop, e.g. a # synchronous caller that constructs the RPC before running its own loop), and only - # create+install a new loop as a last resort. update_event_loop() may replace it. + # create+install a new loop as a last resort. update_event_loop() may replace it, + # and run() rebinds to the running loop when started inside one. try: self.default_loop = asyncio.get_running_loop() except RuntimeError: @@ -370,6 +371,22 @@ def _fail_pending_requests(self, error_msg: str) -> None: ) def run(self) -> None: + # Bind dispatch to the loop that actually services it. _recv_thread + # dispatches inbound calls via run_coroutine_threadsafe(default_loop), + # which executes only on a *running* loop. run() starts those threads, so + # when invoked from within a running loop -- the supported pattern + # (ensure_process_started()/run_until_stopped() under asyncio.run) -- that + # running loop is the authoritative dispatch target and supersedes any loop + # __init__ acquired before the loop existed. Mirrors _get_valid_loop()'s + # running-loop capture; a fully synchronous caller with no running loop must + # call update_event_loop() once its loop starts. + try: + running_loop = asyncio.get_running_loop() + except RuntimeError: + running_loop = None + if running_loop is not None: + with self._loop_lock: + self.default_loop = running_loop self.blocking_future = self.default_loop.create_future() self._threads = [ threading.Thread(target=self._recv_thread, daemon=True), diff --git a/tests/test_rpc_contract.py b/tests/test_rpc_contract.py index d2c092f..ff40ecc 100644 --- a/tests/test_rpc_contract.py +++ b/tests/test_rpc_contract.py @@ -221,6 +221,49 @@ def test_asyncrpc_construction_emits_no_deprecation_warning(self) -> None: if created is not None and created is not previous_loop: created.close() + def test_run_rebinds_dispatch_to_running_loop(self) -> None: + """run() binds default_loop to the running loop that services dispatch. + + AsyncRPC may be constructed before the loop that will run it exists (the + synchronous host launch path), so __init__ can only install a placeholder + loop. _recv_thread dispatches inbound calls via + run_coroutine_threadsafe(default_loop), which executes only on a *running* + loop; a placeholder nobody runs would hang every inbound child->host call. + run() therefore adopts the running loop before starting the dispatch + threads. Guards the regression where a never-run fallback loop is used as + the dispatch target on the Python >=3.12 sync-host path. + """ + import queue + + from pyisolate._internal.rpc_protocol import AsyncRPC + + try: + previous_loop = asyncio.get_event_loop_policy().get_event_loop() + except RuntimeError: + previous_loop = None + + # Construct with no running/installed loop so default_loop is a placeholder + # distinct from the loop run() will later execute under. + asyncio.set_event_loop(None) + recv_q: queue.Queue[Any] = queue.Queue() + recv_q.put(None) # makes _recv_thread exit cleanly right after run() + rpc = AsyncRPC(recv_queue=cast(Any, recv_q), send_queue=cast(Any, queue.Queue())) + placeholder = rpc.default_loop + + async def _run_inside_loop() -> asyncio.AbstractEventLoop: + rpc.run() + return asyncio.get_running_loop() + + try: + running = asyncio.run(_run_inside_loop()) + assert rpc.default_loop is running + assert rpc.default_loop is not placeholder + finally: + rpc.shutdown() + asyncio.set_event_loop(previous_loop) + if not placeholder.is_closed(): + placeholder.close() + def test_singleton_data_persists_across_loops(self) -> None: """Data stored in singleton persists across event loops.""" try: From 405d765a143086928aaa3d390d8de72b44053eef Mon Sep 17 00:00:00 2001 From: John Pollock Date: Sun, 7 Jun 2026 09:32:56 -0500 Subject: [PATCH 3/4] fix: evaluate CUDA IPC env at runtime in serialize_for_isolation model_serialization captured _cuda_ipc_enabled at import time, but the host sets PYISOLATE_ENABLE_CUDA_IPC during _initialize_process -- after this module is imported. With the tensor branch reordered ahead of the registry serializer, that stale flag made configured CUDA tensors fall back to obj.cpu() instead of deferring the on-device tensor to the per-channel CUDA IPC transport, silently dropping CUDA transport. Read the env at call time, matching rpc_serialization._prepare_for_rpc_impl, and drop the now-unused module flag. Adds a runtime-evaluation regression test. --- pyisolate/_internal/model_serialization.py | 9 ++-- tests/test_tensor_transport_mode_isolation.py | 44 +++++++++++++++++++ 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/pyisolate/_internal/model_serialization.py b/pyisolate/_internal/model_serialization.py index dc261ef..2ac3b75 100644 --- a/pyisolate/_internal/model_serialization.py +++ b/pyisolate/_internal/model_serialization.py @@ -12,14 +12,11 @@ import logging import os -import sys from typing import TYPE_CHECKING, Any from .serialization_registry import SerializerRegistry from .torch_gate import get_torch_optional -_cuda_ipc_enabled = sys.platform == "linux" and os.environ.get("PYISOLATE_ENABLE_CUDA_IPC") == "1" - if TYPE_CHECKING: # pragma: no cover - typing aids pass # type: ignore[import-not-found] @@ -48,7 +45,11 @@ def _serialize_for_isolation_impl( # transport, which already serializes per channel via serialize_tensor(mode=...). if torch_module is not None and isinstance(data, torch_module.Tensor): if data.is_cuda: - if _cuda_ipc_enabled: + # Read the CUDA IPC env at call time, not import time: the host sets + # PYISOLATE_ENABLE_CUDA_IPC during _initialize_process, after this module is + # imported, so an import-time snapshot would be stale and downgrade configured + # CUDA tensors to CPU. Matches rpc_serialization._prepare_for_rpc_impl. + if os.environ.get("PYISOLATE_ENABLE_CUDA_IPC") == "1": return data return data.cpu() return data diff --git a/tests/test_tensor_transport_mode_isolation.py b/tests/test_tensor_transport_mode_isolation.py index 86f4bd4..92138b8 100644 --- a/tests/test_tensor_transport_mode_isolation.py +++ b/tests/test_tensor_transport_mode_isolation.py @@ -47,6 +47,50 @@ def test_serialize_for_isolation_defers_tensor_to_per_channel_transport() -> Non assert not (isinstance(out, dict) and out.get("__type__") == "TensorRef") +def test_serialize_for_isolation_reads_cuda_ipc_env_at_runtime(monkeypatch: pytest.MonkeyPatch) -> None: + """The CUDA-IPC decision in serialize_for_isolation must be evaluated at call time, + not captured at import. + + Regression: a module-level _cuda_ipc_enabled snapshot, consulted before the registry + serializer, was stale because the host sets PYISOLATE_ENABLE_CUDA_IPC during + _initialize_process -- after this module is imported. With CUDA IPC configured it + therefore sent obj.cpu() instead of deferring the on-device tensor to the per-channel + CUDA IPC transport, silently losing CUDA transport. A fake on-device tensor isolates + the env decision without requiring a GPU. + """ + from pyisolate._internal.model_serialization import _serialize_for_isolation_impl + from pyisolate._internal.remote_handle import RemoteObjectHandle + from pyisolate._internal.serialization_registry import SerializerRegistry + + class FakeCudaTensor: + is_cuda = True + + def cpu(self) -> str: + return "DOWNGRADED_TO_CPU" + + class FakeTorch: + Tensor = FakeCudaTensor + + tensor = FakeCudaTensor() + registry = SerializerRegistry() + + def _serialize() -> object: + return _serialize_for_isolation_impl( + tensor, + registry=registry, + torch_module=FakeTorch, + remote_handle_type=RemoteObjectHandle, + ) + + # CUDA IPC configured at runtime -> defer the on-device tensor (do NOT downgrade). + monkeypatch.setenv("PYISOLATE_ENABLE_CUDA_IPC", "1") + assert _serialize() is tensor + + # CUDA IPC not configured -> fall back to CPU. + monkeypatch.delenv("PYISOLATE_ENABLE_CUDA_IPC", raising=False) + assert _serialize() == "DOWNGRADED_TO_CPU" + + def test_prepare_for_rpc_defers_tensor_to_per_channel_transport() -> None: """_prepare_for_rpc_impl (the RPC argument pre-pass) must likewise defer torch tensors instead of pre-encoding them with the global registry mode. This is the path that From 51493af7486b9ecd51006ce0786cecd54953d7d6 Mon Sep 17 00:00:00 2001 From: John Pollock Date: Sun, 7 Jun 2026 09:42:05 -0500 Subject: [PATCH 4/4] fix: close self-created fallback event loop when superseded When AsyncRPC.__init__ creates and installs a new event loop as a last resort (no running or installed loop), a later run()/update_event_loop() rebind to a real running loop left that created loop open and unused -- a leaked event loop that emits ResourceWarning: unclosed event loop in sync-construction paths that subsequently use asyncio.run(). Track ownership of the self-created loop and close it on supersession, in both run() and update_event_loop(). Strengthens the run()-rebind regression test to assert the placeholder loop is closed. --- pyisolate/_internal/rpc_protocol.py | 22 ++++++++++++++++++++++ tests/test_rpc_contract.py | 2 ++ 2 files changed, 24 insertions(+) diff --git a/pyisolate/_internal/rpc_protocol.py b/pyisolate/_internal/rpc_protocol.py index 9b515ae..bfebafa 100644 --- a/pyisolate/_internal/rpc_protocol.py +++ b/pyisolate/_internal/rpc_protocol.py @@ -163,6 +163,10 @@ def __init__( self.lock = threading.Lock() self.pending: dict[int, RPCPendingRequest] = {} + # Set only when the last-resort branch below creates a loop we own, so run()/ + # update_event_loop() can close it if a real loop later supersedes it (avoids a + # leaked, never-run event loop -> ResourceWarning: unclosed event loop). + self._created_default_loop: asyncio.AbstractEventLoop | None = None # Acquire the loop without raising when constructed outside a running loop. # Python >=3.10 deprecated and >=3.12 removed implicit main-thread event loop # creation, so an eager asyncio.get_event_loop() raised here in sync construction @@ -185,6 +189,7 @@ def __init__( except RuntimeError: self.default_loop = asyncio.new_event_loop() asyncio.set_event_loop(self.default_loop) + self._created_default_loop = self.default_loop self._loop_lock = threading.Lock() # Protects default_loop updates self.callees: dict[str, object] = {} self.callbacks: dict[str, Any] = {} @@ -231,9 +236,25 @@ def update_event_loop(self, loop: asyncio.AbstractEventLoop | None = None) -> No with self._loop_lock: if loop is None: loop = asyncio.get_event_loop() + self._release_created_loop_if_superseded(loop) self.default_loop = loop logger.debug(f"RPC {self.id}: Updated default_loop to {loop}") + def _release_created_loop_if_superseded(self, new_loop: asyncio.AbstractEventLoop) -> None: + """Close the fallback loop __init__ created if a different loop supersedes it. + + Caller must hold self._loop_lock. __init__ creates and installs a new event loop + only as a last resort (no running or installed loop). When run() or + update_event_loop() later binds dispatch to a different, real loop, that created + loop would otherwise be left open and unused -- a leaked event loop that emits + ResourceWarning. We own it, so we close it on supersession. + """ + created = self._created_default_loop + if created is not None and created is not new_loop: + if not created.is_closed(): + created.close() + self._created_default_loop = None + def register_callback(self, func: Any) -> str: callback_id = str(uuid.uuid4()) with self.lock: @@ -386,6 +407,7 @@ def run(self) -> None: running_loop = None if running_loop is not None: with self._loop_lock: + self._release_created_loop_if_superseded(running_loop) self.default_loop = running_loop self.blocking_future = self.default_loop.create_future() self._threads = [ diff --git a/tests/test_rpc_contract.py b/tests/test_rpc_contract.py index ff40ecc..dfc30c2 100644 --- a/tests/test_rpc_contract.py +++ b/tests/test_rpc_contract.py @@ -258,6 +258,8 @@ async def _run_inside_loop() -> asyncio.AbstractEventLoop: running = asyncio.run(_run_inside_loop()) assert rpc.default_loop is running assert rpc.default_loop is not placeholder + # The created fallback loop is closed when superseded -- no leaked loop. + assert placeholder.is_closed() finally: rpc.shutdown() asyncio.set_event_loop(previous_loop)