Skip to content

Commit

Permalink
Merge pull request #83 from inaka/ferigis.80.common_functions
Browse files Browse the repository at this point in the history
[#80] Implemented shared functions in shared module. [Fix #80]
  • Loading branch information
Brujo Benavides authored Aug 17, 2016
2 parents 4f55cc7 + 886094c commit 31b99fe
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 98 deletions.
75 changes: 23 additions & 52 deletions src/wpool_fsm_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,14 @@ cast_call(Process, From, Event) ->
init({Name, Mod, InitArgs, Options}) ->
case Mod:init(InitArgs) of
{ok, FirstState, StateData} ->
ok = notify_queue_manager(new_worker, Name, Options),
ok = wpool_utils:notify_queue_manager(new_worker, Name, Options),
{ok, dispatch_state, #state{ name = Name
, mod = Mod
, state = StateData
, options = Options
, fsm_state = FirstState}};
{ok, FirstState, StateData, Timeout} ->
ok = notify_queue_manager(new_worker, Name, Options),
ok = wpool_utils:notify_queue_manager(new_worker, Name, Options),
{ok, dispatch_state, #state{ name = Name
, mod = Mod
, state = StateData
Expand All @@ -127,7 +127,7 @@ terminate(Reason, CurrentState, State) ->
, name = Name
, options = Options
} = State,
ok = notify_queue_manager(worker_dead, Name, Options),
ok = wpool_utils:notify_queue_manager(worker_dead, Name, Options),
Mod:terminate(Reason, CurrentState, ModState).

%% @private
Expand All @@ -147,7 +147,7 @@ code_change(OldVsn, StateName, State, Extra) ->
-spec handle_info(any(), fsm_state(), state()) ->
{next_state, dispatch_state, state()} | {stop, term(), state()}.
handle_info(Info, StateName, StateData) ->
case do_try(
case wpool_utils:do_try(
fun() ->
(StateData#state.mod):handle_info(
Info, StateName, StateData#state.state)
Expand Down Expand Up @@ -179,15 +179,15 @@ format_status(Opt, [PDict, StateData]) ->
{next_state, dispatch_state, state()} | {stop, term(), state()}.
handle_event(Event, _StateName, StateData) ->
Task =
task_init(
wpool_utils:task_init(
{handle_event, Event},
proplists:get_value(time_checker, StateData#state.options, undefined),
proplists:get_value(overrun_warning, StateData#state.options, infinity)),
ok =
notify_queue_manager(
wpool_utils:notify_queue_manager(
worker_busy, StateData#state.name, StateData#state.options),
Reply =
case do_try(
case wpool_utils:do_try(
fun() ->
(StateData#state.mod):handle_event(Event,
StateData#state.fsm_state, StateData#state.state)
Expand All @@ -207,9 +207,9 @@ handle_event(Event, _StateName, StateData) ->
{stop, Reason, NewState} ->
{stop, Reason, StateData#state{state = NewState}}
end,
task_end(Task),
wpool_utils:task_end(Task),
ok =
notify_queue_manager(
wpool_utils:notify_queue_manager(
worker_ready, StateData#state.name, StateData#state.options),
Reply.

Expand All @@ -219,15 +219,15 @@ handle_event(Event, _StateName, StateData) ->
| {stop, term(), state()}.
handle_sync_event(Event, From, _StateName, StateData) ->
Task =
task_init(
wpool_utils:task_init(
{handle_sync_event, Event},
proplists:get_value(time_checker, StateData#state.options, undefined),
proplists:get_value(overrun_warning, StateData#state.options, infinity)),
ok =
notify_queue_manager(
wpool_utils:notify_queue_manager(
worker_busy, StateData#state.name, StateData#state.options),
Result =
case do_try(
case wpool_utils:do_try(
fun() ->
(StateData#state.mod):handle_sync_event(
Event, From, StateData#state.fsm_state, StateData#state.state)
Expand Down Expand Up @@ -269,9 +269,9 @@ handle_sync_event(Event, From, _StateName, StateData) ->
{stop, Reason, Response, NewState} ->
{stop, Reason, Response, StateData#state{state = NewState}}
end,
task_end(Task),
wpool_utils:task_end(Task),
ok =
notify_queue_manager(worker_ready,
wpool_utils:notify_queue_manager(worker_ready,
StateData#state.name,
StateData#state.options),
Result.
Expand All @@ -297,10 +297,10 @@ dispatch_state({sync_send_event, From, Event}, StateData) ->
dispatch_state(Event, StateData) ->
Task = get_task(Event, StateData),
ok =
notify_queue_manager(
wpool_utils:notify_queue_manager(
worker_busy, StateData#state.name, StateData#state.options),
Reply =
case do_try(
case wpool_utils:do_try(
fun() ->
(StateData#state.mod):(StateData#state.fsm_state)(
Event, StateData#state.state)
Expand All @@ -323,9 +323,9 @@ dispatch_state(Event, StateData) ->
{stop, Reason, NewStateData} ->
{stop, Reason, StateData#state{state = NewStateData}}
end,
task_end(Task),
wpool_utils:task_end(Task),
ok =
notify_queue_manager(
wpool_utils:notify_queue_manager(
worker_ready, StateData#state.name, StateData#state.options),
Reply.

Expand All @@ -339,10 +339,10 @@ dispatch_state(Event, StateData) ->
dispatch_state(Event, From, StateData) ->
Task = get_task(Event, StateData),
ok =
notify_queue_manager(
wpool_utils:notify_queue_manager(
worker_busy, StateData#state.name, StateData#state.options),
Result =
case do_try(
case wpool_utils:do_try(
fun() ->
(StateData#state.mod):(StateData#state.fsm_state)(
Event, From, StateData#state.state)
Expand Down Expand Up @@ -384,46 +384,17 @@ dispatch_state(Event, From, StateData) ->
{stop, Reason, Reply, NewStateData} ->
{stop, Reason, Reply, StateData#state{state = NewStateData}}
end,
task_end(Task),
wpool_utils:task_end(Task),
ok =
notify_queue_manager(
wpool_utils:notify_queue_manager(
worker_ready, StateData#state.name, StateData#state.options),
Result.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% PRIVATE FUNCTIONS
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @doc Marks Task as started in this worker
-spec task_init(term(), atom(), infinity | pos_integer()) ->
undefined | reference().
task_init(Task, _TimeChecker, infinity) ->
Time = calendar:datetime_to_gregorian_seconds(calendar:universal_time()),
erlang:put(wpool_task, {undefined, Time, Task}),
undefined;
task_init(Task, TimeChecker, OverrunTime) ->
TaskId = erlang:make_ref(),
Time = calendar:datetime_to_gregorian_seconds(calendar:universal_time()),
erlang:put(wpool_task, {TaskId, Time, Task}),
erlang:send_after(
OverrunTime, TimeChecker, {check, self(), TaskId, OverrunTime}).

%% @doc Removes the current task from the worker
-spec task_end(undefined | reference()) -> ok.
task_end(undefined) -> erlang:erase(wpool_task);
task_end(TimerRef) ->
_ = erlang:cancel_timer(TimerRef),
erlang:erase(wpool_task).

notify_queue_manager(Function, Name, Options) ->
case proplists:get_value(queue_manager, Options) of
undefined -> ok;
QueueManager -> wpool_queue_manager:Function(QueueManager, Name)
end.

get_task(Event, StateData) ->
task_init(
wpool_utils:task_init(
{dispatch_state, Event},
proplists:get_value(time_checker, StateData#state.options, undefined),
proplists:get_value(overrun_warning, StateData#state.options, infinity)).

do_try(Fun) -> try Fun() catch _:Error -> Error end.
68 changes: 22 additions & 46 deletions src/wpool_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ cast_call(Process, From, Call) ->
init({Name, Mod, InitArgs, Options}) ->
case Mod:init(InitArgs) of
{ok, ModState} ->
ok = notify_queue_manager(new_worker, Name, Options),
ok = wpool_utils:notify_queue_manager(new_worker, Name, Options),
{ok, #state{ name = Name
, mod = Mod
, state = ModState
, options = Options
}};
{ok, ModState, Timeout} ->
ok = notify_queue_manager(new_worker, Name, Options),
ok = wpool_utils:notify_queue_manager(new_worker, Name, Options),
{ok, #state{ name = Name
, mod = Mod
, state = ModState
Expand All @@ -99,7 +99,7 @@ init({Name, Mod, InitArgs, Options}) ->
-spec terminate(atom(), state()) -> term().
terminate(Reason, State) ->
#state{mod=Mod, state=ModState, name=Name, options=Options} = State,
ok = notify_queue_manager(worker_dead, Name, Options),
ok = wpool_utils:notify_queue_manager(worker_dead, Name, Options),
Mod:terminate(Reason, ModState).

%% @private
Expand All @@ -114,7 +114,7 @@ code_change(OldVsn, State, Extra) ->
-spec handle_info(any(), state()) ->
{noreply, state()} | {stop, term(), state()}.
handle_info(Info, State) ->
case do_try(
case wpool_utils:do_try(
fun() -> (State#state.mod):handle_info(Info, State#state.state) end) of
{noreply, NewState} ->
{noreply, State#state{state = NewState}};
Expand Down Expand Up @@ -144,13 +144,15 @@ handle_cast({call, From, Call}, State) ->
end;
handle_cast({cast, Cast}, State) ->
Task =
task_init(
wpool_utils:task_init(
{cast, Cast},
proplists:get_value(time_checker, State#state.options, undefined),
proplists:get_value(overrun_warning, State#state.options, infinity)),
ok = notify_queue_manager(worker_busy, State#state.name, State#state.options),
ok = wpool_utils:notify_queue_manager(worker_busy
, State#state.name
, State#state.options),
Reply =
case do_try(
case wpool_utils:do_try(
fun() -> (State#state.mod):handle_cast(Cast, State#state.state) end) of
{noreply, NewState} ->
{noreply, State#state{state = NewState}};
Expand All @@ -159,9 +161,11 @@ handle_cast({cast, Cast}, State) ->
{stop, Reason, NewState} ->
{stop, Reason, State#state{state = NewState}}
end,
task_end(Task),
wpool_utils:task_end(Task),
ok =
notify_queue_manager(worker_ready, State#state.name, State#state.options),
wpool_utils:notify_queue_manager(worker_ready
, State#state.name
, State#state.options),
Reply.

%% @private
Expand All @@ -174,13 +178,15 @@ handle_cast({cast, Cast}, State) ->
| {stop, term(), state()}.
handle_call(Call, From, State) ->
Task =
task_init(
wpool_utils:task_init(
{call, Call},
proplists:get_value(time_checker, State#state.options, undefined),
proplists:get_value(overrun_warning, State#state.options, infinity)),
ok = notify_queue_manager(worker_busy, State#state.name, State#state.options),
ok = wpool_utils:notify_queue_manager(worker_busy
, State#state.name
, State#state.options),
Reply =
case do_try(
case wpool_utils:do_try(
fun() -> (State#state.mod):handle_call(Call, From, State#state.state)
end) of
{noreply, NewState} ->
Expand All @@ -196,39 +202,9 @@ handle_call(Call, From, State) ->
{stop, Reason, Response, NewState} ->
{stop, Reason, Response, State#state{state = NewState}}
end,
task_end(Task),
wpool_utils:task_end(Task),
ok =
notify_queue_manager(worker_ready, State#state.name, State#state.options),
wpool_utils:notify_queue_manager(worker_ready
, State#state.name
, State#state.options),
Reply.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% PRIVATE FUNCTIONS
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @doc Marks Task as started in this worker
-spec task_init(term(), atom(), infinity | pos_integer()) ->
undefined | reference().
task_init(Task, _TimeChecker, infinity) ->
Time = calendar:datetime_to_gregorian_seconds(calendar:universal_time()),
erlang:put(wpool_task, {undefined, Time, Task}),
undefined;
task_init(Task, TimeChecker, OverrunTime) ->
TaskId = erlang:make_ref(),
Time = calendar:datetime_to_gregorian_seconds(calendar:universal_time()),
erlang:put(wpool_task, {TaskId, Time, Task}),
erlang:send_after(
OverrunTime, TimeChecker, {check, self(), TaskId, OverrunTime}).

%% @doc Removes the current task from the worker
-spec task_end(undefined | reference()) -> ok.
task_end(undefined) -> erlang:erase(wpool_task);
task_end(TimerRef) ->
_ = erlang:cancel_timer(TimerRef),
erlang:erase(wpool_task).

notify_queue_manager(Function, Name, Options) ->
case proplists:get_value(queue_manager, Options) of
undefined -> ok;
QueueManager -> wpool_queue_manager:Function(QueueManager, Name)
end.

do_try(Fun) -> try Fun() catch _:Error -> Error end.
60 changes: 60 additions & 0 deletions src/wpool_utils.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
% This file is licensed to you under the Apache License,
% Version 2.0 (the "License"); you may not use this file
% except in compliance with the License. You may obtain
% a copy of the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing,
% software distributed under the License is distributed on an
% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
% KIND, either express or implied. See the License for the
% specific language governing permissions and limitations
% under the License.
%%% @author Felipe Ripoll <ferigis@gmail.com>
%%% @doc Common functions for wpool_process and wpool_fsm_process
%%% modules.
-module(wpool_utils).
-author('ferigis@gmail.com').

%% API
-export([ task_init/3
, task_end/1
, notify_queue_manager/3
, do_try/1]).


%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Api
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% @doc Marks Task as started in this worker
-spec task_init(term(), atom(), infinity | pos_integer()) ->
undefined | reference().
task_init(Task, _TimeChecker, infinity) ->
Time = calendar:datetime_to_gregorian_seconds(calendar:universal_time()),
erlang:put(wpool_task, {undefined, Time, Task}),
undefined;
task_init(Task, TimeChecker, OverrunTime) ->
TaskId = erlang:make_ref(),
Time = calendar:datetime_to_gregorian_seconds(calendar:universal_time()),
erlang:put(wpool_task, {TaskId, Time, Task}),
erlang:send_after(
OverrunTime, TimeChecker, {check, self(), TaskId, OverrunTime}).

%% @doc Removes the current task from the worker
-spec task_end(undefined | reference()) -> ok.
task_end(undefined) -> erlang:erase(wpool_task);
task_end(TimerRef) ->
_ = erlang:cancel_timer(TimerRef),
erlang:erase(wpool_task).

-spec notify_queue_manager(atom(), atom(), list()) -> ok | any().
notify_queue_manager(Function, Name, Options) ->
case proplists:get_value(queue_manager, Options) of
undefined -> ok;
QueueManager -> wpool_queue_manager:Function(QueueManager, Name)
end.

-spec do_try(fun()) -> any().
do_try(Fun) -> try Fun() catch _:Error -> Error end.

0 comments on commit 31b99fe

Please sign in to comment.