From 86cec7f97b9ab852d28914d6caefdfbfe6c19525 Mon Sep 17 00:00:00 2001 From: SimonTaurus Date: Thu, 7 May 2026 03:35:07 +0200 Subject: [PATCH] feat: route Prefect API through MW ApiGateway with httpx transport - Add ApiGatewayTransport that rewrites Prefect URLs to ?path= format and injects MW session cookies + CSRF tokens - Monkey-patch httpx.AsyncClient in parent process for seamless routing - Add .pth startup hook for Prefect subprocesses (lazy MW login) - Auto-install hook into Lib/site-packages when ApiGateway URL detected - Fix event loop binding by creating fresh transport per request - Remove debug print statements, use logging.debug instead --- examples/prefect/hello_world.py | 52 ++++--- src/osw/utils/_httpx_gateway.py | 61 ++++++++ src/osw/utils/workflow.py | 259 ++++++++++++++++++++++++++++++-- 3 files changed, 341 insertions(+), 31 deletions(-) create mode 100644 src/osw/utils/_httpx_gateway.py diff --git a/examples/prefect/hello_world.py b/examples/prefect/hello_world.py index 2328c38..689bfa7 100644 --- a/examples/prefect/hello_world.py +++ b/examples/prefect/hello_world.py @@ -1,26 +1,44 @@ """Dummy Prefect workflow that prints a message on an OSW page. -Usage (PowerShell): - $env:PREFECT_API_URL="https://osw.example.com:4200/api" - $env:PREFECT_PUBLIC_URL="https://osw.example.com/w/rest.php/apigateway/v1/prefect" - $env:OSW_USER="Bot@name" - $env:OSW_SERVER="osw.example.com" - $env:OSW_PASSWORD="bot-password" - python examples/prefect/hello_world.py - -Usage (Bash): - export PREFECT_API_URL="https://osw.example.com:4200/api" - export PREFECT_PUBLIC_URL="https://osw.example.com/w/rest.php/apigateway/v1/prefect" - export OSW_USER="Bot@name" - export OSW_SERVER="osw.example.com" - export OSW_PASSWORD="bot-password" - python examples/prefect/hello_world.py +Option A — Direct Prefect access (worker can reach Prefect server directly): + + PowerShell: + $env:PREFECT_API_URL="https://osw.example.com:4200/api" + $env:PREFECT_PUBLIC_URL="https://osw.example.com/w/rest.php/apigateway/v1/prefect" + $env:OSW_USER="Bot@name"; $env:OSW_SERVER="osw.example.com" + $env:OSW_PASSWORD="bot-password" + python examples/prefect/hello_world.py + + Bash: + export PREFECT_API_URL="https://osw.example.com:4200/api" + export PREFECT_PUBLIC_URL="https://osw.example.com/w/rest.php/apigateway/v1/prefect" + export OSW_USER="Bot@name" OSW_SERVER="osw.example.com" + export OSW_PASSWORD="bot-password" + python examples/prefect/hello_world.py + +Option B — ApiGateway only (Prefect server behind firewall, only reachable + through MediaWiki ApiGateway extension): + + PowerShell: + $env:PREFECT_API_URL="https://osw.example.com/w/rest.php/apigateway/v1/prefect" + $env:OSW_USER="Bot@name"; $env:OSW_SERVER="osw.example.com" + $env:OSW_PASSWORD="bot-password" + python examples/prefect/hello_world.py + + Bash: + export PREFECT_API_URL="https://osw.example.com/w/rest.php/apigateway/v1/prefect" + export OSW_USER="Bot@name" OSW_SERVER="osw.example.com" + export OSW_PASSWORD="bot-password" + python examples/prefect/hello_world.py Environment variables: - PREFECT_API_URL Prefect server API URL (used by the worker) + PREFECT_API_URL Prefect server API URL (used by the worker). + Can be a direct URL or an ApiGateway URL. PREFECT_PUBLIC_URL (optional) Public URL stored in PrefectFlow entity, - for use by browser clients (e.g. prefect.js). + for browser clients (e.g. prefect.js). Falls back to PREFECT_API_URL if not set. + Only needed when PREFECT_API_URL differs from the + public gateway URL (Option A). OSW_USER OSW bot username OSW_SERVER OSW instance domain OSW_PASSWORD OSW bot password. If not set, falls back to a diff --git a/src/osw/utils/_httpx_gateway.py b/src/osw/utils/_httpx_gateway.py new file mode 100644 index 0000000..6cdd649 --- /dev/null +++ b/src/osw/utils/_httpx_gateway.py @@ -0,0 +1,61 @@ +"""Auto-patches httpx for ApiGateway routing in Prefect subprocesses. + +Activated by osw-httpx-gateway.pth in site-packages. Only patches if +PREFECT_API_URL contains an ApiGateway URL pattern. Login is lazy — +no network calls at import time. All ``osw`` imports are deferred to +first request so the hook works even before editable installs are on +sys.path. +""" + +import os + + +def _install(): + api_url = os.environ.get("PREFECT_API_URL", "") + if "/rest.php/apigateway/" not in api_url: + return + + if not os.environ.get("OSW_SERVER"): + return + + try: + import httpx + except ImportError: + return + + class _LazyApiGatewayTransport(httpx.AsyncBaseTransport): + """Wraps ApiGatewayTransport with lazy MW login on first request.""" + + def __init__(self, gateway_url): + self._gateway_url = gateway_url + self._inner = None + + def _ensure_initialized(self): + if self._inner is not None: + return + # Deferred import — osw may not be on sys.path at .pth time + from osw.utils.workflow import ApiGatewayTransport, connect + + osw_instance = connect() + self._inner = ApiGatewayTransport( + gateway_url=self._gateway_url, + mw_site=osw_instance.site.mw_site, + ) + + async def handle_async_request(self, request): + self._ensure_initialized() + return await self._inner.handle_async_request(request) + + _transport = _LazyApiGatewayTransport(api_url) + _original_init = httpx.AsyncClient.__init__ + + def _patched_init(self, *args, **kwargs): + base = str(kwargs.get("base_url", "")) + if "/rest.php/apigateway/" in base and "transport" not in kwargs: + kwargs["transport"] = _transport + _original_init(self, *args, **kwargs) + + httpx.AsyncClient.__init__ = _patched_init + + +_install() diff --git a/src/osw/utils/workflow.py b/src/osw/utils/workflow.py index 4a0e81b..b628599 100644 --- a/src/osw/utils/workflow.py +++ b/src/osw/utils/workflow.py @@ -1,17 +1,17 @@ """Prefect utils as support for OpenSemanticWorld.""" import asyncio +import os import re import sys import uuid as uuid_module from datetime import timedelta -from importlib.metadata import version from inspect import signature from os import environ from typing import Any, Dict, Iterable, List, Optional, Union from uuid import UUID -from packaging.specifiers import SpecifierSet +import httpx from prefect import Flow, get_client, serve from prefect.blocks.notifications import MicrosoftTeamsWebhook from prefect.blocks.system import Secret @@ -102,6 +102,201 @@ class Config: arbitrary_types_allowed = True +# ------------------------------ API GATEWAY --------------------- +class ApiGatewayTransport(httpx.AsyncBaseTransport): + """httpx transport that routes Prefect API calls through MW ApiGateway. + + Rewrites URLs from Prefect format to ApiGateway ``?path=`` format + and injects MediaWiki session cookies + CSRF tokens. + """ + + def __init__(self, gateway_url: str, mw_site, csrf_required: bool = True): + """ + Parameters + ---------- + gateway_url + ApiGateway endpoint URL, e.g. + ``https://osw.example.com/w/rest.php/apigateway/v1/prefect`` + mw_site + Authenticated mwclient Site instance. + csrf_required + Whether to send MW CSRF token for write methods. + """ + self._gateway_url = gateway_url.rstrip("/") + self._mw_site = mw_site + self._csrf_token = None + self._csrf_required = csrf_required + + def _get_csrf_token(self) -> str: + if self._csrf_token is None: + self._csrf_token = self._mw_site.get_token("csrf") + return self._csrf_token + + def _get_cookies(self) -> dict: + return dict(self._mw_site.connection.cookies) + + def _rewrite_request(self, request: httpx.Request) -> httpx.Request: + from urllib.parse import quote, urlparse + + parsed = urlparse(str(request.url)) + gateway_parsed = urlparse(self._gateway_url) + + # Extract subpath: everything after the gateway path prefix + # preserve trailing slash (Prefect/FastAPI requires it) + subpath = parsed.path[len(gateway_parsed.path) :].lstrip("/") + + # Build query: path= + # Original query params go into ApiGateway's "query" parameter + # (ApiGateway only forwards path, query, token, headers to backend) + query_parts = [f"path={quote(subpath, safe='/')}"] + if parsed.query: + query_parts.append(f"query={quote(parsed.query, safe='')}") + + method = request.method.upper() + if method in ("POST", "PUT", "PATCH", "DELETE") and self._csrf_required: + query_parts.append(f"token={quote(self._get_csrf_token(), safe='')}") + + new_url = f"{self._gateway_url}?{'&'.join(query_parts)}" + + headers = dict(request.headers) + cookies = self._get_cookies() + cookie_str = "; ".join(f"{k}={v}" for k, v in cookies.items()) + if cookie_str: + headers["cookie"] = cookie_str + # MW REST API requires Origin + CSRF token for POST requests + origin = f"{gateway_parsed.scheme}://{gateway_parsed.netloc}" + headers["origin"] = origin + if method in ("POST", "PUT", "PATCH", "DELETE"): + headers["x-csrf-token"] = self._get_csrf_token() + + return httpx.Request( + method=method, + url=new_url, + headers=headers, + content=request.content, + ) + + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + import logging + + log = logging.getLogger(__name__) + log.debug("ApiGateway: %s %s", request.method, request.url) + rewritten = self._rewrite_request(request) + log.debug("ApiGateway rewritten: %s %s", rewritten.method, rewritten.url) + # Create a fresh transport per request to avoid event-loop binding + inner = httpx.AsyncHTTPTransport() + response = await inner.handle_async_request(rewritten) + log.debug("ApiGateway response: %s", response.status_code) + # Follow redirects by rewriting internal Location URLs back + # through the gateway (backend may return internal Docker URLs) + if response.status_code in (301, 302, 307, 308): + location = response.headers.get("location") + if location: + from urllib.parse import quote + from urllib.parse import urlparse as _urlparse + + loc_parsed = _urlparse(location) + api_idx = loc_parsed.path.find("/api/") + if api_idx >= 0: + redirect_subpath = loc_parsed.path[api_idx + 5 :] + else: + redirect_subpath = loc_parsed.path.lstrip("/") + redirect_subpath = redirect_subpath.lstrip("/") + redirect_query = [f"path={redirect_subpath}"] + if loc_parsed.query: + redirect_query.append(f"query={quote(loc_parsed.query, safe='')}") + redirect_url = f"{self._gateway_url}?{'&'.join(redirect_query)}" + log.debug("ApiGateway following redirect: %s", redirect_url) + redirect_req = httpx.Request( + method=( + rewritten.method + if response.status_code in (307, 308) + else "GET" + ), + url=redirect_url, + headers=rewritten.headers, + content=( + rewritten.content + if response.status_code in (307, 308) + else None + ), + ) + response = await httpx.AsyncHTTPTransport().handle_async_request( + redirect_req + ) + # Refresh CSRF token and retry once on 403 + if response.status_code == 403: + self._csrf_token = None + rewritten = self._rewrite_request(request) + response = await httpx.AsyncHTTPTransport().handle_async_request(rewritten) + return response + + +def get_gateway_httpx_settings(gateway_url: str, osw_instance: OSW) -> dict: + """Create httpx_settings for routing Prefect SDK through ApiGateway. + + Returns a dict suitable for ``get_client(httpx_settings=...)``. + + Parameters + ---------- + gateway_url + ApiGateway endpoint URL. + osw_instance + A connected OSW instance (provides mwclient session). + """ + transport = ApiGatewayTransport( + gateway_url=gateway_url, + mw_site=osw_instance.site.mw_site, + csrf_required=False, + ) + return {"transport": transport, "base_url": gateway_url} + + +def _is_apigateway_url(url: str) -> bool: + return "/rest.php/apigateway/" in url + + +_PTH_FILENAME = "osw-httpx-gateway.pth" +_PTH_MODULE = "_osw_httpx_gateway.py" +_PTH_CONTENT = "import _osw_httpx_gateway\n" + + +def _get_site_packages() -> str: + """Return the Lib/site-packages directory (not the venv root).""" + import site + + for sp in site.getsitepackages(): + if sp.endswith("site-packages"): + return sp + return site.getsitepackages()[-1] + + +def install_gateway_hook(): + """Install the .pth startup hook into site-packages. + + Copies ``_httpx_gateway.py`` as a standalone module (no ``osw.`` prefix) + so it's importable before editable installs are on ``sys.path``. + """ + import shutil + + sp = _get_site_packages() + src = os.path.join(os.path.dirname(__file__), "_httpx_gateway.py") + shutil.copy2(src, os.path.join(sp, _PTH_MODULE)) + with open(os.path.join(sp, _PTH_FILENAME), "w") as f: + f.write(_PTH_CONTENT) + print(f"Installed gateway hook in {sp}") + + +def uninstall_gateway_hook(): + """Remove the .pth startup hook and module from site-packages.""" + sp = _get_site_packages() + for name in (_PTH_FILENAME, _PTH_MODULE): + target = os.path.join(sp, name) + if os.path.exists(target): + os.remove(target) + print(f"Removed {target}") + + # ------------------------------ NOTIFICATIONS --------------------- class NotifyTeamsParam(BaseModel): """Parameter set for notifying Microsoft Teams using class NotifyTeams""" @@ -283,7 +478,11 @@ async def register_flow( ) # fetch flow uuid from Prefect API - async with get_client() as client: + gateway_url = public_url or environ.get("PREFECT_API_URL", "") + httpx_kw = {} + if _is_apigateway_url(gateway_url): + httpx_kw = get_gateway_httpx_settings(gateway_url, osw_instance) + async with get_client(httpx_settings=httpx_kw or None) as client: response = await client.read_flow_by_name(flow.name) flow_uuid = response.id @@ -411,7 +610,11 @@ async def register_flow( f"{snippet}\n" "\n" ) - print(f"DEBUG: Writing usage template to '{software_title}':\n{snippet}") + import logging + + logging.getLogger(__name__).debug( + "Writing usage template to '%s':\n%s", software_title, snippet + ) page = osw_instance.site.get_page( WtSite.GetPageParam(titles=[software_title]) @@ -425,6 +628,34 @@ async def _deploy(param: DeployParam): This should become part of osw-python """ + _original_api_url = None + _original_httpx_init = None + gateway_url = param.public_url or environ.get("PREFECT_API_URL", "") + if _is_apigateway_url(gateway_url) and param.osw is not None: + _original_api_url = environ.get("PREFECT_API_URL") + environ["PREFECT_API_URL"] = gateway_url + _gw_transport = ApiGatewayTransport( + gateway_url=gateway_url, + mw_site=param.osw.site.mw_site, + ) + # Patch httpx.AsyncClient to auto-inject our transport when + # the base_url is an ApiGateway URL. One patch covers ALL + # Prefect client instances regardless of how they're created. + _original_httpx_init = httpx.AsyncClient.__init__ + + def _patched_httpx_init(self, *args, **kwargs): + base = str(kwargs.get("base_url", "")) + if _is_apigateway_url(base) and "transport" not in kwargs: + kwargs["transport"] = _gw_transport + _original_httpx_init(self, *args, **kwargs) + + httpx.AsyncClient.__init__ = _patched_httpx_init + + # Auto-install .pth hook so Prefect subprocesses also get patched + _pth_target = os.path.join(_get_site_packages(), _PTH_FILENAME) + if not os.path.exists(_pth_target): + install_gateway_hook() + deployments = [] for deploy_config in param.deployments: @@ -450,25 +681,25 @@ async def _deploy(param: DeployParam): public_url=param.public_url, ) - if version("prefect") in SpecifierSet(">=3.0"): - print(f"prefect version IF: {version('prefect')}") - # return deployments - await serve(*deployments) - else: - print(f"prefect version ELSE: {version('prefect')}") + try: await serve(*deployments) + finally: + # Restore patched httpx.AsyncClient.__init__ + if _original_httpx_init is not None: + httpx.AsyncClient.__init__ = _original_httpx_init + # Restore original PREFECT_API_URL if we overrode it + if _original_api_url is not None: + environ["PREFECT_API_URL"] = _original_api_url + elif _is_apigateway_url(gateway_url): + environ.pop("PREFECT_API_URL", None) def deploy(param: DeployParam): """Function to serve configured flows as deployments by python version.""" if sys.version_info >= (3, 11): - print(f"python version IF: {sys.version_info}") - # python >= 3.11 with asyncio.Runner() as runner: runner.run(_deploy(param)) else: - # python < 3.11 - print(f"python version ELSE: {sys.version_info}") asyncio.run(_deploy(param))