Overview

The Engine ML Python library is built on top of Horovod, MPI, and NCCL. It makes it easy to transition from under-the-desk training to large-scale distributed training.

$ pip install --upgrade -i https://pypi.app.engineml.com/simple engineml

Terminology

Allreduce: A method for reducing values and distributing the result to all replicas. The reduce operation is typically an average, and the values are typically gradients.
NCCL: A library of standard collective GPU-to-GPU communication routines, such as allreduce.
MPI: A library that standardizes message passing between multiple nodes and provides collective routines such as allreduce.
Node: A machine (with some number of GPUs) that runs your experiment.
Rank: The process number across all nodes. An integer in [0, number of replicas).
Replica: A copy of the model and code. There is one replica per rank, and each replica runs as a separate process. Processes communicate over MPI or NCCL.
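
As a toy illustration of what an average allreduce computes (in practice NCCL or MPI performs this collectively across processes, not in NumPy):

import numpy as np

# Gradients computed independently on three replicas
grads = [np.array([1.0, 2.0]), np.array([3.0, 4.0]), np.array([5.0, 6.0])]

# After an average allreduce, every replica holds the same result
reduced = np.mean(grads, axis=0)  # array([3., 4.])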

Design Decisions

Sane Default Values

All calls to the Engine ML Python library return sane defaults when running locally. This prevents extra code branching that can cause hard-to-find bugs.

import tensorflow as tf
import engineml.tensorflow as eml

# Returns a tf.no_op() when running locally, or
# an op to synchronize replicas when running in Engine ML
init_op = eml.session.init_op()

sess = tf.Session()
sess.run(init_op)
Tip: If you do need separate logic for running on Engine ML, you can always use eml.is_engine_runtime().
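
For example, a small sketch that switches the data directory based on the runtime (the local path here is illustrative):

import engineml.tensorflow as eml

if eml.is_engine_runtime():
    # Data mounted read-only by Engine ML (see Data Set Partitioning below)
    data_dir = '/engine/data'
else:
    # Whatever local path you use for under-the-desk runs
    data_dir = './data'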

Minimally Invasive

There are only a handful of steps required to scale your model from single-node to large-scale distributed training. The code for building a deep model is already very complex. We don't want to add to that complexity.

Below, we walk through the modifications required to train your model on multiple machines.

Data Set Partitioning

Each replica has read-only access to data mounted at /engine/data. This directory contains the data from the dataBucket and dataBucketSubdirectory fields specified in engine.yaml. To prevent replicas from training on the exact same examples, it is best to partition the data set into chunks.

eml.data.distribute accepts a list of items to split into chunks and returns a stable subset of that list for each replica. During training, it is best to shuffle the list before this operation so that each replica sees a uniform distribution of training examples.

eml.data.distribute can produce uneven chunk sizes across replicas. If you require even chunk sizes, use eml.data.even_distribute. This is useful when each replica must run the same number of batches per epoch. eml.data.even_distribute works by sampling random elements from a chunk until all chunks have the same size; because it duplicates examples, it is not recommended for your validation set.
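
A minimal sketch of partitioning a list of training files, assuming eml.data.distribute and eml.data.even_distribute each take a list and return this replica's chunk as described above (the paths and seed are illustrative). Shuffling with a fixed seed gives every replica the same ordering before the split, so the chunks do not overlap:

import glob
import random

import engineml.tensorflow as eml

# All training files mounted under /engine/data (illustrative layout)
files = sorted(glob.glob('/engine/data/train/*.tfrecord'))

# Shuffle identically on every replica before splitting
random.Random(1234).shuffle(files)

# Stable, possibly uneven chunk for this replica
train_files = eml.data.distribute(files)

# Alternatively, if every replica must run the same number of batches per epoch
train_files = eml.data.even_distribute(files)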

[Figure: Data Partition]

Info: Engine ML caches data per machine to make reads faster. Reshuffling data can invalidate this cache, so we recommend calling eml.data.distribute only once.

Warning: Metadata operations on large directories (e.g. os.listdir, os.scandir, and glob.glob) can be very slow!

Scale the Learning Rate

When training your model across multiple replicas, you increase the effective batch size (the per-replica batch size multiplied by the number of replicas). To compensate for this larger batch size, it is a best practice to also scale your optimizer's learning rate.

Linear Scaling Rule: When the minibatch size is multiplied by k, multiply the learning rate by k.

Source: Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour
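
As a sketch of the arithmetic (the replica count and base learning rate below are illustrative; the number of replicas is the total number of GPUs across all nodes in your run):

import tensorflow as tf

# Illustrative values: base learning rate tuned for a single replica
num_replicas = 8
base_lr = 0.1

# Linear scaling rule: multiply the learning rate by the number of replicas
lr = base_lr * num_replicas

opt = tf.train.MomentumOptimizer(learning_rate=lr, momentum=0.9)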

Synchronize Model Replicas

Since Engine ML runs each replica as a separate process, we need to synchronize the randomly initialized weights before training.

[Figure: Sync Weights]
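
In TensorFlow, a sketch of where this fits, assuming (as described in Sane Default Values) that eml.session.init_op() returns the synchronization op when running in Engine ML:

import tensorflow as tf
import engineml.tensorflow as eml

# ... build your model and optimizer here ...

with tf.Session() as sess:
    # Each replica initializes its own random weights
    sess.run(tf.global_variables_initializer())
    # Then run the Engine ML op so all replicas start from identical weights
    sess.run(eml.session.init_op())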

Enable CUDA IPC

By default, TensorFlow greedily allocates all GPU memory across every GPU on a node. A common workaround is to set the CUDA_VISIBLE_DEVICES environment variable, and many PyTorch examples also recommend this method of selecting a GPU. However, it has the unintended consequence of disabling CUDA IPC (inter-process communication), which is required for fast distributed training.

Engine ML provides two functions that enable IPC while disabling greedy GPU memory management:

  1. For TensorFlow use eml.session.distribute_config()
  2. For PyTorch use eml.session.init_devices()
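
A minimal TensorFlow sketch, assuming eml.session.distribute_config() returns a session config that pins this replica to its assigned GPU without touching CUDA_VISIBLE_DEVICES (the exact return type is an assumption; for PyTorch, eml.session.init_devices() plays the equivalent role):

import tensorflow as tf
import engineml.tensorflow as eml

# Assumption: distribute_config() limits this process to its assigned GPU and
# disables greedy memory allocation, leaving CUDA IPC enabled
config = eml.session.distribute_config()

with tf.Session(config=config) as sess:
    sess.run(eml.session.init_op())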