|
|
@@ -10,8 +10,6 @@ import ( |
|
|
|
"strings" |
|
|
|
"time" |
|
|
|
|
|
|
|
"code.gitea.io/gitea/modules/modelarts" |
|
|
|
|
|
|
|
"code.gitea.io/gitea/modules/obs" |
|
|
|
|
|
|
|
"code.gitea.io/gitea/models" |
|
|
@@ -167,12 +165,6 @@ func HandleUnfinishedMigrateRecord(r *models.ModelMigrateRecord) error { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if r.CurrentStep == models.BucketMoving { |
|
|
|
//尝试查询NPU结果目录下是否有文件,有文件则认为已经解压成功 |
|
|
|
if cloudbrain.ComputeResource == models.NPUResource && IsNPUModelDirHasFile(cloudbrain.JobName, cloudbrain.VersionName) { |
|
|
|
TryToUpdateNPUMoveBucketResult(r, cloudbrain.JobName, cloudbrain.VersionName) |
|
|
|
} |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
@@ -203,66 +195,28 @@ func LocalMigrateOperate(jobName, computeSource string, r *models.ModelMigrateRe |
|
|
|
log.Error("UpdateModelMigrateStatusByStep err. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, err) |
|
|
|
return err |
|
|
|
} |
|
|
|
//因为调度无法指定桶,所以调度成功后我们还需要移桶 |
|
|
|
if computeSource == models.NPUResource { |
|
|
|
//因为NPU的输出会被压缩,因此需要解压+移桶 |
|
|
|
if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.OutputPath); err != nil { |
|
|
|
log.Error("Failed to obsMkdir_output: %s (%v)", jobName, err) |
|
|
|
|
|
|
|
return err |
|
|
|
} |
|
|
|
log.Info("DestObjectKey", r.DestObjectKey) |
|
|
|
if strings.Contains(r.DestObjectKey, ".") { |
|
|
|
isExists, _ := storage.IsObjectExist4Obs(r.DestBucket, r.DestObjectKey) |
|
|
|
if !isExists { |
|
|
|
//此时没有文件需要解压迁移,直接更新为成功 |
|
|
|
models.UpdateModelMigrateStatusByStep(r, models.BucketMoveSuccess) |
|
|
|
targetObjectPrefix := strings.TrimSuffix(r.DestObjectKey, models.ModelSuffix) |
|
|
|
if err := MoveBucketInOpenIOBS(r.DestObjectKey, targetObjectPrefix, r.DestBucket, setting.Bucket); err != nil { |
|
|
|
log.Error("MoveBucketInOpenIMinio err.%v", err) |
|
|
|
if tmpErr := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveFailed); tmpErr != nil { |
|
|
|
log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, tmpErr) |
|
|
|
} |
|
|
|
decompress(r.DestBucket+"/"+r.DestObjectKey, setting.Bucket+"/"+strings.TrimSuffix(r.DestObjectKey, models.ModelSuffix)) |
|
|
|
|
|
|
|
} else { //如果是文件夹,遍历文件 |
|
|
|
fileInfos, err := storage.GetOneLevelObjectsUnderDir(r.DestBucket, "", r.DestObjectKey) |
|
|
|
if err != nil { |
|
|
|
log.Error("UpdateModelMigrateStatusByStep err. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, err) |
|
|
|
return err |
|
|
|
} |
|
|
|
if len(fileInfos) == 0 { |
|
|
|
//此时没有文件需要解压迁移,直接更新为成功 |
|
|
|
models.UpdateModelMigrateStatusByStep(r, models.BucketMoveSuccess) |
|
|
|
} |
|
|
|
for _, fileInfo := range fileInfos { |
|
|
|
log.Info("decompress file:", fileInfo.FileName) |
|
|
|
sourceFilPath := r.DestBucket + "/" + r.DestObjectKey + fileInfo.FileName |
|
|
|
if !strings.HasSuffix(r.DestObjectKey, "/") { |
|
|
|
sourceFilPath = r.DestBucket + "/" + r.DestObjectKey + "/" + fileInfo.FileName |
|
|
|
} |
|
|
|
decompress(sourceFilPath, setting.Bucket+"/"+strings.TrimSuffix(r.DestObjectKey, models.ModelSuffix)) |
|
|
|
} |
|
|
|
|
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
} else { |
|
|
|
//因为调度无法指定桶,所以调度成功后我们还需要移桶 |
|
|
|
if setting.UseLocalMinioMigrate { |
|
|
|
if err := MoveBucketJust4LocalMinio(r.DestObjectKey, grampus.GetGPUModelObjectKey(jobName), r.DestBucket, setting.Attachment.Minio.Bucket); err != nil { |
|
|
|
log.Error("MoveBucketJust4LocalMinio err.%v", err) |
|
|
|
if tmpErr := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveFailed); tmpErr != nil { |
|
|
|
log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, tmpErr) |
|
|
|
} |
|
|
|
return err |
|
|
|
} |
|
|
|
} else { |
|
|
|
if err := MoveBucketInOpenIMinio(r.DestObjectKey, grampus.GetGPUModelObjectKey(jobName), r.DestBucket, setting.Attachment.Minio.Bucket); err != nil { |
|
|
|
log.Error("MoveBucketInOpenIMinio err.%v", err) |
|
|
|
if tmpErr := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveFailed); tmpErr != nil { |
|
|
|
log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, tmpErr) |
|
|
|
} |
|
|
|
return err |
|
|
|
if err := MoveBucketInOpenIMinio(r.DestObjectKey, grampus.GetGPUModelObjectKey(jobName), r.DestBucket, setting.Attachment.Minio.Bucket); err != nil { |
|
|
|
log.Error("MoveBucketInOpenIMinio err.%v", err) |
|
|
|
if tmpErr := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveFailed); tmpErr != nil { |
|
|
|
log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, tmpErr) |
|
|
|
} |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
if err := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveSuccess); err != nil { |
|
|
|
log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveSuccess, err) |
|
|
|
} |
|
|
|
} |
|
|
|
if err := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveSuccess); err != nil { |
|
|
|
log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveSuccess, err) |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
@@ -336,6 +290,40 @@ func MoveBucketInOpenIMinio(objectKeyPrefix, targetObjectPrefix, oldBucket, newB |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func MoveBucketInOpenIOBS(objectKeyPrefix, targetObjectPrefix, oldBucket, newBucket string) error { |
|
|
|
var client = storage.ObsCli |
|
|
|
|
|
|
|
input := &obs.ListObjectsInput{} |
|
|
|
input.Bucket = oldBucket |
|
|
|
input.MaxKeys = 1000 |
|
|
|
input.Prefix = objectKeyPrefix |
|
|
|
output, err := client.ListObjects(input) |
|
|
|
if err != nil { |
|
|
|
log.Error("MoveBucketInOpenIOBS list objects error.err=%v", err) |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
log.Info("MoveBucketInOpenIOBS start.objectKeyPrefix=%s", objectKeyPrefix) |
|
|
|
count := 0 |
|
|
|
for _, object := range output.Contents { |
|
|
|
if object.Key == input.Prefix { |
|
|
|
continue |
|
|
|
} |
|
|
|
if strings.HasSuffix(object.Key, "/") { |
|
|
|
continue |
|
|
|
} |
|
|
|
log.Debug("MoveBucketInOpenIOBS object.Key=%s", object.Key) |
|
|
|
newObjectKey := strings.Replace(object.Key, objectKeyPrefix, targetObjectPrefix, 1) |
|
|
|
err := MoveOBSFileBucket(client, object.Key, newObjectKey, oldBucket, newBucket) |
|
|
|
if err != nil { |
|
|
|
log.Error("MoveBucketInOpenIOBS MoveOBSFileBucket object.Key=%s Err=%v", object.Key, err) |
|
|
|
continue |
|
|
|
} |
|
|
|
} |
|
|
|
log.Info("MoveBucketInOpenIOBS finished.objectKeyPrefix=%s ,total=%d", objectKeyPrefix, count) |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func MoveBucketJust4LocalMinio(objectKeyPrefix, targetObjectPrefix, oldBucket, newBucket string) error { |
|
|
|
oldPath := path.Join(setting.Attachment.Minio.RealPath, oldBucket, objectKeyPrefix) |
|
|
|
newPath := path.Join(setting.Attachment.Minio.RealPath, newBucket, targetObjectPrefix) |
|
|
@@ -387,6 +375,28 @@ func MoveMinioFileBucket(core *minio.Core, oldObjectKey, newObjectKey, oldBucket |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
func MoveOBSFileBucket(client *obs.ObsClient, oldObjectKey, newObjectKey, oldBucket, newBucket string) error { |
|
|
|
input := &obs.CopyObjectInput{} |
|
|
|
input.Bucket = newBucket |
|
|
|
input.Key = newObjectKey |
|
|
|
input.CopySourceBucket = oldBucket |
|
|
|
input.CopySourceKey = oldObjectKey |
|
|
|
_, err := client.CopyObject(input) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
log.Error("MoveOBSFileBucket CopyObject err oldObjectKey=%s .%v", oldObjectKey, err) |
|
|
|
return err |
|
|
|
} |
|
|
|
delObj := &obs.DeleteObjectInput{} |
|
|
|
delObj.Bucket = oldBucket |
|
|
|
delObj.Key = oldObjectKey |
|
|
|
_, err = client.DeleteObject(delObj) |
|
|
|
if err != nil { |
|
|
|
log.Error("MoveOBSFileBucket DeleteObject err oldObjectKey=%s .%v", oldObjectKey, err) |
|
|
|
} |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
type DecompressReq struct { |
|
|
|
SourceFile string `json:"source_file"` |
|
|
|
DestPath string `json:"dest_path"` |
|
|
|