diff --git a/src/wpool_process.erl b/src/wpool_process.erl index 886c853..9105e6e 100644 --- a/src/wpool_process.erl +++ b/src/wpool_process.erl @@ -18,9 +18,33 @@ -behaviour(gen_server). +%% Taken from gen_server OTP +-record(callback_cache, + {module :: module(), + handle_call :: + fun((Request :: term(), From :: from(), State :: term()) -> + {reply, Reply :: term(), NewState :: term()} | + {reply, + Reply :: term(), + NewState :: term(), + timeout() | hibernate | {continue, term()}} | + {noreply, NewState :: term()} | + {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} | + {stop, Reason :: term(), Reply :: term(), NewState :: term()} | + {stop, Reason :: term(), NewState :: term()}), + handle_cast :: + fun((Request :: term(), State :: term()) -> + {noreply, NewState :: term()} | + {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} | + {stop, Reason :: term(), NewState :: term()}), + handle_info :: + fun((Info :: timeout | term(), State :: term()) -> + {noreply, NewState :: term()} | + {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} | + {stop, Reason :: term(), NewState :: term()})}). -record(state, {name :: atom(), - mod :: atom(), + mod :: #callback_cache{}, state :: term(), options :: #{time_checker := atom(), @@ -103,13 +127,14 @@ get_state(#state{state = State}) -> init({Name, Mod, InitArgs, LOptions}) -> Options = maps:from_list(LOptions), wpool_process_callbacks:notify(handle_init_start, Options, [Name]), + CbCache = create_callback_cache(Mod), case Mod:init(InitArgs) of {ok, ModState} -> ok = notify_queue_manager(new_worker, Name, Options), wpool_process_callbacks:notify(handle_worker_creation, Options, [Name]), {ok, #state{name = Name, - mod = Mod, + mod = CbCache, state = ModState, options = Options}}; {ok, ModState, NextStep} -> @@ -117,7 +142,7 @@ init({Name, Mod, InitArgs, LOptions}) -> wpool_process_callbacks:notify(handle_worker_creation, Options, [Name]), {ok, #state{name = Name, - mod = Mod, + mod = CbCache, state = ModState, options = Options}, NextStep}; @@ -130,30 +155,42 @@ init({Name, Mod, InitArgs, LOptions}) -> %% @private -spec terminate(atom(), state()) -> term(). terminate(Reason, State) -> - #state{mod = Mod, + #state{mod = #callback_cache{module = Mod}, state = ModState, name = Name, options = Options} = State, ok = notify_queue_manager(worker_dead, Name, Options), wpool_process_callbacks:notify(handle_worker_death, Options, [Name, Reason]), - Mod:terminate(Reason, ModState). + case erlang:function_exported(Mod, terminate, 2) of + true -> + Mod:terminate(Reason, ModState); + _ -> + ok + end. %% @private --spec code_change(string(), state(), any()) -> {ok, state()} | {error, term()}. -code_change(OldVsn, State, Extra) -> - case (State#state.mod):code_change(OldVsn, State#state.state, Extra) of - {ok, NewState} -> - {ok, State#state{state = NewState}}; - Error -> - {error, Error} +-spec code_change(string() | {down, string()}, state(), any()) -> + {ok, state()} | {error, term()}. +code_change(OldVsn, #state{mod = #callback_cache{module = Mod}} = State, Extra) -> + case erlang:function_exported(Mod, code_change, 3) of + true -> + case Mod:code_change(OldVsn, State#state.state, Extra) of + {ok, NewState} -> + {ok, State#state{state = NewState}}; + {error, Error} -> + {error, Error} + end; + _ -> + {ok, State} end. %% @private -spec handle_info(any(), state()) -> {noreply, state()} | {noreply, state(), next_step()} | {stop, term(), state()}. -handle_info(Info, State) -> - try (State#state.mod):handle_info(Info, State#state.state) of +handle_info(Info, #state{mod = CbCache} = State) -> + #callback_cache{module = Mod, handle_info = HandleInfo} = CbCache, + try HandleInfo(Info, State#state.state) of {noreply, NewState} -> {noreply, State#state{state = NewState}}; {noreply, NewState, NextStep} -> @@ -161,6 +198,13 @@ handle_info(Info, State) -> {stop, Reason, NewState} -> {stop, Reason, State#state{state = NewState}} catch + error:undef:Stacktrace -> + case erlang:function_exported(Mod, handle_info, 2) of + false -> + {noreply, State}; + true -> + erlang:raise(error, undef, Stacktrace) + end; _:{noreply, NewState} -> {noreply, State#state{state = NewState}}; _:{noreply, NewState, NextStep} -> @@ -174,8 +218,8 @@ handle_info(Info, State) -> {noreply, state()} | {noreply, state(), next_step()} | {stop, term(), state()}. -handle_continue(Continue, State) -> - try (State#state.mod):handle_continue(Continue, State#state.state) of +handle_continue(Continue, #state{mod = #callback_cache{module = Mod}} = State) -> + try Mod:handle_continue(Continue, State#state.state) of {noreply, NewState} -> {noreply, State#state{state = NewState}}; {noreply, NewState, NextStep} -> @@ -193,7 +237,7 @@ handle_continue(Continue, State) -> %% @private -spec format_status(gen_server:format_status()) -> gen_server:format_status(). -format_status(#{state := #state{mod = Mod}} = Status) -> +format_status(#{state := #state{mod = #callback_cache{module = Mod}}} = Status) -> case erlang:function_exported(Mod, format_status, 1) of false -> Status; @@ -207,11 +251,12 @@ format_status(#{state := #state{mod = Mod}} = Status) -> %% @private -spec handle_cast(term(), state()) -> {noreply, state()} | {noreply, state(), next_step()} | {stop, term(), state()}. -handle_cast(Cast, #state{options = Options} = State) -> +handle_cast(Cast, #state{mod = CbCache, options = Options} = State) -> + #callback_cache{handle_cast = HandleCast} = CbCache, Task = wpool_utils:task_init({cast, Cast}, Options), ok = notify_queue_manager(worker_busy, State#state.name, Options), Reply = - try (State#state.mod):handle_cast(Cast, State#state.state) of + try HandleCast(Cast, State#state.state) of {noreply, NewState} -> {noreply, State#state{state = NewState}}; {noreply, NewState, NextStep} -> @@ -238,11 +283,12 @@ handle_cast(Cast, #state{options = Options} = State) -> {noreply, state(), next_step()} | {stop, term(), term(), state()} | {stop, term(), state()}. -handle_call(Call, From, #state{options = Options} = State) -> +handle_call(Call, From, #state{mod = CbCache, options = Options} = State) -> + #callback_cache{handle_call = HandleCall} = CbCache, Task = wpool_utils:task_init({call, Call}, Options), ok = notify_queue_manager(worker_busy, State#state.name, Options), Reply = - try (State#state.mod):handle_call(Call, From, State#state.state) of + try HandleCall(Call, From, State#state.state) of {noreply, NewState} -> {noreply, State#state{state = NewState}}; {noreply, NewState, NextStep} -> @@ -277,3 +323,9 @@ notify_queue_manager(Function, Name, #{queue_manager := QueueManager}) -> wpool_queue_manager:Function(QueueManager, Name); notify_queue_manager(_, _, _) -> ok. + +create_callback_cache(Mod) -> + #callback_cache{module = Mod, + handle_call = fun Mod:handle_call/3, + handle_cast = fun Mod:handle_cast/2, + handle_info = fun Mod:handle_info/2}. diff --git a/test/crashy_server.erl b/test/crashy_server.erl index e3c30e0..90ec009 100644 --- a/test/crashy_server.erl +++ b/test/crashy_server.erl @@ -44,6 +44,8 @@ code_change(_OldVsn, State, _Extra) -> -spec handle_info(timeout | Info, term()) -> {noreply, timeout} | Info. handle_info(timeout, _State) -> {noreply, timeout}; +handle_info(undef, _State) -> + erlang:error(undef); handle_info(Info, _State) -> Info. diff --git a/test/wpool_process_SUITE.erl b/test/wpool_process_SUITE.erl index 7638452..1e63f8b 100644 --- a/test/wpool_process_SUITE.erl +++ b/test/wpool_process_SUITE.erl @@ -24,7 +24,7 @@ -export([all/0]). -export([init_per_suite/1, end_per_suite/1, init_per_testcase/2, end_per_testcase/2]). -export([init/1, init_timeout/1, info/1, cast/1, send_request/1, call/1, continue/1, - format_status/1, no_format_status/1, stop/1]). + handle_info_missing/1, handle_info_fails/1, format_status/1, no_format_status/1, stop/1]). -export([pool_restart_crash/1, pool_norestart_crash/1, complete_coverage/1]). -spec all() -> [atom()]. @@ -173,6 +173,21 @@ continue(_Config) -> {comment, []}. +-spec handle_info_missing(config()) -> {comment, []}. +handle_info_missing(_Config) -> + %% sleepy_server does not implement handle_info/2 + {ok, Pid} = wpool_process:start_link(?MODULE, sleepy_server, 1, []), + Pid ! test, + {comment, []}. + +-spec handle_info_fails(config()) -> {comment, []}. +handle_info_fails(_Config) -> + %% sleepy_server does not implement handle_info/2 + {ok, Pid} = wpool_process:start_link(?MODULE, crashy_server, {ok, state}, []), + Pid ! undef, + false = ktn_task:wait_for(fun() -> erlang:is_process_alive(Pid) end, false), + {comment, []}. + -spec format_status(config()) -> {comment, []}. format_status(_Config) -> %% echo_server implements format_status/1 @@ -319,7 +334,7 @@ complete_coverage(_Config) -> ct:comment("Code Change"), {ok, State} = wpool_process:init({complete_coverage, echo_server, {ok, state}, []}), {ok, _} = wpool_process:code_change("oldvsn", State, {ok, state}), - {error, bad} = wpool_process:code_change("oldvsn", State, bad), + {error, bad} = wpool_process:code_change("oldvsn", State, {error, bad}), {comment, []}.