Skip to content

Commit

Permalink
topic_subscriber owns its consumers
Browse files Browse the repository at this point in the history
Fixes #369
  • Loading branch information
k32 authored and Dmitry Fedoseev committed Jul 21, 2020
1 parent dcd3282 commit 23dc042
Showing 1 changed file with 52 additions and 11 deletions.
63 changes: 52 additions & 11 deletions src/brod_topic_subscriber.erl
Original file line number Diff line number Diff line change
Expand Up @@ -130,21 +130,32 @@
-type consumer() :: #consumer{}.

-record(state,
<<<<<<< HEAD
{ client :: brod:client()
, client_mref :: reference()
, topic :: brod:topic()
, consumers = [] :: [consumer()]
, cb_module :: module()
, cb_state :: cb_state()
, message_type :: message | message_set
=======
{ client :: brod:client()
, client_mref :: reference()
, topic :: brod:topic()
, consumers = [] :: [consumer()]
, cb_fun :: cb_fun()
, cb_state :: cb_state()
, message_type :: message | message_set
, consumer_config :: list()
>>>>>>> topic_subscriber owns its consumers
}).

-type state() :: #state{}.

%% delay 2 seconds retry the failed subscription to partiton consumer process
-define(RESUBSCRIBE_DELAY, 2000).

-define(LO_CMD_START_CONSUMER(ConsumerConfig, CommittedOffsets, Partitions),
-define(LO_CMD_START_CONSUMER(CommittedOffsets, Partitions),
{'$start_consumer', ConsumerConfig, CommittedOffsets, Partitions}).
-define(LO_CMD_SUBSCRIBE_PARTITIONS, '$subscribe_partitions').

Expand Down Expand Up @@ -274,6 +285,7 @@ ack(Pid, Partition, Offset) ->
%%%_* gen_server callbacks =====================================================

%% @private
<<<<<<< HEAD
-spec init(topic_subscriber_config()) -> {ok, state()}.
init(Config) ->
Defaults = #{ message_type => message_set
Expand All @@ -290,29 +302,50 @@ init(Config) ->
, partitions := Partitions
} = maps:merge(Defaults, Config),
{ok, CommittedOffsets, CbState} = CbModule:init(Topic, InitData),
=======
init({Client, Topic, Partitions, ConsumerConfig,
MessageType, CbModule, CbInitArg}) ->
{ok, CommittedOffsets, CbState} = CbModule:init(Topic, CbInitArg),
CbFun = fun(Partition, Msg, CbStateIn) ->
CbModule:handle_message(Partition, Msg, CbStateIn)
end,
init({Client, Topic, Partitions, ConsumerConfig,
CommittedOffsets, MessageType, CbFun, CbState});
init({Client, Topic, Partitions, ConsumerConfig0,
CommittedOffsets, MessageType, CbFun, CbState}) ->
>>>>>>> topic_subscriber owns its consumers
ok = brod_utils:assert_client(Client),
ok = brod_utils:assert_topic(Topic),
self() ! ?LO_CMD_START_CONSUMER(ConsumerConfig, CommittedOffsets, Partitions),
ConsumerConfig = [{register_self, false} | ConsumerConfig0],
self() ! ?LO_CMD_START_CONSUMER(CommittedOffsets, Partitions),
State =
<<<<<<< HEAD
#state{ client = Client
, client_mref = erlang:monitor(process, Client)
, topic = Topic
, cb_module = CbModule
, cb_state = CbState
, message_type = MessageType
=======
#state{ client = Client
, client_mref = erlang:monitor(process, Client)
, topic = Topic
, cb_fun = CbFun
, cb_state = CbState
, message_type = MessageType
, consumer_config = ConsumerConfig
>>>>>>> topic_subscriber owns its consumers
},
{ok, State}.

%% @private
handle_info({_ConsumerPid, #kafka_message_set{} = MsgSet}, State0) ->
State = handle_consumer_delivery(MsgSet, State0),
{noreply, State};
handle_info(?LO_CMD_START_CONSUMER(ConsumerConfig, CommittedOffsets,
Partitions0),
handle_info(?LO_CMD_START_CONSUMER(CommittedOffsets, Partitions0),
#state{ client = Client
, topic = Topic
} = State) ->
ok = brod:start_consumer(Client, Topic, ConsumerConfig),
{ok, PartitionsCount} = brod:get_partitions_count(Client, Topic),
AllPartitions = lists:seq(0, PartitionsCount - 1),
Partitions =
Expand Down Expand Up @@ -413,15 +446,17 @@ update_last_offset(Partition, Messages,
Consumer = C#consumer{last_offset = LastOffset},
State#state{consumers = put_consumer(Consumer, Consumers)}.

subscribe_partitions(#state{ client = Client
, topic = Topic
, consumers = Consumers0
subscribe_partitions(#state{ client = Client
, topic = Topic
, consumers = Consumers0
, consumer_config = ConsumerConfig
} = State) ->
Consumers =
lists:map(fun(C) -> subscribe_partition(Client, Topic, C) end, Consumers0),
lists:map( fun(C) -> subscribe_partition(Client, Topic, ConsumerConfig, C) end
, Consumers0),
{ok, State#state{consumers = Consumers}}.

subscribe_partition(Client, Topic, Consumer) ->
subscribe_partition(Client, Topic, ConsumerConfig, Consumer) ->
#consumer{ partition = Partition
, consumer_pid = Pid
, acked_offset = AckedOffset
Expand All @@ -448,9 +483,15 @@ subscribe_partition(Client, Topic, Consumer) ->
StartOffset >= 0 orelse erlang:error({invalid_offset, AckedOffset}),
[{begin_offset, StartOffset}]
end,
case brod:subscribe(Client, self(), Topic, Partition, Options) of
ClientPid = if is_atom(Client) -> whereis(Client);
is_pid(Client) -> Client
end,
case brod_consumer:start_link(ClientPid, Topic, Partition,
ConsumerConfig) of
{ok, ConsumerPid} ->
Mref = erlang:monitor(process, ConsumerPid),
unlink(ConsumerPid),
ok = brod_consumer:subscribe(ConsumerPid, self(), Options),
Consumer#consumer{ consumer_pid = ConsumerPid
, consumer_mref = Mref
};
Expand Down

0 comments on commit 23dc042

Please sign in to comment.