BugFix: Bot-343, DataTracks return from subscribe() and read() from rust sdk#119
BugFix: Bot-343, DataTracks return from subscribe() and read() from rust sdk#119stephen-derosa wants to merge 5 commits intolivekit:mainfrom
Conversation
alan-george-lk
left a comment
There was a problem hiding this comment.
Looks great, love the bundled unit test
There was a problem hiding this comment.
Pull request overview
This PR aligns the C++ SDK’s remote DataTrack streaming behavior with the Rust SDK update (rust-sdks#1057) by preserving and surfacing terminal subscription errors delivered via the FFI stream EOS, and by exercising that behavior in unit tests.
Changes:
- Add
DataTrackStream::terminalError()and store an optional terminal subscription error when EOS arrives with an error payload. - Log terminal subscription errors when the data reader thread exits after
read()ends. - Add unit tests validating that normal EOS produces no terminal error, while subscribe-failure EOS records a typed error.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
include/livekit/data_track_stream.h |
Exposes terminalError() and stores an optional terminal subscription error; adds test-only friend access. |
src/data_track_stream.cpp |
Parses EOS error payload from FFI events and persists it as a terminal error; clears it on local close before EOS. |
src/subscription_thread_dispatcher.cpp |
Logs terminal subscription errors after the data read loop ends. |
src/tests/unit/test_data_track_stream.cpp |
New unit tests covering terminal error behavior for normal EOS vs subscribe-failure EOS. |
src/tests/CMakeLists.txt |
Links protobuf for unit tests across platforms (supports protobuf usage in new unit test and static-link scenarios). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
ladvoc
left a comment
There was a problem hiding this comment.
LGTM, just the one question. Also, have you tested this against the Rust branch?
| LK_LOG_ERROR("Data frame callback exception: {}", e.what()); | ||
| } | ||
| } | ||
| const auto &error = stream->terminalError(); |
There was a problem hiding this comment.
question: Is the user able to access the error reason when using the thread dispatcher?
There was a problem hiding this comment.
no in the current implementation
| } | ||
| subscription_handle_.reset(); | ||
| listener_id = listener_id_; | ||
| listener_id_ = 0; |
There was a problem hiding this comment.
just found out this might be a potential bug for double RemoveListener(listener_id);
when a close() is called, it will set listener_id_ = 0;
and you will do
if (listener_id != -1) {
FfiClient::instance().RemoveListener(listener_id);
}
That says, if close() is called twice, we will call FfiClient::instance().RemoveListener(0); which might be wrong
There was a problem hiding this comment.
good callout. listener_id_ should be -1. updated.
And we protect against superfluous close() calls with:
if (closed_) {
return;
}
|
|
||
| std::optional<SubscribeDataTrackError> DataTrackStream::terminalError() const { | ||
| const std::scoped_lock<std::mutex> lock(mutex_); | ||
| return terminal_error_; |
There was a problem hiding this comment.
Yes, intentional. terminalError() is meant to be readable as post-stream state. If the stream has already reached EOS with an FFI-reported terminal error, close() preserves it. If close() is called locally before EOS, we clear terminal_error_ because the stream ended by local cancellation, not by a remote/FFI terminal failure.
I added a small comment here
xianshijing-lk
left a comment
There was a problem hiding this comment.
lgtm if you address the comments
… unpublishes before subscribing, then asserts the stream ends via read() == false with terminalError().code == SubscribeDataTrackErrorCode::UNPUBLISHED and a non-empty message.
| livekit | ||
| spdlog::spdlog | ||
| $<$<PLATFORM_ID:Windows>:${LIVEKIT_PROTOBUF_TARGET}> | ||
| ${LIVEKIT_PROTOBUF_TARGET} |
There was a problem hiding this comment.
not sure why copilot thought this needed
| const auto &error = stream->terminalError(); | ||
| if (error.has_value()) { | ||
| LK_LOG_ERROR( | ||
| "Data reader stream ended with subscription error for \"{}\" from " | ||
| "\"{}\": code={} message={}", | ||
| track_name, identity, static_cast<std::uint32_t>(error->code), | ||
| error->message); |
| void DataTrackStream::pushEos(std::optional<SubscribeDataTrackError> error) { | ||
| { | ||
| const std::scoped_lock<std::mutex> lock(mutex_); | ||
| if (eof_) { |
aligns with changes here:
livekit/rust-sdks#1057
Depends on: