#4615 fix-4588 branch passed testing, merge into the milestone

Merged
ychao_1983 merged 5 commits from fix-4588 into V20230808 9 months ago
  1. +1 -2 services/ai_task_service/cluster/c2net.go
  2. +71 -61 services/ai_task_service/schedule/model_schedule.go

+1 -2 services/ai_task_service/cluster/c2net.go

@@ -492,9 +492,8 @@ func buildExecCodeCommand(codeDirPath, modelFilePath, bootFile, computeResource,
 		paramCode += " --'" + param.Label + "'='" + param.Value + "'"
 	}
 	if computeResource == models.NPU {
-		modelRemoteObsUrl := getNpuModelRemoteObsUrl(jobName)
 		builder.Next(entity.NewCommand("source", "/home/ma-user/.bashrc")).
-			Next(entity.NewCommand("python", "/home/ma-user/davinci/train/davincirun.py", "python", "/home/ma-user/openi.py", paramCode, "--model_url="+modelRemoteObsUrl))
+			Next(entity.NewCommand("python", "/home/ma-user/davinci/train/davincirun.py", "python", "/home/ma-user/grampus.py", paramCode))
 	} else if computeResource == models.GCU {
 		builder.Next(entity.NewCommand("cd", codeDirPath))
 		if modelFilePath != "" {
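Side note on the hunk above: the paramCode string that both the removed openi.py invocation and the new grampus.py invocation receive is assembled from the task's run parameters one quoted flag at a time (see the first context line of the hunk). A standalone illustration of that concatenation, with made-up labels and values:

package main

import "fmt"

// Parameter stands in for the run-parameter type used by buildExecCodeCommand;
// the labels and values below are hypothetical, just to show the quoting.
type Parameter struct {
	Label string
	Value string
}

func main() {
	params := []Parameter{
		{Label: "epochs", Value: "10"},
		{Label: "lr", Value: "0.001"},
	}
	paramCode := ""
	for _, param := range params {
		// Same concatenation as in the hunk's context line.
		paramCode += " --'" + param.Label + "'='" + param.Value + "'"
	}
	fmt.Println(paramCode) // --'epochs'='10' --'lr'='0.001'
}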


+71 -61 services/ai_task_service/schedule/model_schedule.go

@@ -10,8 +10,6 @@ import (
 	"strings"
 	"time"

-	"code.gitea.io/gitea/modules/modelarts"
-
 	"code.gitea.io/gitea/modules/obs"

 	"code.gitea.io/gitea/models"
@@ -167,12 +165,6 @@ func HandleUnfinishedMigrateRecord(r *models.ModelMigrateRecord) error {
 		}
 	}

-	if r.CurrentStep == models.BucketMoving {
-		// Check whether the NPU result directory already contains files; if it does, decompression is considered successful
-		if cloudbrain.ComputeResource == models.NPUResource && IsNPUModelDirHasFile(cloudbrain.JobName, cloudbrain.VersionName) {
-			TryToUpdateNPUMoveBucketResult(r, cloudbrain.JobName, cloudbrain.VersionName)
-		}
-	}
 	return nil
 }

@@ -203,66 +195,28 @@ func LocalMigrateOperate(jobName, computeSource string, r *models.ModelMigrateRe
 		log.Error("UpdateModelMigrateStatusByStep err. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, err)
 		return err
 	}
+	// Because scheduling cannot target a specific bucket, the output still has to be moved to the right bucket after scheduling succeeds
 	if computeSource == models.NPUResource {
-		// Because the NPU output is compressed, it has to be decompressed and then moved to the target bucket
-		if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.OutputPath); err != nil {
-			log.Error("Failed to obsMkdir_output: %s (%v)", jobName, err)
-
-			return err
-		}
-		log.Info("DestObjectKey", r.DestObjectKey)
-		if strings.Contains(r.DestObjectKey, ".") {
-			isExists, _ := storage.IsObjectExist4Obs(r.DestBucket, r.DestObjectKey)
-			if !isExists {
-				// No files need to be decompressed and migrated, so update the step to success directly
-				models.UpdateModelMigrateStatusByStep(r, models.BucketMoveSuccess)
-			}
-			decompress(r.DestBucket+"/"+r.DestObjectKey, setting.Bucket+"/"+strings.TrimSuffix(r.DestObjectKey, models.ModelSuffix))
-
-		} else { // If it is a directory, iterate over its files
-			fileInfos, err := storage.GetOneLevelObjectsUnderDir(r.DestBucket, "", r.DestObjectKey)
-			if err != nil {
-				log.Error("UpdateModelMigrateStatusByStep err. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, err)
-				return err
-			}
-			if len(fileInfos) == 0 {
-				// No files need to be decompressed and migrated, so update the step to success directly
-				models.UpdateModelMigrateStatusByStep(r, models.BucketMoveSuccess)
-			}
-			for _, fileInfo := range fileInfos {
-				log.Info("decompress file:", fileInfo.FileName)
-				sourceFilPath := r.DestBucket + "/" + r.DestObjectKey + fileInfo.FileName
-				if !strings.HasSuffix(r.DestObjectKey, "/") {
-					sourceFilPath = r.DestBucket + "/" + r.DestObjectKey + "/" + fileInfo.FileName
-				}
-				decompress(sourceFilPath, setting.Bucket+"/"+strings.TrimSuffix(r.DestObjectKey, models.ModelSuffix))
-			}
-
-			return err
-		}
+		targetObjectPrefix := strings.TrimSuffix(r.DestObjectKey, models.ModelSuffix)
+		if err := MoveBucketInOpenIOBS(r.DestObjectKey, targetObjectPrefix, r.DestBucket, setting.Bucket); err != nil {
+			log.Error("MoveBucketInOpenIOBS err.%v", err)
+			if tmpErr := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveFailed); tmpErr != nil {
+				log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, tmpErr)
+			}
+			return err
+		}
 	} else {
-		// Because scheduling cannot target a specific bucket, the output still has to be moved to the right bucket after scheduling succeeds
 		if setting.UseLocalMinioMigrate {
 			if err := MoveBucketJust4LocalMinio(r.DestObjectKey, grampus.GetGPUModelObjectKey(jobName), r.DestBucket, setting.Attachment.Minio.Bucket); err != nil {
 				log.Error("MoveBucketJust4LocalMinio err.%v", err)
 				if tmpErr := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveFailed); tmpErr != nil {
 					log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, tmpErr)
 				}
 				return err
 			}
 		} else {
 			if err := MoveBucketInOpenIMinio(r.DestObjectKey, grampus.GetGPUModelObjectKey(jobName), r.DestBucket, setting.Attachment.Minio.Bucket); err != nil {
 				log.Error("MoveBucketInOpenIMinio err.%v", err)
 				if tmpErr := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveFailed); tmpErr != nil {
 					log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, tmpErr)
 				}
 				return err
 			}
 		}
-
-		if err := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveSuccess); err != nil {
-			log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveSuccess, err)
-		}
 	}
+	if err := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveSuccess); err != nil {
+		log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveSuccess, err)
+	}
 	return nil
 }
@@ -336,6 +290,40 @@ func MoveBucketInOpenIMinio(objectKeyPrefix, targetObjectPrefix, oldBucket, newB
 	return nil
 }

+func MoveBucketInOpenIOBS(objectKeyPrefix, targetObjectPrefix, oldBucket, newBucket string) error {
+	var client = storage.ObsCli
+
+	input := &obs.ListObjectsInput{}
+	input.Bucket = oldBucket
+	input.MaxKeys = 1000
+	input.Prefix = objectKeyPrefix
+	output, err := client.ListObjects(input)
+	if err != nil {
+		log.Error("MoveBucketInOpenIOBS list objects error.err=%v", err)
+		return err
+	}
+
+	log.Info("MoveBucketInOpenIOBS start.objectKeyPrefix=%s", objectKeyPrefix)
+	count := 0
+	for _, object := range output.Contents {
+		if object.Key == input.Prefix {
+			continue
+		}
+		if strings.HasSuffix(object.Key, "/") {
+			continue
+		}
+		log.Debug("MoveBucketInOpenIOBS object.Key=%s", object.Key)
+		newObjectKey := strings.Replace(object.Key, objectKeyPrefix, targetObjectPrefix, 1)
+		err := MoveOBSFileBucket(client, object.Key, newObjectKey, oldBucket, newBucket)
+		if err != nil {
+			log.Error("MoveBucketInOpenIOBS MoveOBSFileBucket object.Key=%s Err=%v", object.Key, err)
+			continue
+		}
+		count++
+	}
+	log.Info("MoveBucketInOpenIOBS finished.objectKeyPrefix=%s ,total=%d", objectKeyPrefix, count)
+	return nil
+}
+
 func MoveBucketJust4LocalMinio(objectKeyPrefix, targetObjectPrefix, oldBucket, newBucket string) error {
 	oldPath := path.Join(setting.Attachment.Minio.RealPath, oldBucket, objectKeyPrefix)
 	newPath := path.Join(setting.Attachment.Minio.RealPath, newBucket, targetObjectPrefix)
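One thing worth flagging on MoveBucketInOpenIOBS above: it issues a single ListObjects call with MaxKeys = 1000, so a prefix holding more than 1000 objects would only be partially moved. A minimal pagination sketch, assuming the vendored OBS client exposes Marker on ListObjectsInput and IsTruncated / NextMarker on the output the way the upstream Huawei OBS Go SDK does; moveAllObjectsWithPrefix is a hypothetical helper, not part of this PR, and assumes the same package and imports as model_schedule.go:

// moveAllObjectsWithPrefix pages through ListObjects with Marker/NextMarker so that
// prefixes holding more than 1000 objects are fully moved (hypothetical helper).
func moveAllObjectsWithPrefix(client *obs.ObsClient, objectKeyPrefix, targetObjectPrefix, oldBucket, newBucket string) error {
	input := &obs.ListObjectsInput{}
	input.Bucket = oldBucket
	input.Prefix = objectKeyPrefix
	input.MaxKeys = 1000
	for {
		output, err := client.ListObjects(input)
		if err != nil {
			return err
		}
		for _, object := range output.Contents {
			// Skip the prefix placeholder itself and pseudo-directory keys.
			if object.Key == objectKeyPrefix || strings.HasSuffix(object.Key, "/") {
				continue
			}
			newObjectKey := strings.Replace(object.Key, objectKeyPrefix, targetObjectPrefix, 1)
			if err := MoveOBSFileBucket(client, object.Key, newObjectKey, oldBucket, newBucket); err != nil {
				return err
			}
		}
		if !output.IsTruncated {
			return nil
		}
		// Continue listing from where the previous page stopped.
		input.Marker = output.NextMarker
	}
}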
@@ -387,6 +375,28 @@ func MoveMinioFileBucket(core *minio.Core, oldObjectKey, newObjectKey, oldBucket
 	return err
 }

+func MoveOBSFileBucket(client *obs.ObsClient, oldObjectKey, newObjectKey, oldBucket, newBucket string) error {
+	input := &obs.CopyObjectInput{}
+	input.Bucket = newBucket
+	input.Key = newObjectKey
+	input.CopySourceBucket = oldBucket
+	input.CopySourceKey = oldObjectKey
+	_, err := client.CopyObject(input)
+
+	if err != nil {
+		log.Error("MoveOBSFileBucket CopyObject err oldObjectKey=%s .%v", oldObjectKey, err)
+		return err
+	}
+	delObj := &obs.DeleteObjectInput{}
+	delObj.Bucket = oldBucket
+	delObj.Key = oldObjectKey
+	_, err = client.DeleteObject(delObj)
+	if err != nil {
+		log.Error("MoveOBSFileBucket DeleteObject err oldObjectKey=%s .%v", oldObjectKey, err)
+	}
+	return err
+}
+
 type DecompressReq struct {
 	SourceFile string `json:"source_file"`
 	DestPath   string `json:"dest_path"`

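For reference, the destination-key rewrite that the new NPU migration path relies on is just strings.TrimSuffix on the migrate record's DestObjectKey, followed by a single-occurrence strings.Replace for each listed object. A standalone illustration; the key and suffix values below are invented for the example, and the actual models.ModelSuffix may differ:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical values: DestObjectKey as recorded on the migrate record,
	// and a ".zip"-style suffix standing in for models.ModelSuffix.
	destObjectKey := "migrate/job123/output.zip"
	modelSuffix := ".zip"

	// Same derivation as LocalMigrateOperate: strip the suffix to get the target prefix.
	targetObjectPrefix := strings.TrimSuffix(destObjectKey, modelSuffix)

	// Same rewrite as MoveBucketInOpenIOBS: swap only the leading prefix of each listed key.
	objectKey := "migrate/job123/output.zip/model/config.json"
	newObjectKey := strings.Replace(objectKey, destObjectKey, targetObjectPrefix, 1)

	fmt.Println(targetObjectPrefix) // migrate/job123/output
	fmt.Println(newObjectKey)       // migrate/job123/output/model/config.json
}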
