diff --git a/src/itwinai/cli.py b/src/itwinai/cli.py index 85e527c5..d7a2aa47 100644 --- a/src/itwinai/cli.py +++ b/src/itwinai/cli.py @@ -53,7 +53,8 @@ def generate_gpu_energy_plot( import uuid import matplotlib.pyplot as plt - from itwinai.torch.monitoring.plotting import gpu_energy_plot, read_energy_df + from itwinai.torch.monitoring.plotting import gpu_energy_plot + from itwinai.scalability import convert_matching_files_to_dataframe log_dir_path = Path(log_dir) if not log_dir_path.exists(): @@ -64,7 +65,9 @@ def generate_gpu_energy_plot( if pattern.lower() == "none": pattern = None - gpu_utilization_df = read_energy_df(pattern=pattern, log_dir=log_dir_path) + gpu_utilization_df = convert_matching_files_to_dataframe( + pattern=pattern, log_dir=log_dir_path + ) gpu_energy_plot(gpu_utilization_df=gpu_utilization_df) output_path = Path(output_file) @@ -122,10 +125,10 @@ def generate_communication_plot( import matplotlib.pyplot as plt from itwinai.torch.profiling.communication_plot import ( - create_combined_comm_overhead_df, create_stacked_plot, get_comp_fraction_full_array, ) + from itwinai.scalability import convert_matching_files_to_dataframe log_dir_path = Path(log_dir) if not log_dir_path.exists(): @@ -136,7 +139,16 @@ def generate_communication_plot( if pattern.lower() == "none": pattern = None - communication_df = create_combined_comm_overhead_df(log_dir=log_dir_path, pattern=pattern) + expected_columns = { + "strategy", + "num_gpus", + "global_rank", + "name", + "self_cuda_time_total", + } + communication_df = convert_matching_files_to_dataframe( + log_dir=log_dir_path, pattern=pattern, expected_columns=expected_columns + ) values = get_comp_fraction_full_array(communication_df, print_table=True) strategies = sorted(communication_df["strategy"].unique()) @@ -171,38 +183,7 @@ def generate_communication_plot( @app.command() -def sanity_check( - torch: Annotated[ - Optional[bool], typer.Option(help=("Check also itwinai.torch modules.")) - ] = False, - tensorflow: Annotated[ - Optional[bool], typer.Option(help=("Check also itwinai.tensorflow modules.")) - ] = False, - all: Annotated[Optional[bool], typer.Option(help=("Check all modules."))] = False, -): - """Run sanity checks on the installation of itwinai and its dependencies by trying - to import itwinai modules. 
By default, only itwinai core modules (neither torch, nor
-    tensorflow) are tested."""
-    from itwinai.tests.sanity_check import (
-        sanity_check_all,
-        sanity_check_slim,
-        sanity_check_tensorflow,
-        sanity_check_torch,
-    )
-
-    all = (torch and tensorflow) or all
-    if all:
-        sanity_check_all()
-    elif torch:
-        sanity_check_torch()
-    elif tensorflow:
-        sanity_check_tensorflow()
-    else:
-        sanity_check_slim()
-
-
-@app.command()
-def scalability_report(
+def generate_scalability_plot(
     pattern: Annotated[
         str, typer.Option(help="Python pattern matching names of CSVs in sub-folders.")
     ],
@@ -210,7 +191,6 @@
         str, typer.Option(help="Directory location for the data files to read")
     ],
     plot_title: Annotated[Optional[str], typer.Option(help=("Plot name."))] = None,
-    # skip_id: Annotated[Optional[int], typer.Option(help=("Skip epoch ID."))] = None,
    archive: Annotated[
         Optional[str],
         typer.Option(help=("Archive name to backup the data, without extension.")),
@@ -236,6 +216,8 @@
     )
 
     log_dir_path = Path(log_dir)
+    if pattern.lower() == "none":
+        pattern = None
 
     combined_df, csv_files = read_scalability_files(
         pattern=pattern, log_dir=log_dir_path
@@ -243,21 +225,54 @@
     print("Merged CSV:")
     print(combined_df)
 
-    avg_times = (
+    avg_time_df = (
         combined_df.drop(columns="epoch_id")
         .groupby(["name", "nodes"])
         .mean()
         .reset_index()
     )
     print("\nAvg over name and nodes:")
-    print(avg_times.rename(columns=dict(time="avg(time)")))
+    print(avg_time_df.rename(columns=dict(time="avg(time)")))
 
     plot_png = f"scaling_plot_{plot_title}.png"
-    create_absolute_plot(avg_times)
-    create_relative_plot(plot_title, avg_times)
+    create_absolute_plot(avg_time_df)
+    create_relative_plot(plot_title, avg_time_df)
 
     if archive is not None:
-        archive_data(archive, csv_files, plot_png, avg_times)
+        archive_data(archive, csv_files, plot_png, avg_time_df)
+
+
+@app.command()
+def sanity_check(
+    torch: Annotated[
+        Optional[bool], typer.Option(help=("Check also itwinai.torch modules."))
+    ] = False,
+    tensorflow: Annotated[
+        Optional[bool], typer.Option(help=("Check also itwinai.tensorflow modules."))
+    ] = False,
+    all: Annotated[Optional[bool], typer.Option(help=("Check all modules."))] = False,
+):
+    """Run sanity checks on the installation of itwinai and its dependencies by trying
+    to import itwinai modules. By default, only itwinai core modules (neither torch, nor
+    tensorflow) are tested."""
+    from itwinai.tests.sanity_check import (
+        sanity_check_all,
+        sanity_check_slim,
+        sanity_check_tensorflow,
+        sanity_check_torch,
+    )
+
+    all = (torch and tensorflow) or all
+    if all:
+        sanity_check_all()
+    elif torch:
+        sanity_check_torch()
+    elif tensorflow:
+        sanity_check_tensorflow()
+    else:
+        sanity_check_slim()
+
+
 @app.command()
diff --git a/src/itwinai/loggers.py b/src/itwinai/loggers.py
index e827d005..c4798b6d 100644
--- a/src/itwinai/loggers.py
+++ b/src/itwinai/loggers.py
@@ -62,10 +62,13 @@
 from abc import ABC, abstractmethod
 from contextlib import contextmanager
 from typing import Any, Dict, List, Literal, Optional, Tuple, Union
+from pathlib import Path
+from deepspeed.runtime.zero.utils import isinstance_namedtuple
 
 import mlflow
 import prov4ml
 import wandb
+import pandas as pd
 from typing_extensions import override
 
 BASE_EXP_NAME: str = 'default_experiment'
@@ -1148,49 +1151,33 @@ def log(
 class EpochTimeTracker:
-    """Profiler for epoch execution time used to support scaling tests.
-    It uses CSV files to store, for each epoch, the ``name`` of the
-    experiment, the number of compute ``nodes`` used, the ``epoch_id``,
-    and the execution ``time`` in seconds.
-
-    Args:
-        series_name (str): name of the experiment/job.
-        csv_file (str): path to CSV file to store experiments times.
-    """
-
-    def __init__(self, series_name: str, csv_file: str) -> None:
-        self.series_name = series_name
-        self._data = []
-        self.csv_file = csv_file
-        with open(csv_file, 'w') as csvfile:
-            csvwriter = csv.writer(csvfile)
-            csvwriter.writerow(['name', 'nodes', 'epoch_id', 'time'])
+    """Tracker for epoch execution time during training. Saves the collected data
+    as a CSV file with columns ``name``, ``nodes``, ``epoch_id`` and ``time``."""
+
+    def __init__(self, strategy_name: str, save_path: Union[Path, str], num_nodes: int) -> None:
+        if isinstance(save_path, str):
+            save_path = Path(save_path)
+
+        self.save_path: Path = save_path
+        self.strategy_name = strategy_name
+        self.num_nodes = num_nodes
+        self.data = {
+            "epoch_id": [],
+            "time": []
+        }
 
     def add_epoch_time(self, epoch_idx: int, time: float) -> None:
-        """Add row to the current experiment's CSV file in append mode.
-
-        Args:
-            epoch_idx (int): epoch order idx.
-            time (float): epoch execution time (seconds).
-        """
-        n_nodes = os.environ.get('SLURM_NNODES', -1)
-        fields = (self.series_name, n_nodes, epoch_idx, time)
-        self._data.append(fields)
-        with open(self.csv_file, 'a') as csvfile:
-            csvwriter = csv.writer(csvfile)
-            csvwriter.writerow(fields)
-
-    def save(self, csv_file: Optional[str] = None) -> None:
-        """Save data to a new CSV file.
-
-        Args:
-            csv_file (Optional[str], optional): path to the CSV file.
-                If not given, uses the one given in the constructor.
-                Defaults to None.
-        """
-        if not csv_file:
-            csv_file = self.csv_file
-        with open(csv_file, 'w') as csvfile:
-            csvwriter = csv.writer(csvfile)
-            csvwriter.writerow(['name', 'nodes', 'epoch_id', 'time'])
-            csvwriter.writerows(self._data)
+        """Add epoch time to data."""
+        self.data["epoch_id"].append(epoch_idx)
+        self.data["time"].append(time)
+
+    def save(self) -> None:
+        """Save data to a new CSV file."""
+        df = pd.DataFrame(self.data)
+        df["name"] = self.strategy_name
+        df["nodes"] = self.num_nodes
+
+        self.save_path.parent.mkdir(parents=True, exist_ok=True)
+        df.to_csv(self.save_path, index=False)
+        print(f"Saved EpochTimeTracker data to '{self.save_path.resolve()}'.")
diff --git a/src/itwinai/scalability.py b/src/itwinai/scalability.py
index 19c3e249..44464b10 100644
--- a/src/itwinai/scalability.py
+++ b/src/itwinai/scalability.py
@@ -11,15 +11,74 @@
 import pandas as pd
 from pathlib import Path
+from typing import Optional, Union
+from re import compile, Pattern, Match
+from itertools import cycle
+
+
+def convert_matching_files_to_dataframe(
+    log_dir: Path, pattern: Optional[str], expected_columns: Optional[set] = None
+) -> pd.DataFrame:
+    """Reads and combines all files in a folder that match the given regex pattern
+    into a single DataFrame. The files must be formatted as CSV files. If pattern is
+    None, we assume a match on all files.
+
+    Raises:
+        ValueError: If not all expected columns are found in the stored DataFrame.
+        ValueError: If no matching files are found in the given logging directory.
+ """ + re_pattern: Optional[Pattern] = None + if pattern is not None: + re_pattern = compile(pattern) + + if expected_columns is None: + expected_columns = set() + + dataframes = [] + for entry in log_dir.iterdir(): + match: Union[bool, Match] = True + if re_pattern is not None: + match = re_pattern.search(str(entry)) + + if not match: + continue + + df = pd.read_csv(entry) + if not expected_columns.issubset(df.columns): + missing_columns = expected_columns - set(df.columns) + raise ValueError( + f"Invalid data format! File at '{str(entry)}' doesn't contain all" + f" necessary columns. \nMissing columns: {missing_columns}" + ) + + dataframes.append(df) + + if len(dataframes) == 0: + if pattern is None: + error_message = f"Unable to find any files in {log_dir.resolve()}!" + else: + error_message = ( + f"No files matched pattern, '{pattern}', in log_dir, " + f"{log_dir.resolve()}!" + ) + raise ValueError(error_message) + + return pd.concat(dataframes) def read_scalability_files(pattern: str, log_dir: Path): - pattern_re = re.compile(pattern) all_matching_files = [] dataframes = [] + re_pattern: Optional[Pattern] = None + if pattern is not None: + re_pattern = compile(pattern) for entry in log_dir.iterdir(): - if not pattern_re.search(str(entry)): + match: Union[bool, Match] = True + if re_pattern is not None: + match = re_pattern.search(str(entry)) + + if not match: continue + all_matching_files.append(entry.resolve()) df = pd.read_csv(entry) dataframes.append(df) @@ -62,72 +121,74 @@ def create_absolute_plot(avg_times): sns.reset_orig() -def create_relative_plot(plot_title: str, avg_times): +def create_relative_plot(plot_title: str, avg_epoch_time_df: pd.DataFrame): sns.set_theme() - fig, sp_up_ax = plt.subplots(1, 1, figsize=(6, 4)) + fig, speedup_axis = plt.subplots(1, 1, figsize=(6, 4)) if plot_title is not None: fig.suptitle(plot_title) - sp_up_ax.set_yscale("log") - sp_up_ax.set_xscale("log") + speedup_axis.set_yscale("log") + speedup_axis.set_xscale("log") - markers = iter("ov^s*dXpD.+12348") + marker_cycle = cycle("ov^s*dXpD.+12348") - series_names = sorted(set(avg_times.name.values)) - for name in series_names: - df = avg_times[avg_times.name == name].drop(columns="name") + strategy_names = sorted(set(avg_epoch_time_df.name.values)) + for strategy in strategy_names: + strategy_data = avg_epoch_time_df[avg_epoch_time_df.name == strategy].drop(columns="name") + # Derived columns + strategy_data["num_gpus"] = strategy_data["nodes"] * 4 + strategy_data["ideal_speedup"] = strategy_data["nodes"].astype(float) + base_time = strategy_data["time"].iloc[0] - df["NGPUs"] = df["nodes"] * 4 - # speedup - df["Speedup - ideal"] = df["nodes"].astype(float) - df["Speedup"] = df["time"].iloc[0] / df["time"] - df["Nworkers"] = 1 + strategy_data["speedup"] = base_time / strategy_data["time"] + strategy_data["n_workers"] = 1 - # efficiency - df["Threadscaled Sim. Time / s"] = df["time"] * df["nodes"] * df["Nworkers"] - df["Efficiency"] = ( - df["Threadscaled Sim. Time / s"].iloc[0] / df["Threadscaled Sim. 
Time / s"]
-        )
+        # Efficiency calculations
+        strategy_data["scaled_sim_time_s"] = strategy_data["time"] * strategy_data["nodes"] * strategy_data["n_workers"]
+        base_scaled_time = strategy_data["scaled_sim_time_s"].iloc[0]
+        strategy_data["efficiency"] = base_scaled_time / strategy_data["scaled_sim_time_s"]
 
-        sp_up_ax.plot(
-            df["NGPUs"].values,
-            df["Speedup"].values,
-            marker=next(markers),
+        speedup_axis.plot(
+            strategy_data["num_gpus"].values,
+            strategy_data["speedup"].values,
+            marker=next(marker_cycle),
             lw=1.0,
-            label=name,
+            label=strategy,
             alpha=0.7,
         )
 
-        sp_up_ax.plot(
-            df["NGPUs"].values,
-            df["Speedup - ideal"].values,
+        # Plotting the ideal line
+        speedup_axis.plot(
+            strategy_data["num_gpus"].values,
+            strategy_data["ideal_speedup"].values,
             ls="dashed",
             lw=1.0,
             c="k",
             label="ideal",
         )
 
-    sp_up_ax.legend(ncol=1)
+    speedup_axis.legend(ncol=1)
 
-    sp_up_ax.set_xticks(df["NGPUs"].values)
-    sp_up_ax.get_xaxis().set_major_formatter(matplotlib.ticker.ScalarFormatter())
+    speedup_axis.set_xticks(strategy_data["num_gpus"].values)
+    speedup_axis.get_xaxis().set_major_formatter(matplotlib.ticker.ScalarFormatter())
 
-    sp_up_ax.set_ylabel("Speedup")
-    sp_up_ax.set_xlabel("NGPUs (4 per node)")
-    sp_up_ax.grid()
+    speedup_axis.set_ylabel("Speedup")
+    speedup_axis.set_xlabel("Number of GPUs (4 per node)")
+    speedup_axis.grid()
 
-    # Sort legend
-    handles, labels = sp_up_ax.get_legend_handles_labels()
-    order = np.argsort(labels)
-    plt.legend([handles[idx] for idx in order], [labels[idx] for idx in order])
+    # Sorted legend
+    handles, labels = speedup_axis.get_legend_handles_labels()
+    sorted_handles_labels = sorted(zip(handles, labels), key=lambda x: x[1])
+    sorted_handles, sorted_labels = zip(*sorted_handles_labels)
+    plt.legend(sorted_handles, sorted_labels)
 
-    plot_png = f"scaling_plot_{plot_title}.png"
+    # Save path and save
+    plot_path = Path(f"scaling_plot_{plot_title}.png")
     plt.tight_layout()
-    plt.savefig(plot_png, bbox_inches="tight", format="png", dpi=300)
-    print("Saved scaling plot to: ", plot_png)
+    plt.savefig(plot_path, bbox_inches="tight", format="png", dpi=300)
+    print("Saved scaling plot to:", plot_path)
     sns.reset_orig()
 
-
 def archive_data(archive, csv_files, plot_path, avg_times):
     if "/" in archive:
         raise ValueError(
diff --git a/src/itwinai/torch/monitoring/plotting.py b/src/itwinai/torch/monitoring/plotting.py
index 4465a36c..f8cb1143 100644
--- a/src/itwinai/torch/monitoring/plotting.py
+++ b/src/itwinai/torch/monitoring/plotting.py
@@ -1,6 +1,4 @@
-from pathlib import Path
-from re import Match, Pattern, compile
-from typing import Optional, Tuple, Union, List
+from typing import Tuple, List
 
 import matplotlib
 import matplotlib.pyplot as plt
@@ -14,50 +12,6 @@
 matplotlib.use("Agg")
 
 
-def read_energy_df(pattern: Optional[str], log_dir: Path) -> pd.DataFrame:
-    """Read files matching the given regex pattern from directory and converting them
-    into a Pandas DataFrame. If pattern is None, we assume a match on all files.
-    Expects that the existence of ``log_dir`` is handled before calling this function.
-
-    Args:
-        pattern: The regex string used to match files.
-        log_dir: The directory to search for files in.
-
-    Raises:
-        ValueError: If no matching files are found in the given logging directory.
- """ - - pattern_re: Optional[Pattern] = None - if pattern is not None: - pattern_re = compile(pattern) - - # Load and concatenate dataframes - dataframes = [] - for entry in log_dir.iterdir(): - match: Union[bool, Match] = True - if pattern_re is not None: - match = pattern_re.search(str(entry)) - - if not match: - continue - - print(f"Loading data from file: '{entry}' when creating energy DataFrame") - df = pd.read_csv(entry) - dataframes.append(df) - - if len(dataframes) == 0: - if pattern is None: - error_message = f"Unable to find any files in {log_dir.resolve()}!" - else: - error_message = ( - f"No files matched pattern, '{pattern}', in log_dir, " - f"{log_dir.resolve()}!" - ) - raise ValueError(error_message) - - return pd.concat(dataframes) - - def calculate_aggregated_energy_expenditure( gpu_utilization_df: pd.DataFrame, ) -> pd.DataFrame: diff --git a/src/itwinai/torch/profiling/communication_plot.py b/src/itwinai/torch/profiling/communication_plot.py index 285a62da..d58832c5 100644 --- a/src/itwinai/torch/profiling/communication_plot.py +++ b/src/itwinai/torch/profiling/communication_plot.py @@ -1,6 +1,4 @@ -from pathlib import Path -from re import Match, Pattern, compile -from typing import Any, List, Optional, Tuple, Union +from typing import Any, List, Tuple import matplotlib import matplotlib.pyplot as plt @@ -9,13 +7,12 @@ import seaborn as sns from matplotlib.patches import Patch +# from itwinai.scalability import convert_matching_files_to_dataframe + # Doing this because otherwise I get an error about X11 Forwarding which I believe # is due to the server trying to pass the image to the client computer matplotlib.use("Agg") -# import logging -# from logging import Logger as PythonLogger - def calculate_comp_and_comm_time(df: pd.DataFrame) -> Tuple[float, float]: """Calculates the time spent computing and time spent communicating and returns a @@ -142,60 +139,6 @@ def create_stacked_plot( return fig, ax -def create_combined_comm_overhead_df( - log_dir: Path, pattern: Optional[str] -) -> pd.DataFrame: - """Reads and combines all files in a folder that matches the given regex pattern - into a single DataFrame. The files must be formatted as csv files. If pattern is - None, we assume a match on all files. - - Raises: - ValueError: If not all expected columns are found in the stored DataFrame. - ValueError: If no matching files are found in the given logging directory. - """ - re_pattern: Optional[Pattern] = None - if pattern is not None: - re_pattern = compile(pattern) - - dataframes = [] - expected_columns = { - "strategy", - "num_gpus", - "global_rank", - "name", - "self_cuda_time_total", - } - for entry in log_dir.iterdir(): - match: Union[bool, Match] = True - if re_pattern is not None: - match = re_pattern.search(str(entry)) - - if not match: - continue - - df = pd.read_csv(entry) - if not expected_columns.issubset(df.columns): - missing_columns = expected_columns - set(df.columns) - raise ValueError( - f"Invalid data format! File at '{str(entry)}' doesn't contain all" - f" necessary columns. \nMissing columns: {missing_columns}" - ) - - dataframes.append(df) - - if len(dataframes) == 0: - if pattern is None: - error_message = f"Unable to find any files in {log_dir.resolve()}!" - else: - error_message = ( - f"No files matched pattern, '{pattern}', in log_dir, " - f"{log_dir.resolve()}!" 
- ) - raise ValueError(error_message) - - return pd.concat(dataframes) - - def get_comp_fraction_full_array( df: pd.DataFrame, print_table: bool = False ) -> np.ndarray: diff --git a/use-cases/eurac/config.yaml b/use-cases/eurac/config.yaml index 8912e898..64ee45f7 100644 --- a/use-cases/eurac/config.yaml +++ b/use-cases/eurac/config.yaml @@ -6,7 +6,7 @@ tmp_stats: /p/scratch/intertwin/datasets/eurac/stats experiment: "drought use case lstm" run_name: "alps_test" -epochs: 5 +epochs: 2 random_seed: 1010 lr: 0.001 batch_size: 256 diff --git a/use-cases/eurac/trainer.py b/use-cases/eurac/trainer.py index 770c8259..dbc03fc0 100644 --- a/use-cases/eurac/trainer.py +++ b/use-cases/eurac/trainer.py @@ -1,6 +1,6 @@ import os from pathlib import Path -from timeit import default_timer as timer +from timeit import default_timer from typing import Dict, Literal, Optional, Union, Any, Tuple import pandas as pd @@ -148,19 +148,26 @@ def set_epoch(self, epoch: int): self.train_loader.sampler.set_epoch(epoch) self.val_loader.sampler.set_epoch(epoch) - @profile_torch_trainer + # @profile_torch_trainer @measure_gpu_utilization def train(self): """Override version of hython to support distributed strategy.""" # Tracking epoch times for scaling test if self.strategy.is_main_worker: - num_nodes = os.environ.get("SLURM_NNODES", "unk") - series_name = os.environ.get("DIST_MODE", "unk") + "-torch" - file_name = f"epochtime_{series_name}_{num_nodes}N.csv" - file_path = Path("logs_epoch") / file_name + num_nodes = int(os.environ.get("SLURM_NNODES", "unk")) + # series_name = os.environ.get("DIST_MODE", "unk") + "-torch" + # series_name = self.strategy.name + # file_name = f"epochtime_{series_name}_{num_nodes}N.csv" + epoch_time_output_dir = Path("scalability_metrics/epoch-time") + epoch_time_file_name = f"epochtime_{self.strategy.name}_{num_nodes}N.csv" + epoch_time_output_path = epoch_time_output_dir / epoch_time_file_name + epoch_time_tracker = EpochTimeTracker( - series_name=series_name, csv_file=file_path + strategy_name=self.strategy.name, + save_path=epoch_time_output_path, + num_nodes=num_nodes ) + trainer = RNNTrainer( RNNTrainParams( experiment=self.config.experiment, @@ -182,7 +189,7 @@ def train(self): best_loss = float("inf") for epoch in tqdm(range(self.epochs)): - epoch_start_time = timer() + epoch_start_time = default_timer() self.set_epoch(epoch) self.model.train() @@ -249,12 +256,11 @@ def train(self): best_loss = avg_val_loss best_model = self.model.state_dict() - epoch_end_time = timer() - epoch_time_tracker.add_epoch_time( - epoch - 1, epoch_end_time - epoch_start_time - ) + epoch_time = default_timer() - epoch_start_time + epoch_time_tracker.add_epoch_time(epoch + 1, epoch_time) if self.strategy.is_main_worker: + epoch_time_tracker.save() self.model.load_state_dict(best_model) self.log( item=self.model,