From 0621ec68f5e61ed17b947f54a92ece740d677844 Mon Sep 17 00:00:00 2001 From: TadMSTR <69825253+TadMSTR@users.noreply.github.com> Date: Thu, 28 May 2026 07:49:33 -0400 Subject: [PATCH 1/4] feat: OpenAI-compat /v1/embeddings endpoint + bug fixes (v0.3.0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add OpenAI-compatible embedding endpoint so Graphiti (and any OpenAI SDK client) can route through OQP without reconfiguration. New: - openai_compat.py: path detection (is_openai_compat_path), path rewrite (/v1/embeddings → /api/embed), and response wrapping (Ollama → OpenAI embeddings format). Always-on; no config required. - proxy_handler: detect /v1/embeddings, rewrite path before enqueue, wrap response after dispatch. Cache and auth apply unchanged. - _enqueue_request / dispatch_request: add path_override param for compat path rewrite to propagate through queue/cache/dispatch logic. - 14 new tests covering path detection, rewrite, response wrapping, and list-index env var skip behavior. Bug fixes: - main.py: asyncio.get_event_loop() → get_running_loop() (outside a running loop, get_event_loop() is deprecated since 3.10 and raises RuntimeError from 3.14 when no current loop is set) - proxy.py: streaming generator now closes httpx response in finally block to prevent connection leak on client disconnect - config.py: _apply_env_overrides now skips any key with a numeric path component instead of crashing with TypeError when the target is a list (e.g. OQP_OLLAMA__HOSTS__0__URL) Bump version to 0.3.0. 
Co-Authored-By: Claude Sonnet 4.6 --- pyproject.toml | 2 +- src/ollama_queue_proxy/__init__.py | 2 +- src/ollama_queue_proxy/config.py | 12 +-- src/ollama_queue_proxy/main.py | 26 +++++- src/ollama_queue_proxy/openai_compat.py | 68 ++++++++++++++ src/ollama_queue_proxy/proxy.py | 10 +- tests/test_openai_compat.py | 116 ++++++++++++++++++++++++ 7 files changed, 221 insertions(+), 15 deletions(-) create mode 100644 src/ollama_queue_proxy/openai_compat.py create mode 100644 tests/test_openai_compat.py diff --git a/pyproject.toml b/pyproject.toml index 802da31..c5f1884 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "ollama-queue-proxy" -version = "0.2.0" +version = "0.3.0" description = "Drop-in HTTP proxy for Ollama with priority queuing, per-client auth, and model-aware failover" readme = "README.md" license = { text = "MIT" } diff --git a/src/ollama_queue_proxy/__init__.py b/src/ollama_queue_proxy/__init__.py index a402eae..544175b 100644 --- a/src/ollama_queue_proxy/__init__.py +++ b/src/ollama_queue_proxy/__init__.py @@ -1,3 +1,3 @@ """ollama-queue-proxy: Drop-in HTTP proxy for Ollama with priority queuing.""" -__version__ = "0.2.0" +__version__ = "0.3.0" diff --git a/src/ollama_queue_proxy/config.py b/src/ollama_queue_proxy/config.py index 5369ec9..ad6b8b8 100644 --- a/src/ollama_queue_proxy/config.py +++ b/src/ollama_queue_proxy/config.py @@ -273,18 +273,14 @@ def _apply_env_overrides(data: dict, prefix: str = "OQP") -> dict: if not key.startswith(prefix + "_"): continue parts = key[len(prefix) + 1 :].lower().split("__") + # List-index overrides (e.g. OQP_OLLAMA__HOSTS__0__URL) are not + # supported — skip the entire key if any component is a digit. 
+ if any(p.isdigit() for p in parts): + continue target = data for part in parts[:-1]: - if part.isdigit(): - # list index — handled below - continue target = target.setdefault(part, {}) leaf = parts[-1] - if leaf.isdigit(): - # Can't safely do list index overrides on arbitrary dicts here. - # The most important list override (hosts[0].url) is handled by - # full-key matching. Log and skip. - continue # Attempt type coercion for booleans and integers if value.lower() in ("true", "false"): target[leaf] = value.lower() == "true" diff --git a/src/ollama_queue_proxy/main.py b/src/ollama_queue_proxy/main.py index 8b38c13..9ccd55f 100644 --- a/src/ollama_queue_proxy/main.py +++ b/src/ollama_queue_proxy/main.py @@ -22,6 +22,7 @@ from .config import Config, load_config from .hosts import HostManager from .middleware import RequestContextMiddleware, get_client_id, parse_priority +from .openai_compat import is_openai_compat_path, rewrite_path, wrap_response from .proxy import dispatch_request, read_body from .queue import PriorityQueueManager, QueueFull, QueueItem, QueuePaused, RequestExpired from .routes.queue import router as queue_router @@ -218,6 +219,7 @@ async def _enqueue_request( tier: str, state: AppState, reentries: int = 0, + path_override: str | None = None, ) -> JSONResponse: """ Buffer the request body, enqueue it, and await dispatch. 
Used by both the main @@ -241,7 +243,7 @@ async def _enqueue_request( # keep_alive injection — runs before cache check so cached responses also reflect # the injected value (though for embeddings keep_alive has no effect upstream) - path = request.url.path + path = path_override if path_override is not None else request.url.path ka_cfg = state.config.keep_alive if path in _KEEP_ALIVE_PATHS: body = _inject_keep_alive( @@ -277,7 +279,7 @@ async def _enqueue_request( ) enqueue_time = time.monotonic() - future: asyncio.Future = asyncio.get_event_loop().create_future() + future: asyncio.Future = asyncio.get_running_loop().create_future() conc_mgr = state.concurrency_manager @@ -294,6 +296,7 @@ async def dispatch_fn(): host_manager=state.host_manager, client=state.http_client, routing_table=state.routing_table, + path_override=path_override, ) finally: if conc_mgr is not None: @@ -399,6 +402,25 @@ async def proxy_handler(request: Request, path: str): requested_priority = parse_priority(request) tier = state.auth_manager.enforce_priority_ceiling(requested_priority, key_cfg) + # OpenAI-compat path handling: rewrite path before enqueue, wrap response after + compat_path = "/" + path + if is_openai_compat_path(compat_path): + native_path = rewrite_path(compat_path) + response = await _enqueue_request( + request=request, + client_id=client_id, + tier=tier, + state=state, + path_override=native_path, + ) + # Only wrap successful JSON responses; pass through errors unchanged + if isinstance(response, JSONResponse) and response.status_code == 200: + import json as _json + ollama_body = _json.loads(response.body) + wrapped = wrap_response(ollama_body) + return JSONResponse(content=wrapped, status_code=200) + return response + return await _enqueue_request( request=request, client_id=client_id, diff --git a/src/ollama_queue_proxy/openai_compat.py b/src/ollama_queue_proxy/openai_compat.py new file mode 100644 index 0000000..3576972 --- /dev/null +++ 
b/src/ollama_queue_proxy/openai_compat.py @@ -0,0 +1,68 @@ +"""OpenAI-compatible embedding endpoint translation layer. + +Translates POST /v1/embeddings ↔ POST /api/embed so that clients using the +OpenAI Embeddings API (e.g. Graphiti with provider=openai) can route through +OQP without reconfiguration. + +Request translation: /v1/embeddings body is identical to /api/embed — both +accept {"model": "...", "input": "..." | [...]}. Only the path is rewritten. + +Response translation: Ollama /api/embed returns: + {"embeddings": [[...], ...], "model": "bge-m3", ...} + +OpenAI /v1/embeddings expects: + { + "object": "list", + "data": [{"object": "embedding", "embedding": [...], "index": 0}, ...], + "model": "bge-m3", + "usage": {"prompt_tokens": 0, "total_tokens": 0} + } +""" + +from __future__ import annotations + +_OPENAI_COMPAT_PATHS: frozenset[str] = frozenset({"/v1/embeddings"}) + + +def is_openai_compat_path(path: str) -> bool: + """Return True if the path should be handled by the compat layer.""" + normalized = "/" + path.lstrip("/") + return normalized in _OPENAI_COMPAT_PATHS + + +def rewrite_path(path: str) -> str: + """Rewrite an OpenAI-compat path to the native Ollama equivalent.""" + normalized = "/" + path.lstrip("/") + if normalized in _OPENAI_COMPAT_PATHS: + return "api/embed" + return path + + +def wrap_response(ollama_body: dict, model: str | None = None) -> dict: + """Wrap an Ollama /api/embed response body in the OpenAI embeddings format. + + Args: + ollama_body: Parsed JSON response from Ollama /api/embed. + model: Model name override (falls back to ollama_body["model"]). + + Returns: + Dict conforming to the OpenAI /v1/embeddings response schema. 
+ """ + embeddings: list[list[float]] = ollama_body.get("embeddings") or [] + resolved_model: str = model or ollama_body.get("model", "") + + data = [ + { + "object": "embedding", + "embedding": vec, + "index": i, + } + for i, vec in enumerate(embeddings) + ] + + return { + "object": "list", + "data": data, + "model": resolved_model, + "usage": {"prompt_tokens": 0, "total_tokens": 0}, + } diff --git a/src/ollama_queue_proxy/proxy.py b/src/ollama_queue_proxy/proxy.py index 65690d8..538adc6 100644 --- a/src/ollama_queue_proxy/proxy.py +++ b/src/ollama_queue_proxy/proxy.py @@ -119,13 +119,14 @@ async def dispatch_request( host_manager: HostManager, client: httpx.AsyncClient, routing_table: RoutingTable | None = None, + path_override: str | None = None, ) -> StreamingResponse | JSONResponse: """ Dispatch a buffered request to the appropriate Ollama host with failover. Failover only applies before any response bytes are sent to the client. """ request_id = getattr(request.state, "request_id", "unknown") - path = request.url.path + path = path_override if path_override is not None else request.url.path query = request.url.query method = request.method @@ -223,8 +224,11 @@ def _next_host() -> OllamaHost | None: if is_streaming: async def stream_gen(r=resp): - async for chunk in r.aiter_bytes(): - yield chunk + try: + async for chunk in r.aiter_bytes(): + yield chunk + finally: + await r.aclose() return StreamingResponse( stream_gen(), diff --git a/tests/test_openai_compat.py b/tests/test_openai_compat.py new file mode 100644 index 0000000..75dba18 --- /dev/null +++ b/tests/test_openai_compat.py @@ -0,0 +1,116 @@ +"""Tests for the OpenAI-compat embedding translation layer (openai_compat.py).""" + +import pytest + +from ollama_queue_proxy.openai_compat import ( + _OPENAI_COMPAT_PATHS, + is_openai_compat_path, + rewrite_path, + wrap_response, +) + + +# ── path detection ──────────────────────────────────────────────────────────── + + +def test_v1_embeddings_is_compat_path(): 
+ assert is_openai_compat_path("/v1/embeddings") is True + + +def test_v1_embeddings_without_leading_slash(): + assert is_openai_compat_path("v1/embeddings") is True + + +def test_api_embed_is_not_compat_path(): + assert is_openai_compat_path("/api/embed") is False + + +def test_api_generate_is_not_compat_path(): + assert is_openai_compat_path("/api/generate") is False + + +# ── path rewrite ────────────────────────────────────────────────────────────── + + +def test_rewrite_v1_embeddings_to_api_embed(): + assert rewrite_path("/v1/embeddings") == "api/embed" + + +def test_rewrite_non_compat_path_unchanged(): + assert rewrite_path("/api/generate") == "/api/generate" + + +def test_rewrite_without_leading_slash(): + assert rewrite_path("v1/embeddings") == "api/embed" + + +# ── response wrapping ───────────────────────────────────────────────────────── + + +def test_wrap_response_single_embedding(): + ollama = { + "embeddings": [[0.1, 0.2, 0.3]], + "model": "bge-m3", + } + result = wrap_response(ollama) + + assert result["object"] == "list" + assert result["model"] == "bge-m3" + assert len(result["data"]) == 1 + assert result["data"][0]["object"] == "embedding" + assert result["data"][0]["embedding"] == [0.1, 0.2, 0.3] + assert result["data"][0]["index"] == 0 + assert result["usage"]["prompt_tokens"] == 0 + assert result["usage"]["total_tokens"] == 0 + + +def test_wrap_response_batch_embeddings(): + vectors = [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]] + ollama = {"embeddings": vectors, "model": "bge-m3"} + result = wrap_response(ollama) + + assert len(result["data"]) == 3 + for i, item in enumerate(result["data"]): + assert item["index"] == i + assert item["embedding"] == vectors[i] + + +def test_wrap_response_model_override(): + ollama = {"embeddings": [[0.1]], "model": "bge-m3"} + result = wrap_response(ollama, model="nomic-embed-text") + assert result["model"] == "nomic-embed-text" + + +def test_wrap_response_empty_embeddings(): + ollama = {"embeddings": [], "model": 
"bge-m3"} + result = wrap_response(ollama) + assert result["data"] == [] + assert result["object"] == "list" + + +def test_wrap_response_missing_embeddings_key(): + ollama = {"model": "bge-m3"} + result = wrap_response(ollama) + assert result["data"] == [] + + +def test_wrap_response_missing_model_falls_back_to_empty_string(): + ollama = {"embeddings": [[0.1]]} + result = wrap_response(ollama) + assert result["model"] == "" + + +# ── test_env_override_list_index_skipped_gracefully ─────────────────────────── + + +def test_env_override_list_index_skipped_gracefully(monkeypatch): + """OQP_OLLAMA__HOSTS__0__URL must not crash and must not corrupt data.""" + monkeypatch.setenv("OQP_OLLAMA__HOSTS__0__URL", "http://tampered:11434") + + from ollama_queue_proxy.config import _apply_env_overrides + + data = {"ollama": {"hosts": [{"url": "http://original:11434", "name": "primary"}]}} + result = _apply_env_overrides(data) + + # The list must be untouched — the env var is silently skipped + assert result["ollama"]["hosts"][0]["url"] == "http://original:11434" From 50e5ac09099047d1b8bf5540785d30b045763696 Mon Sep 17 00:00:00 2001 From: TadMSTR <69825253+TadMSTR@users.noreply.github.com> Date: Thu, 28 May 2026 07:51:37 -0400 Subject: [PATCH 2/4] docs: add CHANGELOG entry for v0.3.0 Co-Authored-By: Claude Sonnet 4.6 --- CHANGELOG.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6987117..e5eca45 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,16 @@ ## [Unreleased] +## [0.3.0] - 2026-05-28 + +### Added +- **OpenAI-compat `/v1/embeddings` endpoint** — translates requests to `/api/embed` internally and wraps the Ollama response in the OpenAI Embeddings API format. Enables clients using the OpenAI SDK (e.g. Graphiti) to route through OQP without reconfiguration. Always-on; no config toggle required. 
+ +### Fixed +- `asyncio.get_event_loop()` in `_enqueue_request` replaced with `get_running_loop()` — avoids the deprecated fallback path of `get_event_loop()` (DeprecationWarning since Python 3.10, RuntimeError from 3.14 when no event loop is current). +- Streaming response generator now closes the underlying httpx response in a `finally` block — prevents connection leaks when a client disconnects mid-stream. +- `_apply_env_overrides` now correctly skips env vars with numeric path components (e.g. `OQP_OLLAMA__HOSTS__0__URL`) instead of raising `TypeError: list indices must be integers`. + ## [0.2.0] - 2026-04-22 ### Added From 6376353f805122de641e7a9d9ebc48f804b711e3 Mon Sep 17 00:00:00 2001 From: TadMSTR <69825253+TadMSTR@users.noreply.github.com> Date: Thu, 28 May 2026 08:08:27 -0400 Subject: [PATCH 3/4] fix: leading slash in rewrite_path, agent-consumer comments, remove redundant import MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - openai_compat.py: rewrite_path() now returns "/api/embed" (was "api/embed") — missing slash produced malformed URLs like "http://host:11434api/embed" - proxy.py: comment on stream_gen finally block — explains it closes the httpx connection on client disconnect to prevent connection leak (not obvious from code) - main.py: comment on get_running_loop() — explains why get_event_loop() was replaced (deprecated 3.10+, raises RuntimeError in 3.12+) - main.py: remove local `import json as _json` inside proxy_handler — json is already imported at module level; use it directly - test_openai_compat.py: update expected values to match corrected "/api/embed" Co-Authored-By: Claude Sonnet 4.6 --- src/ollama_queue_proxy/main.py | 5 +++-- src/ollama_queue_proxy/openai_compat.py | 2 +- src/ollama_queue_proxy/proxy.py | 3 +++ tests/test_openai_compat.py | 4 ++-- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/ollama_queue_proxy/main.py b/src/ollama_queue_proxy/main.py index 9ccd55f..a796e0c 100644 --- 
a/src/ollama_queue_proxy/main.py +++ b/src/ollama_queue_proxy/main.py @@ -279,6 +279,8 @@ async def _enqueue_request( ) enqueue_time = time.monotonic() + # get_running_loop() replaces get_event_loop(), whose no-running-loop fallback is + # deprecated (DeprecationWarning since Python 3.10, RuntimeError from Python 3.14). future: asyncio.Future = asyncio.get_running_loop().create_future() conc_mgr = state.concurrency_manager @@ -415,8 +417,7 @@ async def proxy_handler(request: Request, path: str): ) # Only wrap successful JSON responses; pass through errors unchanged if isinstance(response, JSONResponse) and response.status_code == 200: - import json as _json - ollama_body = _json.loads(response.body) + ollama_body = json.loads(response.body) wrapped = wrap_response(ollama_body) return JSONResponse(content=wrapped, status_code=200) return response diff --git a/src/ollama_queue_proxy/openai_compat.py b/src/ollama_queue_proxy/openai_compat.py index 3576972..35b3165 100644 --- a/src/ollama_queue_proxy/openai_compat.py +++ b/src/ollama_queue_proxy/openai_compat.py @@ -34,7 +34,7 @@ def rewrite_path(path: str) -> str: """Rewrite an OpenAI-compat path to the native Ollama equivalent.""" normalized = "/" + path.lstrip("/") if normalized in _OPENAI_COMPAT_PATHS: - return "api/embed" + return "/api/embed" return path diff --git a/src/ollama_queue_proxy/proxy.py b/src/ollama_queue_proxy/proxy.py index 538adc6..7ad9c15 100644 --- a/src/ollama_queue_proxy/proxy.py +++ b/src/ollama_queue_proxy/proxy.py @@ -228,6 +228,9 @@ async def stream_gen(r=resp): async for chunk in r.aiter_bytes(): yield chunk finally: + # Close the httpx response explicitly — if the client + # disconnects mid-stream the generator is abandoned and + # GC may never run, leaking the underlying connection. 
await r.aclose() return StreamingResponse( diff --git a/tests/test_openai_compat.py b/tests/test_openai_compat.py index 75dba18..7f3153e 100644 --- a/tests/test_openai_compat.py +++ b/tests/test_openai_compat.py @@ -33,7 +33,7 @@ def test_api_generate_is_not_compat_path(): def test_rewrite_v1_embeddings_to_api_embed(): - assert rewrite_path("/v1/embeddings") == "api/embed" + assert rewrite_path("/v1/embeddings") == "/api/embed" def test_rewrite_non_compat_path_unchanged(): @@ -41,7 +41,7 @@ def test_rewrite_non_compat_path_unchanged(): def test_rewrite_without_leading_slash(): - assert rewrite_path("v1/embeddings") == "api/embed" + assert rewrite_path("v1/embeddings") == "/api/embed" # ── response wrapping ───────────────────────────────────────────────────────── From a8258ff347d5a47ec305276d4913ed28b3fc5808 Mon Sep 17 00:00:00 2001 From: TadMSTR <69825253+TadMSTR@users.noreply.github.com> Date: Thu, 28 May 2026 08:10:51 -0400 Subject: [PATCH 4/4] docs: document /v1/embeddings OpenAI-compat endpoint in README Add endpoint to the Endpoints table and a dedicated section explaining the translation, Graphiti config snippet, and that it is always-on. 
Co-Authored-By: Claude Sonnet 4.6 --- README.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/README.md b/README.md index b1e374f..22bfe57 100644 --- a/README.md +++ b/README.md @@ -382,11 +382,29 @@ Returns full queue state, host health, per-client stats, routing decisions, and | `GET /health` | None | Liveness probe — always open | | `GET /queue/status` | Token (when enabled) | Full queue, host, client, security state | | `GET /metrics` | Token (when enabled) | Prometheus text format | +| `POST /api/embed` | Token (when enabled) | Native Ollama embedding endpoint | +| `POST /v1/embeddings` | Token (when enabled) | OpenAI-compat embedding endpoint (see below) | | `POST /queue/pause?tier=low` | Management key | Stop accepting requests for tier | | `POST /queue/resume?tier=low` | Management key | Resume tier | | `POST /queue/drain` | Management key | Wait for queues to empty | | `POST /queue/flush?tier=low` | Management key | Drop all pending requests immediately | +### OpenAI-compat embeddings + +OQP accepts `POST /v1/embeddings` using the OpenAI Embeddings API format and translates it to Ollama's `/api/embed` internally. The response is wrapped back into OpenAI format before returning. Auth, priority ceiling, and the embedding cache all apply exactly as they do for native `/api/embed` requests — the path is rewritten before the request is enqueued, so caching and dispatch operate on `/api/embed`. + +This lets clients that use the OpenAI SDK (e.g. Graphiti with `provider: openai`) route through OQP without changing their provider configuration. Point them at OQP's port instead of Ollama's: + +```yaml +# Graphiti config — before +api_url: http://localhost:11434/v1 + +# Graphiti config — after (routes through OQP on port 11435) +api_url: http://localhost:11435/v1 +``` + +The endpoint is always-on. No config toggle is required. + ### Webhook events ```yaml