|
- #!/usr/bin/env python
- """Runing training on Qizhi OpenI
- """
- import argparse
- import functools
- import os
- import subprocess
- import sys
- import time
- from typing import Any, Callable
-
- import math
- import mindspore
-
- from mindspore.context import ParallelMode
- import mindspore.ops as ops
- import moxing as mox
- from mindspore.train.callback import Callback
-
-
class UploadOutput(Callback):
    """MindSpore callback that copies the local output directory to OBS.

    At the end of every epoch the whole ``train_dir`` is copied to
    ``obs_train_url`` with ``mox.file.copy_parallel``, so the OBS copy is
    refreshed after each epoch.

    Args:
        train_dir: Local directory holding the training output.
        obs_train_url: Destination URL on OBS.
    """

    def __init__(self, train_dir, obs_train_url):
        super().__init__()
        self.train_dir = train_dir
        self.obs_train_url = obs_train_url

    def epoch_end(self, run_context):
        """Upload ``train_dir`` to ``obs_train_url`` (best-effort).

        ``run_context`` is not used. Failures are printed, not raised, so an
        upload problem does not abort training. Delegates to the module-level
        ``EnvToObs`` helper, which previously was duplicated here verbatim.
        """
        EnvToObs(self.train_dir, self.obs_train_url)
-
def ObsToEnv(obs_data_url, data_dir, marker_path="/cache/download_input.txt"):
    """Copy a single dataset from OBS into the training container.

    The copy itself is best-effort: a failure is printed, not raised.
    Afterwards an empty marker file is always created, including after a
    failed copy. This lets other processes (e.g. the other cards of a
    multi-card job) see that the copy attempt has finished, so they do not
    copy the dataset again.

    Args:
        obs_data_url: Source URL on OBS.
        data_dir: Local destination directory.
        marker_path: Marker file written once the copy attempt has finished.
            The default is the path this helper has always used.

    Raises:
        OSError: if the marker file cannot be created.
    """
    try:
        mox.file.copy_parallel(obs_data_url, data_dir)
        print("Successfully Download {} to {}".format(obs_data_url, data_dir))
    except Exception as e:
        # Best-effort download: report and continue.
        print('moxing download {} to {} failed: '.format(obs_data_url, data_dir) + str(e))

    # The marker means "copy attempt finished", not "copy succeeded"; it is
    # written unconditionally so anything waiting for it is released.
    try:
        marker_dir = os.path.dirname(marker_path)
        if marker_dir:
            os.makedirs(marker_dir, exist_ok=True)
        with open(marker_path, 'w'):
            pass
    except OSError:
        print("download_input failed")
        raise
    print("download_input succeed")
def EnvToObs(train_dir, obs_train_url):
    """Copy the local training output to OBS.

    Best-effort: any exception raised by the copy is reported on stdout and
    swallowed, so callers never see it. Always returns ``None``.
    """
    src, dst = train_dir, obs_train_url
    try:
        mox.file.copy_parallel(src, dst)
    except Exception as err:
        print('moxing upload {} to {} failed: '.format(src, dst) + str(err))
    else:
        print("Successfully Upload {} to {}".format(src, dst))
def DownloadFromQizhi(obs_data_url, data_dir, devices_per_node=8):
    """Download the dataset from OBS once per node.

    Single-card jobs (``RANK_SIZE`` unset or 1) copy directly. In multi-card
    jobs only the first card of each node (``RANK_ID % devices_per_node == 0``)
    copies. The other cards poll once per second for the marker file that
    ``ObsToEnv`` creates when its copy attempt has finished. Previously the
    multi-card path was commented out, so nothing was downloaded at all.

    Args:
        obs_data_url: Source URL on OBS.
        data_dir: Local destination directory.
        devices_per_node: Cards per node (historically hard-coded to 8).
    """
    # Must match the marker path ObsToEnv writes by default.
    marker = "/cache/download_input.txt"

    device_num = int(os.getenv('RANK_SIZE', '1'))
    if device_num <= 1:
        ObsToEnv(obs_data_url, data_dir)
        return

    local_rank = int(os.getenv('RANK_ID', '0'))
    if local_rank % devices_per_node == 0:
        # Copying from OBS once per node is enough.
        ObsToEnv(obs_data_url, data_dir)
    # Wait until the copying card has written the marker.
    while not os.path.exists(marker):
        time.sleep(1)
def UploadToQizhi(train_dir, obs_train_url, devices_per_node=8):
    """Upload the training output to OBS from one card per node.

    Single-card jobs (``RANK_SIZE == 1``) always upload. Multi-card jobs upload
    only from cards with ``RANK_ID % devices_per_node == 0``. ``RANK_SIZE`` and
    ``RANK_ID`` default to 1 and 0 when unset; previously a missing variable
    raised TypeError even for single-card runs.

    Args:
        train_dir: Local directory holding the training output.
        obs_train_url: Destination URL on OBS.
        devices_per_node: Cards per node (historically hard-coded to 8).
    """
    device_num = int(os.getenv('RANK_SIZE', '1'))
    local_rank = int(os.getenv('RANK_ID', '0'))
    if device_num == 1 or (device_num > 1 and local_rank % devices_per_node == 0):
        EnvToObs(train_dir, obs_train_url)
-
# Rank of this process, read from the RANK_ID environment variable (0 if unset).
LOCAL_RANK = int(os.environ.get("RANK_ID", "0"))
-
-
def run_with_single_rank(
    local_rank: int = 0, signal: str = "/tmp/SUCCESS"
) -> Callable[..., Any]:
    """Build a decorator that runs the wrapped function on rank 0 only.

    On rank 0 the function is called, then ``signal`` is written (containing a
    single newline) and the function's result is returned. On every other rank
    the call blocks, polling once per second, until ``signal`` exists as a
    file, and then returns ``None`` without calling the function.
    """
    is_leader = local_rank == 0

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            if not is_leader:
                # Followers only synchronize; they never run the task.
                while not os.path.isfile(signal):
                    time.sleep(1)
                return None

            outcome = func(*args, **kwargs)
            with open(signal, "w") as marker:
                marker.write("\n")
            return outcome

        return wrapper

    return decorator
-
-
@run_with_single_rank(local_rank=LOCAL_RANK, signal="/tmp/INSTALL_SUCCESS")
def install_packages(
    project_dir: str, index_url: str = "https://pypi.tuna.tsinghua.edu.cn/simple"
) -> None:
    """Upgrade pip and install ``<project_dir>/requirements.txt``.

    Wrapped by ``run_with_single_rank``: the installation runs on the process
    whose ``LOCAL_RANK`` is 0, and the other processes wait until
    ``/tmp/INSTALL_SUCCESS`` exists.

    Args:
        project_dir: Directory containing ``requirements.txt``.
        index_url: Package index used for both pip invocations.

    Raises:
        subprocess.CalledProcessError: if either pip command fails.
    """
    requirement_txt = os.path.join(project_dir, "requirements.txt")
    pip_install = [sys.executable, "-m", "pip", "install", "-i", index_url]
    # Use the same index for the pip self-upgrade as for the requirements.
    # Previously the upgrade went to the default index, which is inconsistent
    # with (and may be unreachable where) the mirror is required.
    subprocess.check_call(pip_install + ["--upgrade", "pip"])
    subprocess.check_call(pip_install + ["-r", requirement_txt])
-
-
def parse_args(argv: "list[str] | None" = None) -> argparse.Namespace:
    """Parse the launcher's command-line arguments.

    Args:
        argv: Arguments to parse. ``None`` (the default) parses
            ``sys.argv[1:]``, exactly as before.

    Returns:
        Namespace with ``config_path`` (default
        ``configs/ICDAR2015_config.yaml``) and ``device_target``, ``data_url``,
        ``train_url``, ``ckpt_url`` (``None`` when not given).
    """
    parser = argparse.ArgumentParser()
    # Alternative config shipped with the project: "configs/CTW1500_config.yaml"
    parser.add_argument("--config_path", type=str, default="configs/ICDAR2015_config.yaml",
                        help="Config file (.yaml) path")
    # The following arguments are provided by OpenI; do not change them.
    parser.add_argument("--device_target", help="Device target")
    parser.add_argument("--data_url", help="Path of the data url in S3")
    parser.add_argument("--train_url", help="Path of the training output in S3")
    parser.add_argument("--ckpt_url", help="Path of the ckpt in S3")
    return parser.parse_args(argv)
-
-
if __name__ == "__main__":
    # NOTE(review): this dumps every environment variable into the job log,
    # which may include credentials; consider removing it once debugging is done.
    print(os.environ)

    args = parse_args()

    # Locate the project directory (the directory containing this script).
    project_dir = os.path.dirname(os.path.abspath(__file__))
    args.project_dir = project_dir
    args.config_path = os.path.join(project_dir, args.config_path)

    # Install the project's requirements (run by the LOCAL_RANK 0 process).
    install_packages(project_dir)

    # Imported only after install_packages, presumably so that train_openi's
    # dependencies are installed first.
    from train_openi import train

    args.train_dir = '/cache/output'
    # exist_ok avoids a FileExistsError race when several ranks start at once
    # (the previous exists()/makedirs() pair was not atomic).
    os.makedirs(args.train_dir, exist_ok=True)

    # Initializing and copying the dataset from OBS is currently disabled:
    # DownloadFromQizhi(args.data_url, data_dir=args.project_dir + '/data')
    # When enabled, the dataset path is data_dir + "/train".

    # Start the training job.
    train(args)

    # Copy the output from the local disk to OBS.
    UploadToQizhi(args.train_dir, args.train_url)
|