Skip to content

Commit

Permalink
added Franz and Goka
Browse files Browse the repository at this point in the history
  • Loading branch information
lkumarjain committed Jan 16, 2024
1 parent 78995b8 commit 8c1f146
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 15 deletions.
16 changes: 11 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
github.com/robertkrimen/otto v0.2.1
github.com/segmentio/kafka-go v0.4.47
github.com/skx/evalfilter/v2 v2.1.19
github.com/twmb/franz-go v1.15.4
go.starlark.net v0.0.0-20230128213706-3f75dec8e403
)

Expand All @@ -36,7 +37,9 @@ require (
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
Expand All @@ -45,9 +48,11 @@ require (
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pierrec/lz4/v4 v4.1.19 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
golang.org/x/crypto v0.14.0 // indirect
github.com/syndtr/goleveldb v1.0.0 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/net v0.17.0 // indirect
)

Expand All @@ -63,14 +68,15 @@ require (
github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/lovoo/goka v1.1.11
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/mitchellh/pointerstructure v1.2.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect
Expand Down
49 changes: 42 additions & 7 deletions go.sum

Large diffs are not rendered by default.

44 changes: 44 additions & 0 deletions kafka-client/franz/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package franz

import (
"context"
"strings"
"sync"

"github.com/twmb/franz-go/pkg/kgo"
)

type Producer struct {
BootstrapServers []string
wg *sync.WaitGroup
instance *kgo.Client
}

func NewProducer(bootstrapServers string) *Producer {
brokers := strings.Split(bootstrapServers, ",")
instance, err := kgo.NewClient(kgo.SeedBrokers(brokers...))
if err != nil {
panic(err)
}

producer := &Producer{BootstrapServers: brokers, wg: &sync.WaitGroup{}, instance: instance}

return producer
}

func (p *Producer) Produce(topic string, key string, value string) {
p.instance.ProduceSync(context.Background(), &kgo.Record{Topic: topic, Key: []byte(key), Value: []byte(value)})
}

func (p *Producer) ProduceChannel(topic string, key string, value string) {
p.wg.Add(1)
p.instance.Produce(context.Background(), &kgo.Record{Topic: topic, Key: []byte(key), Value: []byte(value)}, p.DeliveryReport)
}

func (p *Producer) DeliveryReport(_ *kgo.Record, _ error) {
p.wg.Done()
}

func (p *Producer) Wait() {
p.wg.Wait()
}
51 changes: 51 additions & 0 deletions kafka-client/goka/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package goka

import (
"log"
"strings"
"sync"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
)

type Producer struct {
BootstrapServers []string
wg *sync.WaitGroup
instance *goka.Emitter
}

func NewProducer(bootstrapServers string, topic string) *Producer {
brokers := strings.Split(bootstrapServers, ",")
emitter, err := goka.NewEmitter(brokers, goka.Stream(topic), new(codec.String))
if err != nil {
log.Fatalf("error creating emitter: %v", err)
}

producer := &Producer{BootstrapServers: brokers, wg: &sync.WaitGroup{}, instance: emitter}

return producer
}

func (p *Producer) Produce(key string, value string) {
p.instance.EmitSync(key, value)
}

func (p *Producer) ProduceChannel(key string, value string) {
p.wg.Add(1)
promise, err := p.instance.Emit(key, value)
if err != nil {
p.wg.Done()
return
}

promise.Then(p.DeliveryReport)
}

func (p *Producer) DeliveryReport(err error) {
defer p.wg.Done()
}

func (p *Producer) Wait() {
p.wg.Wait()
}
2 changes: 1 addition & 1 deletion kafka-client/payload.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package kafkaclients
package kafkaclient

const (
// 1 KB
Expand Down
52 changes: 51 additions & 1 deletion kafka-client/producer_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package kafkaclients
package kafkaclient

import (
"testing"

"github.com/lkumarjain/benchmark/kafka-client/confluent"
"github.com/lkumarjain/benchmark/kafka-client/franz"
"github.com/lkumarjain/benchmark/kafka-client/goka"
"github.com/lkumarjain/benchmark/kafka-client/sarama"
"github.com/lkumarjain/benchmark/kafka-client/segmentio"
)

func BenchmarkProducer(b *testing.B) {
for _, tt := range tests {
benchmarkConfluentProducer(b, tt.name, tt.valueGenerator)
benchmarkFranzProducer(b, tt.name, tt.valueGenerator)
benchmarkGokaProducer(b, tt.name, tt.valueGenerator)
benchmarkSaramaProducer(b, tt.name, tt.valueGenerator)
benchmarkSegmentioProducer(b, tt.name, tt.valueGenerator)
}
Expand Down Expand Up @@ -38,6 +42,52 @@ func benchmarkConfluentProducer(b *testing.B, prefix string, valueGenerator func
})
}

func benchmarkFranzProducer(b *testing.B, prefix string, valueGenerator func(int) string) {
producer := franz.NewProducer(bootstrapServers)

b.Run(testName(prefix, "Franz@Produce"), func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
producer.Produce(topicName, generateKey(prefix, i), valueGenerator(i))
}

b.StopTimer()
})

b.Run(testName(prefix, "Franz@ProduceChannel"), func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
producer.ProduceChannel(topicName, generateKey(prefix, i), valueGenerator(i))
}

producer.Wait()
b.StopTimer()
})
}

func benchmarkGokaProducer(b *testing.B, prefix string, valueGenerator func(int) string) {
producer := goka.NewProducer(bootstrapServers, topicName)

b.Run(testName(prefix, "Goka@Produce"), func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
producer.Produce(generateKey(prefix, i), valueGenerator(i))
}

b.StopTimer()
})

b.Run(testName(prefix, "Goka@ProduceChannel"), func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
producer.ProduceChannel(generateKey(prefix, i), valueGenerator(i))
}

producer.Wait()
b.StopTimer()
})
}

func benchmarkSaramaProducer(b *testing.B, prefix string, valueGenerator func(int) string) {
producer := sarama.NewProducer(bootstrapServers)

Expand Down
2 changes: 1 addition & 1 deletion kafka-client/test_case.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package kafkaclients
package kafkaclient

import "fmt"

Expand Down

0 comments on commit 8c1f146

Please sign in to comment.