Skip to content

Commit

Permalink
Merge pull request #9 from inaka/elbrujohalcon.9.indentation__
Browse files Browse the repository at this point in the history
Indentation!!
  • Loading branch information
jfacorro committed Sep 25, 2014
2 parents 48e70e4 + 6a13590 commit 293b6c5
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 218 deletions.
10 changes: 5 additions & 5 deletions src/wpool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ stop() -> application:stop(worker_pool).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @private
-spec start(any(), any()) -> {ok, pid()} | {error, term()}.
start(_StartType, _StartArgs) ->
wpool_sup:start_link().
start(_StartType, _StartArgs) -> wpool_sup:start_link().

%% @private
-spec stop(any()) -> ok.
Expand Down Expand Up @@ -138,8 +137,8 @@ call(Sup, Call, Strategy, Timeout) ->
-spec call(name(), term(), strategy(), available_worker_timeout(), timeout()) ->
term().
call(Sup, Call, available_worker, Worker_Timeout, Timeout) ->
Worker = wpool_pool:available_worker(Sup, Worker_Timeout),
wpool_process:call(Worker, Call, Timeout);
Worker = wpool_pool:available_worker(Sup, Worker_Timeout),
wpool_process:call(Worker, Call, Timeout);
call(Sup, Call, Strategy, _Worker_Timeout, Timeout) ->
call(Sup, Call, Strategy, Timeout).

Expand All @@ -151,7 +150,8 @@ cast(Sup, Cast) -> cast(Sup, Cast, default_strategy()).
-spec cast(name(), term(), strategy()) -> ok.
cast(Sup, Cast, available_worker) ->
wpool_pool:cast_to_available_worker(Sup, Cast);
cast(Sup, Cast, Strategy) -> wpool_process:cast(wpool_pool:Strategy(Sup), Cast).
cast(Sup, Cast, Strategy) ->
wpool_process:cast(wpool_pool:Strategy(Sup), Cast).

%% @doc Retrieves a snapshot of the pool stats
-spec stats(name()) -> stats().
Expand Down
146 changes: 72 additions & 74 deletions src/wpool_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ start_link(Name, Options) ->
%% @throws no_workers
-spec best_worker(wpool:name()) -> atom().
best_worker(Sup) ->
case find_wpool(Sup) of
undefined -> throw(no_workers);
Wpool -> min_message_queue(Wpool)
end.
case find_wpool(Sup) of
undefined -> throw(no_workers);
Wpool -> min_message_queue(Wpool)
end.

%% @doc Picks a random worker
%% @throws no_workers
Expand All @@ -69,37 +69,38 @@ random_worker(Sup) ->
case wpool_size(Sup) of
undefined -> throw(no_workers);
Wpool_Size ->
_ = random:seed(now()), worker_name(Sup, random:uniform(Wpool_Size))
_ = random:seed(now()),
worker_name(Sup, random:uniform(Wpool_Size))
end.

%% @doc Picks the next worker in a round robin fashion
%% @throws no_workers
-spec next_worker(wpool:name()) -> atom().
next_worker(Sup) ->
case move_wpool(Sup) of
undefined -> throw(no_workers);
Next -> worker_name(Sup, Next)
end.
case move_wpool(Sup) of
undefined -> throw(no_workers);
Next -> worker_name(Sup, Next)
end.

%% @doc Picks the first available worker.
%% If all workers are busy, waits for Timeout ms until one is free
%% @throws no_workers
-spec available_worker(wpool:name(), timeout()) -> atom().
available_worker(Sup, Timeout) ->
case wpool_queue_manager:available_worker(
queue_manager_name(Sup), Timeout) of
timeout -> throw(no_workers);
noproc -> throw(no_workers);
Worker -> Worker
end.
case wpool_queue_manager:available_worker(
queue_manager_name(Sup), Timeout) of
timeout -> throw(no_workers);
noproc -> throw(no_workers);
Worker -> Worker
end.

%% @doc Casts a message to the first available worker.
%% Since we can wait forever for a wpool:cast to be delivered
%% but we don't want the caller to be blocked, this function
%% just forwards the cast when it gets the worker
-spec cast_to_available_worker(wpool:name(), term()) -> ok.
cast_to_available_worker(Sup, Cast) ->
wpool_queue_manager:cast_to_available_worker(queue_manager_name(Sup), Cast).
wpool_queue_manager:cast_to_available_worker(queue_manager_name(Sup), Cast).

%% @doc Retrieves a snapshot of the pool stats
%% @throws no_workers
Expand All @@ -116,12 +117,12 @@ stats(Wpool, Sup) ->
lists:foldl(
fun(N, {T, L}) ->
Worker = erlang:whereis(worker_name(Sup, N)),
[ {message_queue_len, MQL} = MQLT
, Memory, Function, Location, {dictionary, Dictionary}] =
[{message_queue_len, MQL} = MQLT,
Memory, Function, Location, {dictionary, Dictionary}] =
erlang:process_info(
Worker,
[ message_queue_len, memory, current_function
, current_location, dictionary]),
[message_queue_len, memory, current_function,
current_location, dictionary]),
Time =
calendar:datetime_to_gregorian_seconds(calendar:universal_time()),
WS =
Expand All @@ -134,8 +135,7 @@ stats(Wpool, Sup) ->
[MQLT, Memory, Function, Location];
{_, {_TaskId, Started, Task}} ->
[MQLT, Memory, Function, Location,
{task, Task},
{runtime, Time - Started}]
{task, Task}, {runtime, Time - Started}]
end,
{T + MQL, [{N, WS} | L]}
end, {0, []}, lists:seq(1, Wpool#wpool.size)),
Expand All @@ -161,22 +161,22 @@ worker_names(Pool_Name) ->
%% @doc the number of workers in the pool
-spec wpool_size(atom()) -> non_neg_integer() | undefined.
wpool_size(Name) ->
try ets:update_counter(?MODULE, Name, {#wpool.size, 0}) of
Wpool_Size ->
case erlang:whereis(Name) of
undefined ->
ets:delete(?MODULE, Name),
undefined;
_ ->
Wpool_Size
end
catch
_:badarg ->
case build_wpool(Name) of
undefined -> undefined;
Wpool -> Wpool#wpool.size
end
end.
try ets:update_counter(?MODULE, Name, {#wpool.size, 0}) of
Wpool_Size ->
case erlang:whereis(Name) of
undefined ->
ets:delete(?MODULE, Name),
undefined;
_ ->
Wpool_Size
end
catch
_:badarg ->
case build_wpool(Name) of
undefined -> undefined;
Wpool -> Wpool#wpool.size
end
end.

%% ===================================================================
%% Supervisor callbacks
Expand Down Expand Up @@ -211,16 +211,14 @@ init({Name, Options}) ->
{QueueManager,
{wpool_queue_manager, start_link, [Name, QueueManager]},
permanent, brutal_kill, worker, [wpool_queue_manager]},
{ok, {Strategy,
[TimeCheckerSpec, QueueManagerSpec
| [{worker_name(Name, I),
{wpool_process, start_link,
[worker_name(Name, I), Worker, InitArgs,
[ {queue_manager, QueueManager}, {time_checker, TimeChecker}
| Options]]},
permanent, 5000, worker, [Worker]}
|| I <- lists:seq(1, Workers)]
]}}.
WorkerSpecs =
[{worker_name(Name, I),
{wpool_process, start_link,
[worker_name(Name, I), Worker, InitArgs,
[ {queue_manager, QueueManager}, {time_checker, TimeChecker}
| Options]]}, permanent, 5000, worker, [Worker]}
|| I <- lists:seq(1, Workers)],
{ok, {Strategy, [TimeCheckerSpec, QueueManagerSpec | WorkerSpecs]}}.

%% ===================================================================
%% Private functions
Expand Down Expand Up @@ -258,36 +256,36 @@ min_message_queue(Checked, Wpool, Found) ->
%% ETS functions
%% ===================================================================
store_wpool(Wpool) ->
true = ets:insert(?MODULE, Wpool),
Wpool.
true = ets:insert(?MODULE, Wpool),
Wpool.

move_wpool(Name) ->
try
Wpool_Size = ets:update_counter(?MODULE, Name, {#wpool.size, 0}),
ets:update_counter(?MODULE, Name, {#wpool.next, 1, Wpool_Size, 1})
catch
_:badarg ->
case build_wpool(Name) of
undefined -> undefined;
Wpool -> Wpool#wpool.next
end
end.
try
Wpool_Size = ets:update_counter(?MODULE, Name, {#wpool.size, 0}),
ets:update_counter(?MODULE, Name, {#wpool.next, 1, Wpool_Size, 1})
catch
_:badarg ->
case build_wpool(Name) of
undefined -> undefined;
Wpool -> Wpool#wpool.next
end
end.

find_wpool(Name) ->
try ets:lookup(?MODULE, Name) of
[Wpool | _] ->
case erlang:whereis(Name) of
undefined ->
ets:delete(?MODULE, Name),
undefined;
_ ->
Wpool
end;
_ -> build_wpool(Name)
catch
_:badarg ->
build_wpool(Name)
end.
try ets:lookup(?MODULE, Name) of
[Wpool | _] ->
case erlang:whereis(Name) of
undefined ->
ets:delete(?MODULE, Name),
undefined;
_ ->
Wpool
end;
_ -> build_wpool(Name)
catch
_:badarg ->
build_wpool(Name)
end.

%% @doc We use this function not to report an error if for some reason we've
%% lost the record on the ets table. This SHOULDN'T be called too much
Expand Down
Loading

0 comments on commit 293b6c5

Please sign in to comment.