diff --git a/README.md b/README.md index 193fd94..7caa10a 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ This repository contains a variety of Determined examples that are not actively | [LLM Finetuning 2](blog/llm-finetuning-2) | Finetuning Mistral-7B on Text-to-SQL using LoRA and DeepSpeed. | | [LLM Finetuning 3](blog/llm-finetuning-3) | Finetuning Gemma-2B using DPO. | | [Python SDK demo](blog/python_sdk_demo) | Example usage of the Determined Python SDK to run and administer experiments. | +| [Tensor Parallelism](blog/tp) | Profiling tensor parallelism in PyTorch. | ## Computer Vision diff --git a/blog/act-mem-2/README.md b/blog/act-mem-2/README.md index 07f1547..9531f4c 100644 --- a/blog/act-mem-2/README.md +++ b/blog/act-mem-2/README.md @@ -9,3 +9,8 @@ memory. - `attn_script.py` shows the cost of activation memory in the attention layer. - Tests of the code are in `test.py`. - See `requirements.txt` for versions the code was built against. + + +## Contributors + +- [Garrett Goon](https://github.com/garrett361) \ No newline at end of file diff --git a/blog/tp/README.md b/blog/tp/README.md new file mode 100644 index 0000000..79ea909 --- /dev/null +++ b/blog/tp/README.md @@ -0,0 +1,13 @@ +# Tensor Parallelism + +Code accompanying the deep-dive [blog post on Tensor Parallelism](https://determined.ai/blog/tp). + +- The MLP and TP MLP layers are in `layer.py` +- Matmul profiling code in `matmul_profiling.py` +- MLP TP profiling code in `tp_profiling.py` +- Tests of the rearranging tensor sums are in `test_dot_product_{local,distributed}.py` + + +## Contributors + +- [Garrett Goon](https://github.com/garrett361) \ No newline at end of file diff --git a/blog/tp/layers.py b/blog/tp/layers.py new file mode 100644 index 0000000..f559742 --- /dev/null +++ b/blog/tp/layers.py @@ -0,0 +1,138 @@ +from typing import Any, Optional, Union + +import torch +import torch.distributed as dist +import torch.nn as nn + + +class MLP(nn.Module): + """ + Basic MLP (multi-layer perceptron) layer. Dropout is neglected. + """ + + def __init__( + self, + d_model: int, + device: Optional[Union[str, torch.device]] = None, + dtype: Optional[torch.dtype] = None, + ) -> None: + super().__init__() + + self.lin_0 = nn.Linear(d_model, 4 * d_model, device=device, dtype=dtype) + self.act_fn = nn.GELU() + self.lin_1 = nn.Linear(4 * d_model, d_model, device=device, dtype=dtype) + + def forward(self, inputs: torch.Tensor) -> torch.Tensor: + x = self.lin_0(inputs) + x = self.act_fn(x) + x = self.lin_1(x) + return x + + +class AllReduceFwdIdentityBwd(torch.autograd.Function): + @staticmethod + def forward( + ctx: Any, inputs: torch.Tensor, group: Optional[dist.ProcessGroup] = None + ) -> torch.Tensor: + inputs = inputs.clone() + dist.all_reduce(inputs, group=group) + return inputs + + @staticmethod + def backward(ctx: Any, grad_outputs: torch.Tensor) -> tuple[torch.Tensor, None]: + return grad_outputs, None + + +class IdentityFwdAllReduceBwd(torch.autograd.Function): + @staticmethod + def forward( + ctx: Any, inputs: torch.Tensor, group: Optional[dist.ProcessGroup] = None + ) -> torch.Tensor: + ctx.group = group + return inputs + + @staticmethod + def backward(ctx: Any, grad_outputs: torch.Tensor) -> tuple[torch.Tensor, None]: + grad_outputs = grad_outputs.clone() + dist.all_reduce(grad_outputs, group=ctx.group) + return grad_outputs, None + + +class LinearShardedOutputs(nn.Linear): + def __init__( + self, + in_features: int, + out_features: int, + group: dist.ProcessGroup, + device: Optional[Union[str, torch.device]] = None, + dtype: Optional[torch.dtype] = None, + ) -> None: + sharded_out_features, remainder = divmod(out_features, group.size()) + assert not remainder, "out_features must be divisible by the ProcessGroup size" + super().__init__( + in_features=in_features, + out_features=sharded_out_features, + device=device, + dtype=dtype, + ) + + self.group = group + + def forward(self, inputs: torch.Tensor) -> torch.Tensor: + # Wrap the unsharded inputs for backwards-pass correctness. + x = IdentityFwdAllReduceBwd.apply(inputs, self.group) + x = super().forward(x) + return x + + +class LinearShardedInputs(nn.Linear): + def __init__( + self, + in_features: int, + out_features: int, + group: dist.ProcessGroup, + device: Optional[Union[str, torch.device]] = None, + dtype: Optional[torch.dtype] = None, + ) -> None: + sharded_in_features, remainder = divmod(in_features, group.size()) + assert not remainder, "in_features must be divisible by the ProcessGroup size" + super().__init__( + in_features=sharded_in_features, + out_features=out_features, + device=device, + dtype=dtype, + ) + self.group = group + + def forward(self, inputs: torch.Tensor) -> torch.Tensor: + x = inputs @ self.weight.T + # Wrap the mat-mul in an all-reduce forwards-pass correctness. + x = AllReduceFwdIdentityBwd.apply(x, self.group) + # Crucial: add the bias _after_ the all-reduce. + x = x + self.bias + return x + + +class MLPTP(MLP): + """ + Basic Tensor Parallel MLP (multi-layer perceptron) layer. Dropout is neglected. + """ + + def __init__( + self, + d_model: int, + group: Optional[dist.ProcessGroup] = None, + device: Optional[Union[str, torch.device]] = None, + dtype: Optional[torch.dtype] = None, + ) -> None: + nn.Module.__init__(self) + # Fallback to the WORLD process group, if None provided + group = group or dist.group.WORLD + + self.lin_0 = LinearShardedOutputs( + d_model, 4 * d_model, group=group, device=device, dtype=dtype + ) + self.act_fn = nn.GELU() + self.lin_1 = LinearShardedInputs( + 4 * d_model, d_model, group=group, device=device, dtype=dtype + ) diff --git a/blog/tp/matmul.png b/blog/tp/matmul.png new file mode 100644 index 0000000..f23da74 Binary files /dev/null and b/blog/tp/matmul.png differ diff --git a/blog/tp/matmul_profiling.py b/blog/tp/matmul_profiling.py new file mode 100644 index 0000000..91ac4e6 --- /dev/null +++ b/blog/tp/matmul_profiling.py @@ -0,0 +1,91 @@ +import gc +import logging + +import determined as det +import torch + +import utils + +""" +Script for profiling square matmuls on a single GPU. +""" + + +def profile_and_report( + core_context: det.core.Context, + d_model: int, + num_repeats: int, + num_warmups: int, + dtype: torch.dtype = torch.bfloat16, +) -> None: + A = torch.randn(d_model, d_model, device="cuda", dtype=dtype) + B = torch.randn(d_model, d_model, device="cuda", dtype=dtype) + + # Use CUDA events for accurate timing. + timer = utils.CUDAEventTimer() + torch.cuda.synchronize() + + # Warmups + for _ in range(num_warmups): + A @ B + + # Timed region. + for _ in range(num_repeats): + with timer: + A @ B + + # Mean and std TFLOP computations + flops = 2 * d_model**3 + time_s_t = torch.tensor(timer.time_s_list) + tflop_s_gpu_t = flops / time_s_t / 1e12 + metrics = { + "d_model": d_model, + "time_s": timer.time_s_mean, + "time_s_std": timer.time_s_std, + "tflop_s_gpu": tflop_s_gpu_t.mean().item(), + "tflop_s_gpu_std": tflop_s_gpu_t.std().item(), + } + + # Use d_model as the x-axis for plotting purposes. + core_context.train.report_metrics(group="matmul", steps_completed=d_model, metrics=metrics) + + # Memory management + del A + del B + gc.collect() + torch.cuda.empty_cache() + + +def main( + core_context: det.core.Context, + d_model_min: int, + d_model_max: int, + d_model_step: int, + num_repeats: int, + num_warmups: int, +) -> None: + for d_model in range(d_model_min, d_model_max + 1, d_model_step): + profile_and_report( + core_context=core_context, + d_model=d_model, + num_repeats=num_repeats, + num_warmups=num_warmups, + ) + + +if __name__ == "__main__": + info = det.get_cluster_info() + assert info, "This script must run on a determined cluster." + hparams = info.trial.hparams + + with det.core.init() as core_context: + logging.basicConfig(level=logging.INFO, format=det.LOG_FORMAT) + + main( + core_context=core_context, + d_model_min=hparams["d_model_min"], + d_model_max=hparams["d_model_max"], + d_model_step=hparams["d_model_step"], + num_repeats=hparams["num_repeats"], + num_warmups=hparams["num_warmups"], + ) diff --git a/blog/tp/matmul_profiling.yaml b/blog/tp/matmul_profiling.yaml new file mode 100644 index 0000000..97674dd --- /dev/null +++ b/blog/tp/matmul_profiling.yaml @@ -0,0 +1,20 @@ +name: Matmul Profiling +# Adjust the workspace and project names, as appropriate. +workspace: TP Blog Post +project: Matmul Profiling +resources: + slots_per_trial: 1 +searcher: + name: single + metric: not_used + max_length: 1 +hyperparameters: + d_model_min: 256 + d_model_max: 16384 + d_model_step: 256 + num_warmups: 5 + num_repeats: 100 +entrypoint: >- + python3 -m determined.launch.torch_distributed + python3 matmul_profiling.py +max_restarts: 0 diff --git a/blog/tp/mlp_tp.png b/blog/tp/mlp_tp.png new file mode 100644 index 0000000..1aff737 Binary files /dev/null and b/blog/tp/mlp_tp.png differ diff --git a/blog/tp/plots.ipynb b/blog/tp/plots.ipynb new file mode 100644 index 0000000..a0663e6 --- /dev/null +++ b/blog/tp/plots.ipynb @@ -0,0 +1,470 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "6cf61f44-e76f-435a-a4c4-7882c1546d52", + "metadata": {}, + "source": [ + "# Plots" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "1b461d37-b6de-42b7-9b03-063f3c46ef93", + "metadata": {}, + "outputs": [], + "source": [ + "import determined as det\n", + "from collections import defaultdict\n", + "from determined.experimental import client\n", + "import pandas as pd\n", + "import seaborn as sns\n", + "import matplotlib.pyplot as plt\n", + "\n", + "sns.set_theme(style=\"darkgrid\")\n", + "sns.set(rc={\"figure.figsize\": (7.5, 7.5)})\n" + ] + }, + { + "cell_type": "markdown", + "id": "b1be1013-61b9-4290-95e7-891e9cbe0eb5", + "metadata": {}, + "source": [ + "Get all projects in the workspace" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "27c9f4dc-f0b8-45c9-ba3e-5c1ff398b8d3", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "MLP TP Profiling\n", + "Matmul Profiling\n", + "obsolete\n" + ] + } + ], + "source": [ + "workspace = client.get_workspace(\"TP Blog Post\")\n", + "projects_dict = {p.name: p for p in workspace.list_projects()}\n", + "for p in projects_dict:\n", + " print(p)" + ] + }, + { + "cell_type": "markdown", + "id": "5703a1f3-24b5-4d7c-acdd-3c14bf88f4af", + "metadata": {}, + "source": [ + "## Matmul Plots" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "d94ce58b-497c-46de-ac9d-57c8a2c053e1", + "metadata": {}, + "outputs": [], + "source": [ + "matmul_trial = projects_dict[\"Matmul Profiling\"].list_experiments()[0].list_trials()[0]" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "996c7686-7c1d-4de2-8a8a-f46453272b41", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
d_modelTFLOP/sec/GPUTFLOP/sec/GPU_std
02561.0436840.065087
15128.5082061.398408
276829.9998401.571498
3102449.3716813.053242
41280111.7303544.155525
............
5915360267.5995182.319973
6015616268.2694402.898224
6115872267.0128784.683553
6216128267.3208314.209327
6316384268.2875373.332946
\n", + "

64 rows × 3 columns

\n", + "
" + ], + "text/plain": [ + " d_model TFLOP/sec/GPU TFLOP/sec/GPU_std\n", + "0 256 1.043684 0.065087\n", + "1 512 8.508206 1.398408\n", + "2 768 29.999840 1.571498\n", + "3 1024 49.371681 3.053242\n", + "4 1280 111.730354 4.155525\n", + ".. ... ... ...\n", + "59 15360 267.599518 2.319973\n", + "60 15616 268.269440 2.898224\n", + "61 15872 267.012878 4.683553\n", + "62 16128 267.320831 4.209327\n", + "63 16384 268.287537 3.332946\n", + "\n", + "[64 rows x 3 columns]" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "matmul_results_dict = defaultdict(list)\n", + "for m in matmul_trial.iter_metrics(\"matmul\"):\n", + " matmul_results_dict[\"d_model\"].append(m.metrics[\"d_model\"])\n", + " matmul_results_dict[\"TFLOP/sec/GPU\"].append(m.metrics[\"tflop_s_gpu\"])\n", + " matmul_results_dict[\"TFLOP/sec/GPU_std\"].append(m.metrics[\"tflop_s_gpu_std\"])\n", + "matmul_results_df = pd.DataFrame.from_dict(matmul_results_dict)\n", + "matmul_results_df" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "d47fdb30-a4be-493e-8a02-7e190c75c82c", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "matmul_plot = sns.scatterplot(matmul_results_df, x=\"d_model\", y=\"TFLOP/sec/GPU\")\n", + "plt.suptitle(\"Square Matmul TFLOP/sec\")\n", + "matmul_plot.axhline(y=312, color=\"r\", linestyle=\"--\")\n", + "matmul_plot.figure.savefig(\"matmul.png\", dpi=256, bbox_inches=\"tight\")" + ] + }, + { + "cell_type": "markdown", + "id": "a309a6fa-8bd7-4559-b2e1-3774f0f530f8", + "metadata": {}, + "source": [ + "## MLP TP Plots" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "5024162f-4d26-4d0e-a78d-6444e62248d7", + "metadata": {}, + "outputs": [], + "source": [ + "mlp_tp_trial = projects_dict[\"MLP TP Profiling\"].list_experiments()[0].list_trials()[0]" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "10445d10-70ba-4163-bfc4-a14cc3f7b64c", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
tp_degreed_modeltime_sTFLOP/sec/GPUTFLOP/sec/GPU_std
0110240.000492139.7606051.070486
1115360.000965160.2972110.191192
2120480.001459188.4338993.754535
3125600.002080206.4862062.993310
4130720.002941210.2913972.333767
..................
1518184320.012830216.6374662.120074
1528189440.013551216.7002563.428361
1538194560.014483213.7898102.577091
1548199680.015056216.6678923.947501
1558204800.015662219.1872254.835044
\n", + "

156 rows × 5 columns

\n", + "
" + ], + "text/plain": [ + " tp_degree d_model time_s TFLOP/sec/GPU TFLOP/sec/GPU_std\n", + "0 1 1024 0.000492 139.760605 1.070486\n", + "1 1 1536 0.000965 160.297211 0.191192\n", + "2 1 2048 0.001459 188.433899 3.754535\n", + "3 1 2560 0.002080 206.486206 2.993310\n", + "4 1 3072 0.002941 210.291397 2.333767\n", + ".. ... ... ... ... ...\n", + "151 8 18432 0.012830 216.637466 2.120074\n", + "152 8 18944 0.013551 216.700256 3.428361\n", + "153 8 19456 0.014483 213.789810 2.577091\n", + "154 8 19968 0.015056 216.667892 3.947501\n", + "155 8 20480 0.015662 219.187225 4.835044\n", + "\n", + "[156 rows x 5 columns]" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "mlp_tp_results_dict = defaultdict(list)\n", + "for group in (f\"tp_degree_{n}\" for n in (1, 2, 4, 8)):\n", + " for m in mlp_tp_trial.iter_metrics(group):\n", + " mlp_tp_results_dict[\"tp_degree\"].append(m.metrics[\"tp_degree\"])\n", + " mlp_tp_results_dict[\"d_model\"].append(m.metrics[\"d_model\"])\n", + " mlp_tp_results_dict[\"time_s\"].append(m.metrics[\"time_s\"])\n", + " mlp_tp_results_dict[\"TFLOP/sec/GPU\"].append(m.metrics[\"tflop_s_gpu\"])\n", + " mlp_tp_results_dict[\"TFLOP/sec/GPU_std\"].append(m.metrics[\"tflop_s_gpu_std\"])\n", + "mlp_tp_results_df = pd.DataFrame.from_dict(mlp_tp_results_dict)\n", + "mlp_tp_results_df" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "ed6b17bc-5ddf-470c-9aaa-cfc0529367fb", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "mlp_tp_plot = sns.lineplot(mlp_tp_results_df,\n", + " x=\"d_model\",\n", + " y=\"TFLOP/sec/GPU\",\n", + " hue=\"tp_degree\",\n", + " palette=[\"blue\", \"orange\", \"purple\",\n", + " \"black\"])\n", + "plt.suptitle(\"MLP TFLOP/sec/GPU, batch_size=1, seq_len=4096\")\n", + "mlp_tp_plot.axhline(y=312, color=\"r\", linestyle=\"--\")\n", + "mlp_tp_plot.figure.savefig(\"mlp_tp.png\", dpi=256, bbox_inches=\"tight\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.2" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/blog/tp/requirements.txt b/blog/tp/requirements.txt new file mode 100644 index 0000000..2de173a --- /dev/null +++ b/blog/tp/requirements.txt @@ -0,0 +1,167 @@ +analytics-python==1.4.post1 +anyio==4.4.0 +appdirs==1.4.4 +appnope==0.1.4 +argcomplete==3.4.0 +argon2-cffi==23.1.0 +argon2-cffi-bindings==21.2.0 +arrow==1.3.0 +asttokens==2.4.1 +async-lru==2.0.4 +attrs==23.2.0 +azure-core==1.30.2 +azure-storage-blob==12.20.0 +Babel==2.15.0 +backoff==1.10.0 +bcrypt==4.1.3 +beautifulsoup4==4.12.3 +bleach==6.1.0 +boto3==1.34.135 +botocore==1.34.135 +cachetools==5.3.3 +certifi==2024.6.2 +cffi==1.16.0 +charset-normalizer==3.3.2 +comm==0.2.2 +contourpy==1.2.1 +cryptography==42.0.8 +cycler==0.12.1 +debugpy==1.8.2 +decorator==5.1.1 +defusedxml==0.7.1 +determined==0.33.0 +docker==7.1.0 +einops==0.8.0 +executing==2.0.1 +fastjsonschema==2.20.0 +filelock==3.14.0 +fonttools==4.53.0 +fqdn==1.5.1 +fsspec==2024.5.0 +gitdb==4.0.11 +GitPython==3.1.43 +google-api-core==2.19.1 +google-api-python-client==2.134.0 +google-auth==2.30.0 +google-auth-httplib2==0.2.0 +google-cloud-core==2.4.1 +google-cloud-storage==2.17.0 +google-crc32c==1.5.0 +google-resumable-media==2.7.1 +googleapis-common-protos==1.63.2 +h11==0.14.0 +httpcore==1.0.5 +httplib2==0.22.0 +httpx==0.27.0 +idna==3.7 +iniconfig==2.0.0 +ipykernel==6.29.4 +ipython==8.24.0 +ipywidgets==8.1.3 +isodate==0.6.1 +isoduration==20.11.0 +jedi==0.19.1 +Jinja2==3.1.4 +jmespath==1.0.1 +json5==0.9.25 +jsonpointer==3.0.0 +jsonschema==4.22.0 +jsonschema-specifications==2023.12.1 +jupyter==1.0.0 +jupyter-console==6.6.3 +jupyter-events==0.10.0 +jupyter-lsp==2.2.5 +jupyter_client==8.6.2 +jupyter_core==5.7.2 +jupyter_server==2.14.1 +jupyter_server_terminals==0.5.3 +jupyterlab==4.2.3 +jupyterlab_pygments==0.3.0 +jupyterlab_server==2.27.2 +jupyterlab_widgets==3.0.11 +kiwisolver==1.4.5 +lomond==0.3.3 +MarkupSafe==2.1.5 +matplotlib==3.9.0 +matplotlib-inline==0.1.7 +mistune==3.0.2 +monotonic==1.6 +mpmath==1.3.0 +nbclient==0.10.0 +nbconvert==7.16.4 +nbformat==5.10.4 +nest-asyncio==1.6.0 +networkx==3.3 +notebook==7.2.1 +notebook_shim==0.2.4 +numpy==2.0.0 +overrides==7.7.0 +packaging==24.0 +pandas==2.2.2 +pandocfilters==1.5.1 +paramiko==3.4.0 +parso==0.8.4 +pathspec==0.12.1 +pexpect==4.9.0 +pillow==10.3.0 +platformdirs==4.2.2 +pluggy==1.5.0 +prometheus_client==0.20.0 +prompt_toolkit==3.0.45 +proto-plus==1.24.0 +protobuf==5.27.2 +psutil==6.0.0 +ptyprocess==0.7.0 +pure-eval==0.2.2 +pyasn1==0.6.0 +pyasn1_modules==0.4.0 +pycparser==2.22 +Pygments==2.18.0 +PyNaCl==1.5.0 +pyOpenSSL==24.1.0 +pyparsing==3.1.2 +pytest==8.2.1 +python-dateutil==2.9.0.post0 +python-json-logger==2.0.7 +pytz==2024.1 +PyYAML==6.0.1 +pyzmq==26.0.3 +qtconsole==5.5.2 +QtPy==2.4.1 +referencing==0.35.1 +requests==2.31.0 +rfc3339-validator==0.1.4 +rfc3986-validator==0.1.1 +rpds-py==0.18.1 +rsa==4.9 +ruamel.yaml==0.18.6 +ruamel.yaml.clib==0.2.8 +s3transfer==0.10.2 +seaborn==0.13.2 +Send2Trash==1.8.3 +setuptools==70.1.1 +six==1.16.0 +smmap==5.0.1 +sniffio==1.3.1 +soupsieve==2.5 +stack-data==0.6.3 +sympy==1.12.1 +tabulate==0.9.0 +termcolor==2.4.0 +terminado==0.18.1 +tinycss2==1.3.0 +torch==2.3.0 +tornado==6.4.1 +tqdm==4.66.4 +traitlets==5.14.3 +types-python-dateutil==2.9.0.20240316 +typing_extensions==4.12.0 +tzdata==2024.1 +uri-template==1.3.0 +uritemplate==4.1.1 +urllib3==2.2.2 +wcwidth==0.2.13 +webcolors==24.6.0 +webencodings==0.5.1 +websocket-client==1.8.0 +widgetsnbextension==4.0.11 diff --git a/blog/tp/test_dot_product_distributed.py b/blog/tp/test_dot_product_distributed.py new file mode 100644 index 0000000..eff58a1 --- /dev/null +++ b/blog/tp/test_dot_product_distributed.py @@ -0,0 +1,65 @@ +""" +A sharded dot-product with intermediate activation function computed over multiple processes. Uses +CPU, the gloo backend, and multi-processing, so that the code can run anywhere. +""" + +import os +from concurrent.futures import ProcessPoolExecutor + +import torch +import torch.distributed as dist + +MASTER_ADDR = "127.0.0.1" +MASTER_PORT = 29500 +WORLD_SIZE = 4 +D_MODEL = 128 + +# Environment variables expected by torch.distributed. +os.environ["MASTER_ADDR"] = MASTER_ADDR +os.environ["MASTER_PORT"] = str(MASTER_PORT) +os.environ["WORLD_SIZE"] = str(WORLD_SIZE) + + +def compute_dot_product(rank: int): + # More torch.distributed env vars. + os.environ["RANK"] = os.environ["LOCAL_RANK"] = str(rank) + + assert ( + not D_MODEL % WORLD_SIZE + ), f"Choose D_MODEL to be divisible by WORLD_SIZE {D_MODEL % WORLD_SIZE=}." + act_fn = torch.nn.GELU() + + # Setup: populate the same tensors on all devices. The full tensors will be used to check + # correctness. + torch.manual_seed(42) + a = torch.randn(D_MODEL) + b = torch.randn(D_MODEL) + + # Each rank uses a different shard for the sharded dot-product + a_sharded = a.reshape(WORLD_SIZE, D_MODEL // WORLD_SIZE)[rank] + b_sharded = b.reshape(WORLD_SIZE, D_MODEL // WORLD_SIZE)[rank] + + # Compute the dot-product via collectives. + + # Each rank first computes their local dot-product using the available shards: + c = a_sharded @ act_fn(b_sharded) + + # The computation is completed by summing over processes: + dist.init_process_group(backend="gloo") + dist.all_reduce(c) + + # Test correctness: + torch.testing.assert_close(c, a @ act_fn(b)) + return f"Correct results on {rank=}" + + +def run(): + with ProcessPoolExecutor(max_workers=WORLD_SIZE) as pool: + ranks_list = [r for r in range(WORLD_SIZE)] + results = pool.map(compute_dot_product, ranks_list) + for r in results: + print(r) + + +if __name__ == "__main__": + run() diff --git a/blog/tp/test_dot_product_local.py b/blog/tp/test_dot_product_local.py new file mode 100644 index 0000000..8d9f43e --- /dev/null +++ b/blog/tp/test_dot_product_local.py @@ -0,0 +1,32 @@ +""" +Demonstrating the equivalence of a basic dot product with intermediate activation function and a +sharded-version of the same calculation. +""" + +import torch + +D_MODEL = 128 +RANKS = 4 + +if __name__ == "__main__": + a = torch.randn(D_MODEL) + b = torch.randn(D_MODEL) + + act_fn = torch.nn.GELU() + # The dot-product, different ways + dot_0 = a @ act_fn(b) + dot_1 = (a * act_fn(b)).sum() + dot_2 = torch.einsum("i, i", a, act_fn(b)) + + a_sharded = a.reshape(RANKS, D_MODEL // RANKS) + b_sharded = b.reshape(RANKS, D_MODEL // RANKS) + + # More equivalent dot-products, using the sharded tensors. + dot_3 = (a_sharded * act_fn(b_sharded)).sum() + dot_4 = (a_sharded @ act_fn(b_sharded).T).trace() + dot_5 = (a_sharded.T @ act_fn(b_sharded)).trace() + dot_6 = torch.einsum("ij, ij", a_sharded, act_fn(b_sharded)) + + for dot_prod in (dot_1, dot_2, dot_3, dot_4, dot_5, dot_6): + torch.testing.assert_close(dot_0, dot_prod) + print("Correct results") diff --git a/blog/tp/test_mlp_tp.py b/blog/tp/test_mlp_tp.py new file mode 100644 index 0000000..cc9ea5b --- /dev/null +++ b/blog/tp/test_mlp_tp.py @@ -0,0 +1,92 @@ +""" +Testing the correctness of the TP implementation. Uses CPU, the gloo backend, and multi-processing, +so that the code can run anywhere. + +""" + +import os +from concurrent.futures import ProcessPoolExecutor + +import torch +import torch.distributed as dist + +import layers + +MASTER_ADDR = "127.0.0.1" +MASTER_PORT = 29500 +WORLD_SIZE = 4 +BATCH_SIZE = 2 +SEQ_LEN = 64 +D_MODEL = 128 + +# Environment variables expected by torch.distributed. +os.environ["MASTER_ADDR"] = MASTER_ADDR +os.environ["MASTER_PORT"] = str(MASTER_PORT) +os.environ["WORLD_SIZE"] = str(WORLD_SIZE) + + +def test_mlp(rank: int): + # More torch.distributed env vars. + os.environ["RANK"] = os.environ["LOCAL_RANK"] = str(rank) + + assert ( + not D_MODEL % WORLD_SIZE + ), f"Choose D_MODEL to be divisible by WORLD_SIZE {D_MODEL % WORLD_SIZE=}." + + # Create two sets of equivalent inputs, both requiring gradients. + torch.manual_seed(42) + inputs = torch.randn(BATCH_SIZE, SEQ_LEN, D_MODEL, requires_grad=True) + inputs_tp = inputs.detach().clone().requires_grad_() + + # Create TP and non-TP MLP layers + dist.init_process_group(backend="gloo") + mlp = layers.MLP(D_MODEL) + mlp_tp = layers.MLPTP(D_MODEL) + + # Give the TP model the same weights as the non-TP model: + with torch.no_grad(): + mlp_tp.lin_0.weight.data = mlp.lin_0.weight.data.tensor_split(WORLD_SIZE, dim=0)[rank] + mlp_tp.lin_0.bias.data = mlp.lin_0.bias.data.tensor_split(WORLD_SIZE, dim=0)[rank] + mlp_tp.lin_1.weight.data = mlp.lin_1.weight.data.tensor_split(WORLD_SIZE, dim=1)[rank] + mlp_tp.lin_1.bias.data = mlp.lin_1.bias.data + + # The outputs should be the same: + outputs = mlp(inputs) + outputs_tp = mlp_tp(inputs_tp) + with torch.no_grad(): + torch.testing.assert_close(outputs, outputs_tp) + + # Perform a backwards pass on a simple loss function. + outputs.pow(2).sum().backward() + outputs_tp.pow(2).sum().backward() + + # Check that the input gradients are the same + with torch.no_grad(): + assert inputs.grad is not None + torch.testing.assert_close(inputs.grad, inputs_tp.grad) + + # And finally check that the parameter gradients are the same: + # Give the TP model the same weights as the non-TP model: + with torch.no_grad(): + mlp_tp.lin_0.weight.grad.data = mlp.lin_0.weight.grad.data.tensor_split(WORLD_SIZE, dim=0)[ + rank + ] + mlp_tp.lin_0.bias.grad.data = mlp.lin_0.bias.grad.data.tensor_split(WORLD_SIZE, dim=0)[rank] + mlp_tp.lin_1.weight.grad.data = mlp.lin_1.weight.grad.data.tensor_split(WORLD_SIZE, dim=1)[ + rank + ] + mlp_tp.lin_1.bias.grad.data = mlp.lin_1.bias.grad.data + + return f"Correct results on {rank=}" + + +def run(): + with ProcessPoolExecutor(max_workers=WORLD_SIZE) as pool: + ranks_list = [r for r in range(WORLD_SIZE)] + results = pool.map(test_mlp, ranks_list) + for r in results: + print(r) + + +if __name__ == "__main__": + run() diff --git a/blog/tp/tp_profiling.py b/blog/tp/tp_profiling.py new file mode 100644 index 0000000..6f8ec73 --- /dev/null +++ b/blog/tp/tp_profiling.py @@ -0,0 +1,163 @@ +import gc +import logging +import os + +import determined as det +import torch +import torch.distributed as dist + +import layers +import utils + +""" +Script for profiling the forward pass of TP MLP layers. Measures the iteration time and computes the +TFLOPs/sec/GPU for all availbable MLP configurations sharded across power-of-two GPUs (including +including the single GPU, non-TP case). + +Only intended for single-node use. +""" + + +def profile_and_report( + core_context: det.core.Context, + batch_size: int, + seq_len: int, + d_model: int, + num_repeats: int, + num_warmups: int, + device: torch.device, + rank: int, + tp_degree: int, + pg_dict: dict[int, dist.ProcessGroup], + dtype: torch.dtype = torch.bfloat16, +) -> None: + # This rank doesn't participate if it's not in the TP group. + if rank >= tp_degree: + return + + inputs = torch.randn(batch_size, seq_len, d_model, device=device, dtype=dtype) + if tp_degree == 1: + mlp = layers.MLP(d_model=d_model, dtype=dtype, device=device) + else: + mlp = layers.MLPTP(d_model=d_model, dtype=dtype, device=device, group=pg_dict[tp_degree]) + + # Use CUDA events for accurate timing. + timer = utils.CUDAEventTimer() + + # Warmups + for _ in range(num_warmups): + mlp(inputs) + + # Timed region. + for _ in range(num_repeats): + with timer: + mlp(inputs) + + # Mean and std TFLOP computations + mlp_flops = 16 * batch_size * seq_len * d_model**2 + + time_s_t = torch.tensor(timer.time_s_list, device=device) + # Use the worst-reported times across GPUs as the true timing metrics, if applicable. + if tp_degree > 1: + time_s_t = time_s_t.to(device) + dist.all_reduce(time_s_t, group=pg_dict[tp_degree], op=dist.ReduceOp.MAX) + time_s_t = time_s_t.cpu() + tflop_s_gpu_t = mlp_flops / time_s_t / 1e12 / tp_degree + metrics = { + "d_model": d_model, + "seq_len": seq_len, + "batch_size": batch_size, + "tp_degree": tp_degree, + "time_s": timer.time_s_mean, + "time_s_std": timer.time_s_std, + "tflop_s_gpu": tflop_s_gpu_t.mean().item(), + "tflop_s_gpu_std": tflop_s_gpu_t.std().item(), + } + + # report metrics on rank zero. Use d_model as the x-axis for plotting purposes. + if not rank: + core_context.train.report_metrics( + group=f"tp_degree_{tp_degree}", steps_completed=d_model, metrics=metrics + ) + + # Memory management + del mlp + del inputs + gc.collect() + torch.cuda.empty_cache() + + +def main( + core_context: det.core.Context, + batch_size: int, + seq_len: int, + d_model_min: int, + d_model_max: int, + d_model_step: int, + num_repeats: int, + num_warmups: int, +) -> None: + world_size = int(os.environ["WORLD_SIZE"]) + rank = int(os.environ["RANK"]) + device = torch.device(f"cuda:{rank}") + torch.cuda.set_device(device) + + # Profile every possible power-of-2 TP group size + def is_power_of_two(n: int) -> bool: + return n & (n - 1) == 0 + + tp_degrees = [n for n in range(1, world_size + 1) if is_power_of_two(n)] + # Create the non-trivial process groups + pg_dict = { + tp_degree: dist.new_group(list(range(tp_degree)), backend="nccl") + for tp_degree in tp_degrees + if tp_degree > 1 + } + + for tp_degree in tp_degrees: + for d_model in range(d_model_min, d_model_max + 1, d_model_step): + dist.barrier() + torch.cuda.synchronize() + profile_and_report( + core_context=core_context, + batch_size=batch_size, + seq_len=seq_len, + d_model=d_model, + num_repeats=num_repeats, + num_warmups=num_warmups, + rank=rank, + tp_degree=tp_degree, + pg_dict=pg_dict, + device=device, + ) + + +if __name__ == "__main__": + info = det.get_cluster_info() + assert info, "This script must run on a determined cluster." + hparams = info.trial.hparams + + # Set up determined's distributed code, if needed + try: + distributed = det.core.DistributedContext.from_torch_distributed() + dist.init_process_group("nccl") + except KeyError: + distributed = None + + try: + with det.core.init(distributed=distributed) as core_context: + logging.basicConfig(level=logging.INFO, format=det.LOG_FORMAT) + + main( + core_context=core_context, + batch_size=hparams["batch_size"], + seq_len=hparams["seq_len"], + d_model_min=hparams["d_model_min"], + d_model_max=hparams["d_model_max"], + d_model_step=hparams["d_model_step"], + num_repeats=hparams["num_repeats"], + num_warmups=hparams["num_warmups"], + ) + finally: + if distributed is not None: + dist.destroy_process_group() diff --git a/blog/tp/tp_profiling.yaml b/blog/tp/tp_profiling.yaml new file mode 100644 index 0000000..eb3c29a --- /dev/null +++ b/blog/tp/tp_profiling.yaml @@ -0,0 +1,22 @@ +name: MLP TP Profiling +# Adjust the workspace and project names, as appropriate. +workspace: TP Blog Post +project: MLP TP Profiling +resources: + slots_per_trial: 8 +searcher: + name: single + metric: not_used + max_length: 1 +hyperparameters: + batch_size: 1 + seq_len: 4096 + d_model_min: 1024 + d_model_max: 20480 + d_model_step: 512 + num_warmups: 5 + num_repeats: 100 +entrypoint: >- + python3 -m determined.launch.torch_distributed + python3 tp_profiling.py +max_restarts: 0 diff --git a/blog/tp/utils.py b/blog/tp/utils.py new file mode 100644 index 0000000..77e42d6 --- /dev/null +++ b/blog/tp/utils.py @@ -0,0 +1,68 @@ +import torch + + +class CUDAEventTimer: + """ + Helper class for timing CUDA operations. + + Example usage: + + ```python + # Time with `start` and `stop` methods: + + timer = CUDAEventTimer() + for iteration in range(repeats): + timer.start() + # Do some computation here + timer.stop() + time_list_s = timer.time_list_s # List of each iteration's duration in seconds + time_s_mean= timer.mean_time_s + + + # Or use as a context manager: + timer = CUDAEventTimer() + with timer: + # Do some computation here + elapsed_time_s = timer.total_time_s + ``` + """ + + def __init__(self) -> None: + self._start_events: list[torch.cuda.Event] = [] + self._end_events: list[torch.cuda.Event] = [] + + @property + def time_s_list(self) -> list[float]: + # https://discuss.pytorch.org/t/how-to-measure-time-in-pytorch/26964/11 + torch.cuda.synchronize() + time_list_s = [ + s.elapsed_time(e) / 1e3 for s, e in zip(self._start_events, self._end_events) + ] + return time_list_s + + @property + def time_s_total(self) -> float: + total_time_s = sum(self.time_s_list) + return total_time_s + + @property + def time_s_mean(self) -> float: + return self.time_s_total / len(self._start_events) + + @property + def time_s_std(self) -> float: + return torch.tensor(self.time_s_list).std().item() + + def start(self) -> None: + self._start_events.append(torch.cuda.Event(enable_timing=True)) + self._end_events.append(torch.cuda.Event(enable_timing=True)) + self._start_events[-1].record() + + def stop(self) -> None: + self._end_events[-1].record() + + def __enter__(self) -> None: + self.start() + + def __exit__(self, *args, **kwargs) -> None: + self.stop()