Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plug pour vérifier le bon fonctionnement du worker #4399

Merged
merged 6 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/transport/lib/transport_web/plugs/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule TransportWeb.Plugs.Router do
use Plug.Router

plug(TransportWeb.Plugs.HealthCheck, at: "/health-check")
plug(TransportWeb.Plugs.Halt, if: {Transport.Application, :worker_only?}, message: "UP (WORKER-ONLY)")
plug(TransportWeb.Plugs.WorkerHealthcheck, if: {Transport.Application, :worker_only?})

plug(:match)
plug(:dispatch)
Expand Down
82 changes: 82 additions & 0 deletions apps/transport/lib/transport_web/plugs/worker_healthcheck.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
defmodule TransportWeb.Plugs.WorkerHealthcheck do
@moduledoc """
A plug for the worker.
It can be conditionally enabled by passing an `:if` condition that will be evaluated.

It displays:
- when the app was started
- the last attempt for Oban jobs
- if the system is healthy

The system is considered healthy if the app was started recently or
if Oban attempted jobs recently.
"""
import Plug.Conn

@app_start_waiting_delay {20, :minute}
@oban_max_delay_since_last_attempt {60, :minute}

def init(options), do: options

def call(conn, opts) do
{mod, fun} = opts[:if]

if apply(mod, fun, []) do
store_last_attempted_at_delay_metric()
status_code = if healthy_state?(), do: 200, else: 503

conn
|> put_resp_content_type("text/plain")
|> send_resp(status_code, """
UP (WORKER-ONLY)
App start time: #{app_start_datetime()}
App started recently?: #{app_started_recently?()}
Oban last attempt: #{oban_last_attempted_at()}
Oban attempted jobs recently?: #{oban_attempted_jobs_recently?()}
Healthy state?: #{healthy_state?()}
""")
|> halt()
else
conn
end
end

def store_last_attempted_at_delay_metric do
value = DateTime.diff(oban_last_attempted_at(), DateTime.utc_now(), :second)
Appsignal.add_distribution_value("oban.last_attempted_at_delay", value)
end

def healthy_state? do
app_started_recently?() or oban_attempted_jobs_recently?()
end

def app_started_recently? do
{delay, unit} = @app_start_waiting_delay
DateTime.before?(DateTime.utc_now(), DateTime.add(app_start_datetime(), delay, unit))
thbar marked this conversation as resolved.
Show resolved Hide resolved
end

def app_start_datetime do
Transport.Cache.fetch(app_start_datetime_cache_key_name(), fn -> DateTime.utc_now() end, expire: nil)
end

def app_start_datetime_cache_key_name, do: "#{__MODULE__}::app_start_datetime"

def oban_attempted_jobs_recently? do
{delay, unit} = @oban_max_delay_since_last_attempt
DateTime.after?(oban_last_attempted_at(), DateTime.add(DateTime.utc_now(), -delay, unit))
end

def oban_last_attempted_at do
%Postgrex.Result{rows: [[delay]]} =
DB.Repo.query!("""
SELECT MAX(attempted_at)
FROM oban_jobs
WHERE state = 'completed'
""")

case delay do
nil -> DateTime.new!(~D[1970-01-01], ~T[00:00:00.000], "Etc/UTC")
AntoineAugusti marked this conversation as resolved.
Show resolved Hide resolved
%NaiveDateTime{} = nt -> DateTime.from_naive!(nt, "Etc/UTC")
end
end
end
128 changes: 128 additions & 0 deletions apps/transport/test/transport_web/plugs/worker_healthcheck_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
defmodule TransportWeb.Plugs.WorkerHealthcheckTest do
# async: false is required because we use real in-memory caching in these tests
AntoineAugusti marked this conversation as resolved.
Show resolved Hide resolved
use TransportWeb.ConnCase, async: false
alias TransportWeb.Plugs.WorkerHealthcheck

@cache_name Transport.Cache.Cachex.cache_name()
@cache_key WorkerHealthcheck.app_start_datetime_cache_key_name()

setup do
# Use a real in-memory cache for these tests to test the caching mecanism
old_value = Application.fetch_env!(:transport, :cache_impl)
Application.put_env(:transport, :cache_impl, Transport.Cache.Cachex)

on_exit(fn ->
Application.put_env(:transport, :cache_impl, old_value)
Cachex.reset(@cache_name)
end)

Ecto.Adapters.SQL.Sandbox.checkout(DB.Repo)
end

describe "healthy_state?" do
test "app was started recently, no Oban jobs" do
assert WorkerHealthcheck.app_started_recently?()
refute WorkerHealthcheck.oban_attempted_jobs_recently?()
assert WorkerHealthcheck.healthy_state?()
end

test "app was not started recently, Oban jobs have not been attempted recently" do
datetime = DateTime.add(DateTime.utc_now(), -30, :minute)
Cachex.put(@cache_name, @cache_key, datetime)

refute WorkerHealthcheck.app_started_recently?()
refute WorkerHealthcheck.oban_attempted_jobs_recently?()
refute WorkerHealthcheck.healthy_state?()
end

test "app was not started recently, Oban jobs have been attempted recently" do
datetime = DateTime.add(DateTime.utc_now(), -30, :minute)
Cachex.put(@cache_name, @cache_key, datetime)

# A completed job was attempted 55 minutes ago
Transport.Jobs.ResourceUnavailableJob.new(%{resource_id: 1})
|> Oban.insert!()
|> Ecto.Changeset.change(attempted_at: DateTime.add(DateTime.utc_now(), -55, :minute), state: "completed")
|> DB.Repo.update!()

refute WorkerHealthcheck.app_started_recently?()
assert WorkerHealthcheck.oban_attempted_jobs_recently?()
assert WorkerHealthcheck.healthy_state?()
end
end

describe "app_started_recently?" do
test "value is set when executed for the first time" do
assert {:ok, false} == Cachex.exists?(@cache_name, @cache_key)
# Calling for the first time creates the key
assert WorkerHealthcheck.app_started_recently?()
assert {:ok, true} == Cachex.exists?(@cache_name, @cache_key)

# Calling again does not refresh the initial value
start_datetime = WorkerHealthcheck.app_start_datetime()
WorkerHealthcheck.app_started_recently?()
assert start_datetime == WorkerHealthcheck.app_start_datetime()

# Key does not expire
assert {:ok, nil} == Cachex.ttl(@cache_name, @cache_key)
end

test "acceptable delay is 20 minutes" do
# Just right
datetime = DateTime.add(DateTime.utc_now(), -19, :minute)
Cachex.put(@cache_name, @cache_key, datetime)

assert WorkerHealthcheck.app_started_recently?()

# Too long ago
datetime = DateTime.add(DateTime.utc_now(), -21, :minute)
Cachex.put(@cache_name, @cache_key, datetime)
refute WorkerHealthcheck.app_started_recently?()
end
end

describe "oban_attempted_jobs_recently?" do
test "job attempted recently" do
# Attempted less than 60 minutes ago
Transport.Jobs.ResourceUnavailableJob.new(%{resource_id: 1})
|> Oban.insert!()
|> Ecto.Changeset.change(attempted_at: DateTime.add(DateTime.utc_now(), -59, :minute), state: "completed")
|> DB.Repo.update!()

assert WorkerHealthcheck.oban_attempted_jobs_recently?()
end

test "job attempted too long ago" do
# Attempted more than 60 minutes ago
Transport.Jobs.ResourceUnavailableJob.new(%{resource_id: 1})
|> Oban.insert!()
|> Ecto.Changeset.change(attempted_at: DateTime.add(DateTime.utc_now(), -61, :minute), state: "completed")
|> DB.Repo.update!()

refute WorkerHealthcheck.oban_attempted_jobs_recently?()
end
end

describe "call" do
test "healthy system", %{conn: conn} do
assert WorkerHealthcheck.app_started_recently?()
refute WorkerHealthcheck.oban_attempted_jobs_recently?()
assert WorkerHealthcheck.healthy_state?()

assert conn |> WorkerHealthcheck.call(if: {__MODULE__, :plug_enabled?}) |> text_response(200)
end

test "unhealthy system", %{conn: conn} do
datetime = DateTime.add(DateTime.utc_now(), -30, :minute)
Cachex.put(@cache_name, @cache_key, datetime)

refute WorkerHealthcheck.app_started_recently?()
refute WorkerHealthcheck.oban_attempted_jobs_recently?()
refute WorkerHealthcheck.healthy_state?()

assert conn |> WorkerHealthcheck.call(if: {__MODULE__, :plug_enabled?}) |> text_response(503)
end
end

def plug_enabled?, do: true
end
Loading