#1961 Fix dataset upload issues and optimize upload speed

Merged
ychao_1983 merged 8 commits from fix-1934 into V20220428 2 years ago
  1. modules/storage/obs.go (+47, -38)
  2. routers/repo/attachment.go (+0, -43)
  3. routers/routes/routes.go (+1, -2)
  4. web_src/js/components/MinioUploader.vue (+10, -4)
  5. web_src/js/components/ObsUploader.vue (+0, -484)

modules/storage/obs.go (+47, -38)

@@ -59,21 +59,55 @@ func ObsHasObject(path string) (bool, error) {
     return hasObject, nil
 }
 
+func listAllParts(uuid, uploadID, key string) (output *obs.ListPartsOutput, err error) {
+    output = &obs.ListPartsOutput{}
+    partNumberMarker := 0
+    for {
+        temp, err := ObsCli.ListParts(&obs.ListPartsInput{
+            Bucket:           setting.Bucket,
+            Key:              key,
+            UploadId:         uploadID,
+            MaxParts:         MAX_LIST_PARTS,
+            PartNumberMarker: partNumberMarker,
+        })
+        if err != nil {
+            log.Error("ListParts failed:", err.Error())
+            return output, err
+        }
+
+        partNumberMarker = temp.NextPartNumberMarker
+        log.Info("uuid:%s, MaxParts:%d, PartNumberMarker:%d, NextPartNumberMarker:%d, len:%d", uuid, temp.MaxParts, temp.PartNumberMarker, temp.NextPartNumberMarker, len(temp.Parts))
+
+        for _, partInfo := range temp.Parts {
+            output.Parts = append(output.Parts, obs.Part{
+                PartNumber: partInfo.PartNumber,
+                ETag:       partInfo.ETag,
+            })
+        }
+
+        if len(temp.Parts) < temp.MaxParts {
+            break
+        } else {
+            continue
+        }
+
+        break
+    }
+
+    return output, nil
+}
+
 func GetObsPartInfos(uuid, uploadID, fileName string) (string, error) {
     key := strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid, fileName)), "/")
 
-    output, err := ObsCli.ListParts(&obs.ListPartsInput{
-        Bucket:   setting.Bucket,
-        Key:      key,
-        UploadId: uploadID,
-    })
+    allParts, err := listAllParts(uuid, uploadID, key)
     if err != nil {
-        log.Error("ListParts failed:", err.Error())
+        log.Error("listAllParts failed: %v", err)
         return "", err
     }
 
     var chunks string
-    for _, partInfo := range output.Parts {
+    for _, partInfo := range allParts.Parts {
         chunks += strconv.Itoa(partInfo.PartNumber) + "-" + partInfo.ETag + ","
     }
@@ -100,39 +134,14 @@ func CompleteObsMultiPartUpload(uuid, uploadID, fileName string) error {
     input.Key = strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid, fileName)), "/")
     input.UploadId = uploadID
 
-    partNumberMarker := 0
-    for {
-        output, err := ObsCli.ListParts(&obs.ListPartsInput{
-            Bucket:           setting.Bucket,
-            Key:              input.Key,
-            UploadId:         uploadID,
-            MaxParts:         MAX_LIST_PARTS,
-            PartNumberMarker: partNumberMarker,
-        })
-        if err != nil {
-            log.Error("ListParts failed:", err.Error())
-            return err
-        }
-
-        partNumberMarker = output.NextPartNumberMarker
-        log.Info("uuid:%s, MaxParts:%d, PartNumberMarker:%d, NextPartNumberMarker:%d, len:%d", uuid, output.MaxParts, output.PartNumberMarker, output.NextPartNumberMarker, len(output.Parts))
-
-        for _, partInfo := range output.Parts {
-            input.Parts = append(input.Parts, obs.Part{
-                PartNumber: partInfo.PartNumber,
-                ETag:       partInfo.ETag,
-            })
-        }
-
-        if len(output.Parts) < output.MaxParts {
-            break
-        } else {
-            continue
-        }
-
-        break
+    allParts, err := listAllParts(uuid, uploadID, input.Key)
+    if err != nil {
+        log.Error("listAllParts failed: %v", err)
+        return err
     }
 
+    input.Parts = allParts.Parts
+
     output, err := ObsCli.CompleteMultipartUpload(input)
     if err != nil {
         log.Error("CompleteMultipartUpload failed:", err.Error())

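Note on the refactor above: the old GetObsPartInfos made a single ListParts call, and ListParts returns at most one page of parts per request, so an upload with more parts than the page size had its part list silently truncated. The shared listAllParts helper now drains every page for both GetObsPartInfos and CompleteObsMultiPartUpload. A minimal, self-contained sketch of the same marker-based pagination pattern; the part and page types and the fake backend are illustrative stand-ins for the OBS SDK types, not code from this PR:

package main

import "fmt"

// part mirrors the PartNumber/ETag pair that listAllParts accumulates.
type part struct {
    number int
    etag   string
}

// page models one ListParts response: a slice of parts plus the marker
// to resume from on the next request.
type page struct {
    parts      []part
    nextMarker int
}

// collectAll drains a paged listing the way listAllParts does: keep
// re-requesting with the returned marker until a short page signals the end.
func collectAll(list func(marker int) page, maxParts int) []part {
    var all []part
    marker := 0
    for {
        p := list(marker)
        all = append(all, p.parts...)
        if len(p.parts) < maxParts {
            return all
        }
        marker = p.nextMarker
    }
}

func main() {
    // Fake backend: five parts served at most two per page.
    data := []part{{1, "a"}, {2, "b"}, {3, "c"}, {4, "d"}, {5, "e"}}
    list := func(marker int) page {
        end := marker + 2
        if end > len(data) {
            end = len(data)
        }
        return page{parts: data[marker:end], nextMarker: end}
    }
    fmt.Println(collectAll(list, 2)) // [{1 a} {2 b} {3 c} {4 d} {5 e}]
}

Like listAllParts, this stops on the first short page, so a part count that is an exact multiple of the page size costs one extra, empty round trip; the else { continue } branch and the unreachable trailing break in the merged code are harmless leftovers of the same loop shape.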

routers/repo/attachment.go (+0, -43)

@@ -11,7 +11,6 @@ import (
     "fmt"
     "mime/multipart"
     "net/http"
-    "path"
     "strconv"
     "strings"
 
@@ -830,20 +829,6 @@ func GetMultipartUploadUrl(ctx *context.Context) {
     })
 }
 
-func GetObsKey(ctx *context.Context) {
-    uuid := gouuid.NewV4().String()
-    key := strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid, uuid)), "/")
-
-    ctx.JSON(200, map[string]string{
-        "uuid":              uuid,
-        "key":               key,
-        "access_key_id":     setting.AccessKeyID,
-        "secret_access_key": setting.SecretAccessKey,
-        "server":            setting.Endpoint,
-        "bucket":            setting.Bucket,
-    })
-}
-
 func CompleteMultipart(ctx *context.Context) {
     uuid := ctx.Query("uuid")
     uploadID := ctx.Query("uploadID")
@@ -947,34 +932,6 @@ func CompleteMultipart(ctx *context.Context) {
     })
 }
 
-func UpdateMultipart(ctx *context.Context) {
-    uuid := ctx.Query("uuid")
-    partNumber := ctx.QueryInt("chunkNumber")
-    etag := ctx.Query("etag")
-
-    fileChunk, err := models.GetFileChunkByUUID(uuid)
-    if err != nil {
-        if models.IsErrFileChunkNotExist(err) {
-            ctx.Error(404)
-        } else {
-            ctx.ServerError("GetFileChunkByUUID", err)
-        }
-        return
-    }
-
-    fileChunk.CompletedParts = append(fileChunk.CompletedParts, strconv.Itoa(partNumber)+"-"+strings.Replace(etag, "\"", "", -1))
-
-    err = models.UpdateFileChunk(fileChunk)
-    if err != nil {
-        ctx.Error(500, fmt.Sprintf("UpdateFileChunk: %v", err))
-        return
-    }
-
-    ctx.JSON(200, map[string]string{
-        "result_code": "0",
-    })
-}
-
 func HandleUnDecompressAttachment() {
     attachs, err := models.GetUnDecompressAttachments()
     if err != nil {

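Note on the removals above: GetObsKey returned setting.AccessKeyID and setting.SecretAccessKey straight to the browser so the old ObsUploader.vue could call OBS directly, and UpdateMultipart let the client report each part's ETag back to the server. Both paths are gone: the surviving flow fetches per-part presigned URLs from GetMultipartUploadUrl, and CompleteObsMultiPartUpload now reads part state from OBS itself via listAllParts, so the keys stay server-side and ETags no longer come from the client. For illustration, a sketch of presigning one part upload in the style of modules/storage/obs.go, reusing its ObsCli and setting globals; CreateSignedUrl, its input fields, and the partNumber/uploadId query parameters are assumptions about the Huawei OBS Go SDK, not code shown in this diff:

// genPartUploadURL presigns a single multipart part upload server-side so
// the browser can PUT the bytes without ever holding the account keys.
// Assumes the OBS Go SDK's CreateSignedUrl API; illustrative only.
func genPartUploadURL(key, uploadID string, partNumber int) (string, error) {
    output, err := ObsCli.CreateSignedUrl(&obs.CreateSignedUrlInput{
        Method:  obs.HttpMethodPut,
        Bucket:  setting.Bucket,
        Key:     key,
        Expires: 60 * 60, // URL lifetime in seconds
        QueryParams: map[string]string{
            "partNumber": strconv.Itoa(partNumber),
            "uploadId":   uploadID,
        },
    })
    if err != nil {
        return "", err
    }
    return output.SignedUrl, nil
}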

routers/routes/routes.go (+1, -2)

@@ -608,12 +608,11 @@ func RegisterRoutes(m *macaron.Macaron) {
         m.Put("/obs_proxy_multipart", repo.PutOBSProxyUpload)
         m.Get("/obs_proxy_download", repo.GetOBSProxyDownload)
         m.Get("/get_multipart_url", repo.GetMultipartUploadUrl)
-        m.Post("/complete_multipart", repo.CompleteMultipart)
-        m.Post("/update_chunk", repo.UpdateMultipart)
     }, reqSignIn)
 
     m.Group("/attachments", func() {
         m.Post("/decompress_done_notify", repo.UpdateAttachmentDecompressState)
+        m.Post("/complete_multipart", repo.CompleteMultipart)
     })
 
     m.Group("/attachments", func() {

web_src/js/components/MinioUploader.vue (+10, -4)

@@ -27,6 +27,7 @@ import createDropzone from '../features/dropzone.js';
 
 const {_AppSubUrl, _StaticUrlPrefix, csrf} = window.config;
 const chunkSize = 1024 * 1024 * 64;
+const md5ChunkSize = 1024 * 1024 * 1;
 
 export default {
   props:{
@@ -190,10 +191,9 @@ export default {
       let currentChunk = 0;
 
       const time = new Date().getTime();
-      // console.log('Computing MD5...')
       this.status = this.dropzoneParams.data('md5-computing');
       file.totalChunkCounts = chunks;
-      loadNext();
+      loadMd5Next();
 
       fileReader.onload = (e) => {
         fileLoaded.call(this, e);
@@ -207,13 +207,12 @@ export default {
         spark.append(e.target.result); // Append array buffer
         currentChunk++;
         if (currentChunk < chunks) {
-          // console.log(`Chunk ${currentChunk} parsed, starting chunk ${currentChunk + 1}/${chunks}`);
           this.status = `${this.dropzoneParams.data('loading-file')} ${(
             (currentChunk / chunks) *
             100
           ).toFixed(2)}% (${currentChunk}/${chunks})`;
           this.updateProgress(file, ((currentChunk / chunks) * 100).toFixed(2));
-          loadNext();
+          loadMd5Next();
           return;
         }
 
@@ -235,6 +234,13 @@ export default {
           start + chunkSize >= file.size ? file.size : start + chunkSize;
         fileReader.readAsArrayBuffer(blobSlice.call(file, start, end));
       }
+
+      function loadMd5Next() {
+        const start = currentChunk * chunkSize;
+        const end =
+          start + md5ChunkSize >= file.size ? file.size : start + md5ChunkSize;
+        fileReader.readAsArrayBuffer(blobSlice.call(file, start, end));
+      }
     },
 
     async computeMD5Success(md5edFile) {

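Note on the speed optimization above: computeMD5 still walks the file in 64 MiB strides, but loadMd5Next now feeds only the first 1 MiB of each stride into SparkMD5, so fingerprinting a large file reads roughly 1/64th of its bytes. The digest is therefore a sampled fingerprint rather than the file's true MD5, which works only because both ends treat it purely as the dedup/resume key (file.uniqueIdentifier). A standalone Go sketch of the same sampling scheme; sampledMD5 and its constants mirror the component's values but are illustrative, not code from this PR:

package main

import (
    "crypto/md5"
    "encoding/hex"
    "fmt"
    "io"
    "os"
)

const (
    chunkSize  = 64 * 1024 * 1024 // stride between samples (chunkSize in the component)
    sampleSize = 1 * 1024 * 1024  // bytes hashed per stride (md5ChunkSize)
)

// sampledMD5 hashes only the first sampleSize bytes of every chunkSize-wide
// window of the file, mirroring loadMd5Next: a fast fingerprint for
// dedup/resume, not the full-file MD5.
func sampledMD5(path string) (string, error) {
    f, err := os.Open(path)
    if err != nil {
        return "", err
    }
    defer f.Close()

    h := md5.New()
    buf := make([]byte, sampleSize)
    for off := int64(0); ; off += chunkSize {
        n, err := f.ReadAt(buf, off)
        if n > 0 {
            h.Write(buf[:n])
        }
        if err == io.EOF {
            break // at or past the end of the file
        }
        if err != nil {
            return "", err
        }
    }
    return hex.EncodeToString(h.Sum(nil)), nil
}

func main() {
    if len(os.Args) != 2 {
        fmt.Fprintln(os.Stderr, "usage: sampledmd5 <file>")
        os.Exit(2)
    }
    sum, err := sampledMD5(os.Args[1])
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
    fmt.Println(sum)
}

Two files that differ only in bytes outside the sampled windows produce the same fingerprint under this scheme, so the server-side duplicate check becomes best-effort.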

web_src/js/components/ObsUploader.vue (+0, -484)

@@ -1,484 +0,0 @@
<template>
  <div class="dropzone-wrapper dataset-files">
    <div
      id="dataset"
      class="dropzone"
    />
    <p class="upload-info">
      {{ file_status_text }}
      <strong class="success text red">{{ status }}</strong>
    </p>
    <p>Note:<br>
      - Only datasets in zip format can be used to launch Cloudbrain tasks;<br>
      - Cloudbrain 1 provides <span class="text blue">CPU / GPU</span> resources and Cloudbrain 2 provides <span class="text blue">Ascend NPU</span> resources; datasets used for debugging must also be uploaded to the matching environment.
    </p>
  </div>
</template>

<script>
/* eslint-disable eqeqeq */
// import Dropzone from 'dropzone/dist/dropzone.js';
// import 'dropzone/dist/dropzone.css'
import SparkMD5 from 'spark-md5';
import axios from 'axios';
import qs from 'qs';
import createDropzone from '../features/dropzone.js';

const {_AppSubUrl, _StaticUrlPrefix, csrf} = window.config;
const CloudBrainType = 1;

export default {
  data() {
    return {
      dropzoneUploader: null,
      maxFiles: 1,
      maxFilesize: 1 * 1024 * 1024 * 1024 * 1024,
      acceptedFiles: '*/*',
      progress: 0,
      status: '',
      dropzoneParams: {},
      file_status_text: ''
    };
  },

  async mounted() {
    this.dropzoneParams = $('div#minioUploader-params');
    this.file_status_text = this.dropzoneParams.data('file-status');
    this.status = this.dropzoneParams.data('file-init-status');

    let previewTemplate = '';
    previewTemplate += '<div class="dz-preview dz-file-preview">\n ';
    previewTemplate += ' <div class="dz-details">\n ';
    previewTemplate += ' <div class="dz-filename">';
    previewTemplate +=
      ' <span data-dz-name data-dz-thumbnail></span>';
    previewTemplate += ' </div>\n ';
    previewTemplate += ' <div class="dz-size" data-dz-size style="white-space: nowrap"></div>\n ';
    previewTemplate += ' </div>\n ';
    previewTemplate += ' <div class="dz-progress ui active progress">';
    previewTemplate +=
      ' <div class="dz-upload bar" data-dz-uploadprogress><div class="progress"></div></div>\n ';
    previewTemplate += ' </div>\n ';
    previewTemplate += ' <div class="dz-success-mark">';
    previewTemplate += ' <span>Upload succeeded</span>';
    previewTemplate += ' </div>\n ';
    previewTemplate += ' <div class="dz-error-mark">';
    previewTemplate += ' <span>Upload failed</span>';
    previewTemplate += ' </div>\n ';
    previewTemplate += ' <div class="dz-error-message">';
    previewTemplate += ' <span data-dz-errormessage></span>';
    previewTemplate += ' </div>\n';
    previewTemplate += '</div>';

    const $dropzone = $('div#dataset');
    console.log('createDropzone');
    const dropzoneUploader = await createDropzone($dropzone[0], {
      url: '/todouploader',
      maxFiles: this.maxFiles,
      maxFilesize: this.maxFileSize,
      timeout: 0,
      autoQueue: false,
      dictDefaultMessage: this.dropzoneParams.data('default-message'),
      dictInvalidFileType: this.dropzoneParams.data('invalid-input-type'),
      dictFileTooBig: this.dropzoneParams.data('file-too-big'),
      dictRemoveFile: this.dropzoneParams.data('remove-file'),
      previewTemplate
    });
    dropzoneUploader.on('addedfile', (file) => {
      setTimeout(() => {
        // eslint-disable-next-line no-unused-expressions
        file.accepted && this.onFileAdded(file);
      }, 200);
    });
    dropzoneUploader.on('maxfilesexceeded', function (file) {
      if (this.files[0].status !== 'success') {
        alert(this.dropzoneParams.data('waitting-uploading'));
        this.removeFile(file);
        return;
      }
      this.removeAllFiles();
      this.addFile(file);
    });

    this.dropzoneUploader = dropzoneUploader;
  },
  methods: {
    resetStatus() {
      this.progress = 0;
      this.status = '';
    },
    updateProgress(file, progress) {
      file.previewTemplate.querySelector(
        '.dz-upload'
      ).style.width = `${progress}%`;
    },
    emitDropzoneSuccess(file) {
      file.status = 'success';
      this.dropzoneUploader.emit('success', file);
      this.dropzoneUploader.emit('complete', file);
    },
    emitDropzoneFailed(file) {
      this.status = this.dropzoneParams.data('falied');
      file.status = 'error';
      this.dropzoneUploader.emit('error', file);
      // this.dropzoneUploader.emit('complete', file);
    },
    onFileAdded(file) {
      file.datasetId = document
        .getElementById('datasetId')
        .getAttribute('datasetId');
      this.resetStatus();
      this.computeMD5(file);
    },

    finishUpload(file) {
      this.emitDropzoneSuccess(file);
      setTimeout(() => {
        window.location.reload();
      }, 1000);
    },

    computeMD5(file) {
      this.resetStatus();
      const blobSlice =
          File.prototype.slice ||
          File.prototype.mozSlice ||
          File.prototype.webkitSlice,
        chunkSize = 1024 * 1024 * 64,
        chunks = Math.ceil(file.size / chunkSize),
        spark = new SparkMD5.ArrayBuffer(),
        fileReader = new FileReader();
      let currentChunk = 0;

      const time = new Date().getTime();
      // console.log('Computing MD5...')
      this.status = this.dropzoneParams.data('md5-computing');
      file.totalChunkCounts = chunks;
      loadNext();

      fileReader.onload = (e) => {
        fileLoaded.call(this, e);
      };
      fileReader.onerror = (err) => {
        console.warn('oops, something went wrong.', err);
        file.cancel();
      };

      function fileLoaded(e) {
        spark.append(e.target.result); // Append array buffer
        currentChunk++;
        if (currentChunk < chunks) {
          // console.log(`Chunk ${currentChunk} parsed, starting chunk ${currentChunk + 1}/${chunks}`);
          this.status = `${this.dropzoneParams.data('loading-file')} ${(
            (currentChunk / chunks) *
            100
          ).toFixed(2)}% (${currentChunk}/${chunks})`;
          this.updateProgress(file, ((currentChunk / chunks) * 100).toFixed(2));
          loadNext();
          return;
        }

        const md5 = spark.end();
        console.log(
          `MD5 computed: ${file.name} \nMD5: ${md5} \nchunks: ${chunks} size: ${
            file.size
          } time: ${(new Date().getTime() - time) / 1000} s`
        );
        spark.destroy(); // release the internal buffer
        file.uniqueIdentifier = md5; // use the md5 as the file's unique identifier
        file.cmd5 = false; // md5 computation finished
        this.computeMD5Success(file);
      }

      function loadNext() {
        const start = currentChunk * chunkSize;
        const end =
          start + chunkSize >= file.size ? file.size : start + chunkSize;
        fileReader.readAsArrayBuffer(blobSlice.call(file, start, end));
      }
    },

    async computeMD5Success(md5edFile) {
      const file = await this.getSuccessChunks(md5edFile);
      try {
        if (file.uploadID == '' || file.uuid == '') {
          // never uploaded before
          await this.newMultiUpload(file);
          if (file.uploadID != '' && file.uuid != '') {
            file.chunks = '';
            this.multipartUpload(file);
          } else {
            // TODO: how should this failure be handled?
            return;
          }
          return;
        }

        if (file.uploaded == '1') {
          // already uploaded successfully: finish instantly via dedup
          if (file.attachID == '0') {
            // the dataset record was deleted but the file was kept
            await addAttachment(file);
          }
          // the same file uploaded under a different dataset
          if (file.datasetID != '' ) {
            if (file.datasetName != "" && file.realName != "") {
              var info = "This file is already uploaded, as file (" + file.realName + ") of dataset (" + file.datasetName + ")";
              window.alert(info);
              window.location.reload();
            }
          }
          console.log('File upload already complete');
          this.progress = 100;
          this.status = this.dropzoneParams.data('upload-complete');
          this.finishUpload(file);
        } else {
          // resume the interrupted upload
          this.multipartUpload(file);
        }
      } catch (error) {
        this.emitDropzoneFailed(file);
        console.log(error);
      }

      async function addAttachment(file) {
        return await axios.post(
          '/attachments/add',
          qs.stringify({
            uuid: file.uuid,
            file_name: file.name,
            size: file.size,
            dataset_id: file.datasetId,
            type: CloudBrainType,
            _csrf: csrf,
          })
        );
      }
    },

    async getSuccessChunks(file) {
      const params = {
        params: {
          md5: file.uniqueIdentifier,
          type: CloudBrainType,
          file_name: file.name,
          _csrf: csrf
        }
      };
      try {
        const response = await axios.get('/attachments/get_chunks', params);
        file.uploadID = response.data.uploadID;
        file.uuid = response.data.uuid;
        file.uploaded = response.data.uploaded;
        file.chunks = response.data.chunks;
        file.attachID = response.data.attachID;
        file.datasetID = response.data.datasetID;
        file.datasetName = response.data.datasetName;
        file.realName = response.data.fileName;
        return file;
      } catch (error) {
        this.emitDropzoneFailed(file);
        console.log('getSuccessChunks catch: ', error);
        return null;
      }
    },

    async newMultiUpload(file) {
      const res = await axios.get('/attachments/new_multipart', {
        params: {
          totalChunkCounts: file.totalChunkCounts,
          md5: file.uniqueIdentifier,
          size: file.size,
          fileType: file.type,
          type: CloudBrainType,
          file_name: file.name,
          _csrf: csrf
        }
      });
      file.uploadID = res.data.uploadID;
      file.uuid = res.data.uuid;
    },

    multipartUpload(file) {
      const blobSlice =
          File.prototype.slice ||
          File.prototype.mozSlice ||
          File.prototype.webkitSlice,
        chunkSize = 1024 * 1024 * 64,
        chunks = Math.ceil(file.size / chunkSize),
        fileReader = new FileReader(),
        time = new Date().getTime();
      let currentChunk = 0;

      function loadNext() {
        const start = currentChunk * chunkSize;
        const end =
          start + chunkSize >= file.size ? file.size : start + chunkSize;
        fileReader.readAsArrayBuffer(blobSlice.call(file, start, end));
      }

      function checkSuccessChunks() {
        const index = successChunks.indexOf((currentChunk + 1).toString());
        if (index == -1) {
          return false;
        }
        return true;
      }

      async function getUploadChunkUrl(currentChunk, partSize) {
        const res = await axios.get('/attachments/get_multipart_url', {
          params: {
            uuid: file.uuid,
            uploadID: file.uploadID,
            size: partSize,
            chunkNumber: currentChunk + 1,
            type: CloudBrainType,
            file_name: file.name,
            _csrf: csrf
          }
        });
        urls[currentChunk] = res.data.url;
      }

      async function uploadMinio(url, e) {
        let urls = [];
        const res = await axios.put(url, e.target.result, {
          headers: {
            'Content-Type': ''
          }});
        etags[currentChunk] = res.headers.etag;
      }

      async function uploadMinioNewMethod(url,e){
        var xhr = new XMLHttpRequest();
        xhr.open('PUT', url, false);
        xhr.setRequestHeader('Content-Type', '')
        xhr.send(e.target.result);
        var etagValue = xhr.getResponseHeader('ETag');
        //console.log(etagValue);
        etags[currentChunk] = etagValue;
      }

      async function updateChunk(currentChunk) {
        await axios.post(
          '/attachments/update_chunk',
          qs.stringify({
            uuid: file.uuid,
            chunkNumber: currentChunk + 1,
            etag: etags[currentChunk],
            type: CloudBrainType,
            _csrf: csrf
          })
        );
      }
      async function uploadChunk(e) {
        try {
          if (!checkSuccessChunks()) {
            const start = currentChunk * chunkSize;
            const partSize =
              start + chunkSize >= file.size ? file.size - start : chunkSize;
            // fetch the presigned upload URL for this part
            await getUploadChunkUrl(currentChunk, partSize);
            if (urls[currentChunk] != '') {
              // upload the part to minio
              await uploadMinioNewMethod(urls[currentChunk], e);
              if (etags[currentChunk] != '') {
                // record this part's upload result in the database
                //await updateChunk(currentChunk);
              } else {
                console.log("upload to minio: uploadChunk etags[currentChunk] == ''");// TODO
              }
            } else {
              console.log("uploadChunk urls[currentChunk] != ''");// TODO
            }
          }
        } catch (error) {
          this.emitDropzoneFailed(file);
          console.log(error);
        }
      }

      async function completeUpload() {
        return await axios.post(
          '/attachments/complete_multipart',
          qs.stringify({
            uuid: file.uuid,
            uploadID: file.uploadID,
            file_name: file.name,
            size: file.size,
            dataset_id: file.datasetId,
            type: CloudBrainType,
            _csrf: csrf
          })
        );
      }

      const successChunks = [];
      let successParts = [];
      successParts = file.chunks.split(',');
      for (let i = 0; i < successParts.length; i++) {
        successChunks[i] = successParts[i].split('-')[0];
      }
      const urls = []; // TODO const ?
      const etags = [];
      console.log('Uploading chunks...');
      this.status = this.dropzoneParams.data('uploading');
      loadNext();
      fileReader.onload = async (e) => {
        await uploadChunk(e);
        fileReader.abort();
        currentChunk++;
        if (currentChunk < chunks) {
          console.log(
            `Chunk ${currentChunk} uploaded, starting chunk ${currentChunk +
              1}/${chunks}`
          );
          this.progress = Math.ceil((currentChunk / chunks) * 100);
          this.updateProgress(file, ((currentChunk / chunks) * 100).toFixed(2));
          this.status = `${this.dropzoneParams.data('uploading')} ${(
            (currentChunk / chunks) *
            100
          ).toFixed(2)}%`;
          await loadNext();
        } else {
          await completeUpload();
          console.log(
            `Upload complete: ${file.name} \nchunks: ${chunks} size: ${
              file.size
            } time: ${(new Date().getTime() - time) / 1000} s`
          );
          this.progress = 100;
          this.status = this.dropzoneParams.data('upload-complete');
          this.finishUpload(file);
        }
      };
    }
  }
};
</script>

<style>
.dropzone-wrapper {
  margin: 0;
}
.ui .dropzone {
  border: 2px dashed #0087f5;
  box-shadow: none !important;
  padding: 0;
  min-height: 5rem;
  border-radius: 4px;
}
.dataset .dataset-files #dataset .dz-preview.dz-file-preview,
.dataset .dataset-files #dataset .dz-preview.dz-processing {
  display: flex;
  align-items: center;
}
.dataset .dataset-files #dataset .dz-preview {
  border-bottom: 1px solid #dadce0;
  min-height: 0;
}
.upload-info{
  margin-top: 0.2em;
}
</style>
