Skip to content

Commit

Permalink
TBE-87 : Switching to buffered consumer (#4658)
Browse files Browse the repository at this point in the history
* Switching to buffered consumer
* Added specs. The actual connection to mixpanel is not guarded by the mutex.
  • Loading branch information
tofarr authored Jul 31, 2024
1 parent 33bcc8b commit 55371b3
Show file tree
Hide file tree
Showing 2 changed files with 890 additions and 796 deletions.
50 changes: 38 additions & 12 deletions app/services/mixpanel_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@
# the singleton can be referenced using `MixpanelService.instance`
#
class MixpanelService
MAX_ATTEMPTS = 3
RETRY_DELAY = 30
MAX_BUFFER_SIZE = 50
FLUSH_DELAY = 5

include Singleton

def initialize
mixpanel_key = Rails.application.credentials.dig(:mixpanel_token)
return if mixpanel_key.nil?

@consumer = Mixpanel::Consumer.new
@buffer = []
@mutex = Mutex.new
@tracker = Mixpanel::Tracker.new(mixpanel_key) do |type, message|
send_event_to_mixpanel(type, message)
buffer_event_for_send(type, message)
end

# silence local SSL errors
Expand Down Expand Up @@ -51,17 +52,42 @@ def data_from(obj)

private

def send_event_to_mixpanel(type, message, num_attempts = 1, delay = 0)
task = Concurrent::ScheduledTask.new(delay) do
@consumer.send!(type, message)
rescue StandardError => err
if num_attempts >= MAX_ATTEMPTS
Rails.logger.error "Failed to consume tracking event '#{type}' async #{err}"
def buffer_event_for_send(type, message)
buffer = nil
@mutex.synchronize do
buffer = @buffer
buffer << [type, message]
@flusher.cancel if @flusher
if buffer.length < MAX_BUFFER_SIZE
init_flusher
return
else
send_event_to_mixpanel(type, message, num_attempts + 1, RETRY_DELAY)
@buffer = []
end
end
task.execute
if buffer.length >= MAX_BUFFER_SIZE
send_buffered_events_to_mixpanel(buffer)
end
end

def init_flusher
@flusher = Concurrent::ScheduledTask.new(FLUSH_DELAY) do
buffer = nil
@mutex.synchronize do
buffer = @buffer
@buffer = []
end
send_buffered_events_to_mixpanel(buffer)
end
@flusher.execute
end

def send_buffered_events_to_mixpanel(buffer)
consumer = Mixpanel::BufferedConsumer.new(nil, nil, nil, MAX_BUFFER_SIZE + 1)
buffer.each do |type, message|
consumer.send!(type, message)
end
consumer.flush
end

class << self
Expand Down
Loading

0 comments on commit 55371b3

Please sign in to comment.