feat(enriching): add memory enrichment table (#21348)
* feat(vrl): add caching feature for VRL

This adds VRL functions for reading data from and storing data in caches that can be configured
in global options. Caches can store any VRL value and are meant to hold data for short periods.
Every entry is assigned a TTL (time-to-live) based on the cache configuration and is removed when
that TTL expires.

* Add reads and writes metrics for VRL cache

* Rename `cache_set` to `cache_put`

* Add `cache_delete` VRL function

* Add key not found test case for `cache_get`

* Add placeholder implementation for fetching TTL using `cache_get`

* Add memory enrichment table

* Add basic sink implementation for memory enrichment table

This implementation is based on `evmap`, for "lock-free" reading and writing. A lock is still taken
when data is refreshed, but the refresh frequency can be controlled to keep interruptions to a minimum.

* Add ttl, refresh and scan intervals implementation for memory enrichment table

* Add internal events for memory enrichment table

* Remove initial implementation of VRL cache

* Remove table name from memory enrichment table events

* Add `SinkConfig` impl for `MemoryConfig`

This adds a `SinkConfig` implementation for `MemoryConfig`, making it a shared configuration for
both the enrichment table component and the sink component. To ensure that the exact same `Memory`
instance is used, the created `Memory` is cached inside the config instance and then shared whenever
one of the `build` variants is invoked.

`Memory` is already built with sharing between different instances in mind, since it had to be
shared across different threads and it is cloneable while keeping the same backing structure for
data.

The actual sink building is a placeholder.

* Hook up enrichment tables as sinks when possible

* Fix flushing in memory enrichment table

* Fix failing memory table tests

* Remove enrichment_tables from Graph and use them as sinks

* Wrap memory metadata in a single object for easier mutex usage

* Add byte size limit to memory enrichment table

* Remove unnecessary duplicated key from memory table entries

* Remove debugging log from memory table

* Ensure `ConfigDiff` takes tables into account

* Implement running topology changes for enrichment_tables like sinks

* Make memory tables visible in `vector top` as sinks

* Remove unnecessary clone when handling events

* Fix tests after removing clones in `handle_value`

* Reduce key clones when writing to memory table

* Store data in memory table as JSON strings instead of `Value` objects

* Enable configuration for disabling key tag in internal metrics

* Add changelog entry

* Add docs for memory enrichment_table

* Fix typo in memory table docs

* Apply suggestions from code review in documentation

Co-authored-by: May Lee <may.lee@datadoghq.com>

* Apply docs suggestions in code too

* Run scan and flush on intervals and not only on writes

* Ensure `scan_interval` can't be zero and handle zero `flush_interval`

* Rename `sinks_and_table_sinks` to `all_sinks`

* Use `Option` instead of `0` for optional memory config values

* Rename `MemoryTableInternalMetricsConfig` to `InternalMetricsConfig`

* Remove enrichment table `unwrap` from topology running

* Use `expect` instead of `unwrap` when capturing write lock

Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com>

* Fix typo in `src/enrichment_tables/memory/table.rs`

* Add a how it works section for memory enrichment table

* Add documentation above `as_sink` per discussion on the PR

* Fix enrichment memory table tests

* Fix spellcheck error

* Update LICENSE-3rdparty

* Fix cue formatting

* Use NonZeroU64 new_unchecked to fix MSRV check

---------

Co-authored-by: May Lee <may.lee@datadoghq.com>
Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com>
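
The TTL and scan behaviour described in the message above can be illustrated with a minimal sketch. It is not the code from this commit: the committed table is built on `evmap`, while this version uses a plain `HashMap`, and the names `TtlTable`, `put`, `get`, and `scan` are hypothetical. Time is passed in explicitly so the expiry logic is easy to follow.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical, simplified stand-in for a TTL-bound in-memory table.
/// Values are kept as JSON strings, as the commit message describes.
pub struct TtlTable {
    ttl: Duration,
    entries: HashMap<String, (String, Instant)>,
}

impl TtlTable {
    pub fn new(ttl: Duration) -> Self {
        Self {
            ttl,
            entries: HashMap::new(),
        }
    }

    /// Stores `value` under `key`, stamping it with the write time.
    pub fn put(&mut self, key: &str, value: &str, now: Instant) {
        self.entries
            .insert(key.to_string(), (value.to_string(), now));
    }

    /// Returns the value only if its TTL has not yet expired at `now`.
    pub fn get(&self, key: &str, now: Instant) -> Option<&str> {
        self.entries
            .get(key)
            .filter(|(_, written)| now.duration_since(*written) < self.ttl)
            .map(|(value, _)| value.as_str())
    }

    /// Removes every expired entry and returns how many were dropped.
    pub fn scan(&mut self, now: Instant) -> usize {
        let ttl = self.ttl;
        let before = self.entries.len();
        self.entries
            .retain(|_, (_, written)| now.duration_since(*written) < ttl);
        before - self.entries.len()
    }
}
```

In this sketch, reads filter out expired entries on their own, so `scan` only reclaims memory; running it on an interval rather than only on writes matches the intent of the "Run scan and flush on intervals" item.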
3 people authored Jan 23, 2025
1 parent b890bf6 commit 318930b
Showing 24 changed files with 1,429 additions and 37 deletions.
35 changes: 33 additions & 2 deletions Cargo.lock

8 changes: 6 additions & 2 deletions Cargo.toml
@@ -311,6 +311,8 @@ dirs-next = { version = "2.0.0", default-features = false, optional = true }
dyn-clone = { version = "1.0.17", default-features = false }
encoding_rs = { version = "0.8.35", default-features = false, features = ["serde"] }
enum_dispatch = { version = "0.3.13", default-features = false }
evmap = { version = "10.0.2", default-features = false, optional = true }
evmap-derive = { version = "0.2.0", default-features = false, optional = true }
exitcode = { version = "1.1.2", default-features = false }
flate2.workspace = true
futures-util = { version = "0.3.29", default-features = false }
@@ -372,6 +374,7 @@ tokio-tungstenite = { version = "0.20.1", default-features = false, features = [
toml.workspace = true
tonic = { workspace = true, optional = true }
hickory-proto = { workspace = true, optional = true }
thread_local = { version = "1.1.8", default-features = false, optional = true }
typetag = { version = "0.2.19", default-features = false }
url = { version = "2.5.4", default-features = false, features = ["serde"] }
warp = { version = "0.3.7", default-features = false }
@@ -525,9 +528,10 @@ protobuf-build = ["dep:tonic-build", "dep:prost-build"]
gcp = ["dep:base64", "dep:goauth", "dep:smpl_jwt"]

# Enrichment Tables
enrichment-tables = ["enrichment-tables-geoip", "enrichment-tables-mmdb"]
enrichment-tables = ["enrichment-tables-geoip", "enrichment-tables-mmdb", "enrichment-tables-memory"]
enrichment-tables-geoip = ["dep:maxminddb"]
enrichment-tables-mmdb = ["dep:maxminddb"]
enrichment-tables-memory = ["dep:evmap", "dep:evmap-derive", "dep:thread_local"]

# Codecs
codecs-syslog = ["vector-lib/syslog"]
@@ -980,7 +984,7 @@ remap-benches = ["transforms-remap"]
transform-benches = ["transforms-filter", "transforms-dedupe", "transforms-reduce", "transforms-route"]
codecs-benches = []
loki-benches = ["sinks-loki"]
enrichment-tables-benches = ["enrichment-tables-geoip", "enrichment-tables-mmdb"]
enrichment-tables-benches = ["enrichment-tables-geoip", "enrichment-tables-mmdb", "enrichment-tables-memory"]
proptest = ["dep:proptest", "dep:proptest-derive", "vrl/proptest"]

[[bench]]
2 changes: 2 additions & 0 deletions LICENSE-3rdparty.csv
@@ -216,6 +216,7 @@ error-code,https://github.com/DoumanAsh/error-code,BSL-1.0,Douman <douman@gmx.se
event-listener,https://github.com/smol-rs/event-listener,Apache-2.0 OR MIT,Stjepan Glavina <stjepang@gmail.com>
event-listener,https://github.com/smol-rs/event-listener,Apache-2.0 OR MIT,"Stjepan Glavina <stjepang@gmail.com>, John Nunley <dev@notgull.net>"
event-listener-strategy,https://github.com/smol-rs/event-listener-strategy,Apache-2.0 OR MIT,John Nunley <dev@notgull.net>
evmap,https://github.com/jonhoo/rust-evmap,MIT OR Apache-2.0,Jon Gjengset <jon@thesquareplanet.com>
executor-trait,https://github.com/amqp-rs/executor-trait,Apache-2.0 OR MIT,Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
exitcode,https://github.com/benwilber/exitcode,Apache-2.0,Ben Wilber <benwilber@gmail.com>
fakedata_generator,https://github.com/kevingimbel/fakedata_generator,MIT,Kevin Gimbel <hallo@kevingimbel.com>
@@ -267,6 +268,7 @@ group,https://github.com/zkcrypto/group,MIT OR Apache-2.0,"Sean Bowe <ewillbeful
h2,https://github.com/hyperium/h2,MIT,"Carl Lerche <me@carllerche.com>, Sean McArthur <sean@seanmonstar.com>"
half,https://github.com/starkat99/half-rs,MIT OR Apache-2.0,Kathryn Long <squeeself@gmail.com>
hash_hasher,https://github.com/Fraser999/Hash-Hasher,Apache-2.0 OR MIT,Fraser Hutchison <fraser.hutchison@maidsafe.net>
hashbag,https://github.com/jonhoo/hashbag,MIT OR Apache-2.0,Jon Gjengset <jon@thesquareplanet.com>
hashbrown,https://github.com/rust-lang/hashbrown,MIT OR Apache-2.0,Amanieu d'Antras <amanieu@gmail.com>
headers,https://github.com/hyperium/headers,MIT,Sean McArthur <sean@seanmonstar.com>
heck,https://github.com/withoutboats/heck,MIT OR Apache-2.0,The heck Authors
4 changes: 4 additions & 0 deletions changelog.d/21348_memory_enrichment_table.feature.md
@@ -0,0 +1,4 @@
Add a new type of `enrichment_table` - `memory`, which can also act as a sink, taking in all the
data and storing it per key, enabling it to be read from as all other enrichment tables.

authors: esensar
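
The dual role described in the changelog entry (one component written to as a sink and read from as a table) depends on both sides sharing the same backing storage. The sketch below illustrates that idea only. It swaps in `Arc<RwLock<HashMap>>` for the `evmap`-based structure the commit uses, and `SharedTable`, `write`, and `find` are hypothetical names, not Vector APIs.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical handle: cloning it yields another view of the same map.
#[derive(Clone, Default)]
pub struct SharedTable {
    inner: Arc<RwLock<HashMap<String, String>>>,
}

impl SharedTable {
    /// Sink-side operation: store one value per key.
    pub fn write(&self, key: &str, value: &str) {
        let mut map = self.inner.write().expect("table lock poisoned");
        map.insert(key.to_string(), value.to_string());
    }

    /// Table-side operation: look a key up, as an enrichment lookup would.
    pub fn find(&self, key: &str) -> Option<String> {
        let map = self.inner.read().expect("table lock poisoned");
        map.get(key).cloned()
    }
}
```

A clone handed to the sink path and a clone handed to the lookup path then observe the same data, which is the property the commit message relies on when it caches the created `Memory` inside the config.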
9 changes: 8 additions & 1 deletion src/api/schema/components/mod.rs
@@ -301,7 +301,14 @@ pub fn update_config(config: &Config) {
}

// Sinks
for (component_key, sink) in config.sinks() {
let table_sinks = config
.enrichment_tables()
.filter_map(|(k, e)| e.as_sink().map(|s| (k, s)))
.collect::<Vec<_>>();
for (component_key, sink) in config
.sinks()
.chain(table_sinks.iter().map(|(key, sink)| (*key, sink)))
{
new_components.insert(
component_key.clone(),
Component::Sink(sink::Sink(sink::Data {
14 changes: 12 additions & 2 deletions src/config/builder.rs
@@ -38,7 +38,7 @@ pub struct ConfigBuilder {

/// All configured enrichment tables.
#[serde(default)]
pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter>,
pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter<String>>,

/// All configured sources.
#[serde(default)]
@@ -106,6 +106,11 @@ impl From<Config> for ConfigBuilder {
.map(|(key, sink)| (key, sink.map_inputs(ToString::to_string)))
.collect();

let enrichment_tables = enrichment_tables
.into_iter()
.map(|(key, table)| (key, table.map_inputs(ToString::to_string)))
.collect();

let tests = tests.into_iter().map(TestDefinition::stringify).collect();

ConfigBuilder {
@@ -145,11 +150,16 @@ impl ConfigBuilder {
pub fn add_enrichment_table<K: Into<String>, E: Into<EnrichmentTables>>(
&mut self,
key: K,
inputs: &[&str],
enrichment_table: E,
) {
let inputs = inputs
.iter()
.map(|value| value.to_string())
.collect::<Vec<_>>();
self.enrichment_tables.insert(
ComponentKey::from(key.into()),
EnrichmentTableOuter::new(enrichment_table),
EnrichmentTableOuter::new(inputs, enrichment_table),
);
}

20 changes: 18 additions & 2 deletions src/config/compiler.rs
@@ -3,7 +3,7 @@ use super::{
OutputId,
};

use indexmap::IndexSet;
use indexmap::{IndexMap, IndexSet};
use vector_lib::id::Inputs;

pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<String>> {
@@ -52,8 +52,17 @@ pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<
graceful_shutdown_duration,
allow_empty: _,
} = builder;
let all_sinks = sinks
.clone()
.into_iter()
.chain(
enrichment_tables
.iter()
.filter_map(|(key, table)| table.as_sink().map(|s| (key.clone(), s))),
)
.collect::<IndexMap<_, _>>();

let graph = match Graph::new(&sources, &transforms, &sinks, schema) {
let graph = match Graph::new(&sources, &transforms, &all_sinks, schema) {
Ok(graph) => graph,
Err(graph_errors) => {
errors.extend(graph_errors);
@@ -85,6 +94,13 @@ pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<
(key, transform.with_inputs(inputs))
})
.collect();
let enrichment_tables = enrichment_tables
.into_iter()
.map(|(key, table)| {
let inputs = graph.inputs_for(&key);
(key, table.with_inputs(inputs))
})
.collect();
let tests = tests
.into_iter()
.map(|test| test.resolve_outputs(&graph))
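
The `all_sinks` construction in the hunk above merges ordinary sinks with every enrichment table that can act as a sink. A standalone sketch of the same `chain` + `filter_map` pattern follows. It makes several simplifying assumptions: `BTreeMap` replaces `IndexMap` to stay within the standard library, sinks are plain strings, and the `Table` type is hypothetical.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified table: only some tables expose a sink.
pub struct Table {
    pub sink: Option<String>,
}

impl Table {
    /// Plays the role of `as_sink`: `Some` only when the table can ingest events.
    pub fn as_sink(&self) -> Option<String> {
        self.sink.clone()
    }
}

/// Merges regular sinks with every table that can act as a sink.
pub fn all_sinks(
    sinks: &BTreeMap<String, String>,
    tables: &BTreeMap<String, Table>,
) -> BTreeMap<String, String> {
    sinks
        .clone()
        .into_iter()
        .chain(
            tables
                .iter()
                // Tables without a sink yield `None` and are skipped here.
                .filter_map(|(key, table)| table.as_sink().map(|s| (key.clone(), s))),
        )
        .collect()
}
```

Because tables that return `None` are simply dropped by `filter_map`, the graph only ever sees tables that actually accept inputs.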
8 changes: 7 additions & 1 deletion src/config/diff.rs
@@ -31,26 +31,32 @@ impl ConfigDiff {
self.sources.flip();
self.transforms.flip();
self.sinks.flip();
self.enrichment_tables.flip();
self
}

/// Checks whether or not the given component is present at all.
pub fn contains(&self, key: &ComponentKey) -> bool {
self.sources.contains(key) || self.transforms.contains(key) || self.sinks.contains(key)
self.sources.contains(key)
|| self.transforms.contains(key)
|| self.sinks.contains(key)
|| self.enrichment_tables.contains(key)
}

/// Checks whether or not the given component is changed.
pub fn is_changed(&self, key: &ComponentKey) -> bool {
self.sources.is_changed(key)
|| self.transforms.is_changed(key)
|| self.sinks.is_changed(key)
|| self.enrichment_tables.contains(key)
}

/// Checks whether or not the given component is removed.
pub fn is_removed(&self, key: &ComponentKey) -> bool {
self.sources.is_removed(key)
|| self.transforms.is_removed(key)
|| self.sinks.is_removed(key)
|| self.enrichment_tables.contains(key)
}
}
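
To make the predicates in this hunk easier to compare, here is a hedged sketch of a per-component-type difference. The field names and semantics are assumptions chosen for illustration, not a copy of Vector's `Difference` type. Under these assumed semantics `contains` is broader than `is_changed` or `is_removed`, so the `enrichment_tables.contains(key)` calls in `is_changed` and `is_removed` above would also match newly added tables. The commit does not say whether that is intentional.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified per-component-type difference between two configs.
#[derive(Default)]
pub struct Difference {
    pub to_remove: HashSet<String>,
    pub to_change: HashSet<String>,
    pub to_add: HashSet<String>,
}

impl Difference {
    /// True if the key is added, changed, or removed.
    pub fn contains(&self, key: &str) -> bool {
        self.to_add.contains(key)
            || self.to_change.contains(key)
            || self.to_remove.contains(key)
    }

    /// True only if the key exists in both configs with different settings.
    pub fn is_changed(&self, key: &str) -> bool {
        self.to_change.contains(key)
    }

    /// True only if the key is scheduled for removal.
    pub fn is_removed(&self, key: &str) -> bool {
        self.to_remove.contains(key)
    }

    /// Reverses the direction of the diff: additions become removals and vice versa.
    pub fn flip(&mut self) {
        std::mem::swap(&mut self.to_remove, &mut self.to_add);
    }
}
```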

82 changes: 78 additions & 4 deletions src/config/enrichment_table.rs
@@ -1,21 +1,91 @@
use enum_dispatch::enum_dispatch;
use serde::Serialize;
use vector_lib::config::GlobalOptions;
use vector_lib::configurable::{configurable_component, NamedComponent};
use vector_lib::configurable::{configurable_component, Configurable, NamedComponent, ToValue};
use vector_lib::id::Inputs;

use crate::enrichment_tables::EnrichmentTables;

use super::dot_graph::GraphConfig;
use super::{SinkConfig, SinkOuter};

/// Fully resolved enrichment table component.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct EnrichmentTableOuter {
pub struct EnrichmentTableOuter<T>
where
T: Configurable + Serialize + 'static + ToValue + Clone,
{
#[serde(flatten)]
pub inner: EnrichmentTables,
#[configurable(derived)]
#[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
pub graph: GraphConfig,
#[configurable(derived)]
#[serde(
default = "Inputs::<T>::default",
skip_serializing_if = "Inputs::is_empty"
)]
pub inputs: Inputs<T>,
}

impl EnrichmentTableOuter {
pub fn new<I: Into<EnrichmentTables>>(inner: I) -> Self {
impl<T> EnrichmentTableOuter<T>
where
T: Configurable + Serialize + 'static + ToValue + Clone,
{
pub fn new<I, IET>(inputs: I, inner: IET) -> Self
where
I: IntoIterator<Item = T>,
IET: Into<EnrichmentTables>,
{
Self {
inner: inner.into(),
graph: Default::default(),
inputs: Inputs::from_iter(inputs),
}
}

// Components are currently built in a way that they match exactly one of the roles (source,
// transform, sink, enrichment table). Due to specific requirements of the "memory" enrichment
// table, it has to fulfill 2 of these roles (sink and enrichment table). To reduce the impact
// of this very specific requirement, any enrichment table can now be optionally mapped into a
// sink, but this will only work for a "memory" enrichment table, since other tables will not
// have a "sink_config" present.
// This is also not ideal, since `SinkOuter` is not meant to represent the actual configuration,
// but it should just be a representation of that config used for deserialization.
// In the future, if more such components come up, it would be good to limit such "Outer"
// components to deserialization and build up the components and the topology in a more granular
// way, with each having "modules" for inputs (making them valid as sinks), for healthchecks,
// for providing outputs, etc.
pub fn as_sink(&self) -> Option<SinkOuter<T>> {
self.inner.sink_config().map(|sink| SinkOuter {
graph: self.graph.clone(),
inputs: self.inputs.clone(),
healthcheck_uri: None,
healthcheck: Default::default(),
buffer: Default::default(),
proxy: Default::default(),
inner: sink,
})
}

pub(super) fn map_inputs<U>(self, f: impl Fn(&T) -> U) -> EnrichmentTableOuter<U>
where
U: Configurable + Serialize + 'static + ToValue + Clone,
{
let inputs = self.inputs.iter().map(f).collect::<Vec<_>>();
self.with_inputs(inputs)
}

pub(crate) fn with_inputs<I, U>(self, inputs: I) -> EnrichmentTableOuter<U>
where
I: IntoIterator<Item = U>,
U: Configurable + Serialize + 'static + ToValue + Clone,
{
EnrichmentTableOuter {
inputs: Inputs::from_iter(inputs),
inner: self.inner,
graph: self.graph,
}
}
}
@@ -36,4 +106,8 @@ pub trait EnrichmentTableConfig: NamedComponent + core::fmt::Debug + Send + Sync
&self,
globals: &GlobalOptions,
) -> crate::Result<Box<dyn vector_lib::enrichment::Table + Send + Sync>>;

fn sink_config(&self) -> Option<Box<dyn SinkConfig>> {
None
}
}
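
The opt-in design in this file (a default `sink_config` returning `None`, with `as_sink` mapping over it) can be reduced to a small self-contained sketch. Every type here is a hypothetical simplification: `TableConfig`, `Outer`, and this cut-down `SinkOuter` only echo the shape of the real `EnrichmentTableConfig`, `EnrichmentTableOuter`, and `SinkOuter`, and omit serde, healthchecks, buffers, and graph settings.

```rust
/// Hypothetical, simplified stand-in for a sink configuration.
pub trait SinkConfig {
    fn name(&self) -> String;
}

/// Simplified stand-in for `EnrichmentTableConfig`.
pub trait TableConfig {
    /// Most tables cannot ingest events, so the default is `None`.
    fn sink_config(&self) -> Option<Box<dyn SinkConfig>> {
        None
    }
}

/// A read-only table: relies on the default `sink_config`.
pub struct FileTable;
impl TableConfig for FileTable {}

/// A table that doubles as a sink by overriding `sink_config`.
#[derive(Clone)]
pub struct MemoryTable;

impl SinkConfig for MemoryTable {
    fn name(&self) -> String {
        "memory".to_string()
    }
}

impl TableConfig for MemoryTable {
    fn sink_config(&self) -> Option<Box<dyn SinkConfig>> {
        Some(Box::new(self.clone()))
    }
}

/// Wrapper carrying inputs next to the table, like `EnrichmentTableOuter`.
pub struct Outer<T> {
    pub inputs: Vec<T>,
    pub inner: Box<dyn TableConfig>,
}

/// Cut-down sink wrapper holding only inputs and the sink itself.
pub struct SinkOuter<T> {
    pub inputs: Vec<T>,
    pub inner: Box<dyn SinkConfig>,
}

impl<T: Clone> Outer<T> {
    /// Yields a sink view only when the inner table provides one.
    pub fn as_sink(&self) -> Option<SinkOuter<T>> {
        self.inner.sink_config().map(|sink| SinkOuter {
            inputs: self.inputs.clone(),
            inner: sink,
        })
    }

    /// Converts the input type while keeping the same inner table.
    pub fn map_inputs<U>(self, f: impl Fn(&T) -> U) -> Outer<U> {
        Outer {
            inputs: self.inputs.iter().map(f).collect(),
            inner: self.inner,
        }
    }
}
```

The default method keeps existing table types untouched: only a table that overrides `sink_config` ever shows up as a sink.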
4 changes: 2 additions & 2 deletions src/config/graph.rs
@@ -112,7 +112,7 @@ impl Graph {
);
}

for (id, config) in sinks.iter() {
for (id, config) in sinks {
graph.nodes.insert(
id.clone(),
Node::Sink {
@@ -133,7 +133,7 @@ impl Graph {
}
}

for (id, config) in sinks.iter() {
for (id, config) in sinks {
for input in config.inputs.iter() {
if let Err(e) = graph.add_input(input, id, &available_inputs) {
errors.push(e);
2 changes: 1 addition & 1 deletion src/config/loading/config_builder.rs
@@ -63,7 +63,7 @@ impl Process for ConfigBuilderLoader {
}
Some(ComponentHint::EnrichmentTable) => {
self.builder.enrichment_tables.extend(deserialize_table::<
IndexMap<ComponentKey, EnrichmentTableOuter>,
IndexMap<ComponentKey, EnrichmentTableOuter<_>>,
>(table)?);
}
Some(ComponentHint::Test) => {