Skip to content

Commit

Permalink
Merge branch 'master' into move-stuff-from-data-checker-to-oban-jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
vdegove committed Jan 17, 2025
2 parents d278d6c + 4897113 commit 37fdaf0
Show file tree
Hide file tree
Showing 41 changed files with 499 additions and 157 deletions.
2 changes: 1 addition & 1 deletion apps/transport/lib/db/dataset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ defmodule DB.Dataset do
@spec type_to_str_map() :: %{binary() => binary()}
def type_to_str_map,
do: %{
"public-transit" => dgettext("db-dataset", "Public transit - static schedules"),
"public-transit" => dgettext("db-dataset", "Public transit"),
"carpooling-areas" => dgettext("db-dataset", "Carpooling areas"),
"carpooling-lines" => dgettext("db-dataset", "Carpooling lines"),
"carpooling-offers" => dgettext("db-dataset", "Carpooling offers"),
Expand Down
38 changes: 33 additions & 5 deletions apps/transport/lib/jobs/new_datagouv_datasets_job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ defmodule Transport.Jobs.NewDatagouvDatasetsJob do
"trottinette",
"vls",
"scooter",
"scooters",
"libre-service",
"libre service",
"scooter"
"libre service"
]),
formats: MapSet.new(["gbfs"])
},
Expand All @@ -51,7 +51,7 @@ defmodule Transport.Jobs.NewDatagouvDatasetsJob do
"etalab/schema-comptage-mobilites-measure",
"etalab/schema-comptage-mobilites-site"
],
tags: MapSet.new(["cyclable", "parking", "stationnement", "vélo"]),
tags: MapSet.new(["cyclable", "cyclables", "parking", "parkings", "stationnement", "vélo", "vélos"]),
formats: MapSet.new([])
},
%{
Expand Down Expand Up @@ -221,7 +221,17 @@ defmodule Transport.Jobs.NewDatagouvDatasetsJob do

defp string_matches?(str, %{formats: formats, tags: tags} = _rule) when is_binary(str) do
searches = MapSet.union(formats, tags) |> MapSet.to_list() |> Enum.map(&normalize/1)
str |> normalize() |> String.contains?(searches)
{words_with_spaces, words_without_spaces} = Enum.split_with(searches, &String.contains?(&1, " "))

match_without_spaces =
not (str
|> normalize()
|> String.split(~r/\s+/)
|> MapSet.new()
|> MapSet.disjoint?(MapSet.new(words_without_spaces)))

match_with_spaces = str |> normalize() |> String.contains?(words_with_spaces)
match_without_spaces || match_with_spaces
end

defp tags_is_relevant?(%{"tags" => tags} = _dataset, rule) do
Expand Down Expand Up @@ -257,8 +267,26 @@ defmodule Transport.Jobs.NewDatagouvDatasetsJob do
"velo"
iex> normalize("Châteauroux")
"chateauroux"
iex> normalize("J'adore manger")
"j'adore manger"
"""
def normalize(value) do
value |> String.normalize(:nfd) |> String.replace(~r/[^A-z]/u, "") |> String.downcase()
value
|> String.downcase()
|> String.graphemes()
|> Enum.map_join("", &normalize_grapheme/1)
end

defp normalize_grapheme(grapheme) do
case String.normalize(grapheme, :nfd) do
<<first, rest::binary>> when is_binary(rest) ->
case String.valid?(<<first>>) do
true -> <<first>>
false -> ""
end

_ ->
grapheme
end
end
end
27 changes: 25 additions & 2 deletions apps/transport/lib/jobs/oban_logger.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
defmodule Transport.Jobs.ObanLogger do
@moduledoc """
Logs the Oban job exceptions as warnings
Setup telemetry/logging for Oban.
We:
- log job exceptions as warnings
- log Oban events related to the orchestration (notifier, queues, plugins etc.)
- we send an email when a job failed after its maximum attempt for jobs with a specific tag
"""
require Logger

@tag_email_on_failure "email_on_failure"

@doc """
Expand Down Expand Up @@ -35,5 +41,22 @@ defmodule Transport.Jobs.ObanLogger do
)
end

def setup, do: :telemetry.attach("oban-logger", [:oban, :job, :exception], &handle_event/4, nil)
def setup do
:telemetry.attach("oban-logger", [:oban, :job, :exception], &handle_event/4, nil)

# Log recommended events for production.
# We leave out `job` events because job start/end can be quite noisy.
# https://hexdocs.pm/oban/preparing_for_production.html#logging
# https://hexdocs.pm/oban/Oban.Telemetry.html
# We may simplify this when
# https://github.com/oban-bg/oban/commit/13eabe3f8019e350ef979369a26f186bdf7be63e
# will be released.
events = [
[:oban, :notifier, :switch],
[:oban, :queue, :shutdown],
[:oban, :stager, :switch]
]

:telemetry.attach_many("oban-default-logger", events, &Oban.Telemetry.handle_event/4, encode: true, level: :info)
end
end
11 changes: 9 additions & 2 deletions apps/transport/lib/jobs/workflow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ defmodule Transport.Jobs.Workflow do

{:notification, :gossip, %{"success" => false, "job_id" => ^job_id} = notif} ->
reason = notif |> Map.get("reason", "unknown reason")
{:error, "Job #{job_id} has failed: #{inspect(reason)}. Workflow is stopping here"}
{:error, "Job #{job_id} has failed: #{reason}. Workflow is stopping here"}
end
end
end
Expand Down Expand Up @@ -173,10 +173,17 @@ defmodule Transport.Jobs.Workflow do
},
nil
) do
# `error` can be an error message or an `Oban.TimeoutError` exception.
# ````
# %Oban.TimeoutError{
# message: "Transport.Jobs.ResourceHistoryJob timed out after 1000ms",
# reason: :timeout
# }
# ```
Notifier.notify_workflow(%{meta: %{"workflow" => true}}, %{
"success" => false,
"job_id" => job_id,
"reason" => error
"reason" => inspect(error)
})
end

Expand Down
2 changes: 1 addition & 1 deletion apps/transport/lib/transport_web/plugs/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule TransportWeb.Plugs.Router do
use Plug.Router

plug(TransportWeb.Plugs.HealthCheck, at: "/health-check")
plug(TransportWeb.Plugs.Halt, if: {Transport.Application, :worker_only?}, message: "UP (WORKER-ONLY)")
plug(TransportWeb.Plugs.WorkerHealthcheck, if: {Transport.Application, :worker_only?})

plug(:match)
plug(:dispatch)
Expand Down
118 changes: 118 additions & 0 deletions apps/transport/lib/transport_web/plugs/worker_healthcheck.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
defmodule TransportWeb.Plugs.WorkerHealthcheck do
@moduledoc """
A plug for the worker.
It can be conditionally enabled by passing an `:if` condition that will be evaluated.
It displays:
- when the app was started
- the last attempt for Oban jobs
- if the system is healthy
The system is considered healthy if the app was started recently or
if Oban attempted jobs recently.
"""
import Plug.Conn
require Logger

@app_start_waiting_delay {20, :minute}
@oban_max_delay_since_last_attempt {60, :minute}

def init(options), do: options

def call(conn, opts) do
{mod, fun} = opts[:if]

if apply(mod, fun, []) do
store_last_attempted_at_delay_metric()
status_code = if healthy_state?(), do: 200, else: 503

conn =
conn
|> put_resp_content_type("text/plain")
|> send_resp(status_code, """
UP (WORKER-ONLY)
App start time: #{app_start_datetime()}
App started recently?: #{app_started_recently?()}
Oban last attempt: #{oban_last_attempted_at()}
Oban attempted jobs recently?: #{oban_attempted_jobs_recently?()}
Healthy state?: #{healthy_state?()}
""")
|> halt()

# NOTE: Clever Cloud monitoring will better pick stuff back up
# if the app is completely down.
if !healthy_state?() do
Logger.info("Hot-fix: shutting down!!!")
stop_the_beam!()
end

conn
else
conn
end
end

@doc """
A fix for https://github.com/etalab/transport-site/issues/4377.
If the worker sees that no jobs have been attempted by Oban for some time,
this plug's logic stops the whole program (BEAM/VM) completely. Because the
Clever Cloud monitoring checks that they can open a socket to the 8080 port,
this makes the test fails, hence resulting in an automatic restart.
This is a cheap but so far effective way to ensure the worker gets restarted
when it malfunctions.
"""
def stop_the_beam! do
# "Asynchronously and carefully stops the Erlang runtime system."
if Mix.env() == :test do
# We do not want to stop the system during tests, because it
# gives the impression the test suite completed successfully, but
# it would actually just bypass all the tests after the one running this!
raise "would halt the BEAM"
else
# Also make sure to return with a non-zero exit code, to more clearly
# indicate that this is not the normal output
System.stop(1)
end
end

def store_last_attempted_at_delay_metric do
value = DateTime.diff(oban_last_attempted_at(), DateTime.utc_now(), :second)
Appsignal.add_distribution_value("oban.last_attempted_at_delay", value)
end

def healthy_state? do
app_started_recently?() or oban_attempted_jobs_recently?()
end

def app_started_recently? do
{delay, unit} = @app_start_waiting_delay
DateTime.diff(DateTime.utc_now(), app_start_datetime(), unit) < delay
end

def app_start_datetime do
Transport.Cache.fetch(app_start_datetime_cache_key_name(), fn -> DateTime.utc_now() end, expire: nil)
end

def app_start_datetime_cache_key_name, do: "#{__MODULE__}::app_start_datetime"

def oban_attempted_jobs_recently? do
{delay, unit} = @oban_max_delay_since_last_attempt
DateTime.after?(oban_last_attempted_at(), DateTime.add(DateTime.utc_now(), -delay, unit))
end

def oban_last_attempted_at do
%Postgrex.Result{rows: [[delay]]} =
DB.Repo.query!("""
SELECT MAX(attempted_at)
FROM oban_jobs
WHERE state = 'completed'
""")

case delay do
nil -> DateTime.new!(~D[1970-01-01], ~T[00:00:00.000], "Etc/UTC")
%NaiveDateTime{} = nt -> DateTime.from_naive!(nt, "Etc/UTC")
end
end
end
12 changes: 0 additions & 12 deletions apps/transport/lib/transport_web/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ defmodule TransportWeb.Router do
pipeline :reuser_space do
plug(:browser)
plug(:authentication_required, destination_path: "/infos_reutilisateurs")
plug(:check_reuser_space_enabled)
end

scope "/", OpenApiSpex.Plug do
Expand Down Expand Up @@ -387,17 +386,6 @@ defmodule TransportWeb.Router do
end
end

def check_reuser_space_enabled(%Plug.Conn{} = conn, _) do
if TransportWeb.Session.display_reuser_space?(conn) do
conn
else
conn
|> put_flash(:info, dgettext("alert", "This feature is currently not available."))
|> redirect(to: "/")
|> halt()
end
end

# Check that a secret key is passed in the URL in the `export_key` query parameter
defp check_export_secret_key(%Plug.Conn{params: params} = conn, _) do
export_key_value = Map.get(params, "export_key", "")
Expand Down
17 changes: 0 additions & 17 deletions apps/transport/lib/transport_web/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,6 @@ defmodule TransportWeb.Session do
DB.Dataset.base_query() |> where([dataset: d], d.organization_id in ^org_ids) |> DB.Repo.exists?()
end

@doc """
A temporary helper method to determine if we should display "reuser space features".
Convenient method to find various entrypoints in the codebase:
- links and buttons to the reuser space
- follow dataset hearts (search results, dataset pages)
- reuser space
Enable it for everybody but keep a "kill switch" to disable it quickly
by setting an environment variable and rebooting the app.
transport.data.gouv.fr admins get access no matter what.
"""
def display_reuser_space?(%Plug.Conn{} = conn) do
feature_disabled = Application.fetch_env!(:transport, :disable_reuser_space)
admin?(conn) or not feature_disabled
end

@spec set_session_attribute_attribute(Plug.Conn.t(), binary(), boolean()) :: Plug.Conn.t()
defp set_session_attribute_attribute(%Plug.Conn{} = conn, key, value) do
current_user = current_user(conn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,30 @@
<i class="fa fa-external-link-alt"></i>
<%= link("Backoffice", to: backoffice_page_path(@conn, :edit, @dataset.id)) %> &middot;
<% end %>
<%= if TransportWeb.Session.display_reuser_space?(@conn) do %>
<i class="fa fa-external-link-alt"></i>
<%= if @current_user do %>
<%= if @is_producer do %>
<%= link(dgettext("default", "Producer space"),
to: espace_producteur_path(@conn, :edit_dataset, @dataset.id, utm_campaign: "dataset_details"),
<i class="fa fa-external-link-alt"></i>
<%= if @current_user do %>
<%= if @is_producer do %>
<%= link(dgettext("default", "Producer space"),
to: espace_producteur_path(@conn, :edit_dataset, @dataset.id, utm_campaign: "dataset_details"),
target: "_blank"
) %>
<% else %>
<%= if @follows_dataset do %>
<%= link(dgettext("default", "Reuser space"),
to: reuser_space_path(@conn, :datasets_edit, @dataset.id, utm_campaign: "dataset_details"),
target: "_blank"
) %>
<% else %>
<%= if @follows_dataset do %>
<%= link(dgettext("default", "Reuser space"),
to: reuser_space_path(@conn, :datasets_edit, @dataset.id, utm_campaign: "dataset_details"),
target: "_blank"
) %>
<% else %>
<%= link(dgettext("default", "Reuser space"),
to: reuser_space_path(@conn, :espace_reutilisateur, utm_campaign: "dataset_details"),
target: "_blank"
) %>
<% end %>
<%= link(dgettext("default", "Reuser space"),
to: reuser_space_path(@conn, :espace_reutilisateur, utm_campaign: "dataset_details"),
target: "_blank"
) %>
<% end %>
<% else %>
<%= link(dgettext("default", "Reuser space"),
to: page_path(@conn, :infos_reutilisateurs, utm_campaign: "dataset_details"),
target: "_blank"
) %>
<% end %>
<% else %>
<%= link(dgettext("default", "Reuser space"),
to: page_path(@conn, :infos_reutilisateurs, utm_campaign: "dataset_details"),
target: "_blank"
) %>
<% end %>
</div>
Loading

0 comments on commit 37fdaf0

Please sign in to comment.