diff --git a/cdviz-collector/Cargo.toml b/cdviz-collector/Cargo.toml index 137ed86..c0374f0 100644 --- a/cdviz-collector/Cargo.toml +++ b/cdviz-collector/Cargo.toml @@ -69,6 +69,7 @@ time = "0.3" tokio = { version = "1", features = ["full"] } tracing = "0.1" tracing-opentelemetry-instrumentation-sdk = { version = "0.24" } +vrl = { version = "0.20", optional = true } [dev-dependencies] assert2 = "0.3" @@ -117,14 +118,21 @@ source_opendal = [ "dep:handlebars_misc_helpers", "dep:opendal", "transformer_hbs", + "transformer_vrl", ] #! ### Transformers transformer_hbs = ["dep:handlebars", "dep:handlebars_misc_helpers"] +transformer_vrl = ["dep:vrl"] #! ### Tools tool_ui = ["dep:cliclack", "dep:console", "dep:similar"] -tool_transform = ["source_opendal", "tool_ui", "transformer_hbs"] +tool_transform = [ + "source_opendal", + "tool_ui", + "transformer_hbs", + "transformer_vrl", +] [target.'cfg(all(target_env = "musl", target_pointer_width = "64"))'.dependencies.jemallocator] version = "0.5" diff --git a/cdviz-collector/deny.toml b/cdviz-collector/deny.toml index 2579353..e3631a1 100644 --- a/cdviz-collector/deny.toml +++ b/cdviz-collector/deny.toml @@ -89,6 +89,7 @@ ignore = [ # See https://spdx.org/licenses/ for list of possible licenses # [possible values: any SPDX 3.11 short identifier (+ optional exception)]. allow = [ + "0BSD", "Apache-2.0", "Apache-2.0 WITH LLVM-exception", "BSD-2-Clause", @@ -100,7 +101,7 @@ allow = [ "MPL-2.0", "OpenSSL", "Unicode-3.0", - "Unicode-DFS-2016", + # "Unicode-DFS-2016", "Unlicense", ] # The confidence threshold for detecting a license from license text. diff --git a/cdviz-collector/src/errors.rs b/cdviz-collector/src/errors.rs index 168f444..ac0d501 100644 --- a/cdviz-collector/src/errors.rs +++ b/cdviz-collector/src/errors.rs @@ -35,7 +35,8 @@ pub(crate) enum Error { HandlebarsRender(handlebars::RenderError), #[cfg(feature = "transformer_hbs")] HandlebarsTemplate(handlebars::TemplateError), - #[cfg(feature = "source_opendal")] + #[cfg(feature = "transformer_vrl")] + VrlExpression(vrl::compiler::ExpressionError), Csv(csv::Error), BusSend(tokio::sync::broadcast::error::SendError), BusRecv(tokio::sync::broadcast::error::RecvError), diff --git a/cdviz-collector/src/sources/transformers/mod.rs b/cdviz-collector/src/sources/transformers/mod.rs index 8e66c0e..be8c183 100644 --- a/cdviz-collector/src/sources/transformers/mod.rs +++ b/cdviz-collector/src/sources/transformers/mod.rs @@ -1,5 +1,7 @@ #[cfg(feature = "transformer_hbs")] mod hbs; +#[cfg(feature = "transformer_vrl")] +mod vrl; use std::collections::HashMap; @@ -23,8 +25,9 @@ pub(crate) enum Config { #[cfg(feature = "transformer_hbs")] #[serde(alias = "hbs")] Hbs { template: String }, - // #[serde(alias = "vrl")] - // Vrl(String), + #[cfg(feature = "transformer_vrl")] + #[serde(alias = "vrl")] + Vrl { template: String }, } impl Config { @@ -35,6 +38,8 @@ impl Config { Config::DiscardAll => Box::new(discard_all::Processor::new()), #[cfg(feature = "transformer_hbs")] Config::Hbs { template } => Box::new(hbs::Processor::new(template, next)?), + #[cfg(feature = "transformer_vrl")] + Config::Vrl { template } => Box::new(vrl::Processor::new(template, next)?), }; Ok(out) } diff --git a/cdviz-collector/src/sources/transformers/vrl.rs b/cdviz-collector/src/sources/transformers/vrl.rs new file mode 100644 index 0000000..bb8160f --- /dev/null +++ b/cdviz-collector/src/sources/transformers/vrl.rs @@ -0,0 +1,90 @@ +use crate::errors::{Error, Result}; +use crate::pipes::Pipe; +use crate::sources::{EventSource, EventSourcePipe}; +use vrl::compiler::{Program, TargetValue}; +use vrl::core::Value; +use vrl::prelude::state::RuntimeState; +use vrl::prelude::{Context, TimeZone}; +use vrl::value::Secrets; + +pub(crate) struct Processor { + next: EventSourcePipe, + renderer: Program, +} + +impl Processor { + pub(crate) fn new(template: &str, next: EventSourcePipe) -> Result { + // Use all of the std library functions + let fns = vrl::stdlib::all(); + // Compile the program (and panic if it's invalid) + //TODO check result of compilation, log the error, warning, etc. + let src = if template.is_empty() { + // empty fallback to identity + "." + } else { + template + }; + match vrl::compiler::compile(src, &fns) { + Err(err) => { + tracing::error!("VRL compilation error: {:?}", err); + Err(Error::from("VRL compilation error")) + } + Ok(res) => Ok(Self { next, renderer: res.program }), + } + } +} + +impl Pipe for Processor { + type Input = EventSource; + //TODO optimize EventSource to avoid serialization/deserialization via json to convert to/from Value + //TODO build a microbenchmark to compare the performance of converting to/from Value + fn send(&mut self, input: Self::Input) -> Result<()> { + // This is the target that can be accessed / modified in the VRL program. + // You can use any custom type that implements `Target`, but `TargetValue` is also provided for convenience. + + let mut target = TargetValue { + // the value starts as just an object with a single field "x" set to 1 + value: serde_json::from_value(serde_json::to_value(input)?)?, + // the metadata is empty + metadata: Value::Object(std::collections::BTreeMap::new()), + // and there are no secrets associated with the target + secrets: Secrets::default(), + }; + + // The current state of the runtime (i.e. local variables) + let mut state = RuntimeState::default(); + + let timezone = TimeZone::default(); + + // A context bundles all the info necessary for the runtime to resolve a value. + let mut ctx = Context::new(&mut target, &mut state, &timezone); + + // This executes the VRL program, making any modifications to the target, and returning a result. + let res = self.renderer.resolve(&mut ctx)?; + + let output: EventSource = serde_json::from_value(serde_json::to_value(res)?)?; + self.next.send(output) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::pipes::collect_to_vec; + use pretty_assertions::assert_eq; + + #[test] + fn test_empty_template() { + let collector = collect_to_vec::Collector::::new(); + let mut processor = Processor::new("", Box::new(collector.create_pipe())).unwrap(); + let input = EventSource { + metadata: serde_json::json!({"foo": "bar"}), + header: std::collections::HashMap::new(), + body: serde_json::json!({"a": 1, "b": 2}), + }; + processor.send(input.clone()).unwrap(); + let output = collector.try_into_iter().unwrap().next().unwrap(); + //dbg!(&output); + assert_eq!(output, input); + } +}