From 2676f5e1e351961b347bc24890622dfd599d5387 Mon Sep 17 00:00:00 2001 From: Igor Ulyanov Date: Sun, 14 Jun 2026 10:02:46 +0300 Subject: [PATCH 1/3] feat: add streaming support for chat completions with SSE event handling --- examples/13-chat-stream.cpp | 34 +++++ examples/CMakeLists.txt | 2 + include/openai/openai.hpp | 252 ++++++++++++++++++++++++++++++++++++ 3 files changed, 288 insertions(+) create mode 100644 examples/13-chat-stream.cpp diff --git a/examples/13-chat-stream.cpp b/examples/13-chat-stream.cpp new file mode 100644 index 0000000..22537ea --- /dev/null +++ b/examples/13-chat-stream.cpp @@ -0,0 +1,34 @@ +#include "openai.hpp" + +#include + +int main() { + openai::start(); + + openai::Json input = R"( + { + "model": "gpt-3.5-turbo", + "messages":[{"role":"user", "content":"Write a short haiku about the sea."}], + "max_tokens": 64, + "temperature": 0 + } + )"_json; + + auto on_chunk = [](const openai::Json& chunk) -> bool { + if (!chunk.contains("choices") || !chunk["choices"].is_array() || chunk["choices"].empty()) { + return true; + } + const auto& choice = chunk["choices"][0]; + if (!choice.contains("delta") || !choice["delta"].is_object()) { + return true; + } + const auto& delta = choice["delta"]; + if (delta.contains("content") && delta["content"].is_string()) { + std::cout << delta["content"].get() << std::flush; + } + return true; + }; + + auto final_response = openai::chat().createStream(input, on_chunk); + std::cout << "\n\nFinal aggregated response:\n" << final_response.dump(2) << '\n'; +} diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index a590434..50131ee 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -38,6 +38,7 @@ set (SOURCES_EXAMPLES 10-chat 11-audio 12-moderation + 13-chat-stream ) set (TARGETS_EXAMPLES @@ -53,6 +54,7 @@ set (TARGETS_EXAMPLES 10-chat 11-audio 12-moderation + 13-chat-stream ) foreach( name ${TARGETS_EXAMPLES} ) diff --git a/include/openai/openai.hpp b/include/openai/openai.hpp index 16a65c9..ac9bd8a 100644 --- a/include/openai/openai.hpp +++ b/include/openai/openai.hpp @@ -36,6 +36,7 @@ #include #include #include +#include #ifndef CURL_STATICLIB #include @@ -114,6 +115,8 @@ class Session { Response postPrepare(const std::string& contentType = ""); Response deletePrepare(); Response makeRequest(const std::string& contentType = ""); + Response makeStreamRequest(std::function on_sse_event, + const std::string& contentType = "application/json"); std::string easyEscape(const std::string& text); private: @@ -122,6 +125,64 @@ class Session { return size * nmemb; } + struct StreamCtx { + std::function on_sse_event; + std::string buffer; + bool aborted = false; + }; + + static size_t writeStreamFunction(void* ptr, size_t size, size_t nmemb, void* userdata) { + size_t total = size * nmemb; + StreamCtx* ctx = static_cast(userdata); + if (ctx->aborted) { + return 0; // abort transfer + } + ctx->buffer.append((char*)ptr, total); + + // SSE events are separated by \n\n (or \r\n\r\n). Process all complete events. + while (true) { + size_t sep = ctx->buffer.find("\n\n"); + size_t sep_len = 2; + if (sep == std::string::npos) { + size_t rsep = ctx->buffer.find("\r\n\r\n"); + if (rsep == std::string::npos) break; + sep = rsep; + sep_len = 4; + } + std::string event = ctx->buffer.substr(0, sep); + ctx->buffer.erase(0, sep + sep_len); + + // An event may contain multiple lines; collect all data: lines. + std::string data_payload; + size_t pos = 0; + while (pos < event.size()) { + size_t eol = event.find('\n', pos); + std::string line = (eol == std::string::npos) + ? event.substr(pos) + : event.substr(pos, eol - pos); + if (!line.empty() && line.back() == '\r') line.pop_back(); + if (line.rfind("data:", 0) == 0) { + std::string v = line.substr(5); + if (!v.empty() && v.front() == ' ') v.erase(0, 1); + if (!data_payload.empty()) data_payload += "\n"; + data_payload += v; + } + if (eol == std::string::npos) break; + pos = eol + 1; + } + + if (data_payload.empty()) continue; + if (data_payload == "[DONE]") continue; + + if (!ctx->on_sse_event(data_payload)) { + ctx->aborted = true; + return 0; // abort transfer + } + } + + return total; + } + private: CURL* curl_; CURLcode res_; @@ -234,6 +295,49 @@ inline Response Session::makeRequest(const std::string& contentType) { return { response_string, is_error, error_msg }; } +inline Response Session::makeStreamRequest(std::function on_sse_event, + const std::string& contentType) { + std::lock_guard lock(mutex_request_); + + struct curl_slist* headers = NULL; + if (!contentType.empty()) { + headers = curl_slist_append(headers, std::string{"Content-Type: " + contentType}.c_str()); + } + headers = curl_slist_append(headers, std::string{"Authorization: Bearer " + token_}.c_str()); + if (!organization_.empty()) { + headers = curl_slist_append(headers, std::string{"OpenAI-Organization: " + organization_}.c_str()); + } + if (!beta_.empty()) { + headers = curl_slist_append(headers, std::string{"OpenAI-Beta: " + beta_}.c_str()); + } + headers = curl_slist_append(headers, "Accept: text/event-stream"); + curl_easy_setopt(curl_, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(curl_, CURLOPT_URL, url_.c_str()); + + StreamCtx ctx; + ctx.on_sse_event = std::move(on_sse_event); + + curl_easy_setopt(curl_, CURLOPT_WRITEFUNCTION, writeStreamFunction); + curl_easy_setopt(curl_, CURLOPT_WRITEDATA, &ctx); + + res_ = curl_easy_perform(curl_); + + bool is_error = false; + std::string error_msg{}; + if (res_ != CURLE_OK && !ctx.aborted) { + is_error = true; + error_msg = "OpenAI curl_easy_perform() failed: " + std::string{curl_easy_strerror(res_)}; + if (throw_exception_) { + throw std::runtime_error(error_msg); + } + else { + std::cerr << error_msg << '\n'; + } + } + + return { std::string{}, is_error, error_msg }; +} + inline std::string Session::easyEscape(const std::string& text) { char *encoded_output = curl_easy_escape(curl_, text.c_str(), static_cast(text.length())); const auto str = std::string{ encoded_output }; @@ -323,6 +427,10 @@ struct CategoryCompletion { // Given a prompt, the model will return one or more predicted chat completions. struct CategoryChat { Json create(Json input); + // Streaming variant: sets "stream": true, calls on_chunk for every SSE event + // (with the parsed chunk JSON). Returning false from on_chunk aborts the stream. + // Returns a final aggregated JSON with the same shape as create() (choices[0].message.*). + Json createStream(Json input, std::function on_chunk); CategoryChat(OpenAI& openai) : openai_{openai} {} @@ -515,6 +623,22 @@ class OpenAI { return post(suffix, json.dump(), contentType); } + // Streaming POST: sends body, calls on_event for each SSE data payload (raw string). + // Return false from on_event to abort the stream. + void postStream(const std::string& suffix, const std::string& data, + std::function on_event, + const std::string& contentType = "application/json") { + setParameters(suffix, data, contentType); + auto response = session_.makeStreamRequest(std::move(on_event), contentType); + if (response.is_error) { trigger_error(response.error_message); } + } + + void postStream(const std::string& suffix, const Json& json, + std::function on_event, + const std::string& contentType = "application/json") { + postStream(suffix, json.dump(), std::move(on_event), contentType); + } + Json del(const std::string& suffix) { setParameters(suffix, ""); auto response = session_.deletePrepare(); @@ -885,6 +1009,134 @@ inline Json CategoryChat::create(Json input) { return openai_.post("chat/completions", input); } +// POST https://api.openai.com/v1/chat/completions with "stream": true. +// Aggregates streamed deltas into a final JSON object with the same shape as create(). +inline Json CategoryChat::createStream(Json input, std::function on_chunk) { + input["stream"] = true; + + // Aggregated final response (filled from deltas). + Json final_response = { + {"choices", Json::array({ + Json{ + {"index", 0}, + {"message", { + {"role", "assistant"}, + {"content", ""} + }}, + {"finish_reason", nullptr} + } + })} + }; + auto& agg_msg = final_response["choices"][0]["message"]; + std::string content_acc; + std::string reasoning_acc; + // tool_calls aggregated by index + std::map tool_calls_by_index; + bool stream_error = false; + std::string stream_error_msg; + + auto on_event = [&](const std::string& payload) -> bool { + Json chunk; + try { + chunk = Json::parse(payload); + } catch (...) { + return true; // ignore non-JSON payloads + } + + // Server-side error reported mid-stream. + if (chunk.is_object() && chunk.contains("error")) { + stream_error = true; + try { + if (chunk["error"].is_object() && chunk["error"].contains("message")) { + stream_error_msg = chunk["error"]["message"].get(); + } else { + stream_error_msg = chunk["error"].dump(); + } + } catch (...) {} + final_response["error"] = chunk["error"]; + on_chunk(chunk); + return false; + } + + if (!on_chunk(chunk)) return false; + + if (!chunk.contains("choices") || !chunk["choices"].is_array() || chunk["choices"].empty()) { + return true; + } + const auto& choice = chunk["choices"][0]; + if (choice.contains("finish_reason") && !choice["finish_reason"].is_null()) { + final_response["choices"][0]["finish_reason"] = choice["finish_reason"]; + } + if (!choice.contains("delta") || !choice["delta"].is_object()) { + return true; + } + const auto& delta = choice["delta"]; + if (delta.contains("role") && delta["role"].is_string()) { + agg_msg["role"] = delta["role"]; + } + if (delta.contains("content") && delta["content"].is_string()) { + content_acc += delta["content"].get(); + } + if (delta.contains("reasoning_content") && delta["reasoning_content"].is_string()) { + reasoning_acc += delta["reasoning_content"].get(); + } + if (delta.contains("annotations") && delta["annotations"].is_array()) { + agg_msg["annotations"] = delta["annotations"]; + } + if (delta.contains("tool_calls") && delta["tool_calls"].is_array()) { + for (const auto& tc : delta["tool_calls"]) { + int idx = tc.contains("index") && tc["index"].is_number_integer() + ? tc["index"].get() + : (int)tool_calls_by_index.size(); + auto& acc = tool_calls_by_index[idx]; + if (acc.is_null()) { + acc = Json::object(); + acc["type"] = "function"; + acc["function"] = Json::object(); + acc["function"]["name"] = ""; + acc["function"]["arguments"] = ""; + } + if (tc.contains("id") && tc["id"].is_string()) { + acc["id"] = tc["id"]; + } + if (tc.contains("type") && tc["type"].is_string()) { + acc["type"] = tc["type"]; + } + if (tc.contains("function") && tc["function"].is_object()) { + const auto& f = tc["function"]; + if (f.contains("name") && f["name"].is_string()) { + acc["function"]["name"] = acc["function"]["name"].get() + + f["name"].get(); + } + if (f.contains("arguments") && f["arguments"].is_string()) { + acc["function"]["arguments"] = acc["function"]["arguments"].get() + + f["arguments"].get(); + } + } + } + } + return true; + }; + + openai_.postStream("chat/completions", input, on_event); + + agg_msg["content"] = content_acc; + if (!reasoning_acc.empty()) { + agg_msg["reasoning_content"] = reasoning_acc; + } + if (!tool_calls_by_index.empty()) { + Json arr = Json::array(); + for (auto& kv : tool_calls_by_index) { + arr.push_back(kv.second); + } + agg_msg["tool_calls"] = arr; + } + if (stream_error && !final_response.contains("error")) { + final_response["error"] = { {"message", stream_error_msg} }; + } + return final_response; +} + // POST https://api.openai.com/v1/audio/transcriptions // Transcribes audio into the input language. inline Json CategoryAudio::transcribe(Json input) { From 1e7762162ec99616602594bba8ead2d438c04143 Mon Sep 17 00:00:00 2001 From: Igor Ulyanov Date: Sun, 14 Jun 2026 15:49:13 +0300 Subject: [PATCH 2/3] feat: add streaming documentation --- doc/streaming/README.md | 149 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 doc/streaming/README.md diff --git a/doc/streaming/README.md b/doc/streaming/README.md new file mode 100644 index 0000000..87856d4 --- /dev/null +++ b/doc/streaming/README.md @@ -0,0 +1,149 @@ +# Streaming support for Chat Completions + +This document describes the streaming entry point added to the C++ OpenAI +wrapper: `openai::chat().createStream(input, on_chunk)`. It mirrors the +existing `create()` API but delivers incremental deltas to a user-supplied +callback as they arrive, and returns a final aggregated JSON object with the +same shape as the non-streaming call. + +A self-contained example is included as +[`examples/13-chat-stream.cpp`](../../examples/13-chat-stream.cpp). + +## Summary + +This adds support for OpenAI's Server-Sent Events (SSE) streaming on the +`chat/completions` endpoint. The new entry point lets you consume tokens as +they are produced by the model and still get a fully aggregated response at +the end, with no changes to existing APIs. + +## What's new + +- `CategoryChat::createStream(Json input, std::function on_chunk)` + - Forces `"stream": true` on the request. + - Invokes `on_chunk` once per parsed SSE event (the raw chunk JSON as sent + by the server). + - Returning `false` from `on_chunk` aborts the transfer cleanly (libcurl + write callback returns 0). + - Aggregates `delta.content`, `delta.reasoning_content`, `delta.tool_calls` + (merged by `index`, with `function.name` / `function.arguments` + concatenated), `delta.annotations` (latest value wins), `role` and + `finish_reason` into a final response shaped like a regular + `chat/completions` reply (`choices[0].message.*`). + - Mid-stream `error` objects are first forwarded to the callback, then + surfaced on the returned JSON, and stop the stream. + - Payloads that fail to parse as JSON are silently skipped; the terminal + `[DONE]` marker is filtered out and never reaches `on_chunk`. +- Low-level plumbing in `Session` / `OpenAI`: + - `Session::makeStreamRequest` — SSE-aware write callback that buffers + partial chunks and dispatches complete `data:` payloads (handles + multi-line events, `\r\n` line endings, and the terminal `[DONE]` + marker). + - `OpenAI::postStream` — streaming counterpart to `post`, sends the body + and forwards SSE payloads to a user callback. +- New example: `examples/13-chat-stream.cpp` (registered in + `examples/CMakeLists.txt`), prints tokens to `stdout` as they arrive and + then dumps the aggregated final response. + +## API + +```cpp +openai::Json createStream(openai::Json input, + std::function on_chunk); +``` + +- `input` — same JSON body you'd pass to `create()`; `stream: true` is set + automatically. +- `on_chunk` — called for every streamed event with the parsed chunk JSON. + Return `true` to keep going, `false` to abort. +- Return value — an aggregated final JSON (same shape as `create()`'s + response), populated from the streamed deltas; on errors, includes an + `error` field. + +## Field-by-field aggregation + +How each field in `delta` is folded into the final `choices[0].message`: + +- `content`, `reasoning_content` — **concatenated** in arrival order. +- `tool_calls` — merged by `index` into a single array, ordered by `index` + ascending: + - `function.arguments` and `function.name` — **concatenated** (the server + streams `arguments` as a JSON string in fragments; it only becomes valid + JSON once the stream ends). + - `id`, `type` — **last value wins** (`type` defaults to `"function"` if + the server never sends it). +- `annotations` — **last value wins** (replaced wholesale by the most + recent delta). +- `role`, `finish_reason` — **last non-empty value wins** (`role` defaults + to `"assistant"`). + +Note: this aggregation does **not** change the model's output relative to a +non-streaming `create()` call. The resulting `choices[0].message.*` is +equivalent to what `create()` would return for the same request. The +practical differences are: + +- Top-level response fields that the server only sends with the final + non-streaming reply (`id`, `created`, `model`, `usage`, + `system_fingerprint`, ...) are **not** present on the aggregated object — + the streaming protocol does not include them. +- Only `choices[0]` is aggregated. If you request `n > 1`, the additional + choices are still delivered to `on_chunk` as they arrive, but they will + not appear in the returned aggregated JSON. + +## Callback contract + +Things worth knowing about the callback contract: + +- `createStream` is **synchronous**: it blocks until the stream ends (or the + callback returns `false`). Capturing locals by reference is therefore safe + — they're guaranteed to outlive the call. +- The callback runs on the **calling thread**, from inside libcurl's write + callback. Don't issue another request on the same `OpenAI` instance from + within the callback — the session is guarded by a mutex and would + deadlock. +- Returning `false` is the clean way to cancel; libcurl will report the + aborted transfer, and `createStream` will still return the partially + aggregated response. +- Exceptions thrown from the callback will propagate up through libcurl; + prefer catching inside the callback if you need fine-grained control. + +## Usage example + +```cpp +#include "openai.hpp" +#include + +int main() { + openai::start(); + + openai::Json input = R"({ + "model": "gpt-3.5-turbo", + "messages":[{"role":"user","content":"Write a short haiku about the sea."}], + "max_tokens": 64, + "temperature": 0 + })"_json; + + std::string collected; + auto on_chunk = [&](const openai::Json& chunk) -> bool { + const auto& choices = chunk.value("choices", openai::Json::array()); + if (choices.empty()) return true; + const auto& delta = choices[0].value("delta", openai::Json::object()); + if (delta.contains("content") && delta["content"].is_string()) { + const auto piece = delta["content"].get(); + std::cout << piece << std::flush; + collected += piece; // mutate caller's local state, no globals + } + return true; // return false to cancel mid-stream + }; + + auto final_response = openai::chat().createStream(input, on_chunk); + std::cout << "\n\nFinal aggregated response:\n" + << final_response.dump(2) << '\n'; +} +``` + +## Backward compatibility + +- Purely additive. No existing function signatures, types, or behavior are + changed. +- Non-streaming `create()` is untouched. +- SSE parsing is implemented on top of the existing libcurl integration. From 670b5abeababef048d2bd2e58b374baec6bf9e56 Mon Sep 17 00:00:00 2001 From: Igor Ulyanov Date: Sun, 14 Jun 2026 20:28:20 +0300 Subject: [PATCH 3/3] fix: preserve top-level request fields in streaming result to match non-streaming request behaviour --- doc/streaming/README.md | 10 ++++++---- include/openai/openai.hpp | 14 ++++++++++++++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/doc/streaming/README.md b/doc/streaming/README.md index 87856d4..b49ceec 100644 --- a/doc/streaming/README.md +++ b/doc/streaming/README.md @@ -81,10 +81,12 @@ non-streaming `create()` call. The resulting `choices[0].message.*` is equivalent to what `create()` would return for the same request. The practical differences are: -- Top-level response fields that the server only sends with the final - non-streaming reply (`id`, `created`, `model`, `usage`, - `system_fingerprint`, ...) are **not** present on the aggregated object — - the streaming protocol does not include them. +- Top-level response fields that the server sends on streamed chunks + (`id`, `created`, `model`, `usage`, `system_fingerprint`, and provider- + specific fields such as Perplexity Sonar's `citations` / `search_results`) + are forwarded into the aggregated object on a **last non-null value wins** + basis. Fields that the server only emits with the final non-streaming + reply and never includes in any streamed chunk will still be absent. - Only `choices[0]` is aggregated. If you request `n > 1`, the additional choices are still delivered to `on_chunk` as they arrive, but they will not appear in the returned aggregated JSON. diff --git a/include/openai/openai.hpp b/include/openai/openai.hpp index ac9bd8a..3f4e7e6 100644 --- a/include/openai/openai.hpp +++ b/include/openai/openai.hpp @@ -1060,6 +1060,20 @@ inline Json CategoryChat::createStream(Json input, std::function