From dc5c5671031d73815bc1ba965251e7e7e2c00ff4 Mon Sep 17 00:00:00 2001 From: Lokesh Jain Date: Thu, 25 Jan 2024 23:48:20 +0530 Subject: [PATCH] added consumer test results --- kafka-client/README.md | 2 +- kafka-client/consumer_test.go | 44 ++++++++++++---------- kafka-client/goka/consumer.go | 12 +++++- kafka-client/results/consumer_results.out | 46 +++++++++++++++++++++++ kafka-client/sarama/consumer.go | 13 ++----- kafka-client/segmentio/consumer.go | 31 ++++++++------- 6 files changed, 104 insertions(+), 44 deletions(-) create mode 100644 kafka-client/results/consumer_results.out diff --git a/kafka-client/README.md b/kafka-client/README.md index 7d26316..0c75fc2 100644 --- a/kafka-client/README.md +++ b/kafka-client/README.md @@ -6,7 +6,7 @@ Benchmarks of kafka client libraries for Golang. ```bash go test -timeout=5h -bench=Producer -benchmem -count 5 -benchtime=10000x > results/producer_results.out -go test -timeout=5h -bench=Consumer -benchmem -count 5 -benchtime=10000x > results/consumer_results.out +go test -timeout=5h -bench=Consumer -benchmem -count 5 -benchtime=100000x > results/consumer_results.out ``` ## Docker File diff --git a/kafka-client/consumer_test.go b/kafka-client/consumer_test.go index df9005a..264fd1e 100644 --- a/kafka-client/consumer_test.go +++ b/kafka-client/consumer_test.go @@ -70,7 +70,7 @@ func BenchmarkFranzConsumer(b *testing.B) { go consumer.Start(wg) wg.Wait() - b.Run("Confluent@ConsumePartition", func(b *testing.B) { + b.Run("Franz@ConsumePartition", func(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { <-consumer.Message @@ -104,19 +104,21 @@ func BenchmarkSaramaConsumer(b *testing.B) { consumer := sarama.Consumer{Servers: bootstrapServers, Topic: topicName, EnablePartition: false} wg := &sync.WaitGroup{} - // wg.Add(1) - // go consumer.Start(wg) - // wg.Wait() + wg.Add(1) + go consumer.Start(wg) + wg.Wait() - // b.Run("Sarama@ConsumerGroup", func(b *testing.B) { - // b.ResetTimer() - // for i := 0; i < b.N; i++ { - // <-consumer.Message - // } - // b.StopTimer() - // }) + <-consumer.Message // Added this to wait till Consumer gets ready - // close(consumer.Done) + b.Run("Sarama@ConsumerGroup", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + <-consumer.Message + } + b.StopTimer() + }) + + close(consumer.Done) consumer.EnablePartition = true @@ -136,18 +138,22 @@ func BenchmarkSaramaConsumer(b *testing.B) { } func BenchmarkSegmentioConsumer(b *testing.B) { - consumer := segmentio.NewConsumer(bootstrapServers, topicName) - message := make(chan interface{}, 1) - done := make(chan bool, 1) - go consumer.Consume(message, done) + consumer := segmentio.Consumer{Servers: bootstrapServers, Topic: topicName, EnablePartition: false} - b.Run("Segmentio@Consumer", func(b *testing.B) { + wg := &sync.WaitGroup{} + wg.Add(1) + go consumer.Start(wg) + wg.Wait() + + <-consumer.Message + + b.Run("Segmentio@ConsumerFetch", func(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - <-message + <-consumer.Message } b.StopTimer() }) - done <- true + close(consumer.Done) } diff --git a/kafka-client/goka/consumer.go b/kafka-client/goka/consumer.go index e288a82..b9fa251 100644 --- a/kafka-client/goka/consumer.go +++ b/kafka-client/goka/consumer.go @@ -2,6 +2,8 @@ package goka import ( "context" + "fmt" + "io" "log" "strings" "sync" @@ -32,13 +34,19 @@ func (c *Consumer) Start(wg *sync.WaitGroup) { goka.ReplaceGlobalConfig(cfg) topicStream := goka.Stream(c.Topic) - g := goka.DefineGroup("goka-consumer-group", goka.Input(topicStream, 
new(codec.String), c.handler), goka.Persist(new(codec.Int64))) + group := goka.Group(fmt.Sprintf("goka-consumer-group-%d", time.Now().UnixNano())) + g := goka.DefineGroup(group, + goka.Input(topicStream, new(codec.String), c.handler), goka.Persist(new(codec.Int64))) config := goka.NewTopicManagerConfig() config.Table.Replication = 1 config.CreateTopicTimeout = time.Second * 10 - p, err := goka.NewProcessor(brokers, g, goka.WithTopicManagerBuilder(goka.TopicManagerBuilderWithTopicManagerConfig(config))) + log := log.New(io.Discard, "", log.LstdFlags) + + p, err := goka.NewProcessor(brokers, g, + goka.WithTopicManagerBuilder(goka.TopicManagerBuilderWithTopicManagerConfig(config)), + goka.WithLogger(log)) if err != nil { log.Fatalf("error creating processor: %v", err) wg.Done() diff --git a/kafka-client/results/consumer_results.out b/kafka-client/results/consumer_results.out new file mode 100644 index 0000000..957b707 --- /dev/null +++ b/kafka-client/results/consumer_results.out @@ -0,0 +1,46 @@ +goos: linux +goarch: amd64 +pkg: github.com/lkumarjain/benchmark/kafka-client +cpu: Intel(R) Core(TM) i5-8300H CPU @ 2.30GHz +BenchmarkConfluentConsumer/Confluent@ConsumePoll-8 100000 9837 ns/op 1477 B/op 9 allocs/op +BenchmarkConfluentConsumer/Confluent@ConsumePoll-8 100000 8103 ns/op 1479 B/op 8 allocs/op +BenchmarkConfluentConsumer/Confluent@ConsumePoll-8 100000 7368 ns/op 1507 B/op 9 allocs/op +BenchmarkConfluentConsumer/Confluent@ConsumePoll-8 100000 33209 ns/op 5586 B/op 9 allocs/op +BenchmarkConfluentConsumer/Confluent@ConsumePoll-8 100000 10402 ns/op 1535 B/op 9 allocs/op +BenchmarkConfluentConsumer/Confluent@ConsumeEvent-8 100000 9457 ns/op 1469 B/op 8 allocs/op +BenchmarkConfluentConsumer/Confluent@ConsumeEvent-8 100000 7947 ns/op 1469 B/op 8 allocs/op +BenchmarkConfluentConsumer/Confluent@ConsumeEvent-8 100000 7804 ns/op 1493 B/op 8 allocs/op +BenchmarkConfluentConsumer/Confluent@ConsumeEvent-8 100000 33654 ns/op 5571 B/op 8 allocs/op +BenchmarkConfluentConsumer/Confluent@ConsumeEvent-8 100000 9892 ns/op 1525 B/op 8 allocs/op +BenchmarkFranzConsumer/Franz@ConsumeRecord-8 100000 6555 ns/op 1248 B/op 1 allocs/op +BenchmarkFranzConsumer/Franz@ConsumeRecord-8 100000 8415 ns/op 1674 B/op 2 allocs/op +BenchmarkFranzConsumer/Franz@ConsumeRecord-8 100000 8823 ns/op 1688 B/op 2 allocs/op +BenchmarkFranzConsumer/Franz@ConsumeRecord-8 100000 34456 ns/op 5985 B/op 1 allocs/op +BenchmarkFranzConsumer/Franz@ConsumeRecord-8 100000 9891 ns/op 2051 B/op 2 allocs/op +BenchmarkFranzConsumer/Franz@ConsumePartition-8 100000 6612 ns/op 1559 B/op 0 allocs/op +BenchmarkFranzConsumer/Franz@ConsumePartition-8 100000 12197 ns/op 1650 B/op 1 allocs/op +BenchmarkFranzConsumer/Franz@ConsumePartition-8 100000 12594 ns/op 1664 B/op 1 allocs/op +BenchmarkFranzConsumer/Franz@ConsumePartition-8 100000 48006 ns/op 5961 B/op 0 allocs/op +BenchmarkFranzConsumer/Franz@ConsumePartition-8 100000 11124 ns/op 2027 B/op 1 allocs/op +BenchmarkGokaConsumer/Goka@Consumer-8 100000 395.0 ns/op 0 B/op 0 allocs/op +BenchmarkGokaConsumer/Goka@Consumer-8 100000 428.3 ns/op 0 B/op 0 allocs/op +BenchmarkGokaConsumer/Goka@Consumer-8 100000 401.5 ns/op 0 B/op 0 allocs/op +BenchmarkGokaConsumer/Goka@Consumer-8 100000 414.1 ns/op 0 B/op 0 allocs/op +BenchmarkGokaConsumer/Goka@Consumer-8 100000 461.6 ns/op 0 B/op 0 allocs/op +BenchmarkSaramaConsumer/Sarama@ConsumerGroup-8 100000 582.9 ns/op 1 B/op 0 allocs/op +BenchmarkSaramaConsumer/Sarama@ConsumerGroup-8 100000 436.8 ns/op 335 B/op 0 allocs/op +BenchmarkSaramaConsumer/Sarama@ConsumerGroup-8 
100000 498.4 ns/op 0 B/op 0 allocs/op +BenchmarkSaramaConsumer/Sarama@ConsumerGroup-8 100000 518.1 ns/op 0 B/op 0 allocs/op +BenchmarkSaramaConsumer/Sarama@ConsumerGroup-8 100000 564.5 ns/op 0 B/op 0 allocs/op +BenchmarkSaramaConsumer/Sarama@ConsumePartition-8 100000 6338 ns/op 1574 B/op 0 allocs/op +BenchmarkSaramaConsumer/Sarama@ConsumePartition-8 100000 7987 ns/op 1335 B/op 1 allocs/op +BenchmarkSaramaConsumer/Sarama@ConsumePartition-8 100000 8159 ns/op 1346 B/op 1 allocs/op +BenchmarkSaramaConsumer/Sarama@ConsumePartition-8 100000 32875 ns/op 5983 B/op 1 allocs/op +BenchmarkSaramaConsumer/Sarama@ConsumePartition-8 100000 10353 ns/op 1719 B/op 1 allocs/op +BenchmarkSegmentioConsumer/Segmentio@ConsumerFetch-8 100000 4365 ns/op 1635 B/op 3 allocs/op +BenchmarkSegmentioConsumer/Segmentio@ConsumerFetch-8 100000 4537 ns/op 1489 B/op 3 allocs/op +BenchmarkSegmentioConsumer/Segmentio@ConsumerFetch-8 100000 4405 ns/op 1385 B/op 3 allocs/op +BenchmarkSegmentioConsumer/Segmentio@ConsumerFetch-8 100000 16062 ns/op 5972 B/op 3 allocs/op +BenchmarkSegmentioConsumer/Segmentio@ConsumerFetch-8 100000 4410 ns/op 1663 B/op 3 allocs/op +PASS +ok github.com/lkumarjain/benchmark/kafka-client 46.786s diff --git a/kafka-client/sarama/consumer.go b/kafka-client/sarama/consumer.go index 776b796..bc6fa7a 100644 --- a/kafka-client/sarama/consumer.go +++ b/kafka-client/sarama/consumer.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" "sync" + "time" "github.com/IBM/sarama" ) @@ -75,7 +76,7 @@ func (c *Consumer) consumePartition(wg *sync.WaitGroup, cfg *sarama.Config) { func (c *Consumer) consumeGroup(wg *sync.WaitGroup, cfg *sarama.Config) { brokers := strings.Split(c.Servers, ",") - group, err := sarama.NewConsumerGroup(brokers, "sarama-consumer-group", cfg) + group, err := sarama.NewConsumerGroup(brokers, fmt.Sprintf("sarama-consumer-group-%d", time.Now().UnixNano()), cfg) if err != nil { fmt.Printf("Failed to create consumer: %v\n", err) @@ -83,10 +84,8 @@ func (c *Consumer) consumeGroup(wg *sync.WaitGroup, cfg *sarama.Config) { return } - ready := make(chan bool, 1) - handler := handler{message: c.Message, done: c.Done, ready: ready} + handler := handler{message: c.Message, done: c.Done} - <-ready wg.Done() run := true @@ -103,13 +102,9 @@ func (c *Consumer) consumeGroup(wg *sync.WaitGroup, cfg *sarama.Config) { type handler struct { message chan interface{} done chan bool - ready chan bool } -func (h handler) Setup(sarama.ConsumerGroupSession) error { - close(h.ready) - return nil -} +func (h handler) Setup(sarama.ConsumerGroupSession) error { return nil } func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil } diff --git a/kafka-client/segmentio/consumer.go b/kafka-client/segmentio/consumer.go index 74d0f81..67164bc 100644 --- a/kafka-client/segmentio/consumer.go +++ b/kafka-client/segmentio/consumer.go @@ -2,38 +2,43 @@ package segmentio import ( "context" + "fmt" "strings" + "sync" + "time" kafka "github.com/segmentio/kafka-go" ) type Consumer struct { - BootstrapServers string - reader *kafka.Reader + Servers string + Topic string + EnablePartition bool + Message chan interface{} + Done chan bool } -func NewConsumer(bootstrapServers string, topic string) *Consumer { - brokers := strings.Split(bootstrapServers, ",") +func (c *Consumer) Start(wg *sync.WaitGroup) { + c.Message = make(chan interface{}, 1) + c.Done = make(chan bool, 1) reader := kafka.NewReader(kafka.ReaderConfig{ - Brokers: brokers, - Topic: topic, - GroupID: "segmentio-consumer-group", + Brokers: strings.Split(c.Servers, ","), + 
Topic: c.Topic, + GroupID: fmt.Sprintf("segmentio-consumer-group-%d", time.Now().UnixNano()), }) - return &Consumer{BootstrapServers: bootstrapServers, reader: reader} -} + wg.Done() -func (c *Consumer) Consume(message chan interface{}, done chan bool) { run := true for run { select { - case <-done: + case <-c.Done: run = false default: - msg, _ := c.reader.FetchMessage(context.Background()) - message <- msg + msg, _ := reader.FetchMessage(context.Background()) + c.Message <- msg } } }
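
Note (editorial, not part of the committed diff): this patch moves every consumer wrapper onto the same shape — a `Start(wg *sync.WaitGroup)` that signals readiness, a `Message chan interface{}` the benchmark drains, and a `Done chan bool` that stops the consume loop — and the test drains one record before timing so setup latency is excluded. The sketch below is a minimal, self-contained illustration of that pattern under stated assumptions: the struct and channel names mirror the sarama/segmentio wrappers in the diff, but the in-memory producer loop and the `main`-function timing loop merely stand in for a real Kafka fetch and for `b.Run`/`b.N`.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// consumer mirrors the wrapper shape used by the benchmark code in this patch:
// Start signals readiness via the WaitGroup, then streams records into Message
// until Done is closed.
type consumer struct {
	Message chan interface{}
	Done    chan bool
}

func (c *consumer) Start(wg *sync.WaitGroup) {
	c.Message = make(chan interface{}, 1)
	c.Done = make(chan bool, 1)
	wg.Done() // consumer is ready; the caller may begin draining Message

	for i := 0; ; i++ {
		select {
		case <-c.Done:
			return
		case c.Message <- fmt.Sprintf("record-%d", i): // stand-in for a fetched Kafka record
		}
	}
}

func main() {
	c := &consumer{}

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go c.Start(wg)
	wg.Wait()

	<-c.Message // drain one record so timing starts only once the consumer is live

	// Stand-in for b.Run/b.N: time how long it takes to drain a fixed number of records.
	start := time.Now()
	for i := 0; i < 100000; i++ {
		<-c.Message
	}
	fmt.Printf("drained 100000 records in %v\n", time.Since(start))

	close(c.Done) // stop the consume loop, matching close(consumer.Done) in the tests
}
```

The per-run group IDs introduced in the diff (`fmt.Sprintf("...-%d", time.Now().UnixNano())` for goka, sarama, and segmentio) follow the same intent: each benchmark run starts from a fresh consumer group so earlier runs' committed offsets cannot skew the measurement.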