Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ session.json
session
*.env.*
!.env.example
CLAUDE.md
CLAUDE.md
shell.nix
224 changes: 196 additions & 28 deletions deepseek/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ class Reply:

text: str
conversation_id: str
reasoning_text: str = ""
citations: list[dict] = None

def __post_init__(self):
if self.citations is None:
self.citations = []

def __str__(self) -> str: # so print(reply) shows the text
return self.text
Expand Down Expand Up @@ -179,16 +185,23 @@ def chat(
"""Return the complete reply (`.text`) plus its `.conversation_id`."""
s = self.stream(prompt, conversation_id=conversation_id,
model=model, thinking=thinking, search=search)
text = "".join(s)
return Reply(text=text, conversation_id=s.conversation_id)
content_parts: list[str] = []
for chunk_type, text in s:
if chunk_type == "content":
content_parts.append(text)
return Reply(text="".join(content_parts),
reasoning_text=s.reasoning_text,
conversation_id=s.conversation_id,
citations=s.citations)

def close(self) -> None:
    """Close the object held in ``self._http``."""
    http = self._http
    http.close()


class _Stream:
"""Iterable of reply-text chunks. After it's consumed, `.conversation_id`
holds the token for resuming the conversation."""
"""Iterable of (type, text) chunks. After it's consumed, `.conversation_id`
holds the token for resuming the conversation and `.reasoning_text` contains
the model's internal thinking (if any)."""

def __init__(self, client: "DeepSeekClient", prompt: str, session_id: str,
parent_id: Optional[int], model: str,
Expand All @@ -201,8 +214,9 @@ def __init__(self, client: "DeepSeekClient", prompt: str, session_id: str,
self._thinking = thinking
self._search = search
self._message_id: Optional[int] = None
self._thinking_parts: list[str] = []

def __iter__(self) -> Iterator[str]:
def __iter__(self) -> Iterator[tuple[str, str]]:
body = {
"chat_session_id": self._session_id,
"parent_message_id": self._parent_id,
Expand All @@ -219,34 +233,54 @@ def __iter__(self) -> Iterator[str]:
# PoW challenges are short-lived, so solve right before the request.
headers = {"x-ds-pow-response": self._client._pow_header()}
meta: dict = {}
self._thinking_parts = []
self._citations = []
with self._client._http.stream(
"POST", COMPLETION_PATH, json=body, headers=headers
) as resp:
resp.raise_for_status()
yield from _parse_sse(resp.iter_lines(), meta)
for chunk_type, text in _parse_sse(resp.iter_lines(), meta):
if chunk_type == "thinking":
self._thinking_parts.append(text)
yield (chunk_type, text)
if meta.get("message_id") is not None:
self._message_id = meta["message_id"]
self._citations = meta.get("citations", [])

@property
def conversation_id(self) -> str:
    """Token produced by ``_encode_cid`` from the session id and message id.

    ``_message_id`` is only filled in while the stream is iterated, so the
    value read before iteration is built from the initial (``None``) id.
    """
    session_id = self._session_id
    message_id = self._message_id
    return _encode_cid(session_id, message_id)

@property
def reasoning_text(self) -> str:
"""The model's internal reasoning (only available after iteration)."""
return "".join(self._thinking_parts)

@property
def citations(self) -> list:
"""Search result citations collected during parsing (only available after iteration)."""
return getattr(self, "_citations", [])


def _parse_sse(lines, meta: Optional[dict] = None) -> Iterator[tuple[str, str]]:
"""Turn DeepSeek's SSE completion stream into ``(type, text)`` chunks.

def _parse_sse(lines, meta: Optional[dict] = None) -> Iterator[str]:
"""Turn DeepSeek's SSE completion stream into reply-text deltas.
Yields ``("thinking", text)`` for THINK-fragment content and
``("content", text)`` for RESPONSE-fragment content. TOOL_* fragments
(search, open) are tracked but not yielded directly — their result URLs
and titles are accumulated into ``meta["citations"]`` when *meta* is given.

The stream sends an initial snapshot frame whose `v` is the full response
object (with `fragments[].content`), then a series of append frames:
* {"p":"response/fragments/-1/content","o":"APPEND","v":" what"} (sets path)
* {"v":"'s"} (appends to it)
We track the active append path and emit only RESPONSE-fragment text.
The stream sends an initial snapshot frame whose ``v`` is the full response
object, then path-addressed append / batch frames that modify fragments.
We maintain an ordered list of fragment types so we know which kind of
content is arriving at ``fragments/-1``.

If `meta` is given, the assistant's `message_id` is recorded into it (used to
build the resumable conversation_id). The exact field location can vary, so
we look in a few plausible spots defensively.
If *meta* is given, the assistant's ``message_id`` is recorded into it,
and search-result citations are collected into ``meta["citations"]``.
"""
active_path: Optional[str] = None
emitted_initial = False
fragments: list[Optional[str]] = [] # ordered fragment types by index

for line in lines:
if not line or not line.startswith("data:"):
Expand All @@ -261,32 +295,166 @@ def _parse_sse(lines, meta: Optional[dict] = None) -> Iterator[str]:

v = obj.get("v")

# Snapshot frame: full response object.
# ── snapshot frame ──────────────────────────────────────────────
if isinstance(v, dict) and "response" in v:
if meta is not None:
_capture_message_id(meta, v)
fragments.clear()
for frag in v["response"].get("fragments", []):
if frag.get("type") == "RESPONSE" and frag.get("content"):
active_path = "response/fragments/-1/content"
if not emitted_initial:
emitted_initial = True
yield frag["content"]
ft = frag.get("type") if isinstance(frag, dict) else None
fragments.append(ft)
if isinstance(frag, dict):
if ft in ("TOOL_SEARCH", "SEARCH"):
_capture_search_results(meta, frag.get("results", []))
elif ft == "TOOL_OPEN":
_capture_source(meta, frag.get("result"))
elif ft in ("RESPONSE", "THINK") and frag.get("content"):
active_path = "response/fragments/-1/content"
if not emitted_initial:
emitted_initial = True
yield ("thinking" if ft == "THINK" else "content",
frag["content"])
continue

# Path-setting append frame.
# ── BATCH operation (nested mutations) ──────────────────────────
if "p" in obj and obj.get("o") == "BATCH" and isinstance(v, list):
active_path = obj["p"]
_scan_for_sources(meta, v)
yield from _process_batch(v, active_path, fragments, meta)
continue

# ── bare BATCH (list of ops without outer wrapper) ──────────────
if "p" not in obj and "o" not in obj and isinstance(v, list):
_scan_for_sources(meta, v)
yield from _process_batch(v, active_path or "response", fragments, meta)
continue

# ── path-setting frame ──────────────────────────────────────────
if "p" in obj:
active_path = obj["p"]
if meta is not None and active_path.endswith("message_id") \
and isinstance(v, int):
meta["message_id"] = v
if obj.get("o") == "APPEND" and isinstance(v, str) \
and active_path.endswith("content"):
yield v
op = obj.get("o")
if op == "SET" and meta is not None and isinstance(v, (dict, list)):
_scan_for_sources(meta, v)
if active_path.endswith("results") and isinstance(v, list):
_capture_search_results(meta, v)
elif op == "APPEND":
if isinstance(v, str) and active_path.endswith("content"):
ft = _last_fragment_type(fragments)
yield ("thinking" if ft == "THINK" else "content", v)
elif active_path.endswith("fragments"):
items = v if isinstance(v, list) else [v]
for frag in items:
ft = frag.get("type") if isinstance(frag, dict) else None
fragments.append(ft)
if isinstance(frag, dict):
if ft in ("TOOL_SEARCH", "SEARCH"):
_capture_search_results(meta, frag.get("results", []))
elif ft == "TOOL_OPEN":
_capture_source(meta, frag.get("result"))
elif ft in ("RESPONSE", "THINK") \
and frag.get("content"):
yield ("thinking" if ft == "THINK" else "content",
frag["content"])
elif op is None:
if isinstance(v, str) and active_path.endswith("content"):
ft = _last_fragment_type(fragments)
yield ("thinking" if ft == "THINK" else "content", v)
elif isinstance(v, list) and meta is not None \
and active_path.endswith("results"):
_scan_for_sources(meta, v)
_capture_search_results(meta, v)
continue

# Bare append to the current path.
# ── bare append (path kept from previous frame) ─────────────────
if isinstance(v, str) and active_path and active_path.endswith("content"):
yield v
ft = _last_fragment_type(fragments)
yield ("thinking" if ft == "THINK" else "content", v)


def _last_fragment_type(fragments: list) -> Optional[str]:
"""Type of the fragment at index -1 (the most recently added one)."""
return fragments[-1] if fragments else None


def _capture_source(meta: Optional[dict], result) -> None:
"""Add a single search/open result to ``meta["citations"]``, deduplicating by URL."""
if meta is None or not isinstance(result, dict):
return
url = result.get("url", "")
if not url:
return
citations = meta.setdefault("citations", [])
if not any(c.get("url") == url for c in citations):
citations.append({"url": url, "title": result.get("title", "")})


def _capture_search_results(meta: Optional[dict], results: list) -> None:
"""Add all results from a TOOL_SEARCH fragment to the citations list."""
if meta is None:
return
for r in results:
_capture_source(meta, r)


def _scan_for_sources(meta: Optional[dict], obj, seen_urls: set = None) -> None:
"""Recursively scan *obj* for dicts carrying ``url`` + ``title`` keys.

This catches search results regardless of the SSE path or fragment type
they arrive in, making citation extraction resilient to API changes.
"""
if meta is None:
return
if seen_urls is None:
seen_urls = set()
if isinstance(obj, dict):
url = obj.get("url")
if url and isinstance(url, str) and url.startswith("http") \
and url not in seen_urls:
seen_urls.add(url)
_capture_source(meta, obj)
for v in obj.values():
_scan_for_sources(meta, v, seen_urls)
elif isinstance(obj, list):
for item in obj:
_scan_for_sources(meta, item, seen_urls)


def _process_batch(ops: list, base_path: str, fragments: list,
meta: Optional[dict] = None) -> Iterator[tuple[str, str]]:
"""Recursively process BATCH operations, tracking fragment types and
yielding any content appends found inside."""
for op in ops:
if not isinstance(op, dict):
continue
p = op.get("p", "")
o = op.get("o", "")
v = op.get("v")

resolved = f"{base_path}/{p}" if p else base_path

if p == "fragments" and o == "APPEND":
items = v if isinstance(v, list) else [v]
for frag in items:
ft = frag.get("type") if isinstance(frag, dict) else None
fragments.append(ft)
if isinstance(frag, dict):
if ft in ("TOOL_SEARCH", "SEARCH"):
_capture_search_results(meta, frag.get("results", []))
elif ft == "TOOL_OPEN":
_capture_source(meta, frag.get("result"))
elif ft in ("RESPONSE", "THINK") and frag.get("content"):
yield ("thinking" if ft == "THINK" else "content",
frag["content"])
elif resolved.endswith("content") and o == "APPEND" and isinstance(v, str):
ft = _last_fragment_type(fragments)
yield ("thinking" if ft == "THINK" else "content", v)
elif o == "BATCH" and isinstance(v, list):
yield from _process_batch(v, resolved, fragments, meta)
# SET ops (references, status, elapsed_secs) are intentionally ignored.



def _capture_message_id(meta: dict, snapshot: dict) -> None:
Expand Down
16 changes: 10 additions & 6 deletions examples/03_direct_stream.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""Example 3 — stream the reply as it is generated, in-process.

client.stream() yields pieces of text as they arrive, instead of waiting for the
whole reply. Good for showing output live (like a chat UI typing).
client.stream() yields (type, text) tuples as they arrive; type is "thinking" for
DeepThink internal reasoning and "content" for the actual response. Good for
showing output live (like a chat UI typing).

Run it from the project root:

Expand All @@ -17,12 +18,15 @@

client = DeepSeekClient()

# Omit conversation_id to start fresh. Each chunk is a string; the
# Omit conversation_id to start fresh. Each chunk is a (type, text) tuple; the
# conversation_id is filled in once the stream finishes.
stream = client.stream("Tell me a short, clean joke.")
for chunk in stream:
print(chunk, end="", flush=True)
stream = client.stream("Tell me a short, clean joke.", thinking=True)
for chunk_type, chunk in stream:
if chunk_type == "content":
print(chunk, end="", flush=True)

print() # newline after the streamed text
print("conversation_id:", stream.conversation_id)
if stream.reasoning_text:
print("reasoning:", stream.reasoning_text[:120] + "...")
client.close()
4 changes: 3 additions & 1 deletion server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,6 @@ def gen():
except Exception as e:
return _error(f"DeepSeek request failed: {e}")

return completion_response(req.model, reply.text, prompt, reply.conversation_id)
return completion_response(req.model, reply.text, prompt,
reply.conversation_id, reply.reasoning_text,
reply.citations)
Loading