Complete guide to Server-Sent Events (SSE) streaming in the EdgeQuake Python SDK.
- What is Streaming?
- Query Streaming
- Chat Streaming
- Error Handling
- Advanced Patterns
- Performance Considerations

## What is Streaming?

Streaming allows EdgeQuake to send responses incrementally as tokens are generated by the LLM, rather than waiting for the entire response to complete. This enables:
- Real-time UI updates — Display text as it's generated
- Better UX — Users see progress instead of loading spinner
- Lower perceived latency — First tokens arrive quickly
- Cancellation support — Stop generation mid-stream
Streaming uses Server-Sent Events (SSE) over HTTP:
```http
GET /query/stream?query=What+is+EdgeQuake HTTP/1.1
Host: localhost:8080
Accept: text/event-stream
```

```http
HTTP/1.1 200 OK
Content-Type: text/event-stream

data: {"chunk":"Edge","type":"token"}

data: {"chunk":"Quake","type":"token"}

data: {"chunk":" is","type":"token"}

data: {"type":"done"}
```
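
Under the hood, a streamed response is a long-lived HTTP response whose body is parsed line by line. The following sketch consumes the endpoint above directly with `requests` (URL and event shape are taken from the example; in normal use the SDK's `stream()` helpers do this parsing for you):

```python
import json

import requests

# Open the SSE stream and parse each "data:" line as it arrives.
with requests.get(
    "http://localhost:8080/query/stream",
    params={"query": "What is EdgeQuake"},
    headers={"Accept": "text/event-stream"},
    stream=True,
) as resp:
    for line in resp.iter_lines(decode_unicode=True):
        if not line or not line.startswith("data: "):
            continue  # skip blank lines between events
        event = json.loads(line[len("data: "):])
        if event.get("type") == "done":
            break
        print(event.get("chunk", ""), end="", flush=True)
print()
```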

## Query Streaming

```python
import sys
from edgequake import EdgequakeClient

client = EdgequakeClient(api_key="your-key")
# Stream query results
for chunk in client.query.stream(query="What are knowledge graphs?"):
    if isinstance(chunk, dict) and "chunk" in chunk:
        sys.stdout.write(chunk["chunk"])
        sys.stdout.flush()
    elif isinstance(chunk, str):
        sys.stdout.write(chunk)
        sys.stdout.flush()
print()  # Newline after streaming
```
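
Because the stream is an ordinary Python iterator, you can cancel generation mid-stream (as listed under the benefits above) simply by breaking out of the loop. A minimal sketch; whether the server stops generating immediately depends on how the SDK closes the underlying connection:

```python
# Stop streaming once enough text has been collected.
collected = ""
for chunk in client.query.stream(query="Explain knowledge graphs"):
    if isinstance(chunk, dict) and "chunk" in chunk:
        collected += chunk["chunk"]
    elif isinstance(chunk, str):
        collected += chunk
    if len(collected) >= 500:  # arbitrary cutoff for illustration
        break
print(collected)
```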
```python
# Stream with hybrid retrieval mode
for chunk in client.query.stream(
    query="Explain how RAG enhances LLMs",
    mode="hybrid",
    top_k=10
):
    if isinstance(chunk, dict):
        # Structured chunk
        if chunk.get("type") == "token":
            print(chunk["chunk"], end="", flush=True)
        elif chunk.get("type") == "source":
            print(f"\n[Source: {chunk['source']}]", flush=True)
    else:
        # Plain text chunk
        print(chunk, end="", flush=True)
```
```python
# Collect all chunks into full response
chunks = []
for chunk in client.query.stream(query="What is EdgeQuake?"):
    if isinstance(chunk, dict) and "chunk" in chunk:
        chunks.append(chunk["chunk"])
    elif isinstance(chunk, str):
        chunks.append(chunk)
full_response = "".join(chunks)
print(full_response)
```

## Chat Streaming

```python
import sys
from edgequake import EdgequakeClient

client = EdgequakeClient(api_key="your-key")
# OpenAI-compatible streaming
for chunk in client.chat.stream(
    model="edgequake",
    messages=[
        {"role": "user", "content": "What are the benefits of graph-based RAG?"}
    ]
):
    if isinstance(chunk, dict):
        delta = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
        if delta:
            sys.stdout.write(delta)
            sys.stdout.flush()
print()
```
```python
import sys

messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is EdgeQuake?"}
]
# Stream first response
response_text = ""
for chunk in client.chat.stream(model="edgequake", messages=messages):
    delta = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
    if delta:
        response_text += delta
        sys.stdout.write(delta)
        sys.stdout.flush()
print()
# Add assistant's response to conversation
messages.append({"role": "assistant", "content": response_text})
# Continue conversation
messages.append({"role": "user", "content": "How does it work?"})
for chunk in client.chat.stream(model="edgequake", messages=messages):
    delta = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
    if delta:
        sys.stdout.write(delta)
        sys.stdout.flush()
print()
```
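
The per-turn streaming loop above repeats verbatim for every message, so it can be worth wrapping in a small helper. A minimal sketch (the `stream_chat_turn` function is illustrative, not part of the SDK):

```python
def stream_chat_turn(client, messages):
    """Stream one assistant reply, print it as it arrives, and return the full text."""
    reply = ""
    for chunk in client.chat.stream(model="edgequake", messages=messages):
        delta = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
        if delta:
            reply += delta
            print(delta, end="", flush=True)
    print()
    return reply

messages = [{"role": "user", "content": "What is EdgeQuake?"}]
messages.append({"role": "assistant", "content": stream_chat_turn(client, messages)})
messages.append({"role": "user", "content": "How does it work?"})
stream_chat_turn(client, messages)
```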

## Error Handling

```python
from edgequake.exceptions import NetworkError, TimeoutError

try:
    for chunk in client.query.stream(query="What is RAG?"):
        print(chunk, end="", flush=True)
except NetworkError as e:
    print(f"\n[Connection lost: {e}]")
    # Option 1: Retry from beginning
    for chunk in client.query.stream(query="What is RAG?"):
        print(chunk, end="", flush=True)
except TimeoutError:
    print("\n[Stream timeout]")
```
```python
import time

timeout_seconds = 60
start_time = time.time()
try:
    for chunk in client.query.stream(query="Write a long essay..."):
        if time.time() - start_time > timeout_seconds:
            print("\n[Stream timeout — partial response received]")
            break
        print(chunk, end="", flush=True)
except Exception as e:
    print(f"\n[Stream error: {e}]")
```

```python
def query_with_fallback(client, query):
    """Try streaming, fall back to non-streaming on error."""
    try:
        # Try streaming first
        response = ""
        for chunk in client.query.stream(query=query):
            if isinstance(chunk, str):
                response += chunk
            elif isinstance(chunk, dict) and "chunk" in chunk:
                response += chunk["chunk"]
        return response
    except Exception as e:
        print(f"[Streaming failed: {e}, using synchronous mode]")
        # Fall back to synchronous query
        result = client.query.execute(query=query)
        return result["answer"]

answer = query_with_fallback(client, "What is EdgeQuake?")
print(answer)
```

## Advanced Patterns

```python
import sys
import time

total_tokens = 0
start_time = time.time()
for chunk in client.query.stream(query="Explain knowledge graphs in detail"):
    if isinstance(chunk, dict) and "chunk" in chunk:
        total_tokens += 1
        sys.stdout.write(chunk["chunk"])
        sys.stdout.flush()
        # Show progress every 10 tokens
        if total_tokens % 10 == 0:
            elapsed = time.time() - start_time
            tokens_per_sec = total_tokens / elapsed
            print(f"\n[{total_tokens} tokens, {tokens_per_sec:.1f} tokens/sec]", flush=True)

print(f"\n\nTotal: {total_tokens} tokens in {time.time() - start_time:.2f}s")
```
```python
import sys

buffer = ""
for chunk in client.query.stream(query="What is RAG?"):
    if isinstance(chunk, dict) and "chunk" in chunk:
        buffer += chunk["chunk"]
    elif isinstance(chunk, str):
        buffer += chunk
    # Flush on space (word boundary)
    if " " in buffer or "\n" in buffer:
        sys.stdout.write(buffer)
        sys.stdout.flush()
        buffer = ""
# Flush remaining
if buffer:
    sys.stdout.write(buffer)
    sys.stdout.flush()
print()
```
```python
import asyncio

from edgequake import AsyncEdgequakeClient

async def stream_query_async():
    client = AsyncEdgequakeClient(api_key="your-key")
    async for chunk in client.query.stream(query="What is EdgeQuake?"):
        if isinstance(chunk, dict) and "chunk" in chunk:
            print(chunk["chunk"], end="", flush=True)
        await asyncio.sleep(0)  # Yield control to the event loop
    print()

asyncio.run(stream_query_async())
```
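
Because the async client exposes each stream as an async iterator, several queries can also be streamed concurrently. A minimal sketch, assuming a single `AsyncEdgequakeClient` instance can serve concurrent requests:

```python
import asyncio

from edgequake import AsyncEdgequakeClient

async def collect(client, query):
    # Gather one streamed answer into a single string.
    parts = []
    async for chunk in client.query.stream(query=query):
        if isinstance(chunk, dict) and "chunk" in chunk:
            parts.append(chunk["chunk"])
        elif isinstance(chunk, str):
            parts.append(chunk)
    return "".join(parts)

async def main():
    client = AsyncEdgequakeClient(api_key="your-key")
    queries = ["What is EdgeQuake?", "What are knowledge graphs?"]
    # Run both streams concurrently and wait for the full answers.
    answers = await asyncio.gather(*(collect(client, q) for q in queries))
    for query, answer in zip(queries, answers):
        print(f"Q: {query}\nA: {answer}\n")

asyncio.run(main())
```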
```python
with open("response.txt", "w") as f:
    for chunk in client.query.stream(query="Explain RAG"):
        if isinstance(chunk, str):
            f.write(chunk)
            f.flush()
        elif isinstance(chunk, dict) and "chunk" in chunk:
            f.write(chunk["chunk"])
            f.flush()

print("Response saved to response.txt")
```
```python
from flask import Flask, Response, stream_with_context

from edgequake import EdgequakeClient

app = Flask(__name__)
client = EdgequakeClient(api_key="your-key")
@app.route("/stream")
def stream_endpoint():
    def generate():
        for chunk in client.query.stream(query="What is EdgeQuake?"):
            if isinstance(chunk, str):
                yield f"data: {chunk}\n\n"
            elif isinstance(chunk, dict) and "chunk" in chunk:
                yield f"data: {chunk['chunk']}\n\n"
        yield "data: [DONE]\n\n"
    return Response(stream_with_context(generate()), mimetype="text/event-stream")
# Client-side JavaScript:
# const eventSource = new EventSource('/stream');
# eventSource.onmessage = (event) => {
# document.getElementById('output').textContent += event.data;
# };
```

## Performance Considerations

Streaming is most beneficial when network latency is low:
```python
import time

# Measure first token latency
start = time.time()
first_chunk_received = False
for chunk in client.query.stream(query="What is RAG?"):
    if not first_chunk_received:
        first_token_latency = time.time() - start
        print(f"[First token: {first_token_latency:.3f}s]")
        first_chunk_received = True
    print(chunk, end="", flush=True)
```

Expected latencies:
- Local (localhost): 10-50ms
- Same datacenter: 50-200ms
- Cross-region: 200-500ms
Python's `sys.stdout` is line-buffered by default (and fully buffered when redirected), so output that does not end with a newline may not appear immediately. Use `flush=True`:
```python
# ❌ Bad: May not show tokens immediately
print(chunk, end="")
# ✅ Good: Forces immediate output
print(chunk, end="", flush=True)
# Or:
import sys
sys.stdout.write(chunk)
sys.stdout.flush()
```

Streaming throughput depends on the LLM provider:
| Provider | Tokens/sec |
|---|---|
| Ollama (local) | 10-50 |
| OpenAI GPT-4 | 30-80 |
| OpenAI GPT-3.5 | 80-150 |
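
These rates give a rough way to size timeouts before starting a long stream. A minimal sketch with illustrative numbers (not measured values):

```python
# Rough timeout sizing from expected answer length and provider speed.
expected_tokens = 1000   # illustrative answer length
tokens_per_sec = 30      # conservative rate from the table above
safety_factor = 3        # headroom for retrieval and network stalls

timeout_seconds = expected_tokens / tokens_per_sec * safety_factor
print(f"Suggested timeout: {timeout_seconds:.0f}s")  # -> 100s
```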
```python
# Increase timeout for long-running streams
client = EdgequakeClient(
    api_key="your-key",
    timeout=300  # 5 minutes
)
for chunk in client.query.stream(query="Write a detailed essay..."):
    print(chunk, end="", flush=True)
```

Streaming uses constant memory, unlike buffering the entire response:
```python
import sys

# ✅ Good: Constant memory
for chunk in client.query.stream(query="Long essay"):
    sys.stdout.write(chunk)

# ❌ Bad: Stores entire response in memory
chunks = list(client.query.stream(query="Long essay"))
full_text = "".join(chunks)
```

Problem: Stream appears frozen
Solution: Ensure output is flushed:
```python
import sys

for chunk in client.query.stream(query="..."):
    sys.stdout.write(chunk)
    sys.stdout.flush()  # Force flush
```

Problem: `BrokenPipeError: [Errno 32] Broken pipe`
Solution: The connection was lost mid-stream; retry:
```python
from edgequake.exceptions import NetworkError

max_retries = 3
for attempt in range(max_retries):
    try:
        for chunk in client.query.stream(query="..."):
            print(chunk, end="", flush=True)
        break  # Success
    except NetworkError:
        if attempt < max_retries - 1:
            print("\n[Retrying...]")
            continue
        raise
```

Problem: Tokens arrive slowly
Solutions:
- Check network latency: `ping your-api-server`
- Increase the buffer size in the SDK (if configurable)
- Use a faster LLM provider
- Reduce `top_k` to retrieve less context (see the sketch after this list)
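
For the last point, a minimal sketch of the same streaming call with a smaller `top_k` (the value is illustrative; tune it for your corpus):

```python
# Retrieve fewer context chunks so the LLM can start generating sooner.
for chunk in client.query.stream(query="What is RAG?", top_k=3):
    if isinstance(chunk, dict) and "chunk" in chunk:
        print(chunk["chunk"], end="", flush=True)
    elif isinstance(chunk, str):
        print(chunk, end="", flush=True)
print()
```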
Problem: `JSONDecodeError` while parsing chunks
Solution: Handle both JSON and plain text:
```python
import json

for chunk in client.query.stream(query="..."):
    try:
        parsed = json.loads(chunk) if isinstance(chunk, str) else chunk
        print(parsed.get("chunk", ""), end="", flush=True)
    except json.JSONDecodeError:
        # Plain text chunk
        print(chunk, end="", flush=True)
```

See also:
- API Reference: `API.md`
- Authentication: `AUTHENTICATION.md`
- Examples: `examples/streaming_query.py`