diff --git a/Cargo.lock b/Cargo.lock index 5db3d7c7540b7..dfdcac6a86da6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3497,6 +3497,28 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "evmap" +version = "10.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e3ea06a83f97d3dc2eb06e51e7a729b418f0717a5558a5c870e3d5156dc558d" +dependencies = [ + "hashbag", + "slab", + "smallvec", +] + +[[package]] +name = "evmap-derive" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "332b1937705b7ed2fce76837024e9ae6f41cd2ad18a32c052de081f89982561b" +dependencies = [ + "proc-macro2 1.0.93", + "quote 1.0.38", + "syn 1.0.109", +] + [[package]] name = "executor-trait" version = "2.1.0" @@ -4131,6 +4153,12 @@ version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74721d007512d0cb3338cd20f0654ac913920061a4c4d0d8708edb3f2a698c0c" +[[package]] +name = "hashbag" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98f494b2060b2a8f5e63379e1e487258e014cee1b1725a735816c0107a2e9d93" + [[package]] name = "hashbrown" version = "0.12.3" @@ -9788,9 +9816,9 @@ dependencies = [ [[package]] name = "thread_local" -version = "1.1.7" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" dependencies = [ "cfg-if", "once_cell", @@ -11004,6 +11032,8 @@ dependencies = [ "dyn-clone", "encoding_rs", "enum_dispatch", + "evmap", + "evmap-derive", "exitcode", "fakedata", "flate2", @@ -11106,6 +11136,7 @@ dependencies = [ "syslog", "tempfile", "test-generator", + "thread_local", "tikv-jemallocator", "tokio", "tokio-openssl", diff --git a/Cargo.toml b/Cargo.toml index c4ac0ffdf9cf5..d2e01c3fa2adb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -311,6 +311,8 @@ 
dirs-next = { version = "2.0.0", default-features = false, optional = true } dyn-clone = { version = "1.0.17", default-features = false } encoding_rs = { version = "0.8.35", default-features = false, features = ["serde"] } enum_dispatch = { version = "0.3.13", default-features = false } +evmap = { version = "10.0.2", default-features = false, optional = true } +evmap-derive = { version = "0.2.0", default-features = false, optional = true } exitcode = { version = "1.1.2", default-features = false } flate2.workspace = true futures-util = { version = "0.3.29", default-features = false } @@ -372,6 +374,7 @@ tokio-tungstenite = { version = "0.20.1", default-features = false, features = [ toml.workspace = true tonic = { workspace = true, optional = true } hickory-proto = { workspace = true, optional = true } +thread_local = { version = "1.1.8", default-features = false, optional = true } typetag = { version = "0.2.19", default-features = false } url = { version = "2.5.4", default-features = false, features = ["serde"] } warp = { version = "0.3.7", default-features = false } @@ -525,9 +528,10 @@ protobuf-build = ["dep:tonic-build", "dep:prost-build"] gcp = ["dep:base64", "dep:goauth", "dep:smpl_jwt"] # Enrichment Tables -enrichment-tables = ["enrichment-tables-geoip", "enrichment-tables-mmdb"] +enrichment-tables = ["enrichment-tables-geoip", "enrichment-tables-mmdb", "enrichment-tables-memory"] enrichment-tables-geoip = ["dep:maxminddb"] enrichment-tables-mmdb = ["dep:maxminddb"] +enrichment-tables-memory = ["dep:evmap", "dep:evmap-derive", "dep:thread_local"] # Codecs codecs-syslog = ["vector-lib/syslog"] @@ -980,7 +984,7 @@ remap-benches = ["transforms-remap"] transform-benches = ["transforms-filter", "transforms-dedupe", "transforms-reduce", "transforms-route"] codecs-benches = [] loki-benches = ["sinks-loki"] -enrichment-tables-benches = ["enrichment-tables-geoip", "enrichment-tables-mmdb"] +enrichment-tables-benches = ["enrichment-tables-geoip", 
"enrichment-tables-mmdb", "enrichment-tables-memory"] proptest = ["dep:proptest", "dep:proptest-derive", "vrl/proptest"] [[bench]] diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 9e634db4709e8..0f9b5e396389a 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -216,6 +216,7 @@ error-code,https://github.com/DoumanAsh/error-code,BSL-1.0,Douman event-listener,https://github.com/smol-rs/event-listener,Apache-2.0 OR MIT,"Stjepan Glavina , John Nunley " event-listener-strategy,https://github.com/smol-rs/event-listener-strategy,Apache-2.0 OR MIT,John Nunley +evmap,https://github.com/jonhoo/rust-evmap,MIT OR Apache-2.0,Jon Gjengset executor-trait,https://github.com/amqp-rs/executor-trait,Apache-2.0 OR MIT,Marc-Antoine Perennou exitcode,https://github.com/benwilber/exitcode,Apache-2.0,Ben Wilber fakedata_generator,https://github.com/kevingimbel/fakedata_generator,MIT,Kevin Gimbel @@ -267,6 +268,7 @@ group,https://github.com/zkcrypto/group,MIT OR Apache-2.0,"Sean Bowe , Sean McArthur " half,https://github.com/starkat99/half-rs,MIT OR Apache-2.0,Kathryn Long hash_hasher,https://github.com/Fraser999/Hash-Hasher,Apache-2.0 OR MIT,Fraser Hutchison +hashbag,https://github.com/jonhoo/hashbag,MIT OR Apache-2.0,Jon Gjengset hashbrown,https://github.com/rust-lang/hashbrown,MIT OR Apache-2.0,Amanieu d'Antras headers,https://github.com/hyperium/headers,MIT,Sean McArthur heck,https://github.com/withoutboats/heck,MIT OR Apache-2.0,The heck Authors diff --git a/changelog.d/21348_memory_enrichment_table.feature.md b/changelog.d/21348_memory_enrichment_table.feature.md new file mode 100644 index 0000000000000..9a87239968ffc --- /dev/null +++ b/changelog.d/21348_memory_enrichment_table.feature.md @@ -0,0 +1,4 @@ +Add a new type of `enrichment_table` - `memory`, which can also act as a sink, taking in all the +data and storing it per key, enabling it to be read from as all other enrichment tables. 
+ +authors: esensar diff --git a/src/api/schema/components/mod.rs b/src/api/schema/components/mod.rs index 7398a7c2154d1..5edb2885fe3a6 100644 --- a/src/api/schema/components/mod.rs +++ b/src/api/schema/components/mod.rs @@ -301,7 +301,14 @@ pub fn update_config(config: &Config) { } // Sinks - for (component_key, sink) in config.sinks() { + let table_sinks = config + .enrichment_tables() + .filter_map(|(k, e)| e.as_sink().map(|s| (k, s))) + .collect::>(); + for (component_key, sink) in config + .sinks() + .chain(table_sinks.iter().map(|(key, sink)| (*key, sink))) + { new_components.insert( component_key.clone(), Component::Sink(sink::Sink(sink::Data { diff --git a/src/config/builder.rs b/src/config/builder.rs index 3a0413e38aafc..351122aa5ca7d 100644 --- a/src/config/builder.rs +++ b/src/config/builder.rs @@ -38,7 +38,7 @@ pub struct ConfigBuilder { /// All configured enrichment tables. #[serde(default)] - pub enrichment_tables: IndexMap, + pub enrichment_tables: IndexMap>, /// All configured sources. 
#[serde(default)] @@ -106,6 +106,11 @@ impl From for ConfigBuilder { .map(|(key, sink)| (key, sink.map_inputs(ToString::to_string))) .collect(); + let enrichment_tables = enrichment_tables + .into_iter() + .map(|(key, table)| (key, table.map_inputs(ToString::to_string))) + .collect(); + let tests = tests.into_iter().map(TestDefinition::stringify).collect(); ConfigBuilder { @@ -145,11 +150,16 @@ impl ConfigBuilder { pub fn add_enrichment_table, E: Into>( &mut self, key: K, + inputs: &[&str], enrichment_table: E, ) { + let inputs = inputs + .iter() + .map(|value| value.to_string()) + .collect::>(); self.enrichment_tables.insert( ComponentKey::from(key.into()), - EnrichmentTableOuter::new(enrichment_table), + EnrichmentTableOuter::new(inputs, enrichment_table), ); } diff --git a/src/config/compiler.rs b/src/config/compiler.rs index 28015e01ab63e..ff661bcc9f231 100644 --- a/src/config/compiler.rs +++ b/src/config/compiler.rs @@ -3,7 +3,7 @@ use super::{ OutputId, }; -use indexmap::IndexSet; +use indexmap::{IndexMap, IndexSet}; use vector_lib::id::Inputs; pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec), Vec> { @@ -52,8 +52,17 @@ pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec), Vec< graceful_shutdown_duration, allow_empty: _, } = builder; + let all_sinks = sinks + .clone() + .into_iter() + .chain( + enrichment_tables + .iter() + .filter_map(|(key, table)| table.as_sink().map(|s| (key.clone(), s))), + ) + .collect::>(); - let graph = match Graph::new(&sources, &transforms, &sinks, schema) { + let graph = match Graph::new(&sources, &transforms, &all_sinks, schema) { Ok(graph) => graph, Err(graph_errors) => { errors.extend(graph_errors); @@ -85,6 +94,13 @@ pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec), Vec< (key, transform.with_inputs(inputs)) }) .collect(); + let enrichment_tables = enrichment_tables + .into_iter() + .map(|(key, table)| { + let inputs = graph.inputs_for(&key); + (key, table.with_inputs(inputs)) + 
}) + .collect(); let tests = tests .into_iter() .map(|test| test.resolve_outputs(&graph)) diff --git a/src/config/diff.rs b/src/config/diff.rs index da5ed54faeb9d..1a9097c71e59d 100644 --- a/src/config/diff.rs +++ b/src/config/diff.rs @@ -31,12 +31,16 @@ impl ConfigDiff { self.sources.flip(); self.transforms.flip(); self.sinks.flip(); + self.enrichment_tables.flip(); self } /// Checks whether or not the given component is present at all. pub fn contains(&self, key: &ComponentKey) -> bool { - self.sources.contains(key) || self.transforms.contains(key) || self.sinks.contains(key) + self.sources.contains(key) + || self.transforms.contains(key) + || self.sinks.contains(key) + || self.enrichment_tables.contains(key) } /// Checks whether or not the given component is changed. @@ -44,6 +48,7 @@ impl ConfigDiff { self.sources.is_changed(key) || self.transforms.is_changed(key) || self.sinks.is_changed(key) + || self.enrichment_tables.contains(key) } /// Checks whether or not the given component is removed. @@ -51,6 +56,7 @@ impl ConfigDiff { self.sources.is_removed(key) || self.transforms.is_removed(key) || self.sinks.is_removed(key) + || self.enrichment_tables.contains(key) } } diff --git a/src/config/enrichment_table.rs b/src/config/enrichment_table.rs index 5e2cd72a00858..2193c3ef71933 100644 --- a/src/config/enrichment_table.rs +++ b/src/config/enrichment_table.rs @@ -1,21 +1,91 @@ use enum_dispatch::enum_dispatch; +use serde::Serialize; use vector_lib::config::GlobalOptions; -use vector_lib::configurable::{configurable_component, NamedComponent}; +use vector_lib::configurable::{configurable_component, Configurable, NamedComponent, ToValue}; +use vector_lib::id::Inputs; use crate::enrichment_tables::EnrichmentTables; +use super::dot_graph::GraphConfig; +use super::{SinkConfig, SinkOuter}; + /// Fully resolved enrichment table component. 
#[configurable_component] #[derive(Clone, Debug)] -pub struct EnrichmentTableOuter { +pub struct EnrichmentTableOuter +where + T: Configurable + Serialize + 'static + ToValue + Clone, +{ #[serde(flatten)] pub inner: EnrichmentTables, + #[configurable(derived)] + #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")] + pub graph: GraphConfig, + #[configurable(derived)] + #[serde( + default = "Inputs::::default", + skip_serializing_if = "Inputs::is_empty" + )] + pub inputs: Inputs, } -impl EnrichmentTableOuter { - pub fn new>(inner: I) -> Self { +impl EnrichmentTableOuter +where + T: Configurable + Serialize + 'static + ToValue + Clone, +{ + pub fn new(inputs: I, inner: IET) -> Self + where + I: IntoIterator, + IET: Into, + { Self { inner: inner.into(), + graph: Default::default(), + inputs: Inputs::from_iter(inputs), + } + } + + // Components are currently built in a way that they match exactly one of the roles (source, + // transform, sink, enrichment table). Due to specific requirements of the "memory" enrichment + // table, it has to fulfill 2 of these roles (sink and enrichment table). To reduce the impact + // of this very specific requirement, any enrichment table can now be optionally mapped into a + // sink, but this will only work for a "memory" enrichment table, since other tables will not + // have a "sink_config" present. + // This is also not ideal, since `SinkOuter` is not meant to represent the actual configuration, + // but it should just be a representation of that config used for deserialization. + // In the future, if more such components come up, it would be good to limit such "Outer" + // components to deserialization and build up the components and the topology in a more granular + // way, with each having "modules" for inputs (making them valid as sinks), for healthchecks, + // for providing outputs, etc. 
+ pub fn as_sink(&self) -> Option> { + self.inner.sink_config().map(|sink| SinkOuter { + graph: self.graph.clone(), + inputs: self.inputs.clone(), + healthcheck_uri: None, + healthcheck: Default::default(), + buffer: Default::default(), + proxy: Default::default(), + inner: sink, + }) + } + + pub(super) fn map_inputs(self, f: impl Fn(&T) -> U) -> EnrichmentTableOuter + where + U: Configurable + Serialize + 'static + ToValue + Clone, + { + let inputs = self.inputs.iter().map(f).collect::>(); + self.with_inputs(inputs) + } + + pub(crate) fn with_inputs(self, inputs: I) -> EnrichmentTableOuter + where + I: IntoIterator, + U: Configurable + Serialize + 'static + ToValue + Clone, + { + EnrichmentTableOuter { + inputs: Inputs::from_iter(inputs), + inner: self.inner, + graph: self.graph, } } } @@ -36,4 +106,8 @@ pub trait EnrichmentTableConfig: NamedComponent + core::fmt::Debug + Send + Sync &self, globals: &GlobalOptions, ) -> crate::Result>; + + fn sink_config(&self) -> Option> { + None + } } diff --git a/src/config/graph.rs b/src/config/graph.rs index 70c59399ec98d..9ed1af6ff17fe 100644 --- a/src/config/graph.rs +++ b/src/config/graph.rs @@ -112,7 +112,7 @@ impl Graph { ); } - for (id, config) in sinks.iter() { + for (id, config) in sinks { graph.nodes.insert( id.clone(), Node::Sink { @@ -133,7 +133,7 @@ impl Graph { } } - for (id, config) in sinks.iter() { + for (id, config) in sinks { for input in config.inputs.iter() { if let Err(e) = graph.add_input(input, id, &available_inputs) { errors.push(e); diff --git a/src/config/loading/config_builder.rs b/src/config/loading/config_builder.rs index 1e40d38b985c4..4ef0010d6fb20 100644 --- a/src/config/loading/config_builder.rs +++ b/src/config/loading/config_builder.rs @@ -63,7 +63,7 @@ impl Process for ConfigBuilderLoader { } Some(ComponentHint::EnrichmentTable) => { self.builder.enrichment_tables.extend(deserialize_table::< - IndexMap, + IndexMap>, >(table)?); } Some(ComponentHint::Test) => { diff --git a/src/config/mod.rs 
b/src/config/mod.rs index 2326a8bc0eb02..bb3082e8cef0f 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -26,7 +26,7 @@ mod builder; mod cmd; mod compiler; mod diff; -mod dot_graph; +pub mod dot_graph; mod enrichment_table; pub mod format; mod graph; @@ -104,7 +104,7 @@ pub struct Config { sources: IndexMap, sinks: IndexMap>, transforms: IndexMap>, - pub enrichment_tables: IndexMap, + pub enrichment_tables: IndexMap>, tests: Vec, secret: IndexMap, pub graceful_shutdown_duration: Option, @@ -143,11 +143,22 @@ impl Config { self.sinks.get(id) } + pub fn enrichment_tables( + &self, + ) -> impl Iterator)> { + self.enrichment_tables.iter() + } + + pub fn enrichment_table(&self, id: &ComponentKey) -> Option<&EnrichmentTableOuter> { + self.enrichment_tables.get(id) + } + pub fn inputs_for_node(&self, id: &ComponentKey) -> Option<&[OutputId]> { self.transforms .get(id) .map(|t| &t.inputs[..]) .or_else(|| self.sinks.get(id).map(|s| &s.inputs[..])) + .or_else(|| self.enrichment_tables.get(id).map(|s| &s.inputs[..])) } pub fn propagate_acknowledgements(&mut self) -> Result<(), Vec> { diff --git a/src/config/sink.rs b/src/config/sink.rs index a322863f11926..7c07575589893 100644 --- a/src/config/sink.rs +++ b/src/config/sink.rs @@ -65,11 +65,11 @@ where /// This must be a valid URI, which requires at least the scheme and host. All other /// components -- port, path, etc -- are allowed as well. 
#[configurable(deprecated, metadata(docs::hidden), validation(format = "uri"))] - healthcheck_uri: Option, + pub healthcheck_uri: Option, #[configurable(derived, metadata(docs::advanced))] #[serde(default, deserialize_with = "crate::serde::bool_or_struct")] - healthcheck: SinkHealthcheckOptions, + pub healthcheck: SinkHealthcheckOptions, #[configurable(derived)] #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")] @@ -77,7 +77,7 @@ where #[configurable(derived)] #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")] - proxy: ProxyConfig, + pub proxy: ProxyConfig, #[serde(flatten)] #[configurable(metadata(docs::hidden))] diff --git a/src/config/validation.rs b/src/config/validation.rs index 56423e6aa742a..48f8ebbe1b785 100644 --- a/src/config/validation.rs +++ b/src/config/validation.rs @@ -349,6 +349,11 @@ pub fn warnings(config: &Config) -> Vec { .collect::>() }); + let table_sinks = config + .enrichment_tables + .iter() + .filter_map(|(key, table)| table.as_sink().map(|s| (key, s))) + .collect::>(); for (input_type, id) in transform_ids.chain(source_ids) { if !config .transforms @@ -358,6 +363,9 @@ pub fn warnings(config: &Config) -> Vec { .sinks .iter() .any(|(_, sink)| sink.inputs.contains(&id)) + && !table_sinks + .iter() + .any(|(_, sink)| sink.inputs.contains(&id)) { warnings.push(format!( "{} \"{}\" has no consumers", diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs new file mode 100644 index 0000000000000..d617dbd742e11 --- /dev/null +++ b/src/enrichment_tables/memory/config.rs @@ -0,0 +1,137 @@ +use std::num::NonZeroU64; +use std::sync::Arc; + +use crate::sinks::Healthcheck; +use crate::{config::SinkContext, enrichment_tables::memory::Memory}; +use async_trait::async_trait; +use futures::{future, FutureExt}; +use tokio::sync::Mutex; +use vector_lib::config::{AcknowledgementsConfig, Input}; +use vector_lib::enrichment::Table; +use 
vector_lib::{configurable::configurable_component, sink::VectorSink}; + +use crate::config::{EnrichmentTableConfig, SinkConfig}; + +use super::internal_events::InternalMetricsConfig; + +/// Configuration for the `memory` enrichment table. +#[configurable_component(enrichment_table("memory"))] +#[derive(Clone)] +pub struct MemoryConfig { + /// TTL (time-to-live in seconds) is used to limit the lifetime of data stored in the cache. + /// When TTL expires, data behind a specific key in the cache is removed. + /// TTL is reset when the key is replaced. + #[serde(default = "default_ttl")] + pub ttl: u64, + /// The scan interval used to look for expired records. This is provided + /// as an optimization to ensure that TTL is updated, but without doing + /// too many cache scans. + #[serde(default = "default_scan_interval")] + pub scan_interval: NonZeroU64, + /// The interval used for making writes visible in the table. + /// Longer intervals might get better performance, + /// but there is a longer delay before the data is visible in the table. + /// Since every TTL scan makes its changes visible, only use this value + /// if it is shorter than the `scan_interval`. + /// + /// By default, all writes are made visible immediately. + #[serde(skip_serializing_if = "vector_lib::serde::is_default")] + pub flush_interval: Option, + /// Maximum size of the table in bytes. All insertions that make + /// this table bigger than the maximum size are rejected. + /// + /// By default, there is no size limit. 
+ #[serde(skip_serializing_if = "vector_lib::serde::is_default")] + pub max_byte_size: Option, + + /// Configuration of internal metrics + #[configurable(derived)] + #[serde(default)] + pub internal_metrics: InternalMetricsConfig, + + #[serde(skip)] + memory: Arc>>>, +} + +impl PartialEq for MemoryConfig { + fn eq(&self, other: &Self) -> bool { + self.ttl == other.ttl + && self.scan_interval == other.scan_interval + && self.flush_interval == other.flush_interval + } +} +impl Eq for MemoryConfig {} + +impl Default for MemoryConfig { + fn default() -> Self { + Self { + ttl: default_ttl(), + scan_interval: default_scan_interval(), + flush_interval: None, + memory: Arc::new(Mutex::new(None)), + max_byte_size: None, + internal_metrics: InternalMetricsConfig::default(), + } + } +} + +const fn default_ttl() -> u64 { + 600 +} + +const fn default_scan_interval() -> NonZeroU64 { + unsafe { NonZeroU64::new_unchecked(30) } +} + +impl MemoryConfig { + async fn get_or_build_memory(&self) -> Memory { + let mut boxed_memory = self.memory.lock().await; + *boxed_memory + .get_or_insert_with(|| Box::new(Memory::new(self.clone()))) + .clone() + } +} + +impl EnrichmentTableConfig for MemoryConfig { + async fn build( + &self, + _globals: &crate::config::GlobalOptions, + ) -> crate::Result> { + Ok(Box::new(self.get_or_build_memory().await)) + } + + fn sink_config(&self) -> Option> { + Some(Box::new(self.clone())) + } +} + +#[async_trait] +#[typetag::serde(name = "memory_enrichment_table")] +impl SinkConfig for MemoryConfig { + async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let sink = VectorSink::from_event_streamsink(self.get_or_build_memory().await); + + Ok((sink, future::ok(()).boxed())) + } + + fn input(&self) -> Input { + Input::log() + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &AcknowledgementsConfig::DEFAULT + } +} + +impl std::fmt::Debug for MemoryConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result { + f.debug_struct("MemoryConfig") + .field("ttl", &self.ttl) + .field("scan_interval", &self.scan_interval) + .field("flush_interval", &self.flush_interval) + .field("max_byte_size", &self.max_byte_size) + .finish() + } +} + +impl_generate_config_from_default!(MemoryConfig); diff --git a/src/enrichment_tables/memory/internal_events.rs b/src/enrichment_tables/memory/internal_events.rs new file mode 100644 index 0000000000000..7a954389388d4 --- /dev/null +++ b/src/enrichment_tables/memory/internal_events.rs @@ -0,0 +1,154 @@ +use metrics::{counter, gauge}; +use vector_lib::configurable::configurable_component; +use vector_lib::internal_event::InternalEvent; + +/// Configuration of internal metrics for enrichment memory table. +#[configurable_component] +#[derive(Clone, Debug, PartialEq, Eq, Default)] +#[serde(deny_unknown_fields)] +pub struct InternalMetricsConfig { + /// Determines whether to include the key tag on internal metrics. + /// + /// This is useful for distinguishing between different keys while monitoring. However, the tag's + /// cardinality is unbounded. 
+ #[serde(default = "crate::serde::default_false")] + pub include_key_tag: bool, +} + +#[derive(Debug)] +pub(crate) struct MemoryEnrichmentTableRead<'a> { + pub key: &'a str, + pub include_key_metric_tag: bool, +} + +impl InternalEvent for MemoryEnrichmentTableRead<'_> { + fn emit(self) { + if self.include_key_metric_tag { + counter!( + "memory_enrichment_table_reads_total", + "key" => self.key.to_owned() + ) + .increment(1); + } else { + counter!("memory_enrichment_table_reads_total",).increment(1); + } + } + + fn name(&self) -> Option<&'static str> { + Some("MemoryEnrichmentTableRead") + } +} + +#[derive(Debug)] +pub(crate) struct MemoryEnrichmentTableInserted<'a> { + pub key: &'a str, + pub include_key_metric_tag: bool, +} + +impl InternalEvent for MemoryEnrichmentTableInserted<'_> { + fn emit(self) { + if self.include_key_metric_tag { + counter!( + "memory_enrichment_table_insertions_total", + "key" => self.key.to_owned() + ) + .increment(1); + } else { + counter!("memory_enrichment_table_insertions_total",).increment(1); + } + } + + fn name(&self) -> Option<&'static str> { + Some("MemoryEnrichmentTableInserted") + } +} + +#[derive(Debug)] +pub(crate) struct MemoryEnrichmentTableFlushed { + pub new_objects_count: usize, + pub new_byte_size: usize, +} + +impl InternalEvent for MemoryEnrichmentTableFlushed { + fn emit(self) { + counter!("memory_enrichment_table_flushes_total",).increment(1); + gauge!("memory_enrichment_table_objects_count",).set(self.new_objects_count as f64); + gauge!("memory_enrichment_table_byte_size",).set(self.new_byte_size as f64); + } + + fn name(&self) -> Option<&'static str> { + Some("MemoryEnrichmentTableFlushed") + } +} + +#[derive(Debug)] +pub(crate) struct MemoryEnrichmentTableTtlExpired<'a> { + pub key: &'a str, + pub include_key_metric_tag: bool, +} + +impl InternalEvent for MemoryEnrichmentTableTtlExpired<'_> { + fn emit(self) { + if self.include_key_metric_tag { + counter!( + "memory_enrichment_table_ttl_expirations", + "key" => 
self.key.to_owned() + ) + .increment(1); + } else { + counter!("memory_enrichment_table_ttl_expirations",).increment(1); + } + } + + fn name(&self) -> Option<&'static str> { + Some("MemoryEnrichmentTableTtlExpired") + } +} + +#[derive(Debug)] +pub(crate) struct MemoryEnrichmentTableReadFailed<'a> { + pub key: &'a str, + pub include_key_metric_tag: bool, +} + +impl InternalEvent for MemoryEnrichmentTableReadFailed<'_> { + fn emit(self) { + if self.include_key_metric_tag { + counter!( + "memory_enrichment_table_failed_reads", + "key" => self.key.to_owned() + ) + .increment(1); + } else { + counter!("memory_enrichment_table_failed_reads",).increment(1); + } + } + + fn name(&self) -> Option<&'static str> { + Some("MemoryEnrichmentTableReadFailed") + } +} + +#[derive(Debug)] +pub(crate) struct MemoryEnrichmentTableInsertFailed<'a> { + pub key: &'a str, + pub include_key_metric_tag: bool, +} + +impl InternalEvent for MemoryEnrichmentTableInsertFailed<'_> { + fn emit(self) { + if self.include_key_metric_tag { + counter!( + "memory_enrichment_table_failed_insertions", + "key" => self.key.to_owned() + ) + .increment(1); + } else { + counter!("memory_enrichment_table_failed_insertions",).increment(1); + } + } + + fn name(&self) -> Option<&'static str> { + Some("MemoryEnrichmentTableInsertFailed") + } +} diff --git a/src/enrichment_tables/memory/mod.rs b/src/enrichment_tables/memory/mod.rs new file mode 100644 index 0000000000000..36a1ed878e25f --- /dev/null +++ b/src/enrichment_tables/memory/mod.rs @@ -0,0 +1,8 @@ +//! Handles enrichment tables for `type = memory`. 
+ +mod config; +mod internal_events; +mod table; + +pub use config::*; +pub use table::*; diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs new file mode 100644 index 0000000000000..9c52149dadc42 --- /dev/null +++ b/src/enrichment_tables/memory/table.rs @@ -0,0 +1,617 @@ +use crate::enrichment_tables::memory::internal_events::{ + MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInsertFailed, MemoryEnrichmentTableInserted, + MemoryEnrichmentTableRead, MemoryEnrichmentTableReadFailed, MemoryEnrichmentTableTtlExpired, +}; +use crate::enrichment_tables::memory::MemoryConfig; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use evmap::shallow_copy::CopyValue; +use evmap::{self}; +use evmap_derive::ShallowCopy; +use thread_local::ThreadLocal; +use tokio::time::interval; +use tokio_stream::wrappers::IntervalStream; +use vector_lib::{ByteSizeOf, EstimatedJsonEncodedSizeOf}; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::BoxStream; +use tokio_stream::StreamExt; +use vector_lib::enrichment::{Case, Condition, IndexHandle, Table}; +use vector_lib::event::{Event, EventStatus, Finalizable}; +use vector_lib::internal_event::{ + ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle, Output, Protocol, +}; +use vector_lib::sink::StreamSink; +use vrl::value::{KeyString, ObjectMap, Value}; + +/// Single memory entry containing the value and TTL +#[derive(Clone, Eq, PartialEq, Hash, ShallowCopy)] +pub struct MemoryEntry { + value: String, + update_time: CopyValue, +} + +impl ByteSizeOf for MemoryEntry { + fn allocated_bytes(&self) -> usize { + self.value.size_of() + } +} + +impl MemoryEntry { + fn as_object_map(&self, now: Instant, total_ttl: u64, key: &str) -> Result { + let ttl = total_ttl.saturating_sub(now.duration_since(*self.update_time).as_secs()); + Ok(ObjectMap::from([ + ( + KeyString::from("key"), + Value::Bytes(Bytes::copy_from_slice(key.as_bytes())), + ), + ( + 
KeyString::from("value"), + serde_json::from_str::(&self.value) + .map_err(|_| "Failed to read value from memory!")?, + ), + ( + KeyString::from("ttl"), + Value::Integer(ttl.try_into().unwrap_or(i64::MAX)), + ), + ])) + } + + fn expired(&self, now: Instant, ttl: u64) -> bool { + now.duration_since(*self.update_time).as_secs() > ttl + } +} + +#[derive(Default)] +struct MemoryMetadata { + byte_size: u64, +} + +// Used to ensure that these 2 are locked together +struct MemoryWriter { + write_handle: evmap::WriteHandle, + metadata: MemoryMetadata, +} + +/// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a memory structure. +pub struct Memory { + read_handle_factory: evmap::ReadHandleFactory, + read_handle: ThreadLocal>, + write_handle: Arc>, + config: MemoryConfig, +} + +impl Memory { + /// Creates a new [Memory] based on the provided config. + pub fn new(config: MemoryConfig) -> Self { + let (read_handle, write_handle) = evmap::new(); + Self { + config, + read_handle_factory: read_handle.factory(), + read_handle: ThreadLocal::new(), + write_handle: Arc::new(Mutex::new(MemoryWriter { + write_handle, + metadata: MemoryMetadata::default(), + })), + } + } + + fn get_read_handle(&self) -> &evmap::ReadHandle { + self.read_handle + .get_or(|| self.read_handle_factory.handle()) + } + + fn handle_value(&mut self, value: ObjectMap) { + let mut writer = self.write_handle.lock().expect("mutex poisoned"); + let now = Instant::now(); + + for (k, v) in value.into_iter() { + let new_entry_key = String::from(k); + let Ok(v) = serde_json::to_string(&v) else { + emit!(MemoryEnrichmentTableInsertFailed { + key: &new_entry_key, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + continue; + }; + let new_entry = MemoryEntry { + value: v, + update_time: now.into(), + }; + let new_entry_size = new_entry_key.size_of() + new_entry.size_of(); + if let Some(max_byte_size) = self.config.max_byte_size { + if writer + 
.metadata + .byte_size + .saturating_add(new_entry_size as u64) + > max_byte_size + { + // Reject new entries + emit!(MemoryEnrichmentTableInsertFailed { + key: &new_entry_key, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + continue; + } + } + writer.metadata.byte_size = writer + .metadata + .byte_size + .saturating_add(new_entry_size as u64); + emit!(MemoryEnrichmentTableInserted { + key: &new_entry_key, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + writer.write_handle.update(new_entry_key, new_entry); + } + + if self.config.flush_interval.is_none() { + writer.write_handle.refresh(); + } + } + + fn scan_and_mark_for_deletion(&mut self) -> bool { + let mut writer = self.write_handle.lock().expect("mutex poisoned"); + let now = Instant::now(); + + let mut needs_flush = false; + // Since evmap holds 2 separate maps for the data, we are free to directly remove + // elements via the writer, while we are iterating the reader + // Refresh will happen only after we manually invoke it after iteration + if let Some(reader) = self.get_read_handle().read() { + for (k, v) in reader.iter() { + if let Some(entry) = v.get_one() { + if entry.expired(now, self.config.ttl) { + // Byte size is not reduced at this point, because the actual deletion + // will only happen at refresh time + writer.write_handle.empty(k.clone()); + emit!(MemoryEnrichmentTableTtlExpired { + key: k, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + needs_flush = true; + } + } + } + }; + + needs_flush + } + + fn scan(&mut self) { + let needs_flush = self.scan_and_mark_for_deletion(); + if needs_flush { + self.flush(); + } + } + + fn flush(&mut self) { + let mut writer = self.write_handle.lock().expect("mutex poisoned"); + + writer.write_handle.refresh(); + if let Some(reader) = self.get_read_handle().read() { + let mut byte_size = 0; + for (k, v) in reader.iter() { + byte_size += k.size_of() + v.get_one().size_of(); 
+ } + writer.metadata.byte_size = byte_size as u64; + emit!(MemoryEnrichmentTableFlushed { + new_objects_count: reader.len(), + new_byte_size: byte_size + }); + } + } +} + +impl Clone for Memory { + fn clone(&self) -> Self { + Self { + read_handle_factory: self.read_handle_factory.clone(), + read_handle: ThreadLocal::new(), + write_handle: Arc::clone(&self.write_handle), + config: self.config.clone(), + } + } +} + +impl Table for Memory { + fn find_table_row<'a>( + &self, + case: Case, + condition: &'a [Condition<'a>], + select: Option<&'a [String]>, + index: Option, + ) -> Result { + let mut rows = self.find_table_rows(case, condition, select, index)?; + + match rows.pop() { + Some(row) if rows.is_empty() => Ok(row), + Some(_) => Err("More than 1 row found".to_string()), + None => Err("Key not found".to_string()), + } + } + + fn find_table_rows<'a>( + &self, + _case: Case, + condition: &'a [Condition<'a>], + _select: Option<&'a [String]>, + _index: Option, + ) -> Result, String> { + match condition.first() { + Some(_) if condition.len() > 1 => Err("Only one condition is allowed".to_string()), + Some(Condition::Equals { value, .. 
}) => { + let key = value.to_string_lossy(); + match self.get_read_handle().get_one(key.as_ref()) { + Some(row) => { + emit!(MemoryEnrichmentTableRead { + key: &key, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + row.as_object_map(Instant::now(), self.config.ttl, &key) + .map(|r| vec![r]) + } + None => { + emit!(MemoryEnrichmentTableReadFailed { + key: &key, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + Ok(Default::default()) + } + } + } + Some(_) => Err("Only equality condition is allowed".to_string()), + None => Err("Key condition must be specified".to_string()), + } + } + + fn add_index(&mut self, _case: Case, fields: &[&str]) -> Result { + match fields.len() { + 0 => Err("Key field is required".to_string()), + 1 => Ok(IndexHandle(0)), + _ => Err("Only one field is allowed".to_string()), + } + } + + /// Returns a list of the field names that are in each index + fn index_fields(&self) -> Vec<(Case, Vec)> { + Vec::new() + } + + /// Doesn't need reload, data is written directly + fn needs_reload(&self) -> bool { + false + } +} + +impl std::fmt::Debug for Memory { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Memory {} row(s)", self.get_read_handle().len()) + } +} + +#[async_trait] +impl StreamSink for Memory { + async fn run(mut self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { + let events_sent = register!(EventsSent::from(Output(None))); + let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),))); + let mut flush_interval = IntervalStream::new(interval( + self.config + .flush_interval + .map(Duration::from_secs) + .unwrap_or(Duration::MAX), + )); + let mut scan_interval = IntervalStream::new(interval(Duration::from_secs( + self.config.scan_interval.into(), + ))); + + loop { + tokio::select! 
{ + event = input.next() => { + let mut event = if let Some(event) = event { + event + } else { + break; + }; + let event_byte_size = event.estimated_json_encoded_size_of(); + + let finalizers = event.take_finalizers(); + + // Panic: This sink only accepts Logs, so this should never panic + let log = event.into_log(); + + if let (Value::Object(map), _) = log.into_parts() { + self.handle_value(map) + }; + + finalizers.update_status(EventStatus::Delivered); + events_sent.emit(CountByteSize(1, event_byte_size)); + bytes_sent.emit(ByteSize(event_byte_size.get())); + } + + Some(_) = flush_interval.next() => { + self.flush(); + } + + Some(_) = scan_interval.next() => { + self.scan(); + } + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use futures::future::ready; + use futures_util::stream; + use std::time::Duration; + + use vector_lib::sink::VectorSink; + + use super::*; + use crate::{ + event::{Event, LogEvent}, + test_util::components::{run_and_assert_sink_compliance, SINK_TAGS}, + }; + + fn build_memory_config(modfn: impl Fn(&mut MemoryConfig)) -> MemoryConfig { + let mut config = MemoryConfig::default(); + modfn(&mut config); + config + } + + #[test] + fn finds_row() { + let mut memory = Memory::new(Default::default()); + memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))])); + + let condition = Condition::Equals { + field: "key", + value: Value::from("test_key"), + }; + + assert_eq!( + Ok(ObjectMap::from([ + ("key".into(), Value::from("test_key")), + ("ttl".into(), Value::from(memory.config.ttl)), + ("value".into(), Value::from(5)), + ])), + memory.find_table_row(Case::Sensitive, &[condition], None, None) + ); + } + + #[test] + fn calculates_ttl() { + let ttl = 100; + let secs_to_subtract = 10; + let memory = Memory::new(build_memory_config(|c| c.ttl = ttl)); + { + let mut handle = memory.write_handle.lock().unwrap(); + handle.write_handle.update( + "test_key".to_string(), + MemoryEntry { + value: "5".to_string(), + update_time: 
(Instant::now() - Duration::from_secs(secs_to_subtract)).into(), + }, + ); + handle.write_handle.refresh(); + } + + let condition = Condition::Equals { + field: "key", + value: Value::from("test_key"), + }; + + assert_eq!( + Ok(ObjectMap::from([ + ("key".into(), Value::from("test_key")), + ("ttl".into(), Value::from(ttl - secs_to_subtract)), + ("value".into(), Value::from(5)), + ])), + memory.find_table_row(Case::Sensitive, &[condition], None, None) + ); + } + + #[test] + fn removes_expired_records_on_scan_interval() { + let ttl = 100; + let mut memory = Memory::new(build_memory_config(|c| { + c.ttl = ttl; + })); + { + let mut handle = memory.write_handle.lock().unwrap(); + handle.write_handle.update( + "test_key".to_string(), + MemoryEntry { + value: "5".to_string(), + update_time: (Instant::now() - Duration::from_secs(ttl + 10)).into(), + }, + ); + handle.write_handle.refresh(); + } + + // Finds the value before scan + let condition = Condition::Equals { + field: "key", + value: Value::from("test_key"), + }; + assert_eq!( + Ok(ObjectMap::from([ + ("key".into(), Value::from("test_key")), + ("ttl".into(), Value::from(0)), + ("value".into(), Value::from(5)), + ])), + memory.find_table_row(Case::Sensitive, &[condition.clone()], None, None) + ); + + // Force scan + memory.scan(); + + // The value is not present anymore + assert!(memory + .find_table_rows(Case::Sensitive, &[condition], None, None) + .unwrap() + .pop() + .is_none()); + } + + #[test] + fn does_not_show_values_before_flush_interval() { + let ttl = 100; + let mut memory = Memory::new(build_memory_config(|c| { + c.ttl = ttl; + c.flush_interval = Some(10); + })); + memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))])); + + let condition = Condition::Equals { + field: "key", + value: Value::from("test_key"), + }; + + assert!(memory + .find_table_rows(Case::Sensitive, &[condition], None, None) + .unwrap() + .pop() + .is_none()); + } + + #[test] + fn updates_ttl_on_value_replacement() { + 
let ttl = 100; + let mut memory = Memory::new(build_memory_config(|c| c.ttl = ttl)); + { + let mut handle = memory.write_handle.lock().unwrap(); + handle.write_handle.update( + "test_key".to_string(), + MemoryEntry { + value: "5".to_string(), + update_time: (Instant::now() - Duration::from_secs(ttl / 2)).into(), + }, + ); + handle.write_handle.refresh(); + } + let condition = Condition::Equals { + field: "key", + value: Value::from("test_key"), + }; + + assert_eq!( + Ok(ObjectMap::from([ + ("key".into(), Value::from("test_key")), + ("ttl".into(), Value::from(ttl / 2)), + ("value".into(), Value::from(5)), + ])), + memory.find_table_row(Case::Sensitive, &[condition.clone()], None, None) + ); + + memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))])); + + assert_eq!( + Ok(ObjectMap::from([ + ("key".into(), Value::from("test_key")), + ("ttl".into(), Value::from(ttl)), + ("value".into(), Value::from(5)), + ])), + memory.find_table_row(Case::Sensitive, &[condition], None, None) + ); + } + + #[test] + fn ignores_all_values_over_byte_size_limit() { + let mut memory = Memory::new(build_memory_config(|c| { + c.max_byte_size = Some(1); + })); + memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))])); + + let condition = Condition::Equals { + field: "key", + value: Value::from("test_key"), + }; + + assert!(memory + .find_table_rows(Case::Sensitive, &[condition], None, None) + .unwrap() + .pop() + .is_none()); + } + + #[test] + fn ignores_values_when_byte_size_limit_is_reached() { + let ttl = 100; + let mut memory = Memory::new(build_memory_config(|c| { + c.ttl = ttl; + c.max_byte_size = Some(150); + })); + memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))])); + memory.handle_value(ObjectMap::from([("rejected_key".into(), Value::from(5))])); + + assert_eq!( + Ok(ObjectMap::from([ + ("key".into(), Value::from("test_key")), + ("ttl".into(), Value::from(ttl)), + ("value".into(), Value::from(5)), + ])), + 
memory.find_table_row( + Case::Sensitive, + &[Condition::Equals { + field: "key", + value: Value::from("test_key") + }], + None, + None + ) + ); + + assert!(memory + .find_table_rows( + Case::Sensitive, + &[Condition::Equals { + field: "key", + value: Value::from("rejected_key") + }], + None, + None + ) + .unwrap() + .pop() + .is_none()); + } + + #[test] + fn missing_key() { + let memory = Memory::new(Default::default()); + + let condition = Condition::Equals { + field: "key", + value: Value::from("test_key"), + }; + + assert!(memory + .find_table_rows(Case::Sensitive, &[condition], None, None) + .unwrap() + .pop() + .is_none()); + } + + #[tokio::test] + async fn sink_spec_compliance() { + let event = Event::Log(LogEvent::from(ObjectMap::from([( + "test_key".into(), + Value::from(5), + )]))); + + let memory = Memory::new(Default::default()); + + run_and_assert_sink_compliance( + VectorSink::from_event_streamsink(memory), + stream::once(ready(event)), + &SINK_TAGS, + ) + .await; + } +} diff --git a/src/enrichment_tables/mod.rs b/src/enrichment_tables/mod.rs index 97a93b0059022..75b128e50b360 100644 --- a/src/enrichment_tables/mod.rs +++ b/src/enrichment_tables/mod.rs @@ -1,4 +1,5 @@ //! Functionality to handle enrichment tables. +use crate::sinks::prelude::SinkConfig; use enum_dispatch::enum_dispatch; use vector_lib::configurable::{configurable_component, NamedComponent}; pub use vector_lib::enrichment::{Condition, IndexHandle, Table}; @@ -7,6 +8,9 @@ use crate::config::{EnrichmentTableConfig, GlobalOptions}; pub mod file; +#[cfg(feature = "enrichment-tables-memory")] +pub mod memory; + #[cfg(feature = "enrichment-tables-geoip")] pub mod geoip; @@ -22,6 +26,11 @@ pub enum EnrichmentTables { /// Exposes data from a static file as an enrichment table. File(file::FileConfig), + /// Exposes data from a memory cache as an enrichment table. The cache can be written to using + /// a sink. 
+ #[cfg(feature = "enrichment-tables-memory")] + Memory(memory::MemoryConfig), + /// Exposes data from a [MaxMind][maxmind] [GeoIP2][geoip2] database as an enrichment table. /// /// [maxmind]: https://www.maxmind.com/ @@ -41,6 +50,8 @@ impl NamedComponent for EnrichmentTables { fn get_component_name(&self) -> &'static str { match self { Self::File(config) => config.get_component_name(), + #[cfg(feature = "enrichment-tables-memory")] + Self::Memory(config) => config.get_component_name(), #[cfg(feature = "enrichment-tables-geoip")] Self::Geoip(config) => config.get_component_name(), #[cfg(feature = "enrichment-tables-mmdb")] diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 8602e5cce6c13..061f4ac3f018f 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -155,7 +155,7 @@ impl<'a> Builder<'a> { let mut enrichment_tables = HashMap::new(); // Build enrichment tables - 'tables: for (name, table) in self.config.enrichment_tables.iter() { + 'tables: for (name, table_outer) in self.config.enrichment_tables.iter() { let table_name = name.to_string(); if ENRICHMENT_TABLES.needs_reload(&table_name) { let indexes = if !self.diff.enrichment_tables.is_added(name) { @@ -166,7 +166,7 @@ impl<'a> Builder<'a> { None }; - let mut table = match table.inner.build(&self.config.global).await { + let mut table = match table_outer.inner.build(&self.config.global).await { Ok(table) => table, Err(error) => { self.errors @@ -506,10 +506,22 @@ impl<'a> Builder<'a> { } async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) { + let table_sinks = self + .config + .enrichment_tables + .iter() + .filter_map(|(key, table)| table.as_sink().map(|s| (key, s))) + .collect::>(); for (key, sink) in self .config .sinks() .filter(|(key, _)| self.diff.sinks.contains_new(key)) + .chain( + table_sinks + .iter() + .map(|(key, sink)| (*key, sink)) + .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)), + ) { debug!(component = 
%key, "Building new sink."); diff --git a/src/topology/running.rs b/src/topology/running.rs index 796661b7b4e0c..4ff91c510fe8c 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -15,7 +15,10 @@ use super::{ BuiltBuffer, TaskHandle, }; use crate::{ - config::{ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource}, + config::{ + ComponentKey, Config, ConfigDiff, EnrichmentTableOuter, HealthcheckOptions, Inputs, + OutputId, Resource, + }, event::EventArray, extra_context::ExtraContext, shutdown::SourceShutdownCoordinator, @@ -419,7 +422,25 @@ impl RunningTopology { let remove_sink = diff .sinks .removed_and_changed() - .map(|key| (key, self.config.sink(key).unwrap().resources(key))); + .map(|key| { + ( + key, + self.config + .sink(key) + .map(|s| s.resources(key)) + .unwrap_or_default(), + ) + }) + .chain( + diff.enrichment_tables + .removed_and_changed() + .filter_map(|key| { + self.config + .enrichment_table(key) + .and_then(|t| t.as_sink()) + .map(|s| (key, s.resources(key))) + }), + ); let add_source = diff .sources .changed_and_added() @@ -427,7 +448,25 @@ impl RunningTopology { let add_sink = diff .sinks .changed_and_added() - .map(|key| (key, new_config.sink(key).unwrap().resources(key))); + .map(|key| { + ( + key, + new_config + .sink(key) + .map(|s| s.resources(key)) + .unwrap_or_default(), + ) + }) + .chain( + diff.enrichment_tables + .changed_and_added() + .filter_map(|key| { + self.config + .enrichment_table(key) + .and_then(|t| t.as_sink()) + .map(|s| (key, s.resources(key))) + }), + ); let conflicts = Resource::conflicts( remove_sink.map(|(key, value)| ((true, key), value)).chain( add_sink @@ -450,7 +489,17 @@ impl RunningTopology { .to_change .iter() .filter(|&key| { - self.config.sink(key).unwrap().buffer == new_config.sink(key).unwrap().buffer + self.config.sink(key).map(|s| s.buffer.clone()).or_else(|| { + self.config + .enrichment_table(key) + .and_then(EnrichmentTableOuter::as_sink) + .map(|s| s.buffer) 
+ }) == new_config.sink(key).map(|s| s.buffer.clone()).or_else(|| { + self.config + .enrichment_table(key) + .and_then(EnrichmentTableOuter::as_sink) + .map(|s| s.buffer) + }) }) .cloned() .collect::>(); @@ -463,7 +512,18 @@ impl RunningTopology { .collect::>(); // First, we remove any inputs to removed sinks so they can naturally shut down. - for key in &diff.sinks.to_remove { + let removed_sinks = diff + .sinks + .to_remove + .iter() + .chain(diff.enrichment_tables.to_remove.iter().filter(|key| { + self.config + .enrichment_table(key) + .and_then(EnrichmentTableOuter::as_sink) + .is_some() + })) + .collect::>(); + for key in &removed_sinks { debug!(component = %key, "Removing sink."); self.remove_inputs(key, diff, new_config).await; } @@ -472,7 +532,18 @@ impl RunningTopology { // they can naturally shutdown and allow us to recover their buffers if possible. let mut buffer_tx = HashMap::new(); - for key in &diff.sinks.to_change { + let sinks_to_change = diff + .sinks + .to_change + .iter() + .chain(diff.enrichment_tables.to_change.iter().filter(|key| { + self.config + .enrichment_table(key) + .and_then(EnrichmentTableOuter::as_sink) + .is_some() + })) + .collect::>(); + for key in &sinks_to_change { debug!(component = %key, "Changing sink."); if reuse_buffers.contains(key) { self.detach_triggers @@ -491,7 +562,7 @@ impl RunningTopology { // basically a no-op since we're reusing the same buffer) than it is to pass around // info about which sinks are having their buffers reused and treat them differently // at other stages. - buffer_tx.insert(key.clone(), self.inputs.get(key).unwrap().clone()); + buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone()); } self.remove_inputs(key, diff, new_config).await; } @@ -502,7 +573,7 @@ impl RunningTopology { // // If a sink we're removing isn't tying up any resource that a changed/added sink depends // on, we don't bother waiting for it to shutdown. 
- for key in &diff.sinks.to_remove { + for key in &removed_sinks { let previous = self.tasks.remove(key).unwrap(); if wait_for_sinks.contains(key) { debug!(message = "Waiting for sink to shutdown.", %key); @@ -513,7 +584,7 @@ impl RunningTopology { } let mut buffers = HashMap::::new(); - for key in &diff.sinks.to_change { + for key in &sinks_to_change { if wait_for_sinks.contains(key) { let previous = self.tasks.remove(key).unwrap(); debug!(message = "Waiting for sink to shutdown.", %key); @@ -533,7 +604,7 @@ impl RunningTopology { _ => unreachable!(), }; - buffers.insert(key.clone(), (tx, Arc::new(Mutex::new(Some(rx))))); + buffers.insert((*key).clone(), (tx, Arc::new(Mutex::new(Some(rx))))); } } } @@ -567,6 +638,17 @@ impl RunningTopology { self.inputs_tap_metadata.remove(key); } + let removed_sinks = diff.enrichment_tables.to_remove.iter().filter(|key| { + self.config + .enrichment_table(key) + .and_then(EnrichmentTableOuter::as_sink) + .is_some() + }); + for key in removed_sinks { + // Sinks only have inputs + self.inputs_tap_metadata.remove(key); + } + for key in diff.sources.changed_and_added() { if let Some(task) = new_pieces.tasks.get(key) { self.outputs_tap_metadata @@ -613,6 +695,15 @@ impl RunningTopology { debug!(component = %key, "Connecting inputs for sink."); self.setup_inputs(key, diff, new_pieces).await; } + let added_changed_tables: Vec<&ComponentKey> = diff + .enrichment_tables + .changed_and_added() + .filter(|k| new_pieces.tasks.contains_key(k)) + .collect(); + for key in added_changed_tables { + debug!(component = %key, "Connecting inputs for enrichment table sink."); + self.setup_inputs(key, diff, new_pieces).await; + } // We do a final pass here to reconnect unchanged components. 
// @@ -847,6 +938,30 @@ impl RunningTopology { trace!(message = "Spawning new sink.", key = %key); self.spawn_sink(key, &mut new_pieces); } + + let changed_tables: Vec<&ComponentKey> = diff + .enrichment_tables + .to_change + .iter() + .filter(|k| new_pieces.tasks.contains_key(k)) + .collect(); + + let added_tables: Vec<&ComponentKey> = diff + .enrichment_tables + .to_add + .iter() + .filter(|k| new_pieces.tasks.contains_key(k)) + .collect(); + + for key in changed_tables { + debug!(message = "Spawning changed enrichment table sink.", key = %key); + self.spawn_sink(key, &mut new_pieces); + } + + for key in added_tables { + debug!(message = "Spawning enrichment table new sink.", key = %key); + self.spawn_sink(key, &mut new_pieces); + } } fn spawn_sink(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) { diff --git a/website/content/en/docs/reference/configuration/_index.md b/website/content/en/docs/reference/configuration/_index.md index acd0a3dcdccaa..b6effbcd91986 100644 --- a/website/content/en/docs/reference/configuration/_index.md +++ b/website/content/en/docs/reference/configuration/_index.md @@ -435,6 +435,80 @@ sinks: inputs: ["app*", "system_logs"] ``` +### Enrichment tables + +#### Memory enrichment table + +Memory enrichment table has to be used as a sink to feed it data, which can then be queried like any +other enrichment table. The data has to conform to a specific format - the memory table will only +accept [VRL objects](/docs/reference/vrl/expressions/#object), where each key-value pair will be +stored as a separate entry in the table, associating the value with the key in the table. Value here +can be any VRL type. + +```yaml +enrichment_tables: + memory_table: + type: memory + ttl: 60 + flush_interval: 5 + inputs: ["cache_generator"] + +sources: + demo_logs_test: + type: "demo_logs" + format: "json" + +transforms: + demo_logs_processor: + type: "remap" + inputs: ["demo_logs_test"] + source: | + . 
= parse_json!(.message) + user_id = get!(., path: ["user-identifier"]) + + # Look for existing value in the table, using "user-identifier" as key + existing, err = get_enrichment_table_record("memory_table", { "key": user_id }) + + if err == null { + # Value found, just use the cached value + # In this case existing looks like this { "key": user_id, "value": {}, "ttl": 50 } + # Where value is the value we cached, ttl is the time left before this value is removed from + # the cache and key is the key we queried the table with + . = existing.value + .source = "cache" + } else { + # Do some processing, because we don't have this value in the table + .referer = parse_url!(.referer) + .referer.host = encode_punycode!(.referer.host) + .source = "transform" + } + + cache_generator: + type: "remap" + inputs: ["demo_logs_processor"] + source: | + existing, err = get_enrichment_table_record("memory_table", { "key": get!(., path: ["user-identifier"]) }) + if err != null { + # We don't have this key cached, so we need to prepare it for the table + data = . + # Since the memory enrichment table takes in all key value pairs it receives and stores them + # We want to produce an object that has the value of "user-identifier" as its key and + # rest of the object as its value + . = set!(value: {}, path: [get!(data, path: ["user-identifier"])], data: data) + } else { + . 
= {}
+
+# We can observe that after some time some events have "source" set to "cache"
+sinks:
+  console:
+    inputs: ["demo_logs_processor"]
+    target: "stdout"
+    type: "console"
+    encoding:
+      codec: "json"
+```
+
 ## Sections
 
 {{< sections >}}
diff --git a/website/cue/reference/configuration.cue b/website/cue/reference/configuration.cue
index 277447950e181..fe632399145dc 100644
--- a/website/cue/reference/configuration.cue
+++ b/website/cue/reference/configuration.cue
@@ -99,6 +99,7 @@ configuration: {
 
 				* [CSV](\(urls.csv)) files
 				* [MaxMind](\(urls.maxmind)) databases
+				* In-memory storage
 
 				For the lookup in the enrichment tables to be as performant as possible, the data is indexed according
 				to the fields that are used in the search. Note that indices can only be created for fields for which an
@@ -116,9 +117,10 @@ configuration: {
 			required: true
 			type: string: {
 				enum: {
-					"file":  "Enrich data from a CSV file."
-					"geoip": "Enrich data from a [GeoIp](\(urls.maxmind_geoip2)) [MaxMind](\(urls.maxmind)) database."
-					"mmdb":  "Enrich data from any [MaxMind](\(urls.maxmind)) database."
+					"file":   "Enrich data from a CSV file."
+					"geoip":  "Enrich data from a [GeoIp](\(urls.maxmind_geoip2)) [MaxMind](\(urls.maxmind)) database."
+					"mmdb":   "Enrich data from any [MaxMind](\(urls.maxmind)) database."
+					"memory": "Enrich data from memory, which can be populated by using the table as a sink."
 				}
 			}
 		}
@@ -277,6 +279,93 @@ configuration: {
 				}
 			}
 		}
+		type: object: options: {
+			memory: {
+				required: true
+				description: """
+					Configuration options for in-memory enrichment table.
+
+					This enrichment table only supports lookup with the key field.
+
+					To write data into this table, you have to define inputs to use this table as a sink.
+					They are expected to produce objects, where each key-value pair is stored as a separate
+					record in the table. [Read more on how to use this component](\(urls.vector_enrichment_memory_how_it_works)).
+ """ + type: object: options: { + inputs: base.components.sinks.configuration.inputs + ttl: { + description: """ + TTL (time-to-live in seconds) is used to limit the lifetime of data stored in the cache. + When TTL expires, data behind a specific key in the cache is removed. + TTL is reset when the key is replaced. + """ + required: false + type: uint: { + default: 600 + examples: [3600, 21600] + } + } + scan_interval: { + description: """ + The scan interval used to look for expired records. This is provided + as an optimization to ensure that TTL is updated, but without doing + too many cache scans. + """ + required: false + type: uint: { + default: 30 + examples: [600, 60] + } + } + flush_interval: { + description: """ + The interval used for making writes visible in the table. + Longer intervals might get better performance, + but there is a longer delay before the data is visible in the table. + Since every TTL scan makes its changes visible, only use this value + if it is shorter than the `scan_interval`. + + By default, all writes are made visible immediately. + """ + required: false + type: uint: { + default: 0 + examples: [15, 30] + } + } + max_byte_size: { + description: """ + Maximum size of the table in bytes. All insertions that make this table bigger than the maximum size are rejected. + + By default, there is no size limit. + """ + required: false + type: uint: { + default: 0 + examples: [64000, 1000000000] + } + } + internal_metrics: { + description: """ + Configuration of internal metrics for enrichment memory table. + """ + required: false + type: object: options: { + include_key_tag: { + description: """ + Determines whether to include the key tag on internal metrics. + + This is useful for distinguishing between different keys while monitoring. However, the tag's + cardinality is unbounded. 
+ """ + required: false + type: bool: default: false + } + } + } + } + } + } } schema: { common: false diff --git a/website/cue/reference/urls.cue b/website/cue/reference/urls.cue index 04e27fa6b0020..e6d9edc9310e3 100644 --- a/website/cue/reference/urls.cue +++ b/website/cue/reference/urls.cue @@ -560,6 +560,8 @@ urls: { vector_download: "/releases/latest/download/" vector_download_nightly: "/releases/nightly/download/" vector_enriching_transforms: "/components/?functions%5B%5D=enrich" + vector_enrichment_memory: "/docs/reference/configuration/global-options/#enrichment_tables.memory" + vector_enrichment_memory_how_it_works: "/docs/reference/configuration/#memory-enrichment-table" vector_file_source: "/docs/reference/configuration/sources/file/" vector_exec_source: "/docs/reference/configuration/sources/exec" vector_file_source: "/docs/reference/configuration/sources/file/"