From 1247a9be248884eea273604a4cf9c57678ca7ea3 Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Thu, 9 Jan 2025 18:33:07 +0100 Subject: [PATCH] [OPIK-595]: Add Redis Caching part 1 --- apps/opik-backend/config.yml | 13 + apps/opik-backend/pom.xml | 14 +- .../java/com/comet/opik/OpikApplication.java | 3 +- .../AutomationRuleEvaluatorsResource.java | 2 +- .../domain/AutomationRuleEvaluatorModel.java | 1 + .../AutomationRuleEvaluatorService.java | 22 +- .../opik/domain/LlmProviderApiKeyService.java | 4 +- .../infrastructure/CacheConfiguration.java | 30 +++ .../infrastructure/OpikConfiguration.java | 4 + .../opik/infrastructure/cache/CacheEvict.java | 18 ++ .../cache/CacheInterceptor.java | 189 ++++++++++++++ .../infrastructure/cache/CacheManager.java | 15 ++ .../infrastructure/cache/CacheModule.java | 19 ++ .../opik/infrastructure/cache/CachePut.java | 17 ++ .../opik/infrastructure/cache/Cacheable.java | 18 ++ .../redis/RedisCacheManager.java | 42 ++++ .../infrastructure/redis/RedisModule.java | 7 + .../redis/RedisRateLimitService.java | 2 +- .../java/com/comet/opik/utils/JsonUtils.java | 8 + .../TestDropwizardAppExtensionUtils.java | 9 +- .../cache/CacheManagerTest.java | 233 ++++++++++++++++++ .../infrastructure/cache/CachedService.java | 43 ++++ .../src/test/resources/config-test.yml | 13 + 23 files changed, 705 insertions(+), 21 deletions(-) create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/infrastructure/CacheConfiguration.java create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CacheEvict.java create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CacheInterceptor.java create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CacheManager.java create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CacheModule.java create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CachePut.java create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/Cacheable.java create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedisCacheManager.java create mode 100644 apps/opik-backend/src/test/java/com/comet/opik/infrastructure/cache/CacheManagerTest.java create mode 100644 apps/opik-backend/src/test/java/com/comet/opik/infrastructure/cache/CachedService.java diff --git a/apps/opik-backend/config.yml b/apps/opik-backend/config.yml index c4ed04eb61..3a452fb41c 100644 --- a/apps/opik-backend/config.yml +++ b/apps/opik-backend/config.yml @@ -243,3 +243,16 @@ llmProviderClient: # Default: 2023-06-01 # Description: Anthropic API version https://docs.anthropic.com/en/api/versioning version: ${LLM_PROVIDER_ANTHROPIC_VERSION:-'2023-06-01'} + +# Configuration for cache manager +cacheManager: + # Default: true + # Description: Whether or not cache manager is enabled + enabled: ${CACHE_MANAGER_ENABLED:-true} + # Default: PT1S + # Description: Time to live for cache entries in using the formats accepted are based on the ISO-8601: https://docs.oracle.com/javase/8/docs/api/java/time/Duration.html#parse-java.lang.CharSequence- + defaultDuration: ${CACHE_MANAGER_DEFAULT_DURATION:-PT1S} + caches: + # Default: {} + # Description: Dynamically created caches with their respective time to live in seconds + automationRules: ${CACHE_MANAGER_AUTOMATION_RULES_DURATION:-PT1S} diff --git a/apps/opik-backend/pom.xml b/apps/opik-backend/pom.xml index b39e700a90..3b8afc6fb7 100644 --- a/apps/opik-backend/pom.xml +++ b/apps/opik-backend/pom.xml @@ -24,7 +24,7 @@ 3.47.0 3.0.0 0.7.2 - 0.7.0 + 0.7.2 1.6.2 1.20.2 5.1.0 @@ -198,6 +198,16 @@ httpclient5 5.2.3 + + org.mvel + mvel2 + 2.5.2.Final + + + com.thoughtworks.paranamer + paranamer + 2.8 + org.mapstruct mapstruct @@ -286,7 +296,7 @@ com.clickhouse clickhouse-jdbc - 0.7.0 + ${clickhouse-java.version} test diff --git a/apps/opik-backend/src/main/java/com/comet/opik/OpikApplication.java b/apps/opik-backend/src/main/java/com/comet/opik/OpikApplication.java index 3c025b9d02..0d497603e8 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/OpikApplication.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/OpikApplication.java @@ -8,6 +8,7 @@ import com.comet.opik.infrastructure.bi.BiModule; import com.comet.opik.infrastructure.bi.OpikGuiceyLifecycleEventListener; import com.comet.opik.infrastructure.bundle.LiquibaseBundle; +import com.comet.opik.infrastructure.cache.CacheModule; import com.comet.opik.infrastructure.db.DatabaseAnalyticsModule; import com.comet.opik.infrastructure.db.IdGeneratorModule; import com.comet.opik.infrastructure.db.NameGeneratorModule; @@ -72,7 +73,7 @@ public void initialize(Bootstrap bootstrap) { .withPlugins(new SqlObjectPlugin(), new Jackson2Plugin())) .modules(new DatabaseAnalyticsModule(), new IdGeneratorModule(), new AuthModule(), new RedisModule(), new RateLimitModule(), new NameGeneratorModule(), new HttpModule(), new EventModule(), - new ConfigurationModule(), new BiModule()) + new ConfigurationModule(), new BiModule(), new CacheModule()) .installers(JobGuiceyInstaller.class) .listen(new OpikGuiceyLifecycleEventListener()) .enableAutoConfig() diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/AutomationRuleEvaluatorsResource.java b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/AutomationRuleEvaluatorsResource.java index 41c70c97e5..5e6a55ab76 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/AutomationRuleEvaluatorsResource.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/AutomationRuleEvaluatorsResource.java @@ -86,7 +86,7 @@ public Response getEvaluator(@PathParam("projectId") UUID projectId, @PathParam( String workspaceId = requestContext.get().getWorkspaceId(); log.info("Looking for automated evaluator: id '{}' on project_id '{}'", projectId, workspaceId); - AutomationRuleEvaluator evaluator = service.findById(evaluatorId, projectId, workspaceId); + AutomationRuleEvaluator evaluator = service.findById(evaluatorId, projectId, workspaceId); log.info("Found automated evaluator: id '{}' on project_id '{}'", projectId, workspaceId); return Response.ok().entity(evaluator).build(); diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/AutomationRuleEvaluatorModel.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/AutomationRuleEvaluatorModel.java index a630c9699e..affd7b1abe 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/AutomationRuleEvaluatorModel.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/AutomationRuleEvaluatorModel.java @@ -9,6 +9,7 @@ public sealed interface AutomationRuleEvaluatorModel extends AutomationRuleMo @Json T code(); + AutomationRuleEvaluatorType type(); @Override diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/AutomationRuleEvaluatorService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/AutomationRuleEvaluatorService.java index 191e9c8fd6..48be080807 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/AutomationRuleEvaluatorService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/AutomationRuleEvaluatorService.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.UUID; +import static com.comet.opik.api.AutomationRuleEvaluator.AutomationRuleEvaluatorPage; import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.READ_ONLY; import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.WRITE; @@ -42,14 +43,14 @@ > T findById(@NonNull UUID id, @NonNull void delete(@NonNull Set ids, @NonNull UUID projectId, @NonNull String workspaceId); - AutomationRuleEvaluator.AutomationRuleEvaluatorPage find(@NonNull UUID projectId, @NonNull String workspaceId, + AutomationRuleEvaluatorPage find(@NonNull UUID projectId, @NonNull String workspaceId, String name, int page, int size); List findAll(@NonNull UUID projectId, @NonNull String workspaceId, AutomationRuleEvaluatorType automationRuleEvaluatorType); } -@NonNull @Singleton +@Singleton @RequiredArgsConstructor(onConstructor_ = @Inject) @Slf4j class AutomationRuleEvaluatorServiceImpl implements AutomationRuleEvaluatorService { @@ -58,10 +59,9 @@ class AutomationRuleEvaluatorServiceImpl implements AutomationRuleEvaluatorServi private final @NonNull IdGenerator idGenerator; private final @NonNull TransactionTemplate template; - private final int DEFAULT_PAGE_LIMIT = 10; @Override - public > T save(T inputRuleEvaluator, + public > T save(@NonNull T inputRuleEvaluator, @NonNull String workspaceId, @NonNull String userName) { @@ -146,7 +146,7 @@ public > T findById(@NonNull UUID id, @N log.debug("Finding AutomationRuleEvaluator with id '{}' in projectId '{}' and workspaceId '{}'", id, projectId, workspaceId); - return (T) template.inTransaction(READ_ONLY, handle -> { + return template.inTransaction(READ_ONLY, handle -> { var dao = handle.attach(AutomationRuleEvaluatorDAO.class); var singleIdSet = Collections.singleton(id); var criteria = AutomationRuleEvaluatorCriteria.builder().ids(singleIdSet).build(); @@ -157,6 +157,7 @@ public > T findById(@NonNull UUID id, @N case LlmAsJudgeAutomationRuleEvaluatorModel llmAsJudge -> AutomationModelEvaluatorMapper.INSTANCE.map(llmAsJudge); }) + .map(evaluator -> (T) evaluator) .orElseThrow(this::newNotFoundException); }); } @@ -187,10 +188,8 @@ private NotFoundException newNotFoundException() { } @Override - public AutomationRuleEvaluator.AutomationRuleEvaluatorPage find(@NonNull UUID projectId, - @NonNull String workspaceId, - String name, - int pageNum, int size) { + public AutomationRuleEvaluatorPage find(@NonNull UUID projectId, @NonNull String workspaceId, + String name, int pageNum, int size) { log.debug("Finding AutomationRuleEvaluators with name pattern '{}' in projectId '{}' and workspaceId '{}'", name, projectId, workspaceId); @@ -210,10 +209,9 @@ public AutomationRuleEvaluator.AutomationRuleEvaluatorPage find(@NonNull UUID pr .toList(); log.info("Found {} AutomationRuleEvaluators for projectId '{}'", automationRuleEvaluators.size(), projectId); - return new AutomationRuleEvaluator.AutomationRuleEvaluatorPage(pageNum, automationRuleEvaluators.size(), - total, - automationRuleEvaluators); + return new AutomationRuleEvaluatorPage(pageNum, automationRuleEvaluators.size(), total, + automationRuleEvaluators); }); } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/LlmProviderApiKeyService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/LlmProviderApiKeyService.java index 6eee584a7d..a1c75842ef 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/LlmProviderApiKeyService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/LlmProviderApiKeyService.java @@ -50,14 +50,12 @@ class LlmProviderApiKeyServiceImpl implements LlmProviderApiKeyService { @Override public ProviderApiKey find(@NonNull UUID id, @NonNull String workspaceId) { - ProviderApiKey providerApiKey = template.inTransaction(READ_ONLY, handle -> { + return template.inTransaction(READ_ONLY, handle -> { var repository = handle.attach(LlmProviderApiKeyDAO.class); return repository.fetch(id, workspaceId).orElseThrow(this::createNotFoundError); }); - - return providerApiKey; } @Override diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/CacheConfiguration.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/CacheConfiguration.java new file mode 100644 index 0000000000..087f24eeef --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/CacheConfiguration.java @@ -0,0 +1,30 @@ +package com.comet.opik.infrastructure; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +import java.time.Duration; +import java.util.Map; +import java.util.Optional; + +@Data +public class CacheConfiguration { + + @Valid + @JsonProperty + private boolean enabled = false; + + @Valid + @JsonProperty + @NotNull private Duration defaultDuration; + + @Valid + @JsonProperty + private Map caches; + + public Map getCaches() { + return Optional.ofNullable(caches).orElse(Map.of()); + } +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/OpikConfiguration.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/OpikConfiguration.java index 9ff1f39091..16952cb6de 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/OpikConfiguration.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/OpikConfiguration.java @@ -63,4 +63,8 @@ public class OpikConfiguration extends JobConfiguration { @Valid @NotNull @JsonProperty private LlmProviderClientConfig llmProviderClient = new LlmProviderClientConfig(); + + @Valid + @NotNull @JsonProperty + private CacheConfiguration cacheManager = new CacheConfiguration(); } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CacheEvict.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CacheEvict.java new file mode 100644 index 0000000000..157d921a0e --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CacheEvict.java @@ -0,0 +1,18 @@ +package com.comet.opik.infrastructure.cache; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +@Documented +public @interface CacheEvict { + + String name(); + String key(); +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CacheInterceptor.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CacheInterceptor.java new file mode 100644 index 0000000000..f39e20cdb4 --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CacheInterceptor.java @@ -0,0 +1,189 @@ +package com.comet.opik.infrastructure.cache; + +import com.comet.opik.infrastructure.CacheConfiguration; +import com.thoughtworks.paranamer.BytecodeReadingParanamer; +import jakarta.inject.Provider; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.aopalliance.intercept.MethodInterceptor; +import org.aopalliance.intercept.MethodInvocation; +import org.mvel2.MVEL; +import reactor.core.publisher.Mono; + +import java.lang.reflect.Method; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Function; + +@Slf4j +@RequiredArgsConstructor +public class CacheInterceptor implements MethodInterceptor { + + private static final BytecodeReadingParanamer PARANAMER = new BytecodeReadingParanamer(); + private final Provider cacheManager; + private final CacheConfiguration cacheConfiguration; + + @Override + public Object invoke(MethodInvocation invocation) throws Throwable { + Method method = invocation.getMethod(); + + if (!cacheConfiguration.isEnabled()) { + return invocation.proceed(); + } + + boolean isReactive = method.getReturnType().isAssignableFrom(Mono.class); + + var cacheable = method.getAnnotation(Cacheable.class); + if (cacheable != null) { + return runCacheAwareAction(invocation, isReactive, cacheable.name(), cacheable.key(), + (key, name) -> processCacheableMethod(invocation, isReactive, key, name, cacheable)); + } + + var cachePut = method.getAnnotation(CachePut.class); + if (cachePut != null) { + return runCacheAwareAction(invocation, isReactive, cachePut.name(), cachePut.key(), + (key, name) -> processCachePutMethod(invocation, isReactive, key, name)); + } + + var cacheEvict = method.getAnnotation(CacheEvict.class); + if (cacheEvict != null) { + return runCacheAwareAction(invocation, isReactive, cacheEvict.name(), cacheEvict.key(), + (key, name) -> processCacheEvictMethod(invocation, isReactive, key)); + } + + return invocation.proceed(); + } + + private Object runCacheAwareAction(MethodInvocation invocation, boolean isReactive, String name, String keyAgs, + BiFunction> action) throws Throwable { + + String key; + + try { + key = getKeyName(name, keyAgs, invocation); + } catch (Exception e) { + // If there is an error evaluating the key, proceed without caching + log.warn("Error getting key using expression: {}", keyAgs, e); + return invocation.proceed(); + } + + if (isReactive) { + return action.apply(key, name); + } + + return action.apply(key, name).block(); + } + + private Mono processCacheEvictMethod(MethodInvocation invocation, boolean isReactive, String key) { + if (isReactive) { + try { + return ((Mono) invocation.proceed()) + .flatMap(value -> cacheManager.get().evict(key).thenReturn(value)) + .switchIfEmpty(cacheManager.get().evict(key).then(Mono.empty())) + .map(Function.identity()); + } catch (Throwable e) { + return Mono.error(e); + } + } else { + try { + var value = invocation.proceed(); + if (value == null) { + return cacheManager.get().evict(key).then(Mono.empty()); + } + return cacheManager.get().evict(key).thenReturn(value); + } catch (Throwable e) { + return Mono.error(e); + } + } + } + + private Mono processCachePutMethod(MethodInvocation invocation, boolean isReactive, String key, + String name) { + if (isReactive) { + try { + return ((Mono) invocation.proceed()).flatMap(value -> cachePut(value, key, name)); + } catch (Throwable e) { + return Mono.error(e); + } + } else { + try { + var value = invocation.proceed(); + return cachePut(value, key, name).thenReturn(value); + } catch (Throwable e) { + return Mono.error(e); + } + } + } + + private Mono processCacheableMethod(MethodInvocation invocation, boolean isReactive, String key, + String name, Cacheable cacheable) { + + if (isReactive) { + return cacheManager.get().get(key, cacheable.returnType()) + .map(Object.class::cast) + .switchIfEmpty(processCacheMiss(invocation, key, name)); + } else { + return cacheManager.get().get(key, invocation.getMethod().getReturnType()) + .map(Object.class::cast) + .switchIfEmpty(processSyncCacheMiss(invocation, key, name)); + } + } + + private Mono processSyncCacheMiss(MethodInvocation invocation, String key, String name) { + return Mono.defer(() -> { + try { + return Mono.just(invocation.proceed()); + } catch (Throwable e) { + return Mono.error(e); + } + }) + .flatMap(value -> cachePut(value, key, name)); + } + + private Mono processCacheMiss(MethodInvocation invocation, String key, String name) { + return Mono.defer(() -> { + try { + return ((Mono) invocation.proceed()) + .flatMap(value -> cachePut(value, key, name)); + } catch (Throwable e) { + return Mono.error(e); + } + }); + } + + private Mono cachePut(Object value, String key, String name) { + Duration ttlDuration = cacheConfiguration.getCaches().getOrDefault(name, + cacheConfiguration.getDefaultDuration()); + return cacheManager.get().put(key, value, ttlDuration) + .thenReturn(value) + .onErrorResume(e -> { + log.error("Error putting value in cache", e); + return Mono.just(value); + }); + } + + private String getKeyName(String name, String key, MethodInvocation invocation) { + Map params = new HashMap<>(); + + // Use Paranamer to resolve parameter names + String[] parameters = PARANAMER.lookupParameterNames(invocation.getMethod()); + Object[] args = invocation.getArguments(); + + // Populate the context map with parameter names and values + for (int i = 0; i < invocation.getMethod().getParameterCount(); i++) { + Object value = args[i]; + params.put("$" + parameters[i], value != null ? value : ""); // Null safety + } + + try { + String evaluatedKey = MVEL.evalToString(key, params); + return "%s-%s".formatted(name, evaluatedKey); + } catch (Exception e) { + log.error("Error evaluating key expression: {}", key, e); + throw new IllegalArgumentException("Error evaluating key expression: " + key); + } + } + +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CacheManager.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CacheManager.java new file mode 100644 index 0000000000..9ade63c784 --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CacheManager.java @@ -0,0 +1,15 @@ +package com.comet.opik.infrastructure.cache; + +import lombok.NonNull; +import reactor.core.publisher.Mono; + +import java.time.Duration; + +public interface CacheManager { + + Mono evict(@NonNull String key); + Mono put(@NonNull String key, @NonNull Object value, @NonNull Duration ttlDuration); + Mono get(@NonNull String key, @NonNull Class clazz); + Mono contains(@NonNull String key); + +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CacheModule.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CacheModule.java new file mode 100644 index 0000000000..e64cc4d34d --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CacheModule.java @@ -0,0 +1,19 @@ +package com.comet.opik.infrastructure.cache; + +import com.comet.opik.infrastructure.OpikConfiguration; +import com.google.inject.matcher.Matchers; +import ru.vyarus.dropwizard.guice.module.support.DropwizardAwareModule; + +public class CacheModule extends DropwizardAwareModule { + + @Override + protected void configure() { + var cacheManagerProvider = getProvider(CacheManager.class); + var cacheManagerConfig = configuration().getCacheManager(); + var cacheInterceptor = new CacheInterceptor(cacheManagerProvider, cacheManagerConfig); + + bindInterceptor(Matchers.any(), Matchers.annotatedWith(Cacheable.class), cacheInterceptor); + bindInterceptor(Matchers.any(), Matchers.annotatedWith(CachePut.class), cacheInterceptor); + bindInterceptor(Matchers.any(), Matchers.annotatedWith(CacheEvict.class), cacheInterceptor); + } +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CachePut.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CachePut.java new file mode 100644 index 0000000000..cc210b127c --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CachePut.java @@ -0,0 +1,17 @@ +package com.comet.opik.infrastructure.cache; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +@Documented +public @interface CachePut { + String name(); + String key(); +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/Cacheable.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/Cacheable.java new file mode 100644 index 0000000000..c14aaf8908 --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/Cacheable.java @@ -0,0 +1,18 @@ +package com.comet.opik.infrastructure.cache; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +@Documented +public @interface Cacheable { + String name(); + String key(); + Class returnType(); +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedisCacheManager.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedisCacheManager.java new file mode 100644 index 0000000000..f596437c14 --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedisCacheManager.java @@ -0,0 +1,42 @@ +package com.comet.opik.infrastructure.redis; + +import com.comet.opik.infrastructure.cache.CacheManager; +import com.comet.opik.utils.JsonUtils; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.jetbrains.annotations.NotNull; +import org.redisson.api.RedissonReactiveClient; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +import java.time.Duration; + +@RequiredArgsConstructor +class RedisCacheManager implements CacheManager { + + private final RedissonReactiveClient redisClient; + + public Mono evict(@NonNull String key) { + return redisClient.getBucket(key).delete(); + } + + public Mono put(@NotNull String key, @NotNull Object value, @NotNull Duration ttlDuration) { + return Mono.fromCallable(() -> JsonUtils.writeValueAsString(value)) + .subscribeOn(Schedulers.boundedElastic()) + .flatMap(json -> redisClient.getBucket(key).set(json)) + .then(Mono.defer(() -> redisClient.getBucket(key).expire(ttlDuration))); + } + + public Mono get(@NonNull String key, @NotNull Class clazz) { + return redisClient.getBucket(key) + .get() + .filter(StringUtils::isNotEmpty) + .flatMap(json -> Mono.fromCallable(() -> JsonUtils.readValue(json, clazz)) + .subscribeOn(Schedulers.boundedElastic())); + } + + public Mono contains(@NonNull String key) { + return redisClient.getBucket(key).isExists(); + } +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedisModule.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedisModule.java index adc5b651cc..cf7f7f21e6 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedisModule.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedisModule.java @@ -3,6 +3,7 @@ import com.comet.opik.infrastructure.DistributedLockConfig; import com.comet.opik.infrastructure.OpikConfiguration; import com.comet.opik.infrastructure.RedisConfig; +import com.comet.opik.infrastructure.cache.CacheManager; import com.comet.opik.infrastructure.lock.LockService; import com.comet.opik.infrastructure.ratelimit.RateLimitService; import com.google.inject.Provides; @@ -33,4 +34,10 @@ public RateLimitService rateLimitService(RedissonReactiveClient redisClient) { return new RedisRateLimitService(redisClient); } + @Provides + @Singleton + public CacheManager cacheManager(RedissonReactiveClient redisClient) { + return new RedisCacheManager(redisClient); + } + } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedisRateLimitService.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedisRateLimitService.java index 93a22ff86f..c1b3129a08 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedisRateLimitService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedisRateLimitService.java @@ -17,7 +17,7 @@ @RequiredArgsConstructor @Slf4j -public class RedisRateLimitService implements RateLimitService { +class RedisRateLimitService implements RateLimitService { private static final String KEY = "%s:%s"; diff --git a/apps/opik-backend/src/main/java/com/comet/opik/utils/JsonUtils.java b/apps/opik-backend/src/main/java/com/comet/opik/utils/JsonUtils.java index 61e04f22ae..244d65722a 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/utils/JsonUtils.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/utils/JsonUtils.java @@ -51,6 +51,14 @@ public T readValue(@NonNull String content, @NonNull TypeReference valueT } } + public T readValue(@NonNull String content, @NonNull Class valueTypeRef) { + try { + return MAPPER.readValue(content, valueTypeRef); + } catch (JsonProcessingException exception) { + throw new UncheckedIOException(exception); + } + } + public T readValue(@NonNull InputStream inputStream, @NonNull TypeReference valueTypeRef) { try { return MAPPER.readValue(inputStream, valueTypeRef); diff --git a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/TestDropwizardAppExtensionUtils.java b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/TestDropwizardAppExtensionUtils.java index 85112e762b..b5374f8f22 100644 --- a/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/TestDropwizardAppExtensionUtils.java +++ b/apps/opik-backend/src/test/java/com/comet/opik/api/resources/utils/TestDropwizardAppExtensionUtils.java @@ -5,6 +5,7 @@ import com.comet.opik.infrastructure.events.EventModule; import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; import com.google.common.eventbus.EventBus; +import com.google.inject.AbstractModule; import lombok.Builder; import lombok.experimental.UtilityClass; import org.apache.commons.collections4.CollectionUtils; @@ -16,6 +17,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import static com.comet.opik.infrastructure.RateLimitConfig.LimitConfig; @@ -45,7 +47,8 @@ public record AppContextConfig( String metadataVersion, EventBus mockEventBus, boolean corsEnabled, - List customConfigs) { + List customConfigs, + List modules) { } public static TestDropwizardAppExtension newTestDropwizardAppExtension(String jdbcUrl, @@ -125,6 +128,10 @@ public static TestDropwizardAppExtension newTestDropwizardAppExtension(AppContex GuiceyConfigurationHook hook = injector -> { injector.modulesOverride(TestHttpClientUtils.testAuthModule()); + Optional.ofNullable(appContextConfig.modules) + .orElse(List.of()) + .forEach(injector::modulesOverride); + if (appContextConfig.mockEventBus() != null) { injector.modulesOverride(new EventModule() { @Override diff --git a/apps/opik-backend/src/test/java/com/comet/opik/infrastructure/cache/CacheManagerTest.java b/apps/opik-backend/src/test/java/com/comet/opik/infrastructure/cache/CacheManagerTest.java new file mode 100644 index 0000000000..07c4d6422e --- /dev/null +++ b/apps/opik-backend/src/test/java/com/comet/opik/infrastructure/cache/CacheManagerTest.java @@ -0,0 +1,233 @@ +package com.comet.opik.infrastructure.cache; + +import com.comet.opik.api.resources.utils.ClickHouseContainerUtils; +import com.comet.opik.api.resources.utils.ClientSupportUtils; +import com.comet.opik.api.resources.utils.MigrationUtils; +import com.comet.opik.api.resources.utils.MySQLContainerUtils; +import com.comet.opik.api.resources.utils.RedisContainerUtils; +import com.comet.opik.api.resources.utils.TestDropwizardAppExtensionUtils; +import com.comet.opik.api.resources.utils.TestDropwizardAppExtensionUtils.AppContextConfig; +import com.comet.opik.api.resources.utils.TestDropwizardAppExtensionUtils.CustomConfig; +import com.google.inject.AbstractModule; +import com.google.inject.Singleton; +import com.redis.testcontainers.RedisContainer; +import org.assertj.core.api.Assertions; +import org.jdbi.v3.core.Jdbi; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.lifecycle.Startables; +import reactor.core.publisher.Mono; +import ru.vyarus.dropwizard.guice.test.ClientSupport; +import ru.vyarus.dropwizard.guice.test.jupiter.ext.TestDropwizardAppExtension; + +import java.time.Duration; +import java.util.List; +import java.util.UUID; + +import static com.comet.opik.api.resources.utils.ClickHouseContainerUtils.DATABASE_NAME; +import static com.comet.opik.api.resources.utils.MigrationUtils.CLICKHOUSE_CHANGELOG_FILE; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class CacheManagerTest { + + private static final RedisContainer REDIS = RedisContainerUtils.newRedisContainer(); + private static final MySQLContainer MYSQL = MySQLContainerUtils.newMySQLContainer(); + private static final ClickHouseContainer CLICKHOUSE = ClickHouseContainerUtils.newClickHouseContainer(); + + static final String CACHE_NAME_1 = "test"; + static final String CACHE_NAME_2 = "test2"; + + @RegisterExtension + private static final TestDropwizardAppExtension APP; + + static { + Startables.deepStart(MYSQL, CLICKHOUSE, REDIS).join(); + + var databaseAnalyticsFactory = ClickHouseContainerUtils.newDatabaseAnalyticsFactory( + CLICKHOUSE, DATABASE_NAME); + + APP = TestDropwizardAppExtensionUtils.newTestDropwizardAppExtension( + AppContextConfig.builder() + .jdbcUrl(MYSQL.getJdbcUrl()) + .databaseAnalyticsFactory(databaseAnalyticsFactory) + .redisUrl(REDIS.getRedisURI()) + .modules(List.of(new AbstractModule() { + + @Override + protected void configure() { + bind(CachedService.class).in(Singleton.class); + } + + })) + .customConfigs( + List.of( + new CustomConfig("cacheManager.enabled", "true"), + new CustomConfig("cacheManager.defaultDuration", "PT0.500S"), + new CustomConfig("cacheManager.caches.%s".formatted(CACHE_NAME_2), "PT0.200S"))) + .build()); + } + + private String baseURI; + private ClientSupport client; + + @BeforeAll + void setUpAll(ClientSupport client, Jdbi jdbi) throws Exception { + + MigrationUtils.runDbMigration(jdbi, MySQLContainerUtils.migrationParameters()); + + try (var connection = CLICKHOUSE.createConnection("")) { + MigrationUtils.runDbMigration(connection, CLICKHOUSE_CHANGELOG_FILE, + ClickHouseContainerUtils.migrationParameters()); + } + + this.baseURI = "http://localhost:%d".formatted(client.getPort()); + this.client = client; + + ClientSupportUtils.config(client); + } + + @Test + void testCacheable__whenCacheableExpire__shouldCallRealMethodAgain(CachedService service) { + + String id = UUID.randomUUID().toString(); + String workspaceId = UUID.randomUUID().toString(); + + // first call, should call real method + var dto = service.get(id, workspaceId); + + // second call, should return cached value + var dto2 = service.get(id, workspaceId); + + Assertions.assertThat(dto).isEqualTo(dto2); + + // wait for cache to expire + Mono.delay(Duration.ofMillis(500)).block(); + + // third call, should call real method again + var dto3 = service.get(id, workspaceId); + + Assertions.assertThat(dto).isNotEqualTo(dto3); + } + + @Test + void testCachePut__whenCachePut__shouldUpdateCache(CachedService service) { + + String id = UUID.randomUUID().toString(); + String workspaceId = UUID.randomUUID().toString(); + + // first call, should call real method + var dto = service.get(id, workspaceId); + + // second call, should return cached value + var dto2 = service.get(id, workspaceId); + + Assertions.assertThat(dto).isEqualTo(dto2); + + // update value + var updatedDto = new CachedService.DTO(id, workspaceId, UUID.randomUUID().toString()); + updatedDto = service.update(id, workspaceId, updatedDto); + + // third call, should return updated value + var dto3 = service.get(id, workspaceId); + + Assertions.assertThat(updatedDto).isEqualTo(dto3); + } + + @Test + void testCacheEvict__whenCacheEvict__shouldRemoveCache(CachedService service) { + + String id = UUID.randomUUID().toString(); + String workspaceId = UUID.randomUUID().toString(); + + // first call, should call real method + var dto = service.get(id, workspaceId); + + // second call, should return cached value + var dto2 = service.get(id, workspaceId); + + Assertions.assertThat(dto).isEqualTo(dto2); + + // evict cache + service.evict(id, workspaceId); + + // third call, should call real method again + var dto3 = service.get(id, workspaceId); + + Assertions.assertThat(dto).isNotEqualTo(dto3); + } + + @Test + void testCacheable__whenCacheableExpire__shouldCallRealMethodAgain2(CachedService service) { + + String id = UUID.randomUUID().toString(); + String workspaceId = UUID.randomUUID().toString(); + + // first call, should call real method + var dto = service.get2(id, workspaceId).block(); + + // second call, should return cached value + var dto2 = service.get2(id, workspaceId).block(); + + Assertions.assertThat(dto).isEqualTo(dto2); + + // wait for cache to expire + Mono.delay(Duration.ofMillis(200)).block(); + + // third call, should call real method again + var dto3 = service.get2(id, workspaceId).block(); + + Assertions.assertThat(dto).isNotEqualTo(dto3); + } + + @Test + void testCachePut__whenCachePut__shouldUpdateCache2(CachedService service) { + + String id = UUID.randomUUID().toString(); + String workspaceId = UUID.randomUUID().toString(); + + // first call, should call real method + var dto = service.get2(id, workspaceId).block(); + + // second call, should return cached value + var dto2 = service.get2(id, workspaceId).block(); + + Assertions.assertThat(dto).isEqualTo(dto2); + + // update value + var updatedDto = new CachedService.DTO(id, workspaceId, UUID.randomUUID().toString()); + updatedDto = service.update2(id, workspaceId, updatedDto).block(); + + // third call, should return updated value + var dto3 = service.get2(id, workspaceId).block(); + + Assertions.assertThat(updatedDto).isEqualTo(dto3); + } + + @Test + void testCacheEvict__whenCacheEvict__shouldRemoveCache2(CachedService service) { + + String id = UUID.randomUUID().toString(); + String workspaceId = UUID.randomUUID().toString(); + + // first call, should call real method + var dto = service.get2(id, workspaceId).block(); + + // second call, should return cached value + var dto2 = service.get2(id, workspaceId).block(); + + Assertions.assertThat(dto).isEqualTo(dto2); + + // evict cache + service.evict2(id, workspaceId).block(); + + // third call, should call real method again + var dto3 = service.get2(id, workspaceId).block(); + + Assertions.assertThat(dto).isNotEqualTo(dto3); + } + +} \ No newline at end of file diff --git a/apps/opik-backend/src/test/java/com/comet/opik/infrastructure/cache/CachedService.java b/apps/opik-backend/src/test/java/com/comet/opik/infrastructure/cache/CachedService.java new file mode 100644 index 0000000000..360d5ad580 --- /dev/null +++ b/apps/opik-backend/src/test/java/com/comet/opik/infrastructure/cache/CachedService.java @@ -0,0 +1,43 @@ +package com.comet.opik.infrastructure.cache; + +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Mono; + +import java.util.UUID; + +@Slf4j +public class CachedService { + + public record DTO(String id, String workspaceId, String value) { + } + + @Cacheable(name = CacheManagerTest.CACHE_NAME_1, key = "$id +'-'+ $workspaceId", returnType = DTO.class) + public DTO get(String id, String workspaceId) { + return new DTO(id, workspaceId, UUID.randomUUID().toString()); + } + + @CachePut(name = CacheManagerTest.CACHE_NAME_1, key = "$id +'-'+ $workspaceId") + public DTO update(String id, String workspaceId, DTO dto) { + return new DTO(id, workspaceId, dto.value()); + } + + @CacheEvict(name = CacheManagerTest.CACHE_NAME_1, key = "$id +'-'+ $workspaceId") + public void evict(String id, String workspaceId) { + } + + @Cacheable(name = CacheManagerTest.CACHE_NAME_2, key = "$id +'-'+ $workspaceId", returnType = DTO.class) + public Mono get2(String id, String workspaceId) { + return Mono.just(new DTO(id, workspaceId, UUID.randomUUID().toString())); + } + + @CachePut(name = CacheManagerTest.CACHE_NAME_2, key = "$id +'-'+ $workspaceId") + public Mono update2(String id, String workspaceId, DTO dto) { + return Mono.just(new DTO(id, workspaceId, dto.value())); + } + + @CacheEvict(name = CacheManagerTest.CACHE_NAME_2, key = "$id +'-'+ $workspaceId") + public Mono evict2(String id, String workspaceId) { + return Mono.empty(); + } + +} diff --git a/apps/opik-backend/src/test/resources/config-test.yml b/apps/opik-backend/src/test/resources/config-test.yml index a80d0e6206..e13a6540ea 100644 --- a/apps/opik-backend/src/test/resources/config-test.yml +++ b/apps/opik-backend/src/test/resources/config-test.yml @@ -205,3 +205,16 @@ llmProviderClient: # Default: 2023-06-01 # Description: Anthropic API version https://docs.anthropic.com/en/api/versioning version: '2023-06-01' + +# Configuration for cache manager +cacheManager: + # Default: false + # Description: Whether or not cache manager is enabled + enabled: false + # Default: PT1S + # Description: Time to live for cache entries in using the formats accepted are based on the ISO-8601: https://docs.oracle.com/javase/8/docs/api/java/time/Duration.html#parse-java.lang.CharSequence- + defaultDuration: PT1S + caches: + # Default: {} + # Description: Dynamically created caches with their respective time to live in seconds + testCache: PT1S