Skip to content

Commit

Permalink
#4682 PlanB State Store
Browse files Browse the repository at this point in the history
  • Loading branch information
stroomdev66 committed Jan 10, 2025
1 parent 48dba3f commit 445a43c
Show file tree
Hide file tree
Showing 18 changed files with 92 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import stroom.meta.shared.MetaResource;
import stroom.pipeline.shared.PipelineDoc;
import stroom.pipeline.shared.data.PipelineReference;
import stroom.planb.shared.PlanBDoc;
import stroom.security.shared.DocumentPermission;
import stroom.state.shared.StateDoc;
import stroom.ui.config.client.UiConfigCache;
Expand Down Expand Up @@ -67,7 +68,7 @@ public NewPipelineReferencePresenter(final EventBus eventBus,
this.uiConfigCache = uiConfigCache;

// TODO : @66 FIX TEMPORARY ABUSE OF PIPELINE REF
pipelinePresenter.setIncludedTypes(PipelineDoc.TYPE, StateDoc.TYPE);
pipelinePresenter.setIncludedTypes(PipelineDoc.TYPE, StateDoc.TYPE, PlanBDoc.TYPE);
pipelinePresenter.setRequiredPermissions(DocumentPermission.USE);

feedPresenter.setIncludedTypes(FeedDoc.TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import stroom.pipeline.shared.data.PipelineElementType;
import stroom.pipeline.shared.data.PipelinePropertyType;
import stroom.pipeline.shared.data.PipelineReference;
import stroom.planb.shared.PlanBDoc;
import stroom.state.shared.StateDoc;
import stroom.svg.client.SvgPresets;
import stroom.util.client.DataGridUtil;
Expand Down Expand Up @@ -336,10 +337,12 @@ private void showEditor(final PipelineReference pipelineReference, final boolean
AlertEvent.fireError(PipelineReferenceListPresenter.this,
"You must specify a pipeline to use.", e::reset);
} else if (!StateDoc.TYPE.equals(pipelineReference.getPipeline().getType()) &&
!PlanBDoc.TYPE.equals(pipelineReference.getPipeline().getType()) &&
pipelineReference.getFeed() == null) {
AlertEvent.fireError(PipelineReferenceListPresenter.this, "You must specify a feed to use.",
e::reset);
} else if (!StateDoc.TYPE.equals(pipelineReference.getPipeline().getType()) &&
!PlanBDoc.TYPE.equals(pipelineReference.getPipeline().getType()) &&
pipelineReference.getStreamType() == null) {
AlertEvent.fireError(PipelineReferenceListPresenter.this,
"You must specify a stream type to use.", e::reset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import stroom.pipeline.shared.data.PipelineReference;
import stroom.pipeline.state.FeedHolder;
import stroom.pipeline.state.MetaHolder;
import stroom.pipeline.xsltfunctions.PlanBLookup;
import stroom.pipeline.xsltfunctions.StateLookup;
import stroom.planb.shared.PlanBDoc;
import stroom.security.api.SecurityContext;
import stroom.state.shared.StateDoc;
import stroom.task.api.TaskContext;
Expand Down Expand Up @@ -82,6 +84,7 @@ public class ReferenceData {
private final SecurityContext securityContext;
private final TaskContextFactory taskContextFactory;
private final StateLookup stateLookup;
private final PlanBLookup planBLookup;

@Inject
ReferenceData(final EffectiveStreamService effectiveStreamService,
Expand All @@ -95,7 +98,8 @@ public class ReferenceData {
final PipelineStore pipelineStore,
final SecurityContext securityContext,
final TaskContextFactory taskContextFactory,
final StateLookup stateLookup) {
final StateLookup stateLookup,
final PlanBLookup planBLookup) {
this.effectiveStreamService = effectiveStreamService;
this.feedHolder = feedHolder;
this.metaHolder = metaHolder;
Expand All @@ -108,6 +112,7 @@ public class ReferenceData {
this.securityContext = securityContext;
this.taskContextFactory = taskContextFactory;
this.stateLookup = stateLookup;
this.planBLookup = planBLookup;
}

/**
Expand Down Expand Up @@ -164,7 +169,7 @@ public ReferenceDataResult ensureReferenceDataAvailability(final List<PipelineRe
result.logLazyTemplate(
Severity.INFO,
"Nested lookup using previous lookup value as new key: '{}' - " +
"(primary map: {}, secondary map: {}, nested lookup: {})",
"(primary map: {}, secondary map: {}, nested lookup: {})",
() -> Arrays.asList(
nextKey,
nestedIdentifier.getPrimaryMapName(),
Expand Down Expand Up @@ -235,9 +240,17 @@ private void doGetValue(final List<PipelineReference> pipelineReferences,
pipelineReference, lookupIdentifier);

// Try the state store if it is present.
if (StateDoc.TYPE.equals(pipelineReference.getPipeline().getType())) {
final DocRef pipeline = pipelineReference.getPipeline();
final String mapName = lookupIdentifier.getPrimaryMapName();
if (PlanBDoc.TYPE.equals(pipeline.getType())) {
// TODO : @66 TEMPORARY INTEGRATION OF STATE LOOKUP USING PIPELINE AS STATE DOC REFERENCE.
if (stateLookup != null) {
if (planBLookup != null && mapName.equalsIgnoreCase(pipeline.getName())) {
planBLookup.lookup(lookupIdentifier, referenceDataResult);
}

} else if (StateDoc.TYPE.equals(pipeline.getType())) {
// TODO : @66 TEMPORARY INTEGRATION OF STATE LOOKUP USING PIPELINE AS STATE DOC REFERENCE.
if (planBLookup != null && mapName.equalsIgnoreCase(pipeline.getName())) {
stateLookup.lookup(lookupIdentifier, referenceDataResult);
}

Expand Down Expand Up @@ -385,7 +398,7 @@ private void logMapAvailability(final PipelineReference pipelineReference,

result.logLazyTemplate(Severity.INFO,
"Availability of map '{}' is '{}' in stream: {}, feed: '{}', pipeline: '{}', " +
"lookup required: {}",
"lookup required: {}",
() -> Arrays.asList(
mapName,
mapAvailability,
Expand Down Expand Up @@ -455,10 +468,10 @@ private void getValueFromExternalRefStream(
final ReferenceDataResult result) {

if (pipelineReference.getFeed() == null ||
pipelineReference.getFeed().getUuid() == null ||
pipelineReference.getFeed().getUuid().isEmpty() ||
pipelineReference.getStreamType() == null ||
pipelineReference.getStreamType().isEmpty()) {
pipelineReference.getFeed().getUuid() == null ||
pipelineReference.getFeed().getUuid().isEmpty() ||
pipelineReference.getStreamType() == null ||
pipelineReference.getStreamType().isEmpty()) {

result.logSimpleTemplate(Severity.ERROR,
"pipelineReference is not fully formed, {}",
Expand All @@ -468,7 +481,7 @@ private void getValueFromExternalRefStream(
// Check that the current user has permission to read the ref stream.
final boolean hasPermission = localDocumentPermissionCache.computeIfAbsent(pipelineReference, k ->
documentPermissionCache == null ||
documentPermissionCache.canUseDocument(pipelineReference.getFeed()));
documentPermissionCache.canUseDocument(pipelineReference.getFeed()));

if (hasPermission) {
// Find the latest ref stream that is before our lookup time
Expand All @@ -481,7 +494,7 @@ private void getValueFromExternalRefStream(

result.logLazyTemplate(Severity.INFO,
"Checking effective stream: {} in feed: '{}' for presence of map '{}' " +
"(stream effective time: {}, lookup time: {}, pipeline feed: '{}')",
"(stream effective time: {}, lookup time: {}, pipeline feed: '{}')",
() -> Arrays.asList(
effectiveStream.getId(),
effectiveStream.getFeedName(),
Expand Down Expand Up @@ -592,7 +605,7 @@ private boolean ensureRefStreamAvailability(final ReferenceDataResult result,
if (optLoadState.isPresent() && optLoadState.get().equals(ProcessingState.FAILED)) {
throw new RuntimeException(LogUtil.message(
"Reference stream {} has been loaded previously but failed, " +
"aborting lookup against this stream.",
"aborting lookup against this stream.",
refStreamDefinition.getStreamId()));
} else {
if (isRefLoadRequired) {
Expand All @@ -617,7 +630,7 @@ private boolean ensureRefStreamAvailability(final ReferenceDataResult result,

// No point in continuing if the load was interrupted
if (!isTerminated()
&& (storedErrorReceiver == null
&& (storedErrorReceiver == null
|| storedErrorReceiver.getCount(Severity.FATAL_ERROR) == 0)) {
// mark this ref stream defs as available for future lookups within this
// pipeline process
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package stroom.pipeline.xsltfunctions;

public interface PlanBLookup extends StateLookupProvider {

}
Original file line number Diff line number Diff line change
@@ -1,35 +1,5 @@
package stroom.pipeline.xsltfunctions;

import stroom.pipeline.refdata.LookupIdentifier;
import stroom.pipeline.refdata.ReferenceDataResult;
public interface StateLookup extends StateLookupProvider {

import jakarta.inject.Inject;

import java.util.Set;

public class StateLookup {

private final Set<StateLookupProvider> providers;

@Inject
StateLookup(final Set<StateLookupProvider> providers) {
this.providers = providers;
}

/**
* <p>
* Given a {@link LookupIdentifier} and a store doc ref, ensure that
* the data required to perform the lookup is in the ref store. This method will not
* perform the lookup, instead it will populate the {@link ReferenceDataResult} with
* a proxy object that can later be used to do the lookup.
* </p>
*
* @param docRef A reference to the state doc.
* @param lookupIdentifier The identifier to lookup in the reference data
* @param result The reference result object containing the proxy object for performing the lookup
*/
public void lookup(LookupIdentifier lookupIdentifier,
ReferenceDataResult result) {
providers.forEach(provider -> provider.lookup(lookupIdentifier, result));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,5 @@ public interface StateLookupProvider {
* @param lookupIdentifier The identifier to lookup in the reference data
* @param result The reference result object containing the proxy object for performing the lookup
*/
void lookup(LookupIdentifier lookupIdentifier,
ReferenceDataResult result);
void lookup(LookupIdentifier lookupIdentifier, ReferenceDataResult result);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import stroom.docstore.api.ContentIndexable;
import stroom.explorer.api.ExplorerActionHandler;
import stroom.importexport.api.ImportExportActionHandler;
import stroom.pipeline.xsltfunctions.StateLookupProvider;
import stroom.pipeline.xsltfunctions.PlanBLookup;
import stroom.planb.impl.pipeline.PlanBElementModule;
import stroom.planb.impl.pipeline.StateLookupProviderImpl;
import stroom.planb.impl.pipeline.PlanBLookupImpl;
import stroom.planb.impl.pipeline.StateProviderImpl;
import stroom.query.language.functions.StateProvider;
import stroom.util.entityevent.EntityEvent;
Expand All @@ -20,7 +20,7 @@ public class MockStateModule extends AbstractModule {
protected void configure() {
install(new PlanBElementModule());

GuiceUtil.buildMultiBinder(binder(), StateLookupProvider.class).addBinding(StateLookupProviderImpl.class);
bind(PlanBLookup.class).to(PlanBLookupImpl.class);
GuiceUtil.buildMultiBinder(binder(), StateProvider.class).addBinding(StateProviderImpl.class);

// // Services
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class PlanBDocCacheImpl implements PlanBDocCache, Clearable, EntityEvent.

private static final Logger LOGGER = LoggerFactory.getLogger(PlanBDocCacheImpl.class);

private static final String CACHE_NAME = "PlanB State Doc Cache";
private static final String CACHE_NAME = "Plan B State Doc Cache";

private final PlanBDocStore stateDocStore;
private final LoadingStroomCache<String, PlanBDoc> cache;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
import stroom.explorer.api.ExplorerActionHandler;
import stroom.importexport.api.ImportExportActionHandler;
import stroom.job.api.ScheduledJobsBinder;
import stroom.pipeline.xsltfunctions.StateLookupProvider;
import stroom.pipeline.xsltfunctions.PlanBLookup;
import stroom.planb.impl.data.FileTransferClient;
import stroom.planb.impl.data.FileTransferClientImpl;
import stroom.planb.impl.data.FileTransferResourceImpl;
import stroom.planb.impl.data.FileTransferService;
import stroom.planb.impl.data.FileTransferServiceImpl;
import stroom.planb.impl.data.MergeProcessor;
import stroom.planb.impl.pipeline.PlanBElementModule;
import stroom.planb.impl.pipeline.StateLookupProviderImpl;
import stroom.planb.impl.pipeline.PlanBLookupImpl;
import stroom.planb.impl.pipeline.StateProviderImpl;
import stroom.planb.shared.PlanBDoc;
import stroom.query.common.v2.IndexFieldProvider;
Expand All @@ -52,7 +52,7 @@ public class PlanBModule extends AbstractModule {
protected void configure() {
install(new PlanBElementModule());

GuiceUtil.buildMultiBinder(binder(), StateLookupProvider.class).addBinding(StateLookupProviderImpl.class);
bind(PlanBLookup.class).to(PlanBLookupImpl.class);
GuiceUtil.buildMultiBinder(binder(), StateProvider.class).addBinding(StateProviderImpl.class);

// Caches
Expand Down Expand Up @@ -93,7 +93,7 @@ protected void configure() {
ScheduledJobsBinder.create(binder())
.bindJobTo(StateMergeRunnable.class, builder -> builder
.name(MergeProcessor.TASK_NAME)
.description("PlanB state store merge")
.description("Plan B state store merge")
.cronSchedule("0 0 0 * * ?")
.advanced(true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
public class PlanBNameValidator {

// Deliberately forces lower case naming.
private static final Pattern NAME_PATTERN = Pattern.compile("^[a-z_0-9]{1,48}$");
private static final Pattern NAME_PATTERN = Pattern.compile("^[a-z_0-9]+$");

private PlanBNameValidator() {
// Utility class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void storePart(final FileDescriptor fileDescriptor,
if (enabledActiveNodes.contains(node)) {
targetNodes.add(node);
} else {
throw new RuntimeException("PlanB target node '" + node + "' is not enabled or active");
throw new RuntimeException("Plan B target node '" + node + "' is not enabled or active");
}
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import stroom.planb.impl.io.TemporalStateWriter;
import stroom.planb.shared.PlanBDoc;
import stroom.security.api.SecurityContext;
import stroom.task.api.TaskContext;
import stroom.task.api.TaskContextFactory;
import stroom.util.io.FileUtil;
import stroom.util.logging.LambdaLogger;
Expand All @@ -30,7 +31,7 @@
@Singleton
public class MergeProcessor {

public static final String TASK_NAME = "PlanB Merge Processor";
public static final String TASK_NAME = "Plan B Merge Processor";

private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(MergeProcessor.class);

Expand Down Expand Up @@ -80,33 +81,34 @@ private boolean ensureDirExists(final Path path) {

public void exec() {
securityContext.asProcessingUser(() -> {
taskContextFactory.context("PlanB state merge", taskContext -> {
try {
final long minStoreId = fileStore.getMinStoreId();
final long maxStoreId = fileStore.getMaxStoreId();
LOGGER.info(() -> LogUtil.message("Min store id = {}, max store id = {}", minStoreId, maxStoreId));

long storeId = minStoreId;
if (storeId == -1) {
LOGGER.info("Store is empty");
storeId = 0;
}
final TaskContext taskContext = taskContextFactory.current();
try {
final long minStoreId = fileStore.getMinStoreId();
final long maxStoreId = fileStore.getMaxStoreId();
LOGGER.info(() -> LogUtil.message("Min store id = {}, max store id = {}",
minStoreId,
maxStoreId));

long storeId = minStoreId;
if (storeId == -1) {
LOGGER.info("Store is empty");
storeId = 0;
}

while (!taskContext.isTerminated() && !Thread.currentThread().isInterrupted()) {
// Wait until new data is available.
final long currentStoreId = storeId;
taskContext.info(() -> "Waiting for data...");
final SequentialFile sequentialFile = fileStore.awaitNew(currentStoreId);
taskContext.info(() -> "Merging data: " + currentStoreId);
merge(sequentialFile);
while (!taskContext.isTerminated() && !Thread.currentThread().isInterrupted()) {
// Wait until new data is available.
final long currentStoreId = storeId;
taskContext.info(() -> "Waiting for data...");
final SequentialFile sequentialFile = fileStore.awaitNew(currentStoreId);
taskContext.info(() -> "Merging data: " + currentStoreId);
merge(sequentialFile);

// Increment store id.
storeId++;
}
} catch (final IOException e) {
throw new UncheckedIOException(e);
// Increment store id.
storeId++;
}
}).run();
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class ReaderCache {

private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(ReaderCache.class);

private static final String CACHE_NAME = "PlanB Reader Cache";
private static final String CACHE_NAME = "Plan B Reader Cache";

private final LoadingStroomCache<String, Shard> cache;
private final ByteBufferFactory byteBufferFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ Takes XML input (conforming to the reference-data:2 schema) and \
roles = {
PipelineElementType.ROLE_TARGET,
PipelineElementType.ROLE_HAS_TARGETS},
icon = SvgImage.DOCUMENT_STATE_STORE)
icon = SvgImage.DOCUMENT_PLAN_B)
public class PlanBFilter extends AbstractXMLFilter {

private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(PlanBFilter.class);
Expand Down
Loading

0 comments on commit 445a43c

Please sign in to comment.