@@ -71,16 +71,17 @@ func GetGPUDataBack(cloudbrainID int64, jobName, centerId string) error {
log.Error("ScheduleDataToPeerByKey failed info is EndPoint:%s,Bucket:%s,ObjectKey:%s,ProxyServer:%s,TargetObjectKey:%s,error:%v",
endpoint, bucket, objectKey, destPeerHost, grampus.GetGPUModelObjectKey(jobName), err)
_, err = models.InsertScheduleRecord(&models.ScheduleRecord{
CloudbrainID: cloudbrainID,
EndPoint: endpoint,
Bucket: bucket,
ObjectKey: objectKey,
ProxyServer: destPeerHost,
Status: models.StorageScheduleFailed,
IsDir: true,
ComputeSource: models.GPUResource,
TargetObjectKey: grampus.GetGPUModelObjectKey(jobName),
Remark: interceptErrorMessages(err),
CloudbrainID: cloudbrainID,
EndPoint: endpoint,
Bucket: bucket,
ObjectKey: objectKey,
ProxyServer: destPeerHost,
Status: models.StorageUrchinScheduleFailed,
IsDir: true,
ComputeSource: models.GPUResource,
TargetObjectKey: grampus.GetGPUModelObjectKey(jobName),
Remark: interceptErrorMessages(err),
LocalOperateStatus: models.MoveBucketWaiting,
})
if err != nil {
log.Error("InsertScheduleRecord failed:%v", err)
@@ -90,15 +91,16 @@ func GetGPUDataBack(cloudbrainID int64, jobName, centerId string) error {
}
_, err = models.InsertScheduleRecord(&models.ScheduleRecord{
CloudbrainID: cloudbrainID,
EndPoint: endpoint,
Bucket: bucket,
ObjectKey: objectKey,
ProxyServer: destPeerHost,
Status: res.StatusCode,
IsDir: true,
ComputeSource: models.GPUResource,
TargetObjectKey: grampus.GetGPUModelObjectKey(jobName),
CloudbrainID: cloudbrainID,
EndPoint: endpoint,
Bucket: bucket,
ObjectKey: objectKey,
ProxyServer: destPeerHost,
Status: res.StatusCode,
IsDir: true,
ComputeSource: models.GPUResource,
TargetObjectKey: grampus.GetGPUModelObjectKey(jobName),
LocalOperateStatus: models.MoveBucketWaiting,
})
if err != nil {
log.Error("InsertScheduleRecord failed:%v", err)
@@ -148,15 +150,16 @@ func GetGCUDataBack(cloudbrainID int64, jobName, centerId string) error {
log.Error("ScheduleDataToPeerByKey failed info is EndPoint:%s,Bucket:%s,ObjectKey:%s,ProxyServer:%s,TargetObjectKey:%s,error:%v",
endpoint, bucket, objectKey, destPeerHost, grampus.GetGPUModelObjectKey(jobName), err)
_, err = models.InsertScheduleRecord(&models.ScheduleRecord{
CloudbrainID: cloudbrainID,
EndPoint: endpoint,
Bucket: bucket,
ObjectKey: objectKey,
ProxyServer: destPeerHost,
Status: models.StorageScheduleFailed,
IsDir: true,
ComputeSource: models.GCUResource,
TargetObjectKey: grampus.GetGPUModelObjectKey(jobName),
CloudbrainID: cloudbrainID,
EndPoint: endpoint,
Bucket: bucket,
ObjectKey: objectKey,
ProxyServer: destPeerHost,
Status: models.StorageUrchinScheduleFailed,
IsDir: true,
ComputeSource: models.GCUResource,
TargetObjectKey: grampus.GetGPUModelObjectKey(jobName),
LocalOperateStatus: models.MoveBucketWaiting,
})
if err != nil {
log.Error("InsertScheduleRecord failed:%v", err)
@@ -166,15 +169,16 @@ func GetGCUDataBack(cloudbrainID int64, jobName, centerId string) error {
}
_, err = models.InsertScheduleRecord(&models.ScheduleRecord{
CloudbrainID: cloudbrainID,
EndPoint: endpoint,
Bucket: bucket,
ObjectKey: objectKey,
ProxyServer: destPeerHost,
Status: res.StatusCode,
IsDir: true,
ComputeSource: models.GCUResource,
TargetObjectKey: grampus.GetGPUModelObjectKey(jobName),
CloudbrainID: cloudbrainID,
EndPoint: endpoint,
Bucket: bucket,
ObjectKey: objectKey,
ProxyServer: destPeerHost,
Status: res.StatusCode,
IsDir: true,
ComputeSource: models.GCUResource,
TargetObjectKey: grampus.GetGPUModelObjectKey(jobName),
LocalOperateStatus: models.MoveBucketWaiting,
})
if err != nil {
log.Error("InsertScheduleRecord failed:%v", err)
@@ -224,15 +228,16 @@ func GetNPUDataBack(cloudbrainID int64, jobName, centerId string) error {
log.Error("ScheduleDataToPeerByKey failed after retrying, errorInfo is EndPoint:%s,Bucket:%s,ObjectKey:%s,ProxyServer:%s,error:%v",
endpoint, bucket, objectKey, destPeerHost, err)
_, err = models.InsertScheduleRecord(&models.ScheduleRecord{
CloudbrainID: cloudbrainID,
EndPoint: endpoint,
Bucket: bucket,
ObjectKey: objectKey,
ProxyServer: destPeerHost,
Status: models.StorageScheduleFailed,
IsDir: false,
ComputeSource: models.NPUResource,
Remark: interceptErrorMessages(err),
CloudbrainID: cloudbrainID,
EndPoint: endpoint,
Bucket: bucket,
ObjectKey: objectKey,
ProxyServer: destPeerHost,
Status: models.StorageUrchinScheduleFailed,
IsDir: false,
ComputeSource: models.NPUResource,
Remark: interceptErrorMessages(err),
LocalOperateStatus: models.MoveBucketWaiting,
})
if err != nil {
log.Error("InsertScheduleRecord failed:%v", err)
@@ -242,14 +247,15 @@ func GetNPUDataBack(cloudbrainID int64, jobName, centerId string) error {
}
_, err = models.InsertScheduleRecord(&models.ScheduleRecord{
CloudbrainID: cloudbrainID,
EndPoint: endpoint,
Bucket: bucket,
ObjectKey: objectKey,
ProxyServer: destPeerHost,
Status: res.StatusCode,
IsDir: false,
ComputeSource: models.NPUResource,
CloudbrainID: cloudbrainID,
EndPoint: endpoint,
Bucket: bucket,
ObjectKey: objectKey,
ProxyServer: destPeerHost,
Status: res.StatusCode,
IsDir: false,
ComputeSource: models.NPUResource,
LocalOperateStatus: models.MoveBucketWaiting,
})
if err != nil {
log.Error("InsertScheduleRecord failed:%v", err)
@@ -368,22 +374,24 @@ func HandleScheduleRecords() error {
func handleScheduleResult(record *models.ScheduleRecord, res *PeerResult) error {
var err error
switch res.StatusCode {
case models.StorageScheduleSucceed:
case models.StorageUrchinScheduleSucceed:
log.Info("ScheduleDataToPeerByKey(%s) succeed", record.ObjectKey)
models.UpdateScheduleLocalOperateStatus(record, models.StorageLocalOperating)
models.UpdateScheduleLocalOperateStatus(record, models.MoveBucketOperating)
if record.ComputeSource == models.GPUResource || record.ComputeSource == models.GCUResource {
err = MoveBucketInOpenIMinio(res.DataPath, record.TargetObjectKey, res.DataRoot, setting.Attachment.Minio.Bucket)
if err != nil {
models.UpdateScheduleLocalOperateStatus(record, models.MoveBucketFailed)
log.Error("GetBackModel MoveBucketInOpenIMinio err.%v", err)
return err
}
models.UpdateScheduleLocalOperateStatus(record, models.MoveBucketSucceed)
} else {
decompress(record.Bucket+"/"+record.ObjectKey, setting.Bucket+"/"+strings.TrimSuffix(record.ObjectKey, models.ModelSuffix))
}
case models.StorageScheduleProcessing:
case models.StorageUrchinScheduleProcessing:
log.Info("ScheduleDataToPeerByKey(%s) processing", record.ObjectKey)
case models.StorageScheduleFailed:
case models.StorageUrchinScheduleFailed:
log.Error("ScheduleDataToPeerByKey(%s) failed:%s", record.ObjectKey, res.StatusMsg)
default: