Skip to content

Commit

Permalink
#4682 PlanB State Store
Browse files Browse the repository at this point in the history
  • Loading branch information
stroomdev66 committed Jan 21, 2025
1 parent 6cc7cbd commit 2028b48
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import jakarta.inject.Inject;
import jakarta.inject.Provider;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;
Expand Down Expand Up @@ -61,19 +62,13 @@ public Response fetchSnapshot(final SnapshotRequest request) {
.build();
} catch (final NotModifiedException e) {
LOGGER.debug(e::getMessage, e);
return Response
.status(Status.NOT_MODIFIED.getStatusCode(), e.getMessage())
.build();
throw new WebApplicationException(e.getMessage(), Status.NOT_MODIFIED);
} catch (final PermissionException e) {
LOGGER.debug(e::getMessage, e);
return Response
.status(Status.UNAUTHORIZED.getStatusCode(), e.getMessage())
.build();
throw new WebApplicationException(e.getMessage(), Status.UNAUTHORIZED);
} catch (final Exception e) {
LOGGER.debug(e::getMessage, e);
return Response
.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage())
.build();
throw new WebApplicationException(e.getMessage(), Status.NOT_FOUND);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,11 @@ private void createZip(final OutputStream outputStream,
ZipUtil.createOutputStream(new BufferedOutputStream(outputStream))) {
ZipUtil.zip(shardDir, zipOutputStream);
zipOutputStream.putArchiveEntry(new ZipArchiveEntry(SNAPSHOT_INFO_FILE_NAME));
zipOutputStream.write(lastWriteTime.toString().getBytes(StandardCharsets.UTF_8));
try {
zipOutputStream.write(lastWriteTime.toString().getBytes(StandardCharsets.UTF_8));
} finally {
zipOutputStream.closeArchiveEntry();
}
} catch (final IOException e) {
LOGGER.error(e::getMessage, e);
throw new UncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;
import stroom.util.logging.LogUtil;
import stroom.util.string.StringIdUtil;
import stroom.util.zip.ZipUtil;

import jakarta.inject.Inject;
Expand All @@ -16,8 +17,8 @@
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

Expand All @@ -31,6 +32,7 @@ public class MergeProcessor {

private final SequentialFileStore fileStore;
private final Path mergingDir;
private final AtomicLong mergingId = new AtomicLong();
private final SecurityContext securityContext;
private final TaskContextFactory taskContextFactory;
private final ShardManager shardManager;
Expand All @@ -49,27 +51,12 @@ public MergeProcessor(final SequentialFileStore fileStore,
this.shardManager = shardManager;

mergingDir = statePaths.getMergingDir();
if (ensureDirExists(mergingDir)) {
if (!FileUtil.deleteContents(mergingDir)) {
throw new RuntimeException("Unable to delete contents of: " + FileUtil.getCanonicalPath(mergingDir));
}
FileUtil.ensureDirExists(mergingDir);
if (!FileUtil.deleteContents(mergingDir)) {
throw new RuntimeException("Unable to delete contents of: " + FileUtil.getCanonicalPath(mergingDir));
}
}

private boolean ensureDirExists(final Path path) {
if (Files.isDirectory(path)) {
return true;
}

try {
Files.createDirectories(path);
} catch (final IOException e) {
throw new UncheckedIOException(e);
}

return false;
}

public void merge() {
if (!merging) {
synchronized (this) {
Expand Down Expand Up @@ -152,7 +139,8 @@ private void merge(final SequentialFile sequentialFile) {
try {
final Path zipFile = sequentialFile.getZip();
if (Files.isRegularFile(zipFile)) {
final Path dir = mergingDir.resolve(UUID.randomUUID().toString());
final String mergingDirName = StringIdUtil.idToString(mergingId.incrementAndGet());
final Path dir = mergingDir.resolve(mergingDirName);
ZipUtil.unzip(zipFile, dir);

// We ought to have one or more stores to merge.
Expand All @@ -167,6 +155,9 @@ private void merge(final SequentialFile sequentialFile) {
});
}

// Delete dir.
FileUtil.deleteDir(dir);

// Delete the original zip file.
sequentialFile.delete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public ShardManager(final ByteBufferFactory byteBufferFactory,
this.statePaths = statePaths;
this.fileTransferClient = fileTransferClient;

// Delete any existing shards that might have been left behind from the last use of Stroom.
FileUtil.deleteDir(statePaths.getShardDir());
// Delete any existing snapshots that might have been left behind from the last use of Stroom.
FileUtil.deleteDir(statePaths.getSnapshotDir());
}

private boolean isSnapshotNode() {
Expand Down

0 comments on commit 2028b48

Please sign in to comment.