fix-4588
into V20230808
9 months ago
@@ -492,9 +492,8 @@ func buildExecCodeCommand(codeDirPath, modelFilePath, bootFile, computeResource, | |||
paramCode += " --'" + param.Label + "'='" + param.Value + "'" | |||
} | |||
if computeResource == models.NPU { | |||
modelRemoteObsUrl := getNpuModelRemoteObsUrl(jobName) | |||
builder.Next(entity.NewCommand("source", "/home/ma-user/.bashrc")). | |||
Next(entity.NewCommand("python", "/home/ma-user/davinci/train/davincirun.py", "python", "/home/ma-user/openi.py", paramCode, "--model_url="+modelRemoteObsUrl)) | |||
Next(entity.NewCommand("python", "/home/ma-user/davinci/train/davincirun.py", "python", "/home/ma-user/grampus.py", paramCode)) | |||
} else if computeResource == models.GCU { | |||
builder.Next(entity.NewCommand("cd", codeDirPath)) | |||
if modelFilePath != "" { | |||
@@ -10,8 +10,6 @@ import ( | |||
"strings" | |||
"time" | |||
"code.gitea.io/gitea/modules/modelarts" | |||
"code.gitea.io/gitea/modules/obs" | |||
"code.gitea.io/gitea/models" | |||
@@ -167,12 +165,6 @@ func HandleUnfinishedMigrateRecord(r *models.ModelMigrateRecord) error { | |||
} | |||
} | |||
if r.CurrentStep == models.BucketMoving { | |||
//尝试查询NPU结果目录下是否有文件,有文件则认为已经解压成功 | |||
if cloudbrain.ComputeResource == models.NPUResource && IsNPUModelDirHasFile(cloudbrain.JobName, cloudbrain.VersionName) { | |||
TryToUpdateNPUMoveBucketResult(r, cloudbrain.JobName, cloudbrain.VersionName) | |||
} | |||
} | |||
return nil | |||
} | |||
@@ -203,53 +195,17 @@ func LocalMigrateOperate(jobName, computeSource string, r *models.ModelMigrateRe | |||
log.Error("UpdateModelMigrateStatusByStep err. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, err) | |||
return err | |||
} | |||
if computeSource == models.NPUResource { | |||
//因为NPU的输出会被压缩,因此需要解压+移桶 | |||
if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.OutputPath); err != nil { | |||
log.Error("Failed to obsMkdir_output: %s (%v)", jobName, err) | |||
return err | |||
} | |||
log.Info("DestObjectKey", r.DestObjectKey) | |||
if strings.Contains(r.DestObjectKey, ".") { | |||
isExists, _ := storage.IsObjectExist4Obs(r.DestBucket, r.DestObjectKey) | |||
if !isExists { | |||
//此时没有文件需要解压迁移,直接更新为成功 | |||
models.UpdateModelMigrateStatusByStep(r, models.BucketMoveSuccess) | |||
} | |||
decompress(r.DestBucket+"/"+r.DestObjectKey, setting.Bucket+"/"+strings.TrimSuffix(r.DestObjectKey, models.ModelSuffix)) | |||
} else { //如果是文件夹,遍历文件 | |||
fileInfos, err := storage.GetOneLevelObjectsUnderDir(r.DestBucket, "", r.DestObjectKey) | |||
if err != nil { | |||
log.Error("UpdateModelMigrateStatusByStep err. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, err) | |||
return err | |||
} | |||
if len(fileInfos) == 0 { | |||
//此时没有文件需要解压迁移,直接更新为成功 | |||
models.UpdateModelMigrateStatusByStep(r, models.BucketMoveSuccess) | |||
} | |||
for _, fileInfo := range fileInfos { | |||
log.Info("decompress file:", fileInfo.FileName) | |||
sourceFilPath := r.DestBucket + "/" + r.DestObjectKey + fileInfo.FileName | |||
if !strings.HasSuffix(r.DestObjectKey, "/") { | |||
sourceFilPath = r.DestBucket + "/" + r.DestObjectKey + "/" + fileInfo.FileName | |||
} | |||
decompress(sourceFilPath, setting.Bucket+"/"+strings.TrimSuffix(r.DestObjectKey, models.ModelSuffix)) | |||
} | |||
} | |||
} else { | |||
//因为调度无法指定桶,所以调度成功后我们还需要移桶 | |||
if setting.UseLocalMinioMigrate { | |||
if err := MoveBucketJust4LocalMinio(r.DestObjectKey, grampus.GetGPUModelObjectKey(jobName), r.DestBucket, setting.Attachment.Minio.Bucket); err != nil { | |||
log.Error("MoveBucketJust4LocalMinio err.%v", err) | |||
if computeSource == models.NPUResource { | |||
targetObjectPrefix := strings.TrimSuffix(r.DestObjectKey, models.ModelSuffix) | |||
if err := MoveBucketInOpenIOBS(r.DestObjectKey, targetObjectPrefix, r.DestBucket, setting.Bucket); err != nil { | |||
log.Error("MoveBucketInOpenIMinio err.%v", err) | |||
if tmpErr := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveFailed); tmpErr != nil { | |||
log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, tmpErr) | |||
} | |||
return err | |||
} | |||
} else { | |||
if err := MoveBucketInOpenIMinio(r.DestObjectKey, grampus.GetGPUModelObjectKey(jobName), r.DestBucket, setting.Attachment.Minio.Bucket); err != nil { | |||
log.Error("MoveBucketInOpenIMinio err.%v", err) | |||
@@ -259,11 +215,9 @@ func LocalMigrateOperate(jobName, computeSource string, r *models.ModelMigrateRe | |||
return err | |||
} | |||
} | |||
if err := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveSuccess); err != nil { | |||
log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveSuccess, err) | |||
} | |||
} | |||
return nil | |||
} | |||
@@ -336,6 +290,40 @@ func MoveBucketInOpenIMinio(objectKeyPrefix, targetObjectPrefix, oldBucket, newB | |||
return nil | |||
} | |||
func MoveBucketInOpenIOBS(objectKeyPrefix, targetObjectPrefix, oldBucket, newBucket string) error { | |||
var client = storage.ObsCli | |||
input := &obs.ListObjectsInput{} | |||
input.Bucket = oldBucket | |||
input.MaxKeys = 1000 | |||
input.Prefix = objectKeyPrefix | |||
output, err := client.ListObjects(input) | |||
if err != nil { | |||
log.Error("MoveBucketInOpenIOBS list objects error.err=%v", err) | |||
return err | |||
} | |||
log.Info("MoveBucketInOpenIOBS start.objectKeyPrefix=%s", objectKeyPrefix) | |||
count := 0 | |||
for _, object := range output.Contents { | |||
if object.Key == input.Prefix { | |||
continue | |||
} | |||
if strings.HasSuffix(object.Key, "/") { | |||
continue | |||
} | |||
log.Debug("MoveBucketInOpenIOBS object.Key=%s", object.Key) | |||
newObjectKey := strings.Replace(object.Key, objectKeyPrefix, targetObjectPrefix, 1) | |||
err := MoveOBSFileBucket(client, object.Key, newObjectKey, oldBucket, newBucket) | |||
if err != nil { | |||
log.Error("MoveBucketInOpenIOBS MoveOBSFileBucket object.Key=%s Err=%v", object.Key, err) | |||
continue | |||
} | |||
} | |||
log.Info("MoveBucketInOpenIOBS finished.objectKeyPrefix=%s ,total=%d", objectKeyPrefix, count) | |||
return nil | |||
} | |||
func MoveBucketJust4LocalMinio(objectKeyPrefix, targetObjectPrefix, oldBucket, newBucket string) error { | |||
oldPath := path.Join(setting.Attachment.Minio.RealPath, oldBucket, objectKeyPrefix) | |||
newPath := path.Join(setting.Attachment.Minio.RealPath, newBucket, targetObjectPrefix) | |||
@@ -387,6 +375,28 @@ func MoveMinioFileBucket(core *minio.Core, oldObjectKey, newObjectKey, oldBucket | |||
return err | |||
} | |||
func MoveOBSFileBucket(client *obs.ObsClient, oldObjectKey, newObjectKey, oldBucket, newBucket string) error { | |||
input := &obs.CopyObjectInput{} | |||
input.Bucket = newBucket | |||
input.Key = newObjectKey | |||
input.CopySourceBucket = oldBucket | |||
input.CopySourceKey = oldObjectKey | |||
_, err := client.CopyObject(input) | |||
if err != nil { | |||
log.Error("MoveOBSFileBucket CopyObject err oldObjectKey=%s .%v", oldObjectKey, err) | |||
return err | |||
} | |||
delObj := &obs.DeleteObjectInput{} | |||
delObj.Bucket = oldBucket | |||
delObj.Key = oldObjectKey | |||
_, err = client.DeleteObject(delObj) | |||
if err != nil { | |||
log.Error("MoveOBSFileBucket DeleteObject err oldObjectKey=%s .%v", oldObjectKey, err) | |||
} | |||
return err | |||
} | |||
type DecompressReq struct { | |||
SourceFile string `json:"source_file"` | |||
DestPath string `json:"dest_path"` | |||
Dear OpenI User
Thank you for your continuous support to the OpenI Qizhi Community AI Collaboration Platform. In order to protect your usage rights and ensure network security, we updated the OpenI Qizhi Community AI Collaboration Platform Usage Agreement in January 2024. The updated agreement specifies that users are prohibited from using intranet penetration tools. After you click "Agree and continue", you can continue to use our services. Thank you for your cooperation and understanding.
For more agreement content, please refer to the 《OpenI Qizhi Community AI Collaboration Platform Usage Agreement》