@@ -12,6 +12,8 @@ import (
"path"
"strings"
"code.gitea.io/gitea/entity"
"code.gitea.io/gitea/manager/client/grampus"
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/cloudbrain"
"code.gitea.io/gitea/modules/context"
@@ -21,6 +23,7 @@ import (
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/services/ai_task_service/storage_helper"
uuid "github.com/satori/go.uuid"
)
@@ -34,7 +37,7 @@ const (
MXNET_ENGINE = 6
ModelMountPath = "/model"
CodeMountPath = "/code"
DataSetMountPath = "/dataset"
DataSetMountPath = "/tmp/ dataset"
LogFile = "log.txt"
DefaultBranchName = "master"
SubTaskName = "task1"
@@ -205,7 +208,6 @@ func createNpuTrainJob(modelConvert *models.AiModelConvert, ctx *context.Context
return err
}
deleteLocalDir(codeLocalPath)
intputshape := strings.Split(modelConvert.InputShape, ",")
n := "256"
c := "1"
@@ -416,12 +418,22 @@ func createGpuTrainJob(modelConvert *models.AiModelConvert, ctx *context.Context
}
log.Info("command=" + command)
codePath := setting.JobPath + modelConvert.ID + CodeMountPath
downloadConvertCode(setting.ModelConvert.ConvertRepoPath, codePath, DefaultBranchName)
codeTmpPath := setting.JobPath + modelConvert.ID + CodeMountPath + "tmp"
uploader := storage_helper.SelectUploaderFromStorageType(entity.MINIO)
codeRemoteDir := path.Join(uploader.GetJobDefaultObjectKeyPrefix(modelConvert.ID), "code")
log.Info("codePath=" + codePath)
log.Info("codeTmpPath=" + codeTmpPath)
log.Info("codeRemoteDir=" + codeRemoteDir)
downloadConvertCode(setting.ModelConvert.ConvertRepoPath, codeTmpPath, DefaultBranchName)
Zip(codePath+"/master.zip", codeTmpPath)
uploadCodeToMinio(codePath+"/", modelConvert.ID, CodeMountPath+"/")
deleteLocalDir(codePath)
deleteLocalDir(codeTmpPath)
minioCodePath := setting.Attachment.Minio.RealPath + setting.Attachment.Minio.Bucket + "/" + setting.CBCodePathPrefix + modelConvert.ID + "/code"
log.Info("minio codePath=" + minioCodePath)
@@ -436,73 +448,330 @@ func createGpuTrainJob(modelConvert *models.AiModelConvert, ctx *context.Context
minioModelPath := setting.Attachment.Minio.RealPath + setting.Attachment.Minio.Bucket + "/" + setting.CBCodePathPrefix + modelConvert.ID + "/model"
log.Info("minio model path=" + minioModelPath)
if TrainResourceSpecs == nil {
json.Unmarshal([]byte(setting.TrainResourceSpecs), &TrainResourceSpecs)
}
resourceSpec := TrainResourceSpecs.ResourceSpec[setting.ModelConvert.GPU_Resource_Specs_ID]
jobResult, err := cloudbrain.CreateJob(modelConvert.ID, models.CreateJobParams{
JobName: modelConvert.ID,
RetryCount: 1,
GpuType: setting.ModelConvert.GpuQueue,
Image: IMAGE_URL,
TaskRoles: []models.TaskRole{
{
Name: SubTaskName,
TaskNumber: 1,
MinSucceededTaskCount: 1,
MinFailedTaskCount: 1,
CPUNumber: resourceSpec.CpuNum,
GPUNumber: resourceSpec.GpuNum,
MemoryMB: resourceSpec.MemMiB,
ShmMB: resourceSpec.ShareMemMiB,
Command: command,
NeedIBDevice: false,
IsMainRole: false,
UseNNI: false,
},
},
Volumes: []models.Volume{
{
HostPath: models.StHostPath{
Path: minioCodePath,
MountPath: CodeMountPath,
ReadOnly: false,
datasetRemoteDir := path.Join(uploader.GetJobDefaultObjectKeyPrefix(modelConvert.ID), "dataset")
outputRemoteDir := path.Join(uploader.GetJobDefaultObjectKeyPrefix(modelConvert.ID), "model")
datasetDirectoryObjectKey := datasetRemoteDir
if !strings.HasSuffix(datasetRemoteDir, "/") {
datasetDirectoryObjectKey = datasetRemoteDir + "/"
}
codeObjectKey := codeRemoteDir + "/master.zip"
log.Info("codeObjectKey=" + codeObjectKey)
log.Info("uploader.GetRealPath(codeObjectKey)=" + uploader.GetRealPath(codeObjectKey))
req := entity.CreateTrainTaskRequest{
Name: modelConvert.ID,
DisplayJobName: modelConvert.Name,
Description: "",
TaskConfig: getGrampusTrainTaskConfig(),
Tasks: []entity.TrainTask{{
Command: command,
Name: modelConvert.ID,
ResourceSpecId: setting.ModelConvert.GPU_Resource_Specs_ID, //toDO
ImageId: "", //form.ImageID,
ImageUrl: IMAGE_URL,
Datasets: []entity.ContainerData{
entity.ContainerData{
ContainerPath: DataSetMountPath,
Name: "dataset",
ReadOnly: false,
ObjectKey: datasetDirectoryObjectKey,
RealPath: uploader.GetRealPath(datasetRemoteDir),
Bucket: uploader.GetBucket(),
EndPoint: uploader.GetEndpoint(),
GetBackEndpoint: uploader.GetEndpoint(),
IsDir: true,
StorageType: entity.MINIO,
},
},
{
HostPath: models.StHostPath{
Path: dataActualPath,
MountPath: DataSetMountPath,
ReadOnly: true,
Code: []entity.ContainerData{
entity.ContainerData{
Name: strings.ToLower(ctx.Repo.Repository.Name),
Bucket: uploader.GetBucket(),
EndPoint: uploader.GetEndpoint(),
ObjectKey: codeObjectKey,
ReadOnly: true,
ContainerPath: "/tmp/code/master.zip",
RealPath: uploader.GetRealPath(codeObjectKey),
IsDir: true,
S3DownloadUrl: uploader.GetS3DownloadUrl(codeObjectKey),
StorageType: entity.MINIO,
},
},
{
HostPath: models.StHostPath{
Path: minioModelPath,
MountPath: ModelMountPath,
ReadOnly: false,
Queues: []models.ResourceQueue{
models.ResourceQueue{
AiCenterCode: setting.ModelConvert.GPU_AiCenter_Code,
},
},
},
})
PreTrainModel: nil,
BootFile: "",
OutPut: []entity.ContainerData{{
ContainerPath: "/tmp/output",
ReadOnly: false,
ObjectKey: outputRemoteDir,
RealPath: uploader.GetRealPath(outputRemoteDir),
Bucket: uploader.GetBucket(),
EndPoint: uploader.GetEndpoint(),
GetBackEndpoint: uploader.GetEndpoint(),
IsDir: true,
StorageType: entity.MINIO,
}},
Params: models.Parameters{},
Spec: &models.Specification{
ID: setting.ModelConvert.GPU_Spec_ID,
SourceSpecId: setting.ModelConvert.GPU_Resource_Specs_ID,
AccCardsNum: 1,
AccCardType: setting.ModelConvert.GPU_Spec_AccCardType,
ComputeResource: "GPU",
AiCenterCode: setting.ModelConvert.GPU_AiCenter_Code,
},
RepoName: ctx.Repo.Repository.Name,
WorkServerNumber: 1,
}},
}
reqJson, _ := json.Marshal(req)
log.Info("reqJson=" + string(reqJson))
jobResult, err := createGrampusTrainJob(req, command)
if err != nil {
log.Error("CreateJob failed:", err.Error(), ctx.Data["MsgID"])
models.UpdateModelConvertFailed(modelConvert.ID, "FAILED", err.Error())
return err
}
if jobResult.Code != Success {
log.Error("CreateJob(%s) failed:%s", modelConvert.ID, jobResult.Msg, ctx.Data["MsgID"])
models.UpdateModelConvertFailed(modelConvert.ID, "FAILED", err.Error())
return errors.New(jobResult.Msg)
jobResultJson, _ := json.Marshal(jobResult)
log.Info("jobResultJson=" + string(jobResultJson))
if jobResult.ErrorCode != 0 {
log.Error("CreateJob(%s) failed:%s", modelConvert.ID, jobResult.ErrorMsg, ctx.Data["MsgID"])
models.UpdateModelConvertFailed(modelConvert.ID, "FAILED", jobResult.ErrorMsg)
return errors.New(jobResult.ErrorMsg)
}
var jobID = jobResult.Payload["jobId"].(string)
var jobID = jobResult.JobInfo.JobID
log.Info("jobId=" + jobID)
models.UpdateModelConvertCBTI(modelConvert.ID, jobID)
return nil
}
// getGrampusTrainTaskConfig returns the container layout used by Grampus GPU
// model-convert train tasks: where code, dataset, pretrained model and output
// are mounted inside the container, and which storage backends each accepts.
func getGrampusTrainTaskConfig() *entity.AITaskBaseConfig {
	const (
		containerCodeDir     = "/tmp/code"
		containerDatasetDir  = "/tmp/dataset"
		containerPretrainDir = "/tmp/pretrainmodel"
		containerOutputDir   = "/tmp/output"
	)
	steps := map[entity.ContainerDataType]*entity.ContainerBuildOpts{
		entity.ContainerCode: {
			ContainerPath:       containerCodeDir,
			StorageRelativePath: cloudbrain.CodeMountPath,
			ReadOnly:            false,
			AcceptStorageType:   []entity.StorageType{entity.MINIO, entity.OBS},
		},
		entity.ContainerDataset: {
			ContainerPath:     containerDatasetDir,
			ReadOnly:          true,
			AcceptStorageType: []entity.StorageType{entity.MINIO, entity.OBS},
		},
		entity.ContainerPreTrainModel: {
			ContainerPath:     containerPretrainDir,
			ReadOnly:          true,
			AcceptStorageType: []entity.StorageType{entity.MINIO, entity.OBS},
		},
		entity.ContainerOutPutPath: {
			ContainerPath:       containerOutputDir,
			StorageRelativePath: cloudbrain.ModelMountPath,
			ReadOnly:            false,
			AcceptStorageType:   []entity.StorageType{entity.MINIO},
			MKDIR:               false,
		},
	}
	config := &entity.AITaskBaseConfig{ContainerSteps: steps}
	config.ActionType = models.ActionCreateGrampusGPUTrainTask
	config.IsActionUseJobId = true
	return config
}
// createGrampusTrainJob converts the generic train-task request to the
// Grampus payload, submits it, and returns the raw Grampus response.
func createGrampusTrainJob(req entity.CreateTrainTaskRequest, exeCommand string) (*models.CreateGrampusJobResponse, error) {
	jobResult, err := grampus.CreateJob(convertTrainReq2Grampus(req, exeCommand))
	if err != nil {
		// Fixed: the previous log message said "CreateNoteBook failed",
		// which was misleading — this path creates a train job.
		log.Error("grampus.CreateJob failed: %v", err.Error())
		return nil, err
	}
	return jobResult, nil
}
// convertTrainReq2Grampus maps the generic train-task request onto the
// Grampus job-creation payload. The shell command is rendered once and the
// same string is shared by every sub-task.
func convertTrainReq2Grampus(req entity.CreateTrainTaskRequest, exeCommand string) models.CreateGrampusJobRequest {
	command := generateGrampusTrainCommand(req, exeCommand)
	tasks := make([]models.GrampusTasks, 0, len(req.Tasks))
	for _, task := range req.Tasks {
		tasks = append(tasks, convertTrainTask2Grampus(task, command))
	}
	return models.CreateGrampusJobRequest{Name: req.Name, Tasks: tasks}
}
// convertTrainTask2Grampus maps one generic train task onto the Grampus task
// schema. The rendered shell command is supplied by the caller so all tasks
// of a job share the same command string.
func convertTrainTask2Grampus(t entity.TrainTask, command string) models.GrampusTasks {
	// Guard against an index-out-of-range panic when no resource queue is
	// configured; previously t.Queues[0] was read unconditionally.
	var centerIDs []string
	if len(t.Queues) > 0 {
		centerIDs = []string{t.Queues[0].AiCenterCode}
	}
	return models.GrampusTasks{
		Name:             t.Name,
		ResourceSpecId:   t.ResourceSpecId,
		ImageId:          t.ImageId,
		ImageUrl:         t.ImageUrl,
		Datasets:         convertContainerArray2GrampusArray(t.Datasets),
		Code:             convertContainerArray2Grampus(t.Code),
		Command:          command,
		CenterID:         centerIDs,
		ReplicaNum:       1,
		Models:           convertContainerArray2GrampusArray(t.PreTrainModel),
		BootFile:         t.BootFile,
		OutPut:           convertContainerArray2Grampus(t.OutPut),
		WorkServerNumber: t.WorkServerNumber,
	}
}
// convertContainerArray2GrampusArray converts every container data entry to
// its Grampus dataset counterpart, preserving order.
func convertContainerArray2GrampusArray(containerDatas []entity.ContainerData) []models.GrampusDataset {
	out := make([]models.GrampusDataset, len(containerDatas))
	for i, data := range containerDatas {
		out[i] = convertContainer2Grampus(data)
	}
	return out
}
// convertContainerArray2Grampus converts only the first entry of the slice
// (the Grampus schema takes a single code/output descriptor here); the zero
// value is returned when the slice is empty.
func convertContainerArray2Grampus(containerDatas []entity.ContainerData) models.GrampusDataset {
	// len() on a nil slice is 0, so the former explicit nil check was
	// redundant (staticcheck S1009).
	if len(containerDatas) == 0 {
		return models.GrampusDataset{}
	}
	return convertContainer2Grampus(containerDatas[0])
}
// convertContainer2Grampus copies the storage-location fields of a single
// container data entry into the Grampus dataset schema.
func convertContainer2Grampus(d entity.ContainerData) models.GrampusDataset {
	var out models.GrampusDataset
	out.Name = d.Name
	out.Bucket = d.Bucket
	out.EndPoint = d.EndPoint
	out.ObjectKey = d.ObjectKey
	out.ContainerPath = d.ContainerPath
	out.ReadOnly = d.ReadOnly
	out.GetBackEndpoint = d.GetBackEndpoint
	out.Size = d.Size
	return out
}
// generateGrampusTrainCommand renders the full shell command executed inside
// the train container: create the mount directories, unzip code and datasets
// (skipped on NPU), export output-upload environment variables, then run the
// user command.
func generateGrampusTrainCommand(req entity.CreateTrainTaskRequest, exeCommand string) string {
	task := req.Tasks[0]
	cfg := req.TaskConfig
	compute := task.Spec.ComputeResource
	codeDir := cfg.GetContainerPath(entity.ContainerCode)
	modelDir := cfg.GetContainerPath(entity.ContainerPreTrainModel)
	datasetDir := cfg.GetContainerPath(entity.ContainerDataset)
	outputDir := cfg.GetContainerPath(entity.ContainerOutPutPath)

	b := &entity.CommandBuilder{}
	// Order matters: directories must exist before extraction, and exports
	// must be set before the user command runs.
	b.Add(buildMkdirCommand(codeDir, modelDir, datasetDir, outputDir)).
		Add(buildUnzipCodeCommand(codeDir, task.Code[0].ContainerPath, compute)).
		Add(buildUnzipDatasetCommand(task.Datasets, datasetDir, compute)).
		Add(buildExportCommand(req.Name, compute)).
		Add(buildExeCommand(exeCommand))
	return b.ToString()
}
// buildExeCommand wraps each raw command line into a command builder.
func buildExeCommand(exeCommand ...string) *entity.CommandBuilder {
	b := &entity.CommandBuilder{}
	for _, cmd := range exeCommand {
		b.Next(entity.NewCommand(cmd))
	}
	return b
}
// buildMkdirCommand emits one "mkdir -p" per target directory so every mount
// point exists before extraction and execution run.
func buildMkdirCommand(dirs ...string) *entity.CommandBuilder {
	b := &entity.CommandBuilder{}
	for _, target := range dirs {
		b.Next(entity.NewCommand("mkdir", "-p", target))
	}
	return b
}
// buildUnzipCodeCommand emits the shell steps that unzip the uploaded code
// archive (codeFilePath) into the code directory (codeConfigPath).
// NPU jobs return an empty builder — code staging for NPU presumably happens
// elsewhere; confirm against the NPU job flow.
func buildUnzipCodeCommand(codeConfigPath, codeFilePath, computeSource string) *entity.CommandBuilder {
builder := &entity.CommandBuilder{}
if computeSource == models.NPU {
return builder
}
builder.
Next(entity.NewCommand("echo", "'start to unzip code'")).
Next(entity.NewCommand("cd", codeConfigPath)).
Next(entity.NewCommand("unzip", "-q", codeFilePath)).
Next(entity.NewCommand("echo", "'unzip code finished'")).
Next(entity.NewCommand("ls", "-l")).
// NOTE(review): this lists a hard-coded example repo directory and looks
// like leftover debug output; that directory will not exist for other repos.
// Confirm how CommandBuilder joins commands (a failing "ls" under "&&" would
// abort the job) before removing or keeping it.
Next(entity.NewCommand("ls", "-l", "mnist_pytorchexample_gpu"))
return builder
}
// buildUnzipDatasetCommand emits the shell steps that extract every
// file-type dataset inside datasetPath. NPU jobs and jobs without datasets
// produce an empty builder. Directory datasets (IsDir) are mounted as-is and
// are not extracted.
func buildUnzipDatasetCommand(datasets []entity.ContainerData, datasetPath, computeSource string) *entity.CommandBuilder {
	builder := &entity.CommandBuilder{}
	if computeSource == models.NPU {
		return builder
	}
	if len(datasets) == 0 {
		// Fixed: previously returned nil here while the NPU branch returned
		// an empty builder. An empty builder is safe for callers that chain
		// Add(...) unconditionally.
		return builder
	}
	builder.Next(entity.NewCommand("cd", datasetPath)).
		Next(entity.NewCommand("echo", "'start to unzip datasets'"))
	// Only archive files need extraction.
	fileDatasets := make([]entity.ContainerData, 0)
	for _, dataset := range datasets {
		if !dataset.IsDir {
			fileDatasets = append(fileDatasets, dataset)
		}
	}
	// Single archive: extract directly into the dataset root (tar strips the
	// leading path component so contents land at the top level).
	if len(fileDatasets) == 1 {
		if strings.HasSuffix(fileDatasets[0].Name, ".tar.gz") {
			builder.Next(entity.NewCommand("tar", "--strip-components=1", "-zxvf", "'"+fileDatasets[0].Name+"'"))
		} else {
			builder.Next(entity.NewCommand("unzip", "-q", "'"+fileDatasets[0].Name+"'"))
		}
		builder.Next(entity.NewCommand("ls", "-l"))
		builder.Next(entity.NewCommand("echo", "'unzip datasets finished'"))
		return builder
	}
	// Multiple archives: each zip is extracted into its own sub-directory to
	// avoid name collisions (tar archives keep their own top-level layout).
	for i := 0; i < len(fileDatasets); i++ {
		name := fileDatasets[i].Name
		if strings.HasSuffix(name, ".tar.gz") {
			builder.Next(entity.NewCommand("tar", "-zxvf", name))
		} else {
			builder.Next(entity.NewCommand("unzip", "-q", "'"+name+"'", "-d", "'./"+strings.TrimSuffix(name, ".zip")+"'"))
		}
	}
	builder.Next(entity.NewCommand("ls", "-l"))
	builder.Next(entity.NewCommand("echo", "'unzip datasets finished'"))
	return builder
}
// buildExportCommand exports the environment variables that the platform's
// back-transfer tooling reads to upload training output for the given job.
func buildExportCommand(jobName, computeResource string) *entity.CommandBuilder {
	b := &entity.CommandBuilder{}
	if computeResource == models.NPU {
		// NOTE(review): "bucket" is populated from setting.Grampus.Env here,
		// which reads oddly for a bucket name — confirm the intended setting.
		remote := setting.CodePathPrefix + jobName + modelarts.OutputPath
		b.Next(entity.NewCommand("export", "bucket="+setting.Grampus.Env, "&&", "export", "remote_path="+remote))
		return b
	}
	remote := setting.CBCodePathPrefix + jobName + cloudbrain.ModelMountPath + "/"
	b.Next(entity.NewCommand("export", "env="+setting.Grampus.Env, "&&", "export", "remote_path="+remote))
	return b
}
func deleteLocalDir(dirpath string) {
//TODO delete
_err := os.RemoveAll(dirpath)
@@ -527,15 +796,13 @@ func getGpuModelConvertCommand(name string, modelFile string, modelConvert *mode
h = inputshape[2]
w = inputshape[3]
}
command += "python3 /code/" + bootfile + " --model " + modelFile + " --n " + n + " --c " + c + " --h " + h + " --w " + w
command += "list -all /tmp/code;list -all /tmp/dataset; python3 /tmp /code/" + bootfile + " --model " + modelFile + " --n " + n + " --c " + c + " --h " + h + " --w " + w
if modelConvert.DestFormat == CONVERT_FORMAT_TRT {
if modelConvert.NetOutputFormat == NetOutputFormat_FP16 {
command += " --fp16 True"
} else {
command += " --fp16 False"
}
}
command += " > " + ModelMountPath + " /" + name + "-" + LogFile
command += " > /tmp/output /" + name + "-" + LogFile
return command
}
@@ -557,9 +824,13 @@ func DeleteModelConvert(ctx *context.Context) {
func deleteCloudBrainTask(task *models.AiModelConvert) {
if task.IsGpuTrainTask() {
log.Info("delete cloudbrain one resource.")
log.Info("delete grampus model convert task.")
_, err := grampus.DeleteJob(task.CloudBrainTaskId)
if err != nil {
log.Error("Delete grampus job failed:%v", err)
}
dirPath := setting.CBCodePathPrefix + task.ID + "/"
err := storage.Attachments.DeleteDir(dirPath)
err = storage.Attachments.DeleteDir(dirPath)
if err != nil {
log.Error("DeleteDir(%s) failed:%v", dirPath, err)
}
@@ -579,7 +850,7 @@ func stopModelConvert(id string) error {
return err
}
if job.IsGpuTrainTask() {
err = cloudbrain .StopJob(job.CloudBrainTaskId)
_, err = grampus .StopJob(job.CloudBrainTaskId)
if err != nil {
log.Error("Stop cloudbrain Job(%s) failed:%v", job.CloudBrainTaskId, err)
}
@@ -662,34 +933,33 @@ func ShowModelConvertInfo(ctx *context.Context) {
ctx.HTML(200, tplModelConvertInfo)
return
}
result, err := cloudbrain .GetJob(job.CloudBrainTaskId)
jobResult, err := grampus .GetJob(job.CloudBrainTaskId)
if err != nil {
log.Info("error:" + err.Error())
ctx.Data["error"] = err.Error()
ctx.HTML(200, tplModelConvertInfo)
return
}
if result != nil {
jobRes, _ := models.ConvertToJobResultPayload(result.Payload)
ctx.Data["result"] = jobRes
taskRoles := jobRes.TaskRoles
taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{}))
ctx.Data["taskRes"] = taskRes
ctx.Data["ExitDiagnostics"] = taskRes.TaskStatuses[0].ExitDiagnostics
ctx.Data["AppExitDiagnostics"] = jobRes.JobStatus.AppExitDiagnostics
job.Status = jobRes.JobStatus.State
if jobRes.JobStatus.State != string(models.JobWaiting) && jobRes.JobStatus.State != string(models.JobFailed) {
job.ContainerIp = taskRes.TaskStatuses[0].ContainerIP
job.ContainerID = taskRes.TaskStatuses[0].ContainerID
job.Status = taskRes.TaskStatuses[0].State
jobEvent, err := grampus.GetTrainJobEvents(job.CloudBrainTaskId)
jobEventJson, _ := json.Marshal(jobEvent)
log.Info("jobEventJson=" + string(jobEventJson))
if jobEvent != nil {
ctx.Data["AppExitDiagnostics"] = string(jobEventJson)
}
if jobResult != nil {
ctx.Data["ExitDiagnostics"] = jobResult.ExitDiagnostics
if jobResult.JobInfo.Status == models.GrampusStatusPending {
job.Status = models.GrampusStatusWaiting
} else {
job.Status = strings.ToUpper(jobResult.JobInfo.Status)
}
if jobRes.JobStatus.State != string(models.JobWaiting) {
models.ModelComputeAndSetDuration(job, jobRes)
job.StartTime = timeutil.TimeStamp(jobResult.JobInfo.StartedAt)
job.EndTime = timeutil.TimeStamp(jobResult.JobInfo.CompletedAt)
if strings.ToUpper(jobResult.JobInfo.Status) != models.GrampusStatusWaiting && jobResult.JobInfo.Status != models.GrampusStatusPending {
models.ModelConvertSetDuration(job)
err = models.UpdateModelConvert(job)
if err != nil {
log.Error("UpdateModelConvert failed:", err)
log.Error("UpdateJob failed:", err)
}
}
}