Skip to content

Commit

Permalink
Merge branch 'main' into slowloading-images
Browse files Browse the repository at this point in the history
  • Loading branch information
austinlparker authored Apr 30, 2024
2 parents af1837f + 61d6a5f commit 25b4e3a
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 70 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/gradle-wrapper-validation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ jobs:
steps:
- uses: actions/checkout@v4

- uses: gradle/wrapper-validation-action@v3.3.1
- uses: gradle/wrapper-validation-action@v3.3.2
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ the release.
([#1522](https://github.com/open-telemetry/opentelemetry-demo/pull/1522))
* [frontend] Pass down image optimization requests to imageprovider
([#1522](https://github.com/open-telemetry/opentelemetry-demo/pull/1522))
* [kafka] add kafkaQueueProblems feature flag
([#1528](https://github.com/open-telemetry/opentelemetry-demo/pull/1528))
* [otelcollector] Add `redisreceiver`
([#1537](https://github.com/open-telemetry/opentelemetry-demo/pull/1537))
* [frontend] Slowloading of images based on imageSlowLoad flag
Expand Down
4 changes: 3 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,11 @@ services:
deploy:
resources:
limits:
memory: 200M
memory: 300M
restart: unless-stopped
environment:
- FLAGD_HOST
- FLAGD_PORT
- KAFKA_SERVICE_ADDR
- OTEL_EXPORTER_OTLP_ENDPOINT=http://${OTEL_COLLECTOR_HOST}:${OTEL_COLLECTOR_PORT_HTTP}
- OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE
Expand Down
26 changes: 9 additions & 17 deletions src/adservice/src/main/java/oteldemo/AdService.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private static class AdServiceImpl extends oteldemo.AdServiceGrpc.AdServiceImplB
private static final String ADSERVICE_FAILURE = "adServiceFailure";
private static final String ADSERVICE_MANUAL_GC_FEATURE_FLAG = "adServiceManualGc";
private static final String ADSERVICE_HIGH_CPU_FEATURE_FLAG = "adServiceHighCpu";
Client ffClient = OpenFeatureAPI.getInstance().getClient();
private static final Client ffClient = OpenFeatureAPI.getInstance().getClient();

private AdServiceImpl() {}

Expand All @@ -149,8 +149,6 @@ private AdServiceImpl() {}
@Override
public void getAds(AdRequest req, StreamObserver<AdResponse> responseObserver) {
AdService service = AdService.getInstance();
CPULoad cpuload = CPULoad.getInstance();
cpuload.execute(getFeatureFlagEnabled(ADSERVICE_HIGH_CPU_FEATURE_FLAG));

// get the current span in context
Span span = Span.current();
Expand All @@ -160,14 +158,19 @@ public void getAds(AdRequest req, StreamObserver<AdResponse> responseObserver) {
AdResponseType adResponseType;

Baggage baggage = Baggage.fromContextOrNull(Context.current());
MutableContext evaluationContext = new MutableContext();
if (baggage != null) {
final String sessionId = baggage.getEntryValue("session.id");
span.setAttribute("session.id", sessionId);
ffClient.setEvaluationContext(new MutableContext().add("session", sessionId));
evaluationContext.setTargetingKey(sessionId);
evaluationContext.add("session", sessionId);
} else {
logger.info("no baggage found in context");
}

CPULoad cpuload = CPULoad.getInstance();
cpuload.execute(ffClient.getBooleanValue(ADSERVICE_HIGH_CPU_FEATURE_FLAG, false, evaluationContext));

span.setAttribute("app.ads.contextKeys", req.getContextKeysList().toString());
span.setAttribute("app.ads.contextKeys.count", req.getContextKeysCount());
if (req.getContextKeysCount() > 0) {
Expand Down Expand Up @@ -198,11 +201,11 @@ public void getAds(AdRequest req, StreamObserver<AdResponse> responseObserver) {
Attributes.of(
adRequestTypeKey, adRequestType.name(), adResponseTypeKey, adResponseType.name()));

if (getFeatureFlagEnabled(ADSERVICE_FAILURE)) {
if (ffClient.getBooleanValue(ADSERVICE_FAILURE, false, evaluationContext)) {
throw new StatusRuntimeException(Status.UNAVAILABLE);
}

if (getFeatureFlagEnabled(ADSERVICE_MANUAL_GC_FEATURE_FLAG)) {
if (ffClient.getBooleanValue(ADSERVICE_MANUAL_GC_FEATURE_FLAG, false, evaluationContext)) {
logger.warn("Feature Flag " + ADSERVICE_MANUAL_GC_FEATURE_FLAG + " enabled, performing a manual gc now");
GarbageCollectionTrigger gct = new GarbageCollectionTrigger();
gct.doExecute();
Expand All @@ -219,17 +222,6 @@ public void getAds(AdRequest req, StreamObserver<AdResponse> responseObserver) {
responseObserver.onError(e);
}
}

/**
* Retrieves the status of a feature flag from the Feature Flag service.
*
* @param ff The name of the feature flag to retrieve.
* @return {@code true} if the feature flag is enabled, {@code false} otherwise or in case of errors.
*/
boolean getFeatureFlagEnabled(String ff) {
Boolean boolValue = ffClient.getBooleanValue(ff, false);
return boolValue;
}
}

private static final ImmutableListMultimap<String, Ad> adsMap = createAdsMap();
Expand Down
49 changes: 41 additions & 8 deletions src/checkoutservice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func main() {
}

openfeature.SetProvider(flagd.NewProvider())
openfeature.AddHooks(otelhooks.NewTracesHook())

tracer = tp.Tracer("checkoutservice")

Expand Down Expand Up @@ -316,6 +317,7 @@ func (cs *checkoutService) PlaceOrder(ctx context.Context, req *pb.PlaceOrderReq

// send to kafka only if kafka broker address is set
if cs.kafkaBrokerSvcAddr != "" {
log.Infof("sending to postProcessor")
cs.sendToPostProcessor(ctx, orderResult)
}

Expand Down Expand Up @@ -439,7 +441,7 @@ func (cs *checkoutService) convertCurrency(ctx context.Context, from *pb.Money,

func (cs *checkoutService) chargeCard(ctx context.Context, amount *pb.Money, paymentInfo *pb.CreditCardInfo) (string, error) {
paymentService := cs.paymentSvcClient
if cs.checkPaymentFailure(ctx) {
if cs.isFeatureFlagEnabled(ctx, "paymentServiceUnreachable") {
badAddress := "badAddress:50051"
c := mustCreateClient(context.Background(), badAddress)
paymentService = pb.NewPaymentServiceClient(c)
Expand Down Expand Up @@ -505,6 +507,18 @@ func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.O
cs.KafkaProducerClient.Input() <- &msg
successMsg := <-cs.KafkaProducerClient.Successes()
log.Infof("Successful to write message. offset: %v", successMsg.Offset)

ffValue := cs.getIntFeatureFlag(ctx, "kafkaQueueProblems")
if ffValue > 0 {
log.Infof("Warning: FeatureFlag 'kafkaQueueProblems' is activated, overloading queue now.")
for i := 0; i < ffValue; i++ {
go func(i int) {
cs.KafkaProducerClient.Input() <- &msg
_ = <-cs.KafkaProducerClient.Successes()
}(i)
}
log.Infof("Done with #%d messages for overload simulation.", ffValue)
}
}

func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.Span {
Expand Down Expand Up @@ -533,11 +547,30 @@ func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.
return span
}

func (cs *checkoutService) checkPaymentFailure(ctx context.Context) bool {
openfeature.AddHooks(otelhooks.NewTracesHook())
client := openfeature.NewClient("checkout")
failureEnabled, _ := client.BooleanValue(
ctx, "paymentServiceUnreachable", false, openfeature.EvaluationContext{},
)
return failureEnabled
func (cs *checkoutService) isFeatureFlagEnabled(ctx context.Context, featureFlagName string) bool {
client := openfeature.NewClient("checkout")

// Default value is set to false, but you could also make this a parameter.
featureEnabled, _ := client.BooleanValue(
ctx,
featureFlagName,
false,
openfeature.EvaluationContext{},
)

return featureEnabled
}

func (cs *checkoutService) getIntFeatureFlag(ctx context.Context, featureFlagName string) int {
client := openfeature.NewClient("checkout")

// Default value is set to 0, but you could also make this a parameter.
featureFlagValue, _ := client.IntValue(
ctx,
featureFlagName,
0,
openfeature.EvaluationContext{},
)

return int(featureFlagValue)
}
22 changes: 11 additions & 11 deletions src/flagd/demo.flagd.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,20 @@
"defaultVariant": "off",
"targeting": {
"fractional": [
{
"var": "session"
},
[
"on",
10
],
[
"off",
90
]
["on", 10],
["off", 90]
]
}
},
"kafkaQueueProblems": {
"description": "Overloads Kafka queue while simultaneously introducing a consumer side delay leading to a lag spike",
"state": "ENABLED",
"variants": {
"on": 100,
"off": 0
},
"defaultVariant": "off"
},
"cartServiceFailure": {
"description": "Fail cart service",
"state": "ENABLED",
Expand Down
8 changes: 8 additions & 0 deletions src/frauddetectionservice/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ dependencies {
implementation("org.apache.logging.log4j:log4j-core:2.21.1")
implementation("org.slf4j:slf4j-api:2.0.9")
implementation("com.google.protobuf:protobuf-kotlin:${protobufVersion}")
implementation("dev.openfeature:sdk:1.7.4")
implementation("dev.openfeature.contrib.providers:flagd:0.7.0")

if (JavaVersion.current().isJava9Compatible) {
// Workaround for @javax.annotation.Generated
Expand All @@ -50,6 +52,12 @@ dependencies {
}
}

tasks {
shadowJar {
mergeServiceFiles()
}
}

tasks.test {
useJUnitPlatform()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,26 @@ import oteldemo.Demo.*
import java.time.Duration.ofMillis
import java.util.*
import kotlin.system.exitProcess
import dev.openfeature.contrib.providers.flagd.FlagdOptions
import dev.openfeature.contrib.providers.flagd.FlagdProvider
import dev.openfeature.sdk.Client
import dev.openfeature.sdk.EvaluationContext
import dev.openfeature.sdk.ImmutableContext
import dev.openfeature.sdk.Value
import dev.openfeature.sdk.OpenFeatureAPI

const val topic = "orders"
const val groupID = "frauddetectionservice"

private val logger: Logger = LogManager.getLogger(groupID)

fun main() {
val options = FlagdOptions.builder()
.withGlobalTelemetry(true)
.build()
val flagdProvider = FlagdProvider(options)
OpenFeatureAPI.getInstance().setProvider(flagdProvider)

val props = Properties()
props[KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
props[VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java.name
Expand All @@ -44,10 +57,32 @@ fun main() {
.poll(ofMillis(100))
.fold(totalCount) { accumulator, record ->
val newCount = accumulator + 1
if (getFeatureFlagValue("kafkaQueueProblems") > 0) {
logger.info("FeatureFlag 'kafkaQueueProblems' is enabled, sleeping 1 second")
Thread.sleep(1000)
}
val orders = OrderResult.parseFrom(record.value())
logger.info("Consumed record with orderId: ${orders.orderId}, and updated total count to: $newCount")
newCount
}
}
}
}

/**
* Retrieves the status of a feature flag from the Feature Flag service.
*
* @param ff The name of the feature flag to retrieve.
* @return `true` if the feature flag is enabled, `false` otherwise or in case of errors.
*/
fun getFeatureFlagValue(ff: String): Int {
val client = OpenFeatureAPI.getInstance().client
// TODO: Plumb the actual session ID from the frontend via baggage?
val uuid = UUID.randomUUID()

val clientAttrs = mutableMapOf<String, Value>()
clientAttrs["session"] = Value(uuid.toString())
client.evaluationContext = ImmutableContext(clientAttrs)
val intValue = client.getIntegerValue(ff, 0)
return intValue
}
46 changes: 31 additions & 15 deletions src/frontend/gateways/Api.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const { userId } = SessionGateway.getSession();

const basePath = '/api';

const ApiGateway = () => ({
const Apis = () => ({
getCart(currencyCode: string) {
return request<IProductCart>({
url: `${basePath}/cart`,
Expand Down Expand Up @@ -79,25 +79,41 @@ const ApiGateway = () => ({
queryParams: {
productIds,
sessionId: userId,
currencyCode
currencyCode,
},
});
},
listAds(contextKeys: string[]) {
// TODO: Figure out a better way to do this so session ID gets propagated to
// all endpoints
const baggage = propagation.getActiveBaggage() || propagation.createBaggage();
const newBaggage = baggage.setEntry(AttributeNames.SESSION_ID, { value: userId });
const newContext = propagation.setBaggage(context.active(), newBaggage);
context.with(newContext, () => {
return request<Ad[]>({
url: `${basePath}/data`,
queryParams: {
contextKeys,
},
});
return request<Ad[]>({
url: `${basePath}/data`,
queryParams: {
contextKeys,
},
});
},
});

export default ApiGateway();
/**
* Extends all the API calls to set baggage automatically.
*/
const ApiGateway = new Proxy(Apis(), {
get(target, prop, receiver) {
const originalFunction = Reflect.get(target, prop, receiver);

if (typeof originalFunction !== 'function') {
return originalFunction;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
return function (...args: any[]) {
const baggage = propagation.getActiveBaggage() || propagation.createBaggage();
const newBaggage = baggage.setEntry(AttributeNames.SESSION_ID, { value: userId });
const newContext = propagation.setBaggage(context.active(), newBaggage);
return context.with(newContext, () => {
return Reflect.apply(originalFunction, undefined, args);
});
};
},
});

export default ApiGateway;
Loading

0 comments on commit 25b4e3a

Please sign in to comment.