From 68ae808848dc977e9b958e3c1cc01d7a88a9673c Mon Sep 17 00:00:00 2001 From: Antoine Augusti Date: Thu, 19 Dec 2024 09:52:31 +0100 Subject: [PATCH 1/5] =?UTF-8?q?Ajout=20plug=20healthcheck=20sp=C3=A9cifiqu?= =?UTF-8?q?e=20pour=20le=20worker?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../lib/transport_web/plugs/router.ex | 2 +- .../transport_web/plugs/worker_healthcheck.ex | 77 +++++++++++++++++++ 2 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 apps/transport/lib/transport_web/plugs/worker_healthcheck.ex diff --git a/apps/transport/lib/transport_web/plugs/router.ex b/apps/transport/lib/transport_web/plugs/router.ex index d1cbfb54a9..4ee9e686e6 100644 --- a/apps/transport/lib/transport_web/plugs/router.ex +++ b/apps/transport/lib/transport_web/plugs/router.ex @@ -2,7 +2,7 @@ defmodule TransportWeb.Plugs.Router do use Plug.Router plug(TransportWeb.Plugs.HealthCheck, at: "/health-check") - plug(TransportWeb.Plugs.Halt, if: {Transport.Application, :worker_only?}, message: "UP (WORKER-ONLY)") + plug(TransportWeb.Plugs.WorkerHealthcheck, if: {Transport.Application, :worker_only?}) plug(:match) plug(:dispatch) diff --git a/apps/transport/lib/transport_web/plugs/worker_healthcheck.ex b/apps/transport/lib/transport_web/plugs/worker_healthcheck.ex new file mode 100644 index 0000000000..0be5a4d128 --- /dev/null +++ b/apps/transport/lib/transport_web/plugs/worker_healthcheck.ex @@ -0,0 +1,77 @@ +defmodule TransportWeb.Plugs.WorkerHealthcheck do + @moduledoc """ + A plug for the worker. + + It displays: + - when the app was started + - the last attempt for Oban jobs + - if the system is healthy + + The system is considered healthy if the app was started recently or + if Oban attempted jobs recently. + """ + import Plug.Conn + + @app_start_waiting_delay {20, :minute} + @oban_max_delay_since_last_attempt {60, :minute} + + def init(options), do: options + + def call(conn, opts) do + {mod, fun} = opts[:if] + + if apply(mod, fun, []) do + status_code = if healthy_state?(), do: 200, else: 503 + + conn + |> put_resp_content_type("text/plain") + |> send_resp(status_code, """ + UP (WORKER-ONLY) + App start time: #{app_start_datetime()} + App started recently?: #{app_started_recently?()} + Oban last attempt: #{oban_last_attempted_at()} + Oban attempted jobs recently?: #{oban_attempted_jobs_recently?()} + Healthy state?: #{healthy_state?()} + """) + |> halt() + else + conn + end + end + + def healthy_state? do + app_started_recently?() or oban_attempted_jobs_recently?() + end + + def app_started_recently? do + start_datetime = app_start_datetime() + {delay, unit} = @app_start_waiting_delay + DateTime.before?(DateTime.utc_now(), DateTime.add(start_datetime, delay, unit)) + end + + def app_start_datetime do + Transport.Cache.fetch(app_start_datetime_cache_key_name(), fn -> DateTime.utc_now() end, expire: nil) + end + + def app_start_datetime_cache_key_name, do: "#{__MODULE__}::app_start_datetime" + + def oban_attempted_jobs_recently? do + oban_last_attempt = oban_last_attempted_at() + {delay, unit} = @oban_max_delay_since_last_attempt + DateTime.before?(oban_last_attempt, DateTime.add(oban_last_attempt, delay, unit)) + end + + def oban_last_attempted_at do + %Postgrex.Result{rows: [[delay]]} = + DB.Repo.query!(""" + SELECT MAX(attempted_at) + FROM oban_jobs + WHERE state = 'completed' + """) + + case delay do + nil -> DateTime.new!(~D[1970-01-01], ~T[00:00:00.000], "Etc/UTC") + %NaiveDateTime{} = nt -> DateTime.from_naive!(nt, "Etc/UTC") + end + end +end From 5177ca98a712f4d1592dcece5e9d4746d5441628 Mon Sep 17 00:00:00 2001 From: Antoine Augusti Date: Thu, 19 Dec 2024 11:22:28 +0100 Subject: [PATCH 2/5] Add tests --- .../transport_web/plugs/worker_healthcheck.ex | 7 +- .../plugs/worker_healthcheck_test.exs | 73 +++++++++++++++++++ 2 files changed, 76 insertions(+), 4 deletions(-) create mode 100644 apps/transport/test/transport_web/plugs/worker_healthcheck_test.exs diff --git a/apps/transport/lib/transport_web/plugs/worker_healthcheck.ex b/apps/transport/lib/transport_web/plugs/worker_healthcheck.ex index 0be5a4d128..c5604d899e 100644 --- a/apps/transport/lib/transport_web/plugs/worker_healthcheck.ex +++ b/apps/transport/lib/transport_web/plugs/worker_healthcheck.ex @@ -1,6 +1,7 @@ defmodule TransportWeb.Plugs.WorkerHealthcheck do @moduledoc """ A plug for the worker. + It can be conditionally enabled by passing an `:if` condition that will be evaluated. It displays: - when the app was started @@ -44,9 +45,8 @@ defmodule TransportWeb.Plugs.WorkerHealthcheck do end def app_started_recently? do - start_datetime = app_start_datetime() {delay, unit} = @app_start_waiting_delay - DateTime.before?(DateTime.utc_now(), DateTime.add(start_datetime, delay, unit)) + DateTime.before?(DateTime.utc_now(), DateTime.add(app_start_datetime(), delay, unit)) end def app_start_datetime do @@ -56,9 +56,8 @@ defmodule TransportWeb.Plugs.WorkerHealthcheck do def app_start_datetime_cache_key_name, do: "#{__MODULE__}::app_start_datetime" def oban_attempted_jobs_recently? do - oban_last_attempt = oban_last_attempted_at() {delay, unit} = @oban_max_delay_since_last_attempt - DateTime.before?(oban_last_attempt, DateTime.add(oban_last_attempt, delay, unit)) + DateTime.after?(oban_last_attempted_at(), DateTime.add(DateTime.utc_now(), -delay, unit)) end def oban_last_attempted_at do diff --git a/apps/transport/test/transport_web/plugs/worker_healthcheck_test.exs b/apps/transport/test/transport_web/plugs/worker_healthcheck_test.exs new file mode 100644 index 0000000000..b71147591d --- /dev/null +++ b/apps/transport/test/transport_web/plugs/worker_healthcheck_test.exs @@ -0,0 +1,73 @@ +defmodule TransportWeb.Plugs.WorkerHealthcheckTest do + # async: false is required because we use real in-memory caching in these tests + use TransportWeb.ConnCase, async: false + alias TransportWeb.Plugs.WorkerHealthcheck + + setup do + # Use a real in-memory cache for these tests to test the caching mecanism + old_value = Application.fetch_env!(:transport, :cache_impl) + Application.put_env(:transport, :cache_impl, Transport.Cache.Cachex) + + on_exit(fn -> + Application.put_env(:transport, :cache_impl, old_value) + Cachex.reset(Transport.Cache.Cachex.cache_name()) + end) + + Ecto.Adapters.SQL.Sandbox.checkout(DB.Repo) + end + + describe "healthy_state?" do + test "app was started recently, no Oban jobs" do + assert WorkerHealthcheck.app_started_recently?() + refute WorkerHealthcheck.oban_attempted_jobs_recently?() + assert WorkerHealthcheck.healthy_state?() + end + + test "app was not started recently, Oban jobs have not been attempted recently" do + datetime = DateTime.add(DateTime.utc_now(), -30, :minute) + Cachex.put(Transport.Cache.Cachex.cache_name(), WorkerHealthcheck.app_start_datetime_cache_key_name(), datetime) + + refute WorkerHealthcheck.app_started_recently?() + refute WorkerHealthcheck.oban_attempted_jobs_recently?() + refute WorkerHealthcheck.healthy_state?() + end + + test "app was not started recently, Oban jobs have been attempted recently" do + datetime = DateTime.add(DateTime.utc_now(), -30, :minute) + Cachex.put(Transport.Cache.Cachex.cache_name(), WorkerHealthcheck.app_start_datetime_cache_key_name(), datetime) + + # A completed job was attempted 55 minutes ago + Transport.Jobs.ResourceUnavailableJob.new(%{resource_id: 1}) + |> Oban.insert!() + |> Ecto.Changeset.change(attempted_at: DateTime.add(DateTime.utc_now(), -55, :minute), state: "completed") + |> DB.Repo.update!() + + refute WorkerHealthcheck.app_started_recently?() + assert WorkerHealthcheck.oban_attempted_jobs_recently?() + assert WorkerHealthcheck.healthy_state?() + end + end + + describe "call" do + test "healthy system", %{conn: conn} do + assert WorkerHealthcheck.app_started_recently?() + refute WorkerHealthcheck.oban_attempted_jobs_recently?() + assert WorkerHealthcheck.healthy_state?() + + assert conn |> WorkerHealthcheck.call(if: {__MODULE__, :plug_enabled?}) |> text_response(200) + end + + test "unhealthy system", %{conn: conn} do + datetime = DateTime.add(DateTime.utc_now(), -30, :minute) + Cachex.put(Transport.Cache.Cachex.cache_name(), WorkerHealthcheck.app_start_datetime_cache_key_name(), datetime) + + refute WorkerHealthcheck.app_started_recently?() + refute WorkerHealthcheck.oban_attempted_jobs_recently?() + refute WorkerHealthcheck.healthy_state?() + + assert conn |> WorkerHealthcheck.call(if: {__MODULE__, :plug_enabled?}) |> text_response(503) + end + end + + def plug_enabled?, do: true +end From 35f64e76747750d021451b6d4a57c5870d9550c0 Mon Sep 17 00:00:00 2001 From: Antoine Augusti Date: Thu, 19 Dec 2024 13:56:46 +0100 Subject: [PATCH 3/5] Even more tests --- .../plugs/worker_healthcheck_test.exs | 63 +++++++++++++++++-- 1 file changed, 59 insertions(+), 4 deletions(-) diff --git a/apps/transport/test/transport_web/plugs/worker_healthcheck_test.exs b/apps/transport/test/transport_web/plugs/worker_healthcheck_test.exs index b71147591d..61ef64db82 100644 --- a/apps/transport/test/transport_web/plugs/worker_healthcheck_test.exs +++ b/apps/transport/test/transport_web/plugs/worker_healthcheck_test.exs @@ -3,6 +3,9 @@ defmodule TransportWeb.Plugs.WorkerHealthcheckTest do use TransportWeb.ConnCase, async: false alias TransportWeb.Plugs.WorkerHealthcheck + @cache_name Transport.Cache.Cachex.cache_name() + @cache_key WorkerHealthcheck.app_start_datetime_cache_key_name() + setup do # Use a real in-memory cache for these tests to test the caching mecanism old_value = Application.fetch_env!(:transport, :cache_impl) @@ -10,7 +13,7 @@ defmodule TransportWeb.Plugs.WorkerHealthcheckTest do on_exit(fn -> Application.put_env(:transport, :cache_impl, old_value) - Cachex.reset(Transport.Cache.Cachex.cache_name()) + Cachex.reset(@cache_name) end) Ecto.Adapters.SQL.Sandbox.checkout(DB.Repo) @@ -25,7 +28,7 @@ defmodule TransportWeb.Plugs.WorkerHealthcheckTest do test "app was not started recently, Oban jobs have not been attempted recently" do datetime = DateTime.add(DateTime.utc_now(), -30, :minute) - Cachex.put(Transport.Cache.Cachex.cache_name(), WorkerHealthcheck.app_start_datetime_cache_key_name(), datetime) + Cachex.put(@cache_name, @cache_key, datetime) refute WorkerHealthcheck.app_started_recently?() refute WorkerHealthcheck.oban_attempted_jobs_recently?() @@ -34,7 +37,7 @@ defmodule TransportWeb.Plugs.WorkerHealthcheckTest do test "app was not started recently, Oban jobs have been attempted recently" do datetime = DateTime.add(DateTime.utc_now(), -30, :minute) - Cachex.put(Transport.Cache.Cachex.cache_name(), WorkerHealthcheck.app_start_datetime_cache_key_name(), datetime) + Cachex.put(@cache_name, @cache_key, datetime) # A completed job was attempted 55 minutes ago Transport.Jobs.ResourceUnavailableJob.new(%{resource_id: 1}) @@ -48,6 +51,58 @@ defmodule TransportWeb.Plugs.WorkerHealthcheckTest do end end + describe "app_started_recently?" do + test "value is set when executed for the first time" do + assert {:ok, false} == Cachex.exists?(@cache_name, @cache_key) + # Calling for the first time creates the key + assert WorkerHealthcheck.app_started_recently?() + assert {:ok, true} == Cachex.exists?(@cache_name, @cache_key) + + # Calling again does not refresh the initial value + start_datetime = WorkerHealthcheck.app_start_datetime() + WorkerHealthcheck.app_started_recently?() + assert start_datetime == WorkerHealthcheck.app_start_datetime() + + # Key does not expire + assert {:ok, nil} == Cachex.ttl(@cache_name, @cache_key) + end + + test "acceptable delay is 20 minutes" do + # Just right + datetime = DateTime.add(DateTime.utc_now(), -19, :minute) + Cachex.put(@cache_name, @cache_key, datetime) + + assert WorkerHealthcheck.app_started_recently?() + + # Too long ago + datetime = DateTime.add(DateTime.utc_now(), -21, :minute) + Cachex.put(@cache_name, @cache_key, datetime) + refute WorkerHealthcheck.app_started_recently?() + end + end + + describe "oban_attempted_jobs_recently?" do + test "job attempted recently" do + # Attempted less than 60 minutes ago + Transport.Jobs.ResourceUnavailableJob.new(%{resource_id: 1}) + |> Oban.insert!() + |> Ecto.Changeset.change(attempted_at: DateTime.add(DateTime.utc_now(), -59, :minute), state: "completed") + |> DB.Repo.update!() + + assert WorkerHealthcheck.oban_attempted_jobs_recently?() + end + + test "job attempted too long ago" do + # Attempted more than 60 minutes ago + Transport.Jobs.ResourceUnavailableJob.new(%{resource_id: 1}) + |> Oban.insert!() + |> Ecto.Changeset.change(attempted_at: DateTime.add(DateTime.utc_now(), -61, :minute), state: "completed") + |> DB.Repo.update!() + + refute WorkerHealthcheck.oban_attempted_jobs_recently?() + end + end + describe "call" do test "healthy system", %{conn: conn} do assert WorkerHealthcheck.app_started_recently?() @@ -59,7 +114,7 @@ defmodule TransportWeb.Plugs.WorkerHealthcheckTest do test "unhealthy system", %{conn: conn} do datetime = DateTime.add(DateTime.utc_now(), -30, :minute) - Cachex.put(Transport.Cache.Cachex.cache_name(), WorkerHealthcheck.app_start_datetime_cache_key_name(), datetime) + Cachex.put(@cache_name, @cache_key, datetime) refute WorkerHealthcheck.app_started_recently?() refute WorkerHealthcheck.oban_attempted_jobs_recently?() From 3c425875b0215e1d8bfa7d6062dd5c5e0a211381 Mon Sep 17 00:00:00 2001 From: Antoine Augusti Date: Thu, 19 Dec 2024 15:35:14 +0100 Subject: [PATCH 4/5] Add AppSignal metric --- .../transport/lib/transport_web/plugs/worker_healthcheck.ex | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/apps/transport/lib/transport_web/plugs/worker_healthcheck.ex b/apps/transport/lib/transport_web/plugs/worker_healthcheck.ex index c5604d899e..fedbc61644 100644 --- a/apps/transport/lib/transport_web/plugs/worker_healthcheck.ex +++ b/apps/transport/lib/transport_web/plugs/worker_healthcheck.ex @@ -22,6 +22,7 @@ defmodule TransportWeb.Plugs.WorkerHealthcheck do {mod, fun} = opts[:if] if apply(mod, fun, []) do + store_last_attempted_at_delay_metric() status_code = if healthy_state?(), do: 200, else: 503 conn @@ -40,6 +41,11 @@ defmodule TransportWeb.Plugs.WorkerHealthcheck do end end + def store_last_attempted_at_delay_metric do + value = DateTime.diff(oban_last_attempted_at(), DateTime.utc_now(), :second) + Appsignal.add_distribution_value("oban.last_attempted_at_delay", value) + end + def healthy_state? do app_started_recently?() or oban_attempted_jobs_recently?() end From b9a40ec445b9a9b8c0c85e76aeaa1ee57752ff73 Mon Sep 17 00:00:00 2001 From: Antoine Augusti Date: Thu, 19 Dec 2024 15:57:01 +0100 Subject: [PATCH 5/5] PR review --- apps/transport/lib/transport_web/plugs/worker_healthcheck.ex | 2 +- .../test/transport_web/plugs/worker_healthcheck_test.exs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/apps/transport/lib/transport_web/plugs/worker_healthcheck.ex b/apps/transport/lib/transport_web/plugs/worker_healthcheck.ex index fedbc61644..269386d259 100644 --- a/apps/transport/lib/transport_web/plugs/worker_healthcheck.ex +++ b/apps/transport/lib/transport_web/plugs/worker_healthcheck.ex @@ -52,7 +52,7 @@ defmodule TransportWeb.Plugs.WorkerHealthcheck do def app_started_recently? do {delay, unit} = @app_start_waiting_delay - DateTime.before?(DateTime.utc_now(), DateTime.add(app_start_datetime(), delay, unit)) + DateTime.diff(DateTime.utc_now(), app_start_datetime(), unit) < delay end def app_start_datetime do diff --git a/apps/transport/test/transport_web/plugs/worker_healthcheck_test.exs b/apps/transport/test/transport_web/plugs/worker_healthcheck_test.exs index 61ef64db82..ce6da59f9d 100644 --- a/apps/transport/test/transport_web/plugs/worker_healthcheck_test.exs +++ b/apps/transport/test/transport_web/plugs/worker_healthcheck_test.exs @@ -1,5 +1,6 @@ defmodule TransportWeb.Plugs.WorkerHealthcheckTest do - # async: false is required because we use real in-memory caching in these tests + # async: false is required because we use real in-memory caching in these tests, + # and we swap application config (shared state) use TransportWeb.ConnCase, async: false alias TransportWeb.Plugs.WorkerHealthcheck