|
|
@@ -0,0 +1,374 @@ |
|
|
|
package schedule |
|
|
|
|
|
|
|
import ( |
|
|
|
"bytes" |
|
|
|
"code.gitea.io/gitea/models" |
|
|
|
"code.gitea.io/gitea/modules/grampus" |
|
|
|
"code.gitea.io/gitea/modules/labelmsg" |
|
|
|
"code.gitea.io/gitea/modules/log" |
|
|
|
"code.gitea.io/gitea/modules/redis/redis_key" |
|
|
|
"code.gitea.io/gitea/modules/redis/redis_lock" |
|
|
|
"code.gitea.io/gitea/modules/setting" |
|
|
|
"code.gitea.io/gitea/modules/storage" |
|
|
|
"code.gitea.io/gitea/modules/util" |
|
|
|
"encoding/json" |
|
|
|
"errors" |
|
|
|
"fmt" |
|
|
|
"github.com/minio/minio-go" |
|
|
|
"os/exec" |
|
|
|
"path" |
|
|
|
"strings" |
|
|
|
"time" |
|
|
|
) |
|
|
|
|
|
|
|
// NPUModelDefaultName is the default file name of the zipped model archive
// produced by NPU training jobs. NOTE(review): not referenced in this file;
// presumably used by callers elsewhere — confirm before removing.
const NPUModelDefaultName = "models.zip"
|
|
|
|
|
|
|
func GetModelScheduleStatus(jobId string) (models.ModelMigrateStatus, error) { |
|
|
|
job, err := models.GetCloudbrainByJobID(jobId) |
|
|
|
if err != nil { |
|
|
|
log.Error("GetModelScheduleStatus GetCloudbrainByJobID err.jobId=%s err=%v", jobId, err) |
|
|
|
return 0, errors.New("jobId not correct") |
|
|
|
} |
|
|
|
if !job.IsTerminal() { |
|
|
|
log.Info("GetModelScheduleStatus job is not terminal.jobId=%s", jobId) |
|
|
|
return models.ModelMigrateWaiting, nil |
|
|
|
} |
|
|
|
|
|
|
|
record, err := models.GetModelMigrateRecordByCloudbrainId(job.ID) |
|
|
|
if err != nil { |
|
|
|
log.Error("GetModelScheduleStatus GetModelMigrateRecordByCloudbrainId err.jobId=%s err=%v", jobId, err) |
|
|
|
if models.IsErrRecordNotExist(err) { |
|
|
|
return models.ModelMigrateSuccess, nil |
|
|
|
} |
|
|
|
return models.ModelMigrateFailed, err |
|
|
|
} |
|
|
|
|
|
|
|
if !record.IsFinished() { |
|
|
|
go HandleUnfinishedMigrateRecord(record) |
|
|
|
} |
|
|
|
|
|
|
|
return record.Status, nil |
|
|
|
} |
|
|
|
|
|
|
|
func RetryModelMigrate(jobId string) error { |
|
|
|
job, err := models.GetCloudbrainByJobID(jobId) |
|
|
|
if err != nil { |
|
|
|
log.Error("RetryModelMigrate GetCloudbrainByJobID err.jobId=%s err=%v", jobId, err) |
|
|
|
return errors.New("jobId not correct") |
|
|
|
} |
|
|
|
if !job.IsTerminal() { |
|
|
|
log.Info("RetryModelMigrate job is not terminal.jobId=%s", jobId) |
|
|
|
return errors.New("task is not terminal") |
|
|
|
} |
|
|
|
|
|
|
|
//避免并发问题,先尝试获取锁,获取锁以后再查最新的记录 |
|
|
|
lock := redis_lock.NewDistributeLock(redis_key.RecordHandleLock(jobId)) |
|
|
|
success, err := lock.LockWithWait(10*time.Second, 10*time.Second) |
|
|
|
if err != nil { |
|
|
|
log.Error("HandleUnfinishedMigrateRecord lock err.jobId=%d %v", jobId, err) |
|
|
|
return err |
|
|
|
} |
|
|
|
if !success { |
|
|
|
log.Error("HandleUnfinishedMigrateRecord lock failed.ID=%d ", jobId) |
|
|
|
return nil |
|
|
|
} |
|
|
|
defer lock.UnLock() |
|
|
|
|
|
|
|
record, err := models.GetModelMigrateRecordByCloudbrainId(job.ID) |
|
|
|
if err != nil { |
|
|
|
log.Error("RetryModelMigrate GetModelMigrateRecordByCloudbrainId err.jobId=%s err=%v", jobId, err) |
|
|
|
if models.IsErrRecordNotExist(err) { |
|
|
|
return nil |
|
|
|
} |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
//只有两种情况可以再次调度,一是虎鲸调度失败 二是本地移桶失败 |
|
|
|
if record.CurrentStep == models.GrampusMigrateFailed { |
|
|
|
log.Info("retry PostModelMigrate. record.id = %d", record.ID) |
|
|
|
_, err := grampus.PostModelMigrate(jobId) |
|
|
|
if err != nil { |
|
|
|
log.Error("PostModelMigrate err.%v", err) |
|
|
|
return err |
|
|
|
} |
|
|
|
models.IncreaseModelMigrateRetryCount(record.ID) |
|
|
|
if err := models.RollBackMigrateStatus(record, models.GrampusMigrating); err != nil { |
|
|
|
log.Error("UpdateModelMigrateStatusByStep err.%v", err) |
|
|
|
return err |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
if record.CurrentStep == models.BucketMoveFailed { |
|
|
|
log.Info("retry BucketMove. record.id = %d", record.ID) |
|
|
|
if err := models.RollBackMigrateStatus(record, models.GrampusMigrateSuccess); err != nil { |
|
|
|
log.Error("UpdateModelMigrateStatusByStep err.%v", err) |
|
|
|
return err |
|
|
|
} |
|
|
|
models.IncreaseModelMigrateRetryCount(record.ID) |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
return errors.New("No need to retry,the model migration has been successful or is in the process.") |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
func HandleUnfinishedMigrateRecords() { |
|
|
|
list, err := models.GetUnfinishedModelMigrateRecords() |
|
|
|
if err != nil { |
|
|
|
log.Error("GetUnfinishedModelMigrateRecords err=%v", err) |
|
|
|
return |
|
|
|
} |
|
|
|
for _, r := range list { |
|
|
|
HandleUnfinishedMigrateRecord(r) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func HandleUnfinishedMigrateRecord(r *models.ModelMigrateRecord) error { |
|
|
|
cloudbrain, err := models.GetCloudbrainByID(fmt.Sprint(r.CloudbrainID)) |
|
|
|
if err != nil { |
|
|
|
log.Error("GetCloudbrainByID err. %v", err) |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
lock := redis_lock.NewDistributeLock(redis_key.RecordHandleLock(cloudbrain.JobID)) |
|
|
|
success, err := lock.LockWithWait(10*time.Second, 10*time.Second) |
|
|
|
if err != nil { |
|
|
|
log.Error("HandleUnfinishedMigrateRecord lock err.ID=%d %v", r.ID, err) |
|
|
|
return err |
|
|
|
} |
|
|
|
if !success { |
|
|
|
log.Error("HandleUnfinishedMigrateRecord lock failed.ID=%d ", r.ID) |
|
|
|
return nil |
|
|
|
} |
|
|
|
defer lock.UnLock() |
|
|
|
|
|
|
|
//拿到锁以后重新查询一次 |
|
|
|
r, err = models.GetModelMigrateRecordById(r.ID) |
|
|
|
if err != nil { |
|
|
|
log.Error("RetryModelMigrate GetModelMigrateRecordById err.Id=%s err=%v", r.ID, err) |
|
|
|
if models.IsErrRecordNotExist(err) { |
|
|
|
return nil |
|
|
|
} |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
if r.CurrentStep == models.GrampusMigrateInit || r.CurrentStep == models.GrampusMigrating { |
|
|
|
if err := UpdateModelMigrateStatusFromGrampus(r, cloudbrain.JobID); err != nil { |
|
|
|
log.Error("UpdateModelMigrateStatusFromGrampus err. %v", err) |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if r.CurrentStep == models.GrampusMigrateSuccess { |
|
|
|
if err := LocalMigrateOperate(cloudbrain.JobName, cloudbrain.ComputeResource, r); err != nil { |
|
|
|
log.Error("LocalMigrateOperate err. %v", err) |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if r.CurrentStep == models.BucketMoving { |
|
|
|
//尝试查询NPU结果目录下是否有文件,有文件则认为已经解压成功 |
|
|
|
if cloudbrain.ComputeResource == models.NPUResource && IsNPUModelDirHasFile(cloudbrain.JobName, cloudbrain.VersionName) { |
|
|
|
TryToUpdateNPUMoveBucketResult(r, cloudbrain.JobName, cloudbrain.VersionName) |
|
|
|
} |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func UpdateModelMigrateStatusFromGrampus(r *models.ModelMigrateRecord, jobId string) error { |
|
|
|
res, err := grampus.ModelMigrateInfo(jobId) |
|
|
|
if err != nil { |
|
|
|
log.Error("ModelMigrateInfo err. r.ID=%d %v", r.ID, err) |
|
|
|
return err |
|
|
|
} |
|
|
|
log.Info("grampus ModelMigrateInfo r.ID=%d res=%+v", r.ID, res) |
|
|
|
newStep := models.GrampusMigrateResponse(res.Status).ConvertToModelMigrateStep() |
|
|
|
if newStep == r.CurrentStep { |
|
|
|
log.Info("The status has not changed. r.ID=%d status=%d", r.ID, res.Status) |
|
|
|
return nil |
|
|
|
} |
|
|
|
err = updateModelMigrateFromRes(r, res) |
|
|
|
if err != nil { |
|
|
|
log.Error("updateModelMigrateFromRes err. r.ID=%d %v", r.ID, err) |
|
|
|
return err |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func LocalMigrateOperate(jobName, computeSource string, r *models.ModelMigrateRecord) error { |
|
|
|
log.Info("Grampus model migrate succeed,objectKey = %s computeSource= %s", r.DestObjectKey, computeSource) |
|
|
|
err := models.UpdateModelMigrateStatusByStep(r, models.BucketMoving) |
|
|
|
if err != nil { |
|
|
|
log.Error("UpdateModelMigrateStatusByStep err. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, err) |
|
|
|
return err |
|
|
|
} |
|
|
|
if computeSource == models.NPUResource { |
|
|
|
//因为NPU的输出会被压缩,因此需要解压+移桶 |
|
|
|
decompress(r.DestBucket+"/"+r.DestObjectKey, setting.Bucket+"/"+strings.TrimSuffix(r.DestObjectKey, models.ModelSuffix)) |
|
|
|
} else { |
|
|
|
//因为调度无法指定桶,所以调度成功后我们还需要移桶 |
|
|
|
if setting.UseLocalMinioMigrate { |
|
|
|
if err := MoveBucketJust4LocalMinio(r.DestObjectKey, grampus.GetGPUModelObjectKey(jobName), r.DestBucket, setting.Attachment.Minio.Bucket); err != nil { |
|
|
|
log.Error("MoveBucketJust4LocalMinio err.%v", err) |
|
|
|
if tmpErr := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveFailed); tmpErr != nil { |
|
|
|
log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, tmpErr) |
|
|
|
} |
|
|
|
return err |
|
|
|
} |
|
|
|
} else { |
|
|
|
if err := MoveBucketInOpenIMinio(r.DestObjectKey, grampus.GetGPUModelObjectKey(jobName), r.DestBucket, setting.Attachment.Minio.Bucket); err != nil { |
|
|
|
log.Error("MoveBucketInOpenIMinio err.%v", err) |
|
|
|
if tmpErr := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveFailed); tmpErr != nil { |
|
|
|
log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, tmpErr) |
|
|
|
} |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if err := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveSuccess); err != nil { |
|
|
|
log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveSuccess, err) |
|
|
|
} |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func TryToUpdateNPUMoveBucketResult(record *models.ModelMigrateRecord, jobName, versionName string) error { |
|
|
|
if IsNPUModelDirHasFile(jobName, versionName) { |
|
|
|
if err := models.UpdateModelMigrateStatusByStep(record, models.BucketMoveSuccess); err != nil { |
|
|
|
log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", record.ID, models.BucketMoveSuccess, err) |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func updateModelMigrateFromRes(r *models.ModelMigrateRecord, res *models.GrampusModelMigrateInfoResponse) error { |
|
|
|
step := models.GrampusMigrateResponse(res.Status).ConvertToModelMigrateStep() |
|
|
|
err := models.UpdateModelMigrateStatusByStep(r, step) |
|
|
|
if err != nil { |
|
|
|
log.Error("UpdateModelMigrateStatusByStep err,ID=%d err=%v", r.ID, err) |
|
|
|
return err |
|
|
|
} |
|
|
|
r.DestBucket = res.DestBucket |
|
|
|
r.DestEndpoint = res.DestEndpoint |
|
|
|
r.DestObjectKey = res.DestObjectKey |
|
|
|
r.DestProxy = res.DestProxy |
|
|
|
r.Remark = strings.TrimPrefix(r.Remark+";"+util.TruncateString(res.FailedReason, 200), ";") |
|
|
|
r.SrcBucket = res.SrcBucket |
|
|
|
r.SrcEndpoint = res.SrcEndpoint |
|
|
|
r.SrcObjectKey = res.SrcObjectKey |
|
|
|
err = models.UpdateModelMigrateRecordByStep(r) |
|
|
|
if err != nil { |
|
|
|
log.Error("updateModelMigrateFromRes UpdateModelMigrateRecord error.id=%d.err=%v", r.ID, err) |
|
|
|
return err |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func MoveBucketInOpenIMinio(objectKeyPrefix, targetObjectPrefix, oldBucket, newBucket string) error { |
|
|
|
var core = storage.ScheduleMinioCore |
|
|
|
objectInfo := core.Client.ListObjects(oldBucket, objectKeyPrefix, true, nil) |
|
|
|
log.Info("MoveBucketInOpenIMinio start.objectKeyPrefix=%s", objectKeyPrefix) |
|
|
|
count := 0 |
|
|
|
for object := range objectInfo { |
|
|
|
count++ |
|
|
|
if object.Err != nil { |
|
|
|
log.Error("MoveBucketInOpenIMinio object.Err=%v", object.Err) |
|
|
|
return object.Err |
|
|
|
} |
|
|
|
log.Debug("MoveBucketInOpenIMinio object.Key=%s", object.Key) |
|
|
|
newObjectKey := strings.Replace(object.Key, objectKeyPrefix, targetObjectPrefix, 1) |
|
|
|
err := MoveMinioFileBucket(core, object.Key, newObjectKey, oldBucket, newBucket) |
|
|
|
if err != nil { |
|
|
|
log.Error("MoveBucketInOpenIMinio MoveMinioFileBucket object.Key=%s Err=%v", object.Key, err) |
|
|
|
continue |
|
|
|
} |
|
|
|
} |
|
|
|
log.Info("MoveBucketInOpenIMinio finished.objectKeyPrefix=%s ,total=%d", objectKeyPrefix, count) |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func MoveBucketJust4LocalMinio(objectKeyPrefix, targetObjectPrefix, oldBucket, newBucket string) error { |
|
|
|
oldPath := path.Join(setting.Attachment.Minio.RealPath, oldBucket, objectKeyPrefix) |
|
|
|
newPath := path.Join(setting.Attachment.Minio.RealPath, newBucket, targetObjectPrefix) |
|
|
|
log.Info("MoveBucketJust4LocalMinio start.oldPath=%s newPath=%s", oldPath, newPath) |
|
|
|
//重命名原有文件夹,防止已有该文件 |
|
|
|
err, errStr := sudoMv(newPath, fmt.Sprintf("%s_%d", newPath, time.Now().Unix())) |
|
|
|
if err != nil { |
|
|
|
log.Error("MoveBucketJust4LocalMinio sudoMv error.oldPath=%s newPath=%s Err=%v errStr=%s ", oldPath, newPath, err, errStr) |
|
|
|
} |
|
|
|
//移动(重命名)文件夹 |
|
|
|
err, errStr = sudoMv(oldPath, newPath) |
|
|
|
if err != nil { |
|
|
|
log.Error("MoveBucketJust4LocalMinio sudoMv error.oldPath=%s newPath=%s Err=%v errStr=%s ", oldPath, newPath, err, errStr) |
|
|
|
return err |
|
|
|
} |
|
|
|
log.Info("MoveBucketInOpenIMinio finished.oldPath=%s newPath=%s ", oldPath, newPath) |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func sudoMv(oldPath, newPath string) (error, string) { |
|
|
|
c := fmt.Sprintf("sudo mv %s %s", oldPath, newPath) |
|
|
|
log.Info("start to sudoMv,oldPath=%s newPath=%s", oldPath, newPath) |
|
|
|
cmd := exec.Command("/bin/sh", "-c", c) |
|
|
|
var stdout, stderr bytes.Buffer |
|
|
|
cmd.Stdout = &stdout // 标准输出 |
|
|
|
cmd.Stderr = &stderr // 标准错误 |
|
|
|
err := cmd.Run() |
|
|
|
outStr, errStr := string(stdout.Bytes()), string(stderr.Bytes()) |
|
|
|
log.Debug("out:\n%s\nerr:\n%s\n", outStr, errStr) |
|
|
|
if err != nil { |
|
|
|
log.Error("cmd.Run() failed,oldPath=%s newPath=%s err=%v\n", oldPath, newPath, err) |
|
|
|
return err, errStr |
|
|
|
} |
|
|
|
return nil, errStr |
|
|
|
} |
|
|
|
|
|
|
|
func MoveMinioFileBucket(core *minio.Core, oldObjectKey, newObjectKey, oldBucket, newBucket string) error { |
|
|
|
_, err := core.CopyObject(oldBucket, oldObjectKey, newBucket, newObjectKey, map[string]string{}) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
log.Error("MoveBucketInOpenIMinio CopyObject err oldObjectKey=%s .%v", oldObjectKey, err) |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
err = core.RemoveObject(oldBucket, oldObjectKey) |
|
|
|
if err != nil { |
|
|
|
log.Error("MoveBucketInOpenIMinio RemoveObject err oldObjectKey=%s .%v", oldObjectKey, err) |
|
|
|
} |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
// DecompressReq is the JSON payload sent to the label system to request
// decompression of an archive stored in object storage.
type DecompressReq struct {
	SourceFile string `json:"source_file"` // bucket-qualified path of the archive to decompress
	DestPath   string `json:"dest_path"`   // bucket-qualified directory to extract into
}
|
|
|
|
|
|
|
func decompress(sourceFile, destPath string) { |
|
|
|
req, _ := json.Marshal(DecompressReq{ |
|
|
|
SourceFile: sourceFile, |
|
|
|
DestPath: destPath, |
|
|
|
}) |
|
|
|
err := labelmsg.SendDecompressAttachToLabelOBS(string(req)) |
|
|
|
if err != nil { |
|
|
|
log.Error("SendDecompressTask to labelsystem (%s) failed:%s", sourceFile, err.Error()) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func IsNPUModelDirHasFile(jobName string, versionName string) bool { |
|
|
|
prefix := strings.TrimPrefix(path.Join(setting.TrainJobModelPath, jobName, setting.OutPutPath, versionName), "/") |
|
|
|
if !strings.HasSuffix(prefix, "/") { |
|
|
|
prefix += "/" |
|
|
|
} |
|
|
|
fileInfos, err := storage.GetOneLevelAllObjectUnderDir(setting.Bucket, prefix, "") |
|
|
|
if err != nil { |
|
|
|
log.Info("IsNPUModelDirHasFile.get TrainJobListModel failed:", err) |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
if len(fileInfos) > 0 { |
|
|
|
return true |
|
|
|
} |
|
|
|
return len(fileInfos) > 0 |
|
|
|
} |