From a7e3104d711a951e617040c0a914819de152be88 Mon Sep 17 00:00:00 2001 From: shohei81 Date: Fri, 16 Jan 2026 11:56:27 +0900 Subject: [PATCH 1/7] Add Amplifier setup guide and responses endpoint --- docs/AMPLIFIER.md | 60 ++++++++ src/main.py | 368 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 428 insertions(+) create mode 100644 docs/AMPLIFIER.md diff --git a/docs/AMPLIFIER.md b/docs/AMPLIFIER.md new file mode 100644 index 0000000..c5b4194 --- /dev/null +++ b/docs/AMPLIFIER.md @@ -0,0 +1,60 @@ +# Amplifier Setup (OpenAI-Compatible Wrapper) + +This repo exposes an OpenAI-compatible API at `http://localhost:8000` including +`/v1/responses`. Microsoft Amplifier expects the Responses API, so it should work +once the base URL points to this server. + +## 1) Start the wrapper + +```bash +poetry install +poetry run uvicorn src.main:app --reload --port 8000 +``` + +If you want API key protection, set `API_KEY` before starting the server: + +```bash +export API_KEY=your-wrapper-key +``` + +## 2) Initialize Amplifier with the wrapper URL + +During `amplifier init`, set the **API base URL** to: + +``` +http://localhost:8000 +``` + +If you already initialized, re-run `amplifier init` or update the saved config +to point to the same base URL. + +## 3) Select the provider/model + +Amplifier expects OpenAI-style endpoints, so use the OpenAI provider: + +```bash +amplifier provider use openai --model claude-sonnet-4-5-20250929 +``` + +Other Claude models from this wrapper can be used as well (see `/v1/models`). + +## 4) (Optional) Set client auth for Amplifier + +If the wrapper has `API_KEY` enabled, set an OpenAI-style API key in your shell +before running Amplifier: + +```bash +export OPENAI_API_KEY=your-wrapper-key +``` + +Amplifier will send `Authorization: Bearer $OPENAI_API_KEY` to the wrapper. + +## 5) Quick check + +```bash +curl -X POST http://localhost:8000/v1/responses \ + -H "Content-Type: application/json" \ + -d '{"model":"claude-sonnet-4-5-20250929","input":"Hello!"}' +``` + +You should get a JSON response with `"object": "response"` and output text. diff --git a/src/main.py b/src/main.py index 4a74aa4..099f64b 100644 --- a/src/main.py +++ b/src/main.py @@ -5,6 +5,7 @@ import secrets import string import uuid +from datetime import datetime from typing import Optional, AsyncGenerator, Dict, Any from contextlib import asynccontextmanager @@ -22,6 +23,7 @@ ChatCompletionStreamResponse, Choice, Message, + get_default_model, Usage, StreamChoice, SessionListResponse, @@ -388,6 +390,75 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE return JSONResponse(status_code=422, content=error_response) +def _extract_text_from_response_content(content: Any) -> str: + """Extract plain text from Responses API content structures.""" + if content is None: + return "" + if isinstance(content, str): + return content + if isinstance(content, list): + text_parts = [] + for part in content: + if isinstance(part, str): + text_parts.append(part) + continue + if isinstance(part, dict): + part_type = part.get("type") + if part_type in ("input_text", "text", "output_text"): + text_parts.append(part.get("text") or part.get("content") or "") + elif "text" in part: + text_parts.append(part.get("text", "")) + elif "content" in part: + text_parts.append(_extract_text_from_response_content(part.get("content"))) + return "\n".join(p for p in text_parts if p) + if isinstance(content, dict): + if "text" in content: + return str(content.get("text", "")) + if "content" in content: + return _extract_text_from_response_content(content.get("content")) + return str(content) + + +def _messages_from_responses_input(input_value: Any) -> list[Message]: + """Convert OpenAI Responses API input to OpenAI chat messages.""" + if input_value is None: + return [] + if isinstance(input_value, str): + return [Message(role="user", content=input_value)] + if isinstance(input_value, dict): + if input_value.get("type") == "message" or "role" in input_value: + role = input_value.get("role", "user") + content = _extract_text_from_response_content(input_value.get("content")) + return [Message(role=role, content=content)] + return [Message(role="user", content=_extract_text_from_response_content(input_value))] + if isinstance(input_value, list): + # If it looks like a list of messages, convert each item to a Message. + if input_value and isinstance(input_value[0], dict) and ( + "role" in input_value[0] or input_value[0].get("type") == "message" + ): + messages: list[Message] = [] + for item in input_value: + role = item.get("role", "user") + content = _extract_text_from_response_content(item.get("content")) + messages.append(Message(role=role, content=content)) + return messages + # Otherwise treat it as content parts for a single user message. + return [Message(role="user", content=_extract_text_from_response_content(input_value))] + return [Message(role="user", content=str(input_value))] + + +def _build_responses_output(text: str, item_id: str) -> list[Dict[str, Any]]: + """Build Responses API output array.""" + return [ + { + "id": item_id, + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": text}], + } + ] + + async def generate_streaming_response( request: ChatCompletionRequest, request_id: str, claude_headers: Optional[Dict[str, Any]] = None ) -> AsyncGenerator[str, None]: @@ -605,6 +676,149 @@ async def generate_streaming_response( yield f"data: {json.dumps(error_chunk)}\n\n" +async def generate_responses_stream( + request: ChatCompletionRequest, request_id: str, claude_headers: Optional[Dict[str, Any]] = None +) -> AsyncGenerator[str, None]: + """Generate SSE formatted streaming response for /v1/responses.""" + try: + all_messages, actual_session_id = session_manager.process_messages( + request.messages, request.session_id + ) + + prompt, system_prompt = MessageAdapter.messages_to_prompt(all_messages) + + sampling_instructions = request.get_sampling_instructions() + if sampling_instructions: + if system_prompt: + system_prompt = f"{system_prompt}\n\n{sampling_instructions}" + else: + system_prompt = sampling_instructions + + prompt = MessageAdapter.filter_content(prompt) + if system_prompt: + system_prompt = MessageAdapter.filter_content(system_prompt) + + claude_options = request.to_claude_options() + if claude_headers: + claude_options.update(claude_headers) + + if claude_options.get("model"): + ParameterValidator.validate_model(claude_options["model"]) + + if not request.enable_tools: + claude_options["disallowed_tools"] = CLAUDE_TOOLS + claude_options["max_turns"] = 1 + else: + claude_options["allowed_tools"] = DEFAULT_ALLOWED_TOOLS + claude_options["permission_mode"] = "bypassPermissions" + + output_item_id = f"msg_{uuid.uuid4().hex[:24]}" + created_ts = int(datetime.now().timestamp()) + response_created = { + "id": request_id, + "object": "response", + "created": created_ts, + "model": request.model, + "status": "in_progress", + } + yield f"event: response.created\ndata: {json.dumps(response_created)}\n\n" + + chunks_buffer = [] + assistant_text_parts: list[str] = [] + + async for chunk in claude_cli.run_completion( + prompt=prompt, + system_prompt=system_prompt, + model=claude_options.get("model"), + max_turns=claude_options.get("max_turns", 10), + allowed_tools=claude_options.get("allowed_tools"), + disallowed_tools=claude_options.get("disallowed_tools"), + permission_mode=claude_options.get("permission_mode"), + stream=True, + ): + chunks_buffer.append(chunk) + content = None + if chunk.get("type") == "assistant" and "message" in chunk: + message = chunk["message"] + if isinstance(message, dict) and "content" in message: + content = message["content"] + elif "content" in chunk and isinstance(chunk["content"], list): + content = chunk["content"] + + if content is None: + continue + + if isinstance(content, list): + for block in content: + if hasattr(block, "text"): + raw_text = block.text + elif isinstance(block, dict) and block.get("type") == "text": + raw_text = block.get("text", "") + else: + continue + + filtered_text = MessageAdapter.filter_content(raw_text) + if filtered_text and not filtered_text.isspace(): + assistant_text_parts.append(filtered_text) + delta_event = { + "type": "response.output_text.delta", + "delta": filtered_text, + "item_id": output_item_id, + "output_index": 0, + "content_index": 0, + } + yield ( + f"event: response.output_text.delta\ndata: {json.dumps(delta_event)}\n\n" + ) + elif isinstance(content, str): + filtered_content = MessageAdapter.filter_content(content) + if filtered_content and not filtered_content.isspace(): + assistant_text_parts.append(filtered_content) + delta_event = { + "type": "response.output_text.delta", + "delta": filtered_content, + "item_id": output_item_id, + "output_index": 0, + "content_index": 0, + } + yield ( + f"event: response.output_text.delta\ndata: {json.dumps(delta_event)}\n\n" + ) + + assistant_content = "".join(assistant_text_parts).strip() + if not assistant_content and chunks_buffer: + assistant_content = claude_cli.parse_claude_message(chunks_buffer) or "" + assistant_content = MessageAdapter.filter_content(assistant_content) + + if actual_session_id and assistant_content: + assistant_message = Message(role="assistant", content=assistant_content) + session_manager.add_assistant_response(actual_session_id, assistant_message) + + token_usage = claude_cli.estimate_token_usage(prompt, assistant_content, request.model) + usage = { + "input_tokens": token_usage["prompt_tokens"], + "output_tokens": token_usage["completion_tokens"], + "total_tokens": token_usage["total_tokens"], + } + + response_completed = { + "id": request_id, + "object": "response", + "created": created_ts, + "model": request.model, + "status": "completed", + "output": _build_responses_output(assistant_content, output_item_id), + "usage": usage, + } + yield f"event: response.completed\ndata: {json.dumps(response_completed)}\n\n" + yield "data: [DONE]\n\n" + + except Exception as e: + logger.error(f"Responses streaming error: {e}") + error_chunk = {"error": {"message": str(e), "type": "streaming_error"}} + yield f"data: {json.dumps(error_chunk)}\n\n" + + @app.post("/v1/chat/completions") @rate_limit_endpoint("chat") async def chat_completions( @@ -760,6 +974,160 @@ async def chat_completions( raise HTTPException(status_code=500, detail=str(e)) +@app.post("/v1/responses") +@rate_limit_endpoint("chat") +async def responses_endpoint( + request: Request, + credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), +): + """OpenAI Responses API compatible endpoint.""" + await verify_api_key(request, credentials) + + auth_valid, auth_info = validate_claude_code_auth() + if not auth_valid: + error_detail = { + "message": "Claude Code authentication failed", + "errors": auth_info.get("errors", []), + "method": auth_info.get("method", "none"), + "help": "Check /v1/auth/status for detailed authentication information", + } + raise HTTPException(status_code=503, detail=error_detail) + + try: + body = await request.json() + if not isinstance(body, dict): + raise HTTPException(status_code=400, detail="Invalid request body") + + model = body.get("model") or get_default_model() + stream = bool(body.get("stream", False)) + temperature = body.get("temperature", 1.0) + top_p = body.get("top_p", 1.0) + max_output_tokens = body.get("max_output_tokens") or body.get("max_tokens") + user = body.get("user") + session_id = body.get("session_id") + + messages = [] + if "messages" in body: + messages = [Message(**msg) for msg in body.get("messages", [])] + else: + messages = _messages_from_responses_input(body.get("input")) + + instructions = body.get("instructions") or body.get("system") or body.get("system_prompt") + if instructions: + messages = [Message(role="system", content=instructions)] + messages + + if not messages: + raise HTTPException(status_code=400, detail="Missing input/messages for /v1/responses") + + enable_tools = bool(body.get("enable_tools")) or bool(body.get("tools")) + + request_body = ChatCompletionRequest( + model=model, + messages=messages, + temperature=temperature, + top_p=top_p, + stream=stream, + max_completion_tokens=max_output_tokens, + user=user, + session_id=session_id, + enable_tools=enable_tools, + ) + + request_id = f"resp_{uuid.uuid4().hex[:24]}" + claude_headers = ParameterValidator.extract_claude_headers(dict(request.headers)) + + if request_body.stream: + return StreamingResponse( + generate_responses_stream(request_body, request_id, claude_headers), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + }, + ) + + all_messages, actual_session_id = session_manager.process_messages( + request_body.messages, request_body.session_id + ) + + prompt, system_prompt = MessageAdapter.messages_to_prompt(all_messages) + + sampling_instructions = request_body.get_sampling_instructions() + if sampling_instructions: + if system_prompt: + system_prompt = f"{system_prompt}\n\n{sampling_instructions}" + else: + system_prompt = sampling_instructions + + prompt = MessageAdapter.filter_content(prompt) + if system_prompt: + system_prompt = MessageAdapter.filter_content(system_prompt) + + claude_options = request_body.to_claude_options() + if claude_headers: + claude_options.update(claude_headers) + + if claude_options.get("model"): + ParameterValidator.validate_model(claude_options["model"]) + + if not request_body.enable_tools: + claude_options["disallowed_tools"] = CLAUDE_TOOLS + claude_options["max_turns"] = 1 + else: + claude_options["allowed_tools"] = DEFAULT_ALLOWED_TOOLS + claude_options["permission_mode"] = "bypassPermissions" + + chunks = [] + async for chunk in claude_cli.run_completion( + prompt=prompt, + system_prompt=system_prompt, + model=claude_options.get("model"), + max_turns=claude_options.get("max_turns", 10), + allowed_tools=claude_options.get("allowed_tools"), + disallowed_tools=claude_options.get("disallowed_tools"), + permission_mode=claude_options.get("permission_mode"), + stream=False, + ): + chunks.append(chunk) + + raw_assistant_content = claude_cli.parse_claude_message(chunks) + if not raw_assistant_content: + raise HTTPException(status_code=500, detail="No response from Claude Code") + + assistant_content = MessageAdapter.filter_content(raw_assistant_content) + + if actual_session_id: + assistant_message = Message(role="assistant", content=assistant_content) + session_manager.add_assistant_response(actual_session_id, assistant_message) + + token_usage = claude_cli.estimate_token_usage(prompt, assistant_content, request_body.model) + usage = { + "input_tokens": token_usage["prompt_tokens"], + "output_tokens": token_usage["completion_tokens"], + "total_tokens": token_usage["total_tokens"], + } + + output_item_id = f"msg_{uuid.uuid4().hex[:24]}" + response = { + "id": request_id, + "object": "response", + "created": int(datetime.now().timestamp()), + "model": request_body.model, + "status": "completed", + "output": _build_responses_output(assistant_content, output_item_id), + "output_text": assistant_content, + "usage": usage, + } + + return response + + except HTTPException: + raise + except Exception as e: + logger.error(f"Responses API error: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + @app.post("/v1/messages") @rate_limit_endpoint("chat") async def anthropic_messages( From 875cf23d047ce2af6510e17212971ee349aaf6ad Mon Sep 17 00:00:00 2001 From: shohei81 Date: Sat, 17 Jan 2026 12:59:30 +0900 Subject: [PATCH 2/7] chore: drop unrelated Amplifier docs from upstream PR --- docs/AMPLIFIER.md | 60 ----------------------------------------------- 1 file changed, 60 deletions(-) delete mode 100644 docs/AMPLIFIER.md diff --git a/docs/AMPLIFIER.md b/docs/AMPLIFIER.md deleted file mode 100644 index c5b4194..0000000 --- a/docs/AMPLIFIER.md +++ /dev/null @@ -1,60 +0,0 @@ -# Amplifier Setup (OpenAI-Compatible Wrapper) - -This repo exposes an OpenAI-compatible API at `http://localhost:8000` including -`/v1/responses`. Microsoft Amplifier expects the Responses API, so it should work -once the base URL points to this server. - -## 1) Start the wrapper - -```bash -poetry install -poetry run uvicorn src.main:app --reload --port 8000 -``` - -If you want API key protection, set `API_KEY` before starting the server: - -```bash -export API_KEY=your-wrapper-key -``` - -## 2) Initialize Amplifier with the wrapper URL - -During `amplifier init`, set the **API base URL** to: - -``` -http://localhost:8000 -``` - -If you already initialized, re-run `amplifier init` or update the saved config -to point to the same base URL. - -## 3) Select the provider/model - -Amplifier expects OpenAI-style endpoints, so use the OpenAI provider: - -```bash -amplifier provider use openai --model claude-sonnet-4-5-20250929 -``` - -Other Claude models from this wrapper can be used as well (see `/v1/models`). - -## 4) (Optional) Set client auth for Amplifier - -If the wrapper has `API_KEY` enabled, set an OpenAI-style API key in your shell -before running Amplifier: - -```bash -export OPENAI_API_KEY=your-wrapper-key -``` - -Amplifier will send `Authorization: Bearer $OPENAI_API_KEY` to the wrapper. - -## 5) Quick check - -```bash -curl -X POST http://localhost:8000/v1/responses \ - -H "Content-Type: application/json" \ - -d '{"model":"claude-sonnet-4-5-20250929","input":"Hello!"}' -``` - -You should get a JSON response with `"object": "response"` and output text. From aa32a491b9545d3e3a76cb27a67d49ad6bf1c160 Mon Sep 17 00:00:00 2001 From: shohei81 Date: Thu, 28 May 2026 22:02:16 +0900 Subject: [PATCH 3/7] style: apply black formatting to responses input handler Why: CI lint step (`black --check src tests`) was failing on src/main.py, which blocked the rest of the matrix (test/type-check/security/pytest all got skipped on Python 3.10/3.11/3.12). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/main.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main.py b/src/main.py index 099f64b..cc23d36 100644 --- a/src/main.py +++ b/src/main.py @@ -433,8 +433,10 @@ def _messages_from_responses_input(input_value: Any) -> list[Message]: return [Message(role="user", content=_extract_text_from_response_content(input_value))] if isinstance(input_value, list): # If it looks like a list of messages, convert each item to a Message. - if input_value and isinstance(input_value[0], dict) and ( - "role" in input_value[0] or input_value[0].get("type") == "message" + if ( + input_value + and isinstance(input_value[0], dict) + and ("role" in input_value[0] or input_value[0].get("type") == "message") ): messages: list[Message] = [] for item in input_value: From 827bbfa1647c87d03348c33108d5fbcd0d769e19 Mon Sep 17 00:00:00 2001 From: shohei81 Date: Thu, 28 May 2026 22:24:32 +0900 Subject: [PATCH 4/7] feat(responses): align streaming + response shape with OpenAI Responses API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Minimum spec-compliance fixes for /v1/responses: - Streaming: emit the full SSE event sequence per spec (response.created → in_progress → output_item.added → content_part.added → output_text.delta×N → output_text.done → content_part.done → output_item.done → completed). Each event now carries `type` + monotonic `sequence_number`, and the response object is nested under the `response` key as required by OpenAI SDK clients. - Add `output_text` to the final streaming response object so clients see the same flattened text field they get from non-streaming. - Map `previous_response_id` → internal session_id so real Responses API clients can chain conversations without using our custom `session_id` field. - `_build_responses_output` now includes `status` and `annotations` on the message/output_text items (used by both streaming and non-streaming responses). Why minimum: refactoring the duplicated chat/responses pipeline is out of scope here; these changes only shift the surface to match what OpenAI clients actually expect, without touching the core Claude execution path. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/main.py | 170 ++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 131 insertions(+), 39 deletions(-) diff --git a/src/main.py b/src/main.py index cc23d36..474291e 100644 --- a/src/main.py +++ b/src/main.py @@ -456,7 +456,8 @@ def _build_responses_output(text: str, item_id: str) -> list[Dict[str, Any]]: "id": item_id, "type": "message", "role": "assistant", - "content": [{"type": "output_text", "text": text}], + "status": "completed", + "content": [{"type": "output_text", "text": text, "annotations": []}], } ] @@ -716,17 +717,57 @@ async def generate_responses_stream( output_item_id = f"msg_{uuid.uuid4().hex[:24]}" created_ts = int(datetime.now().timestamp()) - response_created = { + + # Spec: every event carries `type` + monotonic `sequence_number`. + # See https://developers.openai.com/api/reference/resources/responses/streaming-events + seq = 0 + + def _sse(event_name: str, payload: Dict[str, Any]) -> str: + nonlocal seq + data = {"type": event_name, "sequence_number": seq, **payload} + seq += 1 + return f"event: {event_name}\ndata: {json.dumps(data)}\n\n" + + base_response: Dict[str, Any] = { "id": request_id, "object": "response", - "created": created_ts, + "created_at": created_ts, "model": request.model, "status": "in_progress", + "output": [], } - yield f"event: response.created\ndata: {json.dumps(response_created)}\n\n" + yield _sse("response.created", {"response": base_response}) + yield _sse("response.in_progress", {"response": base_response}) chunks_buffer = [] assistant_text_parts: list[str] = [] + item_started = False + + def _emit_item_added() -> str: + return _sse( + "response.output_item.added", + { + "output_index": 0, + "item": { + "id": output_item_id, + "type": "message", + "role": "assistant", + "status": "in_progress", + "content": [], + }, + }, + ) + + def _emit_part_added() -> str: + return _sse( + "response.content_part.added", + { + "item_id": output_item_id, + "output_index": 0, + "content_index": 0, + "part": {"type": "output_text", "text": "", "annotations": []}, + }, + ) async for chunk in claude_cli.run_completion( prompt=prompt, @@ -750,42 +791,36 @@ async def generate_responses_stream( if content is None: continue + raw_texts: list[str] = [] if isinstance(content, list): for block in content: if hasattr(block, "text"): - raw_text = block.text + raw_texts.append(block.text) elif isinstance(block, dict) and block.get("type") == "text": - raw_text = block.get("text", "") - else: - continue - - filtered_text = MessageAdapter.filter_content(raw_text) - if filtered_text and not filtered_text.isspace(): - assistant_text_parts.append(filtered_text) - delta_event = { - "type": "response.output_text.delta", - "delta": filtered_text, - "item_id": output_item_id, - "output_index": 0, - "content_index": 0, - } - yield ( - f"event: response.output_text.delta\ndata: {json.dumps(delta_event)}\n\n" - ) + raw_texts.append(block.get("text", "")) elif isinstance(content, str): - filtered_content = MessageAdapter.filter_content(content) - if filtered_content and not filtered_content.isspace(): - assistant_text_parts.append(filtered_content) - delta_event = { - "type": "response.output_text.delta", - "delta": filtered_content, + raw_texts.append(content) + + for raw_text in raw_texts: + filtered_text = MessageAdapter.filter_content(raw_text) + if not filtered_text or filtered_text.isspace(): + continue + + if not item_started: + item_started = True + yield _emit_item_added() + yield _emit_part_added() + + assistant_text_parts.append(filtered_text) + yield _sse( + "response.output_text.delta", + { "item_id": output_item_id, "output_index": 0, "content_index": 0, - } - yield ( - f"event: response.output_text.delta\ndata: {json.dumps(delta_event)}\n\n" - ) + "delta": filtered_text, + }, + ) assistant_content = "".join(assistant_text_parts).strip() if not assistant_content and chunks_buffer: @@ -803,16 +838,73 @@ async def generate_responses_stream( "total_tokens": token_usage["total_tokens"], } - response_completed = { - "id": request_id, - "object": "response", - "created": created_ts, - "model": request.model, + # If Claude returned text only in a final non-streaming chunk, synthesize + # the missing added+delta events so the spec sequence stays intact. + if not item_started and assistant_content: + item_started = True + yield _emit_item_added() + yield _emit_part_added() + yield _sse( + "response.output_text.delta", + { + "item_id": output_item_id, + "output_index": 0, + "content_index": 0, + "delta": assistant_content, + }, + ) + + if item_started: + yield _sse( + "response.output_text.done", + { + "item_id": output_item_id, + "output_index": 0, + "content_index": 0, + "text": assistant_content, + }, + ) + yield _sse( + "response.content_part.done", + { + "item_id": output_item_id, + "output_index": 0, + "content_index": 0, + "part": { + "type": "output_text", + "text": assistant_content, + "annotations": [], + }, + }, + ) + yield _sse( + "response.output_item.done", + { + "output_index": 0, + "item": { + "id": output_item_id, + "type": "message", + "role": "assistant", + "status": "completed", + "content": [ + { + "type": "output_text", + "text": assistant_content, + "annotations": [], + } + ], + }, + }, + ) + + final_response = { + **base_response, "status": "completed", "output": _build_responses_output(assistant_content, output_item_id), + "output_text": assistant_content, "usage": usage, } - yield f"event: response.completed\ndata: {json.dumps(response_completed)}\n\n" + yield _sse("response.completed", {"response": final_response}) yield "data: [DONE]\n\n" except Exception as e: @@ -1006,7 +1098,7 @@ async def responses_endpoint( top_p = body.get("top_p", 1.0) max_output_tokens = body.get("max_output_tokens") or body.get("max_tokens") user = body.get("user") - session_id = body.get("session_id") + session_id = body.get("session_id") or body.get("previous_response_id") messages = [] if "messages" in body: From b82f7221b2b2f5b7a413b71710cfde51295a3f8e Mon Sep 17 00:00:00 2001 From: shohei81 Date: Thu, 28 May 2026 22:27:57 +0900 Subject: [PATCH 5/7] fix(responses): use created_at to match OpenAI Responses API schema Non-streaming response was returning `created` (Chat Completions field name), causing the OpenAI SDK Responses model to fail validation since it expects `created_at`. Streaming already uses `created_at`. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.py b/src/main.py index 474291e..340a6dc 100644 --- a/src/main.py +++ b/src/main.py @@ -1205,7 +1205,7 @@ async def responses_endpoint( response = { "id": request_id, "object": "response", - "created": int(datetime.now().timestamp()), + "created_at": int(datetime.now().timestamp()), "model": request_body.model, "status": "completed", "output": _build_responses_output(assistant_content, output_item_id), From e43279950a78e9a1f73e90d16f6a908385962b20 Mon Sep 17 00:00:00 2001 From: shohei81 Date: Thu, 28 May 2026 22:30:23 +0900 Subject: [PATCH 6/7] Revert "fix(responses): use created_at to match OpenAI Responses API schema" This reverts commit b82f7221b2b2f5b7a413b71710cfde51295a3f8e. --- src/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.py b/src/main.py index 340a6dc..474291e 100644 --- a/src/main.py +++ b/src/main.py @@ -1205,7 +1205,7 @@ async def responses_endpoint( response = { "id": request_id, "object": "response", - "created_at": int(datetime.now().timestamp()), + "created": int(datetime.now().timestamp()), "model": request_body.model, "status": "completed", "output": _build_responses_output(assistant_content, output_item_id), From 99fbf8f006e735f4f6f347363e2a93fddae22882 Mon Sep 17 00:00:00 2001 From: shohei81 Date: Thu, 28 May 2026 22:30:23 +0900 Subject: [PATCH 7/7] Revert "feat(responses): align streaming + response shape with OpenAI Responses API" This reverts commit 827bbfa1647c87d03348c33108d5fbcd0d769e19. --- src/main.py | 170 ++++++++++++---------------------------------------- 1 file changed, 39 insertions(+), 131 deletions(-) diff --git a/src/main.py b/src/main.py index 474291e..cc23d36 100644 --- a/src/main.py +++ b/src/main.py @@ -456,8 +456,7 @@ def _build_responses_output(text: str, item_id: str) -> list[Dict[str, Any]]: "id": item_id, "type": "message", "role": "assistant", - "status": "completed", - "content": [{"type": "output_text", "text": text, "annotations": []}], + "content": [{"type": "output_text", "text": text}], } ] @@ -717,57 +716,17 @@ async def generate_responses_stream( output_item_id = f"msg_{uuid.uuid4().hex[:24]}" created_ts = int(datetime.now().timestamp()) - - # Spec: every event carries `type` + monotonic `sequence_number`. - # See https://developers.openai.com/api/reference/resources/responses/streaming-events - seq = 0 - - def _sse(event_name: str, payload: Dict[str, Any]) -> str: - nonlocal seq - data = {"type": event_name, "sequence_number": seq, **payload} - seq += 1 - return f"event: {event_name}\ndata: {json.dumps(data)}\n\n" - - base_response: Dict[str, Any] = { + response_created = { "id": request_id, "object": "response", - "created_at": created_ts, + "created": created_ts, "model": request.model, "status": "in_progress", - "output": [], } - yield _sse("response.created", {"response": base_response}) - yield _sse("response.in_progress", {"response": base_response}) + yield f"event: response.created\ndata: {json.dumps(response_created)}\n\n" chunks_buffer = [] assistant_text_parts: list[str] = [] - item_started = False - - def _emit_item_added() -> str: - return _sse( - "response.output_item.added", - { - "output_index": 0, - "item": { - "id": output_item_id, - "type": "message", - "role": "assistant", - "status": "in_progress", - "content": [], - }, - }, - ) - - def _emit_part_added() -> str: - return _sse( - "response.content_part.added", - { - "item_id": output_item_id, - "output_index": 0, - "content_index": 0, - "part": {"type": "output_text", "text": "", "annotations": []}, - }, - ) async for chunk in claude_cli.run_completion( prompt=prompt, @@ -791,36 +750,42 @@ def _emit_part_added() -> str: if content is None: continue - raw_texts: list[str] = [] if isinstance(content, list): for block in content: if hasattr(block, "text"): - raw_texts.append(block.text) + raw_text = block.text elif isinstance(block, dict) and block.get("type") == "text": - raw_texts.append(block.get("text", "")) + raw_text = block.get("text", "") + else: + continue + + filtered_text = MessageAdapter.filter_content(raw_text) + if filtered_text and not filtered_text.isspace(): + assistant_text_parts.append(filtered_text) + delta_event = { + "type": "response.output_text.delta", + "delta": filtered_text, + "item_id": output_item_id, + "output_index": 0, + "content_index": 0, + } + yield ( + f"event: response.output_text.delta\ndata: {json.dumps(delta_event)}\n\n" + ) elif isinstance(content, str): - raw_texts.append(content) - - for raw_text in raw_texts: - filtered_text = MessageAdapter.filter_content(raw_text) - if not filtered_text or filtered_text.isspace(): - continue - - if not item_started: - item_started = True - yield _emit_item_added() - yield _emit_part_added() - - assistant_text_parts.append(filtered_text) - yield _sse( - "response.output_text.delta", - { + filtered_content = MessageAdapter.filter_content(content) + if filtered_content and not filtered_content.isspace(): + assistant_text_parts.append(filtered_content) + delta_event = { + "type": "response.output_text.delta", + "delta": filtered_content, "item_id": output_item_id, "output_index": 0, "content_index": 0, - "delta": filtered_text, - }, - ) + } + yield ( + f"event: response.output_text.delta\ndata: {json.dumps(delta_event)}\n\n" + ) assistant_content = "".join(assistant_text_parts).strip() if not assistant_content and chunks_buffer: @@ -838,73 +803,16 @@ def _emit_part_added() -> str: "total_tokens": token_usage["total_tokens"], } - # If Claude returned text only in a final non-streaming chunk, synthesize - # the missing added+delta events so the spec sequence stays intact. - if not item_started and assistant_content: - item_started = True - yield _emit_item_added() - yield _emit_part_added() - yield _sse( - "response.output_text.delta", - { - "item_id": output_item_id, - "output_index": 0, - "content_index": 0, - "delta": assistant_content, - }, - ) - - if item_started: - yield _sse( - "response.output_text.done", - { - "item_id": output_item_id, - "output_index": 0, - "content_index": 0, - "text": assistant_content, - }, - ) - yield _sse( - "response.content_part.done", - { - "item_id": output_item_id, - "output_index": 0, - "content_index": 0, - "part": { - "type": "output_text", - "text": assistant_content, - "annotations": [], - }, - }, - ) - yield _sse( - "response.output_item.done", - { - "output_index": 0, - "item": { - "id": output_item_id, - "type": "message", - "role": "assistant", - "status": "completed", - "content": [ - { - "type": "output_text", - "text": assistant_content, - "annotations": [], - } - ], - }, - }, - ) - - final_response = { - **base_response, + response_completed = { + "id": request_id, + "object": "response", + "created": created_ts, + "model": request.model, "status": "completed", "output": _build_responses_output(assistant_content, output_item_id), - "output_text": assistant_content, "usage": usage, } - yield _sse("response.completed", {"response": final_response}) + yield f"event: response.completed\ndata: {json.dumps(response_completed)}\n\n" yield "data: [DONE]\n\n" except Exception as e: @@ -1098,7 +1006,7 @@ async def responses_endpoint( top_p = body.get("top_p", 1.0) max_output_tokens = body.get("max_output_tokens") or body.get("max_tokens") user = body.get("user") - session_id = body.get("session_id") or body.get("previous_response_id") + session_id = body.get("session_id") messages = [] if "messages" in body: