Skip to content

Commit

Permalink
#4682 PlanB State Store
Browse files Browse the repository at this point in the history
  • Loading branch information
stroomdev66 committed Jan 3, 2025
1 parent c3bf8d3 commit bfc6329
Show file tree
Hide file tree
Showing 15 changed files with 115 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,18 @@
import stroom.pipeline.refdata.LookupIdentifier;
import stroom.pipeline.refdata.ReferenceDataResult;

public interface StateLookup {
import jakarta.inject.Inject;

import java.util.Set;

public class StateLookup {

private final Set<StateLookupProvider> providers;

@Inject
StateLookup(final Set<StateLookupProvider> providers) {
this.providers = providers;
}

/**
* <p>
Expand All @@ -17,6 +28,8 @@ public interface StateLookup {
* @param lookupIdentifier The identifier to lookup in the reference data
* @param result The reference result object containing the proxy object for performing the lookup
*/
void lookup(LookupIdentifier lookupIdentifier,
ReferenceDataResult result);
public void lookup(LookupIdentifier lookupIdentifier,
ReferenceDataResult result) {
providers.forEach(provider -> provider.lookup(lookupIdentifier, result));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import stroom.pipeline.refdata.LookupIdentifier;
import stroom.pipeline.refdata.ReferenceDataResult;

public interface PlanBLookup {
public interface StateLookupProvider {

/**
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import stroom.query.api.v2.SearchRequest;
import stroom.query.api.v2.SearchRequestSource;
import stroom.query.language.functions.ExpressionContext;
import stroom.query.language.functions.StateProvider;
import stroom.query.language.functions.StateFetcher;
import stroom.query.language.functions.ValNull;

import jakarta.inject.Inject;
Expand All @@ -14,18 +14,18 @@ public class ExpressionContextFactory {

private final Provider<AnalyticResultStoreConfig> analyticResultStoreConfigProvider;
private final Provider<SearchResultStoreConfig> searchResultStoreConfigProvider;
private final Provider<StateProvider> stateProviderProvider;
private final Provider<StateFetcher> stateProviderProvider;

public ExpressionContextFactory() {
this.analyticResultStoreConfigProvider = AnalyticResultStoreConfig::new;
this.searchResultStoreConfigProvider = SearchResultStoreConfig::new;
stateProviderProvider = () -> (StateProvider) (map, key, effectiveTimeMs) -> ValNull.INSTANCE;
stateProviderProvider = () -> (StateFetcher) (map, key, effectiveTimeMs) -> ValNull.INSTANCE;
}

@Inject
public ExpressionContextFactory(final Provider<AnalyticResultStoreConfig> analyticResultStoreConfigProvider,
final Provider<SearchResultStoreConfig> searchResultStoreConfigProvider,
final Provider<StateProvider> stateProviderProvider) {
final Provider<StateFetcher> stateProviderProvider) {
this.analyticResultStoreConfigProvider = analyticResultStoreConfigProvider;
this.searchResultStoreConfigProvider = searchResultStoreConfigProvider;
this.stateProviderProvider = stateProviderProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public class ExpressionContext {

private final int maxStringLength;
private final DateTimeSettings dateTimeSettings;
private final StateProvider stateProvider;
private final StateFetcher stateProvider;

public ExpressionContext() {
this.maxStringLength = 100;
Expand All @@ -21,7 +21,7 @@ public ExpressionContext() {
@JsonCreator
public ExpressionContext(final int maxStringLength,
final DateTimeSettings dateTimeSettings,
final StateProvider stateProvider) {
final StateFetcher stateProvider) {
this.maxStringLength = maxStringLength;
this.dateTimeSettings = dateTimeSettings;
this.stateProvider = stateProvider;
Expand All @@ -35,7 +35,7 @@ public DateTimeSettings getDateTimeSettings() {
return dateTimeSettings;
}

public StateProvider getStateProvider() {
public StateFetcher getStateProvider() {
return stateProvider;
}

Expand Down Expand Up @@ -81,7 +81,7 @@ public static final class Builder {

private int maxStringLength;
private DateTimeSettings dateTimeSettings;
private StateProvider stateProvider;
private StateFetcher stateProvider;

private Builder() {
}
Expand All @@ -102,7 +102,7 @@ public Builder dateTimeSettings(final DateTimeSettings dateTimeSettings) {
return this;
}

public Builder stateProvider(final StateProvider stateProvider) {
public Builder stateProvider(final StateFetcher stateProvider) {
this.stateProvider = stateProvider;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
commonReturnDescription = "The state value if found else null.",
signatures = @FunctionSignature(
description = "Lookup a value from a state map using a key and optional effective time for " +
"temporally sensitive states.",
"temporally sensitive states.",
args = {
@FunctionArg(
name = "map",
Expand All @@ -50,16 +50,16 @@
class GetState extends AbstractManyChildFunction {

static final String NAME = "getState";
private final StateProvider lookupProvider;
private final StateFetcher stateProvider;
private Generator gen;
private String map;
private String key;
private Instant effectiveTime;

public GetState(final ExpressionContext expressionContext, final String name) {
super(name, 2, 3);
this.lookupProvider = expressionContext.getStateProvider();
Objects.requireNonNull(lookupProvider, "Null lookup provider");
this.stateProvider = expressionContext.getStateProvider();
Objects.requireNonNull(stateProvider, "Null lookup provider");
}

@Override
Expand All @@ -83,7 +83,7 @@ public void setParams(final Param[] params) throws ParseException {
// If we have values for all params then do a lookup now.
if (map != null && key != null && effectiveTime != null) {
// Create static value.
final Val val = lookupProvider.getState(map, key, effectiveTime);
final Val val = stateProvider.getState(map, key, effectiveTime);
gen = new StaticValueGen(val);
}
}
Expand All @@ -98,23 +98,23 @@ public Generator createGenerator() {

@Override
protected Generator createGenerator(final Generator[] childGenerators) {
return new Gen(lookupProvider, map, key, effectiveTime, childGenerators);
return new Gen(stateProvider, map, key, effectiveTime, childGenerators);
}

private static final class Gen extends AbstractManyChildGenerator {

private final StateProvider lookupProvider;
private final StateFetcher stateProvider;
private final String map;
private final String key;
private final Instant effectiveTime;

Gen(final StateProvider lookupProvider,
Gen(final StateFetcher stateProvider,
final String map,
final String key,
final Instant effectiveTime,
final Generator[] childGenerators) {
super(childGenerators);
this.lookupProvider = lookupProvider;
this.stateProvider = stateProvider;
this.map = map;
this.key = key;
this.effectiveTime = effectiveTime;
Expand Down Expand Up @@ -150,7 +150,7 @@ public Val eval(final StoredValues storedValues, final Supplier<ChildData> child

Val val = ValNull.INSTANCE;
if (map != null && key != null && effectiveTime != null) {
val = lookupProvider.getState(map, key, effectiveTime);
val = stateProvider.getState(map, key, effectiveTime);
}
return val;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import java.time.Instant;

public interface PlanBStateProvider {
public interface StateFetcher {

Val getState(String map, String key, Instant effectiveTimeMs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import stroom.docstore.api.ContentIndexable;
import stroom.explorer.api.ExplorerActionHandler;
import stroom.importexport.api.ImportExportActionHandler;
import stroom.pipeline.xsltfunctions.PlanBLookup;
import stroom.pipeline.xsltfunctions.StateLookupProvider;
import stroom.planb.impl.pipeline.PlanBElementModule;
import stroom.planb.impl.pipeline.PlanBLookupImpl;
import stroom.planb.impl.pipeline.StateLookupProviderImpl;
import stroom.planb.impl.pipeline.StateProviderImpl;
import stroom.query.language.functions.StateProvider;
import stroom.util.entityevent.EntityEvent;
import stroom.util.guice.GuiceUtil;
import stroom.util.shared.Clearable;
Expand All @@ -18,7 +20,8 @@ public class MockStateModule extends AbstractModule {
protected void configure() {
install(new PlanBElementModule());

bind(PlanBLookup.class).to(PlanBLookupImpl.class);
GuiceUtil.buildMultiBinder(binder(), StateLookupProvider.class).addBinding(StateLookupProviderImpl.class);
GuiceUtil.buildMultiBinder(binder(), StateProvider.class).addBinding(StateProviderImpl.class);

// // Services
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
import stroom.explorer.api.ExplorerActionHandler;
import stroom.importexport.api.ImportExportActionHandler;
import stroom.job.api.ScheduledJobsBinder;
import stroom.pipeline.xsltfunctions.PlanBLookup;
import stroom.pipeline.xsltfunctions.StateLookupProvider;
import stroom.planb.impl.pipeline.PlanBElementModule;
import stroom.planb.impl.pipeline.PlanBLookupImpl;
import stroom.planb.impl.pipeline.PlanBProviderImpl;
import stroom.planb.impl.pipeline.StateLookupProviderImpl;
import stroom.planb.impl.pipeline.StateProviderImpl;
import stroom.planb.shared.PlanBDoc;
import stroom.query.common.v2.IndexFieldProvider;
import stroom.query.common.v2.SearchProvider;
import stroom.query.language.functions.PlanBStateProvider;
import stroom.query.language.functions.StateProvider;
import stroom.util.RunnableWrapper;
import stroom.util.entityevent.EntityEvent;
import stroom.util.guice.GuiceUtil;
Expand All @@ -46,8 +46,8 @@ public class StateModule extends AbstractModule {
protected void configure() {
install(new PlanBElementModule());

bind(PlanBLookup.class).to(PlanBLookupImpl.class);
bind(PlanBStateProvider.class).to(PlanBProviderImpl.class);
GuiceUtil.buildMultiBinder(binder(), StateLookupProvider.class).addBinding(StateLookupProviderImpl.class);
GuiceUtil.buildMultiBinder(binder(), StateProvider.class).addBinding(StateProviderImpl.class);

// Caches
bind(PlanBDocCache.class).to(PlanBDocCacheImpl.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import stroom.pipeline.refdata.ReferenceDataResult;
import stroom.pipeline.refdata.store.MapDefinition;
import stroom.pipeline.refdata.store.RefStreamDefinition;
import stroom.pipeline.xsltfunctions.PlanBLookup;
import stroom.pipeline.xsltfunctions.StateLookupProvider;
import stroom.planb.impl.PlanBDocCache;
import stroom.planb.impl.dao.TemporalState;
import stroom.planb.shared.PlanBDoc;
Expand All @@ -22,8 +22,10 @@
import java.util.Map;
import java.util.Optional;

// TODO : FIXME

@PipelineScoped
public class PlanBLookupImpl implements PlanBLookup {
public class StateLookupProviderImpl implements StateLookupProvider {

private static final ByteBuffer TRUE = ByteBuffer
.wrap(Boolean.toString(true).getBytes(StandardCharsets.UTF_8));
Expand All @@ -35,7 +37,7 @@ public class PlanBLookupImpl implements PlanBLookup {
private final Map<String, Optional<PlanBDoc>> stateDocMap = new HashMap<>();

@Inject
public PlanBLookupImpl(final PlanBDocCache stateDocCache) {
public StateLookupProviderImpl(final PlanBDocCache stateDocCache) {
this.stateDocCache = stateDocCache;
cache = Caffeine.newBuilder().maximumSize(1000).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import stroom.planb.impl.PlanBDocCache;
import stroom.planb.impl.dao.TemporalState;
import stroom.planb.shared.PlanBDoc;
import stroom.query.language.functions.PlanBStateProvider;
import stroom.query.language.functions.StateProvider;
import stroom.query.language.functions.Val;
import stroom.query.language.functions.ValNull;
import stroom.query.language.functions.ValString;
Expand All @@ -18,14 +18,14 @@
import java.util.Map;
import java.util.Optional;

public class PlanBProviderImpl implements PlanBStateProvider {
public class StateProviderImpl implements StateProvider {

private final PlanBDocCache stateDocCache;
private final Cache<Key, Val> cache;
private final Map<String, Optional<PlanBDoc>> stateDocMap = new HashMap<>();

@Inject
public PlanBProviderImpl(final PlanBDocCache stateDocCache) {
public StateProviderImpl(final PlanBDocCache stateDocCache) {
this.stateDocCache = stateDocCache;
cache = Caffeine.newBuilder().maximumSize(1000).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@
import java.util.Optional;
import java.util.function.Consumer;

public class PlanBValueProxy implements RefDataValueProxy {
public class StateValueProxy implements RefDataValueProxy {

private static final Logger LOGGER = LoggerFactory.getLogger(PlanBValueProxy.class);
private static final Logger LOGGER = LoggerFactory.getLogger(StateValueProxy.class);

private final TemporalState state;
private final MapDefinition mapDefinition;

// This will be set with mapDefinition if we have a successful lookup with it, else stays null
private MapDefinition successfulMapDefinition = null;

public PlanBValueProxy(final TemporalState state,
public StateValueProxy(final TemporalState state,
final MapDefinition mapDefinition) {
this.state = state;
this.mapDefinition = mapDefinition;
Expand Down Expand Up @@ -80,7 +80,8 @@ public Optional<RefDataValue> supplyValue() {

@Override
public boolean consumeBytes(final Consumer<TypedByteBuffer> typedByteBufferConsumer) {
typedByteBufferConsumer.accept(new TypedByteBuffer(state.value().typeId(),
typedByteBufferConsumer.accept(new TypedByteBuffer(
state.value().typeId(),
state.value().byteBuffer().duplicate()));
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
import stroom.docstore.api.DocumentActionHandlerBinder;
import stroom.explorer.api.ExplorerActionHandler;
import stroom.importexport.api.ImportExportActionHandler;
import stroom.pipeline.xsltfunctions.StateLookup;
import stroom.pipeline.xsltfunctions.StateLookupProvider;
import stroom.query.language.functions.StateFetcher;
import stroom.query.language.functions.StateProvider;
import stroom.state.impl.pipeline.StateElementModule;
import stroom.state.impl.pipeline.StateLookupImpl;
import stroom.state.impl.pipeline.StateFetcherImpl;
import stroom.state.impl.pipeline.StateLookupProviderImpl;
import stroom.state.impl.pipeline.StateProviderImpl;
import stroom.state.shared.ScyllaDbDoc;
import stroom.util.entityevent.EntityEvent;
import stroom.util.guice.GuiceUtil;
Expand All @@ -21,7 +25,9 @@ public class MockStateModule extends AbstractModule {
protected void configure() {
install(new StateElementModule());

bind(StateLookup.class).to(StateLookupImpl.class);
GuiceUtil.buildMultiBinder(binder(), StateLookupProvider.class).addBinding(StateLookupProviderImpl.class);
GuiceUtil.buildMultiBinder(binder(), StateProvider.class).addBinding(StateProviderImpl.class);
bind(StateFetcher.class).to(StateFetcherImpl.class);

// // Services
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import stroom.explorer.api.ExplorerActionHandler;
import stroom.importexport.api.ImportExportActionHandler;
import stroom.job.api.ScheduledJobsBinder;
import stroom.pipeline.xsltfunctions.StateLookup;
import stroom.pipeline.xsltfunctions.StateLookupProvider;
import stroom.query.common.v2.IndexFieldProvider;
import stroom.query.common.v2.SearchProvider;
import stroom.query.language.functions.StateFetcher;
import stroom.query.language.functions.StateProvider;
import stroom.state.impl.pipeline.StateElementModule;
import stroom.state.impl.pipeline.StateLookupImpl;
import stroom.state.impl.pipeline.StateFetcherImpl;
import stroom.state.impl.pipeline.StateLookupProviderImpl;
import stroom.state.impl.pipeline.StateProviderImpl;
import stroom.state.shared.ScyllaDbDoc;
import stroom.state.shared.StateDoc;
Expand All @@ -47,8 +49,9 @@ public class StateModule extends AbstractModule {
protected void configure() {
install(new StateElementModule());

bind(StateLookup.class).to(StateLookupImpl.class);
bind(StateProvider.class).to(StateProviderImpl.class);
GuiceUtil.buildMultiBinder(binder(), StateLookupProvider.class).addBinding(StateLookupProviderImpl.class);
GuiceUtil.buildMultiBinder(binder(), StateProvider.class).addBinding(StateProviderImpl.class);
bind(StateFetcher.class).to(StateFetcherImpl.class);

// Caches
bind(ScyllaDbDocCache.class).to(ScyllaDbDocCacheImpl.class);
Expand Down
Loading

0 comments on commit bfc6329

Please sign in to comment.