diff --git a/.github/scripts/copy_sources.sh b/.github/scripts/copy_sources.sh index 0df256bb9e..0430dab1a4 100755 --- a/.github/scripts/copy_sources.sh +++ b/.github/scripts/copy_sources.sh @@ -57,6 +57,7 @@ cp $2/LICENSE $tmp_dir cp $2/README.md $tmp_dir cp $2/tests/slo_workloads/.dockerignore $tmp_dir/tests/slo_workloads cp $2/tests/slo_workloads/Dockerfile $tmp_dir/tests/slo_workloads +cp $2/tests/slo_workloads/Dockerfile.userver $tmp_dir/tests/slo_workloads cp $2/include/ydb-cpp-sdk/type_switcher.h $tmp_dir/include/ydb-cpp-sdk/type_switcher.h cp $2/src/version.h $tmp_dir/src/version.h diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index 0618526617..091ef7c661 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -2,275 +2,178 @@ name: SLO on: pull_request: - types: [opened, reopened, synchronize] - branches: - - main - workflow_dispatch: - inputs: - github_issue: - description: "GitHub issue number where the SLO results will be reported" - required: true - baseline_ref: - description: "Baseline commit/branch/tag to compare against (leave empty to auto-detect merge-base with main)" - required: false - slo_workload_duration_seconds: - description: "Duration of the SLO workload in seconds" - required: false - default: "600" - slo_workload_read_max_rps: - description: "Maximum read RPS for the SLO workload" - required: false - default: "1000" - slo_workload_write_max_rps: - description: "Maximum write RPS for the SLO workload" - required: false - default: "100" + types: [opened, reopened, synchronize, labeled] jobs: ydb-slo-action: - name: Run YDB SLO Tests + if: contains(github.event.pull_request.labels.*.name, 'SLO') + + name: Run YDB SLO Tests (${{ matrix.sdk.name }}) runs-on: ubuntu-latest + permissions: + contents: read + strategy: + fail-fast: false matrix: - compiler: [clang, gcc] - include: - - workload: table + sdk: + - name: cpp-native + preset: release-test-clang + dockerfile: Dockerfile + command: "" + - name: 
cpp-userver + preset: release-test-clang + dockerfile: Dockerfile.userver + command: "" concurrency: - group: slo-${{ github.ref }}-${{ matrix.os }}-${{ matrix.workload }}-${{ matrix.compiler }} + group: slo-${{ github.ref }}-${{ matrix.sdk.name }} cancel-in-progress: true steps: - name: Install dependencies run: | + set -euxo pipefail YQ_VERSION=v4.48.2 BUILDX_VERSION=0.30.1 COMPOSE_VERSION=2.40.3 - sudo curl -L https://github.com/mikefarah/yq/releases/download/${YQ_VERSION}/yq_linux_amd64 -o /usr/local/bin/yq && \ - sudo chmod +x /usr/local/bin/yq + sudo curl -fLo /usr/local/bin/yq \ + "https://github.com/mikefarah/yq/releases/download/${YQ_VERSION}/yq_linux_amd64" + sudo chmod +x /usr/local/bin/yq - echo "Updating Docker plugins..." sudo mkdir -p /usr/local/lib/docker/cli-plugins - echo "Installing Docker Buildx ${BUILDX_VERSION}..." sudo curl -fLo /usr/local/lib/docker/cli-plugins/docker-buildx \ "https://github.com/docker/buildx/releases/download/v${BUILDX_VERSION}/buildx-v${BUILDX_VERSION}.linux-amd64" sudo chmod +x /usr/local/lib/docker/cli-plugins/docker-buildx - echo "Installing Docker Compose ${COMPOSE_VERSION}..." 
sudo curl -fLo /usr/local/lib/docker/cli-plugins/docker-compose \ "https://github.com/docker/compose/releases/download/v${COMPOSE_VERSION}/docker-compose-linux-x86_64" sudo chmod +x /usr/local/lib/docker/cli-plugins/docker-compose - echo "Installed versions:" yq --version docker --version docker buildx version docker compose version - - name: Checkout current version + - name: Checkout current SDK version uses: actions/checkout@v5 with: - path: current + path: sdk-current fetch-depth: 0 submodules: true - name: Determine baseline commit id: baseline + working-directory: sdk-current run: | - cd current - if [[ -n "${{ inputs.baseline_ref }}" ]]; then - BASELINE="${{ inputs.baseline_ref }}" - else - BASELINE=$(git merge-base HEAD origin/main) - fi - echo "sha=$BASELINE" >> $GITHUB_OUTPUT + set -euo pipefail + BASELINE=$(git merge-base HEAD origin/main) + echo "sha=${BASELINE}" >> "$GITHUB_OUTPUT" - # Try to determine a human-readable ref name for baseline - # Check if baseline is on main - if git merge-base --is-ancestor $BASELINE origin/main && \ - [ "$(git rev-parse origin/main)" = "$BASELINE" ]; then + if git merge-base --is-ancestor "${BASELINE}" origin/main && \ + [ "$(git rev-parse origin/main)" = "${BASELINE}" ]; then BASELINE_REF="main" else - # Try to find a branch containing this commit - BRANCH=$(git branch -r --contains $BASELINE | grep -v HEAD | head -1 | sed 's/.*\///' || echo "") - if [ -n "$BRANCH" ]; then + BRANCH=$(git branch -r --contains "${BASELINE}" | grep -v HEAD | head -1 | sed 's|.*/||' || echo "") + if [ -n "${BRANCH}" ]; then BASELINE_REF="${BRANCH}@${BASELINE:0:7}" else BASELINE_REF="${BASELINE:0:7}" fi fi - echo "ref=$BASELINE_REF" >> $GITHUB_OUTPUT + echo "ref=${BASELINE_REF}" >> "$GITHUB_OUTPUT" - - name: Checkout baseline version + - name: Checkout baseline SDK version uses: actions/checkout@v5 with: ref: ${{ steps.baseline.outputs.sha }} - path: baseline + path: sdk-baseline fetch-depth: 1 submodules: true - - name: Build Workload 
Image - run: | - echo "Cleaning up Docker system before builds..." - docker system prune -af --volumes - docker builder prune -af - df -h - - # Build current version - if [ -f "$GITHUB_WORKSPACE/current/tests/slo_workloads/Dockerfile" ]; then - echo "Building current app image..." - cd "$GITHUB_WORKSPACE/current" - - # Use SLO-specific .dockerignore - cp tests/slo_workloads/.dockerignore .dockerignore - - docker build -t ydb-app-current \ - --build-arg REF="${{ github.head_ref || github.ref_name }}" \ - --build-arg PRESET=release-test-${{ matrix.compiler }} \ - -f tests/slo_workloads/Dockerfile . - - # Clean up .dockerignore - rm -f .dockerignore - else - echo "No current app Dockerfile found" - exit 1 - fi - - docker system prune -f --volumes - docker builder prune -af - - # Build baseline version - if [ -f "$GITHUB_WORKSPACE/baseline/tests/slo_workloads/Dockerfile" ]; then - echo "Building baseline app image..." - cd "$GITHUB_WORKSPACE/baseline" - - # Use SLO-specific .dockerignore - cp tests/slo_workloads/.dockerignore .dockerignore + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 - docker build -t ydb-app-baseline \ - --build-arg REF="${{ steps.baseline.outputs.ref }}" \ - --build-arg PRESET=release-test-${{ matrix.compiler }} \ - -f tests/slo_workloads/Dockerfile . - - # Clean up .dockerignore - rm -f .dockerignore - else - echo "No baseline app Dockerfile found" - exit 1 - fi - - docker system prune -f --volumes - docker builder prune -af + # Use current's workload harness (Dockerfile, sources, .dockerignore) for + # both builds so only the SDK library differs between current and + # baseline. 
+ - name: Stage workload harness + run: | + set -euxo pipefail + rm -rf sdk-baseline/tests/slo_workloads + cp -a sdk-current/tests/slo_workloads sdk-baseline/tests/slo_workloads + cp sdk-current/tests/slo_workloads/.dockerignore sdk-current/.dockerignore + cp sdk-baseline/tests/slo_workloads/.dockerignore sdk-baseline/.dockerignore + + - name: Restore ccache + id: ccache + uses: actions/cache@v4 + with: + path: ccache + key: slo-ccache-${{ matrix.sdk.name }}-${{ matrix.sdk.preset }}-${{ github.run_id }} + restore-keys: | + slo-ccache-${{ matrix.sdk.name }}-${{ matrix.sdk.preset }}- - echo "Final disk space after builds:" - df -h + - name: Inject ccache into BuildKit + uses: reproducible-containers/buildkit-cache-dance@v3.1.2 + with: + cache-map: | + { + "ccache": "/root/.ccache" + } + skip-extraction: false + + - name: Build current workload image + uses: docker/build-push-action@v6 + env: + DOCKER_BUILD_SUMMARY: "false" + DOCKER_BUILD_RECORD_UPLOAD: "false" + with: + context: sdk-current + file: sdk-current/tests/slo_workloads/${{ matrix.sdk.dockerfile }} + platforms: linux/amd64 + tags: ydb-app-current + load: true + build-args: PRESET=${{ matrix.sdk.preset }} + cache-from: type=gha,scope=slo-${{ matrix.sdk.name }}-${{ matrix.sdk.preset }} + cache-to: type=gha,mode=max,scope=slo-${{ matrix.sdk.name }}-${{ matrix.sdk.preset }} + + - name: Build baseline workload image + id: baseline-build + continue-on-error: true + uses: docker/build-push-action@v6 + env: + DOCKER_BUILD_SUMMARY: "false" + DOCKER_BUILD_RECORD_UPLOAD: "false" + with: + context: sdk-baseline + file: sdk-baseline/tests/slo_workloads/${{ matrix.sdk.dockerfile }} + platforms: linux/amd64 + tags: ydb-app-baseline + load: true + build-args: PRESET=${{ matrix.sdk.preset }} + cache-from: type=gha,scope=slo-${{ matrix.sdk.name }}-${{ matrix.sdk.preset }} + + - name: Fall back to current image for baseline + if: steps.baseline-build.outcome == 'failure' + run: | + echo "Baseline build failed; reusing current 
image as baseline." + docker tag ydb-app-current ydb-app-baseline - - name: Initialize YDB SLO - uses: ydb-platform/ydb-slo-action/init@main + - name: Run SLO Tests + uses: ydb-platform/ydb-slo-action/init@v2 + timeout-minutes: 30 with: - github_issue: ${{ github.event.inputs.github_issue }} + github_issue: ${{ github.event.pull_request.number }} github_token: ${{ secrets.GITHUB_TOKEN }} - workload_name: ${{ matrix.workload }}-${{ matrix.compiler }} + workload_name: ${{ matrix.sdk.name }} + workload_duration: "600" workload_current_ref: ${{ github.head_ref || github.ref_name }} + workload_current_image: ydb-app-current + workload_current_command: ${{ matrix.sdk.command }} --read-rps 1000 --write-rps 100 workload_baseline_ref: ${{ steps.baseline.outputs.ref }} - - - name: Prepare SLO Database - run: | - echo "Preparing SLO database..." - docker run --rm --network ydb_ydb-net \ - --add-host "ydb:172.28.0.11" \ - --add-host "ydb:172.28.0.12" \ - --add-host "ydb:172.28.0.13" \ - --add-host "ydb:172.28.0.99" \ - ydb-app-current --connection-string grpc://ydb:2136/?database=/Root/testdb create --dont-push - - - name: Run SLO Tests (parallel) - timeout-minutes: 15 - run: | - DURATION=${{ inputs.slo_workload_duration_seconds || 600 }} - READ_RPS=${{ inputs.slo_workload_read_max_rps || 1000 }} - WRITE_RPS=${{ inputs.slo_workload_write_max_rps || 100 }} - - ARGS="--connection-string grpc://ydb:2136/?database=/Root/testdb run \ - --metrics-push-url http://prometheus:9090/api/v1/otlp/v1/metrics \ - --time $DURATION \ - --read-rps $READ_RPS \ - --write-rps $WRITE_RPS \ - --read-timeout 100 \ - --write-timeout 100" - - echo "Starting ydb-app-current..." - docker run -d \ - --name ydb-app-current \ - --network ydb_ydb-net \ - --add-host "ydb:172.28.0.11" \ - --add-host "ydb:172.28.0.12" \ - --add-host "ydb:172.28.0.13" \ - --add-host "ydb:172.28.0.99" \ - ydb-app-current $ARGS - - echo "Starting ydb-app-baseline..." 
- docker run -d \ - --name ydb-app-baseline \ - --network ydb_ydb-net \ - --add-host "ydb:172.28.0.11" \ - --add-host "ydb:172.28.0.12" \ - --add-host "ydb:172.28.0.13" \ - --add-host "ydb:172.28.0.99" \ - ydb-app-baseline $ARGS - - # Show initial logs - echo "" - echo "==================== INITIAL CURRENT LOGS ====================" - docker logs -n 15 ydb-app-current 2>&1 || echo "No current container" - echo "" - echo "==================== INITIAL BASELINE LOGS ====================" - docker logs -n 15 ydb-app-baseline 2>&1 || echo "No baseline container" - echo "" - - # Wait for workloads to complete - echo "Waiting for workloads to complete (${DURATION}s)..." - sleep ${DURATION} - - # Stop containers after workload duration and wait for graceful shutdown - echo "Stopping containers after ${DURATION}s..." - docker stop --timeout=30 ydb-app-current ydb-app-baseline 2>&1 || true - - # Force kill if still running - docker kill ydb-app-current ydb-app-baseline 2>&1 || true - - # Check exit codes - CURRENT_EXIT=$(docker inspect ydb-app-current --format='{{.State.ExitCode}}' 2>/dev/null || echo "1") - BASELINE_EXIT=$(docker inspect ydb-app-baseline --format='{{.State.ExitCode}}' 2>/dev/null || echo "0") - - echo "Current container exit code: $CURRENT_EXIT" - echo "Baseline container exit code: $BASELINE_EXIT" - - # Show final logs - echo "" - echo "==================== FINAL CURRENT LOGS ====================" - docker logs -n 15 ydb-app-current 2>&1 || echo "No current container" - echo "" - echo "==================== FINAL BASELINE LOGS ====================" - docker logs -n 15 ydb-app-baseline 2>&1 || echo "No baseline container" - echo "" - - echo "SUCCESS: Workloads completed successfully" - - - if: always() - name: Store logs - run: | - docker logs ydb-app-current > current.log 2>&1 || echo "No current container" - docker logs ydb-app-baseline > baseline.log 2>&1 || echo "No baseline container" - - - if: always() - uses: actions/upload-artifact@v4 - with: - name: 
${{ matrix.workload }}-${{ matrix.compiler }}-slo-cpp-sdk-logs - path: | - ./current.log - ./baseline.log - retention-days: 1 + workload_baseline_image: ydb-app-baseline + workload_baseline_command: ${{ matrix.sdk.command }} --read-rps 1000 --write-rps 100 diff --git a/.github/workflows/slo_report.yml b/.github/workflows/slo_report.yml index 0a7c2e3483..0ccd36abe9 100644 --- a/.github/workflows/slo_report.yml +++ b/.github/workflows/slo_report.yml @@ -7,17 +7,53 @@ on: - completed jobs: - ydb-slo-action-report: + publish-slo-report: + if: github.event.workflow_run.conclusion == 'success' runs-on: ubuntu-latest name: Publish YDB SLO Report permissions: + actions: read checks: write contents: read pull-requests: write - if: github.event.workflow_run.conclusion == 'success' steps: - name: Publish YDB SLO Report - uses: ydb-platform/ydb-slo-action/report@main + uses: ydb-platform/ydb-slo-action/report@v2 with: github_token: ${{ secrets.GITHUB_TOKEN }} github_run_id: ${{ github.event.workflow_run.id }} + + remove-slo-label: + needs: publish-slo-report + if: always() && github.event.workflow_run.event == 'pull_request' + runs-on: ubuntu-latest + name: Remove SLO Label + permissions: + pull-requests: write + steps: + - name: Remove SLO label from PR + uses: actions/github-script@v7 + with: + script: | + const pullRequests = context.payload.workflow_run.pull_requests; + if (pullRequests && pullRequests.length > 0) { + for (const pr of pullRequests) { + try { + await github.rest.issues.removeLabel({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: pr.number, + name: 'SLO' + }); + console.log(`Removed SLO label from PR #${pr.number}`); + } catch (error) { + if (error.status === 404) { + console.log(`SLO label not found on PR #${pr.number}, skipping`); + } else { + throw error; + } + } + } + } else { + console.log('No pull requests associated with this workflow run'); + } diff --git a/tests/slo_workloads/CMakeLists.txt 
b/tests/slo_workloads/CMakeLists.txt index d99d107ac4..69b177fcba 100644 --- a/tests/slo_workloads/CMakeLists.txt +++ b/tests/slo_workloads/CMakeLists.txt @@ -1,2 +1,7 @@ add_subdirectory(key_value) add_subdirectory(utils) + +option(YDB_CPP_SDK_BUILD_SLO_USERVER "Build userver SLO workload (requires userver)" OFF) +if (YDB_CPP_SDK_BUILD_SLO_USERVER) + add_subdirectory(userver) +endif() diff --git a/tests/slo_workloads/Dockerfile b/tests/slo_workloads/Dockerfile index 091ce23066..f87be8f8df 100644 --- a/tests/slo_workloads/Dockerfile +++ b/tests/slo_workloads/Dockerfile @@ -1,21 +1,57 @@ +# syntax=docker/dockerfile:1.7 FROM ubuntu:22.04 ARG PRESET=release-test-clang -ARG REF=unknown -# Install software-properties-common for add-apt-repository -RUN apt-get -y update && apt-get -y install software-properties-common && add-apt-repository ppa:ubuntu-toolchain-r/test +# ccache settings consumed by the configure/build steps below. The cache dir +# is materialised by the BuildKit cache mount on those RUN steps; values +# elsewhere in the image are inert. +ENV CCACHE_DIR=/root/.ccache +ENV CCACHE_MAXSIZE=2G +ENV CCACHE_COMPRESS=true +ENV CCACHE_COMPILERCHECK=content + +# Every RUN that hits the network retries on transient failures so one +# flake doesn't throw away 30 min of previous build work. apt gets five +# Acquire retries + 60 s timeouts; wget gets the equivalent via WGET_OPTS. +RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \ + echo 'Acquire::http::Timeout "60";' >> /etc/apt/apt.conf.d/80-retries && \ + echo 'Acquire::https::Timeout "60";' >> /etc/apt/apt.conf.d/80-retries + +ENV WGET_OPTS="--tries=5 --waitretry=15 --timeout=60 --retry-connrefused --retry-on-http-error=500,502,503,504" + +# Install software-properties-common and add the gcc-13 PPA. +# Acquire::Retries only retries HTTP errors; TCP connect timeouts to +# ppa.launchpadcontent.net still drop through and kill the step. 
Wrap the +# whole command in a shell retry loop with linear backoff so a CDN +# blip doesn't throw away 30 minutes of downstream build work. +RUN for i in 1 2 3 4 5; do \ + apt-get -y update && \ + apt-get -y install software-properties-common && \ + add-apt-repository -y ppa:ubuntu-toolchain-r/test && \ + apt-get -y update && \ + break; \ + echo "add-apt-repository attempt $i failed; sleeping $((i * 15))s"; \ + sleep $((i * 15)); \ + done && \ + apt-cache show gcc-13 > /dev/null # fail fast if PPA never came up # Install C++ tools and libraries -RUN apt-get -y update && apt-get -y install \ - git gdb wget ninja-build libidn11-dev ragel yasm libc-ares-dev libre2-dev \ - rapidjson-dev zlib1g-dev libxxhash-dev libzstd-dev libsnappy-dev libgtest-dev libgmock-dev \ - libbz2-dev liblz4-dev libdouble-conversion-dev libssl-dev libstdc++-13-dev gcc-13 g++-13 \ - && apt-get clean && rm -rf /var/lib/apt/lists/* +RUN ok=0; for i in 1 2 3 4 5; do \ + apt-get -y install \ + git gdb wget ninja-build libidn11-dev ragel yasm libc-ares-dev libre2-dev \ + rapidjson-dev zlib1g-dev libxxhash-dev libzstd-dev libsnappy-dev libgtest-dev libgmock-dev \ + libbz2-dev liblz4-dev libdouble-conversion-dev libssl-dev libstdc++-13-dev gcc-13 g++-13 && \ + ok=1 && break; \ + echo "apt-get install attempt $i failed; sleeping $((i * 15))s"; \ + sleep $((i * 15)); \ + apt-get -y update || true; \ + done; [ "$ok" = 1 ] && \ + apt-get clean && rm -rf /var/lib/apt/lists/* # ok stays 0, failing the layer, if all five attempts failed # Install CMake ENV CMAKE_VERSION=3.27.7 -RUN wget https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cmake-${CMAKE_VERSION}-linux-x86_64.sh \ +RUN wget $WGET_OPTS https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cmake-${CMAKE_VERSION}-linux-x86_64.sh \ -q -O cmake-install.sh \ && chmod u+x cmake-install.sh \ && ./cmake-install.sh --skip-license --prefix=/usr/local \ @@ -23,7 +59,7 @@ RUN wget https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cm # Install LLVM ENV LLVM_VERSION=16 -RUN wget
https://apt.llvm.org/llvm.sh && \ +RUN wget $WGET_OPTS https://apt.llvm.org/llvm.sh && \ chmod u+x llvm.sh && \ ./llvm.sh ${LLVM_VERSION} && \ rm llvm.sh @@ -40,7 +76,7 @@ RUN update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-13 10000 && \ # Install abseil-cpp ENV ABSEIL_CPP_VERSION=20230802.0 ENV ABSEIL_CPP_INSTALL_DIR=/root/ydb_deps/absl -RUN wget -O abseil-cpp-${ABSEIL_CPP_VERSION}.tar.gz https://github.com/abseil/abseil-cpp/archive/refs/tags/${ABSEIL_CPP_VERSION}.tar.gz && \ +RUN wget $WGET_OPTS -O abseil-cpp-${ABSEIL_CPP_VERSION}.tar.gz https://github.com/abseil/abseil-cpp/archive/refs/tags/${ABSEIL_CPP_VERSION}.tar.gz && \ tar -xvzf abseil-cpp-${ABSEIL_CPP_VERSION}.tar.gz && cd abseil-cpp-${ABSEIL_CPP_VERSION} && \ mkdir build && cd build && \ cmake -G Ninja -DCMAKE_BUILD_TYPE=Release -DABSL_PROPAGATE_CXX_STD=ON .. && \ @@ -51,7 +87,7 @@ RUN wget -O abseil-cpp-${ABSEIL_CPP_VERSION}.tar.gz https://github.com/abseil/ab # Install protobuf ENV PROTOBUF_VERSION=3.21.12 ENV PROTOBUF_INSTALL_DIR=/root/ydb_deps/protobuf -RUN wget -O protobuf-${PROTOBUF_VERSION}.tar.gz https://github.com/protocolbuffers/protobuf/archive/refs/tags/v${PROTOBUF_VERSION}.tar.gz && \ +RUN wget $WGET_OPTS -O protobuf-${PROTOBUF_VERSION}.tar.gz https://github.com/protocolbuffers/protobuf/archive/refs/tags/v${PROTOBUF_VERSION}.tar.gz && \ tar -xvzf protobuf-${PROTOBUF_VERSION}.tar.gz && cd protobuf-${PROTOBUF_VERSION} && \ mkdir build && cd build && \ cmake -G Ninja -DCMAKE_BUILD_TYPE=Release -Dprotobuf_BUILD_TESTS=OFF -Dprotobuf_INSTALL=ON -Dprotobuf_ABSL_PROVIDER=package .. 
&& \ @@ -62,7 +98,7 @@ RUN wget -O protobuf-${PROTOBUF_VERSION}.tar.gz https://github.com/protocolbuffe # Install grpc ENV GRPC_VERSION=1.54.3 ENV GRPC_INSTALL_DIR=/root/ydb_deps/grpc -RUN wget -O grpc-${GRPC_VERSION}.tar.gz https://github.com/grpc/grpc/archive/refs/tags/v${GRPC_VERSION}.tar.gz && \ +RUN wget $WGET_OPTS -O grpc-${GRPC_VERSION}.tar.gz https://github.com/grpc/grpc/archive/refs/tags/v${GRPC_VERSION}.tar.gz && \ tar -xvzf grpc-${GRPC_VERSION}.tar.gz && cd grpc-${GRPC_VERSION} && \ mkdir build && cd build && \ cmake -G Ninja -DCMAKE_PREFIX_PATH="${ABSEIL_CPP_INSTALL_DIR};${PROTOBUF_INSTALL_DIR}" \ @@ -79,7 +115,7 @@ RUN wget -O grpc-${GRPC_VERSION}.tar.gz https://github.com/grpc/grpc/archive/ref # Install base64 ENV BASE64_VERSION=0.5.2 ENV BASE64_INSTALL_DIR=/root/ydb_deps/base64 -RUN wget -O base64-${BASE64_VERSION}.tar.gz https://github.com/aklomp/base64/archive/refs/tags/v${BASE64_VERSION}.tar.gz && \ +RUN wget $WGET_OPTS -O base64-${BASE64_VERSION}.tar.gz https://github.com/aklomp/base64/archive/refs/tags/v${BASE64_VERSION}.tar.gz && \ tar -xvzf base64-${BASE64_VERSION}.tar.gz && cd base64-${BASE64_VERSION} && \ mkdir build && cd build && \ cmake -G Ninja -DCMAKE_BUILD_TYPE=Release .. && \ @@ -90,7 +126,7 @@ RUN wget -O base64-${BASE64_VERSION}.tar.gz https://github.com/aklomp/base64/arc # Install brotli ENV BROTLI_VERSION=1.1.0 ENV BROTLI_INSTALL_DIR=/root/ydb_deps/brotli -RUN wget -O brotli-${BROTLI_VERSION}.tar.gz https://github.com/google/brotli/archive/refs/tags/v${BROTLI_VERSION}.tar.gz && \ +RUN wget $WGET_OPTS -O brotli-${BROTLI_VERSION}.tar.gz https://github.com/google/brotli/archive/refs/tags/v${BROTLI_VERSION}.tar.gz && \ tar -xvzf brotli-${BROTLI_VERSION}.tar.gz && cd brotli-${BROTLI_VERSION} && \ mkdir build && cd build && \ cmake -G Ninja -DCMAKE_BUILD_TYPE=Release .. 
&& \ @@ -101,7 +137,7 @@ RUN wget -O brotli-${BROTLI_VERSION}.tar.gz https://github.com/google/brotli/arc # Install jwt-cpp ENV JWT_CPP_VERSION=0.7.0 ENV JWT_CPP_INSTALL_DIR=/root/ydb_deps/jwt-cpp -RUN wget -O jwt-cpp-${JWT_CPP_VERSION}.tar.gz https://github.com/Thalhammer/jwt-cpp/archive/refs/tags/v${JWT_CPP_VERSION}.tar.gz && \ +RUN wget $WGET_OPTS -O jwt-cpp-${JWT_CPP_VERSION}.tar.gz https://github.com/Thalhammer/jwt-cpp/archive/refs/tags/v${JWT_CPP_VERSION}.tar.gz && \ tar -xvzf jwt-cpp-${JWT_CPP_VERSION}.tar.gz && cd jwt-cpp-${JWT_CPP_VERSION} && \ mkdir build && cd build && \ cmake -G Ninja -DCMAKE_BUILD_TYPE=Release .. && \ @@ -111,7 +147,7 @@ RUN wget -O jwt-cpp-${JWT_CPP_VERSION}.tar.gz https://github.com/Thalhammer/jwt- # Install ccache 4.8.1 or above ENV CCACHE_VERSION=4.8.1 -RUN wget https://github.com/ccache/ccache/releases/download/v${CCACHE_VERSION}/ccache-${CCACHE_VERSION}-linux-x86_64.tar.xz \ +RUN wget $WGET_OPTS https://github.com/ccache/ccache/releases/download/v${CCACHE_VERSION}/ccache-${CCACHE_VERSION}-linux-x86_64.tar.xz \ && tar -xf ccache-${CCACHE_VERSION}-linux-x86_64.tar.xz \ && cp ccache-${CCACHE_VERSION}-linux-x86_64/ccache /usr/local/bin/ \ && rm -rf ccache-${CCACHE_VERSION}-linux-x86_64 ccache-${CCACHE_VERSION}-linux-x86_64.tar.xz @@ -120,7 +156,13 @@ COPY . 
/ydb-cpp-sdk WORKDIR /ydb-cpp-sdk RUN rm -rf build -RUN cmake -DSLO_BRANCH_REF=${REF} --preset ${PRESET} -RUN cmake --build --preset default --target slo-key-value +RUN --mount=type=cache,target=/root/.ccache,sharing=locked \ + cmake --preset ${PRESET} \ + -DCMAKE_C_COMPILER_LAUNCHER=ccache \ + -DCMAKE_CXX_COMPILER_LAUNCHER=ccache +RUN --mount=type=cache,target=/root/.ccache,sharing=locked \ + ccache --zero-stats >/dev/null \ + && cmake --build --preset default --target slo-key-value \ + && ccache --show-stats ENTRYPOINT ["./build/tests/slo_workloads/key_value/slo-key-value"] diff --git a/tests/slo_workloads/Dockerfile.userver b/tests/slo_workloads/Dockerfile.userver new file mode 100644 index 0000000000..737c718140 --- /dev/null +++ b/tests/slo_workloads/Dockerfile.userver @@ -0,0 +1,206 @@ +# syntax=docker/dockerfile:1.7 + +# Build userver .deb package (core and other components; YDB is built from source in-repo). +FROM ghcr.io/userver-framework/ubuntu-22.04-userver-base:latest AS userver-deb +ARG USERVER_VERSION=v2.12 +RUN git clone --depth 1 --branch ${USERVER_VERSION} \ + https://github.com/userver-framework/userver.git /userver +WORKDIR /userver +ENV BUILD_OPTIONS="-DUSERVER_FEATURE_YDB=OFF" +RUN ./scripts/build_and_install.sh + +FROM ubuntu:22.04 AS base + +ENV DEBIAN_FRONTEND=noninteractive +ARG PRESET=release-test-clang + +# ccache settings consumed by the configure/build steps below. +ENV CCACHE_DIR=/root/.ccache +ENV CCACHE_MAXSIZE=2G +ENV CCACHE_COMPRESS=true +ENV CCACHE_COMPILERCHECK=content + +# Every RUN that hits the network retries on transient failures so one flake +# doesn't throw away 30 min of previous build work. 
+RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \ + echo 'Acquire::http::Timeout "60";' >> /etc/apt/apt.conf.d/80-retries && \ + echo 'Acquire::https::Timeout "60";' >> /etc/apt/apt.conf.d/80-retries + +ENV WGET_OPTS="--tries=5 --waitretry=15 --timeout=60 --retry-connrefused --retry-on-http-error=500,502,503,504" + +# Install software-properties-common and add the gcc-13 PPA with shell-level +# retry loop — Acquire::Retries doesn't cover TCP connect timeouts. +RUN for i in 1 2 3 4 5; do \ + apt-get -y update && \ + apt-get -y install software-properties-common && \ + add-apt-repository -y ppa:ubuntu-toolchain-r/test && \ + apt-get -y update && \ + break; \ + echo "add-apt-repository attempt $i failed; sleeping $((i * 15))s"; \ + sleep $((i * 15)); \ + done && \ + apt-cache show gcc-13 > /dev/null + +# Install C++ tools and libraries (the layer fails if all five attempts fail) +RUN ok=0; for i in 1 2 3 4 5; do \ + apt-get -y install \ + git gdb wget ninja-build libidn11-dev ragel yasm libc-ares-dev libre2-dev \ + rapidjson-dev zlib1g-dev libxxhash-dev libzstd-dev libsnappy-dev libgtest-dev libgmock-dev \ + libbz2-dev liblz4-dev libdouble-conversion-dev libssl-dev libstdc++-13-dev gcc-13 g++-13 && \ + ok=1 && break; \ + echo "apt-get install attempt $i failed; sleeping $((i * 15))s"; \ + sleep $((i * 15)); \ + apt-get -y update || true; \ + done; [ "$ok" = 1 ] && \ + apt-get clean && rm -rf /var/lib/apt/lists/* + +# Install CMake +ENV CMAKE_VERSION=3.27.7 +RUN wget $WGET_OPTS https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cmake-${CMAKE_VERSION}-linux-x86_64.sh \ + -q -O cmake-install.sh \ + && chmod u+x cmake-install.sh \ + && ./cmake-install.sh --skip-license --prefix=/usr/local \ + && rm cmake-install.sh + +# Install LLVM +ENV LLVM_VERSION=16 +RUN wget $WGET_OPTS https://apt.llvm.org/llvm.sh && \ + chmod u+x llvm.sh && \ + ./llvm.sh ${LLVM_VERSION} && \ + rm llvm.sh + +RUN update-alternatives --install /usr/bin/clang clang /usr/bin/clang-${LLVM_VERSION} 10000 && \ + update-alternatives
--install /usr/bin/clangd clangd /usr/bin/clangd-${LLVM_VERSION} 10000 && \ + update-alternatives --install /usr/bin/clang++ clang++ /usr/bin/clang++-${LLVM_VERSION} 10000 + +RUN update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-13 10000 && \ + update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-13 10000 + +# Install abseil-cpp +ENV ABSEIL_CPP_VERSION=20230802.0 +ENV ABSEIL_CPP_INSTALL_DIR=/root/ydb_deps/absl +RUN wget $WGET_OPTS -O abseil-cpp-${ABSEIL_CPP_VERSION}.tar.gz https://github.com/abseil/abseil-cpp/archive/refs/tags/${ABSEIL_CPP_VERSION}.tar.gz && \ + tar -xvzf abseil-cpp-${ABSEIL_CPP_VERSION}.tar.gz && cd abseil-cpp-${ABSEIL_CPP_VERSION} && \ + mkdir build && cd build && \ + cmake -G Ninja -DCMAKE_BUILD_TYPE=Release -DABSL_PROPAGATE_CXX_STD=ON .. && \ + cmake --build . --config Release && \ + cmake --install . --config Release --prefix ${ABSEIL_CPP_INSTALL_DIR} && \ + rm -rf abseil-cpp-${ABSEIL_CPP_VERSION}.tar.gz abseil-cpp-${ABSEIL_CPP_VERSION} + +# Install protobuf +ENV PROTOBUF_VERSION=3.21.12 +ENV PROTOBUF_INSTALL_DIR=/root/ydb_deps/protobuf +RUN wget $WGET_OPTS -O protobuf-${PROTOBUF_VERSION}.tar.gz https://github.com/protocolbuffers/protobuf/archive/refs/tags/v${PROTOBUF_VERSION}.tar.gz && \ + tar -xvzf protobuf-${PROTOBUF_VERSION}.tar.gz && cd protobuf-${PROTOBUF_VERSION} && \ + mkdir build && cd build && \ + cmake -G Ninja -DCMAKE_BUILD_TYPE=Release -Dprotobuf_BUILD_TESTS=OFF -Dprotobuf_INSTALL=ON -Dprotobuf_ABSL_PROVIDER=package .. && \ + cmake --build . --config Release && \ + cmake --install . 
--config Release --prefix ${PROTOBUF_INSTALL_DIR} && \ + rm -rf protobuf-${PROTOBUF_VERSION}.tar.gz protobuf-${PROTOBUF_VERSION} + +# Install grpc +ENV GRPC_VERSION=1.54.3 +ENV GRPC_INSTALL_DIR=/root/ydb_deps/grpc +RUN wget $WGET_OPTS -O grpc-${GRPC_VERSION}.tar.gz https://github.com/grpc/grpc/archive/refs/tags/v${GRPC_VERSION}.tar.gz && \ + tar -xvzf grpc-${GRPC_VERSION}.tar.gz && cd grpc-${GRPC_VERSION} && \ + mkdir build && cd build && \ + cmake -G Ninja -DCMAKE_PREFIX_PATH="${ABSEIL_CPP_INSTALL_DIR};${PROTOBUF_INSTALL_DIR}" \ + -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_STANDARD=17 \ + -DgRPC_INSTALL=ON -DgRPC_BUILD_TESTS=OFF -DgRPC_BUILD_CSHARP_EXT=OFF \ + -DgRPC_ZLIB_PROVIDER=package -DgRPC_CARES_PROVIDER=package -DgRPC_RE2_PROVIDER=package \ + -DgRPC_SSL_PROVIDER=package -DgRPC_PROTOBUF_PROVIDER=package -DgRPC_ABSL_PROVIDER=package \ + -DgRPC_BUILD_GRPC_NODE_PLUGIN=OFF -DgRPC_BUILD_GRPC_OBJECTIVE_C_PLUGIN=OFF -DgRPC_BUILD_GRPC_PHP_PLUGIN=OFF \ + -DgRPC_BUILD_GRPC_RUBY_PLUGIN=OFF -DgRPC_BUILD_GRPC_CSHARP_PLUGIN=OFF -DgRPC_BUILD_GRPC_PYTHON_PLUGIN=OFF .. && \ + cmake --build . --config Release && \ + cmake --install . --config Release --prefix ${GRPC_INSTALL_DIR} && \ + rm -rf grpc-${GRPC_VERSION}.tar.gz grpc-${GRPC_VERSION} + +# Install base64 +ENV BASE64_VERSION=0.5.2 +ENV BASE64_INSTALL_DIR=/root/ydb_deps/base64 +RUN wget $WGET_OPTS -O base64-${BASE64_VERSION}.tar.gz https://github.com/aklomp/base64/archive/refs/tags/v${BASE64_VERSION}.tar.gz && \ + tar -xvzf base64-${BASE64_VERSION}.tar.gz && cd base64-${BASE64_VERSION} && \ + mkdir build && cd build && \ + cmake -G Ninja -DCMAKE_BUILD_TYPE=Release .. && \ + cmake --build . --config Release && \ + cmake --install . 
--config Release --prefix ${BASE64_INSTALL_DIR} && \ + rm -rf base64-${BASE64_VERSION}.tar.gz base64-${BASE64_VERSION} + +# Install brotli +ENV BROTLI_VERSION=1.1.0 +ENV BROTLI_INSTALL_DIR=/root/ydb_deps/brotli +RUN wget $WGET_OPTS -O brotli-${BROTLI_VERSION}.tar.gz https://github.com/google/brotli/archive/refs/tags/v${BROTLI_VERSION}.tar.gz && \ + tar -xvzf brotli-${BROTLI_VERSION}.tar.gz && cd brotli-${BROTLI_VERSION} && \ + mkdir build && cd build && \ + cmake -G Ninja -DCMAKE_BUILD_TYPE=Release .. && \ + cmake --build . --config Release && \ + cmake --install . --config Release --prefix ${BROTLI_INSTALL_DIR} && \ + rm -rf brotli-${BROTLI_VERSION}.tar.gz brotli-${BROTLI_VERSION} + +# Install jwt-cpp +ENV JWT_CPP_VERSION=0.7.0 +ENV JWT_CPP_INSTALL_DIR=/root/ydb_deps/jwt-cpp +RUN wget $WGET_OPTS -O jwt-cpp-${JWT_CPP_VERSION}.tar.gz https://github.com/Thalhammer/jwt-cpp/archive/refs/tags/v${JWT_CPP_VERSION}.tar.gz && \ + tar -xvzf jwt-cpp-${JWT_CPP_VERSION}.tar.gz && cd jwt-cpp-${JWT_CPP_VERSION} && \ + mkdir build && cd build && \ + cmake -G Ninja -DCMAKE_BUILD_TYPE=Release .. && \ + cmake --build . --config Release && \ + cmake --install . --config Release --prefix ${JWT_CPP_INSTALL_DIR} && \ + rm -rf jwt-cpp-${JWT_CPP_VERSION}.tar.gz jwt-cpp-${JWT_CPP_VERSION} + +# Install ccache +ENV CCACHE_VERSION=4.8.1 +RUN wget $WGET_OPTS https://github.com/ccache/ccache/releases/download/v${CCACHE_VERSION}/ccache-${CCACHE_VERSION}-linux-x86_64.tar.xz \ + && tar -xf ccache-${CCACHE_VERSION}-linux-x86_64.tar.xz \ + && cp ccache-${CCACHE_VERSION}-linux-x86_64/ccache /usr/local/bin/ \ + && rm -rf ccache-${CCACHE_VERSION}-linux-x86_64 ccache-${CCACHE_VERSION}-linux-x86_64.tar.xz + +FROM base + +ARG PRESET=release-test-clang +ARG USERVER_VERSION=v2.12 + +COPY --from=userver-deb /userver/libuserver-all-dev*.deb /tmp/ +# libuserver-all-dev pulls build-essential (gcc-11), which would override +# gcc-13 alternatives. Reassert gcc-13 after install. 
+# Install the userver .deb, retrying transient apt/network failures with a
+# linear back-off (15s, 30s, ...). The loop's own exit status cannot signal
+# failure: after a failed attempt the last command executed in the body is
+# `sleep`, which exits 0. Success is therefore tracked in `ok`, and the build
+# is aborted explicitly when every attempt has failed.
+RUN ok=0; \
+    for i in 1 2 3 4 5; do \
+        if apt-get -y update && \
+           apt-get -y install --no-install-recommends /tmp/libuserver-all-dev*.deb; then \
+            ok=1; \
+            break; \
+        fi; \
+        echo "userver .deb install attempt $i failed; sleeping $((i * 15))s"; \
+        sleep $((i * 15)); \
+    done; \
+    if [ "$ok" -ne 1 ]; then \
+        echo "userver .deb install failed after 5 attempts" >&2; \
+        exit 1; \
+    fi; \
+    rm -rf /tmp/libuserver-all-dev*.deb /var/lib/apt/lists/* \
+    && update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-13 10000 \
+    && update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-13 10000 \
+    && update-alternatives --set gcc /usr/bin/gcc-13 \
+    && update-alternatives --set g++ /usr/bin/g++-13
+
+# Userver source checkout for chaotic codegen tools used during configure.
+# Same retry scheme as above: a partial checkout is removed before the next
+# attempt, and the build fails if no attempt succeeds. Only the final chmod is
+# best-effort (`2>/dev/null || true`), so it can no longer mask a failed clone.
+RUN ok=0; \
+    for i in 1 2 3 4 5; do \
+        if git clone --depth 1 --branch "${USERVER_VERSION}" \
+               https://github.com/userver-framework/userver.git /userver-src; then \
+            ok=1; \
+            break; \
+        fi; \
+        echo "userver clone attempt $i failed; sleeping $((i * 15))s"; \
+        rm -rf /userver-src; \
+        sleep $((i * 15)); \
+    done; \
+    if [ "$ok" -ne 1 ]; then \
+        echo "userver clone failed after 5 attempts" >&2; \
+        exit 1; \
+    fi; \
+    chmod +x /userver-src/chaotic/bin-dynamic-configs/chaotic-gen-dynamic-configs \
+        /userver-src/chaotic/bin/chaotic-gen 2>/dev/null || true
+ENV USERVER_SOURCE_DIR=/userver-src
+
+COPY .
/ydb-cpp-sdk +WORKDIR /ydb-cpp-sdk +RUN rm -rf build + +RUN --mount=type=cache,target=/root/.ccache,sharing=locked \ + cmake --preset ${PRESET} \ + -DCMAKE_C_COMPILER_LAUNCHER=ccache \ + -DCMAKE_CXX_COMPILER_LAUNCHER=ccache \ + -DYDB_CPP_SDK_BUILD_SLO_USERVER=ON +RUN --mount=type=cache,target=/root/.ccache,sharing=locked \ + ccache --zero-stats >/dev/null \ + && cmake --build --preset default --target slo-userver-key-value \ + && ccache --show-stats + +ENTRYPOINT ["./build/tests/slo_workloads/userver/key_value/slo-userver-key-value"] diff --git a/tests/slo_workloads/key_value/create.cpp b/tests/slo_workloads/key_value/create.cpp index a79db76a12..a105caba7f 100644 --- a/tests/slo_workloads/key_value/create.cpp +++ b/tests/slo_workloads/key_value/create.cpp @@ -8,7 +8,7 @@ using namespace NYdb::NTable; namespace { void CreateTable(TTableClient& client, const std::string& prefix) { - RetryBackoff(client, 5, [prefix](TSession session) { + NYdb::NStatusHelpers::ThrowOnError(client.RetryOperationSync([prefix](TSession session) { auto desc = TTableBuilder() .AddNullableColumn("object_id_key", EPrimitiveType::Uint32) .AddNullableColumn("object_id", EPrimitiveType::Uint32) @@ -27,7 +27,7 @@ namespace { , std::move(desc) , std::move(tableSettings) ).ExtractValueSync(); - }); + })); } } //namespace diff --git a/tests/slo_workloads/key_value/drop.cpp b/tests/slo_workloads/key_value/drop.cpp index ae749bc903..e22c92c894 100644 --- a/tests/slo_workloads/key_value/drop.cpp +++ b/tests/slo_workloads/key_value/drop.cpp @@ -4,15 +4,20 @@ using namespace NLastGetopt; using namespace NYdb; using namespace NYdb::NTable; -static void DropTable(TTableClient& client, const std::string& path) { - NYdb::NStatusHelpers::ThrowOnError(client.RetryOperationSync([path](TSession session) { - return session.DropTable(path).ExtractValueSync(); - })); -} - int DropTable(TDatabaseOptions& dbOptions) { TTableClient client(dbOptions.Driver); - DropTable(client, JoinPath(dbOptions.Prefix, TableName)); 
+ const std::string path = JoinPath(dbOptions.Prefix, TableName); + TStatus status = client.RetryOperationSync([path](TSession session) { + TStatus dropStatus = session.DropTable(path).ExtractValueSync(); + if (dropStatus.GetStatus() == EStatus::NOT_FOUND) { + return TStatus(EStatus::SUCCESS, NYdb::NIssue::TIssues()); + } + return dropStatus; + }); + if (!status.IsSuccess()) { + Cerr << "DropTable failed: " << status << Endl; + return EXIT_FAILURE; + } Cout << "Table dropped." << Endl; return EXIT_SUCCESS; } diff --git a/tests/slo_workloads/key_value/key_value.cpp b/tests/slo_workloads/key_value/key_value.cpp index 1cb40f3620..23bd3ff184 100644 --- a/tests/slo_workloads/key_value/key_value.cpp +++ b/tests/slo_workloads/key_value/key_value.cpp @@ -50,6 +50,7 @@ int DoCreate(TDatabaseOptions& dbOptions, int argc, char** argv) { jobs->Start(); jobs->Wait(); jobs->ShowProgress(); + jobs.reset(); return EXIT_SUCCESS; } @@ -95,6 +96,7 @@ int DoRun(TDatabaseOptions& dbOptions, int argc, char** argv) { Cout << "All jobs finished: " << TInstant::Now().ToRfc822StringLocal() << Endl; jobs->ShowProgress(); + jobs.reset(); return EXIT_SUCCESS; } diff --git a/tests/slo_workloads/userver/CMakeLists.txt b/tests/slo_workloads/userver/CMakeLists.txt new file mode 100644 index 0000000000..ec86159d8f --- /dev/null +++ b/tests/slo_workloads/userver/CMakeLists.txt @@ -0,0 +1,6 @@ +find_package(userver REQUIRED COMPONENTS core) + +include(${CMAKE_CURRENT_LIST_DIR}/cmake/UserverYdbFromTree.cmake) +userver_ydb_from_tree() + +add_subdirectory(key_value) diff --git a/tests/slo_workloads/userver/cmake/SetupYdbCppSDK.cmake b/tests/slo_workloads/userver/cmake/SetupYdbCppSDK.cmake new file mode 100644 index 0000000000..0f647dbfce --- /dev/null +++ b/tests/slo_workloads/userver/cmake/SetupYdbCppSDK.cmake @@ -0,0 +1,8 @@ +# Override userver's SetupYdbCppSDK.cmake: use in-tree YDB-CPP-SDK targets +# from this repo instead of downloading ydb-cpp-sdk via CPM. 
+ +if(NOT TARGET YDB-CPP-SDK::Table) + message(FATAL_ERROR "In-tree YDB-CPP-SDK must be configured before building userver-ydb") +endif() + +set(ydb-cpp-sdk_INCLUDE_DIRS "") diff --git a/tests/slo_workloads/userver/cmake/UserverYdbFromTree.cmake b/tests/slo_workloads/userver/cmake/UserverYdbFromTree.cmake new file mode 100644 index 0000000000..974292234e --- /dev/null +++ b/tests/slo_workloads/userver/cmake/UserverYdbFromTree.cmake @@ -0,0 +1,55 @@ +function(userver_ydb_from_tree) + if(TARGET userver::ydb) + return() + endif() + + if(NOT TARGET userver-core AND TARGET userver::core) + add_library(userver-core ALIAS userver::core) + endif() + + if(NOT USERVER_ROOT_DIR) + if(DEFINED ENV{USERVER_SOURCE_DIR}) + set(USERVER_ROOT_DIR "$ENV{USERVER_SOURCE_DIR}" CACHE PATH "" FORCE) + else() + message(FATAL_ERROR "USERVER_SOURCE_DIR must point to a userver source tree (matching the installed .deb version)") + endif() + endif() + + if(NOT EXISTS "${USERVER_ROOT_DIR}/ydb/CMakeLists.txt") + message(FATAL_ERROR "userver ydb sources not found at ${USERVER_ROOT_DIR}/ydb") + endif() + + set(USERVER_INSTALL OFF CACHE BOOL "" FORCE) + set(USERVER_BUILD_TESTS OFF CACHE BOOL "" FORCE) + + # The .deb is built with USERVER_FEATURE_YDB=OFF and omits scripts/chaotic and + # chaotic-gen-dynamic-configs; use the matching source tree for codegen instead. 
+ set(USERVER_CMAKE_DIR "${USERVER_ROOT_DIR}/cmake" CACHE PATH "" FORCE) + set(USERVER_CHAOTIC_SCRIPTS_PATH "${USERVER_ROOT_DIR}/scripts/chaotic" CACHE PATH "" FORCE) + + list(PREPEND CMAKE_MODULE_PATH + "${CMAKE_CURRENT_LIST_DIR}/cmake" + "${USERVER_ROOT_DIR}/cmake" + ) + list(PREPEND CMAKE_PROGRAM_PATH + "${USERVER_ROOT_DIR}/chaotic/bin-dynamic-configs" + "${USERVER_ROOT_DIR}/chaotic/bin" + ) + + include(${USERVER_ROOT_DIR}/cmake/PrepareInstall.cmake) + include(${USERVER_ROOT_DIR}/cmake/UserverCodegenTarget.cmake) + include(${USERVER_ROOT_DIR}/cmake/UserverModule.cmake) + include(${USERVER_ROOT_DIR}/cmake/ChaoticGen.cmake) + + set(_userver_chaotic_dynamic_configs_bin + "${USERVER_ROOT_DIR}/chaotic/bin-dynamic-configs/chaotic-gen-dynamic-configs") + if(EXISTS "${_userver_chaotic_dynamic_configs_bin}") + set_property(GLOBAL PROPERTY userver_chaotic_dynamic_configs_bin "${_userver_chaotic_dynamic_configs_bin}") + endif() + + add_subdirectory(${USERVER_ROOT_DIR}/ydb ${CMAKE_CURRENT_BINARY_DIR}/userver-ydb-build) + + if(TARGET userver-ydb AND NOT TARGET userver::ydb) + add_library(userver::ydb ALIAS userver-ydb) + endif() +endfunction() diff --git a/tests/slo_workloads/userver/key_value/CMakeLists.txt b/tests/slo_workloads/userver/key_value/CMakeLists.txt new file mode 100644 index 0000000000..afda760ba2 --- /dev/null +++ b/tests/slo_workloads/userver/key_value/CMakeLists.txt @@ -0,0 +1,28 @@ +add_executable(slo-userver-key-value) + +target_sources(slo-userver-key-value PRIVATE + main.cpp + userver_utils.cpp + userver_table_client.cpp + key_value.cpp + create.cpp + run.cpp + # CreateTable / DropTable: reuse native sources verbatim. + ${CMAKE_SOURCE_DIR}/tests/slo_workloads/key_value/create.cpp + ${CMAKE_SOURCE_DIR}/tests/slo_workloads/key_value/drop.cpp +) + +target_link_libraries(slo-userver-key-value PRIVATE + slo-utils-base # Shared utils: ParseOptions*, GetTableStats, YdbStatusToString, etc. 
+ userver::ydb # ydb::TableClient (workload queries) + userver::core # engine::RunStandalone, Semaphore, AsyncNoSpan +) + +# userver::ydb impl headers (ydb/impl/driver.hpp) live under the source tree, not in the .deb. +if(DEFINED USERVER_ROOT_DIR) + target_include_directories(slo-userver-key-value PRIVATE "${USERVER_ROOT_DIR}/ydb/src") +elseif(DEFINED ENV{USERVER_SOURCE_DIR}) + target_include_directories(slo-userver-key-value PRIVATE "$ENV{USERVER_SOURCE_DIR}/ydb/src") +endif() + +vcs_info(slo-userver-key-value) diff --git a/tests/slo_workloads/userver/key_value/create.cpp b/tests/slo_workloads/userver/key_value/create.cpp new file mode 100644 index 0000000000..87d5a118f5 --- /dev/null +++ b/tests/slo_workloads/userver/key_value/create.cpp @@ -0,0 +1,145 @@ +#include "key_value.h" +#include "userver_table_client.h" + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +using namespace NYdb; +using namespace NYdb::NTable; + +namespace uydb = userver::ydb; + +namespace { + +std::atomic ShouldStop{false}; + +} // namespace + +int DoCreate(TDatabaseOptions& dbOptions, int argc, char** argv) { + TCreateOptions createOptions{ {dbOptions} }; + if (!ParseOptionsCreate(argc, argv, createOptions)) { + return EXIT_FAILURE; + } + + createOptions.CommonOptions.MaxInfly = createOptions.CommonOptions.MaxInputThreads; + + int result = CreateTable(dbOptions); + if (result) { + return result; + } + + std::uint32_t maxId = GetTableStats(dbOptions, TableName).MaxId; + + createOptions.CommonOptions.ReactionTime = TDuration::Seconds(20); + + Cout << TInstant::Now().ToRfc822StringLocal() << " Uploading initial content... do 'kill -USR1 " << GetPID() + << "' for progress details or Ctrl/Cmd+C to interrupt" << Endl; + + ShouldStop.store(false); + signal(SIGINT, [](int) { + Cerr << TInstant::Now().ToRfc822StringLocal() << " SIGINT received. Stopping..." 
<< Endl; + ShouldStop.store(true); + }); + + const auto& opts = createOptions.CommonOptions; + const std::string& prefix = opts.DatabaseOptions.Prefix; + + auto& ydbClient = userver_slo::GetTableClient(); + + userver::engine::AsyncNoSpan([&]() { + TStat stats( + opts.DontPushMetrics ? std::nullopt : std::make_optional(opts.MetricsPushUrl), + "generate" + ); + stats.Start(); + + TPackGenerator packGenerator( + opts, + createOptions.PackSize, + &BuildValueFromRecord, + createOptions.Count, + maxId + ); + + userver::engine::Semaphore semaphore{ + static_cast(opts.MaxInfly)}; + + std::atomic succeeded{0}; + std::atomic failed{0}; + std::atomic total{0}; + + const TString query = Sprintf(R"( +--!syntax_v1 +PRAGMA TablePathPrefix("%s"); + +DECLARE $items AS + List>; + +UPSERT INTO `%s` SELECT * FROM AS_TABLE($items); + +)", prefix.c_str(), TableName.c_str()); + + std::vector> tasks; + + std::vector pack; + while (!ShouldStop.load() && packGenerator.GetNextPack(pack)) { + semaphore.lock_shared(); + total.fetch_add(1); + + auto params = userver_slo::PackValuesToPreparedArgs(pack); + + tasks.push_back(userver::engine::AsyncNoSpan( + [&ydbClient, &semaphore, &stats, &succeeded, &failed, + &opts, query, params = std::move(params)]() mutable { + auto stat = stats.StartRequest(); + try { + uydb::OperationSettings settings; + settings.client_timeout_ms = std::chrono::milliseconds(opts.ReactionTime.MilliSeconds()); + + ydbClient.ExecuteDataQuery(settings, uydb::Query{query}, std::move(params)); + + stats.FinishRequest(stat, TStatus(EStatus::SUCCESS, NYdb::NIssue::TIssues())); + succeeded.fetch_add(1); + } catch (const uydb::YdbResponseError& e) { + stats.FinishRequest(stat, TStatus(e.GetStatus().GetStatus(), NYdb::NIssue::TIssues())); + failed.fetch_add(1); + } catch (const std::exception&) { + stats.FinishRequest(stat, TStatus(EStatus::CLIENT_INTERNAL_ERROR, NYdb::NIssue::TIssues())); + failed.fetch_add(1); + } + semaphore.unlock_shared(); + } + )); + } + + for (auto& task : 
tasks) { + task.Wait(); + } + + stats.Finish(); + + TStringBuilder report; + report << Endl << "======- GenerateInitialContentJob report -======" << Endl; + report << "Total: " << total.load() << ", Succeeded: " << succeeded.load() + << ", Failed: " << failed.load() << Endl; + stats.PrintStatistics(report); + report << "========================================" << Endl; + Cout << report; + }).Wait(); + + return EXIT_SUCCESS; +} diff --git a/tests/slo_workloads/userver/key_value/key_value.cpp b/tests/slo_workloads/userver/key_value/key_value.cpp new file mode 100644 index 0000000000..08fee6bdde --- /dev/null +++ b/tests/slo_workloads/userver/key_value/key_value.cpp @@ -0,0 +1,16 @@ +#include "key_value.h" + +using namespace NYdb; + +const std::string TableName = "key_value"; + +NYdb::TValue BuildValueFromRecord(const TKeyValueRecordData& recordData) { + NYdb::TValueBuilder value; + value.BeginStruct(); + value.AddMember("object_id_key").Uint32(GetHash(recordData.ObjectId)); + value.AddMember("object_id").Uint32(recordData.ObjectId); + value.AddMember("timestamp").Uint64(recordData.Timestamp); + value.AddMember("payload").Utf8(recordData.Payload); + value.EndStruct(); + return value.Build(); +} diff --git a/tests/slo_workloads/userver/key_value/key_value.h b/tests/slo_workloads/userver/key_value/key_value.h new file mode 100644 index 0000000000..462e27dea3 --- /dev/null +++ b/tests/slo_workloads/userver/key_value/key_value.h @@ -0,0 +1,24 @@ +#pragma once + +#include "userver_utils.h" + +#include +#include + +#include + +#include + +extern const std::string TableName; + +NYdb::TValue BuildValueFromRecord(const TKeyValueRecordData& recordData); + +int CreateTable(TDatabaseOptions& dbOptions); +int DropTable(TDatabaseOptions& dbOptions); + +// Creates a table and fills it with initial content (userver coroutine-based) +int DoCreate(TDatabaseOptions& dbOptions, int argc, char** argv); +// Runs read/write workload (userver coroutine-based) +int DoRun(TDatabaseOptions& 
dbOptions, int argc, char** argv); +// Drops the table +int DoCleanup(TDatabaseOptions& dbOptions, int argc); diff --git a/tests/slo_workloads/userver/key_value/main.cpp b/tests/slo_workloads/userver/key_value/main.cpp new file mode 100644 index 0000000000..1b9ba8a2de --- /dev/null +++ b/tests/slo_workloads/userver/key_value/main.cpp @@ -0,0 +1,6 @@ +#include "userver_utils.h" +#include "key_value.h" + +int main(int argc, char** argv) { + return DoMain(argc, argv, DoCreate, DoRun, DoCleanup); +} diff --git a/tests/slo_workloads/userver/key_value/run.cpp b/tests/slo_workloads/userver/key_value/run.cpp new file mode 100644 index 0000000000..2de02015ca --- /dev/null +++ b/tests/slo_workloads/userver/key_value/run.cpp @@ -0,0 +1,313 @@ +#include "key_value.h" +#include "userver_table_client.h" + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +using namespace NYdb; +using namespace NYdb::NTable; + +namespace uydb = userver::ydb; + +namespace { + +std::atomic ShouldStop{false}; + +struct TProgressReporter { + TStat* ReadStats = nullptr; + TStat* WriteStats = nullptr; + std::atomic* ReadSucceeded = nullptr; + std::atomic* ReadFailed = nullptr; + std::atomic* WriteSucceeded = nullptr; + std::atomic* WriteFailed = nullptr; + std::atomic* WriteGenerated = nullptr; + + void ShowProgress() const { + TStringBuilder report; + if (ReadStats) { + report << Endl << "======- ReadJob report (Thread A) -======" << Endl; + report << "Succeeded: " << ReadSucceeded->load() + << ", Failed: " << ReadFailed->load() << Endl; + ReadStats->PrintStatistics(report); + report << "========================================" << Endl; + } + if (WriteStats) { + report << Endl << "=====- WriteJob report (Thread B) -=====" << Endl; + report << "Generated: " << WriteGenerated->load() + << ", Succeeded: " << WriteSucceeded->load() + << ", Failed: " << WriteFailed->load() << Endl; + WriteStats->PrintStatistics(report); + report << 
"==========================================" << Endl; + } + Cout << report; + } +}; + +TProgressReporter* GlobalReporter = nullptr; + +void ExecuteQuery( + uydb::TableClient& client, + const uydb::OperationSettings& settings, + const uydb::Query& query, + uydb::PreparedArgsBuilder params, + TStat& stats, + std::atomic& succeeded, + std::atomic& failed) +{ + auto stat = stats.StartRequest(); + try { + client.ExecuteDataQuery(settings, query, std::move(params)); + stats.FinishRequest(stat, TStatus(EStatus::SUCCESS, NYdb::NIssue::TIssues())); + succeeded.fetch_add(1); + } catch (const uydb::YdbResponseError& e) { + stats.FinishRequest(stat, TStatus(e.GetStatus().GetStatus(), NYdb::NIssue::TIssues())); + failed.fetch_add(1); + } catch (const std::exception&) { + stats.FinishRequest(stat, TStatus(EStatus::CLIENT_INTERNAL_ERROR, NYdb::NIssue::TIssues())); + failed.fetch_add(1); + } +} + +} // namespace + +int DoRun(TDatabaseOptions& dbOptions, int argc, char** argv) { + TRunOptions runOptions{ {dbOptions} }; + if (!ParseOptionsRun(argc, argv, runOptions)) { + return EXIT_FAILURE; + } + + Cout << TInstant::Now().ToRfc822StringLocal() << " Creating and initializing jobs..." << Endl; + + std::uint32_t maxId = GetTableStats(dbOptions, TableName).MaxId; + + const std::string& prefix = dbOptions.Prefix; + + auto& ydbClient = userver_slo::GetTableClient(); + + std::unique_ptr readStats; + std::unique_ptr writeStats; + + std::atomic readSucceeded{0}; + std::atomic readFailed{0}; + std::atomic writeSucceeded{0}; + std::atomic writeFailed{0}; + std::atomic writeGenerated{0}; + + if (!runOptions.DontRunA) { + readStats = std::make_unique( + runOptions.CommonOptions.DontPushMetrics ? std::nullopt : std::make_optional(runOptions.CommonOptions.MetricsPushUrl), + "read" + ); + } + if (!runOptions.DontRunB) { + writeStats = std::make_unique( + runOptions.CommonOptions.DontPushMetrics ? 
std::nullopt : std::make_optional(runOptions.CommonOptions.MetricsPushUrl), + "write" + ); + } + + TProgressReporter reporter; + reporter.ReadStats = readStats.get(); + reporter.WriteStats = writeStats.get(); + reporter.ReadSucceeded = &readSucceeded; + reporter.ReadFailed = &readFailed; + reporter.WriteSucceeded = &writeSucceeded; + reporter.WriteFailed = &writeFailed; + reporter.WriteGenerated = &writeGenerated; + GlobalReporter = &reporter; + + ShouldStop.store(false); + signal(SIGUSR1, [](int) { + Cout << TInstant::Now().ToRfc822StringLocal() << " SIGUSR1 handle" << Endl; + if (GlobalReporter) { + GlobalReporter->ShowProgress(); + } + }); + signal(SIGINT, [](int) { + Cerr << TInstant::Now().ToRfc822StringLocal() << " SIGINT received. Stopping..." << Endl; + ShouldStop.store(true); + }); + + TInstant start = TInstant::Now(); + TInstant deadline = start + TDuration::Seconds(runOptions.CommonOptions.SecondsToRun); + + Cout << "Jobs launched. Do 'kill -USR1 " << GetPID() + << "' for progress details or 'kill -INT " << GetPID() << "' (Ctrl/Cmd + C) to interrupt" << Endl + << " Start time: " << start.ToRfc822StringLocal() << Endl + << "Estimated finish time: " << deadline.ToRfc822StringLocal() << Endl; + + std::vector> jobTasks; + + if (!runOptions.DontRunA) { + auto readRps = runOptions.Read_rps; + auto readTimeout = runOptions.ReadTimeout; + auto maxInfly = runOptions.CommonOptions.MaxInfly; + auto objectIdRange = static_cast(maxId * 1.25); + + const TString readQuery = Sprintf(R"( +--!syntax_v1 +PRAGMA TablePathPrefix("%s"); + +DECLARE $object_id_key AS Uint32; +DECLARE $object_id AS Uint32; + +SELECT * FROM `%s` WHERE `object_id_key` = $object_id_key AND `object_id` = $object_id; + +)", prefix.c_str(), TableName.c_str()); + + jobTasks.push_back(userver::engine::AsyncNoSpan( + [&ydbClient, &readStats, &readSucceeded, &readFailed, + readRps, readTimeout, maxInfly, + objectIdRange, readQuery, deadline]() { + + readStats->Start(); + TRpsProvider rpsProvider(readRps); 
+ rpsProvider.Reset(); + + userver::engine::Semaphore semaphore{ + static_cast(maxInfly)}; + + std::vector> tasks; + + while (!ShouldStop.load() && TInstant::Now() < deadline) { + std::uint32_t idToSelect = RandomNumber() % objectIdRange; + const std::uint32_t objectIdKey = GetHash(idToSelect); + + rpsProvider.Use(); + + semaphore.lock_shared(); + + tasks.push_back(userver::engine::AsyncNoSpan( + [&ydbClient, &semaphore, &readStats, &readSucceeded, &readFailed, + readTimeout, readQuery, objectIdKey, idToSelect]() { + + uydb::OperationSettings settings; + settings.client_timeout_ms = std::chrono::milliseconds(readTimeout.MilliSeconds()); + + auto builder = ydbClient.GetBuilder(); + builder.Add("$object_id_key", objectIdKey); + builder.Add("$object_id", idToSelect); + + ExecuteQuery( + ydbClient, settings, uydb::Query{readQuery}, + std::move(builder), *readStats, + readSucceeded, readFailed); + + semaphore.unlock_shared(); + } + )); + } + + for (auto& task : tasks) { + task.Wait(); + } + + readStats->Finish(); + } + )); + } + + if (!runOptions.DontRunB) { + auto writeRps = runOptions.Write_rps; + auto writeTimeout = runOptions.WriteTimeout; + auto maxInfly = runOptions.CommonOptions.MaxInfly; + + const TCommonOptions writeCommon = runOptions.CommonOptions; + + const TString writeQuery = Sprintf(R"( +--!syntax_v1 +PRAGMA TablePathPrefix("%s"); + +DECLARE $items AS + List>; + +UPSERT INTO `%s` SELECT * FROM AS_TABLE($items); + +)", prefix.c_str(), TableName.c_str()); + + jobTasks.push_back(userver::engine::AsyncNoSpan( + [&ydbClient, &writeStats, &writeSucceeded, &writeFailed, &writeGenerated, + writeRps, writeTimeout, maxInfly, + writeCommon, maxId, writeQuery, deadline]() { + + writeStats->Start(); + TRpsProvider rpsProvider(writeRps); + rpsProvider.Reset(); + + TKeyValueGenerator generator(writeCommon, maxId); + + userver::engine::Semaphore semaphore{ + static_cast(maxInfly)}; + + std::vector> tasks; + + while (!ShouldStop.load() && TInstant::Now() < deadline) { + 
const auto value = BuildValueFromRecord(generator.Get()); + writeGenerated.fetch_add(1); + + rpsProvider.Use(); + + semaphore.lock_shared(); + + tasks.push_back(userver::engine::AsyncNoSpan( + [&ydbClient, &semaphore, &writeStats, &writeSucceeded, &writeFailed, + writeTimeout, writeQuery, value]() { + + uydb::OperationSettings settings; + settings.client_timeout_ms = std::chrono::milliseconds(writeTimeout.MilliSeconds()); + + auto params = userver_slo::PackValuesToPreparedArgs({value}); + + ExecuteQuery( + ydbClient, settings, uydb::Query{writeQuery}, + std::move(params), *writeStats, + writeSucceeded, writeFailed); + + semaphore.unlock_shared(); + } + )); + } + + for (auto& task : tasks) { + task.Wait(); + } + + writeStats->Finish(); + } + )); + } + + for (auto& task : jobTasks) { + task.Wait(); + } + + Cout << "All jobs finished: " << TInstant::Now().ToRfc822StringLocal() << Endl; + + reporter.ShowProgress(); + + GlobalReporter = nullptr; + + return EXIT_SUCCESS; +} + +int DoCleanup(TDatabaseOptions& dbOptions, int argc) { + if (argc > 1) { + Cerr << "Unexpected arguments after cleanup" << Endl; + return EXIT_FAILURE; + } + return DropTable(dbOptions); +} diff --git a/tests/slo_workloads/userver/key_value/userver_table_client.cpp b/tests/slo_workloads/userver/key_value/userver_table_client.cpp new file mode 100644 index 0000000000..728c6f074f --- /dev/null +++ b/tests/slo_workloads/userver/key_value/userver_table_client.cpp @@ -0,0 +1,94 @@ +#include "userver_table_client.h" + +#include +#include + +#include +#include + +namespace userver_slo { +namespace { + +struct Clients final { + std::shared_ptr driver; + std::shared_ptr table_client; +}; + +Clients& GetClients() { + static Clients clients; + return clients; +} + +} // namespace + +void InitTableClient( + const std::string& endpoint, + const std::string& database, + const std::shared_ptr& credentials_provider_factory, + const std::string& oauth_token, + const bool prefer_local_dc +) { + auto& clients = 
GetClients(); + if (clients.table_client) { + return; + } + + userver::ydb::impl::DriverSettings driver_settings; + driver_settings.endpoint = endpoint; + driver_settings.database = database; + driver_settings.prefer_local_dc = prefer_local_dc; + driver_settings.credentials_provider_factory = credentials_provider_factory; + if (!oauth_token.empty()) { + driver_settings.oauth_token = oauth_token; + } + + const userver::ydb::impl::TableSettings table_settings; + const userver::ydb::OperationSettings operation_settings{ + 3, + std::chrono::minutes{3}, + std::chrono::minutes{3}, + std::chrono::minutes{3}, + userver::ydb::TransactionMode::kSerializableRW, + std::chrono::minutes{3}, + }; + + clients.driver = std::make_shared("slo", std::move(driver_settings)); + clients.table_client = std::make_shared( + table_settings, + operation_settings, + userver::dynamic_config::GetDefaultSource(), + clients.driver + ); +} + +userver::ydb::TableClient& GetTableClient() { + auto& clients = GetClients(); + if (!clients.table_client) { + throw std::runtime_error("userver TableClient is not initialized"); + } + return *clients.table_client; +} + +void ShutdownTableClient() { + auto& clients = GetClients(); + clients.table_client.reset(); + clients.driver.reset(); +} + +userver::ydb::PreparedArgsBuilder PackValuesToPreparedArgs( + const std::vector& items, + const std::string& name +) { + NYdb::TValueBuilder items_as_list; + items_as_list.BeginList(); + for (const auto& item : items) { + items_as_list.AddListItem(item); + } + items_as_list.EndList(); + + NYdb::TParamsBuilder params_builder; + params_builder.AddParam(name, items_as_list.Build()); + return userver::ydb::PreparedArgsBuilder(std::move(params_builder)); +} + +} // namespace userver_slo diff --git a/tests/slo_workloads/userver/key_value/userver_table_client.h b/tests/slo_workloads/userver/key_value/userver_table_client.h new file mode 100644 index 0000000000..40dc888126 --- /dev/null +++ 
b/tests/slo_workloads/userver/key_value/userver_table_client.h @@ -0,0 +1,32 @@ +#pragma once + +#include +#include + +#include +#include + +#include +#include +#include + +namespace userver_slo { + +void InitTableClient( + const std::string& endpoint, + const std::string& database, + const std::shared_ptr& credentials_provider_factory, + const std::string& oauth_token, + bool prefer_local_dc +); + +userver::ydb::TableClient& GetTableClient(); + +void ShutdownTableClient(); + +userver::ydb::PreparedArgsBuilder PackValuesToPreparedArgs( + const std::vector& items, + const std::string& name = "$items" +); + +} // namespace userver_slo diff --git a/tests/slo_workloads/userver/key_value/userver_utils.cpp b/tests/slo_workloads/userver/key_value/userver_utils.cpp new file mode 100644 index 0000000000..5661675fbe --- /dev/null +++ b/tests/slo_workloads/userver/key_value/userver_utils.cpp @@ -0,0 +1,218 @@ +// Userver-specific DoMain() override. +// Wraps command execution inside userver::engine::RunStandalone() so that +// coroutine-based ydb::TableClient and engine primitives work correctly. +// All other utils (ParseOptions*, GetTableStats, YdbStatusToString, etc.) +// come from slo-utils-base via the shared header. + +#include "userver_utils.h" +#include "userver_table_client.h" + +#include + +#include +#include +#include +#include + +#include + +#include + +using namespace NLastGetopt; +using namespace NYdb; + +// Override DoMain to wrap command dispatch in RunStandalone. +// The native DoMain (from slo-utils-base) runs commands directly in the calling +// thread. The userver version needs the coroutine engine running for +// ydb::TableClient, engine::Semaphore, AsyncNoSpan, SleepFor, WaitAny, etc. 
+int DoMain(int argc, char** argv, TCreateCommand create, TRunCommand run, TCleanupCommand cleanup) { + TOpts opts = TOpts::Default(); + + std::string connectionString; + std::string prefix; + std::string token; + std::string tokenFile; + std::string iamSaKeyFile; + std::string statConfigFile; + std::string balancingPolicy; + + std::string defaultConnectionString = DefaultConnectionStringFromEnv(); + + auto& connOpt = opts.AddLongOption('c', "connection-string", "YDB connection string").RequiredArgument("SCHEMA://HOST:PORT/?DATABASE=DATABASE") + .StoreResult(&connectionString); + if (!defaultConnectionString.empty()) { + connOpt.DefaultValue(defaultConnectionString); + } else { + connOpt.Required(); + } + opts.AddLongOption('p', "prefix", "Base prefix for tables").RequiredArgument("PATH") + .StoreResult(&prefix); + opts.AddLongOption('k', "token", "security token").RequiredArgument("TOKEN") + .StoreResult(&token); + opts.AddLongOption('f', "token-file", "security token file").RequiredArgument("PATH") + .StoreResult(&tokenFile); + opts.AddLongOption("iam-sa-key-file", "IAM service account key file").RequiredArgument("SECRET") + .StoreResult(&iamSaKeyFile); + opts.AddLongOption('s', "stat-config", "statistics config file").Optional().RequiredArgument("PATH") + .StoreResult(&statConfigFile); + opts.AddLongOption('b', "balancing-policy", "Balancing policy").Optional().DefaultValue("use-all-nodes").RequiredArgument("(use-all-nodes|prefer-local-dc|prefer-primary-pile)") + .StoreResult(&balancingPolicy); + opts.AddHelpOption('h'); + opts.SetFreeArgsMin(0); + opts.SetFreeArgTitle(0, "", GetCmdList()); + opts.ArgPermutation_ = NLastGetopt::REQUIRE_ORDER; + // Run-phase options (--read-rps, --write-rps, …) reach DoMain when the + // caller invokes the workload without an explicit subcommand (the v2 SLO + // action contract). Tolerate them here so the global parser stops at the + // first unknown option instead of erroring; they are forwarded to the + // run phase below. 
+ opts.AllowUnknownLongOptions_ = true; + + TOptsParseResult res(&opts, argc, argv); + size_t freeArgsPos = res.GetFreeArgsPos(); + argc -= freeArgsPos; + argv += freeArgsPos; + + ECommandType command = (argc > 0) ? ParseCommand(*argv) : ECommandType::All; + if (command == ECommandType::Unknown) { + if (argv[0][0] == '-') { + command = ECommandType::All; + } else { + Cerr << "Unknown command '" << *argv << "'" << Endl; + return EXIT_FAILURE; + } + } + + if (prefix.empty()) { + prefix = GetDatabase(connectionString); + } + if (prefix.empty()) { + prefix = GetEnv("YDB_DATABASE"); + } + + if (!ParseToken(token, tokenFile)) { + return EXIT_FAILURE; + } + + auto config = TDriverConfig(connectionString); + + std::shared_ptr credentials_provider_factory; + if (!iamSaKeyFile.empty()) { + Cout << "Enabling IAM authentication..." << Endl; + credentials_provider_factory = + CreateIamJwtFileCredentialsProviderFactory(TIamJwtFilename{.JwtFilename = iamSaKeyFile}); + } else if (!token.empty()) { + Cout << "Enabling OAuth authentication..." << Endl; + credentials_provider_factory = CreateOAuthCredentialsProviderFactory(token); + } else { + Cerr << "Warning: No authentication methods provided." 
<< Endl; + } + + if (credentials_provider_factory) { + config.SetCredentialsProviderFactory(credentials_provider_factory); + } + + if (balancingPolicy == "use-all-nodes") { + config.SetBalancingPolicy(TBalancingPolicy::UseAllNodes()); + } else if (balancingPolicy == "prefer-local-dc") { + config.SetBalancingPolicy(TBalancingPolicy::UsePreferableLocation()); + } else if (balancingPolicy == "prefer-primary-pile") { + config.SetBalancingPolicy(TBalancingPolicy::UsePreferablePileState()); + } else { + Cerr << "Unknown balancing policy: " << balancingPolicy << Endl; + return EXIT_FAILURE; + } + + const bool prefer_local_dc = balancingPolicy == "prefer-local-dc"; + + TDriver driver(config); + + StartStatCollecting(driver, statConfigFile); + + TDatabaseOptions dbOptions{driver, prefix}; + int result = EXIT_FAILURE; + + try { + // Wrap command execution in userver engine so coroutine-based + // userver::ydb::TableClient and engine primitives work. + // DDL operations (create table, drop table) use native SDK directly + // and don't need the engine, but workload queries use + // userver::ydb::TableClient which requires the engine to be running. + userver::engine::RunStandalone(4, [&] { + userver_slo::InitTableClient( + config.GetEndpoint(), + prefix, + credentials_provider_factory, + token, + prefer_local_dc + ); + + switch (command) { + case ECommandType::Create: + Cout << "Launching create command..." << Endl; + result = create(dbOptions, argc, argv); + break; + case ECommandType::Run: + Cout << "Launching run command..." << Endl; + result = run(dbOptions, argc, argv); + break; + case ECommandType::Cleanup: + Cout << "Launching cleanup command..." << Endl; + result = cleanup(dbOptions, argc); + break; + case ECommandType::All: { + Cout << "Launching full lifecycle: create -> run -> cleanup" << Endl; + // Forward leftover argv to the run phase so options + // like --read-rps / --write-rps take effect. 
argv[0] + // here is the first run-phase option (no subcommand + // keyword was supplied), so prepend a synthetic + // program name for ParseOptionsRun. + char programName[] = "slo"; + std::vector runArgv; + runArgv.reserve(argc + 2); + runArgv.push_back(programName); + for (int i = 0; i < argc; ++i) { + runArgv.push_back(argv[i]); + } + runArgv.push_back(nullptr); + int fakeArgc = 1; + char* fakeArgv[] = { programName, nullptr }; + + Cout << "[all] Launching create command..." << Endl; + result = create(dbOptions, fakeArgc, fakeArgv); + if (!result) { + Cout << "[all] Launching run command..." << Endl; + result = run(dbOptions, static_cast(runArgv.size() - 1), runArgv.data()); + } + Cout << "[all] Launching cleanup command..." << Endl; + int cleanupRc = cleanup(dbOptions, fakeArgc); + // Cleanup runs while chaos-monkey is still killing + // nodes, so a DropTable failure here is expected noise + // and must not mask a successful run. Surface the + // run's status; only fall back to cleanup status when + // run itself failed. + if (cleanupRc && !result) { + Cerr << "[all] Warning: cleanup failed (exit " << cleanupRc + << ") but run succeeded; ignoring cleanup exit code." << Endl; + } else if (cleanupRc) { + Cerr << "[all] Warning: cleanup failed (exit " << cleanupRc + << "); preserving earlier run failure." << Endl; + } + break; + } + default: + Cerr << "Unknown command" << Endl; + result = EXIT_FAILURE; + break; + } + // Destroy userver clients while the engine is still running. 
+ userver_slo::ShutdownTableClient(); + }); + } catch (const NYdb::NStatusHelpers::TYdbErrorException& e) { + Cerr << "Exception caught: " << e << Endl; + driver.Stop(true); + return EXIT_FAILURE; + } + driver.Stop(true); + return result; +} diff --git a/tests/slo_workloads/userver/key_value/userver_utils.h b/tests/slo_workloads/userver/key_value/userver_utils.h new file mode 100644 index 0000000000..dde60901d6 --- /dev/null +++ b/tests/slo_workloads/userver/key_value/userver_utils.h @@ -0,0 +1,7 @@ +#pragma once + +// Re-export the shared utils header. The userver workload uses the same +// CLI parsing, option structs, and helper functions as the native workload. +// Only DoMain() is overridden in userver_utils.cpp to wrap command execution +// inside userver::engine::RunStandalone(). +#include diff --git a/tests/slo_workloads/utils/CMakeLists.txt b/tests/slo_workloads/utils/CMakeLists.txt index e8589a568f..4dd24019a2 100644 --- a/tests/slo_workloads/utils/CMakeLists.txt +++ b/tests/slo_workloads/utils/CMakeLists.txt @@ -1,6 +1,21 @@ -add_library(slo-utils) +include(FetchContent) -target_link_libraries(slo-utils PUBLIC +FetchContent_Declare( + hdr_histogram + GIT_REPOSITORY https://github.com/HdrHistogram/HdrHistogram_c.git + GIT_TAG 0.11.8 + EXCLUDE_FROM_ALL +) +set(HDR_HISTOGRAM_BUILD_PROGRAMS OFF CACHE BOOL "" FORCE) +set(HDR_HISTOGRAM_BUILD_SHARED OFF CACHE BOOL "" FORCE) +set(HDR_LOG_REQUIRED OFF CACHE BOOL "" FORCE) +FetchContent_MakeAvailable(hdr_histogram) + +# slo-utils-base: shared helpers, no DoMain. The userver workload links this +# and supplies its own DoMain that wraps command dispatch in RunStandalone. 
+add_library(slo-utils-base) + +target_link_libraries(slo-utils-base PUBLIC yutil getopt YDB-CPP-SDK::Table @@ -9,11 +24,11 @@ target_link_libraries(slo-utils PUBLIC opentelemetry-cpp::otlp_http_metric_exporter ) -if (SLO_BRANCH_REF) - target_compile_definitions(slo-utils PRIVATE REF=${SLO_BRANCH_REF}) -endif() +target_link_libraries(slo-utils-base PRIVATE + hdr_histogram_static +) -target_sources(slo-utils PRIVATE +target_sources(slo-utils-base PRIVATE executor.cpp generator.cpp job.cpp @@ -21,3 +36,16 @@ target_sources(slo-utils PRIVATE statistics.cpp utils.cpp ) + +# slo-utils: slo-utils-base + DoMain. Native workload links this. +# The set of .o files linked into slo-key-value is byte-equivalent to the +# pre-split single-library layout used in slo/align-with-action-v2. +add_library(slo-utils) + +target_link_libraries(slo-utils PUBLIC + slo-utils-base +) + +target_sources(slo-utils PRIVATE + utils_main.cpp +) diff --git a/tests/slo_workloads/utils/executor.cpp b/tests/slo_workloads/utils/executor.cpp index 28add35c98..aa0a637905 100644 --- a/tests/slo_workloads/utils/executor.cpp +++ b/tests/slo_workloads/utils/executor.cpp @@ -16,62 +16,17 @@ TInsistentClient::TInsistentClient(const TCommonOptions& opts) .AllowRequestMigration(true) ) , ClientMaxRetries(opts.MaxRetries) - , Timeout(opts.ReactionTime) - , RetryTimeout(Timeout / 2) - , SessionTimeout(Timeout + ReactionTimeDelay) - , UseApplicationTimeout(opts.UseApplicationTimeout) - , SendPreventiveRequest(opts.SendPreventiveRequest) + , SessionTimeout(opts.ReactionTime + ReactionTimeDelay) { - if (UseApplicationTimeout || SendPreventiveRequest) { - CallbackQueue.Start(opts.MaxCallbackThreads); - // Thread that executes timeout callbacks - auto threadFunc = [this]() { - TDuration timeToSleep; - while (!ShouldStop.WaitT(timeToSleep)) { - TInstant wakeupTime; - TInstant now; - with_lock(CallbacksLock) { - now = TInstant::Now(); - while (!TimeoutCallbacks.empty() && now >= TimeoutCallbacks.front().ExecucionTime) { 
- Y_UNUSED(CallbackQueue.AddFunc(TimeoutCallbacks.front().Callback)); - RemoveTimeoutIter(TimeoutCallbacks.front().context); - } - while (!RetryCallbacks.empty() && now >= RetryCallbacks.front().ExecucionTime) { - Y_UNUSED(CallbackQueue.AddFunc(RetryCallbacks.front().Callback)); - RemoveRetryIter(RetryCallbacks.front().context); - } - if (RetryCallbacks.empty()) { - wakeupTime = now + RetryTimeout; - } else { - wakeupTime = RetryCallbacks.front().ExecucionTime; - } - if (!TimeoutCallbacks.empty()) { - wakeupTime = Min(wakeupTime, TimeoutCallbacks.front().ExecucionTime); - } - } - timeToSleep = wakeupTime - now; - } - }; - WorkThread.reset(SystemThreadFactory()->Run(threadFunc).Release()); - } } TInsistentClient::~TInsistentClient() { - ShouldStop.Signal(); - if (UseApplicationTimeout || SendPreventiveRequest) { - if (WorkThread) { - WorkThread->Join(); - } else { - Cerr << (TStringBuilder() << "TInsistentClient::~TINsistentClient Error: WorkThread is not running." << Endl); - } - CallbackQueue.Stop(); - } Client.Stop().Wait(WaitTimeout); } void TInsistentClient::Report(TStringBuilder& out) const { - out << "Client retries sent: total " << CounterSStart.load() - << ", successful " << CounterSOk.load() << Endl; + out << "Operations dispatched: " << CounterStart.load() + << ", succeeded: " << CounterOk.load() << Endl; } std::uint64_t TInsistentClient::GetActiveSessions() const { @@ -79,112 +34,27 @@ std::uint64_t TInsistentClient::GetActiveSessions() const { return static_cast(sessions); } -void TInsistentClient::ClearContext(std::shared_ptr& context) { - if (SendPreventiveRequest) { - RemoveRetryIter(context); - } - if (UseApplicationTimeout) { - RemoveTimeoutIter(context); - } -} - -void TInsistentClient::RemoveRetryIter(std::shared_ptr& context) { - if (context->RetryIter.Valid) { - context->RetryIter.Valid = false; - RetryCallbacks.erase(context->RetryIter.RealIter); - } -} - -void TInsistentClient::RemoveTimeoutIter(std::shared_ptr& context) { - if 
(context->TimeoutIter.Valid) { - context->TimeoutIter.Valid = false; - TimeoutCallbacks.erase(context->TimeoutIter.RealIter); - } -} - TAsyncFinalStatus TInsistentClient::ExecuteWithRetry(const NYdb::NTable::TTableClient::TOperationFunc& operation) { TTracedPromise promise = TTracedPromise( NThreading::NewPromise(), &ExecutorPromises ); - std::shared_ptr context = std::make_shared(); - auto launchOperation = [this, operation, promise, context](bool firstTime) mutable { - with_lock(context->Lock) { - if (context->Finished) { - return; - } - } - auto callback = [promise, context, firstTime, this](const NYdb::TAsyncStatus& future) mutable { - Y_ABORT_UNLESS(future.HasValue()); - // Not setting promise under lock to avoid deadlock - bool firstCallback = false; - with_lock(context->Lock) { - if (!context->Finished) { - context->Finished = true; - firstCallback = true; - } - } - if (firstCallback) { - promise.SetValue(future.GetValue()); - with_lock(CallbacksLock) { - if (firstTime) { - CounterFOk.fetch_add(1); - } else { - CounterSOk.fetch_add(1); - } - ClearContext(context); - } - } - }; - if (firstTime) { - CounterFStart.fetch_add(1); - } else { - CounterSStart.fetch_add(1); - } - NYdb::NTable::TRetryOperationSettings settings; - settings.MaxRetries(ClientMaxRetries); - settings.GetSessionClientTimeout(SessionTimeout); - auto future = Client.RetryOperation(operation, settings); - future.Subscribe(std::move(callback)); - }; + CounterStart.fetch_add(1); - with_lock(CallbacksLock) { - TInstant now = TInstant::Now(); + NYdb::NTable::TRetryOperationSettings settings; + settings.MaxRetries(ClientMaxRetries); + settings.GetSessionClientTimeout(SessionTimeout); - if (SendPreventiveRequest) { - auto onRetryTimeout = [launchOperation]() mutable { - launchOperation(false); - }; - - RetryCallbacks.push_back({ now + RetryTimeout, onRetryTimeout, context }); - context->RetryIter = { --RetryCallbacks.end() }; - } - - if (UseApplicationTimeout) { - auto onTimeout = [this, promise, 
context]() mutable { - // Not setting promise under lock to avoid deadlock - bool firstCallback = false; - with_lock(context->Lock) { - if (!context->Finished) { - context->Finished = true; - firstCallback = true; - } - } - if (firstCallback) { - promise.SetValue(TFinalStatus()); - with_lock(CallbacksLock) { - ClearContext(context); - } - } - }; - - TimeoutCallbacks.push_back({ now + Timeout, onTimeout, context }); - context->TimeoutIter = { --TimeoutCallbacks.end() }; + auto future = Client.RetryOperation(operation, settings); + future.Subscribe([promise, this](const NYdb::TAsyncStatus& f) mutable { + Y_ABORT_UNLESS(f.HasValue()); + const auto& status = f.GetValue(); + if (status.IsSuccess()) { + CounterOk.fetch_add(1); } - } - - launchOperation(true); + promise.SetValue(status); + }); return promise.GetFuture(); } diff --git a/tests/slo_workloads/utils/executor.h b/tests/slo_workloads/utils/executor.h index 69ed261a35..c20df89d6d 100644 --- a/tests/slo_workloads/utils/executor.h +++ b/tests/slo_workloads/utils/executor.h @@ -50,26 +50,6 @@ class TTracedPromise : public NThreading::TPromise { class TInsistentClient { public: - struct TDelayedCallback; - - struct TCheckedIterator { - std::list::iterator RealIter; - bool Valid = true; - }; - - struct TOperationContext { - bool Finished = false; - TAdaptiveLock Lock; - TCheckedIterator RetryIter; - TCheckedIterator TimeoutIter; - }; - - struct TDelayedCallback { - TInstant ExecucionTime; - std::function Callback; - std::shared_ptr context; - }; - TInsistentClient(const TCommonOptions& opts); ~TInsistentClient(); void Report(TStringBuilder& out) const; @@ -77,32 +57,12 @@ class TInsistentClient { std::uint64_t GetActiveSessions() const; private: - void ClearContext(std::shared_ptr& context); - void RemoveRetryIter(std::shared_ptr& context); - void RemoveTimeoutIter(std::shared_ptr& context); - - TThreadPool CallbackQueue; NYdb::NTable::TTableClient Client; std::uint32_t ClientMaxRetries; - TDuration Timeout; - 
TDuration RetryTimeout; TDuration SessionTimeout; - TAdaptiveLock CallbacksLock; - std::unique_ptr WorkThread; - TManualEvent ShouldStop; - std::list RetryCallbacks; - std::list TimeoutCallbacks; - bool UseApplicationTimeout; - bool SendPreventiveRequest; - - // Ok received on the First try - std::atomic CounterFOk = 0; - // Ok received on the Second try - std::atomic CounterSOk = 0; - // First try launches (= total) - std::atomic CounterFStart = 0; - // Second try launches - std::atomic CounterSStart = 0; + + std::atomic CounterStart = 0; + std::atomic CounterOk = 0; }; class TExecutor { diff --git a/tests/slo_workloads/utils/metrics.cpp b/tests/slo_workloads/utils/metrics.cpp index 50e1f859c0..708c83a563 100644 --- a/tests/slo_workloads/utils/metrics.cpp +++ b/tests/slo_workloads/utils/metrics.cpp @@ -11,21 +11,97 @@ #include +#include + +#include + +#include +#include +#include +#include +#include +#include using namespace std::chrono_literals; -#ifdef REF -static constexpr const std::string_view REF_LABEL = Y_STRINGIZE(REF); -#else -static constexpr const std::string_view REF_LABEL = "unknown"; -#endif +namespace { + +constexpr std::int64_t kHdrMinLatencyNs = 1'000; // 1 us +constexpr std::int64_t kHdrMaxLatencyNs = 60'000'000'000; // 60 s +constexpr int kHdrSignificantFigures = 3; + +std::string ResolveWorkloadRef() { + std::string ref = GetEnv("WORKLOAD_REF"); + return ref.empty() ? "unknown" : ref; +} + +// Thread-safe HDR histogram. Only successful latencies are recorded; errors +// are excluded from the percentile stream (operation_status="success"). 
+class TLatencyRecorder { +public: + TLatencyRecorder() { + hdr_histogram* raw = nullptr; + int rc = hdr_init(kHdrMinLatencyNs, kHdrMaxLatencyNs, kHdrSignificantFigures, &raw); + Y_ABORT_UNLESS(rc == 0, "hdr_init failed: %d", rc); + Histogram_.reset(raw); + } + + void Record(TDuration d) { + std::int64_t ns = static_cast(d.NanoSeconds()); + if (ns < kHdrMinLatencyNs) { + ns = kHdrMinLatencyNs; + } else if (ns > kHdrMaxLatencyNs) { + ns = kHdrMaxLatencyNs; + } + std::lock_guard lock(Mutex_); + hdr_record_value(Histogram_.get(), ns); + } + + struct TPercentiles { + double P50 = 0.0; + double P95 = 0.0; + double P99 = 0.0; + bool HasData = false; + }; + + // Snapshot all three percentiles from one consistent HDR state and reset + // the window — so each export cycle's gauge reflects only the last + // interval's latencies. Reading p50/p95/p99 in one critical section + // matches the Java workload's batch-callback pattern (avoids the race + // where p99 would observe a histogram already reset by p50). + TPercentiles SnapshotAndReset() { + TPercentiles out; + std::lock_guard lock(Mutex_); + if (Histogram_->total_count == 0) { + return out; + } + out.HasData = true; + out.P50 = hdr_value_at_percentile(Histogram_.get(), 50.0) / 1e9; + out.P95 = hdr_value_at_percentile(Histogram_.get(), 95.0) / 1e9; + out.P99 = hdr_value_at_percentile(Histogram_.get(), 99.0) / 1e9; + hdr_reset(Histogram_.get()); + return out; + } + +private: + struct THdrDeleter { + void operator()(hdr_histogram* h) const noexcept { if (h) hdr_close(h); } + }; + + std::mutex Mutex_; + std::unique_ptr Histogram_; +}; -class TOtelMetricsPusher : public IMetricsPusher { +// Process-wide pusher: ONE MeterProvider with one OTLP exporter shared by +// all operation types. Publishing duplicate MeterProviders against the same +// Prometheus endpoint produces racing `target_info` writes for the same +// resource label set, which Prometheus rejects as `out of order sample`. 
+class TOtelSharedPusher { public: - TOtelMetricsPusher(const std::string& metricsPushUrl, const std::string& operationType) - : OperationType_(operationType) + explicit TOtelSharedPusher(const std::string& metricsPushUrl) + : Ref_(ResolveWorkloadRef()) , CommonAttributes_{ - {"ref", std::string(REF_LABEL)}, + {"ref", Ref_}, {"sdk", "cpp"}, {"sdk_version", NYdb::GetSdkSemver()} } @@ -36,15 +112,16 @@ class TOtelMetricsPusher : public IMetricsPusher { auto exporter = opentelemetry::exporter::otlp::OtlpHttpMetricExporterFactory::Create(exporterOptions); opentelemetry::sdk::metrics::PeriodicExportingMetricReaderOptions readerOptions; - readerOptions.export_interval_millis = 250ms; - readerOptions.export_timeout_millis = 200ms; + readerOptions.export_interval_millis = 1000ms; + readerOptions.export_timeout_millis = 500ms; - auto metricReader = opentelemetry::sdk::metrics::PeriodicExportingMetricReaderFactory::Create(std::move(exporter), readerOptions); + auto metricReader = opentelemetry::sdk::metrics::PeriodicExportingMetricReaderFactory::Create( + std::move(exporter), readerOptions); - // Create MeterContext with resource auto context = std::make_unique( std::unique_ptr(new opentelemetry::sdk::metrics::ViewRegistry()), - opentelemetry::sdk::resource::Resource::Create(opentelemetry::common::MakeKeyValueIterableView(CommonAttributes_)) + opentelemetry::sdk::resource::Resource::Create( + opentelemetry::common::MakeKeyValueIterableView(CommonAttributes_)) ); MeterProvider_ = opentelemetry::sdk::metrics::MeterProviderFactory::Create(std::move(context)); @@ -55,113 +132,201 @@ class TOtelMetricsPusher : public IMetricsPusher { InitMetrics(); } - void PushRequestData(const TRequestData& requestData) override { - if (requestData.Status == NYdb::EStatus::SUCCESS) { - OperationsSuccessTotal_->Add(1, MergeAttributes({{"operation_type", OperationType_}})); - } else { - ErrorsTotal_->Add(1, MergeAttributes({{"status", YdbStatusToString(requestData.Status)}})); - 
OperationsFailureTotal_->Add(1, MergeAttributes({{"operation_type", OperationType_}})); + ~TOtelSharedPusher() { + // Remove observable-gauge callbacks before MeterProvider tears down + // the readers, so a final collection in flight cannot see this object + // half-destroyed. + if (LatencyP50_) LatencyP50_->RemoveCallback(&TOtelSharedPusher::ObserveP50, this); + if (LatencyP95_) LatencyP95_->RemoveCallback(&TOtelSharedPusher::ObserveP95, this); + if (LatencyP99_) LatencyP99_->RemoveCallback(&TOtelSharedPusher::ObserveP99, this); + // MeterProvider destructor calls Shutdown(); do not call it explicitly + // here — the OTel SDK rejects a second Shutdown with a warning. + } + + void Record(const std::string& operationType, const TRequestData& data) { + const bool success = data.Status == NYdb::EStatus::SUCCESS; + auto& series = GetOrCreateSeries(operationType); + + OperationsTotal_->Add(uint64_t{1}, + opentelemetry::common::MakeKeyValueIterableView( + success ? series.SuccessAttrs : series.ErrorAttrs)); + + // sdk_retry_attempts_total = total number of technical attempts + // including the first one. RetryAttempts counts only post-first + // attempts, so add 1 to include the initial attempt. + RetryAttemptsTotal_->Add(data.RetryAttempts + 1, + opentelemetry::common::MakeKeyValueIterableView(series.RetryAttrs)); + + if (success) { + series.Recorder.Record(data.Delay); } - OperationsTotal_->Add(1, MergeAttributes({{"operation_type", OperationType_}})); - OperationLatencySeconds_->Record(requestData.Delay.SecondsFloat(), MergeAttributes({{"operation_type", OperationType_}, {"status", YdbStatusToString(requestData.Status)}})); - RetryAttempts_->Record(requestData.RetryAttempts, MergeAttributes({{"operation_type", OperationType_}})); } private: - void InitMetrics() { - ErrorsTotal_ = Meter_->CreateUInt64Counter("sdk_errors_total", - "Total number of errors encountered, categorized by error type." 
- ); - - OperationsTotal_ = Meter_->CreateUInt64Counter("sdk_operations_total", - "Total number of operations, categorized by type attempted by the SDK." - ); - - OperationsSuccessTotal_ = Meter_->CreateUInt64Counter("sdk_operations_success_total", - "Total number of successful operations, categorized by type." - ); + struct TSeries { + TLatencyRecorder Recorder; + // Pre-merged attribute maps used on the hot path so Record() does not + // allocate / copy CommonAttributes_ per call. + std::map SuccessAttrs; + std::map ErrorAttrs; + std::map RetryAttrs; + + // Cached snapshot, refreshed by EnsureSnapshot() in whichever + // observable-gauge callback fires first per export cycle. All three + // callbacks read from these atomics so p50/p95/p99 land in the same + // export with consistent values from one HDR snapshot — independent + // of the order the SDK iterates instruments. + std::atomic P50{0.0}; + std::atomic P95{0.0}; + std::atomic P99{0.0}; + std::atomic HasData{false}; + std::mutex SnapshotMutex; + std::chrono::steady_clock::time_point LastSnapshot{}; + }; + + // Half the export interval — guarantees one snapshot per cycle while + // tolerating arbitrary callback ordering. + static constexpr std::chrono::milliseconds kSnapshotFreshness{500}; + + void EnsureSnapshot(TSeries& series) { + auto now = std::chrono::steady_clock::now(); + std::lock_guard lock(series.SnapshotMutex); + if (now - series.LastSnapshot < kSnapshotFreshness) { + return; + } + auto snap = series.Recorder.SnapshotAndReset(); + if (snap.HasData) { + series.P50.store(snap.P50); + series.P95.store(snap.P95); + series.P99.store(snap.P99); + series.HasData.store(true); + } else { + series.HasData.store(false); + } + series.LastSnapshot = now; + } - OperationsFailureTotal_ = Meter_->CreateUInt64Counter("sdk_operations_failure_total", - "Total number of failed operations, categorized by type." 
- ); + TSeries& GetOrCreateSeries(const std::string& op) { + { + std::shared_lock lock(SeriesMutex_); + auto it = Series_.find(op); + if (it != Series_.end()) { + return *it->second; + } + } + std::unique_lock lock(SeriesMutex_); + auto& slot = Series_[op]; + if (!slot) { + slot = std::make_unique(); + slot->SuccessAttrs = CommonAttributes_; + slot->SuccessAttrs["operation_type"] = op; + slot->SuccessAttrs["operation_status"] = "success"; + slot->ErrorAttrs = CommonAttributes_; + slot->ErrorAttrs["operation_type"] = op; + slot->ErrorAttrs["operation_status"] = "error"; + slot->RetryAttrs = CommonAttributes_; + slot->RetryAttrs["operation_type"] = op; + } + return *slot; + } - OperationLatencySeconds_ = CreateDoubleHistogram("sdk_operation_latency_seconds", - "Latency of operations performed by the SDK in seconds, categorized by type and status.", - { - 0.001, // 1 ms - 0.002, // 2 ms - 0.003, // 3 ms - 0.004, // 4 ms - 0.005, // 5 ms - 0.0075, // 7.5 ms - 0.010, // 10 ms - 0.020, // 20 ms - 0.050, // 50 ms - 0.100, // 100 ms - 0.200, // 200 ms - 0.500, // 500 ms - 1.000, // 1 s - }, - "s" - ); + void InitMetrics() { + OperationsTotal_ = Meter_->CreateUInt64Counter("sdk_operations_total", + "Total number of operations, categorized by operation type and status."); + RetryAttemptsTotal_ = Meter_->CreateUInt64Counter("sdk_retry_attempts_total", + "Total number of retry attempts (including the first attempt), categorized by operation type."); + + LatencyP50_ = Meter_->CreateDoubleObservableGauge( + "sdk_operation_latency_p50_seconds", + "P50 latency of successful operations in seconds.", "s"); + LatencyP95_ = Meter_->CreateDoubleObservableGauge( + "sdk_operation_latency_p95_seconds", + "P95 latency of successful operations in seconds.", "s"); + LatencyP99_ = Meter_->CreateDoubleObservableGauge( + "sdk_operation_latency_p99_seconds", + "P99 latency of successful operations in seconds.", "s"); + + LatencyP50_->AddCallback(&TOtelSharedPusher::ObserveP50, this); + 
LatencyP95_->AddCallback(&TOtelSharedPusher::ObserveP95, this); + LatencyP99_->AddCallback(&TOtelSharedPusher::ObserveP99, this); + } - RetryAttempts_ = Meter_->CreateInt64Gauge("sdk_retry_attempts", - "Current retry attempts, categorized by operation type." - ); + static void ObserveP50(opentelemetry::metrics::ObserverResult r, void* s) { + ObservePercentile(r, s, &TSeries::P50); + } + static void ObserveP95(opentelemetry::metrics::ObserverResult r, void* s) { + ObservePercentile(r, s, &TSeries::P95); + } + static void ObserveP99(opentelemetry::metrics::ObserverResult r, void* s) { + ObservePercentile(r, s, &TSeries::P99); } - std::unique_ptr> CreateDoubleHistogram( - const std::string& name, - const std::string& description, - const std::vector& buckets, - const std::string& unit = {}) + // Each callback ensures a fresh snapshot exists for the current export + // cycle (EnsureSnapshot is a no-op if one was taken less than + // kSnapshotFreshness ago). Whichever of P50/P95/P99 the SDK invokes first + // performs the snapshot+reset; the others read cached atomics. Order + // between the three callbacks is irrelevant. 
+ static void ObservePercentile(opentelemetry::metrics::ObserverResult result, void* state, + std::atomic TSeries::*field) { - auto selector = std::make_unique( - opentelemetry::sdk::metrics::InstrumentType::kHistogram, - name, - unit - ); + auto* self = static_cast(state); + auto obs = opentelemetry::nostd::get< + opentelemetry::nostd::shared_ptr>>(result); + + std::shared_lock lock(self->SeriesMutex_); + for (const auto& [op, series] : self->Series_) { + self->EnsureSnapshot(*series); + if (!series->HasData.load()) { + continue; + } + obs->Observe((series.get()->*field).load(), + opentelemetry::common::MakeKeyValueIterableView(series->SuccessAttrs)); + } + } - auto meterSelector = std::make_unique( - "slo_workloads", - NYdb::GetSdkSemver(), - "" - ); + std::string Ref_; + std::map CommonAttributes_; - auto histogramConfig = std::make_shared(); - histogramConfig->boundaries_ = buckets; + std::unique_ptr MeterProvider_; + std::shared_ptr Meter_; - auto view = std::make_unique( - "", - "", - opentelemetry::sdk::metrics::AggregationType::kHistogram, - histogramConfig - ); + std::unique_ptr> OperationsTotal_; + std::unique_ptr> RetryAttemptsTotal_; + std::shared_ptr LatencyP50_; + std::shared_ptr LatencyP95_; + std::shared_ptr LatencyP99_; - MeterProvider_->AddView(std::move(selector), std::move(meterSelector), std::move(view)); + std::shared_mutex SeriesMutex_; + std::unordered_map> Series_; +}; - return Meter_->CreateDoubleHistogram(name, description, unit); - } +std::mutex g_sharedMu; +std::weak_ptr g_shared; - // Helper to merge common attributes with metric-specific ones - std::map MergeAttributes(const std::map& metricAttrs) const { - std::map result = CommonAttributes_; - result.insert(metricAttrs.begin(), metricAttrs.end()); - return result; +std::shared_ptr GetOrCreateSharedPusher(const std::string& url) { + std::lock_guard lock(g_sharedMu); + auto sp = g_shared.lock(); + if (!sp) { + sp = std::make_shared(url); + g_shared = sp; } + return sp; +} - 
std::string OperationType_; - std::map CommonAttributes_; // ref, sdk, sdk_version +class TOtelMetricsPusherWrapper : public IMetricsPusher { +public: + TOtelMetricsPusherWrapper(std::shared_ptr shared, std::string operationType) + : Shared_(std::move(shared)) + , OperationType_(std::move(operationType)) + {} - std::unique_ptr MeterProvider_; - std::shared_ptr Meter_; + void PushRequestData(const TRequestData& requestData) override { + Shared_->Record(OperationType_, requestData); + } - std::unique_ptr> ErrorsTotal_; - std::unique_ptr> OperationsTotal_; - std::unique_ptr> OperationsSuccessTotal_; - std::unique_ptr> OperationsFailureTotal_; - std::unique_ptr> OperationLatencySeconds_; - std::unique_ptr> RetryAttempts_; +private: + std::shared_ptr Shared_; + std::string OperationType_; }; class TNoopMetricsPusher : public IMetricsPusher { @@ -169,8 +334,10 @@ class TNoopMetricsPusher : public IMetricsPusher { void PushRequestData([[maybe_unused]] const TRequestData& requestData) override {} }; +} // namespace + std::unique_ptr CreateOtelMetricsPusher(const std::string& metricsPushUrl, const std::string& operationType) { - return std::make_unique(metricsPushUrl, operationType); + return std::make_unique(GetOrCreateSharedPusher(metricsPushUrl), operationType); } std::unique_ptr CreateNoopMetricsPusher() { diff --git a/tests/slo_workloads/utils/statistics.cpp b/tests/slo_workloads/utils/statistics.cpp index 15789e7620..dddc35c56a 100644 --- a/tests/slo_workloads/utils/statistics.cpp +++ b/tests/slo_workloads/utils/statistics.cpp @@ -36,6 +36,11 @@ TStat::TStat(const std::optional& metricsPushUrl, const std::string MetricsPushQueue.Start(20); } +TStat::~TStat() { + MetricsPushQueue.Stop(); + MetricsPusher.reset(); +} + void TStat::Start() { StartTime = TInstant::Now(); } @@ -71,9 +76,12 @@ void TStat::FinishRequest(const std::shared_ptr& unit, const TFinalSt } ScheduleMetricsPush([this, delay, status, unit]() { + NYdb::EStatus requestStatus = status + ? 
status->GetStatus() + : NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED; MetricsPusher->PushRequestData({ .Delay = delay, - .Status = status->GetStatus(), + .Status = requestStatus, .RetryAttempts = unit->RetryAttempts }); }); diff --git a/tests/slo_workloads/utils/statistics.h b/tests/slo_workloads/utils/statistics.h index 9e7a092430..c61a267db4 100644 --- a/tests/slo_workloads/utils/statistics.h +++ b/tests/slo_workloads/utils/statistics.h @@ -62,6 +62,8 @@ class TStat { TInstant GetStartTime() const; + ~TStat(); + private: void ScheduleMetricsPush(std::function func); diff --git a/tests/slo_workloads/utils/utils.cpp b/tests/slo_workloads/utils/utils.cpp index b7572c2981..a189432541 100644 --- a/tests/slo_workloads/utils/utils.cpp +++ b/tests/slo_workloads/utils/utils.cpp @@ -7,6 +7,8 @@ #include #include #include +#include +#include #include #include #include @@ -110,113 +112,22 @@ std::string GetDatabase(const std::string& connectionString) { return {}; } -int DoMain(int argc, char** argv, TCreateCommand create, TRunCommand run, TCleanupCommand cleanup) { - TOpts opts = TOpts::Default(); - - std::string connectionString; - std::string prefix; - std::string token; - std::string tokenFile; - std::string iamSaKeyFile; - std::string statConfigFile; - std::string balancingPolicy; - - opts.AddLongOption('c', "connection-string", "YDB connection string").Required().RequiredArgument("SCHEMA://HOST:PORT/?DATABASE=DATABASE") - .StoreResult(&connectionString); - opts.AddLongOption('p', "prefix", "Base prefix for tables").RequiredArgument("PATH") - .StoreResult(&prefix); - opts.AddLongOption('k', "token", "security token").RequiredArgument("TOKEN") - .StoreResult(&token); - opts.AddLongOption('f', "token-file", "security token file").RequiredArgument("PATH") - .StoreResult(&tokenFile); - opts.AddLongOption("iam-sa-key-file", "IAM service account key file").RequiredArgument("SECRET") - .StoreResult(&iamSaKeyFile); - opts.AddLongOption('s', "stat-config", "statistics config 
file").Optional().RequiredArgument("PATH") - .StoreResult(&statConfigFile); - opts.AddLongOption('b', "balancing-policy", "Balancing policy").Optional().DefaultValue("use-all-nodes").RequiredArgument("(use-all-nodes|prefer-local-dc|prefer-primary-pile)") - .StoreResult(&balancingPolicy); - opts.AddHelpOption('h'); - opts.SetFreeArgsMin(1); - opts.SetFreeArgTitle(0, "", GetCmdList()); - opts.ArgPermutation_ = NLastGetopt::REQUIRE_ORDER; - - TOptsParseResult res(&opts, argc, argv); - size_t freeArgsPos = res.GetFreeArgsPos(); - argc -= freeArgsPos; - argv += freeArgsPos; - ECommandType command = ParseCommand(*argv); - if (command == ECommandType::Unknown) { - Cerr << "Unknown command '" << *argv << "'" << Endl; - return EXIT_FAILURE; - } - - if (prefix.empty()) { - prefix = GetDatabase(connectionString); - } - - if (!ParseToken(token, tokenFile)) { - return EXIT_FAILURE; - } - - auto config = TDriverConfig(connectionString); - - if (!iamSaKeyFile.empty()) { - Cout << "Enabling IAM authentication..." << Endl; - TIamJwtFilename iamJwtFilename{ .JwtFilename = iamSaKeyFile }; - config.SetCredentialsProviderFactory(CreateIamJwtFileCredentialsProviderFactory(iamJwtFilename)); - } else if (!token.empty()) { - Cout << "Enabling OAuth authentication..." << Endl; - config.SetCredentialsProviderFactory(CreateOAuthCredentialsProviderFactory(token)); - } else { - Cerr << "Warning: No authentication methods provided." 
<< Endl; +std::string DefaultConnectionStringFromEnv() { + std::string cs = GetEnv("YDB_CONNECTION_STRING"); + if (!cs.empty()) { + return cs; } - - if (balancingPolicy == "use-all-nodes") { - config.SetBalancingPolicy(TBalancingPolicy::UseAllNodes()); - } else if (balancingPolicy == "prefer-local-dc") { - config.SetBalancingPolicy(TBalancingPolicy::UsePreferableLocation()); - } else if (balancingPolicy == "prefer-primary-pile") { - config.SetBalancingPolicy(TBalancingPolicy::UsePreferablePileState()); - } else { - Cerr << "Unknown balancing policy: " << balancingPolicy << Endl; - return EXIT_FAILURE; + std::string endpoint = GetEnv("YDB_ENDPOINT"); + std::string database = GetEnv("YDB_DATABASE"); + if (!endpoint.empty() && !database.empty()) { + return TStringBuilder() << endpoint << "/?database=" << database; } - - TDriver driver(config); - - StartStatCollecting(driver, statConfigFile); - - TDatabaseOptions dbOptions{ driver, prefix }; - int result; - try { - switch (command) { - case ECommandType::Create: - Cout << "Launching create command..." << Endl; - result = create(dbOptions, argc, argv); - break; - case ECommandType::Run: - Cout << "Launching run command..." << Endl; - result = run(dbOptions, argc, argv); - break; - case ECommandType::Cleanup: - Cout << "Launching cleanup command..." 
<< Endl; - result = cleanup(dbOptions, argc); - break; - default: - Cerr << "Unknown command" << Endl; - return EXIT_FAILURE; - } - } - catch (const NYdb::NStatusHelpers::TYdbErrorException& e) { - Cerr << "Exception caught: " << e << Endl; - return EXIT_FAILURE; - } - driver.Stop(true); - return result; + return {}; } + std::string GetCmdList() { - return "create, run, cleanup"; + return "create, run, cleanup (omit to run create -> run -> cleanup in one process)"; } ECommandType ParseCommand(const char* cmd) { @@ -425,6 +336,11 @@ TTableStats GetTableStats(TDatabaseOptions& dbOptions, const std::string& tableN } void ParseOptionsCommon(TOpts& opts, TCommonOptions& options) { + std::string metricsPushUrlFromEnv = GetEnv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT"); + if (!metricsPushUrlFromEnv.empty()) { + options.MetricsPushUrl = metricsPushUrlFromEnv; + } + opts.AddLongOption("threads", "Number of threads to use").RequiredArgument("NUM") .DefaultValue(options.MaxInputThreads).StoreResult(&options.MaxInputThreads); opts.AddLongOption("stop-on-error", "Stop thread if an error occured").NoArgument() @@ -437,11 +353,6 @@ void ParseOptionsCommon(TOpts& opts, TCommonOptions& options) { .SetFlag(&options.DontPushMetrics).DefaultValue(options.DontPushMetrics); opts.AddLongOption("metrics-push-url", "URL to push metrics").RequiredArgument("URL") .DefaultValue(options.MetricsPushUrl).StoreResult(&options.MetricsPushUrl); - opts.AddLongOption("app-timeout", "Use application timeout (over SDK)").NoArgument() - .SetFlag(&options.UseApplicationTimeout).DefaultValue(options.UseApplicationTimeout); - opts.AddLongOption("prevention-request", "Send prevention request at 1/2 of timeout").NoArgument() - .SetFlag(&options.SendPreventiveRequest).DefaultValue(options.SendPreventiveRequest); - opts.MutuallyExclusive("dont-push", "metrics-push-url"); } @@ -485,6 +396,18 @@ bool ParseOptionsCreate(int argc, char** argv, TCreateOptions& createOptions) { bool ParseOptionsRun(int argc, char** 
argv, TRunOptions& runOptions) { TOpts opts = TOpts::Default(); ParseOptionsCommon(opts, runOptions.CommonOptions); + + if (std::string workloadDuration = GetEnv("WORKLOAD_DURATION"); !workloadDuration.empty()) { + try { + std::uint32_t parsed = FromString(workloadDuration); + if (parsed > 0) { + runOptions.CommonOptions.SecondsToRun = parsed; + } + } catch (const std::exception& e) { + Cerr << "Invalid WORKLOAD_DURATION env value '" << workloadDuration << "': " << e.what() << Endl; + } + } + opts.AddLongOption("time", "Time to run (Seconds)").RequiredArgument("Seconds") .DefaultValue(runOptions.CommonOptions.SecondsToRun).StoreResult(&runOptions.CommonOptions.SecondsToRun); opts.AddLongOption("read-rps", "Request generation rate for read requests (Thread A)").RequiredArgument("NUM") diff --git a/tests/slo_workloads/utils/utils.h b/tests/slo_workloads/utils/utils.h index 65be9f4891..91986a8ac6 100644 --- a/tests/slo_workloads/utils/utils.h +++ b/tests/slo_workloads/utils/utils.h @@ -49,8 +49,6 @@ struct TCommonOptions { std::uint32_t MaxRetries = 50; TDuration ReactionTime = DefaultReactionTime; bool StopOnError = false; - bool UseApplicationTimeout = false; - bool SendPreventiveRequest = false; // Generator options: std::uint32_t MinLength = 20; @@ -98,7 +96,8 @@ enum class ECommandType { Unknown, Create, Run, - Cleanup + Cleanup, + All, // No free-arg passed: execute Create -> Run -> Cleanup in one process }; struct TTableStats { @@ -115,30 +114,12 @@ int DoMain(int argc, char** argv, TCreateCommand create, TRunCommand run, TClean std::string GetCmdList(); ECommandType ParseCommand(const char* cmd); -std::string JoinPath(const std::string& prefix, const std::string& path); +std::string GetDatabase(const std::string& connectionString); +std::string DefaultConnectionStringFromEnv(); +bool ParseToken(std::string& token, std::string& tokenFile); +void StartStatCollecting(NYdb::TDriver& driver, const std::string& statConfigFile); -inline void RetryBackoff( - 
NYdb::NTable::TTableClient& client, - std::uint32_t retries, - const NYdb::NTable::TTableClient::TOperationSyncFunc& func -) { - TDuration delay = TDuration::Seconds(5); - while (retries) { - NYdb::TStatus status = client.RetryOperationSync(func); - if (status.IsSuccess()) { - return; - } - --retries; - if (!retries) { - Cerr << "Create request failed after all retries." << Endl; - Cerr << status << Endl; - NYdb::NStatusHelpers::ThrowOnError(status); - } - Cerr << "Create request failed. Sleeping for " << delay << Endl; - Sleep(delay); - delay *= 2; - } -} +std::string JoinPath(const std::string& prefix, const std::string& path); std::string GenerateRandomString(std::uint32_t minLength, std::uint32_t maxLength); diff --git a/tests/slo_workloads/utils/utils_main.cpp b/tests/slo_workloads/utils/utils_main.cpp new file mode 100644 index 0000000000..53cf56dd24 --- /dev/null +++ b/tests/slo_workloads/utils/utils_main.cpp @@ -0,0 +1,186 @@ +#include "utils.h" + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace NLastGetopt; +using namespace NYdb; +int DoMain(int argc, char** argv, TCreateCommand create, TRunCommand run, TCleanupCommand cleanup) { + TOpts opts = TOpts::Default(); + + std::string connectionString; + std::string prefix; + std::string token; + std::string tokenFile; + std::string iamSaKeyFile; + std::string statConfigFile; + std::string balancingPolicy; + + std::string defaultConnectionString = DefaultConnectionStringFromEnv(); + + auto& connOpt = opts.AddLongOption('c', "connection-string", "YDB connection string").RequiredArgument("SCHEMA://HOST:PORT/?DATABASE=DATABASE") + .StoreResult(&connectionString); + if (!defaultConnectionString.empty()) { + connOpt.DefaultValue(defaultConnectionString); + } else { + connOpt.Required(); + } + opts.AddLongOption('p', "prefix", "Base prefix for tables").RequiredArgument("PATH") + .StoreResult(&prefix); + opts.AddLongOption('k', "token", "security 
token").RequiredArgument("TOKEN") + .StoreResult(&token); + opts.AddLongOption('f', "token-file", "security token file").RequiredArgument("PATH") + .StoreResult(&tokenFile); + opts.AddLongOption("iam-sa-key-file", "IAM service account key file").RequiredArgument("SECRET") + .StoreResult(&iamSaKeyFile); + opts.AddLongOption('s', "stat-config", "statistics config file").Optional().RequiredArgument("PATH") + .StoreResult(&statConfigFile); + opts.AddLongOption('b', "balancing-policy", "Balancing policy").Optional().DefaultValue("use-all-nodes").RequiredArgument("(use-all-nodes|prefer-local-dc|prefer-primary-pile)") + .StoreResult(&balancingPolicy); + opts.AddHelpOption('h'); + opts.SetFreeArgsMin(0); + opts.SetFreeArgTitle(0, "", GetCmdList()); + opts.ArgPermutation_ = NLastGetopt::REQUIRE_ORDER; + // Run-phase options (--read-rps, --write-rps, …) reach DoMain when the + // caller invokes the workload without an explicit subcommand (the v2 SLO + // action contract). Tolerate them here so the global parser stops at the + // first unknown option instead of erroring; they are forwarded to the + // run phase below. + opts.AllowUnknownLongOptions_ = true; + + TOptsParseResult res(&opts, argc, argv); + size_t freeArgsPos = res.GetFreeArgsPos(); + argc -= freeArgsPos; + argv += freeArgsPos; + + ECommandType command = (argc > 0) ? ParseCommand(*argv) : ECommandType::All; + if (command == ECommandType::Unknown) { + if (argv[0][0] == '-') { + // First leftover token is an option, not a subcommand keyword: + // treat as implicit All mode and let the run phase parse it. + command = ECommandType::All; + } else { + Cerr << "Unknown command '" << *argv << "'" << Endl; + return EXIT_FAILURE; + } + } + + if (prefix.empty()) { + prefix = GetDatabase(connectionString); + } + if (prefix.empty()) { + // YDB SLO action sets YDB_CONNECTION_STRING in path form + // (grpc://host:port/Root/testdb), which GetDatabase can't parse. + // Fall back to YDB_DATABASE which the action sets alongside it. 
+ prefix = GetEnv("YDB_DATABASE"); + } + + if (!ParseToken(token, tokenFile)) { + return EXIT_FAILURE; + } + + auto config = TDriverConfig(connectionString); + + if (!iamSaKeyFile.empty()) { + Cout << "Enabling IAM authentication..." << Endl; + TIamJwtFilename iamJwtFilename{ .JwtFilename = iamSaKeyFile }; + config.SetCredentialsProviderFactory(CreateIamJwtFileCredentialsProviderFactory(iamJwtFilename)); + } else if (!token.empty()) { + Cout << "Enabling OAuth authentication..." << Endl; + config.SetCredentialsProviderFactory(CreateOAuthCredentialsProviderFactory(token)); + } else { + Cerr << "Warning: No authentication methods provided." << Endl; + } + + if (balancingPolicy == "use-all-nodes") { + config.SetBalancingPolicy(TBalancingPolicy::UseAllNodes()); + } else if (balancingPolicy == "prefer-local-dc") { + config.SetBalancingPolicy(TBalancingPolicy::UsePreferableLocation()); + } else if (balancingPolicy == "prefer-primary-pile") { + config.SetBalancingPolicy(TBalancingPolicy::UsePreferablePileState()); + } else { + Cerr << "Unknown balancing policy: " << balancingPolicy << Endl; + return EXIT_FAILURE; + } + + TDriver driver(config); + + StartStatCollecting(driver, statConfigFile); + + TDatabaseOptions dbOptions{ driver, prefix }; + int result; + try { + switch (command) { + case ECommandType::Create: + Cout << "Launching create command..." << Endl; + result = create(dbOptions, argc, argv); + break; + case ECommandType::Run: + Cout << "Launching run command..." << Endl; + result = run(dbOptions, argc, argv); + break; + case ECommandType::Cleanup: + Cout << "Launching cleanup command..." << Endl; + result = cleanup(dbOptions, argc); + break; + case ECommandType::All: { + Cout << "Launching full lifecycle: create -> run -> cleanup" << Endl; + // Forward leftover argv to the run phase so options like + // --read-rps / --write-rps take effect. 
argv[0] here is the first + // run-phase option (no subcommand keyword was supplied), so + // prepend a synthetic program name for ParseOptionsRun. + char programName[] = "slo"; + std::vector runArgv; + runArgv.reserve(argc + 2); + runArgv.push_back(programName); + for (int i = 0; i < argc; ++i) { + runArgv.push_back(argv[i]); + } + runArgv.push_back(nullptr); + int fakeArgc = 1; + char* fakeArgv[] = { programName, nullptr }; + + Cout << "[all] Launching create command..." << Endl; + result = create(dbOptions, fakeArgc, fakeArgv); + if (!result) { + Cout << "[all] Launching run command..." << Endl; + result = run(dbOptions, static_cast(runArgv.size() - 1), runArgv.data()); + } + Cout << "[all] Launching cleanup command..." << Endl; + int cleanupRc = cleanup(dbOptions, fakeArgc); + // Cleanup runs while chaos-monkey is still killing nodes, so a + // DropTable failure here is expected noise and must not mask a + // successful run. Surface the run's status; only fall back to + // the cleanup status when run itself failed and we have nothing + // else to report. + if (cleanupRc && !result) { + Cerr << "[all] Warning: cleanup failed (exit " << cleanupRc + << ") but run succeeded; ignoring cleanup exit code." << Endl; + } else if (cleanupRc) { + Cerr << "[all] Warning: cleanup failed (exit " << cleanupRc + << "); preserving earlier run failure." << Endl; + } + break; + } + default: + Cerr << "Unknown command" << Endl; + return EXIT_FAILURE; + } + } + catch (const NYdb::NStatusHelpers::TYdbErrorException& e) { + Cerr << "Exception caught: " << e << Endl; + return EXIT_FAILURE; + } + driver.Stop(true); + return result; +} diff --git a/tests/slo_workloads/verify_userver_docker.sh b/tests/slo_workloads/verify_userver_docker.sh new file mode 100755 index 0000000000..aa9034c34d --- /dev/null +++ b/tests/slo_workloads/verify_userver_docker.sh @@ -0,0 +1,136 @@ +#!/usr/bin/env bash +# Smoke-test the userver SLO workload Docker image against a local YDB instance. 
+set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/../.." && pwd)" +COMPOSE_FILE="${REPO_ROOT}/examples/otel_tracing/docker-compose.yml" +IMAGE_NAME="${IMAGE_NAME:-ydb-slo-userver-verify}" +PRESET="${PRESET:-release-test-clang}" +CONNECTION_STRING="${CONNECTION_STRING:-grpc://127.0.0.1:2136/?database=/local}" +YDB_IMAGE="${YDB_IMAGE:-cr.yandex/yc/yandex-docker-local-ydb:25.2.1}" +YDB_CONTAINER="${YDB_CONTAINER:-ydb-slo-userver-verify-ydb}" + +failures=0 +USE_COMPOSE=0 + +log() { + echo "[verify_userver_docker] $*" +} + +run_step() { + local name="$1" + shift + log "Running: ${name}" + if "$@"; then + log "PASS: ${name}" + else + log "FAIL: ${name} (exit $?)" + failures=$((failures + 1)) + fi +} + +have_docker_compose() { + docker compose version >/dev/null 2>&1 +} + +start_ydb() { + if have_docker_compose; then + USE_COMPOSE=1 + log "Starting local YDB via docker compose..." + docker compose -f "${COMPOSE_FILE}" up -d ydb + return + fi + + USE_COMPOSE=0 + log "Starting local YDB via docker run (docker compose not available)..." + docker rm -f "${YDB_CONTAINER}" >/dev/null 2>&1 || true + docker run -d --name "${YDB_CONTAINER}" --network host \ + -e GRPC_TLS_PORT=2135 \ + -e GRPC_PORT=2136 \ + -e MON_PORT=8765 \ + -e YDB_DEFAULT_LOG_LEVEL=NOTICE \ + -e YDB_USE_IN_MEMORY_PDISKS=true \ + "${YDB_IMAGE}" >/dev/null +} + +wait_for_ydb() { + log "Waiting for YDB to become healthy..." + for _ in $(seq 1 60); do + if [ "${USE_COMPOSE}" -eq 1 ]; then + if docker compose -f "${COMPOSE_FILE}" ps ydb 2>/dev/null | grep -q "(healthy)"; then + return 0 + fi + elif docker exec "${YDB_CONTAINER}" /bin/sh -c \ + "/ydb -e grpc://localhost:2136 -d /local scheme ls" >/dev/null 2>&1; then + return 0 + fi + sleep 2 + done + return 1 +} + +cleanup() { + log "Stopping local YDB..." 
+ if [ "${USE_COMPOSE}" -eq 1 ]; then + docker compose -f "${COMPOSE_FILE}" stop ydb 2>/dev/null || true + else + docker rm -f "${YDB_CONTAINER}" >/dev/null 2>&1 || true + fi +} + +trap cleanup EXIT + +cd "${REPO_ROOT}" + +if [ "${SKIP_BUILD:-0}" != "1" ]; then + log "Building userver SLO Docker image (preset=${PRESET})..." + cp tests/slo_workloads/.dockerignore .dockerignore + docker build -t "${IMAGE_NAME}" \ + --network=host \ + --build-arg REF=local \ + --build-arg PRESET="${PRESET}" \ + -f tests/slo_workloads/Dockerfile.userver . + rm -f .dockerignore +else + log "Skipping Docker image build (SKIP_BUILD=1)..." +fi + +start_ydb + +if ! wait_for_ydb; then + log "FAIL: YDB did not become healthy in time" + if [ "${USE_COMPOSE}" -eq 1 ]; then + docker compose -f "${COMPOSE_FILE}" logs ydb || true + else + docker logs "${YDB_CONTAINER}" || true + fi + exit 1 +fi + +run_workload() { + docker run --rm --network host "${IMAGE_NAME}" \ + --connection-string "${CONNECTION_STRING}" "$@" +} + +run_step "create" \ + run_workload create --dont-push + +run_step "run" \ + run_workload run \ + --time 30 \ + --read-rps 50 \ + --write-rps 10 \ + --read-timeout 100 \ + --write-timeout 100 \ + --dont-push + +run_step "cleanup" \ + run_workload cleanup + +if [ "${failures}" -ne 0 ]; then + log "${failures} step(s) failed" + exit 1 +fi + +log "All steps passed"