@@ -2,6 +2,18 @@ package schedule
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"os/exec"
"path"
"strings"
"time"
"code.gitea.io/gitea/modules/modelarts"
"code.gitea.io/gitea/modules/obs"
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/grampus"
"code.gitea.io/gitea/modules/labelmsg"
@@ -11,14 +23,7 @@ import (
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
"code.gitea.io/gitea/modules/util"
"encoding/json"
"errors"
"fmt"
"github.com/minio/minio-go"
"os/exec"
"path"
"strings"
"time"
)
const NPUModelDefaultName = "models.zip"
@@ -205,6 +210,12 @@ func LocalMigrateOperate(jobName, computeSource string, r *models.ModelMigrateRe
}
if computeSource == models.NPUResource {
//因为NPU的输出会被压缩,因此需要解压+移桶
if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.OutputPath); err != nil {
log.Error("Failed to obsMkdir_output: %s (%v)", jobName, err)
return err
}
log.Info("DestObjectKey", r.DestObjectKey)
if strings.Contains(r.DestObjectKey, ".") {
decompress(r.DestBucket+"/"+r.DestObjectKey, setting.Bucket+"/"+strings.TrimSuffix(r.DestObjectKey, models.ModelSuffix))
@@ -216,7 +227,12 @@ func LocalMigrateOperate(jobName, computeSource string, r *models.ModelMigrateRe
}
for _, fileInfo := range fileInfos {
decompress(r.DestBucket+"/"+r.DestObjectKey+"/"+fileInfo.FileName, setting.Bucket+"/"+strings.TrimSuffix(r.DestObjectKey, models.ModelSuffix))
log.Info("decompress file:", fileInfo.FileName)
sourceFilPath := r.DestBucket + "/" + r.DestObjectKey + fileInfo.FileName
if !strings.HasSuffix(r.DestObjectKey, "/") {
sourceFilPath = r.DestBucket + "/" + r.DestObjectKey + "/" + fileInfo.FileName
}
decompress(sourceFilPath, setting.Bucket+"/"+strings.TrimSuffix(r.DestObjectKey, models.ModelSuffix))
}
}
@@ -248,6 +264,19 @@ func LocalMigrateOperate(jobName, computeSource string, r *models.ModelMigrateRe
return nil
}
func obsMkdir(dir string) error {
input := &obs.PutObjectInput{}
input.Bucket = setting.Bucket
input.Key = dir
_, err := storage.ObsCli.PutObject(input)
if err != nil {
log.Error("PutObject(%s) failed: %s", input.Key, err.Error())
return err
}
return nil
}
func TryToUpdateNPUMoveBucketResult(record *models.ModelMigrateRecord, jobName, versionName string) error {
if IsNPUModelDirHasFile(jobName, versionName) {
if err := models.UpdateModelMigrateStatusByStep(record, models.BucketMoveSuccess); err != nil {