From 7594c197c7346dca35f70404c08eabbe73d17e67 Mon Sep 17 00:00:00 2001
From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com>
Date: Wed, 23 Oct 2024 08:10:09 -0400
Subject: [PATCH] Generate plugin commands

---
 src/cloudai/_core/command_gen_strategy.py     | 26 ++++++
 src/cloudai/_core/test_template.py            | 34 +++++++
 .../nccl_test/slurm_command_gen_strategy.py   |  5 +
 .../strategy/slurm_command_gen_strategy.py    | 92 ++++++++++++++++++-
 4 files changed, 155 insertions(+), 2 deletions(-)

diff --git a/src/cloudai/_core/command_gen_strategy.py b/src/cloudai/_core/command_gen_strategy.py
index 16bd04f9..9c8bb389 100644
--- a/src/cloudai/_core/command_gen_strategy.py
+++ b/src/cloudai/_core/command_gen_strategy.py
@@ -39,3 +39,29 @@ def gen_exec_command(self, tr: TestRun) -> str:
             str: The generated execution command.
         """
         pass
+
+    @abstractmethod
+    def gen_srun_command(self, tr: TestRun) -> str:
+        """
+        Generate the Slurm srun command for a test based on the given parameters.
+
+        Args:
+            tr (TestRun): Contains the test and its run-specific configurations.
+
+        Returns:
+            str: The generated Slurm srun command.
+        """
+        pass
+
+    @abstractmethod
+    def gen_srun_success_check(self, tr: TestRun) -> str:
+        """
+        Generate the Slurm success check command to verify if a test run was successful.
+
+        Args:
+            tr (TestRun): Contains the test and its run-specific configurations.
+
+        Returns:
+            str: The generated command to check the success of the test run.
+        """
+        pass
diff --git a/src/cloudai/_core/test_template.py b/src/cloudai/_core/test_template.py
index c0227d3b..0b90b737 100644
--- a/src/cloudai/_core/test_template.py
+++ b/src/cloudai/_core/test_template.py
@@ -133,6 +133,40 @@ def gen_exec_command(self, tr: TestRun) -> str:
             )
         return self.command_gen_strategy.gen_exec_command(tr)
 
+    def gen_srun_command(self, tr: TestRun) -> str:
+        """
+        Generate a Slurm srun command for a test using the provided command generation strategy.
+
+        Args:
+            tr (TestRun): Contains the test and its run-specific configurations.
+
+        Returns:
+            str: The generated Slurm srun command.
+        """
+        if self.command_gen_strategy is None:
+            raise ValueError(
+                "command_gen_strategy is missing. Ensure the strategy is registered in the Registry "
+                "by calling the appropriate registration function for the system type."
+            )
+        return self.command_gen_strategy.gen_srun_command(tr)
+
+    def gen_srun_success_check(self, tr: TestRun) -> str:
+        """
+        Generate a Slurm success check command for a test using the provided command generation strategy.
+
+        Args:
+            tr (TestRun): Contains the test and its run-specific configurations.
+
+        Returns:
+            str: The generated command to check the success of the test run.
+        """
+        if self.command_gen_strategy is None:
+            raise ValueError(
+                "command_gen_strategy is missing. Ensure the strategy is registered in the Registry "
+                "by calling the appropriate registration function for the system type."
+            )
+        return self.command_gen_strategy.gen_srun_success_check(tr)
+
     def gen_json(self, tr: TestRun) -> Dict[Any, Any]:
         """
         Generate a JSON string representing the Kubernetes job specification for this test using this template.
diff --git a/src/cloudai/schema/test_template/nccl_test/slurm_command_gen_strategy.py b/src/cloudai/schema/test_template/nccl_test/slurm_command_gen_strategy.py
index e982c28a..10885b5a 100644
--- a/src/cloudai/schema/test_template/nccl_test/slurm_command_gen_strategy.py
+++ b/src/cloudai/schema/test_template/nccl_test/slurm_command_gen_strategy.py
@@ -17,6 +17,7 @@
 from pathlib import Path
 from typing import Any, Dict, List
 
+from cloudai import TestRun
 from cloudai.systems.slurm.strategy import SlurmCommandGenStrategy
 
 from .slurm_install_strategy import NcclTestSlurmInstallStrategy
@@ -83,3 +84,7 @@ def generate_test_command(
             srun_command_parts.append(extra_cmd_args)
 
         return srun_command_parts
+
+    def gen_srun_success_check(self, tr: TestRun) -> str:
+        output_file = Path(tr.output_path) / "stdout.txt"
+        return f'grep -q "Avg bus bandwidth" {output_file} && echo 1 || echo 0'  # prints 1 on success, else 0
diff --git a/src/cloudai/systems/slurm/strategy/slurm_command_gen_strategy.py b/src/cloudai/systems/slurm/strategy/slurm_command_gen_strategy.py
index 3b7a0649..542b169f 100644
--- a/src/cloudai/systems/slurm/strategy/slurm_command_gen_strategy.py
+++ b/src/cloudai/systems/slurm/strategy/slurm_command_gen_strategy.py
@@ -18,7 +18,7 @@
 from pathlib import Path
 from typing import Any, Dict, List
 
-from cloudai import CommandGenStrategy, TestRun
+from cloudai import CommandGenStrategy, TestRun, TestScenario
 from cloudai.systems import SlurmSystem
 from cloudai.util.docker_image_cache_manager import DockerImageCacheManager
 
@@ -63,8 +63,30 @@ def gen_exec_command(self, tr: TestRun) -> str:
         slurm_args = self._parse_slurm_args(
             tr.test.test_template.__class__.__name__, env_vars, cmd_args, tr.num_nodes, tr.nodes
         )
+
+        prologue_command = self.gen_prologue(tr.prologue, tr.output_path) if tr.prologue else "PROLOGUE_SUCCESS=1\n"
         srun_command = self._gen_srun_command(slurm_args, env_vars, cmd_args, tr.test.extra_cmd_args)
-        return self._write_sbatch_script(slurm_args, env_vars, srun_command, tr.output_path)
+        epilogue_command = self.gen_epilogue(tr.epilogue, tr.output_path) if tr.epilogue else ""
+
+        full_command = "\n".join(
+            [
+                prologue_command,
+                "if [ $PROLOGUE_SUCCESS -eq 1 ]; then",
+                f" {srun_command}",
+                f" {epilogue_command}",
+                "fi",
+            ]
+        ).strip()
+
+        return self._write_sbatch_script(slurm_args, env_vars, full_command, tr.output_path)
+
+    def gen_srun_command(self, tr: TestRun) -> str:
+        env_vars = self._override_env_vars(self.system.global_env_vars, tr.test.extra_env_vars)
+        cmd_args = self._override_cmd_args(self.default_cmd_args, tr.test.cmd_args)
+        slurm_args = self._parse_slurm_args(
+            tr.test.test_template.__class__.__name__, env_vars, cmd_args, tr.num_nodes, tr.nodes
+        )
+        return self._gen_srun_command(slurm_args, env_vars, cmd_args, tr.test.extra_cmd_args)
 
     def _parse_slurm_args(
         self,
@@ -112,6 +134,72 @@ def job_name(self, job_name_prefix: str) -> str:
         job_name = f"{self.system.account}-{job_name_prefix}.{datetime.now().strftime('%Y%m%d_%H%M%S')}"
         return job_name
 
+    def gen_prologue(self, prologue: TestScenario, base_output_path: Path) -> str:
+        """
+        Generate the prologue command by running all tests defined in the prologue test scenario.
+
+        Args:
+            prologue (TestScenario): The prologue test scenario containing the tests to be run.
+            base_output_path (Path): The base output directory path for storing prologue outputs.
+
+        Returns:
+            str: Shell lines that run each prologue test and set PROLOGUE_SUCCESS to 1 or 0.
+        """
+        if not prologue.test_runs:
+            return "PROLOGUE_SUCCESS=1\n"
+
+        prologue_output_dir = base_output_path / "prologue"
+        prologue_output_dir.mkdir(parents=True, exist_ok=True)
+
+        prologue_commands = []
+        success_vars = []
+
+        for idx, tr in enumerate(prologue.test_runs):
+            plugin_dir = prologue_output_dir / tr.test.name
+            plugin_dir.mkdir(parents=True, exist_ok=True)
+            tr.output_path = plugin_dir
+
+            srun_command = tr.test.test_template.gen_srun_command(tr)
+            prologue_commands.append(srun_command)
+
+            success_var = f"SUCCESS_{idx}"
+            success_vars.append(success_var)
+
+            success_check_command = tr.test.test_template.gen_srun_success_check(tr)
+            prologue_commands.append(f"{success_var}=$({success_check_command})")
+
+        combined_success_var = " && ".join(f"[ ${var} -eq 1 ]" for var in success_vars)
+        return "\n".join(prologue_commands) + f"\nPROLOGUE_SUCCESS=$( {combined_success_var} && echo 1 || echo 0 )"
+
+    def gen_epilogue(self, epilogue: TestScenario, base_output_path: Path) -> str:
+        """
+        Generate the epilogue command by running all tests defined in the epilogue test scenario.
+
+        Args:
+            epilogue (TestScenario): The epilogue test scenario containing the tests to be run.
+            base_output_path (Path): The base output directory path for storing epilogue outputs.
+
+        Returns:
+            str: A string with all the Slurm srun commands generated for the epilogue.
+        """
+        if not epilogue.test_runs:
+            return ""
+
+        epilogue_output_dir = base_output_path / "epilogue"
+        epilogue_output_dir.mkdir(parents=True, exist_ok=True)
+
+        epilogue_commands = []
+
+        for tr in epilogue.test_runs:
+            plugin_dir = epilogue_output_dir / tr.test.name
+            plugin_dir.mkdir(parents=True, exist_ok=True)
+            tr.output_path = plugin_dir
+
+            srun_command = tr.test.test_template.gen_srun_command(tr)
+            epilogue_commands.append(srun_command)
+
+        return "\n".join(epilogue_commands)
+
     def _gen_srun_command(
         self, slurm_args: Dict[str, Any], env_vars: Dict[str, str], cmd_args: Dict[str, str], extra_cmd_args: str
     ) -> str: