@@ -6,6 +6,7 @@ import (
"io/ioutil"
api "server/base-server/api/v1"
"server/base-server/internal/common"
jobUtil "server/base-server/internal/common"
"server/base-server/internal/conf"
"server/base-server/internal/data"
"server/base-server/internal/data/dao/model"
@@ -667,9 +668,8 @@ func (s *trainJobService) StopJob(ctx context.Context, req *api.StopJobRequest)
return nil, err
}
err = s.data.Cluster.DeleteJob(ctx, job.UserId, req.Id)
if err != nil {
return nil, err
if utils.IsCompletedState(job.Status) {
return nil, errors.Errorf(nil, errors.ErrorStopTerminatedJob)
}
now := time.Now()
@@ -688,6 +688,11 @@ func (s *trainJobService) StopJob(ctx context.Context, req *api.StopJobRequest)
s.log.Error(ctx, err)
}
err = s.data.Cluster.DeleteJob(ctx, job.UserId, req.Id)
if err != nil {
s.log.Error(ctx, err)
}
return &api.StopJobReply{StoppedAt: time.Now().Unix()}, nil
}
@@ -1108,9 +1113,7 @@ func (s *trainJobService) getJobDetail(ctx context.Context, jobID string) (*type
}
func defaultDetail(trainJob *model.TrainJob) *typeJob.JobStatusDetail {
status := constant.PREPARING
if trainJob.Status == constant.STOPPED ||
constant.SUSPENDED == trainJob.Status ||
constant.FAILED == trainJob.Status {
@@ -1133,7 +1136,38 @@ func (s *trainJobService) onJobAdd(obj interface{}) {
}
func (s *trainJobService) onJobDelete(obj interface{}) {
job := utils.ConvertObjToOtjob(obj)
if job == nil {
return
}
if job.Annotations == nil {
return
}
jobType, found := job.Annotations[constant.JOB_TYPE]
if !found || jobType != constant.TrainJob {
return
}
trainJob, err := s.data.TrainJobDao.GetTrainJob(context.TODO(), job.Name)
if err != nil {
s.log.Error(context.TODO(), "GetTrainJob err when onJobDelete:"+job.Name, err)
return
}
detail := jobUtil.GetStopDetail(trainJob.Detail)
detailBuf, err := json.Marshal(detail)
if err != nil {
s.log.Error(context.TODO(), "Marshal err when onJobDelete:"+job.Name, err)
}
newJob := &model.TrainJob{
Id: job.Name,
Detail: string(detailBuf),
}
if !utils.IsCompletedState(trainJob.Status) {
newJob.Status = constant.STOPPED
}
err = s.data.TrainJobDao.UpdateTrainJob(context.TODO(), newJob)
if err != nil {
s.log.Error(context.TODO(), "UpdateTrainJob err when onJobDelete:"+job.Name, err)
}
}
func (s *trainJobService) onJobUpdate(old, obj interface{}) {
@@ -1207,5 +1241,9 @@ func (s *trainJobService) onJobUpdate(old, obj interface{}) {
if err != nil {
s.log.Error(context.TODO(), err)
}
err = s.data.Cluster.DeleteJob(context.TODO(), newjob.Namespace, newjob.Name)
if err != nil {
s.log.Error(context.TODO(), "DeleteJob err when onJobUpdate:"+newjob.Name, err)
}
}
}