Skip to content

Commit

Permalink
add customDuration option for consume, penalty and reward
Browse files Browse the repository at this point in the history
  • Loading branch information
animirr committed Apr 4, 2019
1 parent 11a8ec3 commit 9c71074
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 36 deletions.
6 changes: 6 additions & 0 deletions lib/RateLimiterAbstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ module.exports = class RateLimiterAbstract {
this._keyPrefix = value;
}

_getKeySecDuration(options = {}) {
return options && options.customDuration
? options.customDuration
: this.duration;
}

getKey(key) {
return this.keyPrefix.length > 0 ? `${this.keyPrefix}:${key}` : key;
}
Expand Down
39 changes: 20 additions & 19 deletions lib/RateLimiterCluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const workerWaitInit = function (payload) {
}, 30);
};

const workerSendToMaster = function (func, promiseId, key, arg) {
const workerSendToMaster = function (func, promiseId, key, arg, opts) {
const payload = {
channel,
keyPrefix: this.keyPrefix,
Expand All @@ -69,6 +69,7 @@ const workerSendToMaster = function (func, promiseId, key, arg) {
data: {
key,
arg,
opts,
},
};

Expand All @@ -89,22 +90,22 @@ const masterProcessMsg = function (worker, msg) {

switch (msg.func) {
case 'consume':
promise = this._rateLimiters[msg.keyPrefix].consume(msg.data.key, msg.data.arg);
promise = this._rateLimiters[msg.keyPrefix].consume(msg.data.key, msg.data.arg, msg.data.opts);
break;
case 'penalty':
promise = this._rateLimiters[msg.keyPrefix].penalty(msg.data.key, msg.data.arg);
promise = this._rateLimiters[msg.keyPrefix].penalty(msg.data.key, msg.data.arg, msg.data.opts);
break;
case 'reward':
promise = this._rateLimiters[msg.keyPrefix].reward(msg.data.key, msg.data.arg);
promise = this._rateLimiters[msg.keyPrefix].reward(msg.data.key, msg.data.arg, msg.data.opts);
break;
case 'block':
promise = this._rateLimiters[msg.keyPrefix].block(msg.data.key, msg.data.arg);
promise = this._rateLimiters[msg.keyPrefix].block(msg.data.key, msg.data.arg, msg.data.opts);
break;
case 'get':
promise = this._rateLimiters[msg.keyPrefix].get(msg.data.key);
promise = this._rateLimiters[msg.keyPrefix].get(msg.data.key, msg.data.opts);
break;
case 'delete':
promise = this._rateLimiters[msg.keyPrefix].delete(msg.data.key);
promise = this._rateLimiters[msg.keyPrefix].delete(msg.data.key, msg.data.opts);
break;
default:
return false;
Expand Down Expand Up @@ -308,51 +309,51 @@ class RateLimiterClusterWorker extends RateLimiterAbstract {
this._promises = {};
}

consume(key, pointsToConsume = 1) {
consume(key, pointsToConsume = 1, options = {}) {
return new Promise((resolve, reject) => {
const promiseId = savePromise.call(this, resolve, reject);

workerSendToMaster.call(this, 'consume', promiseId, key, pointsToConsume);
workerSendToMaster.call(this, 'consume', promiseId, key, pointsToConsume, options);
});
}

penalty(key, points = 1) {
penalty(key, points = 1, options = {}) {
return new Promise((resolve, reject) => {
const promiseId = savePromise.call(this, resolve, reject);

workerSendToMaster.call(this, 'penalty', promiseId, key, points);
workerSendToMaster.call(this, 'penalty', promiseId, key, points, options);
});
}

reward(key, points = 1) {
reward(key, points = 1, options = {}) {
return new Promise((resolve, reject) => {
const promiseId = savePromise.call(this, resolve, reject);

workerSendToMaster.call(this, 'reward', promiseId, key, points);
workerSendToMaster.call(this, 'reward', promiseId, key, points, options);
});
}

block(key, secDuration) {
block(key, secDuration, options = {}) {
return new Promise((resolve, reject) => {
const promiseId = savePromise.call(this, resolve, reject);

workerSendToMaster.call(this, 'block', promiseId, key, secDuration);
workerSendToMaster.call(this, 'block', promiseId, key, secDuration, options);
});
}

get(key) {
get(key, options = {}) {
return new Promise((resolve, reject) => {
const promiseId = savePromise.call(this, resolve, reject);

workerSendToMaster.call(this, 'get', promiseId, key);
workerSendToMaster.call(this, 'get', promiseId, key, options);
});
}

delete(key) {
delete(key, options = {}) {
return new Promise((resolve, reject) => {
const promiseId = savePromise.call(this, resolve, reject);

workerSendToMaster.call(this, 'delete', promiseId, key);
workerSendToMaster.call(this, 'delete', promiseId, key, options);
});
}
}
Expand Down
16 changes: 10 additions & 6 deletions lib/RateLimiterMemory.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ class RateLimiterMemory extends RateLimiterAbstract {
*
* @param key
* @param pointsToConsume
* @param {Object} options
* @returns {Promise<RateLimiterRes>}
*/
consume(key, pointsToConsume = 1) {
consume(key, pointsToConsume = 1, options = {}) {
return new Promise((resolve, reject) => {
const rlKey = this.getKey(key);
let res = this._memoryStorage.incrby(rlKey, pointsToConsume, this.duration);
const secDuration = this._getKeySecDuration(options);
let res = this._memoryStorage.incrby(rlKey, pointsToConsume, secDuration);
res.remainingPoints = Math.max(this.points - res.consumedPoints, 0);

if (res.consumedPoints > this.points) {
Expand All @@ -41,19 +43,21 @@ class RateLimiterMemory extends RateLimiterAbstract {
});
}

penalty(key, points = 1) {
penalty(key, points = 1, options = {}) {
const rlKey = this.getKey(key);
return new Promise((resolve) => {
const res = this._memoryStorage.incrby(rlKey, points, this.duration);
const secDuration = this._getKeySecDuration(options);
const res = this._memoryStorage.incrby(rlKey, points, secDuration);
res.remainingPoints = this.points - res.consumedPoints;
resolve(res);
});
}

reward(key, points = 1) {
reward(key, points = 1, options = {}) {
const rlKey = this.getKey(key);
return new Promise((resolve) => {
const res = this._memoryStorage.incrby(rlKey, -points, this.duration);
const secDuration = this._getKeySecDuration(options);
const res = this._memoryStorage.incrby(rlKey, -points, secDuration);
res.remainingPoints = this.points - res.consumedPoints;
resolve(res);
});
Expand Down
22 changes: 11 additions & 11 deletions lib/RateLimiterStoreAbstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ module.exports = class RateLimiterStoreAbstract extends RateLimiterAbstract {
}
}

_handleError(err, funcName, resolve, reject, key, data = false) {
_handleError(err, funcName, resolve, reject, key, data = false, options = {}) {
if (!(this.insuranceLimiter instanceof RateLimiterAbstract)) {
reject(err);
} else {
this.insuranceLimiter[funcName](key, data)
this.insuranceLimiter[funcName](key, data, options)
.then((res) => {
resolve(res);
})
Expand Down Expand Up @@ -175,12 +175,12 @@ module.exports = class RateLimiterStoreAbstract extends RateLimiterAbstract {
return reject(new RateLimiterRes(0, inmemoryBlockMsBeforeExpire));
}

this._upsert(rlKey, pointsToConsume, this.msDuration, false, options)
this._upsert(rlKey, pointsToConsume, this._getKeySecDuration(options) * 1000, false, options)
.then((res) => {
this._afterConsume(resolve, reject, rlKey, pointsToConsume, res);
})
.catch((err) => {
this._handleError(err, 'consume', resolve, reject, key, pointsToConsume);
this._handleError(err, 'consume', resolve, reject, key, pointsToConsume, options);
});
});
}
Expand All @@ -195,12 +195,12 @@ module.exports = class RateLimiterStoreAbstract extends RateLimiterAbstract {
penalty(key, points = 1, options = {}) {
const rlKey = this.getKey(key);
return new Promise((resolve, reject) => {
this._upsert(rlKey, points, this.msDuration, false, options)
this._upsert(rlKey, points, this._getKeySecDuration(options) * 1000, false, options)
.then((res) => {
resolve(this._getRateLimiterRes(rlKey, points, res));
})
.catch((err) => {
this._handleError(err, 'penalty', resolve, reject, key, points);
this._handleError(err, 'penalty', resolve, reject, key, points, options);
});
});
}
Expand All @@ -215,12 +215,12 @@ module.exports = class RateLimiterStoreAbstract extends RateLimiterAbstract {
reward(key, points = 1, options = {}) {
const rlKey = this.getKey(key);
return new Promise((resolve, reject) => {
this._upsert(rlKey, -points, this.msDuration, false, options)
this._upsert(rlKey, -points, this._getKeySecDuration(options) * 1000, false, options)
.then((res) => {
resolve(this._getRateLimiterRes(rlKey, -points, res));
})
.catch((err) => {
this._handleError(err, 'reward', resolve, reject, key, points);
this._handleError(err, 'reward', resolve, reject, key, points, options);
});
});
}
Expand All @@ -243,7 +243,7 @@ module.exports = class RateLimiterStoreAbstract extends RateLimiterAbstract {
}
})
.catch((err) => {
this._handleError(err, 'get', resolve, reject, key);
this._handleError(err, 'get', resolve, reject, key, options);
});
});
}
Expand All @@ -262,7 +262,7 @@ module.exports = class RateLimiterStoreAbstract extends RateLimiterAbstract {
resolve(res);
})
.catch((err) => {
this._handleError(err, 'delete', resolve, reject, key);
this._handleError(err, 'delete', resolve, reject, key, options);
});
});
}
Expand Down Expand Up @@ -297,7 +297,7 @@ module.exports = class RateLimiterStoreAbstract extends RateLimiterAbstract {
resolve(new RateLimiterRes(0, msDuration, initPoints));
})
.catch((err) => {
this._handleError(err, 'block', resolve, reject, this.parseKey(rlKey), msDuration / 1000);
this._handleError(err, 'block', resolve, reject, this.parseKey(rlKey), msDuration / 1000, options);
});
});
}
Expand Down
13 changes: 13 additions & 0 deletions test/RateLimiterCluster.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -200,5 +200,18 @@ describe('RateLimiterCluster', function () {
})
});
});

it('consume applies options.customDuration to set expire', (done) => {
const key = 'consume.customDuration';
const rateLimiterCluster = new RateLimiterCluster({points: 2, duration: 5, keyPrefix: key});
rateLimiterCluster.consume(key, 1, {customDuration: 1})
.then((res) => {
expect(res.msBeforeNext <= 1000).to.be.true;
done();
})
.catch((rej) => {
done(rej);
});
});
});

39 changes: 39 additions & 0 deletions test/RateLimiterMemory.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,43 @@ describe('RateLimiterMemory with fixed window', function () {
done(Error('must not reject'));
});
});

it('consume applies options.customDuration to set expire', (done) => {
const testKey = 'options.customDuration';
const rateLimiterMemory = new RateLimiterMemory({ points: 1, duration: 5 });
rateLimiterMemory.consume(testKey, 1, {customDuration: 1})
.then((res) => {
expect(res.msBeforeNext <= 1000).to.be.true;
done();
})
.catch(() => {
done(Error('must not reject'));
});
});

it('penalty applies options.customDuration to set expire', (done) => {
const testKey = 'options.customDuration';
const rateLimiterMemory = new RateLimiterMemory({ points: 1, duration: 5 });
rateLimiterMemory.penalty(testKey, 1, {customDuration: 1})
.then((res) => {
expect(res.msBeforeNext <= 1000).to.be.true;
done();
})
.catch(() => {
done(Error('must not reject'));
});
});

it('reward applies options.customDuration to set expire', (done) => {
const testKey = 'options.customDuration';
const rateLimiterMemory = new RateLimiterMemory({ points: 1, duration: 5 });
rateLimiterMemory.reward(testKey, 1, {customDuration: 1})
.then((res) => {
expect(res.msBeforeNext <= 1000).to.be.true;
done();
})
.catch(() => {
done(Error('must not reject'));
});
});
});
62 changes: 62 additions & 0 deletions test/RateLimiterRedis.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -612,4 +612,66 @@ describe('RateLimiterRedis with fixed window', function() {
rateLimiter.delete(testKey)
.catch(() => done())
});

it('consume applies options.customDuration to set expire', (done) => {
const testKey = 'consume.customDuration';
const rateLimiter = new RateLimiterRedis({
storeClient: redisMockClient,
points: 2,
duration: 5,
});
rateLimiter
.consume(testKey, 1, {customDuration: 1})
.then((res) => {
expect(res.msBeforeNext <= 1000).to.be.true;
done();
})
.catch((err) => {
done(err);
});
});

it('insurance limiter on error consume applies options.customDuration to set expire', (done) => {
const testKey = 'consume.customDuration';
const rateLimiter = new RateLimiterRedis({
storeClient: redisMockClient,
points: 2,
duration: 5,
});
rateLimiter
.consume(testKey, 1, {customDuration: 1})
.then((res) => {
expect(res.msBeforeNext <= 1000).to.be.true;
done();
})
.catch((err) => {
done(err);
});
});

it('insurance limiter on error consume applies options.customDuration to set expire', (done) => {
const testKey = 'consume.customDuration.onerror';

const rateLimiter = new RateLimiterRedis({
storeClient: redisClientClosed,
points: 1,
duration: 2,
insuranceLimiter: new RateLimiterRedis({
points: 2,
duration: 3,
storeClient: redisMockClient,
}),
});

// Consume from insurance limiter with different options
rateLimiter
.consume(testKey, 1, {customDuration: 1})
.then((res) => {
expect(res.remainingPoints === 1 && res.msBeforeNext <= 1000).to.equal(true);
done();
})
.catch((rejRes) => {
done(rejRes);
});
});
});

0 comments on commit 9c71074

Please sign in to comment.