Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions src/praisonai/praisonai/cli/commands/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
- a2a: Agent-to-Agent protocol
- a2u: Agent-to-User event stream
- unified: All providers combined
- openai: OpenAI API compatibility layer
"""

from typing import Optional
Expand Down Expand Up @@ -125,6 +126,7 @@ def serve_callback(ctx: typer.Context):
[green]a2a[/green] Agent-to-Agent protocol (port 8001)
[green]a2u[/green] Agent-to-User events (port 8002)
[green]unified[/green] All providers combined (port 8765)
[green]openai[/green] OpenAI API compatibility layer (port 8765)

[bold]Management:[/bold]
[yellow]start[/yellow] Start legacy API server
Expand Down Expand Up @@ -577,3 +579,140 @@ def serve_unified(
output = get_output_controller()
output.print_error(f"Unified serve module not available: {e}")
raise typer.Exit(4)


@app.command("openai")
def serve_openai(
host: str = typer.Option("127.0.0.1", "--host", "-h", help="Host to bind to"),
port: int = typer.Option(8765, "--port", "-p", help="Port to bind to"),
agents_url: str = typer.Option("http://127.0.0.1:8000", "--agents-url", help="URL of the running Agents API server"),
api_key: Optional[str] = typer.Option(None, "--api-key", help="API key for authentication"),
reload: bool = typer.Option(False, "--reload", help="Enable auto-reload"),
):
Comment on lines +585 to +591

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Add an --agents-url option to allow users to specify the URL of their running Agents API server. Hardcoding it to the same host and port as the OpenAI server will cause tool invocation to fail with a 404, as the agents API endpoints are not registered on this server.

def serve_openai( 
    host: str = typer.Option("127.0.0.1", "--host", "-h", help="Host to bind to"),
    port: int = typer.Option(8765, "--port", "-p", help="Port to bind to"),
    agents_url: str = typer.Option("http://127.0.0.1:8000", "--agents-url", help="URL of the running Agents API server"),
    api_key: Optional[str] = typer.Option(None, "--api-key", help="API key for authentication"),
    reload: bool = typer.Option(False, "--reload", help="Enable auto-reload"),
):

"""Start OpenAI API compatibility server.

Provides OpenAI-compatible endpoints like /v1/chat/completions for
drop-in compatibility with OpenAI client libraries.

Examples:
praisonai serve openai
praisonai serve openai --port 8765 --api-key mykey
"""
output = get_output_controller()

try:
import uvicorn
from fastapi import FastAPI
from praisonai.endpoints.providers import OpenAICompatProvider, AgentsAPIProvider
from praisonai.endpoints.server import create_unified_app, register_provider_to_discovery, register_endpoint_to_discovery

output.print_info(f"Starting OpenAI-compatible server on {host}:{port}")

# Create providers
agents_provider = AgentsAPIProvider(base_url=agents_url)
openai_provider = OpenAICompatProvider(
base_url=f"http://{host}:{port}",
api_key=api_key,
agent_provider=agents_provider
)
Comment on lines +589 to +617

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Enforce --api-key on inbound requests.

--api-key is documented as authentication, but none of the /v1/* handlers verifies the client’s bearer key; it is only passed into OpenAICompatProvider. That leaves chat/completions and /v1/tools/invoke unauthenticated whenever this server is reachable.

🛡️ Suggested direction
         from fastapi import Request, Response
         from fastapi.responses import StreamingResponse
+        import hmac
         import json
+
+        def _json_error(message: str, error_type: str, status_code: int) -> Response:
+            return Response(
+                content=json.dumps({"error": {"message": message, "type": error_type}}),
+                status_code=status_code,
+                media_type="application/json",
+                headers={"WWW-Authenticate": "Bearer"} if status_code == 401 else None,
+            )
+
+        def _require_api_key(request: Request) -> Optional[Response]:
+            if api_key is None:
+                return None
+
+            authorization = request.headers.get("authorization", "")
+            bearer_token = (
+                authorization[7:].strip()
+                if authorization.lower().startswith("bearer ")
+                else ""
+            )
+            x_api_key = request.headers.get("x-api-key", "")
+
+            if (
+                bearer_token and hmac.compare_digest(bearer_token, api_key)
+            ) or (
+                x_api_key and hmac.compare_digest(x_api_key, api_key)
+            ):
+                return None
+
+            return _json_error("Invalid API key", "authentication_error", 401)
 
         @app.post("/v1/chat/completions")
         async def chat_completions(request: Request):
+            auth_error = _require_api_key(request)
+            if auth_error is not None:
+                return auth_error
             body = await request.json()

Apply the same _require_api_key() guard to /v1/completions, /v1/models, and /v1/tools/invoke.

Also applies to: 630-695

🧰 Tools
🪛 ast-grep (0.44.0)

[warning] 613-613: Do not make http calls without encryption
Context: f"http://{host}:{port}"
Note: [CWE-319] Cleartext Transmission of Sensitive Information.

(requests-http)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/praisonai/praisonai/cli/commands/serve.py` around lines 589 - 617, The
OpenAI-compatible server accepts an --api-key parameter for authentication but
does not enforce API key validation on incoming requests to the /v1/* endpoints.
Add the _require_api_key() guard sketched above and call it at the top of the
/v1/chat/completions, /v1/completions, /v1/models, and /v1/tools/invoke endpoint
handlers to verify that inbound requests include the correct bearer token (or
x-api-key header) before processing them. This should be done where the routes
are registered on the app returned by create_unified_app, so that every
authentication-required endpoint is protected.


# Create server and register OpenAI provider
app = create_unified_app(server_name="PraisonAI OpenAI API")
register_provider_to_discovery(app, openai_provider.get_provider_info())
for endpoint in openai_provider.list_endpoints():
register_endpoint_to_discovery(app, endpoint)

# Add OpenAI-style route mappings
from fastapi import Request, Response
from fastapi.responses import StreamingResponse
import json

@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
body = await request.json()

if body.get("stream", False):
Comment on lines +632 to +634

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Normalize bad JSON into OpenAI-style 400 responses.

await request.json() and the immediate body.get(...) can surface malformed or non-object JSON as an unhandled server error before provider error mapping runs. Add one request-body helper and reuse it across the POST endpoints.

🐛 Suggested direction
+        from json import JSONDecodeError
+
+        async def _read_json_object(request: Request):
+            try:
+                body = await request.json()
+            except JSONDecodeError:
+                return None, Response(
+                    content=json.dumps({"error": {"message": "Invalid JSON body", "type": "invalid_request_error"}}),
+                    status_code=400,
+                    media_type="application/json",
+                )
+
+            if not isinstance(body, dict):
+                return None, Response(
+                    content=json.dumps({"error": {"message": "Request body must be a JSON object", "type": "invalid_request_error"}}),
+                    status_code=400,
+                    media_type="application/json",
+                )
+
+            return body, None
+
         @app.post("/v1/chat/completions")
         async def chat_completions(request: Request):
-            body = await request.json()
+            body, body_error = await _read_json_object(request)
+            if body_error is not None:
+                return body_error

Apply the same helper to /v1/completions and /v1/tools/invoke.

Also applies to: 666-667, 685-686

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/praisonai/praisonai/cli/commands/serve.py` around lines 632 - 634,
Malformed JSON in the request body can cause unhandled exceptions that bypass
provider error mapping at lines 632-634. Create a helper function to safely
parse and validate the request body JSON, catching JSONDecodeError and
non-object payloads to return OpenAI-style 400 error responses. Apply this
helper function consistently across all three POST endpoints: the current
endpoint at line 632, the /v1/completions endpoint at lines 666-667, and the
/v1/tools/invoke endpoint at lines 685-686, replacing the direct `await
request.json()` calls with the helper to ensure all JSON parsing errors are
normalized into proper error responses before the `body.get()` call.

def generate():
for chunk in openai_provider.invoke_stream("chat_completions", body):
if chunk["event"] == "data":
yield f"data: {json.dumps(chunk['data'])}\n\n"
elif chunk["event"] == "done":
yield "data: [DONE]\n\n"
elif chunk["event"] == "error":
# Send error as OpenAI-formatted SSE error chunk
error_chunk = {
"error": {
"message": chunk.get("data", {}).get("error", "Stream error occurred"),
"type": "stream_error"
}
}
yield f"data: {json.dumps(error_chunk)}\n\n"
yield "data: [DONE]\n\n"
break
return StreamingResponse(generate(), media_type="text/event-stream")
Comment thread
greptile-apps[bot] marked this conversation as resolved.

result = openai_provider.invoke("chat_completions", body, stream=False)
if not result.ok:
return Response(
content=json.dumps({"error": {"message": result.error, "type": "api_error"}}),
status_code=400,
media_type="application/json"
)

return result.data
Comment on lines +630 to +662

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

When stream=True, calling openai_provider.invoke first will delegate to chat_completion(..., stream=True), which crashes with an AttributeError because it tries to access .choices on the returned generator. This causes streaming requests to always fail with a 400 error. Additionally, calling invoke and then invoke_stream would perform the LLM call twice. The streaming response should also use the standard text/event-stream media type.

        @app.post("/v1/chat/completions")
        async def chat_completions(request: Request):
            body = await request.json()
            
            if body.get("stream", False):
                def generate():
                    for chunk in openai_provider.invoke_stream("chat_completions", body):
                        if chunk["event"] == "data":
                            yield f"data: {json.dumps(chunk['data'])}\n\n"
                        elif chunk["event"] == "done":
                            yield "data: [DONE]\n\n"
                return StreamingResponse(generate(), media_type="text/event-stream")
            
            result = openai_provider.invoke("chat_completions", body, stream=False)
            if not result.ok:
                return Response(
                    content=json.dumps({"error": {"message": result.error, "type": "api_error"}}),
                    status_code=400,
                    media_type="application/json"
                )
            
            return result.data

Comment thread
greptile-apps[bot] marked this conversation as resolved.

@app.post("/v1/completions")
async def completions(request: Request):
    """Serve the OpenAI-style ``POST /v1/completions`` route.

    Parses the request body as JSON, forwards it to the provider's
    ``completions`` endpoint via ``openai_provider.invoke`` and returns
    the provider payload.

    Returns:
        ``result.data`` when the provider reports success; otherwise a
        JSON ``Response`` with status 400 carrying an OpenAI-style
        ``{"error": {"message": ..., "type": ...}}`` envelope.
    """
    # Request.json() decodes the raw body with json.loads; JSONDecodeError
    # and UnicodeDecodeError are both ValueError subclasses, so a malformed
    # body is turned into a 400 here instead of an unhandled server error.
    try:
        body = await request.json()
    except ValueError:
        return Response(
            content=json.dumps({"error": {"message": "Invalid JSON body", "type": "invalid_request_error"}}),
            status_code=400,
            media_type="application/json"
        )

    # Well-formed JSON that is not an object (list, string, null, ...) is
    # rejected before it reaches the provider.
    if not isinstance(body, dict):
        return Response(
            content=json.dumps({"error": {"message": "Request body must be a JSON object", "type": "invalid_request_error"}}),
            status_code=400,
            media_type="application/json"
        )

    result = openai_provider.invoke("completions", body)

    if not result.ok:
        return Response(
            content=json.dumps({"error": {"message": result.error, "type": "api_error"}}),
            status_code=400,
            media_type="application/json"
        )

    return result.data

@app.get("/v1/models")
async def models():
    """Serve the OpenAI-style ``GET /v1/models`` route.

    Invokes the provider's ``models`` endpoint and returns its payload.

    Returns:
        ``result.data`` when the provider reports success; otherwise a
        JSON ``Response`` with status 400 and the same
        ``{"error": {"message": ..., "type": "api_error"}}`` envelope the
        POST handlers use, rather than an HTTP 200 with a flat error.
    """
    result = openai_provider.invoke("models")

    if not result.ok:
        return Response(
            content=json.dumps({"error": {"message": result.error, "type": "api_error"}}),
            status_code=400,
            media_type="application/json"
        )

    return result.data

@app.post("/v1/tools/invoke")
async def tools_invoke(request: Request):
    """Serve the ``POST /v1/tools/invoke`` route.

    Parses the request body as JSON, forwards it to the provider's
    ``tools_invoke`` endpoint via ``openai_provider.invoke`` and returns
    the provider payload.

    Returns:
        ``result.data`` when the provider reports success; otherwise a
        JSON ``Response`` with status 400 carrying an OpenAI-style
        ``{"error": {"message": ..., "type": ...}}`` envelope.
    """
    # Request.json() decodes the raw body with json.loads; JSONDecodeError
    # and UnicodeDecodeError are both ValueError subclasses, so a malformed
    # body is turned into a 400 here instead of an unhandled server error.
    try:
        body = await request.json()
    except ValueError:
        return Response(
            content=json.dumps({"error": {"message": "Invalid JSON body", "type": "invalid_request_error"}}),
            status_code=400,
            media_type="application/json"
        )

    # Well-formed JSON that is not an object (list, string, null, ...) is
    # rejected before it reaches the provider.
    if not isinstance(body, dict):
        return Response(
            content=json.dumps({"error": {"message": "Request body must be a JSON object", "type": "invalid_request_error"}}),
            status_code=400,
            media_type="application/json"
        )

    result = openai_provider.invoke("tools_invoke", body)

    if not result.ok:
        return Response(
            content=json.dumps({"error": {"message": result.error, "type": "api_error"}}),
            status_code=400,
            media_type="application/json"
        )

    return result.data

output.print("OpenAI-compatible endpoints available:")
output.print(" POST /v1/chat/completions")
output.print(" POST /v1/completions")
output.print(" GET /v1/models")
output.print(" POST /v1/tools/invoke")

# Start server
uvicorn.run(
app,
host=host,
port=port,
reload=reload,
access_log=False,
)
Comment on lines +704 to +710

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 --reload flag always raises ValueError at startup

uvicorn.run() requires the app argument to be a string import path (e.g. "module:app") whenever reload=True. Passing a FastAPI instance — as done here — causes uvicorn to raise ValueError: reload requires that 'app' is a string. That exception is caught by the broad except Exception handler and exits with code 1, printing "Failed to start OpenAI server", which makes the failure look like an infrastructure problem rather than a misconfiguration. The --reload flag is exposed in the CLI and documented in the help text, so any user who passes it will see a silent crash.

Comment on lines +704 to +710

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

Does uvicorn.run support reload=True when the ASGI app is passed as a FastAPI object instead of an import string, and what does the current Uvicorn documentation recommend?

💡 Result:

Uvicorn does not support the reload=True option when the ASGI application is passed as an object [1][2]. If you attempt to use reload=True with an application instance, Uvicorn will typically fail or display a warning because it requires an import string (e.g., "main:app") to properly manage the lifecycle of the reloaded processes [3][2]. The current Uvicorn documentation explicitly recommends using the import string style when running programmatically if you require features like auto-reloading or multiple workers [1][4]. When using these features, you must pass the application as a string representing the import path, and it is strongly advised to wrap the uvicorn.run call within an if __name__ == '__main__': block to prevent issues with multiprocessing [5][1][4].

Citations:


Reject --reload for dynamically constructed FastAPI app.

The code passes an in-memory FastAPI object to uvicorn.run() with reload=True, but Uvicorn's reload mode requires an import string (e.g., "main:app") to function. This feature is broken and will fail or be silently ignored. Either disable reload here or extract the app into an importable module.

Suggested fix
+        if reload:
+            output.print_error(
+                "--reload is not supported for the dynamically constructed OpenAI server yet"
+            )
+            raise typer.Exit(2)
+
         uvicorn.run(
             app,
             host=host,
             port=port,
-            reload=reload,
+            reload=False,
             access_log=False,
         )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
uvicorn.run(
app,
host=host,
port=port,
reload=reload,
access_log=False,
)
if reload:
output.print_error(
"--reload is not supported for the dynamically constructed OpenAI server yet"
)
raise typer.Exit(2)
uvicorn.run(
app,
host=host,
port=port,
reload=False,
access_log=False,
)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/praisonai/praisonai/cli/commands/serve.py` around lines 704 - 710, The
uvicorn.run() call passes a dynamically constructed FastAPI app object with
reload=True, but Uvicorn's reload mode requires an importable module path
string, not an in-memory app object. Set the reload parameter to False in the
uvicorn.run() call since the app is created dynamically and cannot be reloaded
from an import path.


except ImportError as e:
output.print_error(f"OpenAI compatibility module not available: {e}")
output.print("Install with: pip install praisonai[api]")
raise typer.Exit(4)
except Exception as e:
output.print_error(f"Failed to start OpenAI server: {e}")
raise typer.Exit(1)
1 change: 1 addition & 0 deletions src/praisonai/praisonai/endpoints/providers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"ToolsMCPProvider": ".tools_mcp",
"A2AProvider": ".a2a",
"A2UProvider": ".a2u",
"OpenAICompatProvider": ".openai_compat",
}

def __getattr__(name: str):
Expand Down
Loading
Loading