Skip to content

Commit

Permalink
#4682 PlanB State Store
Browse files Browse the repository at this point in the history
  • Loading branch information
stroomdev66 committed Jan 9, 2025
1 parent 6445aa2 commit f19a5ba
Show file tree
Hide file tree
Showing 83 changed files with 2,378 additions and 1,086 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Predicate;
import java.util.regex.Pattern;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -49,7 +50,11 @@ void testImportZip() throws IOException {
final Path importDir = rootTestDir.resolve("samples/config");
final Path zipFile = getCurrentTestDir().resolve(UUID.randomUUID() + ".zip");

ZipUtil.zip(zipFile, importDir, Pattern.compile(".*DATA_SPLITTER.*"), null);
final Predicate<Path> filePredicate = path -> !path.equals(zipFile);
final Predicate<String> entryPredicate = ZipUtil
.createIncludeExcludeEntryPredicate(Pattern.compile(".*DATA_SPLITTER.*"), null);

ZipUtil.zip(zipFile, importDir, filePredicate, entryPredicate);
assertThat(Files.isRegularFile(zipFile)).isTrue();
assertThat(Files.isDirectory(importDir)).isTrue();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -73,7 +74,11 @@ void testAdvancedFeatures() throws IOException {
final Path zipFile = getCurrentTestDir().resolve(UUID.randomUUID() + ".zip");
final ImportSettings.Builder builder = ImportSettings.builder();

ZipUtil.zip(zipFile, importDir, Pattern.compile(".*DATA_SPLITTER.*"), null);
final Predicate<Path> filePredicate = path -> !path.equals(zipFile);
final Predicate<String> entryPredicate = ZipUtil
.createIncludeExcludeEntryPredicate(Pattern.compile(".*DATA_SPLITTER.*"), null);

ZipUtil.zip(zipFile, importDir, filePredicate, entryPredicate);
assertThat(Files.isRegularFile(zipFile)).isTrue();
assertThat(Files.isDirectory(importDir)).isTrue();

Expand Down
1 change: 1 addition & 0 deletions stroom-importexport/stroom-importexport-impl/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies {
implementation project(':stroom-util')
implementation project(':stroom-util-shared')

implementation libs.commons_compress
implementation libs.eventLogging
implementation libs.guice
implementation libs.jackson_annotations
Expand Down
8 changes: 8 additions & 0 deletions stroom-state/stroom-planb-impl/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ ext.moduleName = 'stroom.planb.impl'
dependencies {
implementation project(':stroom-bytebuffer')
implementation project(':stroom-cache:stroom-cache-api')
implementation project(':stroom-cluster:stroom-cluster-task-api')
implementation project(':stroom-cluster:stroom-cluster-lock-api')
implementation project(':stroom-core-shared')
implementation project(':stroom-dictionary:stroom-dictionary-api')
Expand All @@ -14,6 +15,7 @@ dependencies {
implementation project(':stroom-job:stroom-job-api')
implementation project(':stroom-lmdb')
implementation project(':stroom-meta:stroom-meta-api')
implementation project(':stroom-node:stroom-node-api')
implementation project(':stroom-pipeline')
implementation project(':stroom-query:stroom-query-api')
implementation project(':stroom-query:stroom-query-common')
Expand All @@ -25,19 +27,25 @@ dependencies {
implementation project(':stroom-util-shared')

implementation libs.caffeine
implementation libs.commons_compress
implementation libs.fast_infoset
implementation libs.guava
implementation libs.guice
implementation libs.http_client
implementation libs.jackson_annotations
implementation libs.jakarta_inject
implementation libs.jakarta_servlet_api
implementation libs.jaxb_api
implementation libs.kryo
implementation libs.lmdbjava
implementation libs.restygwt
implementation libs.saxon_he
implementation libs.slf4j_api
implementation libs.swagger_annotations
implementation libs.ws_rs_api
implementation libs.zero_allocation_hashing

testImplementation project(':stroom-test-common')
testImplementation libs.assertj_core
testImplementation libs.mockito_core
testImplementation libs.mockito_junit_jupiter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,77 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;

import java.util.ArrayList;
import java.util.List;

@JsonPropertyOrder(alphabetic = true)
public class PlanBConfig extends AbstractConfig implements IsStroomConfig {

private final CacheConfig stateDocCache;
private final CacheConfig snapshotCache;
private final CacheConfig readerCache;
private final List<String> nodeList;
private final String path;

public PlanBConfig() {
stateDocCache = CacheConfig.builder()
.maximumSize(100L)
.expireAfterWrite(StroomDuration.ofMinutes(10))
.build();
snapshotCache = CacheConfig.builder()
.maximumSize(100L)
.expireAfterWrite(StroomDuration.ofMinutes(10))
.build();
readerCache = CacheConfig.builder()
.maximumSize(10L)
.expireAfterWrite(StroomDuration.ofMinutes(5))
.build();
nodeList = new ArrayList<>();
path = "${stroom.home}/planb";
}

@SuppressWarnings("unused")
@JsonCreator
public PlanBConfig(@JsonProperty("stateDocCache") final CacheConfig stateDocCache) {
public PlanBConfig(@JsonProperty("stateDocCache") final CacheConfig stateDocCache,
@JsonProperty("snapshotCache") final CacheConfig snapshotCache,
@JsonProperty("readerCache") final CacheConfig readerCache,
@JsonProperty("nodeList") final List<String> nodeList,
@JsonProperty("path") final String path) {
this.stateDocCache = stateDocCache;
this.snapshotCache = snapshotCache;
this.readerCache = readerCache;
this.nodeList = nodeList;
this.path = path;
}

public CacheConfig getPlanBDocCache() {
public CacheConfig getStateDocCache() {
return stateDocCache;
}

public CacheConfig getSnapshotCache() {
return snapshotCache;
}

public CacheConfig getReaderCache() {
return readerCache;
}

public List<String> getNodeList() {
return nodeList;
}

public String getPath() {
return path;
}

@Override
public String toString() {
return "StateConfig{" +
return "PlanBConfig{" +
"stateDocCache=" + stateDocCache +
", snapshotCache=" + snapshotCache +
", readerCache=" + readerCache +
", nodeList=" + nodeList +
", path='" + path + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class PlanBDocCacheImpl implements PlanBDocCache, Clearable, EntityEvent.

private static final Logger LOGGER = LoggerFactory.getLogger(PlanBDocCacheImpl.class);

private static final String CACHE_NAME = "State Doc Cache";
private static final String CACHE_NAME = "PlanB State Doc Cache";

private final PlanBDocStore stateDocStore;
private final LoadingStroomCache<String, PlanBDoc> cache;
Expand All @@ -62,7 +62,7 @@ public class PlanBDocCacheImpl implements PlanBDocCache, Clearable, EntityEvent.
this.securityContext = securityContext;
cache = cacheManager.createLoadingCache(
CACHE_NAME,
() -> stateConfigProvider.get().getPlanBDocCache(),
() -> stateConfigProvider.get().getStateDocCache(),
this::create);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
import stroom.importexport.api.ImportExportActionHandler;
import stroom.job.api.ScheduledJobsBinder;
import stroom.pipeline.xsltfunctions.StateLookupProvider;
import stroom.planb.impl.data.FileTransferClient;
import stroom.planb.impl.data.FileTransferClientImpl;
import stroom.planb.impl.data.FileTransferResourceImpl;
import stroom.planb.impl.data.FileTransferService;
import stroom.planb.impl.data.FileTransferServiceImpl;
import stroom.planb.impl.data.MergeProcessor;
import stroom.planb.impl.pipeline.PlanBElementModule;
import stroom.planb.impl.pipeline.StateLookupProviderImpl;
import stroom.planb.impl.pipeline.StateProviderImpl;
Expand Down Expand Up @@ -60,6 +66,8 @@ protected void configure() {

// State
bind(PlanBDocStore.class).to(PlanBDocStoreImpl.class);
bind(FileTransferClient.class).to(FileTransferClientImpl.class);
bind(FileTransferService.class).to(FileTransferServiceImpl.class);

GuiceUtil.buildMultiBinder(binder(), ExplorerActionHandler.class)
.addBinding(PlanBDocStoreImpl.class);
Expand All @@ -72,7 +80,8 @@ protected void configure() {
.bind(PlanBDoc.TYPE, PlanBDocStoreImpl.class);

RestResourcesBinder.create(binder())
.bind(PlanBDocResourceImpl.class);
.bind(PlanBDocResourceImpl.class)
.bind(FileTransferResourceImpl.class);

GuiceUtil.buildMultiBinder(binder(), DataSourceProvider.class)
.addBinding(StateSearchProvider.class);
Expand All @@ -82,19 +91,20 @@ protected void configure() {
.addBinding(StateSearchProvider.class);

ScheduledJobsBinder.create(binder())
.bindJobTo(StateMaintenanceRunnable.class, builder -> builder
.name(StateMaintenanceExecutor.TASK_NAME)
.description("State store maintenance")
.bindJobTo(StateMergeRunnable.class, builder -> builder
.name(MergeProcessor.TASK_NAME)
.description("PlanB State store merge")
.cronSchedule("0 0 0 * * ?")
.advanced(true));
}

private static class StateMaintenanceRunnable extends RunnableWrapper {

private static class StateMergeRunnable extends RunnableWrapper {

@Inject
StateMaintenanceRunnable(final StateMaintenanceExecutor condenserExecutor,
final ClusterLockService clusterLockService) {
super(() -> clusterLockService.tryLock(StateMaintenanceExecutor.TASK_NAME, condenserExecutor::exec));
StateMergeRunnable(final MergeProcessor mergeProcessor,
final ClusterLockService clusterLockService) {
super(mergeProcessor::exec);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import stroom.docref.DocRef;
import stroom.entity.shared.ExpressionCriteria;
import stroom.index.shared.IndexFieldImpl;
import stroom.planb.impl.dao.StateFieldUtil;
import stroom.planb.impl.io.StateFieldUtil;
import stroom.planb.shared.PlanBDoc;
import stroom.query.api.v2.ExpressionUtil;
import stroom.query.api.v2.Query;
Expand Down
Loading

0 comments on commit f19a5ba

Please sign in to comment.