From bf5e0f17d86e03027550e4a0ce319a5577498c8c Mon Sep 17 00:00:00 2001 From: Mohammed Shaffi Date: Fri, 26 Oct 2018 22:27:38 +0530 Subject: [PATCH] :sparkles: Adding a feature to perform multipart copy and setting max number of parts to 10000 as per AWS specs --- README.md | 2 ++ evaporate.js | 60 +++++++++++++++++++++++++++------------ test/evaporate.spec.js | 64 ++++++++++++++++++++++++++++++++++++------ 3 files changed, 100 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 7a4f7f4e..1727229e 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ Major features include: - Configurable MD5 Checksum calculations and handling for each uploaded part (`computeContentMd5`) - AWS Signature Version 2 and 4 (`awsSignatureVersion`) +- Multipart upload / copy - S3 Transfer Acceleration (`s3Acceleration`) - Robust recovery when uploading huge files. Only parts that have not been fully uploaded again. (`s3FileCacheHoursAgo`, `allowS3ExistenceOptimization`) @@ -97,6 +98,7 @@ See more examples on [wiki](https://github.com/TTLabs/EvaporateJS/wiki/Examples) - [#create()](https://github.com/TTLabs/EvaporateJS/wiki/Evaporate.create()) - [#add()](https://github.com/TTLabs/EvaporateJS/wiki/Evaporate.prototype.add()) +- [#copy()](https://github.com/TTLabs/EvaporateJS/wiki/Evaporate.prototype.copy()) - [#cancel()](https://github.com/TTLabs/EvaporateJS/wiki/Evaporate.prototype.cancel()) - [#pause()](https://github.com/TTLabs/EvaporateJS/wiki/Evaporate.prototype.pause()) - [#resume()](https://github.com/TTLabs/EvaporateJS/wiki/Evaporate.prototype.resume()) diff --git a/evaporate.js b/evaporate.js index 696667af..2b3a22d4 100644 --- a/evaporate.js +++ b/evaporate.js @@ -28,6 +28,7 @@ ACTIVE_STATUSES = [PENDING, EVAPORATING, ERROR], ETAG_OF_0_LENGTH_BLOB = '"d41d8cd98f00b204e9800998ecf8427e"', PARTS_MONITOR_INTERVAL_MS = 2 * 60 * 1000, + S3_MAX_SUPPORTED_PARTS = 10000, IMMUTABLE_OPTIONS = [ 'maxConcurrentParts', 'logging', @@ -209,14 +210,23 @@ } }; Evaporate.prototype.add = function (file, pConfig) { + return this.createFileUploadAndAddToQueue(file, pConfig); + }; + Evaporate.prototype.copy = function (file, pConfig) { + return this.createFileUploadAndAddToQueue(file, extend(pConfig, { isMultipartCopy: true})); + }; + Evaporate.prototype.cancel = function (id) { + return typeof id === 'undefined' ? this._cancelAll() : this._cancelOne(id); + }; + Evaporate.prototype.createFileUploadAndAddToQueue = function (file, pConfig) { var self = this, fileConfig; return new Promise(function (resolve, reject) { - var c = extend(pConfig, {}); + var c = extend({}, pConfig); IMMUTABLE_OPTIONS.forEach(function (a) { delete c[a]; }); - fileConfig = extend(self.config, c); + fileConfig = extend(extend({}, self.config), c); if (typeof file === 'undefined' || typeof file.file === 'undefined') { return reject('Missing file'); @@ -233,6 +243,8 @@ file.name = s3EncodedObjectName(file.name); } + var xAmzHeadersAtUpload = fileConfig.isMultipartCopy ? { 'x-amz-copy-source': encodeURIComponent('/' + self.config.bucket + '/' + file.file.path) } : {}; + var fileUpload = new FileUpload(extend({ started: function () {}, uploadInitiated: function () {}, @@ -250,7 +262,7 @@ xAmzHeadersAtInitiate: {}, notSignedHeadersAtInitiate: {}, xAmzHeadersCommon: null, - xAmzHeadersAtUpload: {}, + xAmzHeadersAtUpload: xAmzHeadersAtUpload, xAmzHeadersAtComplete: {} }, file, { status: PENDING, @@ -453,6 +465,7 @@ this.evaporate = evaporate; this.localTimeOffset = evaporate.localTimeOffset; this.deferredCompletion = defer(); + this.computedPartSize = this.con.partSize; extend(this, file); @@ -472,6 +485,7 @@ FileUpload.prototype.s3Parts = []; FileUpload.prototype.partsOnS3 = []; FileUpload.prototype.deferredCompletion = undefined; + FileUpload.prototype.computedPartSize = undefined; FileUpload.prototype.abortedByUser = false; // Progress and Stats @@ -677,7 +691,10 @@ }); }; FileUpload.prototype.makeParts = function (firstPart) { - this.numParts = Math.ceil(this.sizeBytes / this.con.partSize) || 1; // issue #58 + var numParts = Math.ceil(this.sizeBytes / this.con.partSize) || 1; // issue #58 + numParts = Math.min(numParts, S3_MAX_SUPPORTED_PARTS); + this.computedPartSize = Math.ceil(this.sizeBytes / numParts); + this.numParts = Math.ceil(this.sizeBytes / this.computedPartSize) || 1; var partsDeferredPromises = []; var self = this; @@ -749,7 +766,7 @@ fileSize: this.sizeBytes, fileType: this.file.type, lastModifiedDate: dateISOString(this.file.lastModified), - partSize: this.con.partSize, + partSize: this.computedPartSize, signParams: this.con.signParams, createdAt: new Date().toISOString() }; @@ -803,7 +820,7 @@ // check that the part sizes and bucket match, and if the file name of the upload // matches if onlyRetryForSameFileName is true - return this.con.partSize === u.partSize && + return this.computedPartSize === u.partSize && completedAt > HOURS_AGO && this.con.bucket === u.bucket && (this.con.onlyRetryForSameFileName ? this.name === u.awsKey : true); @@ -1354,14 +1371,16 @@ this.part = part; this.partNumber = part.partNumber; - this.start = (this.partNumber - 1) * fileUpload.con.partSize; - this.end = Math.min(this.partNumber * fileUpload.con.partSize, fileUpload.sizeBytes); + this.start = (this.partNumber - 1) * fileUpload.computedPartSize; + this.end = Math.min(this.partNumber * fileUpload.computedPartSize, fileUpload.sizeBytes); + + var xAmzHeaders = fileUpload.con.isMultipartCopy ? extend({'x-amz-copy-source-range': 'bytes=' + this.start + '-' + (this.end-1) }, fileUpload.xAmzHeadersAtUpload) : fileUpload.xAmzHeadersAtUpload; var request = { method: 'PUT', path: '?partNumber=' + this.partNumber + '&uploadId=' + fileUpload.uploadId, step: 'upload #' + this.partNumber, - x_amz_headers: fileUpload.xAmzHeadersCommon || fileUpload.xAmzHeadersAtUpload, + x_amz_headers: fileUpload.xAmzHeadersCommon || xAmzHeaders, contentSha256: "UNSIGNED-PAYLOAD", onProgress: this.onProgress.bind(this) }; @@ -1416,18 +1435,23 @@ this.part.loadedBytesPrevious = null; var self = this; - return this.getPartMd5Digest() - .then(function () { - l.d('Sending', self.request.step); - SignedS3AWSRequest.prototype.send.call(self); - }); + var sendRequest = function () { + l.d('Sending', self.request.step); + SignedS3AWSRequest.prototype.send.call(self); + }; + + return this.con.isMultipartCopy ? sendRequest() : this.getPartMd5Digest().then(sendRequest); } }; PutPart.prototype.success = function () { clearInterval(this.stalledInterval); - var eTag = this.currentXhr.getResponseHeader('ETag'); + var eTag = this.con.isMultipartCopy? elementText(this.currentXhr.responseText, "ETag").replace(/"/g, '"') : this.currentXhr.getResponseHeader('ETag'); this.currentXhr = null; - if (this.fileUpload.partSuccess(eTag, this)) { this.awsDeferred.resolve(this.currentXhr); } + if (this.fileUpload.partSuccess(eTag, this)) { + if (this.con.isMultipartCopy) + this.fileUpload.updateLoaded(this.end - this.start); // updating progress on part complete as copy doesn't have any payload + this.awsDeferred.resolve(this.currentXhr); + } }; PutPart.prototype.onProgress = function (evt) { if (evt.loaded > 0) { @@ -1502,7 +1526,7 @@ // stream is empty or ended if (!stream.readable) { return resolve([]); } - var arr = new Uint8Array(Math.min(this.con.partSize, this.end - this.start)), + var arr = new Uint8Array(Math.min(this.computedPartSize, this.end - this.start)), i = 0; stream.on('data', onData); stream.on('end', onEnd); @@ -1537,7 +1561,7 @@ }; PutPart.prototype.getPayload = function () { if (typeof this.payloadPromise === 'undefined') { - this.payloadPromise = this.con.readableStreams ? this.payloadFromStream() : this.payloadFromBlob(); + this.payloadPromise = this.con.isMultipartCopy ? Promise.resolve('') : this.con.readableStreams ? this.payloadFromStream() : this.payloadFromBlob(); } return this.payloadPromise; }; diff --git a/test/evaporate.spec.js b/test/evaporate.spec.js index 0e2d3402..3ec4e9d3 100644 --- a/test/evaporate.spec.js +++ b/test/evaporate.spec.js @@ -94,6 +94,15 @@ test('#create evaporate should support #add', (t) => { }) }) +test('#create evaporate should support #copy', (t) => { + return Evaporate.create(baseConfig) + .then(function (evaporate) { + expect(evaporate.copy).to.be.instanceof(Function) + }, + function (reason) { + t.fail(reason) + }) +}) test('#create evaporate should support #cancel', (t) => { return Evaporate.create(baseConfig) .then(function (evaporate) { @@ -330,10 +339,49 @@ test.todo('should validate readableStream and readableStreamPartMethod') // add -test('should fail to add() when no file is present', (t) => { +test('should invoke createFileUploadAndAddToQueue() when add is called', (t) => { + return Evaporate.create(baseConfig) + .then(function (evaporate) { + var createUploadStub = sinon.stub(evaporate, 'createFileUploadAndAddToQueue'); + var targetFile = { + name: 'test', + file: new File({ + name: 'test', + path: '/tmp/file', + size: 1024 + }) + }; + createUploadStub.calledWith(); + evaporate.add(targetFile); + expect(createUploadStub.calledOnce).to.be.true; + expect(createUploadStub.calledWith(targetFile, undefined)).to.be.true; + sinon.restore(evaporate.createFileUploadAndAddToQueue); + t.pass() + }) +}); +test('should invoke createFileUploadAndAddToQueue() when copy is called', (t) => { + return Evaporate.create(baseConfig) + .then(function (evaporate) { + var createUploadStub = sinon.stub(evaporate, 'createFileUploadAndAddToQueue'); + var targetFile = { + name: 'test', + file: new File({ + name: 'test', + path: '/tmp/file', + size: 1024 + }) + }; + evaporate.copy(targetFile); + expect(createUploadStub.calledOnce).to.be.true; + expect(createUploadStub.calledWith(targetFile, { isMultipartCopy: true })).to.be.true; + sinon.restore(evaporate.createFileUploadAndAddToQueue); + t.pass() + }) +}); +test('should fail to createFileUploadAndAddToQueue() when no file is present', (t) => { return Evaporate.create(baseConfig) .then(function (evaporate) { - evaporate.add({ name: 'test' }) + evaporate.createFileUploadAndAddToQueue({ name: 'test' }) .then(function () { t.fail('Evaporate added a new file but should not have.') }, @@ -342,10 +390,10 @@ test('should fail to add() when no file is present', (t) => { }) }) }); -test('should fail to add() when empty config is present', (t) => { +test('should fail to createFileUploadAndAddToQueue() when empty config is present', (t) => { return Evaporate.create(baseConfig) .then(function (evaporate) { - evaporate.add({}) + evaporate.createFileUploadAndAddToQueue({}) .then(function () { t.fail('Evaporate added a new file but should not have.') }, @@ -354,10 +402,10 @@ test('should fail to add() when empty config is present', (t) => { }) }) }); -test('should fail to add() when no config is present', (t) => { +test('should fail to createFileUploadAndAddToQueue() when no config is present', (t) => { return Evaporate.create(baseConfig) .then(function (evaporate) { - evaporate.add() + evaporate.createFileUploadAndAddToQueue() .then(function () { t.fail('Evaporate added a new file but should not have.') }, @@ -369,7 +417,7 @@ test('should fail to add() when no config is present', (t) => { test('should require a name if file is present', (t) => { return Evaporate.create(baseConfig) .then(function (evaporate) { - evaporate.add({ + evaporate.createFileUploadAndAddToQueue({ file: new File({ path: '/tmp/file', size: 50000 @@ -386,7 +434,7 @@ test('should require a name if file is present', (t) => { test('should respect maxFileSize', (t) => { return Evaporate.create(Object.assign({}, baseConfig, {maxFileSize: 10})) .then(function (evaporate) { - evaporate.add({ + evaporate.createFileUploadAndAddToQueue({ file: new File({ path: '/tmp/file', size: 50000