#597 #530

Merged
yangxzh1 merged 1 commits from openioctopus/octopus:master into master 1 year ago
  1. +43
    -0
      server/base-server/internal/common/job.go
  2. +48
    -1
      server/base-server/internal/service/develop/callback.go
  3. +4
    -4
      server/base-server/internal/service/develop/develop.go
  4. +44
    -6
      server/base-server/internal/service/trainjob/train_job.go
  5. +0
    -2
      server/volcano/pkg/controllers/job/plugins/svc/svc.go

+ 43
- 0
server/base-server/internal/common/job.go View File

@@ -0,0 +1,43 @@
package common

import (
"encoding/json"
"server/common/constant"
"server/common/utils"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
typeJob "volcano.sh/apis/pkg/apis/batch/v1alpha1"
)

func GetStopDetail(detailstr string) *typeJob.JobStatusDetail {
detail := typeJob.JobStatusDetail{}
json.Unmarshal([]byte(detailstr), &detail)

if detail.Job != nil {
if !utils.IsCompletedState(detail.Job.State) {
detail.Job.State = constant.STOPPED
}
if detail.Job.FinishedAt == nil {
detail.Job.FinishedAt = &metav1.Time{Time: time.Now()}
}
for _, role := range detail.Tasks {
if !utils.IsCompletedState(role.State) {
role.State = constant.STOPPED
}
if role.Replicas == nil {
continue
}
for _, roleReplica := range role.Replicas {
if !utils.IsCompletedState(roleReplica.State) {
roleReplica.State = constant.STOPPED
}
if roleReplica.FinishedAt == nil {
roleReplica.FinishedAt = &metav1.Time{Time: time.Now()}
}
}
}
}

return &detail
}

+ 48
- 1
server/base-server/internal/service/develop/callback.go View File

@@ -10,6 +10,8 @@ import (
"strings"
"time"

jobUtil "server/base-server/internal/common"

typeJob "volcano.sh/apis/pkg/apis/batch/v1alpha1"
)

@@ -17,7 +19,45 @@ func (s *developService) onJobAdd(obj interface{}) {
}

func (s *developService) onJobDelete(obj interface{}) {

job := utils.ConvertObjToOtjob(obj)
if job == nil {
return
}
if job.Annotations == nil {
return
}
jobType, found := job.Annotations[constant.JOB_TYPE]
if !found || jobType != constant.NotebookJob {
return
}
nbJob, err := s.data.DevelopDao.GetNotebookJob(context.TODO(), job.Name)
if err != nil {
s.log.Error(context.TODO(), "GetNotebookJob err when onJobDelete: "+job.Name, err)
return
}
detail := jobUtil.GetStopDetail(nbJob.Detail)
detailBuf, err := json.Marshal(detail)
if err != nil {
s.log.Error(context.TODO(), "Marshal err when onJobDelete:"+job.Name, err)
}
newJob := &model.NotebookJob{
Id: job.Name,
Detail: string(detailBuf),
}
if !utils.IsCompletedState(nbJob.Status) {
newJob.Status = constant.STOPPED
err = s.data.DevelopDao.UpdateNotebookSelective(context.TODO(), &model.Notebook{
Id: nbJob.NotebookId,
Status: constant.STOPPED,
})
if err != nil {
s.log.Error(context.TODO(), "UpdateNotebookSelective err when onJobDelete:"+job.Name, err)
}
}
err = s.data.DevelopDao.UpdateNotebookJobSelective(context.TODO(), newJob)
if err != nil {
s.log.Error(context.TODO(), "UpdateNotebookJobSelective err when onJobDelete:"+job.Name, err)
}
}

func (s *developService) onJobUpdate(old, obj interface{}) {
@@ -126,4 +166,11 @@ func (s *developService) onJobUpdate(old, obj interface{}) {
s.log.Error(ctx, "create notebook event record error:", err)
}
}

if utils.IsCompletedState(newState) {
err = s.data.Cluster.DeleteJob(context.TODO(), newjob.Namespace, newjob.Name)
if err != nil {
s.log.Error(context.TODO(), "DeleteJob err when onJobUpdate: "+newjob.Name, err)
}
}
}

+ 4
- 4
server/base-server/internal/service/develop/develop.go View File

@@ -659,17 +659,17 @@ func (s *developService) StopNotebook(ctx context.Context, req *api.StopNotebook
return nil, errors.Errorf(nil, errors.ErrorNotebookStatusForbidden)
}

err = s.data.Cluster.DeleteJob(ctx, nb.UserId, nbJob.Id)
err = s.deleteIngress(ctx, nb, nbJob)
if err != nil {
return nil, err
s.log.Errorw(ctx, "err", err)
}

err = s.deleteIngress(ctx, nb, nbJob)
err = s.deleteService(ctx, nb, nbJob)
if err != nil {
s.log.Errorw(ctx, "err", err)
}

err = s.deleteService(ctx, nb, nbJob)
err = s.data.Cluster.DeleteJob(ctx, nb.UserId, nbJob.Id)
if err != nil {
s.log.Errorw(ctx, "err", err)
}


+ 44
- 6
server/base-server/internal/service/trainjob/train_job.go View File

@@ -6,6 +6,7 @@ import (
"io/ioutil"
api "server/base-server/api/v1"
"server/base-server/internal/common"
jobUtil "server/base-server/internal/common"
"server/base-server/internal/conf"
"server/base-server/internal/data"
"server/base-server/internal/data/dao/model"
@@ -667,9 +668,8 @@ func (s *trainJobService) StopJob(ctx context.Context, req *api.StopJobRequest)
return nil, err
}

err = s.data.Cluster.DeleteJob(ctx, job.UserId, req.Id)
if err != nil {
return nil, err
if utils.IsCompletedState(job.Status) {
return nil, errors.Errorf(nil, errors.ErrorStopTerminatedJob)
}

now := time.Now()
@@ -688,6 +688,11 @@ func (s *trainJobService) StopJob(ctx context.Context, req *api.StopJobRequest)
s.log.Error(ctx, err)
}

err = s.data.Cluster.DeleteJob(ctx, job.UserId, req.Id)
if err != nil {
s.log.Error(ctx, err)
}

return &api.StopJobReply{StoppedAt: time.Now().Unix()}, nil
}

@@ -1108,9 +1113,7 @@ func (s *trainJobService) getJobDetail(ctx context.Context, jobID string) (*type
}

func defaultDetail(trainJob *model.TrainJob) *typeJob.JobStatusDetail {

status := constant.PREPARING

if trainJob.Status == constant.STOPPED ||
constant.SUSPENDED == trainJob.Status ||
constant.FAILED == trainJob.Status {
@@ -1133,7 +1136,38 @@ func (s *trainJobService) onJobAdd(obj interface{}) {
}

func (s *trainJobService) onJobDelete(obj interface{}) {

job := utils.ConvertObjToOtjob(obj)
if job == nil {
return
}
if job.Annotations == nil {
return
}
jobType, found := job.Annotations[constant.JOB_TYPE]
if !found || jobType != constant.TrainJob {
return
}
trainJob, err := s.data.TrainJobDao.GetTrainJob(context.TODO(), job.Name)
if err != nil {
s.log.Error(context.TODO(), "GetTrainJob err when onJobDelete:"+job.Name, err)
return
}
detail := jobUtil.GetStopDetail(trainJob.Detail)
detailBuf, err := json.Marshal(detail)
if err != nil {
s.log.Error(context.TODO(), "Marshal err when onJobDelete:"+job.Name, err)
}
newJob := &model.TrainJob{
Id: job.Name,
Detail: string(detailBuf),
}
if !utils.IsCompletedState(trainJob.Status) {
newJob.Status = constant.STOPPED
}
err = s.data.TrainJobDao.UpdateTrainJob(context.TODO(), newJob)
if err != nil {
s.log.Error(context.TODO(), "UpdateTrainJob err when onJobDelete:"+job.Name, err)
}
}

func (s *trainJobService) onJobUpdate(old, obj interface{}) {
@@ -1207,5 +1241,9 @@ func (s *trainJobService) onJobUpdate(old, obj interface{}) {
if err != nil {
s.log.Error(context.TODO(), err)
}
err = s.data.Cluster.DeleteJob(context.TODO(), newjob.Namespace, newjob.Name)
if err != nil {
s.log.Error(context.TODO(), "DeleteJob err when onJobUpdate:"+newjob.Name, err)
}
}
}

+ 0
- 2
server/volcano/pkg/controllers/job/plugins/svc/svc.go View File

@@ -131,7 +131,6 @@ func (sp *servicePlugin) OnJobAdd(job *batch.Job) error {
}

hostFile := GenerateHosts(job)
fmt.Println("bbbbbbbbbbbbbbbbbbbbb", job.Name)
// Create ConfigMap of hosts for Pods to mount.
if err := helpers.CreateOrUpdateConfigMap(job, sp.Clientset.KubeClients, hostFile, sp.cmName(job)); err != nil {
return err
@@ -181,7 +180,6 @@ func (sp *servicePlugin) OnJobDelete(job *batch.Job) error {

func (sp *servicePlugin) OnJobUpdate(job *batch.Job) error {
hostFile := GenerateHosts(job)
fmt.Println("aaaaaaaaaaaaaaaaaaaaaaaa", job.Name)
// updates ConfigMap of hosts for Pods to mount.
if err := helpers.CreateOrUpdateConfigMap(job, sp.Clientset.KubeClients, hostFile, sp.cmName(job)); err != nil {
return err


Loading…
Cancel
Save