Skip to content

Commit

Permalink
#4682 PlanB State Store
Browse files Browse the repository at this point in the history
  • Loading branch information
stroomdev66 committed Jan 20, 2025
1 parent 2ce46a9 commit 5cdca6f
Show file tree
Hide file tree
Showing 33 changed files with 928 additions and 269 deletions.
4 changes: 2 additions & 2 deletions stroom-app/src/main/resources/ui/noauth/swagger/stroom.json
Original file line number Diff line number Diff line change
Expand Up @@ -20368,8 +20368,8 @@
"type" : "integer",
"format" : "int64"
},
"mapName" : {
"type" : "string"
"planBDocRef" : {
"$ref" : "#/components/schemas/DocRef"
}
}
},
Expand Down
4 changes: 2 additions & 2 deletions stroom-app/src/main/resources/ui/noauth/swagger/stroom.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15052,8 +15052,8 @@ components:
effectiveTime:
type: integer
format: int64
mapName:
type: string
planBDocRef:
$ref: "#/components/schemas/DocRef"
SolrConnectionConfig:
type: object
properties:
Expand Down
430 changes: 430 additions & 0 deletions stroom-state/stroom-planb-impl/README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,16 @@ protected void configure() {

ScheduledJobsBinder.create(binder())
.bindJobTo(StateMergeRunnable.class, builder -> builder
.name(MergeProcessor.TASK_NAME)
.name(MergeProcessor.MERGE_TASK_NAME)
.description("Plan B state store merge")
.cronSchedule(CronExpressions.EVERY_MINUTE.getExpression())
.advanced(true));
ScheduledJobsBinder.create(binder())
.bindJobTo(StateMaintenanceRunnable.class, builder -> builder
.name(MergeProcessor.MAINTAIN_TASK_NAME)
.description("Plan B state store maintain")
.cronSchedule(CronExpressions.EVERY_10_MINUTES.getExpression())
.advanced(true));
ScheduledJobsBinder.create(binder())
.bindJobTo(ShardManagerCleanupRunnable.class, builder -> builder
.name(ShardManager.CLEANUP_TASK_NAME)
Expand All @@ -110,7 +116,15 @@ private static class StateMergeRunnable extends RunnableWrapper {

@Inject
StateMergeRunnable(final MergeProcessor mergeProcessor) {
super(mergeProcessor::exec);
super(mergeProcessor::merge);
}
}

private static class StateMaintenanceRunnable extends RunnableWrapper {

@Inject
StateMaintenanceRunnable(final MergeProcessor mergeProcessor) {
super(mergeProcessor::maintainShards);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import stroom.query.common.v2.ResultStoreFactory;
import stroom.query.common.v2.SearchProcess;
import stroom.query.common.v2.SearchProvider;
import stroom.security.api.SecurityContext;
import stroom.task.api.TaskContextFactory;
import stroom.task.api.TaskManager;
import stroom.task.shared.TaskProgress;
Expand Down Expand Up @@ -76,6 +77,7 @@ public class StateSearchProvider implements SearchProvider, IndexFieldProvider {
private final TaskContextFactory taskContextFactory;
private final ShardManager shardManager;
private final ExpressionPredicateFactory expressionPredicateFactory;
private final SecurityContext securityContext;

@Inject
public StateSearchProvider(final Executor executor,
Expand All @@ -86,7 +88,8 @@ public StateSearchProvider(final Executor executor,
final TaskManager taskManager,
final TaskContextFactory taskContextFactory,
final ShardManager shardManager,
final ExpressionPredicateFactory expressionPredicateFactory) {
final ExpressionPredicateFactory expressionPredicateFactory,
final SecurityContext securityContext) {
this.executor = executor;
this.stateDocStore = stateDocStore;
this.stateDocCache = stateDocCache;
Expand All @@ -96,14 +99,17 @@ public StateSearchProvider(final Executor executor,
this.taskContextFactory = taskContextFactory;
this.shardManager = shardManager;
this.expressionPredicateFactory = expressionPredicateFactory;
this.securityContext = securityContext;
}

private PlanBDoc getPlanBDoc(final DocRef docRef) {
Objects.requireNonNull(docRef, "Null doc reference");
Objects.requireNonNull(docRef.getName(), "Null doc key");
final PlanBDoc doc = stateDocCache.get(docRef.getName());
Objects.requireNonNull(doc, "Null state doc");
return doc;
return securityContext.useAsReadResult(() -> {
Objects.requireNonNull(docRef, "Null doc reference");
Objects.requireNonNull(docRef.getName(), "Null doc key");
final PlanBDoc doc = stateDocCache.get(docRef.getName());
Objects.requireNonNull(doc, "Null state doc");
return doc;
});
}

@Override
Expand Down Expand Up @@ -139,7 +145,7 @@ public IndexField getIndexField(final DocRef docRef, final String fieldName) {

@Override
public Optional<String> fetchDocumentation(final DocRef docRef) {
return Optional.ofNullable(stateDocCache.get(docRef.getName())).map(PlanBDoc::getDescription);
return Optional.ofNullable(getPlanBDoc(docRef)).map(PlanBDoc::getDescription);
}

@Override
Expand All @@ -159,7 +165,7 @@ public ResultStore createResultStore(final SearchRequest searchRequest) {
final DocRef docRef = query.getDataSource();

// Check we have permission to read the doc.
final PlanBDoc doc = stateDocCache.get(docRef.getName());
final PlanBDoc doc = getPlanBDoc(docRef);
Objects.requireNonNull(doc, "Unable to find state doc with key: " + docRef.getName());

// Extract highlights.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public Instant fetchSnapshot(final String nodeName,
LOGGER.info(() -> "Fetching snapshot from '" +
nodeName +
"' for '" +
request.getMapName() +
request.getPlanBDocRef() +
"'");
final String url = NodeCallUtil.getBaseEndpointUrl(nodeInfo, nodeService, nodeName)
+ ResourcePaths.buildAuthenticatedApiPath(
Expand All @@ -176,7 +176,7 @@ public Instant fetchSnapshot(final String nodeName,
throw new RuntimeException("Error fetching snapshot from '" +
nodeName +
"' for '" +
request.getMapName() +
request.getPlanBDocRef() +
"'", e);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import stroom.bytebuffer.impl6.ByteBufferFactory;
import stroom.planb.impl.PlanBConfig;
import stroom.planb.impl.PlanBDocCache;
import stroom.planb.impl.db.AbstractLmdb;
import stroom.planb.impl.db.RangedStateDb;
import stroom.planb.impl.db.SessionDb;
Expand All @@ -12,6 +11,7 @@
import stroom.planb.impl.db.TemporalRangedStateDb;
import stroom.planb.impl.db.TemporalStateDb;
import stroom.planb.shared.PlanBDoc;
import stroom.util.io.FileUtil;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;
import stroom.util.shared.time.SimpleDuration;
Expand Down Expand Up @@ -43,33 +43,27 @@ class LocalShard implements Shard {
private static final String LOCK_FILE_NAME = "lock.mdb";

private final ByteBufferFactory byteBufferFactory;
private final PlanBDocCache planBDocCache;
private final Provider<PlanBConfig> configProvider;
private final StatePaths statePaths;
private final String mapName;
private final Path shardDir;

private final ReentrantLock lock = new ReentrantLock();

private volatile PlanBDoc doc;
private final PlanBDoc doc;
private final AtomicInteger useCount = new AtomicInteger();
private volatile AbstractLmdb<?, ?> db;
private volatile boolean open;
private volatile Instant lastAccessTime;
private volatile Instant lastWriteTime;

public LocalShard(final ByteBufferFactory byteBufferFactory,
final PlanBDocCache planBDocCache,
final Provider<PlanBConfig> configProvider,
final StatePaths statePaths,
final String mapName) {
final PlanBDoc doc) {
this.byteBufferFactory = byteBufferFactory;
this.planBDocCache = planBDocCache;
this.configProvider = configProvider;
this.statePaths = statePaths;
this.mapName = mapName;
this.doc = doc;
lastWriteTime = Instant.now();
this.shardDir = statePaths.getShardDir().resolve(mapName);
this.shardDir = statePaths.getShardDir().resolve(doc.getUuid());
}

private void incrementUseCount() {
Expand Down Expand Up @@ -106,6 +100,20 @@ private void decrementUseCount() {
}
}

@Override
public void delete() {
lock.lock();
try {
if (useCount.get() == 0) {
LOGGER.info(() -> "Deleting data for: " + doc);
cleanup();
FileUtil.deleteDir(shardDir);
}
} finally {
lock.unlock();
}
}

@Override
public void merge(final Path sourceDir) {
boolean success = false;
Expand Down Expand Up @@ -141,19 +149,39 @@ public void merge(final Path sourceDir) {
}

@Override
public void condense() {
public void condense(final PlanBDoc doc) {
try {
final PlanBDoc doc = getDoc();
if (doc != null && doc.isCondense()) {
// Find out how old data needs to be before we condense it.
final long condenseBeforeMs;
if (doc.isCondense()) {
final SimpleDuration duration = SimpleDuration
.builder()
.time(doc.getCondenseAge())
.timeUnit(doc.getCondenseTimeUnit())
.build();
final Instant maxAge = SimpleDurationUtil.minus(Instant.now(), duration);
condenseBeforeMs = SimpleDurationUtil.minus(Instant.now(), duration).toEpochMilli();
} else {
condenseBeforeMs = 0;
}

// Find out how old data needs to be before we delete it.
final long deleteBeforeMs;
if (!doc.isRetainForever()) {
final SimpleDuration duration = SimpleDuration
.builder()
.time(doc.getRetainAge())
.timeUnit(doc.getRetainTimeUnit())
.build();
deleteBeforeMs = SimpleDurationUtil.minus(Instant.now(), duration).toEpochMilli();
} else {
deleteBeforeMs = 0;
}

// If we are condensing or deleting data then do so.
if (condenseBeforeMs > 0 || deleteBeforeMs > 0) {
incrementUseCount();
try {
db.condense(maxAge);
db.condense(condenseBeforeMs, deleteBeforeMs);
} finally {
decrementUseCount();
}
Expand Down Expand Up @@ -256,31 +284,16 @@ private boolean isIdle() {
configProvider.get().getMinTimeToKeepEnvOpen().getDuration()));
}

private PlanBDoc getDoc() {
if (doc == null) {
doc = planBDocCache.get(mapName);
if (doc == null) {
LOGGER.warn(() -> "No PlanB doc found for '" + mapName + "'");
throw new RuntimeException("No PlanB doc found for '" + mapName + "'");
}
}
return doc;
}

private void open() {
final PlanBDoc doc = getDoc();
final String mapName = doc.getName();

final Path shardDir = statePaths.getShardDir().resolve(mapName);
if (Files.exists(shardDir)) {
LOGGER.info(() -> "Found local shard for '" + mapName + "'");
LOGGER.info(() -> "Found local shard for '" + doc + "'");
db = openDb(doc, shardDir);


} else {
// If this node is supposed to be a node that stores shards, but it doesn't have it, then error.
final String message = "Local Plan B shard not found for '" +
mapName +
doc +
"'";
LOGGER.error(() -> message);
throw new RuntimeException(message);
Expand Down Expand Up @@ -308,4 +321,9 @@ private void open() {
default -> throw new RuntimeException("Unexpected state type: " + doc.getStateType());
}
}

@Override
public PlanBDoc getDoc() {
return doc;
}
}
Loading

0 comments on commit 5cdca6f

Please sign in to comment.