From a12d2b15e48bf62f48b0026a5bdcbcd9633b502f Mon Sep 17 00:00:00 2001 From: Mr Robot Date: Tue, 12 Jan 2016 15:51:47 +0100 Subject: [PATCH 1/3] Fixing issue https://github.com/CollectionFS/Meteor-CollectionFS/issues/731 and https://github.com/CollectionFS/Meteor-CollectionFS/issues/408 --- packages/tempstore/tempStore.js | 2 ++ packages/worker/fileWorker.js | 1 + 2 files changed, 3 insertions(+) diff --git a/packages/tempstore/tempStore.js b/packages/tempstore/tempStore.js index f7d4d5f0..eacb644d 100644 --- a/packages/tempstore/tempStore.js +++ b/packages/tempstore/tempStore.js @@ -299,6 +299,8 @@ FS.TempStore.createWriteStream = function(fileObj, options) { // Progress self.emit('progress', fileObj, chunkNum, chunkCount, chunkSum, result); + fileObj.update({ $set: {node_id: process.env.METEOR_PARENT_PID} }); + // If upload is completed if (chunkCount === chunkSum) { // We no longer need the chunk info diff --git a/packages/worker/fileWorker.js b/packages/worker/fileWorker.js index b21ff870..893c8243 100644 --- a/packages/worker/fileWorker.js +++ b/packages/worker/fileWorker.js @@ -84,6 +84,7 @@ function getReadyQuery(storeName) { var selector = {uploadedAt: {$exists: true}}; selector['copies.' + storeName] = null; selector['failures.copies.' + storeName + '.doneTrying'] = {$ne: true}; + selector['node_id'] = process.env.METEOR_PARENT_PID; return selector; } From 0706aa30eefdc2af8c0b5e7b7818d45e867ab429 Mon Sep 17 00:00:00 2001 From: Mr Robot Date: Fri, 22 Jan 2016 09:58:29 +0100 Subject: [PATCH 2/3] Use env var COLLECTIONFS_ENV_NAME_UNIQUE_ID to find the node instance unique identifier. Move removing file to finish event --- packages/tempstore/tempStore.js | 23 +++++++++++++++++++---- packages/worker/fileWorker.js | 11 +++++++++-- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/packages/tempstore/tempStore.js b/packages/tempstore/tempStore.js index eacb644d..adeab990 100644 --- a/packages/tempstore/tempStore.js +++ b/packages/tempstore/tempStore.js @@ -293,19 +293,28 @@ FS.TempStore.createWriteStream = function(fileObj, options) { setObj['keys.' + chunkNum] = result.fileKey; tracker.update(selector, {$set: setObj}); + + var temp = tracker.findOne(selector); + + if(!temp){ + FS.debug && console.log('NOT FOUND FROM TEMPSTORE => EXIT (REMOVED)'); + return; + } + // Get updated chunkCount var chunkCount = FS.Utility.size(tracker.findOne(selector).keys); // Progress self.emit('progress', fileObj, chunkNum, chunkCount, chunkSum, result); - fileObj.update({ $set: {node_id: process.env.METEOR_PARENT_PID} }); - // If upload is completed if (chunkCount === chunkSum) { // We no longer need the chunk info var modifier = { $set: {}, $unset: {chunkCount: 1, chunkSum: 1, chunkSize: 1} }; + if(!fileObj.instance_id) + modifier.$set.instance_id = process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID ? process.env[process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID] : process.env.METEOR_PARENT_PID; + // Check if the file has been uploaded before if (typeof fileObj.uploadedAt === 'undefined') { // We set the uploadedAt date @@ -326,8 +335,14 @@ FS.TempStore.createWriteStream = function(fileObj, options) { // XXX is emitting "ready" necessary? self.emit('ready', fileObj, chunkCount, result); } else { - // Update the chunkCount on the fileObject - fileObj.update({ $set: {chunkCount: chunkCount} }); + + var modifier = { $set: {}}; + if(!fileObj.instance_id) + modifier.$set.instance_id = process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID ? process.env[process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID] : process.env.METEOR_PARENT_PID; + + modifier.$set.chunkCount = chunkCount; + + fileObj.update(modifier); } }); diff --git a/packages/worker/fileWorker.js b/packages/worker/fileWorker.js index 893c8243..d401b84e 100644 --- a/packages/worker/fileWorker.js +++ b/packages/worker/fileWorker.js @@ -43,12 +43,14 @@ FS.FileWorker.observe = function(fsCollection) { // Initiate observe for finding files that have been stored so we can delete // any temp files + /* fsCollection.files.find(getDoneQuery(fsCollection.options.stores)).observe({ added: function(fsFile) { FS.debug && console.log("FileWorker ADDED - calling deleteChunks for", fsFile._id); FS.TempStore.removeFile(fsFile); } }); + */ // Initiate observe for catching files that have been removed and // removing the data from all stores as well @@ -84,7 +86,7 @@ function getReadyQuery(storeName) { var selector = {uploadedAt: {$exists: true}}; selector['copies.' + storeName] = null; selector['failures.copies.' + storeName + '.doneTrying'] = {$ne: true}; - selector['node_id'] = process.env.METEOR_PARENT_PID; + selector['instance_id'] = process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID ? process.env[process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID] : process.env.METEOR_PARENT_PID; return selector; } @@ -142,7 +144,7 @@ function getDoneQuery(stores) { tempCond['failures.copies.' + storeName + '.doneTrying'] = true; copyCond.$or.push(tempCond); selector.$and.push(copyCond); - }) + }); return selector; } @@ -173,6 +175,11 @@ function saveCopy(fsFile, storeName, options) { var writeStream = storage.adapter.createWriteStream(fsFile); var readStream = FS.TempStore.createReadStream(fsFile); + writeStream.on('finish',Meteor.bindEnvironment(function(){ + FS.debug && console.log('finish', fsFile._id); + FS.TempStore.removeFile(fsFile); + })); + // Pipe the temp data into the storage adapter readStream.pipe(writeStream); } From 39a7b687b55398ad589535037ffd7a52c2c33d1d Mon Sep 17 00:00:00 2001 From: Mr Robot Date: Fri, 22 Jan 2016 10:05:13 +0100 Subject: [PATCH 3/3] Correction : suppress 1 tracker query --- packages/tempstore/tempStore.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/tempstore/tempStore.js b/packages/tempstore/tempStore.js index adeab990..b372dd05 100644 --- a/packages/tempstore/tempStore.js +++ b/packages/tempstore/tempStore.js @@ -302,7 +302,7 @@ FS.TempStore.createWriteStream = function(fileObj, options) { } // Get updated chunkCount - var chunkCount = FS.Utility.size(tracker.findOne(selector).keys); + var chunkCount = FS.Utility.size(temp.keys); // Progress self.emit('progress', fileObj, chunkNum, chunkCount, chunkSum, result);