Skip to content

Commit

Permalink
improve baggage propagation (#1545)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Beemer <beeme1mr@users.noreply.github.com>
Co-authored-by: Austin Parker <austin@ap2.io>
  • Loading branch information
beeme1mr and austinlparker authored Apr 30, 2024
1 parent faa5104 commit 61d6a5f
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 60 deletions.
26 changes: 9 additions & 17 deletions src/adservice/src/main/java/oteldemo/AdService.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private static class AdServiceImpl extends oteldemo.AdServiceGrpc.AdServiceImplB
private static final String ADSERVICE_FAILURE = "adServiceFailure";
private static final String ADSERVICE_MANUAL_GC_FEATURE_FLAG = "adServiceManualGc";
private static final String ADSERVICE_HIGH_CPU_FEATURE_FLAG = "adServiceHighCpu";
Client ffClient = OpenFeatureAPI.getInstance().getClient();
private static final Client ffClient = OpenFeatureAPI.getInstance().getClient();

private AdServiceImpl() {}

Expand All @@ -149,8 +149,6 @@ private AdServiceImpl() {}
@Override
public void getAds(AdRequest req, StreamObserver<AdResponse> responseObserver) {
AdService service = AdService.getInstance();
CPULoad cpuload = CPULoad.getInstance();
cpuload.execute(getFeatureFlagEnabled(ADSERVICE_HIGH_CPU_FEATURE_FLAG));

// get the current span in context
Span span = Span.current();
Expand All @@ -160,14 +158,19 @@ public void getAds(AdRequest req, StreamObserver<AdResponse> responseObserver) {
AdResponseType adResponseType;

Baggage baggage = Baggage.fromContextOrNull(Context.current());
MutableContext evaluationContext = new MutableContext();
if (baggage != null) {
final String sessionId = baggage.getEntryValue("session.id");
span.setAttribute("session.id", sessionId);
ffClient.setEvaluationContext(new MutableContext().add("session", sessionId));
evaluationContext.setTargetingKey(sessionId);
evaluationContext.add("session", sessionId);
} else {
logger.info("no baggage found in context");
}

CPULoad cpuload = CPULoad.getInstance();
cpuload.execute(ffClient.getBooleanValue(ADSERVICE_HIGH_CPU_FEATURE_FLAG, false, evaluationContext));

span.setAttribute("app.ads.contextKeys", req.getContextKeysList().toString());
span.setAttribute("app.ads.contextKeys.count", req.getContextKeysCount());
if (req.getContextKeysCount() > 0) {
Expand Down Expand Up @@ -198,11 +201,11 @@ public void getAds(AdRequest req, StreamObserver<AdResponse> responseObserver) {
Attributes.of(
adRequestTypeKey, adRequestType.name(), adResponseTypeKey, adResponseType.name()));

if (getFeatureFlagEnabled(ADSERVICE_FAILURE)) {
if (ffClient.getBooleanValue(ADSERVICE_FAILURE, false, evaluationContext)) {
throw new StatusRuntimeException(Status.UNAVAILABLE);
}

if (getFeatureFlagEnabled(ADSERVICE_MANUAL_GC_FEATURE_FLAG)) {
if (ffClient.getBooleanValue(ADSERVICE_MANUAL_GC_FEATURE_FLAG, false, evaluationContext)) {
logger.warn("Feature Flag " + ADSERVICE_MANUAL_GC_FEATURE_FLAG + " enabled, performing a manual gc now");
GarbageCollectionTrigger gct = new GarbageCollectionTrigger();
gct.doExecute();
Expand All @@ -219,17 +222,6 @@ public void getAds(AdRequest req, StreamObserver<AdResponse> responseObserver) {
responseObserver.onError(e);
}
}

/**
* Retrieves the status of a feature flag from the Feature Flag service.
*
* @param ff The name of the feature flag to retrieve.
* @return {@code true} if the feature flag is enabled, {@code false} otherwise or in case of errors.
*/
boolean getFeatureFlagEnabled(String ff) {
Boolean boolValue = ffClient.getBooleanValue(ff, false);
return boolValue;
}
}

private static final ImmutableListMultimap<String, Ad> adsMap = createAdsMap();
Expand Down
13 changes: 2 additions & 11 deletions src/flagd/demo.flagd.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,8 @@
"defaultVariant": "off",
"targeting": {
"fractional": [
{
"var": "session"
},
[
"on",
10
],
[
"off",
90
]
["on", 10],
["off", 90]
]
}
},
Expand Down
46 changes: 31 additions & 15 deletions src/frontend/gateways/Api.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const { userId } = SessionGateway.getSession();

const basePath = '/api';

const ApiGateway = () => ({
const Apis = () => ({
getCart(currencyCode: string) {
return request<IProductCart>({
url: `${basePath}/cart`,
Expand Down Expand Up @@ -79,25 +79,41 @@ const ApiGateway = () => ({
queryParams: {
productIds,
sessionId: userId,
currencyCode
currencyCode,
},
});
},
listAds(contextKeys: string[]) {
// TODO: Figure out a better way to do this so session ID gets propagated to
// all endpoints
const baggage = propagation.getActiveBaggage() || propagation.createBaggage();
const newBaggage = baggage.setEntry(AttributeNames.SESSION_ID, { value: userId });
const newContext = propagation.setBaggage(context.active(), newBaggage);
context.with(newContext, () => {
return request<Ad[]>({
url: `${basePath}/data`,
queryParams: {
contextKeys,
},
});
return request<Ad[]>({
url: `${basePath}/data`,
queryParams: {
contextKeys,
},
});
},
});

export default ApiGateway();
/**
* Extends all the API calls to set baggage automatically.
*/
const ApiGateway = new Proxy(Apis(), {
get(target, prop, receiver) {
const originalFunction = Reflect.get(target, prop, receiver);

if (typeof originalFunction !== 'function') {
return originalFunction;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
return function (...args: any[]) {
const baggage = propagation.getActiveBaggage() || propagation.createBaggage();
const newBaggage = baggage.setEntry(AttributeNames.SESSION_ID, { value: userId });
const newContext = propagation.setBaggage(context.active(), newBaggage);
return context.with(newContext, () => {
return Reflect.apply(originalFunction, undefined, args);
});
};
},
});

export default ApiGateway;
23 changes: 12 additions & 11 deletions src/frontend/utils/telemetry/FrontendTracer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,33 @@ import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions'
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { SessionIdProcessor } from './SessionIdProcessor';
import { detectResourcesSync } from '@opentelemetry/resources/build/src/detect-resources';
import { ZoneContextManager } from '@opentelemetry/context-zone';

const { NEXT_PUBLIC_OTEL_SERVICE_NAME = '', NEXT_PUBLIC_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = '', IS_SYNTHETIC_REQUEST = '' } =
typeof window !== 'undefined' ? window.ENV : {};

const FrontendTracer = async (collectorString: string) => {
const { ZoneContextManager } = await import('@opentelemetry/context-zone');
const {
NEXT_PUBLIC_OTEL_SERVICE_NAME = '',
NEXT_PUBLIC_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = '',
IS_SYNTHETIC_REQUEST = '',
} = typeof window !== 'undefined' ? window.ENV : {};

const FrontendTracer = (collectorString: string) => {
let resource = new Resource({
[SemanticResourceAttributes.SERVICE_NAME]: NEXT_PUBLIC_OTEL_SERVICE_NAME,
});

const detectedResources = detectResourcesSync({ detectors: [browserDetector] });
resource = resource.merge(detectedResources);
const provider = new WebTracerProvider({
resource
});
const provider = new WebTracerProvider({ resource });

provider.addSpanProcessor(new SessionIdProcessor());

provider.addSpanProcessor(
new BatchSpanProcessor(
new OTLPTraceExporter({
url: NEXT_PUBLIC_OTEL_EXPORTER_OTLP_TRACES_ENDPOINT || collectorString || 'http://localhost:4318/v1/traces',
}), {
scheduledDelayMillis : 500
}
}),
{
scheduledDelayMillis: 500,
}
)
);

Expand Down
12 changes: 6 additions & 6 deletions src/loadgenerator/locustfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@
import random
import uuid
import logging
import sys
from pythonjsonlogger import jsonlogger

from locust import HttpUser, task, between
from locust_plugins.users.playwright import PlaywrightUser, pw, PageWithRetry, event

from opentelemetry import context, baggage, trace
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import MetricExporter, PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
Expand All @@ -36,7 +35,6 @@

from openfeature import api
from openfeature.contrib.provider.flagd import FlagdProvider
from openfeature.exception import OpenFeatureError

from playwright.async_api import Route, Request

Expand Down Expand Up @@ -172,7 +170,8 @@ def flood_home(self):
self.client.get("/")

def on_start(self):
ctx = baggage.set_baggage("synthetic_request", "true")
ctx = baggage.set_baggage("session.id", str(uuid.uuid4()))
ctx = baggage.set_baggage("synthetic_request", "true", context=ctx)
context.attach(ctx)
self.index()

Expand Down Expand Up @@ -210,8 +209,9 @@ async def add_product_to_cart(self, page: PageWithRetry):


async def add_baggage_header(route: Route, request: Request):
existing_baggage = request.headers.get('baggage', '')
headers = {
**request.headers,
'baggage': 'synthetic_request=true'
'baggage': ', '.join(filter(None, (existing_baggage, 'synthetic_request=true')))
}
await route.continue_(headers=headers)

0 comments on commit 61d6a5f

Please sign in to comment.