Skip to content

Commit

Permalink
#4682 PlanB State Store
Browse files Browse the repository at this point in the history
  • Loading branch information
stroomdev66 committed Jan 14, 2025
1 parent bfa015e commit 2551d04
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 109 deletions.
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
package stroom.planb.impl.data;

import stroom.bytebuffer.impl6.ByteBufferFactory;
import stroom.planb.impl.PlanBDocCache;
import stroom.planb.impl.io.StatePaths;
import stroom.security.api.SecurityContext;
import stroom.util.io.StreamUtil;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;
import stroom.util.shared.PermissionException;
import stroom.util.zip.ZipUtil;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

@Singleton
Expand All @@ -27,16 +24,15 @@ public class FileTransferServiceImpl implements FileTransferService {

private final SequentialFileStore fileStore;
private final SecurityContext securityContext;
private final Path shardDir;
private final ShardManager shardManager;

@Inject
public FileTransferServiceImpl(final SequentialFileStore fileStore,
final SecurityContext securityContext,
final StatePaths statePaths,
final ByteBufferFactory byteBufferFactory) {
final ShardManager shardManager) {
this.fileStore = fileStore;
this.securityContext = securityContext;
shardDir = statePaths.getShardDir();
this.shardManager = shardManager;
}

@Override
Expand All @@ -45,24 +41,10 @@ public void fetchSnapshot(final SnapshotRequest request, final OutputStream outp
throw new PermissionException(securityContext.getUserRef(), "Only processing users can use this resource");
}

final Path shard = shardDir.resolve(request.getMapName());
if (!Files.exists(shard)) {
throw new RuntimeException("Shard not found");
}
final Path lmdbDataFile = shard.resolve("data.mdb");
if (!Files.exists(lmdbDataFile)) {
throw new RuntimeException("LMDB data file not found");
}

// TODO : Possibly create windowed snapshots.

// For now we'll just stream the whole map to the requestor. In future we could easily just create a time
// window snapshot.

try (final ZipArchiveOutputStream zipOutputStream =
ZipUtil.createOutputStream(new BufferedOutputStream(outputStream))) {
ZipUtil.zip(shard, zipOutputStream);
}
final String mapName = request.getMapName();
shardManager.zip(mapName, outputStream);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
package stroom.planb.impl.data;

import stroom.bytebuffer.impl6.ByteBufferFactory;
import stroom.planb.impl.PlanBDocCache;
import stroom.planb.impl.io.RangedStateWriter;
import stroom.planb.impl.io.SessionWriter;
import stroom.planb.impl.io.StatePaths;
import stroom.planb.impl.io.StateWriter;
import stroom.planb.impl.io.TemporalRangedStateWriter;
import stroom.planb.impl.io.TemporalStateWriter;
import stroom.planb.shared.PlanBDoc;
import stroom.security.api.SecurityContext;
import stroom.task.api.TaskContext;
import stroom.task.api.TaskContextFactory;
Expand Down Expand Up @@ -36,33 +28,28 @@ public class MergeProcessor {
private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(MergeProcessor.class);

private final SequentialFileStore fileStore;
private final ByteBufferFactory byteBufferFactory;
private final Path mergingDir;
private final Path shardDir;
private final PlanBDocCache planBDocCache;
private final SecurityContext securityContext;
private final TaskContextFactory taskContextFactory;
private final ShardManager shardManager;

@Inject
public MergeProcessor(final SequentialFileStore fileStore,
final ByteBufferFactory byteBufferFactory,
final PlanBDocCache planBDocCache,
final StatePaths statePaths,
final SecurityContext securityContext,
final TaskContextFactory taskContextFactory) {
final TaskContextFactory taskContextFactory,
final ShardManager shardManager) {
this.fileStore = fileStore;
this.byteBufferFactory = byteBufferFactory;
this.planBDocCache = planBDocCache;
this.securityContext = securityContext;
this.taskContextFactory = taskContextFactory;
this.shardManager = shardManager;

mergingDir = statePaths.getMergingDir();
if (ensureDirExists(mergingDir)) {
if (!FileUtil.deleteContents(mergingDir)) {
throw new RuntimeException("Unable to delete contents of: " + FileUtil.getCanonicalPath(mergingDir));
}
}
shardDir = statePaths.getShardDir();
}

private boolean ensureDirExists(final Path path) {
Expand Down Expand Up @@ -128,22 +115,10 @@ private void merge(final SequentialFile sequentialFile) throws IOException {
try (final Stream<Path> stream = Files.list(dir)) {
stream.forEach(source -> {
try {
final String mapName = source.getFileName().toString();
final PlanBDoc doc = planBDocCache.get(mapName);
if (doc != null) {
// Get shard dir.
final Path target = shardDir.resolve(mapName);
if (!Files.isDirectory(target)) {
Files.createDirectories(shardDir);
Files.move(source, target);
} else {
// merge.
merge(doc, source, target);

// Delete source.
FileUtil.deleteDir(source);
}
}
// Merge source.
shardManager.merge(source);
// Delete source.
FileUtil.deleteDir(source);
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
Expand All @@ -154,39 +129,4 @@ private void merge(final SequentialFile sequentialFile) throws IOException {
sequentialFile.delete();
}
}

/**
 * Merge the LMDB data in {@code sourcePath} into {@code targetPath} using a
 * writer implementation appropriate for the doc's state type. The writer is
 * closed when the merge completes (try-with-resources).
 *
 * @param doc        the Plan B doc describing the state type of the map.
 * @param sourcePath the source LMDB directory to merge from.
 * @param targetPath the target shard directory to merge into.
 */
private void merge(final PlanBDoc doc, final Path sourcePath, final Path targetPath) {
    switch (doc.getStateType()) {
        case STATE -> {
            try (final StateWriter writer =
                    new StateWriter(targetPath, byteBufferFactory)) {
                writer.merge(sourcePath);
            }
        }
        case TEMPORAL_STATE -> {
            try (final TemporalStateWriter writer =
                    new TemporalStateWriter(targetPath, byteBufferFactory)) {
                writer.merge(sourcePath);
            }
        }
        case RANGED_STATE -> {
            try (final RangedStateWriter writer =
                    new RangedStateWriter(targetPath, byteBufferFactory)) {
                writer.merge(sourcePath);
            }
        }
        case TEMPORAL_RANGED_STATE -> {
            try (final TemporalRangedStateWriter writer =
                    new TemporalRangedStateWriter(targetPath, byteBufferFactory)) {
                writer.merge(sourcePath);
            }
        }
        case SESSION -> {
            try (final SessionWriter writer =
                    new SessionWriter(targetPath, byteBufferFactory)) {
                writer.merge(sourcePath);
            }
        }
        // Fail loudly on a state type we don't know how to merge rather than
        // silently dropping the source data.
        default -> throw new RuntimeException("Unexpected state type: " + doc.getStateType());
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package stroom.planb.impl.data;

import stroom.bytebuffer.impl6.ByteBufferFactory;
import stroom.planb.impl.PlanBDocCache;
import stroom.planb.impl.io.AbstractLmdbWriter;
import stroom.planb.impl.io.RangedStateWriter;
import stroom.planb.impl.io.SessionWriter;
import stroom.planb.impl.io.StatePaths;
import stroom.planb.impl.io.StateWriter;
import stroom.planb.impl.io.TemporalRangedStateWriter;
import stroom.planb.impl.io.TemporalStateWriter;
import stroom.planb.shared.PlanBDoc;
import stroom.util.zip.ZipUtil;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.locks.ReentrantLock;

@Singleton
public class ShardManager {

    private final ByteBufferFactory byteBufferFactory;
    // Root directory that contains one sub-directory (shard) per map.
    private final Path shardDir;
    private final PlanBDocCache planBDocCache;

    // Guards all transitions of currentMapName/currentWriter. The fields are
    // volatile so the brief unlocked read in merge() sees the latest value.
    private final ReentrantLock reentrantLock = new ReentrantLock();
    // Name of the map currently being merged, or null when no merge is active.
    private volatile String currentMapName;
    // Writer performing the active merge, or null when no merge is active.
    private volatile AbstractLmdbWriter<?, ?> currentWriter;

    @Inject
    public ShardManager(final ByteBufferFactory byteBufferFactory,
                        final PlanBDocCache planBDocCache,
                        final StatePaths statePaths) {
        this.byteBufferFactory = byteBufferFactory;
        this.planBDocCache = planBDocCache;
        shardDir = statePaths.getShardDir();
    }

    /**
     * Merge the data in {@code sourceDir} into the shard for the map named by
     * the directory's file name. If no shard exists yet the source dir is
     * simply moved into place. Maps with no known {@link PlanBDoc} are
     * silently ignored.
     *
     * @param sourceDir source directory whose file name is the map name.
     * @throws IOException if the move or merge fails.
     */
    public void merge(final Path sourceDir) throws IOException {
        final String mapName = sourceDir.getFileName().toString();
        final PlanBDoc doc = planBDocCache.get(mapName);
        if (doc != null) {
            // Get shard dir.
            final Path target = shardDir.resolve(mapName);

            // If we don't already have the shard dir then just move the source to the target.
            if (!Files.isDirectory(target)) {
                lock(() -> {
                    try {
                        Files.createDirectories(shardDir);
                        Files.move(sourceDir, target);
                    } catch (final IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });

            } else {
                // If we do already have a target then merge the source to the target.
                lock(() -> {
                    // Create the writer before publishing the map name so a
                    // failure in getWriter() can't leave currentMapName set
                    // with no (or a stale) writer behind it.
                    currentWriter = getWriter(doc, target);
                    currentMapName = mapName;
                });
                try {
                    currentWriter.merge(sourceDir);
                } finally {
                    lock(() -> {
                        currentMapName = null;
                        final AbstractLmdbWriter<?, ?> writer = currentWriter;
                        // Clear the reference before closing so no reader can
                        // ever observe a closed writer.
                        currentWriter = null;
                        if (writer != null) {
                            writer.close();
                        }
                    });
                }
            }
        }
    }

    /**
     * Stream a zipped snapshot of the named map's shard to the supplied output
     * stream. If a merge is currently running for the same map the zip is
     * performed under the writer's lock so a consistent snapshot is taken.
     * Maps with no known {@link PlanBDoc} are silently ignored.
     *
     * @param mapName      the map to snapshot.
     * @param outputStream destination for the zip data.
     * @throws IOException if snapshotting fails.
     */
    public void zip(final String mapName, final OutputStream outputStream) throws IOException {
        final PlanBDoc doc = planBDocCache.get(mapName);
        if (doc != null) {
            // Get shard dir.
            final Path shard = shardDir.resolve(mapName);
            if (!Files.exists(shard)) {
                throw new RuntimeException("Shard not found");
            }
            final Path lmdbDataFile = shard.resolve("data.mdb");
            if (!Files.exists(lmdbDataFile)) {
                throw new RuntimeException("LMDB data file not found");
            }

            lock(() -> {
                // Compare from mapName: currentMapName is null whenever no
                // merge is in progress, so currentMapName.equals(mapName)
                // would NPE here.
                if (mapName.equals(currentMapName) && currentWriter != null) {
                    // A merge is writing to this shard; zip under the writer's
                    // lock to get a consistent view of the LMDB files.
                    currentWriter.lock(() -> zip(shard, outputStream));
                } else {
                    zip(shard, outputStream);
                }
            });
        }
    }

    /**
     * Run {@code runnable} while holding the manager lock.
     */
    private void lock(final Runnable runnable) {
        reentrantLock.lock();
        try {
            runnable.run();
        } finally {
            reentrantLock.unlock();
        }
    }

    /**
     * Zip the whole shard directory to {@code outputStream}, wrapping any
     * {@link IOException} as unchecked so this can run inside a lambda.
     */
    private void zip(final Path shard, final OutputStream outputStream) {
        try (final ZipArchiveOutputStream zipOutputStream =
                ZipUtil.createOutputStream(new BufferedOutputStream(outputStream))) {
            ZipUtil.zip(shard, zipOutputStream);
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Create a writer for {@code targetPath} appropriate to the doc's state
     * type. The caller is responsible for closing the returned writer.
     *
     * @throws RuntimeException if the state type is unknown.
     */
    private AbstractLmdbWriter<?, ?> getWriter(final PlanBDoc doc, final Path targetPath) {
        switch (doc.getStateType()) {
            case STATE -> {
                return new StateWriter(targetPath, byteBufferFactory);
            }
            case TEMPORAL_STATE -> {
                return new TemporalStateWriter(targetPath, byteBufferFactory);
            }
            case RANGED_STATE -> {
                return new RangedStateWriter(targetPath, byteBufferFactory);
            }
            case TEMPORAL_RANGED_STATE -> {
                return new TemporalRangedStateWriter(targetPath, byteBufferFactory);
            }
            case SESSION -> {
                return new SessionWriter(targetPath, byteBufferFactory);
            }
            default -> throw new RuntimeException("Unexpected state type: " + doc.getStateType());
        }
    }
}
Loading

0 comments on commit 2551d04

Please sign in to comment.