diff --git a/.github/workflows/builds.yml b/.github/workflows/builds.yml index 784687d2..28defd53 100644 --- a/.github/workflows/builds.yml +++ b/.github/workflows/builds.yml @@ -65,23 +65,28 @@ jobs: include: - os: ubuntu-latest name: linux-x64 - build_cmd: ./build.sh release-all + build_cmd: ./build.sh release-examples build_dir: build-release - os: ubuntu-24.04-arm name: linux-arm64 - build_cmd: ./build.sh release-all + build_cmd: ./build.sh release-examples build_dir: build-release - - os: macos-latest + - os: macos-latest-xlarge name: macos-arm64 - build_cmd: ./build.sh release-all + build_cmd: ./build.sh release-examples build_dir: build-release - - os: macos-latest + - os: macos-26-large name: macos-x64 - build_cmd: ./build.sh release-all --macos-arch x86_64 + build_cmd: ./build.sh release-examples --macos-arch x86_64 build_dir: build-release - - os: windows-latest + # Pinned to windows-2022 (VS 17) because build.cmd hard-codes the + # CMake generator to "Visual Studio 17 2022". windows-latest now + # ships VS 18 (Visual Studio 2026) and CMake configure fails with + # "could not find any instance of Visual Studio". Re-evaluate when + # build.cmd grows VS-version detection. + - os: windows-2022 name: windows-x64 - build_cmd: .\build.cmd release-all + build_cmd: .\build.cmd release-examples build_dir: build-release name: Build (${{ matrix.name }}) @@ -149,22 +154,10 @@ jobs: run: rustup target add x86_64-apple-darwin # ---------- Cache Cargo ---------- - - name: Cache Cargo registry - uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5 + - name: Cache Cargo + uses: Swatinem/rust-cache@c19371144df3bb44fab255c43d04cbc2ab54d1c4 # v2.9.1 with: - path: | - ~/.cargo/registry - ~/.cargo/git - key: ${{ runner.os }}-${{ matrix.name }}-cargo-reg-${{ hashFiles('**/Cargo.lock') }} - restore-keys: ${{ runner.os }}-${{ matrix.name }}-cargo-reg- - - - name: Cache Cargo target - uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5 - with: - path: client-sdk-rust/target - key: ${{ runner.os }}-${{ matrix.name }}-cargo-target-${{ hashFiles('**/Cargo.lock') }} - restore-keys: | - ${{ runner.os }}-${{ matrix.name }}-cargo-target- + workspaces: client-sdk-rust -> target # ---------- Build environment setup ---------- - name: Set Linux build environment @@ -288,29 +281,6 @@ jobs: } if ($failed) { exit 1 } else { exit 0 } - # ---------- Run unit tests ---------- - - name: Run unit tests (Unix) - if: runner.os != 'Windows' - shell: bash - run: | - ${{ matrix.build_dir }}/bin/livekit_unit_tests \ - --gtest_output=xml:${{ matrix.build_dir }}/unit-test-results.xml - - - name: Run unit tests (Windows) - if: runner.os == 'Windows' - shell: pwsh - run: | - & "${{ matrix.build_dir }}/bin/livekit_unit_tests.exe" ` - --gtest_output=xml:${{ matrix.build_dir }}/unit-test-results.xml - - - name: Upload test results - if: always() - uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1 - with: - name: test-results-${{ matrix.name }} - path: ${{ matrix.build_dir }}/unit-test-results.xml - retention-days: 7 - # ---------- Upload artifacts ---------- - name: Upload build artifacts uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1 diff --git a/.github/workflows/make-release.yml b/.github/workflows/make-release.yml index b0c6b1bf..6e76fdc6 100644 --- a/.github/workflows/make-release.yml +++ b/.github/workflows/make-release.yml @@ -36,7 +36,11 @@ jobs: - os: macos-latest name: macos-x64 generator: Ninja - - os: windows-latest + # Pinned to windows-2022 to match the VS 17 generator below. + # windows-latest now provisions VS 18 (Visual Studio 2026), which + # CMake's "Visual Studio 17 2022" generator can't find. See + # builds.yml for the longer-form rationale. + - os: windows-2022 name: windows-x64 generator: "Visual Studio 17 2022" diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 00000000..81f5284a --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,262 @@ +name: Tests + +on: + push: + branches: ["main"] + paths: + - src/** + - include/** + - client-sdk-rust/** + - CMakeLists.txt + - CMakePresets.json + - build.sh + - build.cmd + - vcpkg.json + - .token_helpers/** + - .github/workflows/tests.yml + pull_request: + branches: ["main"] + paths: + - src/** + - include/** + - client-sdk-rust/** + - CMakeLists.txt + - CMakePresets.json + - build.sh + - build.cmd + - vcpkg.json + - .token_helpers/** + - .github/workflows/tests.yml + workflow_dispatch: + +permissions: + contents: read + actions: read + packages: read + +env: + CARGO_TERM_COLOR: always + # vcpkg binary caching for Windows (mirrors builds.yml) + VCPKG_DEFAULT_TRIPLET: x64-windows-static-md + VCPKG_DEFAULT_HOST_TRIPLET: x64-windows-static-md + VCPKG_TARGET_TRIPLET: x64-windows-static-md + +jobs: + test: + strategy: + fail-fast: false + matrix: + include: + - os: ubuntu-latest + name: linux-x64 + build_cmd: ./build.sh release-tests + e2e-testing: true + - os: ubuntu-24.04-arm + name: linux-arm64 + build_cmd: ./build.sh release-tests + e2e-testing: true + - os: macos-latest-xlarge + name: macos-arm64 + build_cmd: ./build.sh release-tests + # E2E not possible on GHA Mac runner currently + e2e-testing: false + - os: macos-26-large + name: macos-x64 + build_cmd: ./build.sh release-tests --macos-arch x86_64 + # E2E not possible on GHA Mac runner currently + e2e-testing: false + - os: windows-2022 + name: windows-x64 + build_cmd: .\build.cmd release-tests + + name: Test (${{ matrix.name }}) + runs-on: ${{ matrix.os }} + + steps: + - name: Checkout (with submodules) + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + submodules: recursive + fetch-depth: 1 + + - name: Pull LFS files + run: git lfs pull + + # ---------- vcpkg caching for Windows (mirrors builds.yml) ---------- + - name: Export GitHub Actions cache environment variables + if: runner.os == 'Windows' + uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0 + with: + script: | + core.exportVariable('ACTIONS_CACHE_URL', process.env.ACTIONS_CACHE_URL || ''); + core.exportVariable('ACTIONS_RUNTIME_TOKEN', process.env.ACTIONS_RUNTIME_TOKEN || ''); + + - name: Setup vcpkg (Windows only) + if: runner.os == 'Windows' + uses: lukka/run-vcpkg@6fe69898af670ac05f4a8427cc5cff4fb361cee5 # v11.5 + with: + vcpkgGitCommitId: 'fb87e2bb3fe69e16c224989acb5a61349166c782' + + # ---------- OS-specific deps ---------- + - name: Install deps (Ubuntu) + if: runner.os == 'Linux' + run: | + set -eux + sudo apt-get update + sudo apt-get install -y \ + build-essential cmake ninja-build pkg-config \ + llvm-dev libclang-dev clang \ + libva-dev libdrm-dev libgbm-dev libx11-dev libgl1-mesa-dev \ + libxext-dev libxcomposite-dev libxdamage-dev libxfixes-dev \ + libxrandr-dev libxi-dev libxkbcommon-dev \ + libasound2-dev libpulse-dev \ + libssl-dev \ + libprotobuf-dev protobuf-compiler \ + libabsl-dev \ + libwayland-dev libdecor-0-dev \ + libspdlog-dev \ + jq + + - name: Install deps (macOS) + if: runner.os == 'macOS' + run: | + set -eux + brew update + brew install cmake ninja protobuf abseil spdlog jq + + # ---------- Rust toolchain ---------- + - name: Install Rust (stable) + uses: dtolnay/rust-toolchain@3c5f7ea28cd621ae0bf5283f0e981fb97b8a7af9 + with: + toolchain: stable + + - name: Install Rust cross-compilation target + if: matrix.name == 'macos-x64' + run: rustup target add x86_64-apple-darwin + + # ---------- Cache Cargo ---------- + - name: Cache Cargo + uses: Swatinem/rust-cache@c19371144df3bb44fab255c43d04cbc2ab54d1c4 # v2.9.1 + with: + workspaces: client-sdk-rust -> target + + # ---------- Build environment setup ---------- + - name: Set Linux build environment + if: runner.os == 'Linux' + run: | + echo "CXXFLAGS=-Wno-deprecated-declarations" >> $GITHUB_ENV + echo "CFLAGS=-Wno-deprecated-declarations" >> $GITHUB_ENV + LLVM_VERSION=$(llvm-config --version | cut -d. -f1) + echo "LIBCLANG_PATH=/usr/lib/llvm-${LLVM_VERSION}/lib" >> $GITHUB_ENV + + # ---------- Build (release-tests: tests on, examples off) ---------- + - name: Build tests (Unix) + if: runner.os != 'Windows' + shell: bash + run: | + chmod +x build.sh + ${{ matrix.build_cmd }} + + - name: Build tests (Windows) + if: runner.os == 'Windows' + shell: pwsh + run: ${{ matrix.build_cmd }} + + # ---------- Run unit tests ---------- + - name: Run unit tests (Unix) + if: runner.os != 'Windows' + timeout-minutes: 1 + shell: bash + run: | + build-release/bin/livekit_unit_tests \ + --gtest_output=xml:build-release/unit-test-results.xml + + - name: Run unit tests (Windows) + if: runner.os == 'Windows' + timeout-minutes: 1 + shell: pwsh + run: | + build-release\bin\livekit_unit_tests.exe ` + --gtest_output="xml:build-release\unit-test-results.xml" + + # ---------- Install + start livekit-server for integration tests ---------- + - name: Install livekit-server and lk CLI + if: matrix.e2e-testing + shell: bash + run: | + set -euxo pipefail + if [[ "$RUNNER_OS" == "Linux" ]]; then + # Linux: official install scripts. lk's installer parses the GitHub + # API JSON with jq (already installed above). + curl -sSL https://get.livekit.io | bash + curl -sSL https://get.livekit.io/cli | bash + else + # macOS: Homebrew formulas. Server install script aborts on Darwin. + brew install livekit livekit-cli + fi + livekit-server --version + lk --version + + - name: Start livekit-server + if: matrix.e2e-testing + shell: bash + env: + LIVEKIT_CONFIG: "enable_data_tracks: true" + run: | + set -euxo pipefail + # Background the server with nohup so it survives this step's shell + # exit and remains running for the integration-test step. + nohup livekit-server --dev > livekit-server.log 2>&1 & + echo $! > livekit-server.pid + # Port 7880 is a WebSocket endpoint, so a TCP-connect probe is the + # most reliable readiness signal. + for i in $(seq 1 30); do + if nc -z 127.0.0.1 7880 >/dev/null 2>&1; then + echo "livekit-server is ready" + exit 0 + fi + sleep 1 + done + echo "::error::livekit-server failed to start within 30s" + tail -n 200 livekit-server.log || true + exit 1 + + - name: Run integration tests + if: matrix.e2e-testing + timeout-minutes: 5 + shell: bash + env: + RUST_LOG: "metrics=debug" + run: | + set -euo pipefail + source .token_helpers/set_data_track_test_tokens.bash + build-release/bin/livekit_integration_tests \ + --gtest_filter='DataTrackScenarios/DataTrackTransportTest.PublishesAndReceivesFramesEndToEnd/*' \ + --gtest_output=xml:build-release/integration-test-results.xml + + - name: Stop livekit-server + if: always() && matrix.e2e-testing + shell: bash + run: | + if [ -f livekit-server.pid ]; then + kill "$(cat livekit-server.pid)" 2>/dev/null || true + rm -f livekit-server.pid + fi + + - name: Dump livekit-server log on failure + if: failure() && matrix.e2e-testing + shell: bash + run: tail -n 500 livekit-server.log || true + + # ---------- Upload results ---------- + - name: Upload test results + if: always() + uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1 + with: + name: test-results-${{ matrix.name }} + path: | + build-release/unit-test-results.xml + build-release/integration-test-results.xml + livekit-server.log + if-no-files-found: ignore + retention-days: 7 diff --git a/.token_helpers/README.md b/.token_helpers/README.md index ebed99c1..e9b1f34e 100644 --- a/.token_helpers/README.md +++ b/.token_helpers/README.md @@ -5,4 +5,7 @@ Examples of generating tokens Generate tokens and then set them as env vars for the current terminal session ## set_data_track_test_tokens.bash -Generate tokens for data track integration tests and set them as env vars for the current terminal session. \ No newline at end of file +Generate the two participant tokens required by the C++ SDK's integration +and stress test suites (data tracks, RPC, media multistream, etc.) and +export them as `LIVEKIT_TOKEN_A`, `LIVEKIT_TOKEN_B`, and `LIVEKIT_URL` for +the current terminal session. \ No newline at end of file diff --git a/.token_helpers/set_data_track_test_tokens.bash b/.token_helpers/set_data_track_test_tokens.bash index b9bf99b8..3b8d712c 100755 --- a/.token_helpers/set_data_track_test_tokens.bash +++ b/.token_helpers/set_data_track_test_tokens.bash @@ -14,14 +14,15 @@ # limitations under the License. # Generate two LiveKit access tokens via `lk` and set the environment variables -# required by src/tests/integration/test_data_track.cpp. +# required by the C++ SDK's integration and stress tests (data tracks, RPC, +# media multistream, etc.). # -# source examples/tokens/set_data_track_test_tokens.bash -# eval "$(bash examples/tokens/set_data_track_test_tokens.bash)" +# source .token_helpers/set_data_track_test_tokens.bash +# eval "$(bash .token_helpers/set_data_track_test_tokens.bash)" # # Exports: -# LK_TOKEN_TEST_A -# LK_TOKEN_TEST_B +# LIVEKIT_TOKEN_A +# LIVEKIT_TOKEN_B # LIVEKIT_URL=ws://localhost:7880 # @@ -47,8 +48,6 @@ fi LIVEKIT_ROOM="cpp_data_track_test" LIVEKIT_IDENTITY_A="cpp-test-a" LIVEKIT_IDENTITY_B="cpp-test-b" -LIVEKIT_CALLER_IDENTITY="caller" -LIVEKIT_RECEIVER_IDENTITY="receiver" if [[ $# -ne 0 ]]; then _fail "this script is hard-coded and does not accept arguments" 2 @@ -104,29 +103,24 @@ _create_token() { printf '%s' "$token" } -LK_TOKEN_TEST_A="$(_create_token "$LIVEKIT_IDENTITY_A")" -LK_TOKEN_TEST_B="$(_create_token "$LIVEKIT_IDENTITY_B")" -LIVEKIT_CALLER_TOKEN="$(_create_token "$LIVEKIT_CALLER_IDENTITY")" -LIVEKIT_RECEIVER_TOKEN="$(_create_token "$LIVEKIT_RECEIVER_IDENTITY")" +LIVEKIT_TOKEN_A="$(_create_token "$LIVEKIT_IDENTITY_A")" +LIVEKIT_TOKEN_B="$(_create_token "$LIVEKIT_IDENTITY_B")" + _apply() { - export LK_TOKEN_TEST_A - export LK_TOKEN_TEST_B - export LIVEKIT_CALLER_TOKEN - export LIVEKIT_RECEIVER_TOKEN + export LIVEKIT_TOKEN_A + export LIVEKIT_TOKEN_B export LIVEKIT_URL } _emit_eval() { - printf 'export LK_TOKEN_TEST_A=%q\n' "$LK_TOKEN_TEST_A" - printf 'export LK_TOKEN_TEST_B=%q\n' "$LK_TOKEN_TEST_B" - printf 'export LIVEKIT_CALLER_TOKEN=%q\n' "$LIVEKIT_CALLER_TOKEN" - printf 'export LIVEKIT_RECEIVER_TOKEN=%q\n' "$LIVEKIT_RECEIVER_TOKEN" + printf 'export LIVEKIT_TOKEN_A=%q\n' "$LIVEKIT_TOKEN_A" + printf 'export LIVEKIT_TOKEN_B=%q\n' "$LIVEKIT_TOKEN_B" printf 'export LIVEKIT_URL=%q\n' "$LIVEKIT_URL" } if [[ "$_sourced" -eq 1 ]]; then _apply - echo "LK_TOKEN_TEST_A, LK_TOKEN_TEST_B, LIVEKIT_CALLER_TOKEN, LIVEKIT_RECEIVER_TOKEN, and LIVEKIT_URL set for this shell." >&2 + echo "LIVEKIT_TOKEN_A, LIVEKIT_TOKEN_B, and LIVEKIT_URL set for this shell." >&2 else _emit_eval echo "set_data_track_test_tokens.bash: for this shell run: source $0 or: eval \"\$(bash $0 ...)\"" >&2 diff --git a/AGENTS.md b/AGENTS.md index f6a0d883..e78838f7 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -107,6 +107,9 @@ Updates to ./build.sh and ./build.cmd should be accompanied by updates to this f ./build.sh clean-all # Full clean (C++ + Rust targets) ``` +The build scripts pass an explicit job count to `cmake --build --parallel`. Set +`CMAKE_BUILD_PARALLEL_LEVEL` to override the default detected logical CPU count. + **Requirements:** CMake 3.20+, C++17, Rust toolchain (cargo), protoc. On macOS: `brew install cmake ninja protobuf abseil spdlog`. On Linux: see the CI workflow for apt packages. ### SDK Packaging diff --git a/README.md b/README.md index 6136542b..f037be5a 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,9 @@ Using build scripts: .\build.cmd release-all # Build Release with tests + examples ``` +The build scripts pass an explicit job count to `cmake --build --parallel`. Set +`CMAKE_BUILD_PARALLEL_LEVEL` to override the default detected logical CPU count. + ### Windows build using cmake/vcpkg ```bash cmake -S . -B build -DCMAKE_TOOLCHAIN_FILE="$PWD/vcpkg/scripts/buildsystems/vcpkg.cmake" # Generate Makefiles in build folder @@ -418,25 +421,43 @@ ctest --output-on-failure | `livekit_integration_tests` | Quick tests (~1-2 minutes) for SDK functionality | | `livekit_stress_tests` | Long-running tests (configurable, default 1 hour) | -### RPC Test Environment Variables +### Integration & Stress Test Environment Variables -RPC integration and stress tests require a LiveKit server and two participant tokens: +The integration and stress test suites (data tracks, RPC, media multistream, +etc.) require a LiveKit server and two participant tokens: ```bash # Required -export LIVEKIT_URL="wss://your-server.livekit.cloud" -export LIVEKIT_CALLER_TOKEN="" -export LIVEKIT_RECEIVER_TOKEN="" +export LIVEKIT_URL="ws://localhost:7880" # or wss://your-server.livekit.cloud +export LIVEKIT_TOKEN_A="" +export LIVEKIT_TOKEN_B="" # Optional (for stress tests) export RPC_STRESS_DURATION_SECONDS=3600 # Test duration (default: 1 hour) export RPC_STRESS_CALLER_THREADS=4 # Concurrent caller threads (default: 4) ``` -**Generate tokens for RPC tests:** +**Generate tokens for the test suites:** + +The easiest path is to source the helper script, which will mint both +participant tokens against a local `livekit-server --dev` and export +`LIVEKIT_TOKEN_A`, `LIVEKIT_TOKEN_B`, and `LIVEKIT_URL` for the current shell: + +```bash +source .token_helpers/set_data_track_test_tokens.bash +``` + +To generate tokens manually instead (e.g. against a non-default server): + ```bash -lk token create -r test -i rpc-caller --join --valid-for 99999h --dev --room=rpc-test-room -lk token create -r test -i rpc-receiver --join --valid-for 99999h --dev --room=rpc-test-room +export LIVEKIT_TOKEN_A="$(lk token create --api-key devkey --api-secret secret -i cpp-test-a \ + --join --valid-for 99999h --room cpp_data_track_test \ + --grant '{"canPublish":true,"canSubscribe":true,"canPublishData":true}' \ + --token-only)" +export LIVEKIT_TOKEN_B="$(lk token create --api-key devkey --api-secret secret -i cpp-test-b \ + --join --valid-for 99999h --room cpp_data_track_test \ + --grant '{"canPublish":true,"canSubscribe":true,"canPublishData":true}' \ + --token-only)" ``` ### Test Coverage diff --git a/build.cmd b/build.cmd index 7008df93..da6f08c6 100644 --- a/build.cmd +++ b/build.cmd @@ -7,6 +7,7 @@ set "BUILD_TYPE=Release" set "PRESET=windows-release" set "LIVEKIT_VERSION=" set "CMAKE_EXTRA_ARGS=" +set "BUILD_PARALLEL_JOBS=" REM ============================================================ REM Auto-detect LIBCLANG_PATH if not already set @@ -214,8 +215,17 @@ if errorlevel 1 ( goto build_only :build_only -echo ==^> Building (%BUILD_TYPE%)... -cmake --build "%BUILD_DIR%" --config %BUILD_TYPE% +if not defined BUILD_PARALLEL_JOBS ( + if defined CMAKE_BUILD_PARALLEL_LEVEL ( + set "BUILD_PARALLEL_JOBS=%CMAKE_BUILD_PARALLEL_LEVEL%" + ) else if defined NUMBER_OF_PROCESSORS ( + set "BUILD_PARALLEL_JOBS=%NUMBER_OF_PROCESSORS%" + ) else ( + set "BUILD_PARALLEL_JOBS=2" + ) +) +echo ==^> Building (%BUILD_TYPE%) with %BUILD_PARALLEL_JOBS% parallel jobs... +cmake --build "%BUILD_DIR%" --config %BUILD_TYPE% --parallel "%BUILD_PARALLEL_JOBS%" if errorlevel 1 ( echo Build failed! exit /b 1 diff --git a/build.sh b/build.sh index 13b67274..2a9560c4 100755 --- a/build.sh +++ b/build.sh @@ -161,14 +161,39 @@ configure() { fi } +detect_parallel_jobs() { + if [[ -n "${CMAKE_BUILD_PARALLEL_LEVEL:-}" ]]; then + echo "${CMAKE_BUILD_PARALLEL_LEVEL}" + return + fi + + local jobs="" + if command -v getconf >/dev/null 2>&1; then + jobs="$(getconf _NPROCESSORS_ONLN 2>/dev/null || true)" + fi + if [[ -z "${jobs}" || ! "${jobs}" =~ ^[0-9]+$ || "${jobs}" -lt 1 ]]; then + if command -v sysctl >/dev/null 2>&1; then + jobs="$(sysctl -n hw.ncpu 2>/dev/null || true)" + fi + fi + if [[ -z "${jobs}" || ! "${jobs}" =~ ^[0-9]+$ || "${jobs}" -lt 1 ]]; then + jobs="2" + fi + + echo "${jobs}" +} + build() { - echo "==> Building (${BUILD_TYPE})..." + local parallel_jobs + parallel_jobs="$(detect_parallel_jobs)" + + echo "==> Building (${BUILD_TYPE}) with ${parallel_jobs} parallel jobs..." if [[ -n "${PRESET}" ]] && [[ -f "${PROJECT_ROOT}/CMakePresets.json" ]]; then # Use preset build if available - cmake --build --preset "${PRESET}" + cmake --build --preset "${PRESET}" --parallel "${parallel_jobs}" else # Fallback to traditional build - cmake --build "${BUILD_DIR}" + cmake --build "${BUILD_DIR}" --parallel "${parallel_jobs}" fi } diff --git a/client-sdk-rust b/client-sdk-rust index fd3df873..776b3ab1 160000 --- a/client-sdk-rust +++ b/client-sdk-rust @@ -1 +1 @@ -Subproject commit fd3df87386cd0abd66fcc0e1dcc15f93235e56d2 +Subproject commit 776b3ab171c89a6ddc27847a2071ab0c1c763f7f diff --git a/src/data_track_stream.cpp b/src/data_track_stream.cpp index 30a355c8..5431f434 100644 --- a/src/data_track_stream.cpp +++ b/src/data_track_stream.cpp @@ -19,7 +19,7 @@ #include "data_track.pb.h" #include "ffi.pb.h" #include "ffi_client.h" -#include "lk_log.h" +#include "scoped_timer.h" #include @@ -37,33 +37,40 @@ void DataTrackStream::init(FfiHandle subscription_handle) { } bool DataTrackStream::read(DataTrackFrame &out) { + LK_SCOPED_TIMER("data_track_stream::read"); { const std::scoped_lock lock(mutex_); if (closed_ || eof_) { return false; } + } - const auto subscription_handle = - static_cast(subscription_handle_.get()); + const auto subscription_handle = + static_cast(subscription_handle_.get()); - // Signal the Rust side that we're ready to receive the next frame. - // The Rust SubscriptionTask uses a demand-driven protocol: it won't pull - // from the underlying stream until notified via this request. + // Signal the Rust side that we're ready to receive the next frame. + // The Rust SubscriptionTask uses a demand-driven protocol: it won't pull + // from the underlying stream until notified via this request. + { + LK_SCOPED_TIMER("data_track_stream::read.sendRequest(FFI)"); proto::FfiRequest req; auto *msg = req.mutable_data_track_stream_read(); msg->set_stream_handle(subscription_handle); FfiClient::instance().sendRequest(req); } - std::unique_lock lock(mutex_); - cv_.wait(lock, [this] { return frame_.has_value() || eof_ || closed_; }); + { + LK_SCOPED_TIMER("data_track_stream::read.wait"); + std::unique_lock lock(mutex_); + cv_.wait(lock, [this] { return frame_.has_value() || eof_ || closed_; }); - if (closed_ || (!frame_.has_value() && eof_)) { - return false; - } + if (closed_ || (!frame_.has_value() && eof_)) { + return false; + } - out = std::move(*frame_); // NOLINT(bugprone-unchecked-optional-access) - frame_.reset(); + out = std::move(*frame_); // NOLINT(bugprone-unchecked-optional-access) + frame_.reset(); + } return true; } @@ -88,9 +95,12 @@ void DataTrackStream::close() { } void DataTrackStream::onFfiEvent(const FfiEvent &event) { + // Fast-path filter without taking the timer cost: every listener sees + // every FFI event, but only DataTrackStreamEvents are interesting here. if (event.message_case() != FfiEvent::kDataTrackStreamEvent) { return; } + LK_SCOPED_TIMER("data_track_stream::onFfiEvent"); const auto &dts = event.data_track_stream_event(); { @@ -111,6 +121,7 @@ void DataTrackStream::onFfiEvent(const FfiEvent &event) { } void DataTrackStream::pushFrame(DataTrackFrame &&frame) { + LK_SCOPED_TIMER("data_track_stream::pushFrame"); const std::scoped_lock lock(mutex_); if (closed_ || eof_) { @@ -123,6 +134,7 @@ void DataTrackStream::pushFrame(DataTrackFrame &&frame) { frame_ = std::move(frame); // notify no matter what since we got a new frame + // mutex_.unlock(); cv_.notify_one(); } diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index 1f08d824..8ff343ac 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -31,6 +31,7 @@ #include "livekit_ffi.h" #include "room.pb.h" #include "room_proto_converter.h" +#include "scoped_timer.h" namespace livekit { @@ -185,6 +186,7 @@ void FfiClient::RemoveListener(ListenerId id) { proto::FfiResponse FfiClient::sendRequest(const proto::FfiRequest &request) const { + LK_SCOPED_TIMER("ffi_client::sendRequest"); std::string bytes; if (!request.SerializeToString(&bytes) || bytes.empty()) { throw std::runtime_error("failed to serialize FfiRequest"); @@ -213,6 +215,7 @@ FfiClient::sendRequest(const proto::FfiRequest &request) const { } void FfiClient::PushEvent(const proto::FfiEvent &event) const { + LK_SCOPED_TIMER("ffi_client::PushEvent"); std::unique_ptr to_complete; std::vector listeners_copy; { diff --git a/src/local_data_track.cpp b/src/local_data_track.cpp index 3c24a9cf..a5cf0d67 100644 --- a/src/local_data_track.cpp +++ b/src/local_data_track.cpp @@ -22,6 +22,7 @@ #include "data_track.pb.h" #include "ffi.pb.h" #include "ffi_client.h" +#include "scoped_timer.h" namespace livekit { @@ -35,6 +36,7 @@ LocalDataTrack::LocalDataTrack(const proto::OwnedLocalDataTrack &owned) Result LocalDataTrack::tryPush(const DataTrackFrame &frame) { + LK_SCOPED_TIMER("local_data_track::tryPush"); if (!handle_.valid()) { return Result::failure( LocalDataTrackTryPushError{ diff --git a/src/scoped_timer.h b/src/scoped_timer.h new file mode 100644 index 00000000..6b31d8a2 --- /dev/null +++ b/src/scoped_timer.h @@ -0,0 +1,77 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIVEKIT_SCOPED_TIMER_H +#define LIVEKIT_SCOPED_TIMER_H + +#include "lk_log.h" + +#include +#include + +namespace livekit::detail { + +// RAII timer that logs its label and elapsed microseconds at LK_LOG_DEBUG +// when it goes out of scope. +// +// Intended for ad hoc instrumentation of hot paths (e.g. the data-track +// FFI hop). To see output, raise the runtime log level to debug before +// running the test, e.g.: +// +// livekit::setLogLevel(livekit::LogLevel::Debug); +// +// In release builds compiled with -DLIVEKIT_LOG_LEVEL=INFO (or higher), +// the underlying LK_LOG_DEBUG call is stripped at compile time and this +// class collapses to a steady_clock pair the optimizer is free to drop. +// +// `label` is captured as a string_view; only string literals or storage +// that outlives the scope are safe. +class ScopedTimer { +public: + explicit ScopedTimer(std::string_view label) noexcept + : label_(label), start_(std::chrono::steady_clock::now()) {} + + ~ScopedTimer() { + const auto elapsed = std::chrono::steady_clock::now() - start_; + const auto us = + std::chrono::duration_cast(elapsed).count(); + LK_LOG_DEBUG("[scoped_timer] {} took {} us", label_, us); + } + + ScopedTimer(const ScopedTimer &) = delete; + ScopedTimer &operator=(const ScopedTimer &) = delete; + ScopedTimer(ScopedTimer &&) = delete; + ScopedTimer &operator=(ScopedTimer &&) = delete; + +private: + std::string_view label_; + std::chrono::steady_clock::time_point start_; +}; + +} // namespace livekit::detail + +#define LK_SCOPED_TIMER_CONCAT2(a, b) a##b +#define LK_SCOPED_TIMER_CONCAT(a, b) LK_SCOPED_TIMER_CONCAT2(a, b) + +// Drop one of these at the top of any scope to time it. Multiple per +// function are fine; pick distinct labels. +#define LK_SCOPED_TIMER(label) \ + ::livekit::detail::ScopedTimer LK_SCOPED_TIMER_CONCAT(_lk_scoped_timer_, \ + __LINE__) { \ + label \ + } + +#endif /* LIVEKIT_SCOPED_TIMER_H */ diff --git a/src/tests/common/test_common.h b/src/tests/common/test_common.h index 495a5f56..8a6e6aee 100644 --- a/src/tests/common/test_common.h +++ b/src/tests/common/test_common.h @@ -70,8 +70,8 @@ constexpr char kLocalTestLiveKitUrl[] = "ws://localhost:7880"; * * Environment variables: * LIVEKIT_URL - WebSocket URL of the LiveKit server - * LIVEKIT_CALLER_TOKEN - Token for the caller/sender participant - * LIVEKIT_RECEIVER_TOKEN - Token for the receiver participant + * LIVEKIT_TOKEN_A - Token for the first test participant + * LIVEKIT_TOKEN_B - Token for the second test participant * TEST_ITERATIONS - Number of iterations for iterative tests (default: * 10) STRESS_DURATION_SECONDS - Duration for stress tests in seconds (default: * 600) STRESS_CALLER_THREADS - Number of caller threads for stress tests @@ -79,8 +79,8 @@ constexpr char kLocalTestLiveKitUrl[] = "ws://localhost:7880"; */ struct TestConfig { std::string url; - std::string caller_token; - std::string receiver_token; + std::string token_a; + std::string token_b; int test_iterations; int stress_duration_seconds; int num_caller_threads; @@ -89,16 +89,16 @@ struct TestConfig { static TestConfig fromEnv() { TestConfig config; const char *url = std::getenv("LIVEKIT_URL"); - const char *caller_token = std::getenv("LIVEKIT_CALLER_TOKEN"); - const char *receiver_token = std::getenv("LIVEKIT_RECEIVER_TOKEN"); + const char *token_a = std::getenv("LIVEKIT_TOKEN_A"); + const char *token_b = std::getenv("LIVEKIT_TOKEN_B"); const char *iterations_env = std::getenv("TEST_ITERATIONS"); const char *duration_env = std::getenv("STRESS_DURATION_SECONDS"); const char *threads_env = std::getenv("STRESS_CALLER_THREADS"); - if (url && caller_token && receiver_token) { + if (url && token_a && token_b) { config.url = url; - config.caller_token = caller_token; - config.receiver_token = receiver_token; + config.token_a = token_a; + config.token_b = token_b; config.available = true; } @@ -142,17 +142,17 @@ inline bool waitForParticipant(Room *room, const std::string &identity, } inline std::array getDataTrackTestTokens() { - const char *token_a = std::getenv("LIVEKIT_CALLER_TOKEN"); + const char *token_a = std::getenv("LIVEKIT_TOKEN_A"); if (token_a == nullptr || std::string(token_a).empty()) { throw std::runtime_error( - "LIVEKIT_CALLER_TOKEN must be present and non-empty for data track E2E " + "LIVEKIT_TOKEN_A must be present and non-empty for data track E2E " "tests"); } - const char *token_b = std::getenv("LIVEKIT_RECEIVER_TOKEN"); + const char *token_b = std::getenv("LIVEKIT_TOKEN_B"); if (token_b == nullptr || std::string(token_b).empty()) { throw std::runtime_error( - "LIVEKIT_RECEIVER_TOKEN must be present and non-empty for data track E2E " + "LIVEKIT_TOKEN_B must be present and non-empty for data track E2E " "tests"); } @@ -216,7 +216,7 @@ testRooms(const std::vector &room_configs) { if (room_configs.size() > 2) { throw std::invalid_argument( - "testRooms supports at most two rooms with LIVEKIT_CALLER_TOKEN/B"); + "testRooms supports at most two rooms with LIVEKIT_TOKEN_A/LIVEKIT_TOKEN_B"); } auto tokens = getDataTrackTestTokens(); @@ -225,13 +225,20 @@ testRooms(const std::vector &room_configs) { rooms.reserve(room_configs.size()); for (size_t i = 0; i < room_configs.size(); ++i) { + // Force dual-PC for localhost. `livekit-server --dev` does not advertise + // /rtc/v1, so single-PC (the C++ SDK's default) attempts a handshake and + // falls back -- adding latency and flakiness on shared CI runners. The + // Rust E2E harness applies the same override + // (livekit/tests/common/e2e/mod.rs `force_v0`). + auto room_options = room_configs[i].room_options; + room_options.single_peer_connection = false; + auto room = std::make_unique(); if (room_configs[i].delegate != nullptr) { room->setDelegate(room_configs[i].delegate); } - if (!room->Connect(kLocalTestLiveKitUrl, tokens[i], - room_configs[i].room_options)) { + if (!room->Connect(kLocalTestLiveKitUrl, tokens[i], room_options)) { throw std::runtime_error("Failed to connect test room " + std::to_string(i)); } @@ -508,8 +515,8 @@ class LiveKitTestBase : public ::testing::Test { /// Skip the test if the required environment variables are not set void skipIfNotConfigured() { if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; + GTEST_SKIP() + << "LIVEKIT_URL, LIVEKIT_TOKEN_A, and LIVEKIT_TOKEN_B not set"; } } diff --git a/src/tests/integration/test_data_track.cpp b/src/tests/integration/test_data_track.cpp index 31a9b15f..ab6565ca 100644 --- a/src/tests/integration/test_data_track.cpp +++ b/src/tests/integration/test_data_track.cpp @@ -24,6 +24,7 @@ #include "../common/test_common.h" #include "ffi_client.h" +#include "lk_log.h" #include #include @@ -41,11 +42,9 @@ using namespace std::chrono_literals; namespace { constexpr char kTrackNamePrefix[] = "data_track_e2e"; -constexpr auto kPublishDuration = 5s; constexpr auto kTrackWaitTimeout = 10s; constexpr auto kReadTimeout = 30s; constexpr auto kPollingInterval = 10ms; -constexpr float kMinimumReceivedPercent = 0.95f; constexpr int kResubscribeIterations = 10; constexpr int kPublishManyTrackCount = 256; constexpr auto kPublishManyTimeout = 5s; @@ -212,11 +211,19 @@ TEST_P(DataTrackTransportTest, PublishesAndReceivesFramesEndToEnd) { const auto publish_fps = std::get<0>(GetParam()); const auto payload_len = std::get<1>(GetParam()); const auto track_name = makeTrackName("transport"); - const auto frame_count = static_cast(std::llround( - std::chrono::duration(kPublishDuration).count() * publish_fps)); - DataTrackPublishedDelegate subscriber_delegate; + // How long to publish frames for. + constexpr auto kPublishDuration = 10s; + + // Percentage of total frames that must be received on the subscriber end in + // order for the test to pass. + constexpr float kMinimumReceivedPercent = 0.95f; + std::vector room_configs(2); + room_configs[0].room_options.single_peer_connection = false; + room_configs[1].room_options.single_peer_connection = false; + + DataTrackPublishedDelegate subscriber_delegate; room_configs[1].delegate = &subscriber_delegate; auto rooms = testRooms(room_configs); @@ -224,51 +231,54 @@ TEST_P(DataTrackTransportTest, PublishesAndReceivesFramesEndToEnd) { const auto publisher_identity = publisher_room->localParticipant()->identity(); - std::exception_ptr publish_error; - std::thread publisher([&]() { - try { - auto track = - requirePublishedTrack(publisher_room->localParticipant(), track_name); - if (!track->isPublished()) { - throw std::runtime_error("Publisher failed to publish data track"); - } - if (track->info().uses_e2ee) { - throw std::runtime_error("Unexpected E2EE on test data track"); - } - if (track->info().name != track_name) { - throw std::runtime_error("Published track name mismatch"); - } - - const auto frame_interval = - std::chrono::duration_cast( - std::chrono::duration(1.0 / publish_fps)); - auto next_send = std::chrono::steady_clock::now(); - - std::cout << "Publishing " << frame_count - << " frames with payload length " << payload_len << std::endl; - for (size_t index = 0; index < frame_count; ++index) { - std::vector payload(payload_len, - static_cast(index)); - requirePushSuccess(track->tryPush(std::move(payload)), - "Failed to push data frame"); - - next_send += frame_interval; - std::this_thread::sleep_until(next_send); - } - - track->unpublishDataTrack(); - } catch (...) { - publish_error = std::current_exception(); - } - }); + auto track = + requirePublishedTrack(publisher_room->localParticipant(), track_name); + std::cerr << "Track published\n"; auto remote_track = subscriber_delegate.waitForTrack(kTrackWaitTimeout); + std::cerr << "Got remote track: " << remote_track->info().sid << "\n"; + ASSERT_NE(remote_track, nullptr) << "Timed out waiting for remote data track"; EXPECT_TRUE(remote_track->isPublished()); EXPECT_FALSE(remote_track->info().uses_e2ee); EXPECT_EQ(remote_track->info().name, track_name); EXPECT_EQ(remote_track->publisherIdentity(), publisher_identity); + const auto frame_count = static_cast(std::llround( + std::chrono::duration(kPublishDuration).count() * publish_fps)); + std::cout << "Publishing " << frame_count << " frames\n"; + + auto publish = [&]() { + if (!track->isPublished()) { + throw std::runtime_error("Publisher failed to publish data track"); + } + if (track->info().uses_e2ee) { + throw std::runtime_error("Unexpected E2EE on test data track"); + } + if (track->info().name != track_name) { + throw std::runtime_error("Published track name mismatch"); + } + + const auto frame_interval = + std::chrono::duration_cast( + std::chrono::duration(1.0 / publish_fps)); + auto next_send = std::chrono::steady_clock::now(); + + std::cout << "Publishing " << frame_count + << " frames with payload length " << payload_len << '\n'; + for (size_t index = 0; index < frame_count; ++index) { + std::vector payload(payload_len, + static_cast(index)); + requirePushSuccess(track->tryPush(std::move(payload)), + "Failed to push data frame"); + + next_send += frame_interval; + std::this_thread::sleep_until(next_send); + } + + track->unpublishDataTrack(); + }; + auto subscribe_result = remote_track->subscribe(); if (!subscribe_result) { FAIL() << describeDataTrackError(subscribe_result.error()); @@ -277,59 +287,62 @@ TEST_P(DataTrackTransportTest, PublishesAndReceivesFramesEndToEnd) { std::promise receive_count_promise; auto receive_count_future = receive_count_promise.get_future(); - std::exception_ptr subscribe_error; - std::thread subscriber([&]() { - try { - size_t received_count = 0; - DataTrackFrame frame; - while (subscription->read(frame) && received_count < frame_count) { - if (frame.payload.empty()) { - throw std::runtime_error("Received empty data frame"); - } - - const auto first_byte = frame.payload.front(); - if (!std::all_of(frame.payload.begin(), frame.payload.end(), - [first_byte](std::uint8_t byte) { - return byte == first_byte; - })) { - throw std::runtime_error("Received frame with inconsistent payload"); - } - if (frame.user_timestamp.has_value()) { - throw std::runtime_error( - "Received unexpected user timestamp in transport test"); - } - - ++received_count; + + auto subscribe = [&]() { + size_t received_count = 0; + DataTrackFrame frame; + while (subscription->read(frame) && received_count < frame_count) { + if (frame.payload.empty()) { + throw std::runtime_error("Received empty data frame"); } - receive_count_promise.set_value(received_count); - } catch (...) { - subscribe_error = std::current_exception(); - receive_count_promise.set_exception(std::current_exception()); + const auto first_byte = frame.payload.front(); + if (!std::all_of(frame.payload.begin(), frame.payload.end(), + [first_byte](std::uint8_t byte) { + return byte == first_byte; + })) { + throw std::runtime_error("Received frame with inconsistent payload"); + } + if (frame.user_timestamp.has_value()) { + throw std::runtime_error( + "Received unexpected user timestamp in transport test"); + } + + ++received_count; } - }); - if (receive_count_future.wait_for(kReadTimeout) != - std::future_status::ready) { - subscription->close(); - ADD_FAILURE() << "Timed out waiting for data frames"; - } + receive_count_promise.set_value(received_count); + }; - subscriber.join(); - publisher.join(); + // Launch both — these START IMMEDIATELY on background threads. + auto pub_fut = std::async(std::launch::async, publish); + auto sub_fut = std::async(std::launch::async, subscribe); - if (publish_error) { - std::rethrow_exception(publish_error); - } - if (subscribe_error) { - std::rethrow_exception(subscribe_error); + // Wait for both, with a combined deadline (the timeout(...) wrapper). + const auto deadline = std::chrono::steady_clock::now() + kPublishDuration + kReadTimeout; + + // if (receive_count_future.wait_for(kReadTimeout) != + // std::future_status::ready) { + // subscription->close(); + // ADD_FAILURE() << "Timed out waiting for data frames"; + // } + + const bool pub_ok = pub_fut.wait_until(deadline) == std::future_status::ready; + const bool sub_ok = sub_fut.wait_until(deadline) == std::future_status::ready; + + if (!pub_ok || !sub_ok) { + ADD_FAILURE() << "Timed out waiting for data frames"; } + // Equivalent of `try_join!`'s ? — re-throws any exception from either task. + pub_fut.get(); + sub_fut.get(); + const auto received_count = receive_count_future.get(); const auto received_percent = static_cast(received_count) / static_cast(frame_count); std::cout << "Received " << received_count << "/" << frame_count - << " frames (" << received_percent * 100.0f << "%)" << std::endl; + << " frames (" << received_percent * 100.0f << "%)" << '\n'; EXPECT_GE(received_percent, kMinimumReceivedPercent) << "Received " << received_count << "/" << frame_count << " frames"; diff --git a/src/tests/integration/test_media_multistream.cpp b/src/tests/integration/test_media_multistream.cpp index 8d48f3c5..9dcc03da 100644 --- a/src/tests/integration/test_media_multistream.cpp +++ b/src/tests/integration/test_media_multistream.cpp @@ -86,8 +86,8 @@ class MediaMultiStreamIntegrationTest : public LiveKitTestBase { void MediaMultiStreamIntegrationTest::runPublishTwoVideoAndTwoAudioTracks( bool single_peer_connection) { if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; + GTEST_SKIP() + << "LIVEKIT_URL, LIVEKIT_TOKEN_A, and LIVEKIT_TOKEN_B not set"; } RoomOptions options; @@ -100,11 +100,11 @@ void MediaMultiStreamIntegrationTest::runPublishTwoVideoAndTwoAudioTracks( auto receiver_room = std::make_unique(); receiver_room->setDelegate(&receiver_delegate); ASSERT_TRUE( - receiver_room->Connect(config_.url, config_.receiver_token, options)) + receiver_room->Connect(config_.url, config_.token_b, options)) << "Receiver failed to connect"; auto sender_room = std::make_unique(); - ASSERT_TRUE(sender_room->Connect(config_.url, config_.caller_token, options)) + ASSERT_TRUE(sender_room->Connect(config_.url, config_.token_a, options)) << "Sender failed to connect"; const std::string receiver_identity = diff --git a/src/tests/integration/test_room.cpp b/src/tests/integration/test_room.cpp index a5f4fbe5..1a1339b1 100644 --- a/src/tests/integration/test_room.cpp +++ b/src/tests/integration/test_room.cpp @@ -98,14 +98,14 @@ TEST_F(RoomTest, RemoteParticipantLookupBeforeConnect) { << "Looking up participant before connect should return nullptr"; } -// Server-dependent tests - require LIVEKIT_URL and LIVEKIT_TOKEN env vars +// Server-dependent tests - require LIVEKIT_URL and LIVEKIT_TOKEN_A env vars class RoomServerTest : public ::testing::Test { protected: void SetUp() override { livekit::initialize(livekit::LogLevel::Info, livekit::LogSink::kConsole); const char *url_env = std::getenv("LIVEKIT_URL"); - const char *token_env = std::getenv("LIVEKIT_CALLER_TOKEN"); + const char *token_env = std::getenv("LIVEKIT_TOKEN_A"); if (url_env && token_env) { server_url_ = url_env; @@ -123,7 +123,7 @@ class RoomServerTest : public ::testing::Test { TEST_F(RoomServerTest, ConnectToServer) { if (!server_available_) { - GTEST_SKIP() << "LIVEKIT_URL and LIVEKIT_TOKEN not set, skipping server " + GTEST_SKIP() << "LIVEKIT_URL and LIVEKIT_TOKEN_A not set, skipping server " "connection test"; } diff --git a/src/tests/integration/test_rpc.cpp b/src/tests/integration/test_rpc.cpp index 939a6f66..ab03d656 100644 --- a/src/tests/integration/test_rpc.cpp +++ b/src/tests/integration/test_rpc.cpp @@ -36,20 +36,20 @@ constexpr size_t kMaxRpcPayloadSize = 15 * 1024; // Test configuration from environment variables struct RpcTestConfig { std::string url; - std::string caller_token; - std::string receiver_token; + std::string token_a; + std::string token_b; bool available = false; static RpcTestConfig fromEnv() { RpcTestConfig config; const char *url = std::getenv("LIVEKIT_URL"); - const char *caller_token = std::getenv("LIVEKIT_CALLER_TOKEN"); - const char *receiver_token = std::getenv("LIVEKIT_RECEIVER_TOKEN"); + const char *token_a = std::getenv("LIVEKIT_TOKEN_A"); + const char *token_b = std::getenv("LIVEKIT_TOKEN_B"); - if (url && caller_token && receiver_token) { + if (url && token_a && token_b) { config.url = url; - config.caller_token = caller_token; - config.receiver_token = receiver_token; + config.token_a = token_a; + config.token_b = token_b; config.available = true; } return config; @@ -117,8 +117,8 @@ class RpcIntegrationTest : public ::testing::Test { // Test basic RPC round-trip TEST_F(RpcIntegrationTest, BasicRpcRoundTrip) { if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; + GTEST_SKIP() + << "LIVEKIT_URL, LIVEKIT_TOKEN_A, and LIVEKIT_TOKEN_B not set"; } // Create receiver room @@ -127,7 +127,7 @@ TEST_F(RpcIntegrationTest, BasicRpcRoundTrip) { receiver_options.auto_subscribe = true; bool receiver_connected = receiver_room->Connect( - config_.url, config_.receiver_token, receiver_options); + config_.url, config_.token_b, receiver_options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; std::string receiver_identity = receiver_room->localParticipant()->identity(); @@ -154,7 +154,7 @@ TEST_F(RpcIntegrationTest, BasicRpcRoundTrip) { caller_options.auto_subscribe = true; bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, caller_options); + caller_room->Connect(config_.url, config_.token_a, caller_options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; // Wait for receiver to be visible to caller @@ -187,8 +187,8 @@ TEST_F(RpcIntegrationTest, BasicRpcRoundTrip) { // Test maximum payload size (15KB) TEST_F(RpcIntegrationTest, MaxPayloadSize) { if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; + GTEST_SKIP() + << "LIVEKIT_URL, LIVEKIT_TOKEN_A, and LIVEKIT_TOKEN_B not set"; } auto receiver_room = std::make_unique(); @@ -196,7 +196,7 @@ TEST_F(RpcIntegrationTest, MaxPayloadSize) { options.auto_subscribe = true; bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); + receiver_room->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; std::string receiver_identity = receiver_room->localParticipant()->identity(); @@ -210,7 +210,7 @@ TEST_F(RpcIntegrationTest, MaxPayloadSize) { auto caller_room = std::make_unique(); bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, options); + caller_room->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; bool receiver_visible = @@ -232,8 +232,8 @@ TEST_F(RpcIntegrationTest, MaxPayloadSize) { // Test RPC timeout TEST_F(RpcIntegrationTest, RpcTimeout) { if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; + GTEST_SKIP() + << "LIVEKIT_URL, LIVEKIT_TOKEN_A, and LIVEKIT_TOKEN_B not set"; } auto receiver_room = std::make_unique(); @@ -241,7 +241,7 @@ TEST_F(RpcIntegrationTest, RpcTimeout) { options.auto_subscribe = true; bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); + receiver_room->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; std::string receiver_identity = receiver_room->localParticipant()->identity(); @@ -256,7 +256,7 @@ TEST_F(RpcIntegrationTest, RpcTimeout) { auto caller_room = std::make_unique(); bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, options); + caller_room->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; bool receiver_visible = @@ -279,8 +279,8 @@ TEST_F(RpcIntegrationTest, RpcTimeout) { // Test RPC with unsupported method TEST_F(RpcIntegrationTest, UnsupportedMethod) { if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; + GTEST_SKIP() + << "LIVEKIT_URL, LIVEKIT_TOKEN_A, and LIVEKIT_TOKEN_B not set"; } auto receiver_room = std::make_unique(); @@ -288,14 +288,14 @@ TEST_F(RpcIntegrationTest, UnsupportedMethod) { options.auto_subscribe = true; bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); + receiver_room->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; std::string receiver_identity = receiver_room->localParticipant()->identity(); auto caller_room = std::make_unique(); bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, options); + caller_room->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; bool receiver_visible = @@ -319,8 +319,8 @@ TEST_F(RpcIntegrationTest, UnsupportedMethod) { // Test RPC with application error TEST_F(RpcIntegrationTest, ApplicationError) { if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; + GTEST_SKIP() + << "LIVEKIT_URL, LIVEKIT_TOKEN_A, and LIVEKIT_TOKEN_B not set"; } auto receiver_room = std::make_unique(); @@ -328,7 +328,7 @@ TEST_F(RpcIntegrationTest, ApplicationError) { options.auto_subscribe = true; bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); + receiver_room->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; std::string receiver_identity = receiver_room->localParticipant()->identity(); @@ -342,7 +342,7 @@ TEST_F(RpcIntegrationTest, ApplicationError) { auto caller_room = std::make_unique(); bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, options); + caller_room->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; bool receiver_visible = @@ -366,8 +366,8 @@ TEST_F(RpcIntegrationTest, ApplicationError) { // Test multiple concurrent RPC calls TEST_F(RpcIntegrationTest, ConcurrentRpcCalls) { if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; + GTEST_SKIP() + << "LIVEKIT_URL, LIVEKIT_TOKEN_A, and LIVEKIT_TOKEN_B not set"; } auto receiver_room = std::make_unique(); @@ -375,7 +375,7 @@ TEST_F(RpcIntegrationTest, ConcurrentRpcCalls) { options.auto_subscribe = true; bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); + receiver_room->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; std::string receiver_identity = receiver_room->localParticipant()->identity(); @@ -393,7 +393,7 @@ TEST_F(RpcIntegrationTest, ConcurrentRpcCalls) { auto caller_room = std::make_unique(); bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, options); + caller_room->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; bool receiver_visible = @@ -434,8 +434,8 @@ TEST_F(RpcIntegrationTest, ConcurrentRpcCalls) { // Integration test: Run for approximately 1 minute TEST_F(RpcIntegrationTest, OneMinuteIntegration) { if (!config_.available) { - GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " - "LIVEKIT_RECEIVER_TOKEN not set"; + GTEST_SKIP() + << "LIVEKIT_URL, LIVEKIT_TOKEN_A, and LIVEKIT_TOKEN_B not set"; } auto receiver_room = std::make_unique(); @@ -443,7 +443,7 @@ TEST_F(RpcIntegrationTest, OneMinuteIntegration) { options.auto_subscribe = true; bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); + receiver_room->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; std::string receiver_identity = receiver_room->localParticipant()->identity(); @@ -461,7 +461,7 @@ TEST_F(RpcIntegrationTest, OneMinuteIntegration) { auto caller_room = std::make_unique(); bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, options); + caller_room->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; bool receiver_visible = diff --git a/src/tests/integration/test_video_frame_metadata.cpp b/src/tests/integration/test_video_frame_metadata.cpp index 59dc1557..faf8dcb8 100644 --- a/src/tests/integration/test_video_frame_metadata.cpp +++ b/src/tests/integration/test_video_frame_metadata.cpp @@ -15,12 +15,15 @@ */ #include "tests/common/test_common.h" -#include "video_utils.h" +#include +#include #include #include +#include #include #include +#include namespace livekit::test { @@ -35,8 +38,8 @@ TEST_F(VideoFrameMetadataServerTest, RoomOptions options; ASSERT_TRUE( - receiver_room.Connect(config_.url, config_.receiver_token, options)); - ASSERT_TRUE(sender_room.Connect(config_.url, config_.caller_token, options)); + receiver_room.Connect(config_.url, config_.token_b, options)); + ASSERT_TRUE(sender_room.Connect(config_.url, config_.token_a, options)); ASSERT_NE(sender_room.localParticipant(), nullptr); ASSERT_NE(receiver_room.localParticipant(), nullptr); @@ -54,8 +57,12 @@ TEST_F(VideoFrameMetadataServerTest, sender_identity, track_name, [&mutex, &cv, &received_user_timestamp_us](const VideoFrameEvent &event) { std::lock_guard lock(mutex); - if (event.metadata && event.metadata->user_timestamp_us.has_value()) { - received_user_timestamp_us = event.metadata->user_timestamp_us; + if (!event.metadata) { + return; + } + const auto &user_timestamp_us = event.metadata->user_timestamp_us; + if (user_timestamp_us.has_value() && *user_timestamp_us != 0) { + received_user_timestamp_us = user_timestamp_us; cv.notify_all(); } }); @@ -105,31 +112,57 @@ TEST_F(VideoFrameMetadataServerTest, ASSERT_TRUE(receiver_track_ready) << "Timed out waiting for receiver video track subscription"; - VideoFrame frame = VideoFrame::create(16, 16, VideoBufferType::RGBA); - std::fill(frame.data(), frame.data() + frame.dataSize(), 0x7f); - - const std::uint64_t expected_user_timestamp_us = getTimestampUs(); - VideoCaptureOptions capture_options; - capture_options.timestamp_us = - static_cast(expected_user_timestamp_us); - capture_options.metadata = VideoFrameMetadata{}; - capture_options.metadata->user_timestamp_us = expected_user_timestamp_us; - - source->captureFrame(frame, capture_options); + std::atomic publishing{true}; + std::thread publisher([&]() { + VideoFrame frame = VideoFrame::create(16, 16, VideoBufferType::RGBA); + std::fill(frame.data(), frame.data() + frame.dataSize(), 0x7f); + + while (publishing.load(std::memory_order_relaxed)) { + const std::uint64_t user_timestamp_us = getTimestampUs(); + VideoCaptureOptions capture_options; + capture_options.timestamp_us = + static_cast(user_timestamp_us); + capture_options.metadata = VideoFrameMetadata{}; + capture_options.metadata->user_timestamp_us = user_timestamp_us; + + try { + source->captureFrame(frame, capture_options); + } catch (...) { + publishing.store(false, std::memory_order_relaxed); + break; + } + std::this_thread::sleep_for(50ms); + } + }); + bool got_metadata = false; { std::unique_lock lock(mutex); - ASSERT_TRUE(cv.wait_for(lock, 10s, [&received_user_timestamp_us] { + got_metadata = cv.wait_for(lock, 10s, [&received_user_timestamp_us] { return received_user_timestamp_us.has_value(); - })) - << "Timed out waiting for user timestamp metadata"; - EXPECT_EQ(*received_user_timestamp_us, expected_user_timestamp_us); + }); + } + + publishing.store(false, std::memory_order_relaxed); + publisher.join(); + + std::optional received_user_timestamp_snapshot; + { + std::lock_guard lock(mutex); + received_user_timestamp_snapshot = received_user_timestamp_us; } receiver_room.clearOnVideoFrameCallback(sender_identity, track_name); if (track->publication()) { sender_room.localParticipant()->unpublishTrack(track->publication()->sid()); } + + ASSERT_TRUE(got_metadata) << "Timed out waiting for user timestamp metadata"; + ASSERT_TRUE(received_user_timestamp_snapshot.has_value()); + + const auto received_at = getTimestampUs(); + ASSERT_LE(*received_user_timestamp_snapshot, received_at); + EXPECT_LT(received_at - *received_user_timestamp_snapshot, 1000000u); } } // namespace livekit::test diff --git a/src/tests/stress/test_latency_measurement.cpp b/src/tests/stress/test_latency_measurement.cpp index 23e89bba..9b6b30f8 100644 --- a/src/tests/stress/test_latency_measurement.cpp +++ b/src/tests/stress/test_latency_measurement.cpp @@ -292,7 +292,7 @@ TEST_F(LatencyMeasurementTest, ConnectionTime) { auto start = std::chrono::high_resolution_clock::now(); // Room::Connect() has built-in TRACE_EVENT0 for automatic timing - bool connected = room->Connect(config_.url, config_.caller_token, options); + bool connected = room->Connect(config_.url, config_.token_a, options); auto end = std::chrono::high_resolution_clock::now(); if (connected) { @@ -395,16 +395,16 @@ TEST_F(LatencyMeasurementTest, AudioLatency) { options.auto_subscribe = true; bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); + receiver_room->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; std::string receiver_identity = receiver_room->localParticipant()->identity(); std::cout << "Receiver connected as: " << receiver_identity << std::endl; - // Create sender room (using caller_token) + // Create sender room (using token_a) auto sender_room = std::make_unique(); bool sender_connected = - sender_room->Connect(config_.url, config_.caller_token, options); + sender_room->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(sender_connected) << "Sender failed to connect"; std::string sender_identity = sender_room->localParticipant()->identity(); @@ -605,9 +605,9 @@ TEST_F(LatencyMeasurementTest, FullDeplexAudioLatency) { RoomOptions options; options.auto_subscribe = true; - ASSERT_TRUE(room_a->Connect(config_.url, config_.caller_token, options)) + ASSERT_TRUE(room_a->Connect(config_.url, config_.token_a, options)) << "Participant A failed to connect"; - ASSERT_TRUE(room_b->Connect(config_.url, config_.receiver_token, options)) + ASSERT_TRUE(room_b->Connect(config_.url, config_.token_b, options)) << "Participant B failed to connect"; std::string id_a = room_a->localParticipant()->identity(); diff --git a/src/tests/stress/test_room_stress.cpp b/src/tests/stress/test_room_stress.cpp index 1f84db10..e98b32d2 100644 --- a/src/tests/stress/test_room_stress.cpp +++ b/src/tests/stress/test_room_stress.cpp @@ -192,7 +192,7 @@ class RoomServerStressTest : public ::testing::Test { livekit::initialize(livekit::LogLevel::Info, livekit::LogSink::kConsole); const char *url_env = std::getenv("LIVEKIT_URL"); - const char *token_env = std::getenv("LIVEKIT_CALLER_TOKEN"); + const char *token_env = std::getenv("LIVEKIT_TOKEN_A"); if (url_env && token_env) { server_url_ = url_env; @@ -210,8 +210,8 @@ class RoomServerStressTest : public ::testing::Test { TEST_F(RoomServerStressTest, RepeatedConnectDisconnect) { if (!server_available_) { - GTEST_SKIP() - << "LIVEKIT_URL and LIVEKIT_TOKEN not set, skipping server stress test"; + GTEST_SKIP() << "LIVEKIT_URL and LIVEKIT_TOKEN_A not set, skipping server " + "stress test"; } const int num_iterations = 10; diff --git a/src/tests/stress/test_rpc_stress.cpp b/src/tests/stress/test_rpc_stress.cpp index 9db7a9ec..89e249fd 100644 --- a/src/tests/stress/test_rpc_stress.cpp +++ b/src/tests/stress/test_rpc_stress.cpp @@ -164,7 +164,7 @@ TEST_F(RpcStressTest, MaxPayloadStress) { options.auto_subscribe = true; bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); + receiver_room->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; auto receiver_info = receiver_room->room_info(); @@ -189,7 +189,7 @@ TEST_F(RpcStressTest, MaxPayloadStress) { // Create caller room auto caller_room = std::make_unique(); bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, options); + caller_room->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; auto caller_info = caller_room->room_info(); @@ -366,7 +366,7 @@ TEST_F(RpcStressTest, SmallPayloadStress) { options.auto_subscribe = true; bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); + receiver_room->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; auto receiver_info = receiver_room->room_info(); @@ -390,7 +390,7 @@ TEST_F(RpcStressTest, SmallPayloadStress) { // Create caller room auto caller_room = std::make_unique(); bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, options); + caller_room->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; auto caller_info = caller_room->room_info(); @@ -553,11 +553,11 @@ TEST_F(RpcStressTest, BidirectionalRpcStress) { options.auto_subscribe = true; bool a_connected = - room_a->Connect(config_.url, config_.caller_token, options); + room_a->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(a_connected) << "Room A failed to connect"; bool b_connected = - room_b->Connect(config_.url, config_.receiver_token, options); + room_b->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(b_connected) << "Room B failed to connect"; std::string identity_a = room_a->localParticipant()->identity(); @@ -774,7 +774,7 @@ TEST_F(RpcStressTest, HighThroughputBurst) { options.auto_subscribe = true; bool receiver_connected = - receiver_room->Connect(config_.url, config_.receiver_token, options); + receiver_room->Connect(config_.url, config_.token_b, options); ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; std::string receiver_identity = receiver_room->localParticipant()->identity(); @@ -792,7 +792,7 @@ TEST_F(RpcStressTest, HighThroughputBurst) { auto caller_room = std::make_unique(); bool caller_connected = - caller_room->Connect(config_.url, config_.caller_token, options); + caller_room->Connect(config_.url, config_.token_a, options); ASSERT_TRUE(caller_connected) << "Caller failed to connect"; bool receiver_visible =