diff --git a/models/cloudbrain.go b/models/cloudbrain.go
index ea6d0338e9..17761a1dc5 100755
--- a/models/cloudbrain.go
+++ b/models/cloudbrain.go
@@ -87,6 +87,8 @@ const (
 	ModelArtsTrainJobCheckRunning          ModelArtsJobStatus = "CHECK_RUNNING"           //审核作业正在运行中
 	ModelArtsTrainJobCheckRunningCompleted ModelArtsJobStatus = "CHECK_RUNNING_COMPLETED" //审核作业已经完成
 	ModelArtsTrainJobCheckFailed           ModelArtsJobStatus = "CHECK_FAILED"            //审核作业失败
+
+	DURATION_STR_ZERO = "00:00:00"
 )
 
 type Cloudbrain struct {
@@ -174,7 +176,7 @@ func (task *Cloudbrain) ComputeAndSetDuration() {
 
 func ConvertDurationToStr(duration int64) string {
 	if duration == 0 {
-		return "00:00:00"
+		return DURATION_STR_ZERO
 	}
 	return util.AddZero(duration/3600) + ":" + util.AddZero(duration%3600/60) + ":" + util.AddZero(duration%60)
 }
@@ -1323,6 +1325,7 @@ func CloudbrainsVersionList(opts *CloudbrainsOptions) ([]*CloudbrainInfo, int, e
 }
 
 func CreateCloudbrain(cloudbrain *Cloudbrain) (err error) {
+	cloudbrain.TrainJobDuration = DURATION_STR_ZERO
 	if _, err = x.Insert(cloudbrain); err != nil {
 		return err
 	}
@@ -1467,6 +1470,17 @@ func GetCloudBrainUnStoppedJob() ([]*Cloudbrain, error) {
 		Find(&cloudbrains)
 }
 
+// GetStoppedJobWithNoDurationJob returns up to 100 tasks in a terminal status whose
+// train_job_duration is still null or empty.
+func GetStoppedJobWithNoDurationJob() ([]*Cloudbrain, error) {
+	cloudbrains := make([]*Cloudbrain, 0)
+	return cloudbrains, x.
+		In("status", ModelArtsTrainJobCompleted, ModelArtsTrainJobFailed, ModelArtsTrainJobKilled, ModelArtsStopped, JobStopped, JobFailed, JobSucceeded).
+		Where("train_job_duration is null or train_job_duration = '' ").
+		Limit(100).
+		Find(&cloudbrains)
+}
+
 func GetCloudbrainCountByUserID(userID int64, jobType string) (int, error) {
 	count, err := x.In("status", JobWaiting, JobRunning).And("job_type = ? and user_id = ? and type = ?", jobType, userID, TypeCloudBrainOne).Count(new(Cloudbrain))
 	return int(count), err
diff --git a/modules/cloudbrain/cloudbrain.go b/modules/cloudbrain/cloudbrain.go
index 54ac0c7acf..9aae447b03 100755
--- a/modules/cloudbrain/cloudbrain.go
+++ b/modules/cloudbrain/cloudbrain.go
@@ -158,10 +158,12 @@ func GenerateTask(ctx *context.Context, displayJobName, jobName, image, command,
 	if ResourceSpecs == nil {
 		json.Unmarshal([]byte(setting.ResourceSpecs), &ResourceSpecs)
 	}
+
 	for _, spec := range ResourceSpecs.ResourceSpec {
 		if resourceSpecId == spec.Id {
 			resourceSpec = spec
 		}
+
 	}
 
 	if resourceSpec == nil {
diff --git a/routers/private/internal.go b/routers/private/internal.go
index 0dd725ca3f..d80a706cc5 100755
--- a/routers/private/internal.go
+++ b/routers/private/internal.go
@@ -6,6 +6,7 @@
 package private
 
 import (
+	"code.gitea.io/gitea/routers/repo"
 	"strings"
 
 	"code.gitea.io/gitea/modules/log"
@@ -45,6 +46,7 @@ func RegisterRoutes(m *macaron.Macaron) {
 		m.Post("/tool/update_all_repo_commit_cnt", UpdateAllRepoCommitCnt)
 		m.Post("/tool/repo_stat/:date", RepoStatisticManually)
 		m.Post("/tool/update_repo_visit/:date", UpdateRepoVisit)
+		m.Post("/task/history_handle/duration", repo.HandleTaskWithNoDuration)
 	}, CheckInternalToken)
 }
 
diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go
index 0905efd54d..fc1dbfbd00 100755
--- a/routers/repo/cloudbrain.go
+++ b/routers/repo/cloudbrain.go
@@ -419,13 +419,16 @@ func cloudBrainShow(ctx *context.Context, tpName base.TplName) {
 		}
 	}
 	if task.TrainJobDuration == "" {
-		var duration int64
-		if task.Status == string(models.JobRunning) {
-			duration = time.Now().Unix() - int64(task.CreatedUnix)
-		} else {
-			duration = int64(task.UpdatedUnix) - int64(task.CreatedUnix)
+		if task.Duration == 0 {
+			var duration int64
+			if task.Status == string(models.JobRunning) {
+				duration = time.Now().Unix() - int64(task.CreatedUnix)
+			} else {
+				duration = int64(task.UpdatedUnix) - int64(task.CreatedUnix)
+			}
+			task.Duration = duration
 		}
-		task.TrainJobDuration = models.ConvertDurationToStr(duration)
+		task.TrainJobDuration = models.ConvertDurationToStr(task.Duration)
 	}
 	ctx.Data["duration"] = task.TrainJobDuration
 	ctx.Data["task"] = task
@@ -1062,6 +1065,178 @@ func SyncCloudbrainStatus() {
 	return
 }
 
+// HandleTaskWithNoDuration backfills the duration of stopped tasks that have none recorded.
+// It fetches the tasks in batches until a batch comes back short or nothing more can be
+// updated, and always answers the request with "success".
+func HandleTaskWithNoDuration(ctx *context.Context) {
+	log.Info("HandleTaskWithNoDuration start")
+	count := 0
+	for {
+		cloudBrains, err := models.GetStoppedJobWithNoDurationJob()
+		if err != nil {
+			log.Error("HandleTaskWithNoTrainJobDuration failed: %v", err)
+			break
+		}
+		if len(cloudBrains) == 0 {
+			log.Info("HandleTaskWithNoTrainJobDuration:no task need handle")
+			break
+		}
+		updated := handleNoDurationTask(cloudBrains)
+		count += len(cloudBrains)
+		if updated == 0 {
+			// Tasks that could not be updated still match the query, so the next
+			// round would fetch the very same batch again; stop instead of spinning.
+			log.Error("HandleTaskWithNoTrainJobDuration:no task updated in this batch, stop")
+			break
+		}
+		if len(cloudBrains) < 100 {
+			log.Info("HandleTaskWithNoTrainJobDuration:task less than 100")
+			break
+		}
+	}
+	log.Info("HandleTaskWithNoTrainJobDuration:count=%d", count)
+	ctx.JSON(200, "success")
+}
+
+// handleNoDurationTask fills in and stores the duration of every task in cloudBrains
+// and returns how many of them were stored successfully.
+func handleNoDurationTask(cloudBrains []*models.Cloudbrain) int {
+	updated := 0
+	for _, task := range cloudBrains {
+		log.Info("Handle job ,%+v", task)
+		ok := false
+		switch task.Type {
+		case models.TypeCloudBrainOne:
+			ok = fillCloudBrainOneDuration(task)
+		case models.TypeCloudBrainTwo:
+			switch task.JobType {
+			case string(models.JobTypeDebug):
+				ok = fillNotebookDuration(task)
+			case string(models.JobTypeTrain):
+				ok = fillTrainJobDuration(task)
+			default:
+				log.Error("task.JobType(%s) is error:%s", task.JobName, task.JobType)
+			}
+		default:
+			log.Error("task.Type(%s) is error:%d", task.JobName, task.Type)
+		}
+		if ok {
+			updated++
+		}
+	}
+	return updated
+}
+
+// fillCloudBrainOneDuration derives the start and end time of a cloudbrain one task from
+// the status reported by the remote job and stores the resulting duration. Whenever the
+// remote data is unavailable or unusable it falls back to updateDefaultDuration.
+// It reports whether the task was stored.
+func fillCloudBrainOneDuration(task *models.Cloudbrain) bool {
+	result, err := cloudbrain.GetJob(task.JobID)
+	if err != nil {
+		log.Error("GetJob(%s) failed:%v", task.JobName, err)
+		return updateDefaultDuration(task)
+	}
+	if result == nil || result.Msg != "success" {
+		return updateDefaultDuration(task)
+	}
+	jobRes, err := models.ConvertToJobResultPayload(result.Payload)
+	if err != nil || len(jobRes.TaskRoles) == 0 {
+		return updateDefaultDuration(task)
+	}
+	// Use the comma-ok form: a missing or malformed role entry must not panic the request.
+	subTask, ok := jobRes.TaskRoles[cloudbrain.SubTaskName].(map[string]interface{})
+	if !ok {
+		log.Error("task role of job(%s) is missing or malformed", task.JobName)
+		return updateDefaultDuration(task)
+	}
+	taskRes, err := models.ConvertToTaskPod(subTask)
+	if err != nil || len(taskRes.TaskStatuses) == 0 {
+		return updateDefaultDuration(task)
+	}
+
+	task.Status = taskRes.TaskStatuses[0].State
+	startTime := taskRes.TaskStatuses[0].StartAt.Unix()
+	endTime := taskRes.TaskStatuses[0].FinishedAt.Unix()
+	log.Info("task startTime = %v endTime= %v ,jobId=%d", startTime, endTime, task.ID)
+	task.StartTime = task.CreatedUnix
+	if startTime > 0 {
+		task.StartTime = timeutil.TimeStamp(startTime)
+	}
+	task.EndTime = task.UpdatedUnix
+	if endTime > 0 {
+		task.EndTime = timeutil.TimeStamp(endTime)
+	}
+	if task.EndTime < task.StartTime {
+		log.Info("endTime[%v] is less than starTime[%v],jobId=%d", task.EndTime, task.StartTime, task.ID)
+		task.StartTime, task.EndTime = task.EndTime, task.StartTime
+	}
+	return saveTaskDuration(task)
+}
+
+// fillNotebookDuration derives the start and end time of a cloudbrain two debug task from
+// the lease of the remote notebook and stores the resulting duration.
+// It reports whether the task was stored.
+func fillNotebookDuration(task *models.Cloudbrain) bool {
+	result, err := modelarts.GetNotebook2(task.JobID)
+	if err != nil {
+		log.Error("GetNotebook2(%s) failed:%v", task.JobName, err)
+		return updateDefaultDuration(task)
+	}
+	if result == nil {
+		return updateDefaultDuration(task)
+	}
+	task.Status = result.Status
+	// NOTE(review): Duration is divided by 1000 (presumably ms -> s) while CreateTime is
+	// used as is; confirm that CreateTime is already expressed in seconds.
+	startTime := result.Lease.CreateTime
+	duration := result.Lease.Duration / 1000
+	if startTime > 0 {
+		task.StartTime = timeutil.TimeStamp(startTime)
+		task.EndTime = task.StartTime.Add(duration)
+	}
+	return saveTaskDuration(task)
+}
+
+// fillTrainJobDuration derives the start and end time of a cloudbrain two train task from
+// the remote train job version and stores the resulting duration.
+// It reports whether the task was stored.
+func fillTrainJobDuration(task *models.Cloudbrain) bool {
+	result, err := modelarts.GetTrainJob(task.JobID, strconv.FormatInt(task.VersionID, 10))
+	if err != nil {
+		log.Error("GetTrainJob(%s) failed:%v", task.JobName, err)
+		return updateDefaultDuration(task)
+	}
+	if result == nil {
+		return updateDefaultDuration(task)
+	}
+	if startTime := result.StartTime / 1000; startTime > 0 {
+		task.StartTime = timeutil.TimeStamp(startTime)
+		task.EndTime = task.StartTime.Add(result.Duration / 1000)
+	}
+	return saveTaskDuration(task)
+}
+
+// updateDefaultDuration uses the creation and last update time of task as its start and
+// end time and stores the resulting duration. It reports whether the task was stored.
+func updateDefaultDuration(task *models.Cloudbrain) bool {
+	log.Info("updateDefaultDuration: taskId=%d", task.ID)
+	task.StartTime = task.CreatedUnix
+	task.EndTime = task.UpdatedUnix
+	return saveTaskDuration(task)
+}
+
+// saveTaskDuration recomputes the duration of task and stores the task.
+// It logs a failed update and reports whether the task was stored.
+func saveTaskDuration(task *models.Cloudbrain) bool {
+	task.ComputeAndSetDuration()
+	if err := models.UpdateJob(task); err != nil {
+		log.Error("UpdateJob(%s) failed:%v", task.JobName, err)
+		return false
+	}
+	return true
+}
+
 func CloudBrainBenchmarkIndex(ctx *context.Context) {
 	MustEnableCloudbrain(ctx)
 	repo := ctx.Repo.Repository
@@ -1090,13 +1265,16 @@ func CloudBrainBenchmarkIndex(ctx *context.Context) {
 		ciTasks[i].CanDel = cloudbrain.CanDeleteJob(ctx, &task.Cloudbrain)
 		ciTasks[i].Cloudbrain.ComputeResource = task.ComputeResource
 		if ciTasks[i].TrainJobDuration == "" {
-			var duration int64
-			if task.Status == string(models.JobRunning) {
-				duration = time.Now().Unix() - int64(task.Cloudbrain.CreatedUnix)
-			} else {
-				duration = int64(task.Cloudbrain.UpdatedUnix) - int64(task.Cloudbrain.CreatedUnix)
+			if ciTasks[i].Duration == 0 {
+				var duration int64
+				if task.Status == string(models.JobRunning) {
+					duration = time.Now().Unix() - int64(task.Cloudbrain.CreatedUnix)
+				} else {
+					duration = int64(task.Cloudbrain.UpdatedUnix) - int64(task.Cloudbrain.CreatedUnix)
+				}
+				ciTasks[i].Duration = duration
 			}
-			ciTasks[i].TrainJobDuration = models.ConvertDurationToStr(duration)
+			ciTasks[i].TrainJobDuration = models.ConvertDurationToStr(ciTasks[i].Duration)
 		}
 
 		ciTasks[i].BenchmarkTypeName = ""