Skip to content

Commit

Permalink
Support micrometer-tracing (#205)
Browse files Browse the repository at this point in the history
* Support micrometer-tracing

* Improve test code
  • Loading branch information
be-hase authored May 22, 2023
1 parent 02024a9 commit 5f52fe8
Show file tree
Hide file tree
Showing 9 changed files with 362 additions and 1 deletion.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ subprojects {
protobufVersion = "3.22.3"
kafkaVersion = "3.2.3"
micrometerVersion = "1.10.6"
micrometerTracingVersion = "1.1.1"
lombokVersion = "1.18.26"
junitVersion = "4.13.2"
hamcrestVersion = "2.2"
Expand Down
11 changes: 11 additions & 0 deletions micrometer-tracing/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
dependencies {
api project(":processor")

api "org.apache.kafka:kafka-clients:$kafkaVersion"
api "io.micrometer:micrometer-tracing:$micrometerTracingVersion"

itImplementation project(":testing")
itImplementation "io.micrometer:micrometer-tracing-bridge-brave:$micrometerTracingVersion"
itImplementation "io.micrometer:micrometer-tracing-bridge-otel:$micrometerTracingVersion"
itImplementation "io.opentelemetry.instrumentation:opentelemetry-kafka-clients-2.6:1.26.0-alpha"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor.runtime;

import static org.junit.Assert.assertEquals;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import com.linecorp.decaton.client.DecatonClientBuilder.DefaultKafkaProducerSupplier;
import com.linecorp.decaton.processor.TaskMetadata;
import com.linecorp.decaton.testing.KafkaClusterRule;
import com.linecorp.decaton.testing.TestUtils;
import com.linecorp.decaton.testing.processor.ProcessedRecord;
import com.linecorp.decaton.testing.processor.ProcessingGuarantee;
import com.linecorp.decaton.testing.processor.ProcessingGuarantee.GuaranteeType;
import com.linecorp.decaton.testing.processor.ProcessorTestSuite;
import com.linecorp.decaton.testing.processor.ProducedRecord;
import com.linecorp.decaton.testing.processor.ProducerAdaptor;

import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.otel.bridge.OtelCurrentTraceContext;
import io.micrometer.tracing.otel.bridge.OtelPropagator;
import io.micrometer.tracing.otel.bridge.OtelTracer;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;

public class MicrometerTracingOtelBridgeTest {

private OpenTelemetry openTelemetry;
private io.opentelemetry.api.trace.Tracer otelTracer;
private KafkaTelemetry otelKafkaTelemetry;
private Tracer tracer;
private String retryTopic;

@ClassRule
public static KafkaClusterRule rule = new KafkaClusterRule();

@Before
public void setUp() {
openTelemetry = OpenTelemetrySdk.builder()
.setPropagators(ContextPropagators.create(
W3CTraceContextPropagator.getInstance()))
.buildAndRegisterGlobal();
otelTracer = openTelemetry.getTracerProvider().get("io.micrometer.micrometer-tracing");
otelKafkaTelemetry = KafkaTelemetry.create(openTelemetry);
tracer = new OtelTracer(otelTracer, new OtelCurrentTraceContext(), event -> {
});
retryTopic = rule.admin().createRandomTopic(3, 3);
}

@After
public void tearDown() {
rule.admin().deleteTopics(true, retryTopic);
}

@Test(timeout = 30000)
public void testTracePropagation() throws Exception {
// scenario:
// * half of arrived tasks are retried once
// * after retried (i.e. retryCount() > 0), no more retry
final DefaultKafkaProducerSupplier producerSupplier = new DefaultKafkaProducerSupplier();
ProcessorTestSuite
.builder(rule)
.configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> {
if (ctx.metadata().retryCount() == 0 && ThreadLocalRandom.current().nextBoolean()) {
ctx.retry();
}
}))
// micrometer-tracing does not yet have kafka producer support, so use openTelemetry directly
.producerSupplier(bootstrapServers -> new WithParentSpanProducer<>(
otelKafkaTelemetry.wrap(TestUtils.producer(bootstrapServers)), otelTracer))
.retryConfig(RetryConfig.builder()
.retryTopic(retryTopic)
.backoff(Duration.ofMillis(10))
.producerSupplier(config -> new WithParentSpanProducer<>(
otelKafkaTelemetry.wrap(producerSupplier.getProducer(config)),
otelTracer))
.build())
.tracingProvider(new MicrometerTracingProvider(
tracer, new OtelPropagator(openTelemetry.getPropagators(), otelTracer)))
// If we retry tasks, there's no guarantee about ordering nor serial processing
.excludeSemantics(GuaranteeType.PROCESS_ORDERING, GuaranteeType.SERIAL_PROCESSING)
.customSemantics(new TracePropagationGuarantee(tracer))
.build()
.run();
}

private static class TracePropagationGuarantee implements ProcessingGuarantee {
private final Map<String, String> producedTraceIds = new ConcurrentHashMap<>();
private final Map<String, String> consumedTraceIds = new ConcurrentHashMap<>();
private final Tracer tracer;

TracePropagationGuarantee(Tracer tracer) {
this.tracer = tracer;
}

@Override
public void onProduce(ProducedRecord record) {
producedTraceIds.put(record.task().getId(), tracer.currentTraceContext().context().traceId());
}

@Override
public void onProcess(TaskMetadata metadata, ProcessedRecord record) {
consumedTraceIds.put(record.task().getId(), tracer.currentTraceContext().context().traceId());
}

@Override
public void doAssert() {
assertEquals(producedTraceIds, consumedTraceIds);
}
}

private static class WithParentSpanProducer<K, V> extends ProducerAdaptor<K, V> {
private final io.opentelemetry.api.trace.Tracer otelTracer;

WithParentSpanProducer(Producer<K, V> delegate, io.opentelemetry.api.trace.Tracer otelTracer) {
super(delegate);
this.otelTracer = otelTracer;
}

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}

// Since the context of parent is injected in Callback of `producer.send`, create the span manually.
// ref: https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/8a15975dcacda48375cae62e98fe7551fb192d1f/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java#L262-L264
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
final Span span = otelTracer.spanBuilder("test span").startSpan();
try (final Scope scope = span.makeCurrent()) {
return delegate.send(record, callback);
} finally {
span.end();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor.runtime;

import com.linecorp.decaton.processor.tracing.TracingProvider.ProcessorTraceHandle;

import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;

final class MicrometerProcessorTraceHandle extends MicrometerTraceHandle implements ProcessorTraceHandle {
private Tracer.SpanInScope scope;

MicrometerProcessorTraceHandle(Tracer tracer, Span span) {
super(tracer, span);
}

@Override
public void processingStart() {
scope = tracer.withSpan(span);
}

@Override
public void processingReturn() {
span.event("return");
scope.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor.runtime;

import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.tracing.TracingProvider;

import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;

final class MicrometerRecordTraceHandle extends MicrometerTraceHandle
implements TracingProvider.RecordTraceHandle {
MicrometerRecordTraceHandle(Tracer tracer, Span span) {
super(tracer, span);
}

@Override
public MicrometerProcessorTraceHandle childFor(DecatonProcessor<?> processor) {
final Span childSpan = tracer.nextSpan(span).name(processor.name()).start();
return new MicrometerProcessorTraceHandle(tracer, childSpan);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor.runtime;

import com.linecorp.decaton.processor.tracing.TracingProvider.TraceHandle;

import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;

class MicrometerTraceHandle implements TraceHandle {
protected final Tracer tracer;
protected final Span span;

MicrometerTraceHandle(Tracer tracer, Span span) {
this.tracer = tracer;
this.span = span;
}

@Override
public void processingCompletion() {
span.end();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor.runtime;

import static java.nio.charset.StandardCharsets.UTF_8;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import com.linecorp.decaton.processor.tracing.TracingProvider;

import io.micrometer.common.lang.Nullable;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.propagation.Propagator;

public class MicrometerTracingProvider implements TracingProvider {
private final Tracer tracer;
private final Propagator propagator;

public MicrometerTracingProvider(Tracer tracer, Propagator propagator) {
this.tracer = tracer;
this.propagator = propagator;
}

@Override
public MicrometerRecordTraceHandle traceFor(ConsumerRecord<?, ?> record, String subscriptionId) {
return new MicrometerRecordTraceHandle(
tracer,
propagator.extract(record.headers(), GETTER)
.name("decaton").tag("subscriptionId", subscriptionId).start()
);
}

private static final Propagator.Getter<Headers> GETTER = new Propagator.Getter<Headers>() {
@Override
public String get(Headers carrier, String key) {
return lastStringHeader(carrier, key);
}

@Nullable
private String lastStringHeader(Headers headers, String key) {
final Header header = headers.lastHeader(key);
if (header == null || header.value() == null) {
return null;
}
return new String(header.value(), UTF_8);
}
};
}
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ include ":docs"
include ":testing"
include ":benchmark"
include ":brave"
include ":micrometer-tracing"
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;

abstract class ProducerAdaptor<K, V> implements Producer<K, V> {
public abstract class ProducerAdaptor<K, V> implements Producer<K, V> {
protected final Producer<K, V> delegate;
protected ProducerAdaptor(Producer<K, V> delegate) {
this.delegate = delegate;
Expand Down

0 comments on commit 5f52fe8

Please sign in to comment.