Skip to content

Commit

Permalink
Emit events from the Metric.Registry.Populate runs
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanIvanoff committed Dec 20, 2024
1 parent bebd9ad commit 431a4e3
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 3 deletions.
1 change: 1 addition & 0 deletions lib/sanbase/event_bus/event_validation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ defmodule Sanbase.EventBus.EventValidation do
event_type: event_type
})
when event_type in [
:bulk_metric_registry_change,
:create_metric_registry,
:update_metric_registry,
:delete_metric_registry
Expand Down
27 changes: 27 additions & 0 deletions lib/sanbase/event_bus/metric_registry_subscriber.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ defmodule Sanbase.EventBus.MetricRegistrySubscriber do
{:noreply, state}
end

def on_metric_registry_bulk_change(_event_type, _count) do
:ok = Sanbase.Metric.Registry.refresh_stored_terms()

:ok
end

def on_metric_registry_change(_event_type, _metric) do
:ok = Sanbase.Metric.Registry.refresh_stored_terms()

Expand All @@ -64,6 +70,27 @@ defmodule Sanbase.EventBus.MetricRegistrySubscriber do
:ok
end

defp handle_event(
%{data: %{event_type: event_type, inserts_count: i_count, updates_count: u_count}},
event_shadow,
state
)
when event_type in [:bulk_metric_registry_change] do
Logger.info("Start refreshing stored terms from #{__MODULE__}")

{mod, fun} =
Config.module_get(
__MODULE__,
:metric_registry_bulk_change_handler,
{__MODULE__, :on_metric_registry_bulk_change}
)

:ok = apply(mod, fun, [event_type, _count = i_count + u_count])

EventBus.mark_as_completed({__MODULE__, event_shadow})
state
end

defp handle_event(
%{data: %{event_type: event_type, metric: metric}},
event_shadow,
Expand Down
7 changes: 7 additions & 0 deletions lib/sanbase/metric/registry/event_emitter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ defmodule Sanbase.Metric.Registry.EventEmitter do
|> notify()
end

def handle_event({:ok, map}, event_type = :bulk_metric_registry_change, args) do
%{event_type: event_type}
|> Map.merge(map)
|> Map.merge(args)
|> notify()
end

def handle_event({:ok, struct}, event_type, args)
when event_type in [
:create_metric_registry,
Expand Down
50 changes: 48 additions & 2 deletions lib/sanbase/metric/registry/populate.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Sanbase.Metric.Registry.Populate do
populate()
end)
|> case do
{:ok, {:ok, result}} -> {:ok, result}
{:ok, {:ok, list, summary}} -> {:ok, list, summary}
data -> data
end
end
Expand Down Expand Up @@ -53,7 +53,9 @@ defmodule Sanbase.Metric.Registry.Populate do
def populate() do
case process_metrics() do
list when is_list(list) ->
summarize_results(list)
{:ok, list, summary} = summarize_results(list)
emit_events(list, summary)
{:ok, list, summary}

{:error, %Ecto.Changeset{} = error} ->
log_and_return_changeset_error(error)
Expand Down Expand Up @@ -144,4 +146,48 @@ defmodule Sanbase.Metric.Registry.Populate do
Reason: #{inspect(error)}
""")
end

defp emit_events(list, summary) do
inserts = Map.get(summary, :insert, [])
updates = Map.get(summary, :update, [])

if inserts != [] or updates != [] do
map = %{inserts_count: inserts, updates_count: updates}

{inserted_metrics, updated_metrics} =
Enum.reduce(list, {[], []}, fn {type, record}, {insert_acc, update_acc} ->
case type do
:insert -> {[record.metric | insert_acc], update_acc}
:update -> {insert_acc, [record.metric | update_acc]}
_ -> {insert_acc, update_acc}
end
end)

# Emit locally event with more data. The distributed events are used only to refresh the
# persistent term, the local node event will also trigger notifications
local_event_map =
Map.merge(map, %{inserted_metrics: inserted_metrics, updated_metrics: updated_metrics})

Sanbase.Metric.Registry.EventEmitter.emit_event(
{:ok, local_event_map},
:bulk_metric_registry_change,
%{}
)

# Emit distributed event only to the MetricRegistrySubscriber so it refreshed the stored data
# in persistent term
Node.list()
|> Enum.each(fn node ->
Node.spawn(node, fn ->
Sanbase.Metric.Registry.EventEmitter.emit_event(
{:ok, map},
:bulk_update_metric_registry,
%{__only_process_by__: [Sanbase.EventBus.MetricRegistrySubscriber]}
)
end)
end)
else
:ok
end
end
end
2 changes: 1 addition & 1 deletion test/test_seeds.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ IO.puts("Running test seeds")

IO.puts("Populating the Metric Registry...")

{:ok, metrics} = Sanbase.Metric.Registry.Populate.run()
{:ok, metrics, _summary} = Sanbase.Metric.Registry.Populate.run()

IO.puts("Finished populating the Metric Registry. Inserted #{length(metrics)} metrics")

0 comments on commit 431a4e3

Please sign in to comment.