From cc4a1c274ca59b66a15613a70d16f2f45c89a75e Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Fri, 23 Aug 2024 10:17:57 +0200 Subject: [PATCH 1/6] Improve type information --- src/wpool.erl | 9 ++++----- src/wpool_process.erl | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/wpool.erl b/src/wpool.erl index 2b7b04c..e1190c4 100644 --- a/src/wpool.erl +++ b/src/wpool.erl @@ -121,7 +121,7 @@ %% %% These are the same as described at the `gen_server' documentation. --type worker_shutdown() :: worker_shutdown(). +-type worker_shutdown() :: brutal_kill | timeout(). %% The `shutdown' option to be used over the individual workers. %% %% Defaults to `5000'. See {@link wpool_process_sup} for more details. @@ -187,7 +187,7 @@ %% `child_spec/2', `start_pool/2', `start_sup_pool/2' are the callbacks %% that take a list of these options as a parameter. --type custom_strategy() :: fun(([atom()]) -> Atom :: atom()). +-type custom_strategy() :: fun((atom()) -> Atom :: atom()). %% A callback that gets the pool name and returns a worker's name. -type strategy() :: @@ -293,15 +293,14 @@ stop(_State) -> %% PUBLIC API %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% @equiv start_pool(Name, []) --spec start_pool(name()) -> {ok, pid()}. +-spec start_pool(name()) -> supervisor:startlink_ret(). start_pool(Name) -> start_pool(Name, []). %% @doc Starts (and links) a pool of N wpool_processes. %% The result pid belongs to a supervisor (in case you want to add it to a %% supervisor tree) --spec start_pool(name(), [option()]) -> - {ok, pid()} | {error, {already_started, pid()} | term()}. +-spec start_pool(name(), [option()]) -> supervisor:startlink_ret(). start_pool(Name, Options) -> wpool_pool:start_link(Name, wpool_utils:add_defaults(Options)). diff --git a/src/wpool_process.erl b/src/wpool_process.erl index 9105e6e..857919a 100644 --- a/src/wpool_process.erl +++ b/src/wpool_process.erl @@ -106,7 +106,7 @@ cast(Process, Cast) -> gen_server:cast(Process, Cast). %% @equiv gen_server:send_request(Process, Request) --spec send_request(wpool:name() | pid(), term()) -> term(). +-spec send_request(wpool:name() | pid(), term()) -> gen_server:request_id(). send_request(Name, Request) -> gen_server:send_request(Name, Request). From a74e2322d549d3b398536b3dc767d718ca1c4689 Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Fri, 23 Aug 2024 10:21:03 +0200 Subject: [PATCH 2/6] Fix test helper reuse --- test/wpool_pool_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/wpool_pool_SUITE.erl b/test/wpool_pool_SUITE.erl index 08ca8e5..000117e 100644 --- a/test/wpool_pool_SUITE.erl +++ b/test/wpool_pool_SUITE.erl @@ -526,7 +526,7 @@ mess_up_with_store(_Config) -> true = process_flag(trap_exit, Flag), ct:comment("And now delete the ets table altogether"), - true = persistent_term:erase({wpool_pool, Pool}), + store_mess_up(Pool), _ = wpool_pool:find_wpool(Pool), wpool:stop(), From 76f444cd62bc559038361132cf5aa6c7fcfc65fd Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Fri, 23 Aug 2024 10:25:12 +0200 Subject: [PATCH 3/6] Update rebar plugins --- rebar.config | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rebar.config b/rebar.config index 55f1e58..6a94c0b 100644 --- a/rebar.config +++ b/rebar.config @@ -36,11 +36,11 @@ %% == Dependencies and plugins == {project_plugins, - [{rebar3_hank, "~> 1.4.0"}, - {rebar3_hex, "~> 7.0.7"}, + [{rebar3_hank, "~> 1.4.1"}, + {rebar3_hex, "~> 7.0.8"}, {rebar3_format, "~> 1.3.0"}, - {rebar3_lint, "~> 3.1.0"}, - {rebar3_ex_doc, "~> 0.2.20"}, + {rebar3_lint, "~> 3.2.6"}, + {rebar3_ex_doc, "~> 0.2.23"}, {rebar3_depup, "~> 0.4.0"}, {covertool, "~> 2.0.6"}]}. From ac70aa07deaa3793fcb1743dc8edebaddf089499 Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Fri, 23 Aug 2024 11:10:03 +0200 Subject: [PATCH 4/6] Accept options as map --- src/wpool.erl | 35 +++++++++--- src/wpool_pool.erl | 104 ++++++++++++++++++----------------- src/wpool_process.erl | 13 ++--- src/wpool_process_sup.erl | 32 ++++++----- src/wpool_queue_manager.erl | 6 +- src/wpool_sup.erl | 5 +- src/wpool_utils.erl | 21 +++---- test/wpool_SUITE.erl | 22 +++++++- test/wpool_process_SUITE.erl | 32 +++++------ 9 files changed, 153 insertions(+), 117 deletions(-) diff --git a/src/wpool.erl b/src/wpool.erl index e1190c4..f889c62 100644 --- a/src/wpool.erl +++ b/src/wpool.erl @@ -187,6 +187,26 @@ %% `child_spec/2', `start_pool/2', `start_sup_pool/2' are the callbacks %% that take a list of these options as a parameter. +-type options() :: #{workers => workers(), + worker => worker(), + worker_opt => [worker_opt()], + strategy => supervisor_strategy(), + worker_shutdown => worker_shutdown(), + overrun_handler => overrun_handler(), + overrun_warning => overrun_warning(), + max_overrun_warnings => max_overrun_warnings(), + pool_sup_intensity => pool_sup_intensity(), + pool_sup_shutdown => pool_sup_shutdown(), + pool_sup_period => pool_sup_period(), + queue_type => queue_type(), + enable_callbacks => enable_callbacks(), + callbacks => callbacks(), + _ => _}. +%% Options that can be provided to a new pool. +%% +%% `child_spec/2', `start_pool/2', `start_sup_pool/2' are the callbacks +%% that take a list of these options as a parameter. + -type custom_strategy() :: fun((atom()) -> Atom :: atom()). %% A callback that gets the pool name and returns a worker's name. @@ -246,14 +266,14 @@ -type stats() :: [{pool, name()} | {supervisor, pid()} | - {options, [option()]} | + {options, [option()] | options()} | {size, non_neg_integer()} | {next_worker, pos_integer()} | {total_message_queue_len, non_neg_integer()} | {workers, [{pos_integer(), worker_stats()}]}]. %% Statistics about a given live pool. --export_type([name/0, option/0, custom_strategy/0, strategy/0, worker_stats/0, stats/0]). +-export_type([name/0, option/0, options/0, custom_strategy/0, strategy/0, worker_stats/0, stats/0]). -export([start/0, start/2, stop/0, stop/1]). -export([child_spec/2, start_pool/1, start_pool/2, start_sup_pool/1, start_sup_pool/2]). @@ -280,7 +300,7 @@ stop() -> %% BEHAVIOUR CALLBACKS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% @private --spec start(any(), any()) -> {ok, pid()} | {error, term()}. +-spec start(any(), any()) -> supervisor:startlink_ret(). start(_StartType, _StartArgs) -> wpool_sup:start_link(). @@ -300,12 +320,12 @@ start_pool(Name) -> %% @doc Starts (and links) a pool of N wpool_processes. %% The result pid belongs to a supervisor (in case you want to add it to a %% supervisor tree) --spec start_pool(name(), [option()]) -> supervisor:startlink_ret(). +-spec start_pool(name(), [option()] | options()) -> supervisor:startlink_ret(). start_pool(Name, Options) -> wpool_pool:start_link(Name, wpool_utils:add_defaults(Options)). %% @doc Builds a child specification to pass to a supervisor. --spec child_spec(name(), [option()]) -> supervisor:child_spec(). +-spec child_spec(name(), [option()] | options()) -> supervisor:child_spec(). child_spec(Name, Options) -> FullOptions = wpool_utils:add_defaults(Options), #{id => Name, @@ -325,13 +345,12 @@ stop_pool(Name) -> end. %% @equiv start_sup_pool(Name, []) --spec start_sup_pool(name()) -> {ok, pid()} | {error, {already_started, pid()} | term()}. +-spec start_sup_pool(name()) -> supervisor:startchild_ret(). start_sup_pool(Name) -> start_sup_pool(Name, []). %% @doc Starts a pool of N wpool_processes supervised by `wpool_sup' --spec start_sup_pool(name(), [option()]) -> - {ok, pid()} | {error, {already_started, pid()} | term()}. +-spec start_sup_pool(name(), [option()] | options()) -> supervisor:startchild_ret(). start_sup_pool(Name, Options) -> wpool_sup:start_pool(Name, wpool_utils:add_defaults(Options)). diff --git a/src/wpool_pool.erl b/src/wpool_pool.erl index 3314bda..5a12aa9 100644 --- a/src/wpool_pool.erl +++ b/src/wpool_pool.erl @@ -40,7 +40,7 @@ size :: pos_integer(), next :: atomics:atomics_ref(), workers :: tuple(), - opts :: [wpool:option()], + opts :: wpool:options(), qmanager :: wpool_queue_manager:queue_mgr(), born = erlang:system_time(second) :: integer()}). @@ -53,8 +53,7 @@ %% =================================================================== %% @doc Starts a supervisor with several `wpool_process'es as its children --spec start_link(wpool:name(), [wpool:option()]) -> - {ok, pid()} | {error, {already_started, pid()} | term()}. +-spec start_link(wpool:name(), wpool:options()) -> supervisor:startlink_ret(). start_link(Name, Options) -> supervisor:start_link({local, Name}, ?MODULE, {Name, Options}). @@ -115,7 +114,7 @@ next_available_worker(Name) -> %% The timeout provided includes the time it takes to get a worker %% and for it to process the call. %% @throws no_workers | timeout --spec call_available_worker(wpool:name(), any(), timeout()) -> atom(). +-spec call_available_worker(wpool:name(), any(), timeout()) -> any(). call_available_worker(Name, Call, Timeout) -> case wpool_queue_manager:call_available_worker(queue_manager_name(Name), Call, Timeout) of noproc -> @@ -190,7 +189,10 @@ broadcall(Name, Call, Timeout) -> -spec all() -> [wpool:name()]. all() -> - [Name || {{?MODULE, Name}, _} <- persistent_term:get(), find_wpool(Name) /= undefined]. + [Name + || {{?MODULE, Name}, _} <- persistent_term:get(), + is_atom(Name), + find_wpool(Name) /= undefined]. %% @doc Retrieves the list of worker registered names. %% This can be useful to manually inspect the workers or do custom work on them. @@ -243,7 +245,7 @@ stats(Wpool, Name) -> PendingTasks = wpool_queue_manager:pending_task_count(Wpool#wpool.qmanager), [{pool, Name}, {supervisor, erlang:whereis(Name)}, - {options, lists:ukeysort(1, proplists:unfold(Wpool#wpool.opts))}, + {options, maps:to_list(Wpool#wpool.opts)}, {size, Wpool#wpool.size}, {next_worker, atomics:get(Wpool#wpool.next, 1)}, {total_message_queue_len, Total + PendingTasks}, @@ -322,48 +324,54 @@ time_checker_name(Name) -> %% Supervisor callbacks %% =================================================================== %% @private --spec init({wpool:name(), [wpool:option()]}) -> +-spec init({wpool:name(), wpool:options()}) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. init({Name, Options}) -> - Size = proplists:get_value(workers, Options, 100), - QueueType = proplists:get_value(queue_type, Options), - OverrunHandler = - proplists:get_value(overrun_handler, Options, {error_logger, warning_report}), - TimeChecker = time_checker_name(Name), - QueueManager = queue_manager_name(Name), - ProcessSup = process_sup_name(Name), + Size = maps:get(workers, Options, 100), + QueueType = maps:get(queue_type, Options), + OverrunHandler = maps:get(overrun_handler, Options, {error_logger, warning_report}), + SupShutdown = maps:get(pool_sup_shutdown, Options, brutal_kill), + TimeCheckerName = time_checker_name(Name), + QueueManagerName = queue_manager_name(Name), + ProcessSupName = process_sup_name(Name), EventManagerName = event_manager_name(Name), _Wpool = store_wpool(Name, Size, Options), + + WorkerOpts0 = + [{queue_manager, QueueManagerName}, {time_checker, TimeCheckerName} + | maybe_event_manager(Options, {event_manager, EventManagerName})], + WorkerOpts = + maps:merge( + maps:from_list(WorkerOpts0), Options), + TimeCheckerSpec = - {TimeChecker, - {wpool_time_checker, start_link, [Name, TimeChecker, OverrunHandler]}, - permanent, - brutal_kill, - worker, - [wpool_time_checker]}, + #{id => TimeCheckerName, + start => {wpool_time_checker, start_link, [Name, TimeCheckerName, OverrunHandler]}, + restart => permanent, + shutdown => brutal_kill, + type => worker, + modules => [wpool_time_checker]}, QueueManagerSpec = - {QueueManager, - {wpool_queue_manager, start_link, [Name, QueueManager, [{queue_type, QueueType}]]}, - permanent, - brutal_kill, - worker, - [wpool_queue_manager]}, - + #{id => QueueManagerName, + start => + {wpool_queue_manager, + start_link, + [Name, QueueManagerName, [{queue_type, QueueType}]]}, + restart => permanent, + shutdown => brutal_kill, + type => worker, + modules => [wpool_queue_manager]}, EventManagerSpec = - {EventManagerName, - {gen_event, start_link, [{local, EventManagerName}]}, - permanent, - brutal_kill, - worker, - dynamic}, + #{id => EventManagerName, + start => {gen_event, start_link, [{local, EventManagerName}]}, + restart => permanent, + shutdown => brutal_kill, + type => worker, + modules => dynamic}, - SupShutdown = proplists:get_value(pool_sup_shutdown, Options, brutal_kill), - WorkerOpts = - [{queue_manager, QueueManager}, {time_checker, TimeChecker} | Options] - ++ maybe_event_manager(Options, {event_manager, EventManagerName}), ProcessSupSpec = - {ProcessSup, - {wpool_process_sup, start_link, [Name, ProcessSup, WorkerOpts]}, + {ProcessSupName, + {wpool_process_sup, start_link, [Name, ProcessSupName, WorkerOpts]}, permanent, SupShutdown, supervisor, @@ -374,8 +382,8 @@ init({Name, Options}) -> ++ maybe_event_manager(Options, EventManagerSpec) ++ [ProcessSupSpec], - SupIntensity = proplists:get_value(pool_sup_intensity, Options, 5), - SupPeriod = proplists:get_value(pool_sup_period, Options, 60), + SupIntensity = maps:get(pool_sup_intensity, Options, 5), + SupPeriod = maps:get(pool_sup_period, Options, 60), SupStrategy = #{strategy => one_for_all, intensity => SupIntensity, @@ -519,18 +527,14 @@ build_wpool(Name) -> try supervisor:count_children(process_sup_name(Name)) of Children -> Size = proplists:get_value(active, Children, 0), - store_wpool(Name, Size, []) + store_wpool(Name, Size, #{}) catch _:Error -> error_logger:warning_msg("Wpool ~p not found: ~p", [Name, Error]), undefined end. -maybe_event_manager(Options, Item) -> - EnableEventManager = proplists:get_value(enable_callbacks, Options, false), - case EnableEventManager of - true -> - [Item]; - _ -> - [] - end. +maybe_event_manager(#{enable_callbacks := true}, Item) -> + [Item]; +maybe_event_manager(_, _) -> + []. diff --git a/src/wpool_process.erl b/src/wpool_process.erl index 857919a..c28c756 100644 --- a/src/wpool_process.erl +++ b/src/wpool_process.erl @@ -64,10 +64,6 @@ -export_type([next_step/0]). --type options() :: [{time_checker | queue_manager, atom()} | wpool:option()]. - --export_type([options/0]). - %% api -export([start_link/4, call/3, cast/2, send_request/2]). @@ -85,11 +81,11 @@ %%% API %%%=================================================================== %% @doc Starts a named process --spec start_link(wpool:name(), module(), term(), options()) -> +-spec start_link(wpool:name(), module(), term(), wpool:options()) -> {ok, pid()} | ignore | {error, {already_started, pid()} | term()}. start_link(Name, Module, InitArgs, Options) -> FullOpts = wpool_utils:add_defaults(Options), - WorkerOpt = proplists:get_value(worker_opt, FullOpts, []), + WorkerOpt = maps:get(worker_opt, FullOpts, []), gen_server:start_link({local, Name}, ?MODULE, {Name, Module, InitArgs, FullOpts}, @@ -122,10 +118,9 @@ get_state(#state{state = State}) -> %%% init, terminate, code_change, info callbacks %%%=================================================================== %% @private --spec init({atom(), atom(), term(), options()}) -> +-spec init({atom(), atom(), term(), wpool:options()}) -> {ok, state()} | {ok, state(), next_step()} | {stop, can_not_ignore} | {stop, term()}. -init({Name, Mod, InitArgs, LOptions}) -> - Options = maps:from_list(LOptions), +init({Name, Mod, InitArgs, Options}) -> wpool_process_callbacks:notify(handle_init_start, Options, [Name]), CbCache = create_callback_cache(Mod), case Mod:init(InitArgs) of diff --git a/src/wpool_process_sup.erl b/src/wpool_process_sup.erl index 10dc9e6..deb49dd 100644 --- a/src/wpool_process_sup.erl +++ b/src/wpool_process_sup.erl @@ -22,37 +22,39 @@ -export([init/1]). %% @private --spec start_link(wpool:name(), atom(), [wpool:option()]) -> {ok, pid()}. +-spec start_link(wpool:name(), atom(), wpool:options()) -> supervisor:startlink_ret(). start_link(Parent, Name, Options) -> supervisor:start_link({local, Name}, ?MODULE, {Parent, Options}). %% @private --spec init({wpool:name(), [wpool:option()]}) -> +-spec init({wpool:name(), wpool:options()}) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. init({Name, Options}) -> - Workers = proplists:get_value(workers, Options, 100), - Strategy = proplists:get_value(strategy, Options, {one_for_one, 5, 60}), + Workers = maps:get(workers, Options, 100), + Strategy = maps:get(strategy, Options, {one_for_one, 5, 60}), + WorkerShutdown = maps:get(worker_shutdown, Options, 5000), + {Worker, InitArgs} = maps:get(worker, Options, {wpool_worker, undefined}), maybe_add_event_handler(Options), - {W, IA} = proplists:get_value(worker, Options, {wpool_worker, undefined}), - {WorkerType, Worker, InitArgs} = {wpool_process, W, IA}, - WorkerShutdown = proplists:get_value(worker_shutdown, Options, 5000), WorkerSpecs = - [{wpool_pool:worker_name(Name, I), - {WorkerType, start_link, [wpool_pool:worker_name(Name, I), Worker, InitArgs, Options]}, - permanent, - WorkerShutdown, - worker, - [Worker]} + [#{id => wpool_pool:worker_name(Name, I), + start => + {wpool_process, + start_link, + [wpool_pool:worker_name(Name, I), Worker, InitArgs, Options]}, + restart => permanent, + shutdown => WorkerShutdown, + type => worker, + modules => [Worker]} || I <- lists:seq(1, Workers)], {ok, {Strategy, WorkerSpecs}}. maybe_add_event_handler(Options) -> - case proplists:get_value(event_manager, Options, undefined) of + case maps:get(event_manager, Options, undefined) of undefined -> ok; EventMgr -> lists:foreach(fun(M) -> add_initial_callback(EventMgr, M) end, - proplists:get_value(callbacks, Options, [])) + maps:get(callbacks, Options, [])) end. add_initial_callback(EventManager, Module) -> diff --git a/src/wpool_queue_manager.erl b/src/wpool_queue_manager.erl index 69074fd..7540ab6 100644 --- a/src/wpool_queue_manager.erl +++ b/src/wpool_queue_manager.erl @@ -63,13 +63,11 @@ %%%=================================================================== %%% API %%%=================================================================== --spec start_link(wpool:name(), queue_mgr()) -> - {ok, pid()} | {error, {already_started, pid()} | term()}. +-spec start_link(wpool:name(), queue_mgr()) -> gen_server:start_ret(). start_link(WPool, Name) -> start_link(WPool, Name, []). --spec start_link(wpool:name(), queue_mgr(), options()) -> - {ok, pid()} | {error, {already_started, pid()} | term()}. +-spec start_link(wpool:name(), queue_mgr(), options()) -> gen_server:start_ret(). start_link(WPool, Name, Options) -> gen_server:start_link({local, Name}, ?MODULE, [{pool, WPool} | Options], []). diff --git a/src/wpool_sup.erl b/src/wpool_sup.erl index 4143943..68e1044 100644 --- a/src/wpool_sup.erl +++ b/src/wpool_sup.erl @@ -23,13 +23,12 @@ %% PUBLIC API %%------------------------------------------------------------------- %% @doc Starts the supervisor --spec start_link() -> {ok, pid()} | {error, {already_started, pid()} | term()}. +-spec start_link() -> supervisor:startlink_ret(). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). %% @doc Starts a new pool --spec start_pool(wpool:name(), [wpool:option()]) -> - {ok, pid()} | {error, {already_started, pid()} | term()}. +-spec start_pool(wpool:name(), wpool:options()) -> supervisor:startchild_ret(). start_pool(Name, Options) -> supervisor:start_child(?MODULE, [Name, Options]). diff --git a/src/wpool_utils.erl b/src/wpool_utils.erl index 25bd93a..1b5eb48 100644 --- a/src/wpool_utils.erl +++ b/src/wpool_utils.erl @@ -48,15 +48,16 @@ task_end(TimerRef) -> erlang:erase(wpool_task). %% @doc Adds default parameters to a pool configuration --spec add_defaults([wpool:option()]) -> [wpool:option()]. -add_defaults(Opts) -> - lists:ukeymerge(1, lists:sort(Opts), defaults()). +-spec add_defaults([wpool:option()] | wpool:options()) -> wpool:options(). +add_defaults(Opts) when is_map(Opts) -> + maps:merge(defaults(), Opts); +add_defaults(Opts) when is_list(Opts) -> + maps:merge(defaults(), maps:from_list(Opts)). --spec defaults() -> [wpool:option()]. defaults() -> - [{max_overrun_warnings, infinity}, - {overrun_handler, {error_logger, warning_report}}, - {overrun_warning, infinity}, - {queue_type, fifo}, - {worker_opt, []}, - {workers, 100}]. + #{max_overrun_warnings => infinity, + overrun_handler => {error_logger, warning_report}, + overrun_warning => infinity, + queue_type => fifo, + worker_opt => [], + workers => 100}. diff --git a/test/wpool_SUITE.erl b/test/wpool_SUITE.erl index 9abfee4..88d29f1 100644 --- a/test/wpool_SUITE.erl +++ b/test/wpool_SUITE.erl @@ -28,7 +28,7 @@ -export([stats/1, stop_pool/1, non_brutal_shutdown/1, brutal_worker_shutdown/1, overrun/1, kill_on_overrun/1, too_much_overrun/1, default_strategy/1, overrun_handler1/1, overrun_handler2/1, default_options/1, complete_coverage/1, child_spec/1, broadcall/1, - broadcast/1, send_request/1, worker_killed_stats/1]). + broadcast/1, send_request/1, worker_killed_stats/1, accepts_maps_and_lists_as_opts/1]). -elvis([{elvis_style, no_block_expressions, disable}]). @@ -50,7 +50,8 @@ all() -> broadcall, send_request, kill_on_overrun, - worker_killed_stats]. + worker_killed_stats, + accepts_maps_and_lists_as_opts]. -spec init_per_suite(config()) -> config(). init_per_suite(Config) -> @@ -496,6 +497,23 @@ worker_killed_stats(_Config) -> {comment, []}. +-spec accepts_maps_and_lists_as_opts(config()) -> {comment, []}. +accepts_maps_and_lists_as_opts(_Config) -> + %% Each server will take 100ms to start, but the start_sup_pool/2 call is synchronous anyway + {ok, PoolPidList} = + wpool:start_sup_pool(accepts_maps_and_lists_as_opts_list, + [{workers, 3}, {worker, {sleepy_server, 500}}]), + true = erlang:is_process_alive(PoolPidList), + ct:comment("accepts lists as opts"), + + {ok, PoolPidMap} = + wpool:start_sup_pool(accepts_maps_and_lists_as_opts_map, + #{workers => 3, worker => {sleepy_server, 500}}), + true = erlang:is_process_alive(PoolPidMap), + ct:comment("accepts lists as opts"), + + {comment, []}. + %% ============================================================================= %% Helpers %% ============================================================================= diff --git a/test/wpool_process_SUITE.erl b/test/wpool_process_SUITE.erl index 1e63f8b..0f9d1f0 100644 --- a/test/wpool_process_SUITE.erl +++ b/test/wpool_process_SUITE.erl @@ -58,16 +58,16 @@ end_per_testcase(_TestCase, Config) -> -spec init(config()) -> {comment, []}. init(_Config) -> - {error, can_not_ignore} = wpool_process:start_link(?MODULE, echo_server, ignore, []), - {error, ?MODULE} = wpool_process:start_link(?MODULE, echo_server, {stop, ?MODULE}, []), - {ok, _Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []), + {error, can_not_ignore} = wpool_process:start_link(?MODULE, echo_server, ignore, #{}), + {error, ?MODULE} = wpool_process:start_link(?MODULE, echo_server, {stop, ?MODULE}, #{}), + {ok, _Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, #{}), wpool_process:cast(?MODULE, {stop, normal, state}), {comment, []}. -spec init_timeout(config()) -> {comment, []}. init_timeout(_Config) -> - {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state, 0}, []), + {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state, 0}, #{}), timeout = get_state(?MODULE), Pid ! {stop, normal, state}, false = ktn_task:wait_for(fun() -> erlang:is_process_alive(Pid) end, false), @@ -76,7 +76,7 @@ init_timeout(_Config) -> -spec info(config()) -> {comment, []}. info(_Config) -> - {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []), + {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, #{}), Pid ! {noreply, newstate}, newstate = get_state(?MODULE), Pid ! {noreply, newerstate, 1}, @@ -88,7 +88,7 @@ info(_Config) -> -spec cast(config()) -> {comment, []}. cast(_Config) -> - {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []), + {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, #{}), wpool_process:cast(Pid, {noreply, newstate}), newstate = get_state(?MODULE), wpool_process:cast(Pid, {noreply, newerstate, 0}), @@ -100,7 +100,7 @@ cast(_Config) -> -spec send_request(config()) -> {comment, []}. send_request(_Config) -> - {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []), + {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, #{}), Req1 = wpool_process:send_request(Pid, {reply, ok1, newstate}), ok1 = wait_response(Req1), Req2 = wpool_process:send_request(Pid, {reply, ok2, newerstate, 1}), @@ -127,7 +127,7 @@ continue(_Config) -> wpool_process:start_link(?MODULE, echo_server, {ok, state, {continue, C(continue_state)}}, - []), + #{}), continue_state = get_state(Pid), %% handle_call/3 returns {continue, ...} @@ -176,14 +176,14 @@ continue(_Config) -> -spec handle_info_missing(config()) -> {comment, []}. handle_info_missing(_Config) -> %% sleepy_server does not implement handle_info/2 - {ok, Pid} = wpool_process:start_link(?MODULE, sleepy_server, 1, []), + {ok, Pid} = wpool_process:start_link(?MODULE, sleepy_server, 1, #{}), Pid ! test, {comment, []}. -spec handle_info_fails(config()) -> {comment, []}. handle_info_fails(_Config) -> %% sleepy_server does not implement handle_info/2 - {ok, Pid} = wpool_process:start_link(?MODULE, crashy_server, {ok, state}, []), + {ok, Pid} = wpool_process:start_link(?MODULE, crashy_server, {ok, state}, #{}), Pid ! undef, false = ktn_task:wait_for(fun() -> erlang:is_process_alive(Pid) end, false), {comment, []}. @@ -191,7 +191,7 @@ handle_info_fails(_Config) -> -spec format_status(config()) -> {comment, []}. format_status(_Config) -> %% echo_server implements format_status/1 - {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []), + {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, #{}), %% therefore it returns State as its status state = get_state(Pid), {comment, []}. @@ -199,7 +199,7 @@ format_status(_Config) -> -spec no_format_status(config()) -> {comment, []}. no_format_status(_Config) -> %% crashy_server doesn't implement format_status/1 - {ok, Pid} = wpool_process:start_link(?MODULE, crashy_server, state, []), + {ok, Pid} = wpool_process:start_link(?MODULE, crashy_server, state, #{}), %% therefore it uses the default format for the stauts (but with the status of %% the gen_server, not wpool_process) state = get_state(Pid), @@ -207,7 +207,7 @@ no_format_status(_Config) -> -spec call(config()) -> {comment, []}. call(_Config) -> - {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, []), + {ok, Pid} = wpool_process:start_link(?MODULE, echo_server, {ok, state}, #{}), ok1 = wpool_process:call(Pid, {reply, ok1, newstate}, 5000), newstate = get_state(?MODULE), ok2 = wpool_process:call(Pid, {reply, ok2, newerstate, 1}, 5000), @@ -265,7 +265,7 @@ pool_norestart_crash(_Config) -> -spec stop(config()) -> {comment, []}. stop(_Config) -> ct:comment("cast_call with stop/reply"), - {ok, Pid1} = wpool_process:start_link(stopper, echo_server, {ok, state}, []), + {ok, Pid1} = wpool_process:start_link(stopper, echo_server, {ok, state}, #{}), ReqId1 = wpool_process:send_request(stopper, {stop, reason, response, state}), case gen_server:wait_response(ReqId1, 5000) of {reply, response} -> @@ -281,7 +281,7 @@ stop(_Config) -> end, ct:comment("cast_call with regular stop"), - {ok, Pid2} = wpool_process:start_link(stopper, echo_server, {ok, state}, []), + {ok, Pid2} = wpool_process:start_link(stopper, echo_server, {ok, state}, #{}), ReqId2 = wpool_process:send_request(stopper, {stop, reason, state}), case gen_server:wait_response(ReqId2, 500) of {error, {reason, Pid2}} -> @@ -297,7 +297,7 @@ stop(_Config) -> end, ct:comment("call with regular stop"), - {ok, Pid3} = wpool_process:start_link(stopper, echo_server, {ok, state}, []), + {ok, Pid3} = wpool_process:start_link(stopper, echo_server, {ok, state}, #{}), try wpool_process:call(stopper, {noreply, state}, 100) of _ -> ct:fail("unexpected response") From d86146ce1d550c4629fe8206e226b3ff2c1230e4 Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Fri, 23 Aug 2024 12:28:58 +0200 Subject: [PATCH 5/6] Use fully-qualified function names --- src/wpool.erl | 91 +++++++++++++++++++++++++++++---------------------- 1 file changed, 52 insertions(+), 39 deletions(-) diff --git a/src/wpool.erl b/src/wpool.erl index f889c62..e3bac4e 100644 --- a/src/wpool.erl +++ b/src/wpool.erl @@ -387,14 +387,18 @@ call(Sup, Call, Strategy) -> -spec call(name(), term(), strategy(), timeout()) -> term(). call(Sup, Call, available_worker, Timeout) -> wpool_pool:call_available_worker(Sup, Call, Timeout); +call(Sup, Call, next_available_worker, Timeout) -> + wpool_process:call(wpool_pool:next_available_worker(Sup), Call, Timeout); +call(Sup, Call, next_worker, Timeout) -> + wpool_process:call(wpool_pool:next_worker(Sup), Call, Timeout); +call(Sup, Call, random_worker, Timeout) -> + wpool_process:call(wpool_pool:random_worker(Sup), Call, Timeout); +call(Sup, Call, best_worker, Timeout) -> + wpool_process:call(wpool_pool:best_worker(Sup), Call, Timeout); call(Sup, Call, {hash_worker, HashKey}, Timeout) -> - wpool_process:call( - wpool_pool:hash_worker(Sup, HashKey), Call, Timeout); -call(Sup, Call, Fun, Timeout) when is_function(Fun) -> - wpool_process:call(Fun(Sup), Call, Timeout); -call(Sup, Call, Strategy, Timeout) -> - wpool_process:call( - wpool_pool:Strategy(Sup), Call, Timeout). + wpool_process:call(wpool_pool:hash_worker(Sup, HashKey), Call, Timeout); +call(Sup, Call, Fun, Timeout) when is_function(Fun, 1) -> + wpool_process:call(Fun(Sup), Call, Timeout). %% @equiv cast(Sup, Cast, default_strategy()) -spec cast(name(), term()) -> ok. @@ -405,14 +409,18 @@ cast(Sup, Cast) -> -spec cast(name(), term(), strategy()) -> ok. cast(Sup, Cast, available_worker) -> wpool_pool:cast_to_available_worker(Sup, Cast); +cast(Sup, Cast, next_available_worker) -> + wpool_process:cast(wpool_pool:next_available_worker(Sup), Cast); +cast(Sup, Cast, next_worker) -> + wpool_process:cast(wpool_pool:next_worker(Sup), Cast); +cast(Sup, Cast, random_worker) -> + wpool_process:cast(wpool_pool:random_worker(Sup), Cast); +cast(Sup, Cast, best_worker) -> + wpool_process:cast(wpool_pool:best_worker(Sup), Cast); cast(Sup, Cast, {hash_worker, HashKey}) -> - wpool_process:cast( - wpool_pool:hash_worker(Sup, HashKey), Cast); -cast(Sup, Cast, Fun) when is_function(Fun) -> - wpool_process:cast(Fun(Sup), Cast); -cast(Sup, Cast, Strategy) -> - wpool_process:cast( - wpool_pool:Strategy(Sup), Cast). + wpool_process:cast(wpool_pool:hash_worker(Sup, HashKey), Cast); +cast(Sup, Cast, Fun) when is_function(Fun, 1) -> + wpool_process:cast(Fun(Sup), Cast). %% @equiv send_request(Sup, Call, default_strategy(), 5000) -spec send_request(name(), term()) -> noproc | timeout | gen_server:request_id(). @@ -431,14 +439,37 @@ send_request(Sup, Call, Strategy) -> noproc | timeout | gen_server:request_id(). send_request(Sup, Call, available_worker, Timeout) -> wpool_pool:send_request_available_worker(Sup, Call, Timeout); +send_request(Sup, Call, next_available_worker, _Timeout) -> + wpool_process:send_request(wpool_pool:next_available_worker(Sup), Call); +send_request(Sup, Call, next_worker, _Timeout) -> + wpool_process:send_request(wpool_pool:next_worker(Sup), Call); +send_request(Sup, Call, random_worker, _Timeout) -> + wpool_process:send_request(wpool_pool:random_worker(Sup), Call); +send_request(Sup, Call, best_worker, _Timeout) -> + wpool_process:send_request(wpool_pool:best_worker(Sup), Call); send_request(Sup, Call, {hash_worker, HashKey}, _Timeout) -> - wpool_process:send_request( - wpool_pool:hash_worker(Sup, HashKey), Call); -send_request(Sup, Call, Fun, _Timeout) when is_function(Fun) -> - wpool_process:send_request(Fun(Sup), Call); -send_request(Sup, Call, Strategy, _Timeout) -> - wpool_process:send_request( - wpool_pool:Strategy(Sup), Call). + wpool_process:send_request(wpool_pool:hash_worker(Sup, HashKey), Call); +send_request(Sup, Call, Fun, _Timeout) when is_function(Fun, 1) -> + wpool_process:send_request(Fun(Sup), Call). + + +%% @doc Casts a message to all the workers within the given pool. +%% +%% NOTE: These messages don't get queued, they go straight to the worker's message queues, so +%% if you're using available_worker strategy to balance the charge and you have some tasks queued up +%% waiting for the next available worker, the broadcast will reach all the workers before the +%% queued up tasks. +-spec broadcast(wpool:name(), term()) -> ok. +broadcast(Sup, Cast) -> + wpool_pool:broadcast(Sup, Cast). + +%% @doc Calls all the workers within the given pool async and waits for the responses synchronously. +%% +%% If one worker times out, the entire call is considered timed-out. +-spec broadcall(wpool:name(), term(), timeout()) -> + {[Replies :: term()], [Errors :: term()]}. +broadcall(Sup, Call, Timeout) -> + wpool_pool:broadcall(Sup, Call, Timeout). %% @doc Retrieves a snapshot of statistics for all pools. %% @@ -459,21 +490,3 @@ stats(Sup) -> -spec get_workers(name()) -> [atom()]. get_workers(Sup) -> wpool_pool:get_workers(Sup). - -%% @doc Casts a message to all the workers within the given pool. -%% -%% NOTE: These messages don't get queued, they go straight to the worker's message queues, so -%% if you're using available_worker strategy to balance the charge and you have some tasks queued up -%% waiting for the next available worker, the broadcast will reach all the workers before the -%% queued up tasks. --spec broadcast(wpool:name(), term()) -> ok. -broadcast(Sup, Cast) -> - wpool_pool:broadcast(Sup, Cast). - -%% @doc Calls all the workers within the given pool async and waits for the responses synchronously. -%% -%% If one worker times out, the entire call is considered timed-out. --spec broadcall(wpool:name(), term(), timeout()) -> - {[Replies :: term()], [Errors :: term()]}. -broadcall(Sup, Call, Timeout) -> - wpool_pool:broadcall(Sup, Call, Timeout). From 30fa495c04d03f1bc2d48752bd681fa577f7dee9 Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Fri, 23 Aug 2024 17:42:55 +0200 Subject: [PATCH 6/6] Increase test coverage --- test/wpool_SUITE.erl | 1 + test/wpool_pool_SUITE.erl | 28 ++++++++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/test/wpool_SUITE.erl b/test/wpool_SUITE.erl index 88d29f1..ee5ba94 100644 --- a/test/wpool_SUITE.erl +++ b/test/wpool_SUITE.erl @@ -383,6 +383,7 @@ complete_coverage(_Config) -> ct:comment("Queue Manager"), QMPid = get_queue_manager(PoolPid), QMPid ! info, + {ok, _} = wpool_queue_manager:start_link(pool, pool_queue_manager), {ok, _} = wpool_queue_manager:init([{pool, pool}]), {comment, []}. diff --git a/test/wpool_pool_SUITE.erl b/test/wpool_pool_SUITE.erl index 000117e..7036d20 100644 --- a/test/wpool_pool_SUITE.erl +++ b/test/wpool_pool_SUITE.erl @@ -155,6 +155,9 @@ best_worker(_Config) -> ok end, + Req = wpool:send_request(Pool, {erlang, self, []}, best_worker), + {reply, {ok, _}} = gen_server:wait_response(Req, 5000), + %% Fill up their message queues... [wpool:cast(Pool, {timer, sleep, [60000]}, next_worker) || _ <- lists:seq(1, ?WORKERS)], [0] = ktn_task:wait_for(fun() -> worker_msg_queue_lengths(Pool) end, [0]), @@ -208,6 +211,9 @@ next_available_worker(_Config) -> ct:log("Wait until the first frees up..."), 1 = ktn_task:wait_for(AvailableWorkers, 1), + Req = wpool:send_request(Pool, {erlang, self, []}, next_available_worker), + {reply, {ok, _}} = gen_server:wait_response(Req, 5000), + ok = wpool:cast(Pool, {timer, sleep, [60000]}, next_available_worker), ct:log("No more available workers..."), @@ -243,6 +249,7 @@ next_worker(_Config) -> ?WORKERS = sets:size( sets:from_list(Res0)), + Res0 = [begin Stats = wpool:stats(Pool), @@ -251,6 +258,9 @@ next_worker(_Config) -> end || I <- lists:seq(1, ?WORKERS)], + Req = wpool:send_request(Pool, {erlang, self, []}, next_worker), + {reply, {ok, _}} = gen_server:wait_response(Req, 5000), + {comment, []}. -spec random_worker(config()) -> {comment, []}. @@ -273,6 +283,21 @@ random_worker(_Config) -> sets:size( sets:from_list(Serial)), + %% Randomly ask a lot of workers to send ourselves the atom true + [wpool:cast(Pool, {erlang, send, [self(), true]}, random_worker) + || _ <- lists:seq(1, 20 * ?WORKERS)], + Results = + [receive + true -> + true + end + || _ <- lists:seq(1, 20 * ?WORKERS)], + true = lists:all(fun(Value) -> true =:= Value end, Results), + + %% do a gen_server:send_request/3 + Req = wpool:send_request(Pool, {erlang, self, []}, random_worker), + {reply, {ok, _}} = gen_server:wait_response(Req, 5000), + %% Now do the same with a freshly spawned process for each request to ensure %% randomness isn't reset with each spawn of the process_dictionary Self = self(), @@ -365,6 +390,9 @@ custom_worker(_Config) -> end || I <- lists:seq(1, ?WORKERS)], + Req = wpool:send_request(Pool, {erlang, self, []}, Strategy), + {reply, {ok, _}} = gen_server:wait_response(Req, 5000), + {comment, []}. -spec manager_crash(config()) -> {comment, []}.