Skip to content

Commit

Permalink
Track queue size to avoid costly calculations under load (#26)
Browse files Browse the repository at this point in the history
Co-authored-by: Silvio Sepulveda <silvio.sepulveda@derivco.se>
  • Loading branch information
ssep and Silvio Sepulveda authored Mar 24, 2020
1 parent 03acc19 commit 97367bf
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 20 deletions.
46 changes: 26 additions & 20 deletions lib/riemannx/connections/batch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ defmodule Riemannx.Connections.Batch do
import Riemannx.Settings
import Kernel, except: [send: 2]
alias Riemannx.Proto.Msg
alias __MODULE__
use GenServer

@behaviour Riemannx.Connection

defstruct [
:queue,
{:size, 0},
{:pending_flush, false},
{:ongoing_flush, false},
:flush_ref
Expand All @@ -57,13 +59,13 @@ defmodule Riemannx.Connections.Batch do

def init(_) do
Process.send_after(self(), :flush, batch_interval())
{:ok, %__MODULE__{queue: Qex.new()}}
{:ok, %Batch{queue: Qex.new(), size: 0}}
end

def handle_cast({:push, event}, state) do
state = %__MODULE__{queue: queue} = push(state, event)
state = %Batch{queue: queue, size: size} = push(state, event)

if queue_big_enough_to_flush?(queue),
if queue_big_enough_to_flush?(size),
do: {:noreply, flush(state)},
else: {:noreply, state}
end
Expand All @@ -73,7 +75,7 @@ defmodule Riemannx.Connections.Batch do
# a previous flush is finished, check if anyone requested another one in the meantime
def handle_info(
{:DOWN, ref, :process, _, _},
state = %__MODULE__{flush_ref: ref}
state = %Batch{flush_ref: ref}
),
do: {:noreply, state |> clear_ongoing_flush() |> flush_if_pending()}

Expand All @@ -97,42 +99,46 @@ defmodule Riemannx.Connections.Batch do
ref
end

defp flush(state = %__MODULE__{ongoing_flush: true}) do
defp flush(state = %Batch{ongoing_flush: true}) do
# try again when the flush is done
%__MODULE__{state | pending_flush: true}
%Batch{state | pending_flush: true}
end

defp flush(state = %__MODULE__{queue: queue}) do
defp flush(state = %Batch{queue: queue, size: size}) do
# the queue can grow larger than the configured batch size while we're waiting;
# if the remaining part is still big enough to flush, we'll do it right after this flush proc exits
{flush_window, remaining} = queue |> Enum.split(batch_size())
batch_size = batch_size()
{flush_window, remaining} = Enum.split(queue, batch_size)
ref = flush_window |> flush()
remaining_queue = Qex.new(remaining)
remaining_size =
case remaining do
[] -> 0
_ -> size - batch_size
end

%__MODULE__{
%Batch{
state
| pending_flush: queue_big_enough_to_flush?(remaining_queue),
| pending_flush: queue_big_enough_to_flush?(remaining_size),
ongoing_flush: ref != nil,
flush_ref: ref,
queue: remaining_queue
queue: remaining_queue,
size: remaining_size
}
end

defp flush_if_pending(state = %__MODULE__{pending_flush: true}),
defp flush_if_pending(state = %Batch{pending_flush: true}),
do: flush(state)

defp flush_if_pending(state), do: state

defp clear_ongoing_flush(state = %__MODULE__{}),
do: %__MODULE__{state | ongoing_flush: false, flush_ref: nil}
defp clear_ongoing_flush(state = %Batch{}),
do: %Batch{state | ongoing_flush: false, flush_ref: nil}

defp push(state = %__MODULE__{queue: queue}, event) do
%__MODULE__{state | queue: Qex.push(queue, event)}
end

defp queue_size(queue), do: Enum.count(queue)
defp push(state = %Batch{queue: queue, size: size}, event),
do: %Batch{state | queue: Qex.push(queue, event), size: size + 1}

defp queue_big_enough_to_flush?(queue), do: queue_size(queue) >= batch_size()
defp queue_big_enough_to_flush?(size), do: size >= batch_size()

defp do_spawn(batch) do
{_, ref} =
Expand Down
8 changes: 8 additions & 0 deletions test/riemannx_batch_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ defmodule RiemannxTest.Batch do
Riemannx.send_async(event)
end)

assert_queue_size() == true
refute assert_events_received(event, 5000)
Riemannx.send_async(event)

Expand All @@ -137,6 +138,7 @@ defmodule RiemannxTest.Batch do
end)

events = Enum.map(1..9, fn _ -> event end)
assert_queue_size() == true
assert assert_events_received(events, 1000)
end

Expand All @@ -154,6 +156,7 @@ defmodule RiemannxTest.Batch do

for _ <- 1..batch_number do
for e <- events, do: Riemannx.send_async(e)
assert_queue_size() == true
assert assert_events_received(events, 1000)
:timer.sleep(batch_interval() + 100)
end
Expand Down Expand Up @@ -225,4 +228,9 @@ defmodule RiemannxTest.Batch do
10_000 -> false
end
end

def assert_queue_size do
%{queue: queue, size: size} = :sys.get_state(Batch)
size == Enum.count(queue)
end
end

0 comments on commit 97367bf

Please sign in to comment.