From e912a9c2eaad481c2efa951d1ffcfb70dd5b21c1 Mon Sep 17 00:00:00 2001 From: ybenabou Date: Thu, 5 Mar 2026 13:09:47 +0200 Subject: [PATCH 1/4] add tuning option --- conf/experimental/test/deepep_standard.toml | 1 + src/cloudai/workloads/deepep/deepep.py | 1 + src/cloudai/workloads/nccl_test/nccl.py | 4 ++++ 3 files changed, 6 insertions(+) diff --git a/conf/experimental/test/deepep_standard.toml b/conf/experimental/test/deepep_standard.toml index a6451fff3..aa54fd184 100644 --- a/conf/experimental/test/deepep_standard.toml +++ b/conf/experimental/test/deepep_standard.toml @@ -39,5 +39,6 @@ num_warmups = 20 num_iterations = 50 shuffle_columns = false use_kineto_profiler = false +enable_tuning = true config_file_path = "/tmp/config.yaml" results_dir = "/workspace/dp-benchmark/results" diff --git a/src/cloudai/workloads/deepep/deepep.py b/src/cloudai/workloads/deepep/deepep.py index 5cea353a8..e976440c5 100644 --- a/src/cloudai/workloads/deepep/deepep.py +++ b/src/cloudai/workloads/deepep/deepep.py @@ -38,6 +38,7 @@ class DeepEPCmdArgs(CmdArgs): num_iterations: int = 50 shuffle_columns: bool = False use_kineto_profiler: bool = False + enable_tuning: bool = False num_sms: int = 24 num_qps_per_rank: int = 12 config_file_path: str = "/tmp/config.yaml" diff --git a/src/cloudai/workloads/nccl_test/nccl.py b/src/cloudai/workloads/nccl_test/nccl.py index c0381c5b0..7d00fcebb 100644 --- a/src/cloudai/workloads/nccl_test/nccl.py +++ b/src/cloudai/workloads/nccl_test/nccl.py @@ -29,6 +29,7 @@ class NCCLCmdArgs(CmdArgs): "all_reduce_perf_mpi", "all_gather_perf_mpi", "alltoall_perf_mpi", + "alltoallv_perf_mpi", "broadcast_perf_mpi", "gather_perf_mpi", "hypercube_perf_mpi", @@ -41,6 +42,7 @@ class NCCLCmdArgs(CmdArgs): "all_reduce_perf", "all_gather_perf", "alltoall_perf", + "alltoallv_perf", "broadcast_perf", "gather_perf", "hypercube_perf", @@ -55,6 +57,7 @@ class NCCLCmdArgs(CmdArgs): "all_reduce_perf_mpi", "all_gather_perf_mpi", "alltoall_perf_mpi", + "alltoallv_perf_mpi", "broadcast_perf_mpi", "gather_perf_mpi", "hypercube_perf_mpi", @@ -67,6 +70,7 @@ class NCCLCmdArgs(CmdArgs): "all_reduce_perf", "all_gather_perf", "alltoall_perf", + "alltoallv_perf", "broadcast_perf", "gather_perf", "hypercube_perf", From 1b85a6e3bad75226f0f3dd459c1b44fee5b3c483 Mon Sep 17 00:00:00 2001 From: ybenabou Date: Thu, 14 May 2026 16:51:29 +0300 Subject: [PATCH 2/4] add ucc/nccl all2allv --- conf/experimental/test/deepep_standard.toml | 4 +- .../test/nccl_test_alltoallv.toml | 34 +++ .../deepep_with_nccl_alltoallv.toml | 36 +++ .../deepep_with_ucc_alltoallv.toml | 34 +++ src/cloudai/registration.py | 2 + src/cloudai/workloads/deepep/__init__.py | 2 + .../deepep/deepep_combined_report.py | 63 +++++ .../deepep/deepep_moe_throughput_reporter.py | 261 ++++++++++++++++++ .../deepep/report_generation_strategy.py | 31 +-- .../deepep/slurm_command_gen_strategy.py | 29 +- src/cloudai/workloads/nccl_test/nccl.py | 2 + .../nccl_test/slurm_command_gen_strategy.py | 90 +++++- .../ucc_test/slurm_command_gen_strategy.py | 45 ++- src/cloudai/workloads/ucc_test/ucc.py | 1 + 14 files changed, 591 insertions(+), 43 deletions(-) create mode 100644 conf/experimental/test/nccl_test_alltoallv.toml create mode 100644 conf/experimental/test_scenario/deepep_with_nccl_alltoallv.toml create mode 100644 conf/experimental/test_scenario/deepep_with_ucc_alltoallv.toml create mode 100644 src/cloudai/workloads/deepep/deepep_combined_report.py create mode 100644 src/cloudai/workloads/deepep/deepep_moe_throughput_reporter.py diff --git a/conf/experimental/test/deepep_standard.toml b/conf/experimental/test/deepep_standard.toml index c821f461b..cfe8ea869 100644 --- a/conf/experimental/test/deepep_standard.toml +++ b/conf/experimental/test/deepep_standard.toml @@ -20,13 +20,12 @@ test_template_name = "DeepEP" [cmd_args] # Local .sqsh file: -# docker_image_url = "/.autodirect/mswg2/E2E/Regression_logs/squash/yoel/dp-benchmark-shuffle.sqsh" # Container registry (uses your Docker credentials): docker_image_url = "gitlab-master.nvidia.com/ybenabou/warehouse/deepep:dp-benchmark" mode = "standard" -tokens = 1024 +tokens = 4096 num_experts = 256 num_topk = 8 hidden_size = 7168 @@ -39,6 +38,5 @@ num_warmups = 20 num_iterations = 50 shuffle_columns = false use_kineto_profiler = false -enable_tuning = true config_file_path = "/tmp/config.yaml" results_dir = "/workspace/dp-benchmark/results" diff --git a/conf/experimental/test/nccl_test_alltoallv.toml b/conf/experimental/test/nccl_test_alltoallv.toml new file mode 100644 index 000000000..2062feb88 --- /dev/null +++ b/conf/experimental/test/nccl_test_alltoallv.toml @@ -0,0 +1,34 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name = "nccl_test_alltoallv" +description = "NCCL AlltoAllv" +test_template_name = "NcclTest" + +[cmd_args] +docker_image_url = "gitlab-master.nvidia.com/ybenabou/warehouse/deepep:dp-benchmark" +# Container provides /opt/nccl-tests/build/alltoallv_perf. +subtest_name = "alltoallv_perf_mpi" +nthreads = 1 +ngpus = 1 +minbytes = "512M" +maxbytes = "512M" +stepfactor = 2 +iters = 10 +warmup_iters = 1 +check = 1 +blocking = 0 +use_deepep_matrix = true diff --git a/conf/experimental/test_scenario/deepep_with_nccl_alltoallv.toml b/conf/experimental/test_scenario/deepep_with_nccl_alltoallv.toml new file mode 100644 index 000000000..111209b98 --- /dev/null +++ b/conf/experimental/test_scenario/deepep_with_nccl_alltoallv.toml @@ -0,0 +1,36 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name = "deepep-with-nccl-alltoallv" + +# First run the DeepEP benchmark which generates the traffic matrix +[[Tests]] +id = "Tests.deepep" +test_name = "deepep_standard" +num_nodes = 2 +nodes = ["dgx-gaia-55", "dgx-gaia-56"] +time_limit = "00:30:00" + +# Then run NCCL AlltoAllv test using the generated matrix +[[Tests]] +id = "Tests.nccl_alltoallv" +test_name = "nccl_alltoallv" +num_nodes = 2 +nodes = ["dgx-gaia-55", "dgx-gaia-56"] +time_limit = "00:30:00" + [[Tests.dependencies]] + type = "start_post_comp" + id = "Tests.deepep" diff --git a/conf/experimental/test_scenario/deepep_with_ucc_alltoallv.toml b/conf/experimental/test_scenario/deepep_with_ucc_alltoallv.toml new file mode 100644 index 000000000..0c58c8e32 --- /dev/null +++ b/conf/experimental/test_scenario/deepep_with_ucc_alltoallv.toml @@ -0,0 +1,34 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name = "deepep-with-ucc-alltoallv" + +# First run the DeepEP benchmark which generates the traffic matrix +[[Tests]] +id = "Tests.deepep" +test_name = "deepep_standard" +num_nodes = 2 +time_limit = "00:30:00" + +# Then run UCC AlltoAllv test using the generated matrix (auto-converted) +[[Tests]] +id = "Tests.ucc_alltoallv" +test_name = "ucc_alltoallv_deepep" +num_nodes = 2 +time_limit = "00:30:00" + [[Tests.dependencies]] + type = "start_post_comp" + id = "Tests.deepep" diff --git a/src/cloudai/registration.py b/src/cloudai/registration.py index 5ad03b8d0..94f22d6f8 100644 --- a/src/cloudai/registration.py +++ b/src/cloudai/registration.py @@ -82,6 +82,7 @@ def register_all(): DeepEPReportGenerationStrategy, DeepEPSlurmCommandGenStrategy, DeepEPTestDefinition, + DeepEPMoEThroughputReporter, ) from cloudai.workloads.jax_toolbox import ( GPTTestDefinition, @@ -301,6 +302,7 @@ def register_all(): Registry().add_report(VllmTestDefinition, VLLMBenchReportGenerationStrategy) Registry().add_scenario_report("per_test", PerTestReporter, ReportConfig(enable=True)) + Registry().add_scenario_report("deepep_moe_throughput", DeepEPMoEThroughputReporter, ReportConfig(enable=True)) Registry().add_scenario_report("status", StatusReporter, ReportConfig(enable=True)) Registry().add_scenario_report("dse", DSEReporter, ReportConfig(enable=True)) Registry().add_scenario_report("tarball", TarballReporter, ReportConfig(enable=True)) diff --git a/src/cloudai/workloads/deepep/__init__.py b/src/cloudai/workloads/deepep/__init__.py index f145a3b3a..a559eaa79 100644 --- a/src/cloudai/workloads/deepep/__init__.py +++ b/src/cloudai/workloads/deepep/__init__.py @@ -15,11 +15,13 @@ # limitations under the License. from .deepep import DeepEPCmdArgs, DeepEPTestDefinition +from .deepep_moe_throughput_reporter import DeepEPMoEThroughputReporter from .report_generation_strategy import DeepEPReportGenerationStrategy from .slurm_command_gen_strategy import DeepEPSlurmCommandGenStrategy __all__ = [ "DeepEPCmdArgs", + "DeepEPMoEThroughputReporter", "DeepEPReportGenerationStrategy", "DeepEPSlurmCommandGenStrategy", "DeepEPTestDefinition", diff --git a/src/cloudai/workloads/deepep/deepep_combined_report.py b/src/cloudai/workloads/deepep/deepep_combined_report.py new file mode 100644 index 000000000..a29504048 --- /dev/null +++ b/src/cloudai/workloads/deepep/deepep_combined_report.py @@ -0,0 +1,63 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""DeepEP dependency helpers for Slurm UCC/NCCL.""" + +from __future__ import annotations + +from pathlib import Path + +from cloudai.core import TestRun + +DEEPEP_PREV_MOUNT = "/cloudai_deepep_prev" + + +def start_post_comp_chain(test_run: TestRun) -> list[TestRun]: + """Follow ``start_post_comp`` (e.g. UCC → NCCL → DeepEP).""" + dep = test_run.dependencies.get("start_post_comp") + if dep is None: + return [] + chain: list[TestRun] = [] + seen: set[int] = set() + cur: TestRun | None = dep.test_run + while cur is not None and id(cur) not in seen: + seen.add(id(cur)) + chain.append(cur) + nxt = cur.dependencies.get("start_post_comp") + cur = nxt.test_run if nxt else None + return chain + + +def _has_ucc_matrix_under(root: Path) -> bool: + if (root / "ucc_matrix.txt").is_file(): + return True + return any(root.glob("**/ucc_matrix.txt")) + + +def deepep_benchmark_root(test_run: TestRun) -> Path | None: + """DeepEP job directory (``ucc_matrix`` or BENCHMARK stdout), walking ``start_post_comp``.""" + for tr in start_post_comp_chain(test_run): + root = tr.output_path + if _has_ucc_matrix_under(root): + return root + st = root / "stdout.txt" + if st.is_file(): + try: + if "BENCHMARK: DeepEP Results" in st.read_text(errors="replace")[:250000]: + return root + except OSError: + continue + return None + + +def deepep_results_json_files(test_output_path: Path) -> list[Path]: + """All ``results.json`` paths under ``results/benchmark_*`` or top-level ``benchmark_*``.""" + found: list[Path] = [] + for pattern in ("results/benchmark_*_ranks_*", "benchmark_*_ranks_*"): + for d in sorted(test_output_path.glob(pattern)): + if d.is_dir(): + rj = d / "results.json" + if rj.is_file(): + found.append(rj) + return found diff --git a/src/cloudai/workloads/deepep/deepep_moe_throughput_reporter.py b/src/cloudai/workloads/deepep/deepep_moe_throughput_reporter.py new file mode 100644 index 000000000..7bef381f9 --- /dev/null +++ b/src/cloudai/workloads/deepep/deepep_moe_throughput_reporter.py @@ -0,0 +1,261 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Scenario-level MoE throughput summary: standalone SVG file.""" + +from __future__ import annotations + +import html +import json +import logging +import re +from pathlib import Path + +from cloudai._core.base_reporter import Reporter +from cloudai.workloads.deepep.deepep import DeepEPTestDefinition +from cloudai.workloads.deepep.deepep_combined_report import deepep_results_json_files +from cloudai.workloads.nccl_test.nccl import NCCLTestDefinition +from cloudai.workloads.nccl_test.performance_report_generation_strategy import extract_nccl_data +from cloudai.workloads.ucc_test.ucc import UCCTestDefinition + + +def _deepep_dispatch_combine_bars(test_output: Path) -> list[tuple[str, float, str]]: + """From latest ``results.json``: one bar per ``dispatch`` / ``combine`` row (``bus_bw_avg``).""" + paths = deepep_results_json_files(test_output) + if not paths: + return [] + latest = max(paths, key=lambda p: p.stat().st_mtime) + try: + rows = json.loads(latest.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError) as e: + logging.debug("DeepEP results.json unreadable %s: %s", latest, e) + return [] + if not isinstance(rows, list): + return [] + + by_op: dict[str, float] = {} + for row in rows: + if not isinstance(row, dict): + continue + op = row.get("operation") + if not isinstance(op, str) or "bus_bw_avg" not in row: + continue + op_l = op.lower() + if op_l not in ("dispatch", "combine"): + continue + try: + by_op[op_l] = float(row["bus_bw_avg"]) + except (TypeError, ValueError): + continue + + out: list[tuple[str, float, str]] = [] + # Stable order: dispatch then combine, only if present in JSON + if "dispatch" in by_op: + out.append(("DeepEP dispatch", by_op["dispatch"], "#2ca02c")) + if "combine" in by_op: + out.append(("DeepEP combine", by_op["combine"], "#31a354")) + return out + + +def _mean_ucc_bus_bw_gb_s(test_output: Path) -> float | None: + for name in ("stdout.txt", "ucc_perftest_capture.log"): + path = test_output / name + if not path.is_file(): + continue + v = _parse_ucc_perftest_mean_bus_avg(path) + if v is not None: + return v + return None + + +def _parse_ucc_perftest_mean_bus_avg(path: Path) -> float | None: + """Mean of ``Bus Bandwidth … avg`` column over numeric data rows (8 fields).""" + try: + text = path.read_text(encoding="utf-8", errors="replace") + except OSError: + return None + avgs: list[float] = [] + for line in text.splitlines(): + parts = re.split(r"\s+", line.strip()) + if len(parts) != 8: + continue + if not parts[0].isdigit() or not parts[1].isdigit(): + continue + try: + sz = float(parts[1]) + bavg = float(parts[5]) + except ValueError: + continue + if sz < 1048576: + continue + avgs.append(bavg) + if not avgs: + return None + return float(sum(avgs) / len(avgs)) + + +def _mean_nccl_oop_busbw_gb_s(test_output: Path) -> float | None: + rows, _, _, _ = extract_nccl_data(test_output / "stdout.txt") + if not rows: + return None + vals: list[float] = [] + for parts in rows: + try: + vals.append(float(parts[7])) + except (IndexError, ValueError): + continue + if not vals: + return None + return float(sum(vals) / len(vals)) + + +def _write_moe_throughput_svg( + path: Path, + *, + scenario_name: str, + labels: list[str], + values: list[float], + colors: list[str], + y_axis_label: str, +) -> None: + """Bar chart + value markers; standalone SVG.""" + n = len(labels) + ml, mr, mt = 72, 44, 72 + ih = 300 + mb = max(100, 36 + n * 18) + h = mt + ih + mb + w = max(720, min(1280, ml + mr + max(1, n) * 92)) + + iw = w - ml - mr + y0 = mt + ih + vmin, vmax = 0.0, max(values) * 1.12 if values else 1.0 + if vmax <= vmin: + vmax = vmin + 1.0 + + def ypx(v: float) -> float: + return y0 - (v - vmin) / (vmax - vmin) * ih + + slot = iw / max(n, 1) + bar_w = min(56.0, slot * 0.55) + centers = [ml + (i + 0.5) * slot for i in range(n)] + pts = [(cx, ypx(v)) for cx, v in zip(centers, values, strict=True)] + + parts: list[str] = [ + '', + f'', + '', + f'{html.escape(scenario_name)}', + f'{html.escape(y_axis_label)}', + f'', + f'', + ] + + for g in (0.25, 0.5, 0.75): + gy = y0 - g * ih + parts.append( + f'' + ) + gv = vmin + g * (vmax - vmin) + parts.append( + f'{gv:.1f}' + ) + + for cx, val, col, lab in zip(centers, values, colors, labels, strict=True): + top = ypx(val) + x1 = cx - bar_w / 2 + hbar = y0 - top + parts.append( + f'' + ) + + for (cx, cy), val, col, lab in zip(pts, values, colors, labels, strict=True): + parts.append( + f'' + ) + parts.append( + f'{val:.2f}' + ) + parts.append( + f'' + f"{html.escape(lab)}" + ) + + parts.append( + f'{html.escape(y_axis_label)}' + ) + + leg_y = y0 + 38 + parts.append(f'Summary') + for i, (lab, val, col) in enumerate(zip(labels, values, colors, strict=True)): + parts.append( + f'' + f'{html.escape(lab)}' + f": {val:.4f} GB/s" + ) + + parts.append("") + + path.write_text("\n".join(parts), encoding="utf-8") + + +class DeepEPMoEThroughputReporter(Reporter): + """After the scenario finishes, write one standalone SVG chart under the results root.""" + + def generate(self) -> None: + self.load_test_runs() + deepep_trs = [tr for tr in self.trs if isinstance(tr.test, DeepEPTestDefinition)] + if not deepep_trs: + logging.debug("Skipping deepep_moe_throughput: no DeepEP test in scenario.") + return + + categories: list[str] = [] + values: list[float] = [] + colors: list[str] = [] + + deepep_bars = _deepep_dispatch_combine_bars(deepep_trs[0].output_path) + if not deepep_bars: + logging.warning( + "Skipping deepep_moe_throughput: no dispatch/combine bus_bw_avg in DeepEP results.json under %s", + deepep_trs[0].output_path, + ) + return + for lab, val, col in deepep_bars: + categories.append(lab) + values.append(val) + colors.append(col) + + ucc_trs = [tr for tr in self.trs if isinstance(tr.test, UCCTestDefinition)] + if ucc_trs: + uval = _mean_ucc_bus_bw_gb_s(ucc_trs[0].output_path) + if uval is not None: + categories.append("UCC") + values.append(uval) + colors.append("#1f77b4") + else: + logging.debug("UCC test present but bus bandwidth not parsed from outputs.") + + nccl_trs = [tr for tr in self.trs if isinstance(tr.test, NCCLTestDefinition)] + if nccl_trs: + nval = _mean_nccl_oop_busbw_gb_s(nccl_trs[0].output_path) + if nval is not None: + categories.append("NCCL") + values.append(nval) + colors.append("#ff7f0e") + else: + logging.debug("NCCL test present but perf table not parsed from stdout.") + + out = self.results_root / f"{self.test_scenario.name}-moe-throughput.svg" + _write_moe_throughput_svg( + out, + scenario_name=self.test_scenario.name, + labels=categories, + values=values, + colors=colors, + y_axis_label="Mean bus bandwidth (GB/s)", + ) + logging.info("Generated MoE throughput comparison at %s", out) diff --git a/src/cloudai/workloads/deepep/report_generation_strategy.py b/src/cloudai/workloads/deepep/report_generation_strategy.py index fc33cd741..91a621496 100644 --- a/src/cloudai/workloads/deepep/report_generation_strategy.py +++ b/src/cloudai/workloads/deepep/report_generation_strategy.py @@ -25,6 +25,7 @@ from cloudai.core import ReportGenerationStrategy from cloudai.report_generator.tool.csv_report_tool import CSVReportTool from cloudai.util.lazy_imports import lazy +from cloudai.workloads.deepep.deepep_combined_report import deepep_results_json_files if TYPE_CHECKING: import pandas as pd @@ -40,34 +41,18 @@ def can_handle_directory(self) -> bool: Returns: bool: True if directory contains DeepEP results. """ - # Check for results subdirectories created by DeepEP directory_path = self.test_run.output_path - matching_dirs = list(directory_path.glob("results/benchmark_*_ranks_*")) - - if matching_dirs: - # Check if any of them has results.json - for result_dir in matching_dirs: - if (result_dir / "results.json").exists(): - return True - - return False + return bool(deepep_results_json_files(directory_path)) def generate_report(self) -> None: """Generate a report from DeepEP benchmark results.""" directory_path = self.test_run.output_path test_name = self.test_run.test.name - results_dirs = list(directory_path.glob("results/benchmark_*_ranks_*")) - - if not results_dirs: - return - all_results = [] - for result_dir in results_dirs: - results_json = result_dir / "results.json" - if not results_json.exists(): - continue + for results_json in deepep_results_json_files(directory_path): + result_dir = results_json.parent try: with open(results_json, "r") as f: @@ -76,6 +61,9 @@ def generate_report(self) -> None: logging.debug(f"Error parsing {results_json}: {e}") continue + if not isinstance(results_data, list): + continue + match = re.match(r"benchmark_(\d+)_ranks_(.+?)_(low_latency|standard)", result_dir.name) num_ranks, timestamp, mode = 0, "unknown", "unknown" if match: @@ -84,6 +72,8 @@ def generate_report(self) -> None: mode = match.group(3) for result in results_data: + if not isinstance(result, dict): + continue result["num_ranks"] = num_ranks result["timestamp"] = timestamp result["mode"] = mode @@ -99,6 +89,9 @@ def generate_report(self) -> None: "num_tokens", "hidden", "deepep_time", + "bus_bw_avg", + "bus_bw_min", + "bus_bw_max", "global_bw", "simple_rdma_bw", "simple_nvl_bw", diff --git a/src/cloudai/workloads/deepep/slurm_command_gen_strategy.py b/src/cloudai/workloads/deepep/slurm_command_gen_strategy.py index 60bbf3000..3dc021900 100644 --- a/src/cloudai/workloads/deepep/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/deepep/slurm_command_gen_strategy.py @@ -27,7 +27,7 @@ class DeepEPSlurmCommandGenStrategy(SlurmCommandGenStrategy): def _append_head_node_detection(self, batch_script_content: List[str]) -> None: """ - Append bash commands to detect head node IP for torchrun. + Append bash commands to detect head node IP. Args: batch_script_content: The list of script lines to append to. @@ -44,6 +44,9 @@ def _append_head_node_detection(self, batch_script_content: List[str]) -> None: "echo Num Nodes: ${#nodes[@]}", "echo Head Node IP: $head_node_ip", "", + "export MASTER_ADDR=$head_node_ip", + "export MASTER_PORT=29500", + "", ] ) @@ -66,7 +69,7 @@ def _container_mounts(self) -> List[str]: self._generate_config_yaml(config_file_path, cmd_args) mounts = [ - f"{config_file_path.parent.absolute()}:{config_file_path.parent.absolute()}", + f"{config_file_path.absolute()}:{cmd_args.config_file_path}", f"{self.test_run.output_path.absolute()}:{cmd_args.results_dir}", ] @@ -87,23 +90,12 @@ def generate_test_command(self) -> List[str]: else: benchmark_script = "/workspace/dp-benchmark/benchmark/benchmark_ll.py" - _, nodes = self.system.get_nodes_by_spec(self.test_run.nnodes, self.test_run.nodes) - num_nodes = len(nodes) if nodes else self.test_run.nnodes - - config_file_path = self.test_run.output_path / "config.yaml" - command_parts = [ - "torchrun", - f"--nnodes={num_nodes}", - "--nproc_per_node=1", - "--rdzv_id=$RANDOM", - "--rdzv_backend=c10d", - "--rdzv_endpoint=$head_node_ip:29500", + return [ + "python", benchmark_script, - str(config_file_path.absolute()), + cmd_args.config_file_path, ] - return command_parts - def _generate_config_yaml(self, config_path: Path, cmd_args: DeepEPCmdArgs) -> None: """ Generate YAML configuration file for DeepEP benchmark. @@ -136,4 +128,7 @@ def _generate_config_yaml(self, config_path: Path, cmd_args: DeepEPCmdArgs) -> N def gen_srun_success_check(self) -> str: """Check if DeepEP benchmark completed successfully.""" output_file = self.test_run.output_path / "stdout.txt" - return f'grep -q "global_bw\\|deepep_time" {output_file} && echo 1 || echo 0' + return ( + 'grep -Eq "global_bw|RDMA BW \\(GB/s\\)|NVLink BW \\(GB/s\\)|Bus BW \\(GB/s\\)|Global BW \\(GB/s\\)" ' + f'{output_file} && echo 1 || echo 0' + ) diff --git a/src/cloudai/workloads/nccl_test/nccl.py b/src/cloudai/workloads/nccl_test/nccl.py index 7d00fcebb..2968cf2af 100644 --- a/src/cloudai/workloads/nccl_test/nccl.py +++ b/src/cloudai/workloads/nccl_test/nccl.py @@ -101,6 +101,8 @@ class NCCLCmdArgs(CmdArgs): blocking: Union[int, list[int]] = 0 cudagraph: Union[int, list[int]] = 0 stepfactor: Optional[Union[int, list[int]]] = None + use_deepep_matrix: bool = False + alltoallv_matrix_container_path: str = "/tmp/traffic_matrix.txt" class NCCLTestDefinition(TestDefinition): diff --git a/src/cloudai/workloads/nccl_test/slurm_command_gen_strategy.py b/src/cloudai/workloads/nccl_test/slurm_command_gen_strategy.py index 1295187e0..80795a07c 100644 --- a/src/cloudai/workloads/nccl_test/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/nccl_test/slurm_command_gen_strategy.py @@ -14,18 +14,77 @@ # See the License for the specific language governing permissions and # limitations under the License. +from pathlib import Path from typing import List, cast from cloudai.systems.slurm import SlurmCommandGenStrategy +from cloudai.workloads.deepep.deepep_combined_report import DEEPEP_PREV_MOUNT, deepep_benchmark_root -from .nccl import NCCLTestDefinition +from .nccl import NCCLCmdArgs, NCCLTestDefinition + +_ALLTOALLV_MATRIX_ENV = "ALLTOALLV_MATRIX_FILE" +_NCCL_TESTS_ALLTOALLV_PERF = "/opt/nccl-tests/build/alltoallv_perf" + + +def _nccl_cmd_scalar(value: object) -> object: + if isinstance(value, list): + return value[0] if value else value + return value + + +def _nccl_matrix_path_under_deepep_output(dep_out: Path) -> Path | None: + """DeepEP writes nccl_matrix.txt under the dependency test output or a timestamped benchmark subdir.""" + direct = dep_out / "nccl_matrix.txt" + if direct.is_file(): + return direct + nested = sorted( + dep_out.glob("**/nccl_matrix.txt"), + key=lambda p: p.stat().st_mtime, + reverse=True, + ) + return nested[0] if nested else None class NcclTestSlurmCommandGenStrategy(SlurmCommandGenStrategy): """Command generation strategy for NCCL tests on Slurm systems.""" + def _deepep_nccl_matrix_host_path(self) -> Path | None: + tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test) + if not tdef.cmd_args.use_deepep_matrix: + return None + root = deepep_benchmark_root(self.test_run) + if root is None: + return None + return _nccl_matrix_path_under_deepep_output(root) + def _container_mounts(self) -> List[str]: - return [] + tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test) + if not tdef.cmd_args.use_deepep_matrix: + return [] + + matrix_host = self._deepep_nccl_matrix_host_path() + if matrix_host is None: + return [] + + dest = tdef.cmd_args.alltoallv_matrix_container_path + mounts: List[str] = [f"{matrix_host.resolve()}:{dest}"] + + dr = deepep_benchmark_root(self.test_run) + if dr is not None: + mounts.append(f"{dr.resolve()}:{DEEPEP_PREV_MOUNT}:ro") + return mounts + + @property + def final_env_vars(self) -> dict[str, str | list[str]]: + env_vars = dict(super().final_env_vars) + tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test) + if tdef.cmd_args.use_deepep_matrix and self._deepep_nccl_matrix_host_path() is not None: + env_vars[_ALLTOALLV_MATRIX_ENV] = tdef.cmd_args.alltoallv_matrix_container_path + return env_vars + + @final_env_vars.setter + def final_env_vars(self, value: dict[str, str | list[str]]) -> None: + super().final_env_vars = value def image_path(self) -> str | None: tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test) @@ -33,10 +92,35 @@ def image_path(self) -> str | None: def generate_test_command(self) -> List[str]: tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test) + if tdef.cmd_args.subtest_name == "alltoallv_perf_mpi": + a = tdef.cmd_args + parts: List[str] = [ + _NCCL_TESTS_ALLTOALLV_PERF, + "-b", + str(_nccl_cmd_scalar(a.minbytes)), + "-e", + str(_nccl_cmd_scalar(a.maxbytes)), + "-g", + str(_nccl_cmd_scalar(a.ngpus)), + "-w", + str(_nccl_cmd_scalar(a.warmup_iters)), + "-n", + str(_nccl_cmd_scalar(a.iters)), + ] + if self.test_run.test.extra_cmd_args: + parts.append(self.test_run.test.extra_args_str) + return parts + srun_command_parts = [f"{tdef.cmd_args.subtest_name}"] + skip_cli = { + "docker_image_url", + "subtest_name", + "use_deepep_matrix", + "alltoallv_matrix_container_path", + } nccl_test_args = tdef.cmd_args.model_dump().keys() for arg in nccl_test_args: - if arg in {"docker_image_url", "subtest_name"}: + if arg in skip_cli: continue value = getattr(tdef.cmd_args, arg) diff --git a/src/cloudai/workloads/ucc_test/slurm_command_gen_strategy.py b/src/cloudai/workloads/ucc_test/slurm_command_gen_strategy.py index 80e1a34c7..43f0df435 100644 --- a/src/cloudai/workloads/ucc_test/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/ucc_test/slurm_command_gen_strategy.py @@ -14,18 +14,59 @@ # See the License for the specific language governing permissions and # limitations under the License. +from pathlib import Path from typing import List, cast from cloudai.systems.slurm import SlurmCommandGenStrategy +from cloudai.workloads.deepep.deepep_combined_report import DEEPEP_PREV_MOUNT, deepep_benchmark_root from .ucc import UCCCmdArgs, UCCTestDefinition +_UCC_GEN_MATRIX_CONTAINER = "/opt/hpcx/ucc/tools/perf/generator/input_matrices.txt" + + +def _ucc_matrix_path_under_deepep_output(dep_out: Path) -> Path | None: + """DeepEP writes ucc_matrix.txt under a timestamped benchmark subdir; resolve either layout.""" + direct = dep_out / "ucc_matrix.txt" + if direct.is_file(): + return direct + nested = sorted( + dep_out.glob("**/ucc_matrix.txt"), + key=lambda p: p.stat().st_mtime, + reverse=True, + ) + return nested[0] if nested else None + class UCCTestSlurmCommandGenStrategy(SlurmCommandGenStrategy): """Command generation strategy for UCC tests on Slurm systems.""" + def _deepep_ucc_matrix_host_path(self) -> Path | None: + tdef: UCCTestDefinition = cast(UCCTestDefinition, self.test_run.test) + if not tdef.cmd_args.use_deepep_matrix: + return None + dep_out = deepep_benchmark_root(self.test_run) + if dep_out is None: + return None + return _ucc_matrix_path_under_deepep_output(dep_out) + def _container_mounts(self) -> List[str]: - return [] + tdef: UCCTestDefinition = cast(UCCTestDefinition, self.test_run.test) + if not tdef.cmd_args.use_deepep_matrix: + return [] + + deepep_root = deepep_benchmark_root(self.test_run) + if deepep_root is None: + return [] + + matrix_host = self._deepep_ucc_matrix_host_path() + if matrix_host is None: + return [] + + return [ + f"{matrix_host.resolve()}:{_UCC_GEN_MATRIX_CONTAINER}", + f"{deepep_root.resolve()}:{DEEPEP_PREV_MOUNT}:ro", + ] def image_path(self) -> str | None: tdef: UCCTestDefinition = cast(UCCTestDefinition, self.test_run.test) @@ -41,6 +82,8 @@ def generate_test_command(self) -> List[str]: srun_command_parts.append(f"-e {tdef_cmd_args.e}") if tdef_cmd_args.gen is not None: srun_command_parts.append(f"--gen {tdef_cmd_args.gen}") + elif self._deepep_ucc_matrix_host_path() is not None: + srun_command_parts.append(f"--gen file:name={_UCC_GEN_MATRIX_CONTAINER}") srun_command_parts.append("-m cuda") srun_command_parts.append("-F") diff --git a/src/cloudai/workloads/ucc_test/ucc.py b/src/cloudai/workloads/ucc_test/ucc.py index 982387937..35c723a4c 100644 --- a/src/cloudai/workloads/ucc_test/ucc.py +++ b/src/cloudai/workloads/ucc_test/ucc.py @@ -69,6 +69,7 @@ class UCCCmdArgs(CmdArgs): b: Union[int, list[int]] = 1 e: Union[str, list[str]] = "8M" gen: Union[str, list[str], None] = None + use_deepep_matrix: bool = False class UCCTestDefinition(TestDefinition): From 72a6f840d2adc31ff78a99da861de13c08ba48f1 Mon Sep 17 00:00:00 2001 From: ybenabou Date: Tue, 2 Jun 2026 17:02:04 +0300 Subject: [PATCH 3/4] add deepep v1/v2 and first version of the moe-benchmark --- .gitignore | 1 + .../experimental/test/deepep_low_latency.toml | 43 ---- conf/experimental/test/deepep_standard.toml | 42 ---- conf/experimental/test/deepep_test_ep_v2.toml | 27 ++ .../test/deepep_test_internode.toml | 19 ++ .../test/deepep_test_intranode.toml | 18 ++ .../test/deepep_test_low_latency.toml | 22 ++ .../test/moe_benchmark_low_latency.toml | 32 +++ .../test/moe_benchmark_standard.toml | 35 +++ .../test/nccl_test_alltoallv.toml | 7 +- .../ucc_alltoallv_deepep.toml} | 38 +-- conf/experimental/test_scenario/deepep.toml | 29 --- .../test_scenario/deepep_official.toml | 11 + .../deepep_with_nccl_alltoallv.toml | 36 --- .../test_scenario/moe_benchmark.toml | 29 +++ src/cloudai/registration.py | 14 +- src/cloudai/workloads/deepep/__init__.py | 4 - src/cloudai/workloads/deepep/deepep.py | 86 ++++--- .../deepep/slurm_command_gen_strategy.py | 236 +++++++++++------- .../workloads/moe_benchmark/__init__.py | 16 ++ .../combined_report.py} | 12 +- .../workloads/moe_benchmark/moe_benchmark.py | 68 +++++ .../report_generation_strategy.py | 38 +-- .../slurm_command_gen_strategy.py | 90 +++++++ .../throughput_reporter.py} | 65 ++--- .../nccl_test/slurm_command_gen_strategy.py | 8 +- .../ucc_test/slurm_command_gen_strategy.py | 8 +- tests/test_acceptance.py | 22 +- tests/test_init.py | 10 + tests/test_test_scenario.py | 7 +- 30 files changed, 681 insertions(+), 392 deletions(-) delete mode 100644 conf/experimental/test/deepep_low_latency.toml delete mode 100644 conf/experimental/test/deepep_standard.toml create mode 100644 conf/experimental/test/deepep_test_ep_v2.toml create mode 100644 conf/experimental/test/deepep_test_internode.toml create mode 100644 conf/experimental/test/deepep_test_intranode.toml create mode 100644 conf/experimental/test/deepep_test_low_latency.toml create mode 100644 conf/experimental/test/moe_benchmark_low_latency.toml create mode 100644 conf/experimental/test/moe_benchmark_standard.toml rename conf/experimental/{test_scenario/deepep_with_ucc_alltoallv.toml => test/ucc_alltoallv_deepep.toml} (53%) delete mode 100644 conf/experimental/test_scenario/deepep.toml create mode 100644 conf/experimental/test_scenario/deepep_official.toml delete mode 100644 conf/experimental/test_scenario/deepep_with_nccl_alltoallv.toml create mode 100644 conf/experimental/test_scenario/moe_benchmark.toml create mode 100644 src/cloudai/workloads/moe_benchmark/__init__.py rename src/cloudai/workloads/{deepep/deepep_combined_report.py => moe_benchmark/combined_report.py} (80%) create mode 100644 src/cloudai/workloads/moe_benchmark/moe_benchmark.py rename src/cloudai/workloads/{deepep => moe_benchmark}/report_generation_strategy.py (66%) create mode 100644 src/cloudai/workloads/moe_benchmark/slurm_command_gen_strategy.py rename src/cloudai/workloads/{deepep/deepep_moe_throughput_reporter.py => moe_benchmark/throughput_reporter.py} (76%) diff --git a/.gitignore b/.gitignore index 9b55990fb..0a75961e3 100644 --- a/.gitignore +++ b/.gitignore @@ -87,6 +87,7 @@ ehthumbs.db Thumbs.db *.log +slurm-* install/ results/ .* diff --git a/conf/experimental/test/deepep_low_latency.toml b/conf/experimental/test/deepep_low_latency.toml deleted file mode 100644 index b76342b4e..000000000 --- a/conf/experimental/test/deepep_low_latency.toml +++ /dev/null @@ -1,43 +0,0 @@ -# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -name = "deepep_low_latency" -description = "DeepEP MoE Benchmark - Low Latency Mode" -test_template_name = "DeepEP" - -[cmd_args] -# Local .sqsh file: -# docker_image_url = "/.autodirect/mswg2/E2E/Regression_logs/squash/yoel/dp-benchmark-shuffle.sqsh" -# Container registry: -docker_image_url = "gitlab-master.nvidia.com/ybenabou/warehouse/deepep:dp-benchmark" - -mode = "low_latency" - -tokens = 128 -num_experts = 256 -num_topk = 1 -hidden_size = 7168 -data_type = "bfloat16" -allow_nvlink_for_low_latency = false -allow_mnnvl = false -round_scale = false -use_ue8m0 = false -num_warmups = 20 -num_iterations = 50 -shuffle_columns = false -use_kineto_profiler = false -config_file_path = "/tmp/config.yaml" -results_dir = "/workspace/dp-benchmark/results" diff --git a/conf/experimental/test/deepep_standard.toml b/conf/experimental/test/deepep_standard.toml deleted file mode 100644 index cfe8ea869..000000000 --- a/conf/experimental/test/deepep_standard.toml +++ /dev/null @@ -1,42 +0,0 @@ -# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -name = "deepep_standard" -description = "DeepEP MoE Benchmark - Standard Mode" -test_template_name = "DeepEP" - -[cmd_args] -# Local .sqsh file: -# Container registry (uses your Docker credentials): -docker_image_url = "gitlab-master.nvidia.com/ybenabou/warehouse/deepep:dp-benchmark" - -mode = "standard" - -tokens = 4096 -num_experts = 256 -num_topk = 8 -hidden_size = 7168 -data_type = "bfloat16" -allow_nvlink_for_low_latency = false -allow_mnnvl = false -round_scale = false -use_ue8m0 = false -num_warmups = 20 -num_iterations = 50 -shuffle_columns = false -use_kineto_profiler = false -config_file_path = "/tmp/config.yaml" -results_dir = "/workspace/dp-benchmark/results" diff --git a/conf/experimental/test/deepep_test_ep_v2.toml b/conf/experimental/test/deepep_test_ep_v2.toml new file mode 100644 index 000000000..5f164fecb --- /dev/null +++ b/conf/experimental/test/deepep_test_ep_v2.toml @@ -0,0 +1,27 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +name = "deepep_test_ep_v2" +description = "Official DeepEP V2 elastic test_ep" +test_template_name = "DeepEP" + +[cmd_args] +docker_image_url = "/your/path/to/the/container" +subtest_name = "test_ep" +elastic_tests_root = "/path/in/the/container/to/the/tests/folder" +num_processes = 8 +num_sms = 0 +num_qps = 0 +num_allocated_qps = 0 +num_tokens = 4096 +hidden = 7168 +num_topk = 8 +num_experts = 256 +do_cpu_sync = 1 +allow_hybrid_mode = 1 +allow_multiple_reduction = 1 +prefer_overlap_with_compute = 0 +seed = 0 +skip_check = false +skip_perf_test = false diff --git a/conf/experimental/test/deepep_test_internode.toml b/conf/experimental/test/deepep_test_internode.toml new file mode 100644 index 000000000..ab569ef12 --- /dev/null +++ b/conf/experimental/test/deepep_test_internode.toml @@ -0,0 +1,19 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +name = "deepep_test_internode" +description = "Official DeepEP V1 legacy test_internode" +test_template_name = "DeepEP" + +[cmd_args] +docker_image_url = "/your/path/to/the/container" +subtest_name = "test_internode" +legacy_tests_root = "/path/in/the/container/to/the/tests/folder" +num_processes = 8 +num_tokens = 4096 +hidden = 7168 +num_topk = 8 +num_experts = 256 +pressure_test_mode = 0 +test_ll_compatibility = false diff --git a/conf/experimental/test/deepep_test_intranode.toml b/conf/experimental/test/deepep_test_intranode.toml new file mode 100644 index 000000000..f8d68e0b2 --- /dev/null +++ b/conf/experimental/test/deepep_test_intranode.toml @@ -0,0 +1,18 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +name = "deepep_test_intranode" +description = "Official DeepEP V1 legacy test_intranode" +test_template_name = "DeepEP" + +[cmd_args] +docker_image_url = "/your/path/to/the/container" +subtest_name = "test_intranode" +legacy_tests_root = "/path/in/the/container/to/the/tests/folder" +num_processes = 8 +num_tokens = 4096 +hidden = 7168 +num_topk = 8 +num_experts = 256 +allow_mnnvl = false diff --git a/conf/experimental/test/deepep_test_low_latency.toml b/conf/experimental/test/deepep_test_low_latency.toml new file mode 100644 index 000000000..270923214 --- /dev/null +++ b/conf/experimental/test/deepep_test_low_latency.toml @@ -0,0 +1,22 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +name = "deepep_test_low_latency" +description = "Official DeepEP V1 legacy test_low_latency" +test_template_name = "DeepEP" + +[cmd_args] +docker_image_url = "/your/path/to/the/container" +subtest_name = "test_low_latency" +legacy_tests_root = "/path/in/the/container/to/the/tests/folder" +num_processes = 8 +num_tokens = 128 +hidden = 7168 +num_topk = 8 +num_experts = 288 +allow_mnnvl = false +disable_nvlink = false +use_logfmt = false +pressure_test = false +shrink_test = false diff --git a/conf/experimental/test/moe_benchmark_low_latency.toml b/conf/experimental/test/moe_benchmark_low_latency.toml new file mode 100644 index 000000000..9d1a7eefc --- /dev/null +++ b/conf/experimental/test/moe_benchmark_low_latency.toml @@ -0,0 +1,32 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +name = "moe_benchmark_low_latency" +description = "MoE Benchmark - DeepEP low-latency mode plus matrix export" +test_template_name = "MoEBenchmark" + +[cmd_args] +docker_image_url = "/your/path/to/the/container" +benchmark_root = "/path/in/the/container/to/the/tests/folder" +mode = "low_latency" +tokens = 128 +num_experts = 288 +num_topk = 8 +hidden_size = 7168 +data_type = "bfloat16" +allow_nvlink_for_low_latency = false +allow_mnnvl = false +round_scale = false +use_ue8m0 = false +num_warmups = 20 +num_iterations = 50 +shuffle_columns = false +use_kineto_profiler = false +enable_tuning = false +config_file_path = "/tmp/config.yaml" +results_dir = "/workspace/dp-benchmark/results" + +[extra_env_vars] +NUM_QPS_PER_RANK = "12" +NUM_SMS = "24" diff --git a/conf/experimental/test/moe_benchmark_standard.toml b/conf/experimental/test/moe_benchmark_standard.toml new file mode 100644 index 000000000..612f2c8d5 --- /dev/null +++ b/conf/experimental/test/moe_benchmark_standard.toml @@ -0,0 +1,35 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +name = "moe_benchmark_standard" +description = "MoE Benchmark - DeepEP standard mode plus matrix export" +test_template_name = "MoEBenchmark" + +[cmd_args] +docker_image_url = "/your/path/to/the/container" +benchmark_root = "/workspace/dp-benchmark/benchmark" +mode = "standard" +tokens = 4096 +num_experts = 256 +num_topk = 8 +hidden_size = 7168 +data_type = "bfloat16" +allow_nvlink_for_low_latency = false +allow_mnnvl = false +round_scale = false +use_ue8m0 = false +num_warmups = 20 +num_iterations = 50 +shuffle_columns = false +use_kineto_profiler = false +enable_tuning = false +config_file_path = "/tmp/config.yaml" +results_dir = "/workspace/dp-benchmark/results" + +[extra_env_vars] +NUM_QPS_PER_RANK = "12" +NUM_SMS = "24" + + + diff --git a/conf/experimental/test/nccl_test_alltoallv.toml b/conf/experimental/test/nccl_test_alltoallv.toml index 2062feb88..da29f48d6 100644 --- a/conf/experimental/test/nccl_test_alltoallv.toml +++ b/conf/experimental/test/nccl_test_alltoallv.toml @@ -19,8 +19,7 @@ description = "NCCL AlltoAllv" test_template_name = "NcclTest" [cmd_args] -docker_image_url = "gitlab-master.nvidia.com/ybenabou/warehouse/deepep:dp-benchmark" -# Container provides /opt/nccl-tests/build/alltoallv_perf. +docker_image_url = "/your/path/to/the/container" subtest_name = "alltoallv_perf_mpi" nthreads = 1 ngpus = 1 @@ -32,3 +31,7 @@ warmup_iters = 1 check = 1 blocking = 0 use_deepep_matrix = true + +[extra_env_vars] +NCCL_P2P_DISABLE = "1" +NCCL_SHM_DISABLE = "1" \ No newline at end of file diff --git a/conf/experimental/test_scenario/deepep_with_ucc_alltoallv.toml b/conf/experimental/test/ucc_alltoallv_deepep.toml similarity index 53% rename from conf/experimental/test_scenario/deepep_with_ucc_alltoallv.toml rename to conf/experimental/test/ucc_alltoallv_deepep.toml index 0c58c8e32..97110e0b6 100644 --- a/conf/experimental/test_scenario/deepep_with_ucc_alltoallv.toml +++ b/conf/experimental/test/ucc_alltoallv_deepep.toml @@ -14,21 +14,27 @@ # See the License for the specific language governing permissions and # limitations under the License. -name = "deepep-with-ucc-alltoallv" +name = "ucc_alltoallv_deepep" +description = "UCC AlltoAllv" +test_template_name = "UCCTest" -# First run the DeepEP benchmark which generates the traffic matrix -[[Tests]] -id = "Tests.deepep" -test_name = "deepep_standard" -num_nodes = 2 -time_limit = "00:30:00" +[cmd_args] +docker_image_url = "/your/path/to/the/container" +collective = "alltoallv" +b = 1 +e = "8M" +use_deepep_matrix = true -# Then run UCC AlltoAllv test using the generated matrix (auto-converted) -[[Tests]] -id = "Tests.ucc_alltoallv" -test_name = "ucc_alltoallv_deepep" -num_nodes = 2 -time_limit = "00:30:00" - [[Tests.dependencies]] - type = "start_post_comp" - id = "Tests.deepep" +[extra_env_vars] +UCX_IB_GID_INDEX = "auto" +UCX_TLS = "cuda_copy,rc" +UCX_RNDV_THRESH = "0" +UCX_RNDV_SCHEME = "get_zcopy" +MELLANOX_VISIBLE_DEVICES = "0,3,4,5,6,9,10,11" +CUDA_VISIBLE_DEVICES = "0,1,2,3,4,5,6,7" +UCC_CL_HIER_FULL_SBGP_TLS = "ucp" +UCC_CL_HIER_NODE_SBGP_TLS = "cuda" +UCC_TLS = "ucp,cuda" +UCC_CL_HIER_TUNE = "alltoallv:0-inf:@node_split" +UCC_TL_UCP_ALLTOALLV_PAIRWISE_NUM_POSTS = "8" +UCC_CLS = "basic,hier" diff --git a/conf/experimental/test_scenario/deepep.toml b/conf/experimental/test_scenario/deepep.toml deleted file mode 100644 index 95335d20e..000000000 --- a/conf/experimental/test_scenario/deepep.toml +++ /dev/null @@ -1,29 +0,0 @@ -# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -name = "deepep-benchmark" - -[[Tests]] -id = "Tests.1" -test_name = "deepep_standard" -num_nodes = 2 -time_limit = "00:30:00" - -[[Tests]] -id = "Tests.2" -test_name = "deepep_low_latency" -num_nodes = 2 -time_limit = "00:30:00" diff --git a/conf/experimental/test_scenario/deepep_official.toml b/conf/experimental/test_scenario/deepep_official.toml new file mode 100644 index 000000000..9cc43065f --- /dev/null +++ b/conf/experimental/test_scenario/deepep_official.toml @@ -0,0 +1,11 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +name = "deepep-official-tests" + +[[Tests]] +id = "Tests.deepep_test_internode" +test_name = "deepep_test_internode" +num_nodes = 2 +time_limit = "00:30:00" diff --git a/conf/experimental/test_scenario/deepep_with_nccl_alltoallv.toml b/conf/experimental/test_scenario/deepep_with_nccl_alltoallv.toml deleted file mode 100644 index 111209b98..000000000 --- a/conf/experimental/test_scenario/deepep_with_nccl_alltoallv.toml +++ /dev/null @@ -1,36 +0,0 @@ -# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -name = "deepep-with-nccl-alltoallv" - -# First run the DeepEP benchmark which generates the traffic matrix -[[Tests]] -id = "Tests.deepep" -test_name = "deepep_standard" -num_nodes = 2 -nodes = ["dgx-gaia-55", "dgx-gaia-56"] -time_limit = "00:30:00" - -# Then run NCCL AlltoAllv test using the generated matrix -[[Tests]] -id = "Tests.nccl_alltoallv" -test_name = "nccl_alltoallv" -num_nodes = 2 -nodes = ["dgx-gaia-55", "dgx-gaia-56"] -time_limit = "00:30:00" - [[Tests.dependencies]] - type = "start_post_comp" - id = "Tests.deepep" diff --git a/conf/experimental/test_scenario/moe_benchmark.toml b/conf/experimental/test_scenario/moe_benchmark.toml new file mode 100644 index 000000000..c4c607613 --- /dev/null +++ b/conf/experimental/test_scenario/moe_benchmark.toml @@ -0,0 +1,29 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +name = "moe-benchmark" + +[[Tests]] +id = "Tests.moe_benchmark" +test_name = "moe_benchmark_standard" +num_nodes = 2 +time_limit = "00:30:00" + +[[Tests]] +id = "Tests.ucc_alltoallv" +test_name = "ucc_alltoallv_deepep" +num_nodes = 2 +time_limit = "00:30:00" + [[Tests.dependencies]] + type = "start_post_comp" + id = "Tests.moe_benchmark" + +[[Tests]] +id = "Tests.nccl_alltoallv" +test_name = "nccl_test_alltoallv" +num_nodes = 2 +time_limit = "00:30:00" + [[Tests.dependencies]] + type = "start_post_comp" + id = "Tests.ucc_alltoallv" diff --git a/src/cloudai/registration.py b/src/cloudai/registration.py index b0a00ba37..8a6a2a63b 100644 --- a/src/cloudai/registration.py +++ b/src/cloudai/registration.py @@ -79,10 +79,14 @@ def register_all(): DDLBTestSlurmCommandGenStrategy, ) from cloudai.workloads.deepep import ( - DeepEPReportGenerationStrategy, DeepEPSlurmCommandGenStrategy, DeepEPTestDefinition, - DeepEPMoEThroughputReporter, + ) + from cloudai.workloads.moe_benchmark import ( + MoEBenchmarkReportGenerationStrategy, + MoEBenchmarkSlurmCommandGenStrategy, + MoEBenchmarkTestDefinition, + MoEBenchmarkThroughputReporter, ) from cloudai.workloads.dynamo_mocker import ( DynamoMockerReportGenerationStrategy, @@ -234,6 +238,7 @@ def register_all(): Registry().add_command_gen_strategy(SlurmSystem, ChakraReplayTestDefinition, ChakraReplaySlurmCommandGenStrategy) Registry().add_command_gen_strategy(SlurmSystem, DeepEPTestDefinition, DeepEPSlurmCommandGenStrategy) + Registry().add_command_gen_strategy(SlurmSystem, MoEBenchmarkTestDefinition, MoEBenchmarkSlurmCommandGenStrategy) Registry().add_command_gen_strategy(SlurmSystem, SlurmContainerTestDefinition, SlurmContainerCommandGenStrategy) Registry().add_command_gen_strategy( SlurmSystem, TritonInferenceTestDefinition, TritonInferenceSlurmCommandGenStrategy @@ -263,6 +268,7 @@ def register_all(): Registry().add_test_definition("DDLBTest", DDLBTestDefinition) Registry().add_test_definition("ChakraReplay", ChakraReplayTestDefinition) Registry().add_test_definition("DeepEP", DeepEPTestDefinition) + Registry().add_test_definition("MoEBenchmark", MoEBenchmarkTestDefinition) Registry().add_test_definition("Sleep", SleepTestDefinition) Registry().add_test_definition("NeMoLauncher", NeMoLauncherTestDefinition) Registry().add_test_definition("NeMoRun", NeMoRunTestDefinition) @@ -288,7 +294,7 @@ def register_all(): Registry().add_agent("grid_search", GridSearchAgent) Registry().add_report(ChakraReplayTestDefinition, ChakraReplayReportGenerationStrategy) - Registry().add_report(DeepEPTestDefinition, DeepEPReportGenerationStrategy) + Registry().add_report(MoEBenchmarkTestDefinition, MoEBenchmarkReportGenerationStrategy) Registry().add_report(GPTTestDefinition, JaxToolboxReportGenerationStrategy) Registry().add_report(GrokTestDefinition, JaxToolboxReportGenerationStrategy) Registry().add_report(MegatronRunTestDefinition, CheckpointTimingReportGenerationStrategy) @@ -312,7 +318,7 @@ def register_all(): Registry().add_report(VllmTestDefinition, VLLMBenchReportGenerationStrategy) Registry().add_scenario_report("per_test", PerTestReporter, ReportConfig(enable=True)) - Registry().add_scenario_report("deepep_moe_throughput", DeepEPMoEThroughputReporter, ReportConfig(enable=True)) + Registry().add_scenario_report("moe_benchmark_throughput", MoEBenchmarkThroughputReporter, ReportConfig(enable=True)) Registry().add_scenario_report("status", StatusReporter, ReportConfig(enable=True)) Registry().add_scenario_report("dse", DSEReporter, ReportConfig(enable=True)) Registry().add_scenario_report("tarball", TarballReporter, ReportConfig(enable=True)) diff --git a/src/cloudai/workloads/deepep/__init__.py b/src/cloudai/workloads/deepep/__init__.py index a559eaa79..2732ac2cd 100644 --- a/src/cloudai/workloads/deepep/__init__.py +++ b/src/cloudai/workloads/deepep/__init__.py @@ -15,14 +15,10 @@ # limitations under the License. from .deepep import DeepEPCmdArgs, DeepEPTestDefinition -from .deepep_moe_throughput_reporter import DeepEPMoEThroughputReporter -from .report_generation_strategy import DeepEPReportGenerationStrategy from .slurm_command_gen_strategy import DeepEPSlurmCommandGenStrategy __all__ = [ "DeepEPCmdArgs", - "DeepEPMoEThroughputReporter", - "DeepEPReportGenerationStrategy", "DeepEPSlurmCommandGenStrategy", "DeepEPTestDefinition", ] diff --git a/src/cloudai/workloads/deepep/deepep.py b/src/cloudai/workloads/deepep/deepep.py index 44c09bd65..bafb70804 100644 --- a/src/cloudai/workloads/deepep/deepep.py +++ b/src/cloudai/workloads/deepep/deepep.py @@ -14,40 +14,70 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Literal, Optional +from pathlib import PurePosixPath +from typing import ClassVar, Literal, Optional from cloudai.core import DockerImage, Installable from cloudai.models.workload import CmdArgs, TestDefinition class DeepEPCmdArgs(CmdArgs): - """DeepEP benchmark command arguments.""" + """Command arguments for the official DeepEP test scripts.""" docker_image_url: str - mode: Literal["standard", "low_latency"] = "standard" - tokens: int = 1024 - num_experts: int = 256 + subtest_name: Literal["test_internode", "test_intranode", "test_low_latency", "test_ep"] = "test_internode" + deep_ep_root: str = "/workspace/DeepEP" + legacy_tests_root: Optional[str] = None + elastic_tests_root: Optional[str] = None + python_executable: str = "python" + + num_processes: int = 8 + num_tokens: int = 4096 + hidden: int = 7168 num_topk: int = 8 - hidden_size: int = 7168 - data_type: Literal["bfloat16", "fp8"] = "bfloat16" - allow_nvlink_for_low_latency: bool = False + num_experts: int = 256 + + # V1 legacy internode/intranode/low-latency flags. + num_topk_groups: Optional[int] = None allow_mnnvl: bool = False - round_scale: bool = False - use_ue8m0: bool = False - num_warmups: int = 20 - num_iterations: int = 50 - shuffle_columns: bool = False - use_kineto_profiler: bool = False - enable_tuning: bool = False - num_sms: int = 24 - num_qps_per_rank: int = 12 - config_file_path: str = "/tmp/config.yaml" - results_dir: str = "/workspace/dp-benchmark/results" + test_ll_compatibility: bool = False + pressure_test_mode: int = 0 + pressure_test: bool = False + shrink_test: bool = False + disable_nvlink: bool = False + use_logfmt: bool = False + shuffle_expert_columns: bool = False + shuffle_seed: int = 1 + + # V2 elastic/test_ep flags. + num_sms: int = 0 + num_qps: int = 0 + num_allocated_qps: int = 0 + num_gpu_timeout_secs: int = 100 + num_cpu_timeout_secs: int = 100 + sl_idx: int = 0 + do_cpu_sync: int = 1 + allow_hybrid_mode: int = 1 + allow_multiple_reduction: int = 1 + prefer_overlap_with_compute: int = 0 + deterministic: bool = False + seed: int = 0 + skip_check: bool = False + skip_perf_test: bool = False + do_pressure_test: bool = False + reuse_elastic_buffer: bool = False + test_first_only: bool = False + unbalanced_ratio: float = 1.0 + precise_unbalanced_ratio: bool = False + masked_ratio: float = 0.0 + dump_profile_traces: str = "" + ignore_local_traffic: bool = False class DeepEPTestDefinition(TestDefinition): - """Test object for DeepEP MoE benchmark.""" + """Test object for official DeepEP v1/v2 test scripts.""" + container_runtime_root: ClassVar[str] = "/workspace/DeepEP" cmd_args: DeepEPCmdArgs _docker_image: Optional[DockerImage] = None @@ -55,7 +85,7 @@ class DeepEPTestDefinition(TestDefinition): def docker_image(self) -> DockerImage: if not self._docker_image: if not self.cmd_args.docker_image_url: - raise ValueError("docker_image_url is required for DeepEP benchmark") + raise ValueError("docker_image_url is required for DeepEP tests") self._docker_image = DockerImage(url=self.cmd_args.docker_image_url) return self._docker_image @@ -64,15 +94,5 @@ def installables(self) -> list[Installable]: return [self.docker_image] @property - def cmd_args_dict(self) -> dict: - """Return command arguments as dict, excluding CloudAI-specific fields.""" - return self.cmd_args.model_dump( - exclude={ - "docker_image_url", - "mode", - "num_sms", - "num_qps_per_rank", - "config_file_path", - "results_dir", - } - ) + def container_runtime_root_path(self) -> PurePosixPath: + return PurePosixPath(self.cmd_args.deep_ep_root or self.container_runtime_root) diff --git a/src/cloudai/workloads/deepep/slurm_command_gen_strategy.py b/src/cloudai/workloads/deepep/slurm_command_gen_strategy.py index 3dc021900..5759a87a3 100644 --- a/src/cloudai/workloads/deepep/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/deepep/slurm_command_gen_strategy.py @@ -1,37 +1,91 @@ # SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from pathlib import Path -from typing import List, cast + +from pathlib import PurePosixPath +from typing import Any, List, cast from cloudai.systems.slurm import SlurmCommandGenStrategy from .deepep import DeepEPCmdArgs, DeepEPTestDefinition +_LEGACY_SUBTESTS = {"test_internode", "test_intranode", "test_low_latency"} + +_CLI_FIELDS_BY_SUBTEST = { + "test_internode": ( + "num_processes", + "num_tokens", + "hidden", + "num_topk_groups", + "num_topk", + "pressure_test_mode", + "num_experts", + "test_ll_compatibility", + ), + "test_intranode": ( + "num_processes", + "num_tokens", + "hidden", + "num_topk", + "num_experts", + "allow_mnnvl", + ), + "test_low_latency": ( + "num_processes", + "num_tokens", + "hidden", + "num_topk", + "num_experts", + "allow_mnnvl", + "disable_nvlink", + "use_logfmt", + "pressure_test", + "shrink_test", + ), + "test_ep": ( + "num_processes", + "num_sms", + "num_qps", + "num_allocated_qps", + "num_gpu_timeout_secs", + "num_cpu_timeout_secs", + "sl_idx", + "num_tokens", + "hidden", + "num_topk", + "num_experts", + "do_cpu_sync", + "allow_hybrid_mode", + "allow_multiple_reduction", + "prefer_overlap_with_compute", + "deterministic", + "seed", + "skip_check", + "skip_perf_test", + "do_pressure_test", + "reuse_elastic_buffer", + "test_first_only", + "unbalanced_ratio", + "precise_unbalanced_ratio", + "masked_ratio", + "dump_profile_traces", + "ignore_local_traffic", + ), +} + +def _flag_name(field_name: str) -> str: + return f"--{field_name.replace('_', '-')}" + + class DeepEPSlurmCommandGenStrategy(SlurmCommandGenStrategy): - """Command generation strategy for DeepEP benchmark on Slurm systems.""" + """Command generation strategy for official DeepEP v1/v2 tests.""" - def _append_head_node_detection(self, batch_script_content: List[str]) -> None: - """ - Append bash commands to detect head node IP. + @property + def tdef(self) -> DeepEPTestDefinition: + return cast(DeepEPTestDefinition, self.test_run.test) - Args: - batch_script_content: The list of script lines to append to. - """ + def _append_head_node_detection(self, batch_script_content: List[str]) -> None: batch_script_content.extend( [ "", @@ -51,84 +105,98 @@ def _append_head_node_detection(self, batch_script_content: List[str]) -> None: ) def _append_sbatch_directives(self, batch_script_content: List[str]) -> None: - """ - Append SBATCH directives and head node detection setup for DeepEP. - - Args: - batch_script_content: The list of script lines to append to. - """ - super()._append_sbatch_directives(batch_script_content) + num_nodes, node_list = self.get_cached_nodes_spec() + + self._add_reservation(batch_script_content) + batch_script_content.append(f"#SBATCH --output={self.test_run.output_path.absolute() / 'stdout.txt'}") + batch_script_content.append(f"#SBATCH --error={self.test_run.output_path.absolute() / 'stderr.txt'}") + batch_script_content.append(f"#SBATCH --partition={self.system.default_partition}") + if self.system.account: + batch_script_content.append(f"#SBATCH --account={self.system.account}") + if node_list: + batch_script_content.append(f"#SBATCH --nodelist={','.join(node_list)}") + batch_script_content.append(f"#SBATCH -N {num_nodes}") + batch_script_content.append(f"#SBATCH --gpus-per-node={self.system.gpus_per_node}") + batch_script_content.append(f"#SBATCH --gres=gpu:{self.system.gpus_per_node}") + batch_script_content.append("#SBATCH --ntasks-per-node=1") + if self.test_run.time_limit: + batch_script_content.append(f"#SBATCH --time={self.test_run.time_limit}") + batch_script_content.append( + "\nexport SLURM_JOB_MASTER_NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1)" + ) self._append_head_node_detection(batch_script_content) - def _container_mounts(self) -> List[str]: - """Return container mounts specific to DeepEP benchmark.""" - tdef: DeepEPTestDefinition = cast(DeepEPTestDefinition, self.test_run.test) - cmd_args: DeepEPCmdArgs = tdef.cmd_args + def _gen_srun_command(self) -> str: + srun_command_parts = self.gen_srun_prefix(use_pretest_extras=True, with_num_nodes=False) + num_nodes, _ = self.get_cached_nodes_spec() + srun_command_parts.extend([f"--nodes={num_nodes}", f"--ntasks={num_nodes}", "--ntasks-per-node=1"]) - config_file_path = self.test_run.output_path / "config.yaml" - self._generate_config_yaml(config_file_path, cmd_args) + nsys_command_parts = self.gen_nsys_command() + test_command_parts = self.generate_test_command() - mounts = [ - f"{config_file_path.absolute()}:{cmd_args.config_file_path}", - f"{self.test_run.output_path.absolute()}:{cmd_args.results_dir}", - ] + with (self.test_run.output_path / "env_vars.sh").open("w") as f: + for key, value in self.final_env_vars.items(): + f.write(f'export {key}="{value}"\n') - return mounts + full_test_cmd = ( + f'bash -c "source {(self.test_run.output_path / "env_vars.sh").absolute()}; ' + + " ".join(nsys_command_parts + test_command_parts) + + '"' + ) - def image_path(self) -> str | None: - """Return the Docker image path for DeepEP benchmark.""" - tdef: DeepEPTestDefinition = cast(DeepEPTestDefinition, self.test_run.test) - return str(tdef.docker_image.installed_path) + return " ".join(srun_command_parts) + " " + full_test_cmd - def generate_test_command(self) -> List[str]: - """Generate the test command for DeepEP benchmark.""" - tdef: DeepEPTestDefinition = cast(DeepEPTestDefinition, self.test_run.test) - cmd_args: DeepEPCmdArgs = tdef.cmd_args + def _container_mounts(self) -> list[str]: + return [] + + def image_path(self) -> str | None: + return str(self.tdef.docker_image.installed_path) - if cmd_args.mode == "standard": - benchmark_script = "/workspace/dp-benchmark/benchmark/benchmark.py" + def _script_path(self, cmd_args: DeepEPCmdArgs) -> str: + deep_ep_root = PurePosixPath(cmd_args.deep_ep_root) + if cmd_args.subtest_name in _LEGACY_SUBTESTS: + tests_root = PurePosixPath(cmd_args.legacy_tests_root) if cmd_args.legacy_tests_root else deep_ep_root / "tests" / "legacy" else: - benchmark_script = "/workspace/dp-benchmark/benchmark/benchmark_ll.py" + tests_root = PurePosixPath(cmd_args.elastic_tests_root) if cmd_args.elastic_tests_root else deep_ep_root / "tests" / "elastic" - return [ - "python", - benchmark_script, - cmd_args.config_file_path, - ] + return str(tests_root / f"{cmd_args.subtest_name}.py") + + def _append_cli_field(self, parts: list[str], field_name: str, value: Any) -> None: + if value is None or value == "": + return - def _generate_config_yaml(self, config_path: Path, cmd_args: DeepEPCmdArgs) -> None: - """ - Generate YAML configuration file for DeepEP benchmark. + flag = _flag_name(field_name) + if isinstance(value, bool): + if field_name in _BOOL_VALUE_FIELDS: + if value: + parts.extend([flag, str(value)]) + elif value: + parts.append(flag) + return - Args: - config_path: Path where to write the config file. - cmd_args: Command arguments for the benchmark. - """ - tdef: DeepEPTestDefinition = cast(DeepEPTestDefinition, self.test_run.test) + parts.extend([flag, str(value)]) - config_lines = [ - "# DeepEP Benchmark Configuration", - "# Generated by CloudAI", - "", + def generate_test_command(self) -> List[str]: + cmd_args = self.tdef.cmd_args + num_nodes, _ = self.get_cached_nodes_spec() + parts: list[str] = [ + "torchrun", + f"--nnodes={num_nodes}", + "--nproc_per_node=1", + "--rdzv_id=\\${SLURM_JOB_ID:-0}", + "--rdzv_backend=c10d", + "--rdzv_endpoint=\\${MASTER_ADDR}:\\${MASTER_PORT}", + self._script_path(cmd_args), ] - for key, value in tdef.cmd_args_dict.items(): - if isinstance(value, bool): - config_lines.append(f"{key}: {str(value).lower()}") - elif isinstance(value, str): - config_lines.append(f'{key}: "{value}"') - else: - config_lines.append(f"{key}: {value}") + for field_name in _CLI_FIELDS_BY_SUBTEST[cmd_args.subtest_name]: + self._append_cli_field(parts, field_name, getattr(cmd_args, field_name)) - config_path.parent.mkdir(parents=True, exist_ok=True) + if self.test_run.test.extra_cmd_args: + parts.append(self.test_run.test.extra_args_str) - with open(config_path, "w") as f: - f.write("\n".join(config_lines)) + return parts def gen_srun_success_check(self) -> str: - """Check if DeepEP benchmark completed successfully.""" output_file = self.test_run.output_path / "stdout.txt" - return ( - 'grep -Eq "global_bw|RDMA BW \\(GB/s\\)|NVLink BW \\(GB/s\\)|Bus BW \\(GB/s\\)|Global BW \\(GB/s\\)" ' - f'{output_file} && echo 1 || echo 0' - ) + return f'grep -Eq "\\[testing\\]|dispatch|combine|passed|tuning|Best" {output_file} && echo 1 || echo 0' diff --git a/src/cloudai/workloads/moe_benchmark/__init__.py b/src/cloudai/workloads/moe_benchmark/__init__.py new file mode 100644 index 000000000..d2a4cbae2 --- /dev/null +++ b/src/cloudai/workloads/moe_benchmark/__init__.py @@ -0,0 +1,16 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from .moe_benchmark import MoEBenchmarkCmdArgs, MoEBenchmarkTestDefinition +from .report_generation_strategy import MoEBenchmarkReportGenerationStrategy +from .slurm_command_gen_strategy import MoEBenchmarkSlurmCommandGenStrategy +from .throughput_reporter import MoEBenchmarkThroughputReporter + +__all__ = [ + "MoEBenchmarkCmdArgs", + "MoEBenchmarkReportGenerationStrategy", + "MoEBenchmarkSlurmCommandGenStrategy", + "MoEBenchmarkTestDefinition", + "MoEBenchmarkThroughputReporter", +] diff --git a/src/cloudai/workloads/deepep/deepep_combined_report.py b/src/cloudai/workloads/moe_benchmark/combined_report.py similarity index 80% rename from src/cloudai/workloads/deepep/deepep_combined_report.py rename to src/cloudai/workloads/moe_benchmark/combined_report.py index a29504048..425145b09 100644 --- a/src/cloudai/workloads/deepep/deepep_combined_report.py +++ b/src/cloudai/workloads/moe_benchmark/combined_report.py @@ -2,7 +2,7 @@ # Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 -"""DeepEP dependency helpers for Slurm UCC/NCCL.""" +"""MoE benchmark dependency helpers for Slurm UCC/NCCL.""" from __future__ import annotations @@ -10,11 +10,11 @@ from cloudai.core import TestRun -DEEPEP_PREV_MOUNT = "/cloudai_deepep_prev" +MOE_BENCHMARK_PREV_MOUNT = "/cloudai_moe_benchmark_prev" def start_post_comp_chain(test_run: TestRun) -> list[TestRun]: - """Follow ``start_post_comp`` (e.g. UCC → NCCL → DeepEP).""" + """Follow ``start_post_comp`` (e.g. UCC -> NCCL -> MoE benchmark).""" dep = test_run.dependencies.get("start_post_comp") if dep is None: return [] @@ -35,8 +35,8 @@ def _has_ucc_matrix_under(root: Path) -> bool: return any(root.glob("**/ucc_matrix.txt")) -def deepep_benchmark_root(test_run: TestRun) -> Path | None: - """DeepEP job directory (``ucc_matrix`` or BENCHMARK stdout), walking ``start_post_comp``.""" +def moe_benchmark_root(test_run: TestRun) -> Path | None: + """MoE benchmark job directory (``ucc_matrix`` or BENCHMARK stdout), walking ``start_post_comp``.""" for tr in start_post_comp_chain(test_run): root = tr.output_path if _has_ucc_matrix_under(root): @@ -51,7 +51,7 @@ def deepep_benchmark_root(test_run: TestRun) -> Path | None: return None -def deepep_results_json_files(test_output_path: Path) -> list[Path]: +def moe_benchmark_results_json_files(test_output_path: Path) -> list[Path]: """All ``results.json`` paths under ``results/benchmark_*`` or top-level ``benchmark_*``.""" found: list[Path] = [] for pattern in ("results/benchmark_*_ranks_*", "benchmark_*_ranks_*"): diff --git a/src/cloudai/workloads/moe_benchmark/moe_benchmark.py b/src/cloudai/workloads/moe_benchmark/moe_benchmark.py new file mode 100644 index 000000000..df3bb7c6a --- /dev/null +++ b/src/cloudai/workloads/moe_benchmark/moe_benchmark.py @@ -0,0 +1,68 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from typing import Literal, Optional + +from cloudai.core import DockerImage, Installable +from cloudai.models.workload import CmdArgs, TestDefinition + + +class MoEBenchmarkCmdArgs(CmdArgs): + """Command arguments for the custom MoE benchmark that compares EP/alltoallv backends.""" + + docker_image_url: str + benchmark_root: str = "/workspace/dp-benchmark/benchmark" + mode: Literal["standard", "low_latency"] = "standard" + tokens: int = 1024 + num_experts: int = 256 + num_topk: int = 8 + hidden_size: int = 7168 + data_type: Literal["bfloat16", "fp8"] = "bfloat16" + allow_nvlink_for_low_latency: bool = False + allow_mnnvl: bool = False + round_scale: bool = False + use_ue8m0: bool = False + num_warmups: int = 20 + num_iterations: int = 50 + shuffle_columns: bool = False + use_kineto_profiler: bool = False + enable_tuning: bool = False + num_sms: int = 24 + num_qps_per_rank: int = 12 + config_file_path: str = "/tmp/config.yaml" + results_dir: str = "/workspace/dp-benchmark/results" + + +class MoEBenchmarkTestDefinition(TestDefinition): + """Test object for the custom MoE benchmark.""" + + cmd_args: MoEBenchmarkCmdArgs + _docker_image: Optional[DockerImage] = None + + @property + def docker_image(self) -> DockerImage: + if not self._docker_image: + if not self.cmd_args.docker_image_url: + raise ValueError("docker_image_url is required for MoE benchmark") + self._docker_image = DockerImage(url=self.cmd_args.docker_image_url) + return self._docker_image + + @property + def installables(self) -> list[Installable]: + return [self.docker_image] + + @property + def cmd_args_dict(self) -> dict: + """Return command arguments as dict, excluding CloudAI/container-only fields.""" + return self.cmd_args.model_dump( + exclude={ + "docker_image_url", + "benchmark_root", + "mode", + "num_sms", + "num_qps_per_rank", + "config_file_path", + "results_dir", + } + ) diff --git a/src/cloudai/workloads/deepep/report_generation_strategy.py b/src/cloudai/workloads/moe_benchmark/report_generation_strategy.py similarity index 66% rename from src/cloudai/workloads/deepep/report_generation_strategy.py rename to src/cloudai/workloads/moe_benchmark/report_generation_strategy.py index 91a621496..5bb0f01d0 100644 --- a/src/cloudai/workloads/deepep/report_generation_strategy.py +++ b/src/cloudai/workloads/moe_benchmark/report_generation_strategy.py @@ -1,18 +1,6 @@ # SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES # Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. from __future__ import annotations @@ -25,33 +13,27 @@ from cloudai.core import ReportGenerationStrategy from cloudai.report_generator.tool.csv_report_tool import CSVReportTool from cloudai.util.lazy_imports import lazy -from cloudai.workloads.deepep.deepep_combined_report import deepep_results_json_files +from cloudai.workloads.moe_benchmark.combined_report import moe_benchmark_results_json_files if TYPE_CHECKING: import pandas as pd -class DeepEPReportGenerationStrategy(ReportGenerationStrategy): - """Strategy for generating reports from DeepEP benchmark outputs.""" +class MoEBenchmarkReportGenerationStrategy(ReportGenerationStrategy): + """Strategy for generating reports from MoE benchmark outputs.""" def can_handle_directory(self) -> bool: - """ - Check if this directory contains DeepEP benchmark results. - - Returns: - bool: True if directory contains DeepEP results. - """ directory_path = self.test_run.output_path - return bool(deepep_results_json_files(directory_path)) + return bool(moe_benchmark_results_json_files(directory_path)) def generate_report(self) -> None: - """Generate a report from DeepEP benchmark results.""" + """Generate a report from MoE benchmark results.""" directory_path = self.test_run.output_path test_name = self.test_run.test.name all_results = [] - for results_json in deepep_results_json_files(directory_path): + for results_json in moe_benchmark_results_json_files(directory_path): result_dir = results_json.parent try: @@ -105,14 +87,6 @@ def generate_report(self) -> None: self._generate_csv_report(df, directory_path, test_name) def _generate_csv_report(self, df: pd.DataFrame, directory_path: Path, test_name: str) -> None: - """ - Generate a CSV report from the DataFrame. - - Args: - df (pd.DataFrame): DataFrame containing the benchmark results. - directory_path (Path): Output directory path for saving the CSV report. - test_name (str): Name of the test. - """ csv_report_tool = CSVReportTool(directory_path) csv_report_tool.set_dataframe(df) csv_report_tool.finalize_report(Path(f"cloudai_{test_name}_report.csv")) diff --git a/src/cloudai/workloads/moe_benchmark/slurm_command_gen_strategy.py b/src/cloudai/workloads/moe_benchmark/slurm_command_gen_strategy.py new file mode 100644 index 000000000..ac1c56aa4 --- /dev/null +++ b/src/cloudai/workloads/moe_benchmark/slurm_command_gen_strategy.py @@ -0,0 +1,90 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from pathlib import Path, PurePosixPath +from typing import List, cast + +from cloudai.systems.slurm import SlurmCommandGenStrategy + +from .moe_benchmark import MoEBenchmarkCmdArgs, MoEBenchmarkTestDefinition + + +class MoEBenchmarkSlurmCommandGenStrategy(SlurmCommandGenStrategy): + """Command generation strategy for the custom MoE benchmark on Slurm systems.""" + + def _append_head_node_detection(self, batch_script_content: List[str]) -> None: + batch_script_content.extend( + [ + "", + "nodes=( $( scontrol show hostnames $SLURM_JOB_NODELIST ) )", + "nodes_array=($nodes)", + "head_node=${nodes_array[0]}", + 'head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)', + "", + "echo Nodes: $SLURM_JOB_NODELIST", + "echo Num Nodes: ${#nodes[@]}", + "echo Head Node IP: $head_node_ip", + "", + "export MASTER_ADDR=$head_node_ip", + "export MASTER_PORT=29500", + "", + ] + ) + + def _append_sbatch_directives(self, batch_script_content: List[str]) -> None: + super()._append_sbatch_directives(batch_script_content) + self._append_head_node_detection(batch_script_content) + + def _container_mounts(self) -> List[str]: + """Return container mounts specific to the MoE benchmark.""" + tdef: MoEBenchmarkTestDefinition = cast(MoEBenchmarkTestDefinition, self.test_run.test) + cmd_args: MoEBenchmarkCmdArgs = tdef.cmd_args + + config_file_path = self.test_run.output_path / "config.yaml" + self._generate_config_yaml(config_file_path, cmd_args) + + return [ + f"{config_file_path.absolute()}:{cmd_args.config_file_path}", + f"{self.test_run.output_path.absolute()}:{cmd_args.results_dir}", + ] + + def image_path(self) -> str | None: + tdef: MoEBenchmarkTestDefinition = cast(MoEBenchmarkTestDefinition, self.test_run.test) + return str(tdef.docker_image.installed_path) + + def generate_test_command(self) -> List[str]: + tdef: MoEBenchmarkTestDefinition = cast(MoEBenchmarkTestDefinition, self.test_run.test) + cmd_args: MoEBenchmarkCmdArgs = tdef.cmd_args + + if cmd_args.mode == "standard": + script_name = "benchmark.py" + else: + script_name = "benchmark_ll.py" + + benchmark_script = str(PurePosixPath(cmd_args.benchmark_root) / script_name) + + return ["python", benchmark_script, cmd_args.config_file_path] + + def _generate_config_yaml(self, config_path: Path, cmd_args: MoEBenchmarkCmdArgs) -> None: + tdef: MoEBenchmarkTestDefinition = cast(MoEBenchmarkTestDefinition, self.test_run.test) + + config_lines = ["# MoE Benchmark Configuration", "# Generated by CloudAI", ""] + for key, value in tdef.cmd_args_dict.items(): + if isinstance(value, bool): + config_lines.append(f"{key}: {str(value).lower()}") + elif isinstance(value, str): + config_lines.append(f'{key}: "{value}"') + else: + config_lines.append(f"{key}: {value}") + + config_path.parent.mkdir(parents=True, exist_ok=True) + with open(config_path, "w") as f: + f.write("\n".join(config_lines)) + + def gen_srun_success_check(self) -> str: + output_file = self.test_run.output_path / "stdout.txt" + return ( + 'grep -Eq "global_bw|RDMA BW \\(GB/s\\)|NVLink BW \\(GB/s\\)|Bus BW \\(GB/s\\)|Global BW \\(GB/s\\)" ' + f'{output_file} && echo 1 || echo 0' + ) diff --git a/src/cloudai/workloads/deepep/deepep_moe_throughput_reporter.py b/src/cloudai/workloads/moe_benchmark/throughput_reporter.py similarity index 76% rename from src/cloudai/workloads/deepep/deepep_moe_throughput_reporter.py rename to src/cloudai/workloads/moe_benchmark/throughput_reporter.py index 7bef381f9..43af1641d 100644 --- a/src/cloudai/workloads/deepep/deepep_moe_throughput_reporter.py +++ b/src/cloudai/workloads/moe_benchmark/throughput_reporter.py @@ -13,23 +13,23 @@ from pathlib import Path from cloudai._core.base_reporter import Reporter -from cloudai.workloads.deepep.deepep import DeepEPTestDefinition -from cloudai.workloads.deepep.deepep_combined_report import deepep_results_json_files +from cloudai.workloads.moe_benchmark.combined_report import moe_benchmark_results_json_files +from cloudai.workloads.moe_benchmark.moe_benchmark import MoEBenchmarkTestDefinition from cloudai.workloads.nccl_test.nccl import NCCLTestDefinition from cloudai.workloads.nccl_test.performance_report_generation_strategy import extract_nccl_data from cloudai.workloads.ucc_test.ucc import UCCTestDefinition -def _deepep_dispatch_combine_bars(test_output: Path) -> list[tuple[str, float, str]]: +def _moe_benchmark_dispatch_combine_bars(test_output: Path) -> list[tuple[str, float, str]]: """From latest ``results.json``: one bar per ``dispatch`` / ``combine`` row (``bus_bw_avg``).""" - paths = deepep_results_json_files(test_output) + paths = moe_benchmark_results_json_files(test_output) if not paths: return [] latest = max(paths, key=lambda p: p.stat().st_mtime) try: rows = json.loads(latest.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError) as e: - logging.debug("DeepEP results.json unreadable %s: %s", latest, e) + logging.debug("MoE benchmark results.json unreadable %s: %s", latest, e) return [] if not isinstance(rows, list): return [] @@ -50,11 +50,10 @@ def _deepep_dispatch_combine_bars(test_output: Path) -> list[tuple[str, float, s continue out: list[tuple[str, float, str]] = [] - # Stable order: dispatch then combine, only if present in JSON if "dispatch" in by_op: - out.append(("DeepEP dispatch", by_op["dispatch"], "#2ca02c")) + out.append(("MoE dispatch", by_op["dispatch"], "#2ca02c")) if "combine" in by_op: - out.append(("DeepEP combine", by_op["combine"], "#31a354")) + out.append(("MoE combine", by_op["combine"], "#31a354")) return out @@ -70,7 +69,7 @@ def _mean_ucc_bus_bw_gb_s(test_output: Path) -> float | None: def _parse_ucc_perftest_mean_bus_avg(path: Path) -> float | None: - """Mean of ``Bus Bandwidth … avg`` column over numeric data rows (8 fields).""" + """Mean of ``Bus Bandwidth ... avg`` column over numeric data rows (8 fields).""" try: text = path.read_text(encoding="utf-8", errors="replace") except OSError: @@ -153,13 +152,9 @@ def ypx(v: float) -> float: for g in (0.25, 0.5, 0.75): gy = y0 - g * ih - parts.append( - f'' - ) + parts.append(f'') gv = vmin + g * (vmax - vmin) - parts.append( - f'{gv:.1f}' - ) + parts.append(f'{gv:.1f}') for cx, val, col, lab in zip(centers, values, colors, labels, strict=True): top = ypx(val) @@ -171,23 +166,11 @@ def ypx(v: float) -> float: ) for (cx, cy), val, col, lab in zip(pts, values, colors, labels, strict=True): - parts.append( - f'' - ) - parts.append( - f'{val:.2f}' - ) - parts.append( - f'' - f"{html.escape(lab)}" - ) + parts.append(f'') + parts.append(f'{val:.2f}') + parts.append(f'{html.escape(lab)}') - parts.append( - f'{html.escape(y_axis_label)}' - ) + parts.append(f'{html.escape(y_axis_label)}') leg_y = y0 + 38 parts.append(f'Summary') @@ -199,32 +182,31 @@ def ypx(v: float) -> float: ) parts.append("") - path.write_text("\n".join(parts), encoding="utf-8") -class DeepEPMoEThroughputReporter(Reporter): +class MoEBenchmarkThroughputReporter(Reporter): """After the scenario finishes, write one standalone SVG chart under the results root.""" def generate(self) -> None: self.load_test_runs() - deepep_trs = [tr for tr in self.trs if isinstance(tr.test, DeepEPTestDefinition)] - if not deepep_trs: - logging.debug("Skipping deepep_moe_throughput: no DeepEP test in scenario.") + moe_trs = [tr for tr in self.trs if isinstance(tr.test, MoEBenchmarkTestDefinition)] + if not moe_trs: + logging.debug("Skipping moe_benchmark_throughput: no MoEBenchmark test in scenario.") return categories: list[str] = [] values: list[float] = [] colors: list[str] = [] - deepep_bars = _deepep_dispatch_combine_bars(deepep_trs[0].output_path) - if not deepep_bars: + moe_bars = _moe_benchmark_dispatch_combine_bars(moe_trs[0].output_path) + if not moe_bars: logging.warning( - "Skipping deepep_moe_throughput: no dispatch/combine bus_bw_avg in DeepEP results.json under %s", - deepep_trs[0].output_path, + "Skipping moe_benchmark_throughput: no dispatch/combine bus_bw_avg in results.json under %s", + moe_trs[0].output_path, ) return - for lab, val, col in deepep_bars: + for lab, val, col in moe_bars: categories.append(lab) values.append(val) colors.append(col) @@ -258,4 +240,3 @@ def generate(self) -> None: colors=colors, y_axis_label="Mean bus bandwidth (GB/s)", ) - logging.info("Generated MoE throughput comparison at %s", out) diff --git a/src/cloudai/workloads/nccl_test/slurm_command_gen_strategy.py b/src/cloudai/workloads/nccl_test/slurm_command_gen_strategy.py index 80795a07c..1844d2f65 100644 --- a/src/cloudai/workloads/nccl_test/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/nccl_test/slurm_command_gen_strategy.py @@ -18,7 +18,7 @@ from typing import List, cast from cloudai.systems.slurm import SlurmCommandGenStrategy -from cloudai.workloads.deepep.deepep_combined_report import DEEPEP_PREV_MOUNT, deepep_benchmark_root +from cloudai.workloads.moe_benchmark.combined_report import MOE_BENCHMARK_PREV_MOUNT, moe_benchmark_root from .nccl import NCCLCmdArgs, NCCLTestDefinition @@ -52,7 +52,7 @@ def _deepep_nccl_matrix_host_path(self) -> Path | None: tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test) if not tdef.cmd_args.use_deepep_matrix: return None - root = deepep_benchmark_root(self.test_run) + root = moe_benchmark_root(self.test_run) if root is None: return None return _nccl_matrix_path_under_deepep_output(root) @@ -69,9 +69,9 @@ def _container_mounts(self) -> List[str]: dest = tdef.cmd_args.alltoallv_matrix_container_path mounts: List[str] = [f"{matrix_host.resolve()}:{dest}"] - dr = deepep_benchmark_root(self.test_run) + dr = moe_benchmark_root(self.test_run) if dr is not None: - mounts.append(f"{dr.resolve()}:{DEEPEP_PREV_MOUNT}:ro") + mounts.append(f"{dr.resolve()}:{MOE_BENCHMARK_PREV_MOUNT}:ro") return mounts @property diff --git a/src/cloudai/workloads/ucc_test/slurm_command_gen_strategy.py b/src/cloudai/workloads/ucc_test/slurm_command_gen_strategy.py index 43f0df435..3bd941c9e 100644 --- a/src/cloudai/workloads/ucc_test/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/ucc_test/slurm_command_gen_strategy.py @@ -18,7 +18,7 @@ from typing import List, cast from cloudai.systems.slurm import SlurmCommandGenStrategy -from cloudai.workloads.deepep.deepep_combined_report import DEEPEP_PREV_MOUNT, deepep_benchmark_root +from cloudai.workloads.moe_benchmark.combined_report import MOE_BENCHMARK_PREV_MOUNT, moe_benchmark_root from .ucc import UCCCmdArgs, UCCTestDefinition @@ -45,7 +45,7 @@ def _deepep_ucc_matrix_host_path(self) -> Path | None: tdef: UCCTestDefinition = cast(UCCTestDefinition, self.test_run.test) if not tdef.cmd_args.use_deepep_matrix: return None - dep_out = deepep_benchmark_root(self.test_run) + dep_out = moe_benchmark_root(self.test_run) if dep_out is None: return None return _ucc_matrix_path_under_deepep_output(dep_out) @@ -55,7 +55,7 @@ def _container_mounts(self) -> List[str]: if not tdef.cmd_args.use_deepep_matrix: return [] - deepep_root = deepep_benchmark_root(self.test_run) + deepep_root = moe_benchmark_root(self.test_run) if deepep_root is None: return [] @@ -65,7 +65,7 @@ def _container_mounts(self) -> List[str]: return [ f"{matrix_host.resolve()}:{_UCC_GEN_MATRIX_CONTAINER}", - f"{deepep_root.resolve()}:{DEEPEP_PREV_MOUNT}:ro", + f"{deepep_root.resolve()}:{MOE_BENCHMARK_PREV_MOUNT}:ro", ] def image_path(self) -> str | None: diff --git a/tests/test_acceptance.py b/tests/test_acceptance.py index 151c6fb9e..081bb5636 100644 --- a/tests/test_acceptance.py +++ b/tests/test_acceptance.py @@ -46,6 +46,10 @@ DeepEPCmdArgs, DeepEPTestDefinition, ) +from cloudai.workloads.moe_benchmark import ( + MoEBenchmarkCmdArgs, + MoEBenchmarkTestDefinition +) from cloudai.workloads.jax_toolbox import ( GPTCmdArgs, GPTTestDefinition, @@ -273,7 +277,7 @@ def build_special_test_run( "ai-dynamo", "nixl-perftest", "nixl-kvbench", - "deepep-benchmark", + "moe-benchmark", "osu-bench", "sglang", "sglang-disagg", @@ -550,14 +554,14 @@ def test_req(request, slurm_system: SlurmSystem, partial_tr: partial[TestRun]) - ), ), ), - "deepep-benchmark": lambda: create_test_run( + "moe-benchmark": lambda: create_test_run( partial_tr, - "deepep-benchmark", - DeepEPTestDefinition( - name="deepep-benchmark", - description="DeepEP MoE Benchmark", - test_template_name="deepep-benchmark", - cmd_args=DeepEPCmdArgs( + "moe-benchmark", + MoEBenchmarkTestDefinition( + name="moe-benchmark", + description="MoE Benchmark", + test_template_name="MoEBenchmark", + cmd_args=MoEBenchmarkCmdArgs( docker_image_url="docker/image:url", ), ), @@ -698,7 +702,7 @@ def test_req(request, slurm_system: SlurmSystem, partial_tr: partial[TestRun]) - tr.num_nodes = 3 if request.param == "ai-dynamo": tr.num_nodes = 2 - if request.param == "deepep-benchmark": + if request.param == "moe-benchmark": tr.num_nodes = 2 if request.param in {"sglang-disagg-2nodes", "vllm-disagg-2nodes"}: tr.num_nodes = 2 diff --git a/tests/test_init.py b/tests/test_init.py index 5aa2cd0ff..e84aa4ef7 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -46,6 +46,11 @@ DynamoMockerStandaloneCommandGenStrategy, DynamoMockerTestDefinition, ) +from cloudai.workloads.moe_benchmark import ( + MoEBenchmarkSlurmCommandGenStrategy, + MoEBenchmarkTestDefinition, + MoEBenchmarkThroughputReporter, +) from cloudai.workloads.jax_toolbox import ( GPTTestDefinition, GrokTestDefinition, @@ -133,6 +138,7 @@ def test_runners(): CMD_GEN_STRATEGIES = { (SlurmSystem, ChakraReplayTestDefinition): ChakraReplaySlurmCommandGenStrategy, (SlurmSystem, DeepEPTestDefinition): DeepEPSlurmCommandGenStrategy, + (SlurmSystem, MoEBenchmarkTestDefinition): MoEBenchmarkSlurmCommandGenStrategy, (SlurmSystem, GPTTestDefinition): JaxToolboxSlurmCommandGenStrategy, (SlurmSystem, GrokTestDefinition): JaxToolboxSlurmCommandGenStrategy, (SlurmSystem, NCCLTestDefinition): NcclTestSlurmCommandGenStrategy, @@ -239,6 +245,7 @@ def test_definitions(): ("NcclTest", NCCLTestDefinition), ("ChakraReplay", ChakraReplayTestDefinition), ("DeepEP", DeepEPTestDefinition), + ("MoEBenchmark", MoEBenchmarkTestDefinition), ("Sleep", SleepTestDefinition), ("NeMoLauncher", NeMoLauncherTestDefinition), ("NeMoRun", NeMoRunTestDefinition), @@ -268,6 +275,7 @@ def test_scenario_reports(): scenario_reports = Registry().scenario_reports assert list(scenario_reports.keys()) == [ "per_test", + "moe_benchmark_throughput", "status", "dse", "tarball", @@ -277,6 +285,7 @@ def test_scenario_reports(): ] assert list(scenario_reports.values()) == [ PerTestReporter, + MoEBenchmarkThroughputReporter, StatusReporter, DSEReporter, TarballReporter, @@ -290,6 +299,7 @@ def test_report_configs(): configs = Registry().report_configs assert list(configs.keys()) == [ "per_test", + "moe_benchmark_throughput", "status", "dse", "tarball", diff --git a/tests/test_test_scenario.py b/tests/test_test_scenario.py index 8b390e8a1..8b3ae905f 100644 --- a/tests/test_test_scenario.py +++ b/tests/test_test_scenario.py @@ -41,10 +41,13 @@ from cloudai.workloads.aiconfig import AiconfiguratorReportGenerationStrategy, AiconfiguratorTestDefinition from cloudai.workloads.chakra_replay import ChakraReplayReportGenerationStrategy, ChakraReplayTestDefinition from cloudai.workloads.deepep import ( - DeepEPReportGenerationStrategy, DeepEPTestDefinition, ) from cloudai.workloads.dynamo_mocker import DynamoMockerReportGenerationStrategy, DynamoMockerTestDefinition +from cloudai.workloads.moe_benchmark import ( + MoEBenchmarkReportGenerationStrategy, + MoEBenchmarkTestDefinition +) from cloudai.workloads.jax_toolbox import ( GPTTestDefinition, GrokTestDefinition, @@ -687,7 +690,7 @@ def test_default_reporters_size(self): "tdef,expected_reporters", [ (ChakraReplayTestDefinition, {ChakraReplayReportGenerationStrategy}), - (DeepEPTestDefinition, {DeepEPReportGenerationStrategy}), + (MoEBenchmarkTestDefinition, {MoEBenchmarkReportGenerationStrategy}), (GPTTestDefinition, {JaxToolboxReportGenerationStrategy}), (GrokTestDefinition, {JaxToolboxReportGenerationStrategy}), ( From f81856e6e49edb47a1f3039158623af81b8aa566 Mon Sep 17 00:00:00 2001 From: ybenabou Date: Tue, 2 Jun 2026 18:49:01 +0300 Subject: [PATCH 4/4] fix pytest --- .../test/moe_benchmark_standard.toml | 3 - .../test/nccl_test_alltoallv.toml | 2 +- src/cloudai/registration.py | 6 +- src/cloudai/workloads/deepep/deepep.py | 2 - .../deepep/slurm_command_gen_strategy.py | 14 ++-- .../slurm_command_gen_strategy.py | 10 +-- .../moe_benchmark/throughput_reporter.py | 72 ++++++++++++------- tests/ref_data/deepep-benchmark.sbatch | 10 ++- tests/ref_data/moe-benchmark.sbatch | 31 ++++++++ tests/test_acceptance.py | 16 ++++- tests/test_init.py | 12 ++-- tests/test_test_scenario.py | 8 +-- 12 files changed, 124 insertions(+), 62 deletions(-) create mode 100644 tests/ref_data/moe-benchmark.sbatch diff --git a/conf/experimental/test/moe_benchmark_standard.toml b/conf/experimental/test/moe_benchmark_standard.toml index 612f2c8d5..b30560d7e 100644 --- a/conf/experimental/test/moe_benchmark_standard.toml +++ b/conf/experimental/test/moe_benchmark_standard.toml @@ -30,6 +30,3 @@ results_dir = "/workspace/dp-benchmark/results" [extra_env_vars] NUM_QPS_PER_RANK = "12" NUM_SMS = "24" - - - diff --git a/conf/experimental/test/nccl_test_alltoallv.toml b/conf/experimental/test/nccl_test_alltoallv.toml index da29f48d6..f314a1c64 100644 --- a/conf/experimental/test/nccl_test_alltoallv.toml +++ b/conf/experimental/test/nccl_test_alltoallv.toml @@ -34,4 +34,4 @@ use_deepep_matrix = true [extra_env_vars] NCCL_P2P_DISABLE = "1" -NCCL_SHM_DISABLE = "1" \ No newline at end of file +NCCL_SHM_DISABLE = "1" diff --git a/src/cloudai/registration.py b/src/cloudai/registration.py index 8a6a2a63b..c53d14124 100644 --- a/src/cloudai/registration.py +++ b/src/cloudai/registration.py @@ -318,7 +318,11 @@ def register_all(): Registry().add_report(VllmTestDefinition, VLLMBenchReportGenerationStrategy) Registry().add_scenario_report("per_test", PerTestReporter, ReportConfig(enable=True)) - Registry().add_scenario_report("moe_benchmark_throughput", MoEBenchmarkThroughputReporter, ReportConfig(enable=True)) + Registry().add_scenario_report( + "moe_benchmark_throughput", + MoEBenchmarkThroughputReporter, + ReportConfig(enable=True), + ) Registry().add_scenario_report("status", StatusReporter, ReportConfig(enable=True)) Registry().add_scenario_report("dse", DSEReporter, ReportConfig(enable=True)) Registry().add_scenario_report("tarball", TarballReporter, ReportConfig(enable=True)) diff --git a/src/cloudai/workloads/deepep/deepep.py b/src/cloudai/workloads/deepep/deepep.py index bafb70804..23972ee08 100644 --- a/src/cloudai/workloads/deepep/deepep.py +++ b/src/cloudai/workloads/deepep/deepep.py @@ -46,8 +46,6 @@ class DeepEPCmdArgs(CmdArgs): shrink_test: bool = False disable_nvlink: bool = False use_logfmt: bool = False - shuffle_expert_columns: bool = False - shuffle_seed: int = 1 # V2 elastic/test_ep flags. num_sms: int = 0 diff --git a/src/cloudai/workloads/deepep/slurm_command_gen_strategy.py b/src/cloudai/workloads/deepep/slurm_command_gen_strategy.py index 5759a87a3..60be5c1c0 100644 --- a/src/cloudai/workloads/deepep/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/deepep/slurm_command_gen_strategy.py @@ -155,9 +155,14 @@ def image_path(self) -> str | None: def _script_path(self, cmd_args: DeepEPCmdArgs) -> str: deep_ep_root = PurePosixPath(cmd_args.deep_ep_root) if cmd_args.subtest_name in _LEGACY_SUBTESTS: - tests_root = PurePosixPath(cmd_args.legacy_tests_root) if cmd_args.legacy_tests_root else deep_ep_root / "tests" / "legacy" + if cmd_args.legacy_tests_root: + tests_root = PurePosixPath(cmd_args.legacy_tests_root) + else: + tests_root = deep_ep_root / "tests" / "legacy" + elif cmd_args.elastic_tests_root: + tests_root = PurePosixPath(cmd_args.elastic_tests_root) else: - tests_root = PurePosixPath(cmd_args.elastic_tests_root) if cmd_args.elastic_tests_root else deep_ep_root / "tests" / "elastic" + tests_root = deep_ep_root / "tests" / "elastic" return str(tests_root / f"{cmd_args.subtest_name}.py") @@ -167,10 +172,7 @@ def _append_cli_field(self, parts: list[str], field_name: str, value: Any) -> No flag = _flag_name(field_name) if isinstance(value, bool): - if field_name in _BOOL_VALUE_FIELDS: - if value: - parts.extend([flag, str(value)]) - elif value: + if value: parts.append(flag) return diff --git a/src/cloudai/workloads/moe_benchmark/slurm_command_gen_strategy.py b/src/cloudai/workloads/moe_benchmark/slurm_command_gen_strategy.py index ac1c56aa4..88c9d872a 100644 --- a/src/cloudai/workloads/moe_benchmark/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/moe_benchmark/slurm_command_gen_strategy.py @@ -42,7 +42,7 @@ def _container_mounts(self) -> List[str]: cmd_args: MoEBenchmarkCmdArgs = tdef.cmd_args config_file_path = self.test_run.output_path / "config.yaml" - self._generate_config_yaml(config_file_path, cmd_args) + self._generate_config_yaml(config_file_path) return [ f"{config_file_path.absolute()}:{cmd_args.config_file_path}", @@ -57,16 +57,12 @@ def generate_test_command(self) -> List[str]: tdef: MoEBenchmarkTestDefinition = cast(MoEBenchmarkTestDefinition, self.test_run.test) cmd_args: MoEBenchmarkCmdArgs = tdef.cmd_args - if cmd_args.mode == "standard": - script_name = "benchmark.py" - else: - script_name = "benchmark_ll.py" - + script_name = "benchmark.py" if cmd_args.mode == "standard" else "benchmark_ll.py" benchmark_script = str(PurePosixPath(cmd_args.benchmark_root) / script_name) return ["python", benchmark_script, cmd_args.config_file_path] - def _generate_config_yaml(self, config_path: Path, cmd_args: MoEBenchmarkCmdArgs) -> None: + def _generate_config_yaml(self, config_path: Path) -> None: tdef: MoEBenchmarkTestDefinition = cast(MoEBenchmarkTestDefinition, self.test_run.test) config_lines = ["# MoE Benchmark Configuration", "# Generated by CloudAI", ""] diff --git a/src/cloudai/workloads/moe_benchmark/throughput_reporter.py b/src/cloudai/workloads/moe_benchmark/throughput_reporter.py index 43af1641d..657570ccd 100644 --- a/src/cloudai/workloads/moe_benchmark/throughput_reporter.py +++ b/src/cloudai/workloads/moe_benchmark/throughput_reporter.py @@ -12,7 +12,7 @@ import re from pathlib import Path -from cloudai._core.base_reporter import Reporter +from cloudai.core import Reporter from cloudai.workloads.moe_benchmark.combined_report import moe_benchmark_results_json_files from cloudai.workloads.moe_benchmark.moe_benchmark import MoEBenchmarkTestDefinition from cloudai.workloads.nccl_test.nccl import NCCLTestDefinition @@ -20,34 +20,41 @@ from cloudai.workloads.ucc_test.ucc import UCCTestDefinition -def _moe_benchmark_dispatch_combine_bars(test_output: Path) -> list[tuple[str, float, str]]: - """From latest ``results.json``: one bar per ``dispatch`` / ``combine`` row (``bus_bw_avg``).""" +def _read_latest_moe_results_rows(test_output: Path) -> list[object]: paths = moe_benchmark_results_json_files(test_output) if not paths: return [] latest = max(paths, key=lambda p: p.stat().st_mtime) try: - rows = json.loads(latest.read_text(encoding="utf-8")) + data = json.loads(latest.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError) as e: logging.debug("MoE benchmark results.json unreadable %s: %s", latest, e) return [] - if not isinstance(rows, list): - return [] + return data if isinstance(data, list) else [] + +def _extract_moe_bus_bw(row: object) -> tuple[str, float] | None: + if not isinstance(row, dict): + return None + op = row.get("operation") + if not isinstance(op, str) or "bus_bw_avg" not in row: + return None + op_l = op.lower() + if op_l not in ("dispatch", "combine"): + return None + try: + return op_l, float(row["bus_bw_avg"]) + except (TypeError, ValueError): + return None + + +def _moe_benchmark_dispatch_combine_bars(test_output: Path) -> list[tuple[str, float, str]]: + """From latest ``results.json``: one bar per ``dispatch`` / ``combine`` row (``bus_bw_avg``).""" by_op: dict[str, float] = {} - for row in rows: - if not isinstance(row, dict): - continue - op = row.get("operation") - if not isinstance(op, str) or "bus_bw_avg" not in row: - continue - op_l = op.lower() - if op_l not in ("dispatch", "combine"): - continue - try: - by_op[op_l] = float(row["bus_bw_avg"]) - except (TypeError, ValueError): - continue + for row in _read_latest_moe_results_rows(test_output): + extracted = _extract_moe_bus_bw(row) + if extracted is not None: + by_op[extracted[0]] = extracted[1] out: list[tuple[str, float, str]] = [] if "dispatch" in by_op: @@ -154,9 +161,11 @@ def ypx(v: float) -> float: gy = y0 - g * ih parts.append(f'') gv = vmin + g * (vmax - vmin) - parts.append(f'{gv:.1f}') + parts.append( + f'{gv:.1f}' + ) - for cx, val, col, lab in zip(centers, values, colors, labels, strict=True): + for cx, val, col, _ in zip(centers, values, colors, labels, strict=True): top = ypx(val) x1 = cx - bar_w / 2 hbar = y0 - top @@ -166,11 +175,24 @@ def ypx(v: float) -> float: ) for (cx, cy), val, col, lab in zip(pts, values, colors, labels, strict=True): - parts.append(f'') - parts.append(f'{val:.2f}') - parts.append(f'{html.escape(lab)}') + col_esc = html.escape(col) + parts.append( + f'' + ) + parts.append( + f'{val:.2f}' + ) + parts.append( + f'' + f"{html.escape(lab)}" + ) - parts.append(f'{html.escape(y_axis_label)}') + y_axis_mid = mt + ih / 2 + parts.append( + f'{html.escape(y_axis_label)}' + ) leg_y = y0 + 38 parts.append(f'Summary') diff --git a/tests/ref_data/deepep-benchmark.sbatch b/tests/ref_data/deepep-benchmark.sbatch index f3eb086e2..c5e15e9e6 100644 --- a/tests/ref_data/deepep-benchmark.sbatch +++ b/tests/ref_data/deepep-benchmark.sbatch @@ -7,6 +7,7 @@ #SBATCH -N 2 #SBATCH --gpus-per-node=8 #SBATCH --gres=gpu:8 +#SBATCH --ntasks-per-node=1 export SLURM_JOB_MASTER_NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1) @@ -19,9 +20,12 @@ echo Nodes: $SLURM_JOB_NODELIST echo Num Nodes: ${#nodes[@]} echo Head Node IP: $head_node_ip +export MASTER_ADDR=$head_node_ip +export MASTER_PORT=29500 -srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__OUTPUT_DIR__/output:__OUTPUT_DIR__/output,__OUTPUT_DIR__/output:/workspace/dp-benchmark/results --output=__OUTPUT_DIR__/output/mapping-stdout.txt --error=__OUTPUT_DIR__/output/mapping-stderr.txt bash -c "echo \$(date): \$(hostname):node \${SLURM_NODEID}:rank \${SLURM_PROCID}." -srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__OUTPUT_DIR__/output:__OUTPUT_DIR__/output,__OUTPUT_DIR__/output:/workspace/dp-benchmark/results --ntasks=2 --ntasks-per-node=1 --output=__OUTPUT_DIR__/output/metadata/node-%N.toml --error=__OUTPUT_DIR__/output/metadata/nodes.err bash /cloudai_install/slurm-metadata.sh +srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output --output=__OUTPUT_DIR__/output/mapping-stdout.txt --error=__OUTPUT_DIR__/output/mapping-stderr.txt bash -c "echo \$(date): \$(hostname):node \${SLURM_NODEID}:rank \${SLURM_PROCID}." -srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__OUTPUT_DIR__/output:__OUTPUT_DIR__/output,__OUTPUT_DIR__/output:/workspace/dp-benchmark/results bash -c "source __OUTPUT_DIR__/output/env_vars.sh; torchrun --nnodes=2 --nproc_per_node=1 --rdzv_id=$RANDOM --rdzv_backend=c10d --rdzv_endpoint=$head_node_ip:29500 /workspace/dp-benchmark/benchmark/benchmark.py __OUTPUT_DIR__/output/config.yaml" +srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output --ntasks=2 --ntasks-per-node=1 --output=__OUTPUT_DIR__/output/metadata/node-%N.toml --error=__OUTPUT_DIR__/output/metadata/nodes.err bash /cloudai_install/slurm-metadata.sh + +srun --export=ALL --mpi=pmix --nodes=2 --ntasks=2 --ntasks-per-node=1 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output bash -c "source __OUTPUT_DIR__/output/env_vars.sh; torchrun --nnodes=2 --nproc_per_node=1 --rdzv_id=\${SLURM_JOB_ID:-0} --rdzv_backend=c10d --rdzv_endpoint=\${MASTER_ADDR}:\${MASTER_PORT} /workspace/DeepEP/tests/legacy/test_internode.py --num-processes 8 --num-tokens 4096 --hidden 7168 --num-topk 8 --pressure-test-mode 0 --num-experts 256" diff --git a/tests/ref_data/moe-benchmark.sbatch b/tests/ref_data/moe-benchmark.sbatch new file mode 100644 index 000000000..e44c4a27c --- /dev/null +++ b/tests/ref_data/moe-benchmark.sbatch @@ -0,0 +1,31 @@ +#!/bin/bash +# generated by CloudAI@__CLOUDAI_VERSION__ +#SBATCH --job-name=__JOB_NAME__ +#SBATCH --output=__OUTPUT_DIR__/output/stdout.txt +#SBATCH --error=__OUTPUT_DIR__/output/stderr.txt +#SBATCH --partition=main +#SBATCH -N 2 +#SBATCH --gpus-per-node=8 +#SBATCH --gres=gpu:8 +#SBATCH --ntasks-per-node=8 + +export SLURM_JOB_MASTER_NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1) + +nodes=( $( scontrol show hostnames $SLURM_JOB_NODELIST ) ) +nodes_array=($nodes) +head_node=${nodes_array[0]} +head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address) + +echo Nodes: $SLURM_JOB_NODELIST +echo Num Nodes: ${#nodes[@]} +echo Head Node IP: $head_node_ip + +export MASTER_ADDR=$head_node_ip +export MASTER_PORT=29500 + + +srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__OUTPUT_DIR__/output/config.yaml:/tmp/config.yaml,__OUTPUT_DIR__/output:/workspace/dp-benchmark/results --output=__OUTPUT_DIR__/output/mapping-stdout.txt --error=__OUTPUT_DIR__/output/mapping-stderr.txt bash -c "echo \$(date): \$(hostname):node \${SLURM_NODEID}:rank \${SLURM_PROCID}." + +srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__OUTPUT_DIR__/output/config.yaml:/tmp/config.yaml,__OUTPUT_DIR__/output:/workspace/dp-benchmark/results --ntasks=2 --ntasks-per-node=1 --output=__OUTPUT_DIR__/output/metadata/node-%N.toml --error=__OUTPUT_DIR__/output/metadata/nodes.err bash /cloudai_install/slurm-metadata.sh + +srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__OUTPUT_DIR__/output/config.yaml:/tmp/config.yaml,__OUTPUT_DIR__/output:/workspace/dp-benchmark/results bash -c "source __OUTPUT_DIR__/output/env_vars.sh; python /workspace/dp-benchmark/benchmark/benchmark.py /tmp/config.yaml" diff --git a/tests/test_acceptance.py b/tests/test_acceptance.py index 081bb5636..797d64e3a 100644 --- a/tests/test_acceptance.py +++ b/tests/test_acceptance.py @@ -278,6 +278,7 @@ def build_special_test_run( "nixl-perftest", "nixl-kvbench", "moe-benchmark", + "deepep-benchmark", "osu-bench", "sglang", "sglang-disagg", @@ -566,6 +567,19 @@ def test_req(request, slurm_system: SlurmSystem, partial_tr: partial[TestRun]) - ), ), ), + "deepep-benchmark": lambda: create_test_run( + partial_tr, + "deepep-benchmark", + DeepEPTestDefinition( + name="deepep-benchmark", + description="DeepEP internode test", + test_template_name="DeepEP", + cmd_args=DeepEPCmdArgs( + docker_image_url="docker/image:url", + subtest_name="test_internode", + ), + ), + ), "megatron-bridge": lambda: create_test_run( partial_tr, "megatron-bridge", @@ -702,7 +716,7 @@ def test_req(request, slurm_system: SlurmSystem, partial_tr: partial[TestRun]) - tr.num_nodes = 3 if request.param == "ai-dynamo": tr.num_nodes = 2 - if request.param == "moe-benchmark": + if request.param in {"moe-benchmark", "deepep-benchmark"}: tr.num_nodes = 2 if request.param in {"sglang-disagg-2nodes", "vllm-disagg-2nodes"}: tr.num_nodes = 2 diff --git a/tests/test_init.py b/tests/test_init.py index e84aa4ef7..26aa4da71 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -46,11 +46,6 @@ DynamoMockerStandaloneCommandGenStrategy, DynamoMockerTestDefinition, ) -from cloudai.workloads.moe_benchmark import ( - MoEBenchmarkSlurmCommandGenStrategy, - MoEBenchmarkTestDefinition, - MoEBenchmarkThroughputReporter, -) from cloudai.workloads.jax_toolbox import ( GPTTestDefinition, GrokTestDefinition, @@ -63,6 +58,11 @@ MegatronBridgeTestDefinition, ) from cloudai.workloads.megatron_run import MegatronRunSlurmCommandGenStrategy, MegatronRunTestDefinition +from cloudai.workloads.moe_benchmark import ( + MoEBenchmarkSlurmCommandGenStrategy, + MoEBenchmarkTestDefinition, + MoEBenchmarkThroughputReporter, +) from cloudai.workloads.nccl_test import ( NcclComparisonReport, NCCLTestDefinition, @@ -238,7 +238,7 @@ def test_installers(): def test_definitions(): test_defs = Registry().test_definitions_map - assert len(test_defs) == 26 + assert len(test_defs) == 27 for tdef in [ ("UCCTest", UCCTestDefinition), ("DDLBTest", DDLBTestDefinition), diff --git a/tests/test_test_scenario.py b/tests/test_test_scenario.py index 8b3ae905f..9da396b8a 100644 --- a/tests/test_test_scenario.py +++ b/tests/test_test_scenario.py @@ -40,14 +40,7 @@ from cloudai.workloads.ai_dynamo import AIDynamoReportGenerationStrategy, AIDynamoTestDefinition from cloudai.workloads.aiconfig import AiconfiguratorReportGenerationStrategy, AiconfiguratorTestDefinition from cloudai.workloads.chakra_replay import ChakraReplayReportGenerationStrategy, ChakraReplayTestDefinition -from cloudai.workloads.deepep import ( - DeepEPTestDefinition, -) from cloudai.workloads.dynamo_mocker import DynamoMockerReportGenerationStrategy, DynamoMockerTestDefinition -from cloudai.workloads.moe_benchmark import ( - MoEBenchmarkReportGenerationStrategy, - MoEBenchmarkTestDefinition -) from cloudai.workloads.jax_toolbox import ( GPTTestDefinition, GrokTestDefinition, @@ -61,6 +54,7 @@ MegatronRunReportGenerationStrategy, MegatronRunTestDefinition, ) +from cloudai.workloads.moe_benchmark import MoEBenchmarkReportGenerationStrategy, MoEBenchmarkTestDefinition from cloudai.workloads.nccl_test import ( NCCLCmdArgs, NCCLTestDefinition,