Requires PyTorch 0.4 or later. Requires a StorageClass capable of creating ReadWriteMany persistent volumes.
On GKE you can follow the GCFS documentation to enable it.
In this distributed training example we show how to train a model with DistributedDataParallel (DDP) using the MPI backend with OpenMPI. DDP parallelizes the application of the given module by splitting the input across the specified devices, chunking it in the batch dimension. The module is replicated on each machine and each device, and each replica handles a portion of the input. During the backward pass, gradients from each node are averaged.
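As a minimal sketch, initializing the MPI backend looks roughly like the following. This assumes PyTorch was built with MPI support and that the script is launched under mpirun; the print statement is only illustrative.

```python
import torch.distributed as dist

# With the MPI backend, rank and world size come from the MPI runtime
# (e.g. mpirun), so they do not need to be passed explicitly.
dist.init_process_group(backend="mpi")

rank = dist.get_rank()
world_size = dist.get_world_size()
print("Initialized process {} of {}".format(rank, world_size))
```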
This example implements strong scaling for MNIST, which means the global batch size is fixed no matter how many nodes we use.
See more about strong vs. weak scaling on Wikipedia.
Since this is a strong scaling example, we should perform an average after the all_reduce, which is the same as what torch.nn.parallel.DistributedDataParallel does.
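As an illustration of that averaging, a hand-rolled version (a hypothetical helper, not the example's exact code) could look like this: all_reduce sums each gradient across ranks, and dividing by the world size reproduces the average that DistributedDataParallel computes internally.

```python
import torch.distributed as dist

def average_gradients(model):
    # Call after loss.backward(): sum each parameter's gradient across all
    # ranks (all_reduce defaults to SUM) and divide by the number of ranks.
    world_size = float(dist.get_world_size())
    for param in model.parameters():
        if param.grad is not None:
            dist.all_reduce(param.grad.data)
            param.grad.data /= world_size
```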
Deploy the PyTorchJob resource to start training the CPU & GPU models. Components are provided for both the v1beta1 and v1beta2 PyTorchJob API versions; apply the ones matching your Kubeflow installation:
cd ks_app
ks env add ${KF_ENV}
ks apply ${KF_ENV} -c train_model_CPU_v1beta1
ks apply ${KF_ENV} -c train_model_GPU_v1beta1
cd ks_app
ks env add ${KF_ENV}
ks apply ${KF_ENV} -c train_model_CPU_v1beta2
ks apply ${KF_ENV} -c train_model_GPU_v1beta2
With the commands above we have created a custom resource that was defined and enabled during the Kubeflow installation, namely PyTorchJob.
If you look at job_mnist_DDP_GPU.yaml, a few things are worth mentioning:
- We mount our shared persistent disk as /mnt/kubeflow-gcfs for our PyTorchJob; this is where models are saved at the end of the epochs.
- We allocate one GPU to our container.
- We run our PyTorchJob.
Each PyTorchJob runs two types of Pods:
- Master should always have exactly 1 replica. This is the main worker, which reports the status of the overall job and aggregates all the model parameters.
- Worker is the Pod that runs the training. It can have any number of replicas. Refer to the Pod definition documentation for details.
There are a few things required for this approach to work.
First, we need to install OpenMPI in each worker. Then each node behaves according to its rank (see the sketch after this list):
- If the node is the Master (rank 0), it aggregates all parameters and saves the model to disk after all epochs complete.
- If the node is a Worker, it runs feature preparation and parses the input dataset for its particular data split/partition.
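A rough sketch of the Master's role (the function name and save path are illustrative assumptions, not the example's exact code); the Workers' data partitioning is shown further below:

```python
import torch
import torch.distributed as dist

def save_final_model(model, path="/mnt/kubeflow-gcfs/mnist_model.pt"):
    # Only the Master (rank 0) persists the model to the shared volume,
    # and only after all epochs have completed.
    if dist.get_rank() == 0:
        torch.save(model.state_dict(), path)
```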
Finally, we wrap the model in torch.nn.parallel.DistributedDataParallel to enable distributed training on it.
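A minimal sketch of that wrapping, using a stand-in model rather than the example's actual MNIST network:

```python
import torch.nn as nn
import torch.distributed as dist

dist.init_process_group(backend="mpi")

# Stand-in model; the real example defines its own network for MNIST.
model = nn.Sequential(nn.Linear(784, 10))

# DistributedDataParallel replicates the model on every rank and averages
# gradients across ranks during the backward pass.
model = nn.parallel.DistributedDataParallel(model)
```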
This example parallelizes by splitting the input across the specified devices, chunking it in the batch dimension.
For that reason we need to prepare a function that slices the input data into batches, which are then run on each worker.
PyTorch offers different ways to implement this; in this particular example we use torch.utils.data.distributed.DistributedSampler(dataset)
to partition the dataset into different chunks, as sketched below.
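A sketch of that partitioning (the transform, batch size, and epoch count are illustrative values, not necessarily those used by the example):

```python
import torch
import torch.distributed as dist
from torchvision import datasets, transforms

dist.init_process_group(backend="mpi")

dataset = datasets.MNIST(
    "./data", train=True, download=True, transform=transforms.ToTensor())

# Each rank draws a different, non-overlapping partition of the dataset.
sampler = torch.utils.data.distributed.DistributedSampler(dataset)
train_loader = torch.utils.data.DataLoader(
    dataset, batch_size=64, shuffle=False, sampler=sampler)

for epoch in range(10):
    sampler.set_epoch(epoch)  # reshuffle each rank's partition every epoch
    for data, target in train_loader:
        pass  # forward/backward/optimizer step goes here
```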
After training is complete, our model can be found in the "pytorch/model" PVC.
Next: Serving the Model
Back: Setup