# encoding:utf-8
"""Train EfficientDet and save checkpoint files."""

import os
import time

# Device 0 reinstalls numpy and then creates a lock file; every other device
# polls for the lock so that all ranks see the same numpy version before the
# heavy imports below run.
device_id = int(os.getenv('DEVICE_ID', '0'))
device_num = int(os.getenv('RANK_SIZE', '8'))
sync_lock = "/tmp/sync.lock"
if device_id % device_num == 0 and not os.path.exists(sync_lock):
    os.system("pip uninstall numpy -y")
    os.system("pip install numpy")
    try:
        os.mknod(sync_lock)
    except IOError:
        pass

while True:
    if os.path.exists(sync_lock):
        break
    time.sleep(10)

import argparse
import ast
import math

import numpy as np
import moxing as mox

import mindspore as ms
import mindspore.nn as nn
import mindspore.common.initializer as weight_init
from mindspore import context, Tensor
from mindspore.common import dtype as mstype
from mindspore.common import set_seed
from mindspore.common.parameter import Parameter
from mindspore.communication.management import init, get_group_size
from mindspore.context import ParallelMode
from mindspore.nn.wrap.grad_reducer import DistributedGradReducer
from mindspore.nn.wrap.loss_scale import DynamicLossScaleUpdateCell
from mindspore.ops import composite as C
from mindspore.ops import functional as F
from mindspore.ops import operations as P
from mindspore.train import Model
from mindspore.train.callback import CheckpointConfig, ModelCheckpoint, LossMonitor, TimeMonitor, Callback
from mindspore.train.serialization import load_checkpoint, load_param_into_net

from src.config import config
from src.dataset import create_EfficientDet_datasets
from src.efficientdet.loss import FocalLoss
from src.lr_schedule import get_lr_cosine
from src.mind_backbone import EfficientDetBackbone

set_seed(1)


grad_scale = C.MultitypeFuncGraph("grad_scale")
reciprocal = P.Reciprocal()


@grad_scale.register("Tensor", "Tensor")
def tensor_grad_scale(scale, grad):
    return grad * reciprocal(scale)


GRADIENT_CLIP_TYPE = 1
GRADIENT_CLIP_VALUE = 1.0

- clip_grad = C.MultitypeFuncGraph("clip_grad")
-
-
- @clip_grad.register("Number", "Number", "Tensor")
- def _clip_grad(clip_type, clip_value, grad):
- """
- Clip gradients.
-
- Inputs:
- clip_type (int): The way to clip, 0 for 'value', 1 for 'norm'.
- clip_value (float): Specifies how much to clip.
- grad (tuple[Tensor]): Gradients.
-
- Outputs:
- tuple[Tensor], clipped gradients.
- """
- if clip_type not in (0, 1):
- return grad
- dt = F.dtype(grad)
- if clip_type == 0:
- new_grad = C.clip_by_value(grad, F.cast(F.tuple_to_array((-clip_value,)), dt),
- F.cast(F.tuple_to_array((clip_value,)), dt))
- else:
- new_grad = nn.ClipByNorm()(grad, F.cast(F.tuple_to_array((clip_value,)), dt))
- return new_grad
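
# A brief note on the clipping above, since the registered function is applied
# one gradient tensor at a time: with GRADIENT_CLIP_TYPE = 1, clip_grad(1, 1.0, g)
# returns g unchanged when ||g||_2 <= 1.0 and rescales it to L2 norm 1.0 otherwise.
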

class EfficientDetTrainOneStepWithLossScaleCell(nn.TrainOneStepWithLossScaleCell):
    """Train-one-step cell with dynamic loss scaling, distributed gradient
    reduction and gradient clipping (adapted from the BERT training cell)."""

    def __init__(self, network, optimizer, scale_update_cell=None):
        super(EfficientDetTrainOneStepWithLossScaleCell, self).__init__(network, optimizer, scale_update_cell)
        self.cast = P.Cast()
        self.degree = 1
        if self.reducer_flag:
            self.degree = get_group_size()
            self.grad_reducer = DistributedGradReducer(optimizer.parameters, False, self.degree)

        self.loss_scale = None
        self.loss_scaling_manager = scale_update_cell
        if scale_update_cell:
            self.loss_scale = Parameter(Tensor(scale_update_cell.get_loss_scale(), dtype=mstype.float32))

    def construct(self, x, y, sens=None):
        """Defines the computation performed."""
        weights = self.weights
        loss = self.network(x, y)
        if sens is None:
            scaling_sens = self.loss_scale
        else:
            scaling_sens = sens
        status, scaling_sens = self.start_overflow_check(loss, scaling_sens)
        grads = self.grad(self.network, weights)(x, y, self.cast(scaling_sens, mstype.float32))
        # Reduce gradients across devices, then divide by loss scale * degree
        # (the reducer sums, so dividing by the degree averages across devices).
        grads = self.grad_reducer(grads)
        degree_sens = self.cast(scaling_sens * self.degree, mstype.float32)
        grads = self.hyper_map(F.partial(grad_scale, degree_sens), grads)
        grads = self.hyper_map(F.partial(clip_grad, GRADIENT_CLIP_TYPE, GRADIENT_CLIP_VALUE), grads)

        # Skip the optimizer step when overflow is detected; the loss-scale
        # manager then shrinks the scale for the next step.
        cond = self.get_overflow_status(status, grads)
        overflow = cond
        if sens is None:
            overflow = self.loss_scaling_manager(self.loss_scale, cond)
        if not overflow:
            self.optimizer(grads)
        return loss, cond, scaling_sens

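# Note on the cell above: each step returns (loss, overflow_flag, current_scale);
# e.g. a callback could read run_context.original_args().net_outputs[1] to count
# overflow steps (hypothetical usage, not wired up in this script).
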
def _calculate_fan_in_and_fan_out(shape):
    """Calculate fan_in and fan_out from a weight shape tuple."""
    dimensions = len(shape)
    if dimensions < 2:
        raise ValueError("Fan in and fan out can not be computed for a tensor"
                         " with fewer than 2 dimensions")
    if dimensions == 2:  # Linear
        fan_in = shape[1]
        fan_out = shape[0]
    else:
        num_input_fmaps = shape[1]
        num_output_fmaps = shape[0]
        receptive_field_size = 1
        if dimensions > 2:
            receptive_field_size = shape[2] * shape[3]
        fan_in = num_input_fmaps * receptive_field_size
        fan_out = num_output_fmaps * receptive_field_size
    return fan_in, fan_out
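
# Worked example: a 3x3 conv weight of shape (64, 32, 3, 3) has a receptive
# field size of 3 * 3 = 9, giving fan_in = 32 * 9 = 288 and fan_out = 64 * 9 = 576;
# a 2-D weight of shape (64, 32) gives fan_in = 32 and fan_out = 64.
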

def init_weights(model):
    """Initialize conv weights: detection heads get a normal init, everything
    else He-uniform; the classification head bias gets the focal-loss prior."""
    # Iterate over all cells (modules) of the model.
    for name, cell in model.cells_and_names():
        is_conv_layer = isinstance(cell, nn.Conv2d)

        if is_conv_layer:
            if "conv_list" in name or "header" in name:
                fan_in, _ = _calculate_fan_in_and_fan_out(cell.weight.shape)
                sigma = math.sqrt(1. / float(fan_in))  # this is a std, not a bound
                data = ms.Tensor(np.random.normal(loc=0, scale=sigma, size=cell.weight.shape).astype(np.float32))
                cell.weight.set_data(weight_init.initializer(data, cell.weight.shape))
            else:
                cell.weight.set_data(weight_init.initializer(weight_init.HeUniform(),
                                                             cell.weight.shape,
                                                             cell.weight.dtype))

            if cell.has_bias:
                if "header_cls" in name:
                    # Start the classification head biased towards background
                    # with probability 0.99 (the focal-loss prior).
                    bias_value = -np.log((1 - 0.01) / 0.01)
                    cell.bias.set_data(weight_init.initializer(bias_value, cell.bias.shape))
                else:
                    cell.bias.set_data(weight_init.initializer('zeros', cell.bias.shape))

def get_param_groups(network):
    """Split trainable params into weight-decay and no-weight-decay groups."""
    decay_params = []
    no_decay_params = []
    for x in network.trainable_params():
        parameter_name = x.name
        if parameter_name.endswith('.bias'):
            # biases do not use weight decay
            no_decay_params.append(x)
        elif parameter_name.endswith('.gamma') or parameter_name.endswith('.beta'):
            # BN scale/shift parameters do not use weight decay
            no_decay_params.append(x)
        else:
            decay_params.append(x)

    return [{'params': no_decay_params, 'weight_decay': 0.0}, {'params': decay_params}]
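
# Hypothetical usage sketch: MindSpore optimizers accept such group dicts, so
# the groups could be passed directly, e.g.
#     opt = nn.Momentum(get_param_groups(net), lr, config.momentum, config.weight_decay)
# Note that main() below passes the raw parameter list instead, so weight decay
# is applied to every trainable parameter.
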

class WithLossCell(nn.Cell):
    """Wrap the backbone with the focal loss so the cell returns a scalar loss."""

    def __init__(self, backbone, loss):
        super(WithLossCell, self).__init__()
        self.backbone = backbone
        self.loss = loss

    def construct(self, x, y):
        _, reg, cls, anchor = self.backbone(x)
        cls_loss, reg_loss = self.loss(reg, cls, anchor, y)
        return cls_loss + reg_loss


class TransferCallback(Callback):
    """Copy checkpoints from the local cache to OBS every 10 epochs."""

    def __init__(self, local_train_path, obs_train_path):
        super(TransferCallback, self).__init__()
        self.local_train_path = local_train_path
        self.obs_train_path = obs_train_path

    def epoch_end(self, run_context):
        cb_params = run_context.original_args()
        current_epoch = cb_params.cur_epoch_num
        if current_epoch % 10 == 0 and current_epoch != 0:
            mox.file.copy_parallel(self.local_train_path, self.obs_train_path)


def main():
    parser = argparse.ArgumentParser(description="EfficientDet training")
    parser.add_argument("--distribute", type=ast.literal_eval, default=True, help="Run distributed training, default is True.")
    parser.add_argument("--workers", type=int, default=8, help="Num parallel workers.")
    parser.add_argument("--data_url", type=str, default=None, help="Mindrecord dir.")
    parser.add_argument("--train_url", type=str, default=None, help="Checkpoint output dir in OBS.")
    parser.add_argument("--lr", type=float, default=0.1, help="Learning rate, default is 0.1.")
    parser.add_argument("--mode", type=str, default="sink", help="Run sink mode or not, default is sink.")
    parser.add_argument("--epoch_size", type=int, default=300, help="Epoch size, default is 300.")
    parser.add_argument("--batch_size", type=int, default=32, help="Batch size, default is 32.")
    parser.add_argument("--checkpoint_path", type=str, default=None, help="Pretrained checkpoint file path.")
    parser.add_argument("--pre_trained_epoch_size", type=int, default=0, help="Pretrained epoch size.")
    parser.add_argument("--save_checkpoint_epochs", type=int, default=1, help="Save checkpoint epochs, default is 1.")
    parser.add_argument("--loss_scale", type=int, default=1024, help="Loss scale, default is 1024.")
    parser.add_argument("--filter_weight", type=ast.literal_eval, default=False, help="Filter weight parameters, default is False.")
    parser.add_argument("--run_platform", type=str, default="Ascend", choices=("Ascend",), help="Run platform, only Ascend is supported.")

    args_opt = parser.parse_args()
    device_id = int(os.getenv('DEVICE_ID', '0'))
    print("device_id:{}".format(device_id))
    device_num = int(os.getenv("RANK_SIZE", '1'))

    # The actual device count decides distribution, overriding --distribute.
    args_opt.distribute = device_num > 1

    if args_opt.run_platform == "Ascend":
        context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
        if args_opt.distribute:
            context.set_context(device_id=device_id, enable_auto_mixed_precision=True)
            init()
            context.set_auto_parallel_context(parallel_mode=ParallelMode.DATA_PARALLEL, gradients_mean=True,
                                              device_num=device_num)
    else:
        raise ValueError("Unsupported platform.")

    if args_opt.checkpoint_path:
        # Stage the pretrained checkpoint from OBS into the local cache.
        checkpoint_path = "/cache/pre"
        mox.file.make_dirs(checkpoint_path)
        checkpoint_path = os.path.join(checkpoint_path, "efdet.ckpt")
        mox.file.copy(args_opt.checkpoint_path, checkpoint_path)

    local_data_url = "/cache/data/" + str(device_id)
    mox.file.make_dirs(local_data_url)

    local_train_url = "/cache/ckpt"
    mox.file.make_dirs(local_train_url)

    filename = "EfficientDet.mindrecord0"

    # Fetch the EfficientDet.mindrecord files from OBS into the local cache.
    mox.file.copy_parallel(args_opt.data_url, local_data_url)
    local_data_path = os.path.join(local_data_url, filename)

    dataset = create_EfficientDet_datasets(local_data_path, repeat_num=1,
                                           num_parallel_workers=args_opt.workers,
                                           batch_size=args_opt.batch_size, device_num=device_num, rank=device_id)
    dataset_size = dataset.get_dataset_size()

    print("Create dataset done!")

    # Presumably num_classes=90 and compound_coef=0 (EfficientDet-D0); the
    # exact signature lives in src.mind_backbone.
    net = EfficientDetBackbone(90, 0, False, True)
    net.set_train()

    init_weights(net)

    if args_opt.checkpoint_path:
        print("----------------------------------")
        print("load checkpoint: " + args_opt.checkpoint_path)
        param_dict = load_checkpoint(checkpoint_path)
        load_param_into_net(net, param_dict)
        print("----------------------------------")

    loss = FocalLoss()
    net_withloss = WithLossCell(net, loss)

    # Cosine schedule with warmup for 1/20 of the total epochs; note the
    # initial learning rate is fixed at 0.012, so the --lr flag is unused.
    lr = Tensor(get_lr_cosine(init_lr=0.012, steps_per_epoch=dataset_size, warmup_epochs=int(args_opt.epoch_size / 20),
                              max_epoch=args_opt.epoch_size, t_max=args_opt.epoch_size, eta_min=0.0))

    opt = nn.Momentum(filter(lambda x: x.requires_grad, net.get_parameters()), lr,
                      config.momentum, config.weight_decay)

    update_cell = DynamicLossScaleUpdateCell(loss_scale_value=2 ** 10, scale_factor=2, scale_window=1000)

    net_with_grads = EfficientDetTrainOneStepWithLossScaleCell(net_withloss,
                                                               optimizer=opt,
                                                               scale_update_cell=update_cell)

    # The train cell already handles loss scaling, so no amp is applied here.
    model = Model(net_with_grads, amp_level="O0")

    transferCb = TransferCallback(local_train_url, args_opt.train_url)

    cb = [LossMonitor(), TimeMonitor()]

    config_ck = CheckpointConfig(save_checkpoint_steps=dataset_size * args_opt.save_checkpoint_epochs,
                                 keep_checkpoint_max=config.keep_checkpoint_max)

    ckpt_cb = ModelCheckpoint(prefix="EfficientDet", directory=local_train_url, config=config_ck)
    print("============== Starting Training ==============")

    # Only device 0 writes checkpoints and uploads them to OBS.
    if device_id == 0:
        cb += [ckpt_cb, transferCb]

    model.train(args_opt.epoch_size, dataset, callbacks=cb, dataset_sink_mode=True)

    print("============== End Training ==============")


if __name__ == '__main__':
    main()