Skip to content

Commit

Permalink
#4682 PlanB State Store
Browse files Browse the repository at this point in the history
  • Loading branch information
stroomdev66 committed Jan 14, 2025
1 parent a2b6506 commit bfa015e
Show file tree
Hide file tree
Showing 10 changed files with 213 additions and 134 deletions.
33 changes: 33 additions & 0 deletions stroom-app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -821,4 +821,37 @@ tasks.register('Stroom Node 1 (GWT Super Dev)', JavaExec) {
args = ['server', 'local.yml']
}

tasks.register('Stroom Node 2 (GWT Super Dev)', JavaExec) {
doFirst {
// We have to alter the existing jvm args rather than set them so that we don't lose debug args added by
// IJ Idea.
def jvmArgsCopy = []
for (arg in jvmArgs) {
jvmArgsCopy.add(arg)
}
// -DgwtSuperDevMode=true
// -Xmx4g
// -Devent.logging.validate=true
// --add-opens java.base/java.nio=ALL-UNNAMED
// --add-opens java.base/sun.nio.ch=ALL-UNNAMED
// --add-opens java.base/java.lang=ALL-UNNAMED
jvmArgsCopy.add("-DgwtSuperDevMode=true")
jvmArgsCopy.add("-Xmx4g")
jvmArgsCopy.add("-Devent.logging.validate=true")
jvmArgsCopy.add("--add-opens")
jvmArgsCopy.add("java.base/java.nio=ALL-UNNAMED")
jvmArgsCopy.add("--add-opens")
jvmArgsCopy.add("java.base/sun.nio.ch=ALL-UNNAMED")
jvmArgsCopy.add("--add-opens")
jvmArgsCopy.add("java.base/java.lang=ALL-UNNAMED")
jvmArgs = jvmArgsCopy
}

group = 'application'
classpath = sourceSets.main.runtimeClasspath
mainClass = 'stroom.app.App'
workingDir = '../'
args = ['server', 'local2.yml']
}

// vim: set tabstop=4 shiftwidth=4 expandtab:
6 changes: 4 additions & 2 deletions stroom-app/src/test/java/stroom/test/SetupSampleDataBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import stroom.statistics.impl.hbase.entity.StroomStatsStoreStore;
import stroom.statistics.impl.sql.entity.StatisticStoreStore;
import stroom.test.common.StroomCoreServerTestFileUtil;
import stroom.util.date.DateUtil;
import stroom.util.io.FileUtil;
import stroom.util.io.StreamUtil;

Expand Down Expand Up @@ -349,8 +350,9 @@ public void loadDirectory(final boolean shutdown, final Path importRootDir) {
long startTime = System.currentTimeMillis() - (14 * dayMs);

// Load reference data first.
dataLoader.read(dataDir, true, startTime);
startTime += tenMinMs;
// Force the effective data to be before the event time in the data that is 2010.....
dataLoader.read(dataDir, true,
DateUtil.parseNormalDateTimeString("2000-01-01T00:00:00.000Z"));

// Then load event data.
dataLoader.read(dataDir, false, startTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import stroom.util.guice.GuiceUtil;
import stroom.util.guice.RestResourcesBinder;
import stroom.util.shared.Clearable;
import stroom.util.shared.scheduler.CronExpressions;

import com.google.inject.AbstractModule;
import jakarta.inject.Inject;
Expand Down Expand Up @@ -94,7 +95,7 @@ protected void configure() {
.bindJobTo(StateMergeRunnable.class, builder -> builder
.name(MergeProcessor.TASK_NAME)
.description("Plan B state store merge")
.cronSchedule("0 0 0 * * ?")
.cronSchedule(CronExpressions.EVERY_MINUTE.getExpression())
.advanced(true));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package stroom.planb.impl.data;

import java.io.IOException;
import java.nio.file.Path;

public interface FileTransferClient {

void storePart(FileDescriptor fileDescriptor,
Path path) throws IOException;
Path path);

void fetchSnapshot(String nodeName,
SnapshotRequest request,
Path snapshotDir) throws IOException;
Path snapshotDir);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import stroom.node.api.NodeInfo;
import stroom.node.api.NodeService;
import stroom.planb.impl.PlanBConfig;
import stroom.security.api.SecurityContext;
import stroom.util.jersey.WebTargetFactory;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;
Expand All @@ -23,6 +24,7 @@
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
Expand All @@ -40,63 +42,72 @@ public class FileTransferClientImpl implements FileTransferClient {
private final TargetNodeSetFactory targetNodeSetFactory;
private final WebTargetFactory webTargetFactory;
private final SequentialFileStore fileStore;
private final SecurityContext securityContext;

@Inject
public FileTransferClientImpl(final Provider<PlanBConfig> configProvider,
final NodeService nodeService,
final NodeInfo nodeInfo,
final TargetNodeSetFactory targetNodeSetFactory,
final WebTargetFactory webTargetFactory,
final SequentialFileStore fileStore) {
final SequentialFileStore fileStore,
final SecurityContext securityContext) {
this.configProvider = configProvider;
this.nodeService = nodeService;
this.nodeInfo = nodeInfo;
this.targetNodeSetFactory = targetNodeSetFactory;
this.webTargetFactory = webTargetFactory;
this.fileStore = fileStore;
this.securityContext = securityContext;
}

@Override
public void storePart(final FileDescriptor fileDescriptor,
final Path path) throws IOException {
final Set<String> targetNodes = new HashSet<>();

// Now post to all nodes.
final PlanBConfig planBConfig = configProvider.get();
final List<String> configuredNodes = planBConfig.getNodeList();
if (configuredNodes == null || configuredNodes.isEmpty()) {
LOGGER.warn("No node list configured for PlanB, assuming this is a single node test setup");
targetNodes.add(nodeInfo.getThisNodeName());

} else {
final Path path) {
securityContext.asProcessingUser(() -> {
try {
final Set<String> enabledActiveNodes = targetNodeSetFactory.getEnabledActiveTargetNodeSet();
for (final String node : configuredNodes) {
if (enabledActiveNodes.contains(node)) {
targetNodes.add(node);
} else {
throw new RuntimeException("Plan B target node '" + node + "' is not enabled or active");
final Set<String> targetNodes = new HashSet<>();

// Now post to all nodes.
final PlanBConfig planBConfig = configProvider.get();
final List<String> configuredNodes = planBConfig.getNodeList();
if (configuredNodes == null || configuredNodes.isEmpty()) {
LOGGER.warn("No node list configured for PlanB, assuming this is a single node test setup");
targetNodes.add(nodeInfo.getThisNodeName());

} else {
try {
final Set<String> enabledActiveNodes = targetNodeSetFactory.getEnabledActiveTargetNodeSet();
for (final String node : configuredNodes) {
if (enabledActiveNodes.contains(node)) {
targetNodes.add(node);
} else {
throw new RuntimeException("Plan B target node '" + node + "' is not enabled or active");
}
}
} catch (final Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
} catch (final Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}

// Send the data to all nodes.
for (final String nodeName : targetNodes) {
if (NodeCallUtil.shouldExecuteLocally(nodeInfo, nodeName)) {
storePartLocally(
fileDescriptor,
path);
} else {
storePartRemotely(
nodeInfo.getThisNodeName(),
nodeName,
fileDescriptor,
path);
// Send the data to all nodes.
for (final String nodeName : targetNodes) {
if (NodeCallUtil.shouldExecuteLocally(nodeInfo, nodeName)) {
storePartLocally(
fileDescriptor,
path);
} else {
storePartRemotely(
nodeInfo.getThisNodeName(),
nodeName,
fileDescriptor,
path);
}
}
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
}
});
}

private void storePartLocally(final FileDescriptor fileDescriptor,
Expand All @@ -109,19 +120,16 @@ private void storePartRemotely(final String sourceNode,
final FileDescriptor fileDescriptor,
final Path path) throws IOException {
final String baseEndpointUrl = NodeCallUtil.getBaseEndpointUrl(nodeInfo, nodeService, targetNode);
final String url = baseEndpointUrl + ResourcePaths.buildServletPath(FileTransferResource.BASE_PATH,
final String url = baseEndpointUrl + ResourcePaths.buildAuthenticatedApiPath(FileTransferResource.BASE_PATH,
FileTransferResource.SEND_PART_PATH_PART);
final WebTarget webTarget = webTargetFactory.create(url);
try {
if (!storePartRemotely(webTarget, fileDescriptor, path)) {
throw new IOException("Unable to send file to: " + sourceNode);
}
} catch (final IOException e) {
LOGGER.error(e::getMessage, e);
throw e;
} catch (final Exception e) {
LOGGER.error(e::getMessage, e);
throw new IOException(e);
throw new IOException("Unable to send file to: " + sourceNode, e);
}
}

Expand All @@ -142,14 +150,24 @@ boolean storePartRemotely(final WebTarget webTarget,
@Override
public void fetchSnapshot(final String nodeName,
final SnapshotRequest request,
final Path snapshotDir) throws IOException {
LOGGER.info(() -> "Fetching snapshot from '" + nodeName + "' for '" + request.getMapName() + "'");
final String url = NodeCallUtil.getBaseEndpointUrl(nodeInfo, nodeService, nodeName)
+ ResourcePaths.buildAuthenticatedApiPath(
FileTransferResource.BASE_PATH,
FileTransferResource.FETCH_SNAPSHOT_PATH_PART);
final WebTarget webTarget = webTargetFactory.create(url);
fetchSnapshot(webTarget, request, snapshotDir);
final Path snapshotDir) {
securityContext.asProcessingUser(() -> {
try {
LOGGER.info(() -> "Fetching snapshot from '" + nodeName + "' for '" + request.getMapName() + "'");
final String url = NodeCallUtil.getBaseEndpointUrl(nodeInfo, nodeService, nodeName)
+ ResourcePaths.buildAuthenticatedApiPath(
FileTransferResource.BASE_PATH,
FileTransferResource.FETCH_SNAPSHOT_PATH_PART);
final WebTarget webTarget = webTargetFactory.create(url);
fetchSnapshot(webTarget, request, snapshotDir);
} catch (final Exception e) {
throw new RuntimeException("Error fetching snapshot from '" +
nodeName +
"' for '" +
request.getMapName() +
"'", e);
}
});
}

void fetchSnapshot(final WebTarget webTarget,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
@Consumes(MediaType.APPLICATION_JSON)
public interface FileTransferResource extends RestResource {

String BASE_PATH = "/snapshot" + ResourcePaths.V1;
String BASE_PATH = "/fileTransfer" + ResourcePaths.V1;
String FETCH_SNAPSHOT_PATH_PART = "/fetchSnapshot";
String SEND_PART_PATH_PART = "/sendPart";

Expand Down
Loading

0 comments on commit bfa015e

Please sign in to comment.