10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,16 @@

## [Unreleased]

## [0.3.0] - 2026-05-28

### Added
- **OpenAI-compat `/v1/embeddings` endpoint** — translates requests to `/api/embed` internally and wraps the Ollama response in the OpenAI Embeddings API format. Enables clients using the OpenAI SDK (e.g. Graphiti) to route through OQP without reconfiguration. Always-on; no config toggle required.

### Fixed
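- `asyncio.get_event_loop()` in `_enqueue_request` replaced with `asyncio.get_running_loop()`, the recommended call inside a coroutine. `get_event_loop()` can emit a DeprecationWarning on Python 3.10+ when no event loop is running, and as of Python 3.14 it raises RuntimeError when no current loop exists.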
- Streaming response generator now closes the underlying httpx response in a `finally` block — prevents connection leaks when a client disconnects mid-stream.
- `_apply_env_overrides` now correctly skips env vars with numeric path components (e.g. `OQP_OLLAMA__HOSTS__0__URL`) instead of raising `TypeError: list indices must be integers`.

## [0.2.0] - 2026-04-22

### Added
18 changes: 18 additions & 0 deletions README.md
@@ -382,11 +382,29 @@ Returns full queue state, host health, per-client stats, routing decisions, and
| `GET /health` | None | Liveness probe — always open |
| `GET /queue/status` | Token (when enabled) | Full queue, host, client, security state |
| `GET /metrics` | Token (when enabled) | Prometheus text format |
| `POST /api/embed` | Token (when enabled) | Native Ollama embedding endpoint |
| `POST /v1/embeddings` | Token (when enabled) | OpenAI-compat embedding endpoint (see below) |
| `POST /queue/pause?tier=low` | Management key | Stop accepting requests for tier |
| `POST /queue/resume?tier=low` | Management key | Resume tier |
| `POST /queue/drain` | Management key | Wait for queues to empty |
| `POST /queue/flush?tier=low` | Management key | Drop all pending requests immediately |

### OpenAI-compat embeddings

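OQP accepts `POST /v1/embeddings` in the OpenAI Embeddings API format and translates it to Ollama's `/api/embed` internally, then wraps the response back into OpenAI format before returning it. Auth, the priority ceiling, and the embedding cache apply exactly as they do for native `/api/embed` requests: auth and the priority ceiling are enforced before the path is rewritten, and the request is then enqueued under the rewritten `/api/embed` path, so path-based steps inside `_enqueue_request` treat it as a native request.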

This lets clients that use the OpenAI SDK (e.g. Graphiti with `provider: openai`) route through OQP without changing their provider configuration. Point them at OQP's port instead of Ollama's:

```yaml
# Graphiti config — before
api_url: http://localhost:11434/v1

# Graphiti config — after (routes through OQP on port 11435)
api_url: http://localhost:11435/v1
```

The endpoint is always-on. No config toggle is required.
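
For illustration, here is a client-side sketch of the request and response shapes involved, using only the standard library. The base URL comes from the config example above; `build_embeddings_request` and `extract_vectors` are hypothetical helper names, and `sample_response` is a hand-written stand-in for a real reply. Nothing here contacts a server.

```python
import json

# Base URL taken from the Graphiti example above (OQP on port 11435).
OQP_BASE_URL = "http://localhost:11435/v1"


def build_embeddings_request(model: str, texts: list[str]) -> tuple[str, bytes]:
    """Return the URL and JSON body for an OpenAI-style embeddings call."""
    body = json.dumps({"model": model, "input": texts}).encode("utf-8")
    return f"{OQP_BASE_URL}/embeddings", body


def extract_vectors(openai_body: dict) -> list[list[float]]:
    """Pull the vectors out of an OpenAI-format response, ordered by index."""
    items = sorted(openai_body["data"], key=lambda item: item["index"])
    return [item["embedding"] for item in items]


# Hand-written stand-in for a successful response (values are made up).
sample_response = {
    "object": "list",
    "data": [
        {"object": "embedding", "embedding": [0.3, 0.4], "index": 1},
        {"object": "embedding", "embedding": [0.1, 0.2], "index": 0},
    ],
    "model": "bge-m3",
    "usage": {"prompt_tokens": 0, "total_tokens": 0},
}

url, body = build_embeddings_request("bge-m3", ["first text", "second text"])
vectors = extract_vectors(sample_response)
```

Per the handler change in `main.py`, only successful (HTTP 200) JSON responses are rewrapped; other responses are passed through unchanged, so a client may still see an Ollama-style error body.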

### Webhook events

```yaml
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "ollama-queue-proxy"
-version = "0.2.0"
+version = "0.3.0"
 description = "Drop-in HTTP proxy for Ollama with priority queuing, per-client auth, and model-aware failover"
 readme = "README.md"
 license = { text = "MIT" }
2 changes: 1 addition & 1 deletion src/ollama_queue_proxy/__init__.py
@@ -1,3 +1,3 @@
 """ollama-queue-proxy: Drop-in HTTP proxy for Ollama with priority queuing."""

-__version__ = "0.2.0"
+__version__ = "0.3.0"
12 changes: 4 additions & 8 deletions src/ollama_queue_proxy/config.py
@@ -273,18 +273,14 @@ def _apply_env_overrides(data: dict, prefix: str = "OQP") -> dict:
         if not key.startswith(prefix + "_"):
             continue
         parts = key[len(prefix) + 1 :].lower().split("__")
+        # List-index overrides (e.g. OQP_OLLAMA__HOSTS__0__URL) are not
+        # supported — skip the entire key if any component is a digit.
+        if any(p.isdigit() for p in parts):
+            continue
         target = data
         for part in parts[:-1]:
-            if part.isdigit():
-                # list index — handled below
-                continue
             target = target.setdefault(part, {})
         leaf = parts[-1]
-        if leaf.isdigit():
-            # Can't safely do list index overrides on arbitrary dicts here.
-            # The most important list override (hosts[0].url) is handled by
-            # full-key matching. Log and skip.
-            continue
         # Attempt type coercion for booleans and integers
         if value.lower() in ("true", "false"):
             target[leaf] = value.lower() == "true"
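
A simplified, standalone sketch of the skip logic in this hunk, written so it runs without the rest of `config.py`. The name `apply_env_overrides_sketch`, the explicit `environ` argument, and the `OQP_SERVER__DEBUG` key are assumptions for illustration; the real function's surrounding loop is not shown in the diff, and integer coercion is omitted here.

```python
def apply_env_overrides_sketch(
    data: dict, environ: dict[str, str], prefix: str = "OQP"
) -> dict:
    """Apply PREFIX_A__B=value overrides to nested dicts, skipping list indices."""
    for key, value in environ.items():
        if not key.startswith(prefix + "_"):
            continue
        parts = key[len(prefix) + 1:].lower().split("__")
        # A numeric component would index into a list, which this dict-walking
        # logic cannot do, so the whole key is ignored.
        if any(p.isdigit() for p in parts):
            continue
        target = data
        for part in parts[:-1]:
            target = target.setdefault(part, {})
        leaf = parts[-1]
        # Only boolean coercion is sketched; everything else stays a string.
        if value.lower() in ("true", "false"):
            target[leaf] = value.lower() == "true"
        else:
            target[leaf] = value
    return data


config = {"ollama": {"hosts": [{"url": "http://original:11434"}]}}
env = {
    "OQP_OLLAMA__HOSTS__0__URL": "http://tampered:11434",  # skipped: contains "0"
    "OQP_SERVER__DEBUG": "true",  # hypothetical key: applied, coerced to bool
    "PATH": "/usr/bin",  # ignored: wrong prefix
}
result = apply_env_overrides_sketch(config, env)
```

As far as the hunk shows, without the early `continue` the walk over `OLLAMA__HOSTS__0__URL` would reach the `hosts` list and then attempt `target["url"] = value` on it, which matches the `TypeError` named in the changelog.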
27 changes: 25 additions & 2 deletions src/ollama_queue_proxy/main.py
@@ -22,6 +22,7 @@
 from .config import Config, load_config
 from .hosts import HostManager
 from .middleware import RequestContextMiddleware, get_client_id, parse_priority
+from .openai_compat import is_openai_compat_path, rewrite_path, wrap_response
 from .proxy import dispatch_request, read_body
 from .queue import PriorityQueueManager, QueueFull, QueueItem, QueuePaused, RequestExpired
 from .routes.queue import router as queue_router
@@ -218,6 +219,7 @@ async def _enqueue_request(
     tier: str,
     state: AppState,
     reentries: int = 0,
+    path_override: str | None = None,
 ) -> JSONResponse:
     """
     Buffer the request body, enqueue it, and await dispatch. Used by both the main
@@ -241,7 +243,7 @@ async def _enqueue_request(

     # keep_alive injection — runs before cache check so cached responses also reflect
     # the injected value (though for embeddings keep_alive has no effect upstream)
-    path = request.url.path
+    path = path_override if path_override is not None else request.url.path
     ka_cfg = state.config.keep_alive
     if path in _KEEP_ALIVE_PATHS:
         body = _inject_keep_alive(
@@ -277,7 +279,9 @@ async def _enqueue_request(
         )

     enqueue_time = time.monotonic()
-    future: asyncio.Future = asyncio.get_event_loop().create_future()
+    # get_running_loop() is the recommended call inside a coroutine; get_event_loop()
+    # is deprecated when no loop is running and raises RuntimeError there in 3.14+.
+    future: asyncio.Future = asyncio.get_running_loop().create_future()

     conc_mgr = state.concurrency_manager
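
A small standalone sketch of the behaviour this hunk relies on, using only the standard library. The function names are hypothetical. Inside a coroutine, `asyncio.get_running_loop()` returns the loop that is executing it; called with no running loop, it raises `RuntimeError` rather than creating or guessing one.

```python
import asyncio


async def make_future_result() -> str:
    """Create a future on the running loop and resolve it, as a dispatcher might."""
    loop = asyncio.get_running_loop()  # safe: we are inside a coroutine
    future: asyncio.Future = loop.create_future()
    # Stand-in for the queue worker that would normally complete the future.
    loop.call_soon(future.set_result, "dispatched")
    return await future


def has_running_loop() -> bool:
    """Report whether a loop is running in the current thread."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True
```

Calling `asyncio.run(make_future_result())` from synchronous code drives the coroutine to completion, while `has_running_loop()` called from the same synchronous code reports that no loop is running.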

@@ -294,6 +298,7 @@ async def dispatch_fn():
                 host_manager=state.host_manager,
                 client=state.http_client,
                 routing_table=state.routing_table,
+                path_override=path_override,
             )
         finally:
             if conc_mgr is not None:
@@ -399,6 +404,24 @@ async def proxy_handler(request: Request, path: str):
     requested_priority = parse_priority(request)
     tier = state.auth_manager.enforce_priority_ceiling(requested_priority, key_cfg)

+    # OpenAI-compat path handling: rewrite path before enqueue, wrap response after
+    compat_path = "/" + path
+    if is_openai_compat_path(compat_path):
+        native_path = rewrite_path(compat_path)
+        response = await _enqueue_request(
+            request=request,
+            client_id=client_id,
+            tier=tier,
+            state=state,
+            path_override=native_path,
+        )
+        # Only wrap successful JSON responses; pass through errors unchanged
+        if isinstance(response, JSONResponse) and response.status_code == 200:
+            ollama_body = json.loads(response.body)
+            wrapped = wrap_response(ollama_body)
+            return JSONResponse(content=wrapped, status_code=200)
+        return response
+
     return await _enqueue_request(
         request=request,
         client_id=client_id,
68 changes: 68 additions & 0 deletions src/ollama_queue_proxy/openai_compat.py
@@ -0,0 +1,68 @@
"""OpenAI-compatible embedding endpoint translation layer.

Translates POST /v1/embeddings ↔ POST /api/embed so that clients using the
OpenAI Embeddings API (e.g. Graphiti with provider=openai) can route through
OQP without reconfiguration.

Request translation: /v1/embeddings body is identical to /api/embed — both
accept {"model": "...", "input": "..." | [...]}. Only the path is rewritten.

Response translation: Ollama /api/embed returns:
    {"embeddings": [[...], ...], "model": "bge-m3", ...}

OpenAI /v1/embeddings expects:
    {
        "object": "list",
        "data": [{"object": "embedding", "embedding": [...], "index": 0}, ...],
        "model": "bge-m3",
        "usage": {"prompt_tokens": 0, "total_tokens": 0}
    }
"""

from __future__ import annotations

_OPENAI_COMPAT_PATHS: frozenset[str] = frozenset({"/v1/embeddings"})


def is_openai_compat_path(path: str) -> bool:
    """Return True if the path should be handled by the compat layer."""
    normalized = "/" + path.lstrip("/")
    return normalized in _OPENAI_COMPAT_PATHS


def rewrite_path(path: str) -> str:
    """Rewrite an OpenAI-compat path to the native Ollama equivalent."""
    normalized = "/" + path.lstrip("/")
    if normalized in _OPENAI_COMPAT_PATHS:
        return "/api/embed"
    return path


def wrap_response(ollama_body: dict, model: str | None = None) -> dict:
    """Wrap an Ollama /api/embed response body in the OpenAI embeddings format.

    Args:
        ollama_body: Parsed JSON response from Ollama /api/embed.
        model: Model name override (falls back to ollama_body["model"]).

    Returns:
        Dict conforming to the OpenAI /v1/embeddings response schema.
    """
    embeddings: list[list[float]] = ollama_body.get("embeddings") or []
    resolved_model: str = model or ollama_body.get("model", "")

    data = [
        {
            "object": "embedding",
            "embedding": vec,
            "index": i,
        }
        for i, vec in enumerate(embeddings)
    ]

    return {
        "object": "list",
        "data": data,
        "model": resolved_model,
        "usage": {"prompt_tokens": 0, "total_tokens": 0},
    }
13 changes: 10 additions & 3 deletions src/ollama_queue_proxy/proxy.py
@@ -119,13 +119,14 @@ async def dispatch_request(
     host_manager: HostManager,
     client: httpx.AsyncClient,
     routing_table: RoutingTable | None = None,
+    path_override: str | None = None,
 ) -> StreamingResponse | JSONResponse:
     """
     Dispatch a buffered request to the appropriate Ollama host with failover.
     Failover only applies before any response bytes are sent to the client.
     """
     request_id = getattr(request.state, "request_id", "unknown")
-    path = request.url.path
+    path = path_override if path_override is not None else request.url.path
     query = request.url.query
     method = request.method

@@ -223,8 +224,14 @@ def _next_host() -> OllamaHost | None:

             if is_streaming:
                 async def stream_gen(r=resp):
-                    async for chunk in r.aiter_bytes():
-                        yield chunk
+                    try:
+                        async for chunk in r.aiter_bytes():
+                            yield chunk
+                    finally:
+                        # Close the httpx response explicitly — if the client
+                        # disconnects mid-stream the generator is abandoned and
+                        # GC may never run, leaking the underlying connection.
+                        await r.aclose()

                 return StreamingResponse(
                     stream_gen(),
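
The `try`/`finally` pattern in this hunk can be exercised in isolation. The sketch below assumes a hypothetical `FakeResponse` class that mimics only the two httpx response methods used here (`aiter_bytes` and `aclose`), and it simulates a client disconnect by closing the generator after the first chunk.

```python
import asyncio


class FakeResponse:
    """Hypothetical stand-in exposing aiter_bytes() and aclose() like the hunk uses."""

    def __init__(self, chunks: list[bytes]) -> None:
        self._chunks = chunks
        self.closed = False

    async def aiter_bytes(self):
        for chunk in self._chunks:
            yield chunk

    async def aclose(self) -> None:
        self.closed = True


async def stream_gen(r: FakeResponse):
    try:
        async for chunk in r.aiter_bytes():
            yield chunk
    finally:
        # Runs on normal exhaustion and when the generator is closed early.
        await r.aclose()


async def read_first_chunk_then_disconnect(resp: FakeResponse) -> bytes:
    gen = stream_gen(resp)
    first = await gen.__anext__()
    await gen.aclose()  # simulates the consumer going away mid-stream
    return first


def demo() -> tuple[bytes, bool]:
    resp = FakeResponse([b"a", b"b", b"c"])
    first = asyncio.run(read_first_chunk_then_disconnect(resp))
    return first, resp.closed
```

In this sketch the response is marked closed even though two chunks were never read, which is the property the fix is after.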
116 changes: 116 additions & 0 deletions tests/test_openai_compat.py
@@ -0,0 +1,116 @@
"""Tests for the OpenAI-compat embedding translation layer (openai_compat.py)."""

import pytest

from ollama_queue_proxy.openai_compat import (
    _OPENAI_COMPAT_PATHS,
    is_openai_compat_path,
    rewrite_path,
    wrap_response,
)


# ── path detection ────────────────────────────────────────────────────────────


def test_v1_embeddings_is_compat_path():
    assert is_openai_compat_path("/v1/embeddings") is True


def test_v1_embeddings_without_leading_slash():
    assert is_openai_compat_path("v1/embeddings") is True


def test_api_embed_is_not_compat_path():
    assert is_openai_compat_path("/api/embed") is False


def test_api_generate_is_not_compat_path():
    assert is_openai_compat_path("/api/generate") is False


# ── path rewrite ──────────────────────────────────────────────────────────────


def test_rewrite_v1_embeddings_to_api_embed():
    assert rewrite_path("/v1/embeddings") == "/api/embed"


def test_rewrite_non_compat_path_unchanged():
    assert rewrite_path("/api/generate") == "/api/generate"


def test_rewrite_without_leading_slash():
    assert rewrite_path("v1/embeddings") == "/api/embed"


# ── response wrapping ─────────────────────────────────────────────────────────


def test_wrap_response_single_embedding():
    ollama = {
        "embeddings": [[0.1, 0.2, 0.3]],
        "model": "bge-m3",
    }
    result = wrap_response(ollama)

    assert result["object"] == "list"
    assert result["model"] == "bge-m3"
    assert len(result["data"]) == 1
    assert result["data"][0]["object"] == "embedding"
    assert result["data"][0]["embedding"] == [0.1, 0.2, 0.3]
    assert result["data"][0]["index"] == 0
    assert result["usage"]["prompt_tokens"] == 0
    assert result["usage"]["total_tokens"] == 0


def test_wrap_response_batch_embeddings():
    vectors = [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]]
    ollama = {"embeddings": vectors, "model": "bge-m3"}
    result = wrap_response(ollama)

    assert len(result["data"]) == 3
    for i, item in enumerate(result["data"]):
        assert item["index"] == i
        assert item["embedding"] == vectors[i]


def test_wrap_response_model_override():
    ollama = {"embeddings": [[0.1]], "model": "bge-m3"}
    result = wrap_response(ollama, model="nomic-embed-text")
    assert result["model"] == "nomic-embed-text"


def test_wrap_response_empty_embeddings():
    ollama = {"embeddings": [], "model": "bge-m3"}
    result = wrap_response(ollama)
    assert result["data"] == []
    assert result["object"] == "list"


def test_wrap_response_missing_embeddings_key():
    ollama = {"model": "bge-m3"}
    result = wrap_response(ollama)
    assert result["data"] == []


def test_wrap_response_missing_model_falls_back_to_empty_string():
    ollama = {"embeddings": [[0.1]]}
    result = wrap_response(ollama)
    assert result["model"] == ""


# ── test_env_override_list_index_skipped_gracefully ───────────────────────────


def test_env_override_list_index_skipped_gracefully(monkeypatch):
    """OQP_OLLAMA__HOSTS__0__URL must not crash and must not corrupt data."""
    monkeypatch.setenv("OQP_OLLAMA__HOSTS__0__URL", "http://tampered:11434")

    from ollama_queue_proxy.config import _apply_env_overrides

    data = {"ollama": {"hosts": [{"url": "http://original:11434", "name": "primary"}]}}
    result = _apply_env_overrides(data)

    # The list must be untouched — the env var is silently skipped
    assert result["ollama"]["hosts"][0]["url"] == "http://original:11434"