@@ -6,8 +6,7 @@ import (
"fmt"
"path"
"strconv"
"code.gitea.io/gitea/modules/timeutil"
"strings"
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/context"
@@ -15,6 +14,7 @@ import (
"code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
"code.gitea.io/gitea/modules/timeutil"
)
const (
@@ -59,7 +59,7 @@ const (
PerPage = 10
IsLatestVersion = "1"
NotLatestVersion = "0"
VersionCount = 1
VersionCountOne = 1
SortByCreateTime = "create_time"
ConfigTypeCustom = "custom"
@@ -284,9 +284,24 @@ func GenerateNotebook2(ctx *context.Context, displayJobName, jobName, uuid, desc
})
if err != nil {
log.Error("createNotebook2 failed: %v", err.Error())
if strings.HasPrefix(err.Error(), UnknownErrorPrefix) {
log.Info("(%s)unknown error, set temp status", displayJobName)
errTemp := models.InsertCloudbrainTemp(&models.CloudbrainTemp{
JobID: models.TempJobId,
VersionID: models.TempVersionId,
Status: models.TempJobStatus,
Type: models.TypeCloudBrainTwo,
JobName: jobName,
JobType: string(models.JobTypeDebug),
})
if errTemp != nil {
log.Error("InsertCloudbrainTemp failed: %v", errTemp.Error())
return errTemp
}
}
return err
}
err = models.CreateCloudbrain(&models.Cloudbrain{
task := &models.Cloudbrain{
Status: jobResult.Status,
UserID: ctx.User.ID,
RepoID: ctx.Repo.Repository.ID,
@@ -302,16 +317,13 @@ func GenerateNotebook2(ctx *context.Context, displayJobName, jobName, uuid, desc
Description: description,
CreatedUnix: createTime,
UpdatedUnix: createTime,
})
if err != nil {
return err
}
task, err := models.GetCloudbrainByName(jobName)
err = models.CreateCloudbrain(task)
if err != nil {
log.Error("GetCloudbrainByName failed: %v", err.Error())
return err
}
stringId := strconv.FormatInt(task.ID, 10)
notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, stringId, displayJobName, models.ActionCreateDebugNPUTask)
return nil
@@ -364,7 +376,22 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error
})
}
if createErr != nil {
log.Error("CreateJob failed: %v", createErr.Error())
log.Error("createTrainJob failed: %v", createErr.Error())
if strings.HasPrefix(createErr.Error(), UnknownErrorPrefix) {
log.Info("(%s)unknown error, set temp status", req.DisplayJobName)
errTemp := models.InsertCloudbrainTemp(&models.CloudbrainTemp{
JobID: models.TempJobId,
VersionID: models.TempVersionId,
Status: models.TempJobStatus,
Type: models.TypeCloudBrainTwo,
JobName: req.JobName,
JobType: string(models.JobTypeTrain),
})
if errTemp != nil {
log.Error("InsertCloudbrainTemp failed: %v", errTemp.Error())
return errTemp
}
}
return createErr
}
jobId := strconv.FormatInt(jobResult.JobID, 10)
@@ -438,7 +465,7 @@ func GenerateTrainJobVersion(ctx *context.Context, req *GenerateTrainJobReq, job
createTime := timeutil.TimeStampNow()
var jobResult *models.CreateTrainJobResult
var createErr error
log.Info(" req.EngineID =" + fmt.Sprint(req.EngineID))
if req.EngineID < 0 {
jobResult, createErr = createTrainJobVersionUserImage(models.CreateTrainJobVersionUserImageParams{
Description: req.Description,
@@ -480,7 +507,22 @@ func GenerateTrainJobVersion(ctx *context.Context, req *GenerateTrainJobReq, job
}, jobId)
}
if createErr != nil {
log.Error("CreateJob failed: %v", createErr.Error())
log.Error("createTrainJobVersion failed: %v", createErr.Error())
if strings.HasPrefix(createErr.Error(), UnknownErrorPrefix) {
log.Info("(%s)unknown error, set temp status", req.DisplayJobName)
errTemp := models.InsertCloudbrainTemp(&models.CloudbrainTemp{
JobID: jobId,
VersionID: models.TempVersionId,
Status: models.TempJobStatus,
Type: models.TypeCloudBrainTwo,
JobName: req.JobName,
JobType: string(models.JobTypeTrain),
})
if errTemp != nil {
log.Error("InsertCloudbrainTemp failed: %v", errTemp.Error())
return errTemp
}
}
return createErr
}
@@ -540,7 +582,7 @@ func GenerateTrainJobVersion(ctx *context.Context, req *GenerateTrainJobReq, job
}
//将训练任务的上一版本的isLatestVersion设置为"0"
createErr = models.SetVersionCountAndLatestVersion(strconv.FormatInt(jobResult.JobID, 10), VersionTaskList[0].VersionName, VersionCount, NotLatestVersion, TotalVersionCount)
createErr = models.SetVersionCountAndLatestVersion(strconv.FormatInt(jobResult.JobID, 10), VersionTaskList[0].VersionName, VersionCountOne , NotLatestVersion, TotalVersionCount)
if createErr != nil {
ctx.ServerError("Update IsLatestVersion failed", createErr)
return createErr
@@ -549,99 +591,6 @@ func GenerateTrainJobVersion(ctx *context.Context, req *GenerateTrainJobReq, job
return createErr
}
func GenerateTrainJobVersionByUserImage(ctx *context.Context, req *GenerateTrainJobReq, jobId string) (err error) {
createTime := timeutil.TimeStampNow()
jobResult, err := createTrainJobUserImage(models.CreateUserImageTrainJobParams{
JobName: req.JobName,
Description: req.Description,
Config: models.UserImageConfig{
WorkServerNum: req.WorkServerNumber,
AppUrl: req.CodeObsPath,
BootFileUrl: req.BootFileUrl,
DataUrl: req.DataUrl,
TrainUrl: req.TrainUrl,
LogUrl: req.LogUrl,
PoolID: req.PoolID,
CreateVersion: true,
Flavor: models.Flavor{
Code: req.FlavorCode,
},
Parameter: req.Parameters,
UserImageUrl: req.UserImageUrl,
UserCommand: req.UserCommand,
},
})
if err != nil {
log.Error("CreateJob failed: %v", err.Error())
return err
}
var jobTypes []string
jobTypes = append(jobTypes, string(models.JobTypeTrain))
repo := ctx.Repo.Repository
VersionTaskList, VersionListCount, err := models.CloudbrainsVersionList(&models.CloudbrainsOptions{
RepoID: repo.ID,
Type: models.TypeCloudBrainTwo,
JobTypes: jobTypes,
JobID: strconv.FormatInt(jobResult.JobID, 10),
})
if err != nil {
ctx.ServerError("Cloudbrain", err)
return err
}
//将当前版本的isLatestVersion设置为"1"和任务数量更新,任务数量包括当前版本数VersionCount和历史创建的总版本数TotalVersionCount
err = models.CreateCloudbrain(&models.Cloudbrain{
Status: TransTrainJobStatus(jobResult.Status),
UserID: ctx.User.ID,
RepoID: ctx.Repo.Repository.ID,
JobID: strconv.FormatInt(jobResult.JobID, 10),
JobName: req.JobName,
DisplayJobName: req.DisplayJobName,
JobType: string(models.JobTypeTrain),
Type: models.TypeCloudBrainTwo,
VersionID: jobResult.VersionID,
VersionName: jobResult.VersionName,
Uuid: req.Uuid,
DatasetName: req.DatasetName,
CommitID: req.CommitID,
IsLatestVersion: req.IsLatestVersion,
PreVersionName: req.PreVersionName,
ComputeResource: models.NPUResource,
EngineID: MORDELART_USER_IMAGE_ENGINE_ID,
Image: req.UserImageUrl,
TrainUrl: req.TrainUrl,
BranchName: req.BranchName,
Parameters: req.Params,
BootFile: req.BootFile,
DataUrl: req.DataUrl,
LogUrl: req.LogUrl,
PreVersionId: req.PreVersionId,
FlavorCode: req.FlavorCode,
Description: req.Description,
WorkServerNumber: req.WorkServerNumber,
FlavorName: req.FlavorName,
EngineName: req.EngineName,
TotalVersionCount: VersionTaskList[0].TotalVersionCount + 1,
VersionCount: VersionListCount + 1,
CreatedUnix: createTime,
UpdatedUnix: createTime,
})
if err != nil {
log.Error("CreateCloudbrain(%s) failed:%v", req.JobName, err.Error())
return err
}
//将训练任务的上一版本的isLatestVersion设置为"0"
err = models.SetVersionCountAndLatestVersion(strconv.FormatInt(jobResult.JobID, 10), VersionTaskList[0].VersionName, VersionCount, NotLatestVersion, TotalVersionCount)
if err != nil {
ctx.ServerError("Update IsLatestVersion failed", err)
return err
}
return err
}
func TransTrainJobStatus(status int) string {
switch status {
case 0:
@@ -722,7 +671,22 @@ func GenerateInferenceJob(ctx *context.Context, req *GenerateInferenceJobReq) (e
},
})
if err != nil {
log.Error("CreateJob failed: %v", err.Error())
log.Error("createInferenceJob failed: %v", err.Error())
if strings.HasPrefix(err.Error(), UnknownErrorPrefix) {
log.Info("(%s)unknown error, set temp status", req.DisplayJobName)
err = models.InsertCloudbrainTemp(&models.CloudbrainTemp{
JobID: models.TempJobId,
VersionID: models.TempVersionId,
Status: models.TempJobStatus,
Type: models.TypeCloudBrainTwo,
JobName: req.JobName,
JobType: string(models.JobTypeInference),
})
if err != nil {
log.Error("InsertCloudbrainTemp failed: %v", err.Error())
return err
}
}
return err
}
@@ -807,3 +771,455 @@ func InitSpecialPool() {
json.Unmarshal([]byte(setting.ModelArtsSpecialPools), &SpecialPools)
}
}
func HandleTrainJobInfo(task *models.Cloudbrain) error {
result, err := GetTrainJob(task.JobID, strconv.FormatInt(task.VersionID, 10))
if err != nil {
log.Error("GetTrainJob(%s) failed:%v", task.DisplayJobName, err)
return err
}
if result != nil {
oldStatus := task.Status
task.Status = TransTrainJobStatus(result.IntStatus)
task.Duration = result.Duration / 1000
task.TrainJobDuration = result.TrainJobDuration
if task.StartTime == 0 && result.StartTime > 0 {
task.StartTime = timeutil.TimeStamp(result.StartTime / 1000)
}
task.TrainJobDuration = models.ConvertDurationToStr(task.Duration)
if task.EndTime == 0 && models.IsTrainJobTerminal(task.Status) && task.StartTime > 0 {
task.EndTime = task.StartTime.Add(task.Duration)
}
task.CorrectCreateUnix()
if oldStatus != task.Status {
notification.NotifyChangeCloudbrainStatus(task, oldStatus)
}
err = models.UpdateJob(task)
if err != nil {
log.Error("UpdateJob(%s) failed:%v", task.JobName, err)
return err
}
}
return nil
}
func HandleNotebookInfo(task *models.Cloudbrain) error {
result, err := GetNotebook2(task.JobID)
if err != nil {
log.Error("GetNotebook2(%s) failed:%v", task.DisplayJobName, err)
return err
}
if result != nil {
oldStatus := task.Status
task.Status = result.Status
if task.StartTime == 0 && result.Lease.UpdateTime > 0 {
task.StartTime = timeutil.TimeStamp(result.Lease.UpdateTime / 1000)
}
if task.EndTime == 0 && models.IsModelArtsDebugJobTerminal(task.Status) {
task.EndTime = timeutil.TimeStampNow()
}
task.CorrectCreateUnix()
task.ComputeAndSetDuration()
if oldStatus != task.Status {
notification.NotifyChangeCloudbrainStatus(task, oldStatus)
}
if task.FlavorCode == "" {
task.FlavorCode = result.Flavor
}
err = models.UpdateJob(task)
if err != nil {
log.Error("UpdateJob(%s) failed:%v", task.DisplayJobName, err)
return err
}
}
return nil
}
func SyncTempStatusJob() {
jobs, err := models.GetCloudBrainTempJobs()
if err != nil {
log.Error("GetCloudBrainTempJobs failed:%v", err.Error())
return
}
for _, temp := range jobs {
log.Info("start to handle record: %s", temp.JobName)
if temp.Type == models.TypeCloudBrainTwo {
if temp.JobType == string(models.JobTypeDebug) {
err = handleNotebook(temp)
if err != nil {
log.Error("handleNotebook falied:%v", err)
break
}
} else if temp.JobType == string(models.JobTypeTrain) || temp.JobType == string(models.JobTypeInference) {
_, err = models.GetCloudbrainByJobID(temp.JobID)
if err != nil {
//one version
err = handleTrainJob(temp)
if err != nil {
log.Error("handleTrainJob falied:%v", err)
break
}
} else {
//multi version
err = handleTrainJobMultiVersion(temp)
if err != nil {
log.Error("handleTrainJobMultiVersion falied:%v", err)
break
}
}
}
}
}
return
}
func handleNotebook(temp *models.CloudbrainTemp) error {
if temp.Status == models.TempJobStatus {
err := handleTempNotebook(temp)
if err != nil {
log.Error("handleTempNotebook failed:%v", err)
return err
}
} else if temp.Status == string(models.ModelArtsStopping) {
res, err := GetNotebook2(temp.JobID)
if err != nil {
log.Error("GetNotebook2 failed:%v", err)
return err
}
temp.Status = res.Status
if temp.Status == string(models.ModelArtsStopped) {
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp failed:%v", err)
return err
}
_, err := DelNotebook2(temp.JobID)
if err != nil {
log.Error("DelNotebook2 failed:%v", err)
return err
}
temp.Status = string(models.ModelArtsDeleted)
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp failed:%v", err)
return err
}
}
}
return nil
}
func handleTempNotebook(temp *models.CloudbrainTemp) error {
var err error
var isExist bool
for {
result, err := GetNotebookList(1000, 0, "createTime", "DESC", temp.JobName)
if err != nil {
log.Error("GetNotebookList failed:%v", err)
break
}
temp.QueryTimes++
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp failed:%v", err)
}
if result != nil {
for _, notebook := range result.NotebookList {
if temp.JobID == models.TempJobId {
//new notebook
if notebook.JobName == temp.JobName {
isExist = true
temp.Status = notebook.Status
temp.JobID = notebook.JobID
break
}
} else {
//restart: always can find one record
if notebook.JobName == temp.JobName {
if notebook.Status != string(models.ModelArtsStopped) {
isExist = true
temp.Status = notebook.Status
temp.JobID = notebook.JobID
break
}
}
}
}
if isExist {
log.Info("find the record(%s), status(%s)", temp.JobName, temp.Status)
if temp.Status == string(models.ModelArtsCreateFailed) {
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp failed:%v", err)
break
}
_, err := DelNotebook2(temp.JobID)
if err != nil {
log.Error("DelNotebook2(%s) failed:%v", temp.JobName, err)
break
}
temp.Status = string(models.ModelArtsDeleted)
} else {
_, err := ManageNotebook2(temp.JobID, models.NotebookAction{Action: models.ActionStop})
if err != nil {
log.Error("ManageNotebook2(%s) failed:%v", temp.JobName, err)
break
}
temp.Status = string(models.ModelArtsStopping)
}
models.UpdateCloudbrainTemp(temp)
} else {
log.Error("can not find the record(%s) till now", temp.JobName)
err = errors.New("not found")
break
}
} else {
log.Error("can not find the record(%s) till now", temp.JobName)
err = errors.New("not found")
break
}
break
}
if temp.QueryTimes >= setting.MaxTempQueryTimes && !isExist {
log.Info("reach MaxTempQueryTimes, set the job failed")
temp.Status = string(models.ModelArtsTrainJobFailed)
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp(%s) failed:%v", temp.JobName, err)
return err
}
}
return err
}
func handleTrainJob(temp *models.CloudbrainTemp) error {
if temp.Status == models.TempJobStatus {
err := handleTempTrainJob(temp)
if err != nil {
log.Error("handleTempTrainJob failed:%v", err)
return err
}
} else if temp.Status == string(models.ModelArtsTrainJobKilling) {
res, err := GetTrainJob(temp.JobID, temp.VersionID)
if err != nil {
log.Error("GetTrainJob failed:%v", err)
return err
}
temp.Status = TransTrainJobStatus(res.IntStatus)
if temp.Status == string(models.ModelArtsTrainJobKilled) {
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp failed:%v", err)
return err
}
_, err := DelTrainJob(temp.JobID)
if err != nil {
log.Error("DelTrainJob failed:%v", err)
return err
}
temp.Status = string(models.ModelArtsDeleted)
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp failed:%v", err)
return err
}
}
}
return nil
}
func handleTrainJobMultiVersion(temp *models.CloudbrainTemp) error {
if temp.Status == models.TempJobStatus {
err := handleTempTrainJobMultiVersion(temp)
if err != nil {
log.Error("handleTempTrainJobMultiVersion failed:%v", err)
return err
}
} else if temp.Status == string(models.ModelArtsTrainJobKilling) {
res, err := GetTrainJob(temp.JobID, temp.VersionID)
if err != nil {
log.Error("GetTrainJob failed:%v", err)
return err
}
temp.Status = TransTrainJobStatus(res.IntStatus)
if temp.Status == string(models.ModelArtsTrainJobKilled) {
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp failed:%v", err)
return err
}
_, err := DelTrainJobVersion(temp.JobID, temp.VersionID)
if err != nil {
log.Error("DelTrainJob failed:%v", err)
return err
}
temp.Status = string(models.ModelArtsDeleted)
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp failed:%v", err)
return err
}
}
}
return nil
}
func handleTempTrainJobMultiVersion(temp *models.CloudbrainTemp) error {
var err error
var isExist bool
for {
result, err := GetTrainJobVersionList(1000, 1, temp.JobID)
if err != nil {
log.Error("GetTrainJobVersionList failed:%v", err)
break
}
temp.QueryTimes++
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp failed:%v", err)
}
if result != nil {
count, _ := models.GetCloudbrainCountByJobName(temp.JobName, temp.JobType, temp.Type)
if result.VersionCount == int64(count+1) {
isExist = true
temp.Status = TransTrainJobStatus(result.JobVersionList[0].IntStatus)
temp.VersionID = strconv.FormatInt(result.JobVersionList[0].VersionID, 10)
log.Info("find the record(%s), status(%s)", temp.JobName, temp.Status)
_, err := StopTrainJob(temp.JobID, temp.VersionID)
if err != nil {
log.Error("StopTrainJob failed:%v", err)
break
}
temp.Status = string(models.ModelArtsTrainJobKilling)
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp(%s) failed:%v", temp.JobName, err)
break
}
} else {
log.Error("can not find the record(%s) till now", temp.JobName)
err = errors.New("not found")
break
}
}
break
}
if temp.QueryTimes >= setting.MaxTempQueryTimes && !isExist {
log.Info("reach MaxTempQueryTimes, set the job failed")
temp.Status = string(models.ModelArtsTrainJobFailed)
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp(%s) failed:%v", temp.JobName, err)
return err
}
}
return err
}
func handleTempTrainJob(temp *models.CloudbrainTemp) error {
var err error
var isExist bool
for {
result, err := GetTrainJobList(1000, 1, "create_time", "desc", temp.JobName)
if err != nil {
log.Error("GetTrainJobList failed:%v", err)
break
}
temp.QueryTimes++
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp failed:%v", err)
}
if result != nil {
for _, job := range result.JobList {
if temp.JobName == job.JobName && TransTrainJobStatus(job.IntStatus) != string(models.ModelArtsTrainJobFailed) {
isExist = true
temp.Status = TransTrainJobStatus(job.IntStatus)
temp.JobID = strconv.FormatInt(job.JobID, 10)
temp.VersionID = strconv.FormatInt(job.VersionID, 10)
log.Info("find the record(%s), status(%s)", temp.JobName, temp.Status)
_, err = StopTrainJob(temp.JobID, temp.VersionID)
if err != nil {
log.Error("StopTrainJob(%s) failed:%v", temp.JobName, err)
break
}
temp.Status = string(models.ModelArtsTrainJobKilling)
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp(%s) failed:%v", temp.JobName, err)
break
}
}
}
if !isExist {
log.Error("can not find the record(%s) till now", temp.JobName)
err = errors.New("not found")
break
}
}
break
}
if temp.QueryTimes >= setting.MaxTempQueryTimes && !isExist {
log.Info("reach MaxTempQueryTimes, set the job failed")
temp.Status = string(models.ModelArtsTrainJobFailed)
err = models.UpdateCloudbrainTemp(temp)
if err != nil {
log.Error("UpdateCloudbrainTemp(%s) failed:%v", temp.JobName, err)
return err
}
}
return err
}