Skip to content

Commit

Permalink
Improve the Metric Registry Populate script
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanIvanoff committed Dec 19, 2024
1 parent f4e1f6b commit cf01ec7
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 22 deletions.
120 changes: 99 additions & 21 deletions lib/sanbase/metric/registry/populate.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@ defmodule Sanbase.Metric.Registry.Populate do
end
end

def json_map_to_registry_changeset(%{} = map) do
def json_map_to_registry_params(%{} = map) do
{:ok, captures} = Sanbase.TemplateEngine.Captures.extract_captures(map["name"])
is_template = captures != []

%Sanbase.Metric.Registry{}
|> Sanbase.Metric.Registry.changeset(%{
%{
access: map["access"],
default_aggregation: map["aggregation"],
aliases: Map.get(map, "aliases", []) |> Enum.map(&%{name: &1}),
Expand All @@ -41,29 +40,108 @@ defmodule Sanbase.Metric.Registry.Populate do
required_selectors: Map.get(map, "required_selectors", []) |> Enum.map(&%{type: &1}),
selectors: Map.get(map, "selectors", []) |> Enum.map(&%{type: &1}),
tables: map["table"] |> List.wrap() |> Enum.map(&%{name: &1})
})
}
end

def json_map_to_registry_changeset(%{} = map) do
params = json_map_to_registry_params(map)

%Sanbase.Metric.Registry{}
|> Sanbase.Metric.Registry.changeset(params)
end

def populate() do
Sanbase.Clickhouse.MetricAdapter.FileHandler.raw_metrics_json()
|> Enum.reduce_while([], fn map, acc ->
changeset = json_map_to_registry_changeset(map)

case Sanbase.Repo.insert(changeset,
on_conflict: :replace_all,
conflict_target: [:metric, :fixed_parameters, :data_type]
) do
{:ok, result} -> {:cont, [result | acc]}
{:error, error} -> {:halt, {:error, error}}
end
end)
|> case do
case process_metrics() do
list when is_list(list) ->
{:ok, list}
summarize_results(list)

{:error, %Ecto.Changeset{} = error} ->
log_and_return_changeset_error(error)

{:error, error} when is_binary(error) ->
log_and_return_error(error)
end
end

defp process_metrics do
Sanbase.Clickhouse.MetricAdapter.FileHandler.raw_metrics_json()
|> Enum.reduce_while([], &process_single_metric/2)
end

defp process_single_metric(map, acc) do
existing_record =
Sanbase.Metric.Registry.by_name(
map["name"],
map["data_type"],
map["fixed_parameters"] || %{}
)

case handle_metric_record(existing_record, map) do
{type, {:ok, result}} ->
{:cont, [{type, result} | acc]}

{:error, error} ->
IO.puts("Error: #{Sanbase.Utils.ErrorHandling.changeset_errors_string(error)}")
{:error, error}
{:insert, {:error, error}} ->
log_insert_error(map, error)
{:halt, {:error, error}}

{:update, {:error, error}} ->
log_update_error(map, error)
{:halt, {:error, error}}
end
end

defp handle_metric_record({:ok, record}, map) do
params = json_map_to_registry_params(map)
changeset = Sanbase.Metric.Registry.changeset(record, params)
new_record = changeset |> Ecto.Changeset.apply_changes()

case record == new_record do
true ->
{:unchanged, {:ok, record}}

false ->
{:update, Sanbase.Repo.update(changeset)}
end
end

defp handle_metric_record({:error, _}, map) do
changeset = json_map_to_registry_changeset(map)
{:insert, Sanbase.Repo.insert(changeset)}
end

defp summarize_results(list) do
{list, counts} =
Enum.reduce(list, {[], %{}}, fn {type, record}, {list_acc, count_acc} ->
{
[{type, record} | list_acc],
Map.update(count_acc, type, 1, &(&1 + 1))
}
end)

{:ok, list, counts}
end

defp log_and_return_changeset_error(error) do
IO.puts("Error: #{Sanbase.Utils.ErrorHandling.changeset_errors_string(error)}")
{:error, error}
end

defp log_and_return_error(error) do
IO.puts(error)
{:error, error}
end

defp log_insert_error(map, error) do
IO.puts("""
Error inserting new metric: #{inspect(map)}.
Reason: #{inspect(error)}
""")
end

defp log_update_error(map, error) do
IO.puts("""
Error updating existing metric: #{inspect(map)}
Reason: #{inspect(error)}
""")
end
end
2 changes: 1 addition & 1 deletion lib/sanbase/metric/registry/registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ defmodule Sanbase.Metric.Registry do
query =
from(mr in __MODULE__,
where:
mr.metric == ^metric and mr.data_type == "timeseries" and
mr.metric == ^metric and mr.data_type == ^data_type and
mr.fixed_parameters == ^fixed_parameters
)

Expand Down

0 comments on commit cf01ec7

Please sign in to comment.