diff --git a/src/wpool.erl b/src/wpool.erl index bbebfa3..126a849 100644 --- a/src/wpool.erl +++ b/src/wpool.erl @@ -18,14 +18,6 @@ -behaviour(application). --define(DEFAULTS, - [{overrun_warning, infinity}, - {max_overrun_warnings, infinity}, - {overrun_handler, {error_logger, warning_report}}, - {workers, 100}, - {worker_opt, []}, - {queue_type, fifo}]). - %% Copied from gen.erl -type debug_flag() :: trace | log | statistics | debug | {logfile, string()}. -type gen_option() :: @@ -115,7 +107,7 @@ start_pool(Name) -> -spec start_pool(name(), [option()]) -> {ok, pid()} | {error, {already_started, pid()} | term()}. start_pool(Name, Options) -> - wpool_pool:start_link(Name, all_opts(Options)). + wpool_pool:start_link(Name, wpool_utils:add_defaults(Options)). %% @doc Stops the pool -spec stop_pool(name()) -> true. @@ -136,7 +128,7 @@ start_sup_pool(Name) -> -spec start_sup_pool(name(), [option()]) -> {ok, pid()} | {error, {already_started, pid()} | term()}. start_sup_pool(Name, Options) -> - wpool_sup:start_pool(Name, all_opts(Options)). + wpool_sup:start_pool(Name, wpool_utils:add_defaults(Options)). %% @doc Stops the pool -spec stop_sup_pool(name()) -> ok. @@ -243,9 +235,3 @@ get_workers(Sup) -> -spec broadcast(wpool:name(), term()) -> ok. broadcast(Sup, Cast) -> wpool_pool:broadcast(Sup, Cast). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% PRIVATE -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -all_opts(Options) -> - Options ++ ?DEFAULTS. diff --git a/src/wpool_process.erl b/src/wpool_process.erl index 15153d5..7e8a8bf 100644 --- a/src/wpool_process.erl +++ b/src/wpool_process.erl @@ -22,11 +22,16 @@ {name :: atom(), mod :: atom(), state :: term(), - options :: [{time_checker | queue_manager, atom()} | wpool:option()]}). + options :: + #{time_checker := atom(), + queue_manager := atom(), + overrun_warning := timeout(), + _ => _}}). -type state() :: #state{}. -type from() :: {pid(), reference()}. -type next_step() :: timeout() | hibernate | {continue, term()}. +-type options() :: [{time_checker | queue_manager, atom()} | wpool:option()]. %% api -export([start_link/4, call/3, cast/2, send_request/2]). @@ -38,13 +43,14 @@ %%% API %%%=================================================================== %% @doc Starts a named process --spec start_link(wpool:name(), module(), term(), [wpool:option()]) -> +-spec start_link(wpool:name(), module(), term(), options()) -> {ok, pid()} | ignore | {error, {already_started, pid()} | term()}. start_link(Name, Module, InitArgs, Options) -> - WorkerOpt = proplists:get_value(worker_opt, Options, []), + FullOpts = wpool_utils:add_defaults(Options), + WorkerOpt = proplists:get_value(worker_opt, FullOpts, []), gen_server:start_link({local, Name}, ?MODULE, - {Name, Module, InitArgs, Options}, + {Name, Module, InitArgs, FullOpts}, WorkerOpt). %% @equiv gen_server:call(Process, Call, Timeout) @@ -66,9 +72,10 @@ send_request(Name, Request) -> %%% init, terminate, code_change, info callbacks %%%=================================================================== %% @private --spec init({atom(), atom(), term(), [wpool:option()]}) -> +-spec init({atom(), atom(), term(), options()}) -> {ok, state()} | {ok, state(), next_step()} | {stop, can_not_ignore} | {stop, term()}. -init({Name, Mod, InitArgs, Options}) -> +init({Name, Mod, InitArgs, LOptions}) -> + Options = maps:from_list(LOptions), wpool_process_callbacks:notify(handle_init_start, Options, [Name]), case Mod:init(InitArgs) of {ok, ModState} -> @@ -180,15 +187,9 @@ format_status(Opt, [PDict, State]) -> %% @private -spec handle_cast(term(), state()) -> {noreply, state()} | {noreply, state(), next_step()} | {stop, term(), state()}. -handle_cast(Cast, State) -> - Task = - wpool_utils:task_init({cast, Cast}, - proplists:get_value(time_checker, State#state.options, undefined), - proplists:get_value(overrun_warning, State#state.options, infinity), - proplists:get_value(max_overrun_warnings, - State#state.options, - infinity)), - ok = notify_queue_manager(worker_busy, State#state.name, State#state.options), +handle_cast(Cast, #state{options = Options} = State) -> + Task = wpool_utils:task_init({cast, Cast}, Options), + ok = notify_queue_manager(worker_busy, State#state.name, Options), Reply = try (State#state.mod):handle_cast(Cast, State#state.state) of {noreply, NewState} -> @@ -206,7 +207,7 @@ handle_cast(Cast, State) -> {stop, Reason, State#state{state = NewState}} end, wpool_utils:task_end(Task), - ok = notify_queue_manager(worker_ready, State#state.name, State#state.options), + ok = notify_queue_manager(worker_ready, State#state.name, Options), Reply. %% @private @@ -217,15 +218,9 @@ handle_cast(Cast, State) -> {noreply, state(), next_step()} | {stop, term(), term(), state()} | {stop, term(), state()}. -handle_call(Call, From, State) -> - Task = - wpool_utils:task_init({call, Call}, - proplists:get_value(time_checker, State#state.options, undefined), - proplists:get_value(overrun_warning, State#state.options, infinity), - proplists:get_value(max_overrun_warnings, - State#state.options, - infinity)), - ok = notify_queue_manager(worker_busy, State#state.name, State#state.options), +handle_call(Call, From, #state{options = Options} = State) -> + Task = wpool_utils:task_init({call, Call}, Options), + ok = notify_queue_manager(worker_busy, State#state.name, Options), Reply = try (State#state.mod):handle_call(Call, From, State#state.state) of {noreply, NewState} -> @@ -255,13 +250,10 @@ handle_call(Call, From, State) -> {stop, Reason, Response, State#state{state = NewState}} end, wpool_utils:task_end(Task), - ok = notify_queue_manager(worker_ready, State#state.name, State#state.options), + ok = notify_queue_manager(worker_ready, State#state.name, Options), Reply. -notify_queue_manager(Function, Name, Options) -> - case proplists:get_value(queue_manager, Options) of - undefined -> - ok; - QueueManager -> - wpool_queue_manager:Function(QueueManager, Name) - end. +notify_queue_manager(Function, Name, #{queue_manager := QueueManager}) -> + wpool_queue_manager:Function(QueueManager, Name); +notify_queue_manager(_, _, _) -> + ok. diff --git a/src/wpool_process_callbacks.erl b/src/wpool_process_callbacks.erl index 5ed7623..6f59897 100644 --- a/src/wpool_process_callbacks.erl +++ b/src/wpool_process_callbacks.erl @@ -35,14 +35,11 @@ handle_call(Msg, State) -> {ok, {error, {unexpected_call, Msg}}, State}. %% @doc Sends a notification to all registered callback modules. --spec notify(event(), [wpool:option()], [any()]) -> ok. -notify(Event, Options, Args) -> - case lists:keyfind(event_manager, 1, Options) of - {event_manager, EventMgr} -> - gen_event:notify(EventMgr, {Event, Args}); - _ -> - ok - end. +-spec notify(event(), #{event_manager := any(), _ => _}, [any()]) -> ok. +notify(Event, #{event_manager := EventMgr}, Args) -> + gen_event:notify(EventMgr, {Event, Args}); +notify(_, _, _) -> + ok. %% @doc Adds a callback module. -spec add_callback_module(wpool:name(), module()) -> ok | {error, any()}. diff --git a/src/wpool_queue_manager.erl b/src/wpool_queue_manager.erl index 1afba3f..add2e12 100644 --- a/src/wpool_queue_manager.erl +++ b/src/wpool_queue_manager.erl @@ -27,7 +27,7 @@ {wpool :: wpool:name(), clients :: queue:queue({cast | {pid(), _}, term()}), workers :: gb_sets:set(atom()), - monitors :: gb_trees:tree(atom(), monitored_from()), + monitors :: #{atom() := monitored_from()}, queue_type :: queue_type()}). -type state() :: #state{}. @@ -128,7 +128,7 @@ init(Args) -> #state{wpool = WPool, clients = queue:new(), workers = gb_sets:new(), - monitors = gb_trees:empty(), + monitors = #{}, queue_type = QueueType}}. -type worker_event() :: new_worker | worker_dead | worker_busy | worker_ready. @@ -149,12 +149,11 @@ handle_cast({worker_ready, Worker}, State0) -> queue_type = QueueType} = State0, State = - case gb_trees:is_defined(Worker, Mons) of - true -> - {Ref, _Client} = gb_trees:get(Worker, Mons), + case Mons of + #{Worker := {Ref, _Client}} -> demonitor(Ref, [flush]), - State0#state{monitors = gb_trees:delete(Worker, Mons)}; - false -> + State0#state{monitors = maps:remove(Worker, Mons)}; + _ -> State0 end, case queue_out(Clients, QueueType) of @@ -218,12 +217,11 @@ handle_call(pending_task_count, _From, State) -> handle_info({'DOWN', Ref, Type, {Worker, _Node}, Exit}, State) -> handle_info({'DOWN', Ref, Type, Worker, Exit}, State); handle_info({'DOWN', _, _, Worker, Exit}, State = #state{monitors = Mons}) -> - case gb_trees:is_defined(Worker, Mons) of - true -> - {_Ref, Client} = gb_trees:get(Worker, Mons), + case Mons of + #{Worker := {_Ref, Client}} -> gen_server:reply(Client, {'EXIT', Worker, Exit}), - {noreply, State#state{monitors = gb_trees:delete(Worker, Mons)}}; - false -> + {noreply, State#state{monitors = maps:remove(Worker, Mons)}}; + _ -> {noreply, State} end; handle_info(_Info, State) -> @@ -286,7 +284,7 @@ now_in_milliseconds() -> monitor_worker(Worker, Client, State = #state{monitors = Mons}) -> Ref = monitor(process, Worker), - State#state{monitors = gb_trees:enter(Worker, {Ref, Client}, Mons)}. + State#state{monitors = maps:put(Worker, {Ref, Client}, Mons)}. queue_out(Clients, fifo) -> queue:out(Clients); diff --git a/src/wpool_utils.erl b/src/wpool_utils.erl index c9bb9c0..dbb4c78 100644 --- a/src/wpool_utils.erl +++ b/src/wpool_utils.erl @@ -18,26 +18,25 @@ -author('ferigis@gmail.com'). %% API --export([task_init/4, task_end/1]). +-export([task_init/2, task_end/1, add_defaults/1]). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Api %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% @doc Marks Task as started in this worker --spec task_init(term(), atom(), infinity | pos_integer(), infinity | pos_integer()) -> +-spec task_init(term(), #{overrun_warning := timeout(), _ => _}) -> undefined | reference(). -task_init(Task, _TimeChecker, infinity, _MaxWarnings) -> - Time = - calendar:datetime_to_gregorian_seconds( - calendar:universal_time()), +task_init(Task, #{overrun_warning := infinity}) -> + Time = erlang:system_time(second), erlang:put(wpool_task, {undefined, Time, Task}), undefined; -task_init(Task, TimeChecker, OverrunTime, MaxWarnings) -> +task_init(Task, + #{overrun_warning := OverrunTime, + time_checker := TimeChecker, + max_overrun_warnings := MaxWarnings}) -> TaskId = erlang:make_ref(), - Time = - calendar:datetime_to_gregorian_seconds( - calendar:universal_time()), + Time = erlang:system_time(second), erlang:put(wpool_task, {TaskId, Time, Task}), erlang:send_after(OverrunTime, TimeChecker, @@ -50,3 +49,16 @@ task_end(undefined) -> task_end(TimerRef) -> _ = erlang:cancel_timer(TimerRef), erlang:erase(wpool_task). + +-spec add_defaults([wpool:option()]) -> [wpool:option()]. +add_defaults(Opts) -> + lists:ukeymerge(1, lists:sort(Opts), defaults()). + +-spec defaults() -> [wpool:option()]. +defaults() -> + [{max_overrun_warnings, infinity}, + {overrun_handler, {error_logger, warning_report}}, + {overrun_warning, infinity}, + {queue_type, fifo}, + {worker_opt, []}, + {workers, 100}].