diff --git a/src/wpool_fsm_process.erl b/src/wpool_fsm_process.erl index 7682b56..d58a96a 100644 --- a/src/wpool_fsm_process.erl +++ b/src/wpool_fsm_process.erl @@ -102,14 +102,14 @@ cast_call(Process, From, Event) -> init({Name, Mod, InitArgs, Options}) -> case Mod:init(InitArgs) of {ok, FirstState, StateData} -> - ok = notify_queue_manager(new_worker, Name, Options), + ok = wpool_utils:notify_queue_manager(new_worker, Name, Options), {ok, dispatch_state, #state{ name = Name , mod = Mod , state = StateData , options = Options , fsm_state = FirstState}}; {ok, FirstState, StateData, Timeout} -> - ok = notify_queue_manager(new_worker, Name, Options), + ok = wpool_utils:notify_queue_manager(new_worker, Name, Options), {ok, dispatch_state, #state{ name = Name , mod = Mod , state = StateData @@ -127,7 +127,7 @@ terminate(Reason, CurrentState, State) -> , name = Name , options = Options } = State, - ok = notify_queue_manager(worker_dead, Name, Options), + ok = wpool_utils:notify_queue_manager(worker_dead, Name, Options), Mod:terminate(Reason, CurrentState, ModState). %% @private @@ -147,7 +147,7 @@ code_change(OldVsn, StateName, State, Extra) -> -spec handle_info(any(), fsm_state(), state()) -> {next_state, dispatch_state, state()} | {stop, term(), state()}. handle_info(Info, StateName, StateData) -> - case do_try( + case wpool_utils:do_try( fun() -> (StateData#state.mod):handle_info( Info, StateName, StateData#state.state) @@ -179,15 +179,15 @@ format_status(Opt, [PDict, StateData]) -> {next_state, dispatch_state, state()} | {stop, term(), state()}. handle_event(Event, _StateName, StateData) -> Task = - task_init( + wpool_utils:task_init( {handle_event, Event}, proplists:get_value(time_checker, StateData#state.options, undefined), proplists:get_value(overrun_warning, StateData#state.options, infinity)), ok = - notify_queue_manager( + wpool_utils:notify_queue_manager( worker_busy, StateData#state.name, StateData#state.options), Reply = - case do_try( + case wpool_utils:do_try( fun() -> (StateData#state.mod):handle_event(Event, StateData#state.fsm_state, StateData#state.state) @@ -207,9 +207,9 @@ handle_event(Event, _StateName, StateData) -> {stop, Reason, NewState} -> {stop, Reason, StateData#state{state = NewState}} end, - task_end(Task), + wpool_utils:task_end(Task), ok = - notify_queue_manager( + wpool_utils:notify_queue_manager( worker_ready, StateData#state.name, StateData#state.options), Reply. @@ -219,15 +219,15 @@ handle_event(Event, _StateName, StateData) -> | {stop, term(), state()}. handle_sync_event(Event, From, _StateName, StateData) -> Task = - task_init( + wpool_utils:task_init( {handle_sync_event, Event}, proplists:get_value(time_checker, StateData#state.options, undefined), proplists:get_value(overrun_warning, StateData#state.options, infinity)), ok = - notify_queue_manager( + wpool_utils:notify_queue_manager( worker_busy, StateData#state.name, StateData#state.options), Result = - case do_try( + case wpool_utils:do_try( fun() -> (StateData#state.mod):handle_sync_event( Event, From, StateData#state.fsm_state, StateData#state.state) @@ -269,9 +269,9 @@ handle_sync_event(Event, From, _StateName, StateData) -> {stop, Reason, Response, NewState} -> {stop, Reason, Response, StateData#state{state = NewState}} end, - task_end(Task), + wpool_utils:task_end(Task), ok = - notify_queue_manager(worker_ready, + wpool_utils:notify_queue_manager(worker_ready, StateData#state.name, StateData#state.options), Result. @@ -297,10 +297,10 @@ dispatch_state({sync_send_event, From, Event}, StateData) -> dispatch_state(Event, StateData) -> Task = get_task(Event, StateData), ok = - notify_queue_manager( + wpool_utils:notify_queue_manager( worker_busy, StateData#state.name, StateData#state.options), Reply = - case do_try( + case wpool_utils:do_try( fun() -> (StateData#state.mod):(StateData#state.fsm_state)( Event, StateData#state.state) @@ -323,9 +323,9 @@ dispatch_state(Event, StateData) -> {stop, Reason, NewStateData} -> {stop, Reason, StateData#state{state = NewStateData}} end, - task_end(Task), + wpool_utils:task_end(Task), ok = - notify_queue_manager( + wpool_utils:notify_queue_manager( worker_ready, StateData#state.name, StateData#state.options), Reply. @@ -339,10 +339,10 @@ dispatch_state(Event, StateData) -> dispatch_state(Event, From, StateData) -> Task = get_task(Event, StateData), ok = - notify_queue_manager( + wpool_utils:notify_queue_manager( worker_busy, StateData#state.name, StateData#state.options), Result = - case do_try( + case wpool_utils:do_try( fun() -> (StateData#state.mod):(StateData#state.fsm_state)( Event, From, StateData#state.state) @@ -384,46 +384,17 @@ dispatch_state(Event, From, StateData) -> {stop, Reason, Reply, NewStateData} -> {stop, Reason, Reply, StateData#state{state = NewStateData}} end, - task_end(Task), + wpool_utils:task_end(Task), ok = - notify_queue_manager( + wpool_utils:notify_queue_manager( worker_ready, StateData#state.name, StateData#state.options), Result. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% PRIVATE FUNCTIONS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% @doc Marks Task as started in this worker --spec task_init(term(), atom(), infinity | pos_integer()) -> - undefined | reference(). -task_init(Task, _TimeChecker, infinity) -> - Time = calendar:datetime_to_gregorian_seconds(calendar:universal_time()), - erlang:put(wpool_task, {undefined, Time, Task}), - undefined; -task_init(Task, TimeChecker, OverrunTime) -> - TaskId = erlang:make_ref(), - Time = calendar:datetime_to_gregorian_seconds(calendar:universal_time()), - erlang:put(wpool_task, {TaskId, Time, Task}), - erlang:send_after( - OverrunTime, TimeChecker, {check, self(), TaskId, OverrunTime}). - -%% @doc Removes the current task from the worker --spec task_end(undefined | reference()) -> ok. -task_end(undefined) -> erlang:erase(wpool_task); -task_end(TimerRef) -> - _ = erlang:cancel_timer(TimerRef), - erlang:erase(wpool_task). - -notify_queue_manager(Function, Name, Options) -> - case proplists:get_value(queue_manager, Options) of - undefined -> ok; - QueueManager -> wpool_queue_manager:Function(QueueManager, Name) - end. - get_task(Event, StateData) -> - task_init( + wpool_utils:task_init( {dispatch_state, Event}, proplists:get_value(time_checker, StateData#state.options, undefined), proplists:get_value(overrun_warning, StateData#state.options, infinity)). - -do_try(Fun) -> try Fun() catch _:Error -> Error end. diff --git a/src/wpool_process.erl b/src/wpool_process.erl index 9b1e9e0..10dbd1d 100644 --- a/src/wpool_process.erl +++ b/src/wpool_process.erl @@ -78,14 +78,14 @@ cast_call(Process, From, Call) -> init({Name, Mod, InitArgs, Options}) -> case Mod:init(InitArgs) of {ok, ModState} -> - ok = notify_queue_manager(new_worker, Name, Options), + ok = wpool_utils:notify_queue_manager(new_worker, Name, Options), {ok, #state{ name = Name , mod = Mod , state = ModState , options = Options }}; {ok, ModState, Timeout} -> - ok = notify_queue_manager(new_worker, Name, Options), + ok = wpool_utils:notify_queue_manager(new_worker, Name, Options), {ok, #state{ name = Name , mod = Mod , state = ModState @@ -99,7 +99,7 @@ init({Name, Mod, InitArgs, Options}) -> -spec terminate(atom(), state()) -> term(). terminate(Reason, State) -> #state{mod=Mod, state=ModState, name=Name, options=Options} = State, - ok = notify_queue_manager(worker_dead, Name, Options), + ok = wpool_utils:notify_queue_manager(worker_dead, Name, Options), Mod:terminate(Reason, ModState). %% @private @@ -114,7 +114,7 @@ code_change(OldVsn, State, Extra) -> -spec handle_info(any(), state()) -> {noreply, state()} | {stop, term(), state()}. handle_info(Info, State) -> - case do_try( + case wpool_utils:do_try( fun() -> (State#state.mod):handle_info(Info, State#state.state) end) of {noreply, NewState} -> {noreply, State#state{state = NewState}}; @@ -144,13 +144,15 @@ handle_cast({call, From, Call}, State) -> end; handle_cast({cast, Cast}, State) -> Task = - task_init( + wpool_utils:task_init( {cast, Cast}, proplists:get_value(time_checker, State#state.options, undefined), proplists:get_value(overrun_warning, State#state.options, infinity)), - ok = notify_queue_manager(worker_busy, State#state.name, State#state.options), + ok = wpool_utils:notify_queue_manager(worker_busy + , State#state.name + , State#state.options), Reply = - case do_try( + case wpool_utils:do_try( fun() -> (State#state.mod):handle_cast(Cast, State#state.state) end) of {noreply, NewState} -> {noreply, State#state{state = NewState}}; @@ -159,9 +161,11 @@ handle_cast({cast, Cast}, State) -> {stop, Reason, NewState} -> {stop, Reason, State#state{state = NewState}} end, - task_end(Task), + wpool_utils:task_end(Task), ok = - notify_queue_manager(worker_ready, State#state.name, State#state.options), + wpool_utils:notify_queue_manager(worker_ready + , State#state.name + , State#state.options), Reply. %% @private @@ -174,13 +178,15 @@ handle_cast({cast, Cast}, State) -> | {stop, term(), state()}. handle_call(Call, From, State) -> Task = - task_init( + wpool_utils:task_init( {call, Call}, proplists:get_value(time_checker, State#state.options, undefined), proplists:get_value(overrun_warning, State#state.options, infinity)), - ok = notify_queue_manager(worker_busy, State#state.name, State#state.options), + ok = wpool_utils:notify_queue_manager(worker_busy + , State#state.name + , State#state.options), Reply = - case do_try( + case wpool_utils:do_try( fun() -> (State#state.mod):handle_call(Call, From, State#state.state) end) of {noreply, NewState} -> @@ -196,39 +202,9 @@ handle_call(Call, From, State) -> {stop, Reason, Response, NewState} -> {stop, Reason, Response, State#state{state = NewState}} end, - task_end(Task), + wpool_utils:task_end(Task), ok = - notify_queue_manager(worker_ready, State#state.name, State#state.options), + wpool_utils:notify_queue_manager(worker_ready + , State#state.name + , State#state.options), Reply. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% PRIVATE FUNCTIONS -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% @doc Marks Task as started in this worker --spec task_init(term(), atom(), infinity | pos_integer()) -> - undefined | reference(). -task_init(Task, _TimeChecker, infinity) -> - Time = calendar:datetime_to_gregorian_seconds(calendar:universal_time()), - erlang:put(wpool_task, {undefined, Time, Task}), - undefined; -task_init(Task, TimeChecker, OverrunTime) -> - TaskId = erlang:make_ref(), - Time = calendar:datetime_to_gregorian_seconds(calendar:universal_time()), - erlang:put(wpool_task, {TaskId, Time, Task}), - erlang:send_after( - OverrunTime, TimeChecker, {check, self(), TaskId, OverrunTime}). - -%% @doc Removes the current task from the worker --spec task_end(undefined | reference()) -> ok. -task_end(undefined) -> erlang:erase(wpool_task); -task_end(TimerRef) -> - _ = erlang:cancel_timer(TimerRef), - erlang:erase(wpool_task). - -notify_queue_manager(Function, Name, Options) -> - case proplists:get_value(queue_manager, Options) of - undefined -> ok; - QueueManager -> wpool_queue_manager:Function(QueueManager, Name) - end. - -do_try(Fun) -> try Fun() catch _:Error -> Error end. diff --git a/src/wpool_utils.erl b/src/wpool_utils.erl new file mode 100644 index 0000000..eb8310e --- /dev/null +++ b/src/wpool_utils.erl @@ -0,0 +1,60 @@ +% This file is licensed to you under the Apache License, +% Version 2.0 (the "License"); you may not use this file +% except in compliance with the License. You may obtain +% a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, +% software distributed under the License is distributed on an +% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +% KIND, either express or implied. See the License for the +% specific language governing permissions and limitations +% under the License. +%%% @author Felipe Ripoll +%%% @doc Common functions for wpool_process and wpool_fsm_process +%%% modules. +-module(wpool_utils). +-author('ferigis@gmail.com'). + +%% API +-export([ task_init/3 + , task_end/1 + , notify_queue_manager/3 + , do_try/1]). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Api +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% @doc Marks Task as started in this worker +-spec task_init(term(), atom(), infinity | pos_integer()) -> + undefined | reference(). +task_init(Task, _TimeChecker, infinity) -> + Time = calendar:datetime_to_gregorian_seconds(calendar:universal_time()), + erlang:put(wpool_task, {undefined, Time, Task}), + undefined; +task_init(Task, TimeChecker, OverrunTime) -> + TaskId = erlang:make_ref(), + Time = calendar:datetime_to_gregorian_seconds(calendar:universal_time()), + erlang:put(wpool_task, {TaskId, Time, Task}), + erlang:send_after( + OverrunTime, TimeChecker, {check, self(), TaskId, OverrunTime}). + +%% @doc Removes the current task from the worker +-spec task_end(undefined | reference()) -> ok. +task_end(undefined) -> erlang:erase(wpool_task); +task_end(TimerRef) -> + _ = erlang:cancel_timer(TimerRef), + erlang:erase(wpool_task). + +-spec notify_queue_manager(atom(), atom(), list()) -> ok | any(). +notify_queue_manager(Function, Name, Options) -> + case proplists:get_value(queue_manager, Options) of + undefined -> ok; + QueueManager -> wpool_queue_manager:Function(QueueManager, Name) + end. + +-spec do_try(fun()) -> any(). +do_try(Fun) -> try Fun() catch _:Error -> Error end.