Skip to content

Commit

Permalink
feat: implement parallel uploading for multipartUpload (#149)
Browse files Browse the repository at this point in the history
  • Loading branch information
rockuw authored and dead-horse committed Oct 27, 2016
1 parent 188df57 commit a89181f
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 10 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,7 @@ parameters:
- name {String} object name
- file {String|File} file path or HTML5 Web File
- [options] {Object} optional args
- [parallel] {Number} the number of parts to be uploaded in parallel
- [partSize] {Number} the suggested size for each part
- [progress] {Function} the progress callback called after each
successful upload of one part, it will be given two parameters:
Expand All @@ -1387,6 +1388,7 @@ var result = yield store.multipartUpload('object', '/tmp/file');
console.log(result);
var result = yield store.multipartUpload('object', '/tmp/file', {
parallel: 4,
partSize: 1024 * 1024,
progress: function* (p, cpt) {
console.log(p);
Expand Down
45 changes: 35 additions & 10 deletions lib/multipart.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ var eoe = require('end-or-error');
var util = require('util');
var path = require('path');
var mime = require('mime');
var gather = require('co-gather');

var proto = exports;

Expand Down Expand Up @@ -60,8 +61,7 @@ proto.multipartUpload = function* multipartUpload(name, file, options) {
fileSize: fileSize,
partSize: partSize,
uploadId: uploadId,
doneParts: [],
nextPart: 0
doneParts: []
};

return yield this._resumeMultipart(checkpoint, options);
Expand All @@ -79,27 +79,51 @@ proto._resumeMultipart = function* _resumeMultipart(checkpoint, options) {
var partSize = checkpoint.partSize;
var uploadId = checkpoint.uploadId;
var doneParts = checkpoint.doneParts;
var nextPart = checkpoint.nextPart;
var name = checkpoint.name;

var partOffs = this._divideParts(fileSize, partSize);
var numParts = partOffs.length;
for (var i = nextPart; i < numParts; i++) {
var partNo = i + 1;
var pi = partOffs[i];

var uploadPartJob = function* (self, partNo) {
var pi = partOffs[partNo - 1];
var data = {
stream: this._createStream(file, pi.start, pi.end),
stream: self._createStream(file, pi.start, pi.end),
size: pi.end - pi.start
};
var result = yield this._uploadPart(name, uploadId, partNo, data);

var result = yield self._uploadPart(name, uploadId, partNo, data);

doneParts.push({
number: partNo,
etag: result.res.headers.etag
});
checkpoint.nextPart = i + 1;
checkpoint.doneParts = doneParts;

if (options && options.progress) {
yield options.progress(partNo / numParts, checkpoint);
yield options.progress(doneParts.length / numParts, checkpoint);
}
};

var all = Array.from(new Array(numParts), (x, i) => i + 1);
var done = doneParts.map(p => p.number);
var todo = all.filter(p => done.indexOf(p) < 0);

var jobs = [];
for (var i = 0; i < todo.length; i++) {
jobs.push(uploadPartJob(this, todo[i]));
}

const defaultParallel = 5;
var parallel = options.parallel || defaultParallel;

// yield in parallel
var results = yield gather(jobs, parallel);

// check errors after all jobs are completed
for (var i = 0; i < results.length; i++) {
if (results[i].isError) {
throw new Error(
'Failed to upload some parts with error: ' + results[i].error.toString());
}
}

Expand Down Expand Up @@ -228,6 +252,7 @@ proto._uploadPart = function* _uploadPart(name, uploadId, partNo, data, options)
* @param {Object} options
*/
proto._completeMultipartUpload = function* _completeMultipartUpload(name, uploadId, parts, options) {
parts.sort((a, b) => a.number - b.number);
var xml = '<?xml version="1.0" encoding="UTF-8"?>\n<CompleteMultipartUpload>\n';
for (var i = 0; i < parts.length; i++) {
var p = parts[i];
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
"agentkeepalive": "^2.1.1",
"co": "^4.6.0",
"co-defer": "^1.0.0",
"co-gather": "^0.0.1",
"copy-to": "^2.0.1",
"dateformat": "^1.0.12",
"debug": "^2.2.0",
Expand Down
2 changes: 2 additions & 0 deletions test/multipart.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ describe('test/multipart.test.js', function () {
fs.writeFileSync(cptFile, JSON.stringify(cpt));
}
});
// should not succeed
assert(false);
} catch (err) {
// pass
}
Expand Down

0 comments on commit a89181f

Please sign in to comment.