This PR describes Ably's proposal to add an in-memory cache to Liftbridge. The main objective is to ensure idempotency while keeping computation time minimal. It should also be possible to query the system by key, either for presence or for data.
Requirements
Proposal
This feature could be written as a plugin loaded at runtime, built on an interface that allows plugins to watch and/or filter incoming messages. It would be active only on the Raft leader, which avoids having to deal with consistency issues for now.
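As a rough illustration of what such an interface might look like, here is a minimal sketch. All names (Message, MessagePlugin, Dispatch, rejectEmptyKey) are hypothetical and are not taken from the Liftbridge codebase or from the PoC. It also assumes that on non-leader nodes the plugins are simply bypassed.

```go
package main

// Message is a hypothetical, simplified view of an incoming message.
type Message struct {
	Stream string
	Key    string
	Value  []byte
}

// MessagePlugin is a hypothetical interface for plugins that watch
// and/or filter incoming messages.
type MessagePlugin interface {
	// Filter reports whether the message should be processed.
	Filter(msg Message) bool
	// Observe is called for every message that passed all filters.
	Observe(msg Message)
}

// Dispatch runs msg through the plugins and reports whether it should be
// processed. Plugins are consulted only when this node is the Raft leader.
func Dispatch(isLeader bool, plugins []MessagePlugin, msg Message) bool {
	if !isLeader {
		return true // assumption: plugins are inactive on followers
	}
	for _, p := range plugins {
		if !p.Filter(msg) {
			return false
		}
	}
	for _, p := range plugins {
		p.Observe(msg)
	}
	return true
}

// rejectEmptyKey is a toy plugin: it drops messages without a key and
// counts the messages that were accepted.
type rejectEmptyKey struct{ seen int }

func (p *rejectEmptyKey) Filter(msg Message) bool { return msg.Key != "" }
func (p *rejectEmptyKey) Observe(msg Message)     { p.seen++ }
```

Keeping filtering and observation as separate methods would let a plugin do either one without implementing real logic for the other.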
Current Proof-of-Concept implementation
A simple proof-of-concept implementation is provided and demonstrates most of the aforementioned features. These are only preliminary developments and would require further engineering before they could be integrated into Liftbridge. The following paragraphs provide some details on the implementation and possible ways to improve it.
Initialization
Loading the plugin is for now hardcoded in the main function. An attempt was made to use Go's official plugin API/build system, but the issues and limitations encountered (see for example golang/go#27751) led to the decision to integrate the plugin directly in the code for now. How exactly the integration could take place will be discussed at a later stage. An interface is still used to manage the interaction between the plugin and Liftbridge, in order to keep the code as "non-invasive" as possible with respect to the current codebase.
Configuration
The plugin loads a sample configuration file using the same functions as Liftbridge. The plugin is, as of now, enabled for all messages. This is only temporary. The final version should expose a configuration parameter allowing it to be enabled/disabled completely and expose an option for each stream.
Memory data storage
To store messages by key (to ensure idempotence and fast querying), the plugin needs a container indexed by a string. In this PoC it has been decided to use a standard Go map with a mutex. Later implementations could make use of other, more specialized containers, depending on the memory and speed requirements.
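A map guarded by a mutex can be sketched as follows. This is not the PoC's actual code: the type and method names (MemStore, Put, Get) are hypothetical, and the use of sync.RWMutex rather than a plain sync.Mutex is an assumption made here so that concurrent reads do not block each other.

```go
package main

import "sync"

// MemStore is a hypothetical string-indexed store protected by a mutex.
type MemStore struct {
	mu   sync.RWMutex
	data map[string][]byte
}

// NewMemStore returns an empty store.
func NewMemStore() *MemStore {
	return &MemStore{data: make(map[string][]byte)}
}

// Put stores a copy of value under key, so later changes to the caller's
// slice do not affect the cached data.
func (s *MemStore) Put(key string, value []byte) {
	cp := append([]byte(nil), value...)
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = cp
}

// Get returns a copy of the value stored under key and whether it was present.
func (s *MemStore) Get(key string) ([]byte, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	if !ok {
		return nil, false
	}
	return append([]byte(nil), v...), true
}
```

The second return value of Get covers the "presence" query, and the first covers the "data" query.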
Input message filtering
One of the plugin's objectives is to ensure idempotence in the case where multiple messages with the same key are received. In this case the duplicates should not be processed at all. This filtering is, for now, implemented as a function call in partition.messageProcessingLoop that returns a boolean. If the plugin returns false then the message is ignored, otherwise it is processed as usual.
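The boolean filter described above could look roughly like this. The sketch is independent of Liftbridge: Deduplicator, Accept and processAll are hypothetical names, and processAll merely stands in for the real processing loop.

```go
package main

import "sync"

// Deduplicator remembers which keys have already been seen.
type Deduplicator struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

// NewDeduplicator returns a Deduplicator with no known keys.
func NewDeduplicator() *Deduplicator {
	return &Deduplicator{seen: make(map[string]struct{})}
}

// Accept returns true the first time a key is seen and false for every
// later occurrence. The check and the insert happen under the same lock,
// so two concurrent duplicates cannot both be accepted.
func (d *Deduplicator) Accept(key string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, dup := d.seen[key]; dup {
		return false
	}
	d.seen[key] = struct{}{}
	return true
}

// processAll mimics a processing loop: messages rejected by the filter
// are skipped, the others are processed (here, just collected).
func processAll(d *Deduplicator, keys []string) []string {
	var processed []string
	for _, k := range keys {
		if !d.Accept(k) {
			continue // duplicate key: ignore the message
		}
		processed = append(processed, k)
	}
	return processed
}
```

With the keys a, b, a, c, b as input, processAll keeps a, b, c in that order.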
Stream data recovery
When the Liftbridge server starts (or restarts) the plugin needs to load into memory a selection of messages from each stream where memory caching is enabled. This is, for now, implemented in metadata.AddPartition, where the plugin "subscribes" to each stream/partition as it is created. This allows "loading" all messages from all streams, but is not particularly efficient since the plugin's function gets called once per loaded message per stream, so the number of calls grows linearly with the number of messages n.
There is certainly room for improvement regarding this, but this solution seemed to be a good compromise between low-level code (reimplementing some existing functions) and high-level code (that does not necessarily allow "loading" all existing messages on start).
Keeping only some messages
This part has not been implemented yet, but should present no particular difficulty as it can be implemented as a sliding window. This window could operate on the number of messages (keep the X most recent messages or the Y most recently queried messages) or on the messages' age (keep only messages received during the last Z minutes).
API to query using a key
Cached messages can be retrieved through a gRPC-based API, similar to the one Liftbridge already exposes. For now, the API only returns a byte array when queried with a string key, a stream and a subject name. This first concept uses Liftbridge's message key as a unique identifier for storage inside the cache. A later version of this plugin could offer a separate key per message.