Addition of HPO to CERFACS use case
r-sarma committed Oct 23, 2024
1 parent 06ff1d7 commit 3468c08
Showing 7 changed files with 394 additions and 16 deletions.
19 changes: 17 additions & 2 deletions use-cases/xtclim/README.md
@@ -27,7 +27,7 @@ The `anomaly.py` file evaluates the network on the available datasets - train, t
## Installation

Please follow the documentation to install the itwinai environment.
After that, install the required libraries with the itwinai environment with:
After that, install the required libraries within the itwinai environment with:

```bash
pip install -r Requirements.txt
@@ -37,7 +37,7 @@ pip install -r Requirements.txt

The config file `pipeline.yaml` contains all the steps to execute the workflow.
This file also contains all the seasons, and a separate run is launched for each season.
You can launch it from the root of the repository with:
You can launch the pipeline through `train.py` from the root of the repository with:

```bash
python train.py
@@ -59,6 +59,8 @@ sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",PYTHON_VENV="$PY
```

The results and/or errors are available in `job.out` and `job.err` log files.
Training and inference steps are defined in the pipeline, and distributed resources
are exploited in both steps.
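
For reference, the same pipeline can also be driven programmatically, which is what the
HPO script introduced in this commit does for each trial. Below is a minimal sketch (not
part of this commit), assuming the `ConfigParser` API used in `hpo.py` and the top-level
`pipeline` key of `pipeline.yaml`:

```python
# Minimal sketch: drive the pipeline from Python instead of train.py.
import yaml

from itwinai.parser import ConfigParser

with open("pipeline.yaml", "r") as f:
    config = yaml.safe_load(f)

parser = ConfigParser(
    config=config,
    override_keys={"batch_size": 64},  # optional override, hypothetical value
)
pipeline = parser.parse_pipeline(pipeline_nested_key="pipeline", verbose=False)
pipeline.execute()  # runs the training and evaluation steps in sequence
```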

With the MLFlow logger, the logs can be visualized in the MLFlow UI:

@@ -69,3 +71,16 @@ mlflow ui --backend-store-uri mllogs/mlflow
mlflow ui --backend-store-uri mllogs/mlflow > /dev/null 2>&1 &
```
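
As an optional alternative to the UI, the same store can be queried with the MLflow
Python API. A minimal sketch (not part of this commit), assuming the `mllogs/mlflow`
backend store used above and the `XTClim (Cerfacs)` experiment name from `pipeline.yaml`:

```python
# Sketch only: inspect logged runs programmatically instead of through the UI.
import mlflow

mlflow.set_tracking_uri("mllogs/mlflow")  # same backend store as above
experiment = mlflow.get_experiment_by_name("XTClim (Cerfacs)")
runs = mlflow.search_runs(experiment_ids=[experiment.experiment_id])
print(runs[["run_id", "status"]].head())
```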

### Hyperparameter Optimization (HPO)

The repository also provides functionality to perform HPO with Ray. With HPO,
multiple trials with different hyperparameter configurations are run on a distributed
infrastructure, typically in an HPC environment. This allows finding the hyperparameter
configuration that yields the optimal (e.g. minimal) loss for the investigated network.
The `hpo.py` file contains the implementation, which launches the pipeline defined in
`pipeline.yaml`. To launch an HPO experiment, simply run:

```bash
sbatch slurm_ray.sh
```

The command-line arguments of `hpo.py` (e.g. the number of trials, the metric to optimize,
and the available GPUs/CPUs) can be adjusted to customize the HPO process; the search
space itself is defined inside the script, as sketched below.
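
The sketch below (not part of this commit) shows the search space as defined in `hpo.py`
and how it could be extended; any added key is hypothetical and must correspond to a
hyperparameter that the pipeline actually accepts:

```python
# Sketch only: Ray Tune search space, as in hpo.py, plus a hypothetical extra entry.
from ray import tune

search_space = {
    'batch_size': tune.choice([64, 128, 256]),  # as in hpo.py
    'lr': tune.uniform(1e-5, 1e-3),             # as in hpo.py
    # 'hidden_dim': tune.choice([128, 256]),    # hypothetical additional parameter
}
```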
263 changes: 263 additions & 0 deletions use-cases/xtclim/hpo.py
@@ -0,0 +1,263 @@
import argparse
import os
import sys
from pathlib import Path
from typing import Dict
import yaml

import matplotlib.pyplot as plt
import ray
import torch
from ray import train, tune

from itwinai.parser import ConfigParser

sys.path.append(os.path.join(os.path.dirname(__file__), 'src'))
sys.path.append(os.path.join(os.path.dirname(__file__), 'preprocessing'))

def read_config(file_path):
    with open(file_path, 'r') as f:
        config = yaml.safe_load(f)
    return config

def run_trial(config: Dict, data: Dict):
    """Execute a single trial using the given configuration (config).

    This runs the full training pipeline - you can also specify a pipeline as a
    dictionary, e.g. if you only want to run certain parts without changing your
    pipeline.yaml file.

    Args:
        config (dict): A dictionary containing hyperparameters, such as:
            - 'batch_size' (int): The size of the batch for training.
            - 'lr' (float): The learning rate for the optimizer.
        data (dict): A dictionary containing a "pipeline_name" field, naming the
            pipeline defined in pipeline.yaml.
    """
    # Read the pipeline definition separately so that the hyperparameters
    # sampled by Ray (in `config`) are not shadowed.
    pipeline_config = read_config('pipeline.yaml')
    seasons_list = pipeline_config['seasons']
    for season in seasons_list:
        steps = pipeline_config['pipeline']['init_args']['steps']
        steps['training-step']['init_args']['seasons'] = season
        model_uri = f"outputs/cvae_model_{season}1d_1memb.pth"
        steps['evaluation-step']['init_args']['model_uri'] = model_uri
        steps['evaluation-step']['init_args']['seasons'] = season
        parser = ConfigParser(
            config=pipeline_config,
            override_keys={
                # Set hyperparameters controlled by Ray
                'batch_size': config['batch_size'],
                'lr': config['lr'],
                # Override logger field, because performance is logged by Ray
                # 'training_pipeline.init_args.steps.2.init_args.logger': None
            }
        )
        my_pipeline = parser.parse_pipeline(
            pipeline_nested_key=data["pipeline_name"],
            verbose=False
        )
        print(f"Running pipeline for season: {season}")
        my_pipeline.execute()


def run_hpo(args):
    """Run hyperparameter optimization using Ray Tune.

    Either starts a new optimization run or resumes from previous results.

    Args:
        args: Command-line arguments parsed by argparse.
    """
    if not args.load_old_results:

        # Initialize Ray with cluster configuration from environment variables
        ray.init(
            address=os.environ["ip_head"],
            _node_ip_address=os.environ["head_node_ip"],
        )

        # Define the search space for hyperparameters
        search_space = {
            'batch_size': tune.choice([64, 128, 256]),
            'lr': tune.uniform(1e-5, 1e-3)
        }

        # TuneConfig for configuring search algorithm and scheduler
        tune_config = tune.TuneConfig(
            metric=args.metric,  # Metric to optimize (loss by default)
            mode="min",  # Minimize the loss
            num_samples=args.num_samples  # Number of trials to run
        )

        # Ray's RunConfig for experiment name and stopping criteria
        run_config = train.RunConfig(
            name="XTClim-Ray-Experiment",
            stop={"training_iteration": args.max_iterations}
        )

        # Determine GPU and CPU utilization per trial
        # We are allocating all available resources per node evenly across trials
        ngpus_per_trial = max(1, args.ngpus // args.num_samples)
        ncpus_per_trial = max(1, args.ncpus // args.num_samples)

        # Set up Ray Tune Tuner with resources and parameters
        resources_per_trial = {"gpu": ngpus_per_trial, "cpu": ncpus_per_trial}
        trainable_with_resources = tune.with_resources(
            run_trial,
            resources=resources_per_trial
        )

        data = {"pipeline_name": args.pipeline_name}
        trainable_with_parameters = tune.with_parameters(
            trainable_with_resources,
            data=data
        )

        tuner = tune.Tuner(
            trainable_with_parameters,
            tune_config=tune_config,
            run_config=run_config,
            param_space=search_space
        )

        # Run the hyperparameter optimization and get results
        result_grid = tuner.fit()

    else:
        # Load results from an earlier Ray Tune run
        print(f"Loading results from {args.experiment_path}...")

        # Restore tuner from saved results
        restored_tuner = tune.Tuner.restore(
            args.experiment_path,
            trainable=run_trial
        )
        result_grid = restored_tuner.get_results()

    # Display experiment statistics
    print(f"Number of errored trials: {result_grid.num_errors}")
    print(f"Number of terminated trials: {result_grid.num_terminated}")
    print(f"Ray Tune experiment path: {result_grid.experiment_path}")

    # Get the best result based on the last 10 iterations' average
    best_result = result_grid.get_best_result(
        scope="last-10-avg",
        metric=args.metric,
        mode="min"
    )
    print(f"Best result: {best_result}")

    # Print a dataframe with all trial results
    result_df = result_grid.get_dataframe()
    print(f"All results dataframe: {result_df}")
    print(f"All result columns: {result_df.columns}")

    # Plot the results for all trials
    plot_results(
        result_grid,
        metric=args.metric,
        filename="ray-loss-plot.png"
    )
    plot_results(
        result_grid,
        metric="valid_loss",
        filename="ray-valid_loss-plot.png"
    )


def plot_results(result_grid, metric="loss", filename="plot.png"):
    """Plot the results for all trials and save the plot to a file.

    Args:
        result_grid: Results from Ray Tune trials.
        metric: The metric to plot (e.g., 'loss').
        filename: Name of the file to save the plot.
    """
    ax = None
    for result in result_grid:
        label = f"lr={result.config['lr']:.6f}, batch size={result.config['batch_size']}"
        if ax is None:
            ax = result.metrics_dataframe.plot(
                "training_iteration", metric, label=label
            )
        else:
            result.metrics_dataframe.plot(
                "training_iteration", metric, ax=ax, label=label
            )

    ax.set_title(
        f"{metric.capitalize()} vs. Training Iteration for All Trials"
    )
    ax.set_ylabel(metric.capitalize())

    plt.savefig(filename)

    # Show the plot
    plt.show()


# Main entry point for script execution
if __name__ == "__main__":

    # Parse command-line arguments
    parser = argparse.ArgumentParser(
        description='Hyperparameter Optimization with Ray Tune'
    )
    parser.add_argument(
        '--load_old_results',
        action='store_true',
        help='Set this flag if you want to load results from an older Ray run.'
    )
    parser.add_argument(
        '--pipeline_name',
        type=str,
        default='training_pipeline',
        help='Name of the training pipeline to be used. \
              This pipeline has to be defined in "pipeline.yaml". \
              Defaults to "training_pipeline"'
    )
    parser.add_argument(
        '--experiment_path',
        type=str,
        default='~/ray_results/XTClim-Ray-Experiment',
        help='Directory where the results of the previous run are stored. \
              Set this only if load_old_results is set. \
              Defaults to ~/ray_results/XTClim-Ray-Experiment'
    )
    parser.add_argument(
        '--num_samples',
        type=int,
        default=10,
        help='Number of trials to run'
    )
    parser.add_argument(
        '--ngpus',
        type=int,
        help='Number of GPUs available on node.'
    )
    parser.add_argument(
        '--ncpus',
        type=int,
        help='Number of CPUs available on node.'
    )
    parser.add_argument(
        '--metric',
        type=str,
        default='loss',
        help='Metric to optimise.'
    )
    parser.add_argument(
        '--max_iterations',
        type=int,
        default=20,
        help='Maximum iterations per trial'
    )

    args = parser.parse_args()  # Parse the command-line arguments

    # Check for available GPU
    if torch.cuda.is_available():
        device = 'cuda'
        print(f"Using GPU: {torch.cuda.get_device_name(torch.cuda.current_device())}")
    else:
        device = 'cpu'
        print("Using CPU")

    run_hpo(args)
12 changes: 6 additions & 6 deletions use-cases/xtclim/pipeline.yaml
@@ -38,15 +38,15 @@ pipeline:
init_args:
experiment_name: XTClim (Cerfacs)
log_freq: epoch
# - class_path: itwinai.loggers.WandBLogger
# init_args:
# log_freq: epoch
- class_path: itwinai.loggers.WandBLogger
init_args:
log_freq: epoch
evaluation-step:
class_path: src.anomaly.XTClimPredictor
init_args:
evaluation: ${evaluation}
batch_size: ${batch_size}
strategy: ${strategy}
evaluation: ${evaluation}
batch_size: ${batch_size}
strategy: ${strategy}
# model_uri and season are dynamically imported
logger:
class_path: itwinai.loggers.LoggersCollection
