From d112e152351a17479603e3429be4ade5f1786b1f Mon Sep 17 00:00:00 2001 From: nico Date: Thu, 2 Jul 2026 21:19:11 +0200 Subject: [PATCH] Fix reasoning and web search output for OpenAI compatibility - Separate THINK and RESPONSE fragments into reasoning_content and content respectively, matching OpenAI's delta format - Capture SEARCH/TOOL_SEARCH/TOOL_OPEN results as a citations array in the response - Handle BATCH operations, implicit path frames, and bare BATCH arrays in the SSE parser - Add shell.nix to .gitignore --- .gitignore | 3 +- deepseek/client.py | 224 ++++++++++++++++++++++++++++++----- examples/03_direct_stream.py | 16 ++- server/api.py | 4 +- server/openai_format.py | 60 +++++++--- 5 files changed, 255 insertions(+), 52 deletions(-) diff --git a/.gitignore b/.gitignore index ee08b94..1fbdffb 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,5 @@ session.json session *.env.* !.env.example -CLAUDE.md \ No newline at end of file +CLAUDE.md +shell.nix \ No newline at end of file diff --git a/deepseek/client.py b/deepseek/client.py index 677c75b..c4aea5c 100644 --- a/deepseek/client.py +++ b/deepseek/client.py @@ -65,6 +65,12 @@ class Reply: text: str conversation_id: str + reasoning_text: str = "" + citations: list[dict] = None + + def __post_init__(self): + if self.citations is None: + self.citations = [] def __str__(self) -> str: # so print(reply) shows the text return self.text @@ -179,16 +185,23 @@ def chat( """Return the complete reply (`.text`) plus its `.conversation_id`.""" s = self.stream(prompt, conversation_id=conversation_id, model=model, thinking=thinking, search=search) - text = "".join(s) - return Reply(text=text, conversation_id=s.conversation_id) + content_parts: list[str] = [] + for chunk_type, text in s: + if chunk_type == "content": + content_parts.append(text) + return Reply(text="".join(content_parts), + reasoning_text=s.reasoning_text, + conversation_id=s.conversation_id, + citations=s.citations) def close(self) -> None: self._http.close() class _Stream: - """Iterable of reply-text chunks. After it's consumed, `.conversation_id` - holds the token for resuming the conversation.""" + """Iterable of (type, text) chunks. After it's consumed, `.conversation_id` + holds the token for resuming the conversation and `.reasoning_text` contains + the model's internal thinking (if any).""" def __init__(self, client: "DeepSeekClient", prompt: str, session_id: str, parent_id: Optional[int], model: str, @@ -201,8 +214,9 @@ def __init__(self, client: "DeepSeekClient", prompt: str, session_id: str, self._thinking = thinking self._search = search self._message_id: Optional[int] = None + self._thinking_parts: list[str] = [] - def __iter__(self) -> Iterator[str]: + def __iter__(self) -> Iterator[tuple[str, str]]: body = { "chat_session_id": self._session_id, "parent_message_id": self._parent_id, @@ -219,34 +233,54 @@ def __iter__(self) -> Iterator[str]: # PoW challenges are short-lived, so solve right before the request. headers = {"x-ds-pow-response": self._client._pow_header()} meta: dict = {} + self._thinking_parts = [] + self._citations = [] with self._client._http.stream( "POST", COMPLETION_PATH, json=body, headers=headers ) as resp: resp.raise_for_status() - yield from _parse_sse(resp.iter_lines(), meta) + for chunk_type, text in _parse_sse(resp.iter_lines(), meta): + if chunk_type == "thinking": + self._thinking_parts.append(text) + yield (chunk_type, text) if meta.get("message_id") is not None: self._message_id = meta["message_id"] + self._citations = meta.get("citations", []) @property def conversation_id(self) -> str: return _encode_cid(self._session_id, self._message_id) + @property + def reasoning_text(self) -> str: + """The model's internal reasoning (only available after iteration).""" + return "".join(self._thinking_parts) + + @property + def citations(self) -> list: + """Search result citations collected during parsing (only available after iteration).""" + return getattr(self, "_citations", []) + + +def _parse_sse(lines, meta: Optional[dict] = None) -> Iterator[tuple[str, str]]: + """Turn DeepSeek's SSE completion stream into ``(type, text)`` chunks. -def _parse_sse(lines, meta: Optional[dict] = None) -> Iterator[str]: - """Turn DeepSeek's SSE completion stream into reply-text deltas. + Yields ``("thinking", text)`` for THINK-fragment content and + ``("content", text)`` for RESPONSE-fragment content. TOOL_* fragments + (search, open) are tracked but not yielded directly — their result URLs + and titles are accumulated into ``meta["citations"]`` when *meta* is given. - The stream sends an initial snapshot frame whose `v` is the full response - object (with `fragments[].content`), then a series of append frames: - * {"p":"response/fragments/-1/content","o":"APPEND","v":" what"} (sets path) - * {"v":"'s"} (appends to it) - We track the active append path and emit only RESPONSE-fragment text. + The stream sends an initial snapshot frame whose ``v`` is the full response + object, then path-addressed append / batch frames that modify fragments. + We maintain an ordered list of fragment types so we know which kind of + content is arriving at ``fragments/-1``. - If `meta` is given, the assistant's `message_id` is recorded into it (used to - build the resumable conversation_id). The exact field location can vary, so - we look in a few plausible spots defensively. + If *meta* is given, the assistant's ``message_id`` is recorded into it, + and search-result citations are collected into ``meta["citations"]``. """ active_path: Optional[str] = None emitted_initial = False + fragments: list[Optional[str]] = [] # ordered fragment types by index for line in lines: if not line or not line.startswith("data:"): @@ -261,32 +295,166 @@ def _parse_sse(lines, meta: Optional[dict] = None) -> Iterator[str]: v = obj.get("v") - # Snapshot frame: full response object. + # ── snapshot frame ────────────────────────────────────────────── if isinstance(v, dict) and "response" in v: if meta is not None: _capture_message_id(meta, v) + fragments.clear() for frag in v["response"].get("fragments", []): - if frag.get("type") == "RESPONSE" and frag.get("content"): - active_path = "response/fragments/-1/content" - if not emitted_initial: - emitted_initial = True - yield frag["content"] + ft = frag.get("type") if isinstance(frag, dict) else None + fragments.append(ft) + if isinstance(frag, dict): + if ft in ("TOOL_SEARCH", "SEARCH"): + _capture_search_results(meta, frag.get("results", [])) + elif ft == "TOOL_OPEN": + _capture_source(meta, frag.get("result")) + elif ft in ("RESPONSE", "THINK") and frag.get("content"): + active_path = "response/fragments/-1/content" + if not emitted_initial: + emitted_initial = True + yield ("thinking" if ft == "THINK" else "content", + frag["content"]) continue - # Path-setting append frame. + # ── BATCH operation (nested mutations) ────────────────────────── + if "p" in obj and obj.get("o") == "BATCH" and isinstance(v, list): + active_path = obj["p"] + _scan_for_sources(meta, v) + yield from _process_batch(v, active_path, fragments, meta) + continue + + # ── bare BATCH (list of ops without outer wrapper) ────────────── + if "p" not in obj and "o" not in obj and isinstance(v, list): + _scan_for_sources(meta, v) + yield from _process_batch(v, active_path or "response", fragments, meta) + continue + + # ── path-setting frame ────────────────────────────────────────── if "p" in obj: active_path = obj["p"] if meta is not None and active_path.endswith("message_id") \ and isinstance(v, int): meta["message_id"] = v - if obj.get("o") == "APPEND" and isinstance(v, str) \ - and active_path.endswith("content"): - yield v + op = obj.get("o") + if op == "SET" and meta is not None and isinstance(v, (dict, list)): + _scan_for_sources(meta, v) + if active_path.endswith("results") and isinstance(v, list): + _capture_search_results(meta, v) + elif op == "APPEND": + if isinstance(v, str) and active_path.endswith("content"): + ft = _last_fragment_type(fragments) + yield ("thinking" if ft == "THINK" else "content", v) + elif active_path.endswith("fragments"): + items = v if isinstance(v, list) else [v] + for frag in items: + ft = frag.get("type") if isinstance(frag, dict) else None + fragments.append(ft) + if isinstance(frag, dict): + if ft in ("TOOL_SEARCH", "SEARCH"): + _capture_search_results(meta, frag.get("results", [])) + elif ft == "TOOL_OPEN": + _capture_source(meta, frag.get("result")) + elif ft in ("RESPONSE", "THINK") \ + and frag.get("content"): + yield ("thinking" if ft == "THINK" else "content", + frag["content"]) + elif op is None: + if isinstance(v, str) and active_path.endswith("content"): + ft = _last_fragment_type(fragments) + yield ("thinking" if ft == "THINK" else "content", v) + elif isinstance(v, list) and meta is not None \ + and active_path.endswith("results"): + _scan_for_sources(meta, v) + _capture_search_results(meta, v) continue - # Bare append to the current path. + # ── bare append (path kept from previous frame) ───────────────── if isinstance(v, str) and active_path and active_path.endswith("content"): - yield v + ft = _last_fragment_type(fragments) + yield ("thinking" if ft == "THINK" else "content", v) + + +def _last_fragment_type(fragments: list) -> Optional[str]: + """Type of the fragment at index -1 (the most recently added one).""" + return fragments[-1] if fragments else None + + +def _capture_source(meta: Optional[dict], result) -> None: + """Add a single search/open result to ``meta["citations"]``, deduplicating by URL.""" + if meta is None or not isinstance(result, dict): + return + url = result.get("url", "") + if not url: + return + citations = meta.setdefault("citations", []) + if not any(c.get("url") == url for c in citations): + citations.append({"url": url, "title": result.get("title", "")}) + + +def _capture_search_results(meta: Optional[dict], results: list) -> None: + """Add all results from a TOOL_SEARCH fragment to the citations list.""" + if meta is None: + return + for r in results: + _capture_source(meta, r) + + +def _scan_for_sources(meta: Optional[dict], obj, seen_urls: set = None) -> None: + """Recursively scan *obj* for dicts carrying ``url`` + ``title`` keys. + + This catches search results regardless of the SSE path or fragment type + they arrive in, making citation extraction resilient to API changes. + """ + if meta is None: + return + if seen_urls is None: + seen_urls = set() + if isinstance(obj, dict): + url = obj.get("url") + if url and isinstance(url, str) and url.startswith("http") \ + and url not in seen_urls: + seen_urls.add(url) + _capture_source(meta, obj) + for v in obj.values(): + _scan_for_sources(meta, v, seen_urls) + elif isinstance(obj, list): + for item in obj: + _scan_for_sources(meta, item, seen_urls) + + +def _process_batch(ops: list, base_path: str, fragments: list, + meta: Optional[dict] = None) -> Iterator[tuple[str, str]]: + """Recursively process BATCH operations, tracking fragment types and + yielding any content appends found inside.""" + for op in ops: + if not isinstance(op, dict): + continue + p = op.get("p", "") + o = op.get("o", "") + v = op.get("v") + + resolved = f"{base_path}/{p}" if p else base_path + + if p == "fragments" and o == "APPEND": + items = v if isinstance(v, list) else [v] + for frag in items: + ft = frag.get("type") if isinstance(frag, dict) else None + fragments.append(ft) + if isinstance(frag, dict): + if ft in ("TOOL_SEARCH", "SEARCH"): + _capture_search_results(meta, frag.get("results", [])) + elif ft == "TOOL_OPEN": + _capture_source(meta, frag.get("result")) + elif ft in ("RESPONSE", "THINK") and frag.get("content"): + yield ("thinking" if ft == "THINK" else "content", + frag["content"]) + elif resolved.endswith("content") and o == "APPEND" and isinstance(v, str): + ft = _last_fragment_type(fragments) + yield ("thinking" if ft == "THINK" else "content", v) + elif o == "BATCH" and isinstance(v, list): + yield from _process_batch(v, resolved, fragments, meta) + # SET ops (references, status, elapsed_secs) are intentionally ignored. + def _capture_message_id(meta: dict, snapshot: dict) -> None: diff --git a/examples/03_direct_stream.py b/examples/03_direct_stream.py index c77d9a1..6a0e1a9 100644 --- a/examples/03_direct_stream.py +++ b/examples/03_direct_stream.py @@ -1,7 +1,8 @@ """Example 3 — stream the reply as it is generated, in-process. -client.stream() yields pieces of text as they arrive, instead of waiting for the -whole reply. Good for showing output live (like a chat UI typing). +client.stream() yields (type, text) tuples as they arrive; type is "thinking" for +DeepThink internal reasoning and "content" for the actual response. Good for +showing output live (like a chat UI typing). Run it from the project root: @@ -17,12 +18,15 @@ client = DeepSeekClient() -# Omit conversation_id to start fresh. Each chunk is a string; the +# Omit conversation_id to start fresh. Each chunk is a (type, text) tuple; the # conversation_id is filled in once the stream finishes. -stream = client.stream("Tell me a short, clean joke.") -for chunk in stream: - print(chunk, end="", flush=True) +stream = client.stream("Tell me a short, clean joke.", thinking=True) +for chunk_type, chunk in stream: + if chunk_type == "content": + print(chunk, end="", flush=True) print() # newline after the streamed text print("conversation_id:", stream.conversation_id) +if stream.reasoning_text: + print("reasoning:", stream.reasoning_text[:120] + "...") client.close() diff --git a/server/api.py b/server/api.py index 471b258..8687ac8 100644 --- a/server/api.py +++ b/server/api.py @@ -141,4 +141,6 @@ def gen(): except Exception as e: return _error(f"DeepSeek request failed: {e}") - return completion_response(req.model, reply.text, prompt, reply.conversation_id) + return completion_response(req.model, reply.text, prompt, + reply.conversation_id, reply.reasoning_text, + reply.citations) diff --git a/server/openai_format.py b/server/openai_format.py index f0985cb..5df304f 100644 --- a/server/openai_format.py +++ b/server/openai_format.py @@ -3,6 +3,10 @@ DeepSeek's protocol has no system/role channel — just a single `prompt` string. So we flatten the OpenAI `messages` array into one prompt, and wrap DeepSeek's text output back into OpenAI response/stream objects. + +The client's stream yields ``(type, text)`` tuples where *type* is ``"thinking"`` +or ``"content"``. Thinking text is mapped to OpenAI's ``reasoning_content`` +delta field; regular content maps to ``content``. """ from __future__ import annotations @@ -10,7 +14,7 @@ import json import time import uuid -from typing import Iterable, List +from typing import Iterable, List, Optional from .schemas import ChatMessage @@ -62,14 +66,20 @@ def _est_tokens(text: str) -> int: def completion_response(model: str, content: str, prompt: str, - conversation_id: str = None) -> dict: + conversation_id: str = None, + reasoning_content: Optional[str] = None, + citations: Optional[list] = None) -> dict: """A full (non-streaming) OpenAI chat.completion object. - `conversation_id` is an extra top-level field (outside OpenAI's schema) you - send back to resume the conversation. + `conversation_id` and `citations` are extra top-level fields (outside + OpenAI's schema) that the caller can use to resume the conversation or + display search-result sources. """ pt, ct = _est_tokens(prompt), _est_tokens(content) - return { + message: dict = {"role": "assistant", "content": content} + if reasoning_content: + message["reasoning_content"] = reasoning_content + result = { "id": _id(), "object": "chat.completion", "created": _now(), @@ -78,7 +88,7 @@ def completion_response(model: str, content: str, prompt: str, "choices": [ { "index": 0, - "message": {"role": "assistant", "content": content}, + "message": message, "finish_reason": "stop", } ], @@ -88,15 +98,22 @@ def completion_response(model: str, content: str, prompt: str, "total_tokens": pt + ct, }, } + if citations: + result["citations"] = citations + return result -def stream_chunks(model: str, stream: Iterable[str]) -> Iterable[str]: - """Yield OpenAI SSE lines (`data: {...}\\n\\n`) for a streamed completion. +def stream_chunks(model: str, stream: Iterable[tuple[str, str]]) -> Iterable[str]: + """Yield OpenAI SSE lines (``data: {...}\\n\\n``) for a streamed completion. - `stream` is the client's stream object; after it's consumed we read its - `.conversation_id` and attach it to the final chunk. + *stream* is the client's stream yielding ``(type, text)`` tuples where + *type* is ``"thinking"`` or ``"content"``. Thinking text becomes + ``reasoning_content`` in the delta; regular text becomes ``content``. + After the stream is consumed we read its ``.conversation_id`` and attach + it to the final chunk. """ cid, created = _id(), _now() + role_sent = False def frame(delta: dict, finish=None, extra: dict = None) -> str: obj = { @@ -110,11 +127,22 @@ def frame(delta: dict, finish=None, extra: dict = None) -> str: obj.update(extra) return f"data: {json.dumps(obj, ensure_ascii=False)}\n\n" - # First frame announces the assistant role. - yield frame({"role": "assistant", "content": ""}) - for d in stream: - if d: - yield frame({"content": d}) + for chunk_type, text in stream: + if not text: + continue + if chunk_type == "thinking": + delta = {"reasoning_content": text} + else: + delta = {"content": text} + if not role_sent: + delta["role"] = "assistant" + role_sent = True + yield frame(delta) + conversation_id = getattr(stream, "conversation_id", None) - yield frame({}, finish="stop", extra={"conversation_id": conversation_id}) + citations = getattr(stream, "citations", None) + final_extra = {"conversation_id": conversation_id} + if citations: + final_extra["citations"] = citations + yield frame({}, finish="stop", extra=final_extra) yield "data: [DONE]\n\n"