Skip to content

Commit

Permalink
#4682 PlanB State Store
Browse files Browse the repository at this point in the history
  • Loading branch information
stroomdev66 committed Jan 16, 2025
1 parent 9c36b5f commit 891b388
Show file tree
Hide file tree
Showing 10 changed files with 248 additions and 46 deletions.
53 changes: 31 additions & 22 deletions content-packs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,98 +5,107 @@
repo: {
name: "stroom-content",
uri : "https://github.com/gchq/stroom-content.git",
branch: "7.5",
commit: "d5371bd52c8f812fdace2c35d40d84fc79c1f49d"
branch: "planb",
commit: "325c2df25e729b34ae058e1f0dab74bc9c262c9e"
}
}, {
name : "event-logging-xml-schema",
path : "source/event-logging-xml-schema/stroomContent",
repo: {
name: "stroom-content",
uri : "https://github.com/gchq/stroom-content.git",
branch: "7.5",
commit: "d5371bd52c8f812fdace2c35d40d84fc79c1f49d"
branch: "planb",
commit: "325c2df25e729b34ae058e1f0dab74bc9c262c9e"
}
}, {
name: "example-index",
path: "source/example-index/stroomContent",
repo: {
name: "stroom-content",
uri : "https://github.com/gchq/stroom-content.git",
branch: "7.5",
commit: "d5371bd52c8f812fdace2c35d40d84fc79c1f49d"
branch: "planb",
commit: "325c2df25e729b34ae058e1f0dab74bc9c262c9e"
}
}, {
name: "internal-dashboards",
path: "source/internal-dashboards/stroomContent",
repo: {
name: "stroom-content",
uri : "https://github.com/gchq/stroom-content.git",
branch: "7.5",
commit: "d5371bd52c8f812fdace2c35d40d84fc79c1f49d"
branch: "planb",
commit: "325c2df25e729b34ae058e1f0dab74bc9c262c9e"
}
}, {
name: "internal-statistics-sql",
path: "source/internal-statistics-sql/stroomContent",
repo: {
name: "stroom-content",
uri : "https://github.com/gchq/stroom-content.git",
branch: "7.5",
commit: "d5371bd52c8f812fdace2c35d40d84fc79c1f49d"
branch: "planb",
commit: "325c2df25e729b34ae058e1f0dab74bc9c262c9e"
}
}, {
name: "internal-statistics-stroom-stats",
path: "source/internal-statistics-stroom-stats/stroomContent",
repo: {
name: "stroom-content",
uri : "https://github.com/gchq/stroom-content.git",
branch: "7.5",
commit: "d5371bd52c8f812fdace2c35d40d84fc79c1f49d"
branch: "planb",
commit: "325c2df25e729b34ae058e1f0dab74bc9c262c9e"
}
}, {
name: "standard-pipelines",
path: "source/standard-pipelines/stroomContent",
repo: {
name: "stroom-content",
uri : "https://github.com/gchq/stroom-content.git",
branch: "7.5",
commit: "d5371bd52c8f812fdace2c35d40d84fc79c1f49d"
branch: "planb",
commit: "325c2df25e729b34ae058e1f0dab74bc9c262c9e"
}
}, {
name: "stroom-101",
path: "source/stroom-101/stroomContent",
repo: {
name: "stroom-content",
uri : "https://github.com/gchq/stroom-content.git",
branch: "7.5",
commit: "d5371bd52c8f812fdace2c35d40d84fc79c1f49d"
branch: "planb",
commit: "325c2df25e729b34ae058e1f0dab74bc9c262c9e"
}
}, {
name: "stroom-logs",
path: "source/stroom-logs/stroomContent",
repo: {
name: "stroom-content",
uri : "https://github.com/gchq/stroom-content.git",
branch: "7.5",
commit: "d5371bd52c8f812fdace2c35d40d84fc79c1f49d"
branch: "planb",
commit: "325c2df25e729b34ae058e1f0dab74bc9c262c9e"
}
}, {
name: "template-pipelines",
path: "source/template-pipelines/stroomContent",
repo: {
name: "stroom-content",
uri : "https://github.com/gchq/stroom-content.git",
branch: "7.5",
commit: "d5371bd52c8f812fdace2c35d40d84fc79c1f49d"
branch: "planb",
commit: "325c2df25e729b34ae058e1f0dab74bc9c262c9e"
}
}, {
name: "state",
path: "source/state/stroomContent",
repo: {
name: "state",
uri : "https://github.com/gchq/stroom-content.git",
branch: "7.5",
commit: "d5371bd52c8f812fdace2c35d40d84fc79c1f49d"
branch: "planb",
commit: "325c2df25e729b34ae058e1f0dab74bc9c262c9e"
}
}, {
name: "planb",
path: "source/planb/stroomContent",
repo: {
name: "planb",
uri : "https://github.com/gchq/stroom-content.git",
branch: "planb",
commit: "325c2df25e729b34ae058e1f0dab74bc9c262c9e"
}
}, {
name: "stroom-visualisations",
Expand Down
187 changes: 187 additions & 0 deletions stroom-app/src/test/java/stroom/pipeline/task/TestPlanBLookupTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright 2016 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package stroom.pipeline.task;

import stroom.data.shared.StreamTypeNames;
import stroom.data.store.mock.MockStore;
import stroom.docref.DocRef;
import stroom.meta.mock.MockMetaService;
import stroom.meta.shared.Meta;
import stroom.meta.shared.MetaFields;
import stroom.pipeline.shared.PipelineDoc;
import stroom.pipeline.shared.data.PipelineDataUtil;
import stroom.pipeline.shared.data.PipelineReference;
import stroom.planb.impl.PlanBDocStore;
import stroom.planb.impl.data.MergeProcessor;
import stroom.planb.shared.PlanBDoc;
import stroom.planb.shared.StateType;
import stroom.processor.api.ProcessorFilterService;
import stroom.processor.api.ProcessorResult;
import stroom.processor.shared.CreateProcessFilterRequest;
import stroom.processor.shared.QueryData;
import stroom.query.api.v2.ExpressionOperator;
import stroom.query.api.v2.ExpressionTerm;
import stroom.test.AbstractProcessIntegrationTest;
import stroom.test.CommonTranslationTestHelper;
import stroom.test.StoreCreationTool;
import stroom.test.common.ComparisonHelper;
import stroom.test.common.StroomPipelineTestFileUtil;
import stroom.util.shared.Severity;

import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;

class TestPlanBLookupTask extends AbstractProcessIntegrationTest {

private static final int N3 = 3;
private static final int N4 = 4;

private static final String DIR = "TestPlanBLookupTask/";

@Inject
private MockMetaService metaService;
@Inject
private MockStore streamStore;
@Inject
private CommonTranslationTestHelper commonTranslationTestHelper;
@Inject
private PlanBDocStore stateDocStore;
@Inject
private ProcessorFilterService processorFilterService;
@Inject
private StoreCreationTool storeCreationTool;
@Inject
private MergeProcessor mergeProcessor;

@Test
void test() throws IOException {
// Load reference data and create processing pipelines.
commonTranslationTestHelper.createReferenceFeeds();

// Process reference data.
process(3);

final DocRef stateDoc1 = createStateDoc("hostname_to_location_map");
final DocRef stateDoc2 = createStateDoc("hostname_to_ip_map");
final DocRef stateDoc3 = createStateDoc("id_to_user_map");
final DocRef stateDoc4 = createStateDoc("number_to_id");

// Add reference data to state store.
// Setup the pipeline.
final DocRef pipelineRef = new DocRef(PipelineDoc.TYPE, "571a3ca3-5f94-4dcb-8289-119a95d60360");
// Setup the stream processor filter to process all reference data.
final QueryData findStreamQueryData = QueryData.builder()
.dataSource(MetaFields.STREAM_STORE_DOC_REF)
.expression(ExpressionOperator.builder()
.addTextTerm(MetaFields.TYPE,
ExpressionTerm.Condition.EQUALS,
StreamTypeNames.REFERENCE)
.build())
.build();
processorFilterService.create(
CreateProcessFilterRequest
.builder()
.pipeline(pipelineRef)
.queryData(findStreamQueryData)
.priority(2)
.build());

// Process reference data into state store.
final List<ProcessorResult> refDataProcessResults = commonTranslationTestHelper.processAll();
assertThat(refDataProcessResults).hasSize(3);

// Now merge all of the current state data.
mergeProcessor.mergeCurrent();

// Add event data and processor filters.
final List<PipelineReference> pipelineReferences = Stream
.of(stateDoc1, stateDoc2, stateDoc3, stateDoc4)
.map(docRef -> PipelineDataUtil.createReference(
"translationFilter",
"pipelineReference",
docRef,
null,
null))
.toList();
commonTranslationTestHelper.setupStateProcess(
CommonTranslationTestHelper.FEED_NAME,
Collections.singletonList(CommonTranslationTestHelper.VALID_RESOURCE_NAME),
pipelineReferences);

// Process event data, decorating with state.
process(1);

final Path inputDir = StroomPipelineTestFileUtil.getTestResourcesDir().resolve(DIR);
final Path outputDir = StroomPipelineTestFileUtil.getTestOutputDir().resolve(DIR);

for (final Entry<Long, Meta> entry : metaService.getMetaMap().entrySet()) {
final long streamId = entry.getKey();
final Meta meta = entry.getValue();
if (StreamTypeNames.EVENTS.equals(meta.getTypeName())) {
final byte[] data = streamStore.getFileData().get(streamId).get(meta.getTypeName());

// Write the actual XML out.
final OutputStream os = StroomPipelineTestFileUtil.getOutputStream(outputDir,
"TestPlanBLookupTask.out");
os.write(data);
os.flush();
os.close();

ComparisonHelper.compareFiles(inputDir.resolve("TestPlanBLookupTask.out"),
outputDir.resolve("TestPlanBLookupTask.out"));
}
}

// // Make sure 26 records were written.
// assertThat(results.get(N3).getWritten())
// .isEqualTo(26);
}

private void process(final int expectedProcessCount) {
final List<ProcessorResult> results = commonTranslationTestHelper.processAll();
assertThat(results).hasSize(expectedProcessCount);
for (final ProcessorResult result : results) {
assertThat(result.getWritten())
.as(result.toString())
.isGreaterThan(0);
assertThat(result.getRead())
.as(result.toString())
.isLessThanOrEqualTo(result.getWritten());
assertThat(result.getMarkerCount(Severity.SEVERITIES))
.as(result.toString())
.isZero();
}
}

private DocRef createStateDoc(final String name) {
final DocRef docRef = stateDocStore.createDocument(name);
final PlanBDoc doc = stateDocStore.readDocument(docRef);
doc.setStateType(StateType.TEMPORAL_STATE);
stateDocStore.writeDocument(doc);
return docRef;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package stroom.pipeline.task;


import stroom.data.shared.StreamTypeNames;
import stroom.data.store.mock.MockStore;
import stroom.docref.DocRef;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public void importStandardPacks() {
ContentPacks.EVENT_LOGGING_XML_SCHEMA_PACK,
ContentPacks.TEMPLATE_PIPELINES_PACK,
ContentPacks.STANDARD_PIPELINES_PACK,
ContentPacks.STATE
ContentPacks.STATE,
ContentPacks.PLANB
));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void test() {
.collect(Collectors.joining("\n")));

assertThat(list.size())
.isEqualTo(6);
.isEqualTo(7);

criteria = new FindXMLSchemaCriteria();
criteria.setSystemId("file://event-logging-v3.0.0.xsd");
Expand All @@ -85,6 +85,6 @@ void test() {
list = xmlSchemaStore.find(criteria).getValues();
assertThat(list).isNotNull();
assertThat(list.size())
.isEqualTo(6);
.isEqualTo(7);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ public class ContentPacks {
private static final GitRepo STROOM_CONTENT_GIT_REPO =
new GitRepo("stroom-content",
"https://github.com/gchq/stroom-content.git",
"7.5",
"d5371bd52c8f812fdace2c35d40d84fc79c1f49d");
"planb",
"325c2df25e729b34ae058e1f0dab74bc9c262c9e");
private static final GitRepo STROOM_VISUALISATION_DEV_GIT_REPO =
new GitRepo("stroom-visualisations-dev",
"https://github.com/gchq/stroom-visualisations-dev.git",
Expand Down Expand Up @@ -43,6 +43,8 @@ public class ContentPacks {

public static final ContentPack STATE = createStandardContentPack("state");

public static final ContentPack PLANB = createStandardContentPack("planb");

//TEMPLATE_PIPELINES_PACK

public static final ContentPack VISUALISATIONS = createVisualisationContentPack(
Expand All @@ -59,6 +61,7 @@ public class ContentPacks {
STROOM_101,
STROOM_LOGS,
STATE,
PLANB,
TEMPLATE_PIPELINES_PACK,
VISUALISATIONS));

Expand Down

Large diffs are not rendered by default.

Loading

0 comments on commit 891b388

Please sign in to comment.