Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use maps as opts (and other small cosmetic changes) #207

Merged
merged 6 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@
%% == Dependencies and plugins ==

{project_plugins,
[{rebar3_hank, "~> 1.4.0"},
{rebar3_hex, "~> 7.0.7"},
[{rebar3_hank, "~> 1.4.1"},
{rebar3_hex, "~> 7.0.8"},
{rebar3_format, "~> 1.3.0"},
{rebar3_lint, "~> 3.1.0"},
{rebar3_ex_doc, "~> 0.2.20"},
{rebar3_lint, "~> 3.2.6"},
{rebar3_ex_doc, "~> 0.2.23"},
{rebar3_depup, "~> 0.4.0"},
{covertool, "~> 2.0.6"}]}.

Expand Down
133 changes: 82 additions & 51 deletions src/wpool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
%%
%% These are the same as described at the `gen_server' documentation.

-type worker_shutdown() :: worker_shutdown().
-type worker_shutdown() :: brutal_kill | timeout().
%% The `shutdown' option to be used over the individual workers.
%%
%% Defaults to `5000'. See {@link wpool_process_sup} for more details.
Expand Down Expand Up @@ -187,7 +187,27 @@
%% `child_spec/2', `start_pool/2', `start_sup_pool/2' are the callbacks
%% that take a list of these options as a parameter.

-type custom_strategy() :: fun(([atom()]) -> Atom :: atom()).
-type options() :: #{workers => workers(),
worker => worker(),
worker_opt => [worker_opt()],
strategy => supervisor_strategy(),
worker_shutdown => worker_shutdown(),
overrun_handler => overrun_handler(),
overrun_warning => overrun_warning(),
max_overrun_warnings => max_overrun_warnings(),
pool_sup_intensity => pool_sup_intensity(),
pool_sup_shutdown => pool_sup_shutdown(),
pool_sup_period => pool_sup_period(),
queue_type => queue_type(),
enable_callbacks => enable_callbacks(),
callbacks => callbacks(),
_ => _}.
%% Options that can be provided to a new pool.
%%
%% `child_spec/2', `start_pool/2', `start_sup_pool/2' are the callbacks
%% that take a list of these options as a parameter.

-type custom_strategy() :: fun((atom()) -> Atom :: atom()).
%% A callback that gets the pool name and returns a worker's name.

-type strategy() ::
Expand Down Expand Up @@ -246,14 +266,14 @@
-type stats() ::
[{pool, name()} |
{supervisor, pid()} |
{options, [option()]} |
{options, [option()] | options()} |
{size, non_neg_integer()} |
{next_worker, pos_integer()} |
{total_message_queue_len, non_neg_integer()} |
{workers, [{pos_integer(), worker_stats()}]}].
%% Statistics about a given live pool.

-export_type([name/0, option/0, custom_strategy/0, strategy/0, worker_stats/0, stats/0]).
-export_type([name/0, option/0, options/0, custom_strategy/0, strategy/0, worker_stats/0, stats/0]).

-export([start/0, start/2, stop/0, stop/1]).
-export([child_spec/2, start_pool/1, start_pool/2, start_sup_pool/1, start_sup_pool/2]).
Expand All @@ -280,7 +300,7 @@ stop() ->
%% BEHAVIOUR CALLBACKS
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @private
-spec start(any(), any()) -> {ok, pid()} | {error, term()}.
-spec start(any(), any()) -> supervisor:startlink_ret().
start(_StartType, _StartArgs) ->
wpool_sup:start_link().

Expand All @@ -293,20 +313,19 @@ stop(_State) ->
%% PUBLIC API
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @equiv start_pool(Name, [])
-spec start_pool(name()) -> {ok, pid()}.
-spec start_pool(name()) -> supervisor:startlink_ret().
start_pool(Name) ->
start_pool(Name, []).

%% @doc Starts (and links) a pool of N wpool_processes.
%% The result pid belongs to a supervisor (in case you want to add it to a
%% supervisor tree)
-spec start_pool(name(), [option()]) ->
{ok, pid()} | {error, {already_started, pid()} | term()}.
-spec start_pool(name(), [option()] | options()) -> supervisor:startlink_ret().
start_pool(Name, Options) ->
wpool_pool:start_link(Name, wpool_utils:add_defaults(Options)).

%% @doc Builds a child specification to pass to a supervisor.
-spec child_spec(name(), [option()]) -> supervisor:child_spec().
-spec child_spec(name(), [option()] | options()) -> supervisor:child_spec().
child_spec(Name, Options) ->
FullOptions = wpool_utils:add_defaults(Options),
#{id => Name,
Expand All @@ -326,13 +345,12 @@ stop_pool(Name) ->
end.

%% @equiv start_sup_pool(Name, [])
-spec start_sup_pool(name()) -> {ok, pid()} | {error, {already_started, pid()} | term()}.
-spec start_sup_pool(name()) -> supervisor:startchild_ret().
start_sup_pool(Name) ->
start_sup_pool(Name, []).

%% @doc Starts a pool of N wpool_processes supervised by `wpool_sup'
-spec start_sup_pool(name(), [option()]) ->
{ok, pid()} | {error, {already_started, pid()} | term()}.
-spec start_sup_pool(name(), [option()] | options()) -> supervisor:startchild_ret().
start_sup_pool(Name, Options) ->
wpool_sup:start_pool(Name, wpool_utils:add_defaults(Options)).

Expand Down Expand Up @@ -369,14 +387,18 @@ call(Sup, Call, Strategy) ->
-spec call(name(), term(), strategy(), timeout()) -> term().
call(Sup, Call, available_worker, Timeout) ->
wpool_pool:call_available_worker(Sup, Call, Timeout);
call(Sup, Call, next_available_worker, Timeout) ->
wpool_process:call(wpool_pool:next_available_worker(Sup), Call, Timeout);
call(Sup, Call, next_worker, Timeout) ->
wpool_process:call(wpool_pool:next_worker(Sup), Call, Timeout);
call(Sup, Call, random_worker, Timeout) ->
wpool_process:call(wpool_pool:random_worker(Sup), Call, Timeout);
call(Sup, Call, best_worker, Timeout) ->
wpool_process:call(wpool_pool:best_worker(Sup), Call, Timeout);
call(Sup, Call, {hash_worker, HashKey}, Timeout) ->
wpool_process:call(
wpool_pool:hash_worker(Sup, HashKey), Call, Timeout);
call(Sup, Call, Fun, Timeout) when is_function(Fun) ->
wpool_process:call(Fun(Sup), Call, Timeout);
call(Sup, Call, Strategy, Timeout) ->
wpool_process:call(
wpool_pool:Strategy(Sup), Call, Timeout).
wpool_process:call(wpool_pool:hash_worker(Sup, HashKey), Call, Timeout);
call(Sup, Call, Fun, Timeout) when is_function(Fun, 1) ->
wpool_process:call(Fun(Sup), Call, Timeout).

%% @equiv cast(Sup, Cast, default_strategy())
-spec cast(name(), term()) -> ok.
Expand All @@ -387,14 +409,18 @@ cast(Sup, Cast) ->
-spec cast(name(), term(), strategy()) -> ok.
cast(Sup, Cast, available_worker) ->
wpool_pool:cast_to_available_worker(Sup, Cast);
cast(Sup, Cast, next_available_worker) ->
wpool_process:cast(wpool_pool:next_available_worker(Sup), Cast);
cast(Sup, Cast, next_worker) ->
wpool_process:cast(wpool_pool:next_worker(Sup), Cast);
cast(Sup, Cast, random_worker) ->
wpool_process:cast(wpool_pool:random_worker(Sup), Cast);
cast(Sup, Cast, best_worker) ->
wpool_process:cast(wpool_pool:best_worker(Sup), Cast);
cast(Sup, Cast, {hash_worker, HashKey}) ->
wpool_process:cast(
wpool_pool:hash_worker(Sup, HashKey), Cast);
cast(Sup, Cast, Fun) when is_function(Fun) ->
wpool_process:cast(Fun(Sup), Cast);
cast(Sup, Cast, Strategy) ->
wpool_process:cast(
wpool_pool:Strategy(Sup), Cast).
wpool_process:cast(wpool_pool:hash_worker(Sup, HashKey), Cast);
cast(Sup, Cast, Fun) when is_function(Fun, 1) ->
wpool_process:cast(Fun(Sup), Cast).

%% @equiv send_request(Sup, Call, default_strategy(), 5000)
-spec send_request(name(), term()) -> noproc | timeout | gen_server:request_id().
Expand All @@ -413,14 +439,37 @@ send_request(Sup, Call, Strategy) ->
noproc | timeout | gen_server:request_id().
send_request(Sup, Call, available_worker, Timeout) ->
wpool_pool:send_request_available_worker(Sup, Call, Timeout);
send_request(Sup, Call, next_available_worker, _Timeout) ->
wpool_process:send_request(wpool_pool:next_available_worker(Sup), Call);
send_request(Sup, Call, next_worker, _Timeout) ->
wpool_process:send_request(wpool_pool:next_worker(Sup), Call);
send_request(Sup, Call, random_worker, _Timeout) ->
wpool_process:send_request(wpool_pool:random_worker(Sup), Call);
send_request(Sup, Call, best_worker, _Timeout) ->
wpool_process:send_request(wpool_pool:best_worker(Sup), Call);
send_request(Sup, Call, {hash_worker, HashKey}, _Timeout) ->
wpool_process:send_request(
wpool_pool:hash_worker(Sup, HashKey), Call);
send_request(Sup, Call, Fun, _Timeout) when is_function(Fun) ->
wpool_process:send_request(Fun(Sup), Call);
send_request(Sup, Call, Strategy, _Timeout) ->
wpool_process:send_request(
wpool_pool:Strategy(Sup), Call).
wpool_process:send_request(wpool_pool:hash_worker(Sup, HashKey), Call);
send_request(Sup, Call, Fun, _Timeout) when is_function(Fun, 1) ->
wpool_process:send_request(Fun(Sup), Call).


%% @doc Casts a message to all the workers within the given pool.
%%
%% <b>NOTE:</b> These messages don't get queued, they go straight to the worker's message queues, so
%% if you're using available_worker strategy to balance the charge and you have some tasks queued up
%% waiting for the next available worker, the broadcast will reach all the workers <b>before</b> the
%% queued up tasks.
-spec broadcast(wpool:name(), term()) -> ok.
broadcast(Sup, Cast) ->
wpool_pool:broadcast(Sup, Cast).

%% @doc Calls all the workers within the given pool async and waits for the responses synchronously.
%%
%% If one worker times out, the entire call is considered timed-out.
-spec broadcall(wpool:name(), term(), timeout()) ->
{[Replies :: term()], [Errors :: term()]}.
broadcall(Sup, Call, Timeout) ->
wpool_pool:broadcall(Sup, Call, Timeout).

%% @doc Retrieves a snapshot of statistics for all pools.
%%
Expand All @@ -441,21 +490,3 @@ stats(Sup) ->
-spec get_workers(name()) -> [atom()].
get_workers(Sup) ->
wpool_pool:get_workers(Sup).

%% @doc Casts a message to all the workers within the given pool.
%%
%% <b>NOTE:</b> These messages don't get queued, they go straight to the worker's message queues, so
%% if you're using available_worker strategy to balance the charge and you have some tasks queued up
%% waiting for the next available worker, the broadcast will reach all the workers <b>before</b> the
%% queued up tasks.
-spec broadcast(wpool:name(), term()) -> ok.
broadcast(Sup, Cast) ->
wpool_pool:broadcast(Sup, Cast).

%% @doc Calls all the workers within the given pool async and waits for the responses synchronously.
%%
%% If one worker times out, the entire call is considered timed-out.
-spec broadcall(wpool:name(), term(), timeout()) ->
{[Replies :: term()], [Errors :: term()]}.
broadcall(Sup, Call, Timeout) ->
wpool_pool:broadcall(Sup, Call, Timeout).
Loading
Loading