Skip to content

Commit

Permalink
Debounce jobs to broadcast page refresh signals
Browse files Browse the repository at this point in the history
A page refresh stream signals the need to reload the page. It's a global action
that makes sense to aggregate when multiple signals are generated in a short period
of time for a given streamable.

This implementation is based on creating a thread-level debouncer associated to
the set of streamables. The debouncer is implemented using concurrent-ruby's
scheduled tasks.
  • Loading branch information
jorgemanrubia committed Oct 31, 2023
1 parent a669409 commit 8fcc54b
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 13 deletions.
9 changes: 7 additions & 2 deletions app/channels/turbo/streams/broadcasts.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ def broadcast_prepend_later_to(*streamables, **opts)
broadcast_action_later_to(*streamables, action: :prepend, **opts)
end

def broadcast_refresh_later_to(*streamables, **opts)
Turbo::Streams::BroadcastStreamJob.perform_later stream_name_from(streamables), content: turbo_stream_refresh_tag(**opts)
def broadcast_refresh_later_to(*streamables, request_id: Turbo.current_request_id, **opts)
refresh_debouncer_for(*streamables, request_id: request_id).debounce do
Turbo::Streams::BroadcastStreamJob.perform_later stream_name_from(streamables), content: turbo_stream_refresh_tag(request_id: request_id, **opts)
end
end

def broadcast_action_later_to(*streamables, action:, target: nil, targets: nil, attributes: {}, **rendering)
Expand All @@ -89,6 +91,9 @@ def broadcast_stream_to(*streamables, content:)
ActionCable.server.broadcast stream_name_from(streamables), content
end

def refresh_debouncer_for(*streamables, request_id: nil) # :nodoc:
Turbo::ThreadDebouncer.for("turbo-refresh-debouncer-#{stream_name_from(streamables.including(request_id))}")
end

private
def render_format(format, **rendering)
Expand Down
4 changes: 2 additions & 2 deletions app/helpers/turbo/streams/action_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ def turbo_stream_action_tag(action, target: nil, targets: nil, template: nil, **
end
end

def turbo_stream_refresh_tag(**attributes)
turbo_stream_action_tag(:refresh, **{ "request-id": Turbo.current_request_id }.compact, **attributes)
def turbo_stream_refresh_tag(request_id: Turbo.current_request_id, **attributes)
turbo_stream_action_tag(:refresh, **{ "request-id": request_id }.compact, **attributes)
end

private
Expand Down
8 changes: 4 additions & 4 deletions app/models/concerns/turbo/broadcastable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -354,12 +354,12 @@ def broadcast_prepend_later(target: broadcast_target_default, **rendering)
broadcast_prepend_later_to self, target: target, **rendering
end

def broadcast_refresh_later_to(*streamables, target: broadcast_target_default, **rendering)
Turbo::StreamsChannel.broadcast_refresh_later_to(*streamables, target: target, **broadcast_rendering_with_defaults(rendering).merge(request_id: Turbo.current_request_id)) unless suppressed_turbo_broadcasts?
def broadcast_refresh_later_to(*streamables)
Turbo::StreamsChannel.broadcast_refresh_later_to(*streamables, request_id: Turbo.current_request_id) unless suppressed_turbo_broadcasts?
end

def broadcast_refresh_later(target: broadcast_target_default, **rendering)
broadcast_refresh_later_to self, target: target, **rendering
def broadcast_refresh_later
broadcast_refresh_later_to self
end

# Same as <tt>broadcast_action_to</tt> but run asynchronously via a <tt>Turbo::Streams::BroadcastJob</tt>.
Expand Down
24 changes: 24 additions & 0 deletions app/models/turbo/debouncer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
class Turbo::Debouncer
attr_reader :delay, :scheduled_task

DEFAULT_DELAY = 0.5

def initialize(delay: DEFAULT_DELAY)
@delay = delay
@scheduled_task = nil
end

def debounce(&block)
scheduled_task&.cancel unless scheduled_task&.complete?
@scheduled_task = Concurrent::ScheduledTask.execute(delay, &block)
end

def wait
scheduled_task.wait(wait_timeout)
end

private
def wait_timeout
delay + 1
end
end
28 changes: 28 additions & 0 deletions app/models/turbo/thread_debouncer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# A decorated debouncer that will store instances in the current thread clearing them
# after the debounced logic triggers.
class Turbo::ThreadDebouncer
delegate :wait, to: :debouncer

def self.for(key, delay: Turbo::Debouncer::DEFAULT_DELAY)
Thread.current[key] ||= new(key, Thread.current, delay: delay)
end

private_class_method :new

def initialize(key, thread, delay: )
@key = key
@debouncer = Turbo::Debouncer.new(delay: delay)
@thread = thread
end

def debounce
debouncer.debounce do
yield.tap do
thread[key] = nil
end
end
end

private
attr_reader :key, :debouncer, :thread
end
18 changes: 16 additions & 2 deletions test/streams/broadcastable_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,20 @@ class Turbo::BroadcastableTest < ActionCable::Channel::TestCase
end
end

test "broadcasting refresh later is debounced" do
assert_broadcast_on @message.to_gid_param, turbo_stream_refresh_tag do
assert_broadcasts(@message.to_gid_param, 1) do
perform_enqueued_jobs do
assert_no_changes -> { Thread.current.keys.size } do
# Not leaking thread variables once the debounced code executes
3.times { @message.broadcast_refresh_later }
Turbo::StreamsChannel.refresh_debouncer_for(@message).wait
end
end
end
end
end

test "broadcasting action to stream now" do
assert_broadcast_on "stream", turbo_stream_action_tag("prepend", target: "messages", template: render(@message)) do
@message.broadcast_action_to "stream", action: "prepend"
Expand Down Expand Up @@ -146,7 +160,7 @@ class Turbo::BroadcastableTest < ActionCable::Channel::TestCase

test "broadcasting action later to with attributes" do
@message.save!

assert_broadcast_on @message.to_gid_param, turbo_stream_action_tag("prepend", target: "messages", template: render(@message), "data-foo" => "bar") do
perform_enqueued_jobs do
@message.broadcast_action_later_to @message, action: "prepend", target: "messages", attributes: { "data-foo" => "bar" }
Expand Down Expand Up @@ -176,7 +190,7 @@ class Turbo::BroadcastableTest < ActionCable::Channel::TestCase

test "broadcasting action later with no rendering" do
@message.save!

assert_broadcast_on @message.to_gid_param, turbo_stream_action_tag("prepend", target: "messages", template: nil) do
perform_enqueued_jobs do
@message.broadcast_action_later action: "prepend", target: "messages", render: false
Expand Down
37 changes: 34 additions & 3 deletions test/streams/streams_channel_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ class Turbo::StreamsChannelTest < ActionCable::Channel::TestCase
assert_equal "stream", Turbo::StreamsChannel.verified_stream_name(Turbo::StreamsChannel.signed_stream_name("stream"))
end


test "broadcasting remove now" do
assert_broadcast_on "stream", turbo_stream_action_tag("remove", target: "message_1") do
Turbo::StreamsChannel.broadcast_remove_to "stream", target: "message_1"
Expand Down Expand Up @@ -175,13 +174,46 @@ class Turbo::StreamsChannelTest < ActionCable::Channel::TestCase
assert_broadcast_on "stream", turbo_stream_refresh_tag do
perform_enqueued_jobs do
Turbo::StreamsChannel.broadcast_refresh_later_to "stream"
Turbo::StreamsChannel.refresh_debouncer_for("stream").wait
end
end

Turbo.current_request_id = "123"
assert_broadcast_on "stream", turbo_stream_refresh_tag do
assert_broadcast_on "stream", turbo_stream_refresh_tag(request_id: "123") do
perform_enqueued_jobs do
Turbo::StreamsChannel.broadcast_refresh_later_to "stream"
Turbo::StreamsChannel.refresh_debouncer_for("stream", request_id: "123").wait
end
end
end

test "broadcasting refresh later is debounced" do
assert_broadcast_on "stream", turbo_stream_refresh_tag do
assert_broadcasts("stream", 1) do
perform_enqueued_jobs do
Turbo::StreamsChannel.broadcast_refresh_later_to "stream"

Turbo::StreamsChannel.refresh_debouncer_for("stream").wait
end
end
end
end

test "broadcasting refresh later is debounced considering the current request id" do
assert_broadcasts("stream", 2) do
perform_enqueued_jobs do
assert_broadcast_on "stream", turbo_stream_refresh_tag("request-id": "123") do
assert_broadcast_on "stream", turbo_stream_refresh_tag("request-id": "456") do
Turbo.current_request_id = "123"
3.times { Turbo::StreamsChannel.broadcast_refresh_later_to "stream" }

Turbo.current_request_id = "456"
3.times { Turbo::StreamsChannel.broadcast_refresh_later_to "stream" }

Turbo::StreamsChannel.refresh_debouncer_for("stream", request_id: "123").wait
Turbo::StreamsChannel.refresh_debouncer_for("stream", request_id: "456").wait
end
end
end
end
end
Expand All @@ -204,7 +236,6 @@ class Turbo::StreamsChannelTest < ActionCable::Channel::TestCase
end
end


test "broadcasting render now" do
assert_broadcast_on "stream", turbo_stream_action_tag("replace", target: "message_1", template: "Goodbye!") do
Turbo::StreamsChannel.broadcast_render_to "stream", partial: "messages/message"
Expand Down

0 comments on commit 8fcc54b

Please sign in to comment.