
EdgeQuake Python SDK - Streaming Guide

Complete guide to Server-Sent Events (SSE) streaming in the EdgeQuake Python SDK.

Table of Contents

  • What is Streaming?
  • Query Streaming
  • Chat Streaming
  • Error Handling
  • Advanced Patterns
  • Performance Considerations
  • Troubleshooting
  • See Also

What is Streaming?

Streaming allows EdgeQuake to send responses incrementally as tokens are generated by the LLM, rather than waiting for the entire response to complete. This enables:

  • Real-time UI updates — Display text as it's generated
  • Better UX — Users see progress instead of a loading spinner
  • Lower perceived latency — First tokens arrive quickly
  • Cancellation support — Stop generation mid-stream (see the sketch below)
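
A minimal cancellation sketch: breaking out of the consuming loop stops reading the stream. Whether the SDK also closes the underlying HTTP connection on break is an assumption here; calling close() on the stream object, if it exposes one, is the safe pattern.

import sys
from edgequake import EdgequakeClient

client = EdgequakeClient(api_key="your-key")

stream = client.query.stream(query="Write a very long story")
try:
    for i, chunk in enumerate(stream):
        text = chunk.get("chunk", "") if isinstance(chunk, dict) else chunk
        sys.stdout.write(text)
        sys.stdout.flush()
        if i >= 100:  # arbitrary cutoff for this example
            break     # stop generation mid-stream
finally:
    if hasattr(stream, "close"):
        stream.close()  # assumption: generator-style streams support close()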

HTTP Protocol

Streaming uses Server-Sent Events (SSE) over HTTP:

GET /query/stream?query=What+is+EdgeQuake HTTP/1.1
Host: localhost:8080
Accept: text/event-stream

HTTP/1.1 200 OK
Content-Type: text/event-stream

data: {"chunk":"Edge","type":"token"}

data: {"chunk":"Quake","type":"token"}

data: {"chunk":" is","type":"token"}

data: {"type":"done"}
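
For illustration only, the wire format above can be consumed without the SDK. This sketch uses the third-party requests library and assumes the endpoint URL and JSON event shapes shown in the example; your deployment's address and authentication may differ.

import json
import requests

resp = requests.get(
    "http://localhost:8080/query/stream",
    params={"query": "What is EdgeQuake"},
    headers={"Accept": "text/event-stream"},
    stream=True,  # keep the connection open and read as bytes arrive
)
for line in resp.iter_lines(decode_unicode=True):
    if not line or not line.startswith("data: "):
        continue  # skip blank separator lines between events
    event = json.loads(line[len("data: "):])
    if event.get("type") == "done":
        break
    print(event.get("chunk", ""), end="", flush=True)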

Query Streaming

Basic Streaming Query

import sys
from edgequake import EdgequakeClient

client = EdgequakeClient(api_key="your-key")

# Stream query results
for chunk in client.query.stream(query="What are knowledge graphs?"):
    if isinstance(chunk, dict) and "chunk" in chunk:
        sys.stdout.write(chunk["chunk"])
        sys.stdout.flush()
    elif isinstance(chunk, str):
        sys.stdout.write(chunk)
        sys.stdout.flush()

print()  # Newline after streaming

Hybrid Mode Streaming

# Stream with hybrid retrieval mode
for chunk in client.query.stream(
    query="Explain how RAG enhances LLMs",
    mode="hybrid",
    top_k=10
):
    if isinstance(chunk, dict):
        # Structured chunk
        if chunk.get("type") == "token":
            print(chunk["chunk"], end="", flush=True)
        elif chunk.get("type") == "source":
            print(f"\n[Source: {chunk['source']}]", flush=True)
    else:
        # Plain text chunk
        print(chunk, end="", flush=True)

Collecting Full Response

# Collect all chunks into full response
chunks = []
for chunk in client.query.stream(query="What is EdgeQuake?"):
    if isinstance(chunk, dict) and "chunk" in chunk:
        chunks.append(chunk["chunk"])
    elif isinstance(chunk, str):
        chunks.append(chunk)

full_response = "".join(chunks)
print(full_response)

Chat Streaming

Basic Chat Streaming

import sys
from edgequake import EdgequakeClient

client = EdgequakeClient(api_key="your-key")

# OpenAI-compatible streaming
for chunk in client.chat.stream(
    model="edgequake",
    messages=[
        {"role": "user", "content": "What are the benefits of graph-based RAG?"}
    ]
):
    if isinstance(chunk, dict):
        delta = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
        if delta:
            sys.stdout.write(delta)
            sys.stdout.flush()

print()

Multi-Turn Conversation Streaming

import sys

messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is EdgeQuake?"}
]

# Stream first response
response_text = ""
for chunk in client.chat.stream(model="edgequake", messages=messages):
    if not isinstance(chunk, dict):
        continue  # defensive: skip any non-dict chunks
    delta = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
    if delta:
        response_text += delta
        sys.stdout.write(delta)
        sys.stdout.flush()

print()

# Add assistant's response to conversation
messages.append({"role": "assistant", "content": response_text})

# Continue conversation
messages.append({"role": "user", "content": "How does it work?"})

for chunk in client.chat.stream(model="edgequake", messages=messages):
    if not isinstance(chunk, dict):
        continue  # defensive: skip any non-dict chunks
    delta = chunk.get("choices", [{}])[0].get("delta", {}).get("content")
    if delta:
        sys.stdout.write(delta)
        sys.stdout.flush()

print()

Error Handling

Connection Errors

from edgequake.exceptions import NetworkError, TimeoutError

try:
    for chunk in client.query.stream(query="What is RAG?"):
        print(chunk, end="", flush=True)
except NetworkError as e:
    print(f"\n[Connection lost: {e}]")
    # Option 1: Retry from beginning
    for chunk in client.query.stream(query="What is RAG?"):
        print(chunk, end="", flush=True)
except TimeoutError:
    print("\n[Stream timeout]")

Incomplete Streams

import time

timeout_seconds = 60
start_time = time.time()

try:
    for chunk in client.query.stream(query="Write a long essay..."):
        if time.time() - start_time > timeout_seconds:
            print("\n[Stream timeout — partial response received]")
            break
        print(chunk, end="", flush=True)
except Exception as e:
    print(f"\n[Stream error: {e}]")

Graceful Degradation

def query_with_fallback(client, query):
    """Try streaming, fall back to non-streaming on error."""
    try:
        # Try streaming first
        response = ""
        for chunk in client.query.stream(query=query):
            if isinstance(chunk, str):
                response += chunk
            elif isinstance(chunk, dict) and "chunk" in chunk:
                response += chunk["chunk"]
        return response
    except Exception as e:
        print(f"[Streaming failed: {e}, using synchronous mode]")
        # Fall back to synchronous query
        result = client.query.execute(query=query)
        return result["answer"]

answer = query_with_fallback(client, "What is EdgeQuake?")
print(answer)

Advanced Patterns

Progress Tracking

import sys
import time

total_tokens = 0
start_time = time.time()

for chunk in client.query.stream(query="Explain knowledge graphs in detail"):
    if isinstance(chunk, dict) and "chunk" in chunk:
        total_tokens += 1  # approximation: each chunk counted as one token
        sys.stdout.write(chunk["chunk"])
        sys.stdout.flush()

        # Show progress every 10 tokens
        if total_tokens % 10 == 0:
            elapsed = time.time() - start_time
            tokens_per_sec = total_tokens / elapsed
            print(f"\n[{total_tokens} tokens, {tokens_per_sec:.1f} tokens/sec]", flush=True)

print(f"\n\nTotal: {total_tokens} tokens in {time.time() - start_time:.2f}s")

Buffered Streaming (Word-by-Word)

import sys

buffer = ""
for chunk in client.query.stream(query="What is RAG?"):
    if isinstance(chunk, dict) and "chunk" in chunk:
        buffer += chunk["chunk"]
    elif isinstance(chunk, str):
        buffer += chunk

    # Flush complete words on a space or newline boundary
    if " " in buffer or "\n" in buffer:
        boundary = max(buffer.rfind(" "), buffer.rfind("\n")) + 1
        sys.stdout.write(buffer[:boundary])
        sys.stdout.flush()
        buffer = buffer[boundary:]  # keep any partial word for the next chunk

# Flush remaining
if buffer:
    sys.stdout.write(buffer)
    sys.stdout.flush()

print()

Async Streaming (AsyncIO)

import asyncio
from edgequake import AsyncEdgequakeClient

async def stream_query_async():
    client = AsyncEdgequakeClient(api_key="your-key")

    async for chunk in client.query.stream(query="What is EdgeQuake?"):
        if isinstance(chunk, dict) and "chunk" in chunk:
            print(chunk["chunk"], end="", flush=True)
        await asyncio.sleep(0)  # Yield control

    print()

asyncio.run(stream_query_async())
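
The async client also makes it easy to run several streams concurrently. A sketch, assuming a single AsyncEdgequakeClient instance tolerates concurrent use:

import asyncio
from edgequake import AsyncEdgequakeClient

async def collect(client, query):
    # Gather one stream into a single string (chunk shapes as above)
    parts = []
    async for chunk in client.query.stream(query=query):
        if isinstance(chunk, dict) and "chunk" in chunk:
            parts.append(chunk["chunk"])
        elif isinstance(chunk, str):
            parts.append(chunk)
    return "".join(parts)

async def main():
    client = AsyncEdgequakeClient(api_key="your-key")
    # Run two queries concurrently; wall time is roughly that of the slower stream
    answers = await asyncio.gather(
        collect(client, "What is EdgeQuake?"),
        collect(client, "What is RAG?"),
    )
    for answer in answers:
        print(answer, "\n---")

asyncio.run(main())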

Stream to File

with open("response.txt", "w") as f:
    for chunk in client.query.stream(query="Explain RAG"):
        if isinstance(chunk, str):
            f.write(chunk)
            f.flush()
        elif isinstance(chunk, dict) and "chunk" in chunk:
            f.write(chunk["chunk"])
            f.flush()

print("Response saved to response.txt")

Real-Time UI Update (Flask Example)

from flask import Flask, Response, stream_with_context
from edgequake import EdgequakeClient

app = Flask(__name__)
client = EdgequakeClient(api_key="your-key")

@app.route("/stream")
def stream_endpoint():
    def generate():
        for chunk in client.query.stream(query="What is EdgeQuake?"):
            if isinstance(chunk, str):
                yield f"data: {chunk}\n\n"
            elif isinstance(chunk, dict) and "chunk" in chunk:
                yield f"data: {chunk['chunk']}\n\n"
        yield "data: [DONE]\n\n"

    return Response(stream_with_context(generate()), mimetype="text/event-stream")

# Client-side JavaScript:
# const eventSource = new EventSource('/stream');
# eventSource.onmessage = (event) => {
#     document.getElementById('output').textContent += event.data;
# };

Performance Considerations

1. Network Latency

Streaming is most beneficial when network latency is low:

import time

# Measure first token latency
start = time.time()
first_chunk_received = False

for chunk in client.query.stream(query="What is RAG?"):
    if not first_chunk_received:
        first_token_latency = time.time() - start
        print(f"[First token: {first_token_latency:.3f}s]")
        first_chunk_received = True
    print(chunk, end="", flush=True)

Expected latencies:

  • Local (localhost): 10-50ms
  • Same datacenter: 50-200ms
  • Cross-region: 200-500ms

2. Buffering

Python's sys.stdout is line-buffered when attached to a terminal (and block-buffered when piped), so tokens may not appear immediately. Use flush=True:

# ❌ Bad: May not show tokens immediately
print(chunk, end="")

# ✅ Good: Forces immediate output
print(chunk, end="", flush=True)

# Or:
import sys
sys.stdout.write(chunk)
sys.stdout.flush()

3. Token Generation Speed

Streaming throughput depends on LLM provider:

Provider          Tokens/sec
Ollama (local)    10-50
OpenAI GPT-4      30-80
OpenAI GPT-3.5    80-150

4. Connection Timeouts

# Increase timeout for long-running streams
client = EdgequakeClient(
    api_key="your-key",
    timeout=300  # 5 minutes
)

for chunk in client.query.stream(query="Write a detailed essay..."):
    print(chunk, end="", flush=True)

5. Memory Usage

Streaming uses constant memory (unlike buffering the entire response):

import sys

# ✅ Good: Constant memory
for chunk in client.query.stream(query="Long essay"):
    sys.stdout.write(chunk)

# ❌ Bad: Stores entire response in memory
chunks = list(client.query.stream(query="Long essay"))
full_text = "".join(chunks)

Troubleshooting

No Output

Problem: Stream appears frozen

Solution: Ensure output is flushed:

import sys
for chunk in client.query.stream(query="..."):
    sys.stdout.write(chunk)
    sys.stdout.flush()  # Force flush

Broken Pipe Errors

Problem: BrokenPipeError: [Errno 32] Broken pipe

Solution: The connection (or output pipe) was closed mid-stream; retry the request:

from edgequake.exceptions import NetworkError

max_retries = 3
for attempt in range(max_retries):
    try:
        for chunk in client.query.stream(query="..."):
            print(chunk, end="", flush=True)
        break  # Success
    except NetworkError:
        if attempt < max_retries - 1:
            print("\n[Retrying...]")
            continue
        raise

Slow Streaming

Problem: Tokens arrive slowly

Solutions:

  1. Check network latency: ping your-api-server
  2. Increase buffer size in SDK (if configurable)
  3. Use faster LLM provider
  4. Reduce top_k to retrieve less context (see the sketch below)
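
For point 4, a sketch reusing the top_k parameter shown earlier under Hybrid Mode Streaming (the value 3 here is arbitrary):

# Smaller top_k means less context to retrieve and rank before generation
for chunk in client.query.stream(query="What is RAG?", top_k=3):
    print(chunk, end="", flush=True)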

JSON Parsing Errors

Problem: JSONDecodeError while parsing chunks

Solution: Handle both JSON and plain text:

import json

for chunk in client.query.stream(query="..."):
    try:
        parsed = json.loads(chunk) if isinstance(chunk, str) else chunk
        # Guard: json.loads may return a non-dict value (e.g. a bare string)
        text = parsed.get("chunk", "") if isinstance(parsed, dict) else str(parsed)
        print(text, end="", flush=True)
    except json.JSONDecodeError:
        # Plain text chunk
        print(chunk, end="", flush=True)

See Also