- import math
- import os
- import random
- import time
-
- import yaml
- import numpy as np
-
- import mindspore as ms
- from mindspore import Tensor, context, nn, ops
- from mindspore.context import ParallelMode
- from mindspore.communication.management import init, get_rank, get_group_size
-
- from mindyolo.models import create_loss, create_model
- from mindyolo.optim import create_group_param, create_lr_scheduler, create_warmup_momentum_scheduler, \
-     create_optimizer, EMA
- from mindyolo.data import COCODataset, create_loader
- from mindyolo.data import COCO80_TO_COCO91_CLASS
- from mindyolo.utils import logger
- from mindyolo.utils.config import parse_args
- from mindyolo.utils.checkpoint_manager import CheckpointManager
- from mindyolo.utils.metrics import non_max_suppression, scale_coords, xyxy2xywh
- from mindyolo.utils.modelarts import sync_data
-
- def train(cfg):
-     set_seed(cfg.get('seed', 2))
-
-     # Set Context
-     context.set_context(mode=cfg.ms_mode, device_target=cfg.device_target, max_call_depth=2000)
-     if cfg.device_target == "Ascend":
-         device_id = int(os.getenv('DEVICE_ID', 0))
-         context.set_context(device_id=device_id)
-     elif cfg.device_target == "GPU" and cfg.get('ms_enable_graph_kernel', False):
-         context.set_context(enable_graph_kernel=True)
-
-     # Set Parallel
-     rank, rank_size, parallel_mode = 0, 1, ParallelMode.STAND_ALONE
-     if cfg.is_parallel:
-         init()
-         rank, rank_size, parallel_mode = get_rank(), get_group_size(), ParallelMode.DATA_PARALLEL
-         context.set_auto_parallel_context(device_num=rank_size, parallel_mode=parallel_mode, gradients_mean=True)
-     main_device = (rank % rank_size == 0)
-
-     # Set default cfg
-     total_batch_size = cfg.per_batch_size * rank_size
-     cfg.sync_bn = cfg.sync_bn and context.get_context("device_target") == "Ascend" and rank_size > 1
-     assert len(cfg.data.names) == cfg.data.nc, '%g names found for nc=%g dataset in %s' % \
-         (len(cfg.data.names), cfg.data.nc, cfg.config)
-     cfg.accumulate = max(1, int(np.round(cfg.nbs / total_batch_size))) \
-         if cfg.auto_accumulate else cfg.get('accumulate', 1)
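-     # e.g. nominal batch size nbs=64 with total_batch_size=16 (4 devices x per_batch_size 4)
-     # gives accumulate=4: gradients are summed over 4 steps before each optimizer update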
-
-     # Directories and Save run settings
-     cfg.ckpt_save_dir = os.path.join(cfg.save_dir, 'weights')
-     cfg.sync_lock_dir = os.path.join(cfg.save_dir, 'sync_locks') if not cfg.enable_modelarts else '/tmp/sync_locks'
-     if main_device:
-         os.makedirs(cfg.ckpt_save_dir, exist_ok=True)
-         with open(os.path.join(cfg.save_dir, "cfg.yaml"), 'w') as f:
-             yaml.dump(vars(cfg), f, sort_keys=False)
-         # sync_lock for run_eval
-         os.makedirs(cfg.sync_lock_dir, exist_ok=False)
-
-     # Set Logger
-     logger.setup_logging(logger_name="MindYOLO", log_level="INFO", rank_id=rank, device_per_servers=rank_size)
-     logger.setup_logging_file(log_dir=os.path.join(cfg.save_dir, "logs"))
-
-     # Modelarts: copy data from the S3 bucket to the computing node, then reset dataset dirs.
-     if cfg.enable_modelarts:
-         os.makedirs(cfg.data_dir, exist_ok=True)
-         sync_data(cfg.data_url, cfg.data_dir)
-         sync_data(cfg.save_dir, cfg.train_url)
-         if cfg.ckpt_url:
-             sync_data(cfg.ckpt_url, cfg.ckpt_dir)  # pretrained ckpt
-         cfg.data.dataset_dir = os.path.join(cfg.data_dir, cfg.data.dataset_dir)
-         cfg.weight = os.path.join(cfg.ckpt_dir, cfg.weight) if cfg.weight else ''
-         cfg.ema_weight = os.path.join(cfg.ckpt_dir, cfg.ema_weight) if cfg.ema_weight else ''
-
-     # Create Network
-     network = create_model(model_name=cfg.network.model_name,
-                            model_cfg=cfg.network,
-                            num_classes=cfg.data.nc,
-                            sync_bn=cfg.sync_bn)
-     if cfg.ema and main_device:
-         ema_network = create_model(model_name=cfg.network.model_name,
-                                    model_cfg=cfg.network,
-                                    num_classes=cfg.data.nc)
-         ema = EMA(network, ema_network)
-     else:
-         ema = None
-
-     load_pretrain(network, cfg.weight, ema=ema, ema_weight=cfg.ema_weight)  # load pretrained weights
-     freeze_layers(network, cfg.freeze)  # freeze layers
-     ms.amp.auto_mixed_precision(network, amp_level=cfg.ms_amp_level)
-     if ema:
-         ms.amp.auto_mixed_precision(ema.ema, amp_level=cfg.ms_amp_level)
-
-     # Create Dataset
-     dataset_train = COCODataset(
-         dataset_dir=cfg.data.dataset_dir,
-         image_dir=cfg.data.train_img_dir,
-         anno_path=cfg.data.train_anno_path,
-         img_size=cfg.img_size,
-         transforms_dict=cfg.data.train_transforms,
-         is_training=True,
-         rect=cfg.rect,
-         batch_size=total_batch_size,
-         stride=max(cfg.network.stride),
-     )
-     loader_train = create_loader(
-         dataset=dataset_train,
-         batch_collate_fn=dataset_train.batch_collate_fn,
-         dataset_column_names=dataset_train.dataset_column_names,
-         batch_size=cfg.per_batch_size,
-         epoch_size=cfg.epochs,
-         rank=rank,
-         rank_size=rank_size,
-         shuffle=True,
-         drop_remainder=True,
-         num_parallel_workers=cfg.data.num_parallel_workers,
-         python_multiprocessing=True
-     )
-     steps_per_epoch = int(len(dataset_train) / total_batch_size)
-
-     if cfg.run_eval:
-         dataset_eval = COCODataset(
-             dataset_dir=cfg.data.dataset_dir,
-             image_dir=cfg.data.val_img_dir,  # evaluate on the validation split, not the training split
-             anno_path=cfg.data.val_anno_path,
-             img_size=cfg.img_size,
-             transforms_dict=cfg.data.test_transforms,
-             is_training=False,
-             rect=cfg.rect,
-             batch_size=total_batch_size,
-             stride=max(cfg.network.stride),
-         )
-         loader_eval = create_loader(
-             dataset=dataset_eval,
-             batch_collate_fn=dataset_eval.batch_collate_fn,
-             dataset_column_names=dataset_eval.dataset_column_names,
-             batch_size=cfg.per_batch_size * 2,
-             epoch_size=1, rank=0, rank_size=1, shuffle=False, drop_remainder=False,
-             num_parallel_workers=cfg.data.num_parallel_workers,
-             python_multiprocessing=True
-         )
-
-     # Create Loss
-     cfg.loss.loss_item_name = cfg.loss.get('loss_item_name', ['loss', 'lbox', 'lobj', 'lcls'])
-     loss_fn = create_loss(
-         **cfg.loss,
-         anchors=cfg.network.get('anchors', None),
-         stride=cfg.network.get('stride', None),
-         nc=cfg.data.get('nc', None)
-     )
-     ms.amp.auto_mixed_precision(loss_fn, amp_level=cfg.ms_amp_level)
-
-     # Create Optimizer
-     cfg.optimizer.warmup_epochs = cfg.optimizer.get('warmup_epochs', 0)
-     cfg.optimizer.min_warmup_step = cfg.optimizer.get('min_warmup_step', 0)
-     cfg.optimizer.epochs = cfg.epochs
-     cfg.optimizer.nbs = cfg.nbs
-     cfg.optimizer.accumulate = cfg.accumulate
-     cfg.optimizer.steps_per_epoch = steps_per_epoch
-     lr = create_lr_scheduler(**cfg.optimizer)
-     params = create_group_param(params=network.trainable_params(), **cfg.optimizer)
-     optimizer = create_optimizer(params=params, lr=lr, **cfg.optimizer)
-     warmup_momentum = create_warmup_momentum_scheduler(**cfg.optimizer)
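-     # Illustrative cfg.optimizer block (values are typical YOLO defaults, not
-     # taken verbatim from this repo's configs):
-     #   optimizer: momentum
-     #   lr_init: 0.01
-     #   warmup_epochs: 3
-     #   momentum: 0.937
-     #   weight_decay: 0.0005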
-
-     # Create train_step_fn
-     if cfg.is_parallel:
-         mean = context.get_auto_parallel_context("gradients_mean")
-         degree = context.get_auto_parallel_context("device_num")
-         reducer = nn.DistributedGradReducer(optimizer.parameters, mean, degree)
-     else:
-         reducer = ops.functional.identity
-
-     scaler = get_loss_scaler(
-         type=cfg.ms_loss_scaler,
-         ms_loss_scaler_value=cfg.get('ms_loss_scaler_value', 2 ** 10),
-     )
-
-     train_step_fn = get_train_step_fn(network=network,
-                                       loss_fn=loss_fn,
-                                       optimizer=optimizer,
-                                       rank_size=rank_size,
-                                       scaler=scaler,
-                                       reducer=reducer,
-                                       overflow_still_update=cfg.overflow_still_update,
-                                       ms_jit=cfg.ms_jit)
-     accumulate_grads_fn = get_accumulate_grads_fn()  # see the usage sketch below get_accumulate_grads_fn
-     network.set_train(True)
-     optimizer.set_train(True)
-
-
-     # Start Training
-     global_step = 0
-     accumulate_cur_step = 0
-     accumulate_grads = None
-     input_dtype = ms.float32  # inputs are fed as float32; AMP inserts casts inside the cells
-     warmup_steps = max(round(cfg.optimizer.warmup_epochs * steps_per_epoch), cfg.optimizer.min_warmup_step)
-     ckpt_save_dir = cfg.ckpt_save_dir
-     keep_checkpoint_max = cfg.keep_checkpoint_max
-     enable_modelarts = cfg.enable_modelarts
-     sync_lock_dir = cfg.sync_lock_dir
-     model_name = os.path.basename(cfg.config)[:-5]  # strip ".yaml"
-     manager = CheckpointManager(ckpt_save_policy='latest_k')
-     manager_ema = CheckpointManager(ckpt_save_policy='latest_k') if ema else None
-     manager_best = CheckpointManager(ckpt_save_policy='top_k') if cfg.run_eval else None
-     ckpt_filelist_best = []
-
-     # loader_train was built with epoch_size=cfg.epochs, so one pass covers all epochs
-     loader = loader_train.create_dict_iterator(output_numpy=True, num_epochs=1)
-     s_step_time = time.time()
-     s_epoch_time = time.time()
-     for i, data in enumerate(loader):
-         cur_epoch = (i // steps_per_epoch) + 1
-         cur_step = (i % steps_per_epoch) + 1
-
-         global_step += 1
-         if global_step < warmup_steps:
-             xp, fp = [0, warmup_steps], [1, cfg.nbs / total_batch_size]
-             cfg.accumulate = max(1, int(np.interp(global_step, xp, fp).round())) \
-                 if cfg.auto_accumulate else cfg.accumulate
-             if warmup_momentum and isinstance(optimizer, (nn.SGD, nn.Momentum)):
-                 dtype = optimizer.momentum.dtype
-                 optimizer.momentum = Tensor(warmup_momentum[i], dtype)
-
-         imgs, batch_idx, gt_class, gt_bbox = data["image"], data['batch_idx'], data["gt_class"], data["gt_bbox"]
-         labels = np.concatenate((batch_idx, gt_class, gt_bbox), -1)  # (bs, N, 6)
-         imgs, labels = Tensor(imgs, input_dtype), Tensor(labels, input_dtype)
-         size = None
-         if cfg.multi_scale:
-             gs = max(int(np.array(cfg.network.stride).max()), 32)  # grid size (max stride)
-             sz = random.randrange(int(cfg.img_size * 0.5), int(cfg.img_size * 1.5 + gs)) // gs * gs  # random size
-             sf = sz / max(imgs.shape[2:])  # scale factor
-             if sf != 1:
-                 # new shape, stretched to a multiple of gs
-                 size = tuple([math.ceil(x * sf / gs) * gs for x in imgs.shape[2:]])
-
-         train_step_fn(imgs, labels, size)
-
-         # train log
-         if cur_step % cfg.log_interval == 0:
-             logger.info(f"Epoch {cfg.epochs}/{cur_epoch}, Step {steps_per_epoch}/{cur_step}, "
-                         f"step time: {(time.time() - s_step_time) * 1000 / cfg.log_interval:.2f} ms")
-             s_step_time = time.time()
-
-         # run eval per epoch on main device
-         if cfg.run_eval and (i + 1) % steps_per_epoch == 0:
-             s_eval_time = time.time()
-             sync_lock = os.path.join(sync_lock_dir, "run_eval_sync.lock" + str(cur_epoch))
-             # only one device runs eval
-             if main_device and not os.path.exists(sync_lock):
-                 eval_network = ema.ema if ema else network
-                 _train_status = eval_network.training
-                 eval_network.set_train(False)
-                 # run_eval (sketched below) returns COCO mAP on the eval set
-                 accuracy = run_eval(eval_network, loader_eval, dataset_eval)
-                 accuracy = accuracy[0] if isinstance(accuracy, (list, tuple)) else accuracy
-                 eval_network.set_train(_train_status)
-
-                 save_path_best = os.path.join(ckpt_save_dir, f"best/{model_name}-{cur_epoch}_{steps_per_epoch}"
-                                                              f"_acc{accuracy:.2f}.ckpt")
-                 ckpt_filelist_best = manager_best.save_ckpoint(eval_network, num_ckpt=keep_checkpoint_max,
-                                                                metric=accuracy, save_path=save_path_best)
-                 logger.info(f"Epoch {cfg.epochs}/{cur_epoch}, eval accuracy: {accuracy:.2f}, "
-                             f"run_eval time: {(time.time() - s_eval_time):.2f} s.")
-                 try:
-                     os.mknod(sync_lock)
-                 except IOError:
-                     pass
-             # other devices wait for the lock sign
-             while True:
-                 if os.path.exists(sync_lock):
-                     break
-                 time.sleep(1)
-
-         # save checkpoint per epoch on main device
-         if main_device and (i + 1) % steps_per_epoch == 0:
-             # Save Checkpoint
-             ms.save_checkpoint(optimizer, os.path.join(ckpt_save_dir, f'optim_{model_name}.ckpt'),
-                                async_save=True)
-             save_path = os.path.join(ckpt_save_dir, f"{model_name}-{cur_epoch}_{steps_per_epoch}.ckpt")
-             manager.save_ckpoint(network, num_ckpt=keep_checkpoint_max, save_path=save_path)
-             if ema:
-                 save_path_ema = os.path.join(ckpt_save_dir,
-                                              f"EMA_{model_name}-{cur_epoch}_{steps_per_epoch}.ckpt")
-                 manager_ema.save_ckpoint(ema.ema, num_ckpt=keep_checkpoint_max, save_path=save_path_ema)
-             logger.info(f"Saving model to {save_path}")
-
-             if enable_modelarts:
-                 sync_data(save_path, cfg.train_url + "/weights/" + save_path.split("/")[-1])
-                 if ema:
-                     sync_data(save_path_ema, cfg.train_url + "/weights/" + save_path_ema.split("/")[-1])
-
-             logger.info(f"Epoch {cfg.epochs}/{cur_epoch}, epoch time: {(time.time() - s_epoch_time) / 60:.2f} min.")
-             s_epoch_time = time.time()
-
-     if enable_modelarts and ckpt_filelist_best:
-         for p in ckpt_filelist_best:
-             sync_data(p, cfg.train_url + '/weights/best/' + p.split("/")[-1])
-
-     logger.info("End Train.")
-
-
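- # The training loop above calls `run_eval` once per epoch. The sketch below is a
- # minimal reconstruction, not the verbatim MindYOLO routine: it assumes the
- # YOLOv5-style signatures of the imported `non_max_suppression`, `scale_coords`
- # and `xyxy2xywh`, that eval batches carry image ids as `img_ids` and original
- # sizes as `hw_ori`, that the dataset keeps its annotation file path as
- # `dataset.anno_path`, and that pycocotools is available. Adjust these assumed
- # names to the actual data pipeline.
- def run_eval(network, dataloader, dataset, conf_thres=0.001, iou_thres=0.65):
-     from pycocotools.coco import COCO
-     from pycocotools.cocoeval import COCOeval
-
-     results, img_ids = [], []
-     for data in dataloader.create_dict_iterator(output_numpy=True, num_epochs=1):
-         imgs = Tensor(data["image"], ms.float32)
-         preds = network(imgs)
-         preds = preds[0] if isinstance(preds, (list, tuple)) else preds
-         preds = non_max_suppression(preds.asnumpy(), conf_thres=conf_thres, iou_thres=iou_thres)
-         for si, pred in enumerate(preds):
-             img_id = int(data["img_ids"][si])
-             img_ids.append(img_id)
-             if pred is None or len(pred) == 0:
-                 continue
-             # map boxes from network-input scale back to the original image
-             pred[:, :4] = scale_coords(imgs.shape[2:], pred[:, :4], data["hw_ori"][si])
-             box = xyxy2xywh(pred[:, :4])  # xyxy -> xywh (center-based)
-             box[:, :2] -= box[:, 2:] / 2  # center -> top-left corner, as COCO expects
-             for p, b in zip(pred.tolist(), box.tolist()):
-                 results.append({"image_id": img_id,
-                                 "category_id": COCO80_TO_COCO91_CLASS[int(p[5])],
-                                 "bbox": [round(x, 3) for x in b],
-                                 "score": round(p[4], 5)})
-     if not results:
-         return 0.0  # nothing detected; mAP is zero
-     coco_gt = COCO(dataset.anno_path)
-     coco_dt = coco_gt.loadRes(results)
-     coco_eval = COCOeval(coco_gt, coco_dt, 'bbox')
-     coco_eval.params.imgIds = img_ids
-     coco_eval.evaluate()
-     coco_eval.accumulate()
-     coco_eval.summarize()
-     return coco_eval.stats[0]  # mAP@0.5:0.95
-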
-
- def set_seed(seed=2):
-     np.random.seed(seed)
-     random.seed(seed)
-     ms.set_seed(seed)
-
-
- def load_pretrain(network, weight, ema=None, ema_weight=None):
-     if weight.endswith('.ckpt'):
-         param_dict = ms.load_checkpoint(weight)
-         ms.load_param_into_net(network, param_dict)
-         logger.info(f"Pretrained checkpoint loaded from \"{weight}\".")
-     if ema:
-         if ema_weight.endswith('.ckpt'):
-             param_dict_ema = ms.load_checkpoint(ema_weight)
-             ms.load_param_into_net(ema.ema, param_dict_ema)
-             logger.info(f"EMA pretrained checkpoint loaded from \"{ema_weight}\".")
-         else:
-             ema.clone_from_model()
-             logger.info("ema_weight does not exist; EMA weights cloned from the model.")
-
-
- def freeze_layers(network, freeze=None):
-     freeze = freeze or []  # avoid a mutable default argument
-     if len(freeze) > 0:
-         freeze = [f'model.{x}.' for x in freeze]  # parameter names to freeze (full or partial)
-         for n, p in network.parameters_and_names():
-             if any(x in n for x in freeze):
-                 logger.info('freezing %s' % n)
-                 p.requires_grad = False
-
-
- def get_loss_scaler(type='static', ms_loss_scaler_value=1024., scale_factor=2, scale_window=2000):
-     if type == 'dynamic':
-         from mindspore.amp import DynamicLossScaler
-         loss_scaler = DynamicLossScaler(scale_value=ms_loss_scaler_value,
-                                         scale_factor=scale_factor,
-                                         scale_window=scale_window)
-     elif type == 'static':
-         from mindspore.amp import StaticLossScaler
-         loss_scaler = StaticLossScaler(ms_loss_scaler_value)
-     elif type in ('none', 'None'):
-         from mindspore.amp import StaticLossScaler
-         loss_scaler = StaticLossScaler(1.0)
-     else:
-         raise NotImplementedError(f"Unsupported ms_loss_scaler: {type}")
-     return loss_scaler
-
-
- def get_train_step_fn(network, loss_fn, optimizer, rank_size, scaler, reducer, overflow_still_update=False, ms_jit=False):
-     from mindyolo.utils.all_finite import all_finite
-
-     def forward_func(x, label, sizes=None):
-         if sizes is not None:
-             x = ops.interpolate(x, sizes=sizes, coordinate_transformation_mode="asymmetric", mode="bilinear")
-         pred = network(x)
-         loss, loss_items = loss_fn(pred, label, x)
-         loss *= rank_size
-         return scaler.scale(loss), ops.stop_gradient(loss_items)
-
-     grad_fn = ops.value_and_grad(forward_func, grad_position=None, weights=optimizer.parameters, has_aux=True)
-
-     def train_step_func(x, label, sizes=None, optimizer_update=True):
-         (loss, loss_items), grads = grad_fn(x, label, sizes)
-         grads = reducer(grads)
-         unscaled_grads = scaler.unscale(grads)
-         grads_finite = all_finite(unscaled_grads)
-
-         # apply the update unless gradients overflowed and overflow_still_update is off
-         if optimizer_update and (grads_finite or overflow_still_update):
-             loss = ops.depend(loss, optimizer(unscaled_grads))
-
-         return scaler.unscale(loss), loss_items, unscaled_grads, grads_finite
-
-     @ms.ms_function
-     def jit_wrapper(*args):
-         return train_step_func(*args)
-
-     return train_step_func if not ms_jit else jit_wrapper
-
-
- def get_accumulate_grads_fn():
-     hyper_map = ops.HyperMap()
-
-     def accu_fn(g1, g2):
-         g1 = g1 + g2
-         return g1
-
-     def accumulate_grads_fn(accumulate_grads, grads):
-         success = hyper_map(accu_fn, accumulate_grads, grads)
-         return success
-
-     return accumulate_grads_fn
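-
-
- # Hedged usage sketch (assumed wiring, not from the original file): gradient
- # accumulation pairs `train_step_fn(..., optimizer_update=False)` with
- # `accumulate_grads_fn`, stepping the optimizer only every `accumulate` steps:
- #
- #     loss, loss_items, grads, grads_finite = train_step_fn(imgs, labels, size, False)
- #     accumulate_grads = grads if accumulate_grads is None \
- #         else accumulate_grads_fn(accumulate_grads, grads)
- #     accumulate_cur_step += 1
- #     if accumulate_cur_step % cfg.accumulate == 0:
- #         optimizer(accumulate_grads)
- #         accumulate_grads, accumulate_cur_step = None, 0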
-
-
- if __name__ == '__main__':
-     cfg = parse_args('train')
-     train(cfg)
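-
-     # Typical launch commands (flags assumed from parse_args' `--config`
-     # convention; adjust the config path to your model):
-     #   python train.py --config configs/yolov7/yolov7.yaml
-     #   mpirun -n 8 python train.py --config configs/yolov7/yolov7.yaml --is_parallel True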
-
-