diff --git a/lib/sanbase/event_bus/event_validation.ex b/lib/sanbase/event_bus/event_validation.ex index ad6433c11..34d82ed82 100644 --- a/lib/sanbase/event_bus/event_validation.ex +++ b/lib/sanbase/event_bus/event_validation.ex @@ -223,6 +223,7 @@ defmodule Sanbase.EventBus.EventValidation do event_type: event_type }) when event_type in [ + :bulk_metric_registry_change, :create_metric_registry, :update_metric_registry, :delete_metric_registry diff --git a/lib/sanbase/event_bus/metric_registry_subscriber.ex b/lib/sanbase/event_bus/metric_registry_subscriber.ex index a453dad95..c4e8a324b 100644 --- a/lib/sanbase/event_bus/metric_registry_subscriber.ex +++ b/lib/sanbase/event_bus/metric_registry_subscriber.ex @@ -51,6 +51,12 @@ defmodule Sanbase.EventBus.MetricRegistrySubscriber do {:noreply, state} end + def on_metric_registry_bulk_change(_event_type, _count) do + :ok = Sanbase.Metric.Registry.refresh_stored_terms() + + :ok + end + def on_metric_registry_change(_event_type, _metric) do :ok = Sanbase.Metric.Registry.refresh_stored_terms() @@ -64,6 +70,27 @@ defmodule Sanbase.EventBus.MetricRegistrySubscriber do :ok end + defp handle_event( + %{data: %{event_type: event_type, inserts_count: i_count, updates_count: u_count}}, + event_shadow, + state + ) + when event_type in [:bulk_metric_registry_change] do + Logger.info("Start refreshing stored terms from #{__MODULE__}") + + {mod, fun} = + Config.module_get( + __MODULE__, + :metric_registry_bulk_change_handler, + {__MODULE__, :on_metric_registry_bulk_change} + ) + + :ok = apply(mod, fun, [event_type, _count = i_count + u_count]) + + EventBus.mark_as_completed({__MODULE__, event_shadow}) + state + end + defp handle_event( %{data: %{event_type: event_type, metric: metric}}, event_shadow, diff --git a/lib/sanbase/metric/registry/event_emitter.ex b/lib/sanbase/metric/registry/event_emitter.ex index 8cb3d2da7..84a44deaa 100644 --- a/lib/sanbase/metric/registry/event_emitter.ex +++ b/lib/sanbase/metric/registry/event_emitter.ex @@ -11,6 +11,13 @@ defmodule Sanbase.Metric.Registry.EventEmitter do |> notify() end + def handle_event({:ok, map}, event_type = :bulk_metric_registry_change, args) do + %{event_type: event_type} + |> Map.merge(map) + |> Map.merge(args) + |> notify() + end + def handle_event({:ok, struct}, event_type, args) when event_type in [ :create_metric_registry, diff --git a/lib/sanbase/metric/registry/populate.ex b/lib/sanbase/metric/registry/populate.ex index 84d81cec0..2997adca4 100644 --- a/lib/sanbase/metric/registry/populate.ex +++ b/lib/sanbase/metric/registry/populate.ex @@ -7,7 +7,7 @@ defmodule Sanbase.Metric.Registry.Populate do populate() end) |> case do - {:ok, {:ok, result}} -> {:ok, result} + {:ok, {:ok, list, summary}} -> {:ok, list, summary} data -> data end end @@ -53,7 +53,9 @@ defmodule Sanbase.Metric.Registry.Populate do def populate() do case process_metrics() do list when is_list(list) -> - summarize_results(list) + {:ok, list, summary} = summarize_results(list) + emit_events(list, summary) + {:ok, list, summary} {:error, %Ecto.Changeset{} = error} -> log_and_return_changeset_error(error) @@ -144,4 +146,48 @@ defmodule Sanbase.Metric.Registry.Populate do Reason: #{inspect(error)} """) end + + defp emit_events(list, summary) do + inserts = Map.get(summary, :insert, []) + updates = Map.get(summary, :update, []) + + if inserts != [] or updates != [] do + map = %{inserts_count: inserts, updates_count: updates} + + {inserted_metrics, updated_metrics} = + Enum.reduce(list, {[], []}, fn {type, record}, {insert_acc, update_acc} -> + case type do + :insert -> {[record.metric | insert_acc], update_acc} + :update -> {insert_acc, [record.metric | update_acc]} + _ -> {insert_acc, update_acc} + end + end) + + # Emit locally event with more data. The distributed events are used only to refresh the + # persistent term, the local node event will also trigger notifications + local_event_map = + Map.merge(map, %{inserted_metrics: inserted_metrics, updated_metrics: updated_metrics}) + + Sanbase.Metric.Registry.EventEmitter.emit_event( + {:ok, local_event_map}, + :bulk_metric_registry_change, + %{} + ) + + # Emit distributed event only to the MetricRegistrySubscriber so it refreshed the stored data + # in persistent term + Node.list() + |> Enum.each(fn node -> + Node.spawn(node, fn -> + Sanbase.Metric.Registry.EventEmitter.emit_event( + {:ok, map}, + :bulk_update_metric_registry, + %{__only_process_by__: [Sanbase.EventBus.MetricRegistrySubscriber]} + ) + end) + end) + else + :ok + end + end end diff --git a/test/test_seeds.exs b/test/test_seeds.exs index ac7c80a9b..4d345e87e 100644 --- a/test/test_seeds.exs +++ b/test/test_seeds.exs @@ -2,6 +2,6 @@ IO.puts("Running test seeds") IO.puts("Populating the Metric Registry...") -{:ok, metrics} = Sanbase.Metric.Registry.Populate.run() +{:ok, metrics, _summary} = Sanbase.Metric.Registry.Populate.run() IO.puts("Finished populating the Metric Registry. Inserted #{length(metrics)} metrics")