|
''' Model training pipeline in QiZhi NPU'''
import logging
import os
import time
import zipfile

import mindspore as ms
import moxing as mox
import numpy as np
from mindspore import nn, Tensor
from mindspore import FixedLossScaleManager, Model
from mindspore import load_checkpoint, load_param_into_net
from mindspore.communication import init, get_rank, get_group_size
from mindspore.profiler import Profiler

from mindcv.data import create_dataset, create_transforms, create_loader
from mindcv.loss import create_loss
from mindcv.models import create_model
from mindcv.optim import create_optimizer
from mindcv.scheduler import create_scheduler
from mindcv.utils import StateMonitor
from mindcv.utils import AllReduceSum as Allreduce
from config import parse_args
# from openi import c2net_multidataset_to_env as DatasetToEnv
-
- ms.set_seed(1)
- np.random.seed(1)
-
- logger = logging.getLogger('train')
- logger.setLevel(logging.INFO)
- h1 = logging.StreamHandler()
- formatter1 = logging.Formatter('%(message)s', )
- logger.addHandler(h1)
- h1.setFormatter(formatter1)
-
- def do_zip_compress(dirpath):
- print("原始文件夹路径:" + dirpath)
- output_name = f"{dirpath}.zip"
- parent_name = os.path.dirname(dirpath)
- print("压缩文件夹目录:", parent_name)
- zip = zipfile.ZipFile(output_name, "w", zipfile.ZIP_DEFLATED)
- # 多层级压缩
- for root, dirs, files in os.walk(dirpath):
- for file in files:
- if str(file).startswith("~$"):
- continue
- filepath = os.path.join(root, file)
- print("压缩文件路径:" + filepath)
- writepath = os.path.relpath(filepath, parent_name)
- print(writepath)
-
- zip.write(filepath, writepath)
- zip.close()
-
- def train(args):
- ''' main train function'''
- ms.set_context(mode=args.mode)
- ms.set_context(reserve_class_name_in_scope=False)
-
- if args.distribute:
- init()
- device_num = get_group_size()
- rank_id = get_rank()
- ms.set_auto_parallel_context(device_num=device_num,
- parallel_mode='data_parallel',
- gradients_mean=True)
- else:
- device_num = None
- rank_id = None
-
- # create dataset
- dataset_train = create_dataset(
- name=args.dataset,
- root=args.data_dir,
- split=args.train_split,
- shuffle=args.shuffle,
- num_samples=args.num_samples,
- num_shards=device_num,
- shard_id=rank_id,
- num_parallel_workers=args.num_parallel_workers,
- download=args.dataset_download)
-
- if args.num_classes is None:
- num_classes = dataset_train.num_classes()
- else:
- num_classes = args.num_classes
-
- # create transforms
- transform_list = create_transforms(
- dataset_name=args.dataset,
- is_training=True,
- image_resize=args.image_resize,
- scale=args.scale,
- ratio=args.ratio,
- hflip=args.hflip,
- vflip=args.vflip,
- color_jitter=args.color_jitter,
- interpolation=args.interpolation,
- auto_augment=args.auto_augment,
- mean=args.mean,
- std=args.std,
- re_prob=args.re_prob,
- re_scale=args.re_scale,
- re_ratio=args.re_ratio,
- re_value=args.re_value,
- re_max_attempts=args.re_max_attempts
- )
-
- # load dataset
- loader_train = create_loader(
- dataset=dataset_train,
- batch_size=args.batch_size,
- drop_remainder=args.drop_remainder,
- is_training=True,
- mixup=args.mixup,
- cutmix=args.cutmix,
- cutmix_prob=args.cutmix_prob,
- num_classes=num_classes,
- transform=transform_list,
- num_parallel_workers=args.num_parallel_workers,
- )
-
- if args.val_while_train:
- dataset_eval = create_dataset(
- name=args.dataset,
- root=args.data_dir,
- split=args.val_split,
- num_shards=device_num,
- shard_id=rank_id,
- num_parallel_workers=args.num_parallel_workers,
- download=args.dataset_download)
-
- transform_list_eval = create_transforms(
- dataset_name=args.dataset,
- is_training=False,
- image_resize=args.image_resize,
- crop_pct=args.crop_pct,
- interpolation=args.interpolation,
- mean=args.mean,
- std=args.std
- )
-
- loader_eval = create_loader(
- dataset=dataset_eval,
- batch_size=args.batch_size,
- drop_remainder=False,
- is_training=False,
- transform=transform_list_eval,
- num_parallel_workers=args.num_parallel_workers,
- )
- # validation dataset count
- eval_count = dataset_eval.get_dataset_size()
- if args.distribute:
- all_reduce = Allreduce()
- eval_count = all_reduce(Tensor(eval_count, ms.int32))
- else:
- loader_eval = None
-
- num_batches = loader_train.get_dataset_size()
- # Train dataset count
- train_count = dataset_train.get_dataset_size()
- if args.distribute:
- all_reduce = Allreduce()
- train_count = all_reduce(Tensor(train_count, ms.int32))
-
- # create model
- network = create_model(model_name=args.model,
- num_classes=num_classes,
- in_channels=args.in_channels,
- drop_rate=args.drop_rate,
- drop_path_rate=args.drop_path_rate,
- pretrained=args.pretrained,
- checkpoint_path=args.ckpt_path)
-
- # param_dict = load_checkpoint(args.ckpt_path)# path = /home/work/user-job-dir/V00XX/LeViT-128S.ckpt
- # load_param_into_net(network, param_dict)
-
- # data_dtype = ms.float32
- if args.device_target == 'Ascend':
- network.to_float(ms.float32)
- for _, cell in network.cells_and_names():
- if isinstance(cell, (nn.BatchNorm2d, nn.LayerNorm)):
- cell.to_float(ms.float16)
- # data_dtype = ms.float16
- network.set_train()
-
- num_params = sum([param.size for param in network.get_parameters()])
-
- # create loss
- loss = create_loss(name=args.loss,
- reduction=args.reduction,
- label_smoothing=args.label_smoothing,
- aux_factor=args.aux_factor)
-
- # create learning rate schedule
- lr_scheduler = create_scheduler(num_batches,
- scheduler=args.scheduler,
- lr=args.lr,
- min_lr=args.min_lr,
- warmup_epochs=args.warmup_epochs,
- decay_epochs=args.decay_epochs,
- decay_rate=args.decay_rate,
- milestones=args.multi_step_decay_milestones,
- num_epochs=args.epoch_size)
-
- # resume training if ckpt_path is given
- if args.ckpt_path != '' and args.resume_opt:
- opt_ckpt_path = os.path.join(args.ckpt_save_dir, f'optim_{args.model}.ckpt')
- else:
- opt_ckpt_path = ''
-
- # create optimizer
- # TODO: consistent naming opt, name, dataset_name
- optimizer = create_optimizer(network.trainable_params(),
- opt=args.opt,
- lr=lr_scheduler,
- weight_decay=args.weight_decay,
- momentum=args.momentum,
- nesterov=args.use_nesterov,
- filter_bias_and_bn=args.filter_bias_and_bn,
- loss_scale=args.loss_scale,
- checkpoint_path=opt_ckpt_path)
-
- # Define eval metrics.
- eval_metrics = {'Top_1_Accuracy': nn.Top1CategoricalAccuracy()}
-
- # init model
- if args.loss_scale > 1.0:
- loss_scale_manager = FixedLossScaleManager(loss_scale=args.loss_scale, drop_overflow_update=False)
- model = Model(network, loss_fn=loss, optimizer=optimizer, metrics=eval_metrics, amp_level=args.amp_level,
- loss_scale_manager=loss_scale_manager)
- else:
- model = Model(network, loss_fn=loss, optimizer=optimizer, metrics=eval_metrics, amp_level=args.amp_level)
-
- # checkpoint_path = "/home/work/user-job-dir/V0017/LeViT-128S.ckpt"
- # param_dict = load_checkpoint(checkpoint_path)
- # load_param_into_net(model, param_dict)
-
- # callback
- # save checkpoint, summary training loss
- # recorad val acc and do model selection if val dataset is availabe
- begin_epoch = 0
- # if args.ckpt_path != '':
- # if args.ckpt_path != '':
- # begin_step = optimizer.global_step.asnumpy()[0]
- # begin_epoch = args.ckpt_path.split('/')[-1].split('-')[1].split('_')[0]
- # begin_epoch = int(begin_epoch)
-
- summary_dir = f"./{args.ckpt_save_dir}/summary"
- assert (args.ckpt_save_policy != 'top_k' or args.val_while_train == True), \
- "ckpt_save_policy is top_k, val_while_train must be True."
- state_cb = StateMonitor(model, summary_dir=summary_dir,
- dataset_val=loader_eval,
- val_interval=args.val_interval,
- metric_name="Top_1_Accuracy",
- ckpt_save_dir=args.ckpt_save_dir,
- ckpt_save_interval=args.ckpt_save_interval,
- # best_ckpt_name=args.model + '_best.ckpt',
- rank_id=rank_id,
- device_num=device_num,
- log_interval=args.log_interval,
- keep_checkpoint_max=args.keep_checkpoint_max,
- model_name=args.model,
- last_epoch=begin_epoch,
- ckpt_save_policy=args.ckpt_save_policy)
-
- callbacks = [state_cb]
- # log
- if rank_id in [None, 0]:
- logger.info(f"-" * 40)
- logger.info(f"Num devices: {device_num if device_num is not None else 1} \n"
- f"Distributed mode: {args.distribute} \n"
- f"Num training samples: {train_count}")
- if args.val_while_train:
- logger.info(f"Num validation samples: {eval_count}")
- logger.info(f"Num classes: {num_classes} \n"
- f"Num batches: {num_batches} \n"
- f"Batch size: {args.batch_size} \n"
- f"Auto augment: {args.auto_augment} \n"
- f"Model: {args.model} \n"
- f"Model param: {num_params} \n"
- f"Num epochs: {args.epoch_size} \n"
- f"Optimizer: {args.opt} \n"
- f"LR: {args.lr} \n"
- f"LR Scheduler: {args.scheduler}")
- logger.info(f"-" * 40)
-
- if args.ckpt_path != '':
- logger.info(f"Resume training from {args.ckpt_path}, last step: {begin_step}, last epoch: {begin_epoch}")
- else:
- logger.info('Start training')
-
- # inp = np.random.uniform(-1, 1, (4, 3, 224, 224)).astype(np.float32)
- # inp.tofile("/home/work/user-job-dir/V0118/res.bin")
- # print("save end")
-
- # profiler = ms.Profiler(output_path=f"./{args.ckpt_save_dir}/profiler")
- model.train(args.epoch_size, loader_train, callbacks=callbacks, dataset_sink_mode=args.dataset_sink_mode)
- # profiler.analyse()
- # file_path = f"./{args.ckpt_save_dir}/profiler"
- # do_zip_compress(file_path)
-
-
-
-
-
- if __name__ == '__main__':
- args = parse_args()
- # os.system("export MINDSPORE_DUMP_CONFIG = "{}"".format(args.dump_path))
-
-
- # os.system("bash /home/work/user-job-dir/V0156/utils.sh")
- # os.system("pwd")
- if not os.path.exists(args.ckpt_save_dir):
- os.makedirs(args.ckpt_save_dir)
- # if not os.path.exists(os.path.join(args.ckpt_save_dir, "dump_data")):
- # os.makedirs(os.path.join(args.ckpt_save_dir, "dump_data"))
- # if not os.path.exists("/cache/user-job-dir/workspace/dump_data"):
- # os.makedirs("/cache/user-job-dir/workspace/dump_data")
-
- # sh_pa = "export MINDSPORE_DUMP_CONFIG={}".format(args.dump_path)
- # sh_pa2 = "export MS_DIAGNOSTIC_DATA_PATH={}".format(args.ckpt_save_dir + "/dump_data")
- # os.system(sh_pa)
- # os.system(sh_pa2)
-
- # os.environ['MINDSPORE_DUMP_CONFIG'] = args.dump_path
- # os.environ['MS_DIAGNOSTIC_DATA_PATH'] = '/cache/outputs/'
-
- # modelarts
- mox.file.copy_parallel(src_url=os.path.join(args.data_url, 'imagenet'), dst_url=args.data_dir)
- train(args)
- os.system("pwd")
- # modelarts
- mox.file.copy_parallel(src_url=args.ckpt_save_dir, dst_url=os.path.join(args.train_url))
- mox.file.copy_parallel(src_url='/cache/outputs', dst_url=args.train_url)
- # os.system("cd /cache/user-job-dir/workspace")
- # os.system("ls")
- # os.makedirs(os.path.join(args.train_url, "dump_data"))
- # mox.file.copy_parallel(src_url="/cache/user-job-dir/workspace/dump_data", dst_url=os.path.join(args.train_url, "dump_data"))
-
-
|