diff --git a/lib/implementations/batch/batch.js b/lib/implementations/batch/batch.js
index 7004c63..b356b64 100644
--- a/lib/implementations/batch/batch.js
+++ b/lib/implementations/batch/batch.js
@@ -11,7 +11,7 @@ module.exports = class Batch extends Builder.Flow {
         if (new.target === Batch) {
             throw new Error('Cannot construct Batch instances directly');
         }
-        this._storeOptions = _.pick(options, ["number", "groupBy", "attributes", "timeout", "type", "path"]);
+        this._storeOptions = _.pick(options, ["number", "groupBy", "attributes", "timeout", "type", "path", "timestamp"]);
         this._lambda = lambda;
         this._timeout = options.timeout;
         this._aggrigator = aggrigator
diff --git a/lib/storage/inmemory.js b/lib/storage/inmemory.js
index bcd11d0..61269c0 100644
--- a/lib/storage/inmemory.js
+++ b/lib/storage/inmemory.js
@@ -18,6 +18,7 @@ module.exports = class Inmemory {
         this._tupels = new Map();
         this._events = new Events();
         this._buckts = [[]];
+        this._timestamp = options.timestamp;
         this._setTimer();
     }
 
@@ -104,16 +105,23 @@ module.exports = class Inmemory {
             console.error(`can't store store inactive`);
         } else {
             try {
+                let time;
+                if (!!this._timestamp) {
+                    time = obj.data[this._timestamp];
+                } else {
+                    time = new Date().getTime();
+                }
                 const id = this._incriment();
+                obj['_timestamp'] = time;
                 this._tupels.set(id, obj);
-                this._addToBucket(id, obj.data);
+                this._addToBucket(id, obj.data, time);
             } catch (err) {
                 this._flushBucket(obj.error);
             }
         }
     }
 
-    _addToBucket(id, data) {
+    _addToBucket(id, data, time) {
         let viewObj;
         if (!!this._attributes) {
             viewObj = _.pick(data, this._attributes);
@@ -122,6 +130,7 @@ module.exports = class Inmemory {
         }
         viewObj['id'] = id;
         viewObj['groups'] = this._groupBy;
+        viewObj['_timestamp'] = time;
         this._currentBucket().push(viewObj);
         if (this._currentBucket().length >= this._maxBucketSize) {
             this._moveBucket();
diff --git a/lib/storage/leveldb.js b/lib/storage/leveldb.js
index 52fcdc3..7b6dc7c 100644
--- a/lib/storage/leveldb.js
+++ b/lib/storage/leveldb.js
@@ -22,6 +22,7 @@ module.exports = class LevelDb {
         this._timerRef;
         this._active = true;
         this._error = null;
+        this._timestamp = options.timestamp;
         this.db = level(getDbPath(this._path), leveDBoptions);
         if (this._maxBucketSize > maxRecordSize / 2) {
             throw `Bucket size cannot be larger than ${maxRecordSize / 2}`;
@@ -136,16 +137,23 @@ module.exports = class LevelDb {
             console.error(`can't store store inactive`);
         } else {
             try {
+                let time;
+                if (!!this._timestamp) {
+                    time = obj.data[this._timestamp];
+                } else {
+                    time = new Date().getTime();
+                }
                 const id = this._incriment();
+                obj['_timestamp'] = time;
                 this.db.put(id, obj);
-                this._addToBucket(id, obj.data); // promise bnana hai kya
+                this._addToBucket(id, obj.data, time); // TODO: should this return a promise?
             } catch (err) {
                 this._flushBucket(error);
             }
         }
     }
 
-    _addToBucket(id, data) {
+    _addToBucket(id, data, time) {
         let viewObj;
         let moveBucket;
         let bucktRefs;
@@ -157,6 +165,7 @@ module.exports = class LevelDb {
         }
         viewObj['id'] = id;
         viewObj['groups'] = this._groupBy;
+        viewObj['_timestamp'] = time;
         this._currentBucket()
             .then(buckt => {
                 bucktId = buckt;
diff --git a/sample/basic-app.js b/sample/basic-app.js
index c12727e..ca7f77c 100644
--- a/sample/basic-app.js
+++ b/sample/basic-app.js
@@ -25,24 +25,47 @@ const countries = ['japan', 'barmuda'];
 // .sink(Kafka.producer({ topic: 'log' }));
 
 // pipeline.source(Stream.consumer({ name: 'process' }))
-//     .flow((data, err, next) => {
-//         let num = parseInt(data.num);
-//         Object.assign(data, { num: num + 1 , from : countries[Math.floor(Math.random() * country.length)]});
-//         Object.assign(data, { to : countries[Math.floor(Math.random() * country.length)]});
-//         // throw new Error('Kaka punjabi');
-//         next(data, err);
-//     })
-//     .flow(Batch.reduce({ number: 5, timeout: 30000, groupBy: "from", attributes: ["num", "to"]},
-//         (aggtr ,data) => {
-//             let num = parseInt(data.num);
-//             aggtr.number += num;
-//             return aggtr;
-//         }, { number:0}))
-//     .sink((data, err, next) => {
-//         console.log("\n\n Reduced: \n", JSON.stringify(data, null, 3));
-//         next(data, err);
-//     });
+//     .flow((data, err, next) => {
+//         let num = parseInt(data.num);
+//         Object.assign(data, { num: num + 1, from: countries[Math.floor(Math.random() * countries.length)] });
+//         Object.assign(data, { to: countries[Math.floor(Math.random() * countries.length)] });
+//         // throw new Error('Kaka punjabi');
+//         next(data, err);
+//     })
+//     .flow(Batch.reduce({ timeout: 5000, groupBy: "from", type: "leveldb", path: "/Users/vaibhav/workspace/xbl/willa", attributes: ["num", "to"] },
+//         (aggtr, data) => {
+//             let num = parseInt(data.num);
+//             aggtr.number += num;
+//             return aggtr;
+//         }, { number: 0 }))
+//     .sink((data, err, next) => {
+//         console.log("\n\n Reduced: \n", JSON.stringify(data, null, 3));
+//         next(data, err);
+//     });
+
+// pipeline.source(Stream.consumer({ name: 'process-mapper' }))
+//     .flow((data, err, next) => {
+//         let num = parseInt(data.num);
+//         Object.assign(data, { num: num + 1, from: countries[Math.floor(Math.random() * countries.length)] });
+//         Object.assign(data, { to: countries[Math.floor(Math.random() * countries.length)] });
+//         // throw new Error('Kaka punjabi');
+//         next(data, err);
+//     })
+//     .flow(Batch.map({ number: 5, timeout: 30000, groupBy: ["to", "from"], attributes: ["num", "from"], type: "leveldb" },
+//         (data) => {
+//             return { "origination": data.from, "volume": data.num };
+//         })
+//     )
+//     .sink(async (data, err, next) => {
+//         console.log("\n\n Mapped: \n", JSON.stringify(data, null, 3));
+//         data.data.forEach(async element => {
+//             for await (const i of element.argdata) {
+//                 console.log(i)
+//             }
+//         });
+//         next(data, err);
+//     });
 
 pipeline.source(Stream.consumer({ name: 'process-mapper' }))
     .flow((data, err, next) => {
@@ -52,7 +75,7 @@ pipeline.source(Stream.consumer({ name: 'process-mapper' }))
         // throw new Error('Kaka punjabi');
         next(data, err);
     })
-    .flow(Batch.map({ number: 5, timeout: 30000, groupBy: ["to", "from"], attributes: ["num", "from"], type: "leveldb" },
+    .flow(Batch.map({ number: 5, timeout: 30000, groupBy: ["to", "from"], attributes: ["num", "from", "_timestamp"], timestamp: "time" },
         (data) => {
             return { "origination": data.from, "volume": data.num };
         })
@@ -67,6 +90,7 @@ pipeline.source(Stream.consumer({ name: 'process-mapper' }))
         next(data, err);
     });
 
+
 // pipeline.sourceCommitable(Kafka.consumer({ topic: 'log' }))
 //     .flow((data, err, next) => {
 //         next(data, err);
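
Usage note: the new `timestamp` option lets a caller pick which field of the incoming record is stored as `_timestamp`; when the option is absent, both storage backends fall back to the insertion time (`new Date().getTime()`). Below is a minimal sketch of how a pipeline might exercise this, assuming the same `pipeline`, `Stream`, and `Batch` wiring as sample/basic-app.js; the `time` field name is only an illustration and must match whatever the upstream flow actually writes onto each record.

    pipeline.source(Stream.consumer({ name: 'process-mapper' }))
        .flow((data, err, next) => {
            // Stamp each record with an event time; this field name has to
            // match the `timestamp` option passed to Batch.map below.
            data.time = Date.now();
            next(data, err);
        })
        .flow(Batch.map({ number: 5, timeout: 30000, groupBy: ["to", "from"], attributes: ["num", "from", "_timestamp"], timestamp: "time" },
            (data) => {
                return { "origination": data.from, "volume": data.num };
            })
        )
        .sink((data, err, next) => {
            // Each view object in the batch now carries the record's `_timestamp`.
            console.log(JSON.stringify(data, null, 3));
            next(data, err);
        });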