diff --git a/CHANGELOG.md b/CHANGELOG.md index 6987117..e5eca45 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,16 @@ ## [Unreleased] +## [0.3.0] - 2026-05-28 + +### Added +- **OpenAI-compat `/v1/embeddings` endpoint** — translates requests to `/api/embed` internally and wraps the Ollama response in the OpenAI Embeddings API format. Enables clients using the OpenAI SDK (e.g. Graphiti) to route through OQP by changing only their base URL. Always-on; no config toggle required. + +### Fixed +- `asyncio.get_event_loop()` in `_enqueue_request` replaced with `asyncio.get_running_loop()` — inside a coroutine both return the running loop, but `get_event_loop()` is deprecated when no event loop is current (DeprecationWarning from Python 3.10, RuntimeError from 3.14), so the explicit call is the supported one. +- Streaming response generator now closes the underlying httpx response in a `finally` block — prevents connection leaks when a client disconnects mid-stream. +- `_apply_env_overrides` now correctly skips env vars with numeric path components (e.g. `OQP_OLLAMA__HOSTS__0__URL`) instead of raising `TypeError: list indices must be integers`. 
+ ## [0.2.0] - 2026-04-22 ### Added diff --git a/README.md b/README.md index b1e374f..22bfe57 100644 --- a/README.md +++ b/README.md @@ -382,11 +382,29 @@ Returns full queue state, host health, per-client stats, routing decisions, and | `GET /health` | None | Liveness probe — always open | | `GET /queue/status` | Token (when enabled) | Full queue, host, client, security state | | `GET /metrics` | Token (when enabled) | Prometheus text format | +| `POST /api/embed` | Token (when enabled) | Native Ollama embedding endpoint | +| `POST /v1/embeddings` | Token (when enabled) | OpenAI-compat embedding endpoint (see below) | | `POST /queue/pause?tier=low` | Management key | Stop accepting requests for tier | | `POST /queue/resume?tier=low` | Management key | Resume tier | | `POST /queue/drain` | Management key | Wait for queues to empty | | `POST /queue/flush?tier=low` | Management key | Drop all pending requests immediately | +### OpenAI-compat embeddings + +OQP accepts `POST /v1/embeddings` using the OpenAI Embeddings API format and translates it to Ollama's `/api/embed` internally. Successful responses are wrapped back into OpenAI format before returning (the `usage` token counts are always reported as `0`); error responses are passed through unchanged. Auth, the priority ceiling, and the embedding cache all apply exactly as they do for native `/api/embed` requests — auth and the priority ceiling are enforced first by the same handler, and the path is rewritten before the request is enqueued. + +This lets clients that use the OpenAI SDK (e.g. Graphiti with `provider: openai`) route through OQP without changing their provider configuration. Point them at OQP's port instead of Ollama's: + +```yaml +# Graphiti config — before +api_url: http://localhost:11434/v1 + +# Graphiti config — after (routes through OQP on port 11435) +api_url: http://localhost:11435/v1 +``` + +The endpoint is always-on. No config toggle is required. 
+ ### Webhook events ```yaml diff --git a/pyproject.toml b/pyproject.toml index 802da31..c5f1884 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "ollama-queue-proxy" -version = "0.2.0" +version = "0.3.0" description = "Drop-in HTTP proxy for Ollama with priority queuing, per-client auth, and model-aware failover" readme = "README.md" license = { text = "MIT" } diff --git a/src/ollama_queue_proxy/__init__.py b/src/ollama_queue_proxy/__init__.py index a402eae..544175b 100644 --- a/src/ollama_queue_proxy/__init__.py +++ b/src/ollama_queue_proxy/__init__.py @@ -1,3 +1,3 @@ """ollama-queue-proxy: Drop-in HTTP proxy for Ollama with priority queuing.""" -__version__ = "0.2.0" +__version__ = "0.3.0" diff --git a/src/ollama_queue_proxy/config.py b/src/ollama_queue_proxy/config.py index 5369ec9..ad6b8b8 100644 --- a/src/ollama_queue_proxy/config.py +++ b/src/ollama_queue_proxy/config.py @@ -273,18 +273,14 @@ def _apply_env_overrides(data: dict, prefix: str = "OQP") -> dict: if not key.startswith(prefix + "_"): continue parts = key[len(prefix) + 1 :].lower().split("__") + # List-index overrides (e.g. OQP_OLLAMA__HOSTS__0__URL) are not + # supported — skip the entire key if any component is a digit. + if any(p.isdigit() for p in parts): + continue target = data for part in parts[:-1]: - if part.isdigit(): - # list index — handled below - continue target = target.setdefault(part, {}) leaf = parts[-1] - if leaf.isdigit(): - # Can't safely do list index overrides on arbitrary dicts here. - # The most important list override (hosts[0].url) is handled by - # full-key matching. Log and skip. 
- continue # Attempt type coercion for booleans and integers if value.lower() in ("true", "false"): target[leaf] = value.lower() == "true" diff --git a/src/ollama_queue_proxy/main.py b/src/ollama_queue_proxy/main.py index 8b38c13..a796e0c 100644 --- a/src/ollama_queue_proxy/main.py +++ b/src/ollama_queue_proxy/main.py @@ -22,6 +22,7 @@ from .config import Config, load_config from .hosts import HostManager from .middleware import RequestContextMiddleware, get_client_id, parse_priority +from .openai_compat import is_openai_compat_path, rewrite_path, wrap_response from .proxy import dispatch_request, read_body from .queue import PriorityQueueManager, QueueFull, QueueItem, QueuePaused, RequestExpired from .routes.queue import router as queue_router @@ -218,6 +219,7 @@ async def _enqueue_request( tier: str, state: AppState, reentries: int = 0, + path_override: str | None = None, ) -> JSONResponse: """ Buffer the request body, enqueue it, and await dispatch. Used by both the main @@ -241,7 +243,7 @@ async def _enqueue_request( # keep_alive injection — runs before cache check so cached responses also reflect # the injected value (though for embeddings keep_alive has no effect upstream) - path = request.url.path + path = path_override if path_override is not None else request.url.path ka_cfg = state.config.keep_alive if path in _KEEP_ALIVE_PATHS: body = _inject_keep_alive( @@ -277,7 +279,9 @@ async def _enqueue_request( ) enqueue_time = time.monotonic() - future: asyncio.Future = asyncio.get_event_loop().create_future() + # get_running_loop() is the supported call inside a coroutine; get_event_loop() + # is deprecated when no loop is current and raises RuntimeError there in 3.14+. 
+ future: asyncio.Future = asyncio.get_running_loop().create_future() conc_mgr = state.concurrency_manager @@ -294,6 +298,7 @@ async def dispatch_fn(): host_manager=state.host_manager, client=state.http_client, routing_table=state.routing_table, + path_override=path_override, ) finally: if conc_mgr is not None: @@ -399,6 +404,24 @@ async def proxy_handler(request: Request, path: str): requested_priority = parse_priority(request) tier = state.auth_manager.enforce_priority_ceiling(requested_priority, key_cfg) + # OpenAI-compat path handling: rewrite path before enqueue, wrap response after + compat_path = "/" + path + if is_openai_compat_path(compat_path): + native_path = rewrite_path(compat_path) + response = await _enqueue_request( + request=request, + client_id=client_id, + tier=tier, + state=state, + path_override=native_path, + ) + # Only wrap successful JSON responses; pass through errors unchanged + if isinstance(response, JSONResponse) and response.status_code == 200: + ollama_body = json.loads(response.body) + wrapped = wrap_response(ollama_body) + return JSONResponse(content=wrapped, status_code=200) + return response + return await _enqueue_request( request=request, client_id=client_id, diff --git a/src/ollama_queue_proxy/openai_compat.py b/src/ollama_queue_proxy/openai_compat.py new file mode 100644 index 0000000..35b3165 --- /dev/null +++ b/src/ollama_queue_proxy/openai_compat.py @@ -0,0 +1,68 @@ +"""OpenAI-compatible embedding endpoint translation layer. + +Translates POST /v1/embeddings ↔ POST /api/embed so that clients using the +OpenAI Embeddings API (e.g. Graphiti with provider=openai) can route through +OQP without reconfiguration. + +Request translation: /v1/embeddings body is identical to /api/embed — both +accept {"model": "...", "input": "..." | [...]}. Only the path is rewritten. 
+ +Response translation: Ollama /api/embed returns: + {"embeddings": [[...], ...], "model": "bge-m3", ...} + +OpenAI /v1/embeddings expects: + { + "object": "list", + "data": [{"object": "embedding", "embedding": [...], "index": 0}, ...], + "model": "bge-m3", + "usage": {"prompt_tokens": 0, "total_tokens": 0} + } +""" + +from __future__ import annotations + +_OPENAI_COMPAT_PATHS: frozenset[str] = frozenset({"/v1/embeddings"}) + + +def is_openai_compat_path(path: str) -> bool: + """Return True if the path should be handled by the compat layer.""" + normalized = "/" + path.lstrip("/") + return normalized in _OPENAI_COMPAT_PATHS + + +def rewrite_path(path: str) -> str: + """Rewrite an OpenAI-compat path to the native Ollama equivalent.""" + normalized = "/" + path.lstrip("/") + if normalized in _OPENAI_COMPAT_PATHS: + return "/api/embed" + return path + + +def wrap_response(ollama_body: dict, model: str | None = None) -> dict: + """Wrap an Ollama /api/embed response body in the OpenAI embeddings format. + + Args: + ollama_body: Parsed JSON response from Ollama /api/embed. + model: Model name override (falls back to ollama_body["model"]). + + Returns: + Dict conforming to the OpenAI /v1/embeddings response schema. 
+ """ + embeddings: list[list[float]] = ollama_body.get("embeddings") or [] + resolved_model: str = model or ollama_body.get("model", "") + + data = [ + { + "object": "embedding", + "embedding": vec, + "index": i, + } + for i, vec in enumerate(embeddings) + ] + + return { + "object": "list", + "data": data, + "model": resolved_model, + "usage": {"prompt_tokens": 0, "total_tokens": 0}, + } diff --git a/src/ollama_queue_proxy/proxy.py b/src/ollama_queue_proxy/proxy.py index 65690d8..7ad9c15 100644 --- a/src/ollama_queue_proxy/proxy.py +++ b/src/ollama_queue_proxy/proxy.py @@ -119,13 +119,14 @@ async def dispatch_request( host_manager: HostManager, client: httpx.AsyncClient, routing_table: RoutingTable | None = None, + path_override: str | None = None, ) -> StreamingResponse | JSONResponse: """ Dispatch a buffered request to the appropriate Ollama host with failover. Failover only applies before any response bytes are sent to the client. """ request_id = getattr(request.state, "request_id", "unknown") - path = request.url.path + path = path_override if path_override is not None else request.url.path query = request.url.query method = request.method @@ -223,8 +224,14 @@ def _next_host() -> OllamaHost | None: if is_streaming: async def stream_gen(r=resp): - async for chunk in r.aiter_bytes(): - yield chunk + try: + async for chunk in r.aiter_bytes(): + yield chunk + finally: + # Close the httpx response explicitly — if the client + # disconnects mid-stream the generator is abandoned and + # GC may never run, leaking the underlying connection. 
+ await r.aclose() return StreamingResponse( stream_gen(), diff --git a/tests/test_openai_compat.py b/tests/test_openai_compat.py new file mode 100644 index 0000000..7f3153e --- /dev/null +++ b/tests/test_openai_compat.py @@ -0,0 +1,116 @@ +"""Tests for the OpenAI-compat embedding translation layer (openai_compat.py).""" + +import pytest + +from ollama_queue_proxy.openai_compat import ( + _OPENAI_COMPAT_PATHS, + is_openai_compat_path, + rewrite_path, + wrap_response, +) + + +# ── path detection ──────────────────────────────────────────────────────────── + + +def test_v1_embeddings_is_compat_path(): + assert is_openai_compat_path("/v1/embeddings") is True + + +def test_v1_embeddings_without_leading_slash(): + assert is_openai_compat_path("v1/embeddings") is True + + +def test_api_embed_is_not_compat_path(): + assert is_openai_compat_path("/api/embed") is False + + +def test_api_generate_is_not_compat_path(): + assert is_openai_compat_path("/api/generate") is False + + +# ── path rewrite ────────────────────────────────────────────────────────────── + + +def test_rewrite_v1_embeddings_to_api_embed(): + assert rewrite_path("/v1/embeddings") == "/api/embed" + + +def test_rewrite_non_compat_path_unchanged(): + assert rewrite_path("/api/generate") == "/api/generate" + + +def test_rewrite_without_leading_slash(): + assert rewrite_path("v1/embeddings") == "/api/embed" + + +# ── response wrapping ───────────────────────────────────────────────────────── + + +def test_wrap_response_single_embedding(): + ollama = { + "embeddings": [[0.1, 0.2, 0.3]], + "model": "bge-m3", + } + result = wrap_response(ollama) + + assert result["object"] == "list" + assert result["model"] == "bge-m3" + assert len(result["data"]) == 1 + assert result["data"][0]["object"] == "embedding" + assert result["data"][0]["embedding"] == [0.1, 0.2, 0.3] + assert result["data"][0]["index"] == 0 + assert result["usage"]["prompt_tokens"] == 0 + assert result["usage"]["total_tokens"] == 0 + + +def 
test_wrap_response_batch_embeddings(): + vectors = [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]] + ollama = {"embeddings": vectors, "model": "bge-m3"} + result = wrap_response(ollama) + + assert len(result["data"]) == 3 + for i, item in enumerate(result["data"]): + assert item["index"] == i + assert item["embedding"] == vectors[i] + + +def test_wrap_response_model_override(): + ollama = {"embeddings": [[0.1]], "model": "bge-m3"} + result = wrap_response(ollama, model="nomic-embed-text") + assert result["model"] == "nomic-embed-text" + + +def test_wrap_response_empty_embeddings(): + ollama = {"embeddings": [], "model": "bge-m3"} + result = wrap_response(ollama) + assert result["data"] == [] + assert result["object"] == "list" + + +def test_wrap_response_missing_embeddings_key(): + ollama = {"model": "bge-m3"} + result = wrap_response(ollama) + assert result["data"] == [] + + +def test_wrap_response_missing_model_falls_back_to_empty_string(): + ollama = {"embeddings": [[0.1]]} + result = wrap_response(ollama) + assert result["model"] == "" + + +# ── test_env_override_list_index_skipped_gracefully ─────────────────────────── + + +def test_env_override_list_index_skipped_gracefully(monkeypatch): + """OQP_OLLAMA__HOSTS__0__URL must not crash and must not corrupt data.""" + monkeypatch.setenv("OQP_OLLAMA__HOSTS__0__URL", "http://tampered:11434") + + from ollama_queue_proxy.config import _apply_env_overrides + + data = {"ollama": {"hosts": [{"url": "http://original:11434", "name": "primary"}]}} + result = _apply_env_overrides(data) + + # The list must be untouched — the env var is silently skipped + assert result["ollama"]["hosts"][0]["url"] == "http://original:11434"