From 88ffb17592f0280ae98b824bd4acdb11459af644 Mon Sep 17 00:00:00 2001 From: Santiago Botta Date: Tue, 12 Mar 2024 02:07:39 -0300 Subject: [PATCH 1/5] Implement a stream module to refactor some redis code --- lib/redis/client.ex | 49 +++--------------------- lib/redis/stream.ex | 93 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+), 44 deletions(-) create mode 100644 lib/redis/stream.ex diff --git a/lib/redis/client.ex b/lib/redis/client.ex index f84e586..c809cbf 100644 --- a/lib/redis/client.ex +++ b/lib/redis/client.ex @@ -72,6 +72,11 @@ defmodule Redis.Client do Redix.command(:redix, [command, stream_name, "+", "-", "COUNT", count]) end + @spec fetch_reverse_stream_since(binary(), binary() | non_neg_integer()) :: + Stream.response() + def fetch_reverse_stream_since(stream_name, since), + do: Stream.xrevrange(stream_name, "+", since) + @spec fetch_last_stream_entry(String.t()) :: {:ok, Stream.Entry.t()} | {:error, :no_result | :stream_parse_error} @@ -177,47 +182,3 @@ defmodule Redis.Client do acc ++ [Atom.to_string(key), value] end) end - -defmodule Redis.Stream.Entry do - @moduledoc false - require Integer - - @enforce_keys [:id, :values, :datetime] - defstruct id: nil, values: nil, datetime: nil - - @type t :: %__MODULE__{} - - @doc """ - Given a redis stream entry returns a readable map representation of the - entry values. - """ - @spec from_raw_entry(any()) :: map() - def from_raw_entry([entry_id, entry]) do - datetime = parse_stream_entry_id(entry_id) - - Enum.reduce(Enum.with_index(entry), {[], []}, fn {value, index}, - {keys, values} -> - case Integer.is_even(index) do - true -> - {keys ++ [value], values} - - false -> - {keys, values ++ [value]} - end - end) - |> then(fn {keys, values} -> Enum.zip(keys, values) end) - |> Enum.into(%{}) - |> then(fn values -> - %__MODULE__{id: entry_id, values: values, datetime: datetime} - end) - end - - @spec parse_stream_entry_id(String.t()) :: DateTime.t() - defp parse_stream_entry_id(entry_id) do - entry_id - |> String.split("-") - |> hd - |> String.to_integer() - |> DateTime.from_unix!(:millisecond) - end -end diff --git a/lib/redis/stream.ex b/lib/redis/stream.ex new file mode 100644 index 0000000..35da6d1 --- /dev/null +++ b/lib/redis/stream.ex @@ -0,0 +1,93 @@ +defmodule Redis.Stream do + @moduledoc """ + Module definition for redis streams + """ + + alias Redis.Client + alias Redis.Stream + + require Logger + + @type since :: binary() | non_neg_integer() + @type until :: binary() | non_neg_integer() + @type response :: {:ok, [Stream.Entry.t()]} | {:error, :stream_parse_error} + + @spec xrevrange(binary(), since(), until()) :: response() + def xrevrange(stream_name, until \\ "+", since \\ "-") do + Redix.command(:redix, ["XREVRANGE", stream_name, since, until]) + |> parse_response() + end + + @spec parse_response(Client.redix_response()) :: + {:ok, [map()]} | {:error, :stream_parse_error} + defp parse_response(reply) do + with {:ok, entries} <- parse_reply(reply), + parsed_entries <- parse_stream_entries(entries) do + {:ok, parsed_entries} + end + rescue + error -> + Logger.error( + "There was an error while processing the stream result error=#{inspect(error)}" + ) + + {:error, :stream_parse_error} + end + + @spec parse_reply({atom, list | binary} | any) :: + {:error, :no_result} | {:ok, any} | any + defp parse_reply({:ok, []}), do: {:error, :no_result} + defp parse_reply({:ok, _result} = result), do: result + + @spec parse_stream_entries([any()]) :: [Stream.Entry.t()] + defp parse_stream_entries(entries), + do: entries |> Enum.map(&parse_stream_entry/1) + + @spec parse_stream_entry(list()) :: Stream.Entry.t() + defp parse_stream_entry([_entry_id, _entry_values] = entry), + do: Stream.Entry.from_raw_entry(entry) +end + +defmodule Redis.Stream.Entry do + @moduledoc false + require Integer + + @enforce_keys [:id, :values, :datetime] + defstruct id: nil, values: nil, datetime: nil + + @type t :: %__MODULE__{} + + @doc """ + Given a redis stream entry returns a readable map representation of the + entry values. + """ + @spec from_raw_entry(any()) :: t() + def from_raw_entry([entry_id, entry]) do + datetime = parse_stream_entry_id(entry_id) + + Enum.reduce(Enum.with_index(entry), {[], []}, fn {value, index}, + {keys, values} -> + case Integer.is_even(index) do + true -> + {keys ++ [value], values} + + false -> + {keys, values ++ [value]} + end + end) + |> then(fn {keys, values} -> Enum.zip(keys, values) end) + |> Enum.into(%{}) + |> then(fn values -> + %__MODULE__{id: entry_id, values: values, datetime: datetime} + end) + end + + @spec parse_stream_entry_id(String.t()) :: DateTime.t() + defp parse_stream_entry_id(entry_id) do + entry_id + |> String.split("-") + |> hd + |> String.to_integer() + |> DateTime.from_unix!(:millisecond) + end +end From de3516627f9aa7db4f7147dd3475c7a3a5204086 Mon Sep 17 00:00:00 2001 From: Santiago Botta Date: Tue, 12 Mar 2024 03:06:50 -0300 Subject: [PATCH 2/5] Fixup xrevrange params order --- lib/redis/stream.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/redis/stream.ex b/lib/redis/stream.ex index 35da6d1..4ecd851 100644 --- a/lib/redis/stream.ex +++ b/lib/redis/stream.ex @@ -14,7 +14,7 @@ defmodule Redis.Stream do @spec xrevrange(binary(), since(), until()) :: response() def xrevrange(stream_name, until \\ "+", since \\ "-") do - Redix.command(:redix, ["XREVRANGE", stream_name, since, until]) + Redix.command(:redix, ["XREVRANGE", stream_name, until, since]) |> parse_response() end From 55a69dbe5db357ef2f96a86c2007be8b73eafc21 Mon Sep 17 00:00:00 2001 From: Santiago Botta Date: Tue, 12 Mar 2024 03:07:44 -0300 Subject: [PATCH 3/5] Group quote history by day --- lib/ex_finance/currencies.ex | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/lib/ex_finance/currencies.ex b/lib/ex_finance/currencies.ex index 340d09d..84bb00c 100644 --- a/lib/ex_finance/currencies.ex +++ b/lib/ex_finance/currencies.ex @@ -374,9 +374,17 @@ defmodule ExFinance.Currencies do stream_name = get_stream_name("currency-history_" <> supplier_name <> "_" <> type) - with {:ok, entries} <- Redis.Client.fetch_history(stream_name, 40), - sorted_entries <- Enum.reverse(entries), - history <- map_currency_history(sorted_entries) do + days_before_now = -20 + + since = + DateTime.utc_now() + |> DateTime.add(days_before_now, :day) + |> DateTime.to_unix(:millisecond) + + with {:ok, entries} <- + Redis.Client.fetch_reverse_stream_since(stream_name, since), + filtered_entries <- filter_history_entries(entries), + history <- map_currency_history(filtered_entries) do {:ok, history} else error -> @@ -395,6 +403,18 @@ defmodule ExFinance.Currencies do :error end + @spec filter_history_entries([Redis.Stream.Entry.t()]) :: [ + Redis.Stream.Entry.t() + ] + defp filter_history_entries(entries) do + entries + |> Enum.group_by(fn entry -> + "#{entry.datetime.year}-#{entry.datetime.month}-#{entry.datetime.day}" + end) + |> Enum.map(fn {_datetime, entries} -> hd(entries) end) + |> Enum.sort_by(& &1.datetime, :asc) + end + @spec map_currency_history([Redis.Stream.Entry.t()]) :: [ {NaiveDateTime.t(), Currency.t()} ] From 5de7d6c8312b393be01663bc3006a5255785536d Mon Sep 17 00:00:00 2001 From: Santiago Botta Date: Tue, 12 Mar 2024 03:08:32 -0300 Subject: [PATCH 4/5] Update suggestedmin/max for the charts --- assets/js/charts/line_chart.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/assets/js/charts/line_chart.js b/assets/js/charts/line_chart.js index 3996d17..e0a8eaa 100644 --- a/assets/js/charts/line_chart.js +++ b/assets/js/charts/line_chart.js @@ -34,8 +34,8 @@ export default class { suggestedMin: 50 }, y: { - suggestedMax: 200, - suggestedMin: 50 + suggestedMax: 1200, + suggestedMin: 800 } } }, From 11fbd6bca96d987bc2db4d2a8226d129528d9cc1 Mon Sep 17 00:00:00 2001 From: Santiago Botta Date: Tue, 12 Mar 2024 03:09:04 -0300 Subject: [PATCH 5/5] Update version --- version.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.txt b/version.txt index 8294c18..7693c96 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.1.2 \ No newline at end of file +0.1.3 \ No newline at end of file