Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduced a feature to perform multipart copy with progress update #419

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Major features include:
- Configurable MD5 Checksum calculations and handling for each uploaded
part (`computeContentMd5`)
- AWS Signature Version 2 and 4 (`awsSignatureVersion`)
- Multipart upload / copy
- S3 Transfer Acceleration (`s3Acceleration`)
- Robust recovery when uploading huge files. Only parts that
have not been fully uploaded again. (`s3FileCacheHoursAgo`, `allowS3ExistenceOptimization`)
Expand Down Expand Up @@ -97,6 +98,7 @@ See more examples on [wiki](https://github.com/TTLabs/EvaporateJS/wiki/Examples)

- [#create()](https://github.com/TTLabs/EvaporateJS/wiki/Evaporate.create())
- [#add()](https://github.com/TTLabs/EvaporateJS/wiki/Evaporate.prototype.add())
- [#copy()](https://github.com/TTLabs/EvaporateJS/wiki/Evaporate.prototype.copy())
- [#cancel()](https://github.com/TTLabs/EvaporateJS/wiki/Evaporate.prototype.cancel())
- [#pause()](https://github.com/TTLabs/EvaporateJS/wiki/Evaporate.prototype.pause())
- [#resume()](https://github.com/TTLabs/EvaporateJS/wiki/Evaporate.prototype.resume())
Expand Down
60 changes: 42 additions & 18 deletions evaporate.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
ACTIVE_STATUSES = [PENDING, EVAPORATING, ERROR],
ETAG_OF_0_LENGTH_BLOB = '"d41d8cd98f00b204e9800998ecf8427e"',
PARTS_MONITOR_INTERVAL_MS = 2 * 60 * 1000,
S3_MAX_SUPPORTED_PARTS = 10000,
IMMUTABLE_OPTIONS = [
'maxConcurrentParts',
'logging',
Expand Down Expand Up @@ -209,14 +210,23 @@
}
};
Evaporate.prototype.add = function (file, pConfig) {
return this.createFileUploadAndAddToQueue(file, pConfig);
};
Evaporate.prototype.copy = function (file, pConfig) {
return this.createFileUploadAndAddToQueue(file, extend(pConfig, { isMultipartCopy: true}));
};
Evaporate.prototype.cancel = function (id) {
return typeof id === 'undefined' ? this._cancelAll() : this._cancelOne(id);
};
Evaporate.prototype.createFileUploadAndAddToQueue = function (file, pConfig) {
var self = this,
fileConfig;
return new Promise(function (resolve, reject) {
var c = extend(pConfig, {});
var c = extend({}, pConfig);

IMMUTABLE_OPTIONS.forEach(function (a) { delete c[a]; });

fileConfig = extend(self.config, c);
fileConfig = extend(extend({}, self.config), c);

if (typeof file === 'undefined' || typeof file.file === 'undefined') {
return reject('Missing file');
Expand All @@ -233,6 +243,8 @@
file.name = s3EncodedObjectName(file.name);
}

var xAmzHeadersAtUpload = fileConfig.isMultipartCopy ? { 'x-amz-copy-source': encodeURIComponent('/' + self.config.bucket + '/' + file.file.path) } : {};

var fileUpload = new FileUpload(extend({
started: function () {},
uploadInitiated: function () {},
Expand All @@ -250,7 +262,7 @@
xAmzHeadersAtInitiate: {},
notSignedHeadersAtInitiate: {},
xAmzHeadersCommon: null,
xAmzHeadersAtUpload: {},
xAmzHeadersAtUpload: xAmzHeadersAtUpload,
xAmzHeadersAtComplete: {}
}, file, {
status: PENDING,
Expand Down Expand Up @@ -453,6 +465,7 @@
this.evaporate = evaporate;
this.localTimeOffset = evaporate.localTimeOffset;
this.deferredCompletion = defer();
this.computedPartSize = this.con.partSize;

extend(this, file);

Expand All @@ -472,6 +485,7 @@
FileUpload.prototype.s3Parts = [];
FileUpload.prototype.partsOnS3 = [];
FileUpload.prototype.deferredCompletion = undefined;
FileUpload.prototype.computedPartSize = undefined;
FileUpload.prototype.abortedByUser = false;

// Progress and Stats
Expand Down Expand Up @@ -677,7 +691,10 @@
});
};
FileUpload.prototype.makeParts = function (firstPart) {
this.numParts = Math.ceil(this.sizeBytes / this.con.partSize) || 1; // issue #58
var numParts = Math.ceil(this.sizeBytes / this.con.partSize) || 1; // issue #58
numParts = Math.min(numParts, S3_MAX_SUPPORTED_PARTS);
this.computedPartSize = Math.ceil(this.sizeBytes / numParts);
this.numParts = Math.ceil(this.sizeBytes / this.computedPartSize) || 1;
var partsDeferredPromises = [];

var self = this;
Expand Down Expand Up @@ -749,7 +766,7 @@
fileSize: this.sizeBytes,
fileType: this.file.type,
lastModifiedDate: dateISOString(this.file.lastModified),
partSize: this.con.partSize,
partSize: this.computedPartSize,
signParams: this.con.signParams,
createdAt: new Date().toISOString()
};
Expand Down Expand Up @@ -803,7 +820,7 @@

// check that the part sizes and bucket match, and if the file name of the upload
// matches if onlyRetryForSameFileName is true
return this.con.partSize === u.partSize &&
return this.computedPartSize === u.partSize &&
completedAt > HOURS_AGO &&
this.con.bucket === u.bucket &&
(this.con.onlyRetryForSameFileName ? this.name === u.awsKey : true);
Expand Down Expand Up @@ -1354,14 +1371,16 @@
this.part = part;

this.partNumber = part.partNumber;
this.start = (this.partNumber - 1) * fileUpload.con.partSize;
this.end = Math.min(this.partNumber * fileUpload.con.partSize, fileUpload.sizeBytes);
this.start = (this.partNumber - 1) * fileUpload.computedPartSize;
this.end = Math.min(this.partNumber * fileUpload.computedPartSize, fileUpload.sizeBytes);

var xAmzHeaders = fileUpload.con.isMultipartCopy ? extend({'x-amz-copy-source-range': 'bytes=' + this.start + '-' + (this.end-1) }, fileUpload.xAmzHeadersAtUpload) : fileUpload.xAmzHeadersAtUpload;

var request = {
method: 'PUT',
path: '?partNumber=' + this.partNumber + '&uploadId=' + fileUpload.uploadId,
step: 'upload #' + this.partNumber,
x_amz_headers: fileUpload.xAmzHeadersCommon || fileUpload.xAmzHeadersAtUpload,
x_amz_headers: fileUpload.xAmzHeadersCommon || xAmzHeaders,
contentSha256: "UNSIGNED-PAYLOAD",
onProgress: this.onProgress.bind(this)
};
Expand Down Expand Up @@ -1416,18 +1435,23 @@
this.part.loadedBytesPrevious = null;

var self = this;
return this.getPartMd5Digest()
.then(function () {
l.d('Sending', self.request.step);
SignedS3AWSRequest.prototype.send.call(self);
});
var sendRequest = function () {
l.d('Sending', self.request.step);
SignedS3AWSRequest.prototype.send.call(self);
};

return this.con.isMultipartCopy ? sendRequest() : this.getPartMd5Digest().then(sendRequest);
}
};
PutPart.prototype.success = function () {
clearInterval(this.stalledInterval);
var eTag = this.currentXhr.getResponseHeader('ETag');
var eTag = this.con.isMultipartCopy? elementText(this.currentXhr.responseText, "ETag").replace(/"/g, '"') : this.currentXhr.getResponseHeader('ETag');
this.currentXhr = null;
if (this.fileUpload.partSuccess(eTag, this)) { this.awsDeferred.resolve(this.currentXhr); }
if (this.fileUpload.partSuccess(eTag, this)) {
if (this.con.isMultipartCopy)
this.fileUpload.updateLoaded(this.end - this.start); // updating progress on part complete as copy doesn't have any payload
this.awsDeferred.resolve(this.currentXhr);
}
};
PutPart.prototype.onProgress = function (evt) {
if (evt.loaded > 0) {
Expand Down Expand Up @@ -1502,7 +1526,7 @@
// stream is empty or ended
if (!stream.readable) { return resolve([]); }

var arr = new Uint8Array(Math.min(this.con.partSize, this.end - this.start)),
var arr = new Uint8Array(Math.min(this.computedPartSize, this.end - this.start)),
i = 0;
stream.on('data', onData);
stream.on('end', onEnd);
Expand Down Expand Up @@ -1537,7 +1561,7 @@
};
PutPart.prototype.getPayload = function () {
if (typeof this.payloadPromise === 'undefined') {
this.payloadPromise = this.con.readableStreams ? this.payloadFromStream() : this.payloadFromBlob();
this.payloadPromise = this.con.isMultipartCopy ? Promise.resolve('') : this.con.readableStreams ? this.payloadFromStream() : this.payloadFromBlob();
}
return this.payloadPromise;
};
Expand Down
64 changes: 56 additions & 8 deletions test/evaporate.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ test('#create evaporate should support #add', (t) => {
})

})
test('#create evaporate should support #copy', (t) => {
return Evaporate.create(baseConfig)
.then(function (evaporate) {
expect(evaporate.copy).to.be.instanceof(Function)
},
function (reason) {
t.fail(reason)
})
})
test('#create evaporate should support #cancel', (t) => {
return Evaporate.create(baseConfig)
.then(function (evaporate) {
Expand Down Expand Up @@ -330,10 +339,49 @@ test.todo('should validate readableStream and readableStreamPartMethod')

// add

test('should fail to add() when no file is present', (t) => {
test('should invoke createFileUploadAndAddToQueue() when add is called', (t) => {
return Evaporate.create(baseConfig)
.then(function (evaporate) {
var createUploadStub = sinon.stub(evaporate, 'createFileUploadAndAddToQueue');
var targetFile = {
name: 'test',
file: new File({
name: 'test',
path: '/tmp/file',
size: 1024
})
};
createUploadStub.calledWith();
evaporate.add(targetFile);
expect(createUploadStub.calledOnce).to.be.true;
expect(createUploadStub.calledWith(targetFile, undefined)).to.be.true;
sinon.restore(evaporate.createFileUploadAndAddToQueue);
t.pass()
})
});
test('should invoke createFileUploadAndAddToQueue() when copy is called', (t) => {
return Evaporate.create(baseConfig)
.then(function (evaporate) {
var createUploadStub = sinon.stub(evaporate, 'createFileUploadAndAddToQueue');
var targetFile = {
name: 'test',
file: new File({
name: 'test',
path: '/tmp/file',
size: 1024
})
};
evaporate.copy(targetFile);
expect(createUploadStub.calledOnce).to.be.true;
expect(createUploadStub.calledWith(targetFile, { isMultipartCopy: true })).to.be.true;
sinon.restore(evaporate.createFileUploadAndAddToQueue);
t.pass()
})
});
test('should fail to createFileUploadAndAddToQueue() when no file is present', (t) => {
return Evaporate.create(baseConfig)
.then(function (evaporate) {
evaporate.add({ name: 'test' })
evaporate.createFileUploadAndAddToQueue({ name: 'test' })
.then(function () {
t.fail('Evaporate added a new file but should not have.')
},
Expand All @@ -342,10 +390,10 @@ test('should fail to add() when no file is present', (t) => {
})
})
});
test('should fail to add() when empty config is present', (t) => {
test('should fail to createFileUploadAndAddToQueue() when empty config is present', (t) => {
return Evaporate.create(baseConfig)
.then(function (evaporate) {
evaporate.add({})
evaporate.createFileUploadAndAddToQueue({})
.then(function () {
t.fail('Evaporate added a new file but should not have.')
},
Expand All @@ -354,10 +402,10 @@ test('should fail to add() when empty config is present', (t) => {
})
})
});
test('should fail to add() when no config is present', (t) => {
test('should fail to createFileUploadAndAddToQueue() when no config is present', (t) => {
return Evaporate.create(baseConfig)
.then(function (evaporate) {
evaporate.add()
evaporate.createFileUploadAndAddToQueue()
.then(function () {
t.fail('Evaporate added a new file but should not have.')
},
Expand All @@ -369,7 +417,7 @@ test('should fail to add() when no config is present', (t) => {
test('should require a name if file is present', (t) => {
return Evaporate.create(baseConfig)
.then(function (evaporate) {
evaporate.add({
evaporate.createFileUploadAndAddToQueue({
file: new File({
path: '/tmp/file',
size: 50000
Expand All @@ -386,7 +434,7 @@ test('should require a name if file is present', (t) => {
test('should respect maxFileSize', (t) => {
return Evaporate.create(Object.assign({}, baseConfig, {maxFileSize: 10}))
.then(function (evaporate) {
evaporate.add({
evaporate.createFileUploadAndAddToQueue({
file: new File({
path: '/tmp/file',
size: 50000
Expand Down