diff --git a/config/config.exs b/config/config.exs index 437231f..885b957 100644 --- a/config/config.exs +++ b/config/config.exs @@ -1,4 +1,4 @@ -use Mix.Config +import Config config :kaffe, kafka_mod: :brod, diff --git a/config/dev.exs b/config/dev.exs index d2d855e..becde76 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -1 +1 @@ -use Mix.Config +import Config diff --git a/config/test.exs b/config/test.exs index 2f7eea7..a8f6344 100644 --- a/config/test.exs +++ b/config/test.exs @@ -1,4 +1,4 @@ -use Mix.Config +import Config config :kaffe, kafka_mod: TestBrod, diff --git a/lib/kaffe/config.ex b/lib/kaffe/config.ex index b411348..e63aab2 100644 --- a/lib/kaffe/config.ex +++ b/lib/kaffe/config.ex @@ -15,9 +15,7 @@ defmodule Kaffe.Config do end) end - @doc """ - Transform the encoded string into a list of `{charlist, port}` tuples. - """ + # Transform the encoded string into a list of `{charlist, port}` tuples. def parse_endpoints(url) when is_binary(url) do url |> String.replace("kafka+ssl://", "") diff --git a/lib/kaffe/consumer_group/group_manager.ex b/lib/kaffe/consumer_group/group_manager.ex index 99ca181..36daa05 100644 --- a/lib/kaffe/consumer_group/group_manager.ex +++ b/lib/kaffe/consumer_group/group_manager.ex @@ -40,6 +40,10 @@ defmodule Kaffe.GroupManager do ## ========================================================================== ## Public API ## ========================================================================== + def start_link(_) do + start_link() + end + def start_link() do GenServer.start_link(__MODULE__, [self()], name: name()) end @@ -118,9 +122,7 @@ defmodule Kaffe.GroupManager do {:reply, {:ok, new_topics}, %State{state | topics: state.topics ++ new_topics}} end - @doc """ - List the currently subscribed topics - """ + # List the currently subscribed topics def handle_call({:list_subscribed_topics}, _from, %State{topics: topics} = state) do {:reply, topics, state} end diff --git a/lib/kaffe/consumer_group/group_member_supervisor.ex b/lib/kaffe/consumer_group/group_member_supervisor.ex index c68aaa0..ad0c599 100644 --- a/lib/kaffe/consumer_group/group_member_supervisor.ex +++ b/lib/kaffe/consumer_group/group_member_supervisor.ex @@ -32,7 +32,8 @@ defmodule Kaffe.GroupMemberSupervisor do end def start_worker_supervisor(supervisor_pid, subscriber_name) do - Supervisor.start_child(supervisor_pid, supervisor(Kaffe.WorkerSupervisor, [subscriber_name])) + link = Supervisor.start_link([{Kaffe.WorkerSupervisor, [subscriber_name]}], []) + Supervisor.start_child(supervisor_pid, link) end def start_group_member( @@ -42,25 +43,16 @@ defmodule Kaffe.GroupMemberSupervisor do worker_manager_pid, topic ) do - Supervisor.start_child( - supervisor_pid, - worker( - Kaffe.GroupMember, - [subscriber_name, consumer_group, worker_manager_pid, topic], - id: :"group_member_#{subscriber_name}_#{topic}" - ) + link = Supervisor.start_link( + {Kaffe.GroupMember, [subscriber_name, consumer_group, worker_manager_pid, topic]}, + id: :"group_member_#{subscriber_name}_#{topic}" ) + Supervisor.start_child(supervisor_pid, link) end def init(:ok) do Logger.info("event#starting=#{__MODULE__}") - - children = [ - worker(Kaffe.GroupManager, []) - ] - - # If we get a failure, we need to reset so the states are all consistent. - supervise(children, strategy: :one_for_all, max_restarts: 0, max_seconds: 1) + Supervisor.start_link(Kaffe.GroupManager, strategy: :one_for_all, max_restarts: 0, max_seconds: 1) end defp name do diff --git a/lib/kaffe/consumer_group/worker/worker_supervisor.ex b/lib/kaffe/consumer_group/worker/worker_supervisor.ex index a19bddb..c7e7e16 100644 --- a/lib/kaffe/consumer_group/worker/worker_supervisor.ex +++ b/lib/kaffe/consumer_group/worker/worker_supervisor.ex @@ -11,7 +11,8 @@ defmodule Kaffe.WorkerSupervisor do end def start_worker_manager(pid, subscriber_name) do - Supervisor.start_child(pid, worker(Kaffe.WorkerManager, [subscriber_name])) + link = Supervisor.start_link({Kaffe.WorkerManager, [subscriber_name]}, []) + Supervisor.start_child(pid, link) end def start_worker(pid, message_handler, subscriber_name, worker_name) do @@ -19,7 +20,7 @@ defmodule Kaffe.WorkerSupervisor do Supervisor.start_child( pid, - worker(Kaffe.Worker, [message_handler, subscriber_name, worker_name], + Supervisor.start_link({Kaffe.Worker, [message_handler, subscriber_name, worker_name]}, id: :"worker_#{subscriber_name}_#{worker_name}" ) ) @@ -33,7 +34,7 @@ defmodule Kaffe.WorkerSupervisor do # If anything fails, the state is inconsistent with the state of # `Kaffe.Subscriber` and `Kaffe.GroupMember`. We need the failure # to cascade all the way up so that they are terminated. - supervise(children, strategy: :one_for_all, max_restarts: 0, max_seconds: 1) + Supervisor.start_link(children, strategy: :one_for_all, max_restarts: 0, max_seconds: 1) end defp name(subscriber_name) do diff --git a/lib/kaffe/partition_selector.ex b/lib/kaffe/partition_selector.ex index e80f34d..b461e6b 100644 --- a/lib/kaffe/partition_selector.ex +++ b/lib/kaffe/partition_selector.ex @@ -25,7 +25,7 @@ defmodule Kaffe.PartitionSelector do end def random(total) do - :crypto.rand_uniform(0, total) + :rand.uniform(total) - 1 end def md5(key, total) do diff --git a/lib/kaffe/producer.ex b/lib/kaffe/producer.ex index b8792dd..80152e8 100644 --- a/lib/kaffe/producer.ex +++ b/lib/kaffe/producer.ex @@ -61,17 +61,15 @@ defmodule Kaffe.Producer do produce_list(topic, message_list, global_partition_strategy()) end - @doc """ - Synchronously produce the given `key`/`value` to the first Kafka topic. - - This is a simpler way to produce if you've only given Producer a single topic - for production and don't want to specify the topic for each call. - - Returns: - - * `:ok` on successfully producing the message - * `{:error, reason}` for any error - """ + # Synchronously produce the given `key`/`value` to the first Kafka topic. + # + # This is a simpler way to produce if you've only given Producer a single topic + # for production and don't want to specify the topic for each call. + # + # Returns: + # + # * `:ok` on successfully producing the message + # * `{:error, reason}` for any error def produce_sync(key, value) do topic = config().topics |> List.first() produce_value(topic, key, value) @@ -91,11 +89,10 @@ defmodule Kaffe.Producer do produce_list(topic, message_list, fn _, _, _, _ -> partition end) end - @doc """ - Synchronously produce the `key`/`value` to `topic` - See `produce_sync/2` for returns. - """ + # Synchronously produce the `key`/`value` to `topic` + # + # See `produce_sync/2` for returns. def produce_sync(topic, key, value) do produce_value(topic, key, value) end diff --git a/mix.exs b/mix.exs index 9b434ce..10d2e12 100644 --- a/mix.exs +++ b/mix.exs @@ -4,13 +4,13 @@ defmodule Kaffe.Mixfile do def project do [ app: :kaffe, - version: "1.20.0", + version: "2.0.0", description: "An opinionated Elixir wrapper around brod, the Erlang Kafka client, that supports encrypted connections to Heroku Kafka out of the box.", name: "Kaffe", source_url: "https://github.com/spreedly/kaffe", package: package(), - elixir: "~> 1.7", + elixir: "~> 1.12", build_embedded: Mix.env() == :prod, start_permanent: Mix.env() == :prod, elixirc_paths: elixirc_paths(Mix.env()), diff --git a/mix.lock b/mix.lock index c23b911..39c1ffe 100644 --- a/mix.lock +++ b/mix.lock @@ -1,14 +1,17 @@ %{ - "brod": {:hex, :brod, "3.2.0", "64f0778a7a32ec0a39cec9a564f4686bdfe72b147b48076e114a156fd0a30222", [:make, :rebar, :rebar3], [{:kafka_protocol, "1.1.0", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.5", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm", "50232a668d3b495f129e8d588fd1ebe36bdc0df3ff43a5b77a1c056bea1df032"}, + "brod": {:hex, :brod, "3.16.1", "1c7b03f99c7cc310de5511cadad9879ab0cc5f1a2612211e68c26dad517d31b0", [:rebar3], [{:kafka_protocol, "4.0.1", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:snappyer, "1.2.8", [hex: :snappyer, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.11", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm", "8297c47cd1ff0657955027fa1beb62edfaab1cc5e09b714cc29bd7f1c8d40083"}, + "crc32cer": {:hex, :crc32cer, "0.1.8", "c6c2275c5fb60a95f4935d414f30b50ee9cfed494081c9b36ebb02edfc2f48db", [:rebar3], [], "hexpm", "251499085482920deb6c9b7aadabf9fb4c432f96add97ab42aee4501e5b6f591"}, "earmark": {:hex, :earmark, "1.3.2", "b840562ea3d67795ffbb5bd88940b1bed0ed9fa32834915125ea7d02e35888a5", [:mix], [], "hexpm", "e3be2bc3ae67781db529b80aa7e7c49904a988596e2dbff897425b48b3581161"}, - "ex_doc": {:hex, :ex_doc, "0.20.2", "1bd0dfb0304bade58beb77f20f21ee3558cc3c753743ae0ddbb0fd7ba2912331", [:mix], [{:earmark, "~> 1.3", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.10", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "8e24fc8ff9a50b9f557ff020d6c91a03cded7e59ac3e0eec8a27e771430c7d27"}, - "kafka_protocol": {:hex, :kafka_protocol, "1.1.0", "817c07a6339cbfb32d1f20a588353bf8d9a8944df296eb2e930360b83760c171", [:rebar, :rebar3], [{:snappyer, "1.2.1", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm", "9cbb8830c7bc0b9942e1f03b9b132d5ea548516a3a67249c156654f2a902a34f"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.17", "6f3c7e94170377ba45241d394389e800fb15adc5de51d0a3cd52ae766aafd63f", [:mix], [], "hexpm", "f93ac89c9feca61c165b264b5837bf82344d13bebc634cd575cb711e2e342023"}, + "ex_doc": {:hex, :ex_doc, "0.25.5", "ac3c5425a80b4b7c4dfecdf51fa9c23a44877124dd8ca34ee45ff608b1c6deb9", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "688cfa538cdc146bc4291607764a7f1fcfa4cce8009ecd62de03b27197528350"}, + "kafka_protocol": {:hex, :kafka_protocol, "4.0.1", "fc696880c73483c8b032c4bb60f2873046035c7824e1edcb924cfce643cf23dd", [:rebar3], [{:crc32cer, "0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "687bfd9989998ec8fbbc3ed50d1239a6c07a7dc15b52914ad477413b89ecb621"}, "logfmt": {:hex, :logfmt, "3.2.0", "887a091adad28acc6e4d8b3d3bce177b934e7c61e7655c86946410f44aca6d84", [:mix], []}, - "makeup": {:hex, :makeup, "0.8.0", "9cf32aea71c7fe0a4b2e9246c2c4978f9070257e5c9ce6d4a28ec450a839b55f", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5fbc8e549aa9afeea2847c0769e3970537ed302f93a23ac612602e805d9d1e7f"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.13.0", "be7a477997dcac2e48a9d695ec730b2d22418292675c75aa2d34ba0909dcdeda", [:mix], [{:makeup, "~> 0.8", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "adf0218695e22caeda2820eaba703fa46c91820d53813a2223413da3ef4ba515"}, + "makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.15.2", "dc72dfe17eb240552857465cc00cce390960d9a0c055c4ccd38b70629227e97c", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "fd23ae48d09b32eff49d4ced2b43c9f086d402ee4fd4fcb2d7fad97fa8823e75"}, + "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, "metrix": {:git, "https://github.com/rwdaigle/metrix.git", "a6738df9346da0412ca68f82a24a67d2a32b066e", [branch: "master"]}, - "nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm", "5c040b8469c1ff1b10093d3186e2e10dbe483cd73d79ec017993fb3985b8a9b3"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.2.0", "b44d75e2a6542dcb6acf5d71c32c74ca88960421b6874777f79153bbbbd7dccc", [:mix], [], "hexpm", "52b2871a7515a5ac49b00f214e4165a40724cf99798d8e4a65e4fd64ebd002c1"}, "retry": {:hex, :retry, "0.14.1", "722d1b0cf87096b71213f5801d99fface7ca76adc83fc9dbf3e1daee952aef10", [:mix], [], "hexpm", "b3a609f286f6fe4f6b2c15f32cd4a8a60427d78d05d7b68c2dd9110981111ae0"}, - "snappyer": {:hex, :snappyer, "1.2.1", "06c5f5c8afe80ba38e94e1ca1bd9253de95d8f2c85b08783e8d0f63815580556", [:make, :rebar, :rebar3], [], "hexpm", "e09171f1c7106d4082db88a409d5648425b3699d55319c2cd09c4bb8cd1ba8a2"}, - "supervisor3": {:hex, :supervisor3, "1.1.5", "5f3c487a6eba23de0e64c06e93efa0eca06f40324a6412c1318c77aca6da8424", [:make, :rebar, :rebar3], [], "hexpm", "e6f489d6b819df4d8f202eb00a77515a949bf87dae4d0a060f534722a63d8977"}, + "snappyer": {:hex, :snappyer, "1.2.8", "201ce9067a33c71a6a5087c0c3a49a010b17112d461e6df696c722dcb6d0934a", [:rebar3], [], "hexpm", "35518e79a28548b56d8fd6aee2f565f12f51c2d3d053f9cfa817c83be88c4f3d"}, + "supervisor3": {:hex, :supervisor3, "1.1.11", "d81cdec31d102fde407423e1d05b569572850deebed86b951d5233c387cba80b", [:rebar3], [], "hexpm", "e6c2dedbcabcba24995a218aca12db5e208b80d3252692b22ef0f1a266104b50"}, }