|
- # Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
-
- import json
- import os
- import tempfile
- import unittest
-
- import numpy as np
-
- import paddle
- import paddle.nn.functional as F
- from paddle import base, nn, static, utils
- from paddle.base import core
- from paddle.distributed import fleet
- from paddle.distributed.auto_parallel.static.cluster import Cluster
- from paddle.distributed.auto_parallel.static.completion import Completer
- from paddle.distributed.auto_parallel.static.dist_context import (
- DistributedContext,
- )
- from paddle.distributed.auto_parallel.static.mapper import (
- get_comm_volume,
- get_dtype_bytes,
- mapping,
- )
- from paddle.distributed.auto_parallel.static.parallelizer import (
- AutoParallelizer,
- )
- from paddle.distributed.auto_parallel.static.partitioner import Partitioner
- from paddle.distributed.auto_parallel.static.reshard import Resharder
- from paddle.distributed.fleet import auto
-
# Blank out CUDA_VISIBLE_DEVICES whenever it is already present in the
# environment (presumably to keep this test off real GPUs — confirm intent).
if "CUDA_VISIBLE_DEVICES" in os.environ:
    os.environ["CUDA_VISIBLE_DEVICES"] = ""

# Everything below builds static-graph programs.
paddle.enable_static()

# Module-level configuration: MLPLayer.forward and mlp_forward read the
# strategy and process meshes; the test case assigns all three before
# building programs.
_global_parallel_strategy = _global_process_mesh = _global_num_stages = None
-
# Two-machine cluster description that
# TestAutoParallelMapper.test_mapper_dp_mp_pp parses with json.loads, writes
# to a temporary file and loads via Cluster.build_from_file:
#   * machine0: four "A100-SXM4-40GB" GPUs (global ids 0-3) and one NIC (4);
#   * machine1: four "Tesla V100-SXM2-32GB" GPUs (global ids 5-8) and one
#     NIC (9).
# Inside each machine every ordered GPU pair has an "NVL" link (bandwidth 42)
# and each GPU has a "PHB" link to the local NIC (bandwidth 12). The two NICs
# are joined by "NET" links (bandwidth 1) in both directions.
# NOTE(review): units for bandwidth / memory / gflops are not stated here —
# confirm against the Cluster parser before relying on them.
cluster_json = """
{
  "machines": [
    {
      "hostname": "machine0",
      "addr": "0.0.0.1",
      "port": "768",
      "devices": [
        {
          "global_id": 0,
          "local_id": 0,
          "type": "GPU",
          "model": "A100-SXM4-40GB",
          "sp_gflops": 19500,
          "dp_gflops": 9700,
          "memory": 40
        },
        {
          "global_id": 1,
          "local_id": 1,
          "type": "GPU",
          "model": "A100-SXM4-40GB",
          "sp_gflops": 19500,
          "dp_gflops": 9700,
          "memory": 40
        },
        {
          "global_id": 2,
          "local_id": 2,
          "type": "GPU",
          "model": "A100-SXM4-40GB",
          "sp_gflops": 19500,
          "dp_gflops": 9700,
          "memory": 40
        },
        {
          "global_id": 3,
          "local_id": 3,
          "type": "GPU",
          "model": "A100-SXM4-40GB",
          "sp_gflops": 19500,
          "dp_gflops": 9700,
          "memory": 40
        },
        {
          "global_id": 4,
          "local_id": 0,
          "type": "NIC"
        }
      ],
      "links": [
        {
          "source_global_id": 0,
          "target_global_id": 1,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 0,
          "target_global_id": 2,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 0,
          "target_global_id": 3,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 0,
          "target_global_id": 4,
          "type": "PHB",
          "bandwidth": 12
        },
        {
          "source_global_id": 1,
          "target_global_id": 0,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 1,
          "target_global_id": 2,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 1,
          "target_global_id": 3,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 1,
          "target_global_id": 4,
          "type": "PHB",
          "bandwidth": 12
        },
        {
          "source_global_id": 2,
          "target_global_id": 0,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 2,
          "target_global_id": 1,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 2,
          "target_global_id": 3,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 2,
          "target_global_id": 4,
          "type": "PHB",
          "bandwidth": 12
        },
        {
          "source_global_id": 3,
          "target_global_id": 0,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 3,
          "target_global_id": 1,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 3,
          "target_global_id": 2,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 3,
          "target_global_id": 4,
          "type": "PHB",
          "bandwidth": 12
        },
        {
          "source_global_id": 4,
          "target_global_id": 9,
          "type": "NET",
          "bandwidth": 1
        }
      ]
    },
    {
      "hostname": "machine1",
      "addr": "0.0.0.2",
      "port": "768",
      "devices": [
        {
          "global_id": 5,
          "local_id": 0,
          "type": "GPU",
          "model": "Tesla V100-SXM2-32GB",
          "sp_gflops": 15700,
          "dp_gflops": 7800,
          "memory": 32
        },
        {
          "global_id": 6,
          "local_id": 1,
          "type": "GPU",
          "model": "Tesla V100-SXM2-32GB",
          "sp_gflops": 15700,
          "dp_gflops": 7800,
          "memory": 32
        },
        {
          "global_id": 7,
          "local_id": 2,
          "type": "GPU",
          "model": "Tesla V100-SXM2-32GB",
          "sp_gflops": 15700,
          "dp_gflops": 7800,
          "memory": 32
        },
        {
          "global_id": 8,
          "local_id": 3,
          "type": "GPU",
          "model": "Tesla V100-SXM2-32GB",
          "sp_gflops": 15700,
          "dp_gflops": 7800,
          "memory": 32
        },
        {
          "global_id": 9,
          "local_id": 0,
          "type": "NIC"
        }
      ],
      "links": [
        {
          "source_global_id": 5,
          "target_global_id": 6,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 5,
          "target_global_id": 7,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 5,
          "target_global_id": 8,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 5,
          "target_global_id": 9,
          "type": "PHB",
          "bandwidth": 12
        },
        {
          "source_global_id": 6,
          "target_global_id": 5,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 6,
          "target_global_id": 7,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 6,
          "target_global_id": 8,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 6,
          "target_global_id": 9,
          "type": "PHB",
          "bandwidth": 12
        },
        {
          "source_global_id": 7,
          "target_global_id": 5,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 7,
          "target_global_id": 6,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 7,
          "target_global_id": 8,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 7,
          "target_global_id": 9,
          "type": "PHB",
          "bandwidth": 12
        },
        {
          "source_global_id": 8,
          "target_global_id": 5,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 8,
          "target_global_id": 6,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 8,
          "target_global_id": 7,
          "type": "NVL",
          "bandwidth": 42
        },
        {
          "source_global_id": 8,
          "target_global_id": 9,
          "type": "PHB",
          "bandwidth": 12
        },
        {
          "source_global_id": 9,
          "target_global_id": 4,
          "type": "NET",
          "bandwidth": 1
        }
      ]
    }
  ]
}
"""
-
-
class MLPLayer(nn.Layer):
    """LayerNorm followed by two Linear -> GELU -> Linear sub-networks.

    Weights are drawn from a normal distribution after re-seeding NumPy's
    global RNG with 2021, so every instance gets identical initial weights.

    Args:
        hidden_size: input/output feature size (``d_model``).
        intermediate_size: inner feature size of each sub-network.
        initializer_range: standard deviation of the normal weight init.
    """

    def __init__(
        self, hidden_size=64, intermediate_size=4 * 64, initializer_range=0.02
    ):
        super().__init__()
        d_model = hidden_size
        dim_feedforward = intermediate_size
        # Re-seed the global RNG so each rank's program (get_dist_prog builds
        # one MLPLayer per rank) starts from the same weights.
        np.random.seed(2021)
        # Draw order (arr0..arr3) is significant: it fixes which random
        # values land in which weight.
        arr0 = np.random.normal(
            0, initializer_range, size=(d_model, dim_feedforward)
        )
        arr1 = np.random.normal(
            0, initializer_range, size=(dim_feedforward, d_model)
        )
        arr2 = np.random.normal(
            0, initializer_range, size=(d_model, dim_feedforward)
        )
        arr3 = np.random.normal(
            0, initializer_range, size=(dim_feedforward, d_model)
        )
        weight_attr0 = paddle.ParamAttr(
            initializer=paddle.nn.initializer.Assign(arr0)
        )
        weight_attr1 = paddle.ParamAttr(
            initializer=paddle.nn.initializer.Assign(arr1)
        )
        weight_attr2 = paddle.ParamAttr(
            initializer=paddle.nn.initializer.Assign(arr2)
        )
        weight_attr3 = paddle.ParamAttr(
            initializer=paddle.nn.initializer.Assign(arr3)
        )
        bias_attr = None
        self.linear0 = nn.Linear(
            d_model, dim_feedforward, weight_attr0, bias_attr=bias_attr
        )
        self.linear1 = nn.Linear(
            dim_feedforward, d_model, weight_attr1, bias_attr=bias_attr
        )
        self.norm = nn.LayerNorm(d_model, epsilon=1e-5)
        self.linear2 = nn.Linear(
            d_model, dim_feedforward, weight_attr2, bias_attr=bias_attr
        )
        self.linear3 = nn.Linear(
            dim_feedforward, d_model, weight_attr3, bias_attr=bias_attr
        )

    def forward(self, input):
        """Apply norm -> linear0 -> gelu -> linear1 -> linear2 -> gelu -> linear3.

        Under the "dp_mp_pp" strategy, the weights and the intermediate
        activation are annotated with the module-level process meshes.
        Otherwise no sharding annotations are emitted.
        """
        sharded = _global_parallel_strategy == "dp_mp_pp"
        if sharded:
            # linear0/linear1 live on mesh 0: linear0's output dimension and
            # linear1's input dimension are split along mesh axis "y".
            auto.shard_tensor(
                self.linear0.weight, _global_process_mesh[0], [None, "y"]
            )
            auto.shard_tensor(
                self.linear1.weight, _global_process_mesh[0], ["y", None]
            )
            # linear2/linear3 use the same split on mesh 1.
            auto.shard_tensor(
                self.linear2.weight, _global_process_mesh[1], [None, "y"]
            )
            auto.shard_tensor(
                self.linear3.weight, _global_process_mesh[1], ["y", None]
            )

        out = self.norm(input)
        out = self.linear0(out)
        out = F.gelu(out, approximate=True)
        out = self.linear1(out)

        if sharded:
            # Re-annotate the activation on mesh 1 (presumably the second
            # pipeline stage). Guarded because _global_process_mesh is None
            # unless a strategy has been configured.
            auto.shard_tensor(out, _global_process_mesh[1], ["x", None])

        out = self.linear2(out)
        out = F.gelu(out, approximate=True)
        out = self.linear3(out)
        return out
-
-
def mlp_forward(train_program, start_program):
    """Build the MLP regression graph and its mean squared-error loss.

    Ops are recorded into ``train_program`` / ``start_program`` under a fresh
    unique-name scope.

    Returns:
        ``(loss, train_program, start_program)``.
    """
    with static.program_guard(train_program, start_program):
        with utils.unique_name.guard():
            batch_size, hidden_size = 4, 64
            input_var = static.data(
                name="input", shape=[batch_size, hidden_size], dtype="float32"
            )
            label_var = static.data(
                name="label", shape=[batch_size, 1], dtype="float32"
            )

            # Under dp_mp_pp the input batch is split along mesh-0 axis "x".
            if _global_parallel_strategy == "dp_mp_pp":
                auto.shard_tensor(
                    input_var, _global_process_mesh[0], ["x", None]
                )

            model = MLPLayer(
                hidden_size=hidden_size,
                intermediate_size=4 * hidden_size,
                initializer_range=0.02,
            )
            prediction = model(input_var)
            squared_error = F.square_error_cost(prediction, label_var)
            loss = paddle.mean(squared_error)
    return loss, train_program, start_program
-
-
def get_dist_prog(train_program, startup_program, dist_context, rank_id):
    """Build the MLP program and turn it into ``rank_id``'s distributed program.

    Pipeline: build the serial forward graph, complete the forward
    annotations, generate backward, partition for ``rank_id``, append the
    optimizer, then reshard.

    Args:
        train_program: static Program that receives the serial forward graph.
        startup_program: matching startup Program.
        dist_context: DistributedContext shared by every stage below.
        rank_id: rank whose partitioned program is produced.

    Returns:
        ``(dist_train_program, dist_startup_prog)`` for ``rank_id``.
    """
    loss, train_program, startup_program = mlp_forward(
        train_program, startup_program
    )

    # Configure fleet's strategy/optimizer attributes directly (presumably
    # read by AutoParallelizer) instead of going through fleet.init.
    fleet._user_defined_strategy = fleet.DistributedStrategy()
    fleet.user_defined_optimizer = paddle.optimizer.Adam()
    parallelizer = AutoParallelizer(fleet)
    parallelizer._dist_context = dist_context

    # Complete the forward annotations, then generate the backward pass.
    completer = Completer(dist_context)
    complete_train_program = completer.complete_forward_annotation(
        train_program
    )
    dist_context.block_state.parse_forward_blocks(complete_train_program)
    params_grads = parallelizer._generate_backward(
        complete_train_program,
        startup_program,
        loss,
        parameter_list=None,
        no_grad_set=None,
        callbacks=None,
    )

    # Partition the completed program for this rank.
    partitioner = Partitioner(dist_context, rank_id)
    (
        dist_train_program,
        dist_startup_prog,
        dist_params_grads,
    ) = partitioner.partition(
        complete_train_program, startup_program, params_grads
    )

    # Append the optimizer to the partitioned program; its return value is
    # not needed by this helper.
    parallelizer._apply_optimize(
        dist_train_program, dist_startup_prog, dist_params_grads
    )

    resharder = Resharder(
        dist_train_program,
        dist_startup_prog,
        rank_id,
        dist_context,
        dist_params_grads,
    )
    resharder.reshard()
    return dist_train_program, dist_startup_prog
-
-
def is_in_machine(device_local_id, machine):
    """Tell whether any device of ``machine`` has ``device_local_id`` as local id.

    Args:
        device_local_id: local id to look for.
        machine: object whose ``devices`` mapping holds objects with a
            ``local_id`` attribute.

    Returns:
        ``True`` on the first match, otherwise ``False``.
    """
    return any(
        device_local_id == candidate.local_id
        for candidate in machine.devices.values()
    )
-
-
def get_device_local_ids(machine):
    """Return the ``local_id`` of every device of ``machine``.

    Args:
        machine: object whose ``devices`` mapping holds objects with a
            ``local_id`` attribute.

    Returns:
        A list of local ids in the iteration order of ``machine.devices``.
        Duplicates are kept, and an empty mapping gives ``[]``.
    """
    # Built with a comprehension: the previous ``local_ids.append[...]``
    # subscripted the bound method instead of calling it and always raised
    # TypeError.
    return [device.local_id for device in machine.devices.values()]
-
-
class TestAutoParallelMapper(unittest.TestCase):
    """Tests for the auto-parallel rank-to-device ``mapping`` and its helpers."""

    def setUp(self):
        self.temp_dir = tempfile.TemporaryDirectory()
        # test_mapper_dp_mp_pp overwrites the module-level configuration read
        # by MLPLayer / mlp_forward; snapshot it so tearDown can restore it
        # and tests do not leak state into each other.
        self._saved_globals = (
            _global_parallel_strategy,
            _global_process_mesh,
            _global_num_stages,
        )

    def tearDown(self):
        global _global_parallel_strategy, _global_process_mesh, _global_num_stages
        (
            _global_parallel_strategy,
            _global_process_mesh,
            _global_num_stages,
        ) = self._saved_globals
        self.temp_dir.cleanup()

    def test_mapper_dp_mp_pp(self):
        """Every rank is mapped to exactly one distinct device of one machine."""
        global _global_parallel_strategy, _global_process_mesh, _global_num_stages

        # Round-trip the cluster description through json (validating it)
        # into a file for Cluster.build_from_file.
        cluster_json_path = os.path.join(
            self.temp_dir.name, "auto_parallel_cluster.json"
        )
        cluster_json_object = json.loads(cluster_json)
        with open(
            cluster_json_path, "w", encoding="utf-8"
        ) as cluster_json_file:
            json.dump(cluster_json_object, cluster_json_file)
        cluster = Cluster()
        cluster.build_from_file(cluster_json_path)

        _global_parallel_strategy = "dp_mp_pp"
        _global_num_stages = 2
        _global_process_mesh = [
            auto.ProcessMesh([[0, 1], [2, 3]], dim_names=["x", "y"]),
            auto.ProcessMesh([[4, 5], [6, 7]], dim_names=["x", "y"]),
        ]
        processes = [0, 1, 2, 3, 4, 5, 6, 7]

        # Build each rank's distributed program from fresh programs/contexts.
        dist_programs = {}
        for rank_id in processes:
            train_program = static.Program()
            startup_program = static.Program()
            dist_context = DistributedContext()
            dist_train_program, _ = get_dist_prog(
                train_program, startup_program, dist_context, rank_id
            )
            dist_programs[rank_id] = [dist_train_program, None]

        rank_mapping = mapping(dist_programs, cluster)

        all_mapped_ranks = set()
        for machine_id, machine_mapping in rank_mapping.items():
            machine = cluster.machines[machine_id]
            machine_mapped_ranks = set()
            machine_mapped_device_local_ids = set()
            for rank, device_ids in machine_mapping["ranks"].items():
                # Only allow one process to one device mapping
                self.assertEqual(len(device_ids), 1)
                self.assertTrue(is_in_machine(device_ids[0], machine))
                machine_mapped_ranks.add(rank)
                machine_mapped_device_local_ids.add(device_ids[0])
            # No two ranks on a machine may share a device.
            self.assertEqual(
                len(machine_mapped_ranks), len(machine_mapped_device_local_ids)
            )
            all_mapped_ranks.update(machine_mapped_ranks)
        self.assertEqual(set(processes), all_mapped_ranks)

    def test_mapper_misc(self):
        """Check get_dtype_bytes and get_comm_volume on hand-built ops."""
        self.assertEqual(get_dtype_bytes(paddle.float64), 8)
        self.assertEqual(get_dtype_bytes(paddle.float32), 4)
        self.assertEqual(get_dtype_bytes(paddle.float16), 2)
        self.assertEqual(get_dtype_bytes(paddle.bfloat16), 2)
        self.assertEqual(get_dtype_bytes(paddle.int64), 8)
        self.assertEqual(get_dtype_bytes(paddle.int32), 4)
        self.assertEqual(get_dtype_bytes(paddle.int16), 2)
        self.assertEqual(get_dtype_bytes(paddle.int8), 1)
        self.assertEqual(get_dtype_bytes(paddle.uint8), 1)
        self.assertRaises(ValueError, get_dtype_bytes, "unknown type")
        train_program = static.Program()
        startup_program = static.Program()
        ring_id = 0
        root_id = 0
        nranks = 2
        with base.program_guard(train_program, startup_program):
            input = paddle.static.data(
                name="input", shape=[-1, 10, 10], dtype='float32'
            )
            output = train_program.current_block().create_var(
                name="outofbroadcast",
                dtype='float32',
                type=core.VarDesc.VarType.LOD_TENSOR,
                persistable=False,
                stop_gradient=False,
            )
            broadcast_op = train_program.global_block().append_op(
                type="c_broadcast",
                inputs={'X': input},
                attrs={'ring_id': ring_id, 'root': root_id},
                outputs={'Out': output},
            )
            self.assertEqual(get_comm_volume(broadcast_op, 0, 1), 400)
            self.assertIsNone(get_comm_volume(broadcast_op, 1, 0))
            allgather_op = train_program.global_block().append_op(
                type="c_allgather",
                inputs={'X': input},
                attrs={'ring_id': ring_id, 'nranks': nranks},
                outputs={'Out': output},
            )
            self.assertEqual(get_comm_volume(allgather_op, 0, 1), 400)
            self.assertIsNone(get_comm_volume(allgather_op, 0, 0))
            reduce_op = train_program.global_block().append_op(
                type="c_reduce_sum",
                inputs={'X': input},
                attrs={'ring_id': ring_id, 'root_id': root_id},
                outputs={'Out': output},
            )
            self.assertIsNone(get_comm_volume(reduce_op, 0, 1))
            self.assertEqual(get_comm_volume(reduce_op, 1, 0), 400)
            # A non-communication op must be rejected.
            cast_op = train_program.global_block().append_op(
                type="cast",
                inputs={"X": input},
                outputs={"Out": output},
                attrs={
                    "in_dtype": core.VarDesc.VarType.FP32,
                    "out_dtype": core.VarDesc.VarType.FP32,
                },
            )
            self.assertRaises(ValueError, get_comm_volume, cast_op, 0, 1)
-
-
# Script entry point: run this module's test cases.
if __name__ == "__main__":
    unittest.main()
|