From f366a89bf701e70481a18ffabdb2a1c30c2f2ee4 Mon Sep 17 00:00:00 2001 From: Mike Macpherson Date: Tue, 25 Nov 2025 11:07:03 -0800 Subject: [PATCH 1/7] Add decorator matrix and integration tests --- Makefile | 4 + pyproject.toml | 2 + tests/test_datastar_decorator_runtime.py | 127 ++++++++++++++++++ tests/test_decorator_matrix.py | 109 +++++++++++++++ tests/test_fastapi_decorator_integration.py | 120 +++++++++++++++++ tests/test_fasthtml_decorator_integration.py | 118 ++++++++++++++++ tests/test_litestar_decorator_integration.py | 120 +++++++++++++++++ tests/test_starlette_decorator_integration.py | 127 ++++++++++++++++++ 8 files changed, 727 insertions(+) create mode 100644 Makefile create mode 100644 tests/test_datastar_decorator_runtime.py create mode 100644 tests/test_decorator_matrix.py create mode 100644 tests/test_fastapi_decorator_integration.py create mode 100644 tests/test_fasthtml_decorator_integration.py create mode 100644 tests/test_litestar_decorator_integration.py create mode 100644 tests/test_starlette_decorator_integration.py diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..209a865 --- /dev/null +++ b/Makefile @@ -0,0 +1,4 @@ +.PHONY: test + +test: + uv run --dev pytest diff --git a/pyproject.toml b/pyproject.toml index 7c9c70d..c83c8c9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,12 +48,14 @@ urls.GitHub = "https://github.com/starfederation/datastar-python" dev = [ "django>=4.2.23", "fastapi>=0.116.1", + "httpx>=0.27", "litestar>=2.17", "pre-commit>=4.2", "python-fasthtml>=0.12.25; python_full_version>='3.10'", "quart>=0.20", "sanic>=25.3", "starlette>=0.47.3", + "uvicorn>=0.30", ] [tool.ruff] diff --git a/tests/test_datastar_decorator_runtime.py b/tests/test_datastar_decorator_runtime.py new file mode 100644 index 0000000..2ecd905 --- /dev/null +++ b/tests/test_datastar_decorator_runtime.py @@ -0,0 +1,127 @@ +"""Runtime-focused tests for datastar_response decorators.""" + +from __future__ import annotations + +import importlib +import inspect +import threading +import time +from typing import Any + +import anyio +import httpx +import pytest +import uvicorn +from starlette.applications import Starlette +from starlette.responses import PlainTextResponse +from starlette.routing import Route + +from datastar_py.sse import ServerSentEventGenerator as SSE + + +@pytest.fixture +def anyio_backend() -> str: + """Limit anyio plugin to asyncio backend for these tests.""" + return "asyncio" + + +@pytest.mark.parametrize("module_path", ["datastar_py.starlette", "datastar_py.fasthtml"]) +@pytest.mark.parametrize( + "variant", + [ + "sync_value", + "sync_generator", + "async_value", + "async_generator", + ], +) +def test_decorator_returns_response_objects(module_path: str, variant: str) -> None: + """Decorated handlers should stay sync-callable and return DatastarResponse immediately.""" + + mod = importlib.import_module(module_path) + datastar_response = mod.datastar_response + DatastarResponse = mod.DatastarResponse + + if variant == "sync_value": + @datastar_response + def handler() -> Any: + return SSE.patch_signals({"ok": True}) + elif variant == "sync_generator": + @datastar_response + def handler() -> Any: + yield SSE.patch_signals({"ok": True}) + elif variant == "async_value": + @datastar_response + async def handler() -> Any: + return SSE.patch_signals({"ok": True}) + else: + @datastar_response + async def handler() -> Any: + yield SSE.patch_signals({"ok": True}) + + result = handler() + if inspect.iscoroutine(result): + result.close() # avoid "coroutine was never awaited" warnings + + assert not inspect.iscoroutinefunction(handler), "Decorator should preserve sync callable semantics" + assert isinstance(result, DatastarResponse) + + +async def _fetch( + client: httpx.AsyncClient, path: str, timings: dict[str, float], key: str +) -> None: + start = time.perf_counter() + resp = await client.get(path, timeout=5.0) + timings[key] = time.perf_counter() - start + resp.raise_for_status() + + +@pytest.mark.anyio("asyncio") +async def test_sync_handler_runs_off_event_loop() -> None: + """Sync routes should stay in the threadpool; otherwise they block the event loop.""" + + entered = threading.Event() + + from datastar_py.starlette import datastar_response + + @datastar_response + def slow(request) -> Any: # noqa: ANN001 + entered.set() + time.sleep(1.0) # if run on the event loop, this blocks other requests + return SSE.patch_signals({"slow": True}) + + async def ping(request) -> PlainTextResponse: # noqa: ANN001 + return PlainTextResponse("pong") + + app = Starlette(routes=[Route("/slow", slow), Route("/ping", ping)]) + + config = uvicorn.Config(app, host="127.0.0.1", port=0, log_level="warning", lifespan="off") + server = uvicorn.Server(config) + thread = threading.Thread(target=server.run, daemon=True) + thread.start() + + try: + # Wait for server to start and expose sockets + for _ in range(50): + if server.started and getattr(server, "servers", None): + break + await anyio.sleep(0.05) + else: + pytest.fail("Server did not start") + + sock = server.servers[0].sockets[0] + host, port = sock.getsockname()[:2] + base_url = f"http://{host}:{port}" + + async with httpx.AsyncClient(base_url=base_url) as client: + timings: dict[str, float] = {} + async with anyio.create_task_group() as tg: + tg.start_soon(_fetch, client, "/slow", timings, "slow") + await anyio.to_thread.run_sync(entered.wait, 1.0) + tg.start_soon(_fetch, client, "/ping", timings, "ping") + + assert timings["slow"] >= 0.9 + assert timings["ping"] < 0.3, "Ping should not be blocked by slow sync handler" + finally: + server.should_exit = True + thread.join(timeout=2) diff --git a/tests/test_decorator_matrix.py b/tests/test_decorator_matrix.py new file mode 100644 index 0000000..852ed14 --- /dev/null +++ b/tests/test_decorator_matrix.py @@ -0,0 +1,109 @@ +"""Matrix tests for datastar_response across frameworks and callable types.""" + +from __future__ import annotations + +import importlib +import inspect +from typing import Any, Iterable + +import pytest + +from datastar_py.sse import ServerSentEventGenerator as SSE + + +FRAMEWORKS = [ + # name, module path, iterator attribute on response (None means use response directly) + ("starlette", "datastar_py.starlette", "body_iterator"), + ("fasthtml", "datastar_py.fasthtml", "body_iterator"), + ("fastapi", "datastar_py.fastapi", "body_iterator"), + ("litestar", "datastar_py.litestar", "iterator"), + ("django", "datastar_py.django", None), + # Quart and Sanic need full request contexts; covered elsewhere + ("quart", "datastar_py.quart", None), + ("sanic", "datastar_py.sanic", None), +] + + +@pytest.fixture +def anyio_backend() -> str: + """Limit anyio plugin to asyncio backend for these tests.""" + return "asyncio" + + +def _require_module(module_path: str) -> Any: + if not importlib.util.find_spec(module_path): + pytest.skip(f"{module_path} not installed") + return importlib.import_module(module_path) + + +async def _collect_events(resp: Any, iterator_attr: str | None) -> list[Any]: + """Gather events from response regardless of iterator style.""" + iterator = getattr(resp, iterator_attr) if iterator_attr else resp + events: list[Any] = [] + + if hasattr(iterator, "__aiter__"): + async for event in iterator: # type: ignore[has-type] + events.append(event) + elif isinstance(iterator, Iterable): + for event in iterator: + events.append(event) + else: + raise TypeError(f"Cannot iterate response events for {type(resp)}") + + return events + + +@pytest.mark.anyio +@pytest.mark.parametrize("framework_name,module_path,iterator_attr", FRAMEWORKS) +@pytest.mark.parametrize( + "variant", + ["sync_value", "sync_generator", "async_value", "async_generator"], +) +async def test_datastar_response_matrix( + framework_name: str, module_path: str, iterator_attr: str | None, variant: str +) -> None: + """Ensure decorator works for sync/async and generator/non-generator functions.""" + + if framework_name in {"quart", "sanic"}: + pytest.skip(f"{framework_name} decorator requires full request context to exercise") + if framework_name == "django": + from django.conf import settings + + if not settings.configured: + settings.configure(DEFAULT_CHARSET="utf-8") + if variant == "async_generator": + pytest.skip("Django adapter does not support async generators yet") + + mod = _require_module(module_path) + datastar_response = mod.datastar_response + DatastarResponse = mod.DatastarResponse + + if variant == "sync_value": + @datastar_response + def handler() -> Any: + return SSE.patch_signals({"ok": True}) + elif variant == "sync_generator": + @datastar_response + def handler() -> Any: + yield SSE.patch_signals({"ok": True}) + elif variant == "async_value": + @datastar_response + async def handler() -> Any: + return SSE.patch_signals({"ok": True}) + else: + @datastar_response + async def handler() -> Any: + yield SSE.patch_signals({"ok": True}) + + result = handler() + try: + if inspect.isawaitable(result): + result = await result + + assert isinstance(result, DatastarResponse) + events = await _collect_events(result, iterator_attr) + assert events, "Expected at least one event from response iterator" + finally: + # Avoid "coroutine was never awaited" warnings when assertions fail + if inspect.iscoroutine(result): + result.close() diff --git a/tests/test_fastapi_decorator_integration.py b/tests/test_fastapi_decorator_integration.py new file mode 100644 index 0000000..0ee3440 --- /dev/null +++ b/tests/test_fastapi_decorator_integration.py @@ -0,0 +1,120 @@ +"""Integration test: datastar_response within a live FastAPI app.""" + +from __future__ import annotations + +import threading +import time +from typing import Any + +import anyio +import httpx +import pytest +import uvicorn +from fastapi import FastAPI +from starlette.responses import PlainTextResponse + +from datastar_py.sse import ServerSentEventGenerator as SSE +from datastar_py.fastapi import datastar_response + + +@pytest.fixture +def anyio_backend() -> str: + return "asyncio" + + +async def _fetch(client: httpx.AsyncClient, path: str) -> httpx.Response: + resp = await client.get(path, timeout=5.0) + resp.raise_for_status() + return resp + + +@pytest.mark.anyio +async def test_fastapi_handlers_cover_matrix() -> None: + """Ensure FastAPI handlers across sync/async and gen/value work end-to-end.""" + + entered = threading.Event() + app = FastAPI() + + @app.get("/sync-value") + @datastar_response + def sync_value() -> Any: + entered.set() + time.sleep(0.2) # should run in threadpool + return SSE.patch_signals({"src": "sync_value"}) + + @app.get("/sync-generator") + @datastar_response + def sync_gen() -> Any: + yield SSE.patch_signals({"src": "sync_generator", "idx": 1}) + yield SSE.patch_signals({"src": "sync_generator", "idx": 2}) + + @app.get("/async-value") + @datastar_response + async def async_value() -> Any: + return SSE.patch_signals({"src": "async_value"}) + + @app.get("/async-generator") + @datastar_response + async def async_gen() -> Any: + yield SSE.patch_signals({"src": "async_generator", "idx": 1}) + yield SSE.patch_signals({"src": "async_generator", "idx": 2}) + + @app.get("/ping") + async def ping() -> PlainTextResponse: + return PlainTextResponse("pong") + + config = uvicorn.Config(app, host="127.0.0.1", port=0, log_level="warning", lifespan="off") + server = uvicorn.Server(config) + thread = threading.Thread(target=server.run, daemon=True) + thread.start() + + try: + for _ in range(50): + if server.started and getattr(server, "servers", None): + break + await anyio.sleep(0.05) + else: + pytest.fail("Server did not start") + + sock = server.servers[0].sockets[0] + host, port = sock.getsockname()[:2] + base_url = f"http://{host}:{port}" + + async with httpx.AsyncClient(base_url=base_url) as client: + # Concurrency sanity: sync_value should not stall ping + async with anyio.create_task_group() as tg: + slow_resp: httpx.Response | None = None + ping_resp: httpx.Response | None = None + + async def hit_slow(): + nonlocal slow_resp + slow_resp = await _fetch(client, "/sync-value") + + async def hit_ping(): + nonlocal ping_resp + await anyio.to_thread.run_sync(entered.wait, 1.0) + ping_resp = await _fetch(client, "/ping") + + tg.start_soon(hit_slow) + tg.start_soon(hit_ping) + + assert slow_resp is not None and slow_resp.status_code == 200 + assert ping_resp is not None and ping_resp.status_code == 200 + assert float(ping_resp.elapsed.total_seconds()) < 0.35 + + sync_value_body = (await _fetch(client, "/sync-value")).text + assert '"src":"sync_value"' in sync_value_body + + sync_gen_body = (await _fetch(client, "/sync-generator")).text + assert '"src":"sync_generator"' in sync_gen_body + assert '"idx":1' in sync_gen_body and '"idx":2' in sync_gen_body + + async_value_body = (await _fetch(client, "/async-value")).text + assert '"src":"async_value"' in async_value_body + + async_gen_body = (await _fetch(client, "/async-generator")).text + assert '"src":"async_generator"' in async_gen_body + assert '"idx":1' in async_gen_body and '"idx":2' in async_gen_body + finally: + server.should_exit = True + thread.join(timeout=2) diff --git a/tests/test_fasthtml_decorator_integration.py b/tests/test_fasthtml_decorator_integration.py new file mode 100644 index 0000000..3645c48 --- /dev/null +++ b/tests/test_fasthtml_decorator_integration.py @@ -0,0 +1,118 @@ +"""Integration test: datastar_response within a live FastHTML app.""" + +from __future__ import annotations + +import threading +import time +from typing import Any + +import anyio +import httpx +import pytest +import uvicorn +from fasthtml.common import fast_app +from starlette.responses import PlainTextResponse + +from datastar_py.sse import ServerSentEventGenerator as SSE +from datastar_py.fasthtml import datastar_response + + +@pytest.fixture +def anyio_backend() -> str: + return "asyncio" + + +async def _fetch(client: httpx.AsyncClient, path: str) -> httpx.Response: + resp = await client.get(path, timeout=5.0) + resp.raise_for_status() + return resp + + +@pytest.mark.anyio +async def test_fasthtml_sync_and_streaming_handlers() -> None: + """Ensure FastHTML routes across sync/async and gen/value work end-to-end.""" + + entered = threading.Event() + + app, rt = fast_app(htmx=False, live=False) + + @rt("/slow") + @datastar_response + def slow(request) -> Any: # noqa: ANN001 + entered.set() + time.sleep(0.2) # should not block event loop for other requests + return SSE.patch_signals({"src": "sync_value"}) + + @rt("/sync-generator") + @datastar_response + def sync_gen(request) -> Any: # noqa: ANN001 + yield SSE.patch_signals({"src": "sync_generator", "idx": 1}) + yield SSE.patch_signals({"src": "sync_generator", "idx": 2}) + + @rt("/stream") + @datastar_response + async def async_gen(request) -> Any: # noqa: ANN001 + yield SSE.patch_signals({"src": "async_generator", "idx": 1}) + yield SSE.patch_signals({"src": "async_generator", "idx": 2}) + + @rt("/async-value") + @datastar_response + async def async_value(request) -> Any: # noqa: ANN001 + return SSE.patch_signals({"src": "async_value"}) + + @rt("/ping") + async def ping(request) -> PlainTextResponse: # noqa: ANN001 + return PlainTextResponse("pong") + + config = uvicorn.Config(app, host="127.0.0.1", port=0, log_level="warning", lifespan="off") + server = uvicorn.Server(config) + thread = threading.Thread(target=server.run, daemon=True) + thread.start() + + try: + for _ in range(50): + if server.started and getattr(server, "servers", None): + break + await anyio.sleep(0.05) + else: + pytest.fail("Server did not start") + + sock = server.servers[0].sockets[0] + host, port = sock.getsockname()[:2] + base_url = f"http://{host}:{port}" + + async with httpx.AsyncClient(base_url=base_url) as client: + async with anyio.create_task_group() as tg: + slow_resp: httpx.Response | None = None + ping_resp: httpx.Response | None = None + + async def hit_slow(): + nonlocal slow_resp + slow_resp = await _fetch(client, "/slow") + + async def hit_ping(): + nonlocal ping_resp + await anyio.to_thread.run_sync(entered.wait, 1.0) + ping_resp = await _fetch(client, "/ping") + + tg.start_soon(hit_slow) + tg.start_soon(hit_ping) + + assert slow_resp is not None and slow_resp.status_code == 200 + assert ping_resp is not None and ping_resp.status_code == 200 + assert float(ping_resp.elapsed.total_seconds()) < 0.3 + + sync_gen_body = (await _fetch(client, "/sync-generator")).text + assert '"src":"sync_generator"' in sync_gen_body + assert '"idx":1' in sync_gen_body and '"idx":2' in sync_gen_body + + async_value_body = (await _fetch(client, "/async-value")).text + assert '"src":"async_value"' in async_value_body + + stream = await _fetch(client, "/stream") + body = stream.text + assert '"src":"async_generator"' in body + assert '"idx":1' in body and '"idx":2' in body + finally: + server.should_exit = True + thread.join(timeout=2) diff --git a/tests/test_litestar_decorator_integration.py b/tests/test_litestar_decorator_integration.py new file mode 100644 index 0000000..1529d7f --- /dev/null +++ b/tests/test_litestar_decorator_integration.py @@ -0,0 +1,120 @@ +"""Integration test: datastar_response within a live Litestar app.""" + +from __future__ import annotations + +import threading +import time +from typing import Any + +import anyio +import httpx +import pytest +import uvicorn +from litestar import Litestar, get +from starlette.responses import PlainTextResponse + +from datastar_py.sse import ServerSentEventGenerator as SSE +from datastar_py.litestar import datastar_response + + +@pytest.fixture +def anyio_backend() -> str: + return "asyncio" + + +async def _fetch(client: httpx.AsyncClient, path: str) -> httpx.Response: + resp = await client.get(path, timeout=5.0) + resp.raise_for_status() + return resp + + +@pytest.mark.anyio +async def test_litestar_handlers_cover_matrix() -> None: + """Ensure Litestar handlers across sync/async and gen/value work end-to-end.""" + + entered = threading.Event() + + @get("/sync-value") + @datastar_response + def sync_value() -> Any: + entered.set() + time.sleep(0.2) + return SSE.patch_signals({"src": "sync_value"}) + + @get("/sync-generator") + @datastar_response + def sync_gen() -> Any: + yield SSE.patch_signals({"src": "sync_generator", "idx": 1}) + yield SSE.patch_signals({"src": "sync_generator", "idx": 2}) + + @get("/async-value") + @datastar_response + async def async_value() -> Any: + return SSE.patch_signals({"src": "async_value"}) + + @get("/async-generator") + @datastar_response + async def async_gen() -> Any: + yield SSE.patch_signals({"src": "async_generator", "idx": 1}) + yield SSE.patch_signals({"src": "async_generator", "idx": 2}) + + @get("/ping") + async def ping() -> PlainTextResponse: + return PlainTextResponse("pong") + + app = Litestar(route_handlers=[sync_value, sync_gen, async_value, async_gen, ping]) + + config = uvicorn.Config(app, host="127.0.0.1", port=0, log_level="warning", lifespan="off") + server = uvicorn.Server(config) + thread = threading.Thread(target=server.run, daemon=True) + thread.start() + + try: + for _ in range(50): + if server.started and getattr(server, "servers", None): + break + await anyio.sleep(0.05) + else: + pytest.fail("Server did not start") + + sock = server.servers[0].sockets[0] + host, port = sock.getsockname()[:2] + base_url = f"http://{host}:{port}" + + async with httpx.AsyncClient(base_url=base_url) as client: + async with anyio.create_task_group() as tg: + slow_resp: httpx.Response | None = None + ping_resp: httpx.Response | None = None + + async def hit_slow(): + nonlocal slow_resp + slow_resp = await _fetch(client, "/sync-value") + + async def hit_ping(): + nonlocal ping_resp + await anyio.to_thread.run_sync(entered.wait, 1.0) + ping_resp = await _fetch(client, "/ping") + + tg.start_soon(hit_slow) + tg.start_soon(hit_ping) + + assert slow_resp is not None and slow_resp.status_code == 200 + assert ping_resp is not None and ping_resp.status_code == 200 + assert float(ping_resp.elapsed.total_seconds()) < 0.35 + + sync_value_body = (await _fetch(client, "/sync-value")).text + assert '"src":"sync_value"' in sync_value_body + + sync_gen_body = (await _fetch(client, "/sync-generator")).text + assert '"src":"sync_generator"' in sync_gen_body + assert '"idx":1' in sync_gen_body and '"idx":2' in sync_gen_body + + async_value_body = (await _fetch(client, "/async-value")).text + assert '"src":"async_value"' in async_value_body + + async_gen_body = (await _fetch(client, "/async-generator")).text + assert '"src":"async_generator"' in async_gen_body + assert '"idx":1' in async_gen_body and '"idx":2' in async_gen_body + finally: + server.should_exit = True + thread.join(timeout=2) diff --git a/tests/test_starlette_decorator_integration.py b/tests/test_starlette_decorator_integration.py new file mode 100644 index 0000000..fccdb94 --- /dev/null +++ b/tests/test_starlette_decorator_integration.py @@ -0,0 +1,127 @@ +"""Integration test: datastar_response within a live Starlette app.""" + +from __future__ import annotations + +import threading +import time +from typing import Any + +import anyio +import httpx +import pytest +import uvicorn +from starlette.applications import Starlette +from starlette.responses import PlainTextResponse +from starlette.routing import Route + +from datastar_py.sse import ServerSentEventGenerator as SSE +from datastar_py.starlette import datastar_response + + +@pytest.fixture +def anyio_backend() -> str: + return "asyncio" + + +async def _fetch(client: httpx.AsyncClient, path: str) -> httpx.Response: + resp = await client.get(path, timeout=5.0) + resp.raise_for_status() + return resp + + +@pytest.mark.anyio +async def test_starlette_sync_handler_runs_in_threadpool_and_streams() -> None: + """Ensure all handler shapes work end-to-end and sync stays in threadpool.""" + + entered = threading.Event() + + @datastar_response + def sync_value(request) -> Any: # noqa: ANN001 + entered.set() + time.sleep(0.2) # should not block event loop + return SSE.patch_signals({"src": "sync_value"}) + + @datastar_response + def sync_gen(request) -> Any: # noqa: ANN001 + yield SSE.patch_signals({"src": "sync_generator", "idx": 1}) + yield SSE.patch_signals({"src": "sync_generator", "idx": 2}) + + @datastar_response + async def async_value(request) -> Any: # noqa: ANN001 + return SSE.patch_signals({"src": "async_value"}) + + @datastar_response + async def async_gen(request) -> Any: # noqa: ANN001 + yield SSE.patch_signals({"src": "async_generator", "idx": 1}) + yield SSE.patch_signals({"src": "async_generator", "idx": 2}) + + async def ping(request) -> PlainTextResponse: # noqa: ANN001 + return PlainTextResponse("pong") + + app = Starlette( + routes=[ + Route("/sync-value", sync_value), + Route("/sync-generator", sync_gen), + Route("/async-value", async_value), + Route("/async-generator", async_gen), + Route("/ping", ping), + ] + ) + + config = uvicorn.Config(app, host="127.0.0.1", port=0, log_level="warning", lifespan="off") + server = uvicorn.Server(config) + thread = threading.Thread(target=server.run, daemon=True) + thread.start() + + try: + for _ in range(50): + if server.started and getattr(server, "servers", None): + break + await anyio.sleep(0.05) + else: + pytest.fail("Server did not start") + + sock = server.servers[0].sockets[0] + host, port = sock.getsockname()[:2] + base_url = f"http://{host}:{port}" + + async with httpx.AsyncClient(base_url=base_url) as client: + # Verify blocking sync handler doesn't stall other requests + # Concurrency sanity: sync_value blocks 0.2s but should not stall ping + async with anyio.create_task_group() as tg: + slow_resp: httpx.Response | None = None + ping_resp: httpx.Response | None = None + + async def hit_slow(): + nonlocal slow_resp + slow_resp = await _fetch(client, "/sync-value") + + async def hit_ping(): + nonlocal ping_resp + await anyio.to_thread.run_sync(entered.wait, 1.0) + ping_resp = await _fetch(client, "/ping") + + tg.start_soon(hit_slow) + tg.start_soon(hit_ping) + + assert slow_resp is not None and slow_resp.status_code == 200 + assert ping_resp is not None and ping_resp.status_code == 200 + assert float(ping_resp.elapsed.total_seconds()) < 0.35 + + # Verify content of each endpoint + sync_value_body = (await _fetch(client, "/sync-value")).text + assert '"src":"sync_value"' in sync_value_body + + sync_gen_body = (await _fetch(client, "/sync-generator")).text + assert '"src":"sync_generator"' in sync_gen_body + assert '"idx":1' in sync_gen_body and '"idx":2' in sync_gen_body + + async_value_body = (await _fetch(client, "/async-value")).text + assert '"src":"async_value"' in async_value_body + + async_gen_body = (await _fetch(client, "/async-generator")).text + assert '"src":"async_generator"' in async_gen_body + assert '"idx":1' in async_gen_body and '"idx":2' in async_gen_body + finally: + server.should_exit = True + thread.join(timeout=2) From 987eeceeaba7d2d3a32d74abae069a36f6744d6e Mon Sep 17 00:00:00 2001 From: Mike Macpherson Date: Tue, 25 Nov 2025 11:07:16 -0800 Subject: [PATCH 2/7] Normalize decorator return to DatastarResponse --- src/datastar_py/django.py | 13 ++++++++++++- src/datastar_py/litestar.py | 20 ++++++++++++++++---- src/datastar_py/sanic.py | 6 +++--- src/datastar_py/starlette.py | 25 +++++++++++++++++++++---- 4 files changed, 52 insertions(+), 12 deletions(-) diff --git a/src/datastar_py/django.py b/src/datastar_py/django.py index 64e39e9..e003f15 100644 --- a/src/datastar_py/django.py +++ b/src/datastar_py/django.py @@ -2,6 +2,7 @@ from collections.abc import Awaitable, Callable, Mapping from functools import wraps +from inspect import isawaitable from typing import Any, ParamSpec from django.http import HttpRequest @@ -54,7 +55,17 @@ def datastar_response( @wraps(func) async def wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: r = func(*args, **kwargs) - if isinstance(r, Awaitable): + + if hasattr(r, "__aiter__"): + raise NotImplementedError( + "Async generators/iterables are not yet supported by the Django adapter; " + "use a sync generator or return a single value/awaitable instead." + ) + + if hasattr(r, "__iter__") and not isinstance(r, (str, bytes)): + return DatastarResponse(r) + + if isawaitable(r): return DatastarResponse(await r) return DatastarResponse(r) diff --git a/src/datastar_py/litestar.py b/src/datastar_py/litestar.py index 6e3590f..ba01688 100644 --- a/src/datastar_py/litestar.py +++ b/src/datastar_py/litestar.py @@ -2,6 +2,7 @@ from collections.abc import Awaitable, Callable, Mapping from functools import wraps +from inspect import isawaitable from typing import ( TYPE_CHECKING, Any, @@ -64,17 +65,28 @@ def __init__( def datastar_response( func: Callable[P, Awaitable[DatastarEvents] | DatastarEvents], -) -> Callable[P, Awaitable[DatastarResponse]]: +) -> Callable[P, DatastarResponse]: """A decorator which wraps a function result in DatastarResponse. Can be used on a sync or async function or generator function. """ @wraps(func) - async def wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: + def wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: r = func(*args, **kwargs) - if isinstance(r, Awaitable): - return DatastarResponse(await r) + + if hasattr(r, "__aiter__"): + return DatastarResponse(r) + + if hasattr(r, "__iter__") and not isinstance(r, (str, bytes)): + return DatastarResponse(r) + + if isawaitable(r): + async def await_and_yield(): + yield await r + + return DatastarResponse(await_and_yield()) + return DatastarResponse(r) wrapper.__annotations__["return"] = DatastarResponse diff --git a/src/datastar_py/sanic.py b/src/datastar_py/sanic.py index 878ac8e..fffd585 100644 --- a/src/datastar_py/sanic.py +++ b/src/datastar_py/sanic.py @@ -3,8 +3,8 @@ from collections.abc import Awaitable, Callable, Collection, Mapping from contextlib import aclosing, closing from functools import wraps -from inspect import isasyncgen, isgenerator -from typing import Any, ParamSpec +from inspect import isasyncgen, isawaitable, isgenerator +from typing import Any, ParamSpec, Union from sanic import HTTPResponse, Request @@ -70,7 +70,7 @@ def datastar_response( @wraps(func) async def wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse | None: r = func(*args, **kwargs) - if isinstance(r, Awaitable): + if isawaitable(r): return DatastarResponse(await r) if isasyncgen(r): request = args[0] diff --git a/src/datastar_py/starlette.py b/src/datastar_py/starlette.py index 60b4aac..74f33db 100644 --- a/src/datastar_py/starlette.py +++ b/src/datastar_py/starlette.py @@ -2,6 +2,7 @@ from collections.abc import Awaitable, Callable, Mapping from functools import wraps +from inspect import isawaitable from typing import ( TYPE_CHECKING, Any, @@ -53,17 +54,33 @@ def __init__( def datastar_response( func: Callable[P, Awaitable[DatastarEvents] | DatastarEvents], -) -> Callable[P, Awaitable[DatastarResponse]]: +) -> Callable[P, DatastarResponse]: """A decorator which wraps a function result in DatastarResponse. Can be used on a sync or async function or generator function. """ @wraps(func) - async def wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: + def wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: r = func(*args, **kwargs) - if isinstance(r, Awaitable): - return DatastarResponse(await r) + + # Check for async generator/iterator first (most specific case) + if hasattr(r, "__aiter__"): + return DatastarResponse(r) + + # Check for sync generator/iterator (before Awaitable to avoid false positives) + if hasattr(r, "__iter__") and not isinstance(r, (str, bytes)): + return DatastarResponse(r) + + # Check for coroutines/tasks (but NOT async generators, already handled above) + if isawaitable(r): + # Wrap awaitable in an async generator that yields the result + async def await_and_yield(): + yield await r + + return DatastarResponse(await_and_yield()) + + # Default case: single value or unknown type return DatastarResponse(r) wrapper.__annotations__["return"] = DatastarResponse From 2a524c731a666e7d471ea7a5fb706105ccda13e8 Mon Sep 17 00:00:00 2001 From: Mike Macpherson Date: Tue, 25 Nov 2025 11:07:30 -0800 Subject: [PATCH 3/7] Document decorator preserving sync handlers --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index a861838..63d2653 100644 --- a/README.md +++ b/README.md @@ -85,6 +85,10 @@ headers of the SSE response yourself. A datastar response consists of 0..N datastar events. There are response classes included to make this easy in all of the supported frameworks. +Each framework also exposes a `@datastar_response` decorator that will wrap +return values (including generators) into the right response class while +preserving sync handlers as sync so frameworks can keep them in their +threadpools. The following examples will work across all supported frameworks when the response class is imported from the appropriate framework package. From 63cb03c13a46a61a2c36b04063c89863a8ae747a Mon Sep 17 00:00:00 2001 From: Mike Macpherson Date: Sat, 20 Dec 2025 13:51:44 -0800 Subject: [PATCH 4/7] Refine decorator to preserve sync/async semantics --- src/datastar_py/django.py | 58 +++++++++++-------- src/datastar_py/litestar.py | 49 ++++++++++------ src/datastar_py/quart.py | 47 ++++++++++++---- src/datastar_py/starlette.py | 54 ++++++++++-------- tests/test_datastar_decorator_runtime.py | 71 +++++++++++++++++++++--- 5 files changed, 201 insertions(+), 78 deletions(-) diff --git a/src/datastar_py/django.py b/src/datastar_py/django.py index e003f15..a18c613 100644 --- a/src/datastar_py/django.py +++ b/src/datastar_py/django.py @@ -1,8 +1,8 @@ from __future__ import annotations from collections.abc import Awaitable, Callable, Mapping -from functools import wraps -from inspect import isawaitable +from functools import partial, wraps +from inspect import isasyncgenfunction, iscoroutinefunction from typing import Any, ParamSpec from django.http import HttpRequest @@ -46,30 +46,44 @@ def __init__( def datastar_response( func: Callable[P, Awaitable[DatastarEvents] | DatastarEvents], -) -> Callable[P, Awaitable[DatastarResponse]]: +) -> Callable[P, Awaitable[DatastarResponse] | DatastarResponse]: """A decorator which wraps a function result in DatastarResponse. Can be used on a sync or async function or generator function. + Preserves the sync/async nature of the decorated function. """ - - @wraps(func) - async def wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: - r = func(*args, **kwargs) - - if hasattr(r, "__aiter__"): - raise NotImplementedError( - "Async generators/iterables are not yet supported by the Django adapter; " - "use a sync generator or return a single value/awaitable instead." - ) - - if hasattr(r, "__iter__") and not isinstance(r, (str, bytes)): - return DatastarResponse(r) - - if isawaitable(r): - return DatastarResponse(await r) - return DatastarResponse(r) - - return wrapper + # Unwrap partials to inspect the actual underlying function + actual_func = func + while isinstance(actual_func, partial): + actual_func = actual_func.func + + # Async generators not supported by Django + if isasyncgenfunction(actual_func): + raise NotImplementedError( + "Async generators are not yet supported by the Django adapter; " + "use a sync generator or return a single value/awaitable instead." + ) + + # Coroutine (async def + return) + if iscoroutinefunction(actual_func): + + @wraps(actual_func) + async def async_coro_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: + result = await func(*args, **kwargs) + return DatastarResponse(result) + + async_coro_wrapper.__annotations__["return"] = DatastarResponse + return async_coro_wrapper + + # Sync Function (def) - includes sync generators + else: + + @wraps(actual_func) + def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: + return DatastarResponse(func(*args, **kwargs)) + + sync_wrapper.__annotations__["return"] = DatastarResponse + return sync_wrapper def read_signals(request: HttpRequest) -> dict[str, Any] | None: diff --git a/src/datastar_py/litestar.py b/src/datastar_py/litestar.py index ba01688..77b2e4b 100644 --- a/src/datastar_py/litestar.py +++ b/src/datastar_py/litestar.py @@ -1,8 +1,8 @@ from __future__ import annotations from collections.abc import Awaitable, Callable, Mapping -from functools import wraps -from inspect import isawaitable +from functools import partial, wraps +from inspect import isasyncgenfunction, iscoroutinefunction from typing import ( TYPE_CHECKING, Any, @@ -65,32 +65,47 @@ def __init__( def datastar_response( func: Callable[P, Awaitable[DatastarEvents] | DatastarEvents], -) -> Callable[P, DatastarResponse]: +) -> Callable[P, Awaitable[DatastarResponse] | DatastarResponse]: """A decorator which wraps a function result in DatastarResponse. Can be used on a sync or async function or generator function. + Preserves the sync/async nature of the decorated function. """ + # Unwrap partials to inspect the actual underlying function + actual_func = func + while isinstance(actual_func, partial): + actual_func = actual_func.func - @wraps(func) - def wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: - r = func(*args, **kwargs) + # Case A: Async Generator (async def + yield) + if isasyncgenfunction(actual_func): - if hasattr(r, "__aiter__"): - return DatastarResponse(r) + @wraps(actual_func) + async def async_gen_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: + return DatastarResponse(func(*args, **kwargs)) - if hasattr(r, "__iter__") and not isinstance(r, (str, bytes)): - return DatastarResponse(r) + async_gen_wrapper.__annotations__["return"] = DatastarResponse + return async_gen_wrapper - if isawaitable(r): - async def await_and_yield(): - yield await r + # Case B: Standard Coroutine (async def + return) + elif iscoroutinefunction(actual_func): - return DatastarResponse(await_and_yield()) + @wraps(actual_func) + async def async_coro_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: + result = await func(*args, **kwargs) + return DatastarResponse(result) - return DatastarResponse(r) + async_coro_wrapper.__annotations__["return"] = DatastarResponse + return async_coro_wrapper - wrapper.__annotations__["return"] = DatastarResponse - return wrapper + # Case C: Sync Function (def) - includes sync generators + else: + + @wraps(actual_func) + def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: + return DatastarResponse(func(*args, **kwargs)) + + sync_wrapper.__annotations__["return"] = DatastarResponse + return sync_wrapper async def read_signals(request: Request) -> dict[str, Any] | None: diff --git a/src/datastar_py/quart.py b/src/datastar_py/quart.py index 1866523..d6ea835 100644 --- a/src/datastar_py/quart.py +++ b/src/datastar_py/quart.py @@ -1,11 +1,11 @@ from __future__ import annotations from collections.abc import Awaitable, Callable, Mapping -from functools import wraps -from inspect import isasyncgen, isasyncgenfunction, isgenerator +from functools import partial, wraps +from inspect import isasyncgen, isasyncgenfunction, iscoroutinefunction, isgenerator from typing import Any, ParamSpec -from quart import Response, copy_current_request_context, request, stream_with_context +from quart import Response, request, stream_with_context from . import _read_signals from .sse import SSE_HEADERS, DatastarEvents, ServerSentEventGenerator @@ -43,20 +43,47 @@ def __init__( def datastar_response( func: Callable[P, Awaitable[DatastarEvents] | DatastarEvents], -) -> Callable[P, Awaitable[DatastarResponse]]: +) -> Callable[P, Awaitable[DatastarResponse] | DatastarResponse]: """A decorator which wraps a function result in DatastarResponse. Can be used on a sync or async function or generator function. + Preserves the sync/async nature of the decorated function. """ + # Unwrap partials to inspect the actual underlying function + actual_func = func + while isinstance(actual_func, partial): + actual_func = actual_func.func - @wraps(func) - async def wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: - if isasyncgenfunction(func): + # Case A: Async Generator (async def + yield) + if isasyncgenfunction(actual_func): + + @wraps(actual_func) + async def async_gen_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: return DatastarResponse(stream_with_context(func)(*args, **kwargs)) - return DatastarResponse(await copy_current_request_context(func)(*args, **kwargs)) - wrapper.__annotations__["return"] = DatastarResponse - return wrapper + async_gen_wrapper.__annotations__["return"] = DatastarResponse + return async_gen_wrapper + + # Case B: Standard Coroutine (async def + return) + elif iscoroutinefunction(actual_func): + + @wraps(actual_func) + async def async_coro_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: + result = await func(*args, **kwargs) + return DatastarResponse(result) + + async_coro_wrapper.__annotations__["return"] = DatastarResponse + return async_coro_wrapper + + # Case C: Sync Function (def) - includes sync generators + else: + + @wraps(actual_func) + def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: + return DatastarResponse(func(*args, **kwargs)) + + sync_wrapper.__annotations__["return"] = DatastarResponse + return sync_wrapper async def read_signals() -> dict[str, Any] | None: diff --git a/src/datastar_py/starlette.py b/src/datastar_py/starlette.py index 74f33db..c286084 100644 --- a/src/datastar_py/starlette.py +++ b/src/datastar_py/starlette.py @@ -1,8 +1,8 @@ from __future__ import annotations from collections.abc import Awaitable, Callable, Mapping -from functools import wraps -from inspect import isawaitable +from functools import partial, wraps +from inspect import isasyncgenfunction, iscoroutinefunction from typing import ( TYPE_CHECKING, Any, @@ -54,37 +54,47 @@ def __init__( def datastar_response( func: Callable[P, Awaitable[DatastarEvents] | DatastarEvents], -) -> Callable[P, DatastarResponse]: +) -> Callable[P, Awaitable[DatastarResponse] | DatastarResponse]: """A decorator which wraps a function result in DatastarResponse. Can be used on a sync or async function or generator function. + Preserves the sync/async nature of the decorated function. """ + # Unwrap partials to inspect the actual underlying function + actual_func = func + while isinstance(actual_func, partial): + actual_func = actual_func.func - @wraps(func) - def wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: - r = func(*args, **kwargs) + # Case A: Async Generator (async def + yield) + if isasyncgenfunction(actual_func): - # Check for async generator/iterator first (most specific case) - if hasattr(r, "__aiter__"): - return DatastarResponse(r) + @wraps(actual_func) + async def async_gen_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: + return DatastarResponse(func(*args, **kwargs)) - # Check for sync generator/iterator (before Awaitable to avoid false positives) - if hasattr(r, "__iter__") and not isinstance(r, (str, bytes)): - return DatastarResponse(r) + async_gen_wrapper.__annotations__["return"] = DatastarResponse + return async_gen_wrapper - # Check for coroutines/tasks (but NOT async generators, already handled above) - if isawaitable(r): - # Wrap awaitable in an async generator that yields the result - async def await_and_yield(): - yield await r + # Case B: Standard Coroutine (async def + return) + elif iscoroutinefunction(actual_func): - return DatastarResponse(await_and_yield()) + @wraps(actual_func) + async def async_coro_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: + result = await func(*args, **kwargs) + return DatastarResponse(result) - # Default case: single value or unknown type - return DatastarResponse(r) + async_coro_wrapper.__annotations__["return"] = DatastarResponse + return async_coro_wrapper - wrapper.__annotations__["return"] = DatastarResponse - return wrapper + # Case C: Sync Function (def) - includes sync generators + else: + + @wraps(actual_func) + def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: + return DatastarResponse(func(*args, **kwargs)) + + sync_wrapper.__annotations__["return"] = DatastarResponse + return sync_wrapper async def read_signals(request: Request) -> dict[str, Any] | None: diff --git a/tests/test_datastar_decorator_runtime.py b/tests/test_datastar_decorator_runtime.py index 2ecd905..5eb91e9 100644 --- a/tests/test_datastar_decorator_runtime.py +++ b/tests/test_datastar_decorator_runtime.py @@ -35,8 +35,8 @@ def anyio_backend() -> str: "async_generator", ], ) -def test_decorator_returns_response_objects(module_path: str, variant: str) -> None: - """Decorated handlers should stay sync-callable and return DatastarResponse immediately.""" +def test_decorator_preserves_sync_async_semantics(module_path: str, variant: str) -> None: + """Decorated handlers should preserve sync/async nature of the original function.""" mod = importlib.import_module(module_path) datastar_response = mod.datastar_response @@ -59,12 +59,18 @@ async def handler() -> Any: async def handler() -> Any: yield SSE.patch_signals({"ok": True}) - result = handler() - if inspect.iscoroutine(result): - result.close() # avoid "coroutine was never awaited" warnings + is_async_variant = variant.startswith("async_") - assert not inspect.iscoroutinefunction(handler), "Decorator should preserve sync callable semantics" - assert isinstance(result, DatastarResponse) + # Verify the wrapper preserves sync/async nature + if is_async_variant: + assert inspect.iscoroutinefunction(handler), "Async handlers should remain async" + # Call and close coroutine to avoid warnings (we can't await in sync test) + coro = handler() + coro.close() + else: + assert not inspect.iscoroutinefunction(handler), "Sync handlers should remain sync" + result = handler() + assert isinstance(result, DatastarResponse), "Sync handlers should return DatastarResponse directly" async def _fetch( @@ -125,3 +131,54 @@ async def ping(request) -> PlainTextResponse: # noqa: ANN001 finally: server.should_exit = True thread.join(timeout=2) + + +def test_async_generator_iterates_on_event_loop() -> None: + """Async generators should iterate on the event loop, not spawn a thread. + + This addresses the concern that a sync wrapper might cause async handlers + to run in the threadpool. The wrapper being sync only affects where the + generator object is created (trivial); iteration happens based on iterator + type - Starlette's StreamingResponse detects __aiter__ and iterates async. + + This test uses Starlette, but the same principle applies to Litestar which + also uses a sync wrapper. Litestar's Stream response similarly detects + async iterators and iterates them on the event loop. + """ + from starlette.testclient import TestClient + + from datastar_py.starlette import datastar_response + + execution_threads: dict[str, str] = {} + + @datastar_response + async def async_gen_handler(request) -> Any: # noqa: ANN001 + execution_threads["async_gen"] = threading.current_thread().name + yield SSE.patch_signals({"async": True}) + + @datastar_response + def sync_gen_handler(request) -> Any: # noqa: ANN001 + execution_threads["sync_gen"] = threading.current_thread().name + yield SSE.patch_signals({"sync": True}) + + app = Starlette(routes=[ + Route("/async", async_gen_handler), + Route("/sync", sync_gen_handler), + ]) + + with TestClient(app) as client: + client.get("/async") + client.get("/sync") + + # Async generator runs on the asyncio portal thread (event loop context) + # Sync generator runs in a separate threadpool worker + # The key assertion: they run in DIFFERENT thread contexts + assert execution_threads["async_gen"] != execution_threads["sync_gen"], ( + f"Async and sync generators should run in different thread contexts. " + f"Async ran on: {execution_threads['async_gen']}, Sync ran on: {execution_threads['sync_gen']}" + ) + + # Async generator should be on the event loop thread (asyncio-portal-* or MainThread) + assert "asyncio" in execution_threads["async_gen"] or execution_threads["async_gen"] == "MainThread", ( + f"Async generator should run on event loop, but ran on {execution_threads['async_gen']}" + ) From add7b8d8e9c735a787570695eadca6407b780cf6 Mon Sep 17 00:00:00 2001 From: Mike Macpherson Date: Tue, 30 Dec 2025 10:31:53 -0800 Subject: [PATCH 5/7] respond to reviewer feedback: - remove unnecessary partial function check - exchange deco-time async generator vs coroutine check for runtime - make analogous changes for each supported framework - update test docstring to reflect implementation --- src/datastar_py/django.py | 37 +++++++------------ src/datastar_py/litestar.py | 47 ++++++++---------------- src/datastar_py/quart.py | 40 ++++++++------------ src/datastar_py/starlette.py | 47 ++++++++---------------- tests/test_datastar_decorator_runtime.py | 12 ++---- 5 files changed, 62 insertions(+), 121 deletions(-) diff --git a/src/datastar_py/django.py b/src/datastar_py/django.py index a18c613..04146c9 100644 --- a/src/datastar_py/django.py +++ b/src/datastar_py/django.py @@ -1,7 +1,7 @@ from __future__ import annotations from collections.abc import Awaitable, Callable, Mapping -from functools import partial, wraps +from functools import wraps from inspect import isasyncgenfunction, iscoroutinefunction from typing import Any, ParamSpec @@ -52,38 +52,27 @@ def datastar_response( Can be used on a sync or async function or generator function. Preserves the sync/async nature of the decorated function. """ - # Unwrap partials to inspect the actual underlying function - actual_func = func - while isinstance(actual_func, partial): - actual_func = actual_func.func - - # Async generators not supported by Django - if isasyncgenfunction(actual_func): + if isasyncgenfunction(func): raise NotImplementedError( "Async generators are not yet supported by the Django adapter; " "use a sync generator or return a single value/awaitable instead." ) - # Coroutine (async def + return) - if iscoroutinefunction(actual_func): - - @wraps(actual_func) - async def async_coro_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: - result = await func(*args, **kwargs) - return DatastarResponse(result) + if iscoroutinefunction(func): - async_coro_wrapper.__annotations__["return"] = DatastarResponse - return async_coro_wrapper + @wraps(func) + async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: + return DatastarResponse(await func(*args, **kwargs)) - # Sync Function (def) - includes sync generators - else: + async_wrapper.__annotations__["return"] = DatastarResponse + return async_wrapper - @wraps(actual_func) - def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: - return DatastarResponse(func(*args, **kwargs)) + @wraps(func) + def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: + return DatastarResponse(func(*args, **kwargs)) - sync_wrapper.__annotations__["return"] = DatastarResponse - return sync_wrapper + sync_wrapper.__annotations__["return"] = DatastarResponse + return sync_wrapper def read_signals(request: HttpRequest) -> dict[str, Any] | None: diff --git a/src/datastar_py/litestar.py b/src/datastar_py/litestar.py index 77b2e4b..a692536 100644 --- a/src/datastar_py/litestar.py +++ b/src/datastar_py/litestar.py @@ -1,8 +1,8 @@ from __future__ import annotations from collections.abc import Awaitable, Callable, Mapping -from functools import partial, wraps -from inspect import isasyncgenfunction, iscoroutinefunction +from functools import wraps +from inspect import isasyncgenfunction, isawaitable, iscoroutinefunction from typing import ( TYPE_CHECKING, Any, @@ -71,41 +71,24 @@ def datastar_response( Can be used on a sync or async function or generator function. Preserves the sync/async nature of the decorated function. """ - # Unwrap partials to inspect the actual underlying function - actual_func = func - while isinstance(actual_func, partial): - actual_func = actual_func.func + if iscoroutinefunction(func) or isasyncgenfunction(func): - # Case A: Async Generator (async def + yield) - if isasyncgenfunction(actual_func): - - @wraps(actual_func) - async def async_gen_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: - return DatastarResponse(func(*args, **kwargs)) - - async_gen_wrapper.__annotations__["return"] = DatastarResponse - return async_gen_wrapper - - # Case B: Standard Coroutine (async def + return) - elif iscoroutinefunction(actual_func): - - @wraps(actual_func) - async def async_coro_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: - result = await func(*args, **kwargs) + @wraps(func) + async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: + result = func(*args, **kwargs) + if isawaitable(result): + result = await result return DatastarResponse(result) - async_coro_wrapper.__annotations__["return"] = DatastarResponse - return async_coro_wrapper - - # Case C: Sync Function (def) - includes sync generators - else: + async_wrapper.__annotations__["return"] = DatastarResponse + return async_wrapper - @wraps(actual_func) - def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: - return DatastarResponse(func(*args, **kwargs)) + @wraps(func) + def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: + return DatastarResponse(func(*args, **kwargs)) - sync_wrapper.__annotations__["return"] = DatastarResponse - return sync_wrapper + sync_wrapper.__annotations__["return"] = DatastarResponse + return sync_wrapper async def read_signals(request: Request) -> dict[str, Any] | None: diff --git a/src/datastar_py/quart.py b/src/datastar_py/quart.py index d6ea835..d5924ec 100644 --- a/src/datastar_py/quart.py +++ b/src/datastar_py/quart.py @@ -1,7 +1,7 @@ from __future__ import annotations from collections.abc import Awaitable, Callable, Mapping -from functools import partial, wraps +from functools import wraps from inspect import isasyncgen, isasyncgenfunction, iscoroutinefunction, isgenerator from typing import Any, ParamSpec @@ -49,41 +49,31 @@ def datastar_response( Can be used on a sync or async function or generator function. Preserves the sync/async nature of the decorated function. """ - # Unwrap partials to inspect the actual underlying function - actual_func = func - while isinstance(actual_func, partial): - actual_func = actual_func.func + # Async generators require stream_with_context wrapping at decoration time + if isasyncgenfunction(func): - # Case A: Async Generator (async def + yield) - if isasyncgenfunction(actual_func): - - @wraps(actual_func) + @wraps(func) async def async_gen_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: return DatastarResponse(stream_with_context(func)(*args, **kwargs)) async_gen_wrapper.__annotations__["return"] = DatastarResponse return async_gen_wrapper - # Case B: Standard Coroutine (async def + return) - elif iscoroutinefunction(actual_func): - - @wraps(actual_func) - async def async_coro_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: - result = await func(*args, **kwargs) - return DatastarResponse(result) + if iscoroutinefunction(func): - async_coro_wrapper.__annotations__["return"] = DatastarResponse - return async_coro_wrapper + @wraps(func) + async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: + return DatastarResponse(await func(*args, **kwargs)) - # Case C: Sync Function (def) - includes sync generators - else: + async_wrapper.__annotations__["return"] = DatastarResponse + return async_wrapper - @wraps(actual_func) - def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: - return DatastarResponse(func(*args, **kwargs)) + @wraps(func) + def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: + return DatastarResponse(func(*args, **kwargs)) - sync_wrapper.__annotations__["return"] = DatastarResponse - return sync_wrapper + sync_wrapper.__annotations__["return"] = DatastarResponse + return sync_wrapper async def read_signals() -> dict[str, Any] | None: diff --git a/src/datastar_py/starlette.py b/src/datastar_py/starlette.py index c286084..abe1c13 100644 --- a/src/datastar_py/starlette.py +++ b/src/datastar_py/starlette.py @@ -1,8 +1,8 @@ from __future__ import annotations from collections.abc import Awaitable, Callable, Mapping -from functools import partial, wraps -from inspect import isasyncgenfunction, iscoroutinefunction +from functools import wraps +from inspect import isasyncgenfunction, isawaitable, iscoroutinefunction from typing import ( TYPE_CHECKING, Any, @@ -60,41 +60,24 @@ def datastar_response( Can be used on a sync or async function or generator function. Preserves the sync/async nature of the decorated function. """ - # Unwrap partials to inspect the actual underlying function - actual_func = func - while isinstance(actual_func, partial): - actual_func = actual_func.func + if iscoroutinefunction(func) or isasyncgenfunction(func): - # Case A: Async Generator (async def + yield) - if isasyncgenfunction(actual_func): - - @wraps(actual_func) - async def async_gen_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: - return DatastarResponse(func(*args, **kwargs)) - - async_gen_wrapper.__annotations__["return"] = DatastarResponse - return async_gen_wrapper - - # Case B: Standard Coroutine (async def + return) - elif iscoroutinefunction(actual_func): - - @wraps(actual_func) - async def async_coro_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: - result = await func(*args, **kwargs) + @wraps(func) + async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: + result = func(*args, **kwargs) + if isawaitable(result): + result = await result return DatastarResponse(result) - async_coro_wrapper.__annotations__["return"] = DatastarResponse - return async_coro_wrapper - - # Case C: Sync Function (def) - includes sync generators - else: + async_wrapper.__annotations__["return"] = DatastarResponse + return async_wrapper - @wraps(actual_func) - def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: - return DatastarResponse(func(*args, **kwargs)) + @wraps(func) + def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: + return DatastarResponse(func(*args, **kwargs)) - sync_wrapper.__annotations__["return"] = DatastarResponse - return sync_wrapper + sync_wrapper.__annotations__["return"] = DatastarResponse + return sync_wrapper async def read_signals(request: Request) -> dict[str, Any] | None: diff --git a/tests/test_datastar_decorator_runtime.py b/tests/test_datastar_decorator_runtime.py index 5eb91e9..e989856 100644 --- a/tests/test_datastar_decorator_runtime.py +++ b/tests/test_datastar_decorator_runtime.py @@ -136,14 +136,10 @@ async def ping(request) -> PlainTextResponse: # noqa: ANN001 def test_async_generator_iterates_on_event_loop() -> None: """Async generators should iterate on the event loop, not spawn a thread. - This addresses the concern that a sync wrapper might cause async handlers - to run in the threadpool. The wrapper being sync only affects where the - generator object is created (trivial); iteration happens based on iterator - type - Starlette's StreamingResponse detects __aiter__ and iterates async. - - This test uses Starlette, but the same principle applies to Litestar which - also uses a sync wrapper. Litestar's Stream response similarly detects - async iterators and iterates them on the event loop. + The decorator preserves async nature: async generators get an async wrapper, + ensuring they run on the event loop. Sync generators get a sync wrapper, + running in the threadpool. This test verifies these execute in different + thread contexts as expected. """ from starlette.testclient import TestClient From ca51009a4b6db614a3ad4da869c589af1e145f93 Mon Sep 17 00:00:00 2001 From: Mike Macpherson Date: Fri, 17 Apr 2026 08:24:31 -0700 Subject: [PATCH 6/7] respond to review: restore Quart context, allow Django async generators, trim tests - quart: restore copy_current_request_context in the coroutine branch (dropped in 90d9d41 without justification; reviewer flagged regression risk) - django: drop NotImplementedError for async generators; unify coroutine + async-gen into a single async wrapper. Django's StreamingHttpResponse supports async iteration under ASGI, so this restores prior permissive behavior - matrix test: unskip the Django async_generator case (now passes) - tests: remove per-framework live-server integration tests (fastapi, fasthtml, litestar, starlette); keep test_decorator_matrix.py for cross-framework contract coverage and test_sync_handler_runs_off_event_loop as the regression guard for the original bug Co-Authored-By: Claude Opus 4.7 (1M context) --- src/datastar_py/django.py | 15 +-- src/datastar_py/quart.py | 4 +- tests/test_datastar_decorator_runtime.py | 99 +------------- tests/test_decorator_matrix.py | 2 - tests/test_fastapi_decorator_integration.py | 120 ----------------- tests/test_fasthtml_decorator_integration.py | 118 ---------------- tests/test_litestar_decorator_integration.py | 120 ----------------- tests/test_starlette_decorator_integration.py | 127 ------------------ 8 files changed, 9 insertions(+), 596 deletions(-) delete mode 100644 tests/test_fastapi_decorator_integration.py delete mode 100644 tests/test_fasthtml_decorator_integration.py delete mode 100644 tests/test_litestar_decorator_integration.py delete mode 100644 tests/test_starlette_decorator_integration.py diff --git a/src/datastar_py/django.py b/src/datastar_py/django.py index 04146c9..955a343 100644 --- a/src/datastar_py/django.py +++ b/src/datastar_py/django.py @@ -2,7 +2,7 @@ from collections.abc import Awaitable, Callable, Mapping from functools import wraps -from inspect import isasyncgenfunction, iscoroutinefunction +from inspect import isasyncgenfunction, isawaitable, iscoroutinefunction from typing import Any, ParamSpec from django.http import HttpRequest @@ -52,17 +52,14 @@ def datastar_response( Can be used on a sync or async function or generator function. Preserves the sync/async nature of the decorated function. """ - if isasyncgenfunction(func): - raise NotImplementedError( - "Async generators are not yet supported by the Django adapter; " - "use a sync generator or return a single value/awaitable instead." - ) - - if iscoroutinefunction(func): + if iscoroutinefunction(func) or isasyncgenfunction(func): @wraps(func) async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: - return DatastarResponse(await func(*args, **kwargs)) + result = func(*args, **kwargs) + if isawaitable(result): + result = await result + return DatastarResponse(result) async_wrapper.__annotations__["return"] = DatastarResponse return async_wrapper diff --git a/src/datastar_py/quart.py b/src/datastar_py/quart.py index d5924ec..6395529 100644 --- a/src/datastar_py/quart.py +++ b/src/datastar_py/quart.py @@ -5,7 +5,7 @@ from inspect import isasyncgen, isasyncgenfunction, iscoroutinefunction, isgenerator from typing import Any, ParamSpec -from quart import Response, request, stream_with_context +from quart import Response, copy_current_request_context, request, stream_with_context from . import _read_signals from .sse import SSE_HEADERS, DatastarEvents, ServerSentEventGenerator @@ -63,7 +63,7 @@ async def async_gen_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarRespon @wraps(func) async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse: - return DatastarResponse(await func(*args, **kwargs)) + return DatastarResponse(await copy_current_request_context(func)(*args, **kwargs)) async_wrapper.__annotations__["return"] = DatastarResponse return async_wrapper diff --git a/tests/test_datastar_decorator_runtime.py b/tests/test_datastar_decorator_runtime.py index e989856..bd60146 100644 --- a/tests/test_datastar_decorator_runtime.py +++ b/tests/test_datastar_decorator_runtime.py @@ -1,9 +1,7 @@ -"""Runtime-focused tests for datastar_response decorators.""" +"""Runtime regression test for datastar_response: sync handlers must not stall the event loop.""" from __future__ import annotations -import importlib -import inspect import threading import time from typing import Any @@ -25,54 +23,6 @@ def anyio_backend() -> str: return "asyncio" -@pytest.mark.parametrize("module_path", ["datastar_py.starlette", "datastar_py.fasthtml"]) -@pytest.mark.parametrize( - "variant", - [ - "sync_value", - "sync_generator", - "async_value", - "async_generator", - ], -) -def test_decorator_preserves_sync_async_semantics(module_path: str, variant: str) -> None: - """Decorated handlers should preserve sync/async nature of the original function.""" - - mod = importlib.import_module(module_path) - datastar_response = mod.datastar_response - DatastarResponse = mod.DatastarResponse - - if variant == "sync_value": - @datastar_response - def handler() -> Any: - return SSE.patch_signals({"ok": True}) - elif variant == "sync_generator": - @datastar_response - def handler() -> Any: - yield SSE.patch_signals({"ok": True}) - elif variant == "async_value": - @datastar_response - async def handler() -> Any: - return SSE.patch_signals({"ok": True}) - else: - @datastar_response - async def handler() -> Any: - yield SSE.patch_signals({"ok": True}) - - is_async_variant = variant.startswith("async_") - - # Verify the wrapper preserves sync/async nature - if is_async_variant: - assert inspect.iscoroutinefunction(handler), "Async handlers should remain async" - # Call and close coroutine to avoid warnings (we can't await in sync test) - coro = handler() - coro.close() - else: - assert not inspect.iscoroutinefunction(handler), "Sync handlers should remain sync" - result = handler() - assert isinstance(result, DatastarResponse), "Sync handlers should return DatastarResponse directly" - - async def _fetch( client: httpx.AsyncClient, path: str, timings: dict[str, float], key: str ) -> None: @@ -131,50 +81,3 @@ async def ping(request) -> PlainTextResponse: # noqa: ANN001 finally: server.should_exit = True thread.join(timeout=2) - - -def test_async_generator_iterates_on_event_loop() -> None: - """Async generators should iterate on the event loop, not spawn a thread. - - The decorator preserves async nature: async generators get an async wrapper, - ensuring they run on the event loop. Sync generators get a sync wrapper, - running in the threadpool. This test verifies these execute in different - thread contexts as expected. - """ - from starlette.testclient import TestClient - - from datastar_py.starlette import datastar_response - - execution_threads: dict[str, str] = {} - - @datastar_response - async def async_gen_handler(request) -> Any: # noqa: ANN001 - execution_threads["async_gen"] = threading.current_thread().name - yield SSE.patch_signals({"async": True}) - - @datastar_response - def sync_gen_handler(request) -> Any: # noqa: ANN001 - execution_threads["sync_gen"] = threading.current_thread().name - yield SSE.patch_signals({"sync": True}) - - app = Starlette(routes=[ - Route("/async", async_gen_handler), - Route("/sync", sync_gen_handler), - ]) - - with TestClient(app) as client: - client.get("/async") - client.get("/sync") - - # Async generator runs on the asyncio portal thread (event loop context) - # Sync generator runs in a separate threadpool worker - # The key assertion: they run in DIFFERENT thread contexts - assert execution_threads["async_gen"] != execution_threads["sync_gen"], ( - f"Async and sync generators should run in different thread contexts. " - f"Async ran on: {execution_threads['async_gen']}, Sync ran on: {execution_threads['sync_gen']}" - ) - - # Async generator should be on the event loop thread (asyncio-portal-* or MainThread) - assert "asyncio" in execution_threads["async_gen"] or execution_threads["async_gen"] == "MainThread", ( - f"Async generator should run on event loop, but ran on {execution_threads['async_gen']}" - ) diff --git a/tests/test_decorator_matrix.py b/tests/test_decorator_matrix.py index 852ed14..3a9b234 100644 --- a/tests/test_decorator_matrix.py +++ b/tests/test_decorator_matrix.py @@ -71,8 +71,6 @@ async def test_datastar_response_matrix( if not settings.configured: settings.configure(DEFAULT_CHARSET="utf-8") - if variant == "async_generator": - pytest.skip("Django adapter does not support async generators yet") mod = _require_module(module_path) datastar_response = mod.datastar_response diff --git a/tests/test_fastapi_decorator_integration.py b/tests/test_fastapi_decorator_integration.py deleted file mode 100644 index 0ee3440..0000000 --- a/tests/test_fastapi_decorator_integration.py +++ /dev/null @@ -1,120 +0,0 @@ -"""Integration test: datastar_response within a live FastAPI app.""" - -from __future__ import annotations - -import threading -import time -from typing import Any - -import anyio -import httpx -import pytest -import uvicorn -from fastapi import FastAPI -from starlette.responses import PlainTextResponse - -from datastar_py.sse import ServerSentEventGenerator as SSE -from datastar_py.fastapi import datastar_response - - -@pytest.fixture -def anyio_backend() -> str: - return "asyncio" - - -async def _fetch(client: httpx.AsyncClient, path: str) -> httpx.Response: - resp = await client.get(path, timeout=5.0) - resp.raise_for_status() - return resp - - -@pytest.mark.anyio -async def test_fastapi_handlers_cover_matrix() -> None: - """Ensure FastAPI handlers across sync/async and gen/value work end-to-end.""" - - entered = threading.Event() - app = FastAPI() - - @app.get("/sync-value") - @datastar_response - def sync_value() -> Any: - entered.set() - time.sleep(0.2) # should run in threadpool - return SSE.patch_signals({"src": "sync_value"}) - - @app.get("/sync-generator") - @datastar_response - def sync_gen() -> Any: - yield SSE.patch_signals({"src": "sync_generator", "idx": 1}) - yield SSE.patch_signals({"src": "sync_generator", "idx": 2}) - - @app.get("/async-value") - @datastar_response - async def async_value() -> Any: - return SSE.patch_signals({"src": "async_value"}) - - @app.get("/async-generator") - @datastar_response - async def async_gen() -> Any: - yield SSE.patch_signals({"src": "async_generator", "idx": 1}) - yield SSE.patch_signals({"src": "async_generator", "idx": 2}) - - @app.get("/ping") - async def ping() -> PlainTextResponse: - return PlainTextResponse("pong") - - config = uvicorn.Config(app, host="127.0.0.1", port=0, log_level="warning", lifespan="off") - server = uvicorn.Server(config) - thread = threading.Thread(target=server.run, daemon=True) - thread.start() - - try: - for _ in range(50): - if server.started and getattr(server, "servers", None): - break - await anyio.sleep(0.05) - else: - pytest.fail("Server did not start") - - sock = server.servers[0].sockets[0] - host, port = sock.getsockname()[:2] - base_url = f"http://{host}:{port}" - - async with httpx.AsyncClient(base_url=base_url) as client: - # Concurrency sanity: sync_value should not stall ping - async with anyio.create_task_group() as tg: - slow_resp: httpx.Response | None = None - ping_resp: httpx.Response | None = None - - async def hit_slow(): - nonlocal slow_resp - slow_resp = await _fetch(client, "/sync-value") - - async def hit_ping(): - nonlocal ping_resp - await anyio.to_thread.run_sync(entered.wait, 1.0) - ping_resp = await _fetch(client, "/ping") - - tg.start_soon(hit_slow) - tg.start_soon(hit_ping) - - assert slow_resp is not None and slow_resp.status_code == 200 - assert ping_resp is not None and ping_resp.status_code == 200 - assert float(ping_resp.elapsed.total_seconds()) < 0.35 - - sync_value_body = (await _fetch(client, "/sync-value")).text - assert '"src":"sync_value"' in sync_value_body - - sync_gen_body = (await _fetch(client, "/sync-generator")).text - assert '"src":"sync_generator"' in sync_gen_body - assert '"idx":1' in sync_gen_body and '"idx":2' in sync_gen_body - - async_value_body = (await _fetch(client, "/async-value")).text - assert '"src":"async_value"' in async_value_body - - async_gen_body = (await _fetch(client, "/async-generator")).text - assert '"src":"async_generator"' in async_gen_body - assert '"idx":1' in async_gen_body and '"idx":2' in async_gen_body - finally: - server.should_exit = True - thread.join(timeout=2) diff --git a/tests/test_fasthtml_decorator_integration.py b/tests/test_fasthtml_decorator_integration.py deleted file mode 100644 index 3645c48..0000000 --- a/tests/test_fasthtml_decorator_integration.py +++ /dev/null @@ -1,118 +0,0 @@ -"""Integration test: datastar_response within a live FastHTML app.""" - -from __future__ import annotations - -import threading -import time -from typing import Any - -import anyio -import httpx -import pytest -import uvicorn -from fasthtml.common import fast_app -from starlette.responses import PlainTextResponse - -from datastar_py.sse import ServerSentEventGenerator as SSE -from datastar_py.fasthtml import datastar_response - - -@pytest.fixture -def anyio_backend() -> str: - return "asyncio" - - -async def _fetch(client: httpx.AsyncClient, path: str) -> httpx.Response: - resp = await client.get(path, timeout=5.0) - resp.raise_for_status() - return resp - - -@pytest.mark.anyio -async def test_fasthtml_sync_and_streaming_handlers() -> None: - """Ensure FastHTML routes across sync/async and gen/value work end-to-end.""" - - entered = threading.Event() - - app, rt = fast_app(htmx=False, live=False) - - @rt("/slow") - @datastar_response - def slow(request) -> Any: # noqa: ANN001 - entered.set() - time.sleep(0.2) # should not block event loop for other requests - return SSE.patch_signals({"src": "sync_value"}) - - @rt("/sync-generator") - @datastar_response - def sync_gen(request) -> Any: # noqa: ANN001 - yield SSE.patch_signals({"src": "sync_generator", "idx": 1}) - yield SSE.patch_signals({"src": "sync_generator", "idx": 2}) - - @rt("/stream") - @datastar_response - async def async_gen(request) -> Any: # noqa: ANN001 - yield SSE.patch_signals({"src": "async_generator", "idx": 1}) - yield SSE.patch_signals({"src": "async_generator", "idx": 2}) - - @rt("/async-value") - @datastar_response - async def async_value(request) -> Any: # noqa: ANN001 - return SSE.patch_signals({"src": "async_value"}) - - @rt("/ping") - async def ping(request) -> PlainTextResponse: # noqa: ANN001 - return PlainTextResponse("pong") - - config = uvicorn.Config(app, host="127.0.0.1", port=0, log_level="warning", lifespan="off") - server = uvicorn.Server(config) - thread = threading.Thread(target=server.run, daemon=True) - thread.start() - - try: - for _ in range(50): - if server.started and getattr(server, "servers", None): - break - await anyio.sleep(0.05) - else: - pytest.fail("Server did not start") - - sock = server.servers[0].sockets[0] - host, port = sock.getsockname()[:2] - base_url = f"http://{host}:{port}" - - async with httpx.AsyncClient(base_url=base_url) as client: - async with anyio.create_task_group() as tg: - slow_resp: httpx.Response | None = None - ping_resp: httpx.Response | None = None - - async def hit_slow(): - nonlocal slow_resp - slow_resp = await _fetch(client, "/slow") - - async def hit_ping(): - nonlocal ping_resp - await anyio.to_thread.run_sync(entered.wait, 1.0) - ping_resp = await _fetch(client, "/ping") - - tg.start_soon(hit_slow) - tg.start_soon(hit_ping) - - assert slow_resp is not None and slow_resp.status_code == 200 - assert ping_resp is not None and ping_resp.status_code == 200 - assert float(ping_resp.elapsed.total_seconds()) < 0.3 - - sync_gen_body = (await _fetch(client, "/sync-generator")).text - assert '"src":"sync_generator"' in sync_gen_body - assert '"idx":1' in sync_gen_body and '"idx":2' in sync_gen_body - - async_value_body = (await _fetch(client, "/async-value")).text - assert '"src":"async_value"' in async_value_body - - stream = await _fetch(client, "/stream") - body = stream.text - assert '"src":"async_generator"' in body - assert '"idx":1' in body and '"idx":2' in body - finally: - server.should_exit = True - thread.join(timeout=2) diff --git a/tests/test_litestar_decorator_integration.py b/tests/test_litestar_decorator_integration.py deleted file mode 100644 index 1529d7f..0000000 --- a/tests/test_litestar_decorator_integration.py +++ /dev/null @@ -1,120 +0,0 @@ -"""Integration test: datastar_response within a live Litestar app.""" - -from __future__ import annotations - -import threading -import time -from typing import Any - -import anyio -import httpx -import pytest -import uvicorn -from litestar import Litestar, get -from starlette.responses import PlainTextResponse - -from datastar_py.sse import ServerSentEventGenerator as SSE -from datastar_py.litestar import datastar_response - - -@pytest.fixture -def anyio_backend() -> str: - return "asyncio" - - -async def _fetch(client: httpx.AsyncClient, path: str) -> httpx.Response: - resp = await client.get(path, timeout=5.0) - resp.raise_for_status() - return resp - - -@pytest.mark.anyio -async def test_litestar_handlers_cover_matrix() -> None: - """Ensure Litestar handlers across sync/async and gen/value work end-to-end.""" - - entered = threading.Event() - - @get("/sync-value") - @datastar_response - def sync_value() -> Any: - entered.set() - time.sleep(0.2) - return SSE.patch_signals({"src": "sync_value"}) - - @get("/sync-generator") - @datastar_response - def sync_gen() -> Any: - yield SSE.patch_signals({"src": "sync_generator", "idx": 1}) - yield SSE.patch_signals({"src": "sync_generator", "idx": 2}) - - @get("/async-value") - @datastar_response - async def async_value() -> Any: - return SSE.patch_signals({"src": "async_value"}) - - @get("/async-generator") - @datastar_response - async def async_gen() -> Any: - yield SSE.patch_signals({"src": "async_generator", "idx": 1}) - yield SSE.patch_signals({"src": "async_generator", "idx": 2}) - - @get("/ping") - async def ping() -> PlainTextResponse: - return PlainTextResponse("pong") - - app = Litestar(route_handlers=[sync_value, sync_gen, async_value, async_gen, ping]) - - config = uvicorn.Config(app, host="127.0.0.1", port=0, log_level="warning", lifespan="off") - server = uvicorn.Server(config) - thread = threading.Thread(target=server.run, daemon=True) - thread.start() - - try: - for _ in range(50): - if server.started and getattr(server, "servers", None): - break - await anyio.sleep(0.05) - else: - pytest.fail("Server did not start") - - sock = server.servers[0].sockets[0] - host, port = sock.getsockname()[:2] - base_url = f"http://{host}:{port}" - - async with httpx.AsyncClient(base_url=base_url) as client: - async with anyio.create_task_group() as tg: - slow_resp: httpx.Response | None = None - ping_resp: httpx.Response | None = None - - async def hit_slow(): - nonlocal slow_resp - slow_resp = await _fetch(client, "/sync-value") - - async def hit_ping(): - nonlocal ping_resp - await anyio.to_thread.run_sync(entered.wait, 1.0) - ping_resp = await _fetch(client, "/ping") - - tg.start_soon(hit_slow) - tg.start_soon(hit_ping) - - assert slow_resp is not None and slow_resp.status_code == 200 - assert ping_resp is not None and ping_resp.status_code == 200 - assert float(ping_resp.elapsed.total_seconds()) < 0.35 - - sync_value_body = (await _fetch(client, "/sync-value")).text - assert '"src":"sync_value"' in sync_value_body - - sync_gen_body = (await _fetch(client, "/sync-generator")).text - assert '"src":"sync_generator"' in sync_gen_body - assert '"idx":1' in sync_gen_body and '"idx":2' in sync_gen_body - - async_value_body = (await _fetch(client, "/async-value")).text - assert '"src":"async_value"' in async_value_body - - async_gen_body = (await _fetch(client, "/async-generator")).text - assert '"src":"async_generator"' in async_gen_body - assert '"idx":1' in async_gen_body and '"idx":2' in async_gen_body - finally: - server.should_exit = True - thread.join(timeout=2) diff --git a/tests/test_starlette_decorator_integration.py b/tests/test_starlette_decorator_integration.py deleted file mode 100644 index fccdb94..0000000 --- a/tests/test_starlette_decorator_integration.py +++ /dev/null @@ -1,127 +0,0 @@ -"""Integration test: datastar_response within a live Starlette app.""" - -from __future__ import annotations - -import threading -import time -from typing import Any - -import anyio -import httpx -import pytest -import uvicorn -from starlette.applications import Starlette -from starlette.responses import PlainTextResponse -from starlette.routing import Route - -from datastar_py.sse import ServerSentEventGenerator as SSE -from datastar_py.starlette import datastar_response - - -@pytest.fixture -def anyio_backend() -> str: - return "asyncio" - - -async def _fetch(client: httpx.AsyncClient, path: str) -> httpx.Response: - resp = await client.get(path, timeout=5.0) - resp.raise_for_status() - return resp - - -@pytest.mark.anyio -async def test_starlette_sync_handler_runs_in_threadpool_and_streams() -> None: - """Ensure all handler shapes work end-to-end and sync stays in threadpool.""" - - entered = threading.Event() - - @datastar_response - def sync_value(request) -> Any: # noqa: ANN001 - entered.set() - time.sleep(0.2) # should not block event loop - return SSE.patch_signals({"src": "sync_value"}) - - @datastar_response - def sync_gen(request) -> Any: # noqa: ANN001 - yield SSE.patch_signals({"src": "sync_generator", "idx": 1}) - yield SSE.patch_signals({"src": "sync_generator", "idx": 2}) - - @datastar_response - async def async_value(request) -> Any: # noqa: ANN001 - return SSE.patch_signals({"src": "async_value"}) - - @datastar_response - async def async_gen(request) -> Any: # noqa: ANN001 - yield SSE.patch_signals({"src": "async_generator", "idx": 1}) - yield SSE.patch_signals({"src": "async_generator", "idx": 2}) - - async def ping(request) -> PlainTextResponse: # noqa: ANN001 - return PlainTextResponse("pong") - - app = Starlette( - routes=[ - Route("/sync-value", sync_value), - Route("/sync-generator", sync_gen), - Route("/async-value", async_value), - Route("/async-generator", async_gen), - Route("/ping", ping), - ] - ) - - config = uvicorn.Config(app, host="127.0.0.1", port=0, log_level="warning", lifespan="off") - server = uvicorn.Server(config) - thread = threading.Thread(target=server.run, daemon=True) - thread.start() - - try: - for _ in range(50): - if server.started and getattr(server, "servers", None): - break - await anyio.sleep(0.05) - else: - pytest.fail("Server did not start") - - sock = server.servers[0].sockets[0] - host, port = sock.getsockname()[:2] - base_url = f"http://{host}:{port}" - - async with httpx.AsyncClient(base_url=base_url) as client: - # Verify blocking sync handler doesn't stall other requests - # Concurrency sanity: sync_value blocks 0.2s but should not stall ping - async with anyio.create_task_group() as tg: - slow_resp: httpx.Response | None = None - ping_resp: httpx.Response | None = None - - async def hit_slow(): - nonlocal slow_resp - slow_resp = await _fetch(client, "/sync-value") - - async def hit_ping(): - nonlocal ping_resp - await anyio.to_thread.run_sync(entered.wait, 1.0) - ping_resp = await _fetch(client, "/ping") - - tg.start_soon(hit_slow) - tg.start_soon(hit_ping) - - assert slow_resp is not None and slow_resp.status_code == 200 - assert ping_resp is not None and ping_resp.status_code == 200 - assert float(ping_resp.elapsed.total_seconds()) < 0.35 - - # Verify content of each endpoint - sync_value_body = (await _fetch(client, "/sync-value")).text - assert '"src":"sync_value"' in sync_value_body - - sync_gen_body = (await _fetch(client, "/sync-generator")).text - assert '"src":"sync_generator"' in sync_gen_body - assert '"idx":1' in sync_gen_body and '"idx":2' in sync_gen_body - - async_value_body = (await _fetch(client, "/async-value")).text - assert '"src":"async_value"' in async_value_body - - async_gen_body = (await _fetch(client, "/async-generator")).text - assert '"src":"async_generator"' in async_gen_body - assert '"idx":1' in async_gen_body and '"idx":2' in async_gen_body - finally: - server.should_exit = True - thread.join(timeout=2) From 2580f0248d66f77ebda0e87b1f171528f7cb7102 Mon Sep 17 00:00:00 2001 From: Mike Macpherson Date: Fri, 17 Apr 2026 10:10:46 -0700 Subject: [PATCH 7/7] fix pre-commit lint failures - add tests/** to ruff per-file-ignores for ANN, PLC0415, PLR2004 (mirrors existing examples/ treatment; these rules are noisy for test fixtures, conditional framework imports, and threshold values) - remove leftover unused Union import in sanic.py (rebase artifact) - apply ruff --fix and ruff format to test files - add `lint` target to Makefile Co-Authored-By: Claude Opus 4.7 (1M context) --- Makefile | 5 ++++- pyproject.toml | 1 + src/datastar_py/sanic.py | 2 +- tests/test_datastar_decorator_runtime.py | 5 ++--- tests/test_decorator_matrix.py | 9 ++++++--- 5 files changed, 14 insertions(+), 8 deletions(-) diff --git a/Makefile b/Makefile index 209a865..9e2a8d2 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,7 @@ -.PHONY: test +.PHONY: test lint test: uv run --dev pytest + +lint: + uv run --dev pre-commit run --all-files diff --git a/pyproject.toml b/pyproject.toml index c83c8c9..8344cdd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -90,5 +90,6 @@ lint.ignore = [ "E501", ] lint.per-file-ignores."examples/**/*.py" = [ "ANN", "DTZ005", "PLC0415" ] +lint.per-file-ignores."tests/**/*.py" = [ "ANN", "PLC0415", "PLR2004" ] lint.fixable = [ "ALL" ] lint.pylint.allow-magic-value-types = [ "int", "str" ] diff --git a/src/datastar_py/sanic.py b/src/datastar_py/sanic.py index fffd585..4786443 100644 --- a/src/datastar_py/sanic.py +++ b/src/datastar_py/sanic.py @@ -4,7 +4,7 @@ from contextlib import aclosing, closing from functools import wraps from inspect import isasyncgen, isawaitable, isgenerator -from typing import Any, ParamSpec, Union +from typing import Any, ParamSpec from sanic import HTTPResponse, Request diff --git a/tests/test_datastar_decorator_runtime.py b/tests/test_datastar_decorator_runtime.py index bd60146..f7f2038 100644 --- a/tests/test_datastar_decorator_runtime.py +++ b/tests/test_datastar_decorator_runtime.py @@ -35,18 +35,17 @@ async def _fetch( @pytest.mark.anyio("asyncio") async def test_sync_handler_runs_off_event_loop() -> None: """Sync routes should stay in the threadpool; otherwise they block the event loop.""" - entered = threading.Event() from datastar_py.starlette import datastar_response @datastar_response - def slow(request) -> Any: # noqa: ANN001 + def slow(request) -> Any: entered.set() time.sleep(1.0) # if run on the event loop, this blocks other requests return SSE.patch_signals({"slow": True}) - async def ping(request) -> PlainTextResponse: # noqa: ANN001 + async def ping(request) -> PlainTextResponse: return PlainTextResponse("pong") app = Starlette(routes=[Route("/slow", slow), Route("/ping", ping)]) diff --git a/tests/test_decorator_matrix.py b/tests/test_decorator_matrix.py index 3a9b234..00fd50f 100644 --- a/tests/test_decorator_matrix.py +++ b/tests/test_decorator_matrix.py @@ -4,13 +4,13 @@ import importlib import inspect -from typing import Any, Iterable +from collections.abc import Iterable +from typing import Any import pytest from datastar_py.sse import ServerSentEventGenerator as SSE - FRAMEWORKS = [ # name, module path, iterator attribute on response (None means use response directly) ("starlette", "datastar_py.starlette", "body_iterator"), @@ -63,7 +63,6 @@ async def test_datastar_response_matrix( framework_name: str, module_path: str, iterator_attr: str | None, variant: str ) -> None: """Ensure decorator works for sync/async and generator/non-generator functions.""" - if framework_name in {"quart", "sanic"}: pytest.skip(f"{framework_name} decorator requires full request context to exercise") if framework_name == "django": @@ -77,18 +76,22 @@ async def test_datastar_response_matrix( DatastarResponse = mod.DatastarResponse if variant == "sync_value": + @datastar_response def handler() -> Any: return SSE.patch_signals({"ok": True}) elif variant == "sync_generator": + @datastar_response def handler() -> Any: yield SSE.patch_signals({"ok": True}) elif variant == "async_value": + @datastar_response async def handler() -> Any: return SSE.patch_signals({"ok": True}) else: + @datastar_response async def handler() -> Any: yield SSE.patch_signals({"ok": True})