#4343 fix-4166- 多节点结果回传

Merged
zouap merged 10 commits from fix-4166 into V20230531 10 months ago
  1. +2
    -2
      routers/repo/grampus.go
  2. +37
    -8
      services/ai_task_service/schedule/model_schedule.go

+ 2
- 2
routers/repo/grampus.go View File

@@ -1210,12 +1210,12 @@ func grampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrain
}

//todo: upload code (send to file_server todo this work?)
if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.OutputPath); err != nil {
/**if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.OutputPath); err != nil {
log.Error("Failed to obsMkdir_output: %s (%v)", repo.FullName(), err)
grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form)
return
}
}*/

if err := uploadCodeToObs(codeLocalPath, jobName, ""); err != nil {
log.Error("Failed to uploadCodeToObs: %s (%v)", repo.FullName(), err)


+ 37
- 8
services/ai_task_service/schedule/model_schedule.go View File

@@ -2,6 +2,18 @@ package schedule

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"os/exec"
"path"
"strings"
"time"

"code.gitea.io/gitea/modules/modelarts"

"code.gitea.io/gitea/modules/obs"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/grampus"
"code.gitea.io/gitea/modules/labelmsg"
@@ -11,14 +23,7 @@ import (
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
"code.gitea.io/gitea/modules/util"
"encoding/json"
"errors"
"fmt"
"github.com/minio/minio-go"
"os/exec"
"path"
"strings"
"time"
)

const NPUModelDefaultName = "models.zip"
@@ -205,6 +210,12 @@ func LocalMigrateOperate(jobName, computeSource string, r *models.ModelMigrateRe
}
if computeSource == models.NPUResource {
//因为NPU的输出会被压缩,因此需要解压+移桶
if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.OutputPath); err != nil {
log.Error("Failed to obsMkdir_output: %s (%v)", jobName, err)

return err
}
log.Info("DestObjectKey", r.DestObjectKey)
if strings.Contains(r.DestObjectKey, ".") {
decompress(r.DestBucket+"/"+r.DestObjectKey, setting.Bucket+"/"+strings.TrimSuffix(r.DestObjectKey, models.ModelSuffix))

@@ -216,7 +227,12 @@ func LocalMigrateOperate(jobName, computeSource string, r *models.ModelMigrateRe
}

for _, fileInfo := range fileInfos {
decompress(r.DestBucket+"/"+r.DestObjectKey+"/"+fileInfo.FileName, setting.Bucket+"/"+strings.TrimSuffix(r.DestObjectKey, models.ModelSuffix))
log.Info("decompress file:", fileInfo.FileName)
sourceFilPath := r.DestBucket + "/" + r.DestObjectKey + fileInfo.FileName
if !strings.HasSuffix(r.DestObjectKey, "/") {
sourceFilPath = r.DestBucket + "/" + r.DestObjectKey + "/" + fileInfo.FileName
}
decompress(sourceFilPath, setting.Bucket+"/"+strings.TrimSuffix(r.DestObjectKey, models.ModelSuffix))
}

}
@@ -248,6 +264,19 @@ func LocalMigrateOperate(jobName, computeSource string, r *models.ModelMigrateRe
return nil
}

func obsMkdir(dir string) error {
input := &obs.PutObjectInput{}
input.Bucket = setting.Bucket
input.Key = dir
_, err := storage.ObsCli.PutObject(input)
if err != nil {
log.Error("PutObject(%s) failed: %s", input.Key, err.Error())
return err
}

return nil
}

func TryToUpdateNPUMoveBucketResult(record *models.ModelMigrateRecord, jobName, versionName string) error {
if IsNPUModelDirHasFile(jobName, versionName) {
if err := models.UpdateModelMigrateStatusByStep(record, models.BucketMoveSuccess); err != nil {


Loading…
Cancel
Save