#4325 添加异步逻辑的错误提示机制

Merged
chenyifan01 merged 6 commits from handle-async-err into V20230531 11 months ago
  1. +1
    -0
      models/cloudbrain.go
  2. +3
    -0
      routers/response/response_list.go
  3. +1
    -1
      services/ai_task_service/cluster/c2net.go
  4. +5
    -5
      services/ai_task_service/container_builder/code_builder.go
  5. +2
    -1
      services/ai_task_service/container_builder/container_builder.go
  6. +2
    -1
      services/ai_task_service/container_builder/container_builder_chan.go
  7. +4
    -4
      services/ai_task_service/container_builder/dataset_builder.go
  8. +3
    -3
      services/ai_task_service/container_builder/file_notebook_code_builder.go
  9. +4
    -4
      services/ai_task_service/container_builder/output_path_builder.go
  10. +9
    -9
      services/ai_task_service/container_builder/pre_model_builder.go
  11. +9
    -12
      services/ai_task_service/task/cloudbrain_one_notebook_task.go
  12. +8
    -12
      services/ai_task_service/task/cloudbrain_two_notebook_task.go
  13. +5
    -9
      services/ai_task_service/task/grampus_notebook_task.go
  14. +1
    -1
      services/ai_task_service/task/grampus_online_infer_task.go
  15. +1
    -1
      services/ai_task_service/task/grampus_train_task.go
  16. +52
    -3
      services/ai_task_service/task/opt.go
  17. +50
    -17
      services/ai_task_service/task/opt_handler.go
  18. +8
    -5
      services/ai_task_service/task/task_service.go

+ 1
- 0
models/cloudbrain.go View File

@@ -238,6 +238,7 @@ type Cloudbrain struct {
EngineID int64 //引擎id
ImageID string //grampus image_id
AiCenter string //grampus ai center: center_id+center_name
FailedReason string `xorm:"text"`

TrainUrl string //输出模型的obs路径
BranchName string `xorm:"varchar(2550)"` //分支名称


+ 3
- 0
routers/response/response_list.go View File

@@ -29,3 +29,6 @@ var STOP_FAILED = &BizError{Code: 2012, DefaultMsg: "Stop AI task failed", TrCod
// Business errors for AI task creation. DefaultMsg is the fallback text and
// TrCode is the translation key carried alongside it.
//
// NOTE(review): DATASET_SIZE_OVER_LIMIT and BOOT_FILE_MUST_BE_PYTHON share
// Code 2013. The codes are left untouched because clients may already depend
// on them; confirm and renumber separately.

// DATASET_SIZE_OVER_LIMIT previously reused the "Stop AI task failed" text of
// STOP_FAILED; the fallback message now matches its translation key.
var DATASET_SIZE_OVER_LIMIT = &BizError{Code: 2013, DefaultMsg: "The dataset size exceeds the limit", TrCode: "ai_task.dataset_size_over_limit"}

// BOOT_FILE_MUST_BE_PYTHON: the boot file is not a python file.
var BOOT_FILE_MUST_BE_PYTHON = &BizError{Code: 2013, DefaultMsg: "The boot file must be a python file", TrCode: "ai_task.boot_file_must_python"}

// BOOT_FILE_NOT_EXIST: the boot file does not exist.
var BOOT_FILE_NOT_EXIST = &BizError{Code: 2014, DefaultMsg: "The boot file not exist", TrCode: "ai_task.boot_file_not_exist"}

// DATASET_SELECT_ERROR: too many datasets selected, or datasets with the same name.
var DATASET_SELECT_ERROR = &BizError{Code: 2017, DefaultMsg: "Dataset select error: the count exceed the limit or has same name", TrCode: "cloudbrain.error.dataset_select"}

// PARTIAL_DATASETS_NOT_AVAILABLE: some selected dataset files are missing or deleted.
var PARTIAL_DATASETS_NOT_AVAILABLE = &BizError{Code: 2018, DefaultMsg: "There are non-existent or deleted files in the selected dataset file, please select again", TrCode: "cloudbrain.error.partial_datasets_not_available"}

// LOAD_CODE_FAILED: the repository code could not be loaded.
var LOAD_CODE_FAILED = &BizError{Code: 2019, DefaultMsg: "Fail to load code, please check if the right branch is selected.", TrCode: "cloudbrain.load_code_failed"}

+ 1
- 1
services/ai_task_service/cluster/c2net.go View File

@@ -309,7 +309,7 @@ func (c C2NetClusterAdapter) GetNoteBookOperationProfile(jobId string) (*entity.
if err == nil && getJobResult != nil && getJobResult.ExitDiagnostics != "" {
r.Events = append(r.Events, entity.ProfileEvent{
Message: getJobResult.ExitDiagnostics,
Reason: "exit",
Reason: "Exit",
})
}
return r, nil


+ 5
- 5
services/ai_task_service/container_builder/code_builder.go View File

@@ -5,9 +5,9 @@ import (
"code.gitea.io/gitea/modules/cloudbrain"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/routers/response"
"code.gitea.io/gitea/services/ai_task_service/context"
"code.gitea.io/gitea/services/ai_task_service/upload"
"errors"
"strings"
)

@@ -28,14 +28,14 @@ func (b *CodeBuilder) GetContainerType() entity.ContainerDataType {
return entity.ContainerCode
}

func (b *CodeBuilder) Build(ctx *context.CreationContext) ([]entity.ContainerData, error) {
func (b *CodeBuilder) Build(ctx *context.CreationContext) ([]entity.ContainerData, *response.BizError) {
opts := b.Opts
if opts.Disable {
return nil, nil
}
storageTypes := opts.AcceptStorageType
if storageTypes == nil || len(storageTypes) == 0 {
return nil, errors.New("storageType not set")
return nil, response.SYSTEM_ERROR
}

jobName := ctx.Request.JobName
@@ -48,12 +48,12 @@ func (b *CodeBuilder) Build(ctx *context.CreationContext) ([]entity.ContainerDat
if !ctx.Request.IsRestartRequest && !ctx.Request.IsFileNoteBookRequest {
if err := DownloadCode(ctx, codeLocalPath, b.Opts.NotArchive); err != nil {
log.Error("downloadZipCode failed, server timed out: %s (%v)", repo.FullName(), err)
return nil, errors.New("cloudbrain.load_code_failed")
return nil, response.LOAD_CODE_FAILED
}

if err := uploader.UploadDir(codeLocalPath, remoteDir); err != nil {
log.Error("Failed to UploadDir: %s (%v)", repo.FullName(), err)
return nil, errors.New("cloudbrain.load_code_failed")
return nil, response.LOAD_CODE_FAILED
}
}



+ 2
- 1
services/ai_task_service/container_builder/container_builder.go View File

@@ -3,13 +3,14 @@ package container_builder
import (
"code.gitea.io/gitea/entity"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/routers/response"
"code.gitea.io/gitea/services/ai_task_service/context"
"fmt"
"reflect"
)

type ContainerBuilder interface {
Build(ctx *context.CreationContext) ([]entity.ContainerData, error)
Build(ctx *context.CreationContext) ([]entity.ContainerData, *response.BizError)
GetContainerType() entity.ContainerDataType
SetOpts(opts *entity.ContainerBuildOpts)
}


+ 2
- 1
services/ai_task_service/container_builder/container_builder_chan.go View File

@@ -1,6 +1,7 @@
package container_builder

import (
"code.gitea.io/gitea/routers/response"
"code.gitea.io/gitea/services/ai_task_service/context"
)

@@ -17,7 +18,7 @@ func (c *BuilderChain) Next(b ContainerBuilder) *BuilderChain {
return c
}

func (c *BuilderChain) Run(ctx *context.CreationContext) error {
func (c *BuilderChain) Run(ctx *context.CreationContext) *response.BizError {
for _, builder := range c.builderList {
current := ctx.GetContainerDataArray(builder.GetContainerType())
//如果已经存在则不需要再构建


+ 4
- 4
services/ai_task_service/container_builder/dataset_builder.go View File

@@ -5,8 +5,8 @@ import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/routers/response"
"code.gitea.io/gitea/services/ai_task_service/context"
"errors"
"strings"
)

@@ -23,7 +23,7 @@ func (b *DatasetBuilder) SetOpts(opts *entity.ContainerBuildOpts) {
b.Opts = opts
}

func (b *DatasetBuilder) Build(ctx *context.CreationContext) ([]entity.ContainerData, error) {
func (b *DatasetBuilder) Build(ctx *context.CreationContext) ([]entity.ContainerData, *response.BizError) {
if b.Opts.Disable {
return nil, nil
}
@@ -42,11 +42,11 @@ func (b *DatasetBuilder) Build(ctx *context.CreationContext) ([]entity.Container
}
if err != nil {
log.Error("GetDatasetInfo failed: %v", err)
return nil, errors.New("cloudbrain.error.dataset_select")
return nil, response.DATASET_SELECT_ERROR
}
uuidArray := strings.Split(uuid, ";")
if datasetInfos == nil || len(datasetInfos) < len(uuidArray) {
return nil, errors.New("cloudbrain.error.partial_datasets_not_available")
return nil, response.PARTIAL_DATASETS_NOT_AVAILABLE
}
var data []entity.ContainerData
for _, datasetInfo := range datasetInfos {


+ 3
- 3
services/ai_task_service/container_builder/file_notebook_code_builder.go View File

@@ -5,8 +5,8 @@ import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/routers/response"
"code.gitea.io/gitea/services/ai_task_service/context"
"errors"
)

type FileNoteBookCodeBuilder struct {
@@ -26,7 +26,7 @@ func (b *FileNoteBookCodeBuilder) GetContainerType() entity.ContainerDataType {
return entity.ContainerFileNoteBookCode
}

func (b *FileNoteBookCodeBuilder) Build(ctx *context.CreationContext) ([]entity.ContainerData, error) {
func (b *FileNoteBookCodeBuilder) Build(ctx *context.CreationContext) ([]entity.ContainerData, *response.BizError) {
if b.Opts.Disable {
return nil, nil
}
@@ -37,7 +37,7 @@ func (b *FileNoteBookCodeBuilder) Build(ctx *context.CreationContext) ([]entity.
err := DownloadBranch(repo, getCodePath(ctx.Request.JobName, repo, ctx.Request.FileBranchName), ctx.Request.FileBranchName)
if err != nil {
log.Error("download code failed", err)
return nil, errors.New("cloudbrain.load_code_failed")
return nil, response.LOAD_CODE_FAILED
}
return nil, nil
}


+ 4
- 4
services/ai_task_service/container_builder/output_path_builder.go View File

@@ -4,9 +4,9 @@ import (
"code.gitea.io/gitea/entity"
"code.gitea.io/gitea/modules/cloudbrain"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/routers/response"
"code.gitea.io/gitea/services/ai_task_service/context"
"code.gitea.io/gitea/services/ai_task_service/upload"
"errors"
)

type OutputPathBuilder struct {
@@ -22,13 +22,13 @@ func (b *OutputPathBuilder) SetOpts(opts *entity.ContainerBuildOpts) {
b.Opts = opts
}

func (b *OutputPathBuilder) Build(ctx *context.CreationContext) ([]entity.ContainerData, error) {
func (b *OutputPathBuilder) Build(ctx *context.CreationContext) ([]entity.ContainerData, *response.BizError) {
if b.Opts.Disable {
return nil, nil
}
storageTypes := b.Opts.AcceptStorageType
if storageTypes == nil || len(storageTypes) == 0 {
return nil, errors.New("storageType not set")
return nil, response.SYSTEM_ERROR
}

jobName := ctx.Request.JobName
@@ -38,7 +38,7 @@ func (b *OutputPathBuilder) Build(ctx *context.CreationContext) ([]entity.Contai
err := uploader.MKDIR(remoteDir)
if err != nil {
log.Error("MKDIR err.displayJobName = %s err=%v", ctx.Request.DisplayJobName, err)
return nil, err
return nil, response.NewBizError(err)
}
return []entity.ContainerData{{
ContainerPath: b.Opts.ContainerPath,


+ 9
- 9
services/ai_task_service/container_builder/pre_model_builder.go View File

@@ -1,7 +1,7 @@
package container_builder

import (
"errors"
"code.gitea.io/gitea/routers/response"
"fmt"
"strings"

@@ -29,14 +29,14 @@ func (b *PretrainModelBuilder) SetOpts(opts *entity.ContainerBuildOpts) {
b.Opts = opts
}

func (b *PretrainModelBuilder) Build(ctx *context.CreationContext) ([]entity.ContainerData, error) {
func (b *PretrainModelBuilder) Build(ctx *context.CreationContext) ([]entity.ContainerData, *response.BizError) {
if b.Opts.Disable {
return nil, nil
}
form := ctx.Request
storageTypes := b.Opts.AcceptStorageType
if storageTypes == nil || len(storageTypes) == 0 {
return nil, errors.New("storageType not set")
return nil, response.SYSTEM_ERROR
}
//未选择预训练模型,跳过此步
if form.PretrainModelName == "" {
@@ -44,24 +44,24 @@ func (b *PretrainModelBuilder) Build(ctx *context.CreationContext) ([]entity.Con
}
if form.PretrainModelId == "" {
//异常数据,理论上应该都有modelId
return nil, errors.New("cloudbrain.result_cleared")
return nil, response.RESULT_CLEARD
}
//查出模型数据
m, err := models.QueryModelById(form.PretrainModelId)
if err != nil {
log.Error("Can not find model", err)
return nil, errors.New("repo.modelconvert.manage.model_not_exist")
return nil, response.MODEL_NOT_EXISTS
}
preTrainModelUrl := m.Path
if err != nil {
log.Error("Can not find model", err)
return nil, errors.New("repo.modelconvert.manage.model_not_exist")
return nil, response.MODEL_NOT_EXISTS
}
//模型文件存储方式
oldStorageType := entity.GetStorageTypeFromCloudbrainType(m.Type)
if oldStorageType == "" {
log.Error("model storage type error.modelId=%d", m.ID)
return nil, errors.New("model storage type error")
return nil, response.SYSTEM_ERROR
}

var preTrainModelPath string
@@ -71,7 +71,7 @@ func (b *PretrainModelBuilder) Build(ctx *context.CreationContext) ([]entity.Con
for _, ckptName := range ckptNames {
if !cloudbrainTask.IsModelFileExists(m, ckptName) {
log.Error("model file not exist.name = %s", ckptName)
return nil, errors.New("repo.modelconvert.manage.model_file_not_exist")
return nil, response.MODEL_NOT_EXISTS
}
preTrainModelPath = getPreTrainModelPath(preTrainModelUrl, ckptName)
if !b.Opts.IsStorageTypeIn(oldStorageType) {
@@ -83,7 +83,7 @@ func (b *PretrainModelBuilder) Build(ctx *context.CreationContext) ([]entity.Con
minioPreModelURL, err := dealModelInfo(form.PretrainModelId, form.JobName, ckptName)
if err != nil {
log.Error("Can not find model,modelId=%d err=%v", form.PretrainModelId, err)
return nil, errors.New("repo.modelconvert.manage.model_not_exist")
return nil, response.MODEL_NOT_EXISTS
}
preTrainModelUrl = minioPreModelURL
preTrainModelPath = getPreTrainModelPath(minioPreModelURL, ckptName)


+ 9
- 12
services/ai_task_service/task/cloudbrain_one_notebook_task.go View File

@@ -38,7 +38,7 @@ func (t CloudbrainOneNotebookTaskTemplate) Create(ctx *context.CreationContext)
Next(t.CheckDatasetSize).
Next(t.CheckDatasetExists).
Next(t.InsertCloudbrainRecord4Async).
AsyncNext(t.BuildContainerData, t.CallCreationAPI, t.AfterCallCreationAPI4Async, t.NotifyCreation).
AsyncNextWithErrFun(t.BuildContainerData, t.CallCreationAPI, t.AfterCallCreationAPI4Async, t.NotifyCreation, t.HandleErr4Async).
Operate(ctx)
if err != nil {
log.Error("create CloudbrainOneNotebookTask err.%v", err)
@@ -124,7 +124,7 @@ func (c CloudbrainOneNotebookTaskTemplate) BuildContainerData(ctx *context.Creat
IsFileNoteBookRequest: ctx.Request.IsFileNoteBookRequest,
}).ContainerSteps).Run(ctx)
if err != nil {
return response.NewBizError(err)
return err
}
return nil
}
@@ -158,16 +158,13 @@ func (g CloudbrainOneNotebookTaskTemplate) CallCreationAPI(ctx *context.Creation
res, err := c.CreateNoteBook(req)
if err != nil {
log.Error("CloudbrainOneNotebookTask CreateNoteBook err.req=%+v err=%v", req, err)
ctx.Response = &entity.CreationResponse{
Error: err,
}
return nil
} else {
ctx.Response = &entity.CreationResponse{
JobID: res.JobID,
Status: res.Status,
CreateTime: createTime,
}
return response.NewBizError(err)
}

ctx.Response = &entity.CreationResponse{
JobID: res.JobID,
Status: res.Status,
CreateTime: createTime,
}

return nil


+ 8
- 12
services/ai_task_service/task/cloudbrain_two_notebook_task.go View File

@@ -41,7 +41,7 @@ func (t CloudbrainTwoNotebookTaskTemplate) Create(ctx *context.CreationContext)
Next(t.CheckDatasetSize).
Next(t.CheckDatasetExists).
Next(t.InsertCloudbrainRecord4Async).
AsyncNext(t.BuildContainerData, t.CallCreationAPI, t.AfterCallCreationAPI4Async, t.NotifyCreation).
AsyncNextWithErrFun(t.BuildContainerData, t.CallCreationAPI, t.AfterCallCreationAPI4Async, t.NotifyCreation, t.HandleErr4Async).
Operate(ctx)
if err != nil {
log.Error("create CloudbrainOneNotebookTask err.%v", err)
@@ -130,16 +130,12 @@ func (g CloudbrainTwoNotebookTaskTemplate) CallCreationAPI(ctx *context.Creation
res, err := c.CreateNoteBook(req)
if err != nil {
log.Error("CloudbrainTwoNotebookTaskTemplate CreateNoteBook err.req=%+v err=%v", req, err)
ctx.Response = &entity.CreationResponse{
Error: err,
}
return nil
} else {
ctx.Response = &entity.CreationResponse{
JobID: res.JobID,
Status: res.Status,
CreateTime: createTime,
}
return response.NewBizError(err)
}
ctx.Response = &entity.CreationResponse{
JobID: res.JobID,
Status: res.Status,
CreateTime: createTime,
}

return nil
@@ -175,7 +171,7 @@ func (c CloudbrainTwoNotebookTaskTemplate) BuildContainerData(ctx *context.Creat
IsFileNoteBookRequest: ctx.Request.IsFileNoteBookRequest,
}).ContainerSteps).Run(ctx)
if err != nil {
return response.NewBizError(err)
return err
}
return nil
}


+ 5
- 9
services/ai_task_service/task/grampus_notebook_task.go View File

@@ -1,12 +1,11 @@
package task

import (
"strconv"
"strings"

"code.gitea.io/gitea/entity"
"code.gitea.io/gitea/routers/response"
"code.gitea.io/gitea/services/ai_task_service/container_builder"
"strconv"
"strings"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"
@@ -41,7 +40,7 @@ func (t GrampusNoteBookTaskTemplate) Create(ctx *context.CreationContext) (*enti
Next(t.CheckDatasetExists).
Next(t.CheckModel).
Next(t.InsertCloudbrainRecord4Async).
AsyncNext(t.BuildContainerData, t.CallCreationAPI, t.AfterCallCreationAPI4Async, t.NotifyCreation).
AsyncNextWithErrFun(t.BuildContainerData, t.CallCreationAPI, t.AfterCallCreationAPI4Async, t.NotifyCreation, t.HandleErr4Async).
Operate(ctx)
if err != nil {
log.Error("create GrampusNoteBookTask err.%v", err)
@@ -135,7 +134,7 @@ func (c GrampusNoteBookTaskTemplate) BuildContainerData(ctx *context.CreationCon
IsFileNoteBookRequest: ctx.Request.IsFileNoteBookRequest,
}).ContainerSteps).Run(ctx)
if err != nil {
return response.NewBizError(err)
return err
}
return nil
}
@@ -174,10 +173,7 @@ func (g GrampusNoteBookTaskTemplate) CallCreationAPI(ctx *context.CreationContex
res, err := c.CreateNoteBook(req)
if err != nil {
log.Error("GrampusNoteBookTask CreateNoteBook err.req=%+v err=%v", req, err)
ctx.Response = &entity.CreationResponse{
Error: err,
}
return nil
return response.NewBizError(err)
}
if res.JobID == "" {
log.Error("GrampusNoteBookTask CreateNoteBook failed.Cloudbrain.JobID=%s", ctx.SourceCloudbrain.JobID)


+ 1
- 1
services/ai_task_service/task/grampus_online_infer_task.go View File

@@ -179,7 +179,7 @@ func (c GrampusOnlineInferTaskTemplate) BuildContainerData(ctx *context.Creation
IsFileNoteBookRequest: ctx.Request.IsFileNoteBookRequest,
}).ContainerSteps).Run(ctx)
if err != nil {
return response.NewBizError(err)
return err
}
return nil
}


+ 1
- 1
services/ai_task_service/task/grampus_train_task.go View File

@@ -94,7 +94,7 @@ func (c GrampusTrainTaskTemplate) BuildContainerData(ctx *context.CreationContex
IsFileNoteBookRequest: ctx.Request.IsFileNoteBookRequest,
}).ContainerSteps).Run(ctx)
if err != nil {
return response.NewBizError(err)
return err
}
return nil
}


+ 52
- 3
services/ai_task_service/task/opt.go View File

@@ -1,9 +1,11 @@
package task

import (
"code.gitea.io/gitea/entity"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/routers/response"
"code.gitea.io/gitea/services/ai_task_service/context"
"errors"
"fmt"
)

@@ -12,6 +14,7 @@ type CreateFunc func(ctx *context.CreationContext) *response.BizError
type CreationFuncNode struct {
Funcs []CreateFunc
IsAsync bool
ErrFunc CreateFunc
}

func (node CreationFuncNode) Run(ctx *context.CreationContext) *response.BizError {
@@ -43,6 +46,41 @@ func (o *CreateOperator) AsyncNext(f ...CreateFunc) *CreateOperator {
o.FuncArray = append(o.FuncArray, CreationFuncNode{Funcs: f, IsAsync: true})
return o
}
// NextWithErrFun appends a synchronous node to the operator chain.
//
// The last function in f becomes the node's error handler (ErrFunc); every
// function before it is a normal step of the node. At least two functions are
// therefore required: an empty f is ignored, and a single function is logged
// as an error. In both cases no node is added and o is returned.
func (o *CreateOperator) NextWithErrFun(f ...CreateFunc) *CreateOperator {
	if o.FuncArray == nil {
		o.FuncArray = make([]CreationFuncNode, 0)
	}
	// len covers the nil case as well.
	if len(f) == 0 {
		return o
	}
	if len(f) < 2 {
		log.Error("NextWithErrFun err.funcs are less than 2")
		return o
	}

	last := len(f) - 1
	errFun := f[last]
	// Cap the capacity so a later append to Funcs cannot overwrite the
	// handler slot in the caller's backing array.
	normalFun := f[:last:last]
	o.FuncArray = append(o.FuncArray, CreationFuncNode{Funcs: normalFun, IsAsync: false, ErrFunc: errFun})
	return o
}

// AsyncNextWithErrFun appends an asynchronous node (IsAsync = true) to the
// operator chain.
//
// The final element of f is stored as the node's ErrFunc and the elements
// before it as the node's Funcs. With no functions the call is a no-op; with
// exactly one it is logged as an error and no node is added.
func (o *CreateOperator) AsyncNextWithErrFun(f ...CreateFunc) *CreateOperator {
	if o.FuncArray == nil {
		o.FuncArray = make([]CreationFuncNode, 0)
	}

	switch len(f) {
	case 0:
		return o
	case 1:
		log.Error("AsyncNextWithErrFun err.funcs are less than 2")
		return o
	}

	handlerIdx := len(f) - 1
	node := CreationFuncNode{
		Funcs:   f[0:handlerIdx],
		IsAsync: true,
		ErrFunc: f[handlerIdx],
	}
	o.FuncArray = append(o.FuncArray, node)
	return o
}

func (o *CreateOperator) Operate(ctx *context.CreationContext) *response.BizError {
var err *response.BizError
@@ -58,18 +96,29 @@ func (o *CreateOperator) Operate(ctx *context.CreationContext) *response.BizErro
}

func runFuncNode(node CreationFuncNode, ctx *context.CreationContext) *response.BizError {
var err *response.BizError
defer func() {
if err := recover(); err != nil {
if tmpErr := recover(); tmpErr != nil {
combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2))
log.Error("PANIC:%v", combinedErr)
}
}()
for _, f := range node.Funcs {
err := f(ctx)
err = f(ctx)
if err != nil {
log.Error("runFuncNode err.%v", err)
break
}
}
if err != nil && node.ErrFunc != nil {
ctx.Response = &entity.CreationResponse{
Error: errors.New(err.DefaultMsg),
}
newErr := node.ErrFunc(ctx)
if newErr != nil {
log.Error("runFuncNode ErrFunc error.%v", err)
return err
}
}
return nil
return err
}

+ 50
- 17
services/ai_task_service/task/opt_handler.go View File

@@ -40,6 +40,7 @@ type CreationHandler interface {
CreateCloudbrainRecord4Restart(ctx *context.CreationContext) *response.BizError
CallRestartAPI(ctx *context.CreationContext) *response.BizError
NotifyCreation(ctx *context.CreationContext) *response.BizError
HandleErr4Async(ctx *context.CreationContext) *response.BizError
}

//DefaultCreationHandler CreationHandler的默认实现,公共逻辑可以在此结构体中实现
@@ -455,28 +456,16 @@ func (DefaultCreationHandler) AfterCallCreationAPI4Async(ctx *context.CreationCo
return response.AI_TASK_NOT_EXISTS
}
res := ctx.Response
if res == nil {
if res == nil || res.Status == "" {
log.Error("Response not exists.")
return response.SYSTEM_ERROR
}
//更新commitId
c.CommitID = ctx.CommitID

//处理调用集群接口返回错误的情况
if res.Error != nil {
if models.IsNetworkError(res.Error) {
//如果是网络错误,创建是否成功未知,不修改状态,等待定时任务处理
c.Status = models.LocalStatusCreating
} else {
//非网络错误则认为创建失败
c.Status = models.LocalStatusFailed
}
} else {
c.JobID = res.JobID
c.Status = TransAITaskStatus(res.Status)
c.CreatedUnix = res.CreateTime
c.UpdatedUnix = res.CreateTime
}
c.JobID = res.JobID
c.Status = TransAITaskStatus(res.Status)
c.CreatedUnix = res.CreateTime
c.UpdatedUnix = res.CreateTime
c.DatasetName = ctx.Request.DatasetNames

err := models.UpdateJob(c)
@@ -582,3 +571,47 @@ func (DefaultCreationHandler) CallRestartAPI(ctx *context.CreationContext) *resp
log.Error("CallRestartAPI not implements")
return response.SYSTEM_ERROR
}

// HandleErr4Async writes the failure of an asynchronous creation flow back to
// the task's cloudbrain record.
//
// The error is taken from ctx.Response.Error. The record is reloaded by the
// ID of ctx.NewCloudbrain and is only modified while IsPreparing() reports
// true. A network error sets the status to LocalStatusCreating; any other
// error sets LocalStatusFailed and stores the error text in FailedReason.
//
// Missing inputs (no NewCloudbrain, no response error, record not found) are
// logged and return nil. Only a failing UpdateJob is returned as a BizError.
func (DefaultCreationHandler) HandleErr4Async(ctx *context.CreationContext) *response.BizError {
	log.Info("Start to HandleErr4Async.displayJobName=%s jobType=%s cluster=%s", ctx.Request.DisplayJobName, ctx.Request.JobType, ctx.Request.Cluster)

	pending := ctx.NewCloudbrain
	if pending == nil {
		log.Error("HandleErr4Async cloudbrain not exists.")
		return nil
	}
	if resp := ctx.Response; resp == nil || resp.Error == nil {
		log.Error("HandleErr4Async response err is not exists.")
		return nil
	}
	cause := ctx.Response.Error

	record, queryErr := models.GetCloudbrainByCloudbrainID(pending.ID)
	if record == nil {
		log.Error("HandleErr4Async GetCloudbrainByCloudbrainID err.id=%d err=%v", pending.ID, queryErr)
		return nil
	}
	// Only tasks that are still in the PREPARING state are handled.
	if !record.IsPreparing() {
		return nil
	}

	// Map the error returned by the cluster API call to a task status.
	switch {
	case models.IsNetworkError(cause):
		// Network error: it is unknown whether creation succeeded, so the
		// task is left for the scheduled job to resolve.
		record.Status = models.LocalStatusCreating
	default:
		// Any other error is treated as a failed creation.
		record.Status = models.LocalStatusFailed
		record.FailedReason = cause.Error()
	}
	record.CommitID = ctx.CommitID
	record.DatasetName = ctx.Request.DatasetNames

	if updateErr := models.UpdateJob(record); updateErr != nil {
		log.Error("HandleErr4Async UpdateJob err.displayJobName=%s jobType=%s cluster=%s err=%v", ctx.Request.DisplayJobName, ctx.Request.JobType, ctx.Request.Cluster, updateErr)
		return response.NewBizError(updateErr)
	}
	log.Info("HandleErr4Async success.displayJobName=%s jobType=%s cluster=%s", ctx.Request.DisplayJobName, ctx.Request.JobType, ctx.Request.Cluster)
	return nil
}

+ 8
- 5
services/ai_task_service/task/task_service.go View File

@@ -551,12 +551,15 @@ func GetOperationProfile(id int64, getOperationProfile GetOperationProfileFunc)
if err != nil {
return nil, err
}
s, err := getOperationProfile(cloudbrain.JobID)
if err != nil {
return nil, err

defaultRes := &entity.OperationProfile{Events: []entity.ProfileEvent{}}
errMsg := cloudbrain.FailedReason
if errMsg != "" {
defaultRes = &entity.OperationProfile{Events: []entity.ProfileEvent{{Reason: "Error", Message: errMsg}}}
}
if s == nil {
return &entity.OperationProfile{Events: []entity.ProfileEvent{}}, nil
s, err := getOperationProfile(cloudbrain.JobID)
if err != nil || s == nil {
return defaultRes, nil
}
return s, err
}


Loading…
Cancel
Save