Skip to content

Commit

Permalink
feat: add FetchMessage and CommitMessages functions (#9)
Browse files Browse the repository at this point in the history
* feat: add FetchMessage and CommitMessages functions

* feat: add manuel commit support

* feat: add integration test

* chore: dummy commit

* chore: add manual commit to the readme

* fix: add fetchSpan end on consumer close

* chore: disable lint for example folder

* chore: remove nolint statements within example folder

* chore: disable gocritic for example folder

* chore: dummy commit

---------

Co-authored-by: Abdulsametileri <sametileri07@gmail.com>
  • Loading branch information
mhmtszr and Abdulsametileri authored Dec 7, 2023
1 parent dae8e8b commit 529dd2c
Show file tree
Hide file tree
Showing 10 changed files with 229 additions and 22 deletions.
Binary file added .github/images/consumer-with-manual-commit.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
5 changes: 5 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ issues:
- linters:
- stylecheck
text: "ST1001:"
- path: example
linters:
- errcheck
- funlen
- gocritic
- path: _test\.go
linters:
- errcheck
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ docker-compose up --build

![Consuming Example](.github/images/consumer-example.png)

## Consuming With Manual Commit

![Consuming Example](.github/images/consumer-with-manual-commit.png)

## Bring it all together

You can run producer and consumer, respectively, to see that they work together.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func main() {
),
)
if err != nil {
log.Fatal(err.Error()) //nolint:gocritic
log.Fatal(err.Error())
}

for {
Expand Down
93 changes: 93 additions & 0 deletions example/consumer-with-manualcommit/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package main

import (
"context"
"fmt"
"log"
"time"

otelkafkakonsumer "github.com/Trendyol/otel-kafka-konsumer"
"github.com/segmentio/kafka-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)

func initJaegerTracer(url string) *trace.TracerProvider {
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
if err != nil {
log.Fatalf("Err initializing jaeger instance %v", err)
}

tp := trace.NewTracerProvider(
trace.WithBatcher(exp),
trace.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName("trace-demo"),
attribute.String("environment", "prod"),
)),
)

otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.TraceContext{})

return tp
}

func main() {
tp := initJaegerTracer("http://localhost:14268/api/traces")
defer func(tp *trace.TracerProvider, ctx context.Context) {
err := tp.Shutdown(ctx)
if err != nil {
log.Fatal(err.Error())
}
}(tp, context.Background())

segmentioReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:29092"},
GroupTopics: []string{"opentel"},
GroupID: "opentel-manualcommit-cg",
})

reader, err := otelkafkakonsumer.NewReader(
segmentioReader,
otelkafkakonsumer.WithTracerProvider(tp),
otelkafkakonsumer.WithPropagator(propagation.TraceContext{}),
otelkafkakonsumer.WithAttributes(
[]attribute.KeyValue{
semconv.MessagingDestinationKindTopic,
semconv.MessagingKafkaClientIDKey.String("opentel-manualcommit-cg"),
},
),
)
if err != nil {
log.Fatal(err.Error())
}

for {
message, err := reader.FetchMessage(context.Background())
if err != nil {
fmt.Println(err.Error())
continue
}
fmt.Println(message)

// Extract tracing info from message
ctx := reader.TraceConfig.Propagator.Extract(context.Background(), otelkafkakonsumer.NewMessageCarrier(message))

tr := otel.Tracer("consumer")
parentCtx, span := tr.Start(ctx, "work")
time.Sleep(100 * time.Millisecond)
span.End()

_, span = tr.Start(parentCtx, "another work")
time.Sleep(50 * time.Millisecond)
span.End()

reader.CommitMessages(context.Background(), *message)
}
}
2 changes: 1 addition & 1 deletion example/producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func main() {
},
))
if err != nil {
log.Fatal(err.Error()) //nolint:gocritic
log.Fatal(err.Error())
}
defer writer.Close()

Expand Down
61 changes: 45 additions & 16 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (
)

type Reader struct {
R *kafka.Reader
TraceConfig *Config
activeSpan unsafe.Pointer
R *kafka.Reader
TraceConfig *Config
activeFetchSpan unsafe.Pointer
activeCommitSpan unsafe.Pointer
}

type readerSpan struct {
type spanWrapper struct {
otelSpan trace.Span
}

Expand All @@ -38,17 +39,17 @@ func NewReader(r *kafka.Reader, opts ...Option) (*Reader, error) {
)

return &Reader{
R: r,
TraceConfig: cfg,
activeSpan: unsafe.Pointer(&readerSpan{}),
R: r,
TraceConfig: cfg,
activeFetchSpan: unsafe.Pointer(&spanWrapper{}),
activeCommitSpan: unsafe.Pointer(&spanWrapper{}),
}, nil
}

func (r *Reader) startSpan(msg *kafka.Message) readerSpan {
func (r *Reader) startSpan(spanName string, msg *kafka.Message) spanWrapper {
carrier := NewMessageCarrier(msg)
psc := r.TraceConfig.Propagator.Extract(context.Background(), carrier)

name := fmt.Sprintf("received from %s", msg.Topic)
opts := r.TraceConfig.MergedSpanStartOptions(
trace.WithAttributes(
semconv.MessagingDestinationKey.String(msg.Topic),
Expand All @@ -58,28 +59,55 @@ func (r *Reader) startSpan(msg *kafka.Message) readerSpan {
),
trace.WithSpanKind(trace.SpanKindConsumer),
)
ctx, otelSpan := r.TraceConfig.Tracer.Start(psc, name, opts...)
ctx, otelSpan := r.TraceConfig.Tracer.Start(psc, spanName, opts...)

// Inject the current span into the original message, so it can be used to
// propagate the span.
r.TraceConfig.Propagator.Inject(ctx, carrier)

return readerSpan{otelSpan: otelSpan}
return spanWrapper{otelSpan: otelSpan}
}

func (r *Reader) FetchMessage(ctx context.Context) (*kafka.Message, error) {
startTime := time.Now()
msg, err := r.R.FetchMessage(ctx)
if err == nil {
s := r.startSpan(fmt.Sprintf("fetched from %s", msg.Topic), &msg)
active := atomic.SwapPointer(&r.activeFetchSpan, unsafe.Pointer(&s))
(*spanWrapper)(active).End(trace.WithTimestamp(startTime))
s.End()
}
return &msg, err
}

func (r *Reader) CommitMessages(ctx context.Context, msgs ...kafka.Message) error {
// open span
startTime := time.Now()
s := r.startSpan(fmt.Sprintf("committed to %s", msgs[0].Topic), &msgs[0])
active := atomic.SwapPointer(&r.activeCommitSpan, unsafe.Pointer(&s))

err := r.R.CommitMessages(ctx, msgs...)

// end span
(*spanWrapper)(active).End(trace.WithTimestamp(startTime))
s.End()

return err
}

func (r *Reader) ReadMessage(ctx context.Context) (*kafka.Message, error) {
endTime := time.Now()
msg, err := r.R.ReadMessage(ctx)
if err == nil {
s := r.startSpan(&msg)
active := atomic.SwapPointer(&r.activeSpan, unsafe.Pointer(&s))
(*readerSpan)(active).End(trace.WithTimestamp(endTime))
s := r.startSpan(fmt.Sprintf("received from %s", msg.Topic), &msg)
active := atomic.SwapPointer(&r.activeFetchSpan, unsafe.Pointer(&s))
(*spanWrapper)(active).End(trace.WithTimestamp(endTime))
s.End()
}
return &msg, err
}

func (s readerSpan) End(options ...trace.SpanEndOption) {
func (s spanWrapper) End(options ...trace.SpanEndOption) {
if s.otelSpan != nil {
s.otelSpan.End(options...)
}
Expand All @@ -89,6 +117,7 @@ func (s readerSpan) End(options ...trace.SpanEndOption) {
// any remaining span.
func (r *Reader) Close() error {
err := r.R.Close()
(*readerSpan)(atomic.LoadPointer(&r.activeSpan)).End()
(*spanWrapper)(atomic.LoadPointer(&r.activeFetchSpan)).End()
(*spanWrapper)(atomic.LoadPointer(&r.activeCommitSpan)).End()
return err
}
2 changes: 2 additions & 0 deletions test/integration/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module integration

go 1.19

replace github.com/Trendyol/otel-kafka-konsumer => ../..

require (
github.com/Trendyol/otel-kafka-konsumer v0.0.5
github.com/segmentio/kafka-go v0.4.44
Expand Down
3 changes: 0 additions & 3 deletions test/integration/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
github.com/Trendyol/otel-kafka-konsumer v0.0.5 h1:i5Q6vR4ZRTtlb+uLimGJNBOQUiAtcbjn7Xc2FmPap/4=
github.com/Trendyol/otel-kafka-konsumer v0.0.5/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -9,7 +7,6 @@ github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbV
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
Expand Down
79 changes: 78 additions & 1 deletion test/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func getTracer() *trace.TracerProvider {
return tp
}

func Test_Producer_And_Consumer_Spans_Have_Same_Trace_Id(t *testing.T) {
func Test_Producer_And_Consumer_Spans_Have_Same_Trace_Id_In_AutoCommit_Mode(t *testing.T) {
tracer := getTracer()
segmentioProducer := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Expand Down Expand Up @@ -105,3 +105,80 @@ func Test_Producer_And_Consumer_Spans_Have_Same_Trace_Id(t *testing.T) {
assert.Equal(t, producerSpan.SpanContext().TraceID().String(), workSpan.SpanContext().TraceID().String())
assert.Equal(t, producerSpan.SpanContext().TraceID().String(), traceParent[1])
}

func Test_Producer_And_Consumer_Spans_Have_Same_Trace_Id_In_ManualCommit_Mode(t *testing.T) {
tracer := getTracer()
segmentioProducer := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
AllowAutoTopicCreation: true,
}
writer, err := otelkafkakonsumer.NewWriter(segmentioProducer,
otelkafkakonsumer.WithTracerProvider(tracer),
otelkafkakonsumer.WithPropagator(propagation.TraceContext{}),
otelkafkakonsumer.WithAttributes(
[]attribute.KeyValue{
semconv.MessagingDestinationKindTopic,
semconv.MessagingKafkaClientIDKey.String("opentel-cg-manual-commit"),
},
))
if err != nil {
log.Fatal(err.Error())
}
defer writer.Close()

message := kafka.Message{Topic: "opentel-consumer-test-manual-commit", Value: []byte("1")}

// Extract tracing info from message
ctx := writer.TraceConfig.Propagator.Extract(context.Background(), otelkafkakonsumer.NewMessageCarrier(&message))

tr := tracer.Tracer("otel-kafka-konsumer")
parentCtx, producerSpan := tr.Start(ctx, "before produce operation")
time.Sleep(100 * time.Millisecond)
producerSpan.End()

writer.WriteMessage(parentCtx, message)

segmentioReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
GroupTopics: []string{"opentel-consumer-test-manual-commit"},
GroupID: "opentel-cg-manual-commit",
})

reader, err := otelkafkakonsumer.NewReader(
segmentioReader,
otelkafkakonsumer.WithTracerProvider(tracer),
otelkafkakonsumer.WithPropagator(propagation.TraceContext{}),
otelkafkakonsumer.WithAttributes(
[]attribute.KeyValue{
semconv.MessagingDestinationKindTopic,
semconv.MessagingKafkaClientIDKey.String("opentel-cg-manual-commit"),
},
),
)
if err != nil {
log.Fatal(err.Error())
}
consumerMessage, err := reader.FetchMessage(context.Background())

// Extract tracing info from message
consumerCtx := reader.TraceConfig.Propagator.Extract(context.Background(), otelkafkakonsumer.NewMessageCarrier(consumerMessage))
consumerCtx, workSpan := tr.Start(consumerCtx, "work")

time.Sleep(100 * time.Millisecond)
workSpan.End()

_, anotherWorkSpan := tr.Start(consumerCtx, "another work")
time.Sleep(50 * time.Millisecond)
anotherWorkSpan.End()

reader.CommitMessages(context.Background(), *consumerMessage)

//assert
traceParent := strings.Split(string(consumerMessage.Headers[0].Value), "-")
assert.Equal(t, producerSpan.SpanContext().HasTraceID(), true)
assert.Equal(t, producerSpan.SpanContext().IsValid(), true)
assert.Equal(t, producerSpan.SpanContext().HasSpanID(), true)
assert.Equal(t, producerSpan.SpanContext().TraceID().String(), anotherWorkSpan.SpanContext().TraceID().String())
assert.Equal(t, producerSpan.SpanContext().TraceID().String(), workSpan.SpanContext().TraceID().String())
assert.Equal(t, producerSpan.SpanContext().TraceID().String(), traceParent[1])
}

0 comments on commit 529dd2c

Please sign in to comment.