diff --git a/README.md b/README.md index 0846cf9..9803001 100644 --- a/README.md +++ b/README.md @@ -193,9 +193,17 @@ Batch message consumers receive a list of messages and work as part of the `:bro 3. Add `Kaffe.GroupMemberSupervisor` as a supervisor in your supervision tree - ```elixir - supervisor(Kaffe.GroupMemberSupervisor, []) - ``` + ```elixir + def start(_type, _args) do + + children = [ + Kaffe.GroupMemberSupervisor + ] + + opts = [strategy: :one_for_all, name: MyApp.Supervisor] + Supervisor.start_link(children, opts) + end + ``` ### async message acknowledgement diff --git a/lib/kaffe/group_member/manager/group_member_supervisor.ex b/lib/kaffe/group_member/manager/group_member_supervisor.ex index c37b7d3..9a7a767 100644 --- a/lib/kaffe/group_member/manager/group_member_supervisor.ex +++ b/lib/kaffe/group_member/manager/group_member_supervisor.ex @@ -7,6 +7,10 @@ defmodule Kaffe.GroupMemberSupervisor do require Logger + def start_link(_) do + start_link() + end + def start_link do Supervisor.start_link(__MODULE__, :ok, name: name()) end diff --git a/lib/kaffe/group_member/subscriber/subscriber.ex b/lib/kaffe/group_member/subscriber/subscriber.ex index 586a28b..4896231 100644 --- a/lib/kaffe/group_member/subscriber/subscriber.ex +++ b/lib/kaffe/group_member/subscriber/subscriber.ex @@ -108,7 +108,7 @@ defmodule Kaffe.Subscriber do end def handle_cast({:commit_offsets, topic, partition, generation_id, offset}, state) do - Logger.debug "Ready to commit offsets for #{state.topic} / #{state.partition} / #{generation_id} at offset: #{offset}" + Logger.debug "Committing offsets for #{state.topic} / #{state.partition} / #{generation_id} at offset: #{offset}" # Is this the ack we're looking for? ^topic = state.topic @@ -123,7 +123,7 @@ defmodule Kaffe.Subscriber do end def handle_cast({:request_more_messages, offset}, state) do - Logger.debug "Ready to consume more messages of #{state.topic} / #{state.partition} at offset: #{offset}. Offset has not been commited back" + Logger.debug "Requesting messages from #{state.topic} / #{state.partition} at offset: #{offset}" :ok = kafka().consume_ack(state.subscriber_pid, offset) diff --git a/mix.exs b/mix.exs index 7b04df7..551b8f0 100644 --- a/mix.exs +++ b/mix.exs @@ -3,7 +3,7 @@ defmodule Kaffe.Mixfile do def project do [app: :kaffe, - version: "1.7.0", + version: "1.8.0", description: "An opinionated Elixir wrapper around brod, the Erlang Kafka client, that supports encrypted connections to Heroku Kafka out of the box.", name: "Kaffe", source_url: "https://github.com/spreedly/kaffe",