Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AOPS-66] Add a Helper method and remove member from Set on raised error #2

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ tmp
*.o
*.a
mkmf.log
vendor/*
8 changes: 8 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
inherit_from: .rubocop_todo.yml

Style/StringLiterals:
EnforcedStyle: "double_quotes"

Metrics/BlockLength:
Exclude:
- "spec/**/*.rb"
Empty file added .rubocop_todo.yml
Empty file.
4 changes: 3 additions & 1 deletion Gemfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
source 'https://rubygems.org'
# frozen_string_literal: true

source "https://rubygems.org"

# Specify your gem's dependencies in redis_dedupe.gemspec
gemspec
5 changes: 3 additions & 2 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require "bundler/gem_tasks"
Dir.glob('tasks/**/*.rake').each(&method(:import))
# frozen_string_literal: true

require "bundler/gem_tasks"
Dir.glob("tasks/**/*.rake").each(&method(:import))
66 changes: 3 additions & 63 deletions lib/redis_dedupe.rb
Original file line number Diff line number Diff line change
@@ -1,64 +1,4 @@
require 'redis_dedupe/version'
# frozen_string_literal: true

module RedisDedupe
class<<self
attr_accessor :client
end

class Set
SEVEN_DAYS = 7 * 24 * 60 * 60

attr_reader :key, :expires_in

def initialize(redis, key, expires_in = SEVEN_DAYS)
@redis = redis
@key = key
@expires_in = expires_in
end

def check(member)
results = redis.pipelined do
redis.sadd(key, member)
redis.expire(key, expires_in)
end

if results[0]
yield
end
end

def finish
redis.del(key)
end

private

def redis
@redis
end
end

module Helpers
private

def dedupe
@dedupe ||= RedisDedupe::Set.new(RedisDedupe.client, [dedupe_namespace, dedupe_id].join(':'))
end

# Implement in class, should return an integer or string:
#
# Ex.
#
# def dedupe_id
# @announcement.id # => 42
# end
#
def dedupe_id
raise NotImplementedError
end

def dedupe_namespace
self.class.name
end
end
end
require "redis_dedupe/set"
require "redis_dedupe/helpers"
45 changes: 45 additions & 0 deletions lib/redis_dedupe/helpers.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# frozen_string_literal: true

# :nodoc:
module RedisDedupe
#
# Include `RedisDedupe::Helpers` to use +RedisDedupe::Set+
#
# class MyClass
# include RedisDedupe::Helpers
#
# private
#
# def dedupe_id
# "my_unique_set_key"
# end
# end
#
module Helpers
private

def dedupe
@dedupe ||= RedisDedupe::Set.new(RedisDedupe.client, key)
end

def key
[dedupe_namespace, dedupe_id].join(":")
end

# Implement in class, should return an integer or string:
#
# Ex.
#
# def dedupe_id
# @announcement.id # => 42
# end
#
def dedupe_id
raise NotImplementedError
end

def dedupe_namespace
self.class.name
end
end
end
103 changes: 103 additions & 0 deletions lib/redis_dedupe/set.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# frozen_string_literal: true

# :nodoc:
module RedisDedupe
class << self
attr_accessor :client
end
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#curious/nonblocking -- I like the idea of decomposing the RedisDedupe module into a Set and a Helpers file. Did you consider also leaving this attr_accessor for the client in an otherwise empty module RedisDedupe file? I found myself unsure of where client would be found and thought it might be good below the require statements in that file. Totally non-blocking!


# A mechanism to make sure that a block of code will only be called once for each specified identifier (or +member+)
# even if the calling process dies or restarts, as long the datastore is +Redis+-backed.
#
# @example Keep the set around for 7 days, letting Redis handle its own memory cleanup after that time
# ```
# dedupe = RedisDedupe::Set.new($redis, "send_payment_due_emails")
# Account.all do |account|
# dedupe.check(account.id) do
# mail(to: account.billing_email, subject: "PAY US... NOW!!!")
# end
# end
# ```
#
# @example If you want to be able to repeat the process at any time immediately following this method
bhcastle marked this conversation as resolved.
Show resolved Hide resolved
# ```
# dedupe = RedisDedupe::Set.new($redis, "send_welcome_emails")
# Account.all.pluck(:email) do |email|
# dedupe.check(email) { mail(to: email, subject: "Hello!") }
# end
# dedupe.finish
# ```
#
class Set
SEVEN_DAYS = 7 * 24 * 60 * 60

attr_reader :key, :expires_in

def initialize(redis, key, expires_in = SEVEN_DAYS)
@redis = redis
@key = key
@expires_in = expires_in
end

# Ensures that a block of code will only be run if the +member+ is not already contained in Redis.
# ie: the code block has not already run for the specified +member+.
#
# Note that if the given block raises an error, the +member+ will not remain in the +Set+ and may be tried again.
#
# @param [String, Integer] member identifiying value to make sure the given block only runs once
#
# @yield block to run for the specified +member+, which should only be run once for any particular member
#
# @return `nil` if the block was not run, otherwise the result of the yielded block
#
def check(member, &block)
raise ArgumentError, "passing a block is required" if block.nil?
return nil unless execute_block_for_member?(member)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#nonblocking -- I like the shift of this to a guard clause since it clarifies the purpose of the check method: execute the block unless the block is a duplicate. I always get a little nervous when a ? method has side effects (setting the key and updating its expiration). Still, I think setting a key and checking the result is a well-established way to use Redis to answer global application state questions like this.

I wonder if a positive version that also trivially removes the unnecessary nil would ready better? Private methods that isolate the add and expire logic from the boolean might also clarify things.

Suggested change
return nil unless execute_block_for_member?(member)
return if duplicate_request?(member)

And the supporting methods might look like this:

# @param member [String, Integer]  member identifying value to make sure the
#   given block only runs once
# @return [Boolean] when a member cannot be added to the set, it is considered a duplicate
def duplicate_request?(member)
  (added_to_set, _expiration_updated) = add_key_and_update_ttl(member)
  !added_to_set
end

# @param member [String, Integer]  member identifying value to make sure the
#   given block only runs once
# @return [Array<Boolean>] a list of pipeline results, the first element of which
#   will be true or false depending on whether or not the add operation was
#   successful. If false, the requested member should be considered a duplicate.
def add_key_and_update_ttl(member)
  redis.pipelined do
    redis.sadd(key, member)
    redis.expire(key, expires_in)
  end
end

Totally optional and maybe not even an improvement. I personally needed to think pretty hard about why "unless execute_block_for_member?(member)" should cause the block not to run, as well as look up why results[0] was a boolean indicating the sadd succeeded or failed. Might just be my rusty Redis brain.


begin
block.call
rescue StandardError => e
redis.srem(key, member)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is the change I'm still wrapping my head around. I think the behavior looks right compared to what we have (in fact, what we have I'd say is probably a bug).

I think the biggest trail my brain is going down is, could this have a negative impact on any existing app flows? I'm thinking for instance, if we run a large execution of a collection of records having to do with mailing, and there's a failure partway through the batch that is repeatable, that execution job would never be able to finish the collection. Currently, that becomes a "skip" and the collection can finish (this has its own problems, what if we are skipping something really important). As a side note, another mechanism that comes to mind is some type of dead-letter queue.

I'm thinking one thing worth it would be to run the change by some teams with big collection/batch processes that rely on dedupe. Manage and Messaging teams come to mind. I'm interested in getting their thoughts on the behavioral change as it could impact their domains. /cc @cjbuchmann @thunderd0m3 @aburka @cforcey

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very interesting proposed change. I agree that what we have is essentially a try_once_and_skip_on_exception sort of functionality from this class which might come as a surprise to some consumers of the gem. Your version feels closer to execute_successfully_once functionality that allows for transient errors sending an email to be recovered from by clearing the key from the set.

I am guessing your nervousness -- and mine -- comes from the wide-open nature of the block being wrapped by this helper. If it is a pipeline has several steps each with side effects, then an exception on a later step might trigger a job retry duplicating side effects up to that point. This feels a little far-fetched if what is being wrapped is a single invocation of a mailer. but some of our usages look a bit more complicated.

The API for this class is already a little odd -- RedisDedupe::Set#check does not roll off the tongue as a well-understood guarantee like run_only_once or run_successfully_only_once might. It might be total overkill, but you could offer an additional method in the current naming scheme like check_finished that wraps the check method in your rescue block.

That would allow consumers of this library to decide whether it is more important to never run this trice -- exceptionally or successfully -- or to tolerate a few retries because it is important that at least one (but perhaps more) succeed. The downside of that, of course, is that the new better behavior does not magically get applied in most of the cases where you would want it to. You could also make your method check and have it wrap check_if_duplicate_or_try_once or some such method. If someone really cared about the old behavior they could use that method (horribly named).

My last thought is that one RedisDedup is unlikely to handle all the different use cases for run_once functionality -- marketing messages already use email_deliveries to try to sort out what was sent in a first attempt and what should be retried. Our rewrites of the transactional email system will most likely involve a more nuanced record like email_delivery for those messages that track the state of a delivery attempt and use that state to prevent the dreaded multiple emails problem. Your PR inspired this small problem statement along those lines -- feedback is welcome! https://kajabi.atlassian.net/wiki/spaces/MAR/pages/1234141598/Duplicate+Email+Prevention+in+Transactional+Email

raise e
end
end

def finish
redis.del(key)
end

# Retrieves the member in the set with the largest value.
#
# This will work on String and Integers, but really meant for Integer
# If used for String, make sure it's really doing what you want and expect
#
# @example with Integers
# redis.smembers("foo") => [1, 2, 3, 4, 5]
# max_member => 5
#
# @example with String
# redis.smembers("foo") => ["abc", "xyz", "lmn"]
# max_member => "xyz"
#
# @see Array#max
#
# @return [Integer, String] the member in the set with the largest value
#
def max_member
redis.smembers(key).max
end

private

attr_reader :redis

def execute_block_for_member?(member)
results = redis.pipelined do
redis.sadd(key, member)
redis.expire(key, expires_in)
end

results[0] # `results` will be `[true]` or `[false]`
end
end
end
5 changes: 4 additions & 1 deletion lib/redis_dedupe/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# frozen_string_literal: true

# :nodoc:
module RedisDedupe
VERSION = "0.0.3"
VERSION = "0.0.4"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one thing I was wondering about myself. Since the behavior is to re-run previously failed blocks, which could cause the loop to switch from "finishable" to "unfinishable" if the execution stoppage is repeatable, is it a major version bump perhaps? Seems like a potentially breaking change, but also, the behavior now does seem like a bug as well. Thoughts?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeh, I am no a semantic versioning expert but think this is a good question. The interface is the same which feels like a minor version change. The modification of the exception handling feels borderline major with the new potential for duplicate execution of some block segments depending on when an exception occurs in a block with side effects. You could separate the clean refactor as a new minor version and then bump us to 0.1.0 or something for the one change to resquing if you wanted to make it more clear.

end
42 changes: 26 additions & 16 deletions redis_dedupe.gemspec
Original file line number Diff line number Diff line change
@@ -1,24 +1,34 @@
# coding: utf-8
lib = File.expand_path('../lib', __FILE__)
# frozen_string_literal: true

lib = File.expand_path("lib", __dir__)
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
require 'redis_dedupe/version'
require "redis_dedupe/version"

Gem::Specification.new do |spec|
spec.name = "redis_dedupe"
spec.version = RedisDedupe::VERSION
spec.authors = ["Andy Huynh"]
spec.email = ["andy4thehuynh@gmail.com"]
spec.summary = %q{ A weak deduper to make things like bulk email run safer. }
spec.description = %q{ This is a weak deduper to make things like bulk email run safer. It is not a lock safe for financial/security needs because it uses a weak redis locking pattern that can have race conditions. However, imagine a bulk email job that loops over 100 users, and enqueues a background email for each user. If the job fails at iteration 50, a retry would enqueue all the users again and many will receive dupes. This would continue multiple times as the parent job continued to rerun. By marking that a subjob has been enqueued, we can let that isolated job handle its own failures, and the batch enqueue job can run multiple times without re-enqueueing the same subjobs. }
spec.homepage = ""
spec.license = "MIT"

spec.files = `git ls-files -z`.split("\x0")
spec.executables = spec.files.grep(%r{^bin/}) { |f| File.basename(f) }
spec.require_paths = ["lib"]
spec.name = "redis_dedupe"
spec.version = RedisDedupe::VERSION
spec.required_ruby_version = ">= 2.6.0"
spec.authors = ["Andy Huynh"]
spec.email = ["andy4thehuynh@gmail.com"]
spec.summary = "A weak deduper to make things like bulk email run safer."
spec.homepage = ""
spec.license = "MIT"
spec.files = Dir["lib/**/*.rb"] + ["lib/redis_dedupe.rb"]
spec.executables = spec.files.grep(%r{^bin/}) { |f| File.basename(f) }
spec.require_paths = ["lib"]
spec.description = <<~EO_DESC
This is a weak deduper to make things like bulk email run safer. It is not a lock safe for financial/security needs
because it uses a weak redis locking pattern that can have race conditions. However, imagine a bulk email job that
loops over 100 users, and enqueues a background email for each user. If the job fails at iteration 50, a retry
would enqueue all the users again and many will receive dupes. This would continue multiple times as the parent
job continued to rerun. By marking that a subjob has been enqueued, we can let that isolated job handle its own
failures, and the batch enqueue job can run multiple times without re-enqueueing the same subjobs.
EO_DESC

spec.add_development_dependency "bundler", "~> 1.6"
spec.add_development_dependency "mock_redis"
spec.add_development_dependency "rake"
spec.add_development_dependency "rspec"
spec.add_development_dependency "mock_redis"
spec.add_development_dependency "rubocop"
spec.add_development_dependency "timecop"
end
45 changes: 45 additions & 0 deletions spec/redis_dedupe/helpers_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# frozen_string_literal: true

require "mock_redis"
require "spec_helper"

require "redis_dedupe/helpers"

RSpec.describe RedisDedupe::Helpers do
let(:redis) { MockRedis.new }
let(:instance) { RedisDedupeSpecStubbedClass.new }

before do
allow(RedisDedupe).to receive(:client).and_return(redis)
end

describe "#dedupe" do
subject { instance.test_call }

it { expect(subject).to eq(2) }

it "uses the correct redis key" do
subject
expect(redis.smembers("RedisDedupeSpecStubbedClass:just_a_test")).to match_array(%w[5 7])
end
end
end

# :nodoc:
class RedisDedupeSpecStubbedClass
include RedisDedupe::Helpers

def test_call
counter = 0

dedupe.check(5) { counter += 1 }
dedupe.check(5) { counter += 1 }
dedupe.check(7) { counter += 1 }

counter
end

def dedupe_id
"just_a_test"
end
end
Loading