-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
timestamp added #42
base: leveldb
Are you sure you want to change the base?
timestamp added #42
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ module.exports = class LevelDb { | |
this._timerRef; | ||
this._active = true; | ||
this._error = null; | ||
this._timestamp = options.timestamp; | ||
this.db = level(getDbPath(this._path), leveDBoptions); | ||
if (this._maxBucketSize > maxRecordSize / 2) { | ||
throw `Bucket size cannot be larger than ${maxRecordSize / 2}`; | ||
|
@@ -136,16 +137,23 @@ module.exports = class LevelDb { | |
console.error(`can't store store inactive`); | ||
} else { | ||
try { | ||
let time; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. voluntarily capture both options.timesamp and message arrival timestamp and save them in the bucket. |
||
if (!!this._timestamp) { | ||
time = obj.data[this._timestamp]; | ||
} else { | ||
time = new Date().getTime(); | ||
} | ||
const id = this._incriment(); | ||
obj['_timestamp'] = time; | ||
this.db.put(id, obj); | ||
this._addToBucket(id, obj.data); // promise bnana hai kya | ||
this._addToBucket(id, obj.data, time); // promise bnana hai kya | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove the comment |
||
} catch (err) { | ||
this._flushBucket(error); | ||
} | ||
} | ||
} | ||
|
||
_addToBucket(id, data) { | ||
_addToBucket(id, data, time) { | ||
let viewObj; | ||
let moveBucket; | ||
let bucktRefs; | ||
|
@@ -157,6 +165,7 @@ module.exports = class LevelDb { | |
} | ||
viewObj['id'] = id; | ||
viewObj['groups'] = this._groupBy; | ||
viewObj['_timestamp'] = time; | ||
this._currentBucket() | ||
.then(buckt => { | ||
bucktId = buckt; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,24 +25,47 @@ const countries = ['japan', 'barmuda']; | |
// .sink(Kafka.producer({ topic: 'log' })); | ||
|
||
// pipeline.source(Stream.consumer({ name: 'process' })) | ||
// .flow((data, err, next) => { | ||
// let num = parseInt(data.num); | ||
// Object.assign(data, { num: num + 1 , from : countries[Math.floor(Math.random() * country.length)]}); | ||
// Object.assign(data, { to : countries[Math.floor(Math.random() * country.length)]}); | ||
// // throw new Error('Kaka punjabi'); | ||
// next(data, err); | ||
// }) | ||
// .flow(Batch.reduce({ number: 5, timeout: 30000, groupBy: "from", attributes: ["num", "to"]}, | ||
// (aggtr ,data) => { | ||
// .flow((data, err, next) => { | ||
// let num = parseInt(data.num); | ||
// aggtr.number += num; | ||
// return aggtr; | ||
// }, { number:0})) | ||
// .sink((data, err, next) => { | ||
// console.log("\n\n Reduced: \n", JSON.stringify(data, null, 3)); | ||
// next(data, err); | ||
// }); | ||
// Object.assign(data, { num: num + 1, from: countries[Math.floor(Math.random() * country.length)] }); | ||
// Object.assign(data, { to: countries[Math.floor(Math.random() * country.length)] }); | ||
// // throw new Error('Kaka punjabi'); | ||
// next(data, err); | ||
// }) | ||
// .flow(Batch.reduce({ timeout: 5000, groupBy: "from", type: "leveldb", path: "/Users/vaibhav/workspace/xbl/willa", attributes: ["num", "to"] }, | ||
// (aggtr, data) => { | ||
// let num = parseInt(data.num); | ||
// aggtr.number += num; | ||
// return aggtr; | ||
// }, { number: 0 })) | ||
// .sink((data, err, next) => { | ||
// console.log("\n\n Reduced: \n", JSON.stringify(data, null, 3)); | ||
// next(data, err); | ||
// }); | ||
|
||
|
||
// pipeline.source(Stream.consumer({ name: 'process-mapper' })) | ||
// .flow((data, err, next) => { | ||
// let num = parseInt(data.num); | ||
// Object.assign(data, { num: num + 1, from: country[Math.floor(Math.random() * country.length)] }); | ||
// Object.assign(data, { to: country[Math.floor(Math.random() * country.length)] }); | ||
// // throw new Error('Kaka punjabi'); | ||
// next(data, err); | ||
// }) | ||
// .flow(Batch.map({ number: 5, timeout: 30000, groupBy: ["to", "from"], attributes: ["num", "from"], type: "leveldb" }, | ||
// (data) => { | ||
// return { "origination": data.from, "volume": data.num }; | ||
// }) | ||
// ) | ||
// .sink(async (data, err, next) => { | ||
// console.log("\n\n Mapped: \n", JSON.stringify(data, null, 3)); | ||
// data.data.forEach(async element => { | ||
// for await (const i of element.argdata) { | ||
// console.log(i) | ||
// } | ||
// }); | ||
// next(data, err); | ||
// }); | ||
|
||
pipeline.source(Stream.consumer({ name: 'process-mapper' })) | ||
.flow((data, err, next) => { | ||
|
@@ -52,7 +75,7 @@ pipeline.source(Stream.consumer({ name: 'process-mapper' })) | |
// throw new Error('Kaka punjabi'); | ||
next(data, err); | ||
}) | ||
.flow(Batch.map({ number: 5, timeout: 30000, groupBy: ["to", "from"], attributes: ["num", "from"], type: "leveldb" }, | ||
.flow(Batch.map({ number: 5, timeout: 30000, groupBy: ["to", "from"], attributes: ["num", "from" , "_timestamp"], timestamp : "time" }, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. user will not be aware of _timestamp.
|
||
(data) => { | ||
return { "origination": data.from, "volume": data.num }; | ||
}) | ||
|
@@ -67,6 +90,7 @@ pipeline.source(Stream.consumer({ name: 'process-mapper' })) | |
next(data, err); | ||
}); | ||
|
||
|
||
// pipeline.sourceCommitable(Kafka.consumer({ topic: 'log' })) | ||
// .flow((data, err, next) => { | ||
// next(data, err); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
passed time stamp parameter should be treated similar to group by or attribute parameters