-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[AOPS-66] Add a Helper method and remove member from Set on raised error #2
base: master
Are you sure you want to change the base?
Changes from 5 commits
61569d0
9821ac6
500fba3
2e98171
db33d94
3aac507
a2f3906
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,3 +20,4 @@ tmp | |
*.o | ||
*.a | ||
mkmf.log | ||
vendor/* |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
inherit_from: .rubocop_todo.yml | ||
|
||
Style/StringLiterals: | ||
EnforcedStyle: "double_quotes" | ||
|
||
Metrics/BlockLength: | ||
Exclude: | ||
- "spec/**/*.rb" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,6 @@ | ||
source 'https://rubygems.org' | ||
# frozen_string_literal: true | ||
|
||
source "https://rubygems.org" | ||
|
||
# Specify your gem's dependencies in redis_dedupe.gemspec | ||
gemspec |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
require "bundler/gem_tasks" | ||
Dir.glob('tasks/**/*.rake').each(&method(:import)) | ||
# frozen_string_literal: true | ||
|
||
require "bundler/gem_tasks" | ||
Dir.glob("tasks/**/*.rake").each(&method(:import)) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,64 +1,4 @@ | ||
require 'redis_dedupe/version' | ||
# frozen_string_literal: true | ||
|
||
module RedisDedupe | ||
class<<self | ||
attr_accessor :client | ||
end | ||
|
||
class Set | ||
SEVEN_DAYS = 7 * 24 * 60 * 60 | ||
|
||
attr_reader :key, :expires_in | ||
|
||
def initialize(redis, key, expires_in = SEVEN_DAYS) | ||
@redis = redis | ||
@key = key | ||
@expires_in = expires_in | ||
end | ||
|
||
def check(member) | ||
results = redis.pipelined do | ||
redis.sadd(key, member) | ||
redis.expire(key, expires_in) | ||
end | ||
|
||
if results[0] | ||
yield | ||
end | ||
end | ||
|
||
def finish | ||
redis.del(key) | ||
end | ||
|
||
private | ||
|
||
def redis | ||
@redis | ||
end | ||
end | ||
|
||
module Helpers | ||
private | ||
|
||
def dedupe | ||
@dedupe ||= RedisDedupe::Set.new(RedisDedupe.client, [dedupe_namespace, dedupe_id].join(':')) | ||
end | ||
|
||
# Implement in class, should return an integer or string: | ||
# | ||
# Ex. | ||
# | ||
# def dedupe_id | ||
# @announcement.id # => 42 | ||
# end | ||
# | ||
def dedupe_id | ||
raise NotImplementedError | ||
end | ||
|
||
def dedupe_namespace | ||
self.class.name | ||
end | ||
end | ||
end | ||
require "redis_dedupe/set" | ||
require "redis_dedupe/helpers" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
# frozen_string_literal: true | ||
|
||
# :nodoc: | ||
module RedisDedupe | ||
# | ||
# Include `RedisDedupe::Helpers` to use +RedisDedupe::Set+ | ||
# | ||
# class MyClass | ||
# include RedisDedupe::Helpers | ||
# | ||
# private | ||
# | ||
# def dedupe_id | ||
# "my_unique_set_key" | ||
# end | ||
# end | ||
# | ||
module Helpers | ||
private | ||
|
||
def dedupe | ||
@dedupe ||= RedisDedupe::Set.new(RedisDedupe.client, key) | ||
end | ||
|
||
def key | ||
[dedupe_namespace, dedupe_id].join(":") | ||
end | ||
|
||
# Implement in class, should return an integer or string: | ||
# | ||
# Ex. | ||
# | ||
# def dedupe_id | ||
# @announcement.id # => 42 | ||
# end | ||
# | ||
def dedupe_id | ||
raise NotImplementedError | ||
end | ||
|
||
def dedupe_namespace | ||
self.class.name | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
# frozen_string_literal: true | ||
|
||
# :nodoc: | ||
module RedisDedupe | ||
class << self | ||
attr_accessor :client | ||
end | ||
|
||
# A mechanism to make sure that a block of code will only be called once for each specified identifier (or +member+) | ||
# even if the calling process dies or restarts, as long the datastore is +Redis+-backed. | ||
# | ||
# @example Keep the set around for 7 days, letting Redis handle its own memory cleanup after that time | ||
# ``` | ||
# dedupe = RedisDedupe::Set.new($redis, "send_payment_due_emails") | ||
# Account.all do |account| | ||
# dedupe.check(account.id) do | ||
# mail(to: account.billing_email, subject: "PAY US... NOW!!!") | ||
# end | ||
# end | ||
# ``` | ||
# | ||
# @example If you want to be able to repeat the process at any time immediately following this method | ||
bhcastle marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# ``` | ||
# dedupe = RedisDedupe::Set.new($redis, "send_welcome_emails") | ||
# Account.all.pluck(:email) do |email| | ||
# dedupe.check(email) { mail(to: email, subject: "Hello!") } | ||
# end | ||
# dedupe.finish | ||
# ``` | ||
# | ||
class Set | ||
SEVEN_DAYS = 7 * 24 * 60 * 60 | ||
|
||
attr_reader :key, :expires_in | ||
|
||
def initialize(redis, key, expires_in = SEVEN_DAYS) | ||
@redis = redis | ||
@key = key | ||
@expires_in = expires_in | ||
end | ||
|
||
# Ensures that a block of code will only be run if the +member+ is not already contained in Redis. | ||
# ie: the code block has not already run for the specified +member+. | ||
# | ||
# Note that if the given block raises an error, the +member+ will not remain in the +Set+ and may be tried again. | ||
# | ||
# @param [String, Integer] member identifiying value to make sure the given block only runs once | ||
# | ||
# @return `nil` if the block was not run, otherwise the result of the yielded block | ||
# | ||
def check(member) | ||
return unless execute_block_for_member?(member) | ||
|
||
# If it didn't get processed... we can retry it again | ||
# Unless it it's an error that's ALWAYS going to occur, in which case we might not want to do this. | ||
# Should we track/handle this differently? | ||
# But as long as the calling code doesn't error in the yielded block, all should be fine. | ||
begin | ||
yield | ||
rescue StandardError => e | ||
redis.del(key, member) | ||
bhcastle marked this conversation as resolved.
Show resolved
Hide resolved
|
||
raise e | ||
end | ||
end | ||
|
||
def finish | ||
redis.del(key) | ||
end | ||
|
||
# Retrieves the member in the set with the largest value. | ||
# | ||
# This will work on String and Integers, but really meant for Integer | ||
# If used for String, make sure it's really doing what you want and expect | ||
# | ||
# @example with Integers | ||
# redis.smembers("foo") => [1, 2, 3, 4, 5] | ||
# max_member => 5 | ||
# | ||
# @example with String | ||
# redis.smembers("foo") => ["abc", "xyz", "lmn"] | ||
# max_member => "xyz" | ||
# | ||
# @see Array#max | ||
# | ||
# @return [Integer, String] the member in the set with the largest value | ||
# | ||
def max_member | ||
redis.smembers(key).max | ||
end | ||
|
||
private | ||
|
||
attr_reader :redis | ||
|
||
def execute_block_for_member?(member) | ||
results = redis.pipelined do | ||
redis.sadd(key, member) | ||
redis.expire(key, expires_in) | ||
end | ||
|
||
block_given? && results[0] # `results` will be `[true]` or `[false]` | ||
bhcastle marked this conversation as resolved.
Show resolved
Hide resolved
|
||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,6 @@ | ||
# frozen_string_literal: true | ||
|
||
# :nodoc: | ||
module RedisDedupe | ||
VERSION = "0.0.3" | ||
VERSION = "0.0.4" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is one thing I was wondering about myself. Since the behavior is to re-run previously failed blocks, which could cause the loop to switch from "finishable" to "unfinishable" if the execution stoppage is repeatable, is it a major version bump perhaps? Seems like a potentially breaking change, but also, the behavior now does seem like a bug as well. Thoughts? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeh, I am no a semantic versioning expert but think this is a good question. The interface is the same which feels like a minor version change. The modification of the exception handling feels borderline major with the new potential for duplicate execution of some block segments depending on when an exception occurs in a block with side effects. You could separate the clean refactor as a new minor version and then bump us to 0.1.0 or something for the one change to resquing if you wanted to make it more clear. |
||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,24 +1,34 @@ | ||
# coding: utf-8 | ||
lib = File.expand_path('../lib', __FILE__) | ||
# frozen_string_literal: true | ||
|
||
lib = File.expand_path("lib", __dir__) | ||
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) | ||
require 'redis_dedupe/version' | ||
require "redis_dedupe/version" | ||
|
||
Gem::Specification.new do |spec| | ||
spec.name = "redis_dedupe" | ||
spec.version = RedisDedupe::VERSION | ||
spec.authors = ["Andy Huynh"] | ||
spec.email = ["andy4thehuynh@gmail.com"] | ||
spec.summary = %q{ A weak deduper to make things like bulk email run safer. } | ||
spec.description = %q{ This is a weak deduper to make things like bulk email run safer. It is not a lock safe for financial/security needs because it uses a weak redis locking pattern that can have race conditions. However, imagine a bulk email job that loops over 100 users, and enqueues a background email for each user. If the job fails at iteration 50, a retry would enqueue all the users again and many will receive dupes. This would continue multiple times as the parent job continued to rerun. By marking that a subjob has been enqueued, we can let that isolated job handle its own failures, and the batch enqueue job can run multiple times without re-enqueueing the same subjobs. } | ||
spec.homepage = "" | ||
spec.license = "MIT" | ||
|
||
spec.files = `git ls-files -z`.split("\x0") | ||
spec.executables = spec.files.grep(%r{^bin/}) { |f| File.basename(f) } | ||
spec.require_paths = ["lib"] | ||
spec.name = "redis_dedupe" | ||
spec.version = RedisDedupe::VERSION | ||
spec.required_ruby_version = ">= 2.6.0" | ||
spec.authors = ["Andy Huynh"] | ||
spec.email = ["andy4thehuynh@gmail.com"] | ||
spec.summary = "A weak deduper to make things like bulk email run safer." | ||
spec.homepage = "" | ||
spec.license = "MIT" | ||
spec.files = Dir["lib/**/*.rb"] + ["lib/redis_dedupe.rb"] | ||
spec.executables = spec.files.grep(%r{^bin/}) { |f| File.basename(f) } | ||
spec.require_paths = ["lib"] | ||
spec.description = <<~EO_DESC | ||
This is a weak deduper to make things like bulk email run safer. It is not a lock safe for financial/security needs | ||
because it uses a weak redis locking pattern that can have race conditions. However, imagine a bulk email job that | ||
loops over 100 users, and enqueues a background email for each user. If the job fails at iteration 50, a retry | ||
would enqueue all the users again and many will receive dupes. This would continue multiple times as the parent | ||
job continued to rerun. By marking that a subjob has been enqueued, we can let that isolated job handle its own | ||
failures, and the batch enqueue job can run multiple times without re-enqueueing the same subjobs. | ||
EO_DESC | ||
|
||
spec.add_development_dependency "bundler", "~> 1.6" | ||
spec.add_development_dependency "mock_redis" | ||
spec.add_development_dependency "rake" | ||
spec.add_development_dependency "rspec" | ||
spec.add_development_dependency "mock_redis" | ||
spec.add_development_dependency "rubocop" | ||
spec.add_development_dependency "timecop" | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
# frozen_string_literal: true | ||
|
||
require "mock_redis" | ||
require "spec_helper" | ||
|
||
require "redis_dedupe/helpers" | ||
|
||
RSpec.describe RedisDedupe::Helpers do | ||
let(:redis) { MockRedis.new } | ||
let(:instance) { RedisDedupeSpecStubbedClass.new } | ||
|
||
before do | ||
allow(RedisDedupe).to receive(:client).and_return(redis) | ||
end | ||
|
||
describe "#dedupe" do | ||
subject { instance.test_call } | ||
|
||
it { expect { subject }.to change(instance, :counter).from(0).to(2) } | ||
|
||
it "uses the correct redis key" do | ||
subject | ||
expect(redis.smembers("RedisDedupeSpecStubbedClass:just_a_test")).to match_array(%w[5 7]) | ||
end | ||
end | ||
end | ||
|
||
# :nodoc: | ||
class RedisDedupeSpecStubbedClass | ||
include RedisDedupe::Helpers | ||
|
||
attr_reader :counter | ||
|
||
def initialize | ||
@counter = 0 | ||
end | ||
|
||
def test_call | ||
dedupe.check(5) { @counter += 1 } | ||
dedupe.check(5) { @counter += 1 } | ||
dedupe.check(7) { @counter += 1 } | ||
end | ||
|
||
def dedupe_id | ||
"just_a_test" | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#curious/nonblocking -- I like the idea of decomposing the
RedisDedupe
module into aSet
and aHelpers
file. Did you consider also leaving this attr_accessor for the client in an otherwise emptymodule RedisDedupe
file? I found myself unsure of whereclient
would be found and thought it might be good below the require statements in that file. Totally non-blocking!