Redirect dist_pipeline and pipeline_tutorials tutorials to pipelining_tutorial #3146
Merged

Changes from 1 commit of 6. Full commit list:
- 99d75ce Redirect dist_pipeline and pipelinining tutorials (svekars)
- e8cfda8 Undo pipelining tutorial (svekars)
- 0eab6ee Update link (svekars)
- 3d002c6 Redirect pipeline_tutorial to pipelining_tutorial (svekars)
- e61d5f6 Update intermediate_source/pipeline_tutorial.rst (svekars)
- b1f2b90 Merge branch 'main' into add-redirect-dist-tutorial (svekars)
@@ -0,0 +1,10 @@

Distributed Pipeline Parallelism Using RPC
==========================================

This tutorial has been deprecated.

Redirecting in 3 seconds...

.. raw:: html

   <meta http-equiv="Refresh" content="3; url='https://pytorch.org/tutorials/index.html'" />
@@ -1,236 +1,11 @@

Introduction to Distributed Pipeline Parallelism
================================================

Comment: This is up-to-date. We can keep this.

**Authors**: `Howard Huang <https://github.com/H-Huang>`_

.. note::
   |edit| View and edit this tutorial in `github <https://github.com/pytorch/tutorials/blob/main/intermediate_source/pipelining_tutorial.rst>`__.

This tutorial has been deprecated.

This tutorial uses a GPT-style transformer model to demonstrate implementing distributed
pipeline parallelism with `torch.distributed.pipelining <https://pytorch.org/docs/main/distributed.pipelining.html>`__
APIs.

Redirecting in 3 seconds...

.. grid:: 2

   .. grid-item-card:: :octicon:`mortar-board;1em;` What you will learn
      :class-card: card-prerequisites

      * How to use ``torch.distributed.pipelining`` APIs
      * How to apply pipeline parallelism to a transformer model
      * How to utilize different schedules on a set of microbatches

   .. grid-item-card:: :octicon:`list-unordered;1em;` Prerequisites
      :class-card: card-prerequisites

      * Familiarity with `basic distributed training <https://pytorch.org/tutorials/beginner/dist_overview.html>`__ in PyTorch

.. raw:: html

   <meta http-equiv="Refresh" content="3; url='https://pytorch.org/tutorials/index.html'" />

Setup
-----

With ``torch.distributed.pipelining`` we will be partitioning the execution of a model and scheduling computation on micro-batches. We will be using a simplified version
of a transformer decoder model. The model architecture is for educational purposes and has multiple transformer decoder layers because we want to demonstrate how to split the model into different
chunks. First, let us define the model:
.. code:: python

    import torch
    import torch.nn as nn
    from dataclasses import dataclass

    @dataclass
    class ModelArgs:
        dim: int = 512
        n_layers: int = 8
        n_heads: int = 8
        vocab_size: int = 10000

    class Transformer(nn.Module):
        def __init__(self, model_args: ModelArgs):
            super().__init__()

            self.tok_embeddings = nn.Embedding(model_args.vocab_size, model_args.dim)

            # Using a ModuleDict lets us delete layers without affecting names,
            # ensuring checkpoints will correctly save and load.
            self.layers = torch.nn.ModuleDict()
            for layer_id in range(model_args.n_layers):
                self.layers[str(layer_id)] = nn.TransformerDecoderLayer(model_args.dim, model_args.n_heads)

            self.norm = nn.LayerNorm(model_args.dim)
            self.output = nn.Linear(model_args.dim, model_args.vocab_size)

        def forward(self, tokens: torch.Tensor):
            # Handling layers being 'None' at runtime enables easy pipeline splitting
            h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens

            for layer in self.layers.values():
                h = layer(h, h)

            h = self.norm(h) if self.norm else h
            output = self.output(h).float() if self.output else h
            return output
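
Before partitioning anything, it can help to confirm that the model runs end to end in a single process. The following smoke test is not part of the original tutorial; it is a minimal sketch assuming only the definitions above:

.. code:: python

    # Hypothetical single-process check, no pipelining involved.
    args = ModelArgs()
    model = Transformer(args)
    tokens = torch.randint(0, args.vocab_size, (4, 16))  # a 4x16 grid of random token ids
    logits = model(tokens)
    print(logits.shape)  # torch.Size([4, 16, 10000]): one logit vector per token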
Then, we need to import the necessary libraries in our script and initialize the distributed training process. In this case, we are defining some global variables to use
later in the script:

.. code:: python

    import os
    import torch.distributed as dist
    from torch.distributed.pipelining import pipeline, SplitPoint, PipelineStage, ScheduleGPipe

    global rank, device, pp_group, stage_index, num_stages
    def init_distributed():
        global rank, device, pp_group, stage_index, num_stages
        rank = int(os.environ["LOCAL_RANK"])
        world_size = int(os.environ["WORLD_SIZE"])
        device = torch.device(f"cuda:{rank}") if torch.cuda.is_available() else torch.device("cpu")
        dist.init_process_group()

        # This group can be a sub-group in the N-D parallel case
        pp_group = dist.new_group()
        stage_index = rank
        num_stages = world_size

The ``rank``, ``world_size``, and ``init_process_group()`` code should seem familiar to you, as they are commonly used in
all distributed programs. The globals specific to pipeline parallelism include ``pp_group``, the process
group that will be used for send/recv communications; ``stage_index``, which, in this example, equals the rank because there is a single rank
per stage; and ``num_stages``, which is equivalent to ``world_size``.

The ``num_stages`` is used to set the number of stages that will be used in the pipeline parallelism schedule. For example,
for ``num_stages=4``, a microbatch will need to go through 4 forwards and 4 backwards before it is completed. The ``stage_index``
is necessary for the framework to know how to communicate between stages. For example, for the first stage (``stage_index=0``), it will
use data from the dataloader and does not need to receive data from any previous peers to perform its computation.
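
To make the communication pattern concrete, here is an illustrative sketch, not part of the original tutorial, of how each stage's peers follow from ``stage_index`` and ``num_stages`` under this one-stage-per-rank layout:

.. code:: python

    # Illustrative only: with one stage per rank, each stage receives
    # activations from the previous stage and sends them to the next.
    is_first = stage_index == 0                    # feeds from the dataloader
    is_last = stage_index == num_stages - 1        # produces the final output/loss
    prev_stage = None if is_first else stage_index - 1
    next_stage = None if is_last else stage_index + 1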
Step 1: Partition the Transformer Model
---------------------------------------

There are two different ways of partitioning the model:

First is the manual mode, in which we manually create two instances of the model by deleting portions of
the model's attributes. In this example, for two stages (2 ranks), the model is cut in half.

.. code:: python

    def manual_model_split(model, example_input_microbatch, model_args) -> PipelineStage:
        if stage_index == 0:
            # prepare the first stage model
            for i in range(4, 8):
                del model.layers[str(i)]
            model.norm = None
            model.output = None
            stage_input_microbatch = example_input_microbatch

        elif stage_index == 1:
            # prepare the second stage model
            for i in range(4):
                del model.layers[str(i)]
            model.tok_embeddings = None
            stage_input_microbatch = torch.randn(example_input_microbatch.shape[0], example_input_microbatch.shape[1], model_args.dim)

        stage = PipelineStage(
            model,
            stage_index,
            num_stages,
            device,
            input_args=stage_input_microbatch,
        )
        return stage

As we can see, the first stage does not have the layer norm or the output layer, and it only includes the first four transformer blocks.
The second stage does not have the input embedding layers, but includes the output layers and the final four transformer blocks. The function
then returns the ``PipelineStage`` for the current rank.
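
One way to convince yourself the split is correct is to inspect what remains on each rank after calling this function. A hypothetical check, with the expected output for the 2-rank split above shown in comments:

.. code:: python

    stage = manual_model_split(model, example_input_microbatch, model_args)
    print(f"rank {rank}: layers={list(model.layers.keys())}, "
          f"embeddings={model.tok_embeddings is not None}, "
          f"output={model.output is not None}")
    # rank 0: layers=['0', '1', '2', '3'], embeddings=True, output=False
    # rank 1: layers=['4', '5', '6', '7'], embeddings=False, output=True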
The second method is the tracer-based mode, which automatically splits the model based on a ``split_spec`` argument. Using the pipeline specification, we can instruct
``torch.distributed.pipelining`` where to split the model. In the following code block,
we are splitting before the 4th transformer decoder layer, mirroring the manual split described above. Similarly,
we can retrieve a ``PipelineStage`` by calling ``build_stage`` after this splitting is done.

.. code:: python

    def tracer_model_split(model, example_input_microbatch) -> PipelineStage:
        pipe = pipeline(
            module=model,
            mb_args=(example_input_microbatch,),
            split_spec={
                "layers.4": SplitPoint.BEGINNING,
            }
        )
        stage = pipe.build_stage(stage_index, device, pp_group)
        return stage
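
``SplitPoint.BEGINNING`` places the cut before the named submodule executes, while ``SplitPoint.END`` places it after. As a sketch (not from the original tutorial, but assuming the layer naming of our model), the same 4/4 partition could be expressed by cutting after layer 3 instead:

.. code:: python

    def tracer_model_split_alt(model, example_input_microbatch) -> PipelineStage:
        # Equivalent 4/4 partition: cut *after* layers.3 rather than *before* layers.4.
        pipe = pipeline(
            module=model,
            mb_args=(example_input_microbatch,),
            split_spec={
                "layers.3": SplitPoint.END,
            }
        )
        return pipe.build_stage(stage_index, device, pp_group)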
Step 2: Define The Main Execution
---------------------------------

In the main function, we will create a particular pipeline schedule that the stages should follow. ``torch.distributed.pipelining``
supports multiple schedules, including the single-stage-per-rank schedules ``GPipe`` and ``1F1B``,
as well as multiple-stage-per-rank schedules such as ``Interleaved1F1B`` and ``LoopedBFS``.

.. code:: python

    if __name__ == "__main__":
        init_distributed()
        num_microbatches = 4
        model_args = ModelArgs()
        model = Transformer(model_args)

        # Dummy data
        x = torch.ones(32, 500, dtype=torch.long)
        y = torch.randint(0, model_args.vocab_size, (32, 500), dtype=torch.long)
        example_input_microbatch = x.chunk(num_microbatches)[0]

        # Option 1: Manual model splitting
        stage = manual_model_split(model, example_input_microbatch, model_args)

        # Option 2: Tracer model splitting
        # stage = tracer_model_split(model, example_input_microbatch)

        x = x.to(device)
        y = y.to(device)

        def tokenwise_loss_fn(outputs, targets):
            loss_fn = nn.CrossEntropyLoss()
            outputs = outputs.view(-1, model_args.vocab_size)
            targets = targets.view(-1)
            return loss_fn(outputs, targets)

        schedule = ScheduleGPipe(stage, n_microbatches=num_microbatches, loss_fn=tokenwise_loss_fn)

        if rank == 0:
            schedule.step(x)
        elif rank == 1:
            losses = []
            output = schedule.step(target=y, losses=losses)
        dist.destroy_process_group()

In the example above, we are using the manual method to split the model, but the code can be uncommented to also try the
tracer-based model splitting function. In our schedule, we need to pass in the number of microbatches and
the loss function used to evaluate the targets.
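
Because the single-stage-per-rank schedule classes share the same constructor shape here, swapping in a different schedule should be a one-line change. A sketch, assuming ``Schedule1F1B`` is importable from the same module:

.. code:: python

    from torch.distributed.pipelining import Schedule1F1B

    # 1F1B alternates one forward and one backward per stage once the pipeline
    # is full, which tends to lower peak activation memory compared to GPipe.
    schedule = Schedule1F1B(stage, n_microbatches=num_microbatches, loss_fn=tokenwise_loss_fn)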
The ``.step()`` function processes the entire minibatch and automatically splits it into microbatches based
on the ``n_microbatches`` value passed previously. The microbatches are then operated on according to the schedule class.
In the example above, we are using GPipe, which follows a simple all-forwards and then all-backwards schedule. The output
returned from rank 1 will be the same as if the model were on a single GPU and run with the entire batch. Similarly,
we can pass in a ``losses`` container to store the corresponding losses for each microbatch.
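
For example, after ``step`` returns on the last stage, the ``losses`` list holds one entry per microbatch, so a minibatch-level loss can be reduced from it. A sketch, assuming the entries are scalar tensors as produced by ``tokenwise_loss_fn``:

.. code:: python

    if rank == 1:
        # One loss per microbatch; average them for a minibatch-level number.
        minibatch_loss = torch.stack([l.detach() for l in losses]).mean()
        print(f"minibatch loss: {minibatch_loss.item():.4f}")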
Step 3: Launch the Distributed Processes
----------------------------------------

Finally, we are ready to run the script. We will use ``torchrun`` to create a single-host, 2-process job.
Our script is already written in such a way that rank 0 performs the required logic for pipeline stage 0, and rank 1
performs the logic for pipeline stage 1.

``torchrun --nnodes 1 --nproc_per_node 2 pipelining_tutorial.py``

Conclusion
----------

In this tutorial, we have learned how to implement distributed pipeline parallelism using PyTorch's ``torch.distributed.pipelining`` APIs.
We explored setting up the environment, defining a transformer model, and partitioning it for distributed training.
We discussed two methods of model partitioning, manual and tracer-based, and demonstrated how to schedule computations on
micro-batches across different stages. Finally, we covered the execution of the pipeline schedule and the launch of distributed
processes using ``torchrun``.

For production-ready usage of pipeline parallelism, as well as composition with other distributed techniques, see also
the `TorchTitan end-to-end example of 3D parallelism <https://github.com/pytorch/torchtitan>`__.
Comment: We can redirect users to the new pipeline tutorial. https://pytorch.org/tutorials/intermediate/pipelining_tutorial.html