Skip to content

Commit

Permalink
#4682 PlanB State Store
Browse files Browse the repository at this point in the history
  • Loading branch information
stroomdev66 committed Jan 15, 2025
1 parent 6d348f9 commit 9c36b5f
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -95,10 +95,10 @@ void test() throws IOException {
scyllaDbDoc.setConnection(ScyllaDbUtil.getDefaultConnection());
scyllaDbDocStore.writeDocument(scyllaDbDoc);

createStateDoc(scyllaDbDoc, "hostname_to_location_map");
createStateDoc(scyllaDbDoc, "hostname_to_ip_map");
createStateDoc(scyllaDbDoc, "id_to_user_map");
createStateDoc(scyllaDbDoc, "number_to_id");
final DocRef stateDoc1 = createStateDoc(scyllaDbDoc, "hostname_to_location_map");
final DocRef stateDoc2 = createStateDoc(scyllaDbDoc, "hostname_to_ip_map");
final DocRef stateDoc3 = createStateDoc(scyllaDbDoc, "id_to_user_map");
final DocRef stateDoc4 = createStateDoc(scyllaDbDoc, "number_to_id");

// Add reference data to state store.
// Setup the pipeline.
Expand All @@ -125,12 +125,15 @@ void test() throws IOException {
assertThat(refDataProcessResults).hasSize(3);

// Add event data and processor filters.
final List<PipelineReference> pipelineReferences = Collections.singletonList(PipelineDataUtil.createReference(
"translationFilter",
"pipelineReference",
new DocRef(StateDoc.TYPE, UUID.randomUUID().toString()),
null,
null));
final List<PipelineReference> pipelineReferences = Stream
.of(stateDoc1, stateDoc2, stateDoc3, stateDoc4)
.map(docRef -> PipelineDataUtil.createReference(
"translationFilter",
"pipelineReference",
docRef,
null,
null))
.toList();
commonTranslationTestHelper.setupStateProcess(
CommonTranslationTestHelper.FEED_NAME,
Collections.singletonList(CommonTranslationTestHelper.VALID_RESOURCE_NAME),
Expand Down Expand Up @@ -181,11 +184,12 @@ private void process(final int expectedProcessCount) {
}
}

private void createStateDoc(final ScyllaDbDoc scyllaDbDoc, final String name) {
private DocRef createStateDoc(final ScyllaDbDoc scyllaDbDoc, final String name) {
final DocRef docRef = stateDocStore.createDocument(name);
final StateDoc doc = stateDocStore.readDocument(docRef);
doc.setScyllaDbRef(scyllaDbDoc.asDocRef());
doc.setStateType(StateType.TEMPORAL_STATE);
stateDocStore.writeDocument(doc);
return docRef;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import stroom.util.logging.LogUtil;
import stroom.util.shared.Severity;

import jakarta.annotation.Nullable;
import jakarta.inject.Inject;

import java.time.Instant;
Expand Down Expand Up @@ -98,8 +99,8 @@ public class ReferenceData {
final PipelineStore pipelineStore,
final SecurityContext securityContext,
final TaskContextFactory taskContextFactory,
final StateLookup stateLookup,
final PlanBLookup planBLookup) {
@Nullable final StateLookup stateLookup,
@Nullable final PlanBLookup planBLookup) {
this.effectiveStreamService = effectiveStreamService;
this.feedHolder = feedHolder;
this.metaHolder = metaHolder;
Expand Down Expand Up @@ -244,13 +245,21 @@ private void doGetValue(final List<PipelineReference> pipelineReferences,
final String mapName = lookupIdentifier.getPrimaryMapName();
if (PlanBDoc.TYPE.equals(pipeline.getType())) {
// TODO : @66 TEMPORARY INTEGRATION OF STATE LOOKUP USING PIPELINE AS STATE DOC REFERENCE.
if (planBLookup != null && mapName.equalsIgnoreCase(pipeline.getName())) {
Objects.requireNonNull(planBLookup,
"Attempt to perform Plan B state lookup but Plan B service is not present");
Objects.requireNonNull(pipeline.getName(), "Null name for Plan B doc ref in lookup");

if (mapName.equalsIgnoreCase(pipeline.getName())) {
planBLookup.lookup(lookupIdentifier, referenceDataResult);
}

} else if (StateDoc.TYPE.equals(pipeline.getType())) {
// TODO : @66 TEMPORARY INTEGRATION OF STATE LOOKUP USING PIPELINE AS STATE DOC REFERENCE.
if (planBLookup != null && mapName.equalsIgnoreCase(pipeline.getName())) {
Objects.requireNonNull(stateLookup,
"Attempt to perform state lookup but state lookup service is not present");
Objects.requireNonNull(pipeline.getName(), "Null name for state doc ref in lookup");

if (mapName.equalsIgnoreCase(pipeline.getName())) {
stateLookup.lookup(lookupIdentifier, referenceDataResult);
}

Expand Down

0 comments on commit 9c36b5f

Please sign in to comment.