Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend the API with opaque runs #208

Merged
merged 8 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"-no_auto_compile -dir ebin -logdir log/ct --erl_args -smp enable -boot start_sasl"},
{cover_enabled, true},
{cover_export_enabled, true},
{cover_opts, [verbose]},
{cover_opts, [verbose, {min_coverage, 92}]},
{ct_opts, [{verbose, true}]},
{deps, [{katana, "1.0.0"}, {mixer, "1.2.0", {pkg, inaka_mixer}}, {meck, "0.9.2"}]},
{dialyzer,
Expand Down
76 changes: 67 additions & 9 deletions src/wpool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@
%%
%% Defaults to `5'. See {@link wpool_pool} for more details.

-type queue_type() :: wpool_queue_manager:queue_type().
-type queue_type() :: fifo | lifo.
%% Order in which requests will be stored and handled by workers.
%%
%% This option can take values `lifo' or `fifo'. Defaults to `fifo'.
Expand All @@ -164,6 +164,27 @@
%% Callbacks can be added and removed later by `wpool_pool:add_callback_module/2' and
%% `wpool_pool:remove_callback_module/2'.

-type run(Result) :: fun((wpool:name() | pid()) -> Result).
%% A function to run with a given worker.
%%
%% It can be used to enable APIs that hide the gen_server behind a complex logic
%% that might for example curate parameters or run side-effects, for example, `supervisor'.
%%
%% For example:
%% ```
%% Opts =
%% #{workers => 3,
%% worker_shutdown => infinity,
%% worker => {supervisor, {Name, ModuleCallback, Args}}},
%% %% Note that the supervisor's `init/1' callback takes such 3-tuple.
%% {ok, Pid} = wpool:start_sup_pool(pool_of_supervisors, Opts),
%%
%% ...
%%
%% Run = fun(Sup) -> supervisor:start_child(Sup, Params) end,
%% {ok, Pid} = wpool:run(pool_of_supervisors, Run, next_worker),
%% '''

-type name() :: atom().
%% Name of the pool

Expand Down Expand Up @@ -273,13 +294,15 @@
{workers, [{pos_integer(), worker_stats()}]}].
%% Statistics about a given live pool.

-export_type([name/0, option/0, options/0, custom_strategy/0, strategy/0, worker_stats/0, stats/0]).
-export_type([name/0, option/0, options/0, custom_strategy/0, strategy/0,
queue_type/0, run/1, worker_stats/0, stats/0]).

-export([start/0, start/2, stop/0, stop/1]).
-export([child_spec/2, start_pool/1, start_pool/2, start_sup_pool/1, start_sup_pool/2]).
-export([stop_pool/1, stop_sup_pool/1]).
-export([call/2, cast/2, call/3, cast/3, call/4, broadcall/3, broadcast/2]).
-export([send_request/2, send_request/3, send_request/4]).
-export([call/2, call/3, call/4, cast/2, cast/3,
run/2, run/3, run/4, broadcall/3, broadcast/2,
send_request/2, send_request/3, send_request/4]).
-export([stats/0, stats/1, get_workers/1]).
-export([default_strategy/0]).

Expand Down Expand Up @@ -369,6 +392,38 @@ default_strategy() ->
Strategy
end.

%% @equiv run(Sup, Run, default_strategy())
-spec run(name(), run(Result)) -> Result.
run(Sup, Run) ->
run(Sup, Run, default_strategy()).

%% @equiv run(Sup, Run, Strategy, 5000)
-spec run(name(), run(Result), strategy()) -> Result.
run(Sup, Run, Strategy) ->
run(Sup, Run, Strategy, 5000).

%% @doc Picks a server and issues the run to it.
%%
%% For all strategies except available_worker, Timeout applies only to the
%% time spent on the actual run to the worker, because time spent finding
%% the worker in other strategies is negligible.
%% For available_worker the time used choosing a worker is also considered
-spec run(name(), run(Result), strategy(), timeout()) -> Result.
run(Sup, Run, available_worker, Timeout) ->
wpool_pool:run_with_available_worker(Sup, Run, Timeout);
run(Sup, Run, next_available_worker, _Timeout) ->
wpool_process:run(wpool_pool:next_available_worker(Sup), Run);
run(Sup, Run, next_worker, _Timeout) ->
wpool_process:run(wpool_pool:next_worker(Sup), Run);
run(Sup, Run, random_worker, _Timeout) ->
wpool_process:run(wpool_pool:random_worker(Sup), Run);
run(Sup, Run, best_worker, _Timeout) ->
wpool_process:run(wpool_pool:best_worker(Sup), Run);
run(Sup, Run, {hash_worker, HashKey}, _Timeout) ->
wpool_process:run(wpool_pool:hash_worker(Sup, HashKey), Run);
run(Sup, Run, Fun, _Timeout) when is_function(Fun, 1) ->
wpool_process:run(Fun(Sup), Run).

%% @equiv call(Sup, Call, default_strategy())
-spec call(name(), term()) -> term().
call(Sup, Call) ->
Expand All @@ -380,10 +435,11 @@ call(Sup, Call, Strategy) ->
call(Sup, Call, Strategy, 5000).

%% @doc Picks a server and issues the call to it.
%% For all strategies except available_worker, Timeout applies only to the
%% time spent on the actual call to the worker, because time spent finding
%% the worker in other strategies is negligible.
%% For available_worker the time used choosing a worker is also considered
%%
%% For all strategies except available_worker, Timeout applies only to the
%% time spent on the actual call to the worker, because time spent finding
%% the worker in other strategies is negligible.
%% For available_worker the time used choosing a worker is also considered
-spec call(name(), term(), strategy(), timeout()) -> term().
call(Sup, Call, available_worker, Timeout) ->
wpool_pool:call_available_worker(Sup, Call, Timeout);
Expand Down Expand Up @@ -434,7 +490,8 @@ send_request(Sup, Call, Strategy) ->
send_request(Sup, Call, Strategy, 5000).

%% @doc Picks a server and issues the call to it.
%% Timeout applies only for the time used choosing a worker in the available_worker strategy
%%
%% Timeout applies only for the time used choosing a worker in the available_worker strategy
-spec send_request(name(), term(), strategy(), timeout()) ->
noproc | timeout | gen_server:request_id().
send_request(Sup, Call, available_worker, Timeout) ->
Expand Down Expand Up @@ -486,6 +543,7 @@ stats(Sup) ->
wpool_pool:stats(Sup).

%% @doc Retrieves the list of worker registered names.
%%
%% This can be useful to manually inspect the workers or do custom work on them.
-spec get_workers(name()) -> [atom()].
get_workers(Sup) ->
Expand Down
40 changes: 33 additions & 7 deletions src/wpool_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
%% API
-export([start_link/2]).
-export([best_worker/1, random_worker/1, next_worker/1, hash_worker/2,
next_available_worker/1, send_request_available_worker/3, call_available_worker/3]).
next_available_worker/1, send_request_available_worker/3, call_available_worker/3,
run_with_available_worker/3]).
-export([cast_to_available_worker/2, broadcast/2, broadcall/3]).
-export([stats/0, stats/1, get_workers/1]).
-export([worker_name/2, find_wpool/1]).
Expand Down Expand Up @@ -112,19 +113,44 @@ next_available_worker(Name) ->
end
end.

%% @doc Picks the first available worker and sends the call to it.
%% The timeout provided includes the time it takes to get a worker
%% and for it to process the call.
%% @throws no_workers | timeout
-spec run_with_available_worker(wpool:name(), wpool:run(Result), timeout()) -> Result.
run_with_available_worker(Name, Run, Timeout) ->
case find_wpool(Name) of
undefined ->
exit(no_workers);
#wpool{qmanager = QManager} ->
case wpool_queue_manager:run_with_available_worker(QManager, Run, Timeout) of
noproc ->
exit(no_workers);
timeout ->
exit(timeout);
Result ->
Result
end
end.

%% @doc Picks the first available worker and sends the call to it.
%% The timeout provided includes the time it takes to get a worker
%% and for it to process the call.
%% @throws no_workers | timeout
-spec call_available_worker(wpool:name(), any(), timeout()) -> any().
call_available_worker(Name, Call, Timeout) ->
case wpool_queue_manager:call_available_worker(queue_manager_name(Name), Call, Timeout) of
noproc ->
case find_wpool(Name) of
undefined ->
exit(no_workers);
timeout ->
exit(timeout);
Result ->
Result
#wpool{qmanager = QManager} ->
case wpool_queue_manager:call_available_worker(QManager, Call, Timeout) of
noproc ->
exit(no_workers);
timeout ->
exit(timeout);
Result ->
Result
end
end.

%% @doc Picks the first available worker and sends the request to it.
Expand Down
7 changes: 6 additions & 1 deletion src/wpool_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
-export_type([next_step/0]).

%% api
-export([start_link/4, call/3, cast/2, send_request/2]).
-export([start_link/4, run/2, call/3, cast/2, send_request/2]).

-ifdef(TEST).

Expand All @@ -91,6 +91,11 @@ start_link(Name, Module, InitArgs, Options) ->
{Name, Module, InitArgs, FullOpts},
WorkerOpt).

%% @doc Runs a function that takes as a parameter the given process
-spec run(wpool:name() | pid(), wpool:run(Result)) -> Result.
run(Process, Run) ->
Run(Process).
elbrujohalcon marked this conversation as resolved.
Show resolved Hide resolved

%% @equiv gen_server:call(Process, Call, Timeout)
-spec call(wpool:name() | pid(), term(), timeout()) -> term().
call(Process, Call, Timeout) ->
Expand Down
35 changes: 24 additions & 11 deletions src/wpool_queue_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@

%% api
-export([start_link/2, start_link/3]).
-export([call_available_worker/3, cast_to_available_worker/2, new_worker/2, worker_dead/2,
send_request_available_worker/3, worker_ready/2, worker_busy/2, pending_task_count/1]).
-export([run_with_available_worker/3, call_available_worker/3, cast_to_available_worker/2,
new_worker/2, worker_dead/2, send_request_available_worker/3, worker_ready/2,
worker_busy/2, pending_task_count/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

Expand All @@ -28,7 +29,7 @@
clients :: queue:queue({cast | {pid(), _}, term()}),
workers :: gb_sets:set(atom()),
monitors :: #{atom() := monitored_from()},
queue_type :: queue_type()}).
queue_type :: wpool:queue_type()}).

-opaque state() :: #state{}.

Expand All @@ -50,15 +51,14 @@

-type arg() :: option() | pool.
-type queue_mgr() :: atom().
-type queue_type() :: fifo | lifo.
-type worker_event() :: new_worker | worker_dead | worker_busy | worker_ready.

-export_type([worker_event/0]).

-type call_request() :: {available_worker, infinity | pos_integer()} | pending_task_count.

-export_type([call_request/0]).
-export_type([queue_mgr/0, queue_type/0]).
-export_type([queue_mgr/0]).

%%%===================================================================
%%% API
Expand All @@ -71,6 +71,20 @@ start_link(WPool, Name) ->
start_link(WPool, Name, Options) ->
gen_server:start_link({local, Name}, ?MODULE, [{pool, WPool} | Options], []).

%% @doc returns the first available worker in the pool
-spec run_with_available_worker(queue_mgr(), wpool:run(Result), timeout()) ->
noproc | timeout | Result.
run_with_available_worker(QueueManager, Call, Timeout) ->
case get_available_worker(QueueManager, Call, Timeout) of
{ok, TimeLeft, Worker} when TimeLeft > 0 ->
wpool_process:run(Worker, Call);
{ok, _, Worker} ->
worker_ready(QueueManager, Worker),
timeout;
Other ->
Other
end.

%% @doc returns the first available worker in the pool
-spec call_available_worker(queue_mgr(), any(), timeout()) -> noproc | timeout | any().
call_available_worker(QueueManager, Call, Timeout) ->
Expand Down Expand Up @@ -239,8 +253,7 @@ handle_info(_Info, State) ->
-spec get_available_worker(queue_mgr(), any(), timeout()) ->
noproc | timeout | {ok, timeout(), any()}.
get_available_worker(QueueManager, Call, Timeout) ->
Start = now_in_milliseconds(),
ExpiresAt = expires(Timeout, Start),
ExpiresAt = expires(Timeout),
elbrujohalcon marked this conversation as resolved.
Show resolved Hide resolved
try gen_server:call(QueueManager, {available_worker, ExpiresAt}, Timeout) of
{'EXIT', _, noproc} ->
noproc;
Expand Down Expand Up @@ -268,11 +281,11 @@ inc(Key) ->
dec(Key) ->
put(Key, get(Key) - 1).

-spec expires(timeout(), integer()) -> timeout().
expires(infinity, _) ->
-spec expires(timeout()) -> timeout().
expires(infinity) ->
infinity;
expires(Timeout, NowMs) ->
NowMs + Timeout.
expires(Timeout) ->
now_in_milliseconds() + Timeout.

-spec time_left(timeout()) -> timeout().
time_left(infinity) ->
Expand Down
5 changes: 5 additions & 0 deletions test/echo_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
-behaviour(gen_server).

%% gen_server callbacks
-export([start_link/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2,
handle_continue/2, format_status/1]).

Expand All @@ -26,6 +27,10 @@

-export_type([from/0]).

-spec start_link(term()) -> gen_server:start_ret().
start_link(Something) ->
gen_server:start_link(?MODULE, Something, []).

%%%===================================================================
%%% callbacks
%%%===================================================================
Expand Down
23 changes: 23 additions & 0 deletions test/echo_supervisor.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-module(echo_supervisor).

-behaviour(supervisor).

-export([start_link/0]).
-export([init/1]).

start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, noargs).

init(noargs) ->
Children =
#{id => undefined,
start => {echo_server, start_link, []},
restart => transient,
shutdown => 5000,
type => worker,
modules => [echo_server]},
Strategy =
#{strategy => simple_one_for_one,
intensity => 5,
period => 60},
{ok, {Strategy, [Children]}}.
29 changes: 27 additions & 2 deletions test/wpool_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
-export([stats/1, stop_pool/1, non_brutal_shutdown/1, brutal_worker_shutdown/1, overrun/1,
kill_on_overrun/1, too_much_overrun/1, default_strategy/1, overrun_handler1/1,
overrun_handler2/1, default_options/1, complete_coverage/1, child_spec/1, broadcall/1,
broadcast/1, send_request/1, worker_killed_stats/1, accepts_maps_and_lists_as_opts/1]).
broadcast/1, send_request/1, worker_killed_stats/1, accepts_maps_and_lists_as_opts/1,
pool_of_supervisors/1]).

-elvis([{elvis_style, no_block_expressions, disable}]).

Expand All @@ -51,7 +52,8 @@ all() ->
send_request,
kill_on_overrun,
worker_killed_stats,
accepts_maps_and_lists_as_opts].
accepts_maps_and_lists_as_opts,
pool_of_supervisors].

-spec init_per_suite(config()) -> config().
init_per_suite(Config) ->
Expand Down Expand Up @@ -515,6 +517,29 @@ accepts_maps_and_lists_as_opts(_Config) ->

{comment, []}.

-spec pool_of_supervisors(config()) -> {comment, string()}.
pool_of_supervisors(_Config) ->
Opts =
#{workers => 3,
worker_shutdown => infinity,
worker => {supervisor, {echo_supervisor, echo_supervisor, noargs}}},

{ok, Pid} = wpool:start_sup_pool(pool_of_supervisors, Opts),
true = erlang:is_process_alive(Pid),

[begin
Run = fun(Sup) -> supervisor:start_child(Sup, [{ok, #{}}]) end,
{ok, EchoServer} = wpool:run(pool_of_supervisors, Run, next_worker),
true = erlang:is_process_alive(EchoServer)
end
|| _N <- lists:seq(1, 9)],
NelsonVides marked this conversation as resolved.
Show resolved Hide resolved

Supervisors = wpool:get_workers(pool_of_supervisors),
[3 = proplists:get_value(active, supervisor:count_children(Supervisor))
|| Supervisor <- Supervisors],

{comment, "Nicely load-balanced childrens across supervisors"}.

%% =============================================================================
%% Helpers
%% =============================================================================
Expand Down
Loading
Loading