Skip to content

Commit

Permalink
#4682 PlanB State Store
Browse files Browse the repository at this point in the history
  • Loading branch information
stroomdev66 committed Jan 17, 2025
1 parent 891b388 commit 7618e4d
Show file tree
Hide file tree
Showing 15 changed files with 932 additions and 646 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class PlanBConfig extends AbstractConfig implements IsStroomConfig {
private final String path;
private final StroomDuration minTimeToKeepSnapshots;
private final StroomDuration minTimeToKeepEnvOpen;
private final StroomDuration snapshotRetryFetchInterval;

public PlanBConfig() {
this("${stroom.home}/planb");
Expand All @@ -36,6 +37,7 @@ public PlanBConfig(final String path) {
Collections.emptyList(),
path,
StroomDuration.ofMinutes(10),
StroomDuration.ofMinutes(1),
StroomDuration.ofMinutes(1));
}

Expand All @@ -45,12 +47,14 @@ public PlanBConfig(@JsonProperty("stateDocCache") final CacheConfig stateDocCach
@JsonProperty("nodeList") final List<String> nodeList,
@JsonProperty("path") final String path,
@JsonProperty("minTimeToKeepSnapshots") final StroomDuration minTimeToKeepSnapshots,
@JsonProperty("minTimeToKeepEnvOpen") final StroomDuration minTimeToKeepEnvOpen) {
@JsonProperty("minTimeToKeepEnvOpen") final StroomDuration minTimeToKeepEnvOpen,
@JsonProperty("snapshotRetryFetchInterval") final StroomDuration snapshotRetryFetchInterval) {
this.stateDocCache = stateDocCache;
this.nodeList = nodeList;
this.path = path;
this.minTimeToKeepSnapshots = minTimeToKeepSnapshots;
this.minTimeToKeepEnvOpen = minTimeToKeepEnvOpen;
this.snapshotRetryFetchInterval = snapshotRetryFetchInterval;
}

@JsonProperty
Expand Down Expand Up @@ -85,6 +89,12 @@ public StroomDuration getMinTimeToKeepEnvOpen() {
return minTimeToKeepEnvOpen;
}

@JsonProperty
@JsonPropertyDescription("How often should we retry to fetch snapshots when we fail to get a snapshot.")
public StroomDuration getSnapshotRetryFetchInterval() {
return snapshotRetryFetchInterval;
}

@Override
public String toString() {
return "PlanBConfig{" +
Expand All @@ -93,6 +103,7 @@ public String toString() {
", path='" + path + '\'' +
", minTimeToKeepSnapshots=" + minTimeToKeepSnapshots +
", minTimeToKeepEnvOpen=" + minTimeToKeepEnvOpen +
", snapshotRetryFetchInterval=" + snapshotRetryFetchInterval +
'}';
}

Expand All @@ -109,11 +120,18 @@ public boolean equals(final Object o) {
Objects.equals(nodeList, that.nodeList) &&
Objects.equals(path, that.path) &&
Objects.equals(minTimeToKeepSnapshots, that.minTimeToKeepSnapshots) &&
Objects.equals(minTimeToKeepEnvOpen, that.minTimeToKeepEnvOpen);
Objects.equals(minTimeToKeepEnvOpen, that.minTimeToKeepEnvOpen) &&
Objects.equals(snapshotRetryFetchInterval, that.snapshotRetryFetchInterval);
}

@Override
public int hashCode() {
return Objects.hash(stateDocCache, nodeList, path, minTimeToKeepSnapshots, minTimeToKeepEnvOpen);
return Objects.hash(
stateDocCache,
nodeList,
path,
minTimeToKeepSnapshots,
minTimeToKeepEnvOpen,
snapshotRetryFetchInterval);
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package stroom.planb.impl.data;

import java.nio.file.Path;
import java.time.Instant;

public interface FileTransferClient {

void storePart(FileDescriptor fileDescriptor,
Path path);

void fetchSnapshot(String nodeName,
SnapshotRequest request,
Path snapshotDir);
Instant fetchSnapshot(String nodeName,
SnapshotRequest request,
Path snapshotDir);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@
import stroom.util.jersey.WebTargetFactory;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;
import stroom.util.shared.PermissionException;
import stroom.util.shared.ResourcePaths;
import stroom.util.zip.ZipUtil;

import jakarta.inject.Inject;
import jakarta.inject.Provider;
import jakarta.inject.Singleton;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.io.BufferedInputStream;
Expand All @@ -28,6 +29,7 @@
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -127,34 +129,37 @@ private void storePartRemotely(final String sourceNode,
FileTransferResource.SEND_PART_PATH_PART);
final WebTarget webTarget = webTargetFactory.create(url);
try {
if (!storePartRemotely(webTarget, fileDescriptor, path)) {
throw new IOException("Unable to send file to: " + sourceNode);
}
storePartRemotely(webTarget, fileDescriptor, path);
} catch (final Exception e) {
LOGGER.error(e::getMessage, e);
throw new IOException("Unable to send file to: " + sourceNode, e);
}
}

boolean storePartRemotely(final WebTarget webTarget,
final FileDescriptor fileDescriptor,
final Path path) throws IOException {
void storePartRemotely(final WebTarget webTarget,
final FileDescriptor fileDescriptor,
final Path path) throws IOException {
try (final InputStream inputStream = new BufferedInputStream(Files.newInputStream(path))) {
return webTarget
final Response response = webTarget
.request()
.header("createTime", fileDescriptor.createTimeMs())
.header("metaId", fileDescriptor.metaId())
.header("fileHash", fileDescriptor.fileHash())
.header("fileName", path.getFileName().toString())
.post(Entity.entity(inputStream, MediaType.APPLICATION_OCTET_STREAM), Boolean.class);
.post(Entity.entity(inputStream, MediaType.APPLICATION_OCTET_STREAM));
if (response.getStatus() == Status.UNAUTHORIZED.getStatusCode()) {
throw new PermissionException(null, response.getStatusInfo().getReasonPhrase());
} else if (response.getStatus() != Status.OK.getStatusCode()) {
throw new RuntimeException(response.getStatusInfo().getReasonPhrase());
}
}
}

@Override
public void fetchSnapshot(final String nodeName,
final SnapshotRequest request,
final Path snapshotDir) {
securityContext.asProcessingUser(() -> {
public Instant fetchSnapshot(final String nodeName,
final SnapshotRequest request,
final Path snapshotDir) {
return securityContext.asProcessingUserResult(() -> {
try {
LOGGER.info(() -> "Fetching snapshot from '" +
nodeName +
Expand All @@ -166,7 +171,7 @@ public void fetchSnapshot(final String nodeName,
FileTransferResource.BASE_PATH,
FileTransferResource.FETCH_SNAPSHOT_PATH_PART);
final WebTarget webTarget = webTargetFactory.create(url);
fetchSnapshot(webTarget, request, snapshotDir);
return fetchSnapshot(webTarget, request, snapshotDir);
} catch (final Exception e) {
throw new RuntimeException("Error fetching snapshot from '" +
nodeName +
Expand All @@ -177,18 +182,25 @@ public void fetchSnapshot(final String nodeName,
});
}

void fetchSnapshot(final WebTarget webTarget,
final SnapshotRequest request,
final Path snapshotDir) throws IOException {
Instant fetchSnapshot(final WebTarget webTarget,
final SnapshotRequest request,
final Path snapshotDir) throws IOException {
try (Response response = webTarget
.request(MediaType.APPLICATION_OCTET_STREAM)
.post(Entity.json(request))) {
if (response.getStatus() != 200) {
throw new WebApplicationException(response);
if (response.getStatus() == Status.NOT_MODIFIED.getStatusCode()) {
throw new NotModifiedException(response.getStatusInfo().getReasonPhrase());
} else if (response.getStatus() == Status.UNAUTHORIZED.getStatusCode()) {
throw new PermissionException(null, response.getStatusInfo().getReasonPhrase());
} else if (response.getStatus() != Status.OK.getStatusCode()) {
throw new RuntimeException(response.getStatusInfo().getReasonPhrase());
}

try (final InputStream stream = (InputStream) response.getEntity()) {
ZipUtil.unzip(stream, snapshotDir);
}
final String info = Files.readString(snapshotDir.resolve(Shard.SNAPSHOT_INFO_FILE_NAME));
return Instant.parse(info);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.StreamingOutput;
import jakarta.ws.rs.core.Response;

import java.io.InputStream;

Expand All @@ -51,7 +51,7 @@ public interface FileTransferResource extends RestResource {
@Operation(
summary = "Fetch Plan B snapshot",
operationId = "fetchSnapshot")
StreamingOutput fetchSnapshot(SnapshotRequest request);
Response fetchSnapshot(SnapshotRequest request);

@POST
@Path(SEND_PART_PATH_PART)
Expand All @@ -60,9 +60,9 @@ public interface FileTransferResource extends RestResource {
@Operation(
summary = "Send Plan B part",
operationId = "sendPart")
boolean sendPart(@HeaderParam("createTime") long createTime,
@HeaderParam("metaId") long metaId,
@HeaderParam("fileHash") String fileHash,
@HeaderParam("fileName") String fileName,
InputStream inputStream);
Response sendPart(@HeaderParam("createTime") long createTime,
@HeaderParam("metaId") long metaId,
@HeaderParam("fileHash") String fileHash,
@HeaderParam("fileName") String fileName,
InputStream inputStream);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
import stroom.event.logging.rs.api.AutoLogged.OperationType;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;
import stroom.util.shared.PermissionException;

import jakarta.inject.Inject;
import jakarta.inject.Provider;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;
import jakarta.ws.rs.core.StreamingOutput;

import java.io.InputStream;
Expand All @@ -41,26 +45,60 @@ public FileTransferResourceImpl(final Provider<FileTransferService> fileTransfer

@AutoLogged(OperationType.UNLOGGED)
@Override
public StreamingOutput fetchSnapshot(final SnapshotRequest request) {
return output -> fileTransferServiceProvider.get().fetchSnapshot(request, output);
public Response fetchSnapshot(final SnapshotRequest request) {
try {
// Check the status before we start streaming snapshot data as it is hard to capture meaningful errors mid
// stream.
fileTransferServiceProvider.get().checkSnapshotStatus(request);

// Stream the snapshhot content to the client as ZIP data
final StreamingOutput streamingOutput = output -> {
fileTransferServiceProvider.get().fetchSnapshot(request, output);
};

return Response
.ok(streamingOutput, MediaType.APPLICATION_OCTET_STREAM)
.build();
} catch (final NotModifiedException e) {
LOGGER.debug(e::getMessage, e);
return Response
.status(Status.NOT_MODIFIED.getStatusCode(), e.getMessage())
.build();
} catch (final PermissionException e) {
LOGGER.debug(e::getMessage, e);
return Response
.status(Status.UNAUTHORIZED.getStatusCode(), e.getMessage())
.build();
} catch (final Exception e) {
LOGGER.debug(e::getMessage, e);
return Response
.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage())
.build();
}
}

@AutoLogged(OperationType.UNLOGGED)
@Override
public boolean sendPart(final long createTime,
final long metaId,
final String fileHash,
final String fileName,
final InputStream inputStream) {
public Response sendPart(final long createTime,
final long metaId,
final String fileHash,
final String fileName,
final InputStream inputStream) {
try {
fileTransferServiceProvider.get().receivePart(createTime, metaId, fileHash, fileName, inputStream);
return true;
} catch (final RuntimeException e) {
LOGGER.error(e::getMessage, e);
throw e;
return Response
.ok()
.build();
} catch (final PermissionException e) {
LOGGER.debug(e::getMessage, e);
return Response
.status(Status.UNAUTHORIZED.getStatusCode(), e.getMessage())
.build();
} catch (final Exception e) {
LOGGER.error(e::getMessage, e);
throw new RuntimeException(e);
LOGGER.debug(e::getMessage, e);
return Response
.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage())
.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@

public interface FileTransferService {

void fetchSnapshot(final SnapshotRequest request, final OutputStream outputStream) throws IOException;
void checkSnapshotStatus(SnapshotRequest request);

void receivePart(final long createTime,
final long metaId,
final String fileHash,
final String fileName,
final InputStream inputStream) throws IOException;
void fetchSnapshot(SnapshotRequest request, OutputStream outputStream) throws IOException;

void receivePart(long createTime,
long metaId,
String fileHash,
String fileName,
InputStream inputStream) throws IOException;
}
Loading

0 comments on commit 7618e4d

Please sign in to comment.