
Commit

#4682 PlanB State Store
stroomdev66 committed Jan 10, 2025
1 parent a93427c commit 517dd8c
Showing 23 changed files with 858 additions and 190 deletions.
101 changes: 101 additions & 0 deletions stroom-core-shared/src/main/java/stroom/planb/shared/PlanBDoc.java
@@ -240,4 +240,105 @@ public String toString() {
", retainTimeUnit=" + retainTimeUnit +
'}';
}

public static Builder builder() {
return new Builder();
}

public Builder copy() {
return new Builder(this);
}

public static class Builder extends AbstractBuilder<PlanBDoc, PlanBDoc.Builder> {

private String description;
private StateType stateType;
private boolean condense;
private int condenseAge;
private TimeUnit condenseTimeUnit;
private boolean retainForever;
private int retainAge;
private TimeUnit retainTimeUnit;

public Builder() {
}

public Builder(final PlanBDoc doc) {
super(doc);
this.description = doc.description;
this.stateType = doc.stateType;
this.condense = doc.condense;
this.condenseAge = doc.condenseAge;
this.condenseTimeUnit = doc.condenseTimeUnit;
this.retainForever = doc.retainForever;
this.retainAge = doc.retainAge;
this.retainTimeUnit = doc.retainTimeUnit;
}

public Builder description(final String description) {
this.description = description;
return self();
}

public Builder stateType(final StateType stateType) {
this.stateType = stateType;
return self();
}

public Builder condense(final boolean condense) {
this.condense = condense;
return self();
}

public Builder condenseAge(final int condenseAge) {
this.condenseAge = condenseAge;
return self();
}

public Builder condenseTimeUnit(final TimeUnit condenseTimeUnit) {
this.condenseTimeUnit = condenseTimeUnit;
return self();
}

public Builder retainForever(final boolean retainForever) {
this.retainForever = retainForever;
return self();
}

public Builder retainAge(final int retainAge) {
this.retainAge = retainAge;
return self();
}

public Builder retainTimeUnit(final TimeUnit retainTimeUnit) {
this.retainTimeUnit = retainTimeUnit;
return self();
}

@Override
protected Builder self() {
return this;
}

@Override
public PlanBDoc build() {
return new PlanBDoc(
type,
uuid,
name,
version,
createTimeMs,
updateTimeMs,
createUser,
updateUser,
description,
stateType,
condense,
condenseAge,
condenseTimeUnit,
retainForever,
retainAge,
retainTimeUnit);
}
}
}
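For orientation, the builder and copy() API added above can be exercised along these lines (a minimal sketch, not part of the commit; StateType.TEMPORAL_STATE appears in the MergeProcessor changes below, while TimeUnit.DAYS and the field values here are assumptions for illustration):

    // Sketch of the new PlanBDoc builder API; the values and TimeUnit.DAYS are illustrative assumptions.
    final PlanBDoc doc = PlanBDoc.builder()
            .description("Temporal reference state")
            .stateType(StateType.TEMPORAL_STATE)
            .condense(true)
            .condenseAge(7)
            .condenseTimeUnit(TimeUnit.DAYS)
            .retainForever(false)
            .retainAge(30)
            .retainTimeUnit(TimeUnit.DAYS)
            .build();

    // copy() seeds a Builder from an existing doc, so a single field can be
    // changed without restating the rest.
    final PlanBDoc updated = doc.copy()
            .retainForever(true)
            .build();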
2 changes: 2 additions & 0 deletions stroom-state/stroom-planb-impl/build.gradle
@@ -3,6 +3,7 @@ ext.moduleName = 'stroom.planb.impl'
dependencies {
implementation project(':stroom-bytebuffer')
implementation project(':stroom-cache:stroom-cache-api')
implementation project(':stroom-cache:stroom-cache-impl')
implementation project(':stroom-cluster:stroom-cluster-task-api')
implementation project(':stroom-cluster:stroom-cluster-lock-api')
implementation project(':stroom-core-shared')
@@ -45,6 +46,7 @@ dependencies {
implementation libs.ws_rs_api
implementation libs.zero_allocation_hashing

testImplementation project(':stroom-security:stroom-security-mock')
testImplementation project(':stroom-test-common')
testImplementation libs.assertj_core
testImplementation libs.mockito_core
@@ -93,7 +93,7 @@ protected void configure() {
ScheduledJobsBinder.create(binder())
.bindJobTo(StateMergeRunnable.class, builder -> builder
.name(MergeProcessor.TASK_NAME)
.description("PlanB State store merge")
.description("PlanB state store merge")
.cronSchedule("0 0 0 * * ?")
.advanced(true));
}
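For reference, the schedule bound above is a Quartz-style six-field cron expression (second, minute, hour, day-of-month, month, day-of-week), so "0 0 0 * * ?" runs the merge job once a day at midnight. A job on a different schedule would be bound with the same fluent calls, for example (a sketch only; the job class and wording are hypothetical and not part of this commit):

    ScheduledJobsBinder.create(binder())
            .bindJobTo(SnapshotCleanupRunnable.class, builder -> builder // hypothetical job class
                    .name("PlanB snapshot cleanup")                      // hypothetical job
                    .description("Removes expired PlanB snapshots")
                    .cronSchedule("0 0 * * * ?")                         // top of every hour
                    .advanced(true));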
@@ -49,6 +49,10 @@ public void fetchSnapshot(final SnapshotRequest request, final OutputStream outp
if (!Files.exists(shard)) {
throw new RuntimeException("Shard not found");
}
final Path lmdbDataFile = shard.resolve("data.mdb");
if (!Files.exists(lmdbDataFile)) {
throw new RuntimeException("LMDB data file not found");
}

// TODO : Possibly create windowed snapshots.

@@ -1,7 +1,6 @@
package stroom.planb.impl.data;

import stroom.bytebuffer.impl6.ByteBufferFactory;
import stroom.planb.impl.PlanBConfig;
import stroom.planb.impl.PlanBDocCache;
import stroom.planb.impl.io.RangedStateWriter;
import stroom.planb.impl.io.SessionWriter;
@@ -13,11 +12,12 @@
import stroom.security.api.SecurityContext;
import stroom.task.api.TaskContextFactory;
import stroom.util.io.FileUtil;
import stroom.util.io.PathCreator;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;
import stroom.util.logging.LogUtil;
import stroom.util.zip.ZipUtil;

import jakarta.inject.Inject;
import jakarta.inject.Provider;
import jakarta.inject.Singleton;

import java.io.IOException;
@@ -32,6 +32,8 @@ public class MergeProcessor {

public static final String TASK_NAME = "PlanB Merge Processor";

private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(MergeProcessor.class);

private final SequentialFileStore fileStore;
private final ByteBufferFactory byteBufferFactory;
private final Path mergingDir;
@@ -42,8 +44,6 @@ public class MergeProcessor {

@Inject
public MergeProcessor(final SequentialFileStore fileStore,
final Provider<PlanBConfig> configProvider,
final PathCreator pathCreator,
final ByteBufferFactory byteBufferFactory,
final PlanBDocCache planBDocCache,
final StatePaths statePaths,
@@ -80,78 +80,108 @@ private boolean ensureDirExists(final Path path) {

public void exec() {
securityContext.asProcessingUser(() -> {
taskContextFactory.context("State Condenser", taskContext -> {
taskContextFactory.context("PlanB state merge", taskContext -> {
try {
final long storeId = fileStore.awaitNew(0);
final SequentialFile sequentialFile = fileStore.getStoreFileSet(storeId);
final Path zipFile = sequentialFile.getZip();
final Path dir = mergingDir.resolve(UUID.randomUUID().toString());
ZipUtil.unzip(zipFile, dir);

// We ought to have one or more stores to merge.
try (final Stream<Path> stream = Files.list(dir)) {
stream.forEach(source -> {
try {
final String mapName = source.getFileName().toString();
final PlanBDoc doc = planBDocCache.get(mapName);
if (doc != null) {
// Get shard dir.
final Path target = shardDir.resolve(mapName);
if (!Files.isDirectory(target)) {
Files.move(source, target);
} else {
// merge.
merge(doc, source, target);

// Delete source.
FileUtil.deleteDir(source);
}
}
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
});
final long minStoreId = fileStore.getMinStoreId();
final long maxStoreId = fileStore.getMaxStoreId();
LOGGER.info(() -> LogUtil.message("Min store id = {}, max store id = {}", minStoreId, maxStoreId));

long storeId = minStoreId;
if (storeId == -1) {
LOGGER.info("Store is empty");
storeId = 0;
}

// Delete the original zip file.
sequentialFile.delete();
while (!taskContext.isTerminated() && !Thread.currentThread().isInterrupted()) {
// Wait until new data is available.
final long currentStoreId = storeId;
taskContext.info(() -> "Waiting for data...");
final SequentialFile sequentialFile = fileStore.awaitNew(currentStoreId);
taskContext.info(() -> "Merging data: " + currentStoreId);
merge(sequentialFile);

// Increment store id.
storeId++;
}
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
});
}).run();
});
}

public void merge(final long storeId) throws IOException {
// Wait until new data is available.
final SequentialFile sequentialFile = fileStore.awaitNew(storeId);
merge(sequentialFile);
}

private void merge(final SequentialFile sequentialFile) throws IOException {
final Path zipFile = sequentialFile.getZip();
if (Files.isRegularFile(zipFile)) {
final Path dir = mergingDir.resolve(UUID.randomUUID().toString());
ZipUtil.unzip(zipFile, dir);

// We ought to have one or more stores to merge.
try (final Stream<Path> stream = Files.list(dir)) {
stream.forEach(source -> {
try {
final String mapName = source.getFileName().toString();
final PlanBDoc doc = planBDocCache.get(mapName);
if (doc != null) {
// Get shard dir.
final Path target = shardDir.resolve(mapName);
if (!Files.isDirectory(target)) {
Files.createDirectories(shardDir);
Files.move(source, target);
} else {
// merge.
merge(doc, source, target);

// Delete source.
FileUtil.deleteDir(source);
}
}
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
});
}

// Delete the original zip file.
sequentialFile.delete();
}
}

private void merge(final PlanBDoc doc, final Path sourcePath, final Path targetPath) {
switch (doc.getStateType()) {
case STATE -> {
try (final StateWriter writer =
new StateWriter(targetPath, byteBufferFactory, false)) {
new StateWriter(targetPath, byteBufferFactory)) {
writer.merge(sourcePath);
}
}
case TEMPORAL_STATE -> {
try (final TemporalStateWriter writer =
new TemporalStateWriter(targetPath, byteBufferFactory, false)) {
new TemporalStateWriter(targetPath, byteBufferFactory)) {
writer.merge(sourcePath);
}
}
case RANGED_STATE -> {
try (final RangedStateWriter writer =
new RangedStateWriter(targetPath, byteBufferFactory, false)) {
new RangedStateWriter(targetPath, byteBufferFactory)) {
writer.merge(sourcePath);
}
}
case TEMPORAL_RANGED_STATE -> {
try (final TemporalRangedStateWriter writer =
new TemporalRangedStateWriter(targetPath, byteBufferFactory, false)) {
new TemporalRangedStateWriter(targetPath, byteBufferFactory)) {
writer.merge(sourcePath);
}
}
case SESSION -> {
try (final SessionWriter writer =
new SessionWriter(targetPath, byteBufferFactory, false)) {
new SessionWriter(targetPath, byteBufferFactory)) {
writer.merge(sourcePath);
}
}
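For orientation, the reworked MergeProcessor above exposes merge(long storeId) alongside the background exec() loop, so a single merge pass can be driven directly, e.g. from a test (a sketch; how mergeProcessor and fileStore are obtained, for instance via injection, is assumed, and merge(long) propagates IOException):

    // Drive one merge pass directly instead of waiting for the scheduled job.
    // getMinStoreId() returning -1 means the store is empty, mirroring the exec() loop above.
    final long minStoreId = fileStore.getMinStoreId();
    final long storeId = minStoreId == -1 ? 0 : minStoreId;
    mergeProcessor.merge(storeId); // awaits the part file, unzips it and moves or
                                   // merges each map into its shard directory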
@@ -12,7 +12,7 @@
import java.util.ArrayList;
import java.util.List;

class SequentialFile {
public class SequentialFile {

public static final String ZIP_EXTENSION = ".zip";
