diff --git a/docs/toolbox.generated/Fine_Tuning.ray_fine_tuning_job.rst b/docs/toolbox.generated/Fine_Tuning.ray_fine_tuning_job.rst new file mode 100644 index 0000000000..454fd71033 --- /dev/null +++ b/docs/toolbox.generated/Fine_Tuning.ray_fine_tuning_job.rst @@ -0,0 +1,147 @@ +:orphan: + +.. + _Auto-generated file, do not edit manually ... + _Toolbox generate command: repo generate_toolbox_rst_documentation + _ Source component: Fine_Tuning.ray_fine_tuning_job + + +fine_tuning ray_fine_tuning_job +=============================== + +Run a simple Ray fine-tuning Job. + + + + +Parameters +---------- + + +``name`` + +* The name of the fine-tuning job to create + + +``namespace`` + +* The name of the namespace where the scheduler load will be generated + + +``pvc_name`` + +* The name of the PVC where the model and dataset are stored + + +``model_name`` + +* The name of the model to use inside the /dataset directory of the PVC + + +``ft_scripts_dir`` + +* Directory where the fine-tuning scripts are stored + + +``dataset_name`` + +* The name of the dataset to use inside the /model directory of the PVC + + +``dataset_replication`` + +* Number of replications of the dataset to use, to artificially extend or reduce the fine-tuning effort + +* default value: ``1`` + + +``dataset_transform`` + +* Name of the transformation to apply to the dataset + + +``dataset_prefer_cache`` + +* If True, and the dataset has to be transformed/duplicated, save and/or load it from the PVC + +* default value: ``True`` + + +``dataset_prepare_cache_only`` + +* If True, only prepare the dataset cache file and do not run the fine-tuning. + + +``dataset_response_template`` + +* The delimiter marking the beginning of the response in the dataset samples + +* default value: ``\n### Label:`` + + +``container_image`` + +* The image to use for the fine-tuning container + +* default value: ``quay.io/rhoai/ray:2.35.0-py39-cu121-torch24-fa26`` + + +``ray_version`` + +* The version identifier passed to the RayCluster object + +* default value: ``2.35.0`` + + +``gpu`` + +* The number of GPUs to request for the fine-tuning job + +* default value: ``1`` + + +``memory`` + +* The number of RAM gigs to request for to the fine-tuning job (in Gigs) + +* default value: ``10`` + + +``cpu`` + +* The number of CPU cores to request for the fine-tuning job (in cores) + +* default value: ``1`` + + +``request_equals_limits`` + +* If True, sets the 'limits' of the job with the same value as the request. + + +``prepare_only`` + +* If True, only prepare the environment but do not run the fine-tuning job. + + +``delete_other`` + +* If True, delete the other PyTorchJobs before running + + +``worker_replicas`` + +* Number of worker replicas to deploy + +* default value: ``2`` + + +``hyper_parameters`` + +* Dictionnary of hyper-parameters to pass to sft-trainer + + +``sleep_forever`` + +* If true, sleeps forever instead of running the fine-tuning command. + diff --git a/docs/toolbox.generated/index.rst b/docs/toolbox.generated/index.rst index e9a5e1ad8d..b0c8f7ada3 100644 --- a/docs/toolbox.generated/index.rst +++ b/docs/toolbox.generated/index.rst @@ -80,6 +80,7 @@ Toolbox Documentation +* :doc:`ray_fine_tuning_job ` Run a simple Ray fine-tuning Job. * :doc:`run_fine_tuning_job ` Run a simple fine-tuning Job. * :doc:`run_quality_evaluation ` Run a simple fine-tuning Job. 
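The generated parameters above map one-to-one onto the Python entrypoint added in `projects/fine_tuning/toolbox/fine_tuning.py` (next diff) and onto the `fine_tuning_ray_fine_tuning_job_*` Ansible variables. As orientation only, a hypothetical invocation could look like the sketch below; the import path and the argument values are assumptions for illustration, not part of this change:

```python
# Hedged sketch: how the documented parameters feed the new toolbox entrypoint.
# The module path and the way the returned RunAnsibleRole object is executed
# depend on the repository's toolbox machinery and are assumptions here.
from projects.fine_tuning.toolbox.fine_tuning import Fine_Tuning  # assumed import path

role = Fine_Tuning().ray_fine_tuning_job(
    name="ray-ft-demo",                # -> fine_tuning_ray_fine_tuning_job_name
    namespace="fine-tuning-testing",   # -> fine_tuning_ray_fine_tuning_job_namespace
    pvc_name="fine-tuning-storage",
    model_name="meta-llama/Meta-Llama-3.1-8B",
    ft_scripts_dir="projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed",
    dataset_name="ray-finetune-llm-deepspeed_train.jsonl",
    worker_replicas=2,
    gpu=1,
    hyper_parameters={"batch-size-per-device": 32},
)
# `role` wraps the 'fine_tuning_ray_fine_tuning_job' Ansible role; each keyword
# argument is exposed to it as a `fine_tuning_ray_fine_tuning_job_*` variable
# (see the defaults/main/config.yml added below).
```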
diff --git a/projects/fine_tuning/toolbox/fine_tuning.py b/projects/fine_tuning/toolbox/fine_tuning.py index 3d99ff4ed9..ea9b8fcfd1 100644 --- a/projects/fine_tuning/toolbox/fine_tuning.py +++ b/projects/fine_tuning/toolbox/fine_tuning.py @@ -123,3 +123,74 @@ def run_quality_evaluation( """ return RunAnsibleRole(locals()) + + + @AnsibleRole("fine_tuning_ray_fine_tuning_job") + @AnsibleMappedParams + def ray_fine_tuning_job( + self, + name, + namespace, + pvc_name, + + model_name, + ft_scripts_dir, + + dataset_name, + dataset_replication=1, + dataset_transform=None, + dataset_prefer_cache=True, + dataset_prepare_cache_only=False, + dataset_response_template="\n### Label:", + container_image="quay.io/rhoai/ray:2.35.0-py39-cu121-torch24-fa26", + ray_version="2.35.0", + gpu=1, + memory=10, + cpu=1, + request_equals_limits=False, + + prepare_only=False, + delete_other=False, + + worker_replicas=2, + + hyper_parameters={}, + + sleep_forever=False, + ): + """ + Run a simple Ray fine-tuning Job. + + Args: + name: the name of the fine-tuning job to create + namespace: the name of the namespace where the scheduler load will be generated + pvc_name: the name of the PVC where the model and dataset are stored + + model_name: the name of the model to use inside the /dataset directory of the PVC + + ft_scripts_dir: directory where the fine-tuning scripts are stored + + dataset_name: the name of the dataset to use inside the /model directory of the PVC + dataset_replication: number of replications of the dataset to use, to artificially extend or reduce the fine-tuning effort + dataset_transform: name of the transformation to apply to the dataset + dataset_prefer_cache: if True, and the dataset has to be transformed/duplicated, save and/or load it from the PVC + dataset_prepare_cache_only: if True, only prepare the dataset cache file and do not run the fine-tuning. + dataset_response_template: the delimiter marking the beginning of the response in the dataset samples + container_image: the image to use for the fine-tuning container + gpu: the number of GPUs to request for the fine-tuning job + memory: the number of RAM gigs to request for to the fine-tuning job (in Gigs) + cpu: the number of CPU cores to request for the fine-tuning job (in cores) + request_equals_limits: if True, sets the 'limits' of the job with the same value as the request. + + prepare_only: if True, only prepare the environment but do not run the fine-tuning job. + delete_other: if True, delete the other PyTorchJobs before running + + worker_replicas: number of worker replicas to deploy + + hyper_parameters: dictionnary of hyper-parameters to pass to sft-trainer + + sleep_forever: if true, sleeps forever instead of running the fine-tuning command. + ray_version: the version identifier passed to the RayCluster object + """ + + return RunAnsibleRole(locals()) diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/defaults/main/config.yml b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/defaults/main/config.yml new file mode 100644 index 0000000000..f7e0d891dc --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/defaults/main/config.yml @@ -0,0 +1,78 @@ +# Auto-generated file, do not edit manually ... 
+# Toolbox generate command: repo generate_ansible_default_settings +# Source component: Fine_Tuning.ray_fine_tuning_job + +# Parameters +# the name of the fine-tuning job to create +# Mandatory value +fine_tuning_ray_fine_tuning_job_name: + +# the name of the namespace where the scheduler load will be generated +# Mandatory value +fine_tuning_ray_fine_tuning_job_namespace: + +# the name of the PVC where the model and dataset are stored +# Mandatory value +fine_tuning_ray_fine_tuning_job_pvc_name: + +# the name of the model to use inside the /dataset directory of the PVC +# Mandatory value +fine_tuning_ray_fine_tuning_job_model_name: + +# directory where the fine-tuning scripts are stored +# Mandatory value +fine_tuning_ray_fine_tuning_job_ft_scripts_dir: + +# the name of the dataset to use inside the /model directory of the PVC +# Mandatory value +fine_tuning_ray_fine_tuning_job_dataset_name: + +# number of replications of the dataset to use, to artificially extend or reduce the fine-tuning effort +fine_tuning_ray_fine_tuning_job_dataset_replication: 1 + +# name of the transformation to apply to the dataset +fine_tuning_ray_fine_tuning_job_dataset_transform: null + +# if True, and the dataset has to be transformed/duplicated, save and/or load it from the PVC +fine_tuning_ray_fine_tuning_job_dataset_prefer_cache: true + +# if True, only prepare the dataset cache file and do not run the fine-tuning. +fine_tuning_ray_fine_tuning_job_dataset_prepare_cache_only: false + +# the delimiter marking the beginning of the response in the dataset samples +fine_tuning_ray_fine_tuning_job_dataset_response_template: ' + + ### Label:' + +# the image to use for the fine-tuning container +fine_tuning_ray_fine_tuning_job_container_image: quay.io/rhoai/ray:2.35.0-py39-cu121-torch24-fa26 + +# the version identifier passed to the RayCluster object +fine_tuning_ray_fine_tuning_job_ray_version: 2.35.0 + +# the number of GPUs to request for the fine-tuning job +fine_tuning_ray_fine_tuning_job_gpu: 1 + +# the number of RAM gigs to request for to the fine-tuning job (in Gigs) +fine_tuning_ray_fine_tuning_job_memory: 10 + +# the number of CPU cores to request for the fine-tuning job (in cores) +fine_tuning_ray_fine_tuning_job_cpu: 1 + +# if True, sets the 'limits' of the job with the same value as the request. +fine_tuning_ray_fine_tuning_job_request_equals_limits: false + +# if True, only prepare the environment but do not run the fine-tuning job. +fine_tuning_ray_fine_tuning_job_prepare_only: false + +# if True, delete the other PyTorchJobs before running +fine_tuning_ray_fine_tuning_job_delete_other: false + +# number of worker replicas to deploy +fine_tuning_ray_fine_tuning_job_worker_replicas: 2 + +# dictionnary of hyper-parameters to pass to sft-trainer +fine_tuning_ray_fine_tuning_job_hyper_parameters: {} + +# if true, sleeps forever instead of running the fine-tuning command. +fine_tuning_ray_fine_tuning_job_sleep_forever: false diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/entrypoint/convert_alpaca.py b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/entrypoint/convert_alpaca.py new file mode 100644 index 0000000000..17e8a07d4c --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/entrypoint/convert_alpaca.py @@ -0,0 +1,36 @@ +import sys +import pathlib + +import datasets + +PROMPT_DICT = { + "prompt_input": ( + "Below is an instruction that describes a task, paired with an input that provides further context. 
" + "Write a response that appropriately completes the request.\n\n" + "### Instruction:\n{instruction}\n\n### Input:\n{input}\n\n### Label:" + ), + "prompt_no_input": ( + "Below is an instruction that describes a task. " + "Write a response that appropriately completes the request.\n\n" + "### Instruction:\n{instruction}\n\n### Label:" + ), +} + +src = pathlib.Path(sys.argv[1]) +dest = pathlib.Path(sys.argv[2]) + +def format_alpaca_fn(example): + prompt_input, prompt_no_input = PROMPT_DICT['prompt_input'], PROMPT_DICT['prompt_no_input'] + output = prompt_input.format_map(example) if example.get("input", "") != "" else prompt_no_input.format_map(example) + output = f"{output} {example['output']}" + return {"output": output} + + +print(f"Converting {src} from Alpaca format to SFTTrainer ...") +ds = datasets.load_dataset('json', data_files=str(src)) + +alpaca_ds = ds['train'].map(format_alpaca_fn, remove_columns=['instruction', 'input']) + +print(f"Saving into {dest} ...") + +alpaca_ds.to_json(dest) diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/entrypoint/convert_dataset_helper.py b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/entrypoint/convert_dataset_helper.py new file mode 100644 index 0000000000..1fd2ef5d95 --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/entrypoint/convert_dataset_helper.py @@ -0,0 +1,56 @@ +from transformers import AutoTokenizer +import logging +import json +import os + +logger = logging.getLogger("sft_trainer") + +def load_fms_hf_tuning_configuration(): + config_file = os.environ.get("SFT_TRAINER_CONFIG_JSON_PATH") + if not config_file: + logging.warn("No SFT_TRAINER_CONFIG_JSON_PATH available...") + return {} + + with open(config_file) as f: + config = json.load(f) + + return config + + +def get_tokenizer(path_model): + tokenizer = AutoTokenizer.from_pretrained(path_model) + + special_tokens_dict = dict() + + DEFAULT_PAD_TOKEN = "" + DEFAULT_EOS_TOKEN = "" + DEFAULT_BOS_TOKEN = "" + DEFAULT_UNK_TOKEN = "" + + if tokenizer.pad_token is None: + logger.warning("PAD token set to default, missing in tokenizer") + special_tokens_dict["pad_token"] = DEFAULT_PAD_TOKEN + if tokenizer.eos_token is None: + logger.warning("EOS token set to default, missing in tokenizer") + special_tokens_dict["eos_token"] = DEFAULT_EOS_TOKEN + if tokenizer.bos_token is None: + logger.warning("BOS token set to default, missing in tokenizer") + special_tokens_dict["bos_token"] = DEFAULT_BOS_TOKEN + if tokenizer.unk_token is None: + logger.warning("UNK token set to default, missing in tokenizer") + special_tokens_dict["unk_token"] = DEFAULT_UNK_TOKEN + + tokenizer.add_special_tokens(special_tokens_dict) + + return tokenizer + + +def get_tokens(tokenizer, line): + data = json.loads(line) + decoded = tokenizer.encode(data["output"], padding=True) + + return decoded + + +def get_token_count(tokenizer, line): + return len(get_tokens(tokenizer, line)) diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/entrypoint/convert_replicate.py b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/entrypoint/convert_replicate.py new file mode 100644 index 0000000000..48c68bb941 --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/entrypoint/convert_replicate.py @@ -0,0 +1,75 @@ +#! 
/usr/bin/env python + +import sys +import pathlib +import random + +import convert_dataset_helper + +fms_config = convert_dataset_helper.load_fms_hf_tuning_configuration() + +max_seq_length = fms_config["max_seq_length"] +tokenizer = convert_dataset_helper.get_tokenizer(fms_config["model_name_or_path"]) + +src = pathlib.Path(sys.argv[1]) +dst = pathlib.Path(sys.argv[2]) +FACTOR = float(sys.argv[3]) + +print(f"Replicating {src} with a factor of {FACTOR}...") +print(f"Filtering out samples with more than {max_seq_length=} tokens") + +with open(src) as src_f: + orig_length = len(src_f.readlines()) + print(f"Length of {src}: {orig_length} lines") + +dst.unlink(missing_ok=True) + +factor = FACTOR +samples_too_long = 0 +while factor >= 1: + print(f"Saving 1x {src} ...") + with open(src) as src_f, open(dst, "a") as dst_f: + for line in src_f.readlines(): + if convert_dataset_helper.get_token_count(tokenizer, line) > max_seq_length: + if factor == FACTOR: # count them only once + samples_too_long += 1 + continue + print(line.strip(), file=dst_f) + + factor -= 1 + + with open(dst) as dst_f: + new_length = len(dst_f.readlines()) + print(f"Length of {dst}: {new_length} lines") + +if 0 < factor < 1: + newline_count = int(orig_length * factor) + + print(f"Saving {factor}x {src}: {newline_count}/{orig_length} lines ...") + + with open(src) as src_f: + data = src_f.readlines() + + # use a fixed seed to get always the same results + random.Random(4).shuffle(data) + data_it = iter(data) + with open(dst, "a") as dst_f: + lines = 0 + while lines < newline_count: + line = next(data_it) + + if convert_dataset_helper.get_token_count(tokenizer, line) > max_seq_length: + if factor == FACTOR: # count them only once + samples_too_long += 1 + continue + + print(line.strip(), file=dst_f) + lines += 1 + + +with open(dst) as dst_f: + new_length = len(dst_f.readlines()) + + +print(f"Length of {dst}: {new_length} lines") +print(f"Removed {samples_too_long} samples longer than {max_seq_length=} tokens.") diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/entrypoint/job_entrypoint.sh b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/entrypoint/job_entrypoint.sh new file mode 100644 index 0000000000..55462c56fc --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/entrypoint/job_entrypoint.sh @@ -0,0 +1,109 @@ +# !/bin/bash + +set -o pipefail +set -o errexit +set -o nounset +set -o errtrace +set -x + +# echo "Source dataset: $DATASET_SOURCE" + +# MAX_SEQ_LENGTH=$(cat "$FT_CONFIG_JSON_PATH" | grep max_seq_length | awk '{print $2}' | cut -d"," -f1) +# DATASET_CACHE_FILE="/mnt/storage/dataset/$(basename "${DATASET_TRANSFORM:-}")_replicate_${DATASET_REPLICATION}_max${MAX_SEQ_LENGTH}tokens_$(basename "${DATASET_SOURCE}")" + +# prepare_dataset() { +# if [[ -f "${DATASET_CACHE_FILE:-}" ]]; then +# echo "Found dataset cache file $DATASET_PREFER_CACHE. Not regenerating it." 
+# return +# fi + +# if [[ "${DATASET_TRANSFORM:-}" ]]; then +# echo "Dataset transformation: $DATASET_TRANSFORM" + +# python "$DATASET_TRANSFORM" "$DATASET_SOURCE" "$DATASET_CACHE_FILE" +# else +# cp "$DATASET_SOURCE" "$DATASET_CACHE_FILE" +# fi + +# if [[ "${DATASET_REPLICATION:-1}" != 1 ]]; then +# echo "Dataset replication factor: $DATASET_REPLICATION" +# python /mnt/entrypoint/convert_replicate.py "$DATASET_CACHE_FILE" /tmp/temp_ds.json "$DATASET_REPLICATION" +# mv /tmp/temp_ds.json "$DATASET_CACHE_FILE" +# fi +# } + +# prepare_dataset + +# echo "# sha256sum of the dataset files" +# sha256sum "$DATASET_SOURCE" "$DATASET_CACHE_FILE" + +DATASET_FILE=/mnt/storage/dataset/ray-finetune-llm-deepspeed_train.jsonl +DATASET_TEST_FILE=/mnt/storage/dataset/ray-finetune-llm-deepspeed_test.jsonl + +if [[ "${DATASET_PREPARE_CACHE_ONLY:-0}" == true ]]; then + echo "DATASET_PREPARE_CACHE_ONLY is set, stopping here." + exit 0 +fi + +echo "# configuration:" +cat "$FT_CONFIG_JSON_PATH" + +echo "# sha256sum of the $MODEL_NAME files" +if [[ -f "/mnt/storage/model/${MODEL_NAME}.sha256sum" ]]; then + cat "/mnt/storage/model/${MODEL_NAME}.sha256sum" +else + time find "/mnt/storage/model/$MODEL_NAME" ! -path '*/.git/*' -type f -exec sha256sum {} \; | tee -a "/mnt/storage/model/${MODEL_NAME}.sha256sum" +fi + +if [[ -e /dev/nvidiactl ]]; then + echo "# GPU available:" + nvidia-smi -L +else + echo "No GPU seem to be available." +fi + +cd /mnt/ft-scripts + +if [[ "${SLEEP_FOREVER:-}" ]]; then + set +x + echo "Sleep flag enabled, sleeping forever." + echo "Fine-tuning command:" + cat <Job retries 3 times ... +fi + +find "$DEST_DIR" -type f +rm -r "$DEST_DIR" diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/entrypoint/prepare_dataset.sh b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/entrypoint/prepare_dataset.sh new file mode 100644 index 0000000000..a48e527e5d --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/entrypoint/prepare_dataset.sh @@ -0,0 +1,72 @@ +# !/bin/bash + +set -o pipefail +set -o errexit +set -o nounset +set -o errtrace +set -x + +echo "Source dataset: $DATASET_SOURCE" + +prepare_dataset() { + MAX_SEQ_LENGTH=$(cat "$FT_CONFIG_JSON_PATH" | grep max_seq_length | awk '{print $2}' | cut -d"," -f1) + DATASET_PREFER_CACHE_FILE="/mnt/storage/dataset/$(basename "${DATASET_TRANSFORM:-}")_replicate_${DATASET_REPLICATION}_max${MAX_SEQ_LENGTH}tokens_$(basename "${DATASET_SOURCE}")" + if [[ -n "${DATASET_PREFER_CACHE:-}" && -f "${DATASET_PREFER_CACHE_FILE:-}" ]]; then + echo "Found dataset cache file $DATASET_PREFER_CACHE. Not regenerating it." + cp "$DATASET_PREFER_CACHE_FILE" "$DATASET_DEST" + return + fi + + if [[ "${DATASET_TRANSFORM:-}" ]]; then + echo "Dataset transformation: $DATASET_TRANSFORM" + + python "$DATASET_TRANSFORM" "$DATASET_SOURCE" "$DATASET_DEST" + else + cp "$DATASET_SOURCE" "$DATASET_DEST" + fi + + if [[ "${DATASET_REPLICATION:-1}" != 1 ]]; then + echo "Dataset replication factor: $DATASET_REPLICATION" + python /mnt/entrypoint/convert_replicate.py "$DATASET_DEST" /tmp/temp_ds.json "$DATASET_REPLICATION" + mv /tmp/temp_ds.json "$DATASET_DEST" + fi + + if [[ -n "${DATASET_PREFER_CACHE:-}" ]]; then + echo "Saving dataset cache into $DATASET_PREFER_CACHE_FILE" + cp "$DATASET_DEST" "$DATASET_PREFER_CACHE_FILE" + fi +} + +prepare_dataset + +CACHE_FILE="${DATASET_DEST}.study_dataset.cache" +if [ ! 
-f "$CACHE_FILE" ]; then + + SFT_TRAINER_CONFIG_JSON_PATH="$FT_CONFIG_JSON_PATH" python /mnt/entrypoint/study_dataset.py > "$CACHE_FILE" +fi +cat "$CACHE_FILE" + +echo "# sha256sum of the dataset files" +sha256sum "$DATASET_SOURCE" "$DATASET_DEST" + +if [[ "${DATASET_PREPARE_CACHE_ONLY:-0}" == true ]]; then + echo "DATASET_PREPARE_CACHE_ONLY is set, stopping here." + exit 0 +fi + +echo "# configuration:" +cat "$FT_CONFIG_JSON_PATH" + +echo "# sha256sum of the $MODEL_NAME files" +if [[ -f "/mnt/storage/model/${MODEL_NAME}.sha256sum" ]]; then + cat "/mnt/storage/model/${MODEL_NAME}.sha256sum" +else + time find "/mnt/storage/model/$MODEL_NAME" ! -path '*/.git/*' -type f -exec sha256sum {} \; | tee -a "/mnt/storage/model/${MODEL_NAME}.sha256sum" +fi + +if [[ -e /dev/nvidiactl ]]; then + echo "# GPU available:" + nvidia-smi -L +else + echo "No GPU seem to be available." +fi diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/entrypoint/ray_entrypoint.sh b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/entrypoint/ray_entrypoint.sh new file mode 100644 index 0000000000..9fd1203402 --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/entrypoint/ray_entrypoint.sh @@ -0,0 +1,63 @@ +# !/bin/bash + +set -o pipefail +set -o errexit +set -o nounset +set -o errtrace +set -x + +echo "# configuration:" +cat "$FT_CONFIG_JSON_PATH" + +echo "# sha256sum of the $MODEL_NAME files" +if [[ -f "/mnt/storage/model/${MODEL_NAME}.sha256sum" ]]; then + cat "/mnt/storage/model/${MODEL_NAME}.sha256sum" +else + time find "/mnt/storage/model/$MODEL_NAME" ! -path '*/.git/*' -type f -exec sha256sum {} \; | tee -a "/mnt/storage/model/${MODEL_NAME}.sha256sum" +fi + +if [[ -e /dev/nvidiactl ]]; then + echo "# GPU available:" + nvidia-smi -L +else + echo "No GPU seem to be available." +fi + +if [[ "${SLEEP_FOREVER:-}" ]]; then + set +x + echo "Sleep flag enabled, sleeping forever." + echo "Fine-tuning command:" + cat < [!IMPORTANT] +> This example has been tested with the configurations listed in the [validation](#validation) section. +> Its configuration space is highly dimensional, with application configuration tighly coupled to runtime / hardware configuration. +> It is your responsibility to adapt it, and validate it works as expected, with your configuration(s), on your target environment(s). + +## Requirements + +* An OpenShift cluster with OpenShift AI (RHOAI) 2.10+ installed: + * The `codeflare`, `dashboard`, `ray` and `workbenches` components enabled; +* Sufficient worker nodes for your configuration(s) with NVIDIA GPUs (Ampere-based recommended) or AMD GPUs (AMD Instinct MI300X); +* An AWS S3 bucket to store experimentation results. + +## Setup + +* Access the OpenShift AI dashboard, for example from the top navigation bar menu: +![](./docs/01.png) +* Log in, then go to _Data Science Projects_ and create a project: +![](./docs/02.png) +* Once the project is created, click on _Create a workbench_: +![](./docs/03.png) +* Then create a workbench with the following settings: +![](./docs/04a.png) +![](./docs/04b.png) +> [!NOTE] +> You can reuse an existing data connection, if you have one already configured for S3. 
+* And click on _Open_ when it's ready: +![](./docs/05.png) +* From the Notebook server, clone this repository, i.e., `https://github.com/opendatahub-io/distributed-workloads.git`: +![](./docs/06.png) +* Navigate to the `distributed-workloads/examples/ray-finetune-llm-deepspeed` directory and open the `ray-finetune-llm-deepspeed` notebook: +![](./docs/07.png) +* Finally, change the connection parameters for the CodeFlare SDK. + Mind the Ray cluster is configured to use NVIDIA GPUs by default. + If you use different accelerators, e.g. AMD GPUs, the Ray cluster configuration must be changed accordingly. + +## Experimentation + +Once you've reviewed your setup is correct, you can execute the notebook step-by-step. +It creates a Ray cluster, and submits the fine-tuning job to it. + +Once you've run the `cluster.details()` command, it prints the Ray cluster dashboard URL, that you can click to access the UI: + +![Ray Dashboard](./docs/dashboard.png) + +You can also setup [TensorBoard](https://github.com/tensorflow/tensorboard) to visualise your training experiments, as you change hyper-parameters, by running the following commands from a terminal: + +* Install TensorBoard in the Ray head node: + ```console + kubectl exec `kubectl get pod -l ray.io/node-type=head -o name` -- pip install tensorboard + ``` +* Start TensorBoard server: + ```console + kubectl exec `kubectl get pod -l ray.io/node-type=head -o name` -- tensorboard --logdir /tmp/ray --bind_all --port 6006 + ``` +* Port-foward the TensorBoard UI endpoint: + ```console + kubectl port-forward `kubectl get pod -l ray.io/node-type=head -o name` 6006:6006 + ``` + +You can then access TensorBoard from your Web browser at http://localhost:6006. + +Here is an example of a visualization, that compares experimentations with different combinations of context length and batch size: +![TensorBoard](./docs/tensorboard.png) + +> [!IMPORTANT] +> TensorBoard is not part of OpenShift AI. 
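Besides the Ray dashboard and TensorBoard, the submitted job can also be monitored directly from the notebook with the job-submission client returned by `cluster.job_client`. A minimal sketch, assuming `client` and `submission_id` come from the notebook cells shown later in this example:

```python
# Minimal monitoring loop using the standard Ray job-submission API.
import time

status = client.get_job_status(submission_id)
while not status.is_terminal():
    print(f"Job {submission_id} is {status}")
    time.sleep(30)
    status = client.get_job_status(submission_id)

# Print the tail of the driver logs once the job has reached a terminal state.
print(client.get_job_logs(submission_id)[-2000:])
```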
+ +By default, at the end of each epoch, the checkpoints are stored in the configured S3 bucket, e.g.: + +![AWS S3](./docs/s3.png) + +## Validation + +This example has been validated on the following configurations: + +### Llama 3 8B - GSM8k - LoRA + +* OpenShift cluster: + * ROSA-hosted 4.14.20 + * 5 `g5.8xlarge` (NVIDIA A10 GPU) worker nodes +* Ray cluster: + ```python + ClusterConfiguration( + num_workers=4, + worker_cpu_requests=8, + worker_cpu_limits=16, + head_cpus=8, + worker_memory_requests=32, + worker_memory_limits=64, + head_memory=64, + head_extended_resource_requests={'nvidia.com/gpu':1}, + worker_extended_resource_requests={'nvidia.com/gpu':1}, + ) + ``` +* Ray job: + ```python + ray_finetune_llm_deepspeed.py " + "--model-name=meta-llama/Meta-Llama-3.1-8B " + "--ds-config=./deepspeed_configs/zero_3_offload_optim_param.json " + f"--storage-path=s3://{s3_bucket}/ray_finetune_llm_deepspeed/ " + "--lora " + "--num-devices=5 " + "--batch-size-per-device=6 " + "--eval-batch-size-per-device=8 " + ``` + +### Llama 3 8B - GSM8k - LoRA + +* OpenShift cluster: + * OCP 4.14.35, 4 AMD MI300X GPUs / single node +* Ray cluster: + ```python + ClusterConfiguration( + num_workers=3, + worker_cpu_requests=8, + worker_cpu_limits=16, + head_cpus=16, + worker_memory_requests=96, + worker_memory_limits=96, + head_memory=96, + head_extended_resource_requests={'amd.com/gpu':1}, + worker_extended_resource_requests={'amd.com/gpu':1}, + image="quay.io/rhoai/ray:2.35.0-py39-rocm61-torch24-fa26", + ) + ``` +* Ray job: + ```python + ray_finetune_llm_deepspeed.py " + "--model-name=meta-llama/Meta-Llama-3.1-8B " + "--ds-config=./deepspeed_configs/zero_3_offload_optim_param.json " + f"--storage-path=s3://{s3_bucket}/ray_finetune_llm_deepspeed/ " + "--lora " + "--num-devices=4 " + "--batch-size-per-device=64 " + "--eval-batch-size-per-device=64 " + "--ctx-len=1024" + ``` + +### Llama 2 7B - GSM8k - LoRA + +* OpenShift cluster: + * ROSA-hosted 4.14.20 + * 6 `g5.8xlarge` (NVIDIA A10 GPU) worker nodes +* Ray cluster: + ```python + ClusterConfiguration( + num_workers=5, + worker_cpu_requests=8, + worker_cpu_limits=8, + head_cpus=16, + worker_memory_requests=48, + worker_memory_limits=48, + head_memory=48, + head_extended_resource_requests={'nvidia.com/gpu':1}, + worker_extended_resource_requests={'nvidia.com/gpu':1}, + ) + ``` +* Ray job: + ```python + ray_finetune_llm_deepspeed.py " + "--model-name=meta-llama/Llama-2-7b-chat-hf " + "--ds-config=./deepspeed_configs/zero_3_llama_2_7b.json " + f"--storage-path=s3://{s3_bucket}/ray_finetune_llm_deepspeed/ " + "--lora " + "--num-devices=6 " + "--batch-size-per-device=16 " + "--eval-batch-size-per-device=32 " + ``` + +### Llama 2 13B - GSM8k - LoRA + +* OpenShift cluster: + * ROSA-hosted 4.14.20 + * 6 `g5.8xlarge` (NVIDIA A10 GPU) worker nodes +* Ray cluster: + ```python + ClusterConfiguration( + num_workers=5, + worker_cpu_requests=8, + worker_cpu_limits=8, + head_cpus=16, + worker_memory_requests=48, + worker_memory_limits=48, + head_memory=48, + head_extended_resource_requests={'nvidia.com/gpu':1}, + worker_extended_resource_requests={'nvidia.com/gpu':1}, + ) + ``` +* Ray job: + ```python + ray_finetune_llm_deepspeed.py " + "--model-name=meta-llama/Llama-2-13b-chat-hf " + "--ds-config=./deepspeed_configs/zero_3_llama_2_13b.json " + f"--storage-path=s3://{s3_bucket}/ray_finetune_llm_deepspeed/ " + "--lora " + "--num-devices=6 " + "--batch-size-per-device=16 " + "--eval-batch-size-per-device=32 " + ``` + +### Llama 2 70B - GSM8k - LoRA + +* OpenShift cluster: + * DGX 
A100 Server (8x NVIDIA A100 / 40GB HBM) +* Ray cluster: + ```python + ClusterConfiguration( + num_workers=7, + worker_cpu_requests=16, + worker_cpu_limits=16, + head_cpus=16, + worker_memory_requests=128, + worker_memory_limits=128, + head_memory=128, + head_extended_resource_requests={'nvidia.com/gpu':1}, + worker_extended_resource_requests={'nvidia.com/gpu':1}, + ) + ``` +* Ray job: + ```python + ray_finetune_llm_deepspeed.py " + "--model-name=meta-llama/Llama-2-70b-chat-hf " + "--ds-config=./deepspeed_configs/zero_3_llama_2_70b.json " + f"--storage-path=s3://{s3_bucket}/ray_finetune_llm_deepspeed/ " + "--lora " + "--num-devices=8 " + "--batch-size-per-device=8 " + "--eval-batch-size-per-device=8 " + ``` + + +[^1]: https://github.com/ray-project/ray/tree/master/doc/source/templates/04_finetuning_llms_with_deepspeed diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/deepspeed_configs/zero_3_llama_2_13b.json b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/deepspeed_configs/zero_3_llama_2_13b.json new file mode 100644 index 0000000000..aa036d7401 --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/deepspeed_configs/zero_3_llama_2_13b.json @@ -0,0 +1,35 @@ +{ + "fp16": { + "enabled": "auto" + }, + "bf16": { + "enabled": "auto" + }, + "zero_optimization": { + "stage": 3, + "offload_optimizer": { + "device": "cpu", + "pin_memory": true + }, + "offload_param": { + "device": "cpu", + "pin_memory": true + }, + "overlap_comm": true, + "contiguous_gradients": true, + "sub_group_size": 1e9, + "reduce_bucket_size": 5e8, + "stage3_prefetch_bucket_size": 5e8, + "stage3_param_persistence_threshold": 1e6, + "stage3_max_live_parameters": 1e9, + "stage3_max_reuse_distance": 1e9, + "stage3_gather_16bit_weights_on_model_save": true, + "round_robin_gradients": true + }, + "gradient_accumulation_steps": "auto", + "gradient_clipping": "auto", + "steps_per_print": 10, + "train_batch_size": "auto", + "train_micro_batch_size_per_gpu": "auto", + "wall_clock_breakdown": false +} \ No newline at end of file diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/deepspeed_configs/zero_3_llama_2_70b.json b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/deepspeed_configs/zero_3_llama_2_70b.json new file mode 100644 index 0000000000..23c70b4f7e --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/deepspeed_configs/zero_3_llama_2_70b.json @@ -0,0 +1,28 @@ +{ + "fp16": { + "enabled": false + }, + "bf16": { + "enabled": true + }, + "zero_optimization": { + "stage": 3, + "offload_optimizer": { + "device": "cpu", + "pin_memory": false + }, + "overlap_comm": true, + "contiguous_gradients": true, + "reduce_bucket_size": "auto", + "stage3_prefetch_bucket_size": "auto", + "stage3_param_persistence_threshold": "auto", + "gather_16bit_weights_on_model_save": true, + "round_robin_gradients": true + }, + "gradient_accumulation_steps": "auto", + "gradient_clipping": "auto", + "steps_per_print": 10, + "train_batch_size": "auto", + "train_micro_batch_size_per_gpu": "auto", + "wall_clock_breakdown": false +} \ No newline at end of file diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/deepspeed_configs/zero_3_llama_2_7b.json 
b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/deepspeed_configs/zero_3_llama_2_7b.json new file mode 100644 index 0000000000..f1ddac17f2 --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/deepspeed_configs/zero_3_llama_2_7b.json @@ -0,0 +1,35 @@ +{ + "fp16": { + "enabled": "auto" + }, + "bf16": { + "enabled": "auto" + }, + "zero_optimization": { + "stage": 3, + "offload_optimizer": { + "device": "cpu", + "pin_memory": true + }, + "offload_param": { + "device": "cpu", + "pin_memory": true + }, + "overlap_comm": true, + "contiguous_gradients": true, + "sub_group_size": 1e9, + "reduce_bucket_size": 5e8, + "stage3_prefetch_bucket_size": 5e8, + "stage3_param_persistence_threshold": 1e6, + "stage3_max_live_parameters": 1e9, + "stage3_max_reuse_distance": 1e9, + "stage3_gather_16bit_weights_on_model_save": true, + "round_robin_gradients": true + }, + "gradient_accumulation_steps": "auto", + "gradient_clipping": "auto", + "steps_per_print": 10, + "train_batch_size": "auto", + "train_micro_batch_size_per_gpu": "auto", + "wall_clock_breakdown": false +} \ No newline at end of file diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/lora.json b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/lora.json new file mode 100644 index 0000000000..b953a4c984 --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/lora.json @@ -0,0 +1,11 @@ +{ + "r": 8, + "lora_alpha": 16, + "lora_dropout": 0.05, + "target_modules": ["q_proj", "v_proj", "k_proj", "o_proj", "gate_proj", "up_proj", "down_proj", "embed_tokens", "lm_head"], + "task_type": "CAUSAL_LM", + "modules_to_save": [], + "bias": "none", + "fan_in_fan_out": false, + "init_lora_weights": true +} \ No newline at end of file diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/ray_finetune_llm_deepspeed.ipynb b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/ray_finetune_llm_deepspeed.ipynb new file mode 100644 index 0000000000..d81da032e7 --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/ray_finetune_llm_deepspeed.ipynb @@ -0,0 +1,219 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 36, + "id": "204a90fb-da94-426a-8c0c-3a0c61b01086", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration\n", + "from codeflare_sdk.cluster.auth import TokenAuthentication\n", + "import os\n", + "import sys" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fc30c26b-d439-4d74-b3fe-d9e84db29a1b", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Create the training and evaluation datasets.\n", + "# This can be run only once.\n", + "!{sys.executable} -m pip install datasets\n", + "import create_dataset\n", + "create_dataset.gsm8k_qa_no_tokens_template()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "440b98a3-8ed3-4072-9cc2-763d0e6c6f00", + "metadata": {}, + "outputs": [], + "source": [ + "# Authenticate the CodeFlare SDK\n", + "# On OpenShift, you can retrieve the token by running `oc whoami -t`,\n", + "# and the server with `oc cluster-info`.\n", + "auth = TokenAuthentication(\n", + " 
token = '',\n", + " server = '',\n", + " skip_tls=False\n", + ")\n", + "auth.login()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4d340f86-1a04-48d3-a5e7-067faecfc17a", + "metadata": {}, + "outputs": [], + "source": [ + "# Configure the Ray cluster\n", + "cluster = Cluster(ClusterConfiguration(\n", + " name='ray',\n", + " namespace='ray-finetune-llm-deepspeed',\n", + " num_workers=7,\n", + " worker_cpu_requests=16,\n", + " worker_cpu_limits=16,\n", + " head_cpus=16,\n", + " worker_memory_requests=128,\n", + " worker_memory_limits=256,\n", + " head_memory=128,\n", + " # Use the following parameters with NVIDIA GPUs\n", + " image=\"quay.io/rhoai/ray:2.35.0-py39-cu121-torch24-fa26\",\n", + " head_extended_resource_requests={'nvidia.com/gpu':1},\n", + " worker_extended_resource_requests={'nvidia.com/gpu':1},\n", + " # Or replace them with these parameters for AMD GPUs\n", + " # image=\"quay.io/rhoai/ray:2.35.0-py39-rocm61-torch24-fa26\",\n", + " # head_extended_resource_requests={'amd.com/gpu':1},\n", + " # worker_extended_resource_requests={'amd.com/gpu':1},\n", + "))" + ] + }, + { + "cell_type": "code", + "execution_count": 30, + "id": "cee11013-8646-4cda-94a2-f8e731baa1ca", + "metadata": {}, + "outputs": [], + "source": [ + "# Create the Ray cluster\n", + "cluster.up()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "709a09df-3871-4791-9763-5dcdc081bec4", + "metadata": {}, + "outputs": [], + "source": [ + "cluster.wait_ready()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "24ccda92-a0f0-4845-a13c-6aa735e75d5a", + "metadata": {}, + "outputs": [], + "source": [ + "cluster.details()" + ] + }, + { + "cell_type": "code", + "execution_count": 40, + "id": "fb29e733-eac5-4f3d-bbfa-543e8ee7fd1b", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Initialize the Job Submission Client\n", + "client = cluster.job_client" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "086f3337", + "metadata": {}, + "outputs": [], + "source": [ + "# The S3 bucket where to store checkpoint.\n", + "# It can be set manually, otherwise it's retrieved from configured the data connection.\n", + "s3_bucket = ''\n", + "if not s3_bucket:\n", + " s3_bucket = os.environ.get('AWS_S3_BUCKET')\n", + "assert s3_bucket, \"An S3 bucket must be provided to store checkpoints\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ae2be5d8-66c7-46e2-ba3b-fa2f8a03b27f", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "submission_id = client.submit_job(\n", + " entrypoint=\"python ray_finetune_llm_deepspeed.py \"\n", + " \"--model-name=meta-llama/Meta-Llama-3.1-8B \"\n", + " \"--lora \"\n", + " \"--num-devices=8 \"\n", + " \"--num-epochs=3 \"\n", + " \"--ds-config=./deepspeed_configs/zero_3_offload_optim_param.json \"\n", + " f\"--storage-path=s3://{s3_bucket}/ray_finetune_llm_deepspeed/ \"\n", + " \"--batch-size-per-device=32 \"\n", + " \"--eval-batch-size-per-device=32 \",\n", + " runtime_env={\n", + " \"env_vars\": {\n", + " 'AWS_ACCESS_KEY_ID': os.environ.get('AWS_ACCESS_KEY_ID'),\n", + " 'AWS_SECRET_ACCESS_KEY': os.environ.get('AWS_SECRET_ACCESS_KEY'),\n", + " 'AWS_DEFAULT_REGION': os.environ.get('AWS_DEFAULT_REGION')\n", + " },\n", + " 'pip': 'requirements.txt',\n", + " 'working_dir': './',\n", + " \"excludes\": [\"/docs/\", \"*.ipynb\", \"*.md\"]\n", + " },\n", + ")\n", + "print(submission_id)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": 
"8476f19b-1d51-44f5-8889-c5b01ed36343", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "client.stop_job(submission_id)" + ] + }, + { + "cell_type": "code", + "execution_count": 42, + "id": "f456f161-5122-4057-a5ac-f7f6b38651ec", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "cluster.down()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3.9", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.16" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/ray_finetune_llm_deepspeed.py b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/ray_finetune_llm_deepspeed.py new file mode 100644 index 0000000000..81e118a49b --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/ray_finetune_llm_deepspeed.py @@ -0,0 +1,743 @@ +import argparse +from filelock import FileLock +import functools +import json +import math +import os +from pathlib import Path +import tempfile +import time +import tree +from typing import Tuple +import urllib +from urllib.parse import urljoin + +try: + import deepspeed # noqa: F401 +except ImportError as e: + raise RuntimeError( + "Please install deepspeed with `pip install --user deepspeed`." + ) from e + +from accelerate import Accelerator, DeepSpeedPlugin +from accelerate.utils import DummyOptim, DummyScheduler, set_seed +import torch +import torch.nn as nn +import tqdm +from transformers import ( + AutoModelForCausalLM, + AutoTokenizer, + get_linear_schedule_with_warmup, +) + +from peft import LoraConfig, get_peft_model +import ray +from ray import train +import ray.util.scheduling_strategies +from ray.train.torch import TorchTrainer +from ray.train import Checkpoint + +from utils import ( + get_checkpoint_and_refs_dir, + get_mirror_link, + download_model, + get_download_path, +) + +urllib.parse.uses_relative.append("s3") +urllib.parse.uses_netloc.append("s3") + +OPTIM_BETAS = (0.9, 0.999) +OPTIM_EPS = 1e-8 +NUM_WARMUP_STEPS = 10 +OPTIM_WEIGHT_DECAY = 0.0 +ATTENTION_LAYER_NAME = "self_attn" + + +def get_expected_lora_num_parameters( + model, lora_config: LoraConfig, attn_layer_name: str = ATTENTION_LAYER_NAME +): + """Calculate the expected number of parameters for lora fine-tuning.""" + sum_params = 0 + num_attention_layers = 0 + modules = model.named_modules() + loraified_modules = 0 + # We calculate the number of parameters we need for lora fine-tuning by calculating + # the sizes of the decomposed weight matrices according to the paper. + for full_name, target in modules: + layer_name = full_name.split(".")[-1] + + if layer_name == attn_layer_name: + # Detected another attention layer (for example, llama 2 70b should have 80 + # of these) + num_attention_layers += 1 + elif layer_name in lora_config.modules_to_save: + # Detect another non-lora module to save, which will also contribute to the + # number of check-pointed parameters. This will result in one set of + # trainable parameters ".original_module.weight" and another one with + # ".modules_to_save.default.weight" + # Therefore, each layer contributes 2 x the number of actual elements in + # that layer. 
+ sum_params += 2 * target.weight.numel() + print( + "Found non-lora-layer to checkpoint: ", + layer_name, + " with num params ", + target.weight.numel(), + ) + else: + for module_name in lora_config.target_modules: + if layer_name == module_name: + loraified_modules += 1 + if isinstance(target, nn.Linear): + # Target is attention weight + sum_params += ( + target.in_features + target.out_features + ) * lora_config.r + elif isinstance(target, nn.Embedding): + # Target is linear weight + sum_params += ( + target.embedding_dim + target.num_embeddings + ) * lora_config.r + + print( + f"Detected {num_attention_layers} attention layers, containing" + f" {loraified_modules} modules to modify according to LoRA's `target_modules`." + f" This should yield {sum_params} trainable parameters." + ) + + return sum_params + + +def get_number_of_params(model: nn.Module): + sum = 0 + for name, param in model.named_parameters(): + if param.requires_grad: + sum += param.numel() + return sum + + +def collate_fn(batch, tokenizer, block_size, device): + out_batch = tokenizer( + list(map(lambda m: tokenizer.apply_chat_template(m, + tokenize=False, + add_generation_prompt=False, + add_special_tokens=False), + batch["messages"])), + padding="max_length", + max_length=block_size, + truncation=True, + return_tensors="pt", + ) + out_batch["labels"] = out_batch["input_ids"].clone() + + out_batch = tree.map_structure(lambda x: x.to(device), out_batch) + + return out_batch + + +def get_pretrained_path(model_id: str): + mirror_uri = get_mirror_link(model_id) + ckpt_path, _ = get_checkpoint_and_refs_dir( + model_id=model_id, bucket_uri=mirror_uri, s3_sync_args=["--no-sign-request"] + ) + return ckpt_path + + +def get_tokenizer(model_name, **kwargs): + pretrained_path = get_pretrained_path(model_name) + # Context for legacy=True: https://github.com/huggingface/transformers/issues/25176 + tokenizer = AutoTokenizer.from_pretrained(pretrained_path, legacy=True) + tokenizer.pad_token = tokenizer.eos_token + if kwargs.get("special_tokens", None): + tokenizer.add_tokens(kwargs.get("special_tokens"), special_tokens=True) + if kwargs.get("chat_template", None): + tokenizer.chat_template = kwargs.get("chat_template") + + return tokenizer + + +def evaluate( + *, model, eval_ds, accelerator, bsize, ds_kwargs, as_test: bool = False +) -> Tuple[float, float]: + model.eval() + losses = [] + + eval_dataloader = eval_ds.iter_torch_batches(batch_size=bsize, **ds_kwargs) + eval_ds_len = len(list(eval_ds.iter_batches(batch_size=1))) + for step, batch in tqdm.tqdm( + enumerate(eval_dataloader), total=eval_ds_len // (bsize + 1) + ): + with torch.no_grad(): + outputs = model(**batch) + + loss = outputs.loss + # The tensors are gathered by concatenating them on the first dimension, so we + # add a new dimension to the scalar loss to get a tensor of shape (K,) for K + # workers. + losses.append(accelerator.gather(loss[None])) + + if as_test: + break + + # We stack losses so that we have a tensor of shape (T, K) where T is the number of + # steps and K is the number of workers. 
+ losses = torch.stack(losses) + try: + eval_loss = torch.mean(losses).item() + perplexity = math.exp(eval_loss) + except OverflowError: + perplexity = float("inf") + return perplexity, eval_loss + + +def _test_tokenizer(model_name): + # This function tests that adding special tokens does not + # result in un-expected tokenization + # Context: https://github.com/huggingface/transformers/issues/25176 + tokenizer = get_tokenizer(model_name=model_name, special_tokens=[""]) + testoutput = tokenizer("inform")["input_ids"] + expected = tokenizer("inform")["input_ids"] + assert testoutput[-1] == expected[-1], ( + "The tokenizer is not working as expected with special tokens, " + f"testoutput={testoutput}, expected={expected}" + ) + + +def checkpoint_model( + checkpoint_folder, ckpt_id, model, epoch, last_global_step, **kwargs +): + """Utility function for checkpointing model + optimizer dictionaries + The main purpose for this is to be able to resume training from that instant again. + """ + checkpoint_state_dict = { + "epoch": epoch, + "last_global_step": last_global_step, + } + # Add extra kwargs too + checkpoint_state_dict.update(kwargs) + + # In here model will be a DeepspeedEngine object + model.save_checkpoint(checkpoint_folder, ckpt_id, checkpoint_state_dict) + status_msg = ( + f"checkpointing: checkpoint_folder={checkpoint_folder}, ckpt_id={ckpt_id}" + ) + print(status_msg) + + +def training_function(kwargs: dict): + print("training_function called") + + # Train has a bug somewhere that causes ACCELERATE_TORCH_DEVICE to not be set + # properly on multi-gpu nodes + cuda_visible_device = os.environ["CUDA_VISIBLE_DEVICES"].split(",") + local_rank = int(os.environ["LOCAL_RANK"]) + device_id = cuda_visible_device[local_rank] + os.environ["ACCELERATE_TORCH_DEVICE"] = f"cuda:{device_id}" + + config = kwargs["config"] + args = argparse.Namespace(**kwargs["args"]) + chat_template = kwargs.get("chat_template", []) + special_tokens = kwargs.get("special_tokens", []) + model_id = config["model_name"] + + # We need to download the model weights on this machine if they don't exit. 
+ # We need to acquire a lock to ensure that only one process downloads the model + bucket_uri = get_mirror_link(model_id) + download_path = get_download_path(model_id) + base_path = Path(download_path).parent + base_path.mkdir(parents=True, exist_ok=True) + lock_file = str(base_path / f'{model_id.replace("/", "--")}.lock') + with FileLock(lock_file): + download_model( + model_id=model_id, bucket_uri=bucket_uri, s3_sync_args=["--no-sign-request"] + ) + + # Sample hyperparameters for learning rate, batch size, seed and a few other HPs + lr = config["lr"] + num_epochs = int(config["num_epochs"]) + seed = int(config["seed"]) + batch_size = int(config["batch_size"]) + gradient_accumulation_steps = int(config["gradient_accumulation_steps"]) + + # Get deepspeed config to set up the batch size per device + ds_plugin = config["ds_plugin"] + ds_plugin.hf_ds_config.config["train_micro_batch_size_per_gpu"] = batch_size + + # Initialize accelerator + accelerator = Accelerator( + deepspeed_plugin=ds_plugin, + gradient_accumulation_steps=gradient_accumulation_steps, + mixed_precision=args.mx, + ) + + set_seed(seed) + + # train_ds is the local shard for this model + train_ds = train.get_dataset_shard("train") + valid_ds = train.get_dataset_shard("valid") + + train_ds_len = len(list(train_ds.iter_batches(batch_size=1))) + + _test_tokenizer(args.model_name) + tokenizer = get_tokenizer(model_name=args.model_name, + chat_template=chat_template, + special_tokens=special_tokens) + collate_partial = functools.partial( + collate_fn, + tokenizer=tokenizer, + block_size=config["block_size"], + device=accelerator.device, + ) + + pretrained_path = get_pretrained_path(model_id) + print(f"Loading model from {pretrained_path} ...") + s = time.time() + model = AutoModelForCausalLM.from_pretrained( + pretrained_path, + trust_remote_code=True, + torch_dtype=torch.bfloat16, + # `use_cache=True` is incompatible with gradient checkpointing. + use_cache=False, + attn_implementation="flash_attention_2", + ) + print(f"Done loading model in {time.time() - s} seconds.") + + model.resize_token_embeddings(len(tokenizer)) + + if config["lora"]: + # Apply LoRA + s = time.time() + lora_config = LoraConfig(**config["lora_config"]) + + expected_num_parameters = get_expected_lora_num_parameters( + lora_config=lora_config, model=model + ) + + print(f"Attempting to apply LoRA config: {lora_config}") + + model.enable_input_require_grads() + model = get_peft_model(model, lora_config) + + num_parameters = get_number_of_params(model) + + if num_parameters != expected_num_parameters: + raise ValueError( + f"Expected {expected_num_parameters} parameters, got {num_parameters} " + f"parameters. LoRA-ification failed." + ) + + print( + f"LoRA-ification done in {time.time() - s} seconds. Estimated checkpoint " + f"size (fp16): {num_parameters * 2 / 1e6} MB" + ) + + print(f"Number of check-pointed parameters: {get_number_of_params(model)}") + + print("Model initialized with pretrained weights. 
Training starting...") + if not args.no_grad_ckpt: + model.gradient_checkpointing_enable() + + optimizer_cls = ( + torch.optim.AdamW + if accelerator.state.deepspeed_plugin is None + or "optimizer" not in accelerator.state.deepspeed_plugin.deepspeed_config + else DummyOptim + ) + + optimizer = optimizer_cls( + model.parameters(), + lr=lr, + betas=OPTIM_BETAS, + weight_decay=OPTIM_WEIGHT_DECAY, + eps=OPTIM_EPS, + ) + + # Instantiate scheduler + # Creates Dummy Scheduler if `scheduler` was specified in the config file or + # else, creates `args.lr_scheduler_type` Scheduler + # get train and valid dataset lengths + + num_steps_per_epoch = math.ceil(train_ds_len / args.batch_size_per_device) + total_training_steps = ( + num_steps_per_epoch * num_epochs // gradient_accumulation_steps + ) + + if ( + accelerator.state.deepspeed_plugin is None + or "scheduler" not in accelerator.state.deepspeed_plugin.deepspeed_config + ): + lr_scheduler = get_linear_schedule_with_warmup( + optimizer=optimizer, + num_warmup_steps=NUM_WARMUP_STEPS * args.num_devices, + num_training_steps=total_training_steps * args.num_devices, + ) + else: + lr_scheduler = DummyScheduler( + optimizer, + warmup_num_steps=NUM_WARMUP_STEPS * args.num_devices, + total_num_steps=total_training_steps * args.num_devices, + ) + + # Prepare everything + # There is no specific order to remember, we just need to unpack the objects in the + # same order we gave them to the prepare method. + s = time.time() + model, optimizer, lr_scheduler = accelerator.prepare(model, optimizer, lr_scheduler) + print(f"Prepare done in {time.time() - s} seconds.") + + # Now we train the model + if accelerator.is_main_process: + print("Starting training ...") + print("Number of batches on main process", train_ds_len // batch_size) + + for epoch in range(num_epochs): + fwd_time_sum, bwd_time_sum, optim_step_time_sum = 0, 0, 0 + s_epoch = time.time() + model.train() + loss_sum = torch.tensor(0.0).to(accelerator.device) + + train_dataloader = train_ds.iter_torch_batches( + batch_size=batch_size, + collate_fn=collate_partial, + ) + + for step, batch in tqdm.tqdm( + enumerate(train_dataloader), total=train_ds_len // batch_size + 1 + ): + + # We could avoid this line since we set the accelerator with + # `device_placement=True`. 
+ with accelerator.accumulate(model): + s_fwd = time.time() + outputs = model(**batch) + loss = outputs.loss + loss_sum += loss.item() + e_fwd = time.time() + fwd_time = e_fwd - s_fwd + fwd_time_sum += fwd_time + s_bwd = time.time() + accelerator.backward(loss) + e_bwd = time.time() + bwd_time = e_bwd - s_bwd + bwd_time_sum += bwd_time + + s_opt_step = time.time() + optimizer.step() + lr_scheduler.step() + optimizer.zero_grad() + e_opt_step = time.time() + optim_step_time_sum += e_opt_step - s_opt_step + + if accelerator.is_main_process: + accelerator.print( + f"[epoch {epoch} step {step}] " + f"loss: {loss.item()} step-time: {e_opt_step - s_fwd}" + ) + + aggregated_loss = torch.mean(accelerator.gather(loss[None])).item() + + if config["as_test"]: + break + + # as long as this is not the last step report here + if step != (train_ds_len // batch_size - 1): + train.report( + { + "epoch": epoch, + "iteration": step, + "train_loss_batch": aggregated_loss, + "avg_train_loss_epoch": None, + "eval_loss": None, + "perplexity": None, + "num_iterations": step + 1, + "train_time_per_epoch": None, + "eval_time_per_epoch": None, + "fwd_time": fwd_time, + "bwd_time": bwd_time, + "avg_fwd_time_per_epoch": None, + "avg_bwd_time_per_epoch": None, + "learning_rate": lr_scheduler.get_lr()[0], + } + ) + + e_epoch = time.time() + accelerator.print("Train time per epoch: ", e_epoch - s_epoch) + + eval_s_epoch = time.time() + print("Running evaluation ...") + perplex, eloss = evaluate( + model=model, + eval_ds=valid_ds, + accelerator=accelerator, + bsize=config["eval_batch_size"], + ds_kwargs={"collate_fn": collate_partial}, + as_test=config["as_test"], + ) + accelerator.print("Eval result loss", eloss) + accelerator.print("Eval perplex", perplex) + + eval_e_epoch = time.time() + accelerator.print("Eval time per epoch: ", eval_e_epoch - eval_s_epoch) + accelerator.print("avg fwd time: ", fwd_time_sum / (step + 1)) + accelerator.print("avg bwd time: ", bwd_time_sum / (step + 1)) + accelerator.print("avg opt step time: ", optim_step_time_sum / (step + 1)) + + metrics = { + "epoch": epoch, + "iteration": step, + "train_loss_batch": aggregated_loss, + "avg_train_loss_epoch": loss_sum.item() / (step + 1), + "eval_loss": eloss, + "perplexity": perplex, + "num_iterations": step + 1, + "train_time_per_epoch": e_epoch - s_epoch, + "eval_time_per_epoch": eval_e_epoch - eval_s_epoch, + "fwd_time": fwd_time, + "bwd_time": bwd_time, + "avg_fwd_time_per_epoch": fwd_time_sum / (step + 1), + "avg_bwd_time_per_epoch": bwd_time_sum / (step + 1), + "learning_rate": lr_scheduler.get_lr()[0], + } + + with tempfile.TemporaryDirectory(dir=args.output_dir) as temp_checkpoint_dir: + accelerator.print(f"Saving the model locally at {temp_checkpoint_dir}") + accelerator.wait_for_everyone() + + checkpoint_save_start = time.perf_counter() + + if accelerator.is_main_process: + print("Saving tokenizer and config.") + tokenizer.save_pretrained(temp_checkpoint_dir) + + accelerator.wait_for_everyone() + + # Checkpointing strategy 1: Distributed checkpointing + # This checkpointing method makes deepspeed checkpoints on each node + # and then Ray Train will aggregate them to a central s3 bucket. 
+ # It should be done on all processes (not just the Rank 0) + # aggregate_on_rank_0 = False + # checkpoint_model( + # checkpoint_folder=tempdir, + # ckpt_id=epoch, + # model=model, + # epoch=epoch, + # last_global_step=step + # ) + + # Checkpointing strategy 2: Aggregate model on the rank 0 worker then upload + aggregate_on_rank_0 = True + unwrapped_model = accelerator.unwrap_model(model) + unwrapped_model.save_pretrained( + temp_checkpoint_dir, + is_main_process=accelerator.is_main_process, + save_function=accelerator.save, + safe_serialization=True, + state_dict=accelerator.get_state_dict(model), + ) + accelerator.wait_for_everyone() + print("Checkpoint save time: ", time.perf_counter() - checkpoint_save_start) + + checkpoint_upload_start = time.perf_counter() + + # Create the checkpoint object to report to Ray Train and upload to storage. + # If we aggregated the model on rank 0, we only need to report + # the checkpoint from the rank 0 worker, since all other checkpoint + # directories are empty (`save_pretrained` was a noop for other workers). + if aggregate_on_rank_0: + checkpoint = ( + Checkpoint.from_directory(temp_checkpoint_dir) + if accelerator.is_main_process + else None + ) + else: + # Distributed checkpointing should upload shards from each worker. + checkpoint = Checkpoint.from_directory(temp_checkpoint_dir) + + # Note: After `train.report`, in the case of remote storage, + # the checkpoint directory will be uploaded to the remote storage. + train.report(metrics, checkpoint=checkpoint) + + print( + "Checkpoint upload time: ", + time.perf_counter() - checkpoint_upload_start, + ) + print( + "Total checkpointing time: ", + time.perf_counter() - checkpoint_save_start, + ) + + if perplex < args.stop_perplexity: + print(f"Perplexity reached {perplex} < {args.stop_perplexity}. Stopping.") + break + + if config["as_test"]: + break + + +def parse_args(): + parser = argparse.ArgumentParser(description="LLM fine-tuning with DeepSpeed") + + parser.add_argument("--model-name", type=str, default="meta-llama/Meta-Llama-3.1-8B") + + parser.add_argument("--train-path", type=str, default="./data/train.jsonl", + help="Path to training jsonl file") + + parser.add_argument("--test-path", type=str, default="./data/test.jsonl", + help="Path to testing jsonl file") + + parser.add_argument("--dataset-config", type=str, default="./data/config.json", + help="Path to the config file") + + parser.add_argument("--num-devices", "-nd", type=int, default=4, + help="Number of devices to use.") + + parser.add_argument("--mx", type=str, choices=["no", "fp16", "bf16", "fp8"], default="bf16", + help="Whether to use mixed precision. Choose between fp16 and bf16 (bfloat16). 
" + "Bf16 requires PyTorch >= 1.10 and an Nvidia Ampere GPU.") + + parser.add_argument("--ds-config", type=str, default="./deepspeed_configs/zero_3_offload_optim_param.json", + help="Deepspeed config json to use.") + + parser.add_argument("--lora", action="store_true", default=False, + help="If passed, will enable parameter efficient fine-tuning with LoRA.") + + parser.add_argument("--lora-config", type=str, default="./lora_configs/lora.json", + help="Lora config json to use.") + + parser.add_argument("--num-epochs", type=int, default=1, + help="Number of epochs to train for.") + + parser.add_argument("--lr", type=float, default=1e-4, + help="Learning rate to use.") + + parser.add_argument("--ctx-len", type=int, default=512, + help="Maximum context length for the model input sequences.") + + parser.add_argument("--batch-size-per-device", "-bs", type=int, default=16, + help="Batch size to use per device.") + + parser.add_argument("--eval-batch-size-per-device", type=int, default=64, + help="Batch size to use per device (For evaluation).") + + parser.add_argument("--grad-accum", type=int, default=1, + help="Gradient accumulation steps.") + + parser.add_argument("--output-dir", type=str, default="/tmp", + help="Path to output directory.") + + parser.add_argument("--storage-path", type=str, + help="Path to results and checkpoints storage") + + parser.add_argument("--no-grad-ckpt", action="store_true", + help="If passed, will not use gradient checkpointing.") + + parser.add_argument("--num-checkpoints-to-keep", type=int, default=1, + help="Number of checkpoints to keep, if None, all checkpoints will be kept, " + "if set to n>=1, the top n checkpoint with min. evaluation perplexity " + "will be kept.") + + parser.add_argument("--stop-perplexity", type=float, default=0, + help="Target perplexity to reach after which to stop training. Default is 0. " + "If 0, training will not stop on perplexity.") + + parser.add_argument("--as-test", action="store_true", + help="If passed, will run the script in test mode.") + + args = parser.parse_args() + + return args + + +def main(): + args = parse_args() + + if not args.output_dir: + raise ValueError("--output-dir must be specified") + + # update the config with args so that we have access to them. 
+ config = vars(args) + config.update( + **{ + "lr": args.lr, + "num_epochs": args.num_epochs, + "seed": 42, + "batch_size": args.batch_size_per_device, + "gradient_accumulation_steps": args.grad_accum, + "model_name": args.model_name, + "block_size": args.ctx_len, + "eval_batch_size": args.eval_batch_size_per_device, + } + ) + + # Add LoRA config if needed + if args.lora: + with open(args.lora_config, "r") as json_file: + lora_config = json.load(json_file) + config["lora_config"] = lora_config + + # Add deepspeed plugin to the config + ds_plugin = DeepSpeedPlugin(hf_ds_config=config.get("ds_config")) + config.update(ds_plugin=ds_plugin) + + ray.init() + + # Read data + train_ds = ray.data.read_json(args.train_path) + if args.test_path is not None: + valid_ds = ray.data.read_json(args.test_path) + else: + valid_ds = None + + # Config file + chat_template = None + special_tokens = None + if os.path.isfile(args.dataset_config): + with open(args.dataset_config, "r") as json_file: + dataset_config = json.load(json_file) + chat_template = dataset_config.get("chat_template", None) + special_tokens = dataset_config.get("special_tokens", None) + + trial_name = f"{args.model_name}".split("/")[-1] + if args.lora: + trial_name += "-lora" + + trainer = TorchTrainer( + training_function, + train_loop_config={ + "config": config, + "args": vars(args), + "chat_template": chat_template, + "special_tokens": special_tokens, + }, + run_config=train.RunConfig( + storage_path=urljoin(args.storage_path, args.model_name.split("/")[-1]), + checkpoint_config=train.CheckpointConfig( + num_to_keep=args.num_checkpoints_to_keep, + checkpoint_score_attribute="perplexity", + checkpoint_score_order="min", + ), + ), + scaling_config=train.ScalingConfig( + num_workers=args.num_devices, + use_gpu=True, + resources_per_worker={"GPU": 1}, + ), + datasets={"train": train_ds, "valid": valid_ds}, + dataset_config=ray.train.DataConfig(datasets_to_split=["train", "valid"]), + ) + + result: train.Result = trainer.fit() + # `best_checkpoints` are sorted in increasing score order. 
+ # (Ex: in this case, negative perplexity, since we set `checkpoint_score_order=min`) + best_checkpoint, best_checkpoint_metrics = result.best_checkpoints[-1] + + print("Results are stored at:") + print(result.path) + print("Best checkpoint is stored at:") + print(best_checkpoint) + print(f"With perplexity: {best_checkpoint_metrics['perplexity']}") + + +if __name__ == "__main__": + main() diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/ray_test.py b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/ray_test.py new file mode 100644 index 0000000000..ff2750d5bc --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/ray_test.py @@ -0,0 +1,41 @@ +from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration +from codeflare_sdk.cluster.auth import TokenAuthentication +import os +import sys + + +from codeflare_sdk import KubeConfigFileAuthentication + +auth = KubeConfigFileAuthentication(kube_config_path="/tmp/kubeconfig") +auth.load_kube_config() + + +# Configure the Ray cluster +cluster = Cluster(ClusterConfiguration( + name='ray', + namespace='fine-tuning-testing', + num_workers=7, + worker_cpu_requests=16, + worker_cpu_limits=16, + head_cpu_requests=16, + head_cpu_limits=16, + worker_memory_requests=120, + worker_memory_limits=256, + head_memory_requests=100, + head_memory_limits=120, + # Use the following parameters with NVIDIA GPUs + image="quay.io/rhoai/ray:2.35.0-py39-cu121-torch24-fa26", + head_extended_resource_requests={'nvidia.com/gpu':1}, + worker_extended_resource_requests={'nvidia.com/gpu':1}, + + # Or replace them with these parameters for AMD GPUs + # image="quay.io/rhoai/ray:2.35.0-py39-rocm61-torch24-fa26", + # head_extended_resource_requests={'amd.com/gpu':1}, + # worker_extended_resource_requests={'amd.com/gpu':1}, +)) +try: + cluster.down() +except: pass + +cluster.up() +cluster.wait_ready() diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/requirements.txt b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/requirements.txt new file mode 100644 index 0000000000..28d0dea8dd --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/requirements.txt @@ -0,0 +1,4 @@ +accelerate==0.31.0 +datasets==2.19.2 +peft==0.11.1 +transformers==4.44.0 diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/training.py b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/training.py new file mode 100644 index 0000000000..3c05a09167 --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/training.py @@ -0,0 +1,132 @@ +import torch +import torch.distributed + +from torch.nn.parallel import DistributedDataParallel as DDP +from datasets import load_dataset +from peft import get_peft_model, LoraConfig +from transformers import ( + AutoModelForCausalLM, + AutoTokenizer, + TrainingArguments, + Trainer, + DataCollatorForLanguageModeling, +) + + +TASK_TYPE = "CAUSAL_LLM" +MODEL_NAME_OR_PATH = "/mnt/storage/model/granite-3b-code-instruct" +TOKENIZER_NAME_OR_PATH = None +DATASET_PATH = "/mnt/output/dataset.json" +CACHE_DIR = "./cache_dir" +OUTPUT_DIR = "/mnt/output/fine-tuning" +MAX_SEQ_LENGTH = 1024 + + +def train_func(): + # 
https://www.kubeflow.org/docs/components/training/getting-started/ + # setup Pytorch DDP. WORLD_SIZE and RANK environments will be set by Training Operator + torch.distributed.init_process_group(backend="nccl") + world_size = torch.distributed.get_world_size() + rank = torch.distributed.get_rank() + + # setup the device + device = torch.device( + f"cuda:{rank}" if torch.cuda.is_available() else "cpu" + ) # or hardcode it to cuda + torch.cuda.set_device(device) + + # define peft config (XXX: hardcoded for now) + peft_config = LoraConfig( + r=8, + lora_alpha=16, + lora_dropout=0.1, + bias="none", + target_modules=["q_proj", "k_proj"], + task_type=TASK_TYPE, + ) + + # load pre-trained model + model = AutoModelForCausalLM.from_pretrained( + MODEL_NAME_OR_PATH, + cache_dir=CACHE_DIR, + attn_implementation="flash_attention_2", # XXX: make this conditional + ) + + # wrap the model with LoRA: https://huggingface.co/docs/peft/main/conceptual_guides/lora + model = get_peft_model(model, peft_config) + # model.print_trainable_parameters() + + model = model.to(device) + model = DDP(model, device_ids=[rank]) + + # load dataset and tokenizer + dataset = load_dataset("json", data_files=DATASET_PATH, split="train") + tokenizer = AutoTokenizer.from_pretrained( + (TOKENIZER_NAME_OR_PATH if TOKENIZER_NAME_OR_PATH else MODEL_NAME_OR_PATH), + cache_dir=CACHE_DIR, + use_fast=True, + torch_dtype=torch.float16, + ) + + # XXX: tokenizer might be missing special tokens + if tokenizer.pad_token is None: + tokenizer.add_special_tokens({"pad_token": ""}) + + def preprocess_func(samples): + result = tokenizer( + samples["output"], + truncation=True, + padding="max_length", + max_length=MAX_SEQ_LENGTH, + return_tensors="pt", + ) + + for key, value in result.items(): + result[key] = value.tolist() + + return result + + tokenized_dataset = dataset.map( + preprocess_func, batched=True, remove_columns=dataset.column_names + ) + + # to handle dynamic padding during training + data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False) + + # define training arguments + training_args = TrainingArguments( + output_dir=OUTPUT_DIR, + learning_rate=1e-3, + per_device_train_batch_size=4, + per_device_eval_batch_size=4, + num_train_epochs=2, + weight_decay=0.01, + save_strategy="epoch", + remove_unused_columns=False, # to solve error: No columns in the dataset match the model's forward method signature. The following columns have been ignored: [input_ids, attention_mask, output]. 
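+ # (the model is wrapped in DDP above, so Trainer cannot inspect the base model's
+ # forward signature and would otherwise drop every input column)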
+ fp16=True, + ) + + # init trainer + trainer = Trainer( + model=model, + args=training_args, + train_dataset=tokenized_dataset, + data_collator=data_collator, + tokenizer=tokenizer, + ) + + trainer.train() + + # cleanup + torch.distributed.destroy_process_group() + + if rank == 0: + print("LoRA fine tuning completed") + trainer.save_model("lora_model") + + # to run inference: + # model = model.merge_and_unload() + # model.save_pretrained("merged_lora_model") + + +train_func() diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/utils.py b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/utils.py new file mode 100644 index 0000000000..43bab9076f --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/utils.py @@ -0,0 +1,36 @@ +from typing import List, Optional +import os +import subprocess +import logging + +logger = logging.getLogger(__name__) + + + + +def get_checkpoint_and_refs_dir( + model_id: str, + bucket_uri: str, + s3_sync_args: Optional[List[str]] = None, + mkdir: bool = False, +) -> str: + + + return model_id, "not used" + + +def get_download_path(model_id: str): + return model_id + + +def download_model( + model_id: str, + bucket_uri: str, + s3_sync_args: Optional[List[str]] = None, + tokenizer_only: bool = False, +) -> None: + pass + + +def get_mirror_link(model_id: str) -> str: + return "not used" diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/zero_3_offload_optim_param.json b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/zero_3_offload_optim_param.json new file mode 100644 index 0000000000..8fa6aecccd --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/files/ray-finetune-llm-deepspeed/zero_3_offload_optim_param.json @@ -0,0 +1,32 @@ +{ + "fp16": { + "enabled": false + }, + "bf16": { + "enabled": true + }, + "zero_optimization": { + "stage": 3, + "offload_optimizer": { + "device": "cpu", + "pin_memory": false + }, + "offload_param": { + "device": "cpu", + "pin_memory": false + }, + "overlap_comm": true, + "contiguous_gradients": true, + "reduce_bucket_size": "auto", + "stage3_prefetch_bucket_size": "auto", + "stage3_param_persistence_threshold": "auto", + "gather_16bit_weights_on_model_save": true, + "round_robin_gradients": true + }, + "gradient_accumulation_steps": "auto", + "gradient_clipping": "auto", + "steps_per_print": 10, + "train_batch_size": "auto", + "train_micro_batch_size_per_gpu": "auto", + "wall_clock_breakdown": false +} diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/meta/main.yml b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/meta/main.yml new file mode 100644 index 0000000000..bd0638df3e --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/meta/main.yml @@ -0,0 +1,3 @@ +--- +dependencies: + - role: check_deps diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/tasks/main.yml b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/tasks/main.yml new file mode 100644 index 0000000000..6cd2f93098 --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/tasks/main.yml @@ -0,0 +1,296 @@ +--- +- name: Create the src directory + file: + path: "{{ artifact_extra_logs_dir }}/src/" + state: directory + mode: '0755' + +- name: Create the artifacts directory + 
file: + path: "{{ artifact_extra_logs_dir }}/artifacts/" + state: directory + mode: '0755' + when: not fine_tuning_ray_fine_tuning_job_prepare_only + +- name: Make the name k8s safe + set_fact: + job_name_safe: "{{ fine_tuning_ray_fine_tuning_job_name | replace('.', '-') | replace('_', '-') }}" + +- name: Delete the fine-tuning job configmaps, if any + command: + oc delete configmap + -ltopsail.fine-tuning-jobname={{ job_name_safe }} + --ignore-not-found + -n {{ fine_tuning_ray_fine_tuning_job_namespace }} + when: fine_tuning_ray_fine_tuning_job_delete_other | bool + +- name: Prepare the config file template + template: + src: "{{ fine_tuning_job_config_template }}" + dest: "{{ artifact_extra_logs_dir }}/src/config_base.yaml" + mode: '0400' + +- name: Save the hype-parameters overrides into a file + shell: | + set -o pipefail; + + cat << EOF | yq -y > "{{ artifact_extra_logs_dir }}/src/config_override.yaml" + {{ fine_tuning_ray_fine_tuning_job_hyper_parameters | to_yaml }} + EOF + +- name: Convert the config to json + shell: + set -o pipefail; + + cat "{{ artifact_extra_logs_dir }}/src/config_base.yaml" + {% if fine_tuning_ray_fine_tuning_job_hyper_parameters %} + "{{ artifact_extra_logs_dir }}/src/config_override.yaml" + {% endif %} + | yq + > "{{ artifact_extra_logs_dir }}/src/config_final.json" + +- name: Prepare the config ConfigMap + shell: | + set -o pipefail; + + oc create cm {{ job_name_safe }}-config \ + -n {{ fine_tuning_ray_fine_tuning_job_namespace }} \ + --from-file=config.json=<(cat "{{ artifact_extra_logs_dir }}/src/config_final.json") \ + --dry-run=client \ + -oyaml \ + | yq -Y '. | .metadata.labels = {"topsail.fine-tuning-jobname": "{{ job_name_safe }}"}' \ + | tee -a "{{ artifact_extra_logs_dir }}/src/configmap_config.yaml" \ + | oc apply -f- + +- name: Prepare the entrypoint ConfigMap + shell: | + set -o pipefail; + + oc create cm {{ job_name_safe }}-entrypoint \ + -n {{ fine_tuning_ray_fine_tuning_job_namespace }} \ + --from-file=$(find "{{ fine_tuning_job_entrypoint_dir }}" -maxdepth 1 -not -type d | tr '\n' ,)/dev/null \ + --dry-run=client \ + -oyaml \ + | yq -Y '. | .metadata.labels = {"topsail.fine-tuning-jobname": "{{ job_name_safe }}"}' \ + | tee -a "{{ artifact_extra_logs_dir }}/src/configmap_entrypoint.yaml" \ + | oc apply -f- + +- name: Prepare the fine-tuning scripts ConfigMap + shell: | + set -o pipefail; + + oc create cm {{ job_name_safe }}-ft-scripts \ + -n {{ fine_tuning_ray_fine_tuning_job_namespace }} \ + --from-file="$(find "{{ fine_tuning_ray_fine_tuning_job_ft_scripts_dir }}" -not -type d -not -name '*.pyc' | tr '\n' ,)/dev/null" \ + --dry-run=client \ + -oyaml \ + | yq -Y '. 
| .metadata.labels = {"topsail.fine-tuning-jobname": "{{ job_name_safe }}"}' \ + | tee -a "{{ artifact_extra_logs_dir }}/src/configmap_ft_scripts.yaml" \ + | oc apply -f- + +- name: Load the content of the requirement file + shell: + set -o pipefail; + cat "{{ fine_tuning_ray_fine_tuning_job_ft_scripts_dir }}/requirements.txt" | sed 's/^/- /' + register: requirements_cmd + +- name: Prepare the cluster template file + template: + src: "{{ fine_tuning_cluster_template }}" + dest: "{{ artifact_extra_logs_dir }}/src/ray_cluster.yaml" + mode: '0400' + +- name: Prepare the job template file + template: + src: "{{ fine_tuning_job_template }}" + dest: "{{ artifact_extra_logs_dir }}/src/ray_job.yaml" + mode: '0700' + +- name: Add the cluster to the job + shell: + set -o pipefail; + + cluster=$(cat "{{ artifact_extra_logs_dir }}/src/ray_cluster.yaml" | yq .spec); + + yq --yaml-roundtrip --in-place \ + --argjson cluster "$cluster" + '.spec.rayClusterSpec = $cluster' \ + "{{ artifact_extra_logs_dir }}/src/ray_job.yaml" + +- name: Delete the Ray jobs and clusters, if they exist + command: + oc delete raycluster,rayjobs + --all + -n {{ fine_tuning_ray_fine_tuning_job_namespace }} + --ignore-not-found + when: fine_tuning_ray_fine_tuning_job_delete_other | bool + +- name: Delete the fine-tuning job, if it exists + shell: + oc delete -f "{{ artifact_extra_logs_dir }}/src/ray_job.yaml" --ignore-not-found; + oc delete -f "{{ artifact_extra_logs_dir }}/src/ray_cluster.yaml" --ignore-not-found; + +- name: Exit the play in 'prepare_only' mode + meta: end_play + when: fine_tuning_ray_fine_tuning_job_prepare_only | bool + +- name: Create the RayJob + command: + oc create -f "{{ artifact_extra_logs_dir }}/src/ray_job.yaml" + +- name: Wait for the job completion + block: + - name: Wait for the cluster to be created + command: + oc get rayjob/{{ job_name_safe }} -ojsonpath={.status.rayClusterName} + register: ray_cluster_name_cmd + retries: 10 + delay: 6 + until: ray_cluster_name_cmd.stdout + + - name: Save the cluster name + set_fact: + ray_cluster_name: "{{ ray_cluster_name_cmd.stdout }}" + + - name: Wait for the cluster's head Pod to start running + shell: + set -o pipefail; + oc get pods -l 'ray.io/identifier={{ ray_cluster_name }}-head' + -n {{ fine_tuning_ray_fine_tuning_job_namespace }} + --no-headers | awk '{print $3}' + register: wait_pod_start + retries: 20 + delay: 5 + until: wait_pod_start.stdout in ["Running", "Error", "Init:Error", "Completed", "NotReady", "CrashLoopBackOff", "ContainerCreating", "ImagePullBackOff"] + + - name: Fail if the Pod did not start successfully + fail: msg="Pod in error state" + when: wait_pod_start.stdout in ["Error", "Init:Error", "CrashLoopBackOff", "ImagePullBackOff"] + + - name: Wait for the Pod to fetch the image + when: wait_pod_start.stdout in ["ContainerCreating"] + block: + - name: Wait for the Pod to fetch the image + shell: + set -o pipefail; + oc get pods -l 'ray.io/identifier={{ ray_cluster_name }}-head' + -n {{ fine_tuning_ray_fine_tuning_job_namespace }} + --no-headers | awk '{print $3}' + register: wait_pod_fetch + retries: 720 + delay: 10 + until: wait_pod_fetch.stdout in ["Running", "Error", "Init:Error", "Completed", "NotReady", "CrashLoopBackOff", "ImagePullBackOff"] + + - name: Fail if the Pod did not start successfully + fail: msg="Pod in error state" + when: wait_pod_fetch.stdout in ["Error", "Init:Error", "CrashLoopBackOff", "ImagePullBackOff"] + + - name: Wait for the cluster to become Ready + shell: oc get rayjobs/{{ job_name_safe }} 
-ojsonpath={.status.jobDeploymentStatus} + register: ray_job_deployment_status + retries: 12 + delay: 10 + until: ray_job_deployment_status.stdout not in ["", "Initializing"] + + - name: Fail if the Job did not start running + fail: msg="Pod in error state" + when: ray_job_deployment_status.stdout not in ["Running"] + + - name: Finish here if sleeping forever + when: fine_tuning_ray_fine_tuning_job_sleep_forever | bool + meta: end_play + + - name: Wait for the job to complete + command: + oc get rayjob/{{ job_name_safe }} -ojsonpath={.status.jobStatus} + register: ray_job_status_cmd + retries: 720 + delay: 30 + until: ray_job_status_cmd.stdout in ["FAILED", "SUCCEEDED"] + + - name: Fail if the Job did not complete properly + fail: msg="Job in '{{ ray_job_status_cmd.stdout }}' state" + when: ray_job_status_cmd.stdout not in ["SUCCEEDED"] + + - name: Check if the script succeeded + shell: + set -o pipefail; + + oc logs job/{{ job_name_safe }} + -n {{ fine_tuning_ray_fine_tuning_job_namespace }} + | grep "SCRIPT SUCCEEDED" + register: script_succeeded_cmd + + always: + - name: Capture the state of the fine-tuning Pod resource + shell: + set -o pipefail; + + oc get pod + -lray.io/cluster={{ ray_cluster_name }} + -ojson + -n {{ fine_tuning_ray_fine_tuning_job_namespace }} + > {{ artifact_extra_logs_dir }}/artifacts/pod.json; + + oc get pod + -lray.io/cluster={{ ray_cluster_name }} + -oyaml + -n {{ fine_tuning_ray_fine_tuning_job_namespace }} + > {{ artifact_extra_logs_dir }}/artifacts/pod.yaml; + + oc get pod + -lray.io/cluster={{ ray_cluster_name }} + -owide + -n {{ fine_tuning_ray_fine_tuning_job_namespace }} + > {{ artifact_extra_logs_dir }}/artifacts/pod.status; + + oc describe pod + -lray.io/cluster={{ ray_cluster_name }} + -n {{ fine_tuning_ray_fine_tuning_job_namespace }} + > {{ artifact_extra_logs_dir }}/artifacts/pod.desc + + oc logs $( + oc get pods -l 'ray.io/identifier={{ ray_cluster_name }}-head' + -n {{ fine_tuning_ray_fine_tuning_job_namespace }} + -oname | head -1) + -n {{ fine_tuning_ray_fine_tuning_job_namespace }} + > {{ artifact_extra_logs_dir }}/artifacts/pod.log + ignore_errors: true + + - name: Capture the state of the RayCluster resource + shell: + set -o pipefail; + + oc get RayCluster/{{ ray_cluster_name }} + -oyaml + -n {{ fine_tuning_ray_fine_tuning_job_namespace }} + > {{ artifact_extra_logs_dir }}/artifacts/raycluster.yaml; + + oc get RayCluster/{{ ray_cluster_name }} + -owide + -n {{ fine_tuning_ray_fine_tuning_job_namespace }} + > {{ artifact_extra_logs_dir }}/artifacts/raycluster.status; + + oc describe RayCluster/{{ ray_cluster_name }} + -n {{ fine_tuning_ray_fine_tuning_job_namespace }} + > {{ artifact_extra_logs_dir }}/artifacts/raycluster.desc + ignore_errors: true + + - name: Capture the state of the RayJob resource + shell: + set -o pipefail; + + oc get RayJob/{{ job_name_safe }} + -oyaml + -n {{ fine_tuning_ray_fine_tuning_job_namespace }} + > {{ artifact_extra_logs_dir }}/artifacts/rayjob.yaml; + + oc get RayCluster/{{ job_name_safe }} + -owide + -n {{ fine_tuning_ray_fine_tuning_job_namespace }} + > {{ artifact_extra_logs_dir }}/artifacts/rayjob.status; + + oc describe RayCluster/{{ job_name_safe }} + -n {{ fine_tuning_ray_fine_tuning_job_namespace }} + > {{ artifact_extra_logs_dir }}/artifacts/rayjob.desc + ignore_errors: true diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/templates/base_config.yaml.j2 b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/templates/base_config.yaml.j2 new file mode 100644 index 
0000000000..a4c6b0528c --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/templates/base_config.yaml.j2 @@ -0,0 +1,13 @@ +--- +# DATASET_SOURCE: {{ fine_tuning_ray_fine_tuning_job_dataset_name }} +# DATASET_TRANSFORM: {{ fine_tuning_ray_fine_tuning_job_dataset_transform }} +# DATASET_REPLICATION: {{ fine_tuning_ray_fine_tuning_job_dataset_replication }} + +training_data_path: "/mnt/output/dataset.json" # aka DATASET_DEST +model_name_or_path: "/mnt/storage/model/{{ fine_tuning_ray_fine_tuning_job_model_name }}" + +response_template: "{{ fine_tuning_ray_fine_tuning_job_dataset_response_template }}" + +output_dir: "/mnt/output/fine-tuning" + +max_seq_length: 4096 diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/templates/ray-job.yaml.j2 b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/templates/ray-job.yaml.j2 new file mode 100644 index 0000000000..d728038688 --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/templates/ray-job.yaml.j2 @@ -0,0 +1,153 @@ +apiVersion: ray.io/v1 +kind: RayJob +metadata: + name: rayjob-sample +spec: + # submissionMode specifies how RayJob submits the Ray job to the RayCluster. + # The default value is "K8sJobMode", meaning RayJob will submit the Ray job via a submitter Kubernetes Job. + # The alternative value is "HTTPMode", indicating that KubeRay will submit the Ray job by sending an HTTP request to the RayCluster. + # submissionMode: "K8sJobMode" + entrypoint: python /home/ray/samples/sample_code.py + # shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false. + # shutdownAfterJobFinishes: false + + # ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes. + # ttlSecondsAfterFinished: 10 + + # activeDeadlineSeconds is the duration in seconds that the RayJob may be active before + # KubeRay actively tries to terminate the RayJob; value must be positive integer. + # activeDeadlineSeconds: 120 + + # RuntimeEnvYAML represents the runtime environment configuration provided as a multi-line YAML string. + # See https://docs.ray.io/en/latest/ray-core/handling-dependencies.html for details. + # (New in KubeRay version 1.0.) + runtimeEnvYAML: | + pip: + - requests==2.26.0 + - pendulum==2.1.2 + env_vars: + counter_name: "test_counter" + + # Suspend specifies whether the RayJob controller should create a RayCluster instance. + # If a job is applied with the suspend field set to true, the RayCluster will not be created and we will wait for the transition to false. + # If the RayCluster is already created, it will be deleted. In the case of transition to false, a new RayCluste rwill be created. + # suspend: false + + # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller. + rayClusterSpec: + rayVersion: '2.9.0' # should match the Ray version in the image of the containers + # Ray head pod template + headGroupSpec: + # The `rayStartParams` are used to configure the `ray start` command. + # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay. + # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`. 
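+ # NOTE: this hyphenated ray-job.yaml.j2 appears to be the stock KubeRay sample and is
+ # not referenced by vars/main/resources.yml; the role renders ray_job.yaml.j2 and
+ # ray_cluster.yaml.j2 instead.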
+ rayStartParams: + dashboard-host: '0.0.0.0' + #pod template + template: + spec: + containers: + - name: ray-head + image: rayproject/ray:2.9.0 + ports: + - containerPort: 6379 + name: gcs-server + - containerPort: 8265 # Ray dashboard + name: dashboard + - containerPort: 10001 + name: client + resources: + limits: + cpu: "1" + requests: + cpu: "200m" + volumeMounts: + - mountPath: /home/ray/samples + name: code-sample + volumes: + # You set volumes at the Pod level, then mount them into containers inside that Pod + - name: code-sample + configMap: + # Provide the name of the ConfigMap you want to mount. + name: ray-job-code-sample + # An array of keys from the ConfigMap to create as files + items: + - key: sample_code.py + path: sample_code.py + workerGroupSpecs: + # the pod replicas in this group typed worker + - replicas: 1 + minReplicas: 1 + maxReplicas: 5 + # logical group name, for this called small-group, also can be functional + groupName: small-group + # The `rayStartParams` are used to configure the `ray start` command. + # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay. + # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`. + rayStartParams: {} + #pod template + template: + spec: + containers: + - name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name', or '123-abc' + image: rayproject/ray:2.9.0 + lifecycle: + preStop: + exec: + command: [ "/bin/sh","-c","ray stop" ] + resources: + limits: + cpu: "1" + requests: + cpu: "200m" + # SubmitterPodTemplate is the template for the pod that will run the `ray job submit` command against the RayCluster. + # If SubmitterPodTemplate is specified, the first container is assumed to be the submitter container. + # submitterPodTemplate: + # spec: + # restartPolicy: Never + # containers: + # - name: my-custom-rayjob-submitter-pod + # image: rayproject/ray:2.9.0 + # # If Command is not specified, the correct command will be supplied at runtime using the RayJob spec `entrypoint` field. + # # Specifying Command is not recommended. + # # command: ["sh", "-c", "ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID -- echo hello world"] + + +######################Ray code sample################################# +# this sample is from https://docs.ray.io/en/latest/cluster/job-submission.html#quick-start-example +# it is mounted into the container and executed to show the Ray job at work +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: ray-job-code-sample +data: + sample_code.py: | + import ray + import os + import requests + + ray.init() + + @ray.remote + class Counter: + def __init__(self): + # Used to verify runtimeEnv + self.name = os.getenv("counter_name") + assert self.name == "test_counter" + self.counter = 0 + + def inc(self): + self.counter += 1 + + def get_counter(self): + return "{} got {}".format(self.name, self.counter) + + counter = Counter.remote() + + for _ in range(5): + ray.get(counter.inc.remote()) + print(ray.get(counter.get_counter.remote())) + + # Verify that the correct runtime env was used for the job. 
+ assert requests.__version__ == "2.26.0" diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/templates/ray_cluster.yaml.j2 b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/templates/ray_cluster.yaml.j2 new file mode 100644 index 0000000000..377dfea4b6 --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/templates/ray_cluster.yaml.j2 @@ -0,0 +1,125 @@ +# this resource (.spec) is currently used AS PART of the the RayJob, +# NOT directly. +apiVersion: ray.io/v1 +kind: RayCluster +metadata: + name: {{ fine_tuning_ray_fine_tuning_job_name }} + namespace: {{ fine_tuning_ray_fine_tuning_job_namespace }} +spec: + headGroupSpec: + enableIngress: false + rayStartParams: + block: 'true' + dashboard-host: 0.0.0.0 + num-gpus: '1' + resources: '"{}"' + serviceType: ClusterIP + template: + spec: + containers: + - name: ray-head + ports: + - containerPort: 6379 + name: gcs + protocol: TCP + - containerPort: 8265 + name: dashboard + protocol: TCP + - containerPort: 10001 + name: client + protocol: TCP + image: &head_image "{{ fine_tuning_ray_fine_tuning_job_container_image }}" + + env: &head_env + + - name: FT_CONFIG_JSON_PATH + value: /mnt/config/config.json + - name: DATASET_SOURCE + value: "/mnt/storage/dataset/{{ fine_tuning_ray_fine_tuning_job_dataset_name }}" + - name: DATASET_REPLICATION + value: "{{ fine_tuning_ray_fine_tuning_job_dataset_replication }}" +{% if fine_tuning_ray_fine_tuning_job_dataset_transform %} + - name: DATASET_TRANSFORM + value: "/mnt/entrypoint/{{ fine_tuning_ray_fine_tuning_job_dataset_transform }}" +{% endif %} +{% if fine_tuning_ray_fine_tuning_job_dataset_prefer_cache %} + - name: DATASET_PREFER_CACHE + value: "true" +{% endif %} +{% if fine_tuning_ray_fine_tuning_job_dataset_prepare_cache_only %} + - name: DATASET_PREPARE_CACHE_ONLY + value: "true" +{% endif %} + +{% if fine_tuning_ray_fine_tuning_job_gpu %} + - name: NUM_GPUS + value: "{{ fine_tuning_ray_fine_tuning_job_gpu }}" +{% endif %} + - name: MODEL_NAME + value: "{{ fine_tuning_ray_fine_tuning_job_model_name}}" +{% if fine_tuning_ray_fine_tuning_job_sleep_forever %} + - name: SLEEP_FOREVER + value: "true" +{% endif %} + resources: &head_resources + requests: &head_request_block +{% if fine_tuning_ray_fine_tuning_job_gpu %} + nvidia.com/gpu: "{{ fine_tuning_ray_fine_tuning_job_gpu }}" +{% endif %} + memory: "{{ fine_tuning_ray_fine_tuning_job_memory }}Gi" + cpu: "{{ fine_tuning_ray_fine_tuning_job_cpu }}" +{% if fine_tuning_ray_fine_tuning_job_request_equals_limits %} + limits: *head_request_block +{% elif fine_tuning_ray_fine_tuning_job_gpu %} + limits: + nvidia.com/gpu: "{{ fine_tuning_ray_fine_tuning_job_gpu }}" +{% endif %} + + volumeMounts: &head_volume_mounts + + - name: storage-volume + mountPath: /mnt/storage + - name: ft-scripts-volume + mountPath: /mnt/ft-scripts + - name: entrypoint-volume + mountPath: /mnt/entrypoint + - name: config-volume + mountPath: /mnt/config + - name: output-volume + mountPath: /mnt/output + + volumes: &head_volumes + - name: storage-volume + persistentVolumeClaim: + claimName: {{ fine_tuning_ray_fine_tuning_job_pvc_name }} + - name: config-volume + configMap: + name: {{ job_name_safe }}-config + - name: entrypoint-volume + configMap: + name: {{ job_name_safe }}-entrypoint + - name: ft-scripts-volume + configMap: + name: {{ job_name_safe }}-ft-scripts + - name: output-volume + emptyDir: {} + + rayVersion: {{ fine_tuning_ray_fine_tuning_job_ray_version }} + workerGroupSpecs: + - groupName: {{ 
fine_tuning_ray_fine_tuning_job_name }} + maxReplicas: {{ fine_tuning_ray_fine_tuning_job_worker_replicas }} + minReplicas: {{ fine_tuning_ray_fine_tuning_job_worker_replicas }} + rayStartParams: + block: "true" + num-gpus: "1" + resources: '"{}"' + replicas: {{ fine_tuning_ray_fine_tuning_job_worker_replicas }} + template: + spec: + containers: + - name: machine-learning + image: *head_image + env: *head_env + resources: *head_resources + volumeMounts: *head_volume_mounts + volumes: *head_volumes diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/templates/ray_job.yaml.j2 b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/templates/ray_job.yaml.j2 new file mode 100644 index 0000000000..fa1b4c5da5 --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/templates/ray_job.yaml.j2 @@ -0,0 +1,36 @@ +apiVersion: ray.io/v1 +kind: RayJob +metadata: + name: {{ job_name_safe }} + namespace: {{ fine_tuning_ray_fine_tuning_job_namespace }} +spec: + # submissionMode specifies how RayJob submits the Ray job to the RayCluster. + # The default value is "K8sJobMode", meaning RayJob will submit the Ray job via a submitter Kubernetes Job. + # The alternative value is "HTTPMode", indicating that KubeRay will submit the Ray job by sending an HTTP request to the RayCluster. + # submissionMode: "K8sJobMode" + entrypoint: bash /mnt/entrypoint/{{ fine_tuning_job_entrypoint_name }} + # shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false. + shutdownAfterJobFinishes: true + + # ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes. + # ttlSecondsAfterFinished: 10 + + # activeDeadlineSeconds is the duration in seconds that the RayJob may be active before + # KubeRay actively tries to terminate the RayJob; value must be positive integer. + # activeDeadlineSeconds: 120 + + # RuntimeEnvYAML represents the runtime environment configuration provided as a multi-line YAML string. + # See https://docs.ray.io/en/latest/ray-core/handling-dependencies.html for details. + # (New in KubeRay version 1.0.) + runtimeEnvYAML: | + pip: +{{ requirements_cmd.stdout | indent(4, True) }} + + + # Suspend specifies whether the RayJob controller should create a RayCluster instance. + # If a job is applied with the suspend field set to true, the RayCluster will not be created and we will wait for the transition to false. + # If the RayCluster is already created, it will be deleted. In the case of transition to false, a new RayCluste rwill be created. + # suspend: false + + # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller. + rayClusterSpec: {} # the RayCluster from `ray_cluster.yaml.j2` will be inserted here diff --git a/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/vars/main/resources.yml b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/vars/main/resources.yml new file mode 100644 index 0000000000..54ea6d2a1e --- /dev/null +++ b/projects/fine_tuning/toolbox/fine_tuning_ray_fine_tuning_job/vars/main/resources.yml @@ -0,0 +1,8 @@ +--- +fine_tuning_cluster_template: templates/ray_cluster.yaml.j2 +fine_tuning_job_template: templates/ray_job.yaml.j2 +fine_tuning_job_config_template: templates/base_config.yaml.j2 + +fine_tuning_job_entrypoint_dir: "{{ role_path }}/files/entrypoint" +fine_tuning_job_entrypoint_name: job_entrypoint.sh +__safe: [fine_tuning_job_entrypoint_name]
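For readers following the template wiring: tasks/main.yml relies on two yq invocations, one to build src/config_final.json from the base config plus the hyper-parameter overrides, and one to splice the rendered RayCluster's .spec into the RayJob's empty rayClusterSpec. The sketch below approximates both steps in plain Python. It is illustrative only and not part of the role: it assumes PyYAML is installed, that the rendered files sit in a local src/ directory, and it resolves duplicate top-level keys in favour of the override file instead of reproducing the exact yq semantics.

    import json
    import yaml  # PyYAML; assumed available for this sketch

    SRC = "src"  # stand-in for {{ artifact_extra_logs_dir }}/src

    # Step 1: base config + hyper-parameter overrides -> config_final.json
    with open(f"{SRC}/config_base.yaml") as f:
        config = yaml.safe_load(f) or {}
    with open(f"{SRC}/config_override.yaml") as f:
        overrides = yaml.safe_load(f) or {}
    config.update(overrides)  # override keys win, as with the concatenated yq input
    with open(f"{SRC}/config_final.json", "w") as f:
        json.dump(config, f, indent=2)

    # Step 2: inline the RayCluster spec into the RayJob (spec.rayClusterSpec)
    with open(f"{SRC}/ray_cluster.yaml") as f:
        cluster = yaml.safe_load(f)
    with open(f"{SRC}/ray_job.yaml") as f:
        job = yaml.safe_load(f)
    job["spec"]["rayClusterSpec"] = cluster["spec"]
    with open(f"{SRC}/ray_job.yaml", "w") as f:
        yaml.safe_dump(job, f, default_flow_style=False)

The resulting ray_job.yaml is what the "Create the RayJob" task submits with oc create -f: a single RayJob whose entrypoint runs the ConfigMap-mounted entrypoint script and whose rayClusterSpec carries the head and worker pod definitions rendered from ray_cluster.yaml.j2.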