Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 6 additions & 29 deletions include/livekit/audio_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,17 @@

#pragma once

#include <condition_variable>
#include <cstdint>
#include <deque>
#include <cstddef>
#include <memory>
#include <mutex>
#include <optional>
#include <string>

#include "audio_frame.h"
#include "ffi_handle.h"
#include "participant.h"
#include "track.h"

namespace livekit {

class Participant;

namespace proto {
class FfiEvent;
}
Expand Down Expand Up @@ -119,34 +115,15 @@ class AudioStream {
void close();

private:
AudioStream() = default;
AudioStream();

void initFromTrack(const std::shared_ptr<Track> &track,
const Options &options);
void initFromParticipant(Participant &participant, TrackSource track_source,
const Options &options);

// FFI event handler (registered with FfiClient)
void onFfiEvent(const proto::FfiEvent &event);

// Queue helpers
void pushFrame(AudioFrameEvent &&ev);
void pushEos();

mutable std::mutex mutex_;
std::condition_variable cv_;
std::deque<AudioFrameEvent> queue_;
std::size_t capacity_{0};
bool eof_{false};
bool closed_{false};

Options options_;

// Underlying FFI audio stream handle
FfiHandle stream_handle_;

// Listener id registered on FfiClient
std::int32_t listener_id_{0};
struct Impl;
std::unique_ptr<Impl> impl_;
};

} // namespace livekit
40 changes: 3 additions & 37 deletions include/livekit/data_track_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@
#include "livekit/data_track_frame.h"
#include "livekit/ffi_handle.h"

#include <condition_variable>
#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>
#include <optional>

namespace livekit {
Expand Down Expand Up @@ -90,43 +87,12 @@ class DataTrackStream {
private:
friend class RemoteDataTrack;

DataTrackStream() = default;
DataTrackStream();
/// Internal init helper, called by RemoteDataTrack.
void init(FfiHandle subscription_handle);

/// FFI event handler, called by FfiClient.
void onFfiEvent(const proto::FfiEvent &event);

/// Push a received DataTrackFrame to the internal storage.
void pushFrame(DataTrackFrame &&frame);

/// Push an end-of-stream signal (EOS).
void pushEos();

/** Protects all mutable state below. */
mutable std::mutex mutex_;

/** Signalled when a frame is pushed or the subscription ends. */
std::condition_variable cv_;

/**
* Received frame awaiting read().
* NOTE: the Rust side handles buffering, so we should only really ever have
* one item.
*/
std::optional<DataTrackFrame> frame_;

/** True once the remote side signals end-of-stream. */
bool eof_{false};

/** True after close() has been called by the consumer. */
bool closed_{false};

/** RAII handle for the Rust-owned subscription resource. */
FfiHandle subscription_handle_;

/** FfiClient listener id for routing FfiEvent callbacks to this object. */
std::int32_t listener_id_{0};
struct Impl;
std::unique_ptr<Impl> impl_;
};

} // namespace livekit
28 changes: 5 additions & 23 deletions include/livekit/room.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@

#include "livekit/data_stream.h"
#include "livekit/e2ee.h"
#include "livekit/ffi_handle.h"
#include "livekit/room_event_types.h"
#include "livekit/subscription_thread_dispatcher.h"

#include <cstdint>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <vector>

namespace livekit {

Expand Down Expand Up @@ -318,27 +319,8 @@ class Room {
private:
friend class RoomCallbackTest;

mutable std::mutex lock_;
ConnectionState connection_state_ = ConnectionState::Disconnected;
RoomDelegate *delegate_ = nullptr; // Not owned
RoomInfoData room_info_;
std::shared_ptr<FfiHandle> room_handle_;
std::unique_ptr<LocalParticipant> local_participant_;
std::unordered_map<std::string, std::shared_ptr<RemoteParticipant>>
remote_participants_;
// Data stream
std::unordered_map<std::string, TextStreamHandler> text_stream_handlers_;
std::unordered_map<std::string, ByteStreamHandler> byte_stream_handlers_;
std::unordered_map<std::string, std::shared_ptr<TextStreamReader>>
text_stream_readers_;
std::unordered_map<std::string, std::shared_ptr<ByteStreamReader>>
byte_stream_readers_;
// E2EE
std::unique_ptr<E2EEManager> e2ee_manager_;
std::shared_ptr<SubscriptionThreadDispatcher> subscription_thread_dispatcher_;

// FfiClient listener ID (0 means no listener registered)
int listener_id_{0};
struct Impl;
std::unique_ptr<Impl> impl_;

void OnEvent(const proto::FfiEvent &event);
};
Expand Down
61 changes: 13 additions & 48 deletions include/livekit/subscription_thread_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@
#include "livekit/audio_stream.h"
#include "livekit/video_stream.h"

#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

namespace livekit {
Expand Down Expand Up @@ -357,13 +356,6 @@ class SubscriptionThreadDispatcher {
}
};

/// Active read-side resources for one audio/video subscription dispatch slot.
struct ActiveReader {
std::shared_ptr<AudioStream> audio_stream;
std::shared_ptr<VideoStream> video_stream;
std::thread thread;
};

/// Compound lookup key for a remote participant identity and data track name.
struct DataCallbackKey {
std::string participant_identity;
Expand All @@ -390,14 +382,6 @@ class SubscriptionThreadDispatcher {
DataFrameCallback callback;
};

/// Active read-side resources for one data track stream subscription.
struct ActiveDataReader {
std::shared_ptr<RemoteDataTrack> remote_track;
std::mutex sub_mutex;
std::shared_ptr<DataTrackStream> stream; // guarded by sub_mutex
std::thread thread;
};

/// Stored audio callback registration plus stream-construction options.
struct RegisteredAudioCallback {
AudioFrameCallback callback;
Expand Down Expand Up @@ -455,39 +439,20 @@ class SubscriptionThreadDispatcher {
const std::shared_ptr<RemoteDataTrack> &track,
const DataFrameCallback &cb);

/// Protects callback registration maps and active reader state.
mutable std::mutex lock_;

/// Registered audio frame callbacks keyed by \ref CallbackKey.
std::unordered_map<CallbackKey, RegisteredAudioCallback, CallbackKeyHash>
audio_callbacks_;

/// Registered video frame callbacks keyed by \ref CallbackKey.
std::unordered_map<CallbackKey, RegisteredVideoCallback, CallbackKeyHash>
video_callbacks_;

/// Active stream/thread state keyed by \ref CallbackKey.
std::unordered_map<CallbackKey, ActiveReader, CallbackKeyHash>
active_readers_;

/// Next auto-increment ID for data frame callbacks.
DataFrameCallbackId next_data_callback_id_{0};

/// Registered data frame callbacks keyed by opaque callback ID.
std::unordered_map<DataFrameCallbackId, RegisteredDataCallback>
data_callbacks_;

/// Active data reader threads keyed by callback ID.
std::unordered_map<DataFrameCallbackId, std::shared_ptr<ActiveDataReader>>
active_data_readers_;

/// Currently published remote data tracks, keyed by (participant, name).
std::unordered_map<DataCallbackKey, std::shared_ptr<RemoteDataTrack>,
DataCallbackKeyHash>
remote_data_tracks_;

/// Hard limit on concurrently active per-subscription reader threads.
static constexpr int kMaxActiveReaders = 20;

std::size_t audioCallbackCountForTest() const;
std::size_t videoCallbackCountForTest() const;
std::size_t activeReaderCountForTest() const;
std::size_t dataCallbackCountForTest() const;
std::size_t activeDataReaderCountForTest() const;
std::size_t remoteDataTrackCountForTest() const;
bool hasAudioCallbackForTest(const CallbackKey &key) const;
bool hasVideoCallbackForTest(const CallbackKey &key) const;

struct Impl;
std::unique_ptr<Impl> impl_;
};

} // namespace livekit
Expand Down
32 changes: 6 additions & 26 deletions include/livekit/video_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,19 @@

#pragma once

#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>

#include "ffi_handle.h"
#include "participant.h"
#include "track.h"
#include "video_frame.h"
#include "video_source.h"

namespace livekit {

class Participant;

// A single video frame event delivered by VideoStream::read().
struct VideoFrameEvent {
VideoFrame frame;
Expand Down Expand Up @@ -110,33 +107,16 @@ class VideoStream {
void close();

private:
VideoStream() = default;
VideoStream();

// Internal init helpers, used by the factories
void initFromTrack(const std::shared_ptr<Track> &track,
const Options &options);
void initFromParticipant(Participant &participant, TrackSource source,
const Options &options);

// FFI event handler (registered with FfiClient)
void onFfiEvent(const proto::FfiEvent &event);

// Queue helpers
void pushFrame(VideoFrameEvent &&ev);
void pushEos();

mutable std::mutex mutex_;
std::condition_variable cv_;
std::deque<VideoFrameEvent> queue_;
std::size_t capacity_{0};
bool eof_{false};
bool closed_{false};

// Underlying FFI handle for the video stream
FfiHandle stream_handle_;

// Listener id registered on FfiClient
std::int32_t listener_id_{0};
struct Impl;
std::unique_ptr<Impl> impl_;
};

} // namespace livekit
Loading
Loading