#2700 针对modelarts报错502的处理优化

Merged
zouap merged 11 commits from fix502 into V20220815 1 year ago
  1. +55
    -5
      models/cloudbrain.go
  2. +68
    -0
      models/cloudbrain_temp.go
  3. +1
    -0
      models/models.go
  4. +4
    -4
      modules/cloudbrain/cloudbrain.go
  5. +13
    -0
      modules/cron/tasks_basic.go
  6. +524
    -108
      modules/modelarts/modelarts.go
  7. +185
    -31
      modules/modelarts/resty.go
  8. +2
    -0
      modules/setting/setting.go
  9. +4
    -86
      routers/api/v1/repo/modelarts.go
  10. +6
    -52
      routers/repo/cloudbrain.go
  11. +2
    -2
      routers/repo/grampus.go
  12. +129
    -197
      routers/repo/modelarts.go
  13. +2
    -1
      routers/routes/routes.go

+ 55
- 5
models/cloudbrain.go View File

@@ -8,14 +8,13 @@ import (
"strings"
"time"

"code.gitea.io/gitea/modules/util"

"xorm.io/builder"
"xorm.io/xorm"

"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
)

type CloudbrainStatus string
@@ -31,9 +30,9 @@ const (
)

const (
NPUResource = "NPU"
GPUResource = "CPU/GPU"
AllResource = "all"
NPUResource = "NPU"
GPUResource = "CPU/GPU"
AllResource = "all"

//notebook storage category
EVSCategory = "EVS"
@@ -1262,6 +1261,52 @@ type LogFile struct {
Name string
}

// JobList is a single element of the "jobs" array carried by
// GetTrainJobListResult.
type JobList struct {
	JobName      string `json:"job_name"`
	JobID        int64  `json:"job_id"`
	VersionID    int64  `json:"version_id"`
	VersionCount int64  `json:"version_count"`
	Description  string `json:"job_desc"`
	// IntStatus is the numeric status code as delivered in the "status" field.
	IntStatus int `json:"status"`
}

// GetTrainJobListResult is the decoded body of a train-job list query.
// It embeds ErrorResult so that error fields in the same payload are decoded
// alongside the listing.
type GetTrainJobListResult struct {
	ErrorResult
	// JobTotalCount is the total number of user-created jobs matched by the query.
	JobTotalCount int `json:"job_total_count"`
	// JobCountLimit is the number of training jobs the user may still create.
	JobCountLimit int `json:"job_count_limit"`
	// Quotas is the upper limit on concurrently running training jobs.
	Quotas int `json:"quotas"`
	// JobList holds the entries of the "jobs" array.
	JobList []JobList `json:"jobs"`
}

// JobVersionList is a single element of the "versions" array carried by
// GetTrainJobVersionListResult.
type JobVersionList struct {
	VersionName string `json:"version_name"`
	VersionID   int64  `json:"version_id"`
	// IntStatus is the numeric status code as delivered in the "status" field.
	IntStatus int `json:"status"`
}

// GetTrainJobVersionListResult is the decoded body of a query for the
// versions of one train job. It embeds ErrorResult so that error fields in the
// same payload are decoded alongside the listing.
type GetTrainJobVersionListResult struct {
	ErrorResult
	JobID        int64  `json:"job_id"`
	JobName      string `json:"job_name"`
	JobDesc      string `json:"job_desc"`
	VersionCount int64  `json:"version_count"`
	// JobVersionList holds the entries of the "versions" array.
	JobVersionList []JobVersionList `json:"versions"`
}

// NotebookList is a single element of the "data" array carried by
// GetNotebookListResult.
type NotebookList struct {
	// JobName is decoded from the "name" field.
	JobName string `json:"name"`
	// JobID is decoded from the "id" field.
	JobID  string `json:"id"`
	Status string `json:"status"`
}

// GetNotebookListResult is the decoded body of a paged notebook list query.
type GetNotebookListResult struct {
	// TotalCount is the total number of records.
	TotalCount int64 `json:"total"`
	// CurrentPage is the current page number.
	CurrentPage int `json:"current"`
	// TotalPages is the total number of pages.
	TotalPages int `json:"pages"`
	// Size is the number of records per page.
	Size int `json:"size"`
	// NotebookList holds the entries of the "data" array.
	NotebookList []NotebookList `json:"data"`
}

//Grampus
type GrampusResult struct {
ErrorCode int `json:"errorCode"`
@@ -2289,3 +2334,8 @@ func GetCloudbrainByIDs(ids []int64) ([]*Cloudbrain, error) {
In("id", ids).
Find(&cloudbrains)
}

// GetCloudbrainCountByJobName counts the Cloudbrain rows whose job_name,
// job_type and type columns equal the supplied values. The count is narrowed
// to int; the query error, if any, is returned unchanged.
func GetCloudbrainCountByJobName(jobName, jobType string, typeCloudbrain int) (int, error) {
	total, err := x.
		Where("job_name = ? and job_type= ? and type = ?", jobName, jobType, typeCloudbrain).
		Count(new(Cloudbrain))
	return int(total), err
}

+ 68
- 0
models/cloudbrain_temp.go View File

@@ -0,0 +1,68 @@
package models

import (
"time"

"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
)

// Placeholder values stored in a CloudbrainTemp row while the real job ID,
// version ID and status are not yet known. All three share the same literal.
const (
	// TempJobId is the placeholder job ID.
	TempJobId = "TEMP"
	// TempVersionId is the placeholder version ID.
	TempVersionId = TempJobId
	// TempJobStatus is the placeholder status.
	TempJobStatus = TempJobId
)

// CloudbrainTemp is the xorm model for a temporary job record. JobID,
// VersionID and Status default to the 'TEMP' placeholder at the column level;
// QueryTimes defaults to 0.
type CloudbrainTemp struct {
	ID        int64  `xorm:"pk autoincr"`
	JobID     string `xorm:"NOT NULL DEFAULT 'TEMP'"`
	VersionID string `xorm:"NOT NULL DEFAULT 'TEMP'"`
	JobName   string `xorm:"NOT NULL "`
	Type      int    `xorm:"NOT NULL "`
	JobType   string `xorm:"INDEX NOT NULL DEFAULT 'DEBUG'"`
	Status    string `xorm:"INDEX NOT NULL DEFAULT 'TEMP'"`
	// QueryTimes is compared against setting.MaxTempQueryTimes when selecting
	// rows that still need processing.
	QueryTimes  int                `xorm:"INDEX NOT NULL DEFAULT 0"`
	CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"`
	UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated"`
	DeletedAt   time.Time          `xorm:"deleted"`
}

// InsertCloudbrainTemp inserts temp through the package engine and returns
// the insert error, or nil on success.
func InsertCloudbrainTemp(temp *CloudbrainTemp) (err error) {
	_, err = x.Insert(temp)
	return err
}

// getCloudBrainTemp loads one row using the fields set on temp as the query
// condition. It returns ErrJobNotExist when no row matches, and the engine
// error when the lookup itself fails.
func getCloudBrainTemp(temp *CloudbrainTemp) (*CloudbrainTemp, error) {
	found, err := x.Get(temp)
	if err != nil {
		return nil, err
	}
	if !found {
		return nil, ErrJobNotExist{}
	}
	return temp, nil
}

// GetCloudBrainTempJobs returns up to 100 temporary job records whose status
// is the TEMP placeholder, ModelArtsStopping or ModelArtsTrainJobKilling and
// whose query_times is still below setting.MaxTempQueryTimes.
//
// The query runs in its own statement before the return: evaluating `jobs`
// and `Find(&jobs)` inside one return expression relies on an evaluation order
// that the Go specification leaves unspecified.
func GetCloudBrainTempJobs() ([]*CloudbrainTemp, error) {
	jobs := make([]*CloudbrainTemp, 0, 10)
	err := x.In("status", TempJobStatus, string(ModelArtsStopping), string(ModelArtsTrainJobKilling)).
		And("query_times < ?", setting.MaxTempQueryTimes).
		Limit(100).
		Find(&jobs)
	return jobs, err
}

// DeleteCloudbrainTemp deletes temp using the package-level engine.
func DeleteCloudbrainTemp(temp *CloudbrainTemp) error {
	err := deleteCloudbrainTemp(x, temp)
	return err
}

// deleteCloudbrainTemp deletes the row identified by temp.ID through the
// supplied engine and returns the engine error, if any.
func deleteCloudbrainTemp(e Engine, temp *CloudbrainTemp) error {
	if _, err := e.ID(temp.ID).Delete(temp); err != nil {
		return err
	}
	return nil
}

// UpdateCloudbrainTemp writes every column of temp (AllCols) to the row
// identified by temp.ID and returns the engine error, if any.
func UpdateCloudbrainTemp(temp *CloudbrainTemp) error {
	if _, err := x.ID(temp.ID).AllCols().Update(temp); err != nil {
		return err
	}
	return nil
}

+ 1
- 0
models/models.go View File

@@ -145,6 +145,7 @@ func init() {
new(OrgStatistic),
new(SearchRecord),
new(AiModelConvert),
new(CloudbrainTemp),
)

tablesStatistic = append(tablesStatistic,


+ 4
- 4
modules/cloudbrain/cloudbrain.go View File

@@ -142,8 +142,8 @@ func isAdminOrImageCreater(ctx *context.Context, image *models.Image, err error)

func AdminOrOwnerOrJobCreaterRight(ctx *context.Context) {

var ID = ctx.Params(":id")
job, err := models.GetCloudbrainByID(ID)
var id = ctx.Params(":id")
job, err := models.GetCloudbrainByID(id)
if err != nil {
log.Error("GetCloudbrainByID failed:%v", err.Error())
ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
@@ -158,8 +158,8 @@ func AdminOrOwnerOrJobCreaterRight(ctx *context.Context) {

func AdminOrJobCreaterRight(ctx *context.Context) {

var ID = ctx.Params(":id")
job, err := models.GetCloudbrainByID(ID)
var id = ctx.Params(":id")
job, err := models.GetCloudbrainByID(id)
if err != nil {
log.Error("GetCloudbrainByID failed:%v", err.Error())
ctx.NotFound(ctx.Req.URL.RequestURI(), nil)


+ 13
- 0
modules/cron/tasks_basic.go View File

@@ -5,6 +5,7 @@
package cron

import (
"code.gitea.io/gitea/modules/modelarts"
"context"
"time"

@@ -207,6 +208,17 @@ func registerSyncCloudbrainStatus() {
})
}

func registerSyncModelArtsTempJobs() {
RegisterTaskFatal("sync_model_arts_temp_jobs", &BaseConfig{
Enabled: true,
RunAtStart: false,
Schedule: "@every 1m",
}, func(ctx context.Context, _ *models.User, _ Config) error {
modelarts.SyncTempStatusJob()
return nil
})
}

func initBasicTasks() {
registerUpdateMirrorTask()
registerRepoHealthCheck()
@@ -227,4 +239,5 @@ func initBasicTasks() {

registerSyncCloudbrainStatus()
registerHandleOrgStatistic()
registerSyncModelArtsTempJobs()
}

+ 524
- 108
modules/modelarts/modelarts.go View File

@@ -6,8 +6,7 @@ import (
"fmt"
"path"
"strconv"

"code.gitea.io/gitea/modules/timeutil"
"strings"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/context"
@@ -15,6 +14,7 @@ import (
"code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
"code.gitea.io/gitea/modules/timeutil"
)

const (
@@ -59,7 +59,7 @@ const (
PerPage = 10
IsLatestVersion = "1"
NotLatestVersion = "0"
VersionCount = 1
VersionCountOne = 1

SortByCreateTime = "create_time"
ConfigTypeCustom = "custom"
@@ -284,9 +284,24 @@ func GenerateNotebook2(ctx *context.Context, displayJobName, jobName, uuid, desc
})
if err != nil {
log.Error("createNotebook2 failed: %v", err.Error())
if strings.HasPrefix(err.Error(), UnknownErrorPrefix) {
log.Info("(%s)unknown error, set temp status", displayJobName)
errTemp := models.InsertCloudbrainTemp(&models.CloudbrainTemp{
JobID: models.TempJobId,
VersionID: models.TempVersionId,
Status: models.TempJobStatus,
Type: models.TypeCloudBrainTwo,
JobName: jobName,
JobType: string(models.JobTypeDebug),
})
if errTemp != nil {
log.Error("InsertCloudbrainTemp failed: %v", errTemp.Error())
return errTemp
}
}
return err
}
err = models.CreateCloudbrain(&models.Cloudbrain{
task := &models.Cloudbrain{
Status: jobResult.Status,
UserID: ctx.User.ID,
RepoID: ctx.Repo.Repository.ID,
@@ -302,16 +317,13 @@ func GenerateNotebook2(ctx *context.Context, displayJobName, jobName, uuid, desc
Description: description,
CreatedUnix: createTime,
UpdatedUnix: createTime,
})

if err != nil {
return err
}
task, err := models.GetCloudbrainByName(jobName)

err = models.CreateCloudbrain(task)
if err != nil {
log.Error("GetCloudbrainByName failed: %v", err.Error())
return err
}

stringId := strconv.FormatInt(task.ID, 10)
notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, stringId, displayJobName, models.ActionCreateDebugNPUTask)
return nil
@@ -364,7 +376,22 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error
})
}
if createErr != nil {
log.Error("CreateJob failed: %v", createErr.Error())
log.Error("createTrainJob failed: %v", createErr.Error())
if strings.HasPrefix(createErr.Error(), UnknownErrorPrefix) {
log.Info("(%s)unknown error, set temp status", req.DisplayJobName)
errTemp := models.InsertCloudbrainTemp(&models.CloudbrainTemp{
JobID: models.TempJobId,
VersionID: models.TempVersionId,
Status: models.TempJobStatus,
Type: models.TypeCloudBrainTwo,
JobName: req.JobName,
JobType: string(models.JobTypeTrain),
})
if errTemp != nil {
log.Error("InsertCloudbrainTemp failed: %v", errTemp.Error())
return errTemp
}
}
return createErr
}
jobId := strconv.FormatInt(jobResult.JobID, 10)
@@ -438,7 +465,7 @@ func GenerateTrainJobVersion(ctx *context.Context, req *GenerateTrainJobReq, job
createTime := timeutil.TimeStampNow()
var jobResult *models.CreateTrainJobResult
var createErr error
log.Info(" req.EngineID =" + fmt.Sprint(req.EngineID))
if req.EngineID < 0 {
jobResult, createErr = createTrainJobVersionUserImage(models.CreateTrainJobVersionUserImageParams{
Description: req.Description,
@@ -480,7 +507,22 @@ func GenerateTrainJobVersion(ctx *context.Context, req *GenerateTrainJobReq, job
}, jobId)
}
if createErr != nil {
log.Error("CreateJob failed: %v", createErr.Error())
log.Error("createTrainJobVersion failed: %v", createErr.Error())
if strings.HasPrefix(createErr.Error(), UnknownErrorPrefix) {
log.Info("(%s)unknown error, set temp status", req.DisplayJobName)
errTemp := models.InsertCloudbrainTemp(&models.CloudbrainTemp{
JobID: jobId,
VersionID: models.TempVersionId,
Status: models.TempJobStatus,
Type: models.TypeCloudBrainTwo,
JobName: req.JobName,
JobType: string(models.JobTypeTrain),
})
if errTemp != nil {
log.Error("InsertCloudbrainTemp failed: %v", errTemp.Error())
return errTemp
}
}
return createErr
}

@@ -540,7 +582,7 @@ func GenerateTrainJobVersion(ctx *context.Context, req *GenerateTrainJobReq, job
}

//将训练任务的上一版本的isLatestVersion设置为"0"
createErr = models.SetVersionCountAndLatestVersion(strconv.FormatInt(jobResult.JobID, 10), VersionTaskList[0].VersionName, VersionCount, NotLatestVersion, TotalVersionCount)
createErr = models.SetVersionCountAndLatestVersion(strconv.FormatInt(jobResult.JobID, 10), VersionTaskList[0].VersionName, VersionCountOne, NotLatestVersion, TotalVersionCount)
if createErr != nil {
ctx.ServerError("Update IsLatestVersion failed", createErr)
return createErr
@@ -549,99 +591,6 @@ func GenerateTrainJobVersion(ctx *context.Context, req *GenerateTrainJobReq, job
return createErr
}

// GenerateTrainJobVersionByUserImage calls createTrainJobUserImage with
// CreateVersion set to true, using the request's user image and command, then
// passes a new models.Cloudbrain record built from the result to
// models.CreateCloudbrain and finally calls
// models.SetVersionCountAndLatestVersion for the entry at VersionTaskList[0].
//
// Any error from these steps is returned; failures of CloudbrainsVersionList
// and SetVersionCountAndLatestVersion are additionally reported through
// ctx.ServerError.
//
// NOTE(review): the jobId parameter is not referenced in this body; the job
// ID used throughout comes from jobResult.JobID.
func GenerateTrainJobVersionByUserImage(ctx *context.Context, req *GenerateTrainJobReq, jobId string) (err error) {
createTime := timeutil.TimeStampNow()
jobResult, err := createTrainJobUserImage(models.CreateUserImageTrainJobParams{
JobName: req.JobName,
Description: req.Description,
Config: models.UserImageConfig{
WorkServerNum: req.WorkServerNumber,
AppUrl: req.CodeObsPath,
BootFileUrl: req.BootFileUrl,
DataUrl: req.DataUrl,
TrainUrl: req.TrainUrl,
LogUrl: req.LogUrl,
PoolID: req.PoolID,
CreateVersion: true,
Flavor: models.Flavor{
Code: req.FlavorCode,
},
Parameter: req.Parameters,
UserImageUrl: req.UserImageUrl,
UserCommand: req.UserCommand,
},
})
if err != nil {
log.Error("CreateJob failed: %v", err.Error())
return err
}

// List the existing version records of this train job in the current repository.
var jobTypes []string
jobTypes = append(jobTypes, string(models.JobTypeTrain))
repo := ctx.Repo.Repository
VersionTaskList, VersionListCount, err := models.CloudbrainsVersionList(&models.CloudbrainsOptions{
RepoID: repo.ID,
Type: models.TypeCloudBrainTwo,
JobTypes: jobTypes,
JobID: strconv.FormatInt(jobResult.JobID, 10),
})
if err != nil {
ctx.ServerError("Cloudbrain", err)
return err
}
// Set isLatestVersion of the current version to "1" and update the task
// counts; the counts cover the current number of versions (VersionCount) and
// the total number of versions ever created (TotalVersionCount).
// NOTE(review): VersionTaskList[0] is indexed below without a length check —
// confirm CloudbrainsVersionList cannot return an empty list here.

err = models.CreateCloudbrain(&models.Cloudbrain{
Status: TransTrainJobStatus(jobResult.Status),
UserID: ctx.User.ID,
RepoID: ctx.Repo.Repository.ID,
JobID: strconv.FormatInt(jobResult.JobID, 10),
JobName: req.JobName,
DisplayJobName: req.DisplayJobName,
JobType: string(models.JobTypeTrain),
Type: models.TypeCloudBrainTwo,
VersionID: jobResult.VersionID,
VersionName: jobResult.VersionName,
Uuid: req.Uuid,
DatasetName: req.DatasetName,
CommitID: req.CommitID,
IsLatestVersion: req.IsLatestVersion,
PreVersionName: req.PreVersionName,
ComputeResource: models.NPUResource,
EngineID: MORDELART_USER_IMAGE_ENGINE_ID,
Image: req.UserImageUrl,
TrainUrl: req.TrainUrl,
BranchName: req.BranchName,
Parameters: req.Params,
BootFile: req.BootFile,
DataUrl: req.DataUrl,
LogUrl: req.LogUrl,
PreVersionId: req.PreVersionId,
FlavorCode: req.FlavorCode,
Description: req.Description,
WorkServerNumber: req.WorkServerNumber,
FlavorName: req.FlavorName,
EngineName: req.EngineName,
TotalVersionCount: VersionTaskList[0].TotalVersionCount + 1,
VersionCount: VersionListCount + 1,
CreatedUnix: createTime,
UpdatedUnix: createTime,
})
if err != nil {
log.Error("CreateCloudbrain(%s) failed:%v", req.JobName, err.Error())
return err
}

// Set isLatestVersion of the training job's previous version to "0".
err = models.SetVersionCountAndLatestVersion(strconv.FormatInt(jobResult.JobID, 10), VersionTaskList[0].VersionName, VersionCount, NotLatestVersion, TotalVersionCount)
if err != nil {
ctx.ServerError("Update IsLatestVersion failed", err)
return err
}

return err
}

func TransTrainJobStatus(status int) string {
switch status {
case 0:
@@ -722,7 +671,22 @@ func GenerateInferenceJob(ctx *context.Context, req *GenerateInferenceJobReq) (e
},
})
if err != nil {
log.Error("CreateJob failed: %v", err.Error())
log.Error("createInferenceJob failed: %v", err.Error())
if strings.HasPrefix(err.Error(), UnknownErrorPrefix) {
log.Info("(%s)unknown error, set temp status", req.DisplayJobName)
err = models.InsertCloudbrainTemp(&models.CloudbrainTemp{
JobID: models.TempJobId,
VersionID: models.TempVersionId,
Status: models.TempJobStatus,
Type: models.TypeCloudBrainTwo,
JobName: req.JobName,
JobType: string(models.JobTypeInference),
})
if err != nil {
log.Error("InsertCloudbrainTemp failed: %v", err.Error())
return err
}
}
return err
}

@@ -807,3 +771,455 @@ func InitSpecialPool() {
json.Unmarshal([]byte(setting.ModelArtsSpecialPools), &SpecialPools)
}
}

// HandleTrainJobInfo fetches the train job identified by task.JobID and
// task.VersionID via GetTrainJob, copies status and timing information onto
// task, and saves it with models.UpdateJob. A status change is announced
// through notification.NotifyChangeCloudbrainStatus before the save.
//
// It returns the GetTrainJob or UpdateJob error, or nil (also when GetTrainJob
// yields a nil result, in which case task is left untouched).
func HandleTrainJobInfo(task *models.Cloudbrain) error {
	result, err := GetTrainJob(task.JobID, strconv.FormatInt(task.VersionID, 10))
	if err != nil {
		log.Error("GetTrainJob(%s) failed:%v", task.DisplayJobName, err)
		return err
	}
	if result == nil {
		return nil
	}

	previousStatus := task.Status
	task.Status = TransTrainJobStatus(result.IntStatus)
	// presumably milliseconds converted to seconds — TODO confirm units.
	task.Duration = result.Duration / 1000
	// This value is superseded a few lines below by the string derived from
	// task.Duration.
	task.TrainJobDuration = result.TrainJobDuration

	if task.StartTime == 0 && result.StartTime > 0 {
		task.StartTime = timeutil.TimeStamp(result.StartTime / 1000)
	}
	task.TrainJobDuration = models.ConvertDurationToStr(task.Duration)

	// Derive the end time from start + duration once the job is terminal.
	if task.EndTime == 0 && models.IsTrainJobTerminal(task.Status) && task.StartTime > 0 {
		task.EndTime = task.StartTime.Add(task.Duration)
	}
	task.CorrectCreateUnix()

	if previousStatus != task.Status {
		notification.NotifyChangeCloudbrainStatus(task, previousStatus)
	}

	if err = models.UpdateJob(task); err != nil {
		log.Error("UpdateJob(%s) failed:%v", task.JobName, err)
		return err
	}
	return nil
}

// HandleNotebookInfo fetches the notebook identified by task.JobID via
// GetNotebook2, copies its status, start time and flavor onto task, and saves
// it with models.UpdateJob. A status change is announced through
// notification.NotifyChangeCloudbrainStatus before the save.
//
// It returns the GetNotebook2 or UpdateJob error, or nil (also when
// GetNotebook2 yields a nil result, in which case task is left untouched).
func HandleNotebookInfo(task *models.Cloudbrain) error {
	notebook, err := GetNotebook2(task.JobID)
	if err != nil {
		log.Error("GetNotebook2(%s) failed:%v", task.DisplayJobName, err)
		return err
	}
	if notebook == nil {
		return nil
	}

	previousStatus := task.Status
	task.Status = notebook.Status

	if task.StartTime == 0 && notebook.Lease.UpdateTime > 0 {
		task.StartTime = timeutil.TimeStamp(notebook.Lease.UpdateTime / 1000)
	}
	// A terminal debug job without an end time is stamped with "now".
	if task.EndTime == 0 && models.IsModelArtsDebugJobTerminal(task.Status) {
		task.EndTime = timeutil.TimeStampNow()
	}
	task.CorrectCreateUnix()
	task.ComputeAndSetDuration()

	if previousStatus != task.Status {
		notification.NotifyChangeCloudbrainStatus(task, previousStatus)
	}
	// Only fill in the flavor when the task does not have one yet.
	if task.FlavorCode == "" {
		task.FlavorCode = notebook.Flavor
	}

	if err = models.UpdateJob(task); err != nil {
		log.Error("UpdateJob(%s) failed:%v", task.DisplayJobName, err)
		return err
	}
	return nil
}

// SyncTempStatusJob processes the temporary job records returned by
// models.GetCloudBrainTempJobs. Only records of type models.TypeCloudBrainTwo
// are handled:
//
//   - debug jobs go to handleNotebook;
//   - train and inference jobs go to handleTrainJob when no Cloudbrain record
//     is found for the record's JobID, and to handleTrainJobMultiVersion
//     otherwise.
//
// A failure while handling one record is logged and the remaining records are
// still processed. Previously the loop stopped at the first failure, so one
// persistently failing record could keep every record listed after it from
// ever being handled.
func SyncTempStatusJob() {
	jobs, err := models.GetCloudBrainTempJobs()
	if err != nil {
		log.Error("GetCloudBrainTempJobs failed:%v", err.Error())
		return
	}

	for _, temp := range jobs {
		log.Info("start to handle record: %s", temp.JobName)
		if temp.Type != models.TypeCloudBrainTwo {
			continue
		}

		switch temp.JobType {
		case string(models.JobTypeDebug):
			if err := handleNotebook(temp); err != nil {
				log.Error("handleNotebook falied:%v", err)
			}
		case string(models.JobTypeTrain), string(models.JobTypeInference):
			if _, err := models.GetCloudbrainByJobID(temp.JobID); err != nil {
				// one version
				if err := handleTrainJob(temp); err != nil {
					log.Error("handleTrainJob falied:%v", err)
				}
			} else {
				// multi version
				if err := handleTrainJobMultiVersion(temp); err != nil {
					log.Error("handleTrainJobMultiVersion falied:%v", err)
				}
			}
		}
	}
}

// handleNotebook advances one temporary debug-job record according to its
// status:
//
//   - TEMP placeholder: delegate to handleTempNotebook;
//   - ModelArtsStopping: re-read the notebook status via GetNotebook2 and, once
//     it is ModelArtsStopped, save that status, delete the notebook with
//     DelNotebook2 and save the record as ModelArtsDeleted.
//
// Any other status is a no-op. The first error encountered is logged and
// returned.
func handleNotebook(temp *models.CloudbrainTemp) error {
	switch temp.Status {
	case models.TempJobStatus:
		if err := handleTempNotebook(temp); err != nil {
			log.Error("handleTempNotebook failed:%v", err)
			return err
		}

	case string(models.ModelArtsStopping):
		notebook, err := GetNotebook2(temp.JobID)
		if err != nil {
			log.Error("GetNotebook2 failed:%v", err)
			return err
		}

		temp.Status = notebook.Status
		if temp.Status != string(models.ModelArtsStopped) {
			// Not stopped yet; nothing is persisted on this pass.
			return nil
		}

		if err = models.UpdateCloudbrainTemp(temp); err != nil {
			log.Error("UpdateCloudbrainTemp failed:%v", err)
			return err
		}
		if _, err = DelNotebook2(temp.JobID); err != nil {
			log.Error("DelNotebook2 failed:%v", err)
			return err
		}

		temp.Status = string(models.ModelArtsDeleted)
		if err = models.UpdateCloudbrainTemp(temp); err != nil {
			log.Error("UpdateCloudbrainTemp failed:%v", err)
			return err
		}
	}

	return nil
}

// handleTempNotebook tries to match a TEMP-status debug record to a notebook
// returned by GetNotebookList and, when one is found, starts cleaning it up
// (see reconcileTempNotebook). When no notebook was found and the record has
// reached setting.MaxTempQueryTimes, the record is saved as
// ModelArtsTrainJobFailed.
//
// Only a failure of that final save is returned. Lookup and clean-up problems
// are logged and left for a later pass, which matches the behaviour this
// function always had (the earlier implementation assigned those errors to a
// shadowed variable that was never returned).
func handleTempNotebook(temp *models.CloudbrainTemp) error {
	found := reconcileTempNotebook(temp)
	if found || temp.QueryTimes < setting.MaxTempQueryTimes {
		return nil
	}

	log.Info("reach MaxTempQueryTimes, set the job failed")
	temp.Status = string(models.ModelArtsTrainJobFailed)
	if err := models.UpdateCloudbrainTemp(temp); err != nil {
		log.Error("UpdateCloudbrainTemp(%s) failed:%v", temp.JobName, err)
		return err
	}
	return nil
}

// reconcileTempNotebook performs one lookup pass for temp and reports whether
// a matching notebook was found. Every failure is logged here; none is
// returned.
func reconcileTempNotebook(temp *models.CloudbrainTemp) bool {
	result, err := GetNotebookList(1000, 0, "createTime", "DESC", temp.JobName)
	if err != nil {
		log.Error("GetNotebookList failed:%v", err)
		return false
	}

	// Count this attempt even if the lookup below finds nothing.
	temp.QueryTimes++
	if err = models.UpdateCloudbrainTemp(temp); err != nil {
		log.Error("UpdateCloudbrainTemp failed:%v", err)
	}

	found := false
	if result != nil {
		// A record still carrying the placeholder job ID is a new notebook and
		// accepts any status; otherwise it is a restart and a stopped
		// notebook of the same name is skipped.
		isNew := temp.JobID == models.TempJobId
		for _, notebook := range result.NotebookList {
			if notebook.JobName != temp.JobName {
				continue
			}
			if !isNew && notebook.Status == string(models.ModelArtsStopped) {
				continue
			}
			found = true
			temp.Status = notebook.Status
			temp.JobID = notebook.JobID
			break
		}
	}
	if !found {
		log.Error("can not find the record(%s) till now", temp.JobName)
		return false
	}

	log.Info("find the record(%s), status(%s)", temp.JobName, temp.Status)
	if temp.Status == string(models.ModelArtsCreateFailed) {
		// Persist the discovered job ID and status before deleting the notebook.
		if err = models.UpdateCloudbrainTemp(temp); err != nil {
			log.Error("UpdateCloudbrainTemp failed:%v", err)
			return true
		}
		if _, err = DelNotebook2(temp.JobID); err != nil {
			log.Error("DelNotebook2(%s) failed:%v", temp.JobName, err)
			return true
		}
		temp.Status = string(models.ModelArtsDeleted)
	} else {
		if _, err = ManageNotebook2(temp.JobID, models.NotebookAction{Action: models.ActionStop}); err != nil {
			log.Error("ManageNotebook2(%s) failed:%v", temp.JobName, err)
			return true
		}
		temp.Status = string(models.ModelArtsStopping)
	}

	// This save was previously unchecked; a failure is now at least logged.
	if err = models.UpdateCloudbrainTemp(temp); err != nil {
		log.Error("UpdateCloudbrainTemp(%s) failed:%v", temp.JobName, err)
	}
	return true
}

// handleTrainJob advances one temporary train/inference record according to
// its status:
//
//   - TEMP placeholder: delegate to handleTempTrainJob;
//   - ModelArtsTrainJobKilling: re-read the job status via GetTrainJob and,
//     once it is ModelArtsTrainJobKilled, save that status, delete the job
//     with DelTrainJob and save the record as ModelArtsDeleted.
//
// Any other status is a no-op. The first error encountered is logged and
// returned.
func handleTrainJob(temp *models.CloudbrainTemp) error {
	switch temp.Status {
	case models.TempJobStatus:
		if err := handleTempTrainJob(temp); err != nil {
			log.Error("handleTempTrainJob failed:%v", err)
			return err
		}

	case string(models.ModelArtsTrainJobKilling):
		job, err := GetTrainJob(temp.JobID, temp.VersionID)
		if err != nil {
			log.Error("GetTrainJob failed:%v", err)
			return err
		}

		temp.Status = TransTrainJobStatus(job.IntStatus)
		if temp.Status != string(models.ModelArtsTrainJobKilled) {
			// Not killed yet; nothing is persisted on this pass.
			return nil
		}

		if err = models.UpdateCloudbrainTemp(temp); err != nil {
			log.Error("UpdateCloudbrainTemp failed:%v", err)
			return err
		}
		if _, err = DelTrainJob(temp.JobID); err != nil {
			log.Error("DelTrainJob failed:%v", err)
			return err
		}

		temp.Status = string(models.ModelArtsDeleted)
		if err = models.UpdateCloudbrainTemp(temp); err != nil {
			log.Error("UpdateCloudbrainTemp failed:%v", err)
			return err
		}
	}

	return nil
}

// handleTrainJobMultiVersion advances one temporary train/inference record
// that belongs to a job with existing versions, according to its status:
//
//   - TEMP placeholder: delegate to handleTempTrainJobMultiVersion;
//   - ModelArtsTrainJobKilling: re-read the version status via GetTrainJob
//     and, once it is ModelArtsTrainJobKilled, save that status, delete the
//     version with DelTrainJobVersion and save the record as
//     ModelArtsDeleted.
//
// Any other status is a no-op. The first error encountered is logged and
// returned. The log line for a failed DelTrainJobVersion now names that call
// (it previously said "DelTrainJob").
func handleTrainJobMultiVersion(temp *models.CloudbrainTemp) error {
	switch temp.Status {
	case models.TempJobStatus:
		if err := handleTempTrainJobMultiVersion(temp); err != nil {
			log.Error("handleTempTrainJobMultiVersion failed:%v", err)
			return err
		}

	case string(models.ModelArtsTrainJobKilling):
		res, err := GetTrainJob(temp.JobID, temp.VersionID)
		if err != nil {
			log.Error("GetTrainJob failed:%v", err)
			return err
		}

		temp.Status = TransTrainJobStatus(res.IntStatus)
		if temp.Status != string(models.ModelArtsTrainJobKilled) {
			// Not killed yet; nothing is persisted on this pass.
			return nil
		}

		if err = models.UpdateCloudbrainTemp(temp); err != nil {
			log.Error("UpdateCloudbrainTemp failed:%v", err)
			return err
		}
		if _, err = DelTrainJobVersion(temp.JobID, temp.VersionID); err != nil {
			log.Error("DelTrainJobVersion failed:%v", err)
			return err
		}

		temp.Status = string(models.ModelArtsDeleted)
		if err = models.UpdateCloudbrainTemp(temp); err != nil {
			log.Error("UpdateCloudbrainTemp failed:%v", err)
			return err
		}
	}

	return nil
}

// handleTempTrainJobMultiVersion checks whether the remote version list of
// temp.JobID contains exactly one more version than there are local Cloudbrain
// records for the job and, if so, asks for that version to be stopped (see
// reconcileTempTrainJobVersion). When no such version was found and the record
// has reached setting.MaxTempQueryTimes, the record is saved as
// ModelArtsTrainJobFailed.
//
// Only a failure of that final save is returned; lookup and stop problems are
// logged and left for a later pass, as before.
func handleTempTrainJobMultiVersion(temp *models.CloudbrainTemp) error {
	found := reconcileTempTrainJobVersion(temp)
	if found || temp.QueryTimes < setting.MaxTempQueryTimes {
		return nil
	}

	log.Info("reach MaxTempQueryTimes, set the job failed")
	temp.Status = string(models.ModelArtsTrainJobFailed)
	if err := models.UpdateCloudbrainTemp(temp); err != nil {
		log.Error("UpdateCloudbrainTemp(%s) failed:%v", temp.JobName, err)
		return err
	}
	return nil
}

// reconcileTempTrainJobVersion performs one lookup pass for temp and reports
// whether the extra remote version was found. Every failure is logged here;
// none is returned.
func reconcileTempTrainJobVersion(temp *models.CloudbrainTemp) bool {
	result, err := GetTrainJobVersionList(1000, 1, temp.JobID)
	if err != nil {
		log.Error("GetTrainJobVersionList failed:%v", err)
		return false
	}

	// Count this attempt even if the lookup below finds nothing.
	temp.QueryTimes++
	if err = models.UpdateCloudbrainTemp(temp); err != nil {
		log.Error("UpdateCloudbrainTemp failed:%v", err)
	}

	if result == nil {
		return false
	}

	// A failed count must not be treated as zero: that could make an existing,
	// legitimate version look like the extra one and get it stopped.
	count, err := models.GetCloudbrainCountByJobName(temp.JobName, temp.JobType, temp.Type)
	if err != nil {
		log.Error("GetCloudbrainCountByJobName(%s) failed:%v", temp.JobName, err)
		return false
	}

	// Guard the index below: an empty list with a matching count would panic.
	if result.VersionCount != int64(count+1) || len(result.JobVersionList) == 0 {
		log.Error("can not find the record(%s) till now", temp.JobName)
		return false
	}

	// presumably the first entry is the most recently created version —
	// TODO confirm the ordering returned by GetTrainJobVersionList.
	extra := result.JobVersionList[0]
	temp.Status = TransTrainJobStatus(extra.IntStatus)
	temp.VersionID = strconv.FormatInt(extra.VersionID, 10)

	log.Info("find the record(%s), status(%s)", temp.JobName, temp.Status)

	if _, err = StopTrainJob(temp.JobID, temp.VersionID); err != nil {
		log.Error("StopTrainJob failed:%v", err)
		return true
	}

	temp.Status = string(models.ModelArtsTrainJobKilling)
	if err = models.UpdateCloudbrainTemp(temp); err != nil {
		log.Error("UpdateCloudbrainTemp(%s) failed:%v", temp.JobName, err)
	}
	return true
}

// handleTempTrainJob looks up temp.JobName in the list returned by
// GetTrainJobList. Every listed job with the same name whose status is not
// ModelArtsTrainJobFailed is recorded on temp and asked to stop via
// StopTrainJob; the record is then saved as ModelArtsTrainJobKilling. A
// failure while stopping or saving ends the scan.
//
// When no job was found and the record has reached setting.MaxTempQueryTimes,
// the record is saved as ModelArtsTrainJobFailed. Only a failure of that final
// save is returned; all other problems are logged only.
func handleTempTrainJob(temp *models.CloudbrainTemp) error {
	found := false

	listing, listErr := GetTrainJobList(1000, 1, "create_time", "desc", temp.JobName)
	if listErr != nil {
		log.Error("GetTrainJobList failed:%v", listErr)
	} else {
		// Count this attempt even if the lookup below finds nothing.
		temp.QueryTimes++
		if saveErr := models.UpdateCloudbrainTemp(temp); saveErr != nil {
			log.Error("UpdateCloudbrainTemp failed:%v", saveErr)
		}

		if listing != nil {
			for _, job := range listing.JobList {
				status := TransTrainJobStatus(job.IntStatus)
				if temp.JobName != job.JobName || status == string(models.ModelArtsTrainJobFailed) {
					continue
				}

				found = true
				temp.Status = status
				temp.JobID = strconv.FormatInt(job.JobID, 10)
				temp.VersionID = strconv.FormatInt(job.VersionID, 10)

				log.Info("find the record(%s), status(%s)", temp.JobName, temp.Status)

				if _, stopErr := StopTrainJob(temp.JobID, temp.VersionID); stopErr != nil {
					log.Error("StopTrainJob(%s) failed:%v", temp.JobName, stopErr)
					break
				}

				temp.Status = string(models.ModelArtsTrainJobKilling)
				if saveErr := models.UpdateCloudbrainTemp(temp); saveErr != nil {
					log.Error("UpdateCloudbrainTemp(%s) failed:%v", temp.JobName, saveErr)
					break
				}
			}

			if !found {
				log.Error("can not find the record(%s) till now", temp.JobName)
			}
		}
	}

	if found || temp.QueryTimes < setting.MaxTempQueryTimes {
		return nil
	}

	log.Info("reach MaxTempQueryTimes, set the job failed")
	temp.Status = string(models.ModelArtsTrainJobFailed)
	if err := models.UpdateCloudbrainTemp(temp); err != nil {
		log.Error("UpdateCloudbrainTemp(%s) failed:%v", temp.JobName, err)
		return err
	}
	return nil
}

+ 185
- 31
modules/modelarts/resty.go View File

@@ -37,6 +37,7 @@ const (
NotebookNotFound = "ModelArts.6404"
NotebookNoPermission = "ModelArts.6407"
NotebookInvalid = "ModelArts.6400"
UnknownErrorPrefix = "UNKNOWN:"
)

func getRestyClient() *resty.Client {
@@ -298,6 +299,10 @@ sendjob:
return &result, fmt.Errorf("son.Unmarshal failed: %s", err.Error())
}

if res.StatusCode() == http.StatusBadGateway {
return &result, fmt.Errorf(UnknownErrorPrefix+"createNotebook2 failed(%s): %s", response.ErrorCode, response.ErrorMsg)
}

if len(response.ErrorCode) != 0 {
log.Error("ManageNotebook2 failed(%s): %s", response.ErrorCode, response.ErrorMsg)
if response.ErrorCode == modelartsIllegalToken && retry < 1 {
@@ -506,23 +511,27 @@ sendjob:
log.Error("json.Unmarshal failed(%s): %v", res.String(), err.Error())
return &result, fmt.Errorf("json.Unmarshal failed(%s): %v", res.String(), err.Error())
}
log.Error("createTrainJob failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
BootFileErrorMsg := "Invalid OBS path '" + createJobParams.Config.BootFileUrl + "'."
DataSetErrorMsg := "Invalid OBS path '" + createJobParams.Config.DataUrl + "'."
if temp.ErrorMsg == BootFileErrorMsg {
log.Error("启动文件错误!createTrainJob failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
log.Error("createTrainJobUserImage failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
bootFileErrorMsg := "Invalid OBS path '" + createJobParams.Config.BootFileUrl + "'."
dataSetErrorMsg := "Invalid OBS path '" + createJobParams.Config.DataUrl + "'."
if temp.ErrorMsg == bootFileErrorMsg {
log.Error("启动文件错误!createTrainJobUserImage failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
return &result, fmt.Errorf("启动文件错误!")
}
if temp.ErrorMsg == DataSetErrorMsg {
log.Error("数据集错误!createTrainJob failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
if temp.ErrorMsg == dataSetErrorMsg {
log.Error("数据集错误!createTrainJobUserImage failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
return &result, fmt.Errorf("数据集错误!")
}
return &result, fmt.Errorf("createTrainJob failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
if res.StatusCode() == http.StatusBadGateway {
return &result, fmt.Errorf(UnknownErrorPrefix+"createTrainJobUserImage failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
} else {
return &result, fmt.Errorf("createTrainJobUserImage failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
}
}

if !result.IsSuccess {
log.Error("createTrainJob failed(%s): %s", result.ErrorCode, result.ErrorMsg)
return &result, fmt.Errorf("createTrainJob failed(%s): %s", result.ErrorCode, result.ErrorMsg)
log.Error("createTrainJobUserImage failed(%s): %s", result.ErrorCode, result.ErrorMsg)
return &result, fmt.Errorf("createTrainJobUserImage failed(%s): %s", result.ErrorCode, result.ErrorMsg)
}

return &result, nil
@@ -547,9 +556,6 @@ sendjob:
return nil, fmt.Errorf("resty create train-job: %s", err)
}

req, _ := json.Marshal(createJobParams)
log.Info("%s", req)

if res.StatusCode() == http.StatusUnauthorized && retry < 1 {
retry++
_ = getToken()
@@ -563,17 +569,21 @@ sendjob:
return &result, fmt.Errorf("json.Unmarshal failed(%s): %v", res.String(), err.Error())
}
log.Error("createTrainJob failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
BootFileErrorMsg := "Invalid OBS path '" + createJobParams.Config.BootFileUrl + "'."
DataSetErrorMsg := "Invalid OBS path '" + createJobParams.Config.DataUrl + "'."
if temp.ErrorMsg == BootFileErrorMsg {
bootFileErrorMsg := "Invalid OBS path '" + createJobParams.Config.BootFileUrl + "'."
dataSetErrorMsg := "Invalid OBS path '" + createJobParams.Config.DataUrl + "'."
if temp.ErrorMsg == bootFileErrorMsg {
log.Error("启动文件错误!createTrainJob failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
return &result, fmt.Errorf("启动文件错误!")
}
if temp.ErrorMsg == DataSetErrorMsg {
if temp.ErrorMsg == dataSetErrorMsg {
log.Error("数据集错误!createTrainJob failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
return &result, fmt.Errorf("数据集错误!")
}
return &result, fmt.Errorf("createTrainJob failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
if res.StatusCode() == http.StatusBadGateway {
return &result, fmt.Errorf(UnknownErrorPrefix+"createTrainJob failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
} else {
return &result, fmt.Errorf("createTrainJob failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
}
}

if !result.IsSuccess {
@@ -603,9 +613,6 @@ sendjob:
return nil, fmt.Errorf("resty create train-job version: %s", err)
}

req, _ := json.Marshal(createJobVersionParams)
log.Info("%s", req)

if res.StatusCode() == http.StatusUnauthorized && retry < 1 {
retry++
_ = getToken()
@@ -618,17 +625,23 @@ sendjob:
log.Error("json.Unmarshal failed(%s): %v", res.String(), err.Error())
return &result, fmt.Errorf("json.Unmarshal failed(%s): %v", res.String(), err.Error())
}
BootFileErrorMsg := "Invalid OBS path '" + createJobVersionParams.Config.BootFileUrl + "'."
DataSetErrorMsg := "Invalid OBS path '" + createJobVersionParams.Config.DataUrl + "'."
if temp.ErrorMsg == BootFileErrorMsg {

log.Error("createTrainJobVersion failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
bootFileErrorMsg := "Invalid OBS path '" + createJobVersionParams.Config.BootFileUrl + "'."
dataSetErrorMsg := "Invalid OBS path '" + createJobVersionParams.Config.DataUrl + "'."
if temp.ErrorMsg == bootFileErrorMsg {
log.Error("启动文件错误!createTrainJobVersion failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
return &result, fmt.Errorf("启动文件错误!")
}
if temp.ErrorMsg == DataSetErrorMsg {
if temp.ErrorMsg == dataSetErrorMsg {
log.Error("数据集错误!createTrainJobVersion failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
return &result, fmt.Errorf("数据集错误!")
}
return &result, fmt.Errorf("createTrainJobVersion failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
if res.StatusCode() == http.StatusBadGateway {
return &result, fmt.Errorf(UnknownErrorPrefix+"createTrainJobVersion failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
} else {
return &result, fmt.Errorf("createTrainJobVersion failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
}
}

if !result.IsSuccess {
@@ -761,9 +774,6 @@ sendjob:
goto sendjob
}

//temp, _ := json.Marshal(req)
//log.Info("%s", temp)

if res.StatusCode() != http.StatusOK {
var temp models.ErrorResult
if err = json.Unmarshal([]byte(res.String()), &temp); err != nil {
@@ -1172,7 +1182,11 @@ sendjob:
log.Error("数据集错误!createInferenceJob failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
return &result, fmt.Errorf("数据集错误!")
}
return &result, fmt.Errorf("createInferenceJob failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
if res.StatusCode() == http.StatusBadGateway {
return &result, fmt.Errorf(UnknownErrorPrefix+"createInferenceJob failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
} else {
return &result, fmt.Errorf("createInferenceJob failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
}
}

if !result.IsSuccess {
@@ -1212,7 +1226,11 @@ sendjob:
err = json.Unmarshal(res.Body(), &response)
if err != nil {
log.Error("json.Unmarshal failed: %s", err.Error())
return &result, fmt.Errorf("son.Unmarshal failed: %s", err.Error())
return &result, fmt.Errorf("json.Unmarshal failed: %s", err.Error())
}

if res.StatusCode() == http.StatusBadGateway {
return &result, fmt.Errorf(UnknownErrorPrefix+"createNotebook2 failed(%s): %s", response.ErrorCode, response.ErrorMsg)
}

if len(response.ErrorCode) != 0 {
@@ -1271,3 +1289,139 @@ sendjob:

return &result, nil
}

// GetTrainJobList requests one page of train jobs from the ModelArts
// train-job endpoint (HOST + "/v1/" + setting.ProjectID + urlTrainJob).
//
// perPage and page are sent as the "per_page" and "page" query parameters;
// sortBy, order and searchContent are passed through unchanged as "sortBy",
// "order" and "search_content".
//
// The returned pointer is nil only when the HTTP request itself fails. For a
// non-200 response, or a decoded body whose IsSuccess flag is false, a pointer
// to the local result is returned together with an error built from the
// service's error message.
func GetTrainJobList(perPage, page int, sortBy, order, searchContent string) (*models.GetTrainJobListResult, error) {
	checkSetting()
	client := getRestyClient()
	var result models.GetTrainJobListResult

	retry := 0

sendjob:
	res, err := client.R().
		SetQueryParams(map[string]string{
			"per_page":       strconv.Itoa(perPage),
			"page":           strconv.Itoa(page),
			"sortBy":         sortBy,
			"order":          order,
			"search_content": searchContent,
		}).
		SetAuthToken(TOKEN).
		SetResult(&result).
		Get(HOST + "/v1/" + setting.ProjectID + urlTrainJob)
	if err != nil {
		return nil, fmt.Errorf("resty GetTrainJobList: %v", err)
	}

	// On 401 call getToken() and resend the request exactly once.
	if res.StatusCode() == http.StatusUnauthorized && retry < 1 {
		retry++
		// The result of getToken is deliberately ignored: if the retried
		// request is rejected again it falls through to the status handling
		// below and is reported from there.
		_ = getToken()
		goto sendjob
	}

	if res.StatusCode() != http.StatusOK {
		var temp models.ErrorResult
		if err = json.Unmarshal([]byte(res.String()), &temp); err != nil {
			log.Error("json.Unmarshal failed(%s): %v", res.String(), err.Error())
			return &result, fmt.Errorf("json.Unmarshal failed(%s): %v", res.String(), err.Error())
		}
		log.Error("GetTrainJobList failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
		// The message comes from the remote service; never use it as a
		// format string, a literal '%' would corrupt the error text.
		return &result, fmt.Errorf("%s", temp.ErrorMsg)
	}

	if !result.IsSuccess {
		log.Error("GetTrainJobList failed(%s): %s", result.ErrorCode, result.ErrorMsg)
		return &result, fmt.Errorf("%s", result.ErrorMsg)
	}

	return &result, nil
}

// GetTrainJobVersionList requests one page of versions of the train job
// identified by jobID from the ModelArts endpoint
// (HOST + "/v1/" + setting.ProjectID + urlTrainJob + "/" + jobID + "/versions").
//
// perPage and page are sent as the "per_page" and "page" query parameters.
//
// The returned pointer is nil only when the HTTP request itself fails. For a
// non-200 response, or a decoded body whose IsSuccess flag is false, a pointer
// to the local result is returned together with an error built from the
// service's error message.
func GetTrainJobVersionList(perPage, page int, jobID string) (*models.GetTrainJobVersionListResult, error) {
	checkSetting()
	client := getRestyClient()
	var result models.GetTrainJobVersionListResult

	retry := 0

sendjob:
	res, err := client.R().
		SetQueryParams(map[string]string{
			"per_page": strconv.Itoa(perPage),
			"page":     strconv.Itoa(page),
		}).
		SetAuthToken(TOKEN).
		SetResult(&result).
		Get(HOST + "/v1/" + setting.ProjectID + urlTrainJob + "/" + jobID + "/versions")
	if err != nil {
		return nil, fmt.Errorf("resty GetTrainJobVersionList: %v", err)
	}

	// On 401 call getToken() and resend the request exactly once.
	if res.StatusCode() == http.StatusUnauthorized && retry < 1 {
		retry++
		// The result of getToken is deliberately ignored: if the retried
		// request is rejected again it falls through to the status handling
		// below and is reported from there.
		_ = getToken()
		goto sendjob
	}

	if res.StatusCode() != http.StatusOK {
		var temp models.ErrorResult
		if err = json.Unmarshal([]byte(res.String()), &temp); err != nil {
			log.Error("json.Unmarshal failed(%s): %v", res.String(), err.Error())
			return &result, fmt.Errorf("json.Unmarshal failed(%s): %v", res.String(), err.Error())
		}
		log.Error("GetTrainJobVersionList failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
		// The message comes from the remote service; never use it as a
		// format string, a literal '%' would corrupt the error text.
		return &result, fmt.Errorf("%s", temp.ErrorMsg)
	}

	if !result.IsSuccess {
		log.Error("GetTrainJobVersionList failed(%s): %s", result.ErrorCode, result.ErrorMsg)
		return &result, fmt.Errorf("%s", result.ErrorMsg)
	}

	return &result, nil
}

// GetNotebookList requests a slice of notebooks from the ModelArts notebook
// endpoint (HOST + "/v1/" + setting.ProjectID + urlNotebook2).
//
// limit and offset are sent as the "limit" and "offset" query parameters;
// searchContent, sortBy and order are passed through unchanged as "name",
// "sort_key" and "sort_dir".
//
// The returned pointer is nil only when the HTTP request itself fails. For a
// non-200 response a pointer to the local result is returned together with an
// error built from the service's error message.
func GetNotebookList(limit, offset int, sortBy, order, searchContent string) (*models.GetNotebookListResult, error) {
	checkSetting()
	client := getRestyClient()
	var result models.GetNotebookListResult

	retry := 0

sendjob:
	res, err := client.R().
		SetQueryParams(map[string]string{
			"limit":    strconv.Itoa(limit),
			"offset":   strconv.Itoa(offset),
			"name":     searchContent,
			"sort_key": sortBy,
			"sort_dir": order,
		}).
		SetAuthToken(TOKEN).
		SetResult(&result).
		Get(HOST + "/v1/" + setting.ProjectID + urlNotebook2)
	if err != nil {
		return nil, fmt.Errorf("resty GetNotebookList: %v", err)
	}

	// On 401 call getToken() and resend the request exactly once.
	if res.StatusCode() == http.StatusUnauthorized && retry < 1 {
		retry++
		// The result of getToken is deliberately ignored: if the retried
		// request is rejected again it falls through to the status handling
		// below and is reported from there.
		_ = getToken()
		goto sendjob
	}

	if res.StatusCode() != http.StatusOK {
		var temp models.ErrorResult
		if err = json.Unmarshal([]byte(res.String()), &temp); err != nil {
			log.Error("json.Unmarshal failed(%s): %v", res.String(), err.Error())
			return &result, fmt.Errorf("json.Unmarshal failed(%s): %v", res.String(), err.Error())
		}
		log.Error("GetNotebookList failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg)
		// The message comes from the remote service; never use it as a
		// format string, a literal '%' would corrupt the error text.
		return &result, fmt.Errorf("%s", temp.ErrorMsg)
	}

	return &result, nil
}

+ 2
- 0
modules/setting/setting.go View File

@@ -539,6 +539,7 @@ var (
DebugHost string
ImageInfos string
Capacity int
MaxTempQueryTimes int
//train-job
ResourcePools string
Engines string
@@ -1424,6 +1425,7 @@ func NewContext() {
Flavor = sec.Key("FLAVOR").MustString("")
ImageInfos = sec.Key("IMAGE_INFOS").MustString("")
Capacity = sec.Key("IMAGE_INFOS").MustInt(100)
MaxTempQueryTimes = sec.Key("MAX_TEMP_QUERY_TIMES").MustInt(30)
ResourcePools = sec.Key("Resource_Pools").MustString("")
Engines = sec.Key("Engines").MustString("")
EngineVersions = sec.Key("Engine_Versions").MustString("")


+ 4
- 86
routers/api/v1/repo/modelarts.go View File

@@ -26,40 +26,6 @@ import (
routerRepo "code.gitea.io/gitea/routers/repo"
)

// GetModelArtsNotebook refreshes and reports the status of a notebook job.
//
// The job is looked up by the ":jobid" URL parameter within the current
// repository, its status is fetched through modelarts.GetJob, a status-change
// notification is sent when the status differs from the stored one, and the
// record is saved with models.UpdateJob. A failed lookup or a failed
// modelarts.GetJob call answers with ctx.NotFound; a failed save is only
// logged. On success the response is a JSON object with "JobID" and
// "JobStatus".
func GetModelArtsNotebook(ctx *context.APIContext) {
	jobID := ctx.Params(":jobid")
	repoID := ctx.Repo.Repository.ID
	job, err := models.GetRepoCloudBrainByJobID(repoID, jobID)
	if err != nil {
		ctx.NotFound(err)
		return
	}
	result, err := modelarts.GetJob(jobID)
	if err != nil {
		ctx.NotFound(err)
		return
	}
	oldStatus := job.Status
	job.Status = result.Status
	if oldStatus != result.Status {
		notification.NotifyChangeCloudbrainStatus(job, oldStatus)
	}
	err = models.UpdateJob(job)
	if err != nil {
		// log.Error is printf-style: without a verb the error value would
		// not be rendered as part of the message.
		log.Error("UpdateJob failed: %v", err)
	}

	ctx.JSON(http.StatusOK, map[string]interface{}{
		"JobID":     jobID,
		"JobStatus": result.Status,
	})
}

func GetModelArtsNotebook2(ctx *context.APIContext) {
var (
err error
@@ -71,33 +37,16 @@ func GetModelArtsNotebook2(ctx *context.APIContext) {
ctx.NotFound(err)
return
}
result, err := modelarts.GetNotebook2(job.JobID)
err = modelarts.HandleNotebookInfo(job)
if err != nil {
ctx.NotFound(err)
return
}
if job.StartTime == 0 && result.Lease.UpdateTime > 0 {
job.StartTime = timeutil.TimeStamp(result.Lease.UpdateTime / 1000)
}
oldStatus := job.Status
job.Status = result.Status
if job.EndTime == 0 && models.IsModelArtsDebugJobTerminal(job.Status) {
job.EndTime = timeutil.TimeStampNow()
}
job.CorrectCreateUnix()
job.ComputeAndSetDuration()
if oldStatus != result.Status {
notification.NotifyChangeCloudbrainStatus(job, oldStatus)
}
err = models.UpdateJob(job)
if err != nil {
log.Error("UpdateJob failed:", err)
}

ctx.JSON(http.StatusOK, map[string]interface{}{
"ID": ID,
"JobName": job.JobName,
"JobStatus": result.Status,
"JobStatus": job.Status,
"JobDuration": job.TrainJobDuration,
})

@@ -189,27 +138,11 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) {
}
}
} else if job.Type == models.TypeCloudBrainTwo {
result, err := modelarts.GetTrainJob(jobID, strconv.FormatInt(job.VersionID, 10))
err := modelarts.HandleTrainJobInfo(job)
if err != nil {
ctx.NotFound(err)
return
}

if job.StartTime == 0 && result.StartTime > 0 {
job.StartTime = timeutil.TimeStamp(result.StartTime / 1000)
}
job.Status = modelarts.TransTrainJobStatus(result.IntStatus)
job.Duration = result.Duration / 1000
job.TrainJobDuration = models.ConvertDurationToStr(job.Duration)

if job.EndTime == 0 && models.IsTrainJobTerminal(job.Status) && job.StartTime > 0 {
job.EndTime = job.StartTime.Add(job.Duration)
}
job.CorrectCreateUnix()
err = models.UpdateTrainJobVersion(job)
if err != nil {
log.Error("UpdateJob failed:", err)
}
} else if job.Type == models.TypeC2Net {
result, err := grampus.GetJob(jobID)
if err != nil {
@@ -558,26 +491,11 @@ func GetModelArtsInferenceJob(ctx *context.APIContext) {
ctx.NotFound(err)
return
}
result, err := modelarts.GetTrainJob(jobID, strconv.FormatInt(job.VersionID, 10))
err = modelarts.HandleTrainJobInfo(job)
if err != nil {
ctx.NotFound(err)
return
}
if job.StartTime == 0 && result.StartTime > 0 {
job.StartTime = timeutil.TimeStamp(result.StartTime / 1000)
}
job.Status = modelarts.TransTrainJobStatus(result.IntStatus)
job.Duration = result.Duration / 1000
job.TrainJobDuration = models.ConvertDurationToStr(job.Duration)

if job.EndTime == 0 && models.IsTrainJobTerminal(job.Status) && job.StartTime > 0 {
job.EndTime = job.StartTime.Add(job.Duration)
}
job.CorrectCreateUnix()
err = models.UpdateInferenceJob(job)
if err != nil {
log.Error("UpdateJob failed:", err)
}

ctx.JSON(http.StatusOK, map[string]interface{}{
"JobID": jobID,


+ 6
- 52
routers/repo/cloudbrain.go View File

@@ -1828,70 +1828,24 @@ func SyncCloudbrainStatus() {
}
} else if task.Type == models.TypeCloudBrainTwo {
if task.JobType == string(models.JobTypeDebug) {
//result, err := modelarts.GetJob(task.JobID)
result, err := modelarts.GetNotebook2(task.JobID)
err := modelarts.HandleNotebookInfo(task)
if err != nil {
log.Error("GetJob(%s) failed:%v", task.JobName, err)
log.Error("HandleNotebookInfo(%s) failed:%v", task.DisplayJobName, err)
continue
}

if result != nil {
oldStatus := task.Status
task.Status = result.Status
if task.StartTime == 0 && result.Lease.UpdateTime > 0 {
task.StartTime = timeutil.TimeStamp(result.Lease.UpdateTime / 1000)
}
if task.EndTime == 0 && models.IsModelArtsDebugJobTerminal(task.Status) {
task.EndTime = timeutil.TimeStampNow()
}
task.CorrectCreateUnix()
task.ComputeAndSetDuration()
if oldStatus != task.Status {
notification.NotifyChangeCloudbrainStatus(task, oldStatus)
}
err = models.UpdateJob(task)
if err != nil {
log.Error("UpdateJob(%s) failed:%v", task.JobName, err)
continue
}
}
} else if task.JobType == string(models.JobTypeTrain) || task.JobType == string(models.JobTypeInference) {
result, err := modelarts.GetTrainJob(task.JobID, strconv.FormatInt(task.VersionID, 10))
err := modelarts.HandleTrainJobInfo(task)
if err != nil {
log.Error("GetTrainJob(%s) failed:%v", task.JobName, err)
log.Error("HandleTrainJobInfo(%s) failed:%v", task.DisplayJobName, err)
continue
}

if result != nil {
oldStatus := task.Status
task.Status = modelarts.TransTrainJobStatus(result.IntStatus)
task.Duration = result.Duration / 1000
task.TrainJobDuration = result.TrainJobDuration

if task.StartTime == 0 && result.StartTime > 0 {
task.StartTime = timeutil.TimeStamp(result.StartTime / 1000)
}
task.TrainJobDuration = models.ConvertDurationToStr(task.Duration)
if task.EndTime == 0 && models.IsTrainJobTerminal(task.Status) && task.StartTime > 0 {
task.EndTime = task.StartTime.Add(task.Duration)
}
task.CorrectCreateUnix()
if oldStatus != task.Status {
notification.NotifyChangeCloudbrainStatus(task, oldStatus)
}
err = models.UpdateJob(task)
if err != nil {
log.Error("UpdateJob(%s) failed:%v", task.JobName, err)
continue
}
}
} else {
log.Error("task.JobType(%s) is error:%s", task.JobName, task.JobType)
log.Error("task.JobType(%s) is error:%s", task.DisplayJobName, task.JobType)
}
} else if task.Type == models.TypeC2Net {
result, err := grampus.GetJob(task.JobID)
if err != nil {
log.Error("GetTrainJob(%s) failed:%v", task.JobName, err)
log.Error("GetTrainJob(%s) failed:%v", task.DisplayJobName, err)
continue
}



+ 2
- 2
routers/repo/grampus.go View File

@@ -348,7 +348,7 @@ func GrampusTrainJobGpuCreate(ctx *context.Context, form auth.CreateGrampusTrain
EngineName: image,
DatasetName: attachment.Name,
IsLatestVersion: modelarts.IsLatestVersion,
VersionCount: modelarts.VersionCount,
VersionCount: modelarts.VersionCountOne,
WorkServerNumber: 1,
}

@@ -398,7 +398,7 @@ func GrampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrain
branchName := form.BranchName
isLatestVersion := modelarts.IsLatestVersion
flavorName := form.FlavorName
versionCount := modelarts.VersionCount
versionCount := modelarts.VersionCountOne
engineName := form.EngineName

if !jobNamePattern.MatchString(displayJobName) {


+ 129
- 197
routers/repo/modelarts.go View File

@@ -15,9 +15,6 @@ import (
"time"
"unicode/utf8"

"code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/timeutil"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/auth"
"code.gitea.io/gitea/modules/base"
@@ -26,9 +23,11 @@ import (
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/modelarts"
"code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/obs"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
)

@@ -272,33 +271,15 @@ func NotebookShow(ctx *context.Context) {
return
}

result, err := modelarts.GetNotebook2(task.JobID)
if err != nil {
log.Error("GET job error", err.Error())
ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
return
}

if result != nil {
if task.DeletedAt.IsZero() { //normal record
if task.Status != result.Status {
oldStatus := task.Status
task.Status = result.Status
models.ParseAndSetDurationFromModelArtsNotebook(result, task)
notification.NotifyChangeCloudbrainStatus(task, oldStatus)
err = models.UpdateJob(task)
if err != nil {
log.Error("GET job error", err.Error())
ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
return
}
}
} else { //deleted record

}
if task.FlavorCode == "" {
task.FlavorCode = result.Flavor
if task.DeletedAt.IsZero() { //normal record
err := modelarts.HandleNotebookInfo(task)
if err != nil {
ctx.Data["error"] = err.Error()
ctx.RenderWithErr(err.Error(), tplModelArtsNotebookShow, nil)
return
}
} else { //deleted record

}

datasetDownload := make([]models.DatasetDownload, 0)
@@ -435,82 +416,127 @@ func NotebookDebug2(ctx *context.Context) {
ctx.Redirect(result.Url + "?token=" + result.Token)
}

func NotebookManage(ctx *context.Context) {
var ID = ctx.Params(":id")
var action = ctx.Params(":action")
var resultCode = "0"
func NotebookRestart(ctx *context.Context) {
var id = ctx.Params(":id")
var resultCode = "-1"
var errorMsg = ""
var status = ""

task := ctx.Cloudbrain

for {
task, err := models.GetCloudbrainByID(ID)
if err != nil {
log.Error("get task(%s) failed:%v", task.JobName, err.Error(), ctx.Data["MsgID"])
resultCode = "-1"
errorMsg = "system error"
ctx.CheckWechatBind()
if ctx.Written() {
return
}
if task.Status != string(models.ModelArtsStopped) && task.Status != string(models.ModelArtsStartFailed) && task.Status != string(models.ModelArtsCreateFailed) {
log.Error("the job(%s) is not stopped", task.JobName, ctx.Data["MsgID"])
errorMsg = "the job is not stopped"
break
}

if action == models.ActionStop {
if task.Status != string(models.ModelArtsRunning) {
log.Error("the job(%s) is not running", task.JobName, ctx.Data["MsgID"])
resultCode = "-1"
errorMsg = "the job is not running"
break
}

if !ctx.IsSigned || (ctx.User.ID != task.UserID && !ctx.IsUserSiteAdmin() && !ctx.IsUserRepoOwner()) {
log.Error("the user has no right ro stop the job", task.JobName, ctx.Data["MsgID"])
resultCode = "-1"
errorMsg = "you have no right to stop the job"
break
}
} else if action == models.ActionRestart {
ctx.CheckWechatBind()
if ctx.Written() {
return
}
if task.Status != string(models.ModelArtsStopped) && task.Status != string(models.ModelArtsStartFailed) && task.Status != string(models.ModelArtsCreateFailed) {
log.Error("the job(%s) is not stopped", task.JobName, ctx.Data["MsgID"])
resultCode = "-1"
errorMsg = "the job is not stopped"
count, err := models.GetCloudbrainNotebookCountByUserID(ctx.User.ID)
if err != nil {
log.Error("GetCloudbrainNotebookCountByUserID failed:%v", err, ctx.Data["MsgID"])
errorMsg = "system error"
break
} else {
if count >= 1 {
log.Error("the user already has running or waiting task", ctx.Data["MsgID"])
errorMsg = "you have already a running or waiting task, can not create more"
break
}
}

if !ctx.IsSigned || (ctx.User.ID != task.UserID && !ctx.IsUserSiteAdmin()) {
log.Error("the user has no right ro restart the job", task.JobName, ctx.Data["MsgID"])
resultCode = "-1"
errorMsg = "you have no right to restart the job"
break
}
createTime := timeutil.TimeStampNow()
param := models.NotebookAction{
Action: models.ActionStart,
}

count, err := models.GetCloudbrainNotebookCountByUserID(ctx.User.ID)
if err != nil {
log.Error("GetCloudbrainNotebookCountByUserID failed:%v", err, ctx.Data["MsgID"])
resultCode = "-1"
errorMsg = "system error"
break
} else {
if count >= 1 {
log.Error("the user already has running or waiting task", ctx.Data["MsgID"])
resultCode = "-1"
errorMsg = "you have already a running or waiting task, can not create more"
break
res, err := modelarts.ManageNotebook2(task.JobID, param)
if err != nil {
log.Error("ManageNotebook2(%s) failed:%v", task.DisplayJobName, err.Error(), ctx.Data["MsgID"])
/* 暂不处理再次调试502的场景,详情见方案
if strings.HasPrefix(err.Error(), modelarts.UnknownErrorPrefix) {
log.Info("(%s)unknown error, set temp status", task.DisplayJobName)
errTemp := models.InsertCloudbrainTemp(&models.CloudbrainTemp{
JobID: task.JobID,
VersionID: models.TempVersionId,
Status: models.TempJobStatus,
Type: task.Type,
JobName: task.JobName,
JobType: task.JobType,
})
if errTemp != nil {
log.Error("InsertCloudbrainTemp failed: %v", errTemp.Error())
}
}
*/
errorMsg = err.Error()
break
}

action = models.ActionStart
} else {
log.Error("the action(%s) is illegal", action, ctx.Data["MsgID"])
newTask := &models.Cloudbrain{
Status: res.Status,
UserID: task.UserID,
RepoID: task.RepoID,
JobID: task.JobID,
JobName: task.JobName,
DisplayJobName: task.DisplayJobName,
JobType: task.JobType,
Type: task.Type,
Uuid: task.Uuid,
Image: task.Image,
ComputeResource: task.ComputeResource,
Description: task.Description,
CreatedUnix: createTime,
UpdatedUnix: createTime,
FlavorCode: task.FlavorCode,
FlavorName: task.FlavorName,
}

err = models.RestartCloudbrain(task, newTask)
if err != nil {
log.Error("RestartCloudbrain(%s) failed:%v", task.JobName, err.Error(), ctx.Data["MsgID"])
errorMsg = "system error"
break
}

status = res.Status
resultCode = "0"
notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, strconv.FormatInt(newTask.ID, 10), newTask.DisplayJobName, models.ActionCreateDebugNPUTask)

break
}

ctx.JSON(200, map[string]string{
"result_code": resultCode,
"error_msg": errorMsg,
"status": status,
"id": id,
})
}

func NotebookStop(ctx *context.Context) {
var id = ctx.Params(":id")
var resultCode = "0"
var errorMsg = ""
var status = ""

task := ctx.Cloudbrain

for {
if task.Status != string(models.ModelArtsRunning) {
log.Error("the job(%s) is not running", task.JobName, ctx.Data["MsgID"])
resultCode = "-1"
errorMsg = "非法操作"
errorMsg = "the job is not running"
break
}

param := models.NotebookAction{
Action: action,
Action: models.ActionStop,
}
createTime := timeutil.TimeStampNow()
res, err := modelarts.ManageNotebook2(task.JobID, param)
if err != nil {
log.Error("ManageNotebook2(%s) failed:%v", task.JobName, err.Error(), ctx.Data["MsgID"])
@@ -523,52 +549,21 @@ func NotebookManage(ctx *context.Context) {
}

status = res.Status
if action == models.ActionStart {
newTask := &models.Cloudbrain{
Status: status,
UserID: task.UserID,
RepoID: task.RepoID,
JobID: task.JobID,
JobName: task.JobName,
DisplayJobName: task.DisplayJobName,
JobType: task.JobType,
Type: task.Type,
Uuid: task.Uuid,
Image: task.Image,
ComputeResource: task.ComputeResource,
Description: task.Description,
CreatedUnix: createTime,
UpdatedUnix: createTime,
FlavorCode: task.FlavorCode,
FlavorName: task.FlavorName,
}

err = models.RestartCloudbrain(task, newTask)
if err != nil {
log.Error("RestartCloudbrain(%s) failed:%v", task.JobName, err.Error(), ctx.Data["MsgID"])
resultCode = "-1"
errorMsg = "system error"
break
}
ID = strconv.FormatInt(newTask.ID, 10)
notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, ID, task.DisplayJobName, models.ActionCreateDebugNPUTask)
} else {
oldStatus := task.Status
task.Status = res.Status
if task.EndTime == 0 && models.IsModelArtsDebugJobTerminal(task.Status) {
task.EndTime = timeutil.TimeStampNow()
}
task.ComputeAndSetDuration()
if oldStatus != task.Status {
notification.NotifyChangeCloudbrainStatus(task, oldStatus)
}
err = models.UpdateJob(task)
if err != nil {
log.Error("UpdateJob(%s) failed:%v", task.JobName, err.Error(), ctx.Data["MsgID"])
resultCode = "-1"
errorMsg = "system error"
break
}
oldStatus := task.Status
task.Status = res.Status
if task.EndTime == 0 && models.IsModelArtsDebugJobTerminal(task.Status) {
task.EndTime = timeutil.TimeStampNow()
}
task.ComputeAndSetDuration()
if oldStatus != task.Status {
notification.NotifyChangeCloudbrainStatus(task, oldStatus)
}
err = models.UpdateJob(task)
if err != nil {
log.Error("UpdateJob(%s) failed:%v", task.JobName, err.Error(), ctx.Data["MsgID"])
resultCode = "-1"
errorMsg = "system error"
break
}

break
@@ -578,7 +573,7 @@ func NotebookManage(ctx *context.Context) {
"result_code": resultCode,
"error_msg": errorMsg,
"status": status,
"id": ID,
"id": id,
})
}

@@ -1092,7 +1087,7 @@ func TrainJobCreate(ctx *context.Context, form auth.CreateModelArtsTrainJobForm)
branchName := form.BranchName
isLatestVersion := modelarts.IsLatestVersion
FlavorName := form.FlavorName
VersionCount := modelarts.VersionCount
VersionCount := modelarts.VersionCountOne
EngineName := form.EngineName

count, err := models.GetCloudbrainTrainJobCountByUserID(ctx.User.ID)
@@ -1826,60 +1821,6 @@ func TrainJobShow(ctx *context.Context) {
ctx.HTML(http.StatusOK, tplModelArtsTrainJobShow)
}

// TrainJobGetLog loads a portion of a train job's log and stores it in
// ctx.Data["log"].
//
// The job comes from the ":jobid" URL parameter; "file_name", "base_line" and
// "order" are read from the query string. An order other than
// modelarts.OrderDesc or modelarts.OrderAsc renders the train-job show
// template with HTTP 400. Failures while loading the task or the log render
// the same template with the error message. On success nothing is rendered
// here; only ctx.Data is populated.
func TrainJobGetLog(ctx *context.Context) {
	ctx.Data["PageIsTrainJob"] = true

	jobID := ctx.Params(":jobid")
	logFileName := ctx.Query("file_name")
	baseLine := ctx.Query("base_line")
	order := ctx.Query("order")

	orderIsValid := order == modelarts.OrderDesc || order == modelarts.OrderAsc
	if !orderIsValid {
		log.Error("order(%s) check failed", order)
		ctx.HTML(http.StatusBadRequest, tplModelArtsTrainJobShow)
		return
	}

	task, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		log.Error("GetCloudbrainByJobID(%s) failed:%v", jobID, err.Error())
		ctx.RenderWithErr(err.Error(), tplModelArtsTrainJobShow, nil)
		return
	}

	versionID := strconv.FormatInt(task.VersionID, 10)
	result, err := modelarts.GetTrainJobLog(jobID, versionID, baseLine, logFileName, order, modelarts.Lines)
	if err != nil {
		log.Error("GetTrainJobLog(%s) failed:%v", jobID, err.Error())
		ctx.RenderWithErr(err.Error(), tplModelArtsTrainJobShow, nil)
		return
	}

	ctx.Data["log"] = result
}

// trainJobGetLog returns the log-file names of the train job identified by
// jobID together with the log content of the first listed file, requested in
// modelarts.OrderDesc order with modelarts.Lines lines and an empty base line.
//
// Any failing step is logged and returned as (nil, nil, err).
//
// NOTE(review): the first entry of LogFileList is indexed without a length
// check, so an empty list would panic - confirm the service always returns at
// least one file name.
func trainJobGetLog(jobID string) (*models.GetTrainJobLogFileNamesResult, *models.GetTrainJobLogResult, error) {
	task, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		log.Error("GetCloudbrainByJobID(%s) failed:%v", jobID, err.Error())
		return nil, nil, err
	}

	versionID := strconv.FormatInt(task.VersionID, 10)

	fileNames, err := modelarts.GetTrainJobLogFileNames(jobID, versionID)
	if err != nil {
		log.Error("GetTrainJobLogFileNames(%s) failed:%v", jobID, err.Error())
		return nil, nil, err
	}

	firstFile := fileNames.LogFileList[0]
	logResult, err := modelarts.GetTrainJobLog(jobID, versionID, "", firstFile, modelarts.OrderDesc, modelarts.Lines)
	if err != nil {
		log.Error("GetTrainJobLog(%s) failed:%v", jobID, err.Error())
		return nil, nil, err
	}

	return fileNames, logResult, err
}

func TrainJobDel(ctx *context.Context) {
var jobID = ctx.Params(":jobid")
var listType = ctx.Query("listType")
@@ -1946,15 +1887,6 @@ func TrainJobStop(ctx *context.Context) {
ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job?listType=" + listType)
}

// canUserCreateTrainJob reports whether the user with the given uid is a
// member of the organization named by setting.AllowedOrg.
//
// It returns false and the lookup error when that organization cannot be
// loaded; otherwise it returns the result of org.IsOrgMember(uid).
func canUserCreateTrainJob(uid int64) (bool, error) {
	org, err := models.GetOrgByName(setting.AllowedOrg)
	if err != nil {
		// log.Error is printf-style: the organization name and the error
		// need explicit verbs to appear in the message.
		log.Error("get allowed org(%s) failed: %v", setting.AllowedOrg, err)
		return false, err
	}

	return org.IsOrgMember(uid)
}
func canUserCreateTrainJobVersion(ctx *context.Context, userID int64) (bool, error) {
if ctx == nil || ctx.User == nil {
log.Error("user unlogin!")
@@ -2046,7 +1978,7 @@ func InferenceJobCreate(ctx *context.Context, form auth.CreateModelArtsInference
EngineName := form.EngineName
LabelName := form.LabelName
isLatestVersion := modelarts.IsLatestVersion
VersionCount := modelarts.VersionCount
VersionCount := modelarts.VersionCountOne
trainUrl := form.TrainUrl
modelName := form.ModelName
modelVersion := form.ModelVersion


+ 2
- 1
routers/routes/routes.go View File

@@ -1183,7 +1183,8 @@ func RegisterRoutes(m *macaron.Macaron) {
m.Group("/:id", func() {
m.Get("", reqRepoCloudBrainReader, repo.NotebookShow)
m.Get("/debug", cloudbrain.AdminOrJobCreaterRight, repo.NotebookDebug2)
m.Post("/:action", reqRepoCloudBrainWriter, repo.NotebookManage)
m.Post("/restart", cloudbrain.AdminOrJobCreaterRight, repo.NotebookRestart)
m.Post("/stop", cloudbrain.AdminOrJobCreaterRight, repo.NotebookStop)
m.Post("/del", cloudbrain.AdminOrOwnerOrJobCreaterRight, repo.NotebookDel)
})
m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, repo.NotebookNew)


Loading…
Cancel
Save