Skip to content

Commit

Permalink
Generate plugin commands
Browse files Browse the repository at this point in the history
  • Loading branch information
TaekyungHeo committed Oct 24, 2024
1 parent 2b8b8f1 commit 7594c19
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 2 deletions.
26 changes: 26 additions & 0 deletions src/cloudai/_core/command_gen_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,29 @@ def gen_exec_command(self, tr: TestRun) -> str:
str: The generated execution command.
"""
pass

@abstractmethod
def gen_srun_command(self, tr: TestRun) -> str:
    """
    Build the Slurm srun command line for a single test run.

    This is an abstract hook; the base implementation does nothing and returns None.

    Args:
        tr (TestRun): Contains the test and its run-specific configurations.

    Returns:
        str: The generated Slurm srun command.
    """
    ...

@abstractmethod
def gen_srun_success_check(self, tr: TestRun) -> str:
    """
    Build the command that verifies whether a Slurm test run succeeded.

    This is an abstract hook; the base implementation does nothing and returns None.

    Args:
        tr (TestRun): Contains the test and its run-specific configurations.

    Returns:
        str: The generated command to check the success of the test run.
    """
    ...
34 changes: 34 additions & 0 deletions src/cloudai/_core/test_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,40 @@ def gen_exec_command(self, tr: TestRun) -> str:
)
return self.command_gen_strategy.gen_exec_command(tr)

def gen_srun_command(self, tr: TestRun) -> str:
    """
    Produce the Slurm srun command for a test run by delegating to the command generation strategy.

    Args:
        tr (TestRun): Contains the test and its run-specific configurations.

    Returns:
        str: The generated Slurm srun command.

    Raises:
        ValueError: If no command generation strategy is set on this template.
    """
    strategy = self.command_gen_strategy
    if strategy is not None:
        return strategy.gen_srun_command(tr)

    # No strategy means there is nothing to delegate to.
    raise ValueError(
        "command_gen_strategy is missing. Ensure the strategy is registered in the Registry "
        "by calling the appropriate registration function for the system type."
    )

def gen_srun_success_check(self, tr: TestRun) -> str:
    """
    Produce the Slurm success check command for a test run by delegating to the command generation strategy.

    Args:
        tr (TestRun): Contains the test and its run-specific configurations.

    Returns:
        str: The generated command to check the success of the test run.

    Raises:
        ValueError: If no command generation strategy is set on this template.
    """
    strategy = self.command_gen_strategy
    if strategy is not None:
        return strategy.gen_srun_success_check(tr)

    # No strategy means there is nothing to delegate to.
    raise ValueError(
        "command_gen_strategy is missing. Ensure the strategy is registered in the Registry "
        "by calling the appropriate registration function for the system type."
    )

def gen_json(self, tr: TestRun) -> Dict[Any, Any]:
"""
Generate a JSON string representing the Kubernetes job specification for this test using this template.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from pathlib import Path
from typing import Any, Dict, List

from cloudai import TestRun
from cloudai.systems.slurm.strategy import SlurmCommandGenStrategy

from .slurm_install_strategy import NcclTestSlurmInstallStrategy
Expand Down Expand Up @@ -83,3 +84,7 @@ def generate_test_command(
srun_command_parts.append(extra_cmd_args)

return srun_command_parts

def generate_slurm_success_check(self, tr: TestRun) -> str:
    """
    Build the shell command that checks whether an NCCL test run produced its result line.

    The command greps the run's ``stdout.txt`` for "Avg bus bandwidth" and reports
    the outcome through its exit status (``grep -q`` prints nothing).

    Args:
        tr (TestRun): Test run whose ``output_path`` holds ``stdout.txt``.

    Returns:
        str: A ``grep -q`` command over the run's stdout file.
    """
    import shlex

    output_file = Path(tr.output_path) / "stdout.txt"
    # Quote the path so directories with spaces or shell metacharacters stay one argument.
    return f'grep -q "Avg bus bandwidth" {shlex.quote(str(output_file))}'

# The command-generation interface and its callers use the name gen_srun_success_check;
# expose the implementation under that name while keeping the original one.
gen_srun_success_check = generate_slurm_success_check
92 changes: 90 additions & 2 deletions src/cloudai/systems/slurm/strategy/slurm_command_gen_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from pathlib import Path
from typing import Any, Dict, List

from cloudai import CommandGenStrategy, TestRun
from cloudai import CommandGenStrategy, TestRun, TestScenario
from cloudai.systems import SlurmSystem
from cloudai.util.docker_image_cache_manager import DockerImageCacheManager

Expand Down Expand Up @@ -63,8 +63,30 @@ def gen_exec_command(self, tr: TestRun) -> str:
slurm_args = self._parse_slurm_args(
tr.test.test_template.__class__.__name__, env_vars, cmd_args, tr.num_nodes, tr.nodes
)

prologue_command = self.gen_prologue(tr.prologue, tr.output_path) if tr.prologue else "PROLOGUE_SUCCESS=1\n"
srun_command = self._gen_srun_command(slurm_args, env_vars, cmd_args, tr.test.extra_cmd_args)
return self._write_sbatch_script(slurm_args, env_vars, srun_command, tr.output_path)
epilogue_command = self.gen_epilogue(tr.epilogue, tr.output_path) if tr.epilogue else ""

full_command = "\n".join(
[
prologue_command,
"if [ $PROLOGUE_SUCCESS -eq 1 ]; then",
f" {srun_command}",
f" {epilogue_command}",
"fi",
]
).strip()

return self._write_sbatch_script(slurm_args, env_vars, full_command, tr.output_path)

def gen_srun_command(self, tr: TestRun) -> str:
    """
    Build the srun command for a test run without wrapping it in an sbatch script.

    Environment variables and command-line arguments are resolved by layering the
    test's overrides on top of the system-wide and default values; the Slurm
    arguments are then parsed from them and everything is handed to
    ``_gen_srun_command``.

    Args:
        tr (TestRun): Contains the test and its run-specific configurations.

    Returns:
        str: The generated Slurm srun command.
    """
    test = tr.test
    merged_env = self._override_env_vars(self.system.global_env_vars, test.extra_env_vars)
    merged_args = self._override_cmd_args(self.default_cmd_args, test.cmd_args)

    template_name = test.test_template.__class__.__name__
    parsed_slurm_args = self._parse_slurm_args(template_name, merged_env, merged_args, tr.num_nodes, tr.nodes)

    return self._gen_srun_command(parsed_slurm_args, merged_env, merged_args, test.extra_cmd_args)

def _parse_slurm_args(
self,
Expand Down Expand Up @@ -112,6 +134,72 @@ def job_name(self, job_name_prefix: str) -> str:
job_name = f"{self.system.account}-{job_name_prefix}.{datetime.now().strftime('%Y%m%d_%H%M%S')}"
return job_name

def gen_prologue(self, prologue: TestScenario, base_output_path: Path) -> str:
    """
    Generate the prologue command by running all tests defined in the prologue test scenario.

    For every prologue test run this emits its srun command followed by a shell
    ``if`` that stores the outcome of its success check in ``SUCCESS_<idx>``
    (1 on success, 0 on failure). A final line sets ``PROLOGUE_SUCCESS`` to 1
    only when every ``SUCCESS_<idx>`` is 1, otherwise to 0.

    The success check is evaluated by its exit status, not by its output, so
    silent checks such as ``grep -q`` work as intended.

    Side effects: creates ``<base_output_path>/prologue/<test name>`` for each
    test run and rebinds ``tr.output_path`` to that directory.

    Args:
        prologue (TestScenario): The prologue test scenario containing the tests to be run.
        base_output_path (Path): The base output directory path for storing prologue outputs.

    Returns:
        str: A string with all the Slurm srun commands generated for the prologue,
            ending with the ``PROLOGUE_SUCCESS`` assignment.
    """
    if not prologue.test_runs:
        return "PROLOGUE_SUCCESS=1\n"

    prologue_output_dir = base_output_path / "prologue"
    prologue_output_dir.mkdir(parents=True, exist_ok=True)

    prologue_commands = []
    success_conditions = []

    for idx, tr in enumerate(prologue.test_runs):
        plugin_dir = prologue_output_dir / tr.test.name
        plugin_dir.mkdir(parents=True, exist_ok=True)
        tr.output_path = plugin_dir

        template = tr.test.test_template
        prologue_commands.append(template.gen_srun_command(tr))

        success_var = f"SUCCESS_{idx}"
        success_check_command = template.gen_srun_success_check(tr)
        # Record the check's exit status as 1/0; capturing stdout would be empty for quiet checks.
        prologue_commands.append(f"if {success_check_command}; then {success_var}=1; else {success_var}=0; fi")
        success_conditions.append(f'[ "${success_var}" -eq 1 ]')

    # PROLOGUE_SUCCESS must be a plain 1/0 so callers can test it with `[ ... -eq 1 ]`.
    combined_condition = " && ".join(success_conditions)
    prologue_commands.append(f"if {combined_condition}; then PROLOGUE_SUCCESS=1; else PROLOGUE_SUCCESS=0; fi")

    return "\n".join(prologue_commands)

def gen_epilogue(self, epilogue: TestScenario, base_output_path: Path) -> str:
    """
    Generate the epilogue command by running all tests defined in the epilogue test scenario.

    Side effects: creates ``<base_output_path>/epilogue/<test name>`` for each
    test run and rebinds ``tr.output_path`` to that directory before its srun
    command is generated.

    Args:
        epilogue (TestScenario): The epilogue test scenario containing the tests to be run.
        base_output_path (Path): The base output directory path for storing epilogue outputs.

    Returns:
        str: The newline-joined srun commands for the epilogue, or an empty string
            when the scenario has no test runs.
    """
    if not epilogue.test_runs:
        return ""

    epilogue_root = base_output_path / "epilogue"
    epilogue_root.mkdir(parents=True, exist_ok=True)

    def _srun_for(run):
        # Give each epilogue test its own output directory, then build its command.
        target_dir = epilogue_root / run.test.name
        target_dir.mkdir(parents=True, exist_ok=True)
        run.output_path = target_dir
        return run.test.test_template.gen_srun_command(run)

    return "\n".join([_srun_for(run) for run in epilogue.test_runs])

def _gen_srun_command(
self, slurm_args: Dict[str, Any], env_vars: Dict[str, str], cmd_args: Dict[str, str], extra_cmd_args: str
) -> str:
Expand Down

0 comments on commit 7594c19

Please sign in to comment.