Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store additional fields in context instead of baggage. #1316

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.opentelemetry.api.trace.TraceId;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapSetter;
Expand Down Expand Up @@ -68,6 +69,10 @@ public final class AwsXrayPropagator implements TextMapPropagator {
private static final char IS_SAMPLED = '1';
private static final char NOT_SAMPLED = '0';

private static final String XRAY_HEADER_ADDITIONAL_FIELDS_KEY_NAME = "XrayHeaderAdditionalFields";
static final ContextKey<Baggage> XRAY_HEADER_ADDITIONAL_FIELDS_KEY =
ContextKey.named(XRAY_HEADER_ADDITIONAL_FIELDS_KEY_NAME);

private static final List<String> FIELDS = Collections.singletonList(TRACE_HEADER_KEY);

private static final AwsXrayPropagator INSTANCE = new AwsXrayPropagator();
Expand All @@ -85,6 +90,7 @@ public List<String> fields() {
return FIELDS;
}

@SuppressWarnings("null")
@Override
public <C> void inject(Context context, @Nullable C carrier, TextMapSetter<C> setter) {
if (context == null) {
Expand Down Expand Up @@ -126,34 +132,35 @@ public <C> void inject(Context context, @Nullable C carrier, TextMapSetter<C> se
.append(KV_DELIMITER)
.append(samplingFlag);

Baggage baggage = Baggage.fromContext(context);
// Truncate baggage to 256 chars per X-Ray spec.
baggage.forEach(
new BiConsumer<String, BaggageEntry>() {

private int baggageWrittenBytes;

@Override
public void accept(String key, BaggageEntry entry) {
if (key.equals(TRACE_ID_KEY)
|| key.equals(PARENT_ID_KEY)
|| key.equals(SAMPLED_FLAG_KEY)) {
return;
Baggage fields = context.get(XRAY_HEADER_ADDITIONAL_FIELDS_KEY);
// Truncate fields to 256 chars per X-Ray spec.
if (fields != null) {
fields.forEach(
new BiConsumer<String, BaggageEntry>() {

private int baggageWrittenBytes;

@Override
public void accept(String key, BaggageEntry entry) {
if (key.equals(TRACE_ID_KEY)
|| key.equals(PARENT_ID_KEY)
|| key.equals(SAMPLED_FLAG_KEY)) {
return;
}
// Size is key/value pair, excludes delimiter.
int size = key.length() + entry.getValue().length() + 1;
if (baggageWrittenBytes + size > 256) {
return;
}
traceHeader
.append(TRACE_HEADER_DELIMITER)
.append(key)
.append(KV_DELIMITER)
.append(entry.getValue());
baggageWrittenBytes += size;
}
// Size is key/value pair, excludes delimiter.
int size = key.length() + entry.getValue().length() + 1;
if (baggageWrittenBytes + size > 256) {
return;
}
traceHeader
.append(TRACE_HEADER_DELIMITER)
.append(key)
.append(KV_DELIMITER)
.append(entry.getValue());
baggageWrittenBytes += size;
}
});

});
}
setter.set(carrier, TRACE_HEADER_KEY, traceHeader.toString());
}

Expand All @@ -180,7 +187,7 @@ private static <C> Context getContextFromHeader(
String spanId = SpanId.getInvalid();
Boolean isSampled = false;

BaggageBuilder baggage = null;
BaggageBuilder fields = null;
int baggageReadBytes = 0;

int pos = 0;
Expand Down Expand Up @@ -211,10 +218,10 @@ private static <C> Context getContextFromHeader(
} else if (trimmedPart.startsWith(SAMPLED_FLAG_KEY)) {
isSampled = parseTraceFlag(value);
} else if (baggageReadBytes + trimmedPart.length() <= 256) {
if (baggage == null) {
baggage = Baggage.builder();
if (fields == null) {
fields = Baggage.builder();
}
baggage.put(trimmedPart.substring(0, equalsIndex), value);
fields.put(trimmedPart.substring(0, equalsIndex), value);
baggageReadBytes += trimmedPart.length();
}
}
Expand All @@ -241,8 +248,8 @@ private static <C> Context getContextFromHeader(
if (spanContext.isValid()) {
context = context.with(Span.wrap(spanContext));
}
if (baggage != null) {
context = context.with(baggage.build());
if (fields != null) {
context = context.with(XRAY_HEADER_ADDITIONAL_FIELDS_KEY, fields.build());
}
return context;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.contrib.awsxray.propagator;

import static io.opentelemetry.contrib.awsxray.propagator.AwsXrayPropagator.TRACE_HEADER_KEY;
import static io.opentelemetry.contrib.awsxray.propagator.AwsXrayPropagator.XRAY_HEADER_ADDITIONAL_FIELDS_KEY;
import static org.assertj.core.api.Assertions.assertThat;

import io.opentelemetry.api.baggage.Baggage;
Expand Down Expand Up @@ -90,14 +91,15 @@ void inject_NotSampledContext() {
}

@Test
void inject_WithBaggage() {
void inject_WithAdditionalFields() {
Map<String, String> carrier = new LinkedHashMap<>();
subject.inject(
withSpanContext(
SpanContext.create(
TRACE_ID, SPAN_ID, TraceFlags.getDefault(), TraceState.getDefault()),
Context.current())
.with(
XRAY_HEADER_ADDITIONAL_FIELDS_KEY,
Baggage.builder()
.put("cat", "meow")
.put("dog", "bark")
Expand All @@ -116,7 +118,7 @@ void inject_WithBaggage() {
}

@Test
void inject_WithBaggage_LimitTruncates() {
void inject_WithAdditionalFields_LimitTruncates() {
Map<String, String> carrier = new LinkedHashMap<>();
// Limit is 256 characters for all baggage. We add a 254-character key/value pair and a
// 3 character key value pair.
Expand All @@ -133,7 +135,7 @@ void inject_WithBaggage_LimitTruncates() {
SpanContext.create(
TRACE_ID, SPAN_ID, TraceFlags.getDefault(), TraceState.getDefault()),
Context.current())
.with(baggage),
.with(XRAY_HEADER_ADDITIONAL_FIELDS_KEY, baggage),
carrier,
SETTER);

Expand Down Expand Up @@ -244,11 +246,13 @@ void extract_AdditionalFields() {
.isEqualTo(
SpanContext.createFromRemoteParent(
TRACE_ID, SPAN_ID, TraceFlags.getSampled(), TraceState.getDefault()));
assertThat(Baggage.fromContext(context).getEntryValue("Foo")).isEqualTo("Bar");
Baggage additionalFields = context.get(XRAY_HEADER_ADDITIONAL_FIELDS_KEY);
assertThat(additionalFields.getEntryValue("Foo")).isEqualTo("Bar");
assertThat(Baggage.fromContext(context)).isEqualTo(Baggage.empty());
}

@Test
void extract_Baggage_LimitTruncates() {
void extract_AdditionalFields_LimitTruncates() {
// Limit is 256 characters for all baggage. We add a 254-character key/value pair and a
// 3 character key value pair.
String key1 = Stream.generate(() -> "a").limit(252).collect(Collectors.joining());
Expand All @@ -274,8 +278,10 @@ void extract_Baggage_LimitTruncates() {
.isEqualTo(
SpanContext.createFromRemoteParent(
TRACE_ID, SPAN_ID, TraceFlags.getSampled(), TraceState.getDefault()));
assertThat(Baggage.fromContext(context).getEntryValue(key1)).isEqualTo(value1);
assertThat(Baggage.fromContext(context).getEntryValue(key2)).isNull();
Baggage additionalFields = context.get(XRAY_HEADER_ADDITIONAL_FIELDS_KEY);
assertThat(additionalFields.getEntryValue(key1)).isEqualTo(value1);
assertThat(additionalFields.getEntryValue(key2)).isNull();
assertThat(Baggage.fromContext(context)).isEqualTo(Baggage.empty());
}

@Test
Expand Down
Loading