diff --git a/rebar.config b/rebar.config
index 55f1e58..6a94c0b 100644
--- a/rebar.config
+++ b/rebar.config
@@ -36,11 +36,11 @@
%% == Dependencies and plugins ==
{project_plugins,
- [{rebar3_hank, "~> 1.4.0"},
- {rebar3_hex, "~> 7.0.7"},
+ [{rebar3_hank, "~> 1.4.1"},
+ {rebar3_hex, "~> 7.0.8"},
{rebar3_format, "~> 1.3.0"},
- {rebar3_lint, "~> 3.1.0"},
- {rebar3_ex_doc, "~> 0.2.20"},
+ {rebar3_lint, "~> 3.2.6"},
+ {rebar3_ex_doc, "~> 0.2.23"},
{rebar3_depup, "~> 0.4.0"},
{covertool, "~> 2.0.6"}]}.
diff --git a/src/wpool.erl b/src/wpool.erl
index 2b7b04c..e3bac4e 100644
--- a/src/wpool.erl
+++ b/src/wpool.erl
@@ -121,7 +121,7 @@
%%
%% These are the same as described at the `gen_server' documentation.
--type worker_shutdown() :: worker_shutdown().
+-type worker_shutdown() :: brutal_kill | timeout().
%% The `shutdown' option to be used over the individual workers.
%%
%% Defaults to `5000'. See {@link wpool_process_sup} for more details.
@@ -187,7 +187,27 @@
%% `child_spec/2', `start_pool/2', `start_sup_pool/2' are the callbacks
%% that take a list of these options as a parameter.
--type custom_strategy() :: fun(([atom()]) -> Atom :: atom()).
+-type options() :: #{workers => workers(),
+ worker => worker(),
+ worker_opt => [worker_opt()],
+ strategy => supervisor_strategy(),
+ worker_shutdown => worker_shutdown(),
+ overrun_handler => overrun_handler(),
+ overrun_warning => overrun_warning(),
+ max_overrun_warnings => max_overrun_warnings(),
+ pool_sup_intensity => pool_sup_intensity(),
+ pool_sup_shutdown => pool_sup_shutdown(),
+ pool_sup_period => pool_sup_period(),
+ queue_type => queue_type(),
+ enable_callbacks => enable_callbacks(),
+ callbacks => callbacks(),
+ _ => _}.
+%% Options that can be provided to a new pool.
+%%
+%% `child_spec/2', `start_pool/2', `start_sup_pool/2' are the callbacks
+%% that take a list of these options as a parameter.
+
+-type custom_strategy() :: fun((atom()) -> Atom :: atom()).
%% A callback that gets the pool name and returns a worker's name.
-type strategy() ::
@@ -246,14 +266,14 @@
-type stats() ::
[{pool, name()} |
{supervisor, pid()} |
- {options, [option()]} |
+ {options, [option()] | options()} |
{size, non_neg_integer()} |
{next_worker, pos_integer()} |
{total_message_queue_len, non_neg_integer()} |
{workers, [{pos_integer(), worker_stats()}]}].
%% Statistics about a given live pool.
--export_type([name/0, option/0, custom_strategy/0, strategy/0, worker_stats/0, stats/0]).
+-export_type([name/0, option/0, options/0, custom_strategy/0, strategy/0, worker_stats/0, stats/0]).
-export([start/0, start/2, stop/0, stop/1]).
-export([child_spec/2, start_pool/1, start_pool/2, start_sup_pool/1, start_sup_pool/2]).
@@ -280,7 +300,7 @@ stop() ->
%% BEHAVIOUR CALLBACKS
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @private
--spec start(any(), any()) -> {ok, pid()} | {error, term()}.
+-spec start(any(), any()) -> supervisor:startlink_ret().
start(_StartType, _StartArgs) ->
wpool_sup:start_link().
@@ -293,20 +313,19 @@ stop(_State) ->
%% PUBLIC API
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @equiv start_pool(Name, [])
--spec start_pool(name()) -> {ok, pid()}.
+-spec start_pool(name()) -> supervisor:startlink_ret().
start_pool(Name) ->
start_pool(Name, []).
%% @doc Starts (and links) a pool of N wpool_processes.
%% The result pid belongs to a supervisor (in case you want to add it to a
%% supervisor tree)
--spec start_pool(name(), [option()]) ->
- {ok, pid()} | {error, {already_started, pid()} | term()}.
+-spec start_pool(name(), [option()] | options()) -> supervisor:startlink_ret().
start_pool(Name, Options) ->
wpool_pool:start_link(Name, wpool_utils:add_defaults(Options)).
%% @doc Builds a child specification to pass to a supervisor.
--spec child_spec(name(), [option()]) -> supervisor:child_spec().
+-spec child_spec(name(), [option()] | options()) -> supervisor:child_spec().
child_spec(Name, Options) ->
FullOptions = wpool_utils:add_defaults(Options),
#{id => Name,
@@ -326,13 +345,12 @@ stop_pool(Name) ->
end.
%% @equiv start_sup_pool(Name, [])
--spec start_sup_pool(name()) -> {ok, pid()} | {error, {already_started, pid()} | term()}.
+-spec start_sup_pool(name()) -> supervisor:startchild_ret().
start_sup_pool(Name) ->
start_sup_pool(Name, []).
%% @doc Starts a pool of N wpool_processes supervised by `wpool_sup'
--spec start_sup_pool(name(), [option()]) ->
- {ok, pid()} | {error, {already_started, pid()} | term()}.
+-spec start_sup_pool(name(), [option()] | options()) -> supervisor:startchild_ret().
start_sup_pool(Name, Options) ->
wpool_sup:start_pool(Name, wpool_utils:add_defaults(Options)).
@@ -369,14 +387,18 @@ call(Sup, Call, Strategy) ->
-spec call(name(), term(), strategy(), timeout()) -> term().
call(Sup, Call, available_worker, Timeout) ->
wpool_pool:call_available_worker(Sup, Call, Timeout);
+call(Sup, Call, next_available_worker, Timeout) ->
+ wpool_process:call(wpool_pool:next_available_worker(Sup), Call, Timeout);
+call(Sup, Call, next_worker, Timeout) ->
+ wpool_process:call(wpool_pool:next_worker(Sup), Call, Timeout);
+call(Sup, Call, random_worker, Timeout) ->
+ wpool_process:call(wpool_pool:random_worker(Sup), Call, Timeout);
+call(Sup, Call, best_worker, Timeout) ->
+ wpool_process:call(wpool_pool:best_worker(Sup), Call, Timeout);
call(Sup, Call, {hash_worker, HashKey}, Timeout) ->
- wpool_process:call(
- wpool_pool:hash_worker(Sup, HashKey), Call, Timeout);
-call(Sup, Call, Fun, Timeout) when is_function(Fun) ->
- wpool_process:call(Fun(Sup), Call, Timeout);
-call(Sup, Call, Strategy, Timeout) ->
- wpool_process:call(
- wpool_pool:Strategy(Sup), Call, Timeout).
+ wpool_process:call(wpool_pool:hash_worker(Sup, HashKey), Call, Timeout);
+call(Sup, Call, Fun, Timeout) when is_function(Fun, 1) ->
+ wpool_process:call(Fun(Sup), Call, Timeout).
%% @equiv cast(Sup, Cast, default_strategy())
-spec cast(name(), term()) -> ok.
@@ -387,14 +409,18 @@ cast(Sup, Cast) ->
-spec cast(name(), term(), strategy()) -> ok.
cast(Sup, Cast, available_worker) ->
wpool_pool:cast_to_available_worker(Sup, Cast);
+cast(Sup, Cast, next_available_worker) ->
+ wpool_process:cast(wpool_pool:next_available_worker(Sup), Cast);
+cast(Sup, Cast, next_worker) ->
+ wpool_process:cast(wpool_pool:next_worker(Sup), Cast);
+cast(Sup, Cast, random_worker) ->
+ wpool_process:cast(wpool_pool:random_worker(Sup), Cast);
+cast(Sup, Cast, best_worker) ->
+ wpool_process:cast(wpool_pool:best_worker(Sup), Cast);
cast(Sup, Cast, {hash_worker, HashKey}) ->
- wpool_process:cast(
- wpool_pool:hash_worker(Sup, HashKey), Cast);
-cast(Sup, Cast, Fun) when is_function(Fun) ->
- wpool_process:cast(Fun(Sup), Cast);
-cast(Sup, Cast, Strategy) ->
- wpool_process:cast(
- wpool_pool:Strategy(Sup), Cast).
+ wpool_process:cast(wpool_pool:hash_worker(Sup, HashKey), Cast);
+cast(Sup, Cast, Fun) when is_function(Fun, 1) ->
+ wpool_process:cast(Fun(Sup), Cast).
%% @equiv send_request(Sup, Call, default_strategy(), 5000)
-spec send_request(name(), term()) -> noproc | timeout | gen_server:request_id().
@@ -413,14 +439,37 @@ send_request(Sup, Call, Strategy) ->
noproc | timeout | gen_server:request_id().
send_request(Sup, Call, available_worker, Timeout) ->
wpool_pool:send_request_available_worker(Sup, Call, Timeout);
+send_request(Sup, Call, next_available_worker, _Timeout) ->
+ wpool_process:send_request(wpool_pool:next_available_worker(Sup), Call);
+send_request(Sup, Call, next_worker, _Timeout) ->
+ wpool_process:send_request(wpool_pool:next_worker(Sup), Call);
+send_request(Sup, Call, random_worker, _Timeout) ->
+ wpool_process:send_request(wpool_pool:random_worker(Sup), Call);
+send_request(Sup, Call, best_worker, _Timeout) ->
+ wpool_process:send_request(wpool_pool:best_worker(Sup), Call);
send_request(Sup, Call, {hash_worker, HashKey}, _Timeout) ->
- wpool_process:send_request(
- wpool_pool:hash_worker(Sup, HashKey), Call);
-send_request(Sup, Call, Fun, _Timeout) when is_function(Fun) ->
- wpool_process:send_request(Fun(Sup), Call);
-send_request(Sup, Call, Strategy, _Timeout) ->
- wpool_process:send_request(
- wpool_pool:Strategy(Sup), Call).
+ wpool_process:send_request(wpool_pool:hash_worker(Sup, HashKey), Call);
+send_request(Sup, Call, Fun, _Timeout) when is_function(Fun, 1) ->
+ wpool_process:send_request(Fun(Sup), Call).
+
+
+%% @doc Casts a message to all the workers within the given pool.
+%%
+%% NOTE: These messages don't get queued, they go straight to the worker's message queues, so
+%% if you're using available_worker strategy to balance the charge and you have some tasks queued up
+%% waiting for the next available worker, the broadcast will reach all the workers before the
+%% queued up tasks.
+-spec broadcast(wpool:name(), term()) -> ok.
+broadcast(Sup, Cast) ->
+ wpool_pool:broadcast(Sup, Cast).
+
+%% @doc Calls all the workers within the given pool async and waits for the responses synchronously.
+%%
+%% If one worker times out, the entire call is considered timed-out.
+-spec broadcall(wpool:name(), term(), timeout()) ->
+ {[Replies :: term()], [Errors :: term()]}.
+broadcall(Sup, Call, Timeout) ->
+ wpool_pool:broadcall(Sup, Call, Timeout).
%% @doc Retrieves a snapshot of statistics for all pools.
%%
@@ -441,21 +490,3 @@ stats(Sup) ->
-spec get_workers(name()) -> [atom()].
get_workers(Sup) ->
wpool_pool:get_workers(Sup).
-
-%% @doc Casts a message to all the workers within the given pool.
-%%
-%% NOTE: These messages don't get queued, they go straight to the worker's message queues, so
-%% if you're using available_worker strategy to balance the charge and you have some tasks queued up
-%% waiting for the next available worker, the broadcast will reach all the workers before the
-%% queued up tasks.
--spec broadcast(wpool:name(), term()) -> ok.
-broadcast(Sup, Cast) ->
- wpool_pool:broadcast(Sup, Cast).
-
-%% @doc Calls all the workers within the given pool async and waits for the responses synchronously.
-%%
-%% If one worker times out, the entire call is considered timed-out.
--spec broadcall(wpool:name(), term(), timeout()) ->
- {[Replies :: term()], [Errors :: term()]}.
-broadcall(Sup, Call, Timeout) ->
- wpool_pool:broadcall(Sup, Call, Timeout).
diff --git a/src/wpool_pool.erl b/src/wpool_pool.erl
index 3314bda..5a12aa9 100644
--- a/src/wpool_pool.erl
+++ b/src/wpool_pool.erl
@@ -40,7 +40,7 @@
size :: pos_integer(),
next :: atomics:atomics_ref(),
workers :: tuple(),
- opts :: [wpool:option()],
+ opts :: wpool:options(),
qmanager :: wpool_queue_manager:queue_mgr(),
born = erlang:system_time(second) :: integer()}).
@@ -53,8 +53,7 @@
%% ===================================================================
%% @doc Starts a supervisor with several `wpool_process'es as its children
--spec start_link(wpool:name(), [wpool:option()]) ->
- {ok, pid()} | {error, {already_started, pid()} | term()}.
+-spec start_link(wpool:name(), wpool:options()) -> supervisor:startlink_ret().
start_link(Name, Options) ->
supervisor:start_link({local, Name}, ?MODULE, {Name, Options}).
@@ -115,7 +114,7 @@ next_available_worker(Name) ->
%% The timeout provided includes the time it takes to get a worker
%% and for it to process the call.
%% @throws no_workers | timeout
--spec call_available_worker(wpool:name(), any(), timeout()) -> atom().
+-spec call_available_worker(wpool:name(), any(), timeout()) -> any().
call_available_worker(Name, Call, Timeout) ->
case wpool_queue_manager:call_available_worker(queue_manager_name(Name), Call, Timeout) of
noproc ->
@@ -190,7 +189,10 @@ broadcall(Name, Call, Timeout) ->
-spec all() -> [wpool:name()].
all() ->
- [Name || {{?MODULE, Name}, _} <- persistent_term:get(), find_wpool(Name) /= undefined].
+ [Name
+ || {{?MODULE, Name}, _} <- persistent_term:get(),
+ is_atom(Name),
+ find_wpool(Name) /= undefined].
%% @doc Retrieves the list of worker registered names.
%% This can be useful to manually inspect the workers or do custom work on them.
@@ -243,7 +245,7 @@ stats(Wpool, Name) ->
PendingTasks = wpool_queue_manager:pending_task_count(Wpool#wpool.qmanager),
[{pool, Name},
{supervisor, erlang:whereis(Name)},
- {options, lists:ukeysort(1, proplists:unfold(Wpool#wpool.opts))},
+ {options, maps:to_list(Wpool#wpool.opts)},
{size, Wpool#wpool.size},
{next_worker, atomics:get(Wpool#wpool.next, 1)},
{total_message_queue_len, Total + PendingTasks},
@@ -322,48 +324,54 @@ time_checker_name(Name) ->
%% Supervisor callbacks
%% ===================================================================
%% @private
--spec init({wpool:name(), [wpool:option()]}) ->
+-spec init({wpool:name(), wpool:options()}) ->
{ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init({Name, Options}) ->
- Size = proplists:get_value(workers, Options, 100),
- QueueType = proplists:get_value(queue_type, Options),
- OverrunHandler =
- proplists:get_value(overrun_handler, Options, {error_logger, warning_report}),
- TimeChecker = time_checker_name(Name),
- QueueManager = queue_manager_name(Name),
- ProcessSup = process_sup_name(Name),
+ Size = maps:get(workers, Options, 100),
+ QueueType = maps:get(queue_type, Options),
+ OverrunHandler = maps:get(overrun_handler, Options, {error_logger, warning_report}),
+ SupShutdown = maps:get(pool_sup_shutdown, Options, brutal_kill),
+ TimeCheckerName = time_checker_name(Name),
+ QueueManagerName = queue_manager_name(Name),
+ ProcessSupName = process_sup_name(Name),
EventManagerName = event_manager_name(Name),
_Wpool = store_wpool(Name, Size, Options),
+
+ WorkerOpts0 =
+ [{queue_manager, QueueManagerName}, {time_checker, TimeCheckerName}
+ | maybe_event_manager(Options, {event_manager, EventManagerName})],
+ WorkerOpts =
+ maps:merge(
+ maps:from_list(WorkerOpts0), Options),
+
TimeCheckerSpec =
- {TimeChecker,
- {wpool_time_checker, start_link, [Name, TimeChecker, OverrunHandler]},
- permanent,
- brutal_kill,
- worker,
- [wpool_time_checker]},
+ #{id => TimeCheckerName,
+ start => {wpool_time_checker, start_link, [Name, TimeCheckerName, OverrunHandler]},
+ restart => permanent,
+ shutdown => brutal_kill,
+ type => worker,
+ modules => [wpool_time_checker]},
QueueManagerSpec =
- {QueueManager,
- {wpool_queue_manager, start_link, [Name, QueueManager, [{queue_type, QueueType}]]},
- permanent,
- brutal_kill,
- worker,
- [wpool_queue_manager]},
-
+ #{id => QueueManagerName,
+ start =>
+ {wpool_queue_manager,
+ start_link,
+ [Name, QueueManagerName, [{queue_type, QueueType}]]},
+ restart => permanent,
+ shutdown => brutal_kill,
+ type => worker,
+ modules => [wpool_queue_manager]},
EventManagerSpec =
- {EventManagerName,
- {gen_event, start_link, [{local, EventManagerName}]},
- permanent,
- brutal_kill,
- worker,
- dynamic},
+ #{id => EventManagerName,
+ start => {gen_event, start_link, [{local, EventManagerName}]},
+ restart => permanent,
+ shutdown => brutal_kill,
+ type => worker,
+ modules => dynamic},
- SupShutdown = proplists:get_value(pool_sup_shutdown, Options, brutal_kill),
- WorkerOpts =
- [{queue_manager, QueueManager}, {time_checker, TimeChecker} | Options]
- ++ maybe_event_manager(Options, {event_manager, EventManagerName}),
ProcessSupSpec =
- {ProcessSup,
- {wpool_process_sup, start_link, [Name, ProcessSup, WorkerOpts]},
+ {ProcessSupName,
+ {wpool_process_sup, start_link, [Name, ProcessSupName, WorkerOpts]},
permanent,
SupShutdown,
supervisor,
@@ -374,8 +382,8 @@ init({Name, Options}) ->
++ maybe_event_manager(Options, EventManagerSpec)
++ [ProcessSupSpec],
- SupIntensity = proplists:get_value(pool_sup_intensity, Options, 5),
- SupPeriod = proplists:get_value(pool_sup_period, Options, 60),
+ SupIntensity = maps:get(pool_sup_intensity, Options, 5),
+ SupPeriod = maps:get(pool_sup_period, Options, 60),
SupStrategy =
#{strategy => one_for_all,
intensity => SupIntensity,
@@ -519,18 +527,14 @@ build_wpool(Name) ->
try supervisor:count_children(process_sup_name(Name)) of
Children ->
Size = proplists:get_value(active, Children, 0),
- store_wpool(Name, Size, [])
+ store_wpool(Name, Size, #{})
catch
_:Error ->
error_logger:warning_msg("Wpool ~p not found: ~p", [Name, Error]),
undefined
end.
-maybe_event_manager(Options, Item) ->
- EnableEventManager = proplists:get_value(enable_callbacks, Options, false),
- case EnableEventManager of
- true ->
- [Item];
- _ ->
- []
- end.
+maybe_event_manager(#{enable_callbacks := true}, Item) ->
+ [Item];
+maybe_event_manager(_, _) ->
+ [].
diff --git a/src/wpool_process.erl b/src/wpool_process.erl
index 9105e6e..c28c756 100644
--- a/src/wpool_process.erl
+++ b/src/wpool_process.erl
@@ -64,10 +64,6 @@
-export_type([next_step/0]).
--type options() :: [{time_checker | queue_manager, atom()} | wpool:option()].
-
--export_type([options/0]).
-
%% api
-export([start_link/4, call/3, cast/2, send_request/2]).
@@ -85,11 +81,11 @@
%%% API
%%%===================================================================
%% @doc Starts a named process
--spec start_link(wpool:name(), module(), term(), options()) ->
+-spec start_link(wpool:name(), module(), term(), wpool:options()) ->
{ok, pid()} | ignore | {error, {already_started, pid()} | term()}.
start_link(Name, Module, InitArgs, Options) ->
FullOpts = wpool_utils:add_defaults(Options),
- WorkerOpt = proplists:get_value(worker_opt, FullOpts, []),
+ WorkerOpt = maps:get(worker_opt, FullOpts, []),
gen_server:start_link({local, Name},
?MODULE,
{Name, Module, InitArgs, FullOpts},
@@ -106,7 +102,7 @@ cast(Process, Cast) ->
gen_server:cast(Process, Cast).
%% @equiv gen_server:send_request(Process, Request)
--spec send_request(wpool:name() | pid(), term()) -> term().
+-spec send_request(wpool:name() | pid(), term()) -> gen_server:request_id().
send_request(Name, Request) ->
gen_server:send_request(Name, Request).
@@ -122,10 +118,9 @@ get_state(#state{state = State}) ->
%%% init, terminate, code_change, info callbacks
%%%===================================================================
%% @private
--spec init({atom(), atom(), term(), options()}) ->
+-spec init({atom(), atom(), term(), wpool:options()}) ->
{ok, state()} | {ok, state(), next_step()} | {stop, can_not_ignore} | {stop, term()}.
-init({Name, Mod, InitArgs, LOptions}) ->
- Options = maps:from_list(LOptions),
+init({Name, Mod, InitArgs, Options}) ->
wpool_process_callbacks:notify(handle_init_start, Options, [Name]),
CbCache = create_callback_cache(Mod),
case Mod:init(InitArgs) of
diff --git a/src/wpool_process_sup.erl b/src/wpool_process_sup.erl
index 10dc9e6..deb49dd 100644
--- a/src/wpool_process_sup.erl
+++ b/src/wpool_process_sup.erl
@@ -22,37 +22,39 @@
-export([init/1]).
%% @private
--spec start_link(wpool:name(), atom(), [wpool:option()]) -> {ok, pid()}.
+-spec start_link(wpool:name(), atom(), wpool:options()) -> supervisor:startlink_ret().
start_link(Parent, Name, Options) ->
supervisor:start_link({local, Name}, ?MODULE, {Parent, Options}).
%% @private
--spec init({wpool:name(), [wpool:option()]}) ->
+-spec init({wpool:name(), wpool:options()}) ->
{ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init({Name, Options}) ->
- Workers = proplists:get_value(workers, Options, 100),
- Strategy = proplists:get_value(strategy, Options, {one_for_one, 5, 60}),
+ Workers = maps:get(workers, Options, 100),
+ Strategy = maps:get(strategy, Options, {one_for_one, 5, 60}),
+ WorkerShutdown = maps:get(worker_shutdown, Options, 5000),
+ {Worker, InitArgs} = maps:get(worker, Options, {wpool_worker, undefined}),
maybe_add_event_handler(Options),
- {W, IA} = proplists:get_value(worker, Options, {wpool_worker, undefined}),
- {WorkerType, Worker, InitArgs} = {wpool_process, W, IA},
- WorkerShutdown = proplists:get_value(worker_shutdown, Options, 5000),
WorkerSpecs =
- [{wpool_pool:worker_name(Name, I),
- {WorkerType, start_link, [wpool_pool:worker_name(Name, I), Worker, InitArgs, Options]},
- permanent,
- WorkerShutdown,
- worker,
- [Worker]}
+ [#{id => wpool_pool:worker_name(Name, I),
+ start =>
+ {wpool_process,
+ start_link,
+ [wpool_pool:worker_name(Name, I), Worker, InitArgs, Options]},
+ restart => permanent,
+ shutdown => WorkerShutdown,
+ type => worker,
+ modules => [Worker]}
|| I <- lists:seq(1, Workers)],
{ok, {Strategy, WorkerSpecs}}.
maybe_add_event_handler(Options) ->
- case proplists:get_value(event_manager, Options, undefined) of
+ case maps:get(event_manager, Options, undefined) of
undefined ->
ok;
EventMgr ->
lists:foreach(fun(M) -> add_initial_callback(EventMgr, M) end,
- proplists:get_value(callbacks, Options, []))
+ maps:get(callbacks, Options, []))
end.
add_initial_callback(EventManager, Module) ->
diff --git a/src/wpool_queue_manager.erl b/src/wpool_queue_manager.erl
index 69074fd..7540ab6 100644
--- a/src/wpool_queue_manager.erl
+++ b/src/wpool_queue_manager.erl
@@ -63,13 +63,11 @@
%%%===================================================================
%%% API
%%%===================================================================
--spec start_link(wpool:name(), queue_mgr()) ->
- {ok, pid()} | {error, {already_started, pid()} | term()}.
+-spec start_link(wpool:name(), queue_mgr()) -> gen_server:start_ret().
start_link(WPool, Name) ->
start_link(WPool, Name, []).
--spec start_link(wpool:name(), queue_mgr(), options()) ->
- {ok, pid()} | {error, {already_started, pid()} | term()}.
+-spec start_link(wpool:name(), queue_mgr(), options()) -> gen_server:start_ret().
start_link(WPool, Name, Options) ->
gen_server:start_link({local, Name}, ?MODULE, [{pool, WPool} | Options], []).
diff --git a/src/wpool_sup.erl b/src/wpool_sup.erl
index 4143943..68e1044 100644
--- a/src/wpool_sup.erl
+++ b/src/wpool_sup.erl
@@ -23,13 +23,12 @@
%% PUBLIC API
%%-------------------------------------------------------------------
%% @doc Starts the supervisor
--spec start_link() -> {ok, pid()} | {error, {already_started, pid()} | term()}.
+-spec start_link() -> supervisor:startlink_ret().
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% @doc Starts a new pool
--spec start_pool(wpool:name(), [wpool:option()]) ->
- {ok, pid()} | {error, {already_started, pid()} | term()}.
+-spec start_pool(wpool:name(), wpool:options()) -> supervisor:startchild_ret().
start_pool(Name, Options) ->
supervisor:start_child(?MODULE, [Name, Options]).
diff --git a/src/wpool_utils.erl b/src/wpool_utils.erl
index 25bd93a..1b5eb48 100644
--- a/src/wpool_utils.erl
+++ b/src/wpool_utils.erl
@@ -48,15 +48,16 @@ task_end(TimerRef) ->
erlang:erase(wpool_task).
%% @doc Adds default parameters to a pool configuration
--spec add_defaults([wpool:option()]) -> [wpool:option()].
-add_defaults(Opts) ->
- lists:ukeymerge(1, lists:sort(Opts), defaults()).
+-spec add_defaults([wpool:option()] | wpool:options()) -> wpool:options().
+add_defaults(Opts) when is_map(Opts) ->
+ maps:merge(defaults(), Opts);
+add_defaults(Opts) when is_list(Opts) ->
+ maps:merge(defaults(), maps:from_list(Opts)).
--spec defaults() -> [wpool:option()].
defaults() ->
- [{max_overrun_warnings, infinity},
- {overrun_handler, {error_logger, warning_report}},
- {overrun_warning, infinity},
- {queue_type, fifo},
- {worker_opt, []},
- {workers, 100}].
+ #{max_overrun_warnings => infinity,
+ overrun_handler => {error_logger, warning_report},
+ overrun_warning => infinity,
+ queue_type => fifo,
+ worker_opt => [],
+ workers => 100}.
diff --git a/test/wpool_SUITE.erl b/test/wpool_SUITE.erl
index 9abfee4..ee5ba94 100644
--- a/test/wpool_SUITE.erl
+++ b/test/wpool_SUITE.erl
@@ -28,7 +28,7 @@
-export([stats/1, stop_pool/1, non_brutal_shutdown/1, brutal_worker_shutdown/1, overrun/1,
kill_on_overrun/1, too_much_overrun/1, default_strategy/1, overrun_handler1/1,
overrun_handler2/1, default_options/1, complete_coverage/1, child_spec/1, broadcall/1,
- broadcast/1, send_request/1, worker_killed_stats/1]).
+ broadcast/1, send_request/1, worker_killed_stats/1, accepts_maps_and_lists_as_opts/1]).
-elvis([{elvis_style, no_block_expressions, disable}]).
@@ -50,7 +50,8 @@ all() ->
broadcall,
send_request,
kill_on_overrun,
- worker_killed_stats].
+ worker_killed_stats,
+ accepts_maps_and_lists_as_opts].
-spec init_per_suite(config()) -> config().
init_per_suite(Config) ->
@@ -382,6 +383,7 @@ complete_coverage(_Config) ->
ct:comment("Queue Manager"),
QMPid = get_queue_manager(PoolPid),
QMPid ! info,
+ {ok, _} = wpool_queue_manager:start_link(pool, pool_queue_manager),
{ok, _} = wpool_queue_manager:init([{pool, pool}]),
{comment, []}.
@@ -496,6 +498,23 @@ worker_killed_stats(_Config) ->
{comment, []}.
+-spec accepts_maps_and_lists_as_opts(config()) -> {comment, []}.
+accepts_maps_and_lists_as_opts(_Config) ->
+ %% Each server will take 100ms to start, but the start_sup_pool/2 call is synchronous anyway
+ {ok, PoolPidList} =
+ wpool:start_sup_pool(accepts_maps_and_lists_as_opts_list,
+ [{workers, 3}, {worker, {sleepy_server, 500}}]),
+ true = erlang:is_process_alive(PoolPidList),
+ ct:comment("accepts lists as opts"),
+
+ {ok, PoolPidMap} =
+ wpool:start_sup_pool(accepts_maps_and_lists_as_opts_map,
+ #{workers => 3, worker => {sleepy_server, 500}}),
+ true = erlang:is_process_alive(PoolPidMap),
+ ct:comment("accepts lists as opts"),
+
+ {comment, []}.
+
%% =============================================================================
%% Helpers
%% =============================================================================
diff --git a/test/wpool_pool_SUITE.erl b/test/wpool_pool_SUITE.erl
index 08ca8e5..7036d20 100644
--- a/test/wpool_pool_SUITE.erl
+++ b/test/wpool_pool_SUITE.erl
@@ -155,6 +155,9 @@ best_worker(_Config) ->
ok
end,
+ Req = wpool:send_request(Pool, {erlang, self, []}, best_worker),
+ {reply, {ok, _}} = gen_server:wait_response(Req, 5000),
+
%% Fill up their message queues...
[wpool:cast(Pool, {timer, sleep, [60000]}, next_worker) || _ <- lists:seq(1, ?WORKERS)],
[0] = ktn_task:wait_for(fun() -> worker_msg_queue_lengths(Pool) end, [0]),
@@ -208,6 +211,9 @@ next_available_worker(_Config) ->
ct:log("Wait until the first frees up..."),
1 = ktn_task:wait_for(AvailableWorkers, 1),
+ Req = wpool:send_request(Pool, {erlang, self, []}, next_available_worker),
+ {reply, {ok, _}} = gen_server:wait_response(Req, 5000),
+
ok = wpool:cast(Pool, {timer, sleep, [60000]}, next_available_worker),
ct:log("No more available workers..."),
@@ -243,6 +249,7 @@ next_worker(_Config) ->
?WORKERS =
sets:size(
sets:from_list(Res0)),
+
Res0 =
[begin
Stats = wpool:stats(Pool),
@@ -251,6 +258,9 @@ next_worker(_Config) ->
end
|| I <- lists:seq(1, ?WORKERS)],
+ Req = wpool:send_request(Pool, {erlang, self, []}, next_worker),
+ {reply, {ok, _}} = gen_server:wait_response(Req, 5000),
+
{comment, []}.
-spec random_worker(config()) -> {comment, []}.
@@ -273,6 +283,21 @@ random_worker(_Config) ->
sets:size(
sets:from_list(Serial)),
+ %% Randomly ask a lot of workers to send ourselves the atom true
+ [wpool:cast(Pool, {erlang, send, [self(), true]}, random_worker)
+ || _ <- lists:seq(1, 20 * ?WORKERS)],
+ Results =
+ [receive
+ true ->
+ true
+ end
+ || _ <- lists:seq(1, 20 * ?WORKERS)],
+ true = lists:all(fun(Value) -> true =:= Value end, Results),
+
+ %% do a gen_server:send_request/3
+ Req = wpool:send_request(Pool, {erlang, self, []}, random_worker),
+ {reply, {ok, _}} = gen_server:wait_response(Req, 5000),
+
%% Now do the same with a freshly spawned process for each request to ensure
%% randomness isn't reset with each spawn of the process_dictionary
Self = self(),
@@ -365,6 +390,9 @@ custom_worker(_Config) ->
end
|| I <- lists:seq(1, ?WORKERS)],
+ Req = wpool:send_request(Pool, {erlang, self, []}, Strategy),
+ {reply, {ok, _}} = gen_server:wait_response(Req, 5000),
+
{comment, []}.
-spec manager_crash(config()) -> {comment, []}.
@@ -526,7 +554,7 @@ mess_up_with_store(_Config) ->
true = process_flag(trap_exit, Flag),
ct:comment("And now delete the ets table altogether"),
- true = persistent_term:erase({wpool_pool, Pool}),
+ store_mess_up(Pool),
_ = wpool_pool:find_wpool(Pool),
wpool:stop(),
diff --git a/test/wpool_process_SUITE.erl b/test/wpool_process_SUITE.erl
index 1e63f8b..0f9d1f0 100644
--- a/test/wpool_process_SUITE.erl
+++ b/test/wpool_process_SUITE.erl
@@ -58,16 +58,16 @@ end_per_testcase(_TestCase, Config) ->
-spec init(config()) -> {comment, []}.
init(_Config) ->
- {error, can_not_ignore} = wpool_process:start_link(?MODULE, echo_server, ignore, []),
- {error, ?MODULE} = wpool_process:start_link(?MODULE, echo_server, {stop, ?MODULE}, []),
- {ok, _Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []),
+ {error, can_not_ignore} = wpool_process:start_link(?MODULE, echo_server, ignore, #{}),
+ {error, ?MODULE} = wpool_process:start_link(?MODULE, echo_server, {stop, ?MODULE}, #{}),
+ {ok, _Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, #{}),
wpool_process:cast(?MODULE, {stop, normal, state}),
{comment, []}.
-spec init_timeout(config()) -> {comment, []}.
init_timeout(_Config) ->
- {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state, 0}, []),
+ {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state, 0}, #{}),
timeout = get_state(?MODULE),
Pid ! {stop, normal, state},
false = ktn_task:wait_for(fun() -> erlang:is_process_alive(Pid) end, false),
@@ -76,7 +76,7 @@ init_timeout(_Config) ->
-spec info(config()) -> {comment, []}.
info(_Config) ->
- {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []),
+ {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, #{}),
Pid ! {noreply, newstate},
newstate = get_state(?MODULE),
Pid ! {noreply, newerstate, 1},
@@ -88,7 +88,7 @@ info(_Config) ->
-spec cast(config()) -> {comment, []}.
cast(_Config) ->
- {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []),
+ {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, #{}),
wpool_process:cast(Pid, {noreply, newstate}),
newstate = get_state(?MODULE),
wpool_process:cast(Pid, {noreply, newerstate, 0}),
@@ -100,7 +100,7 @@ cast(_Config) ->
-spec send_request(config()) -> {comment, []}.
send_request(_Config) ->
- {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []),
+ {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, #{}),
Req1 = wpool_process:send_request(Pid, {reply, ok1, newstate}),
ok1 = wait_response(Req1),
Req2 = wpool_process:send_request(Pid, {reply, ok2, newerstate, 1}),
@@ -127,7 +127,7 @@ continue(_Config) ->
wpool_process:start_link(?MODULE,
echo_server,
{ok, state, {continue, C(continue_state)}},
- []),
+ #{}),
continue_state = get_state(Pid),
%% handle_call/3 returns {continue, ...}
@@ -176,14 +176,14 @@ continue(_Config) ->
-spec handle_info_missing(config()) -> {comment, []}.
handle_info_missing(_Config) ->
%% sleepy_server does not implement handle_info/2
- {ok, Pid} = wpool_process:start_link(?MODULE, sleepy_server, 1, []),
+ {ok, Pid} = wpool_process:start_link(?MODULE, sleepy_server, 1, #{}),
Pid ! test,
{comment, []}.
-spec handle_info_fails(config()) -> {comment, []}.
handle_info_fails(_Config) ->
%% sleepy_server does not implement handle_info/2
- {ok, Pid} = wpool_process:start_link(?MODULE, crashy_server, {ok, state}, []),
+ {ok, Pid} = wpool_process:start_link(?MODULE, crashy_server, {ok, state}, #{}),
Pid ! undef,
false = ktn_task:wait_for(fun() -> erlang:is_process_alive(Pid) end, false),
{comment, []}.
@@ -191,7 +191,7 @@ handle_info_fails(_Config) ->
-spec format_status(config()) -> {comment, []}.
format_status(_Config) ->
%% echo_server implements format_status/1
- {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []),
+ {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, #{}),
%% therefore it returns State as its status
state = get_state(Pid),
{comment, []}.
@@ -199,7 +199,7 @@ format_status(_Config) ->
-spec no_format_status(config()) -> {comment, []}.
no_format_status(_Config) ->
%% crashy_server doesn't implement format_status/1
- {ok, Pid} = wpool_process:start_link(?MODULE, crashy_server, state, []),
+ {ok, Pid} = wpool_process:start_link(?MODULE, crashy_server, state, #{}),
%% therefore it uses the default format for the stauts (but with the status of
%% the gen_server, not wpool_process)
state = get_state(Pid),
@@ -207,7 +207,7 @@ no_format_status(_Config) ->
-spec call(config()) -> {comment, []}.
call(_Config) ->
- {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []),
+ {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, #{}),
ok1 = wpool_process:call(Pid, {reply, ok1, newstate}, 5000),
newstate = get_state(?MODULE),
ok2 = wpool_process:call(Pid, {reply, ok2, newerstate, 1}, 5000),
@@ -265,7 +265,7 @@ pool_norestart_crash(_Config) ->
-spec stop(config()) -> {comment, []}.
stop(_Config) ->
ct:comment("cast_call with stop/reply"),
- {ok, Pid1} = wpool_process:start_link(stopper, echo_server, {ok, state}, []),
+ {ok, Pid1} = wpool_process:start_link(stopper, echo_server, {ok, state}, #{}),
ReqId1 = wpool_process:send_request(stopper, {stop, reason, response, state}),
case gen_server:wait_response(ReqId1, 5000) of
{reply, response} ->
@@ -281,7 +281,7 @@ stop(_Config) ->
end,
ct:comment("cast_call with regular stop"),
- {ok, Pid2} = wpool_process:start_link(stopper, echo_server, {ok, state}, []),
+ {ok, Pid2} = wpool_process:start_link(stopper, echo_server, {ok, state}, #{}),
ReqId2 = wpool_process:send_request(stopper, {stop, reason, state}),
case gen_server:wait_response(ReqId2, 500) of
{error, {reason, Pid2}} ->
@@ -297,7 +297,7 @@ stop(_Config) ->
end,
ct:comment("call with regular stop"),
- {ok, Pid3} = wpool_process:start_link(stopper, echo_server, {ok, state}, []),
+ {ok, Pid3} = wpool_process:start_link(stopper, echo_server, {ok, state}, #{}),
try wpool_process:call(stopper, {noreply, state}, 100) of
_ ->
ct:fail("unexpected response")