diff --git a/entity/cluster.go b/entity/cluster.go index 22b133f4a0..322ea69be5 100644 --- a/entity/cluster.go +++ b/entity/cluster.go @@ -25,7 +25,7 @@ type NoteBookTask struct { AutoStopDuration int64 Name string Capacity int - CenterID []string + Queues []models.ResourceQueue Code []ContainerData Datasets []ContainerData PreTrainModel []ContainerData @@ -73,6 +73,7 @@ type QueryTaskResponse struct { Token string `json:"token"` CenterId string `json:"center_id"` CenterName string `json:"center_name"` + QueueCode string `json:"queue_code"` CodeUrl string `json:"code_url"` DataUrl string `json:"data_url"` ContainerIP string `json:"container_ip"` @@ -108,6 +109,7 @@ func ConvertGrampusNotebookResponse(job models.GrampusNotebookInfo) *QueryTaskRe DetailedStatus: job.DetailedStatus, CenterId: centerId, CenterName: centerName, + QueueCode: task.PoolId, Url: url, Token: token, JobId: job.JobID, @@ -212,20 +214,20 @@ type ClusterLog struct { } type TrainTask struct { - Command string `json:"command"` - Name string `json:"name"` - ImageId string `json:"imageId"` - ImageUrl string `json:"imageUrl"` - ResourceSpecId string `json:"resourceSpecId"` - CenterID []string `json:"centerID"` - ReplicaNum int `json:"replicaNum"` - Datasets []ContainerData `json:"datasets"` - PreTrainModel []ContainerData `json:"models"` - Code []ContainerData `json:"code"` - BootFile string `json:"bootFile"` - OutPut []ContainerData `json:"output"` - LogPath []ContainerData `json:"logPath"` - PoolId string `json:"poolId"` + Command string `json:"command"` + Name string `json:"name"` + ImageId string `json:"imageId"` + ImageUrl string `json:"imageUrl"` + ResourceSpecId string `json:"resourceSpecId"` + Queues []models.ResourceQueue `json:"centerID"` + ReplicaNum int `json:"replicaNum"` + Datasets []ContainerData `json:"datasets"` + PreTrainModel []ContainerData `json:"models"` + Code []ContainerData `json:"code"` + BootFile string `json:"bootFile"` + OutPut []ContainerData `json:"output"` + LogPath 
[]ContainerData `json:"logPath"` + PoolId string `json:"poolId"` Params models.Parameters Spec *models.Specification RepoName string diff --git a/models/card_request.go b/models/card_request.go index 029213c8ad..71987e2419 100644 --- a/models/card_request.go +++ b/models/card_request.go @@ -143,6 +143,8 @@ type RequestSpecInfo struct { AiCenterCode string AiCenterName string QueueCode string + QueueName string + QueueType string QueueId int64 ComputeResource string AccCardType string @@ -402,7 +404,8 @@ func SearchCardRequest(opts *CardRequestOptions) (int64, []*CardRequestSpecRes, "card_request_spec.request_id", "resource_queue.cluster", "resource_queue.ai_center_code", "resource_queue.acc_card_type", "resource_queue.id as queue_id", "resource_queue.compute_resource", - "resource_queue.queue_code", "resource_queue.ai_center_name", + "resource_queue.queue_code", "resource_queue.queue_name", + "resource_queue.queue_type", "resource_queue.ai_center_name", "resource_queue.has_internet", ).In("card_request_spec.request_id", requestIds). Join("INNER", "card_request_spec", "card_request_spec.spec_id = resource_specification.id"). 
diff --git a/models/cloudbrain.go b/models/cloudbrain.go index af58ad0d5a..c8746468fc 100755 --- a/models/cloudbrain.go +++ b/models/cloudbrain.go @@ -271,6 +271,7 @@ type Cloudbrain struct { EngineID int64 //引擎id ImageID string //grampus image_id AiCenter string //grampus ai center: center_id+center_name + QueueCode string FailedReason string `xorm:"text"` TrainUrl string //输出模型的obs路径 @@ -1869,9 +1870,24 @@ type GrampusNotebookInfo struct { UserID string `json:"userId"` Tasks []GrampusNotebookTask `json:"tasks"` } + +const ( + GrampusNetAccess = "1" + GrampusNotNetAccess = "2" + + GrampusPoolTypePublic = "1" + GrampusPoolTypeExclusive = "2" +) + type Center struct { - ID string `json:"id"` - Name string `json:"name"` + ID string `json:"id"` + Name string `json:"name"` + ResourceSpec []struct { + ID string `json:"id"` + PoolType string `json:"poolType"` + Name string `json:"name"` + IsNetAccess string `json:"isNetAccess"` + } `json:"resourceSpec"` } type GrampusSpec struct { CreatedAt int64 `json:"createdAt"` @@ -1925,9 +1941,21 @@ type GetGrampusResourceSpecsResult struct { type GetGrampusAiCentersResult struct { GrampusResult - Infos []GrampusAiCenter `json:"aiCenterInfos"` - TotalSize int `json:"totalSize"` + Infos []GrampusAiCenter `json:"aiCenterInfos"` } + +type GrampusResourceQueue struct { + QueueCode string + QueueName string + QueueType string + AiCenterCode string + AiCenterName string + ComputeResource string + AccCardType string + HasInternet int //0 unknown;1 no internet;2 has internet + +} + type AICenterImage struct { AICenterID string `json:"aiCenterId"` ImageUrl string `json:"imageUrl"` @@ -2028,6 +2056,7 @@ type GrampusNotebookTask struct { Capacity int `json:"capacity"` CenterID []string `json:"centerID"` CenterName []string `json:"centerName"` + PoolId string `json:"poolId"` Code GrampusDataset `json:"code"` Datasets []GrampusDataset `json:"datasets"` OutPut GrampusDataset `json:"output"` diff --git a/models/cloudbrain_spec.go 
b/models/cloudbrain_spec.go index 0d1217286f..30e3bd2144 100644 --- a/models/cloudbrain_spec.go +++ b/models/cloudbrain_spec.go @@ -18,6 +18,8 @@ type CloudbrainSpec struct { UnitPrice int QueueId int64 QueueCode string + QueueName string + QueueType string Cluster string HasInternet int AiCenterCode string `xorm:"index"` @@ -163,6 +165,8 @@ func UpdateCloudbrainSpec(cloudbrainId int64, s *Specification) (int64, error) { UnitPrice: s.UnitPrice, QueueId: s.QueueId, QueueCode: s.QueueCode, + QueueName: s.QueueName, + QueueType: s.QueueType, Cluster: s.Cluster, AiCenterCode: s.AiCenterCode, AiCenterName: s.AiCenterName, diff --git a/models/models.go b/models/models.go index 245f7844b2..c3d9128568 100755 --- a/models/models.go +++ b/models/models.go @@ -177,6 +177,7 @@ func init() { new(ModelartsDeploy), new(ModelartsDeployQueue), new(CloudbrainConfig), + new(ResourceExclusivePool), new(CardRequest), new(CardRequestSpec), ) diff --git a/models/resource_queue.go b/models/resource_queue.go index 6a97fddd6d..51fbc4b0ef 100644 --- a/models/resource_queue.go +++ b/models/resource_queue.go @@ -2,18 +2,23 @@ package models import ( "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/timeutil" - "encoding/json" "errors" "strconv" "strings" "xorm.io/builder" ) +const ( + QueueTypePublic = "public" + QueueTypeExclusive = "exclusive" +) + type ResourceQueue struct { ID int64 `xorm:"pk autoincr"` QueueCode string + QueueName string + QueueType string Cluster string `xorm:"notnull"` AiCenterCode string AiCenterName string @@ -34,6 +39,8 @@ func (r ResourceQueue) ConvertToRes() *ResourceQueueRes { return &ResourceQueueRes{ ID: r.ID, QueueCode: r.QueueCode, + QueueName: r.QueueName, + QueueType: r.QueueType, Cluster: r.Cluster, AiCenterCode: r.AiCenterCode, AiCenterName: r.AiCenterName, @@ -57,6 +64,8 @@ type ResourceQueueReq struct { CreatorId int64 IsAutomaticSync bool Remark string + QueueName string + QueueType string } func (r 
ResourceQueueReq) ToDTO() ResourceQueue { @@ -72,6 +81,8 @@ func (r ResourceQueueReq) ToDTO() ResourceQueue { Remark: r.Remark, CreatedBy: r.CreatorId, UpdatedBy: r.CreatorId, + QueueName: r.QueueName, + QueueType: r.QueueType, } if r.Cluster == OpenICluster { if r.AiCenterCode == AICenterOfCloudBrainOne { @@ -92,6 +103,7 @@ type SearchResourceQueueOptions struct { ComputeResource string AccCardType string HasInternet SpecInternetQuery + QueueType string } type ResourceQueueListRes struct { @@ -102,6 +114,8 @@ type ResourceQueueListRes struct { type ResourceQueueCodesRes struct { ID int64 QueueCode string + QueueName string + QueueType string Cluster string AiCenterCode string AiCenterName string @@ -136,6 +150,8 @@ func NewResourceQueueListRes(totalSize int64, list []ResourceQueue) *ResourceQue type ResourceQueueRes struct { ID int64 QueueCode string + QueueType string + QueueName string Cluster string AiCenterCode string AiCenterName string @@ -155,7 +171,7 @@ func UpdateResourceQueueById(queueId int64, queue ResourceQueue) (int64, error) return x.ID(queueId).Update(&queue) } func UpdateResourceCardsTotalNumAndInternetStatus(queueId int64, queue ResourceQueue) (int64, error) { - return x.ID(queueId).Cols("cards_total_num", "remark", "has_internet").Update(&queue) + return x.ID(queueId).Cols("cards_total_num", "remark", "has_internet", "queue_type", "queue_name").Update(&queue) } func SearchResourceQueue(opts SearchResourceQueueOptions) (int64, []ResourceQueue, error) { @@ -180,6 +196,9 @@ func SearchResourceQueue(opts SearchResourceQueueOptions) (int64, []ResourceQueu } else if opts.HasInternet == QueryHasInternetSpecs { cond = cond.And(builder.Eq{"has_internet": HasInternet}) } + if opts.QueueType != "" { + cond = cond.And(builder.Eq{"queue_type": opts.QueueType}) + } n, err := x.Where(cond).Unscoped().Count(&ResourceQueue{}) if err != nil { return 0, nil, err @@ -366,39 +385,15 @@ func GetResourceAiCenters() ([]ResourceAiCenterRes, error) { return r, nil } 
-type SpecificationSpecialQueueConfig struct { - SpecialQueues []SpecialQueue `json:"special_queues"` -} - -type SpecialQueue struct { - OrgName string `json:"org_name"` - JobType string `json:"job_type"` - Cluster string `json:"cluster"` - QueueId int64 `json:"queue_id"` - ComputeResource string `json:"compute_resource"` -} - -var specialQueueConfig SpecificationSpecialQueueConfig -var specialQueueConfigFlag = false - -func GetSpecialQueueConfig() SpecificationSpecialQueueConfig { - if !specialQueueConfigFlag { - if err := json.Unmarshal([]byte(setting.SPECIFICATION_SPECIAL_QUEUE), &specialQueueConfig); err != nil { - log.Error("json.Unmarshal specialQueueConfig error.%v", err) - } - specialQueueConfigFlag = true - } - return specialQueueConfig -} - -func GetSpecialQueueIds(opts FindSpecsOptions) []SpecialQueue { - config := GetSpecialQueueConfig() - if len(config.SpecialQueues) == 0 { - return []SpecialQueue{} +func GetExclusiveQueueIds(opts FindSpecsOptions) []*ResourceExclusivePool { + pools, err := FindExclusivePools() + if err != nil { + log.Error("GetSpecialQueueIds FindSpecialQueueConfig err.%v", err) + return nil } - queues := make([]SpecialQueue, 0) - for _, queue := range config.SpecialQueues { + queues := make([]*ResourceExclusivePool, 0) + for _, queue := range pools { if queue.JobType != string(opts.JobType) { continue } @@ -414,18 +409,22 @@ func GetSpecialQueueIds(opts FindSpecsOptions) []SpecialQueue { return queues } -func IsUserInSpecialPool(userId int64) bool { +func IsUserInExclusivePool(userId int64) bool { userOrgs, err := GetOrgsByUserID(userId, true) if err != nil { log.Error("GetSpecialQueueIds GetOrgsByUserID error.%v", err) return false } - config := GetSpecialQueueConfig() - if len(config.SpecialQueues) == 0 { + pools, err := FindExclusivePools() + if err != nil { + log.Error("IsUserInSpecialPool FindExclusivePools err.%v", err) + return false + } + if len(pools) == 0 { return false } for _, org := range userOrgs { - for _, queue := 
range config.SpecialQueues { + for _, queue := range pools { if strings.ToLower(org.Name) == strings.ToLower(queue.OrgName) { return true } diff --git a/models/resource_scene.go b/models/resource_scene.go index ee040a85e6..fd5ca70920 100644 --- a/models/resource_scene.go +++ b/models/resource_scene.go @@ -1,8 +1,10 @@ package models import ( + "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/timeutil" "errors" + "strings" "xorm.io/builder" ) @@ -11,18 +13,29 @@ const ( NotExclusive ) +const ( + SceneTypePublic = "public" + SceneTypeExclusive = "exclusive" + + SpecPublic = "public" + SpecExclusive = "exclusive" +) + type ResourceScene struct { - ID int64 `xorm:"pk autoincr"` - SceneName string - JobType string - IsExclusive bool - ExclusiveOrg string - CreatedTime timeutil.TimeStamp `xorm:"created"` - CreatedBy int64 - UpdatedTime timeutil.TimeStamp `xorm:"updated"` - UpdatedBy int64 - DeleteTime timeutil.TimeStamp `xorm:"deleted"` - DeletedBy int64 + ID int64 `xorm:"pk autoincr"` + SceneName string + JobType string + Cluster string + ComputeResource string + IsSpecExclusive string + SceneType string //共享或者独占场景 + ExclusiveOrg string + CreatedTime timeutil.TimeStamp `xorm:"created"` + CreatedBy int64 + UpdatedTime timeutil.TimeStamp `xorm:"updated"` + UpdatedBy int64 + DeleteTime timeutil.TimeStamp `xorm:"deleted"` + DeletedBy int64 } type ResourceSceneSpec struct { @@ -32,26 +45,47 @@ type ResourceSceneSpec struct { CreatedTime timeutil.TimeStamp `xorm:"created"` } +type ResourceExclusivePool struct { + ID int64 `xorm:"pk autoincr"` + SceneId int64 + OrgName string + JobType string + Cluster string + QueueId int64 + ComputeResource string + CreatedTime timeutil.TimeStamp `xorm:"created"` + CreatedBy int64 + UpdatedTime timeutil.TimeStamp `xorm:"updated"` + UpdatedBy int64 + DeleteTime timeutil.TimeStamp `xorm:"deleted"` + DeletedBy int64 +} + type ResourceSceneReq struct { - ID int64 - SceneName string - JobType string - IsExclusive bool - ExclusiveOrg 
string - CreatorId int64 - SpecIds []int64 + ID int64 + SceneName string + JobType string + SceneType string + Cluster string + Resource string + ExclusiveQueueIds []int64 + IsSpecExclusive string + ExclusiveOrg string + CreatorId int64 + SpecIds []int64 } type SearchResourceSceneOptions struct { ListOptions JobType string - IsExclusive int + IsSpecExclusive string AiCenterCode string QueueId int64 ComputeResource string AccCardType string Cluster string HasInternet SpecInternetQuery + SceneType string } type ResourceSceneListRes struct { @@ -67,12 +101,15 @@ func NewResourceSceneListRes(totalSize int64, list []ResourceSceneRes) *Resource } type ResourceSceneRes struct { - ID int64 - SceneName string - JobType JobType - IsExclusive bool - ExclusiveOrg string - Specs []ResourceSpecInfo + ID int64 + SceneName string + JobType JobType + IsSpecExclusive string + Cluster string + ComputeResource string + SceneType string //共享或者独占场景 + ExclusiveOrg string + Specs []ResourceSpecInfo } func (ResourceSceneRes) TableName() string { @@ -105,6 +142,8 @@ type ResourceSpecInfo struct { AiCenterCode string AiCenterName string QueueCode string + QueueType string + QueueName string QueueId int64 ComputeResource string AccCardType string @@ -119,6 +158,11 @@ func InsertResourceScene(r ResourceSceneReq) error { sess := x.NewSession() defer sess.Close() + err := sess.Begin() + if err != nil { + log.Error("InsertResourceScene start transaction err. 
%v", err) + return err + } //check specs := make([]ResourceSpecification, 0) cond := builder.In("id", r.SpecIds) @@ -130,34 +174,68 @@ func InsertResourceScene(r ResourceSceneReq) error { } rs := ResourceScene{ - SceneName: r.SceneName, - JobType: r.JobType, - IsExclusive: r.IsExclusive, - ExclusiveOrg: r.ExclusiveOrg, - CreatedBy: r.CreatorId, - UpdatedBy: r.CreatorId, - } - _, err := sess.InsertOne(&rs) + SceneName: r.SceneName, + JobType: r.JobType, + IsSpecExclusive: r.IsSpecExclusive, + SceneType: r.SceneType, + Cluster: r.Cluster, + ComputeResource: r.Resource, + ExclusiveOrg: r.ExclusiveOrg, + CreatedBy: r.CreatorId, + UpdatedBy: r.CreatorId, + } + _, err = sess.InsertOne(&rs) if err != nil { sess.Rollback() return err } - if len(r.SpecIds) == 0 { - return sess.Commit() - } - rss := make([]ResourceSceneSpec, len(r.SpecIds)) - for i, v := range r.SpecIds { - rss[i] = ResourceSceneSpec{ - SceneId: rs.ID, - SpecId: v, + if len(r.SpecIds) > 0 { + rss := make([]ResourceSceneSpec, len(r.SpecIds)) + for i, v := range r.SpecIds { + rss[i] = ResourceSceneSpec{ + SceneId: rs.ID, + SpecId: v, + } + } + + _, err = sess.Insert(&rss) + if err != nil { + sess.Rollback() + return err } } - _, err = sess.Insert(&rss) - if err != nil { - sess.Rollback() - return err + if r.SceneType == SceneTypeExclusive && r.ExclusiveOrg != "" && len(r.SpecIds) > 0 { + pools := make([]ResourceExclusivePool, 0) + queueIds := make([]int64, 0) + err = sess.Table("resource_specification").Distinct("queue_id").In("id", r.SpecIds).Find(&queueIds) + if err != nil { + sess.Rollback() + return err + } + for _, org := range strings.Split(r.ExclusiveOrg, ";") { + if org == "" { + continue + } + for _, id := range queueIds { + pools = append(pools, ResourceExclusivePool{ + SceneId: rs.ID, + OrgName: org, + JobType: r.JobType, + Cluster: r.Cluster, + QueueId: id, + ComputeResource: r.Resource, + CreatedBy: r.CreatorId, + UpdatedBy: r.CreatorId, + }) + } + } + _, err = sess.Insert(pools) + if err != nil { 
+ sess.Rollback() + return err + } } return sess.Commit() @@ -172,7 +250,11 @@ func UpdateResourceScene(r ResourceSceneReq) error { } sess.Close() }() - + err = sess.Begin() + if err != nil { + log.Error("UpdateResourceScene start transaction err. %v", err) + return err + } // find old scene old := ResourceScene{} if has, _ := sess.ID(r.ID).Get(&old); !has { @@ -190,36 +272,69 @@ func UpdateResourceScene(r ResourceSceneReq) error { //update scene rs := ResourceScene{ - SceneName: r.SceneName, - IsExclusive: r.IsExclusive, - ExclusiveOrg: r.ExclusiveOrg, + SceneName: r.SceneName, + IsSpecExclusive: r.IsSpecExclusive, + ExclusiveOrg: r.ExclusiveOrg, + SceneType: r.SceneType, } - if _, err = sess.ID(r.ID).UseBool("is_exclusive").Update(&rs); err != nil { + if _, err = sess.ID(r.ID).UseBool("is_spec_exclusive").Update(&rs); err != nil { return err } //delete scene spec relation if _, err = sess.Where("scene_id = ? ", r.ID).Delete(&ResourceSceneSpec{}); err != nil { - sess.Rollback() return err } - if len(r.SpecIds) == 0 { - return sess.Commit() - } - //build new scene spec relation - rss := make([]ResourceSceneSpec, len(r.SpecIds)) - for i, v := range r.SpecIds { - rss[i] = ResourceSceneSpec{ - SceneId: r.ID, - SpecId: v, + if len(r.SpecIds) > 0 { + //build new scene spec relation + rss := make([]ResourceSceneSpec, len(r.SpecIds)) + for i, v := range r.SpecIds { + rss[i] = ResourceSceneSpec{ + SceneId: r.ID, + SpecId: v, + } + } + if _, err = sess.Insert(&rss); err != nil { + return err } + } - if _, err = sess.Insert(&rss); err != nil { - sess.Rollback() + + if _, err = sess.Where("scene_id = ? 
", r.ID).Delete(&ResourceExclusivePool{}); err != nil { return err } + if r.SceneType == SceneTypeExclusive && r.ExclusiveOrg != "" && len(r.SpecIds) > 0 { + pools := make([]ResourceExclusivePool, 0) + queueIds := make([]int64, 0) + err = sess.Table("resource_specification").Distinct("queue_id").In("id", r.SpecIds).Find(&queueIds) + if err != nil { + return err + } + for _, org := range strings.Split(r.ExclusiveOrg, ";") { + if org == "" { + continue + } + for _, id := range queueIds { + pools = append(pools, ResourceExclusivePool{ + SceneId: r.ID, + OrgName: org, + JobType: r.JobType, + Cluster: r.Cluster, + QueueId: id, + ComputeResource: r.Resource, + CreatedBy: r.CreatorId, + UpdatedBy: r.CreatorId, + }) + } + } + _, err = sess.Insert(pools) + if err != nil { + return err + } + } + return sess.Commit() } @@ -232,6 +347,11 @@ func DeleteResourceScene(sceneId int64) error { } sess.Close() }() + err = sess.Begin() + if err != nil { + log.Error("DeleteResourceScene start transaction err. %v", err) + return err + } if _, err = sess.ID(sceneId).Delete(&ResourceScene{}); err != nil { return err @@ -239,6 +359,9 @@ func DeleteResourceScene(sceneId int64) error { if _, err = sess.Where("scene_id = ? ", sceneId).Delete(&ResourceSceneSpec{}); err != nil { return err } + if _, err = sess.Where("scene_id = ? 
", sceneId).Delete(&ResourceExclusivePool{}); err != nil { + return err + } return sess.Commit() } @@ -250,10 +373,8 @@ func SearchResourceScene(opts SearchResourceSceneOptions) (int64, []ResourceScen if opts.JobType != "" { cond = cond.And(builder.Eq{"resource_scene.job_type": opts.JobType}) } - if opts.IsExclusive == Exclusive { - cond = cond.And(builder.Eq{"resource_scene.is_exclusive": 1}) - } else if opts.IsExclusive == NotExclusive { - cond = cond.And(builder.Eq{"resource_scene.is_exclusive": 0}) + if opts.IsSpecExclusive != "" { + cond = cond.And(builder.Eq{"resource_scene.is_spec_exclusive": opts.IsSpecExclusive}) } if opts.AiCenterCode != "" { cond = cond.And(builder.Eq{"resource_queue.ai_center_code": opts.AiCenterCode}) @@ -275,9 +396,12 @@ func SearchResourceScene(opts SearchResourceSceneOptions) (int64, []ResourceScen } else if opts.HasInternet == QueryNoInternetSpecs { cond = cond.And(builder.Eq{"resource_queue.has_internet": NoInternet}) } + if opts.SceneType != "" { + cond = cond.And(builder.Eq{"resource_scene.scene_type": opts.SceneType}) + } cond = cond.And(builder.NewCond().Or(builder.Eq{"resource_scene.delete_time": 0}).Or(builder.IsNull{"resource_scene.delete_time"})) - cols := []string{"resource_scene.id", "resource_scene.scene_name", "resource_scene.job_type", "resource_scene.is_exclusive", - "resource_scene.exclusive_org"} + cols := []string{"resource_scene.id", "resource_scene.scene_name", "resource_scene.job_type", "resource_scene.is_spec_exclusive", + "resource_scene.exclusive_org", "resource_scene.scene_type", "resource_scene.cluster", "resource_scene.compute_resource"} count, err := x.Where(cond). Distinct("resource_scene.id"). Join("INNER", "resource_scene_spec", "resource_scene_spec.scene_id = resource_scene.id"). 
@@ -302,6 +426,7 @@ func SearchResourceScene(opts SearchResourceSceneOptions) (int64, []ResourceScen if len(r) == 0 { return 0, r, err } + //find related specs sceneIds := make([]int64, 0, len(r)) for _, v := range r { @@ -319,7 +444,7 @@ func SearchResourceScene(opts SearchResourceSceneOptions) (int64, []ResourceScen "resource_queue.ai_center_code", "resource_queue.acc_card_type", "resource_queue.id as queue_id", "resource_queue.compute_resource", "resource_queue.queue_code", "resource_queue.ai_center_name", - "resource_queue.has_internet", + "resource_queue.has_internet", "resource_queue.queue_name", "resource_queue.queue_type", ).In("resource_scene_spec.scene_id", sceneIds). Join("INNER", "resource_scene_spec", "resource_scene_spec.spec_id = resource_specification.id"). Join("INNER", "resource_queue", "resource_queue.ID = resource_specification.queue_id"). @@ -347,3 +472,17 @@ func SearchResourceScene(opts SearchResourceSceneOptions) (int64, []ResourceScen return count, r, nil } + +func FindExclusivePools() ([]*ResourceExclusivePool, error) { + sq := make([]*ResourceExclusivePool, 0) + + err := x.Find(&sq) + if err != nil { + return nil, err + } + return sq, nil +} + +func InsertExclusivePools(queue []ResourceExclusivePool) (int64, error) { + return x.Insert(&queue) +} diff --git a/models/resource_specification.go b/models/resource_specification.go index 5e493ee81a..ea451e3f0d 100644 --- a/models/resource_specification.go +++ b/models/resource_specification.go @@ -193,6 +193,8 @@ func (r ResourceSpecAndQueue) ConvertToResourceSpecInfo() *ResourceSpecInfo { AiCenterCode: r.AiCenterCode, AiCenterName: r.AiCenterName, QueueCode: r.QueueCode, + QueueType: r.QueueType, + QueueName: r.QueueName, QueueId: r.QueueId, ComputeResource: r.ComputeResource, AccCardType: r.AccCardType, @@ -223,6 +225,7 @@ type FindSpecsOptions struct { RequestAll bool SpecStatus int HasInternet SpecInternetQuery //0 all,1 no internet,2 has internet + SceneType string } type Specification 
struct { @@ -238,48 +241,37 @@ type Specification struct { UnitPrice int QueueId int64 QueueCode string + QueueName string + QueueType string HasInternet int Cluster string AiCenterCode string AiCenterName string IsExclusive bool ExclusiveOrg string - //specs that have the same sourceSpecId, computeResource and cluster as current spec - RelatedSpecs []*Specification } func (Specification) TableName() string { return "resource_specification" } -func (s *Specification) loadRelatedSpecs(jobType JobType, hasInternet SpecInternetQuery) { - if s.RelatedSpecs != nil { - return - } +func (s *Specification) findRelatedSpecs(opts FindSpecsOptions, userId int64) []*Specification { defaultSpecs := make([]*Specification, 0) if s.SourceSpecId == "" { - s.RelatedSpecs = defaultSpecs - return + return defaultSpecs } - //是否需要网络的调度策略如下: - //需要联网时只能调度到有网的分中心;不需要联网时可以调度到所有的分中心 - if hasInternet == QueryNoInternetSpecs { - hasInternet = QueryAllSpecs + isUserSpecial := IsUserInExclusivePool(userId) + if isUserSpecial { + opts.SceneType = SceneTypeExclusive + } else { + opts.SceneType = SceneTypePublic } - r, err := FindSpecs(FindSpecsOptions{ - ComputeResource: s.ComputeResource, - Cluster: s.Cluster, - SourceSpecId: s.SourceSpecId, - RequestAll: false, - SpecStatus: SpecOnShelf, - JobType: jobType, - HasInternet: hasInternet, - }) + + r, err := FindSpecs(opts) if err != nil { - s.RelatedSpecs = defaultSpecs - return + return defaultSpecs } - s.RelatedSpecs = r + return r } func (s *Specification) ToShowString() string { @@ -297,6 +289,22 @@ func (s *Specification) ToShowString() string { return specName } +func (s *Specification) ParseResourceQueue() ResourceQueue { + return ResourceQueue{ + ID: s.QueueId, + QueueCode: s.QueueCode, + QueueName: s.QueueName, + QueueType: s.QueueType, + Cluster: s.Cluster, + AiCenterCode: s.AiCenterCode, + AiCenterName: s.AiCenterName, + ComputeResource: s.ComputeResource, + AccCardType: s.AccCardType, + CardsTotalNum: s.AccCardsNum, + HasInternet: 
s.HasInternet, + } +} + func GetAvailableCenterIdsByASpec(ID int64) ([]string, error) { spec, err := GetResourceSpecification(&ResourceSpecification{ ID: ID}) @@ -321,14 +329,39 @@ type GetAvailableCenterIdOpts struct { } func (s *Specification) GetAvailableCenterIds(opts GetAvailableCenterIdOpts) []string { - s.loadRelatedSpecs(opts.JobType, opts.HasInternet) + queues := s.GetAvailableQueues(opts) + centerIds := make([]string, 0) + for _, v := range queues { + centerIds = append(centerIds, v.AiCenterCode) + } + return centerIds +} + +func (s *Specification) GetAvailableQueues(opts GetAvailableCenterIdOpts) []ResourceQueue { + //是否需要网络的调度策略如下: + //需要联网时只能调度到有网的分中心;不需要联网时可以调度到所有的分中心 + hasInternet := opts.HasInternet + if hasInternet == QueryNoInternetSpecs { + hasInternet = QueryAllSpecs + } + + specOpts := FindSpecsOptions{ + ComputeResource: s.ComputeResource, + Cluster: s.Cluster, + SourceSpecId: s.SourceSpecId, + RequestAll: false, + SpecStatus: SpecOnShelf, + JobType: opts.JobType, + HasInternet: hasInternet, + } + relatedSpecs := s.findRelatedSpecs(specOpts, opts.UserId) - if len(s.RelatedSpecs) == 0 { - return make([]string, 0) + if len(relatedSpecs) == 0 { + return make([]ResourceQueue, 0) } //filter exclusive specs - specs := FilterExclusiveSpecs(s.RelatedSpecs, opts.UserId) + specs := FilterExclusiveSpecs(relatedSpecs, opts.UserId) specs = HandleSpecialQueues(specs, opts.UserId, FindSpecsOptions{ JobType: opts.JobType, @@ -336,11 +369,16 @@ func (s *Specification) GetAvailableCenterIds(opts GetAvailableCenterIdOpts) []s ComputeResource: s.ComputeResource, }) - centerIds := make([]string, len(specs)) - for i, v := range specs { - centerIds[i] = v.AiCenterCode + queueMap := make(map[int64]string, len(specs)) + queues := make([]ResourceQueue, 0) + for _, v := range specs { + if _, ok := queueMap[v.QueueId]; ok { + continue + } + queues = append(queues, v.ParseResourceQueue()) + queueMap[v.QueueId] = "" } - return centerIds + return queues } func 
FilterExclusiveSpecs(r []*Specification, userId int64) []*Specification { @@ -408,17 +446,17 @@ func HandleSpecialQueues(specs []*Specification, userId int64, opts FindSpecsOpt if len(specs) == 0 { return specs } - isUserInSpecialPool := IsUserInSpecialPool(userId) + isUserInSpecialPool := IsUserInExclusivePool(userId) if isUserInSpecialPool { - specs = handleSpecialUserSpecs(specs, userId, opts) + specs = handleExclusiveUserSpecs(specs, userId, opts) } else { specs = handleNormalUserSpecs(specs, opts) } return specs } -func handleSpecialUserSpecs(specs []*Specification, userId int64, opts FindSpecsOptions) []*Specification { - specialQueues := GetSpecialQueueIds(opts) +func handleExclusiveUserSpecs(specs []*Specification, userId int64, opts FindSpecsOptions) []*Specification { + specialQueues := GetExclusiveQueueIds(opts) userOrgs, err := GetOrgsByUserID(userId, true) if err != nil { log.Error("handleSpecialUserSpecs GetOrgsByUserID error.%v", err) @@ -437,7 +475,7 @@ func handleSpecialUserSpecs(specs []*Specification, userId int64, opts FindSpecs } func handleNormalUserSpecs(specs []*Specification, opts FindSpecsOptions) []*Specification { - queues := GetSpecialQueueIds(opts) + queues := GetExclusiveQueueIds(opts) queueIds := make([]int64, 0) for _, queue := range queues { queueIds = append(queueIds, queue.QueueId) @@ -687,6 +725,9 @@ func FindSpecs(opts FindSpecsOptions) ([]*Specification, error) { } else if opts.HasInternet == QueryHasInternetSpecs { cond = cond.And(builder.Eq{"resource_queue.has_internet": HasInternet}) } + if opts.SceneType != "" { + cond = cond.And(builder.Eq{"resource_scene.scene_type": opts.SceneType}) + } r := make([]*Specification, 0) s := x.Where(cond). 
diff --git a/modules/grampus/resty.go b/modules/grampus/resty.go index e48e7bc08f..ed358e4ad7 100755 --- a/modules/grampus/resty.go +++ b/modules/grampus/resty.go @@ -238,11 +238,11 @@ func GetResourceSpecs(processorType string) (*models.GetGrampusResourceSpecsResu retry := 0 sendjob: - _, err := client.R(). + res, err := client.R(). SetAuthToken(TOKEN). SetResult(&result). Get(HOST + urlGetResourceSpecs + "?processorType=" + processorType) - + log.Info("%+v", res) if err != nil { return nil, fmt.Errorf("resty GetResourceSpecs: %v", err) } @@ -503,6 +503,53 @@ sendjob: return &result, nil } +func GetResourceQueue() ([]models.GrampusResourceQueue, error) { + res, err := GetResourceSpecs("") + if err != nil { + return nil, err + } + queueList := make([]models.GrampusResourceQueue, 0) + queueMap := make(map[string]string, 0) + for _, spec := range res.Infos { + for _, c := range spec.Centers { + centerId := c.ID + computeResource := models.ParseComputeResourceFormGrampus(spec.SpecInfo.AccDeviceKind) + if centerId == "" || computeResource == "" { + continue + } + for _, queue := range c.ResourceSpec { + queueCode := queue.ID + accCardType := strings.ToUpper(spec.SpecInfo.AccDeviceModel) + key := centerId + "_" + computeResource + "_" + accCardType + "_" + queueCode + if _, has := queueMap[key]; has { + continue + } + var hasInternet = int(models.NoInternet) + if queue.IsNetAccess == models.GrampusNetAccess { + hasInternet = int(models.HasInternet) + } + var queueType = models.QueueTypePublic + if queue.PoolType == models.GrampusPoolTypeExclusive { + queueType = models.QueueTypeExclusive + } + queueMap[key] = "" + queueList = append(queueList, models.GrampusResourceQueue{ + QueueCode: queueCode, + QueueName: queue.Name, + QueueType: queueType, + AiCenterCode: centerId, + AiCenterName: c.Name, + ComputeResource: computeResource, + AccCardType: accCardType, + HasInternet: hasInternet, + }) + } + + } + } + return queueList, nil +} + func GetDebugJobEvents(jobID string) 
(*models.GetGrampusDebugJobEventsResponse, error) { checkSetting() client := getRestyClient() diff --git a/routers/admin/resources.go b/routers/admin/resources.go index 4412fa285e..dbfd058c2f 100644 --- a/routers/admin/resources.go +++ b/routers/admin/resources.go @@ -41,18 +41,27 @@ func GetScenePage(ctx *context.Context) { func GetResourceQueueList(ctx *context.Context) { page := ctx.QueryInt("page") + pageSize := ctx.QueryInt("pageSize") cluster := ctx.Query("cluster") aiCenterCode := ctx.Query("center") computeResource := ctx.Query("resource") accCardType := ctx.Query("card") hasInternet := ctx.QueryInt("hasInternet") + queueType := ctx.Query("queueType") + + if pageSize > 1000 { + log.Error("GetResourceQueueList pageSize too large.") + ctx.JSON(http.StatusOK, response.ServerError("pageSize too large")) + return + } list, err := resource.GetResourceQueueList(models.SearchResourceQueueOptions{ - ListOptions: models.ListOptions{Page: page, PageSize: 10}, + ListOptions: models.ListOptions{Page: page, PageSize: pageSize}, Cluster: cluster, AiCenterCode: aiCenterCode, ComputeResource: computeResource, AccCardType: accCardType, HasInternet: models.SpecInternetQuery(hasInternet), + QueueType: queueType, }) if err != nil { log.Error("GetResourceQueueList error.%v", err) @@ -118,6 +127,7 @@ func SyncGrampusQueue(ctx *context.Context) { func GetResourceSpecificationList(ctx *context.Context) { page := ctx.QueryInt("page") + pageSize := ctx.QueryInt("pageSize") queue := ctx.QueryInt64("queue") status := ctx.QueryInt("status") cluster := ctx.Query("cluster") @@ -126,8 +136,14 @@ func GetResourceSpecificationList(ctx *context.Context) { computeResource := ctx.Query("resource") cardType := ctx.Query("cardType") hasInternet := ctx.QueryInt("hasInternet") + + if pageSize > 1000 { + log.Error("GetResourceSpecificationList pageSize too large.") + ctx.JSON(http.StatusOK, response.ServerError("pageSize too large")) + return + } list, err := 
resource.GetResourceSpecificationList(models.SearchResourceSpecificationOptions{ - ListOptions: models.ListOptions{Page: page, PageSize: 10}, + ListOptions: models.ListOptions{Page: page, PageSize: pageSize}, QueueId: queue, Status: status, Cluster: cluster, @@ -231,24 +247,33 @@ func SyncGrampusSpecs(ctx *context.Context) { func GetResourceSceneList(ctx *context.Context) { page := ctx.QueryInt("page") + pageSize := ctx.QueryInt("pageSize") jobType := ctx.Query("jobType") aiCenterCode := ctx.Query("center") queueId := ctx.QueryInt64("queue") - isExclusive := ctx.QueryInt("IsExclusive") + isExclusive := ctx.Query("isSpecExclusive") + sceneType := ctx.Query("sceneType") computeResource := ctx.Query("resource") cardType := ctx.Query("cardType") cluster := ctx.Query("cluster") hasInternet := ctx.QueryInt("hasInternet") + + if pageSize > 1000 { + log.Error("GetResourceSceneList pageSize too large.") + ctx.JSON(http.StatusOK, response.ServerError("pageSize too large")) + return + } list, err := resource.GetResourceSceneList(models.SearchResourceSceneOptions{ - ListOptions: models.ListOptions{Page: page, PageSize: 10}, + ListOptions: models.ListOptions{Page: page, PageSize: pageSize}, JobType: jobType, - IsExclusive: isExclusive, + IsSpecExclusive: isExclusive, AiCenterCode: aiCenterCode, QueueId: queueId, ComputeResource: computeResource, AccCardType: cardType, Cluster: cluster, HasInternet: models.SpecInternetQuery(hasInternet), + SceneType: sceneType, }) if err != nil { log.Error("GetResourceSceneList error.%v", err) diff --git a/services/ai_task_service/cluster/c2net.go b/services/ai_task_service/cluster/c2net.go index e9da8821f4..a82a8f6124 100644 --- a/services/ai_task_service/cluster/c2net.go +++ b/services/ai_task_service/cluster/c2net.go @@ -33,7 +33,11 @@ func init() { } func (c C2NetClusterAdapter) CreateNoteBook(req entity.CreateNoteBookTaskRequest) (*entity.CreateNoteBookTaskResponse, error) { - jobResult, err := 
grampus.CreateNotebookJob(convertNoteBookReq2Grampus(req)) + newReq, err := convertNoteBookReq2Grampus(req) + if err != nil { + return nil, err + } + jobResult, err := grampus.CreateNotebookJob(newReq) if err != nil { log.Error("CreateNoteBook failed: %v", err.Error()) return nil, err @@ -47,7 +51,12 @@ func (c C2NetClusterAdapter) CreateNoteBook(req entity.CreateNoteBookTaskRequest func (c C2NetClusterAdapter) CreateOnlineInfer(req entity.CreateNoteBookTaskRequest) (*entity.CreateNoteBookTaskResponse, error) { log.Info("start to CreateOnlineInfer ") - jobResult, err := grampus.CreateNotebookJob(convertOnlineInfer2Grampus(req)) + newReq, err := convertOnlineInfer2Grampus(req) + if err != nil { + log.Error("CreateOnlineInfer err.req=%+v err=%v", req, err) + return nil, err + } + jobResult, err := grampus.CreateNotebookJob(newReq) if err != nil { log.Error("CreateNoteBook failed: %v", err.Error()) return nil, err @@ -109,7 +118,7 @@ func ConvertGrampusImageToStandard(image models.GrampusImage) entity.ClusterImag } } -func convertNoteBookReq2Grampus(req entity.CreateNoteBookTaskRequest) models.CreateGrampusNotebookRequest { +func convertNoteBookReq2Grampus(req entity.CreateNoteBookTaskRequest) (models.CreateGrampusNotebookRequest, error) { codePath := "/code" if len(req.Tasks[0].Code) > 0 { codePath = req.Tasks[0].Code[0].ContainerPath @@ -129,23 +138,31 @@ func convertNoteBookReq2Grampus(req entity.CreateNoteBookTaskRequest) models.Cre tasks := make([]models.GrampusNotebookTask, len(req.Tasks)) for i := 0; i < len(req.Tasks); i++ { t := req.Tasks[i] - tasks[i] = convertNoteBookTask2Grampus(t, command) + task, err := convertNoteBookTask2Grampus(t, command) + if err != nil { + return models.CreateGrampusNotebookRequest{}, err + } + tasks[i] = task } - return models.CreateGrampusNotebookRequest{Name: req.Name, Tasks: tasks} + return models.CreateGrampusNotebookRequest{Name: req.Name, Tasks: tasks}, nil } -func 
convertOnlineInfer2Grampus(req entity.CreateNoteBookTaskRequest) models.CreateGrampusNotebookRequest { +func convertOnlineInfer2Grampus(req entity.CreateNoteBookTaskRequest) (models.CreateGrampusNotebookRequest, error) { command := generateCommand(req.RepoName, req.Tasks[0].BootFile, req.PrimitiveDatasetName) tasks := make([]models.GrampusNotebookTask, len(req.Tasks)) for i := 0; i < len(req.Tasks); i++ { t := req.Tasks[i] - tasks[i] = convertNoteBookTask2Grampus(t, command) + task, err := convertNoteBookTask2Grampus(t, command) + if err != nil { + return models.CreateGrampusNotebookRequest{}, err + } + tasks[i] = task } - return models.CreateGrampusNotebookRequest{Name: req.Name, Tasks: tasks} + return models.CreateGrampusNotebookRequest{Name: req.Name, Tasks: tasks}, nil } func generateCommand(repoName, bootFile, datasetName string) string { @@ -194,7 +211,7 @@ func getCopyCmd(datasetName, repoName, bootfilepath string) string { return cmd } -func convertNoteBookTask2Grampus(t entity.NoteBookTask, command string) models.GrampusNotebookTask { +func convertNoteBookTask2Grampus(t entity.NoteBookTask, command string) (models.GrampusNotebookTask, error) { code := models.GrampusDataset{} codeArray := convertContainerArray2GrampusArray(t.Code) if codeArray != nil && len(codeArray) > 0 { @@ -205,6 +222,10 @@ func convertNoteBookTask2Grampus(t entity.NoteBookTask, command string) models.G if outputArray != nil && len(outputArray) > 0 { output = outputArray[0] } + centerIds, err := getGrampusAvailableCenterIds(t.Queues, t.ImageId, *models.GetComputeSourceInstance(t.Spec.ComputeResource), models.JobTypeDebug) + if err != nil { + return models.GrampusNotebookTask{}, err + } return models.GrampusNotebookTask{ Name: t.Name, ResourceSpecId: t.Spec.SourceSpecId, @@ -216,8 +237,56 @@ func convertNoteBookTask2Grampus(t entity.NoteBookTask, command string) models.G AutoStopDuration: t.AutoStopDuration, Capacity: t.Capacity, Command: command, - CenterID: t.CenterID, + CenterID: 
centerIds, + }, nil +} + +// getGrampusAvailableCenterIds encodes each queue as "centerCode+queueCode" (just the center code when the queue code is empty); when imageId is set, only queues whose center hosts that image are kept. +func getGrampusAvailableCenterIds(queues []models.ResourceQueue, imageId string, computeSource models.ComputeSource, jobType models.JobType) ([]string, error) { + if len(queues) == 0 { + return []string{}, nil + } + var intersectionCenterIds []string + if imageId == "" { + for _, queue := range queues { + code := strings.TrimSuffix(queue.AiCenterCode+"+"+queue.QueueCode, "+") + intersectionCenterIds = append(intersectionCenterIds, code) + } + return intersectionCenterIds, nil + } + // An image was requested: ask grampus which centers host it. + processType := computeSource.FullName + images, err := grampus.GetImages(processType, string(jobType)) + if err != nil { + log.Warn("can not get image info from grampus: %v", err) + return []string{}, err + } + var imageCenterIds []string + for _, image := range images.Infos { + if image.ID == imageId { + for _, centerInfo := range image.AICenterImage { + imageCenterIds = append(imageCenterIds, centerInfo.AICenterID) + } + break + } + } + if len(imageCenterIds) == 0 { + return []string{}, errors.New("image not available") + } + // Keep only the queues whose center hosts the image. + for _, queue := range queues { + for _, imageCenterId := range imageCenterIds { + if queue.AiCenterCode == imageCenterId { + code := strings.TrimSuffix(queue.AiCenterCode+"+"+queue.QueueCode, "+") + intersectionCenterIds = append(intersectionCenterIds, code) + break + } + } + } + if len(intersectionCenterIds) == 0 { + return intersectionCenterIds, errors.New("no center match") } + return intersectionCenterIds, nil } func convertContainerArray2GrampusArray(containerDatas []entity.ContainerData) []models.GrampusDataset { @@ -413,7 +482,12 @@ func parseC2NetEventsToOperationProfile(notebookEvents []models.GrampusJobEvents } func (c C2NetClusterAdapter) CreateTrainJob(req entity.CreateTrainTaskRequest) (*entity.CreateTrainTaskResponse, error) { - jobResult, err := grampus.CreateJob(convertTrainReq2Grampus(req)) + newReq, err := convertTrainReq2Grampus(req) + if err != nil { + log.Error("CreateTrainJob err.req=%+v err=%v", req, err) + return 
nil, err + } + jobResult, err := grampus.CreateJob(newReq) if err != nil { log.Error("CreateNoteBook failed: %v", err.Error()) return nil, err @@ -421,16 +495,20 @@ func (c C2NetClusterAdapter) CreateTrainJob(req entity.CreateTrainTaskRequest) ( return convertGrampus2TrainRes(jobResult), nil } -func convertTrainReq2Grampus(req entity.CreateTrainTaskRequest) models.CreateGrampusJobRequest { +func convertTrainReq2Grampus(req entity.CreateTrainTaskRequest) (models.CreateGrampusJobRequest, error) { command := generateGrampusTrainCommand(req) tasks := make([]models.GrampusTasks, len(req.Tasks)) for i := 0; i < len(req.Tasks); i++ { t := req.Tasks[i] - tasks[i] = convertTrainTask2Grampus(t, command) + task, err := convertTrainTask2Grampus(t, command) + if err != nil { + return models.CreateGrampusJobRequest{}, err + } + tasks[i] = task } - return models.CreateGrampusJobRequest{Name: req.Name, Tasks: tasks} + return models.CreateGrampusJobRequest{Name: req.Name, Tasks: tasks}, nil } func generateGrampusTrainCommand(req entity.CreateTrainTaskRequest) string { @@ -649,7 +727,12 @@ func getNpuModelObjectKey(jobName string) string { return setting.CodePathPrefix + jobName + RemoteModelPath + "/" + models.ModelSuffix } -func convertTrainTask2Grampus(t entity.TrainTask, command string) models.GrampusTasks { +func convertTrainTask2Grampus(t entity.TrainTask, command string) (models.GrampusTasks, error) { + centerIds, err := getGrampusAvailableCenterIds(t.Queues, t.ImageId, *models.GetComputeSourceInstance(t.Spec.ComputeResource), models.JobTypeTrain) + if err != nil { + return models.GrampusTasks{}, err + } + return models.GrampusTasks{ Name: t.Name, ResourceSpecId: t.ResourceSpecId, @@ -658,13 +741,13 @@ func convertTrainTask2Grampus(t entity.TrainTask, command string) models.Grampus Datasets: convertContainerArray2GrampusArray(t.Datasets), Code: convertContainerArray2Grampus(t.Code), Command: command, - CenterID: t.CenterID, + CenterID: centerIds, ReplicaNum: 1, Models: 
convertContainerArray2GrampusArray(t.PreTrainModel), BootFile: t.BootFile, OutPut: convertContainerArray2Grampus(t.OutPut), WorkServerNumber: t.WorkServerNumber, - } + }, nil } func convertGrampus2TrainRes(res *models.CreateGrampusJobResponse) *entity.CreateTrainTaskResponse { diff --git a/services/ai_task_service/cluster/cloudbrain_two.go b/services/ai_task_service/cluster/cloudbrain_two.go index 95bbe77930..4b6eb8982f 100644 --- a/services/ai_task_service/cluster/cloudbrain_two.go +++ b/services/ai_task_service/cluster/cloudbrain_two.go @@ -29,9 +29,6 @@ func init() { } func (c CloudbrainTwoClusterAdapter) CreateNoteBook(req entity.CreateNoteBookTaskRequest) (*entity.CreateNoteBookTaskResponse, error) { - if poolInfos == nil { - json.Unmarshal([]byte(setting.PoolInfos), &poolInfos) - } t := req.Tasks[0] var jobResult *models.CreateNotebookResult @@ -52,19 +49,13 @@ func (c CloudbrainTwoClusterAdapter) CreateNoteBook(req entity.CreateNoteBookTas WorkspaceID: "0", }) } else { - var poolId = poolInfos.PoolInfo[0].PoolId - for _, poolInfo := range poolInfos.PoolInfo { - if poolInfo.PoolName == t.Spec.QueueCode { - poolId = poolInfo.PoolId - } - } jobResult, err = cloudbrain_two.CreateNotebook2(models.CreateNotebook2Params{ JobName: req.Name, Description: req.Description, Flavor: t.Spec.SourceSpecId, Duration: t.AutoStopDuration, ImageID: t.ImageId, - PoolID: poolId, + PoolID: t.Spec.QueueCode, Feature: models.NotebookFeature, Volume: models.VolumeReq{ Capacity: setting.Capacity, diff --git a/services/ai_task_service/context/context.go b/services/ai_task_service/context/context.go index 073f7af48f..c788907e2b 100644 --- a/services/ai_task_service/context/context.go +++ b/services/ai_task_service/context/context.go @@ -13,6 +13,7 @@ type CreationContext struct { GitRepo *git.Repository Repository *models.Repository Spec *models.Specification + Queues []models.ResourceQueue User *models.User CommitID string Response *entity.CreationResponse diff --git 
a/services/ai_task_service/task/cloudbrain_one_notebook_task.go b/services/ai_task_service/task/cloudbrain_one_notebook_task.go index db1f67ca56..00641f9793 100644 --- a/services/ai_task_service/task/cloudbrain_one_notebook_task.go +++ b/services/ai_task_service/task/cloudbrain_one_notebook_task.go @@ -90,7 +90,7 @@ func (t CloudbrainOneNotebookTaskTemplate) Create(ctx *context.CreationContext) Next(t.CheckDatasets). Next(t.CheckBranchExists). Next(t.InsertCloudbrainRecord4Async). - AsyncNextWithErrFun(t.BuildContainerData, t.CallCreationAPI, t.AfterCallCreationAPI4Async, t.NotifyCreation, t.HandleErr4Async). + AsyncNextWithErrFun(t.BuildContainerData, t.GetAvailableQueues, t.CallCreationAPI, t.AfterCallCreationAPI4Async, t.NotifyCreation, t.HandleErr4Async). Operate(ctx) if err != nil { log.Error("create CloudbrainOneNotebookTask err.%v", err) @@ -111,6 +111,7 @@ func (t CloudbrainOneNotebookTaskTemplate) Restart(ctx *context.CreationContext) Next(t.LoadSpec). Next(t.CheckPointBalance). Next(t.BuildContainerData). + Next(t.GetAvailableQueues). Next(t.CallRestartAPI). Next(t.CreateCloudbrainRecord4Restart). Next(t.NotifyCreation). 
@@ -134,15 +135,6 @@ func (g CloudbrainOneNotebookTaskTemplate) CallCreationAPI(ctx *context.Creation } form := ctx.Request - centerIds, bizErr := GetAvailableCenterIds(ctx.Spec, models.GetAvailableCenterIdOpts{ - UserId: ctx.User.ID, - JobType: g.JobType, - HasInternet: form.HasInternet, - }, form.ComputeSource, form.ImageID, g.ClusterType) - if bizErr != nil { - return bizErr - } - req := entity.CreateNoteBookTaskRequest{ Name: form.JobName, Tasks: []entity.NoteBookTask{ @@ -157,7 +149,7 @@ func (g CloudbrainOneNotebookTaskTemplate) CallCreationAPI(ctx *context.Creation OutPut: ctx.GetContainerDataArray(entity.ContainerOutPutPath), AutoStopDuration: autoStopDurationMs, Capacity: setting.Capacity, - CenterID: centerIds, + Queues: ctx.Queues, Spec: ctx.Spec, }, }, diff --git a/services/ai_task_service/task/cloudbrain_one_train_task.go b/services/ai_task_service/task/cloudbrain_one_train_task.go index 6d16e82d55..b348cddaff 100644 --- a/services/ai_task_service/task/cloudbrain_one_train_task.go +++ b/services/ai_task_service/task/cloudbrain_one_train_task.go @@ -82,7 +82,7 @@ func (t CloudbrainOneTrainTaskTemplate) Create(ctx *context.CreationContext) (*e Next(t.CheckDatasets). Next(t.CheckModel). Next(t.InsertCloudbrainRecord4Async). - AsyncNextWithErrFun(t.BuildContainerData, t.CallCreationAPI, t.AfterCallCreationAPI4Async, t.NotifyCreation, t.HandleErr4Async). + AsyncNextWithErrFun(t.BuildContainerData, t.GetAvailableQueues, t.CallCreationAPI, t.AfterCallCreationAPI4Async, t.NotifyCreation, t.HandleErr4Async). 
Operate(ctx) if err != nil { log.Error("create GrampusNoteBookTask err.%v", err) @@ -97,14 +97,6 @@ func (g CloudbrainOneTrainTaskTemplate) CallCreationAPI(ctx *context.CreationCon return response.SYSTEM_ERROR } form := ctx.Request - centerIds, bizErr := GetAvailableCenterIds(ctx.Spec, models.GetAvailableCenterIdOpts{ - UserId: ctx.User.ID, - JobType: g.JobType, - HasInternet: form.HasInternet, - }, form.ComputeSource, form.ImageID, g.ClusterType) - if bizErr != nil { - return bizErr - } req := entity.CreateTrainTaskRequest{ Name: form.JobName, DisplayJobName: form.DisplayJobName, @@ -116,7 +108,7 @@ func (g CloudbrainOneTrainTaskTemplate) CallCreationAPI(ctx *context.CreationCon ImageUrl: strings.TrimSpace(form.ImageUrl), Datasets: ctx.GetContainerDataArray(entity.ContainerDataset), Code: ctx.GetContainerDataArray(entity.ContainerCode), - CenterID: centerIds, + Queues: ctx.Queues, PreTrainModel: ctx.GetContainerDataArray(entity.ContainerPreTrainModel), BootFile: form.BootFile, OutPut: ctx.GetContainerDataArray(entity.ContainerOutPutPath), @@ -139,51 +131,3 @@ func (g CloudbrainOneTrainTaskTemplate) CallCreationAPI(ctx *context.CreationCon } return nil } - -func (g CloudbrainOneTrainTaskTemplate) CallRestartAPI(ctx *context.CreationContext) *response.BizError { - c := g.GetMyCluster() - if c == nil { - return response.SYSTEM_ERROR - } - form := ctx.Request - centerIds, bizErr := GetAvailableCenterIds(ctx.Spec, models.GetAvailableCenterIdOpts{ - UserId: ctx.User.ID, - JobType: g.JobType, - HasInternet: form.HasInternet, - }, form.ComputeSource, form.ImageID, g.ClusterType) - if bizErr != nil { - return bizErr - } - req := entity.CreateTrainTaskRequest{ - Name: form.JobName, - DisplayJobName: form.DisplayJobName, - Tasks: []entity.TrainTask{ - { - Name: form.JobName, - ResourceSpecId: ctx.Spec.SourceSpecId, - ImageId: form.ImageID, - ImageUrl: strings.TrimSpace(form.ImageUrl), - Datasets: ctx.GetContainerDataArray(entity.ContainerDataset), - Code: 
ctx.GetContainerDataArray(entity.ContainerCode), - CenterID: centerIds, - PreTrainModel: ctx.GetContainerDataArray(entity.ContainerPreTrainModel), - BootFile: form.BootFile, - OutPut: ctx.GetContainerDataArray(entity.ContainerOutPutPath), - Params: form.ParamArray, - Spec: ctx.Spec, - }, - }, - } - createTime := timeutil.TimeStampNow() - res, err := c.CreateTrainJob(req) - if err != nil { - log.Error("CloudbrainOneTrainTaskTemplate CallRestartAPI err.req=%+v err=%v", req, err) - return response.NewBizError(err) - } - ctx.Response = &entity.CreationResponse{ - JobID: res.JobID, - Status: res.Status, - CreateTime: createTime, - } - return nil -} diff --git a/services/ai_task_service/task/cloudbrain_two_train_task.go b/services/ai_task_service/task/cloudbrain_two_train_task.go index 57fa0f1e93..e4138f7aa4 100644 --- a/services/ai_task_service/task/cloudbrain_two_train_task.go +++ b/services/ai_task_service/task/cloudbrain_two_train_task.go @@ -1,14 +1,11 @@ package task import ( - "encoding/json" "strings" "code.gitea.io/gitea/entity" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/modelarts" - "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/routers/response" "code.gitea.io/gitea/services/ai_task_service/context" @@ -86,7 +83,7 @@ func (t CloudbrainTwoTrainTaskTemplate) Create(ctx *context.CreationContext) (*e Next(t.CheckDatasets). Next(t.CheckModel). Next(t.InsertCloudbrainRecord4Async). - AsyncNextWithErrFun(t.BuildContainerData, t.CallCreationAPI, t.AfterCallCreationAPI4Async, t.NotifyCreation, t.HandleErr4Async). + AsyncNextWithErrFun(t.BuildContainerData, t.GetAvailableQueues, t.CallCreationAPI, t.AfterCallCreationAPI4Async, t.NotifyCreation, t.HandleErr4Async). 
Operate(ctx) if err != nil { log.Error("create GrampusNoteBookTask err.%v", err) @@ -100,29 +97,7 @@ func (g CloudbrainTwoTrainTaskTemplate) CallCreationAPI(ctx *context.CreationCon if c == nil { return response.SYSTEM_ERROR } - var resourcePools modelarts.ResourcePool - if err := json.Unmarshal([]byte(setting.ResourcePools), &resourcePools); err != nil { - log.Error("Unmarshal error. %v", err) - return response.NewBizError(err) - } else if len(resourcePools.Info) == 0 { - log.Error("UresourcePools.Info is empty. %v", err) - return response.SYSTEM_ERROR - } - modelarts_poolid := resourcePools.Info[0].ID - for _, t := range resourcePools.Info { - if t.Value == ctx.Spec.QueueCode { - modelarts_poolid = t.ID - } - } form := ctx.Request - centerIds, bizErr := GetAvailableCenterIds(ctx.Spec, models.GetAvailableCenterIdOpts{ - UserId: ctx.User.ID, - JobType: g.JobType, - HasInternet: form.HasInternet, - }, form.ComputeSource, form.ImageID, g.ClusterType) - if bizErr != nil { - return bizErr - } req := entity.CreateTrainTaskRequest{ Name: form.JobName, DisplayJobName: form.DisplayJobName, @@ -136,13 +111,13 @@ func (g CloudbrainTwoTrainTaskTemplate) CallCreationAPI(ctx *context.CreationCon Datasets: ctx.GetContainerDataArray(entity.ContainerDataset), Code: ctx.GetContainerDataArray(entity.ContainerCode), LogPath: ctx.GetContainerDataArray(entity.ContainerLogPath), - CenterID: centerIds, + Queues: ctx.Queues, PreTrainModel: ctx.GetContainerDataArray(entity.ContainerPreTrainModel), BootFile: form.BootFile, OutPut: ctx.GetContainerDataArray(entity.ContainerOutPutPath), Params: form.ParamArray, Spec: ctx.Spec, - PoolId: modelarts_poolid, + PoolId: ctx.Spec.QueueCode, WorkServerNumber: form.WorkServerNumber, }, }, diff --git a/services/ai_task_service/task/grampus_notebook_task.go b/services/ai_task_service/task/grampus_notebook_task.go index 85868fd149..04cb99fd0f 100644 --- a/services/ai_task_service/task/grampus_notebook_task.go +++ 
b/services/ai_task_service/task/grampus_notebook_task.go @@ -156,7 +156,7 @@ func (t GrampusNoteBookTaskTemplate) Create(ctx *context.CreationContext) (*enti Next(t.CheckBranchExists). Next(t.CheckModel). Next(t.InsertCloudbrainRecord4Async). - AsyncNextWithErrFun(t.BuildContainerData, t.CallCreationAPI, t.AfterCallCreationAPI4Async, t.NotifyCreation, t.HandleErr4Async). + AsyncNextWithErrFun(t.BuildContainerData, t.GetAvailableQueues, t.CallCreationAPI, t.AfterCallCreationAPI4Async, t.NotifyCreation, t.HandleErr4Async). Operate(ctx) if err != nil { log.Error("create GrampusNoteBookTask err.%v", err) @@ -199,14 +199,6 @@ func (g GrampusNoteBookTaskTemplate) CallCreationAPI(ctx *context.CreationContex return response.SYSTEM_ERROR } form := ctx.Request - centerIds, bizErr := GetAvailableCenterIds(ctx.Spec, models.GetAvailableCenterIdOpts{ - UserId: ctx.User.ID, - JobType: g.JobType, - HasInternet: form.HasInternet, - }, form.ComputeSource, form.ImageID, g.ClusterType) - if bizErr != nil { - return bizErr - } imageUrl := strings.TrimSpace(form.ImageUrl) if form.ImageID != "" { imageUrl = "" @@ -223,7 +215,7 @@ func (g GrampusNoteBookTaskTemplate) CallCreationAPI(ctx *context.CreationContex Code: ctx.GetContainerDataArray(entity.ContainerCode), AutoStopDuration: autoStopDurationMs, Capacity: setting.Capacity, - CenterID: centerIds, + Queues: ctx.Queues, Spec: ctx.Spec, }, }, diff --git a/services/ai_task_service/task/grampus_online_infer_task.go b/services/ai_task_service/task/grampus_online_infer_task.go index bb33dc842e..cb9636bdde 100644 --- a/services/ai_task_service/task/grampus_online_infer_task.go +++ b/services/ai_task_service/task/grampus_online_infer_task.go @@ -80,7 +80,7 @@ func (t GrampusOnlineInferTaskTemplate) Create(ctx *context.CreationContext) (*e Next(t.CheckBranchExists). Next(t.CheckModel). Next(t.InsertCloudbrainRecord4Async). 
- AsyncNextWithErrFun(t.BuildContainerData, t.CallCreationAPI, t.AfterCallCreationAPI4Async, t.NotifyCreation, t.HandleErr4Async). + AsyncNextWithErrFun(t.BuildContainerData, t.GetAvailableQueues, t.CallCreationAPI, t.AfterCallCreationAPI4Async, t.NotifyCreation, t.HandleErr4Async). Operate(ctx) if err != nil { log.Error("create GrampusNoteBookTask err.%v", err) @@ -97,14 +97,6 @@ func (g GrampusOnlineInferTaskTemplate) CallCreationAPI(ctx *context.CreationCon return response.SYSTEM_ERROR } form := ctx.Request - centerIds, bizErr := GetAvailableCenterIds(ctx.Spec, models.GetAvailableCenterIdOpts{ - UserId: ctx.User.ID, - JobType: g.JobType, - HasInternet: form.HasInternet, - }, form.ComputeSource, form.ImageID, g.ClusterType) - if bizErr != nil { - return bizErr - } imageUrl := strings.TrimSpace(form.ImageUrl) if form.ImageID != "" { imageUrl = "" @@ -129,7 +121,7 @@ func (g GrampusOnlineInferTaskTemplate) CallCreationAPI(ctx *context.CreationCon OutPut: ctx.GetContainerDataArray(entity.ContainerOutPutPath), AutoStopDuration: -1, Capacity: setting.Capacity, - CenterID: centerIds, + Queues: ctx.Queues, Spec: ctx.Spec, BootFile: ctx.Request.BootFile, }, diff --git a/services/ai_task_service/task/grampus_train_task.go b/services/ai_task_service/task/grampus_train_task.go index 7c676cbcdb..abeb9db648 100644 --- a/services/ai_task_service/task/grampus_train_task.go +++ b/services/ai_task_service/task/grampus_train_task.go @@ -116,7 +116,7 @@ func (t GrampusTrainTaskTemplate) Create(ctx *context.CreationContext) (*entity. Next(t.CheckDatasets). Next(t.CheckModel). Next(t.InsertCloudbrainRecord4Async). - AsyncNextWithErrFun(t.BuildContainerData, t.CallCreationAPI, t.AfterCallCreationAPI4Async, t.NotifyCreation, t.HandleErr4Async). + AsyncNextWithErrFun(t.BuildContainerData, t.GetAvailableQueues, t.CallCreationAPI, t.AfterCallCreationAPI4Async, t.NotifyCreation, t.HandleErr4Async). 
Operate(ctx) if err != nil { log.Error("create GrampusTrainTaskTemplate err.%v", err) @@ -131,14 +131,6 @@ func (g GrampusTrainTaskTemplate) CallCreationAPI(ctx *context.CreationContext) return response.SYSTEM_ERROR } form := ctx.Request - centerIds, bizErr := GetAvailableCenterIds(ctx.Spec, models.GetAvailableCenterIdOpts{ - UserId: ctx.User.ID, - JobType: g.JobType, - HasInternet: form.HasInternet, - }, form.ComputeSource, form.ImageID, g.ClusterType) - if bizErr != nil { - return bizErr - } imageUrl := strings.TrimSpace(form.ImageUrl) if form.ImageID != "" { imageUrl = "" @@ -154,7 +146,7 @@ func (g GrampusTrainTaskTemplate) CallCreationAPI(ctx *context.CreationContext) ImageUrl: imageUrl, Datasets: ctx.GetContainerDataArray(entity.ContainerDataset), Code: ctx.GetContainerDataArray(entity.ContainerCode), - CenterID: centerIds, + Queues: ctx.Queues, PreTrainModel: ctx.GetContainerDataArray(entity.ContainerPreTrainModel), BootFile: form.BootFile, OutPut: ctx.GetContainerDataArray(entity.ContainerOutPutPath), @@ -179,51 +171,3 @@ func (g GrampusTrainTaskTemplate) CallCreationAPI(ctx *context.CreationContext) } return nil } - -func (g GrampusTrainTaskTemplate) CallRestartAPI(ctx *context.CreationContext) *response.BizError { - c := g.GetMyCluster() - if c == nil { - return response.SYSTEM_ERROR - } - form := ctx.Request - centerIds, bizErr := GetAvailableCenterIds(ctx.Spec, models.GetAvailableCenterIdOpts{ - UserId: ctx.User.ID, - JobType: g.JobType, - HasInternet: form.HasInternet, - }, form.ComputeSource, form.ImageID, g.ClusterType) - if bizErr != nil { - return bizErr - } - req := entity.CreateTrainTaskRequest{ - Name: form.JobName, - DisplayJobName: form.DisplayJobName, - Tasks: []entity.TrainTask{ - { - Name: form.JobName, - ResourceSpecId: ctx.Spec.SourceSpecId, - ImageId: form.ImageID, - ImageUrl: strings.TrimSpace(form.ImageUrl), - Datasets: ctx.GetContainerDataArray(entity.ContainerDataset), - Code: ctx.GetContainerDataArray(entity.ContainerCode), - 
CenterID: centerIds, - PreTrainModel: ctx.GetContainerDataArray(entity.ContainerPreTrainModel), - BootFile: form.BootFile, - OutPut: ctx.GetContainerDataArray(entity.ContainerOutPutPath), - Params: form.ParamArray, - Spec: ctx.Spec, - }, - }, - } - createTime := timeutil.TimeStampNow() - res, err := c.CreateTrainJob(req) - if err != nil { - log.Error("GrampusTrainTaskTemplate CallRestartAPI err.req=%+v err=%v", req, err) - return response.NewBizError(err) - } - ctx.Response = &entity.CreationResponse{ - JobID: res.JobID, - Status: res.Status, - CreateTime: createTime, - } - return nil -} diff --git a/services/ai_task_service/task/opt_handler.go b/services/ai_task_service/task/opt_handler.go index 5937bce3b1..e92320bdeb 100644 --- a/services/ai_task_service/task/opt_handler.go +++ b/services/ai_task_service/task/opt_handler.go @@ -43,6 +43,7 @@ type CreationHandler interface { NotifyCreation(ctx *context.CreationContext) *response.BizError HandleErr4Async(ctx *context.CreationContext) *response.BizError CheckNotebookCount(ctx *context.CreationContext) *response.BizError + GetAvailableQueues(ctx *context.CreationContext) *response.BizError } //DefaultCreationHandler CreationHandler的默认实现,公共逻辑可以在此结构体中实现 @@ -633,6 +634,15 @@ func (DefaultCreationHandler) CheckPointBalance(ctx *context.CreationContext) *r return nil } +func (DefaultCreationHandler) GetAvailableQueues(ctx *context.CreationContext) *response.BizError { + ctx.Queues = ctx.Spec.GetAvailableQueues(models.GetAvailableCenterIdOpts{ + UserId: ctx.User.ID, + JobType: ctx.Request.JobType, + HasInternet: ctx.Request.HasInternet, + }) + return nil +} + func (DefaultCreationHandler) CallCreationAPI(ctx *context.CreationContext) *response.BizError { log.Error("CallCreationAPI not implements") return response.SYSTEM_ERROR diff --git a/services/ai_task_service/task/task_extend.go b/services/ai_task_service/task/task_extend.go index ec11d4f464..12bec845f7 100644 --- a/services/ai_task_service/task/task_extend.go +++ 
b/services/ai_task_service/task/task_extend.go @@ -139,6 +139,12 @@ func getCloudBrainDatasetInfo4Local(uuid string, datasetname string, isNeedDown //根据实际调度的智算中心修正规格 func correctAITaskSpec(task *models.Cloudbrain) { + defer func() { + if err := recover(); err != nil { + combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2)) + log.Error("PANIC:%v", combinedErr) + } + }() if task.AiCenter == "" { return } @@ -161,13 +167,14 @@ func correctAITaskSpec(task *models.Cloudbrain) { log.Error("correctAITaskSpec GetCloudbrainSpecByID spec is empty.taskId=%d ", task.ID) return } - if oldSpec.AiCenterCode == realCenterCode { + if oldSpec.AiCenterCode == realCenterCode && oldSpec.QueueCode == task.QueueCode { return } - //智算中心不一样时才需要处理 + //所属资源池队列不一样时才需要处理 r, err := models.FindSpecs(models.FindSpecsOptions{ SourceSpecId: oldSpec.SourceSpecId, AiCenterCode: realCenterCode, + QueueCode: task.QueueCode, }) if err != nil { log.Error("correctAITaskSpec FindSpecs err.taskId=%d err=%v", task.ID, err) diff --git a/services/ai_task_service/task/task_service.go b/services/ai_task_service/task/task_service.go index 27aa0c64d2..6c34c36b9f 100644 --- a/services/ai_task_service/task/task_service.go +++ b/services/ai_task_service/task/task_service.go @@ -11,7 +11,6 @@ import ( "strings" "code.gitea.io/gitea/entity" - "code.gitea.io/gitea/manager/client/grampus" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/convert" "code.gitea.io/gitea/modules/git" @@ -260,6 +259,7 @@ func UpdateByQueryResponse(res *entity.QueryTaskResponse, task *models.Cloudbrai if res.CenterId != "" && res.CenterName != "" { task.AiCenter = res.CenterId + "+" + res.CenterName } + task.QueueCode = res.QueueCode oldStatus := task.Status newStatus := TransAITaskStatus(res.Status) @@ -733,49 +733,6 @@ func SyncAITaskStatus() { } } } -func GetAvailableCenterIds(specification *models.Specification, opts models.GetAvailableCenterIdOpts, computeSource *models.ComputeSource, - imageId string, clusterType 
entity.ClusterType) ([]string, *response.BizError) { - centerIds := specification.GetAvailableCenterIds(opts) - - if len(centerIds) == 0 || imageId == "" || clusterType != entity.C2Net { - return centerIds, nil - } - - processType := computeSource.FullName - images, err := grampus.GetImages(processType, string(opts.JobType)) - if err != nil { - log.Warn("can not get image info from grampus", err) - return centerIds, nil - } - var imageCenterIds []string - for _, image := range images.Infos { - if image.ID == imageId { - for _, centerInfo := range image.AICenterImage { - imageCenterIds = append(imageCenterIds, centerInfo.AICenterID) - } - break - } - } - if len(imageCenterIds) == 0 { - return centerIds, nil - } - - var intersectionCenterIds []string - for _, centerId := range centerIds { - for _, imageCenterId := range imageCenterIds { - if centerId == imageCenterId { - intersectionCenterIds = append(intersectionCenterIds, centerId) - break - } - } - } - if len(intersectionCenterIds) == 0 { - return intersectionCenterIds, response.NO_CENTER_MATCH - } - - return intersectionCenterIds, nil - -} func HandleNoJobIdAITasks() { defer func() { diff --git a/services/cloudbrain/resource/resource_queue.go b/services/cloudbrain/resource/resource_queue.go index f92b8c3c94..438bf5a407 100644 --- a/services/cloudbrain/resource/resource_queue.go +++ b/services/cloudbrain/resource/resource_queue.go @@ -5,7 +5,6 @@ import ( "code.gitea.io/gitea/modules/grampus" "code.gitea.io/gitea/modules/log" "fmt" - "strings" ) func AddResourceQueue(req models.ResourceQueueReq) error { @@ -18,6 +17,8 @@ func AddResourceQueue(req models.ResourceQueueReq) error { func UpdateResourceQueue(queueId int64, req models.ResourceQueueReq) error { if _, err := models.UpdateResourceCardsTotalNumAndInternetStatus(queueId, models.ResourceQueue{ CardsTotalNum: req.CardsTotalNum, + QueueType: req.QueueType, + QueueName: req.QueueName, Remark: req.Remark, HasInternet: req.HasInternet, }); err != nil { @@ -54,8 
+55,9 @@ func GetResourceAiCenters() ([]models.ResourceAiCenterRes, error) { } func SyncGrampusQueue(doerId int64) error { - r, err := grampus.GetAiCenters(1, 100) + r, err := grampus.GetResourceQueue() if err != nil { + log.Error("Get grampus resource queue failed.err=%v", err) return err } log.Info("SyncGrampusQueue result = %+v", r) @@ -63,56 +65,50 @@ func SyncGrampusQueue(doerId int64) error { queueInsertList := make([]models.ResourceQueue, 0) existIds := make([]int64, 0) - for _, center := range r.Infos { - for _, device := range center.AccDevices { - computeResource := models.ParseComputeResourceFormGrampus(device.Kind) - accCardType := strings.ToUpper(device.Model) - if computeResource == "" { - continue - } - //Determine if this quque already exists.if exist,update params - //if not exist,insert a new record - oldQueue, err := models.GetResourceQueue(&models.ResourceQueue{ + for _, queue := range r { + computeResource := queue.ComputeResource + accCardType := queue.AccCardType + oldQueue, err := models.GetResourceQueue(&models.ResourceQueue{ + Cluster: models.C2NetCluster, + AiCenterCode: queue.AiCenterCode, + ComputeResource: computeResource, + AccCardType: accCardType, + QueueCode: queue.QueueCode, + }) + if err != nil { + return err + } + + hasInternet := queue.HasInternet + if oldQueue == nil { + queueInsertList = append(queueInsertList, models.ResourceQueue{ Cluster: models.C2NetCluster, - AiCenterCode: center.Id, + AiCenterCode: queue.AiCenterCode, + AiCenterName: queue.AiCenterName, ComputeResource: computeResource, AccCardType: accCardType, + IsAutomaticSync: true, + HasInternet: hasInternet, + CreatedBy: doerId, + UpdatedBy: doerId, + QueueCode: queue.QueueCode, + QueueName: queue.QueueName, + QueueType: queue.QueueType, + }) + } else { + existIds = append(existIds, oldQueue.ID) + queueUpdateList = append(queueUpdateList, models.ResourceQueue{ + ID: oldQueue.ID, + ComputeResource: computeResource, + AiCenterName: queue.AiCenterName, + AccCardType: 
accCardType, + UpdatedBy: doerId, + HasInternet: hasInternet, + QueueName: queue.QueueName, + QueueType: queue.QueueType, }) - if err != nil { - return err - } - - var hasInternet int - if center.IsNetAccess { - hasInternet = 2 - } else { - hasInternet = 1 - } - if oldQueue == nil { - queueInsertList = append(queueInsertList, models.ResourceQueue{ - Cluster: models.C2NetCluster, - AiCenterCode: center.Id, - AiCenterName: center.Name, - ComputeResource: computeResource, - AccCardType: accCardType, - IsAutomaticSync: true, - HasInternet: hasInternet, - CreatedBy: doerId, - UpdatedBy: doerId, - }) - } else { - existIds = append(existIds, oldQueue.ID) - queueUpdateList = append(queueUpdateList, models.ResourceQueue{ - ID: oldQueue.ID, - ComputeResource: computeResource, - AiCenterName: center.Name, - AccCardType: accCardType, - UpdatedBy: doerId, - HasInternet: hasInternet, - }) - } - } + } return models.SyncGrampusQueues(queueUpdateList, queueInsertList, existIds) } diff --git a/services/cloudbrain/resource/resource_scene.go b/services/cloudbrain/resource/resource_scene.go index 4cc840e76b..dbe5a64a6e 100644 --- a/services/cloudbrain/resource/resource_scene.go +++ b/services/cloudbrain/resource/resource_scene.go @@ -5,6 +5,7 @@ import ( ) func AddResourceScene(req models.ResourceSceneReq) error { + if err := models.InsertResourceScene(req); err != nil { return err } diff --git a/services/cloudbrain/resource/resource_specification.go b/services/cloudbrain/resource/resource_specification.go index 6fe1284e86..ea92b968a1 100644 --- a/services/cloudbrain/resource/resource_specification.go +++ b/services/cloudbrain/resource/resource_specification.go @@ -255,6 +255,14 @@ func AddSpecOperateLog(doerId int64, operateType string, newValue, oldValue *mod func FindAvailableSpecs(userId int64, opts models.FindSpecsOptions) ([]*models.Specification, error) { opts.SpecStatus = models.SpecOnShelf + + isUserSpecial := models.IsUserInExclusivePool(userId) + if isUserSpecial { + 
opts.SceneType = models.SceneTypeExclusive + } else { + opts.SceneType = models.SceneTypePublic + } + r, err := models.FindSpecs(opts) if err != nil { log.Error("FindAvailableSpecs error.%v", err) diff --git a/templates/admin/navbar.tmpl b/templates/admin/navbar.tmpl index d234826ae6..669b8734f6 100644 --- a/templates/admin/navbar.tmpl +++ b/templates/admin/navbar.tmpl @@ -1,5 +1,5 @@