Skip to content

Commit

Permalink
rewrite epochtimetracker and refactor scalability plot code
Browse files Browse the repository at this point in the history
  • Loading branch information
jarlsondre committed Oct 29, 2024
1 parent 0ee5714 commit 5b6ebba
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 248 deletions.
99 changes: 57 additions & 42 deletions src/itwinai/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ def generate_gpu_energy_plot(
import uuid
import matplotlib.pyplot as plt

from itwinai.torch.monitoring.plotting import gpu_energy_plot, read_energy_df
from itwinai.torch.monitoring.plotting import gpu_energy_plot
from itwinai.scalability import convert_matching_files_to_dataframe

log_dir_path = Path(log_dir)
if not log_dir_path.exists():
Expand All @@ -64,7 +65,9 @@ def generate_gpu_energy_plot(
if pattern.lower() == "none":
pattern = None

gpu_utilization_df = read_energy_df(pattern=pattern, log_dir=log_dir_path)
gpu_utilization_df = convert_matching_files_to_dataframe(
pattern=pattern, log_dir=log_dir_path
)
gpu_energy_plot(gpu_utilization_df=gpu_utilization_df)

output_path = Path(output_file)
Expand Down Expand Up @@ -122,10 +125,10 @@ def generate_communication_plot(
import matplotlib.pyplot as plt

from itwinai.torch.profiling.communication_plot import (
create_combined_comm_overhead_df,
create_stacked_plot,
get_comp_fraction_full_array,
)
from itwinai.scalability import convert_matching_files_to_dataframe

log_dir_path = Path(log_dir)
if not log_dir_path.exists():
Expand All @@ -136,7 +139,16 @@ def generate_communication_plot(
if pattern.lower() == "none":
pattern = None

communication_df = create_combined_comm_overhead_df(log_dir=log_dir_path, pattern=pattern)
expected_columns = {
"strategy",
"num_gpus",
"global_rank",
"name",
"self_cuda_time_total",
}
communication_df = convert_matching_files_to_dataframe(
log_dir=log_dir_path, pattern=pattern, expected_columns=expected_columns
)
values = get_comp_fraction_full_array(communication_df, print_table=True)

strategies = sorted(communication_df["strategy"].unique())
Expand Down Expand Up @@ -171,46 +183,14 @@ def generate_communication_plot(


@app.command()
def sanity_check(
    torch: Annotated[
        Optional[bool], typer.Option(help=("Check also itwinai.torch modules."))
    ] = False,
    tensorflow: Annotated[
        Optional[bool], typer.Option(help=("Check also itwinai.tensorflow modules."))
    ] = False,
    all: Annotated[Optional[bool], typer.Option(help=("Check all modules."))] = False,
):
    """Run sanity checks on the installation of itwinai and its dependencies by trying
    to import itwinai modules. By default, only itwinai core modules (neither torch, nor
    tensorflow) are tested."""
    # Deferred import — presumably to keep CLI startup independent of the
    # optional torch/tensorflow extras; confirm before hoisting to module level.
    from itwinai.tests.sanity_check import (
        sanity_check_all,
        sanity_check_slim,
        sanity_check_tensorflow,
        sanity_check_torch,
    )

    # Requesting both frameworks is equivalent to requesting everything.
    if all or (torch and tensorflow):
        sanity_check_all()
        return
    if torch:
        sanity_check_torch()
        return
    if tensorflow:
        sanity_check_tensorflow()
        return
    sanity_check_slim()


@app.command()
def scalability_report(
def generate_scalability_plot(
pattern: Annotated[
str, typer.Option(help="Python pattern matching names of CSVs in sub-folders.")
],
log_dir: Annotated[
str, typer.Option(help="Directory location for the data files to read")
],
plot_title: Annotated[Optional[str], typer.Option(help=("Plot name."))] = None,
# skip_id: Annotated[Optional[int], typer.Option(help=("Skip epoch ID."))] = None,
archive: Annotated[
Optional[str],
typer.Option(help=("Archive name to backup the data, without extension.")),
Expand All @@ -236,28 +216,63 @@ def scalability_report(
)

log_dir_path = Path(log_dir)
if pattern.lower() == None:
pattern = None

combined_df, csv_files = read_scalability_files(
pattern=pattern, log_dir=log_dir_path
)
print("Merged CSV:")
print(combined_df)

avg_times = (
avg_time_df = (
combined_df.drop(columns="epoch_id")
.groupby(["name", "nodes"])
.mean()
.reset_index()
)
print("\nAvg over name and nodes:")
print(avg_times.rename(columns=dict(time="avg(time)")))
print(avg_time_df.rename(columns=dict(time="avg(time)")))

plot_png = f"scaling_plot_{plot_title}.png"
create_absolute_plot(avg_times)
create_relative_plot(plot_title, avg_times)
create_absolute_plot(avg_time_df)
create_relative_plot(plot_title, avg_time_df)

if archive is not None:
archive_data(archive, csv_files, plot_png, avg_times)
archive_data(archive, csv_files, plot_png, avg_time_df)


@app.command()
def sanity_check(
    torch: Annotated[
        Optional[bool], typer.Option(help=("Check also itwinai.torch modules."))
    ] = False,
    tensorflow: Annotated[
        Optional[bool], typer.Option(help=("Check also itwinai.tensorflow modules."))
    ] = False,
    all: Annotated[Optional[bool], typer.Option(help=("Check all modules."))] = False,
):
    """Run sanity checks on the installation of itwinai and its dependencies by trying
    to import itwinai modules. By default, only itwinai core modules (neither torch, nor
    tensorflow) are tested."""
    # Deferred import — presumably so the CLI can start even when the optional
    # torch/tensorflow extras are not installed; confirm before hoisting.
    from itwinai.tests.sanity_check import (
        sanity_check_all,
        sanity_check_slim,
        sanity_check_tensorflow,
        sanity_check_torch,
    )

    # Passing both --torch and --tensorflow is treated the same as --all.
    all = (torch and tensorflow) or all
    if all:
        sanity_check_all()
    elif torch:
        sanity_check_torch()
    elif tensorflow:
        sanity_check_tensorflow()
    else:
        sanity_check_slim()




@app.command()
Expand Down
75 changes: 31 additions & 44 deletions src/itwinai/loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,13 @@
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
from pathlib import Path

from deepspeed.runtime.zero.utils import isinstance_namedtuple
import mlflow
import prov4ml
import wandb
import pandas as pd
from typing_extensions import override

BASE_EXP_NAME: str = 'default_experiment'
Expand Down Expand Up @@ -1148,49 +1151,33 @@ def log(


class EpochTimeTracker:
"""Profiler for epoch execution time used to support scaling tests.
It uses CSV files to store, for each epoch, the ``name`` of the
experiment, the number of compute ``nodes`` used, the ``epoch_id``,
and the execution ``time`` in seconds.
Args:
series_name (str): name of the experiment/job.
csv_file (str): path to CSV file to store experiments times.
"""

def __init__(self, series_name: str, csv_file: str) -> None:
self.series_name = series_name
self._data = []
self.csv_file = csv_file
with open(csv_file, 'w') as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerow(['name', 'nodes', 'epoch_id', 'time'])
"""Tracker for epoch execution time during training. """

def __init__(self, strategy_name: str, save_path: Union[Path, str], num_nodes: int) -> None:
if isinstance(save_path, str):
save_path = Path(save_path)

self.save_path: Path = save_path
self.strategy_name = strategy_name
self.num_nodes = num_nodes
self.data = {
"name": [],
"nodes": [],
"epoch_id": [],
"time": []
}

def add_epoch_time(self, epoch_idx: int, time: float) -> None:
"""Add row to the current experiment's CSV file in append mode.
Args:
epoch_idx (int): epoch order idx.
time (float): epoch execution time (seconds).
"""
n_nodes = os.environ.get('SLURM_NNODES', -1)
fields = (self.series_name, n_nodes, epoch_idx, time)
self._data.append(fields)
with open(self.csv_file, 'a') as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerow(fields)

def save(self, csv_file: Optional[str] = None) -> None:
"""Save data to a new CSV file.
Args:
csv_file (Optional[str], optional): path to the CSV file.
If not given, uses the one given in the constructor.
Defaults to None.
"""
if not csv_file:
csv_file = self.csv_file
with open(csv_file, 'w') as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerow(['name', 'nodes', 'epoch_id', 'time'])
csvwriter.writerows(self._data)
"""Add epoch time to data."""
self.data["epoch_id"].append(epoch_idx)
self.data["time"].append(time)

def save(self) -> None:
"""Save data to a new CSV file. """
df = pd.DataFrame(self.data)
df["name"] = self.strategy_name
df["nodes"] = self.num_nodes

self.save_path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(self.save_path, index=False)
print(f"Saving EpochTimeTracking data to '{self.save_path.resolve()}'.")
Loading

0 comments on commit 5b6ebba

Please sign in to comment.