Skip to content

Commit

Permalink
added consumer test results
Browse files Browse the repository at this point in the history
  • Loading branch information
lkumarjain committed Jan 25, 2024
1 parent 0a25f47 commit dc5c567
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 44 deletions.
2 changes: 1 addition & 1 deletion kafka-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Benchmarks of kafka client libraries for Golang.

```bash
go test -timeout=5h -bench=Producer -benchmem -count 5 -benchtime=10000x > results/producer_results.out
go test -timeout=5h -bench=Consumer -benchmem -count 5 -benchtime=10000x > results/consumer_results.out
go test -timeout=5h -bench=Consumer -benchmem -count 5 -benchtime=100000x > results/consumer_results.out
```

## Docker File
Expand Down
44 changes: 25 additions & 19 deletions kafka-client/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func BenchmarkFranzConsumer(b *testing.B) {
go consumer.Start(wg)
wg.Wait()

b.Run("Confluent@ConsumePartition", func(b *testing.B) {
b.Run("Franz@ConsumePartition", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
<-consumer.Message
Expand Down Expand Up @@ -104,19 +104,21 @@ func BenchmarkSaramaConsumer(b *testing.B) {
consumer := sarama.Consumer{Servers: bootstrapServers, Topic: topicName, EnablePartition: false}

wg := &sync.WaitGroup{}
// wg.Add(1)
// go consumer.Start(wg)
// wg.Wait()
wg.Add(1)
go consumer.Start(wg)
wg.Wait()

// b.Run("Sarama@ConsumerGroup", func(b *testing.B) {
// b.ResetTimer()
// for i := 0; i < b.N; i++ {
// <-consumer.Message
// }
// b.StopTimer()
// })
<-consumer.Message // Added this to wait till Consumer gets ready

// close(consumer.Done)
b.Run("Sarama@ConsumerGroup", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
<-consumer.Message
}
b.StopTimer()
})

close(consumer.Done)

consumer.EnablePartition = true

Expand All @@ -136,18 +138,22 @@ func BenchmarkSaramaConsumer(b *testing.B) {
}

func BenchmarkSegmentioConsumer(b *testing.B) {
consumer := segmentio.NewConsumer(bootstrapServers, topicName)
message := make(chan interface{}, 1)
done := make(chan bool, 1)
go consumer.Consume(message, done)
consumer := segmentio.Consumer{Servers: bootstrapServers, Topic: topicName, EnablePartition: false}

b.Run("Segmentio@Consumer", func(b *testing.B) {
wg := &sync.WaitGroup{}
wg.Add(1)
go consumer.Start(wg)
wg.Wait()

<-consumer.Message

b.Run("Segmentio@ConsumerFetch", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
<-message
<-consumer.Message
}
b.StopTimer()
})

done <- true
close(consumer.Done)
}
12 changes: 10 additions & 2 deletions kafka-client/goka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package goka

import (
"context"
"fmt"
"io"
"log"
"strings"
"sync"
Expand Down Expand Up @@ -32,13 +34,19 @@ func (c *Consumer) Start(wg *sync.WaitGroup) {
goka.ReplaceGlobalConfig(cfg)

topicStream := goka.Stream(c.Topic)
g := goka.DefineGroup("goka-consumer-group", goka.Input(topicStream, new(codec.String), c.handler), goka.Persist(new(codec.Int64)))
group := goka.Group(fmt.Sprintf("goka-consumer-group-%d", time.Now().UnixNano()))
g := goka.DefineGroup(group,
goka.Input(topicStream, new(codec.String), c.handler), goka.Persist(new(codec.Int64)))

config := goka.NewTopicManagerConfig()
config.Table.Replication = 1
config.CreateTopicTimeout = time.Second * 10

p, err := goka.NewProcessor(brokers, g, goka.WithTopicManagerBuilder(goka.TopicManagerBuilderWithTopicManagerConfig(config)))
log := log.New(io.Discard, "", log.LstdFlags)

p, err := goka.NewProcessor(brokers, g,
goka.WithTopicManagerBuilder(goka.TopicManagerBuilderWithTopicManagerConfig(config)),
goka.WithLogger(log))
if err != nil {
log.Fatalf("error creating processor: %v", err)
wg.Done()
Expand Down
46 changes: 46 additions & 0 deletions kafka-client/results/consumer_results.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
goos: linux
goarch: amd64
pkg: github.com/lkumarjain/benchmark/kafka-client
cpu: Intel(R) Core(TM) i5-8300H CPU @ 2.30GHz
BenchmarkConfluentConsumer/Confluent@ConsumePoll-8 100000 9837 ns/op 1477 B/op 9 allocs/op
BenchmarkConfluentConsumer/Confluent@ConsumePoll-8 100000 8103 ns/op 1479 B/op 8 allocs/op
BenchmarkConfluentConsumer/Confluent@ConsumePoll-8 100000 7368 ns/op 1507 B/op 9 allocs/op
BenchmarkConfluentConsumer/Confluent@ConsumePoll-8 100000 33209 ns/op 5586 B/op 9 allocs/op
BenchmarkConfluentConsumer/Confluent@ConsumePoll-8 100000 10402 ns/op 1535 B/op 9 allocs/op
BenchmarkConfluentConsumer/Confluent@ConsumeEvent-8 100000 9457 ns/op 1469 B/op 8 allocs/op
BenchmarkConfluentConsumer/Confluent@ConsumeEvent-8 100000 7947 ns/op 1469 B/op 8 allocs/op
BenchmarkConfluentConsumer/Confluent@ConsumeEvent-8 100000 7804 ns/op 1493 B/op 8 allocs/op
BenchmarkConfluentConsumer/Confluent@ConsumeEvent-8 100000 33654 ns/op 5571 B/op 8 allocs/op
BenchmarkConfluentConsumer/Confluent@ConsumeEvent-8 100000 9892 ns/op 1525 B/op 8 allocs/op
BenchmarkFranzConsumer/Franz@ConsumeRecord-8 100000 6555 ns/op 1248 B/op 1 allocs/op
BenchmarkFranzConsumer/Franz@ConsumeRecord-8 100000 8415 ns/op 1674 B/op 2 allocs/op
BenchmarkFranzConsumer/Franz@ConsumeRecord-8 100000 8823 ns/op 1688 B/op 2 allocs/op
BenchmarkFranzConsumer/Franz@ConsumeRecord-8 100000 34456 ns/op 5985 B/op 1 allocs/op
BenchmarkFranzConsumer/Franz@ConsumeRecord-8 100000 9891 ns/op 2051 B/op 2 allocs/op
BenchmarkFranzConsumer/Franz@ConsumePartition-8 100000 6612 ns/op 1559 B/op 0 allocs/op
BenchmarkFranzConsumer/Franz@ConsumePartition-8 100000 12197 ns/op 1650 B/op 1 allocs/op
BenchmarkFranzConsumer/Franz@ConsumePartition-8 100000 12594 ns/op 1664 B/op 1 allocs/op
BenchmarkFranzConsumer/Franz@ConsumePartition-8 100000 48006 ns/op 5961 B/op 0 allocs/op
BenchmarkFranzConsumer/Franz@ConsumePartition-8 100000 11124 ns/op 2027 B/op 1 allocs/op
BenchmarkGokaConsumer/Goka@Consumer-8 100000 395.0 ns/op 0 B/op 0 allocs/op
BenchmarkGokaConsumer/Goka@Consumer-8 100000 428.3 ns/op 0 B/op 0 allocs/op
BenchmarkGokaConsumer/Goka@Consumer-8 100000 401.5 ns/op 0 B/op 0 allocs/op
BenchmarkGokaConsumer/Goka@Consumer-8 100000 414.1 ns/op 0 B/op 0 allocs/op
BenchmarkGokaConsumer/Goka@Consumer-8 100000 461.6 ns/op 0 B/op 0 allocs/op
BenchmarkSaramaConsumer/Sarama@ConsumerGroup-8 100000 582.9 ns/op 1 B/op 0 allocs/op
BenchmarkSaramaConsumer/Sarama@ConsumerGroup-8 100000 436.8 ns/op 335 B/op 0 allocs/op
BenchmarkSaramaConsumer/Sarama@ConsumerGroup-8 100000 498.4 ns/op 0 B/op 0 allocs/op
BenchmarkSaramaConsumer/Sarama@ConsumerGroup-8 100000 518.1 ns/op 0 B/op 0 allocs/op
BenchmarkSaramaConsumer/Sarama@ConsumerGroup-8 100000 564.5 ns/op 0 B/op 0 allocs/op
BenchmarkSaramaConsumer/Sarama@ConsumePartition-8 100000 6338 ns/op 1574 B/op 0 allocs/op
BenchmarkSaramaConsumer/Sarama@ConsumePartition-8 100000 7987 ns/op 1335 B/op 1 allocs/op
BenchmarkSaramaConsumer/Sarama@ConsumePartition-8 100000 8159 ns/op 1346 B/op 1 allocs/op
BenchmarkSaramaConsumer/Sarama@ConsumePartition-8 100000 32875 ns/op 5983 B/op 1 allocs/op
BenchmarkSaramaConsumer/Sarama@ConsumePartition-8 100000 10353 ns/op 1719 B/op 1 allocs/op
BenchmarkSegmentioConsumer/Segmentio@ConsumerFetch-8 100000 4365 ns/op 1635 B/op 3 allocs/op
BenchmarkSegmentioConsumer/Segmentio@ConsumerFetch-8 100000 4537 ns/op 1489 B/op 3 allocs/op
BenchmarkSegmentioConsumer/Segmentio@ConsumerFetch-8 100000 4405 ns/op 1385 B/op 3 allocs/op
BenchmarkSegmentioConsumer/Segmentio@ConsumerFetch-8 100000 16062 ns/op 5972 B/op 3 allocs/op
BenchmarkSegmentioConsumer/Segmentio@ConsumerFetch-8 100000 4410 ns/op 1663 B/op 3 allocs/op
PASS
ok github.com/lkumarjain/benchmark/kafka-client 46.786s
13 changes: 4 additions & 9 deletions kafka-client/sarama/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"
"sync"
"time"

"github.com/IBM/sarama"
)
Expand Down Expand Up @@ -75,18 +76,16 @@ func (c *Consumer) consumePartition(wg *sync.WaitGroup, cfg *sarama.Config) {
func (c *Consumer) consumeGroup(wg *sync.WaitGroup, cfg *sarama.Config) {
brokers := strings.Split(c.Servers, ",")

group, err := sarama.NewConsumerGroup(brokers, "sarama-consumer-group", cfg)
group, err := sarama.NewConsumerGroup(brokers, fmt.Sprintf("sarama-consumer-group-%d", time.Now().UnixNano()), cfg)

if err != nil {
fmt.Printf("Failed to create consumer: %v\n", err)
wg.Done()
return
}

ready := make(chan bool, 1)
handler := handler{message: c.Message, done: c.Done, ready: ready}
handler := handler{message: c.Message, done: c.Done}

<-ready
wg.Done()
run := true

Expand All @@ -103,13 +102,9 @@ func (c *Consumer) consumeGroup(wg *sync.WaitGroup, cfg *sarama.Config) {
type handler struct {
message chan interface{}
done chan bool
ready chan bool
}

func (h handler) Setup(sarama.ConsumerGroupSession) error {
close(h.ready)
return nil
}
func (h handler) Setup(sarama.ConsumerGroupSession) error { return nil }

func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

Expand Down
31 changes: 18 additions & 13 deletions kafka-client/segmentio/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,43 @@ package segmentio

import (
"context"
"fmt"
"strings"
"sync"
"time"

kafka "github.com/segmentio/kafka-go"
)

type Consumer struct {
BootstrapServers string
reader *kafka.Reader
Servers string
Topic string
EnablePartition bool
Message chan interface{}
Done chan bool
}

func NewConsumer(bootstrapServers string, topic string) *Consumer {
brokers := strings.Split(bootstrapServers, ",")
func (c *Consumer) Start(wg *sync.WaitGroup) {
c.Message = make(chan interface{}, 1)
c.Done = make(chan bool, 1)

reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: topic,
GroupID: "segmentio-consumer-group",
Brokers: strings.Split(c.Servers, ","),
Topic: c.Topic,
GroupID: fmt.Sprintf("segmentio-consumer-group-%d", time.Now().UnixNano()),
})

return &Consumer{BootstrapServers: bootstrapServers, reader: reader}
}
wg.Done()

func (c *Consumer) Consume(message chan interface{}, done chan bool) {
run := true

for run {
select {
case <-done:
case <-c.Done:
run = false
default:
msg, _ := c.reader.FetchMessage(context.Background())
message <- msg
msg, _ := reader.FetchMessage(context.Background())
c.Message <- msg
}
}
}

0 comments on commit dc5c567

Please sign in to comment.