diff --git a/.gitignore b/.gitignore index 9b55990fb..0a75961e3 100644 --- a/.gitignore +++ b/.gitignore @@ -87,6 +87,7 @@ ehthumbs.db Thumbs.db *.log +slurm-* install/ results/ .* diff --git a/conf/experimental/test/deepep_low_latency.toml b/conf/experimental/test/deepep_low_latency.toml deleted file mode 100644 index b76342b4e..000000000 --- a/conf/experimental/test/deepep_low_latency.toml +++ /dev/null @@ -1,43 +0,0 @@ -# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -name = "deepep_low_latency" -description = "DeepEP MoE Benchmark - Low Latency Mode" -test_template_name = "DeepEP" - -[cmd_args] -# Local .sqsh file: -# docker_image_url = "/.autodirect/mswg2/E2E/Regression_logs/squash/yoel/dp-benchmark-shuffle.sqsh" -# Container registry: -docker_image_url = "gitlab-master.nvidia.com/ybenabou/warehouse/deepep:dp-benchmark" - -mode = "low_latency" - -tokens = 128 -num_experts = 256 -num_topk = 1 -hidden_size = 7168 -data_type = "bfloat16" -allow_nvlink_for_low_latency = false -allow_mnnvl = false -round_scale = false -use_ue8m0 = false -num_warmups = 20 -num_iterations = 50 -shuffle_columns = false -use_kineto_profiler = false -config_file_path = "/tmp/config.yaml" -results_dir = "/workspace/dp-benchmark/results" diff --git a/conf/experimental/test/deepep_standard.toml b/conf/experimental/test/deepep_standard.toml deleted file mode 100644 index e50c4e6df..000000000 --- a/conf/experimental/test/deepep_standard.toml +++ /dev/null @@ -1,43 +0,0 @@ -# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -name = "deepep_standard" -description = "DeepEP MoE Benchmark - Standard Mode" -test_template_name = "DeepEP" - -[cmd_args] -# Local .sqsh file: -# docker_image_url = "/.autodirect/mswg2/E2E/Regression_logs/squash/yoel/dp-benchmark-shuffle.sqsh" -# Container registry (uses your Docker credentials): -docker_image_url = "gitlab-master.nvidia.com/ybenabou/warehouse/deepep:dp-benchmark" - -mode = "standard" - -tokens = 1024 -num_experts = 256 -num_topk = 8 -hidden_size = 7168 -data_type = "bfloat16" -allow_nvlink_for_low_latency = false -allow_mnnvl = false -round_scale = false -use_ue8m0 = false -num_warmups = 20 -num_iterations = 50 -shuffle_columns = false -use_kineto_profiler = false -config_file_path = "/tmp/config.yaml" -results_dir = "/workspace/dp-benchmark/results" diff --git a/conf/experimental/test/deepep_test_ep_v2.toml b/conf/experimental/test/deepep_test_ep_v2.toml new file mode 100644 index 000000000..5f164fecb --- /dev/null +++ b/conf/experimental/test/deepep_test_ep_v2.toml @@ -0,0 +1,27 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +name = "deepep_test_ep_v2" +description = "Official DeepEP V2 elastic test_ep" +test_template_name = "DeepEP" + +[cmd_args] +docker_image_url = "/your/path/to/the/container" +subtest_name = "test_ep" +elastic_tests_root = "/path/in/the/container/to/the/tests/folder" +num_processes = 8 +num_sms = 0 +num_qps = 0 +num_allocated_qps = 0 +num_tokens = 4096 +hidden = 7168 +num_topk = 8 +num_experts = 256 +do_cpu_sync = 1 +allow_hybrid_mode = 1 +allow_multiple_reduction = 1 +prefer_overlap_with_compute = 0 +seed = 0 +skip_check = false +skip_perf_test = false diff --git a/conf/experimental/test/deepep_test_internode.toml b/conf/experimental/test/deepep_test_internode.toml new file mode 100644 index 000000000..ab569ef12 --- /dev/null +++ b/conf/experimental/test/deepep_test_internode.toml @@ -0,0 +1,19 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +name = "deepep_test_internode" +description = "Official DeepEP V1 legacy test_internode" +test_template_name = "DeepEP" + +[cmd_args] +docker_image_url = "/your/path/to/the/container" +subtest_name = "test_internode" +legacy_tests_root = "/path/in/the/container/to/the/tests/folder" +num_processes = 8 +num_tokens = 4096 +hidden = 7168 +num_topk = 8 +num_experts = 256 +pressure_test_mode = 0 +test_ll_compatibility = false diff --git a/conf/experimental/test/deepep_test_intranode.toml b/conf/experimental/test/deepep_test_intranode.toml new file mode 100644 index 000000000..f8d68e0b2 --- /dev/null +++ b/conf/experimental/test/deepep_test_intranode.toml @@ -0,0 +1,18 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +name = "deepep_test_intranode" +description = "Official DeepEP V1 legacy test_intranode" +test_template_name = "DeepEP" + +[cmd_args] +docker_image_url = "/your/path/to/the/container" +subtest_name = "test_intranode" +legacy_tests_root = "/path/in/the/container/to/the/tests/folder" +num_processes = 8 +num_tokens = 4096 +hidden = 7168 +num_topk = 8 +num_experts = 256 +allow_mnnvl = false diff --git a/conf/experimental/test/deepep_test_low_latency.toml b/conf/experimental/test/deepep_test_low_latency.toml new file mode 100644 index 000000000..270923214 --- /dev/null +++ b/conf/experimental/test/deepep_test_low_latency.toml @@ -0,0 +1,22 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +name = "deepep_test_low_latency" +description = "Official DeepEP V1 legacy test_low_latency" +test_template_name = "DeepEP" + +[cmd_args] +docker_image_url = "/your/path/to/the/container" +subtest_name = "test_low_latency" +legacy_tests_root = "/path/in/the/container/to/the/tests/folder" +num_processes = 8 +num_tokens = 128 +hidden = 7168 +num_topk = 8 +num_experts = 288 +allow_mnnvl = false +disable_nvlink = false +use_logfmt = false +pressure_test = false +shrink_test = false diff --git a/conf/experimental/test/moe_benchmark_low_latency.toml b/conf/experimental/test/moe_benchmark_low_latency.toml new file mode 100644 index 000000000..9d1a7eefc --- /dev/null +++ b/conf/experimental/test/moe_benchmark_low_latency.toml @@ -0,0 +1,32 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +name = "moe_benchmark_low_latency" +description = "MoE Benchmark - DeepEP low-latency mode plus matrix export" +test_template_name = "MoEBenchmark" + +[cmd_args] +docker_image_url = "/your/path/to/the/container" +benchmark_root = "/path/in/the/container/to/the/tests/folder" +mode = "low_latency" +tokens = 128 +num_experts = 288 +num_topk = 8 +hidden_size = 7168 +data_type = "bfloat16" +allow_nvlink_for_low_latency = false +allow_mnnvl = false +round_scale = false +use_ue8m0 = false +num_warmups = 20 +num_iterations = 50 +shuffle_columns = false +use_kineto_profiler = false +enable_tuning = false +config_file_path = "/tmp/config.yaml" +results_dir = "/workspace/dp-benchmark/results" + +[extra_env_vars] +NUM_QPS_PER_RANK = "12" +NUM_SMS = "24" diff --git a/conf/experimental/test/moe_benchmark_standard.toml b/conf/experimental/test/moe_benchmark_standard.toml new file mode 100644 index 000000000..b30560d7e --- /dev/null +++ b/conf/experimental/test/moe_benchmark_standard.toml @@ -0,0 +1,32 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +name = "moe_benchmark_standard" +description = "MoE Benchmark - DeepEP standard mode plus matrix export" +test_template_name = "MoEBenchmark" + +[cmd_args] +docker_image_url = "/your/path/to/the/container" +benchmark_root = "/workspace/dp-benchmark/benchmark" +mode = "standard" +tokens = 4096 +num_experts = 256 +num_topk = 8 +hidden_size = 7168 +data_type = "bfloat16" +allow_nvlink_for_low_latency = false +allow_mnnvl = false +round_scale = false +use_ue8m0 = false +num_warmups = 20 +num_iterations = 50 +shuffle_columns = false +use_kineto_profiler = false +enable_tuning = false +config_file_path = "/tmp/config.yaml" +results_dir = "/workspace/dp-benchmark/results" + +[extra_env_vars] +NUM_QPS_PER_RANK = "12" +NUM_SMS = "24" diff --git a/conf/experimental/test_scenario/deepep.toml b/conf/experimental/test/nccl_test_alltoallv.toml similarity index 58% rename from conf/experimental/test_scenario/deepep.toml rename to conf/experimental/test/nccl_test_alltoallv.toml index 95335d20e..f314a1c64 100644 --- a/conf/experimental/test_scenario/deepep.toml +++ b/conf/experimental/test/nccl_test_alltoallv.toml @@ -1,5 +1,5 @@ # SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,16 +14,24 @@ # See the License for the specific language governing permissions and # limitations under the License. -name = "deepep-benchmark" +name = "nccl_test_alltoallv" +description = "NCCL AlltoAllv" +test_template_name = "NcclTest" -[[Tests]] -id = "Tests.1" -test_name = "deepep_standard" -num_nodes = 2 -time_limit = "00:30:00" +[cmd_args] +docker_image_url = "/your/path/to/the/container" +subtest_name = "alltoallv_perf_mpi" +nthreads = 1 +ngpus = 1 +minbytes = "512M" +maxbytes = "512M" +stepfactor = 2 +iters = 10 +warmup_iters = 1 +check = 1 +blocking = 0 +use_deepep_matrix = true -[[Tests]] -id = "Tests.2" -test_name = "deepep_low_latency" -num_nodes = 2 -time_limit = "00:30:00" +[extra_env_vars] +NCCL_P2P_DISABLE = "1" +NCCL_SHM_DISABLE = "1" diff --git a/conf/experimental/test/ucc_alltoallv_deepep.toml b/conf/experimental/test/ucc_alltoallv_deepep.toml new file mode 100644 index 000000000..97110e0b6 --- /dev/null +++ b/conf/experimental/test/ucc_alltoallv_deepep.toml @@ -0,0 +1,40 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name = "ucc_alltoallv_deepep" +description = "UCC AlltoAllv" +test_template_name = "UCCTest" + +[cmd_args] +docker_image_url = "/your/path/to/the/container" +collective = "alltoallv" +b = 1 +e = "8M" +use_deepep_matrix = true + +[extra_env_vars] +UCX_IB_GID_INDEX = "auto" +UCX_TLS = "cuda_copy,rc" +UCX_RNDV_THRESH = "0" +UCX_RNDV_SCHEME = "get_zcopy" +MELLANOX_VISIBLE_DEVICES = "0,3,4,5,6,9,10,11" +CUDA_VISIBLE_DEVICES = "0,1,2,3,4,5,6,7" +UCC_CL_HIER_FULL_SBGP_TLS = "ucp" +UCC_CL_HIER_NODE_SBGP_TLS = "cuda" +UCC_TLS = "ucp,cuda" +UCC_CL_HIER_TUNE = "alltoallv:0-inf:@node_split" +UCC_TL_UCP_ALLTOALLV_PAIRWISE_NUM_POSTS = "8" +UCC_CLS = "basic,hier" diff --git a/conf/experimental/test_scenario/deepep_official.toml b/conf/experimental/test_scenario/deepep_official.toml new file mode 100644 index 000000000..9cc43065f --- /dev/null +++ b/conf/experimental/test_scenario/deepep_official.toml @@ -0,0 +1,11 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +name = "deepep-official-tests" + +[[Tests]] +id = "Tests.deepep_test_internode" +test_name = "deepep_test_internode" +num_nodes = 2 +time_limit = "00:30:00" diff --git a/conf/experimental/test_scenario/moe_benchmark.toml b/conf/experimental/test_scenario/moe_benchmark.toml new file mode 100644 index 000000000..c4c607613 --- /dev/null +++ b/conf/experimental/test_scenario/moe_benchmark.toml @@ -0,0 +1,29 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +name = "moe-benchmark" + +[[Tests]] +id = "Tests.moe_benchmark" +test_name = "moe_benchmark_standard" +num_nodes = 2 +time_limit = "00:30:00" + +[[Tests]] +id = "Tests.ucc_alltoallv" +test_name = "ucc_alltoallv_deepep" +num_nodes = 2 +time_limit = "00:30:00" + [[Tests.dependencies]] + type = "start_post_comp" + id = "Tests.moe_benchmark" + +[[Tests]] +id = "Tests.nccl_alltoallv" +test_name = "nccl_test_alltoallv" +num_nodes = 2 +time_limit = "00:30:00" + [[Tests.dependencies]] + type = "start_post_comp" + id = "Tests.ucc_alltoallv" diff --git a/src/cloudai/registration.py b/src/cloudai/registration.py index 4305f1355..c53d14124 100644 --- a/src/cloudai/registration.py +++ b/src/cloudai/registration.py @@ -79,10 +79,15 @@ def register_all(): DDLBTestSlurmCommandGenStrategy, ) from cloudai.workloads.deepep import ( - DeepEPReportGenerationStrategy, DeepEPSlurmCommandGenStrategy, DeepEPTestDefinition, ) + from cloudai.workloads.moe_benchmark import ( + MoEBenchmarkReportGenerationStrategy, + MoEBenchmarkSlurmCommandGenStrategy, + MoEBenchmarkTestDefinition, + MoEBenchmarkThroughputReporter, + ) from cloudai.workloads.dynamo_mocker import ( DynamoMockerReportGenerationStrategy, DynamoMockerStandaloneCommandGenStrategy, @@ -233,6 +238,7 @@ def register_all(): Registry().add_command_gen_strategy(SlurmSystem, ChakraReplayTestDefinition, ChakraReplaySlurmCommandGenStrategy) Registry().add_command_gen_strategy(SlurmSystem, DeepEPTestDefinition, DeepEPSlurmCommandGenStrategy) + Registry().add_command_gen_strategy(SlurmSystem, MoEBenchmarkTestDefinition, MoEBenchmarkSlurmCommandGenStrategy) Registry().add_command_gen_strategy(SlurmSystem, SlurmContainerTestDefinition, SlurmContainerCommandGenStrategy) Registry().add_command_gen_strategy( SlurmSystem, TritonInferenceTestDefinition, TritonInferenceSlurmCommandGenStrategy @@ -262,6 +268,7 @@ def register_all(): Registry().add_test_definition("DDLBTest", DDLBTestDefinition) Registry().add_test_definition("ChakraReplay", ChakraReplayTestDefinition) Registry().add_test_definition("DeepEP", DeepEPTestDefinition) + Registry().add_test_definition("MoEBenchmark", MoEBenchmarkTestDefinition) Registry().add_test_definition("Sleep", SleepTestDefinition) Registry().add_test_definition("NeMoLauncher", NeMoLauncherTestDefinition) Registry().add_test_definition("NeMoRun", NeMoRunTestDefinition) @@ -287,7 +294,7 @@ def register_all(): Registry().add_agent("grid_search", GridSearchAgent) Registry().add_report(ChakraReplayTestDefinition, ChakraReplayReportGenerationStrategy) - Registry().add_report(DeepEPTestDefinition, DeepEPReportGenerationStrategy) + Registry().add_report(MoEBenchmarkTestDefinition, MoEBenchmarkReportGenerationStrategy) Registry().add_report(GPTTestDefinition, JaxToolboxReportGenerationStrategy) Registry().add_report(GrokTestDefinition, JaxToolboxReportGenerationStrategy) Registry().add_report(MegatronRunTestDefinition, CheckpointTimingReportGenerationStrategy) @@ -311,6 +318,11 @@ def register_all(): Registry().add_report(VllmTestDefinition, VLLMBenchReportGenerationStrategy) Registry().add_scenario_report("per_test", PerTestReporter, ReportConfig(enable=True)) + Registry().add_scenario_report( + "moe_benchmark_throughput", + MoEBenchmarkThroughputReporter, + ReportConfig(enable=True), + ) Registry().add_scenario_report("status", StatusReporter, ReportConfig(enable=True)) Registry().add_scenario_report("dse", DSEReporter, ReportConfig(enable=True)) Registry().add_scenario_report("tarball", TarballReporter, ReportConfig(enable=True)) diff --git a/src/cloudai/workloads/deepep/__init__.py b/src/cloudai/workloads/deepep/__init__.py index f145a3b3a..2732ac2cd 100644 --- a/src/cloudai/workloads/deepep/__init__.py +++ b/src/cloudai/workloads/deepep/__init__.py @@ -15,12 +15,10 @@ # limitations under the License. from .deepep import DeepEPCmdArgs, DeepEPTestDefinition -from .report_generation_strategy import DeepEPReportGenerationStrategy from .slurm_command_gen_strategy import DeepEPSlurmCommandGenStrategy __all__ = [ "DeepEPCmdArgs", - "DeepEPReportGenerationStrategy", "DeepEPSlurmCommandGenStrategy", "DeepEPTestDefinition", ] diff --git a/src/cloudai/workloads/deepep/deepep.py b/src/cloudai/workloads/deepep/deepep.py index 1b01c88b6..23972ee08 100644 --- a/src/cloudai/workloads/deepep/deepep.py +++ b/src/cloudai/workloads/deepep/deepep.py @@ -14,39 +14,68 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Literal, Optional +from pathlib import PurePosixPath +from typing import ClassVar, Literal, Optional from cloudai.core import DockerImage, Installable from cloudai.models.workload import CmdArgs, TestDefinition class DeepEPCmdArgs(CmdArgs): - """DeepEP benchmark command arguments.""" + """Command arguments for the official DeepEP test scripts.""" docker_image_url: str - mode: Literal["standard", "low_latency"] = "standard" - tokens: int = 1024 - num_experts: int = 256 + subtest_name: Literal["test_internode", "test_intranode", "test_low_latency", "test_ep"] = "test_internode" + deep_ep_root: str = "/workspace/DeepEP" + legacy_tests_root: Optional[str] = None + elastic_tests_root: Optional[str] = None + python_executable: str = "python" + + num_processes: int = 8 + num_tokens: int = 4096 + hidden: int = 7168 num_topk: int = 8 - hidden_size: int = 7168 - data_type: Literal["bfloat16", "fp8"] = "bfloat16" - allow_nvlink_for_low_latency: bool = False + num_experts: int = 256 + + # V1 legacy internode/intranode/low-latency flags. + num_topk_groups: Optional[int] = None allow_mnnvl: bool = False - round_scale: bool = False - use_ue8m0: bool = False - num_warmups: int = 20 - num_iterations: int = 50 - shuffle_columns: bool = False - use_kineto_profiler: bool = False - num_sms: int = 24 - num_qps_per_rank: int = 12 - config_file_path: str = "/tmp/config.yaml" - results_dir: str = "/workspace/dp-benchmark/results" + test_ll_compatibility: bool = False + pressure_test_mode: int = 0 + pressure_test: bool = False + shrink_test: bool = False + disable_nvlink: bool = False + use_logfmt: bool = False + + # V2 elastic/test_ep flags. + num_sms: int = 0 + num_qps: int = 0 + num_allocated_qps: int = 0 + num_gpu_timeout_secs: int = 100 + num_cpu_timeout_secs: int = 100 + sl_idx: int = 0 + do_cpu_sync: int = 1 + allow_hybrid_mode: int = 1 + allow_multiple_reduction: int = 1 + prefer_overlap_with_compute: int = 0 + deterministic: bool = False + seed: int = 0 + skip_check: bool = False + skip_perf_test: bool = False + do_pressure_test: bool = False + reuse_elastic_buffer: bool = False + test_first_only: bool = False + unbalanced_ratio: float = 1.0 + precise_unbalanced_ratio: bool = False + masked_ratio: float = 0.0 + dump_profile_traces: str = "" + ignore_local_traffic: bool = False class DeepEPTestDefinition(TestDefinition): - """Test object for DeepEP MoE benchmark.""" + """Test object for official DeepEP v1/v2 test scripts.""" + container_runtime_root: ClassVar[str] = "/workspace/DeepEP" cmd_args: DeepEPCmdArgs _docker_image: Optional[DockerImage] = None @@ -54,7 +83,7 @@ class DeepEPTestDefinition(TestDefinition): def docker_image(self) -> DockerImage: if not self._docker_image: if not self.cmd_args.docker_image_url: - raise ValueError("docker_image_url is required for DeepEP benchmark") + raise ValueError("docker_image_url is required for DeepEP tests") self._docker_image = DockerImage(url=self.cmd_args.docker_image_url) return self._docker_image @@ -63,15 +92,5 @@ def installables(self) -> list[Installable]: return [self.docker_image] @property - def cmd_args_dict(self) -> dict: - """Return command arguments as dict, excluding CloudAI-specific fields.""" - return self.cmd_args.model_dump( - exclude={ - "docker_image_url", - "mode", - "num_sms", - "num_qps_per_rank", - "config_file_path", - "results_dir", - } - ) + def container_runtime_root_path(self) -> PurePosixPath: + return PurePosixPath(self.cmd_args.deep_ep_root or self.container_runtime_root) diff --git a/src/cloudai/workloads/deepep/slurm_command_gen_strategy.py b/src/cloudai/workloads/deepep/slurm_command_gen_strategy.py index 60bbf3000..60be5c1c0 100644 --- a/src/cloudai/workloads/deepep/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/deepep/slurm_command_gen_strategy.py @@ -1,37 +1,91 @@ # SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from pathlib import Path -from typing import List, cast + +from pathlib import PurePosixPath +from typing import Any, List, cast from cloudai.systems.slurm import SlurmCommandGenStrategy from .deepep import DeepEPCmdArgs, DeepEPTestDefinition +_LEGACY_SUBTESTS = {"test_internode", "test_intranode", "test_low_latency"} + +_CLI_FIELDS_BY_SUBTEST = { + "test_internode": ( + "num_processes", + "num_tokens", + "hidden", + "num_topk_groups", + "num_topk", + "pressure_test_mode", + "num_experts", + "test_ll_compatibility", + ), + "test_intranode": ( + "num_processes", + "num_tokens", + "hidden", + "num_topk", + "num_experts", + "allow_mnnvl", + ), + "test_low_latency": ( + "num_processes", + "num_tokens", + "hidden", + "num_topk", + "num_experts", + "allow_mnnvl", + "disable_nvlink", + "use_logfmt", + "pressure_test", + "shrink_test", + ), + "test_ep": ( + "num_processes", + "num_sms", + "num_qps", + "num_allocated_qps", + "num_gpu_timeout_secs", + "num_cpu_timeout_secs", + "sl_idx", + "num_tokens", + "hidden", + "num_topk", + "num_experts", + "do_cpu_sync", + "allow_hybrid_mode", + "allow_multiple_reduction", + "prefer_overlap_with_compute", + "deterministic", + "seed", + "skip_check", + "skip_perf_test", + "do_pressure_test", + "reuse_elastic_buffer", + "test_first_only", + "unbalanced_ratio", + "precise_unbalanced_ratio", + "masked_ratio", + "dump_profile_traces", + "ignore_local_traffic", + ), +} + +def _flag_name(field_name: str) -> str: + return f"--{field_name.replace('_', '-')}" + + class DeepEPSlurmCommandGenStrategy(SlurmCommandGenStrategy): - """Command generation strategy for DeepEP benchmark on Slurm systems.""" + """Command generation strategy for official DeepEP v1/v2 tests.""" - def _append_head_node_detection(self, batch_script_content: List[str]) -> None: - """ - Append bash commands to detect head node IP for torchrun. + @property + def tdef(self) -> DeepEPTestDefinition: + return cast(DeepEPTestDefinition, self.test_run.test) - Args: - batch_script_content: The list of script lines to append to. - """ + def _append_head_node_detection(self, batch_script_content: List[str]) -> None: batch_script_content.extend( [ "", @@ -44,96 +98,107 @@ def _append_head_node_detection(self, batch_script_content: List[str]) -> None: "echo Num Nodes: ${#nodes[@]}", "echo Head Node IP: $head_node_ip", "", + "export MASTER_ADDR=$head_node_ip", + "export MASTER_PORT=29500", + "", ] ) def _append_sbatch_directives(self, batch_script_content: List[str]) -> None: - """ - Append SBATCH directives and head node detection setup for DeepEP. - - Args: - batch_script_content: The list of script lines to append to. - """ - super()._append_sbatch_directives(batch_script_content) + num_nodes, node_list = self.get_cached_nodes_spec() + + self._add_reservation(batch_script_content) + batch_script_content.append(f"#SBATCH --output={self.test_run.output_path.absolute() / 'stdout.txt'}") + batch_script_content.append(f"#SBATCH --error={self.test_run.output_path.absolute() / 'stderr.txt'}") + batch_script_content.append(f"#SBATCH --partition={self.system.default_partition}") + if self.system.account: + batch_script_content.append(f"#SBATCH --account={self.system.account}") + if node_list: + batch_script_content.append(f"#SBATCH --nodelist={','.join(node_list)}") + batch_script_content.append(f"#SBATCH -N {num_nodes}") + batch_script_content.append(f"#SBATCH --gpus-per-node={self.system.gpus_per_node}") + batch_script_content.append(f"#SBATCH --gres=gpu:{self.system.gpus_per_node}") + batch_script_content.append("#SBATCH --ntasks-per-node=1") + if self.test_run.time_limit: + batch_script_content.append(f"#SBATCH --time={self.test_run.time_limit}") + batch_script_content.append( + "\nexport SLURM_JOB_MASTER_NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1)" + ) self._append_head_node_detection(batch_script_content) - def _container_mounts(self) -> List[str]: - """Return container mounts specific to DeepEP benchmark.""" - tdef: DeepEPTestDefinition = cast(DeepEPTestDefinition, self.test_run.test) - cmd_args: DeepEPCmdArgs = tdef.cmd_args + def _gen_srun_command(self) -> str: + srun_command_parts = self.gen_srun_prefix(use_pretest_extras=True, with_num_nodes=False) + num_nodes, _ = self.get_cached_nodes_spec() + srun_command_parts.extend([f"--nodes={num_nodes}", f"--ntasks={num_nodes}", "--ntasks-per-node=1"]) - config_file_path = self.test_run.output_path / "config.yaml" - self._generate_config_yaml(config_file_path, cmd_args) + nsys_command_parts = self.gen_nsys_command() + test_command_parts = self.generate_test_command() - mounts = [ - f"{config_file_path.parent.absolute()}:{config_file_path.parent.absolute()}", - f"{self.test_run.output_path.absolute()}:{cmd_args.results_dir}", - ] + with (self.test_run.output_path / "env_vars.sh").open("w") as f: + for key, value in self.final_env_vars.items(): + f.write(f'export {key}="{value}"\n') - return mounts + full_test_cmd = ( + f'bash -c "source {(self.test_run.output_path / "env_vars.sh").absolute()}; ' + + " ".join(nsys_command_parts + test_command_parts) + + '"' + ) - def image_path(self) -> str | None: - """Return the Docker image path for DeepEP benchmark.""" - tdef: DeepEPTestDefinition = cast(DeepEPTestDefinition, self.test_run.test) - return str(tdef.docker_image.installed_path) + return " ".join(srun_command_parts) + " " + full_test_cmd - def generate_test_command(self) -> List[str]: - """Generate the test command for DeepEP benchmark.""" - tdef: DeepEPTestDefinition = cast(DeepEPTestDefinition, self.test_run.test) - cmd_args: DeepEPCmdArgs = tdef.cmd_args + def _container_mounts(self) -> list[str]: + return [] - if cmd_args.mode == "standard": - benchmark_script = "/workspace/dp-benchmark/benchmark/benchmark.py" + def image_path(self) -> str | None: + return str(self.tdef.docker_image.installed_path) + + def _script_path(self, cmd_args: DeepEPCmdArgs) -> str: + deep_ep_root = PurePosixPath(cmd_args.deep_ep_root) + if cmd_args.subtest_name in _LEGACY_SUBTESTS: + if cmd_args.legacy_tests_root: + tests_root = PurePosixPath(cmd_args.legacy_tests_root) + else: + tests_root = deep_ep_root / "tests" / "legacy" + elif cmd_args.elastic_tests_root: + tests_root = PurePosixPath(cmd_args.elastic_tests_root) else: - benchmark_script = "/workspace/dp-benchmark/benchmark/benchmark_ll.py" + tests_root = deep_ep_root / "tests" / "elastic" + + return str(tests_root / f"{cmd_args.subtest_name}.py") - _, nodes = self.system.get_nodes_by_spec(self.test_run.nnodes, self.test_run.nodes) - num_nodes = len(nodes) if nodes else self.test_run.nnodes + def _append_cli_field(self, parts: list[str], field_name: str, value: Any) -> None: + if value is None or value == "": + return - config_file_path = self.test_run.output_path / "config.yaml" - command_parts = [ + flag = _flag_name(field_name) + if isinstance(value, bool): + if value: + parts.append(flag) + return + + parts.extend([flag, str(value)]) + + def generate_test_command(self) -> List[str]: + cmd_args = self.tdef.cmd_args + num_nodes, _ = self.get_cached_nodes_spec() + parts: list[str] = [ "torchrun", f"--nnodes={num_nodes}", "--nproc_per_node=1", - "--rdzv_id=$RANDOM", + "--rdzv_id=\\${SLURM_JOB_ID:-0}", "--rdzv_backend=c10d", - "--rdzv_endpoint=$head_node_ip:29500", - benchmark_script, - str(config_file_path.absolute()), + "--rdzv_endpoint=\\${MASTER_ADDR}:\\${MASTER_PORT}", + self._script_path(cmd_args), ] - return command_parts - - def _generate_config_yaml(self, config_path: Path, cmd_args: DeepEPCmdArgs) -> None: - """ - Generate YAML configuration file for DeepEP benchmark. - - Args: - config_path: Path where to write the config file. - cmd_args: Command arguments for the benchmark. - """ - tdef: DeepEPTestDefinition = cast(DeepEPTestDefinition, self.test_run.test) - - config_lines = [ - "# DeepEP Benchmark Configuration", - "# Generated by CloudAI", - "", - ] - - for key, value in tdef.cmd_args_dict.items(): - if isinstance(value, bool): - config_lines.append(f"{key}: {str(value).lower()}") - elif isinstance(value, str): - config_lines.append(f'{key}: "{value}"') - else: - config_lines.append(f"{key}: {value}") + for field_name in _CLI_FIELDS_BY_SUBTEST[cmd_args.subtest_name]: + self._append_cli_field(parts, field_name, getattr(cmd_args, field_name)) - config_path.parent.mkdir(parents=True, exist_ok=True) + if self.test_run.test.extra_cmd_args: + parts.append(self.test_run.test.extra_args_str) - with open(config_path, "w") as f: - f.write("\n".join(config_lines)) + return parts def gen_srun_success_check(self) -> str: - """Check if DeepEP benchmark completed successfully.""" output_file = self.test_run.output_path / "stdout.txt" - return f'grep -q "global_bw\\|deepep_time" {output_file} && echo 1 || echo 0' + return f'grep -Eq "\\[testing\\]|dispatch|combine|passed|tuning|Best" {output_file} && echo 1 || echo 0' diff --git a/src/cloudai/workloads/moe_benchmark/__init__.py b/src/cloudai/workloads/moe_benchmark/__init__.py new file mode 100644 index 000000000..d2a4cbae2 --- /dev/null +++ b/src/cloudai/workloads/moe_benchmark/__init__.py @@ -0,0 +1,16 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from .moe_benchmark import MoEBenchmarkCmdArgs, MoEBenchmarkTestDefinition +from .report_generation_strategy import MoEBenchmarkReportGenerationStrategy +from .slurm_command_gen_strategy import MoEBenchmarkSlurmCommandGenStrategy +from .throughput_reporter import MoEBenchmarkThroughputReporter + +__all__ = [ + "MoEBenchmarkCmdArgs", + "MoEBenchmarkReportGenerationStrategy", + "MoEBenchmarkSlurmCommandGenStrategy", + "MoEBenchmarkTestDefinition", + "MoEBenchmarkThroughputReporter", +] diff --git a/src/cloudai/workloads/moe_benchmark/combined_report.py b/src/cloudai/workloads/moe_benchmark/combined_report.py new file mode 100644 index 000000000..425145b09 --- /dev/null +++ b/src/cloudai/workloads/moe_benchmark/combined_report.py @@ -0,0 +1,63 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""MoE benchmark dependency helpers for Slurm UCC/NCCL.""" + +from __future__ import annotations + +from pathlib import Path + +from cloudai.core import TestRun + +MOE_BENCHMARK_PREV_MOUNT = "/cloudai_moe_benchmark_prev" + + +def start_post_comp_chain(test_run: TestRun) -> list[TestRun]: + """Follow ``start_post_comp`` (e.g. UCC -> NCCL -> MoE benchmark).""" + dep = test_run.dependencies.get("start_post_comp") + if dep is None: + return [] + chain: list[TestRun] = [] + seen: set[int] = set() + cur: TestRun | None = dep.test_run + while cur is not None and id(cur) not in seen: + seen.add(id(cur)) + chain.append(cur) + nxt = cur.dependencies.get("start_post_comp") + cur = nxt.test_run if nxt else None + return chain + + +def _has_ucc_matrix_under(root: Path) -> bool: + if (root / "ucc_matrix.txt").is_file(): + return True + return any(root.glob("**/ucc_matrix.txt")) + + +def moe_benchmark_root(test_run: TestRun) -> Path | None: + """MoE benchmark job directory (``ucc_matrix`` or BENCHMARK stdout), walking ``start_post_comp``.""" + for tr in start_post_comp_chain(test_run): + root = tr.output_path + if _has_ucc_matrix_under(root): + return root + st = root / "stdout.txt" + if st.is_file(): + try: + if "BENCHMARK: DeepEP Results" in st.read_text(errors="replace")[:250000]: + return root + except OSError: + continue + return None + + +def moe_benchmark_results_json_files(test_output_path: Path) -> list[Path]: + """All ``results.json`` paths under ``results/benchmark_*`` or top-level ``benchmark_*``.""" + found: list[Path] = [] + for pattern in ("results/benchmark_*_ranks_*", "benchmark_*_ranks_*"): + for d in sorted(test_output_path.glob(pattern)): + if d.is_dir(): + rj = d / "results.json" + if rj.is_file(): + found.append(rj) + return found diff --git a/src/cloudai/workloads/moe_benchmark/moe_benchmark.py b/src/cloudai/workloads/moe_benchmark/moe_benchmark.py new file mode 100644 index 000000000..df3bb7c6a --- /dev/null +++ b/src/cloudai/workloads/moe_benchmark/moe_benchmark.py @@ -0,0 +1,68 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from typing import Literal, Optional + +from cloudai.core import DockerImage, Installable +from cloudai.models.workload import CmdArgs, TestDefinition + + +class MoEBenchmarkCmdArgs(CmdArgs): + """Command arguments for the custom MoE benchmark that compares EP/alltoallv backends.""" + + docker_image_url: str + benchmark_root: str = "/workspace/dp-benchmark/benchmark" + mode: Literal["standard", "low_latency"] = "standard" + tokens: int = 1024 + num_experts: int = 256 + num_topk: int = 8 + hidden_size: int = 7168 + data_type: Literal["bfloat16", "fp8"] = "bfloat16" + allow_nvlink_for_low_latency: bool = False + allow_mnnvl: bool = False + round_scale: bool = False + use_ue8m0: bool = False + num_warmups: int = 20 + num_iterations: int = 50 + shuffle_columns: bool = False + use_kineto_profiler: bool = False + enable_tuning: bool = False + num_sms: int = 24 + num_qps_per_rank: int = 12 + config_file_path: str = "/tmp/config.yaml" + results_dir: str = "/workspace/dp-benchmark/results" + + +class MoEBenchmarkTestDefinition(TestDefinition): + """Test object for the custom MoE benchmark.""" + + cmd_args: MoEBenchmarkCmdArgs + _docker_image: Optional[DockerImage] = None + + @property + def docker_image(self) -> DockerImage: + if not self._docker_image: + if not self.cmd_args.docker_image_url: + raise ValueError("docker_image_url is required for MoE benchmark") + self._docker_image = DockerImage(url=self.cmd_args.docker_image_url) + return self._docker_image + + @property + def installables(self) -> list[Installable]: + return [self.docker_image] + + @property + def cmd_args_dict(self) -> dict: + """Return command arguments as dict, excluding CloudAI/container-only fields.""" + return self.cmd_args.model_dump( + exclude={ + "docker_image_url", + "benchmark_root", + "mode", + "num_sms", + "num_qps_per_rank", + "config_file_path", + "results_dir", + } + ) diff --git a/src/cloudai/workloads/deepep/report_generation_strategy.py b/src/cloudai/workloads/moe_benchmark/report_generation_strategy.py similarity index 57% rename from src/cloudai/workloads/deepep/report_generation_strategy.py rename to src/cloudai/workloads/moe_benchmark/report_generation_strategy.py index fc33cd741..5bb0f01d0 100644 --- a/src/cloudai/workloads/deepep/report_generation_strategy.py +++ b/src/cloudai/workloads/moe_benchmark/report_generation_strategy.py @@ -1,18 +1,6 @@ # SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES # Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. from __future__ import annotations @@ -25,49 +13,28 @@ from cloudai.core import ReportGenerationStrategy from cloudai.report_generator.tool.csv_report_tool import CSVReportTool from cloudai.util.lazy_imports import lazy +from cloudai.workloads.moe_benchmark.combined_report import moe_benchmark_results_json_files if TYPE_CHECKING: import pandas as pd -class DeepEPReportGenerationStrategy(ReportGenerationStrategy): - """Strategy for generating reports from DeepEP benchmark outputs.""" +class MoEBenchmarkReportGenerationStrategy(ReportGenerationStrategy): + """Strategy for generating reports from MoE benchmark outputs.""" def can_handle_directory(self) -> bool: - """ - Check if this directory contains DeepEP benchmark results. - - Returns: - bool: True if directory contains DeepEP results. - """ - # Check for results subdirectories created by DeepEP directory_path = self.test_run.output_path - matching_dirs = list(directory_path.glob("results/benchmark_*_ranks_*")) - - if matching_dirs: - # Check if any of them has results.json - for result_dir in matching_dirs: - if (result_dir / "results.json").exists(): - return True - - return False + return bool(moe_benchmark_results_json_files(directory_path)) def generate_report(self) -> None: - """Generate a report from DeepEP benchmark results.""" + """Generate a report from MoE benchmark results.""" directory_path = self.test_run.output_path test_name = self.test_run.test.name - results_dirs = list(directory_path.glob("results/benchmark_*_ranks_*")) - - if not results_dirs: - return - all_results = [] - for result_dir in results_dirs: - results_json = result_dir / "results.json" - if not results_json.exists(): - continue + for results_json in moe_benchmark_results_json_files(directory_path): + result_dir = results_json.parent try: with open(results_json, "r") as f: @@ -76,6 +43,9 @@ def generate_report(self) -> None: logging.debug(f"Error parsing {results_json}: {e}") continue + if not isinstance(results_data, list): + continue + match = re.match(r"benchmark_(\d+)_ranks_(.+?)_(low_latency|standard)", result_dir.name) num_ranks, timestamp, mode = 0, "unknown", "unknown" if match: @@ -84,6 +54,8 @@ def generate_report(self) -> None: mode = match.group(3) for result in results_data: + if not isinstance(result, dict): + continue result["num_ranks"] = num_ranks result["timestamp"] = timestamp result["mode"] = mode @@ -99,6 +71,9 @@ def generate_report(self) -> None: "num_tokens", "hidden", "deepep_time", + "bus_bw_avg", + "bus_bw_min", + "bus_bw_max", "global_bw", "simple_rdma_bw", "simple_nvl_bw", @@ -112,14 +87,6 @@ def generate_report(self) -> None: self._generate_csv_report(df, directory_path, test_name) def _generate_csv_report(self, df: pd.DataFrame, directory_path: Path, test_name: str) -> None: - """ - Generate a CSV report from the DataFrame. - - Args: - df (pd.DataFrame): DataFrame containing the benchmark results. - directory_path (Path): Output directory path for saving the CSV report. - test_name (str): Name of the test. - """ csv_report_tool = CSVReportTool(directory_path) csv_report_tool.set_dataframe(df) csv_report_tool.finalize_report(Path(f"cloudai_{test_name}_report.csv")) diff --git a/src/cloudai/workloads/moe_benchmark/slurm_command_gen_strategy.py b/src/cloudai/workloads/moe_benchmark/slurm_command_gen_strategy.py new file mode 100644 index 000000000..88c9d872a --- /dev/null +++ b/src/cloudai/workloads/moe_benchmark/slurm_command_gen_strategy.py @@ -0,0 +1,86 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from pathlib import Path, PurePosixPath +from typing import List, cast + +from cloudai.systems.slurm import SlurmCommandGenStrategy + +from .moe_benchmark import MoEBenchmarkCmdArgs, MoEBenchmarkTestDefinition + + +class MoEBenchmarkSlurmCommandGenStrategy(SlurmCommandGenStrategy): + """Command generation strategy for the custom MoE benchmark on Slurm systems.""" + + def _append_head_node_detection(self, batch_script_content: List[str]) -> None: + batch_script_content.extend( + [ + "", + "nodes=( $( scontrol show hostnames $SLURM_JOB_NODELIST ) )", + "nodes_array=($nodes)", + "head_node=${nodes_array[0]}", + 'head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)', + "", + "echo Nodes: $SLURM_JOB_NODELIST", + "echo Num Nodes: ${#nodes[@]}", + "echo Head Node IP: $head_node_ip", + "", + "export MASTER_ADDR=$head_node_ip", + "export MASTER_PORT=29500", + "", + ] + ) + + def _append_sbatch_directives(self, batch_script_content: List[str]) -> None: + super()._append_sbatch_directives(batch_script_content) + self._append_head_node_detection(batch_script_content) + + def _container_mounts(self) -> List[str]: + """Return container mounts specific to the MoE benchmark.""" + tdef: MoEBenchmarkTestDefinition = cast(MoEBenchmarkTestDefinition, self.test_run.test) + cmd_args: MoEBenchmarkCmdArgs = tdef.cmd_args + + config_file_path = self.test_run.output_path / "config.yaml" + self._generate_config_yaml(config_file_path) + + return [ + f"{config_file_path.absolute()}:{cmd_args.config_file_path}", + f"{self.test_run.output_path.absolute()}:{cmd_args.results_dir}", + ] + + def image_path(self) -> str | None: + tdef: MoEBenchmarkTestDefinition = cast(MoEBenchmarkTestDefinition, self.test_run.test) + return str(tdef.docker_image.installed_path) + + def generate_test_command(self) -> List[str]: + tdef: MoEBenchmarkTestDefinition = cast(MoEBenchmarkTestDefinition, self.test_run.test) + cmd_args: MoEBenchmarkCmdArgs = tdef.cmd_args + + script_name = "benchmark.py" if cmd_args.mode == "standard" else "benchmark_ll.py" + benchmark_script = str(PurePosixPath(cmd_args.benchmark_root) / script_name) + + return ["python", benchmark_script, cmd_args.config_file_path] + + def _generate_config_yaml(self, config_path: Path) -> None: + tdef: MoEBenchmarkTestDefinition = cast(MoEBenchmarkTestDefinition, self.test_run.test) + + config_lines = ["# MoE Benchmark Configuration", "# Generated by CloudAI", ""] + for key, value in tdef.cmd_args_dict.items(): + if isinstance(value, bool): + config_lines.append(f"{key}: {str(value).lower()}") + elif isinstance(value, str): + config_lines.append(f'{key}: "{value}"') + else: + config_lines.append(f"{key}: {value}") + + config_path.parent.mkdir(parents=True, exist_ok=True) + with open(config_path, "w") as f: + f.write("\n".join(config_lines)) + + def gen_srun_success_check(self) -> str: + output_file = self.test_run.output_path / "stdout.txt" + return ( + 'grep -Eq "global_bw|RDMA BW \\(GB/s\\)|NVLink BW \\(GB/s\\)|Bus BW \\(GB/s\\)|Global BW \\(GB/s\\)" ' + f'{output_file} && echo 1 || echo 0' + ) diff --git a/src/cloudai/workloads/moe_benchmark/throughput_reporter.py b/src/cloudai/workloads/moe_benchmark/throughput_reporter.py new file mode 100644 index 000000000..657570ccd --- /dev/null +++ b/src/cloudai/workloads/moe_benchmark/throughput_reporter.py @@ -0,0 +1,264 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Scenario-level MoE throughput summary: standalone SVG file.""" + +from __future__ import annotations + +import html +import json +import logging +import re +from pathlib import Path + +from cloudai.core import Reporter +from cloudai.workloads.moe_benchmark.combined_report import moe_benchmark_results_json_files +from cloudai.workloads.moe_benchmark.moe_benchmark import MoEBenchmarkTestDefinition +from cloudai.workloads.nccl_test.nccl import NCCLTestDefinition +from cloudai.workloads.nccl_test.performance_report_generation_strategy import extract_nccl_data +from cloudai.workloads.ucc_test.ucc import UCCTestDefinition + + +def _read_latest_moe_results_rows(test_output: Path) -> list[object]: + paths = moe_benchmark_results_json_files(test_output) + if not paths: + return [] + latest = max(paths, key=lambda p: p.stat().st_mtime) + try: + data = json.loads(latest.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError) as e: + logging.debug("MoE benchmark results.json unreadable %s: %s", latest, e) + return [] + return data if isinstance(data, list) else [] + + +def _extract_moe_bus_bw(row: object) -> tuple[str, float] | None: + if not isinstance(row, dict): + return None + op = row.get("operation") + if not isinstance(op, str) or "bus_bw_avg" not in row: + return None + op_l = op.lower() + if op_l not in ("dispatch", "combine"): + return None + try: + return op_l, float(row["bus_bw_avg"]) + except (TypeError, ValueError): + return None + + +def _moe_benchmark_dispatch_combine_bars(test_output: Path) -> list[tuple[str, float, str]]: + """From latest ``results.json``: one bar per ``dispatch`` / ``combine`` row (``bus_bw_avg``).""" + by_op: dict[str, float] = {} + for row in _read_latest_moe_results_rows(test_output): + extracted = _extract_moe_bus_bw(row) + if extracted is not None: + by_op[extracted[0]] = extracted[1] + + out: list[tuple[str, float, str]] = [] + if "dispatch" in by_op: + out.append(("MoE dispatch", by_op["dispatch"], "#2ca02c")) + if "combine" in by_op: + out.append(("MoE combine", by_op["combine"], "#31a354")) + return out + + +def _mean_ucc_bus_bw_gb_s(test_output: Path) -> float | None: + for name in ("stdout.txt", "ucc_perftest_capture.log"): + path = test_output / name + if not path.is_file(): + continue + v = _parse_ucc_perftest_mean_bus_avg(path) + if v is not None: + return v + return None + + +def _parse_ucc_perftest_mean_bus_avg(path: Path) -> float | None: + """Mean of ``Bus Bandwidth ... avg`` column over numeric data rows (8 fields).""" + try: + text = path.read_text(encoding="utf-8", errors="replace") + except OSError: + return None + avgs: list[float] = [] + for line in text.splitlines(): + parts = re.split(r"\s+", line.strip()) + if len(parts) != 8: + continue + if not parts[0].isdigit() or not parts[1].isdigit(): + continue + try: + sz = float(parts[1]) + bavg = float(parts[5]) + except ValueError: + continue + if sz < 1048576: + continue + avgs.append(bavg) + if not avgs: + return None + return float(sum(avgs) / len(avgs)) + + +def _mean_nccl_oop_busbw_gb_s(test_output: Path) -> float | None: + rows, _, _, _ = extract_nccl_data(test_output / "stdout.txt") + if not rows: + return None + vals: list[float] = [] + for parts in rows: + try: + vals.append(float(parts[7])) + except (IndexError, ValueError): + continue + if not vals: + return None + return float(sum(vals) / len(vals)) + + +def _write_moe_throughput_svg( + path: Path, + *, + scenario_name: str, + labels: list[str], + values: list[float], + colors: list[str], + y_axis_label: str, +) -> None: + """Bar chart + value markers; standalone SVG.""" + n = len(labels) + ml, mr, mt = 72, 44, 72 + ih = 300 + mb = max(100, 36 + n * 18) + h = mt + ih + mb + w = max(720, min(1280, ml + mr + max(1, n) * 92)) + + iw = w - ml - mr + y0 = mt + ih + vmin, vmax = 0.0, max(values) * 1.12 if values else 1.0 + if vmax <= vmin: + vmax = vmin + 1.0 + + def ypx(v: float) -> float: + return y0 - (v - vmin) / (vmax - vmin) * ih + + slot = iw / max(n, 1) + bar_w = min(56.0, slot * 0.55) + centers = [ml + (i + 0.5) * slot for i in range(n)] + pts = [(cx, ypx(v)) for cx, v in zip(centers, values, strict=True)] + + parts: list[str] = [ + '', + f'', + '', + f'{html.escape(scenario_name)}', + f'{html.escape(y_axis_label)}', + f'', + f'', + ] + + for g in (0.25, 0.5, 0.75): + gy = y0 - g * ih + parts.append(f'') + gv = vmin + g * (vmax - vmin) + parts.append( + f'{gv:.1f}' + ) + + for cx, val, col, _ in zip(centers, values, colors, labels, strict=True): + top = ypx(val) + x1 = cx - bar_w / 2 + hbar = y0 - top + parts.append( + f'' + ) + + for (cx, cy), val, col, lab in zip(pts, values, colors, labels, strict=True): + col_esc = html.escape(col) + parts.append( + f'' + ) + parts.append( + f'{val:.2f}' + ) + parts.append( + f'' + f"{html.escape(lab)}" + ) + + y_axis_mid = mt + ih / 2 + parts.append( + f'{html.escape(y_axis_label)}' + ) + + leg_y = y0 + 38 + parts.append(f'Summary') + for i, (lab, val, col) in enumerate(zip(labels, values, colors, strict=True)): + parts.append( + f'' + f'{html.escape(lab)}' + f": {val:.4f} GB/s" + ) + + parts.append("") + path.write_text("\n".join(parts), encoding="utf-8") + + +class MoEBenchmarkThroughputReporter(Reporter): + """After the scenario finishes, write one standalone SVG chart under the results root.""" + + def generate(self) -> None: + self.load_test_runs() + moe_trs = [tr for tr in self.trs if isinstance(tr.test, MoEBenchmarkTestDefinition)] + if not moe_trs: + logging.debug("Skipping moe_benchmark_throughput: no MoEBenchmark test in scenario.") + return + + categories: list[str] = [] + values: list[float] = [] + colors: list[str] = [] + + moe_bars = _moe_benchmark_dispatch_combine_bars(moe_trs[0].output_path) + if not moe_bars: + logging.warning( + "Skipping moe_benchmark_throughput: no dispatch/combine bus_bw_avg in results.json under %s", + moe_trs[0].output_path, + ) + return + for lab, val, col in moe_bars: + categories.append(lab) + values.append(val) + colors.append(col) + + ucc_trs = [tr for tr in self.trs if isinstance(tr.test, UCCTestDefinition)] + if ucc_trs: + uval = _mean_ucc_bus_bw_gb_s(ucc_trs[0].output_path) + if uval is not None: + categories.append("UCC") + values.append(uval) + colors.append("#1f77b4") + else: + logging.debug("UCC test present but bus bandwidth not parsed from outputs.") + + nccl_trs = [tr for tr in self.trs if isinstance(tr.test, NCCLTestDefinition)] + if nccl_trs: + nval = _mean_nccl_oop_busbw_gb_s(nccl_trs[0].output_path) + if nval is not None: + categories.append("NCCL") + values.append(nval) + colors.append("#ff7f0e") + else: + logging.debug("NCCL test present but perf table not parsed from stdout.") + + out = self.results_root / f"{self.test_scenario.name}-moe-throughput.svg" + _write_moe_throughput_svg( + out, + scenario_name=self.test_scenario.name, + labels=categories, + values=values, + colors=colors, + y_axis_label="Mean bus bandwidth (GB/s)", + ) diff --git a/src/cloudai/workloads/nccl_test/nccl.py b/src/cloudai/workloads/nccl_test/nccl.py index c0381c5b0..2968cf2af 100644 --- a/src/cloudai/workloads/nccl_test/nccl.py +++ b/src/cloudai/workloads/nccl_test/nccl.py @@ -29,6 +29,7 @@ class NCCLCmdArgs(CmdArgs): "all_reduce_perf_mpi", "all_gather_perf_mpi", "alltoall_perf_mpi", + "alltoallv_perf_mpi", "broadcast_perf_mpi", "gather_perf_mpi", "hypercube_perf_mpi", @@ -41,6 +42,7 @@ class NCCLCmdArgs(CmdArgs): "all_reduce_perf", "all_gather_perf", "alltoall_perf", + "alltoallv_perf", "broadcast_perf", "gather_perf", "hypercube_perf", @@ -55,6 +57,7 @@ class NCCLCmdArgs(CmdArgs): "all_reduce_perf_mpi", "all_gather_perf_mpi", "alltoall_perf_mpi", + "alltoallv_perf_mpi", "broadcast_perf_mpi", "gather_perf_mpi", "hypercube_perf_mpi", @@ -67,6 +70,7 @@ class NCCLCmdArgs(CmdArgs): "all_reduce_perf", "all_gather_perf", "alltoall_perf", + "alltoallv_perf", "broadcast_perf", "gather_perf", "hypercube_perf", @@ -97,6 +101,8 @@ class NCCLCmdArgs(CmdArgs): blocking: Union[int, list[int]] = 0 cudagraph: Union[int, list[int]] = 0 stepfactor: Optional[Union[int, list[int]]] = None + use_deepep_matrix: bool = False + alltoallv_matrix_container_path: str = "/tmp/traffic_matrix.txt" class NCCLTestDefinition(TestDefinition): diff --git a/src/cloudai/workloads/nccl_test/slurm_command_gen_strategy.py b/src/cloudai/workloads/nccl_test/slurm_command_gen_strategy.py index 1295187e0..1844d2f65 100644 --- a/src/cloudai/workloads/nccl_test/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/nccl_test/slurm_command_gen_strategy.py @@ -14,18 +14,77 @@ # See the License for the specific language governing permissions and # limitations under the License. +from pathlib import Path from typing import List, cast from cloudai.systems.slurm import SlurmCommandGenStrategy +from cloudai.workloads.moe_benchmark.combined_report import MOE_BENCHMARK_PREV_MOUNT, moe_benchmark_root -from .nccl import NCCLTestDefinition +from .nccl import NCCLCmdArgs, NCCLTestDefinition + +_ALLTOALLV_MATRIX_ENV = "ALLTOALLV_MATRIX_FILE" +_NCCL_TESTS_ALLTOALLV_PERF = "/opt/nccl-tests/build/alltoallv_perf" + + +def _nccl_cmd_scalar(value: object) -> object: + if isinstance(value, list): + return value[0] if value else value + return value + + +def _nccl_matrix_path_under_deepep_output(dep_out: Path) -> Path | None: + """DeepEP writes nccl_matrix.txt under the dependency test output or a timestamped benchmark subdir.""" + direct = dep_out / "nccl_matrix.txt" + if direct.is_file(): + return direct + nested = sorted( + dep_out.glob("**/nccl_matrix.txt"), + key=lambda p: p.stat().st_mtime, + reverse=True, + ) + return nested[0] if nested else None class NcclTestSlurmCommandGenStrategy(SlurmCommandGenStrategy): """Command generation strategy for NCCL tests on Slurm systems.""" + def _deepep_nccl_matrix_host_path(self) -> Path | None: + tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test) + if not tdef.cmd_args.use_deepep_matrix: + return None + root = moe_benchmark_root(self.test_run) + if root is None: + return None + return _nccl_matrix_path_under_deepep_output(root) + def _container_mounts(self) -> List[str]: - return [] + tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test) + if not tdef.cmd_args.use_deepep_matrix: + return [] + + matrix_host = self._deepep_nccl_matrix_host_path() + if matrix_host is None: + return [] + + dest = tdef.cmd_args.alltoallv_matrix_container_path + mounts: List[str] = [f"{matrix_host.resolve()}:{dest}"] + + dr = moe_benchmark_root(self.test_run) + if dr is not None: + mounts.append(f"{dr.resolve()}:{MOE_BENCHMARK_PREV_MOUNT}:ro") + return mounts + + @property + def final_env_vars(self) -> dict[str, str | list[str]]: + env_vars = dict(super().final_env_vars) + tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test) + if tdef.cmd_args.use_deepep_matrix and self._deepep_nccl_matrix_host_path() is not None: + env_vars[_ALLTOALLV_MATRIX_ENV] = tdef.cmd_args.alltoallv_matrix_container_path + return env_vars + + @final_env_vars.setter + def final_env_vars(self, value: dict[str, str | list[str]]) -> None: + super().final_env_vars = value def image_path(self) -> str | None: tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test) @@ -33,10 +92,35 @@ def image_path(self) -> str | None: def generate_test_command(self) -> List[str]: tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test) + if tdef.cmd_args.subtest_name == "alltoallv_perf_mpi": + a = tdef.cmd_args + parts: List[str] = [ + _NCCL_TESTS_ALLTOALLV_PERF, + "-b", + str(_nccl_cmd_scalar(a.minbytes)), + "-e", + str(_nccl_cmd_scalar(a.maxbytes)), + "-g", + str(_nccl_cmd_scalar(a.ngpus)), + "-w", + str(_nccl_cmd_scalar(a.warmup_iters)), + "-n", + str(_nccl_cmd_scalar(a.iters)), + ] + if self.test_run.test.extra_cmd_args: + parts.append(self.test_run.test.extra_args_str) + return parts + srun_command_parts = [f"{tdef.cmd_args.subtest_name}"] + skip_cli = { + "docker_image_url", + "subtest_name", + "use_deepep_matrix", + "alltoallv_matrix_container_path", + } nccl_test_args = tdef.cmd_args.model_dump().keys() for arg in nccl_test_args: - if arg in {"docker_image_url", "subtest_name"}: + if arg in skip_cli: continue value = getattr(tdef.cmd_args, arg) diff --git a/src/cloudai/workloads/ucc_test/slurm_command_gen_strategy.py b/src/cloudai/workloads/ucc_test/slurm_command_gen_strategy.py index 80e1a34c7..3bd941c9e 100644 --- a/src/cloudai/workloads/ucc_test/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/ucc_test/slurm_command_gen_strategy.py @@ -14,18 +14,59 @@ # See the License for the specific language governing permissions and # limitations under the License. +from pathlib import Path from typing import List, cast from cloudai.systems.slurm import SlurmCommandGenStrategy +from cloudai.workloads.moe_benchmark.combined_report import MOE_BENCHMARK_PREV_MOUNT, moe_benchmark_root from .ucc import UCCCmdArgs, UCCTestDefinition +_UCC_GEN_MATRIX_CONTAINER = "/opt/hpcx/ucc/tools/perf/generator/input_matrices.txt" + + +def _ucc_matrix_path_under_deepep_output(dep_out: Path) -> Path | None: + """DeepEP writes ucc_matrix.txt under a timestamped benchmark subdir; resolve either layout.""" + direct = dep_out / "ucc_matrix.txt" + if direct.is_file(): + return direct + nested = sorted( + dep_out.glob("**/ucc_matrix.txt"), + key=lambda p: p.stat().st_mtime, + reverse=True, + ) + return nested[0] if nested else None + class UCCTestSlurmCommandGenStrategy(SlurmCommandGenStrategy): """Command generation strategy for UCC tests on Slurm systems.""" + def _deepep_ucc_matrix_host_path(self) -> Path | None: + tdef: UCCTestDefinition = cast(UCCTestDefinition, self.test_run.test) + if not tdef.cmd_args.use_deepep_matrix: + return None + dep_out = moe_benchmark_root(self.test_run) + if dep_out is None: + return None + return _ucc_matrix_path_under_deepep_output(dep_out) + def _container_mounts(self) -> List[str]: - return [] + tdef: UCCTestDefinition = cast(UCCTestDefinition, self.test_run.test) + if not tdef.cmd_args.use_deepep_matrix: + return [] + + deepep_root = moe_benchmark_root(self.test_run) + if deepep_root is None: + return [] + + matrix_host = self._deepep_ucc_matrix_host_path() + if matrix_host is None: + return [] + + return [ + f"{matrix_host.resolve()}:{_UCC_GEN_MATRIX_CONTAINER}", + f"{deepep_root.resolve()}:{MOE_BENCHMARK_PREV_MOUNT}:ro", + ] def image_path(self) -> str | None: tdef: UCCTestDefinition = cast(UCCTestDefinition, self.test_run.test) @@ -41,6 +82,8 @@ def generate_test_command(self) -> List[str]: srun_command_parts.append(f"-e {tdef_cmd_args.e}") if tdef_cmd_args.gen is not None: srun_command_parts.append(f"--gen {tdef_cmd_args.gen}") + elif self._deepep_ucc_matrix_host_path() is not None: + srun_command_parts.append(f"--gen file:name={_UCC_GEN_MATRIX_CONTAINER}") srun_command_parts.append("-m cuda") srun_command_parts.append("-F") diff --git a/src/cloudai/workloads/ucc_test/ucc.py b/src/cloudai/workloads/ucc_test/ucc.py index 982387937..35c723a4c 100644 --- a/src/cloudai/workloads/ucc_test/ucc.py +++ b/src/cloudai/workloads/ucc_test/ucc.py @@ -69,6 +69,7 @@ class UCCCmdArgs(CmdArgs): b: Union[int, list[int]] = 1 e: Union[str, list[str]] = "8M" gen: Union[str, list[str], None] = None + use_deepep_matrix: bool = False class UCCTestDefinition(TestDefinition): diff --git a/tests/ref_data/deepep-benchmark.sbatch b/tests/ref_data/deepep-benchmark.sbatch index f3eb086e2..c5e15e9e6 100644 --- a/tests/ref_data/deepep-benchmark.sbatch +++ b/tests/ref_data/deepep-benchmark.sbatch @@ -7,6 +7,7 @@ #SBATCH -N 2 #SBATCH --gpus-per-node=8 #SBATCH --gres=gpu:8 +#SBATCH --ntasks-per-node=1 export SLURM_JOB_MASTER_NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1) @@ -19,9 +20,12 @@ echo Nodes: $SLURM_JOB_NODELIST echo Num Nodes: ${#nodes[@]} echo Head Node IP: $head_node_ip +export MASTER_ADDR=$head_node_ip +export MASTER_PORT=29500 -srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__OUTPUT_DIR__/output:__OUTPUT_DIR__/output,__OUTPUT_DIR__/output:/workspace/dp-benchmark/results --output=__OUTPUT_DIR__/output/mapping-stdout.txt --error=__OUTPUT_DIR__/output/mapping-stderr.txt bash -c "echo \$(date): \$(hostname):node \${SLURM_NODEID}:rank \${SLURM_PROCID}." -srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__OUTPUT_DIR__/output:__OUTPUT_DIR__/output,__OUTPUT_DIR__/output:/workspace/dp-benchmark/results --ntasks=2 --ntasks-per-node=1 --output=__OUTPUT_DIR__/output/metadata/node-%N.toml --error=__OUTPUT_DIR__/output/metadata/nodes.err bash /cloudai_install/slurm-metadata.sh +srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output --output=__OUTPUT_DIR__/output/mapping-stdout.txt --error=__OUTPUT_DIR__/output/mapping-stderr.txt bash -c "echo \$(date): \$(hostname):node \${SLURM_NODEID}:rank \${SLURM_PROCID}." -srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__OUTPUT_DIR__/output:__OUTPUT_DIR__/output,__OUTPUT_DIR__/output:/workspace/dp-benchmark/results bash -c "source __OUTPUT_DIR__/output/env_vars.sh; torchrun --nnodes=2 --nproc_per_node=1 --rdzv_id=$RANDOM --rdzv_backend=c10d --rdzv_endpoint=$head_node_ip:29500 /workspace/dp-benchmark/benchmark/benchmark.py __OUTPUT_DIR__/output/config.yaml" +srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output --ntasks=2 --ntasks-per-node=1 --output=__OUTPUT_DIR__/output/metadata/node-%N.toml --error=__OUTPUT_DIR__/output/metadata/nodes.err bash /cloudai_install/slurm-metadata.sh + +srun --export=ALL --mpi=pmix --nodes=2 --ntasks=2 --ntasks-per-node=1 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output bash -c "source __OUTPUT_DIR__/output/env_vars.sh; torchrun --nnodes=2 --nproc_per_node=1 --rdzv_id=\${SLURM_JOB_ID:-0} --rdzv_backend=c10d --rdzv_endpoint=\${MASTER_ADDR}:\${MASTER_PORT} /workspace/DeepEP/tests/legacy/test_internode.py --num-processes 8 --num-tokens 4096 --hidden 7168 --num-topk 8 --pressure-test-mode 0 --num-experts 256" diff --git a/tests/ref_data/moe-benchmark.sbatch b/tests/ref_data/moe-benchmark.sbatch new file mode 100644 index 000000000..e44c4a27c --- /dev/null +++ b/tests/ref_data/moe-benchmark.sbatch @@ -0,0 +1,31 @@ +#!/bin/bash +# generated by CloudAI@__CLOUDAI_VERSION__ +#SBATCH --job-name=__JOB_NAME__ +#SBATCH --output=__OUTPUT_DIR__/output/stdout.txt +#SBATCH --error=__OUTPUT_DIR__/output/stderr.txt +#SBATCH --partition=main +#SBATCH -N 2 +#SBATCH --gpus-per-node=8 +#SBATCH --gres=gpu:8 +#SBATCH --ntasks-per-node=8 + +export SLURM_JOB_MASTER_NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1) + +nodes=( $( scontrol show hostnames $SLURM_JOB_NODELIST ) ) +nodes_array=($nodes) +head_node=${nodes_array[0]} +head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address) + +echo Nodes: $SLURM_JOB_NODELIST +echo Num Nodes: ${#nodes[@]} +echo Head Node IP: $head_node_ip + +export MASTER_ADDR=$head_node_ip +export MASTER_PORT=29500 + + +srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__OUTPUT_DIR__/output/config.yaml:/tmp/config.yaml,__OUTPUT_DIR__/output:/workspace/dp-benchmark/results --output=__OUTPUT_DIR__/output/mapping-stdout.txt --error=__OUTPUT_DIR__/output/mapping-stderr.txt bash -c "echo \$(date): \$(hostname):node \${SLURM_NODEID}:rank \${SLURM_PROCID}." + +srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__OUTPUT_DIR__/output/config.yaml:/tmp/config.yaml,__OUTPUT_DIR__/output:/workspace/dp-benchmark/results --ntasks=2 --ntasks-per-node=1 --output=__OUTPUT_DIR__/output/metadata/node-%N.toml --error=__OUTPUT_DIR__/output/metadata/nodes.err bash /cloudai_install/slurm-metadata.sh + +srun --export=ALL --mpi=pmix -N2 --container-image=docker/image:url --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__OUTPUT_DIR__/output/config.yaml:/tmp/config.yaml,__OUTPUT_DIR__/output:/workspace/dp-benchmark/results bash -c "source __OUTPUT_DIR__/output/env_vars.sh; python /workspace/dp-benchmark/benchmark/benchmark.py /tmp/config.yaml" diff --git a/tests/test_acceptance.py b/tests/test_acceptance.py index 151c6fb9e..797d64e3a 100644 --- a/tests/test_acceptance.py +++ b/tests/test_acceptance.py @@ -46,6 +46,10 @@ DeepEPCmdArgs, DeepEPTestDefinition, ) +from cloudai.workloads.moe_benchmark import ( + MoEBenchmarkCmdArgs, + MoEBenchmarkTestDefinition +) from cloudai.workloads.jax_toolbox import ( GPTCmdArgs, GPTTestDefinition, @@ -273,6 +277,7 @@ def build_special_test_run( "ai-dynamo", "nixl-perftest", "nixl-kvbench", + "moe-benchmark", "deepep-benchmark", "osu-bench", "sglang", @@ -550,15 +555,28 @@ def test_req(request, slurm_system: SlurmSystem, partial_tr: partial[TestRun]) - ), ), ), + "moe-benchmark": lambda: create_test_run( + partial_tr, + "moe-benchmark", + MoEBenchmarkTestDefinition( + name="moe-benchmark", + description="MoE Benchmark", + test_template_name="MoEBenchmark", + cmd_args=MoEBenchmarkCmdArgs( + docker_image_url="docker/image:url", + ), + ), + ), "deepep-benchmark": lambda: create_test_run( partial_tr, "deepep-benchmark", DeepEPTestDefinition( name="deepep-benchmark", - description="DeepEP MoE Benchmark", - test_template_name="deepep-benchmark", + description="DeepEP internode test", + test_template_name="DeepEP", cmd_args=DeepEPCmdArgs( docker_image_url="docker/image:url", + subtest_name="test_internode", ), ), ), @@ -698,7 +716,7 @@ def test_req(request, slurm_system: SlurmSystem, partial_tr: partial[TestRun]) - tr.num_nodes = 3 if request.param == "ai-dynamo": tr.num_nodes = 2 - if request.param == "deepep-benchmark": + if request.param in {"moe-benchmark", "deepep-benchmark"}: tr.num_nodes = 2 if request.param in {"sglang-disagg-2nodes", "vllm-disagg-2nodes"}: tr.num_nodes = 2 diff --git a/tests/test_init.py b/tests/test_init.py index 5aa2cd0ff..26aa4da71 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -58,6 +58,11 @@ MegatronBridgeTestDefinition, ) from cloudai.workloads.megatron_run import MegatronRunSlurmCommandGenStrategy, MegatronRunTestDefinition +from cloudai.workloads.moe_benchmark import ( + MoEBenchmarkSlurmCommandGenStrategy, + MoEBenchmarkTestDefinition, + MoEBenchmarkThroughputReporter, +) from cloudai.workloads.nccl_test import ( NcclComparisonReport, NCCLTestDefinition, @@ -133,6 +138,7 @@ def test_runners(): CMD_GEN_STRATEGIES = { (SlurmSystem, ChakraReplayTestDefinition): ChakraReplaySlurmCommandGenStrategy, (SlurmSystem, DeepEPTestDefinition): DeepEPSlurmCommandGenStrategy, + (SlurmSystem, MoEBenchmarkTestDefinition): MoEBenchmarkSlurmCommandGenStrategy, (SlurmSystem, GPTTestDefinition): JaxToolboxSlurmCommandGenStrategy, (SlurmSystem, GrokTestDefinition): JaxToolboxSlurmCommandGenStrategy, (SlurmSystem, NCCLTestDefinition): NcclTestSlurmCommandGenStrategy, @@ -232,13 +238,14 @@ def test_installers(): def test_definitions(): test_defs = Registry().test_definitions_map - assert len(test_defs) == 26 + assert len(test_defs) == 27 for tdef in [ ("UCCTest", UCCTestDefinition), ("DDLBTest", DDLBTestDefinition), ("NcclTest", NCCLTestDefinition), ("ChakraReplay", ChakraReplayTestDefinition), ("DeepEP", DeepEPTestDefinition), + ("MoEBenchmark", MoEBenchmarkTestDefinition), ("Sleep", SleepTestDefinition), ("NeMoLauncher", NeMoLauncherTestDefinition), ("NeMoRun", NeMoRunTestDefinition), @@ -268,6 +275,7 @@ def test_scenario_reports(): scenario_reports = Registry().scenario_reports assert list(scenario_reports.keys()) == [ "per_test", + "moe_benchmark_throughput", "status", "dse", "tarball", @@ -277,6 +285,7 @@ def test_scenario_reports(): ] assert list(scenario_reports.values()) == [ PerTestReporter, + MoEBenchmarkThroughputReporter, StatusReporter, DSEReporter, TarballReporter, @@ -290,6 +299,7 @@ def test_report_configs(): configs = Registry().report_configs assert list(configs.keys()) == [ "per_test", + "moe_benchmark_throughput", "status", "dse", "tarball", diff --git a/tests/test_test_scenario.py b/tests/test_test_scenario.py index 8b390e8a1..9da396b8a 100644 --- a/tests/test_test_scenario.py +++ b/tests/test_test_scenario.py @@ -40,10 +40,6 @@ from cloudai.workloads.ai_dynamo import AIDynamoReportGenerationStrategy, AIDynamoTestDefinition from cloudai.workloads.aiconfig import AiconfiguratorReportGenerationStrategy, AiconfiguratorTestDefinition from cloudai.workloads.chakra_replay import ChakraReplayReportGenerationStrategy, ChakraReplayTestDefinition -from cloudai.workloads.deepep import ( - DeepEPReportGenerationStrategy, - DeepEPTestDefinition, -) from cloudai.workloads.dynamo_mocker import DynamoMockerReportGenerationStrategy, DynamoMockerTestDefinition from cloudai.workloads.jax_toolbox import ( GPTTestDefinition, @@ -58,6 +54,7 @@ MegatronRunReportGenerationStrategy, MegatronRunTestDefinition, ) +from cloudai.workloads.moe_benchmark import MoEBenchmarkReportGenerationStrategy, MoEBenchmarkTestDefinition from cloudai.workloads.nccl_test import ( NCCLCmdArgs, NCCLTestDefinition, @@ -687,7 +684,7 @@ def test_default_reporters_size(self): "tdef,expected_reporters", [ (ChakraReplayTestDefinition, {ChakraReplayReportGenerationStrategy}), - (DeepEPTestDefinition, {DeepEPReportGenerationStrategy}), + (MoEBenchmarkTestDefinition, {MoEBenchmarkReportGenerationStrategy}), (GPTTestDefinition, {JaxToolboxReportGenerationStrategy}), (GrokTestDefinition, {JaxToolboxReportGenerationStrategy}), (