From d773fbc42fd30589062377c88fc804dced10dc4c Mon Sep 17 00:00:00 2001 From: John Pollock Date: Sun, 7 Jun 2026 14:21:45 -0500 Subject: [PATCH] pyisolate 0.10.3rc1: per-channel tensor transport + modern-Python event loop Serialize torch tensors before the registry's mode-bound "Tensor" serializer in both serialization pre-passes so a shared-memory TensorRef is never emitted onto a JSON channel. This fixes KeyError('data') in torch-free sealed/conda workers when a shared_memory (share_torch) extension is co-resident with a json (sealed/conda) extension. CUDA-IPC routing is evaluated at call time so configured CUDA tensors are not downgraded. Acquire the event loop in AsyncRPC.__init__ without raising on Python >=3.12: prefer the running loop, then the thread's installed loop, and create one only as a last resort. Fixes RuntimeError('There is no current event loop') in the synchronous host launch path. Add regression tests for both fixes and bump version to 0.10.3rc1 (pyproject and __init__). 
--- pyisolate/__init__.py | 2 +- pyisolate/_internal/model_serialization.py | 24 ++-- pyisolate/_internal/rpc_protocol.py | 23 +++- pyisolate/_internal/rpc_serialization.py | 14 ++- pyproject.toml | 2 +- tests/test_event_channel.py | 10 +- tests/test_rpc_contract.py | 85 ++++++++++++++ tests/test_tensor_shared_memory.py | 36 ++++++ tests/test_tensor_transport_mode_isolation.py | 110 ++++++++++++++++++ 9 files changed, 287 insertions(+), 19 deletions(-) create mode 100644 tests/test_tensor_shared_memory.py create mode 100644 tests/test_tensor_transport_mode_isolation.py diff --git a/pyisolate/__init__.py b/pyisolate/__init__.py index dfda991..b73e4fe 100644 --- a/pyisolate/__init__.py +++ b/pyisolate/__init__.py @@ -44,7 +44,7 @@ if TYPE_CHECKING: from .interfaces import IsolationAdapter -__version__ = "0.10.2" +__version__ = "0.10.3rc1" __all__ = [ "ExtensionBase", diff --git a/pyisolate/_internal/model_serialization.py b/pyisolate/_internal/model_serialization.py index 0922b1d..2ac3b75 100644 --- a/pyisolate/_internal/model_serialization.py +++ b/pyisolate/_internal/model_serialization.py @@ -12,14 +12,11 @@ import logging import os -import sys from typing import TYPE_CHECKING, Any from .serialization_registry import SerializerRegistry from .torch_gate import get_torch_optional -_cuda_ipc_enabled = sys.platform == "linux" and os.environ.get("PYISOLATE_ENABLE_CUDA_IPC") == "1" - if TYPE_CHECKING: # pragma: no cover - typing aids pass # type: ignore[import-not-found] @@ -39,17 +36,28 @@ def _serialize_for_isolation_impl( if isinstance(handle, remote_handle_type): return handle - serializer = registry.get_serializer(type_name) - if serializer is not None: - return serializer(data) - + # Handle torch tensors BEFORE the registry's mode-bound "Tensor" serializer so the + # per-channel transport mode (JSONSocketTransport._tensor_transport) decides the wire + # format -- not the process-global registry mode. 
Otherwise a host running a + # shared_memory (share_torch) extension alongside a json (sealed/conda) extension emits + # a shared-memory TensorRef onto the json channel, which a torch-free sealed worker + # cannot decode (KeyError 'data'). Returning the tensor here defers encoding to the + # transport, which already serializes per channel via serialize_tensor(mode=...). if torch_module is not None and isinstance(data, torch_module.Tensor): if data.is_cuda: - if _cuda_ipc_enabled: + # Read the CUDA IPC env at call time, not import time: the host sets + # PYISOLATE_ENABLE_CUDA_IPC during _initialize_process, after this module is + # imported, so an import-time snapshot would be stale and downgrade configured + # CUDA tensors to CPU. Matches rpc_serialization._prepare_for_rpc_impl. + if os.environ.get("PYISOLATE_ENABLE_CUDA_IPC") == "1": return data return data.cpu() return data + serializer = registry.get_serializer(type_name) + if serializer is not None: + return serializer(data) + if isinstance(data, dict): return { k: _serialize_for_isolation_impl( diff --git a/pyisolate/_internal/rpc_protocol.py b/pyisolate/_internal/rpc_protocol.py index 22c1548..c2e43f0 100644 --- a/pyisolate/_internal/rpc_protocol.py +++ b/pyisolate/_internal/rpc_protocol.py @@ -18,6 +18,7 @@ import queue import threading import uuid +import warnings from collections.abc import Callable from typing import ( TYPE_CHECKING, @@ -162,7 +163,27 @@ def __init__( self.lock = threading.Lock() self.pending: dict[int, RPCPendingRequest] = {} - self.default_loop = asyncio.get_event_loop() + # Acquire the loop without raising when constructed outside a running loop. + # Python >=3.10 deprecated and 3.14 removed implicit main-thread event loop + # creation; earlier versions already raise when no loop is installed after + # asyncio.set_event_loop(None) or off the main thread, so an eager + # asyncio.get_event_loop() raised here in sync construction + # paths. Preserve the historical get_event_loop() semantics: prefer the running + # loop, then the thread's installed loop (set via asyncio.set_event_loop, e.g. 
a + # synchronous caller that constructs the RPC before running its own loop), and only + # create+install a new loop as a last resort. update_event_loop() may replace it. + try: + self.default_loop = asyncio.get_running_loop() + except RuntimeError: + # No running loop. Reuse the thread's installed loop if present, else install a + # fresh one. asyncio.get_event_loop() returns an installed loop, and only emits + # the "no current event loop" DeprecationWarning (Python >=3.12) when none is + # installed -- we handle creation explicitly, so that one warning is silenced. + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + try: + self.default_loop = asyncio.get_event_loop() + except RuntimeError: + self.default_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.default_loop) self._loop_lock = threading.Lock() # Protects default_loop updates self.callees: dict[str, object] = {} self.callbacks: dict[str, Any] = {} diff --git a/pyisolate/_internal/rpc_serialization.py b/pyisolate/_internal/rpc_serialization.py index 9e222c6..accc2e3 100644 --- a/pyisolate/_internal/rpc_serialization.py +++ b/pyisolate/_internal/rpc_serialization.py @@ -264,10 +264,14 @@ def _prepare_for_rpc_impl( torch_module: Any, ) -> Any: obj_type = type(obj) - serializer = _resolve_serializer_for_type(registry, obj_type) - if serializer is not None: - return serializer(obj) + # Handle torch tensors BEFORE the registry's mode-bound "Tensor" serializer so the + # per-channel transport mode (JSONSocketTransport._tensor_transport) decides the wire + # format -- not the process-global registry mode. Otherwise a host running a + # shared_memory (share_torch) extension alongside a json (sealed/conda) extension emits + # a shared-memory TensorRef onto the json channel, which a torch-free sealed worker + # cannot decode (KeyError 'data'). 
Returning the tensor here defers encoding to the + # transport, whose _json_default serializes per channel via serialize_tensor(mode=...). if torch_module is not None and isinstance(obj, torch_module.Tensor): if obj.is_cuda: if os.environ.get("PYISOLATE_ENABLE_CUDA_IPC") == "1": @@ -277,6 +281,10 @@ def _prepare_for_rpc_impl( return obj.cpu() return obj + serializer = _resolve_serializer_for_type(registry, obj_type) + if serializer is not None: + return serializer(obj) + if isinstance(obj, dict): return { k: _prepare_for_rpc_impl(v, registry=registry, torch_module=torch_module) for k, v in obj.items() diff --git a/pyproject.toml b/pyproject.toml index 91c1e49..e2ba1a8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "pyisolate" -version = "0.10.2" +version = "0.10.3rc1" description = "A Python library for dividing execution across multiple virtual environments" readme = "README.md" requires-python = ">=3.10" diff --git a/tests/test_event_channel.py b/tests/test_event_channel.py index 30c1dc4..2a22ccb 100644 --- a/tests/test_event_channel.py +++ b/tests/test_event_channel.py @@ -27,7 +27,7 @@ def handler(payload: Any) -> None: received.append(payload) bridge.register_handler("progress", handler) - asyncio.get_event_loop().run_until_complete(bridge.dispatch("progress", {"value": 5, "total": 10})) + asyncio.run(bridge.dispatch("progress", {"value": 5, "total": 10})) assert len(received) == 1 assert received[0] == {"value": 5, "total": 10} @@ -37,7 +37,7 @@ def test_emit_unregistered_event_raises(self) -> None: bridge = _EventBridge() with pytest.raises(ValueError, match="No handler registered for event 'unknown_event'"): - asyncio.get_event_loop().run_until_complete(bridge.dispatch("unknown_event", {})) + asyncio.run(bridge.dispatch("unknown_event", {})) def test_emit_event_rejects_non_json_payload(self) -> None: """emit_event with non-JSON-serializable payload raises immediately.""" @@ -62,7 
+62,7 @@ async def async_handler(payload: Any) -> None: received.append(payload) bridge.register_handler("test", async_handler) - asyncio.get_event_loop().run_until_complete(bridge.dispatch("test", {"key": "value"})) + asyncio.run(bridge.dispatch("test", {"key": "value"})) assert received == [{"key": "value"}] @@ -75,8 +75,8 @@ def test_multiple_events_independent(self) -> None: bridge.register_handler("progress", lambda p: progress_calls.append(p)) bridge.register_handler("preview", lambda p: preview_calls.append(p)) - asyncio.get_event_loop().run_until_complete(bridge.dispatch("progress", {"value": 1})) - asyncio.get_event_loop().run_until_complete(bridge.dispatch("preview", {"image": "data"})) + asyncio.run(bridge.dispatch("progress", {"value": 1})) + asyncio.run(bridge.dispatch("preview", {"image": "data"})) assert progress_calls == [{"value": 1}] assert preview_calls == [{"image": "data"}] diff --git a/tests/test_rpc_contract.py b/tests/test_rpc_contract.py index 091bc4a..d2c092f 100644 --- a/tests/test_rpc_contract.py +++ b/tests/test_rpc_contract.py @@ -136,6 +136,91 @@ def test_singleton_survives_loop_recreation(self) -> None: elif not loop1.is_closed(): loop1.close() + def test_asyncrpc_constructs_without_current_event_loop(self) -> None: + """AsyncRPC must construct when no current event loop exists. + + The host launches extensions from a synchronous path (host._launch_with_uds), + constructing AsyncRPC outside any running loop. Python 3.14 removed implicit + main-thread loop creation, and earlier versions already refuse it once + asyncio.set_event_loop(None) has been called, so an eager + asyncio.get_event_loop() in __init__ raised "There is no current event loop". + This guards that regression. 
+ """ + import queue + + from pyisolate._internal.rpc_protocol import AsyncRPC + + try: + previous_loop = asyncio.get_event_loop_policy().get_event_loop() + except RuntimeError: + previous_loop = None + + asyncio.set_event_loop(None) + rpc = None + try: + rpc = AsyncRPC(recv_queue=cast(Any, queue.Queue()), send_queue=cast(Any, queue.Queue())) + assert isinstance(rpc.default_loop, asyncio.AbstractEventLoop) + assert not rpc.default_loop.is_closed() + finally: + created = rpc.default_loop if rpc is not None else None + asyncio.set_event_loop(previous_loop) + if created is not None and created is not previous_loop: + created.close() + + def test_asyncrpc_reuses_preset_thread_loop(self) -> None: + """AsyncRPC must adopt the thread's installed (set-but-not-running) loop. + + A synchronous caller may create a loop, install it via asyncio.set_event_loop(), + construct AsyncRPC, then drive that loop. __init__ must adopt the installed loop + (matching historical asyncio.get_event_loop() behavior) instead of creating a + separate loop that rpc.run()/dispatch would schedule on but nobody runs. + """ + import queue + + from pyisolate._internal.rpc_protocol import AsyncRPC + + try: + previous_loop = asyncio.get_event_loop_policy().get_event_loop() + except RuntimeError: + previous_loop = None + + installed = asyncio.new_event_loop() + asyncio.set_event_loop(installed) + try: + rpc = AsyncRPC(recv_queue=cast(Any, queue.Queue()), send_queue=cast(Any, queue.Queue())) + assert rpc.default_loop is installed + finally: + asyncio.set_event_loop(previous_loop) + installed.close() + + def test_asyncrpc_construction_emits_no_deprecation_warning(self) -> None: + """AsyncRPC construction must not leak a 'no current event loop' DeprecationWarning. + + The fix for the >=3.12 get_event_loop() crash must not itself emit the very + deprecation it works around. Treats DeprecationWarning as an error while + constructing with no installed loop. 
+ """ + import queue + import warnings + + from pyisolate._internal.rpc_protocol import AsyncRPC + + try: + previous_loop = asyncio.get_event_loop_policy().get_event_loop() + except RuntimeError: + previous_loop = None + + asyncio.set_event_loop(None) + rpc = None + try: + with warnings.catch_warnings(): + warnings.simplefilter("error", DeprecationWarning) + rpc = AsyncRPC(recv_queue=cast(Any, queue.Queue()), send_queue=cast(Any, queue.Queue())) + assert isinstance(rpc.default_loop, asyncio.AbstractEventLoop) + finally: + created = rpc.default_loop if rpc is not None else None + asyncio.set_event_loop(previous_loop) + if created is not None and created is not previous_loop: + created.close() + def test_singleton_data_persists_across_loops(self) -> None: """Data stored in singleton persists across event loops.""" try: diff --git a/tests/test_tensor_shared_memory.py b/tests/test_tensor_shared_memory.py new file mode 100644 index 0000000..e25135a --- /dev/null +++ b/tests/test_tensor_shared_memory.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +import pytest + +torch = pytest.importorskip("torch") + +from pyisolate._internal.tensor_serializer import ( # noqa: E402 + _reset_shm_check, + deserialize_tensor, + serialize_tensor, +) + + +def test_cpu_torch_share_roundtrip_is_zero_copy() -> None: + """torch_share CPU transport must stay shared-memory, never a value copy. + + Runs single-process so it executes on Windows, where the multi-process + torch_share RPC tests are skipped (extension loading needs Unix sockets). + Guards the regression where a /dev/shm gate degrades CPU sharing to a + file-based value copy: a copy still passes ``torch.equal`` but breaks the + shared-storage contract that callers depend on for zero-copy transfer. 
+ """ + _reset_shm_check() + original = torch.arange(25, dtype=torch.float32).reshape(5, 5) + + payload = serialize_tensor(original, mode="shared_memory") + + assert payload["__type__"] == "TensorRef", "CPU tensor degraded to a value copy" + assert payload["device"] == "cpu" + assert payload["strategy"] in ("file_system", "file_system_borrowed") + + rebuilt = deserialize_tensor(payload, mode="shared_memory") + assert torch.equal(rebuilt, original) + + original[0, 0] = 999.0 + assert float(rebuilt[0, 0]) == 999.0, "receiver did not observe sender mutation" diff --git a/tests/test_tensor_transport_mode_isolation.py b/tests/test_tensor_transport_mode_isolation.py new file mode 100644 index 0000000..92138b8 --- /dev/null +++ b/tests/test_tensor_transport_mode_isolation.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + from pyisolate._internal.serialization_registry import SerializerRegistry + + +def _shared_memory_registry() -> SerializerRegistry: + """A registry whose Tensor serializer is pinned to shared_memory, simulating a + co-resident share_torch extension that registered last on the process-global + SerializerRegistry singleton.""" + from pyisolate._internal.serialization_registry import SerializerRegistry + from pyisolate._internal.tensor_serializer import register_tensor_serializer + + registry = SerializerRegistry() + register_tensor_serializer(registry, mode="shared_memory") + return registry + + +def test_serialize_for_isolation_defers_tensor_to_per_channel_transport() -> None: + """serialize_for_isolation must not pre-encode torch tensors via the registry's + mode-bound "Tensor" serializer; the tensor must be deferred so the per-channel + transport decides the wire format. + + Regression: a host running a shared_memory (share_torch) extension alongside a json + (sealed/conda) extension pinned the global "Tensor" serializer to shared_memory + (last-writer-wins). 
serialize_for_isolation then emitted a shared-memory TensorRef + onto the json channel; the torch-free sealed worker raised KeyError('data') decoding + it and its RPC recv thread died ("Socket closed"). + """ + torch = pytest.importorskip("torch") + + from pyisolate._internal.model_serialization import _serialize_for_isolation_impl + from pyisolate._internal.remote_handle import RemoteObjectHandle + + out = _serialize_for_isolation_impl( + torch.zeros(2, 3), + registry=_shared_memory_registry(), + torch_module=torch, + remote_handle_type=RemoteObjectHandle, + ) + + assert isinstance(out, torch.Tensor) + assert not (isinstance(out, dict) and out.get("__type__") == "TensorRef") + + +def test_serialize_for_isolation_reads_cuda_ipc_env_at_runtime(monkeypatch: pytest.MonkeyPatch) -> None: + """The CUDA-IPC decision in serialize_for_isolation must be evaluated at call time, + not captured at import. + + Regression: a module-level _cuda_ipc_enabled snapshot, consulted before the registry + serializer, was stale because the host sets PYISOLATE_ENABLE_CUDA_IPC during + _initialize_process -- after this module is imported. With CUDA IPC configured it + therefore sent data.cpu() instead of deferring the on-device tensor to the per-channel + CUDA IPC transport, silently losing CUDA transport. A fake on-device tensor isolates + the env decision without requiring a GPU. 
+ """ + from pyisolate._internal.model_serialization import _serialize_for_isolation_impl + from pyisolate._internal.remote_handle import RemoteObjectHandle + from pyisolate._internal.serialization_registry import SerializerRegistry + + class FakeCudaTensor: + is_cuda = True + + def cpu(self) -> str: + return "DOWNGRADED_TO_CPU" + + class FakeTorch: + Tensor = FakeCudaTensor + + tensor = FakeCudaTensor() + registry = SerializerRegistry() + + def _serialize() -> object: + return _serialize_for_isolation_impl( + tensor, + registry=registry, + torch_module=FakeTorch, + remote_handle_type=RemoteObjectHandle, + ) + + # CUDA IPC configured at runtime -> defer the on-device tensor (do NOT downgrade). + monkeypatch.setenv("PYISOLATE_ENABLE_CUDA_IPC", "1") + assert _serialize() is tensor + + # CUDA IPC not configured -> fall back to CPU. + monkeypatch.delenv("PYISOLATE_ENABLE_CUDA_IPC", raising=False) + assert _serialize() == "DOWNGRADED_TO_CPU" + + +def test_prepare_for_rpc_defers_tensor_to_per_channel_transport() -> None: + """_prepare_for_rpc_impl (the RPC argument pre-pass) must likewise defer torch tensors + instead of pre-encoding them with the global registry mode. This is the path that + serializes execute_node arguments; the per-channel transport must choose the format. + """ + torch = pytest.importorskip("torch") + + from pyisolate._internal.rpc_serialization import _prepare_for_rpc_impl + + out = _prepare_for_rpc_impl( + torch.zeros(2, 3), + registry=_shared_memory_registry(), + torch_module=torch, + ) + + assert isinstance(out, torch.Tensor) + assert not (isinstance(out, dict) and out.get("__type__") == "TensorRef")