diff --git a/cdviz-collector/src/sources/opendal/mod.rs b/cdviz-collector/src/sources/opendal/mod.rs index e7a3a38..52ee2ca 100644 --- a/cdviz-collector/src/sources/opendal/mod.rs +++ b/cdviz-collector/src/sources/opendal/mod.rs @@ -1,6 +1,7 @@ //TODO add persistance for state (time window to not reprocess same file after restart) mod filter; +mod texecutors; mod transformers; use self::filter::{globset_from, Filter}; @@ -22,7 +23,7 @@ use tokio::time::sleep; use tracing::instrument; #[serde_as] -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize)] pub(crate) struct Config { #[serde(with = "humantime_serde")] polling_interval: Duration, diff --git a/cdviz-collector/src/sources/opendal/texecutors.rs b/cdviz-collector/src/sources/opendal/texecutors.rs new file mode 100644 index 0000000..b336a83 --- /dev/null +++ b/cdviz-collector/src/sources/opendal/texecutors.rs @@ -0,0 +1,58 @@ +use crate::errors::Result; +use enum_dispatch::enum_dispatch; +use handlebars::Handlebars; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(tag = "format", content = "content")] +pub(crate) enum TExecutorConfig { + #[serde(alias = "hbs")] + Hbs(String), + // #[serde(alias = "vrl")] + // Vrl(String), +} + +impl TryFrom for TExecutorEnum { + type Error = crate::errors::Error; + + fn try_from(value: TExecutorConfig) -> Result { + let out = match value { + TExecutorConfig::Hbs(template) => Hbs::new(&template)?.into(), + }; + Ok(out) + } +} + +#[enum_dispatch] +#[derive(Debug)] +pub(crate) enum TExecutorEnum { + Hbs, +} + +#[enum_dispatch(TExecutorEnum)] +pub(crate) trait TExecutor { + //TODO return a common Iterator or stream type (better than Vec) + fn execute(&self, json: Value) -> Result>; +} + +#[derive(Debug)] +pub(crate) struct Hbs { + hbs: Handlebars<'static>, +} + +impl Hbs { + pub(crate) fn new(template: &str) -> Result { + let mut hbs = Handlebars::new(); + hbs.set_dev_mode(false); + hbs.set_strict_mode(true); + hbs.register_template_string("tpl", template)?; + Ok(Self { hbs }) + } +} + +impl TExecutor for Hbs { + fn execute(&self, data: Value) -> Result> { + Ok(self.hbs.render("tpl", &data)?.into_bytes()) + } +} diff --git a/cdviz-collector/src/sources/opendal/transformers.rs b/cdviz-collector/src/sources/opendal/transformers.rs index f73a10d..7b754c8 100644 --- a/cdviz-collector/src/sources/opendal/transformers.rs +++ b/cdviz-collector/src/sources/opendal/transformers.rs @@ -3,23 +3,21 @@ use std::collections::HashMap; use crate::errors::Result; use bytes::Buf; use enum_dispatch::enum_dispatch; -use handlebars::Handlebars; use opendal::{Entry, Operator}; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; -#[derive(Debug, Deserialize, Serialize, Default)] -#[serde(tag = "type")] +use super::texecutors::{TExecutor, TExecutorConfig, TExecutorEnum}; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(tag = "extractor")] pub(crate) enum Config { - #[serde(alias = "json_content_as_is")] - #[default] - JsonContentAsIs, - #[serde(alias = "metadata_only_via_template")] - MetadataOnlyViaTemplate { template: String }, - #[serde(alias = "json_via_template")] - JsonViaTemplate { template: String }, - #[serde(alias = "csv_row_via_template")] - CsvRowViaTemplate { template: String }, + #[serde(alias = "json")] + Json { transform: Option }, + #[serde(alias = "metadata")] + Metadata { transform: TExecutorConfig }, + #[serde(alias = "csv_row")] + CsvRow { transform: TExecutorConfig }, } impl TryFrom for TransformerEnum { @@ -27,12 +25,12 @@ impl TryFrom for TransformerEnum { fn try_from(value: Config) -> Result { let out = match value { - Config::JsonContentAsIs => JsonContentAsIs {}.into(), - Config::MetadataOnlyViaTemplate { template } => { - MetadataOnlyViaTemplate::new(&template)?.into() - } - Config::JsonViaTemplate { template } => JsonViaTemplate::new(&template)?.into(), - Config::CsvRowViaTemplate { template } => CsvRowViaTemplate::new(&template)?.into(), + Config::Json { transform } => match transform { + None => JsonContentAsIs {}.into(), + Some(te) => JsonExtractor::new(te.try_into()?).into(), + }, + Config::Metadata { transform } => MetadataExtractor::new(transform.try_into()?).into(), + Config::CsvRow { transform } => CsvRowExtractor::new(transform.try_into()?).into(), }; Ok(out) } @@ -42,9 +40,9 @@ impl TryFrom for TransformerEnum { #[derive(Debug)] pub(crate) enum TransformerEnum { JsonContentAsIs, - MetadataOnlyViaTemplate, - JsonViaTemplate, - CsvRowViaTemplate, + JsonExtractor, + MetadataExtractor, + CsvRowExtractor, } #[enum_dispatch(TransformerEnum)] @@ -64,41 +62,39 @@ impl Transformer for JsonContentAsIs { } #[derive(Debug)] -pub(crate) struct MetadataOnlyViaTemplate { - hbs: Handlebars<'static>, +pub(crate) struct MetadataExtractor { + texecutor: TExecutorEnum, } -impl MetadataOnlyViaTemplate { - fn new(template: &str) -> Result { - let hbs = new_renderer(template)?; - Ok(Self { hbs }) +impl MetadataExtractor { + fn new(texecutor: TExecutorEnum) -> Self { + Self { texecutor } } } -impl Transformer for MetadataOnlyViaTemplate { +impl Transformer for MetadataExtractor { async fn transform(&self, op: &Operator, entry: &Entry) -> Result>> { let metadata = extract_metadata(op, entry); let data = json!({ "metadata" : metadata, }); - let output = self.hbs.render("tpl", &data)?; - Ok(vec![output.into_bytes()]) + let output = self.texecutor.execute(data)?; + Ok(vec![output]) } } #[derive(Debug)] -pub(crate) struct JsonViaTemplate { - hbs: Handlebars<'static>, +pub(crate) struct JsonExtractor { + texecutor: TExecutorEnum, } -impl JsonViaTemplate { - fn new(template: &str) -> Result { - let hbs = new_renderer(template)?; - Ok(Self { hbs }) +impl JsonExtractor { + fn new(texecutor: TExecutorEnum) -> Self { + Self { texecutor } } } -impl Transformer for JsonViaTemplate { +impl Transformer for JsonExtractor { async fn transform(&self, op: &Operator, entry: &Entry) -> Result>> { let bytes = op.read(entry.path()).await?; let metadata = extract_metadata(op, entry); @@ -107,24 +103,23 @@ impl Transformer for JsonViaTemplate { "metadata" : metadata, "content": content, }); - let output = self.hbs.render("tpl", &data)?; - Ok(vec![output.into_bytes()]) + let output = self.texecutor.execute(data)?; + Ok(vec![output]) } } #[derive(Debug)] -pub(crate) struct CsvRowViaTemplate { - hbs: Handlebars<'static>, +pub(crate) struct CsvRowExtractor { + texecutor: TExecutorEnum, } -impl CsvRowViaTemplate { - fn new(template: &str) -> Result { - let hbs = new_renderer(template)?; - Ok(Self { hbs }) +impl CsvRowExtractor { + fn new(texecutor: TExecutorEnum) -> Self { + Self { texecutor } } } -impl Transformer for CsvRowViaTemplate { +impl Transformer for CsvRowExtractor { async fn transform(&self, op: &Operator, entry: &Entry) -> Result>> { use csv::Reader; @@ -143,21 +138,13 @@ impl Transformer for CsvRowViaTemplate { "metadata" : metadata.clone(), "content": content, }); - let output = self.hbs.render("tpl", &data)?; - out.push(output.into_bytes()); + let output = self.texecutor.execute(data)?; + out.push(output); } Ok(out) } } -fn new_renderer(template: &str) -> Result> { - let mut hbs = Handlebars::new(); - hbs.set_dev_mode(false); - hbs.set_strict_mode(true); - hbs.register_template_string("tpl", template)?; - Ok(hbs) -} - fn extract_metadata(op: &Operator, entry: &Entry) -> Value { json!({ "name": entry.name(), @@ -171,6 +158,8 @@ fn extract_metadata(op: &Operator, entry: &Entry) -> Value { mod tests { use std::path::Path; + use crate::sources::opendal::texecutors::Hbs; + use super::*; use assert2::{check, let_assert}; use chrono::prelude::*; @@ -212,7 +201,8 @@ mod tests { #[tokio::test] async fn csv_row_via_template_works() { let (op, entry) = provide_op_entry("cdevents.").await; - let sut = CsvRowViaTemplate::new(r#"{{content.env}}"#).unwrap(); + let sut = + CsvRowExtractor::new(TExecutorEnum::from(Hbs::new(r#"{{content.env}}"#).unwrap())); let_assert!(Ok(actual) = sut.transform(&op, &entry).await); check!(actual.len() == 3); check!(actual[0] == "dev".as_bytes()); diff --git a/demos/example_01/cdviz-collector.toml b/demos/example_01/cdviz-collector.toml index e216994..94789d9 100644 --- a/demos/example_01/cdviz-collector.toml +++ b/demos/example_01/cdviz-collector.toml @@ -14,4 +14,34 @@ polling_interval = "10s" parameters = { root = "./cdevents" } recursive = false path_patterns = ["*.json"] -transformer = { type = "json_content_as_is" } +transformer = { extractor = "json" } + +[sources.cdevents_csv] +type = "opendal" +kind = "fs" +polling_interval = "10s" +parameters = { root = "./cdevents" } +recursive = false +path_patterns = ["cdevents.csv"] +transformer = { extractor = "csv_row", transform = { format = "hbs", content = """ +{ + "context": { + "version": "0.4.0-draft", + "id": "{{ content.uuid }}", + "source": "/event/source/123", + "type": "dev.cdevents.service.deployed.0.1.1", + "timestamp": "{{ content.timestamp }}" + }, + "subject": { + "id": "{{ content.id }}", + "source": "/event/source/123", + "type": "service", + "content": { + "environment": { + "id": "{{ content.env }}" + }, + "artifactId": "{{ content.artifact_id }}" + } + } +} +""" } }