diff --git a/stroom-app/src/main/resources/ui/noauth/swagger/stroom.json b/stroom-app/src/main/resources/ui/noauth/swagger/stroom.json index a2ed19db5e..6c73b3f948 100644 --- a/stroom-app/src/main/resources/ui/noauth/swagger/stroom.json +++ b/stroom-app/src/main/resources/ui/noauth/swagger/stroom.json @@ -20368,8 +20368,8 @@ "type" : "integer", "format" : "int64" }, - "mapName" : { - "type" : "string" + "planBDocRef" : { + "$ref" : "#/components/schemas/DocRef" } } }, diff --git a/stroom-app/src/main/resources/ui/noauth/swagger/stroom.yaml b/stroom-app/src/main/resources/ui/noauth/swagger/stroom.yaml index 3ceafb1b83..5fc823d4b7 100644 --- a/stroom-app/src/main/resources/ui/noauth/swagger/stroom.yaml +++ b/stroom-app/src/main/resources/ui/noauth/swagger/stroom.yaml @@ -15052,8 +15052,8 @@ components: effectiveTime: type: integer format: int64 - mapName: - type: string + planBDocRef: + $ref: "#/components/schemas/DocRef" SolrConnectionConfig: type: object properties: diff --git a/stroom-state/stroom-planb-impl/README.md b/stroom-state/stroom-planb-impl/README.md new file mode 100644 index 0000000000..ecaaa254ab --- /dev/null +++ b/stroom-state/stroom-planb-impl/README.md @@ -0,0 +1,430 @@ +# Introduction +Plan B is a new implementation of a state store with the aim of improving storage and retrieval performance of state values. + +We currently have the previous Stroom State feature and two versions of statistics called Stroom Statistics and Stroom Stats Store. +Choosing a unique name for this feature will make all discussions less confusing hence the current name, Plan B. +The name can be changed but it needs to remain unique and memorable for this reason. + +The initial implementation of Plan B seeks to prove the write/read process for an LMDB shard based approach to state storage and query before any optimisations are considered. +Future optimisations will be discussed in the document. 


# Process
To use the new Plan B state store feature the user must create Plan B docs in the UI for each store they want to create.
Documents are typically analogous to maps in lookup functions and should be named as such.

The maps must be uniquely named as all lower case characters and underscores, e.g. `my_map_name`.
Map names are case insensitive so existing XSLT that loads reference data with a different case will still work, e.g. `MY_MAP_NAME` in XSLT will work with a Plan B document called `my_map_name`.

## Writing
Once the Plan B documents are created, users can load data into the maps.
Just like the previous state store implementation, Plan B can store data in the following ways:
* State
* Temporal State
* Ranged State
* Temporal Ranged State
* Session

Each of these store types requires data to be described in specific ways, however the first 4 types will work with data specified in the same way as any existing reference data.
Specifics for each state type will be discussed later in this document.

Because the data required for the first 4 types is the same as current reference data, it is easy for users to load data into these stores by just creating a new pipeline that is similar to the `Reference Loader` standard pipeline.
The only difference is that the new pipeline will need a `PlanBFilter` inserted at the end of the pipeline instead of the `ReferenceDataFilter`.
Reference data loads data lazily on request whereas Plan B data is stored in advance.
For this reason the new pipeline can have a new processing filter created to immediately start loading reference data into the stores by processing all data with a stream type of `Reference Data`.

Each stream processed by the loading pipeline will create a set of local LMDB instances, one per map, for the data contained within that stream provided to it by the `PlanBFilter`.
The location that the LMDB instances are written to is controlled by `stroom.planb.path` and defaults to `${stroom.home}/planb`.
Pipelines will write data to the `writer` sub directory of this path, e.g. `${stroom.home}/planb/writer`.
The final step of the pipeline execution before it completes processing is to upload the LMDB store to one or more storage nodes.

## Uploading
Storage nodes are configured in the global Stroom config with the property `stroom.planb.nodeList`.
As the property is a list the system administrator must provide a comma delimited string of node names starting with a comma, e.g. `,node1a,node2a`.
If the property is not set then Plan B will assume that you are using a single node instance of Stroom as is commonly the case for demo and test purposes.

The processing will zip all of the LMDB instances it has just created and upload them to all of the nodes in the node list.
If the node in the node list is the same as the one that has done the processing then it will just move the zip file.
This is also the case if no nodes are configured as a single node is assumed.
The stream processing will only complete successfully without a fatal error if the zip file can be uploaded to all storage nodes.
Failure to upload the zip to any of the nodes will log a fatal error and the stream will need to be processed again once the problem has been resolved.
Upload could fail for a number of reasons including full disk, network problems, down nodes etc.
All processing and storage is expected to be idempotent so that future attempts to load the data again will just add the data to the store again.
It is assumed that the data is the same for every load attempt.
It may be necessary to rebuild a store if erroneous data is accidentally loaded unless the user is able to load data that nullifies the effect of the previous load by overwriting the same keys with corrected values.

> It may be possible in a future iteration to delete data based on a search query to correct data.

Data uploaded to a storage node (or moved if the processing node is a storage node) is placed in the `receive` sub directory of the Plan B path, e.g. `${stroom.home}/planb/receive`.
Temporary files will be created here as data is streamed from the processing node.
Once the processing node has finished successfully streaming data to the receiving node then the data will be moved to the `staging` sub directory of the Plan B path, e.g. `${stroom.home}/planb/staging`.
This location represents a sequential file store that is waiting for a reader to process received items in order.

## Merging
Storage nodes need to run the job `Plan B Merge Processor`.
This job will run perpetually once started and will merge data placed in the `staging` sequential file store as soon as it is available.
Each zip provided by the sequential file store using the `staging` directory is unzipped to the `merging` directory, e.g. `${stroom.home}/planb/merging`.
Once uncompressed each subdirectory containing an LMDB instance will be merged into shards in the `shards` directory, e.g. `${stroom.home}/planb/shards`.

> Note that the LMDB databases representing a Plan B map are known as shards as it was originally planned that the keyspace for each map would be split among multiple shards.
> This is not currently the case but may end up being necessary to reduce the size of snapshots that are downloaded.
> Any future requirement for sharding will be dependent on specific store types and keyspace distribution.

If an LMDB instance is the first for a given map then the LMDB instance will just be moved to become the basis for future merges.
If a shard already exists for a map then the new LMDB instance will be merged into the existing shard by reading the contents of the new instance and writing them to the existing.
This process should be very fast as bytebuffers can be directly copied with little to no serialisation/deserialisation needed.

## Querying
Plan B stores can be queried in a variety of ways but in all cases the same mechanism for reading the data is used.
The system will always look to see if the current node that needs the data is a storage node.
If it is a storage node then the query will be performed directly on the data in the local store.
If the current node needing the data is not a storage node then it will use a snapshot.
A snapshot is requested from a storage node and stored locally for a period of time.
Snapshots are stored in the `snapshots` sub directory, e.g. `${stroom.home}/planb/snapshots`.
There may be multiple snapshots related to a Plan B document, fetched at different times.
The snapshots are in subdirectories under `/`.

A node will try to get a snapshot from each of the storage nodes in the order specified in config and will try each until it is able to get a snapshot.
The storage node will zip the current LMDB instance it has and will return that data to the requesting node.

> Note that we could slice the data in various ways to provide a more focussed snapshot, e.g. using an effective time window, but this won't be implemented unless we find it necessary.

If no previous snapshot existed on the requesting node and no snapshot can be fetched then the requesting node will error.
If a previous snapshot is available then it will continue to be used until another attempt can be made after `stroom.planb.snapshotRetryFetchInterval` (default 1 minute).

Snapshots will be kept at least as long as `stroom.planb.minTimeToKeepSnapshots` (default 10 minutes) after which time they will asynchronously try to get another snapshot, continuing to use the previous one until a new one has been successfully fetched.

To improve read performance LMDB instances will remain open between reads for at least `stroom.planb.minTimeToKeepEnvOpen` (default 1 minute).

### XSLT `stroom:lookup()`
A user can query Plan B state using the standard `lookup()` XSLT function.
Lookups in XSLT are done in the same way as reference data lookups in the XSLT but the `Pipeline Reference` in the relevant `XSLTFilter` must point to the necessary Plan B documents for each of the maps that is required.
For example if you have `stroom:lookup('my_map_name', 'some_key')` then you will need a Plan B doc called `my_map_name` (case insensitive) that has data for `some_key` and the Plan B doc must be specified as a reference pipeline.

> Note that Plan B documents are obviously not pipelines, but for the purposes of the initial version they will be treated as such by the UI when picking an XSLT lookup pipeline reference.
> The UI will show Plan B documents when picking a reference loader.
> The other properties such as feed can be left blank.

Assuming all of the above setup is correct then the Plan B store ought to be able to be used for lookups in exactly the same way as reference data.

The user that the pipeline is running as must have permission to `Use` the Plan B document that is referenced.

### The `getState()` StroomQL/Dashboard function
The `getState()` StroomQL/Dashboard function can be used to query Plan B.
As long as the map name used equals a Plan B document name and provided the user has permission to `Use` the document then a state lookup can be performed.
The function takes the map name, key and optional effective time (for temporal state) as parameters.

### StroomQL/Dashboard Data Source
Plan B documents can be used as query data sources in StroomQL queries and dashboards.
In StroomQL you just need to specify the map name as the data source, e.g. `from my_map_name`.
Dashboard queries need to point to the Plan B document as the data source.

The fields available will depend on the store type being queried.

#### State
* Key (text)
* ValueType (text)
* Value (text)

#### Temporal State
* Key (text)
* EffectiveTime (date)
* ValueType (text)
* Value (text)

#### Ranged State
* KeyStart (number)
* KeyEnd (number)
* ValueType (text)
* Value (text)

#### Temporal Ranged State
* KeyStart (number)
* KeyEnd (number)
* EffectiveTime (date)
* ValueType (text)
* Value (text)

#### Session
* Key (text)
* Start (date)
* End (date)

# Directory Structure And Processing Order
To recap, the directories used by Plan B are as follows:
* `writer` - The initial directory used by pipelines writing data.
* `receive` - The directory that the receiving storage node initially writes zip data during an upload.
* `staging` - The sequential file store where zip files await merge.
* `merging` - A temporary location where data is unzipped during the merging process.
* `shards` - The stored shards resulting from the merge process.
* `snapshots` - Client node snapshots of data from storage nodes.

# Data Structure
Each store type stores data in LMDB in a specific way for that store type.
There are various tradeoffs between performance and disk usage to be considered with each scheme.
The max key length for LMDB of 512 bytes complicates even the simplest key value storage scheme if keys are longer than the max size.
At present Plan B implements a single scheme for each store type that aims to fit all data and still be performant, however this may use more disk than is desirable for some data.
Due to the highly data dependent nature of this problem it is likely that future iterations will need to provide some advanced options for choosing specific schemes.

Current and other possible schemes will now be described for each store type.

## State
State stores just key/value pairs.

The data is currently stored as follows:

* ``
* ``

### Key
If we could guarantee that a key would always be less than 512 bytes then it would obviously be the best option to just store the key directly.

Because we cannot guarantee that this is the case without some future user configuration we instead need to store the key differently.
The key is converted into a long hash using `xx3` byte hashing.
This long is used as the key in the table and the real key is inserted before the value.
As with any hashing there is a chance that we will get hash clashes between different keys.
Whenever we insert data and already have a row with a matching key hash we also check that the full key in the value matches.
If we have a match then the data can be overwritten.
If the key does not match (i.e. we have a hash clash) then look across all rows for the same key hash to see if we can find one.
If we can't find a matching row then we insert a new row for the new key hash and key value pair.
Note that the LMDB instance is configured to support duplicate key rows for this scheme to operate.
Searching across existing values is potentially expensive if we are constantly overwriting data and/or have many hash clashes leading us to verify existing rows.
However, it is assumed that a long hash will not lead to many hash clashes.

#### Option - Direct key storage
Allow storage of keys < 512 bytes directly.
Potentially much faster.

#### Option - Increase the key size of LMDB
Increase the key size of LMDB.
This requires recompiling LMDB so isn't ideal.
It is also unknown what nasty side effects could be uncovered by doing this.

#### Option - Reduce hash length
We could potentially get away with an `int` hash depending on probability of key clashes.
This would reduce storage size.

#### Option - Use a lookup table
Alternatively we could use a lookup table for the key.
A lookup table could store the key as a value with a numeric index as the key.
The numeric index (pointer) could then be used in the primary table.
Inserts would need to first see if the key exists.
To make this performant we would still need to use a hash of the key for the index otherwise we would need to scan the table to find the key and associated index.
We could cache this information but it is unlikely that we could hold the whole table in heap and unless there was a high chance of cache hits this would be pointless.
Hashing the key comes with the same problems as the current solution in terms of hash clashes.
We would potentially need to store multiple rows for each key if there were hash clashes and have an additional numeric part to uniquely identify them, e.g.

* ``

This would mean an even longer key length than the current solution.
If we had many key hash clashes then this would also suffer greatly and potentially require a `long` for the unique part.
Having said that we might also get away with an `int` hash and even shorter unique part depending on the data.

For state storage we don't get any deduplication by storing keys in a lookup table as every key is used only once.
This means that this scheme would certainly use more storage as there would be no deduplication benefit and storage of index pointers in the lookup and primary tables.

In testing the use of lookup tables for keys and values was found to be far slower due to the additional processing involved.
Using lookup tables might be a useful future option for keys and values for data sets that have high degrees of duplication to save storage at the cost of performance.
However it will never be appropriate for storing the key of non temporal state.

Deletions are harder with lookup tables if you want to delete from the lookup table as well as the primary table as you need to ensure there are no uses of a lookup value before you can delete it.

### Value
The value is just a two part encoding of the value type (byte) and value (bytes).
In the current scheme this comes after the key as the key is a prefix for the value.

``

There is no more efficient way of storing the value part from a performance perspective.

#### Option - Compression
It is unlikely that additional compression would substantially affect the size of the value assuming that appropriate serialisation is performed upstream, e.g. `Fast Infoset`.

#### Option - Use a lookup table
If we have datasets that use the same value for many keys we may benefit from the use of a lookup table as previously discussed for keys.
This would potentially save storage if we have highly duplicated large values but again comes at a read/write performance cost.

## Temporal State
The data is currently stored as follows:

* ``
* ``

### Key
If we could guarantee that a key would always be less than 512 bytes minus an effective time suffix then it would obviously be the best option to just store the key directly.
This would be a max key length of 512 - key length (int) - effective time (long), e.g. `512 - 4 - 8 = 500`

At present a long hash is used for the key as described for `State`.

Because we have temporal state we may end up with many duplicate keys increasing storage use.

#### Option - Direct key storage
Allow storage of keys < 500 bytes directly.

* ``

Potentially faster.
Because we have temporal state we may end up with many duplicate keys increasing storage use.

#### Option - Increase the key size of LMDB
As above.

#### Option - Reduce hash length
As above.

#### Option - Use a lookup table
Pros and cons as above but potentially more beneficial as we are likely to insert the same keys multiple times due to storing temporal state.

### Value
Currently stored the same way as the `State` scheme.

#### Option - Use a lookup table
All considerations are the same as for `State` except that as with the key, the temporal nature of this storage type potentially increases the duplication of values.
However, despite keys being duplicated in temporal stores it is not necessarily the case that values will do so to the same degree as values could be completely unique over time.
The choice of a lookup table to deduplicate values is still highly data dependent.

## Ranged State
The data is currently stored as follows:

* ``
* ``

### Key
Keys are only ever 16 bytes long as they are always two longs.
This makes storing the key very simple without any need for alternative schemes.

### Value
Because the key is simple the value is just stored directly as a simple type and data structure.

#### Option - Use a lookup table
The same considerations apply as `State` for the pros and cons of deduplicating the value with a lookup table.

## Temporal Ranged State
The data is currently stored as follows:

* ``
* ``

### Key
Keys are only ever 24 bytes long as they are always three longs.
This makes storing the key very simple without any need for alternative schemes.

### Value
Because the key is simple the value is just stored directly as a simple type and data structure.

#### Option - Use a lookup table
The same considerations apply as `Temporal State` for the pros and cons of deduplicating the value with a lookup table.

## Session
The data is currently stored as follows:

* ``
* ``

### Key
Sessions are just keys with start and end times.
The current scheme creates a hash of the key and stores the actual key in the value in a way similar to `State`.

#### Option - Direct key storage
Allow storage of keys < 492 bytes directly.

* ``
* `(empty)`

Potentially faster.
Sessions may end up with many duplicate keys increasing storage use, especially without session compaction.
No need to store anything as a value in the table as we only care about the key.

#### Option - Increase the key size of LMDB
As above.

#### Option - Reduce hash length
As above.

#### Option - Use a lookup table

-- primary --
* ``
* `(empty)`

-- lookup --
* ``
* ``

There are similar considerations to other store types, however, if users can keep state names short then potentially there is less benefit to deduplication using a lookup table.
In my tests I have very short state names so a lookup table would not be beneficial.
If we used a lookup table to store the key then we wouldn't need to store a value in the primary table.

### Value
No value is needed for `Session` stores as we only care about the key and session start/end times.

# Compaction
Stores with temporal data can be compacted:
* `Temporal State` - Repeated confirmations of identical state can be removed.
* `Temporal Ranged State` - Repeated confirmations of identical state can be removed.
* `Session` - Overlapping sessions can be collapsed into a single session.

Stores are compacted using the `Plan B Maintenance Processor` on the storage nodes.
The compaction settings in the Plan B document govern how compaction is performed and data to consider for compaction based on temporal state.

> Note that when loading old data it may be necessary to disable compaction for a store until data is loaded and processing is up to date, otherwise some data could be compacted prematurely.

> We could introduce an update time to all store rows and only compact data based on update time which would remove the risk of compacting old data we have only recently added.

# Data Retention
Stores with temporal data can have data aged off:
* `Temporal State` - Old state data can be deleted.
* `Temporal Ranged State` - Old state data can be deleted.
* `Session` - Old sessions can be deleted.

Data retention is performed with the same process as compaction, using the `Plan B Maintenance Processor` on the storage nodes.
The retention settings in the Plan B document govern how retention is performed and data to consider for deletion based on temporal state.

> Note that when loading old data it may be necessary to disable data retention processing for a store until data is loaded and processing is up to date, otherwise some data could be deleted prematurely.

> We could introduce an update time to all store rows and delete data based on update time rather than temporal state time.

# Store Deletion
Deleting Plan B documents will not immediately delete the associated LMDB data.
Instead the data will be deleted with the same process as compaction and retention, using the `Plan B Maintenance Processor` on the storage nodes.
Any LMDB data stores that are found that do not have an associated Plan B document will be deleted.

# Cleanup
LMDB environments are kept open for reading and writing, this includes shards and snapshots.
A periodic cleanup job `Plan B shard cleanup` should be run on all nodes when using Plan B to ensure LMDB environments are closed and snapshots cleaned up if the environments have been idle for longer than `stroom.planb.minTimeToKeepEnvOpen` (default 1 minute).

# Future Optimisations

## Snapshot Size Reduction
It may be necessary to reduce snapshot sizes.
This could be achieved by sharding data on write to specific key ranges or by filtering data on read to produce slices to cover a certain keyspace or effective time range to meet the snapshot request.
Sharding by effective time would be expensive on write as changes to old shards would need to be copied through to all later shards.
Sharding by key ranges could be done but would ideally be optional with various settings to control keyspace splitting as it is largely data dependent.
+ + + + + diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/PlanBModule.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/PlanBModule.java index f86d2c222f..653509b395 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/PlanBModule.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/PlanBModule.java @@ -93,10 +93,16 @@ protected void configure() { ScheduledJobsBinder.create(binder()) .bindJobTo(StateMergeRunnable.class, builder -> builder - .name(MergeProcessor.TASK_NAME) + .name(MergeProcessor.MERGE_TASK_NAME) .description("Plan B state store merge") .cronSchedule(CronExpressions.EVERY_MINUTE.getExpression()) .advanced(true)); + ScheduledJobsBinder.create(binder()) + .bindJobTo(StateMaintenanceRunnable.class, builder -> builder + .name(MergeProcessor.MAINTAIN_TASK_NAME) + .description("Plan B state store maintain") + .cronSchedule(CronExpressions.EVERY_10_MINUTES.getExpression()) + .advanced(true)); ScheduledJobsBinder.create(binder()) .bindJobTo(ShardManagerCleanupRunnable.class, builder -> builder .name(ShardManager.CLEANUP_TASK_NAME) @@ -110,7 +116,15 @@ private static class StateMergeRunnable extends RunnableWrapper { @Inject StateMergeRunnable(final MergeProcessor mergeProcessor) { - super(mergeProcessor::exec); + super(mergeProcessor::merge); + } + } + + private static class StateMaintenanceRunnable extends RunnableWrapper { + + @Inject + StateMaintenanceRunnable(final MergeProcessor mergeProcessor) { + super(mergeProcessor::maintainShards); } } diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/StateSearchProvider.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/StateSearchProvider.java index 1d58679d3b..be4c38e6df 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/StateSearchProvider.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/StateSearchProvider.java @@ -40,6 +40,7 
@@ import stroom.query.common.v2.ResultStoreFactory; import stroom.query.common.v2.SearchProcess; import stroom.query.common.v2.SearchProvider; +import stroom.security.api.SecurityContext; import stroom.task.api.TaskContextFactory; import stroom.task.api.TaskManager; import stroom.task.shared.TaskProgress; @@ -76,6 +77,7 @@ public class StateSearchProvider implements SearchProvider, IndexFieldProvider { private final TaskContextFactory taskContextFactory; private final ShardManager shardManager; private final ExpressionPredicateFactory expressionPredicateFactory; + private final SecurityContext securityContext; @Inject public StateSearchProvider(final Executor executor, @@ -86,7 +88,8 @@ public StateSearchProvider(final Executor executor, final TaskManager taskManager, final TaskContextFactory taskContextFactory, final ShardManager shardManager, - final ExpressionPredicateFactory expressionPredicateFactory) { + final ExpressionPredicateFactory expressionPredicateFactory, + final SecurityContext securityContext) { this.executor = executor; this.stateDocStore = stateDocStore; this.stateDocCache = stateDocCache; @@ -96,14 +99,17 @@ public StateSearchProvider(final Executor executor, this.taskContextFactory = taskContextFactory; this.shardManager = shardManager; this.expressionPredicateFactory = expressionPredicateFactory; + this.securityContext = securityContext; } private PlanBDoc getPlanBDoc(final DocRef docRef) { - Objects.requireNonNull(docRef, "Null doc reference"); - Objects.requireNonNull(docRef.getName(), "Null doc key"); - final PlanBDoc doc = stateDocCache.get(docRef.getName()); - Objects.requireNonNull(doc, "Null state doc"); - return doc; + return securityContext.useAsReadResult(() -> { + Objects.requireNonNull(docRef, "Null doc reference"); + Objects.requireNonNull(docRef.getName(), "Null doc key"); + final PlanBDoc doc = stateDocCache.get(docRef.getName()); + Objects.requireNonNull(doc, "Null state doc"); + return doc; + }); } @Override @@ -139,7 +145,7 
@@ public IndexField getIndexField(final DocRef docRef, final String fieldName) { @Override public Optional fetchDocumentation(final DocRef docRef) { - return Optional.ofNullable(stateDocCache.get(docRef.getName())).map(PlanBDoc::getDescription); + return Optional.ofNullable(getPlanBDoc(docRef)).map(PlanBDoc::getDescription); } @Override @@ -159,7 +165,7 @@ public ResultStore createResultStore(final SearchRequest searchRequest) { final DocRef docRef = query.getDataSource(); // Check we have permission to read the doc. - final PlanBDoc doc = stateDocCache.get(docRef.getName()); + final PlanBDoc doc = getPlanBDoc(docRef); Objects.requireNonNull(doc, "Unable to find state doc with key: " + docRef.getName()); // Extract highlights. diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/FileTransferClientImpl.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/FileTransferClientImpl.java index 1ade3e7d6e..8242ad0b8e 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/FileTransferClientImpl.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/FileTransferClientImpl.java @@ -164,7 +164,7 @@ public Instant fetchSnapshot(final String nodeName, LOGGER.info(() -> "Fetching snapshot from '" + nodeName + "' for '" + - request.getMapName() + + request.getPlanBDocRef() + "'"); final String url = NodeCallUtil.getBaseEndpointUrl(nodeInfo, nodeService, nodeName) + ResourcePaths.buildAuthenticatedApiPath( @@ -176,7 +176,7 @@ public Instant fetchSnapshot(final String nodeName, throw new RuntimeException("Error fetching snapshot from '" + nodeName + "' for '" + - request.getMapName() + + request.getPlanBDocRef() + "'", e); } }); diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/LocalShard.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/LocalShard.java index 1859016ecf..1706c1a398 100644 --- 
a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/LocalShard.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/LocalShard.java @@ -3,7 +3,6 @@ import stroom.bytebuffer.impl6.ByteBufferFactory; import stroom.planb.impl.PlanBConfig; -import stroom.planb.impl.PlanBDocCache; import stroom.planb.impl.db.AbstractLmdb; import stroom.planb.impl.db.RangedStateDb; import stroom.planb.impl.db.SessionDb; @@ -12,6 +11,7 @@ import stroom.planb.impl.db.TemporalRangedStateDb; import stroom.planb.impl.db.TemporalStateDb; import stroom.planb.shared.PlanBDoc; +import stroom.util.io.FileUtil; import stroom.util.logging.LambdaLogger; import stroom.util.logging.LambdaLoggerFactory; import stroom.util.shared.time.SimpleDuration; @@ -43,15 +43,12 @@ class LocalShard implements Shard { private static final String LOCK_FILE_NAME = "lock.mdb"; private final ByteBufferFactory byteBufferFactory; - private final PlanBDocCache planBDocCache; private final Provider configProvider; - private final StatePaths statePaths; - private final String mapName; private final Path shardDir; private final ReentrantLock lock = new ReentrantLock(); - private volatile PlanBDoc doc; + private final PlanBDoc doc; private final AtomicInteger useCount = new AtomicInteger(); private volatile AbstractLmdb db; private volatile boolean open; @@ -59,17 +56,14 @@ class LocalShard implements Shard { private volatile Instant lastWriteTime; public LocalShard(final ByteBufferFactory byteBufferFactory, - final PlanBDocCache planBDocCache, final Provider configProvider, final StatePaths statePaths, - final String mapName) { + final PlanBDoc doc) { this.byteBufferFactory = byteBufferFactory; - this.planBDocCache = planBDocCache; this.configProvider = configProvider; - this.statePaths = statePaths; - this.mapName = mapName; + this.doc = doc; lastWriteTime = Instant.now(); - this.shardDir = statePaths.getShardDir().resolve(mapName); + this.shardDir = 
statePaths.getShardDir().resolve(doc.getUuid()); } private void incrementUseCount() { @@ -106,6 +100,20 @@ private void decrementUseCount() { } } + @Override + public void delete() { + lock.lock(); + try { + if (useCount.get() == 0) { + LOGGER.info(() -> "Deleting data for: " + doc); + cleanup(); + FileUtil.deleteDir(shardDir); + } + } finally { + lock.unlock(); + } + } + @Override public void merge(final Path sourceDir) { boolean success = false; @@ -141,19 +149,39 @@ public void merge(final Path sourceDir) { } @Override - public void condense() { + public void condense(final PlanBDoc doc) { try { - final PlanBDoc doc = getDoc(); - if (doc != null && doc.isCondense()) { + // Find out how old data needs to be before we condense it. + final long condenseBeforeMs; + if (doc.isCondense()) { final SimpleDuration duration = SimpleDuration .builder() .time(doc.getCondenseAge()) .timeUnit(doc.getCondenseTimeUnit()) .build(); - final Instant maxAge = SimpleDurationUtil.minus(Instant.now(), duration); + condenseBeforeMs = SimpleDurationUtil.minus(Instant.now(), duration).toEpochMilli(); + } else { + condenseBeforeMs = 0; + } + + // Find out how old data needs to be before we delete it. + final long deleteBeforeMs; + if (!doc.isRetainForever()) { + final SimpleDuration duration = SimpleDuration + .builder() + .time(doc.getRetainAge()) + .timeUnit(doc.getRetainTimeUnit()) + .build(); + deleteBeforeMs = SimpleDurationUtil.minus(Instant.now(), duration).toEpochMilli(); + } else { + deleteBeforeMs = 0; + } + + // If we are condensing or deleting data then do so. 
+ if (condenseBeforeMs > 0 || deleteBeforeMs > 0) { incrementUseCount(); try { - db.condense(maxAge); + db.condense(condenseBeforeMs, deleteBeforeMs); } finally { decrementUseCount(); } @@ -256,31 +284,16 @@ private boolean isIdle() { configProvider.get().getMinTimeToKeepEnvOpen().getDuration())); } - private PlanBDoc getDoc() { - if (doc == null) { - doc = planBDocCache.get(mapName); - if (doc == null) { - LOGGER.warn(() -> "No PlanB doc found for '" + mapName + "'"); - throw new RuntimeException("No PlanB doc found for '" + mapName + "'"); - } - } - return doc; - } - private void open() { - final PlanBDoc doc = getDoc(); - final String mapName = doc.getName(); - - final Path shardDir = statePaths.getShardDir().resolve(mapName); if (Files.exists(shardDir)) { - LOGGER.info(() -> "Found local shard for '" + mapName + "'"); + LOGGER.info(() -> "Found local shard for '" + doc + "'"); db = openDb(doc, shardDir); } else { // If this node is supposed to be a node that stores shards, but it doesn't have it, then error. 
final String message = "Local Plan B shard not found for '" + - mapName + + doc + "'"; LOGGER.error(() -> message); throw new RuntimeException(message); @@ -308,4 +321,9 @@ private void open() { default -> throw new RuntimeException("Unexpected state type: " + doc.getStateType()); } } + + @Override + public PlanBDoc getDoc() { + return doc; + } } diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/MergeProcessor.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/MergeProcessor.java index e6008a9aa1..10182f5e36 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/MergeProcessor.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/MergeProcessor.java @@ -17,26 +17,24 @@ import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.Path; -import java.time.Duration; -import java.time.Instant; import java.util.UUID; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Stream; @Singleton public class MergeProcessor { - public static final String TASK_NAME = "Plan B Merge Processor"; + public static final String MERGE_TASK_NAME = "Plan B Merge Processor"; + public static final String MAINTAIN_TASK_NAME = "Plan B Maintenance Processor"; private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(MergeProcessor.class); - private static final Duration MIN_CONDENSE_FREQUENCY = Duration.ofMinutes(10); - private final SequentialFileStore fileStore; private final Path mergingDir; private final SecurityContext securityContext; private final TaskContextFactory taskContextFactory; private final ShardManager shardManager; - private Instant nextCondenseTime; + private final ReentrantLock maintenanceLock = new ReentrantLock(); @Inject public MergeProcessor(final SequentialFileStore fileStore, @@ -55,8 +53,6 @@ public MergeProcessor(final SequentialFileStore fileStore, throw new RuntimeException("Unable to delete contents 
of: " + FileUtil.getCanonicalPath(mergingDir)); } } - - nextCondenseTime = Instant.now().plus(MIN_CONDENSE_FREQUENCY); } private boolean ensureDirExists(final Path path) { @@ -73,7 +69,7 @@ private boolean ensureDirExists(final Path path) { return false; } - public void exec() { + public void merge() { securityContext.asProcessingUser(() -> { final TaskContext taskContext = taskContextFactory.current(); try { @@ -93,19 +89,17 @@ public void exec() { // Wait until new data is available. final long currentStoreId = storeId; taskContext.info(() -> "Waiting for data..."); - final SequentialFile sequentialFile = fileStore.awaitNew(currentStoreId); - taskContext.info(() -> "Merging data: " + currentStoreId); - merge(sequentialFile); - - // Periodically we will condense all shards. - // This is done here so that the same thread is always used for writing. - if (nextCondenseTime.isBefore(Instant.now())) { - shardManager.condenseAll(); - nextCondenseTime = Instant.now().plus(MIN_CONDENSE_FREQUENCY); - } + final SequentialFile sequentialFile = fileStore.awaitNext(currentStoreId); + maintenanceLock.lock(); + try { + taskContext.info(() -> "Merging data: " + currentStoreId); + merge(sequentialFile); - // Increment store id. - storeId++; + // Increment store id. + storeId++; + } finally { + maintenanceLock.unlock(); + } } } catch (final IOException e) { throw new UncheckedIOException(e); @@ -113,19 +107,30 @@ public void exec() { }); } + public void maintainShards() { + securityContext.asProcessingUser(() -> { + maintenanceLock.lock(); + try { + shardManager.condenseAll(); + } finally { + maintenanceLock.unlock(); + } + }); + } + public void mergeCurrent() throws IOException { final long start = fileStore.getMinStoreId(); final long end = fileStore.getMaxStoreId(); for (long storeId = start; storeId <= end; storeId++) { // Wait until new data is available. 
- final SequentialFile sequentialFile = fileStore.awaitNew(storeId); + final SequentialFile sequentialFile = fileStore.awaitNext(storeId); merge(sequentialFile); } } public void merge(final long storeId) throws IOException { // Wait until new data is available. - final SequentialFile sequentialFile = fileStore.awaitNew(storeId); + final SequentialFile sequentialFile = fileStore.awaitNext(storeId); merge(sequentialFile); } diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/SequentialFile.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/SequentialFile.java index 5f29216975..af372341c6 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/SequentialFile.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/SequentialFile.java @@ -83,10 +83,6 @@ public Path getZip() { return zip; } - public String getZipFileName() { - return root.relativize(zip).toString(); - } - public void delete() throws IOException { LOGGER.debug(() -> "Deleting: " + FileUtil.getCanonicalPath(zip)); Files.deleteIfExists(zip); diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/SequentialFileStore.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/SequentialFileStore.java index 380e658a77..c97d79f53d 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/SequentialFileStore.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/SequentialFileStore.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -78,10 +79,7 @@ public SequentialFile createTemp() { return getTempFileSet(currentStoreId); } - /** - * Add sources to the DB. 
- */ - public SequentialFile awaitNew(final long storeId) { + public SequentialFile awaitNext(final long storeId) { try { lock.lockInterruptibly(); try { @@ -94,11 +92,35 @@ public SequentialFile awaitNew(final long storeId) { lock.unlock(); } } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); throw UncheckedInterruptedException.create(e); } return getStoreFileSet(storeId); } +// public Optional awaitNext(final long storeId, +// final long time, +// final TimeUnit timeUnit) { +// try { +// lock.lockInterruptibly(); +// try { +// long currentStoreId = addedStoreId.get(); +// while (currentStoreId < storeId) { +// if (!condition.await(time,timeUnit)) { +// return Optional.empty(); +// } +// currentStoreId = addedStoreId.get(); +// } +// } finally { +// lock.unlock(); +// } +// } catch (final InterruptedException e) { +// Thread.currentThread().interrupt(); +// throw UncheckedInterruptedException.create(e); +// } +// return Optional.of(getStoreFileSet(storeId)); +// } + private SequentialFile getTempFileSet(final long storeId) { return SequentialFile.get(receiveDir, storeId, true); } diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/Shard.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/Shard.java index 9a67d146c0..f9083380fa 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/Shard.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/Shard.java @@ -1,6 +1,7 @@ package stroom.planb.impl.data; import stroom.planb.impl.db.AbstractLmdb; +import stroom.planb.shared.PlanBDoc; import java.io.OutputStream; import java.nio.file.Path; @@ -20,7 +21,7 @@ interface Shard { /** * Condense data in the shard. */ - void condense(); + void condense(PlanBDoc doc); /** * Determine if we are allowed to create a snapshot or if the snapshot we have is already the latest. 
@@ -50,4 +51,16 @@ interface Shard { * Close the DB if it isn't currently in use for read or write. */ void cleanup(); + + /** + * Delete the DB if the associated doc has been deleted. + */ + void delete(); + + /** + * Get the Plan B doc associated with this shard. + * + * @return The Plan B doc associated with this shard. + */ + PlanBDoc getDoc(); } diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/ShardManager.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/ShardManager.java index ebec27b133..6fcab61ada 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/ShardManager.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/ShardManager.java @@ -1,13 +1,18 @@ package stroom.planb.impl.data; import stroom.bytebuffer.impl6.ByteBufferFactory; +import stroom.docref.DocRef; import stroom.node.api.NodeInfo; import stroom.planb.impl.PlanBConfig; import stroom.planb.impl.PlanBDocCache; +import stroom.planb.impl.PlanBDocStore; import stroom.planb.impl.db.AbstractLmdb; import stroom.planb.impl.db.StatePaths; +import stroom.planb.shared.PlanBDoc; import stroom.util.NullSafe; import stroom.util.io.FileUtil; +import stroom.util.logging.LambdaLogger; +import stroom.util.logging.LambdaLoggerFactory; import jakarta.inject.Inject; import jakarta.inject.Provider; @@ -24,10 +29,13 @@ @Singleton public class ShardManager { + private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(ShardManager.class); + public static final String CLEANUP_TASK_NAME = "Plan B Cleanup"; private final ByteBufferFactory byteBufferFactory; private final PlanBDocCache planBDocCache; + private final PlanBDocStore planBDocStore; private final Map shardMap = new ConcurrentHashMap<>(); private final NodeInfo nodeInfo; private final Provider configProvider; @@ -37,12 +45,14 @@ public class ShardManager { @Inject public ShardManager(final ByteBufferFactory byteBufferFactory, final 
PlanBDocCache planBDocCache, + final PlanBDocStore planBDocStore, final NodeInfo nodeInfo, final Provider configProvider, final StatePaths statePaths, final FileTransferClient fileTransferClient) { this.byteBufferFactory = byteBufferFactory; this.planBDocCache = planBDocCache; + this.planBDocStore = planBDocStore; this.nodeInfo = nodeInfo; this.configProvider = configProvider; this.statePaths = statePaths; @@ -60,27 +70,37 @@ private boolean isSnapshotNode() { } public void merge(final Path sourceDir) throws IOException { - final String mapName = sourceDir.getFileName().toString(); - final Shard shard = getShard(mapName); + final String docUuid = sourceDir.getFileName().toString(); + final Shard shard = getShardForDocUuid(docUuid); shard.merge(sourceDir); } public void condenseAll() { - shardMap.values().forEach(shard -> shard.condense()); + shardMap.values().forEach(shard -> { + final PlanBDoc doc = shard.getDoc(); + final PlanBDoc loaded = planBDocStore.readDocument(doc.asDocRef()); + // If we can't get the doc then we must have deleted it so delete the shard. 
+ if (loaded == null) { + shard.delete(); + shardMap.remove(shard.getDoc().getUuid()); + } else { + shard.condense(loaded); + } + }); } public void checkSnapshotStatus(final SnapshotRequest request) { - final Shard shard = getShard(request.getMapName()); + final Shard shard = getShardForDocUuid(request.getPlanBDocRef().getUuid()); shard.checkSnapshotStatus(request); } public void createSnapshot(final SnapshotRequest request, final OutputStream outputStream) { - final Shard shard = getShard(request.getMapName()); + final Shard shard = getShardForDocUuid(request.getPlanBDocRef().getUuid()); shard.createSnapshot(request, outputStream); } public R get(final String mapName, final Function, R> function) { - final Shard shard = getShard(mapName); + final Shard shard = getShardForMapName(mapName); return shard.get(function); } @@ -88,24 +108,38 @@ public void cleanup() { shardMap.values().forEach(Shard::cleanup); } - private Shard getShard(final String mapName) { - return shardMap.computeIfAbsent(mapName, this::createShard); + private Shard getShardForMapName(final String mapName) { + final PlanBDoc doc = planBDocCache.get(mapName); + if (doc == null) { + LOGGER.warn(() -> "No PlanB doc found for '" + mapName + "'"); + throw new RuntimeException("No PlanB doc found for '" + mapName + "'"); + } + return shardMap.computeIfAbsent(doc.getUuid(), k -> createShard(doc)); + } + + private Shard getShardForDocUuid(final String docUuid) { + return shardMap.computeIfAbsent(docUuid, k -> { + final PlanBDoc doc = planBDocStore.readDocument(DocRef.builder().type(PlanBDoc.TYPE).uuid(k).build()); + if (doc == null) { + LOGGER.warn(() -> "No PlanB doc found for UUID '" + docUuid + "'"); + throw new RuntimeException("No PlanB doc found for UUID '" + docUuid + "'"); + } + return createShard(doc); + }); } - private Shard createShard(final String mapName) { + private Shard createShard(final PlanBDoc doc) { if (isSnapshotNode()) { return new SnapshotShard(byteBufferFactory, - planBDocCache, 
configProvider, statePaths, fileTransferClient, - mapName); + doc); } return new LocalShard( byteBufferFactory, - planBDocCache, configProvider, statePaths, - mapName); + doc); } } diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/SnapshotRequest.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/SnapshotRequest.java index f7fbb9661f..df4b6c9229 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/SnapshotRequest.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/SnapshotRequest.java @@ -1,5 +1,7 @@ package stroom.planb.impl.data; +import stroom.docref.DocRef; + import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; @@ -11,23 +13,23 @@ public class SnapshotRequest { @JsonProperty - private final String mapName; + private final DocRef planBDocRef; @JsonProperty private final long effectiveTime; @JsonProperty private final Long currentSnapshotTime; @JsonCreator - public SnapshotRequest(@JsonProperty("mapName") final String mapName, + public SnapshotRequest(@JsonProperty("planBDocRef") final DocRef planBDocRef, @JsonProperty("effectiveTime") final long effectiveTime, @JsonProperty("currentSnapshotTime")final Long currentSnapshotTime) { - this.mapName = mapName; + this.planBDocRef = planBDocRef; this.effectiveTime = effectiveTime; this.currentSnapshotTime = currentSnapshotTime; } - public String getMapName() { - return mapName; + public DocRef getPlanBDocRef() { + return planBDocRef; } public long getEffectiveTime() { diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/SnapshotShard.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/SnapshotShard.java index 8c60f00979..375576c82e 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/SnapshotShard.java +++ 
b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/SnapshotShard.java @@ -2,7 +2,6 @@ import stroom.bytebuffer.impl6.ByteBufferFactory; import stroom.planb.impl.PlanBConfig; -import stroom.planb.impl.PlanBDocCache; import stroom.planb.impl.db.AbstractLmdb; import stroom.planb.impl.db.RangedStateDb; import stroom.planb.impl.db.SessionDb; @@ -34,36 +33,32 @@ class SnapshotShard implements Shard { private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(SnapshotShard.class); private final ByteBufferFactory byteBufferFactory; - private final PlanBDocCache planBDocCache; private final Provider configProvider; private final StatePaths statePaths; private final FileTransferClient fileTransferClient; - private final String mapName; + private final PlanBDoc doc; private final ReentrantLock lock = new ReentrantLock(); private volatile SnapshotInstance snapshotInstance; public SnapshotShard(final ByteBufferFactory byteBufferFactory, - final PlanBDocCache planBDocCache, final Provider configProvider, final StatePaths statePaths, final FileTransferClient fileTransferClient, - final String mapName) { + final PlanBDoc doc) { this.byteBufferFactory = byteBufferFactory; - this.planBDocCache = planBDocCache; this.configProvider = configProvider; this.statePaths = statePaths; this.fileTransferClient = fileTransferClient; - this.mapName = mapName; + this.doc = doc; snapshotInstance = new SnapshotInstance( byteBufferFactory, - planBDocCache, configProvider, statePaths, fileTransferClient, - mapName, + doc, Instant.now(), null); } @@ -82,11 +77,10 @@ private SnapshotInstance getDBInstance() { if (currentInstance.getExpiryTime().isBefore(Instant.now())) { final SnapshotInstance newInstance = new SnapshotInstance( byteBufferFactory, - planBDocCache, configProvider, statePaths, fileTransferClient, - mapName, + doc, Instant.now(), currentInstance.getCurrentSnapshotTime()); @@ -119,7 +113,7 @@ public void merge(final Path sourceDir) { } @Override - public 
void condense() { + public void condense(final PlanBDoc doc) { // Condense is not supported on snapshots } @@ -157,11 +151,15 @@ public void cleanup() { getDBInstance().cleanup(); } + @Override + public void delete() { + getDBInstance().destroy(); + } + private static class SnapshotInstance { private final ByteBufferFactory byteBufferFactory; private final Provider configProvider; - private final String mapName; private final PlanBDoc doc; private final Path dbDir; private final RuntimeException fetchException; @@ -176,30 +174,22 @@ private static class SnapshotInstance { private volatile boolean destroy; public SnapshotInstance(final ByteBufferFactory byteBufferFactory, - final PlanBDocCache planBDocCache, final Provider configProvider, final StatePaths statePaths, final FileTransferClient fileTransferClient, - final String mapName, + final PlanBDoc doc, final Instant createTime, final Instant previousSnapshotTime) { Instant currentSnapshotTime = null; Instant expiryTime = null; - PlanBDoc doc = null; Path dbDir = null; RuntimeException fetchException = null; try { - doc = planBDocCache.get(mapName); - if (doc == null) { - LOGGER.warn(() -> "No PlanB doc found for '" + mapName + "'"); - throw new RuntimeException("No PlanB doc found for '" + mapName + "'"); - } - // Get the snapshot dir. dbDir = statePaths .getSnapshotDir() - .resolve(mapName) + .resolve(doc.getUuid()) .resolve(DateUtil.createFileDateTimeString(createTime)); // Create dir. @@ -208,11 +198,11 @@ public SnapshotInstance(final ByteBufferFactory byteBufferFactory, // Go and get a snapshot. boolean fetchComplete = false; final SnapshotRequest request = new SnapshotRequest( - mapName, + doc.asDocRef(), 0L, NullSafe.get(previousSnapshotTime, Instant::toEpochMilli)); for (final String node : configProvider.get().getNodeList()) { - LOGGER.info(() -> "Fetching shard for '" + mapName + "'"); + LOGGER.info(() -> "Fetching shard for '" + doc + "'"); // Fetch snapshot. 
currentSnapshotTime = fileTransferClient.fetchSnapshot(node, request, dbDir); @@ -225,7 +215,7 @@ public SnapshotInstance(final ByteBufferFactory byteBufferFactory, } if (!fetchComplete) { - throw new RuntimeException("Unable to get snapshot shard for '" + mapName + "'"); + throw new RuntimeException("Unable to get snapshot shard for '" + doc + "'"); } } catch (final Exception e) { LOGGER.warn(e::getMessage, e); @@ -236,7 +226,6 @@ public SnapshotInstance(final ByteBufferFactory byteBufferFactory, this.byteBufferFactory = byteBufferFactory; this.configProvider = configProvider; - this.mapName = mapName; this.currentSnapshotTime = currentSnapshotTime; this.expiryTime = expiryTime; this.doc = doc; @@ -322,7 +311,7 @@ private void cleanup() { if (!open && destroy) { // Delete if this is an old snapshot. try { - LOGGER.info(() -> "Deleting snapshot for '" + mapName + "'"); + LOGGER.info(() -> "Deleting snapshot for '" + doc + "'"); FileUtil.deleteDir(dbDir); } catch (final Exception e) { LOGGER.error(e::getMessage, e); @@ -382,6 +371,11 @@ private void open() { } } + @Override + public PlanBDoc getDoc() { + return doc; + } + private static class DestroyedException extends Exception { } diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/TempStore.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/TempStore.java new file mode 100644 index 0000000000..408bb35738 --- /dev/null +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/data/TempStore.java @@ -0,0 +1,100 @@ +package stroom.planb.impl.data; + +import stroom.planb.impl.db.StatePaths; +import stroom.util.concurrent.UncheckedInterruptedException; +import stroom.util.io.FileUtil; +import stroom.util.logging.LambdaLogger; +import stroom.util.logging.LambdaLoggerFactory; + +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import 
java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.stream.Stream; + +@Singleton +public class TempStore { + + private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(TempStore.class); + + private final Path receiveDir; + private final AtomicLong tempId = new AtomicLong(); + + @Inject + public TempStore(final StatePaths statePaths) { + + // Create the root directory + ensureDirExists(statePaths.getRootDir()); + + // Create the receive directory. + receiveDir = statePaths.getReceiveDir(); + if (ensureDirExists(receiveDir)) { + if (!FileUtil.deleteContents(receiveDir)) { + throw new RuntimeException("Unable to delete contents of: " + FileUtil.getCanonicalPath(receiveDir)); + } + } + } + + public SequentialFile createTemp() { + final long currentStoreId = tempId.incrementAndGet(); + return getTempFileSet(currentStoreId); + } + + private SequentialFile getTempFileSet(final long storeId) { + return SequentialFile.get(receiveDir, storeId, true); + } + + + private boolean ensureDirExists(final Path path) { + if (Files.isDirectory(path)) { + return true; + } + + try { + Files.createDirectories(path); + } catch (final IOException e) { + throw new UncheckedIOException(e); + } + + return false; + } + + private record NumericFile(Path dir, long num) { + + } + + private static class NumericFileTest implements Consumer { + + private final Comparator comparator; + private NumericFile current; + + public NumericFileTest(final Comparator comparator) { + this.comparator = comparator; + } + + @Override + public void accept(final NumericFile numericFile) { + if (current == 
null || comparator.compare(numericFile.num, current.num) > 0) { + current = numericFile; + } + } + + public Optional get() { + return Optional.ofNullable(current); + } + } +} diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/AbstractLmdb.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/AbstractLmdb.java index cffed2fec3..acca15b275 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/AbstractLmdb.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/AbstractLmdb.java @@ -212,7 +212,8 @@ public void merge(final Path source) { FileUtil.deleteDir(source); } - public void condense(Instant maxAge) { + public void condense(final long condenseBeforeMs, + final long deleteBeforeMs) { // Don't condense by default. } diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/RangedStateFields.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/RangedStateFields.java index ddc8d14cbf..848182d892 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/RangedStateFields.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/RangedStateFields.java @@ -13,7 +13,7 @@ public interface RangedStateFields { String VALUE = "Value"; QueryField KEY_START_FIELD = QueryField.createLong(KEY_START); - QueryField KEY_END_FIELD = QueryField.createText(KEY_END); + QueryField KEY_END_FIELD = QueryField.createLong(KEY_END); QueryField VALUE_TYPE_FIELD = QueryField.createText(VALUE_TYPE, false); QueryField VALUE_FIELD = QueryField.createText(VALUE, false); diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/SessionDb.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/SessionDb.java index 0d2c634ac9..6e6ff6f727 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/SessionDb.java +++ 
b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/SessionDb.java @@ -19,7 +19,6 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.Path; -import java.time.Instant; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; @@ -204,9 +203,8 @@ public Optional getState(final SessionRequest request) { // TODO: Note that LMDB does not free disk space just because you delete entries, instead it just fees pages for // reuse. We might want to create a new compacted instance instead of deleting in place. @Override - public void condense(final Instant maxAge) { - final long maxTime = maxAge.toEpochMilli(); - + public void condense(final long condenseBeforeMs, + final long deleteBeforeMs) { write(writer -> { Session lastSession = null; Session newSession = null; @@ -218,29 +216,36 @@ public void condense(final Instant maxAge) { final KeyVal keyVal = iterator.next(); final Session session = serde.getKey(keyVal); - if (lastSession != null && - Arrays.equals(lastSession.key(), session.key()) && - session.start() < maxTime && - lastSession.end() >= session.start()) { - - // Extend the session. - newSession = new Session(lastSession.key(), lastSession.start(), session.end()); + if (session.end() <= deleteBeforeMs) { + // If this is data we no longer want to retain then delete it. + dbi.delete(writer.getWriteTxn(), keyVal.key(), keyVal.val()); + writer.tryCommit(); - // Delete the previous session as we are extending it. - serde.createKeyByteBuffer(lastSession, keyByteBuffer -> { - dbi.delete(writer.getWriteTxn(), keyByteBuffer); - writer.tryCommit(); - return null; - }); } else { - // Insert new session. - if (newSession != null) { - insert(writer, newSession, newSession); - newSession = null; + if (lastSession != null && + Arrays.equals(lastSession.key(), session.key()) && + session.start() < condenseBeforeMs && + lastSession.end() >= session.start()) { + + // Extend the session. 
+ newSession = new Session(lastSession.key(), lastSession.start(), session.end()); + + // Delete the previous session as we are extending it. + serde.createKeyByteBuffer(lastSession, keyByteBuffer -> { + dbi.delete(writer.getWriteTxn(), keyByteBuffer); + writer.tryCommit(); + return null; + }); + } else { + // Insert new session. + if (newSession != null) { + insert(writer, newSession, newSession); + newSession = null; + } } - } - lastSession = session; + lastSession = session; + } } } @@ -249,7 +254,7 @@ public void condense(final Instant maxAge) { // Delete the last session if it will be merged into the new one. if (lastSession != null && Arrays.equals(lastSession.key(), newSession.key()) && - newSession.start() < maxTime && + newSession.start() < condenseBeforeMs && lastSession.end() >= newSession.start()) { // Delete the previous session as we are extending it. diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/SessionFields.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/SessionFields.java index e7cf674ab4..16ed546585 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/SessionFields.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/SessionFields.java @@ -11,22 +11,18 @@ public interface SessionFields { String KEY = "Key"; String START = "Start"; String END = "End"; - String TERMINAL = "Terminal"; QueryField KEY_FIELD = QueryField.createText(KEY); QueryField START_FIELD = QueryField.createDate(START); QueryField END_FIELD = QueryField.createDate(END); - QueryField TERMINAL_FIELD = QueryField.createText(TERMINAL, false); List FIELDS = Arrays.asList( KEY_FIELD, START_FIELD, - END_FIELD, - TERMINAL_FIELD); + END_FIELD); Map FIELD_MAP = Map.of( KEY, KEY_FIELD, START, START_FIELD, - END, END_FIELD, - TERMINAL, TERMINAL_FIELD); + END, END_FIELD); } diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/SessionSerde.java 
b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/SessionSerde.java index c653e400ab..e5356e572f 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/SessionSerde.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/SessionSerde.java @@ -15,7 +15,7 @@ import java.util.function.Predicate; /** - * KEY = + * KEY = * VALUE = */ public class SessionSerde implements Serde { diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/ShardWriters.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/ShardWriters.java index 1369243bb3..8927ca3683 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/ShardWriters.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/ShardWriters.java @@ -8,7 +8,6 @@ import stroom.planb.impl.data.FileHashUtil; import stroom.planb.impl.data.FileTransferClient; import stroom.planb.shared.PlanBDoc; -import stroom.planb.shared.StateType; import stroom.util.NullSafe; import stroom.util.io.FileUtil; import stroom.util.logging.LambdaLogger; @@ -66,7 +65,7 @@ public static class ShardWriter implements AutoCloseable { private final FileTransferClient fileTransferClient; private final Path dir; private final Meta meta; - private final Map writers = new HashMap<>(); + private final Map writers = new HashMap<>(); private final Map> stateDocMap = new HashMap<>(); private final boolean overwrite = true; @@ -82,7 +81,7 @@ public ShardWriter(final PlanBDocCache planBDocCache, this.meta = meta; } - public Optional getStateType(final String mapName, final Consumer errorConsumer) { + public Optional getDoc(final String mapName, final Consumer errorConsumer) { if (NullSafe.isBlankString(mapName)) { errorConsumer.accept("Null map key"); return Optional.empty(); @@ -105,7 +104,7 @@ public Optional getStateType(final String mapName, final Consumer new WriterInstance( + final WriterInstance writer = 
writers.computeIfAbsent(doc, k -> new WriterInstance( new StateDb(getLmdbEnvDir(k), byteBufferFactory, overwrite, false))); writer.addState(state); } - public void addTemporalState(final String mapName, + public void addTemporalState(final PlanBDoc doc, final TemporalState temporalState) { - final WriterInstance writer = writers.computeIfAbsent(mapName, k -> new WriterInstance( + final WriterInstance writer = writers.computeIfAbsent(doc, k -> new WriterInstance( new TemporalStateDb(getLmdbEnvDir(k), byteBufferFactory, overwrite, false))); writer.addTemporalState(temporalState); } - public void addRangedState(final String mapName, + public void addRangedState(final PlanBDoc doc, final RangedState rangedState) { - final WriterInstance writer = writers.computeIfAbsent(mapName, k -> new WriterInstance( + final WriterInstance writer = writers.computeIfAbsent(doc, k -> new WriterInstance( new RangedStateDb(getLmdbEnvDir(k), byteBufferFactory, overwrite, false))); writer.addRangedState(rangedState); } - public void addTemporalRangedState(final String mapName, + public void addTemporalRangedState(final PlanBDoc doc, final TemporalRangedState temporalRangedState) { - final WriterInstance writer = writers.computeIfAbsent(mapName, k -> new WriterInstance( + final WriterInstance writer = writers.computeIfAbsent(doc, k -> new WriterInstance( new TemporalRangedStateDb(getLmdbEnvDir(k), byteBufferFactory, overwrite, false))); writer.addTemporalRangedState(temporalRangedState); } - public void addSession(final String mapName, + public void addSession(final PlanBDoc doc, final Session session) { - final WriterInstance writer = writers.computeIfAbsent(mapName, k -> new WriterInstance( + final WriterInstance writer = writers.computeIfAbsent(doc, k -> new WriterInstance( new SessionDb(getLmdbEnvDir(k), byteBufferFactory, overwrite, false))); writer.addSession(session); } - private Path getLmdbEnvDir(final String name) { + private Path getLmdbEnvDir(final PlanBDoc doc) { try { - final 
Path path = dir.resolve(name); + final Path path = dir.resolve(doc.getUuid()); Files.createDirectory(path); return path; } catch (final IOException e) { diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/TemporalRangedStateDb.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/TemporalRangedStateDb.java index d9fec36b54..2eaf77d585 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/TemporalRangedStateDb.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/TemporalRangedStateDb.java @@ -9,7 +9,6 @@ import java.nio.ByteBuffer; import java.nio.file.Path; -import java.time.Instant; import java.util.Iterator; import java.util.Optional; @@ -70,9 +69,8 @@ public Optional getState(final TemporalRangedStateRequest r // TODO: Note that LMDB does not free disk space just because you delete entries, instead it just frees pages for // reuse. We might want to create a new compacted instance instead of deleting in place. @Override - public void condense(final Instant maxAge) { - final long maxTime = maxAge.toEpochMilli(); - + public void condense(final long condenseBeforeMs, + final long deleteBeforeMs) { write(writer -> { Key lastKey = null; StateValue lastValue = null; @@ -84,19 +82,26 @@ public void condense(final Instant maxAge) { final Key key = serde.getKey(keyVal); final StateValue value = serde.getVal(keyVal); - if (lastKey != null && - lastKey.keyStart() == key.keyStart() && - lastKey.keyEnd() == key.keyEnd() && - lastValue.byteBuffer().equals(value.byteBuffer())) { - if (key.effectiveTime() <= maxTime) { - // If the key and value are the same then delete the duplicate entry. - dbi.delete(writer.getWriteTxn(), keyVal.key(), keyVal.val()); - writer.tryCommit(); + if (key.effectiveTime() <= deleteBeforeMs) { + // If this is data we no longer want to retain then delete it. 
+ dbi.delete(writer.getWriteTxn(), keyVal.key(), keyVal.val()); + writer.tryCommit(); + + } else { + if (lastKey != null && + lastKey.keyStart() == key.keyStart() && + lastKey.keyEnd() == key.keyEnd() && + lastValue.byteBuffer().equals(value.byteBuffer())) { + if (key.effectiveTime() <= condenseBeforeMs) { + // If the key and value are the same then delete the duplicate entry. + dbi.delete(writer.getWriteTxn(), keyVal.key(), keyVal.val()); + writer.tryCommit(); + } } - } - lastKey = key; - lastValue = value; + lastKey = key; + lastValue = value; + } } } }); diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/TemporalRangedStateFields.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/TemporalRangedStateFields.java index dc7237df52..bde9171df3 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/TemporalRangedStateFields.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/TemporalRangedStateFields.java @@ -15,7 +15,7 @@ public interface TemporalRangedStateFields { String VALUE = "Value"; QueryField KEY_START_FIELD = QueryField.createLong(KEY_START); - QueryField KEY_END_FIELD = QueryField.createText(KEY_END); + QueryField KEY_END_FIELD = QueryField.createLong(KEY_END); QueryField EFFECTIVE_TIME_FIELD = QueryField.createDate(EFFECTIVE_TIME); QueryField VALUE_TYPE_FIELD = QueryField.createText(VALUE_TYPE, false); QueryField VALUE_FIELD = QueryField.createText(VALUE, false); diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/TemporalStateDb.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/TemporalStateDb.java index 4aafd52e5f..5a57e616fa 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/TemporalStateDb.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/db/TemporalStateDb.java @@ -10,7 +10,6 @@ import java.nio.ByteBuffer; import java.nio.file.Path; -import 
java.time.Instant; import java.util.Arrays; import java.util.Iterator; import java.util.Optional; @@ -77,9 +76,8 @@ public Optional getState(final TemporalStateRequest request) { // TODO: Note that LMDB does not free disk space just because you delete entries, instead it just frees pages for // reuse. We might want to create a new compacted instance instead of deleting in place. @Override - public void condense(final Instant maxAge) { - final long maxTime = maxAge.toEpochMilli(); - + public void condense(final long condenseBeforeMs, + final long deleteBeforeMs) { write(writer -> { Key lastKey = null; StateValue lastValue = null; @@ -91,18 +89,25 @@ public void condense(final Instant maxAge) { final Key key = serde.getKey(keyVal); final StateValue value = serde.getVal(keyVal); - if (lastKey != null && - Arrays.equals(lastKey.bytes(), key.bytes()) && - lastValue.byteBuffer().equals(value.byteBuffer())) { - if (key.effectiveTime() <= maxTime) { - // If the key and value are the same then delete the duplicate entry. - dbi.delete(writer.getWriteTxn(), keyVal.key(), keyVal.val()); - writer.tryCommit(); + if (key.effectiveTime() <= deleteBeforeMs) { + // If this is data we no longer want to retain then delete it. + dbi.delete(writer.getWriteTxn(), keyVal.key(), keyVal.val()); + writer.tryCommit(); + + } else { + if (lastKey != null && + Arrays.equals(lastKey.bytes(), key.bytes()) && + lastValue.byteBuffer().equals(value.byteBuffer())) { + if (key.effectiveTime() <= condenseBeforeMs) { + // If the key and value are the same then delete the duplicate entry. 
+ dbi.delete(writer.getWriteTxn(), keyVal.key(), keyVal.val()); + writer.tryCommit(); + } } - } - lastKey = key; - lastValue = value; + lastKey = key; + lastValue = value; + } } } }); diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/pipeline/PlanBFilter.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/pipeline/PlanBFilter.java index 4f924fb901..a163a446c9 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/pipeline/PlanBFilter.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/pipeline/PlanBFilter.java @@ -36,7 +36,7 @@ import stroom.planb.impl.db.StateValue; import stroom.planb.impl.db.TemporalRangedState; import stroom.planb.impl.db.TemporalState; -import stroom.planb.shared.StateType; +import stroom.planb.shared.PlanBDoc; import stroom.svg.shared.SvgImage; import stroom.util.CharBuffer; import stroom.util.NullSafe; @@ -533,11 +533,11 @@ private void handleValueEndElement() throws SAXException { } private void addData() { - final Optional optional = writer.getStateType(mapName, this::error); - optional.ifPresent(stateType -> { + final Optional optional = writer.getDoc(mapName, this::error); + optional.ifPresent(doc -> { // end of the ref data item so ensure it is persisted in the store try { - switch (stateType) { + switch (doc.getStateType()) { case STATE -> { if (key == null) { error(LogUtil.message("Key is null for {}", mapName)); @@ -552,7 +552,7 @@ private void addData() { .typeId(typeId) .byteBuffer(value) .build(); - writer.addState(mapName, new State(k, v)); + writer.addState(doc, new State(k, v)); } } case TEMPORAL_STATE -> { @@ -572,7 +572,7 @@ private void addData() { .typeId(typeId) .byteBuffer(value) .build(); - writer.addTemporalState(mapName, new TemporalState(k, v)); + writer.addTemporalState(doc, new TemporalState(k, v)); } } case RANGED_STATE -> { @@ -601,7 +601,7 @@ private void addData() { .typeId(typeId) .byteBuffer(value) .build(); - 
writer.addRangedState(mapName, new RangedState(k, v)); + writer.addRangedState(doc, new RangedState(k, v)); } } case TEMPORAL_RANGED_STATE -> { @@ -633,7 +633,7 @@ private void addData() { .typeId(typeId) .byteBuffer(value) .build(); - writer.addTemporalRangedState(mapName, new TemporalRangedState(k, v)); + writer.addTemporalRangedState(doc, new TemporalRangedState(k, v)); } } case SESSION -> { @@ -651,10 +651,10 @@ private void addData() { } LOGGER.trace("Putting session {} into table {}", key, mapName); - writer.addSession(mapName, sessionBuilder.build()); + writer.addSession(doc, sessionBuilder.build()); } } - default -> error("Unexpected state type: " + stateType); + default -> error("Unexpected state type: " + doc.getStateType()); } } catch (final BufferOverflowException boe) { final String msg = LogUtil.message("Value for key {} in map {} is too big for the buffer", diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/pipeline/PlanBLookupImpl.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/pipeline/PlanBLookupImpl.java index 54646f44c6..71b4e55cf1 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/pipeline/PlanBLookupImpl.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/pipeline/PlanBLookupImpl.java @@ -21,6 +21,7 @@ import stroom.planb.impl.db.TemporalStateDb; import stroom.planb.impl.db.TemporalStateRequest; import stroom.planb.shared.PlanBDoc; +import stroom.security.api.SecurityContext; import stroom.util.pipeline.scope.PipelineScoped; import com.github.benmanes.caffeine.cache.Cache; @@ -42,12 +43,15 @@ public class PlanBLookupImpl implements PlanBLookup { private final Cache> cache; private final ShardManager shardManager; private final Map> stateDocMap = new HashMap<>(); + private final SecurityContext securityContext; @Inject public PlanBLookupImpl(final PlanBDocCache stateDocCache, - final ShardManager shardManager) { + final ShardManager shardManager, + 
final SecurityContext securityContext) { this.stateDocCache = stateDocCache; this.shardManager = shardManager; + this.securityContext = securityContext; cache = Caffeine.newBuilder().maximumSize(1000).build(); } @@ -67,7 +71,8 @@ private void getValue(final String mapName, final ReferenceDataResult result) { final String name = mapName.toLowerCase(Locale.ROOT); final Optional stateOptional = stateDocMap.computeIfAbsent(name, k -> - Optional.ofNullable(stateDocCache.get(name))); + securityContext.useAsReadResult(() -> + Optional.ofNullable(stateDocCache.get(name)))); stateOptional.ifPresent(stateDoc -> { final Key key = new Key(name, keyName, eventTime); final Optional optional = cache.get(key, diff --git a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/pipeline/StateProviderImpl.java b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/pipeline/StateProviderImpl.java index cf100867ed..c82e2c00a4 100644 --- a/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/pipeline/StateProviderImpl.java +++ b/stroom-state/stroom-planb-impl/src/main/java/stroom/planb/impl/pipeline/StateProviderImpl.java @@ -23,6 +23,7 @@ import stroom.query.language.functions.ValBoolean; import stroom.query.language.functions.ValNull; import stroom.query.language.functions.ValString; +import stroom.security.api.SecurityContext; import stroom.util.logging.LambdaLogger; import stroom.util.logging.LambdaLoggerFactory; @@ -32,9 +33,7 @@ import java.nio.charset.StandardCharsets; import java.time.Instant; -import java.util.HashMap; import java.util.Locale; -import java.util.Map; import java.util.Optional; public class StateProviderImpl implements StateProvider { @@ -43,21 +42,23 @@ public class StateProviderImpl implements StateProvider { private final PlanBDocCache stateDocCache; private final Cache cache; - private final Map> stateDocMap = new HashMap<>(); private final ShardManager shardManager; + private final SecurityContext securityContext; @Inject public 
StateProviderImpl(final PlanBDocCache stateDocCache, - final ShardManager shardManager) { + final ShardManager shardManager, + final SecurityContext securityContext) { this.stateDocCache = stateDocCache; this.shardManager = shardManager; + this.securityContext = securityContext; cache = Caffeine.newBuilder().maximumSize(1000).build(); } @Override public Val getState(final String mapName, final String keyName, final Instant effectiveTimeMs) { final String keyspace = mapName.toLowerCase(Locale.ROOT); - final Optional stateOptional = stateDocMap.computeIfAbsent(keyspace, k -> + final Optional stateOptional = securityContext.useAsReadResult(() -> Optional.ofNullable(stateDocCache.get(keyspace))); return stateOptional .map(stateDoc -> { diff --git a/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/data/TestFileTransferService.java b/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/data/TestFileTransferService.java index e29376e5eb..68ea022178 100644 --- a/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/data/TestFileTransferService.java +++ b/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/data/TestFileTransferService.java @@ -1,5 +1,7 @@ package stroom.planb.impl.data; +import stroom.docref.DocRef; +import stroom.planb.shared.PlanBDoc; import stroom.test.common.util.test.AbstractResourceTest; import stroom.util.io.StreamUtil; import stroom.util.zip.ZipUtil; @@ -77,7 +79,7 @@ void testFetchSnapshot() throws IOException { ZipUtil.zip(zipFile, sourceDir); final Path targetDir = Files.createTempDirectory("test"); - final String mapName = "TestMap"; + final DocRef planBDocRef = DocRef.builder().type(PlanBDoc.TYPE).uuid("TestUuid").name("TestMap").build(); final long requestTime = System.currentTimeMillis(); final FileTransferClientImpl fileTransferClient = new FileTransferClientImpl( null, @@ -99,7 +101,7 @@ void testFetchSnapshot() throws IOException { throw new NotModifiedException(); } - 
assertThat(request.getMapName()).isEqualTo(mapName); + assertThat(request.getPlanBDocRef()).isEqualTo(planBDocRef); assertThat(request.getEffectiveTime()).isEqualTo(requestTime); return null; }) @@ -110,7 +112,7 @@ void testFetchSnapshot() throws IOException { final SnapshotRequest request = invocation.getArgument(0); final OutputStream outputStream = invocation.getArgument(1); - assertThat(request.getMapName()).isEqualTo(mapName); + assertThat(request.getPlanBDocRef()).isEqualTo(planBDocRef); assertThat(request.getEffectiveTime()).isEqualTo(requestTime); outputStream.write(Files.readAllBytes(zipFile)); return null; @@ -119,7 +121,7 @@ void testFetchSnapshot() throws IOException { Mockito.any(SnapshotRequest.class), Mockito.any(OutputStream.class)); - final SnapshotRequest request = new SnapshotRequest(mapName, requestTime, null); + final SnapshotRequest request = new SnapshotRequest(planBDocRef, requestTime, null); final WebTarget webTarget = getWebTarget(FileTransferResource.FETCH_SNAPSHOT_PATH_PART); final Instant snapshotTime = fileTransferClient.fetchSnapshot(webTarget, request, targetDir); assertThat(snapshotTime).isBeforeOrEqualTo(Instant.now()); @@ -130,7 +132,7 @@ void testFetchSnapshot() throws IOException { // Test error if snapshot is up to date. 
assertThatThrownBy(() -> { - SnapshotRequest req = new SnapshotRequest(mapName, requestTime, lastWriteTime.toEpochMilli()); + SnapshotRequest req = new SnapshotRequest(planBDocRef, requestTime, lastWriteTime.toEpochMilli()); WebTarget wt = getWebTarget(FileTransferResource.FETCH_SNAPSHOT_PATH_PART); fileTransferClient.fetchSnapshot(wt, req, targetDir); }).isInstanceOf(NotModifiedException.class); diff --git a/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/data/TestSequentialFileStore.java b/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/data/TestSequentialFileStore.java index d9324e699e..06467384cf 100644 --- a/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/data/TestSequentialFileStore.java +++ b/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/data/TestSequentialFileStore.java @@ -36,7 +36,7 @@ void test() throws IOException { assertThat(fileStore.getMaxStoreId()).isEqualTo(9); long currentId = fileStore.getMinStoreId(); - SequentialFile sequentialFile = fileStore.awaitNew(currentId); + SequentialFile sequentialFile = fileStore.awaitNext(currentId); assertThat(fileStore.getMinStoreId()).isEqualTo(0); assertThat(fileStore.getMaxStoreId()).isEqualTo(9); @@ -48,7 +48,7 @@ void test() throws IOException { for (; currentId < 10; currentId++) { assertThat(fileStore.getMaxStoreId()).isEqualTo(9); - sequentialFile = fileStore.awaitNew(currentId); + sequentialFile = fileStore.awaitNext(currentId); sequentialFile.delete(); } diff --git a/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/db/TestSessionDb.java b/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/db/TestSessionDb.java index af2d31ef9c..1bea331337 100644 --- a/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/db/TestSessionDb.java +++ b/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/db/TestSessionDb.java @@ -201,7 +201,7 @@ void testMerge(@TempDir final Path rootDir) throws IOException { } @Test - 
void testCondense(@TempDir final Path rootDir) throws IOException { + void testCondenseAndDelete(@TempDir final Path rootDir) throws IOException { final Path dbPath = rootDir.resolve("db"); Files.createDirectory(dbPath); @@ -210,8 +210,10 @@ void testCondense(@TempDir final Path rootDir) throws IOException { final ByteBufferFactory byteBufferFactory = new ByteBufferFactoryImpl(); try (final SessionDb db = new SessionDb(dbPath, byteBufferFactory)) { assertThat(db.count()).isEqualTo(109); - db.condense(Instant.now()); + db.condense(System.currentTimeMillis(), 0); assertThat(db.count()).isEqualTo(1); + db.condense(System.currentTimeMillis(), System.currentTimeMillis()); + assertThat(db.count()).isEqualTo(0); } } diff --git a/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/db/TestStateDb.java b/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/db/TestStateDb.java index 60f7c73dbb..c613bdb086 100644 --- a/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/db/TestStateDb.java +++ b/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/db/TestStateDb.java @@ -19,8 +19,6 @@ import stroom.bytebuffer.impl6.ByteBufferFactory; import stroom.bytebuffer.impl6.ByteBufferFactoryImpl; -import stroom.cache.api.CacheManager; -import stroom.cache.impl.CacheManagerImpl; import stroom.docref.DocRef; import stroom.entity.shared.ExpressionCriteria; import stroom.pipeline.refdata.store.StringValue; @@ -67,6 +65,9 @@ class TestStateDb { + private static final String MAP_UUID = "map-uuid"; + private static final String MAP_NAME = "map-name"; + @Test void testReadWrite(@TempDir Path tempDir) { final Function keyFunction = i -> Key.builder().name("TEST_KEY").build(); @@ -121,40 +122,39 @@ void testFullProcess(@TempDir final Path rootDir) throws IOException { CompletableFuture.allOf(list.toArray(new CompletableFuture[0])).join(); // Consume and merge parts. 
- try (final CacheManager cacheManager = new CacheManagerImpl()) { - final PlanBDocStore planBDocStore = Mockito.mock(PlanBDocStore.class); - final PlanBDoc doc = PlanBDoc.builder().name("map-name").stateType(StateType.STATE).build(); - Mockito.when(planBDocStore.findByName(Mockito.anyString())) - .thenReturn(Collections.singletonList(doc.asDocRef())); - Mockito.when(planBDocStore.readDocument(Mockito.any(DocRef.class))) - .thenReturn(doc); - final PlanBDocCache planBDocCache = Mockito.mock(PlanBDocCache.class); - Mockito.when(planBDocCache.get(Mockito.any(String.class))) - .thenReturn(doc); - - final String path = rootDir.toAbsolutePath().toString(); - final PlanBConfig planBConfig = new PlanBConfig(path); - final ShardManager shardManager = new ShardManager( - new ByteBufferFactoryImpl(), - planBDocCache, - null, - () -> planBConfig, - statePaths, - null); - final MergeProcessor mergeProcessor = new MergeProcessor( - fileStore, - statePaths, - new MockSecurityContext(), - new SimpleTaskContextFactory(), - shardManager); - for (int i = 0; i < parts; i++) { - mergeProcessor.merge(i); - } + final PlanBDocStore planBDocStore = Mockito.mock(PlanBDocStore.class); + final PlanBDoc doc = PlanBDoc.builder().uuid(MAP_UUID).name(MAP_NAME).stateType(StateType.STATE).build(); + Mockito.when(planBDocStore.findByName(Mockito.anyString())) + .thenReturn(Collections.singletonList(doc.asDocRef())); + Mockito.when(planBDocStore.readDocument(Mockito.any(DocRef.class))) + .thenReturn(doc); + final PlanBDocCache planBDocCache = Mockito.mock(PlanBDocCache.class); + Mockito.when(planBDocCache.get(Mockito.any(String.class))) + .thenReturn(doc); + + final String path = rootDir.toAbsolutePath().toString(); + final PlanBConfig planBConfig = new PlanBConfig(path); + final ShardManager shardManager = new ShardManager( + new ByteBufferFactoryImpl(), + planBDocCache, + planBDocStore, + null, + () -> planBConfig, + statePaths, + null); + final MergeProcessor mergeProcessor = new MergeProcessor( 
+ fileStore, + statePaths, + new MockSecurityContext(), + new SimpleTaskContextFactory(), + shardManager); + for (int i = 0; i < parts; i++) { + mergeProcessor.merge(i); } // Read merged final ByteBufferFactory byteBufferFactory = new ByteBufferFactoryImpl(); - try (final StateDb db = new StateDb(statePaths.getShardDir().resolve("map-name"), + try (final StateDb db = new StateDb(statePaths.getShardDir().resolve(MAP_UUID), byteBufferFactory, false, true)) { assertThat(db.count()).isEqualTo(2); } @@ -262,7 +262,7 @@ private void writePart(final SequentialFileStore fileStore, final String keyName return StateValue.builder().typeId(StringValue.TYPE_ID).byteBuffer(byteBuffer).build(); }; final Path partPath = Files.createTempDirectory("part"); - final Path mapPath = partPath.resolve("map-name"); + final Path mapPath = partPath.resolve(MAP_UUID); Files.createDirectories(mapPath); testWrite(mapPath, 100, keyFunction, valueFunction); final Path zipFile = Files.createTempFile("lmdb", "zip"); diff --git a/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/db/TestTemporalRangedStateDb.java b/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/db/TestTemporalRangedStateDb.java index b549d7facb..d1edcdd2a6 100644 --- a/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/db/TestTemporalRangedStateDb.java +++ b/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/db/TestTemporalRangedStateDb.java @@ -128,7 +128,7 @@ void testMerge(@TempDir final Path rootDir) throws IOException { } @Test - void testCondense(@TempDir final Path rootDir) throws IOException { + void testCondenseAndDelete(@TempDir final Path rootDir) throws IOException { final Path dbPath = rootDir.resolve("db"); Files.createDirectory(dbPath); @@ -137,8 +137,10 @@ void testCondense(@TempDir final Path rootDir) throws IOException { final ByteBufferFactory byteBufferFactory = new ByteBufferFactoryImpl(); try (final TemporalRangedStateDb db = new TemporalRangedStateDb(dbPath, 
byteBufferFactory)) { assertThat(db.count()).isEqualTo(100); - db.condense(Instant.now()); + db.condense(System.currentTimeMillis(), 0); assertThat(db.count()).isEqualTo(1); + db.condense(System.currentTimeMillis(), System.currentTimeMillis()); + assertThat(db.count()).isEqualTo(0); } } diff --git a/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/db/TestTemporalStateDb.java b/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/db/TestTemporalStateDb.java index 684dfc3940..516fc396eb 100644 --- a/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/db/TestTemporalStateDb.java +++ b/stroom-state/stroom-planb-impl/src/test/java/stroom/planb/impl/db/TestTemporalStateDb.java @@ -106,7 +106,7 @@ void testMerge(@TempDir final Path rootDir) throws IOException { } @Test - void testCondense(@TempDir final Path rootDir) throws IOException { + void testCondenseAndDelete(@TempDir final Path rootDir) throws IOException { final Path dbPath = rootDir.resolve("db"); Files.createDirectory(dbPath); @@ -115,8 +115,10 @@ void testCondense(@TempDir final Path rootDir) throws IOException { final ByteBufferFactory byteBufferFactory = new ByteBufferFactoryImpl(); try (final TemporalStateDb db = new TemporalStateDb(dbPath, byteBufferFactory)) { assertThat(db.count()).isEqualTo(100); - db.condense(Instant.now()); + db.condense(System.currentTimeMillis(), 0); assertThat(db.count()).isEqualTo(1); + db.condense(System.currentTimeMillis(), System.currentTimeMillis()); + assertThat(db.count()).isEqualTo(0); } }