V20230322.patch
into develop
1 year ago
@@ -1672,6 +1672,19 @@ type GrampusStopJobResponse struct { | |||
Status string `json:"status"` | |||
} | |||
// GrampusModelMigrateInfoResponse is the JSON body decoded from the Grampus
// model-migrate endpoints. It embeds GrampusResult and describes the source
// and destination object-storage locations of the model being migrated.
type GrampusModelMigrateInfoResponse struct {
	GrampusResult
	DestBucket    string `json:"destBucket"`
	DestEndpoint  string `json:"destEndpoint"`
	DestObjectKey string `json:"destObjectKey"`
	DestProxy     string `json:"destProxy"`
	FailedReason  string `json:"failedReason"`
	SrcBucket     string `json:"srcBucket"`
	SrcEndpoint   string `json:"srcEndpoint"`
	SrcObjectKey  string `json:"srcObjectKey"`
	Status        int    `json:"status"` // 0: initialized, 1: succeeded, 2: failed, 3: scheduling
}
type GetGrampusJobEventsResponse struct { | |||
GrampusResult | |||
JobEvents []GrampusJobEvents `json:"jobEvents"` | |||
@@ -1724,6 +1737,7 @@ type GrampusDataset struct { | |||
ObjectKey string `json:"objectKey"` | |||
ContainerPath string `json:"containerPath"` | |||
ReadOnly bool `json:"readOnly"` | |||
GetBackEndpoint string `json:"getBackEndpoint"` | |||
} | |||
type CreateGrampusJobRequest struct { | |||
@@ -0,0 +1,215 @@ | |||
package models | |||
import ( | |||
"code.gitea.io/gitea/modules/log" | |||
"errors" | |||
"time" | |||
"xorm.io/builder" | |||
"code.gitea.io/gitea/modules/timeutil" | |||
) | |||
// GrampusMigrateResponse is a numeric migrate status code; it can be mapped to
// a ModelMigrateStep with ConvertToModelMigrateStep.
type GrampusMigrateResponse int

// Known GrampusMigrateResponse codes.
const (
	GrampusMigrateResponseMigrateInit   = GrampusMigrateResponse(0)
	GrampusMigrateResponseSuccess       = GrampusMigrateResponse(1)
	GrampusMigrateResponseFailed        = GrampusMigrateResponse(2)
	GrampusMigrateResponseMigrating     = GrampusMigrateResponse(3)
	GrampusMigrateResponseNoNeedMigrate = GrampusMigrateResponse(4)
)
func (r GrampusMigrateResponse) ConvertToModelMigrateStep() ModelMigrateStep { | |||
switch r { | |||
case GrampusMigrateResponseMigrateInit: | |||
return GrampusMigrateInit | |||
case GrampusMigrateResponseSuccess: | |||
return GrampusMigrateSuccess | |||
case GrampusMigrateResponseFailed: | |||
return GrampusMigrateFailed | |||
case GrampusMigrateResponseMigrating: | |||
return GrampusMigrating | |||
case GrampusMigrateResponseNoNeedMigrate: | |||
return GrampusMigrateNoNeed | |||
} | |||
return -1 | |||
} | |||
// ModelMigrateStep is the fine-grained step a model migration is currently in.
// Values 0-4 cover the Grampus migrate phase, values 10-12 the bucket move.
type ModelMigrateStep int

// Known ModelMigrateStep values.
const (
	GrampusMigrateInit    = ModelMigrateStep(0)
	GrampusMigrating      = ModelMigrateStep(1)
	GrampusMigrateSuccess = ModelMigrateStep(2)
	GrampusMigrateFailed  = ModelMigrateStep(3)
	GrampusMigrateNoNeed  = ModelMigrateStep(4)

	BucketMoving      = ModelMigrateStep(10)
	BucketMoveSuccess = ModelMigrateStep(11)
	BucketMoveFailed  = ModelMigrateStep(12)
)
func (m ModelMigrateStep) GetStatus() ModelMigrateStatus { | |||
switch m { | |||
case BucketMoveSuccess, GrampusMigrateNoNeed: | |||
return ModelMigrateSuccess | |||
case GrampusMigrateFailed, BucketMoveFailed: | |||
return ModelMigrateFailed | |||
case GrampusMigrateInit: | |||
return ModelMigrateWaiting | |||
case GrampusMigrateSuccess, GrampusMigrating, BucketMoving: | |||
return ModelMigrating | |||
} | |||
return -1 | |||
} | |||
// ModelMigrateStatus is the coarse status derived from a ModelMigrateStep
// (see ModelMigrateStep.GetStatus).
type ModelMigrateStatus int

// Known ModelMigrateStatus values.
const (
	ModelMigrateSuccess = ModelMigrateStatus(0)
	ModelMigrating      = ModelMigrateStatus(1)
	ModelMigrateFailed  = ModelMigrateStatus(2)
	ModelMigrateWaiting = ModelMigrateStatus(3)
)
var UnFinishedMigrateSteps = []ModelMigrateStep{GrampusMigrateInit, GrampusMigrating, GrampusMigrateSuccess, BucketMoving} | |||
// ModelMigrateRecord is the xorm-mapped row that tracks the migration of one
// cloudbrain task's model output.
type ModelMigrateRecord struct {
	ID int64 `xorm:"pk autoincr"`
	// CloudbrainID is declared unique, so there is one record per cloudbrain task.
	CloudbrainID int64 `xorm:"INDEX NOT NULL unique"`
	// Destination location of the migrated objects.
	DestBucket    string
	DestEndpoint  string
	DestObjectKey string
	DestProxy     string
	// Source location of the objects to migrate.
	SrcBucket    string
	SrcEndpoint  string
	SrcObjectKey string
	// Status defaults to 3 (ModelMigrateWaiting).
	Status ModelMigrateStatus `xorm:"NOT NULL DEFAULT 3"`
	// CurrentStep defaults to 0 (GrampusMigrateInit).
	CurrentStep ModelMigrateStep `xorm:"NOT NULL DEFAULT 0"`
	// RetryCount is incremented by IncreaseModelMigrateRetryCount.
	RetryCount  int
	CreatedUnix timeutil.TimeStamp `xorm:"created"`
	UpdatedUnix timeutil.TimeStamp `xorm:"updated"`
	// DeletedAt carries xorm's "deleted" tag.
	DeletedAt time.Time `xorm:"deleted"`
	Remark    string
}
func (r *ModelMigrateRecord) IsFinished() bool { | |||
for _, s := range UnFinishedMigrateSteps { | |||
if s == r.CurrentStep { | |||
return false | |||
} | |||
} | |||
return true | |||
} | |||
func updateModelMigrateRecordCols(e Engine, record *ModelMigrateRecord, cols ...string) error { | |||
_, err := e.ID(record.ID).Cols(cols...).Update(record) | |||
return err | |||
} | |||
func UpdateModelMigrateRecordCols(record *ModelMigrateRecord, cols ...string) error { | |||
return updateModelMigrateRecordCols(x, record, cols...) | |||
} | |||
func IncreaseModelMigrateRetryCount(recordId int64) error { | |||
_, err := x.ID(recordId).Incr("retry_count", 1).Update(&ModelMigrateRecord{}) | |||
return err | |||
} | |||
func UpdateModelMigrateStatusByStep(record *ModelMigrateRecord, newStep ModelMigrateStep) error { | |||
status := newStep.GetStatus() | |||
if status < 0 { | |||
log.Error("Step format error.id = %d,newStep = %d", record.ID, newStep) | |||
return errors.New("Step format error") | |||
} | |||
record.Status = status | |||
record.CurrentStep = newStep | |||
//正常情况下状态只能向更大的状态更新 | |||
n, err := x.Where(builder.NewCond().And(builder.Eq{"id": record.ID}). | |||
And(builder.Lt{"current_step": newStep})). | |||
Cols("status", "current_step"). | |||
Update(record) | |||
if err != nil { | |||
log.Error("UpdateModelMigrateStatusByStep err.%v", err) | |||
return err | |||
} | |||
if n == 0 { | |||
log.Error("UpdateModelMigrateStatusByStep total num is 0.r.ID=%d", record.ID) | |||
return errors.New("current_step not valid") | |||
} | |||
return nil | |||
} | |||
func RollBackMigrateStatus(record *ModelMigrateRecord, newStep ModelMigrateStep) error { | |||
status := newStep.GetStatus() | |||
if status < 0 { | |||
log.Error("Step format error.id = %d,newStep = %d", record.ID, newStep) | |||
return errors.New("Step format error") | |||
} | |||
record.Status = status | |||
record.CurrentStep = newStep | |||
_, err := x.ID(record.ID). | |||
Cols("status", "current_step"). | |||
Update(record) | |||
if err != nil { | |||
log.Error("RollBackMigrateStatus err.%v", err) | |||
return err | |||
} | |||
return nil | |||
} | |||
func UpdateModelMigrateRecordByStep(record *ModelMigrateRecord) error { | |||
n, err := x. | |||
Where(builder.NewCond().And(builder.Eq{"id": record.ID})). | |||
Update(record) | |||
if err != nil { | |||
log.Error("UpdateModelMigrateRecordByStep err. ID=%d err=%v", record.ID, err) | |||
return err | |||
} | |||
if n == 0 { | |||
log.Error("UpdateModelMigrateRecordByStep total num is 0.r.ID=%d", record.ID) | |||
return errors.New("current_step not valid") | |||
} | |||
return nil | |||
} | |||
func GetUnfinishedModelMigrateRecords() ([]*ModelMigrateRecord, error) { | |||
records := make([]*ModelMigrateRecord, 0, 10) | |||
return records, x. | |||
Where(builder.NewCond().And(builder.In("current_step", UnFinishedMigrateSteps))). | |||
Limit(100). | |||
Find(&records) | |||
} | |||
func InsertModelMigrateRecord(record *ModelMigrateRecord) (_ *ModelMigrateRecord, err error) { | |||
if _, err := x.Insert(record); err != nil { | |||
return nil, err | |||
} | |||
return record, nil | |||
} | |||
func GetModelMigrateRecordByCloudbrainId(cloudbrainId int64) (*ModelMigrateRecord, error) { | |||
r := &ModelMigrateRecord{} | |||
if has, err := x.Where("cloudbrain_id = ?", cloudbrainId).Get(r); err != nil { | |||
log.Error("GetModelMigrateRecordByCloudbrainId err. %v", err) | |||
return nil, err | |||
} else if !has { | |||
return nil, ErrRecordNotExist{} | |||
} | |||
return r, nil | |||
} | |||
func GetModelMigrateRecordById(id int64) (*ModelMigrateRecord, error) { | |||
r := &ModelMigrateRecord{} | |||
if has, err := x.ID(id).Get(r); err != nil { | |||
log.Error("GetModelMigrateRecordByCloudbrainId err. %v", err) | |||
return nil, err | |||
} else if !has { | |||
return nil, ErrRecordNotExist{} | |||
} | |||
return r, nil | |||
} |
@@ -170,6 +170,7 @@ func init() { | |||
new(TechConvergeBaseInfo), | |||
new(RepoConvergeInfo), | |||
new(UserRole), | |||
new(ModelMigrateRecord), | |||
) | |||
tablesStatistic = append(tablesStatistic, | |||
@@ -5,6 +5,7 @@ | |||
package cron | |||
import ( | |||
"code.gitea.io/gitea/services/ai_task_service/schedule" | |||
"context" | |||
"time" | |||
@@ -249,6 +250,17 @@ func registerHandleScheduleRecord() { | |||
}) | |||
} | |||
func registerHandleModelMigrateRecord() { | |||
RegisterTaskFatal("handle_model_migrate_record", &BaseConfig{ | |||
Enabled: true, | |||
RunAtStart: false, | |||
Schedule: "@every 1m", | |||
}, func(ctx context.Context, _ *models.User, _ Config) error { | |||
schedule.HandleUnfinishedMigrateRecords() | |||
return nil | |||
}) | |||
} | |||
func registerRewardPeriodTask() { | |||
RegisterTaskFatal("reward_period_task", &BaseConfig{ | |||
Enabled: true, | |||
@@ -335,4 +347,6 @@ func initBasicTasks() { | |||
registerHandleScheduleRecord() | |||
registerHandleCloudbrainDurationStatistic() | |||
registerHandleModelMigrateRecord() | |||
} |
@@ -349,6 +349,9 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId str | |||
EndPoint: getEndPoint(), | |||
ObjectKey: req.CodeObsPath + cloudbrain.DefaultBranchName + ".zip", | |||
} | |||
outputGrampus = models.GrampusDataset{ | |||
GetBackEndpoint: getEndPoint(), | |||
} | |||
} else if ProcessorTypeGPU == req.ProcessType { | |||
datasetGrampus = getDatasetGPUGrampus(req.DatasetInfos, "/tmp/dataset") | |||
if len(req.ModelName) != 0 { | |||
@@ -373,6 +376,7 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId str | |||
} | |||
outputGrampus = models.GrampusDataset{ | |||
ContainerPath: "/tmp/output", | |||
GetBackEndpoint: setting.Attachment.Minio.Endpoint, | |||
} | |||
} else if ProcessorTypeGCU == req.ProcessType { | |||
@@ -399,8 +403,8 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId str | |||
} | |||
outputGrampus = models.GrampusDataset{ | |||
ContainerPath: "/tmp/output", | |||
GetBackEndpoint: setting.Attachment.Minio.Endpoint, | |||
} | |||
} | |||
jobResult, err := createJob(models.CreateGrampusJobRequest{ | |||
@@ -393,6 +393,72 @@ sendjob: | |||
return &result, nil | |||
} | |||
func PostModelMigrate(jobID string) (*models.GrampusModelMigrateInfoResponse, error) { | |||
checkSetting() | |||
client := getRestyClient() | |||
var result models.GrampusModelMigrateInfoResponse | |||
retry := 0 | |||
sendjob: | |||
res, err := client.R(). | |||
//SetHeader("Content-Type", "application/json"). | |||
SetAuthToken(TOKEN). | |||
SetResult(&result). | |||
Post(HOST + urlTrainJob + "/" + jobID + "/modelMigrate") | |||
if err != nil { | |||
return &result, fmt.Errorf("resty ModelMigrate: %v", err) | |||
} | |||
log.Info("call modelMigrate res=%+v", res) | |||
if result.ErrorCode == errorIllegalToken && retry < 1 { | |||
retry++ | |||
log.Info("retry get token") | |||
_ = getToken() | |||
goto sendjob | |||
} | |||
if result.ErrorCode != 0 { | |||
log.Error("ModelMigrate failed(%d): %s", result.ErrorCode, result.ErrorMsg) | |||
return &result, fmt.Errorf("GetJob failed(%d): %s", result.ErrorCode, result.ErrorMsg) | |||
} | |||
return &result, nil | |||
} | |||
func ModelMigrateInfo(jobID string) (*models.GrampusModelMigrateInfoResponse, error) { | |||
checkSetting() | |||
client := getRestyClient() | |||
var result models.GrampusModelMigrateInfoResponse | |||
retry := 0 | |||
sendjob: | |||
res, err := client.R(). | |||
//SetHeader("Content-Type", "application/json"). | |||
SetAuthToken(TOKEN). | |||
SetResult(&result). | |||
Get(HOST + urlTrainJob + "/" + jobID + "/modelMigrateInfo") | |||
if err != nil { | |||
return &result, fmt.Errorf("resty ModelMigrateInfo: %v", err) | |||
} | |||
log.Info("call modelMigrateInfo res=%+v", res) | |||
if result.ErrorCode == errorIllegalToken && retry < 1 { | |||
retry++ | |||
log.Info("retry get token") | |||
_ = getToken() | |||
goto sendjob | |||
} | |||
if result.ErrorCode != 0 { | |||
log.Error("ModelMigrateInfo failed(%d): %s", result.ErrorCode, result.ErrorMsg) | |||
return &result, fmt.Errorf("GetJob failed(%d): %s", result.ErrorCode, result.ErrorMsg) | |||
} | |||
return &result, nil | |||
} | |||
func GetAiCenters(pageIndex, pageSize int) (*models.GetGrampusAiCentersResult, error) { | |||
checkSetting() | |||
client := getRestyClient() | |||
@@ -0,0 +1,41 @@ | |||
package model_schedule | |||
import ( | |||
"code.gitea.io/gitea/models" | |||
"code.gitea.io/gitea/modules/log" | |||
"code.gitea.io/gitea/modules/notification/base" | |||
) | |||
type scheduleNotifier struct { | |||
base.NullNotifier | |||
} | |||
var ( | |||
_ base.Notifier = &scheduleNotifier{} | |||
) | |||
// NewNotifier create a new wechatNotifier notifier | |||
func NewNotifier() base.Notifier { | |||
return &scheduleNotifier{} | |||
} | |||
func (*scheduleNotifier) NotifyChangeCloudbrainStatus(cloudbrain *models.Cloudbrain, oldStatus string) { | |||
if !cloudbrain.IsTerminal() { | |||
return | |||
} | |||
log.Info("try to InsertModelMigrateRecord.cloudbrainId=%d oldStatus=%s newStatus=%d", cloudbrain.ID, oldStatus, cloudbrain.Status) | |||
switch cloudbrain.Type { | |||
case models.TypeC2Net: | |||
if cloudbrain.JobType == string(models.JobTypeDebug) { | |||
return | |||
} | |||
_, err := models.InsertModelMigrateRecord(&models.ModelMigrateRecord{ | |||
CloudbrainID: cloudbrain.ID, | |||
Status: models.ModelMigrating, | |||
CurrentStep: models.GrampusMigrating, | |||
}) | |||
if err != nil { | |||
log.Error("InsertModelMigrateRecord err.cloudbrain.id=%d err=%v", cloudbrain.ID, err) | |||
} | |||
} | |||
} |
@@ -11,6 +11,7 @@ import ( | |||
"code.gitea.io/gitea/modules/notification/base" | |||
"code.gitea.io/gitea/modules/notification/indexer" | |||
"code.gitea.io/gitea/modules/notification/mail" | |||
"code.gitea.io/gitea/modules/notification/model_schedule" | |||
"code.gitea.io/gitea/modules/notification/reward" | |||
"code.gitea.io/gitea/modules/notification/ui" | |||
"code.gitea.io/gitea/modules/notification/webhook" | |||
@@ -41,6 +42,7 @@ func NewContext() { | |||
RegisterNotifier(action.NewNotifier()) | |||
RegisterNotifier(wechatNotifier.NewNotifier()) | |||
RegisterNotifier(reward.NewNotifier()) | |||
RegisterNotifier(model_schedule.NewNotifier()) | |||
} | |||
// NotifyUploadAttachment notifies attachment upload message to notifiers | |||
@@ -0,0 +1,9 @@ | |||
package redis_key | |||
import "fmt" | |||
const MODEL_SCHEDULE_PREFIX = "model_schedule" | |||
func RecordHandleLock(jobId string) string { | |||
return KeyJoin(MODEL_SCHEDULE_PREFIX, fmt.Sprint(jobId), "handle") | |||
} |
@@ -676,6 +676,9 @@ var ( | |||
DeductTaskRangeForFirst time.Duration | |||
DeductTaskMinTimestamp int64 | |||
//model-migrate config | |||
UseLocalMinioMigrate bool | |||
//badge config | |||
BadgeIconMaxFileSize int64 | |||
BadgeIconMaxWidth int | |||
@@ -1677,6 +1680,9 @@ func NewContext() { | |||
DeductTaskRangeForFirst = sec.Key("DEDUCT_TASK_RANGE_FOR_FIRST").MustDuration(3 * time.Hour) | |||
DeductTaskMinTimestamp = sec.Key("DEDUCT_TASK_MIN_TIMESTAMP").MustInt64(0) | |||
sec = Cfg.Section("model-migrate") | |||
UseLocalMinioMigrate = sec.Key("USE_LOCAL_MINIO_MIGRATE").MustBool(false) | |||
sec = Cfg.Section("icons") | |||
BadgeIconMaxFileSize = sec.Key("BADGE_ICON_MAX_FILE_SIZE").MustInt64(1048576) | |||
BadgeIconMaxWidth = sec.Key("BADGE_ICON_MAX_WIDTH").MustInt(4096) | |||
@@ -418,6 +418,53 @@ func GetOneLevelAllObjectUnderDir(bucket string, prefixRootPath string, relative | |||
return fileInfos, nil | |||
} | |||
func GetOneLevelObjectsUnderDir(bucket string, prefixRootPath string, relativePath string) ([]FileInfo, error) { | |||
input := &obs.ListObjectsInput{} | |||
input.Bucket = bucket | |||
input.Prefix = prefixRootPath + relativePath | |||
input.Delimiter = "/" | |||
if !strings.HasSuffix(input.Prefix, "/") { | |||
input.Prefix += "/" | |||
} | |||
fileInfos := make([]FileInfo, 0) | |||
prefixLen := len(input.Prefix) | |||
index := 1 | |||
output, err := ObsCli.ListObjects(input) | |||
if err != nil { | |||
if obsError, ok := err.(obs.ObsError); ok { | |||
log.Error("Code:%s, Message:%s", obsError.Code, obsError.Message) | |||
} | |||
return nil, err | |||
} | |||
log.Info("Page:%d\n", index) | |||
index++ | |||
for _, val := range output.Contents { | |||
var fileName string | |||
if val.Key == input.Prefix { | |||
continue | |||
} | |||
fileName = val.Key[prefixLen:] | |||
fileInfo := FileInfo{ | |||
ModTime: val.LastModified.Local().Format("2006-01-02 15:04:05"), | |||
FileName: fileName, | |||
Size: val.Size, | |||
IsDir: false, | |||
ParenDir: relativePath, | |||
} | |||
fileInfos = append(fileInfos, fileInfo) | |||
} | |||
for _, val := range output.CommonPrefixes { | |||
fileName := strings.TrimSuffix(strings.TrimPrefix(val, input.Prefix), "/") | |||
fileInfo := FileInfo{ | |||
FileName: fileName, | |||
IsDir: true, | |||
ParenDir: strings.TrimPrefix(val, prefixRootPath), | |||
} | |||
fileInfos = append(fileInfos, fileInfo) | |||
} | |||
return fileInfos, nil | |||
} | |||
func GetAllObjectByBucketAndPrefix(bucket string, prefix string) ([]FileInfo, error) { | |||
input := &obs.ListObjectsInput{} | |||
input.Bucket = bucket | |||
@@ -1,18 +1,15 @@ | |||
package urchin | |||
import ( | |||
"encoding/json" | |||
"fmt" | |||
"strings" | |||
"time" | |||
"code.gitea.io/gitea/models" | |||
"code.gitea.io/gitea/modules/grampus" | |||
"code.gitea.io/gitea/modules/labelmsg" | |||
"code.gitea.io/gitea/modules/log" | |||
"code.gitea.io/gitea/modules/setting" | |||
"code.gitea.io/gitea/modules/storage" | |||
"encoding/json" | |||
"fmt" | |||
"github.com/minio/minio-go" | |||
"strings" | |||
) | |||
type DecompressReq struct { | |||
@@ -30,253 +27,6 @@ func getUrfsClient() { | |||
urfsClient = New() | |||
} | |||
func GetAITaskOutPutBack(cloudbrainID int64, jobName, centerId, computerResource string) error { | |||
switch computerResource { | |||
case models.NPUResource: | |||
return GetNPUDataBack(cloudbrainID, jobName, centerId) | |||
case models.GPUResource: | |||
return GetGPUDataBack(cloudbrainID, jobName, centerId) | |||
case models.GCUResource: | |||
return GetGCUDataBack(cloudbrainID, jobName, centerId) | |||
} | |||
return nil | |||
} | |||
func GetGPUDataBack(cloudbrainID int64, jobName, centerId string) error { | |||
endpoint := grampus.GetRemoteEndPoint(centerId) | |||
bucket := grampus.BucketRemote | |||
objectKey := grampus.GetGPUModelObjectKey4Grampus(jobName) | |||
destPeerHost := grampus.GetCenterProxy(setting.Grampus.GPULocalCenterID) | |||
getUrfsClient() | |||
var res *PeerResult | |||
var err error | |||
var retryIntervalList = []time.Duration{1 * time.Minute, 1 * time.Minute, 3 * time.Minute} | |||
for i, retryInterval := range retryIntervalList { | |||
res, err = urfsClient.ScheduleDirToPeerByKey(endpoint, bucket, objectKey, destPeerHost) | |||
if err == nil { | |||
log.Info("ScheduleDirToPeerByKey res=%v", res) | |||
break | |||
} | |||
log.Error("ScheduleDataToPeerByKey failed:%v, ObjectKey is:%s,retry in %v", err, objectKey, retryInterval) | |||
time.Sleep(retryInterval) | |||
// If it's the last retry, break | |||
if i == len(retryIntervalList)-1 { | |||
res, err = urfsClient.ScheduleDirToPeerByKey(endpoint, bucket, objectKey, destPeerHost) | |||
} | |||
} | |||
// If err is still not nil after retrying, insert a default value | |||
if err != nil { | |||
log.Error("ScheduleDataToPeerByKey failed info is EndPoint:%s,Bucket:%s,ObjectKey:%s,ProxyServer:%s,TargetObjectKey:%s,error:%v", | |||
endpoint, bucket, objectKey, destPeerHost, grampus.GetGPUModelObjectKey(jobName), err) | |||
_, err = models.InsertScheduleRecord(&models.ScheduleRecord{ | |||
CloudbrainID: cloudbrainID, | |||
EndPoint: endpoint, | |||
Bucket: bucket, | |||
ObjectKey: objectKey, | |||
ProxyServer: destPeerHost, | |||
Status: models.StorageUrchinScheduleFailed, | |||
IsDir: true, | |||
ComputeSource: models.GPUResource, | |||
TargetObjectKey: grampus.GetGPUModelObjectKey(jobName), | |||
Remark: interceptErrorMessages(err), | |||
LocalOperateStatus: models.MoveBucketWaiting, | |||
}) | |||
if err != nil { | |||
log.Error("InsertScheduleRecord failed:%v", err) | |||
return err | |||
} | |||
return fmt.Errorf("GetBackModel failed after retrying:%v", err) | |||
} | |||
_, err = models.InsertScheduleRecord(&models.ScheduleRecord{ | |||
CloudbrainID: cloudbrainID, | |||
EndPoint: endpoint, | |||
Bucket: bucket, | |||
ObjectKey: objectKey, | |||
ProxyServer: destPeerHost, | |||
Status: res.StatusCode, | |||
IsDir: true, | |||
ComputeSource: models.GPUResource, | |||
TargetObjectKey: grampus.GetGPUModelObjectKey(jobName), | |||
LocalOperateStatus: models.MoveBucketWaiting, | |||
}) | |||
if err != nil { | |||
log.Error("InsertScheduleRecord failed:%v", err) | |||
return err | |||
} | |||
r, err := models.GetScheduleRecordByCloudbrainID(cloudbrainID) | |||
if err != nil { | |||
log.Error("GetScheduleRecordByCloudbrainID err.cloudbrainID=%d err=%v", cloudbrainID, err) | |||
return err | |||
} | |||
err = handleScheduleResult(r, res) | |||
if err != nil { | |||
log.Error("GetGPUDataBack handleScheduleResult err.%v", err) | |||
return err | |||
} | |||
return nil | |||
} | |||
func GetGCUDataBack(cloudbrainID int64, jobName, centerId string) error { | |||
endpoint := grampus.GetRemoteEndPoint(centerId) | |||
bucket := grampus.BucketRemote | |||
objectKey := grampus.GetGPUModelObjectKey4Grampus(jobName) | |||
destPeerHost := grampus.GetCenterProxy(setting.Grampus.GPULocalCenterID) | |||
getUrfsClient() | |||
var res *PeerResult | |||
var err error | |||
var retryIntervalList = []time.Duration{1 * time.Minute, 1 * time.Minute, 3 * time.Minute} | |||
for i, retryInterval := range retryIntervalList { | |||
res, err = urfsClient.ScheduleDirToPeerByKey(endpoint, bucket, objectKey, destPeerHost) | |||
if err == nil { | |||
break | |||
} | |||
log.Error("ScheduleDataToPeerByKey failed:%v, retry in %v", err, retryInterval) | |||
time.Sleep(retryInterval) | |||
// If it's the last retry, break | |||
if i == len(retryIntervalList)-1 { | |||
res, err = urfsClient.ScheduleDirToPeerByKey(endpoint, bucket, objectKey, destPeerHost) | |||
} | |||
} | |||
// If err is still not nil after retrying, insert a default value | |||
if err != nil { | |||
log.Error("ScheduleDataToPeerByKey failed info is EndPoint:%s,Bucket:%s,ObjectKey:%s,ProxyServer:%s,TargetObjectKey:%s,error:%v", | |||
endpoint, bucket, objectKey, destPeerHost, grampus.GetGPUModelObjectKey(jobName), err) | |||
_, err = models.InsertScheduleRecord(&models.ScheduleRecord{ | |||
CloudbrainID: cloudbrainID, | |||
EndPoint: endpoint, | |||
Bucket: bucket, | |||
ObjectKey: objectKey, | |||
ProxyServer: destPeerHost, | |||
Status: models.StorageUrchinScheduleFailed, | |||
IsDir: true, | |||
ComputeSource: models.GCUResource, | |||
TargetObjectKey: grampus.GetGPUModelObjectKey(jobName), | |||
LocalOperateStatus: models.MoveBucketWaiting, | |||
}) | |||
if err != nil { | |||
log.Error("InsertScheduleRecord failed:%v", err) | |||
return err | |||
} | |||
return fmt.Errorf("GetBackModel failed after retrying:%v", err) | |||
} | |||
_, err = models.InsertScheduleRecord(&models.ScheduleRecord{ | |||
CloudbrainID: cloudbrainID, | |||
EndPoint: endpoint, | |||
Bucket: bucket, | |||
ObjectKey: objectKey, | |||
ProxyServer: destPeerHost, | |||
Status: res.StatusCode, | |||
IsDir: true, | |||
ComputeSource: models.GCUResource, | |||
TargetObjectKey: grampus.GetGPUModelObjectKey(jobName), | |||
LocalOperateStatus: models.MoveBucketWaiting, | |||
}) | |||
if err != nil { | |||
log.Error("InsertScheduleRecord failed:%v", err) | |||
return err | |||
} | |||
r, err := models.GetScheduleRecordByCloudbrainID(cloudbrainID) | |||
if err != nil { | |||
log.Error("GetScheduleRecordByCloudbrainID err.cloudbrainID=%d err=%v", cloudbrainID, err) | |||
return err | |||
} | |||
err = handleScheduleResult(r, res) | |||
if err != nil { | |||
log.Error("GetGCUDataBack handleScheduleResult err.%v", err) | |||
return err | |||
} | |||
return nil | |||
} | |||
func GetNPUDataBack(cloudbrainID int64, jobName, centerId string) error { | |||
endpoint := grampus.GetRemoteEndPoint(centerId) | |||
bucket := grampus.BucketRemote | |||
objectKey := grampus.GetNpuModelObjectKey(jobName) | |||
destPeerHost := grampus.GetCenterProxy(setting.Grampus.LocalCenterID) | |||
getUrfsClient() | |||
var res *PeerResult | |||
var err error | |||
var retryIntervalList = []time.Duration{1 * time.Minute, 1 * time.Minute, 3 * time.Minute} | |||
for i, retryInterval := range retryIntervalList { | |||
res, err = urfsClient.ScheduleDataToPeerByKey(endpoint, bucket, objectKey, destPeerHost) | |||
if err == nil { | |||
break | |||
} | |||
log.Error("ScheduleDataToPeerByKey failed:%v, ObjectKey is:%s,retry in %v", err, objectKey, retryInterval) | |||
time.Sleep(retryInterval) | |||
// If it's the last retry, break | |||
if i == len(retryIntervalList)-1 { | |||
res, err = urfsClient.ScheduleDataToPeerByKey(endpoint, bucket, objectKey, destPeerHost) | |||
} | |||
} | |||
// If err is still not nil after retrying, insert a default value | |||
if err != nil { | |||
log.Error("ScheduleDataToPeerByKey failed after retrying, errorInfo is EndPoint:%s,Bucket:%s,ObjectKey:%s,ProxyServer:%s,error:%v", | |||
endpoint, bucket, objectKey, destPeerHost, err) | |||
_, err = models.InsertScheduleRecord(&models.ScheduleRecord{ | |||
CloudbrainID: cloudbrainID, | |||
EndPoint: endpoint, | |||
Bucket: bucket, | |||
ObjectKey: objectKey, | |||
ProxyServer: destPeerHost, | |||
Status: models.StorageUrchinScheduleFailed, | |||
IsDir: false, | |||
ComputeSource: models.NPUResource, | |||
Remark: interceptErrorMessages(err), | |||
LocalOperateStatus: models.MoveBucketWaiting, | |||
}) | |||
if err != nil { | |||
log.Error("InsertScheduleRecord failed:%v", err) | |||
return err | |||
} | |||
return fmt.Errorf("GetBackModel failed after retrying:%v", err) | |||
} | |||
_, err = models.InsertScheduleRecord(&models.ScheduleRecord{ | |||
CloudbrainID: cloudbrainID, | |||
EndPoint: endpoint, | |||
Bucket: bucket, | |||
ObjectKey: objectKey, | |||
ProxyServer: destPeerHost, | |||
Status: res.StatusCode, | |||
IsDir: false, | |||
ComputeSource: models.NPUResource, | |||
LocalOperateStatus: models.MoveBucketWaiting, | |||
}) | |||
if err != nil { | |||
log.Error("InsertScheduleRecord failed:%v", err) | |||
return err | |||
} | |||
r, err := models.GetScheduleRecordByCloudbrainID(cloudbrainID) | |||
if err != nil { | |||
log.Error("GetScheduleRecordByCloudbrainID err.cloudbrainID=%d err=%v", cloudbrainID, err) | |||
return err | |||
} | |||
err = handleScheduleResult(r, res) | |||
if err != nil { | |||
log.Error("GetNPUDataBack handleScheduleResult err.%v", err) | |||
return err | |||
} | |||
return nil | |||
} | |||
func tryScheduleDir(endpoint, bucket, objectKey, dstPeer string) { | |||
println("new request dstPeer: ", dstPeer) | |||
urfs := New() | |||
@@ -0,0 +1,11 @@ | |||
package util | |||
// TruncateString returns msg shortened to at most maxLength bytes.
//
// A non-positive maxLength yields "" (a negative value used to panic with a
// slice-bounds error). When the byte limit falls inside a multi-byte UTF-8
// sequence the cut moves back to the start of that sequence, so valid UTF-8
// input always produces valid UTF-8 output; the result may then be up to three
// bytes shorter than maxLength. Input that is not valid UTF-8 is cut at
// exactly maxLength bytes.
func TruncateString(msg string, maxLength int) string {
	if msg == "" || maxLength <= 0 {
		return ""
	}
	if len(msg) <= maxLength {
		return msg
	}

	// isContinuation reports whether b is a UTF-8 continuation byte (10xxxxxx).
	isContinuation := func(b byte) bool { return b&0xC0 == 0x80 }

	// msg[cut] is the first byte that would be dropped. If it is a
	// continuation byte the cut is inside a rune; back up to the rune's start.
	// A rune has at most three continuation bytes.
	cut := maxLength
	for back := 0; back < 3 && cut > 0 && isContinuation(msg[cut]); back++ {
		cut--
	}
	if isContinuation(msg[cut]) {
		// Not valid UTF-8 around the cut; fall back to the plain byte limit.
		cut = maxLength
	}
	return msg[:cut]
}
@@ -1053,6 +1053,7 @@ func RegisterRoutes(m *macaron.Macaron) { | |||
m.Put("/stop", cloudbrain.AdminOrOwnerOrJobCreaterRightForTrain, repo.GeneralCloudBrainJobStop) | |||
m.Group("/model", func() { | |||
m.Get("/schedule_status", repo.GetModelScheduleStatus) | |||
m.Post("/reschedule", cloudbrain.AdminOrOwnerOrJobCreaterRightForTrain, repo.RetryModelSchedule) | |||
}) | |||
}) | |||
}) | |||
@@ -6,6 +6,7 @@ | |||
package repo | |||
import ( | |||
"code.gitea.io/gitea/services/ai_task_service/schedule" | |||
"encoding/json" | |||
"net/http" | |||
"path" | |||
@@ -20,8 +21,6 @@ import ( | |||
"code.gitea.io/gitea/services/cloudbrain/cloudbrainTask" | |||
"code.gitea.io/gitea/modules/urfs_client/urchin" | |||
"code.gitea.io/gitea/modules/notification" | |||
"code.gitea.io/gitea/modules/grampus" | |||
@@ -169,9 +168,6 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) { | |||
} | |||
if oldStatus != job.Status { | |||
notification.NotifyChangeCloudbrainStatus(job, oldStatus) | |||
if models.IsTrainJobTerminal(job.Status) && len(result.JobInfo.Tasks[0].CenterID) == 1 { | |||
go urchin.GetAITaskOutPutBack(job.ID, job.JobName, result.JobInfo.Tasks[0].CenterID[0], job.ComputeResource) | |||
} | |||
} | |||
err = models.UpdateTrainJobVersion(job) | |||
if err != nil { | |||
@@ -191,7 +187,7 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) { | |||
func GetModelScheduleStatus(ctx *context.APIContext) { | |||
jobID := ctx.Params(":jobid") | |||
status, err := cloudbrainTask.GetModelScheduleStatus(jobID) | |||
status, err := schedule.GetModelScheduleStatus(jobID) | |||
if err != nil { | |||
ctx.JSON(http.StatusOK, response.OuterResponseError(err)) | |||
return | |||
@@ -200,6 +196,16 @@ func GetModelScheduleStatus(ctx *context.APIContext) { | |||
ctx.JSON(http.StatusOK, response.OuterSuccessWithData(m)) | |||
} | |||
func RetryModelSchedule(ctx *context.APIContext) { | |||
jobID := ctx.Params(":jobid") | |||
err := schedule.RetryModelMigrate(jobID) | |||
if err != nil { | |||
ctx.JSON(http.StatusOK, response.OuterResponseError(err)) | |||
return | |||
} | |||
ctx.JSON(http.StatusOK, response.OuterSuccess()) | |||
} | |||
func TrainJobForModelConvertGetLog(ctx *context.APIContext) { | |||
var ( | |||
err error | |||
@@ -466,14 +472,41 @@ func ModelList(ctx *context.APIContext) { | |||
return | |||
} | |||
status := models.ModelScheduleSucceed | |||
status := models.ModelMigrateSuccess | |||
if task.Type == models.TypeC2Net { | |||
if !task.IsTerminal() { | |||
log.Info("GetModelScheduleStatus job is not terminal.jobId=%s", jobID) | |||
status = models.JobNoTeminal | |||
} else { | |||
status, err = schedule.GetModelScheduleStatus(task.JobID) | |||
if err != nil { | |||
log.Error("GetModelScheduleStatus(%s) failed:%v", task.JobName, err.Error()) | |||
return | |||
} | |||
} | |||
} | |||
if status != models.ModelMigrateSuccess { | |||
ctx.JSON(http.StatusOK, map[string]interface{}{ | |||
"JobID": jobID, | |||
"VersionName": versionName, | |||
"StatusOK": status, | |||
"Path": dirArray, | |||
"Dirs": []storage.FileInfo{}, | |||
"task": task, | |||
"PageIsCloudBrain": true, | |||
}) | |||
return | |||
} | |||
var fileInfos []storage.FileInfo | |||
if task.ComputeResource == models.NPUResource { | |||
prefix := strings.TrimPrefix(path.Join(setting.TrainJobModelPath, task.JobName, setting.OutPutPath, versionName), "/") | |||
if !strings.HasSuffix(prefix, "/") { | |||
prefix += "/" | |||
} | |||
fileInfos, err = storage.GetOneLevelAllObjectUnderDir(setting.Bucket, prefix, parentDir) | |||
fileInfos, err = storage.GetOneLevelObjectsUnderDir(setting.Bucket, prefix, parentDir) | |||
if err != nil { | |||
log.Info("get TrainJobListModel failed:", err) | |||
ctx.ServerError("GetObsListObject:", err) | |||
@@ -504,19 +537,6 @@ func ModelList(ctx *context.APIContext) { | |||
}) | |||
} | |||
if task.Type == models.TypeC2Net { | |||
if !task.IsTerminal() { | |||
log.Info("GetModelScheduleStatus job is not terminal.jobId=%s", jobID) | |||
status = models.JobNoTeminal | |||
} else { | |||
status, err = cloudbrainTask.GetModelScheduleStatus(task.JobID) | |||
if err != nil { | |||
log.Error("GetModelScheduleStatus(%s) failed:%v", task.JobName, err.Error()) | |||
return | |||
} | |||
} | |||
} | |||
ctx.JSON(http.StatusOK, map[string]interface{}{ | |||
"JobID": jobID, | |||
"VersionName": versionName, | |||
@@ -21,8 +21,6 @@ import ( | |||
cloudbrainService "code.gitea.io/gitea/services/cloudbrain" | |||
"code.gitea.io/gitea/modules/urfs_client/urchin" | |||
"code.gitea.io/gitea/modules/dataset" | |||
"code.gitea.io/gitea/services/cloudbrain/cloudbrainTask" | |||
@@ -1953,9 +1951,6 @@ func SyncCloudbrainStatus() { | |||
task.CorrectCreateUnix() | |||
if oldStatus != task.Status { | |||
notification.NotifyChangeCloudbrainStatus(task, oldStatus) | |||
if models.IsTrainJobTerminal(task.Status) && len(result.JobInfo.Tasks[0].CenterID) == 1 { | |||
go urchin.GetAITaskOutPutBack(task.ID, task.JobName, result.JobInfo.Tasks[0].CenterID[0], task.ComputeResource) | |||
} | |||
} | |||
err = models.UpdateJob(task) | |||
if err != nil { | |||
@@ -16,7 +16,6 @@ import ( | |||
"code.gitea.io/gitea/services/lock" | |||
"code.gitea.io/gitea/modules/urfs_client/urchin" | |||
"code.gitea.io/gitea/routers/response" | |||
"code.gitea.io/gitea/services/cloudbrain/cloudbrainTask" | |||
@@ -1379,11 +1378,6 @@ func GrampusNotebookShow(ctx *context.Context) { | |||
task.CorrectCreateUnix() | |||
if oldStatus != task.Status { | |||
notification.NotifyChangeCloudbrainStatus(task, oldStatus) | |||
if models.IsTrainJobTerminal(task.Status) && task.ComputeResource == models.NPUResource { | |||
if len(result.JobInfo.Tasks[0].CenterID) == 1 { | |||
urchin.GetNPUDataBack(task.ID, task.JobName, result.JobInfo.Tasks[0].CenterID[0]) | |||
} | |||
} | |||
} | |||
} | |||
err = models.UpdateJob(task) | |||
@@ -1535,9 +1529,6 @@ func GrampusTrainJobShow(ctx *context.Context) { | |||
task.CorrectCreateUnix() | |||
if oldStatus != task.Status { | |||
notification.NotifyChangeCloudbrainStatus(task, oldStatus) | |||
if models.IsTrainJobTerminal(task.Status) && len(result.JobInfo.Tasks[0].CenterID) == 1 { | |||
go urchin.GetAITaskOutPutBack(task.ID, task.JobName, result.JobInfo.Tasks[0].CenterID[0], task.ComputeResource) | |||
} | |||
} | |||
} | |||
err = models.UpdateJob(task) | |||
@@ -1576,6 +1567,7 @@ func GrampusTrainJobShow(ctx *context.Context) { | |||
ctx.Data["datasetDownload"] = GetCloudBrainDataSetInfo(task.Uuid, task.DatasetName, false) | |||
ctx.Data["canDownload"] = cloudbrain.CanModifyJob(ctx, task) | |||
ctx.Data["displayJobName"] = task.DisplayJobName | |||
ctx.Data["canReschedule"] = cloudbrain.CanDeleteJob(ctx, task) | |||
ctx.Data["ai_center"] = cloudbrainService.GetAiCenterShow(task.AiCenter, ctx) | |||
@@ -0,0 +1,374 @@ | |||
package schedule | |||
import ( | |||
"bytes" | |||
"code.gitea.io/gitea/models" | |||
"code.gitea.io/gitea/modules/grampus" | |||
"code.gitea.io/gitea/modules/labelmsg" | |||
"code.gitea.io/gitea/modules/log" | |||
"code.gitea.io/gitea/modules/redis/redis_key" | |||
"code.gitea.io/gitea/modules/redis/redis_lock" | |||
"code.gitea.io/gitea/modules/setting" | |||
"code.gitea.io/gitea/modules/storage" | |||
"code.gitea.io/gitea/modules/util" | |||
"encoding/json" | |||
"errors" | |||
"fmt" | |||
"github.com/minio/minio-go" | |||
"os/exec" | |||
"path" | |||
"strings" | |||
"time" | |||
) | |||
const NPUModelDefaultName = "models.zip" | |||
func GetModelScheduleStatus(jobId string) (models.ModelMigrateStatus, error) { | |||
job, err := models.GetCloudbrainByJobID(jobId) | |||
if err != nil { | |||
log.Error("GetModelScheduleStatus GetCloudbrainByJobID err.jobId=%s err=%v", jobId, err) | |||
return 0, errors.New("jobId not correct") | |||
} | |||
if !job.IsTerminal() { | |||
log.Info("GetModelScheduleStatus job is not terminal.jobId=%s", jobId) | |||
return models.ModelMigrateWaiting, nil | |||
} | |||
record, err := models.GetModelMigrateRecordByCloudbrainId(job.ID) | |||
if err != nil { | |||
log.Error("GetModelScheduleStatus GetModelMigrateRecordByCloudbrainId err.jobId=%s err=%v", jobId, err) | |||
if models.IsErrRecordNotExist(err) { | |||
return models.ModelMigrateSuccess, nil | |||
} | |||
return models.ModelMigrateFailed, err | |||
} | |||
if !record.IsFinished() { | |||
go HandleUnfinishedMigrateRecord(record) | |||
} | |||
return record.Status, nil | |||
} | |||
func RetryModelMigrate(jobId string) error { | |||
job, err := models.GetCloudbrainByJobID(jobId) | |||
if err != nil { | |||
log.Error("RetryModelMigrate GetCloudbrainByJobID err.jobId=%s err=%v", jobId, err) | |||
return errors.New("jobId not correct") | |||
} | |||
if !job.IsTerminal() { | |||
log.Info("RetryModelMigrate job is not terminal.jobId=%s", jobId) | |||
return errors.New("task is not terminal") | |||
} | |||
//避免并发问题,先尝试获取锁,获取锁以后再查最新的记录 | |||
lock := redis_lock.NewDistributeLock(redis_key.RecordHandleLock(jobId)) | |||
success, err := lock.LockWithWait(10*time.Second, 10*time.Second) | |||
if err != nil { | |||
log.Error("HandleUnfinishedMigrateRecord lock err.jobId=%d %v", jobId, err) | |||
return err | |||
} | |||
if !success { | |||
log.Error("HandleUnfinishedMigrateRecord lock failed.ID=%d ", jobId) | |||
return nil | |||
} | |||
defer lock.UnLock() | |||
record, err := models.GetModelMigrateRecordByCloudbrainId(job.ID) | |||
if err != nil { | |||
log.Error("RetryModelMigrate GetModelMigrateRecordByCloudbrainId err.jobId=%s err=%v", jobId, err) | |||
if models.IsErrRecordNotExist(err) { | |||
return nil | |||
} | |||
return err | |||
} | |||
//只有两种情况可以再次调度,一是虎鲸调度失败 二是本地移桶失败 | |||
if record.CurrentStep == models.GrampusMigrateFailed { | |||
log.Info("retry PostModelMigrate. record.id = %d", record.ID) | |||
_, err := grampus.PostModelMigrate(jobId) | |||
if err != nil { | |||
log.Error("PostModelMigrate err.%v", err) | |||
return err | |||
} | |||
models.IncreaseModelMigrateRetryCount(record.ID) | |||
if err := models.RollBackMigrateStatus(record, models.GrampusMigrating); err != nil { | |||
log.Error("UpdateModelMigrateStatusByStep err.%v", err) | |||
return err | |||
} | |||
return nil | |||
} | |||
if record.CurrentStep == models.BucketMoveFailed { | |||
log.Info("retry BucketMove. record.id = %d", record.ID) | |||
if err := models.RollBackMigrateStatus(record, models.GrampusMigrateSuccess); err != nil { | |||
log.Error("UpdateModelMigrateStatusByStep err.%v", err) | |||
return err | |||
} | |||
models.IncreaseModelMigrateRetryCount(record.ID) | |||
return nil | |||
} | |||
return errors.New("No need to retry,the model migration has been successful or is in the process.") | |||
} | |||
func HandleUnfinishedMigrateRecords() { | |||
list, err := models.GetUnfinishedModelMigrateRecords() | |||
if err != nil { | |||
log.Error("GetUnfinishedModelMigrateRecords err=%v", err) | |||
return | |||
} | |||
for _, r := range list { | |||
HandleUnfinishedMigrateRecord(r) | |||
} | |||
} | |||
func HandleUnfinishedMigrateRecord(r *models.ModelMigrateRecord) error { | |||
cloudbrain, err := models.GetCloudbrainByID(fmt.Sprint(r.CloudbrainID)) | |||
if err != nil { | |||
log.Error("GetCloudbrainByID err. %v", err) | |||
return err | |||
} | |||
lock := redis_lock.NewDistributeLock(redis_key.RecordHandleLock(cloudbrain.JobID)) | |||
success, err := lock.LockWithWait(10*time.Second, 10*time.Second) | |||
if err != nil { | |||
log.Error("HandleUnfinishedMigrateRecord lock err.ID=%d %v", r.ID, err) | |||
return err | |||
} | |||
if !success { | |||
log.Error("HandleUnfinishedMigrateRecord lock failed.ID=%d ", r.ID) | |||
return nil | |||
} | |||
defer lock.UnLock() | |||
//拿到锁以后重新查询一次 | |||
r, err = models.GetModelMigrateRecordById(r.ID) | |||
if err != nil { | |||
log.Error("RetryModelMigrate GetModelMigrateRecordById err.Id=%s err=%v", r.ID, err) | |||
if models.IsErrRecordNotExist(err) { | |||
return nil | |||
} | |||
return err | |||
} | |||
if r.CurrentStep == models.GrampusMigrateInit || r.CurrentStep == models.GrampusMigrating { | |||
if err := UpdateModelMigrateStatusFromGrampus(r, cloudbrain.JobID); err != nil { | |||
log.Error("UpdateModelMigrateStatusFromGrampus err. %v", err) | |||
return err | |||
} | |||
} | |||
if r.CurrentStep == models.GrampusMigrateSuccess { | |||
if err := LocalMigrateOperate(cloudbrain.JobName, cloudbrain.ComputeResource, r); err != nil { | |||
log.Error("LocalMigrateOperate err. %v", err) | |||
return err | |||
} | |||
} | |||
if r.CurrentStep == models.BucketMoving { | |||
//尝试查询NPU结果目录下是否有文件,有文件则认为已经解压成功 | |||
if cloudbrain.ComputeResource == models.NPUResource && IsNPUModelDirHasFile(cloudbrain.JobName, cloudbrain.VersionName) { | |||
TryToUpdateNPUMoveBucketResult(r, cloudbrain.JobName, cloudbrain.VersionName) | |||
} | |||
} | |||
return nil | |||
} | |||
func UpdateModelMigrateStatusFromGrampus(r *models.ModelMigrateRecord, jobId string) error { | |||
res, err := grampus.ModelMigrateInfo(jobId) | |||
if err != nil { | |||
log.Error("ModelMigrateInfo err. r.ID=%d %v", r.ID, err) | |||
return err | |||
} | |||
log.Info("grampus ModelMigrateInfo r.ID=%d res=%+v", r.ID, res) | |||
newStep := models.GrampusMigrateResponse(res.Status).ConvertToModelMigrateStep() | |||
if newStep == r.CurrentStep { | |||
log.Info("The status has not changed. r.ID=%d status=%d", r.ID, res.Status) | |||
return nil | |||
} | |||
err = updateModelMigrateFromRes(r, res) | |||
if err != nil { | |||
log.Error("updateModelMigrateFromRes err. r.ID=%d %v", r.ID, err) | |||
return err | |||
} | |||
return nil | |||
} | |||
func LocalMigrateOperate(jobName, computeSource string, r *models.ModelMigrateRecord) error { | |||
log.Info("Grampus model migrate succeed,objectKey = %s computeSource= %s", r.DestObjectKey, computeSource) | |||
err := models.UpdateModelMigrateStatusByStep(r, models.BucketMoving) | |||
if err != nil { | |||
log.Error("UpdateModelMigrateStatusByStep err. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, err) | |||
return err | |||
} | |||
if computeSource == models.NPUResource { | |||
//因为NPU的输出会被压缩,因此需要解压+移桶 | |||
decompress(r.DestBucket+"/"+r.DestObjectKey, setting.Bucket+"/"+strings.TrimSuffix(r.DestObjectKey, models.ModelSuffix)) | |||
} else { | |||
//因为调度无法指定桶,所以调度成功后我们还需要移桶 | |||
if setting.UseLocalMinioMigrate { | |||
if err := MoveBucketJust4LocalMinio(r.DestObjectKey, grampus.GetGPUModelObjectKey(jobName), r.DestBucket, setting.Attachment.Minio.Bucket); err != nil { | |||
log.Error("MoveBucketJust4LocalMinio err.%v", err) | |||
if tmpErr := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveFailed); tmpErr != nil { | |||
log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, tmpErr) | |||
} | |||
return err | |||
} | |||
} else { | |||
if err := MoveBucketInOpenIMinio(r.DestObjectKey, grampus.GetGPUModelObjectKey(jobName), r.DestBucket, setting.Attachment.Minio.Bucket); err != nil { | |||
log.Error("MoveBucketInOpenIMinio err.%v", err) | |||
if tmpErr := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveFailed); tmpErr != nil { | |||
log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, tmpErr) | |||
} | |||
return err | |||
} | |||
} | |||
if err := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveSuccess); err != nil { | |||
log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveSuccess, err) | |||
} | |||
} | |||
return nil | |||
} | |||
func TryToUpdateNPUMoveBucketResult(record *models.ModelMigrateRecord, jobName, versionName string) error { | |||
if IsNPUModelDirHasFile(jobName, versionName) { | |||
if err := models.UpdateModelMigrateStatusByStep(record, models.BucketMoveSuccess); err != nil { | |||
log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", record.ID, models.BucketMoveSuccess, err) | |||
return err | |||
} | |||
} | |||
return nil | |||
} | |||
func updateModelMigrateFromRes(r *models.ModelMigrateRecord, res *models.GrampusModelMigrateInfoResponse) error { | |||
step := models.GrampusMigrateResponse(res.Status).ConvertToModelMigrateStep() | |||
err := models.UpdateModelMigrateStatusByStep(r, step) | |||
if err != nil { | |||
log.Error("UpdateModelMigrateStatusByStep err,ID=%d err=%v", r.ID, err) | |||
return err | |||
} | |||
r.DestBucket = res.DestBucket | |||
r.DestEndpoint = res.DestEndpoint | |||
r.DestObjectKey = res.DestObjectKey | |||
r.DestProxy = res.DestProxy | |||
r.Remark = strings.TrimPrefix(r.Remark+";"+util.TruncateString(res.FailedReason, 200), ";") | |||
r.SrcBucket = res.SrcBucket | |||
r.SrcEndpoint = res.SrcEndpoint | |||
r.SrcObjectKey = res.SrcObjectKey | |||
err = models.UpdateModelMigrateRecordByStep(r) | |||
if err != nil { | |||
log.Error("updateModelMigrateFromRes UpdateModelMigrateRecord error.id=%d.err=%v", r.ID, err) | |||
return err | |||
} | |||
return nil | |||
} | |||
func MoveBucketInOpenIMinio(objectKeyPrefix, targetObjectPrefix, oldBucket, newBucket string) error { | |||
var core = storage.ScheduleMinioCore | |||
objectInfo := core.Client.ListObjects(oldBucket, objectKeyPrefix, true, nil) | |||
log.Info("MoveBucketInOpenIMinio start.objectKeyPrefix=%s", objectKeyPrefix) | |||
count := 0 | |||
for object := range objectInfo { | |||
count++ | |||
if object.Err != nil { | |||
log.Error("MoveBucketInOpenIMinio object.Err=%v", object.Err) | |||
return object.Err | |||
} | |||
log.Debug("MoveBucketInOpenIMinio object.Key=%s", object.Key) | |||
newObjectKey := strings.Replace(object.Key, objectKeyPrefix, targetObjectPrefix, 1) | |||
err := MoveMinioFileBucket(core, object.Key, newObjectKey, oldBucket, newBucket) | |||
if err != nil { | |||
log.Error("MoveBucketInOpenIMinio MoveMinioFileBucket object.Key=%s Err=%v", object.Key, err) | |||
continue | |||
} | |||
} | |||
log.Info("MoveBucketInOpenIMinio finished.objectKeyPrefix=%s ,total=%d", objectKeyPrefix, count) | |||
return nil | |||
} | |||
func MoveBucketJust4LocalMinio(objectKeyPrefix, targetObjectPrefix, oldBucket, newBucket string) error { | |||
oldPath := path.Join(setting.Attachment.Minio.RealPath, oldBucket, objectKeyPrefix) | |||
newPath := path.Join(setting.Attachment.Minio.RealPath, newBucket, targetObjectPrefix) | |||
log.Info("MoveBucketJust4LocalMinio start.oldPath=%s newPath=%s", oldPath, newPath) | |||
//重命名原有文件夹,防止已有该文件 | |||
err, errStr := sudoMv(newPath, fmt.Sprintf("%s_%d", newPath, time.Now().Unix())) | |||
if err != nil { | |||
log.Error("MoveBucketJust4LocalMinio sudoMv error.oldPath=%s newPath=%s Err=%v errStr=%s ", oldPath, newPath, err, errStr) | |||
} | |||
//移动(重命名)文件夹 | |||
err, errStr = sudoMv(oldPath, newPath) | |||
if err != nil { | |||
log.Error("MoveBucketJust4LocalMinio sudoMv error.oldPath=%s newPath=%s Err=%v errStr=%s ", oldPath, newPath, err, errStr) | |||
return err | |||
} | |||
log.Info("MoveBucketInOpenIMinio finished.oldPath=%s newPath=%s ", oldPath, newPath) | |||
return nil | |||
} | |||
func sudoMv(oldPath, newPath string) (error, string) { | |||
c := fmt.Sprintf("sudo mv %s %s", oldPath, newPath) | |||
log.Info("start to sudoMv,oldPath=%s newPath=%s", oldPath, newPath) | |||
cmd := exec.Command("/bin/sh", "-c", c) | |||
var stdout, stderr bytes.Buffer | |||
cmd.Stdout = &stdout // 标准输出 | |||
cmd.Stderr = &stderr // 标准错误 | |||
err := cmd.Run() | |||
outStr, errStr := string(stdout.Bytes()), string(stderr.Bytes()) | |||
log.Debug("out:\n%s\nerr:\n%s\n", outStr, errStr) | |||
if err != nil { | |||
log.Error("cmd.Run() failed,oldPath=%s newPath=%s err=%v\n", oldPath, newPath, err) | |||
return err, errStr | |||
} | |||
return nil, errStr | |||
} | |||
func MoveMinioFileBucket(core *minio.Core, oldObjectKey, newObjectKey, oldBucket, newBucket string) error { | |||
_, err := core.CopyObject(oldBucket, oldObjectKey, newBucket, newObjectKey, map[string]string{}) | |||
if err != nil { | |||
log.Error("MoveBucketInOpenIMinio CopyObject err oldObjectKey=%s .%v", oldObjectKey, err) | |||
return err | |||
} | |||
err = core.RemoveObject(oldBucket, oldObjectKey) | |||
if err != nil { | |||
log.Error("MoveBucketInOpenIMinio RemoveObject err oldObjectKey=%s .%v", oldObjectKey, err) | |||
} | |||
return err | |||
} | |||
type DecompressReq struct { | |||
SourceFile string `json:"source_file"` | |||
DestPath string `json:"dest_path"` | |||
} | |||
func decompress(sourceFile, destPath string) { | |||
req, _ := json.Marshal(DecompressReq{ | |||
SourceFile: sourceFile, | |||
DestPath: destPath, | |||
}) | |||
err := labelmsg.SendDecompressAttachToLabelOBS(string(req)) | |||
if err != nil { | |||
log.Error("SendDecompressTask to labelsystem (%s) failed:%s", sourceFile, err.Error()) | |||
} | |||
} | |||
func IsNPUModelDirHasFile(jobName string, versionName string) bool { | |||
prefix := strings.TrimPrefix(path.Join(setting.TrainJobModelPath, jobName, setting.OutPutPath, versionName), "/") | |||
if !strings.HasSuffix(prefix, "/") { | |||
prefix += "/" | |||
} | |||
fileInfos, err := storage.GetOneLevelAllObjectUnderDir(setting.Bucket, prefix, "") | |||
if err != nil { | |||
log.Info("IsNPUModelDirHasFile.get TrainJobListModel failed:", err) | |||
return false | |||
} | |||
if len(fileInfos) > 0 { | |||
return true | |||
} | |||
return len(fileInfos) > 0 | |||
} |
@@ -1,93 +0,0 @@ | |||
package cloudbrainTask | |||
import ( | |||
"code.gitea.io/gitea/models" | |||
"code.gitea.io/gitea/modules/log" | |||
"code.gitea.io/gitea/modules/setting" | |||
"code.gitea.io/gitea/modules/storage" | |||
"errors" | |||
"path" | |||
"strings" | |||
) | |||
func GetModelScheduleStatus(jobId string) (models.ModelScheduleStatus, error) { | |||
job, err := models.GetCloudbrainByJobID(jobId) | |||
if err != nil { | |||
log.Error("GetModelScheduleStatus GetCloudbrainByJobID err.jobId=%s err=%v", jobId, err) | |||
return 0, errors.New("jobId not correct") | |||
} | |||
if !job.IsTerminal() { | |||
log.Info("GetModelScheduleStatus job is not terminal.jobId=%s", jobId) | |||
return models.ModelScheduleWaiting, nil | |||
} | |||
record, err := models.GetScheduleRecordByCloudbrainID(job.ID) | |||
if err != nil { | |||
log.Error("GetModelScheduleStatus GetScheduleRecordByCloudbrainID err.jobId=%s err=%v", jobId, err) | |||
if models.IsErrRecordNotExist(err) { | |||
return models.ModelScheduleSucceed, nil | |||
} | |||
return models.ModelScheduleFailed, err | |||
} | |||
switch record.Status { | |||
case models.StorageUrchinScheduleWaiting: | |||
return models.ModelScheduleWaiting, nil | |||
case models.StorageUrchinScheduleProcessing: | |||
return models.ModelScheduleOperating, nil | |||
case models.StorageUrchinScheduleFailed: | |||
return models.ModelScheduleFailed, nil | |||
case models.StorageUrchinNoFile: | |||
return models.ModelScheduleSucceed, nil | |||
case models.StorageUrchinScheduleSucceed: | |||
moveStatus, err := GetMoveBucketStatus(record, job.JobName, job.VersionName) | |||
if err != nil { | |||
log.Error("GetMoveBucketStatus err.%v", err) | |||
return models.ModelScheduleFailed, err | |||
} | |||
switch moveStatus { | |||
case models.MoveBucketSucceed: | |||
return models.ModelScheduleSucceed, nil | |||
case models.MoveBucketOperating: | |||
return models.ModelScheduleOperating, nil | |||
case models.MoveBucketFailed: | |||
return models.ModelScheduleFailed, nil | |||
} | |||
} | |||
return models.ModelScheduleFailed, nil | |||
} | |||
func GetMoveBucketStatus(record *models.ScheduleRecord, jobName, versionName string) (int, error) { | |||
if record.ComputeSource == models.GPUResource || record.ComputeSource == models.GCUResource { | |||
return record.LocalOperateStatus, nil | |||
} | |||
if record.LocalOperateStatus != models.MoveBucketOperating { | |||
return record.LocalOperateStatus, nil | |||
} | |||
//由于NPU回传后还有异步的解压,所以对于进行中的状态需要进一步查询是否已解压结束 | |||
//判断方法是查询模型目录是否有文件 | |||
if IsNPUModelDirHasFile(jobName, versionName) { | |||
models.UpdateScheduleLocalOperateStatus(record, models.MoveBucketSucceed) | |||
return models.MoveBucketSucceed, nil | |||
} | |||
return record.LocalOperateStatus, nil | |||
} | |||
func IsNPUModelDirHasFile(jobName string, versionName string) bool { | |||
prefix := strings.TrimPrefix(path.Join(setting.TrainJobModelPath, jobName, setting.OutPutPath, versionName), "/") | |||
if !strings.HasSuffix(prefix, "/") { | |||
prefix += "/" | |||
} | |||
fileInfos, err := storage.GetOneLevelAllObjectUnderDir(setting.Bucket, prefix, "") | |||
if err != nil { | |||
log.Info("IsNPUModelDirHasFile.get TrainJobListModel failed:", err) | |||
return false | |||
} | |||
if len(fileInfos) > 0 { | |||
return true | |||
} | |||
return len(fileInfos) > 0 | |||
} |
@@ -13,8 +13,6 @@ import ( | |||
"strconv" | |||
"strings" | |||
"code.gitea.io/gitea/modules/urfs_client/urchin" | |||
"code.gitea.io/gitea/modules/timeutil" | |||
"code.gitea.io/gitea/modules/notification" | |||
@@ -1087,9 +1085,6 @@ func SyncTaskStatus(task *models.Cloudbrain) error { | |||
task.CorrectCreateUnix() | |||
if oldStatus != task.Status { | |||
notification.NotifyChangeCloudbrainStatus(task, oldStatus) | |||
if models.IsTrainJobTerminal(task.Status) && len(result.JobInfo.Tasks[0].CenterID) == 1 { | |||
go urchin.GetAITaskOutPutBack(task.ID, task.JobName, result.JobInfo.Tasks[0].CenterID[0], task.ComputeResource) | |||
} | |||
} | |||
err = models.UpdateJob(task) | |||
if err != nil { | |||
@@ -389,7 +389,7 @@ | |||
function parseLog() { | |||
let jsonValue = document.getElementById("json_value").value; | |||
let jsonObj = JSON.parse(jsonValue); | |||
let jsonObj = jsonValue&&JSON.parse(jsonValue); | |||
let podRoleName = jsonObj["podRoleName"]; | |||
let html = ""; | |||
if (podRoleName != null) { | |||
@@ -94,7 +94,7 @@ | |||
{{if eq .ComputeResource "CPU/GPU"}} | |||
<a class="item run_info" data-tab="five{{$k}}" data-version="{{.VersionName}}">{{$.i18n.Tr "repo.cloudbrain.runinfo"}}</a> | |||
{{end}} | |||
<a class="item load-model-file" data-tab="third{{$k}}" data-download-flag="{{$.canDownload}}" data-path="{{$.RepoLink}}/modelarts/train-job/{{.JobID}}/model_list" data-version="{{.VersionName}}" data-parents="" data-filename="" data-init="init" >{{$.i18n.Tr "repo.model_download"}}</a> | |||
<a class="item load-model-file" data-tab="third{{$k}}" data-can-reschedule="{{$.canReschedule}}" data-retry-path="{{$.RepoLink}}/cloudbrain/train-job/{{.JobID}}/model/reschedule" data-download-flag="{{$.canDownload}}" data-path="{{$.RepoLink}}/modelarts/train-job/{{.JobID}}/model_list" data-version="{{.VersionName}}" data-parents="" data-filename="" data-init="init" >{{$.i18n.Tr "repo.model_download"}}</a> | |||
</div> | |||
<div class="ui tab active" data-tab="first{{$k}}"> | |||
<div style="padding-top: 10px;"> | |||
@@ -587,6 +587,7 @@ | |||
}); | |||
$('td.ti-text-form-content.spec').text(specStr); | |||
})(); | |||
console.log({{.version_list_task}}) | |||
var setting = { | |||
check: { | |||
enable: true, | |||
@@ -565,6 +565,7 @@ export default async function initCloudrainSow() { | |||
activeTab.trigger('click'); | |||
} | |||
// | |||
$(".content-pad").on("click", ".load-model-file", function () { | |||
let downloadFlag = $(this).data("download-flag") || ""; | |||
let gpuFlag = $(this).data("gpu-flag") || ""; | |||
@@ -573,9 +574,12 @@ export default async function initCloudrainSow() { | |||
let filename = $(this).data("filename"); | |||
let init = $(this).data("init") || ""; | |||
let path = $(this).data("path"); | |||
let retryPath = `/api/v1/repos${$(this).data("retry-path")}`; | |||
const rescheduleFlag = $(this).data("can-reschedule") || ""; | |||
$(`#dir_list${version_name}`).empty(); | |||
let url = `/api/v1/repos${path}?version_name=${version_name}&parentDir=${parents}`; | |||
$.get(url, (data) => { | |||
if (data.StatusOK == 0) { // 成功 0 | |||
if (data.Dirs) { | |||
data.Dirs.length !==0 && $(`#${version_name}-result-down`).show() | |||
@@ -595,6 +599,7 @@ export default async function initCloudrainSow() { | |||
$(`#file_breadcrumb${version_name}`).append(htmlBread); | |||
} else { | |||
renderBrend( | |||
this, | |||
path, | |||
version_name, | |||
parents, | |||
@@ -628,12 +633,23 @@ export default async function initCloudrainSow() { | |||
</div>`); | |||
} else if (data.StatusOK == 2) { // 失败 2 | |||
$(`#file_breadcrumb${version_name}`).empty(); | |||
if (rescheduleFlag) { | |||
$(`#dir_list${version_name}`).html(`<div style="height:200px;display:flex;justify-content:center;align-items:center;font-size:14px;color:rgb(16, 16, 16);"> | |||
<div style="display:flex;justify-content:center;align-items:center;height:24px;width:24px;margin-right:5px;"> | |||
<svg xmlns="http://www.w3.org/2000/svg" class="styles__StyledSVGIconPathComponent-sc-16fsqc8-0 iKfgJk svg-icon-path-icon fill" viewBox="64 64 896 896" width="16" height="16"><defs data-reactroot=""></defs><g><path d="M464 720a48 48 0 1 0 96 0 48 48 0 1 0-96 0zm16-304v184c0 4.4 3.6 8 8 8h48c4.4 0 8-3.6 8-8V416c0-4.4-3.6-8-8-8h-48c-4.4 0-8 3.6-8 8zm475.7 440l-416-720c-6.2-10.7-16.9-16-27.7-16s-21.6 5.3-27.7 16l-416 720C56 877.4 71.4 904 96 904h832c24.6 0 40-26.6 27.7-48zm-783.5-27.9L512 239.9l339.8 588.2H172.2z"></path></g></svg> | |||
</div> | |||
<span>${i18n['file_sync_fail']}</span> | |||
<a href="javascript:void(0)" id="retry_result" style='text-decoration: underline;margin-left:0.5rem'>${i18n['retrieve_results']}</a> | |||
</div>`); | |||
} | |||
else { | |||
$(`#dir_list${version_name}`).html(`<div style="height:200px;display:flex;justify-content:center;align-items:center;font-size:14px;color:rgb(16, 16, 16);"> | |||
<div style="display:flex;justify-content:center;align-items:center;height:24px;width:24px;margin-right:5px;"> | |||
<svg xmlns="http://www.w3.org/2000/svg" class="styles__StyledSVGIconPathComponent-sc-16fsqc8-0 iKfgJk svg-icon-path-icon fill" viewBox="64 64 896 896" width="16" height="16"><defs data-reactroot=""></defs><g><path d="M464 720a48 48 0 1 0 96 0 48 48 0 1 0-96 0zm16-304v184c0 4.4 3.6 8 8 8h48c4.4 0 8-3.6 8-8V416c0-4.4-3.6-8-8-8h-48c-4.4 0-8 3.6-8 8zm475.7 440l-416-720c-6.2-10.7-16.9-16-27.7-16s-21.6 5.3-27.7 16l-416 720C56 877.4 71.4 904 96 904h832c24.6 0 40-26.6 27.7-48zm-783.5-27.9L512 239.9l339.8 588.2H172.2z"></path></g></svg> | |||
</div> | |||
<span>${i18n['file_sync_fail']}</span> | |||
</div>`); | |||
} | |||
} else if (data.StatusOK == 3) { // 等待同步 3 | |||
$(`#file_breadcrumb${version_name}`).empty(); | |||
$(`#dir_list${version_name}`).html(`<div style="height:200px;display:flex;justify-content:center;align-items:center;font-size:14px;color:rgb(16, 16, 16);"> | |||
@@ -651,10 +667,21 @@ export default async function initCloudrainSow() { | |||
<span>${i18n['no_file_to_download']}</span> | |||
</div>`); | |||
} | |||
$('#retry_result').on('click', function () { | |||
$.post(retryPath, (data) => { | |||
if (data.code === 0) { | |||
$('.load-model-file').trigger('click'); | |||
} | |||
}).fail(function (err) { | |||
console.log(err); | |||
}); | |||
}) | |||
}).fail(function (err) { | |||
console.log(err, version_name); | |||
}); | |||
}); | |||
function renderSize(value) { | |||
if (null == value || value == "") { | |||
return "0 Bytes"; | |||
@@ -678,6 +705,7 @@ export default async function initCloudrainSow() { | |||
return size + unitArr[index]; | |||
} | |||
function renderBrend( | |||
that, | |||
path, | |||
version_name, | |||
parents, | |||
@@ -711,15 +739,9 @@ export default async function initCloudrainSow() { | |||
} else { | |||
$(`input[name=model${version_name}]`).val(parents); | |||
$(`input[name=modelback${version_name}]`).val(filename); | |||
let selectEle = $(`#file_breadcrumb${version_name} a.section`).filter( | |||
(index, item) => { | |||
return item.text == filename; | |||
} | |||
); | |||
selectEle.nextAll().remove(); | |||
selectEle.after("<div class='divider'> / </div>"); | |||
selectEle.replaceWith(`<div class='active section'>${filename}</div>`); | |||
$(that).nextAll().remove(); | |||
$(that).after("<div class='divider'> / </div>"); | |||
$(that).replaceWith(`<div class='active section'>${filename}</div>`); | |||
} | |||
} | |||
@@ -75,6 +75,7 @@ export const i18nVue = { | |||
file_sync_fail:"文件同步失败", | |||
no_file_to_download:"没有文件可以下载,稍后再来看看", | |||
task_not_finished: "任务还未结束,稍后再来看看", | |||
retrieve_results: "重新获取结果", | |||
local:"本地", | |||
online:"线上", | |||
modify:"修改", | |||
@@ -216,6 +217,7 @@ export const i18nVue = { | |||
file_sync_fail:"File synchronization failed", | |||
no_file_to_download:"No files can be downloaded", | |||
task_not_finished: "Task not finished yet, please wait", | |||
retrieve_results: "Retrieve results", | |||
local:"Local", | |||
online:"Online", | |||
modify:"Modify", | |||
Dear OpenI User
Thank you for your continued support of the OpenI Qizhi Community AI Collaboration Platform. To protect your usage rights and ensure network security, we updated the OpenI Qizhi Community AI Collaboration Platform Usage Agreement in January 2024. The updated agreement specifies that users are prohibited from using intranet penetration tools. After you click "Agree and continue", you can continue to use our services. Thank you for your cooperation and understanding.
For the full text of the agreement, please refer to the 《OpenI Qizhi Community AI Collaboration Platform Usage Agreement》.