Skip to content

Commit

Permalink
feat: add the transformer [Vector Remap Language (VRL)](https://vecto…
Browse files Browse the repository at this point in the history
  • Loading branch information
davidB committed Dec 9, 2024
1 parent a703cbe commit 684c5a0
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 5 deletions.
10 changes: 9 additions & 1 deletion cdviz-collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ time = "0.3"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-opentelemetry-instrumentation-sdk = { version = "0.24" }
vrl = { version = "0.20", optional = true }

[dev-dependencies]
assert2 = "0.3"
Expand Down Expand Up @@ -117,14 +118,21 @@ source_opendal = [
"dep:handlebars_misc_helpers",
"dep:opendal",
"transformer_hbs",
"transformer_vrl",
]

#! ### Transformers
transformer_hbs = ["dep:handlebars", "dep:handlebars_misc_helpers"]
transformer_vrl = ["dep:vrl"]

#! ### Tools
tool_ui = ["dep:cliclack", "dep:console", "dep:similar"]
tool_transform = ["source_opendal", "tool_ui", "transformer_hbs"]
tool_transform = [
"source_opendal",
"tool_ui",
"transformer_hbs",
"transformer_vrl",
]

[target.'cfg(all(target_env = "musl", target_pointer_width = "64"))'.dependencies.jemallocator]
version = "0.5"
Expand Down
3 changes: 2 additions & 1 deletion cdviz-collector/deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ ignore = [
# See https://spdx.org/licenses/ for list of possible licenses
# [possible values: any SPDX 3.11 short identifier (+ optional exception)].
allow = [
"0BSD",
"Apache-2.0",
"Apache-2.0 WITH LLVM-exception",
"BSD-2-Clause",
Expand All @@ -100,7 +101,7 @@ allow = [
"MPL-2.0",
"OpenSSL",
"Unicode-3.0",
"Unicode-DFS-2016",
# "Unicode-DFS-2016",
"Unlicense",
]
# The confidence threshold for detecting a license from license text.
Expand Down
3 changes: 2 additions & 1 deletion cdviz-collector/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ pub(crate) enum Error {
HandlebarsRender(handlebars::RenderError),
#[cfg(feature = "transformer_hbs")]
HandlebarsTemplate(handlebars::TemplateError),
#[cfg(feature = "source_opendal")]
#[cfg(feature = "transformer_vrl")]
VrlExpression(vrl::compiler::ExpressionError),
Csv(csv::Error),
BusSend(tokio::sync::broadcast::error::SendError<Message>),
BusRecv(tokio::sync::broadcast::error::RecvError),
Expand Down
9 changes: 7 additions & 2 deletions cdviz-collector/src/sources/transformers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#[cfg(feature = "transformer_hbs")]
mod hbs;
#[cfg(feature = "transformer_vrl")]
mod vrl;

use std::collections::HashMap;

Expand All @@ -23,8 +25,9 @@ pub(crate) enum Config {
#[cfg(feature = "transformer_hbs")]
#[serde(alias = "hbs")]
Hbs { template: String },
// #[serde(alias = "vrl")]
// Vrl(String),
#[cfg(feature = "transformer_vrl")]
#[serde(alias = "vrl")]
Vrl { template: String },
}

impl Config {
Expand All @@ -35,6 +38,8 @@ impl Config {
Config::DiscardAll => Box::new(discard_all::Processor::new()),
#[cfg(feature = "transformer_hbs")]
Config::Hbs { template } => Box::new(hbs::Processor::new(template, next)?),
#[cfg(feature = "transformer_vrl")]
Config::Vrl { template } => Box::new(vrl::Processor::new(template, next)?),
};
Ok(out)
}
Expand Down
90 changes: 90 additions & 0 deletions cdviz-collector/src/sources/transformers/vrl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use crate::errors::{Error, Result};
use crate::pipes::Pipe;
use crate::sources::{EventSource, EventSourcePipe};
use vrl::compiler::{Program, TargetValue};
use vrl::core::Value;
use vrl::prelude::state::RuntimeState;
use vrl::prelude::{Context, TimeZone};
use vrl::value::Secrets;

pub(crate) struct Processor {
next: EventSourcePipe,
renderer: Program,
}

impl Processor {
pub(crate) fn new(template: &str, next: EventSourcePipe) -> Result<Self> {
// Use all of the std library functions
let fns = vrl::stdlib::all();
// Compile the program (and panic if it's invalid)
//TODO check result of compilation, log the error, warning, etc.
let src = if template.is_empty() {
// empty fallback to identity
"."
} else {
template
};
match vrl::compiler::compile(src, &fns) {
Err(err) => {
tracing::error!("VRL compilation error: {:?}", err);
Err(Error::from("VRL compilation error"))
}
Ok(res) => Ok(Self { next, renderer: res.program }),
}
}
}

impl Pipe for Processor {
type Input = EventSource;
//TODO optimize EventSource to avoid serialization/deserialization via json to convert to/from Value
//TODO build a microbenchmark to compare the performance of converting to/from Value
fn send(&mut self, input: Self::Input) -> Result<()> {
// This is the target that can be accessed / modified in the VRL program.
// You can use any custom type that implements `Target`, but `TargetValue` is also provided for convenience.

let mut target = TargetValue {
// the value starts as just an object with a single field "x" set to 1
value: serde_json::from_value(serde_json::to_value(input)?)?,
// the metadata is empty
metadata: Value::Object(std::collections::BTreeMap::new()),
// and there are no secrets associated with the target
secrets: Secrets::default(),
};

// The current state of the runtime (i.e. local variables)
let mut state = RuntimeState::default();

let timezone = TimeZone::default();

// A context bundles all the info necessary for the runtime to resolve a value.
let mut ctx = Context::new(&mut target, &mut state, &timezone);

// This executes the VRL program, making any modifications to the target, and returning a result.
let res = self.renderer.resolve(&mut ctx)?;

let output: EventSource = serde_json::from_value(serde_json::to_value(res)?)?;
self.next.send(output)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::pipes::collect_to_vec;
use pretty_assertions::assert_eq;

#[test]
fn test_empty_template() {
let collector = collect_to_vec::Collector::<EventSource>::new();
let mut processor = Processor::new("", Box::new(collector.create_pipe())).unwrap();
let input = EventSource {
metadata: serde_json::json!({"foo": "bar"}),
header: std::collections::HashMap::new(),
body: serde_json::json!({"a": 1, "b": 2}),
};
processor.send(input.clone()).unwrap();
let output = collector.try_into_iter().unwrap().next().unwrap();
//dbg!(&output);
assert_eq!(output, input);
}
}

0 comments on commit 684c5a0

Please sign in to comment.