Skip to content

Commit

Permalink
🚧 (cdviz-collector) enhance transformer/executor
Browse files Browse the repository at this point in the history
  • Loading branch information
davidB committed Jul 8, 2024
1 parent 0175e51 commit 5326412
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 59 deletions.
3 changes: 2 additions & 1 deletion cdviz-collector/src/sources/opendal/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//TODO add persistance for state (time window to not reprocess same file after restart)

mod filter;
mod texecutors;
mod transformers;

use self::filter::{globset_from, Filter};
Expand All @@ -22,7 +23,7 @@ use tokio::time::sleep;
use tracing::instrument;

#[serde_as]
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct Config {
#[serde(with = "humantime_serde")]
polling_interval: Duration,
Expand Down
58 changes: 58 additions & 0 deletions cdviz-collector/src/sources/opendal/texecutors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use crate::errors::Result;
use enum_dispatch::enum_dispatch;
use handlebars::Handlebars;
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, Deserialize, Serialize)]
#[serde(tag = "format", content = "content")]
pub(crate) enum TExecutorConfig {
#[serde(alias = "hbs")]
Hbs(String),
// #[serde(alias = "vrl")]
// Vrl(String),
}

impl TryFrom<TExecutorConfig> for TExecutorEnum {
type Error = crate::errors::Error;

fn try_from(value: TExecutorConfig) -> Result<Self> {
let out = match value {
TExecutorConfig::Hbs(template) => Hbs::new(&template)?.into(),
};
Ok(out)
}
}

#[enum_dispatch]
#[derive(Debug)]
pub(crate) enum TExecutorEnum {
Hbs,
}

#[enum_dispatch(TExecutorEnum)]
pub(crate) trait TExecutor {
//TODO return a common Iterator or stream type (better than Vec)
fn execute(&self, json: Value) -> Result<Vec<u8>>;
}

#[derive(Debug)]
pub(crate) struct Hbs {
hbs: Handlebars<'static>,
}

impl Hbs {
pub(crate) fn new(template: &str) -> Result<Self> {
let mut hbs = Handlebars::new();
hbs.set_dev_mode(false);
hbs.set_strict_mode(true);
hbs.register_template_string("tpl", template)?;
Ok(Self { hbs })
}
}

impl TExecutor for Hbs {
fn execute(&self, data: Value) -> Result<Vec<u8>> {
Ok(self.hbs.render("tpl", &data)?.into_bytes())
}
}
104 changes: 47 additions & 57 deletions cdviz-collector/src/sources/opendal/transformers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,34 @@ use std::collections::HashMap;
use crate::errors::Result;
use bytes::Buf;
use enum_dispatch::enum_dispatch;
use handlebars::Handlebars;
use opendal::{Entry, Operator};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};

#[derive(Debug, Deserialize, Serialize, Default)]
#[serde(tag = "type")]
use super::texecutors::{TExecutor, TExecutorConfig, TExecutorEnum};

#[derive(Debug, Deserialize, Serialize)]
#[serde(tag = "extractor")]
pub(crate) enum Config {
#[serde(alias = "json_content_as_is")]
#[default]
JsonContentAsIs,
#[serde(alias = "metadata_only_via_template")]
MetadataOnlyViaTemplate { template: String },
#[serde(alias = "json_via_template")]
JsonViaTemplate { template: String },
#[serde(alias = "csv_row_via_template")]
CsvRowViaTemplate { template: String },
#[serde(alias = "json")]
Json { transform: Option<TExecutorConfig> },
#[serde(alias = "metadata")]
Metadata { transform: TExecutorConfig },
#[serde(alias = "csv_row")]
CsvRow { transform: TExecutorConfig },
}

impl TryFrom<Config> for TransformerEnum {
type Error = crate::errors::Error;

fn try_from(value: Config) -> Result<Self> {
let out = match value {
Config::JsonContentAsIs => JsonContentAsIs {}.into(),
Config::MetadataOnlyViaTemplate { template } => {
MetadataOnlyViaTemplate::new(&template)?.into()
}
Config::JsonViaTemplate { template } => JsonViaTemplate::new(&template)?.into(),
Config::CsvRowViaTemplate { template } => CsvRowViaTemplate::new(&template)?.into(),
Config::Json { transform } => match transform {
None => JsonContentAsIs {}.into(),
Some(te) => JsonExtractor::new(te.try_into()?).into(),
},
Config::Metadata { transform } => MetadataExtractor::new(transform.try_into()?).into(),
Config::CsvRow { transform } => CsvRowExtractor::new(transform.try_into()?).into(),
};
Ok(out)
}
Expand All @@ -42,9 +40,9 @@ impl TryFrom<Config> for TransformerEnum {
#[derive(Debug)]
pub(crate) enum TransformerEnum {
JsonContentAsIs,
MetadataOnlyViaTemplate,
JsonViaTemplate,
CsvRowViaTemplate,
JsonExtractor,
MetadataExtractor,
CsvRowExtractor,
}

#[enum_dispatch(TransformerEnum)]
Expand All @@ -64,41 +62,39 @@ impl Transformer for JsonContentAsIs {
}

#[derive(Debug)]
pub(crate) struct MetadataOnlyViaTemplate {
hbs: Handlebars<'static>,
pub(crate) struct MetadataExtractor {
texecutor: TExecutorEnum,
}

impl MetadataOnlyViaTemplate {
fn new(template: &str) -> Result<Self> {
let hbs = new_renderer(template)?;
Ok(Self { hbs })
impl MetadataExtractor {
fn new(texecutor: TExecutorEnum) -> Self {
Self { texecutor }
}
}

impl Transformer for MetadataOnlyViaTemplate {
impl Transformer for MetadataExtractor {
async fn transform(&self, op: &Operator, entry: &Entry) -> Result<Vec<Vec<u8>>> {
let metadata = extract_metadata(op, entry);
let data = json!({
"metadata" : metadata,
});
let output = self.hbs.render("tpl", &data)?;
Ok(vec![output.into_bytes()])
let output = self.texecutor.execute(data)?;
Ok(vec![output])
}
}

#[derive(Debug)]
pub(crate) struct JsonViaTemplate {
hbs: Handlebars<'static>,
pub(crate) struct JsonExtractor {
texecutor: TExecutorEnum,
}

impl JsonViaTemplate {
fn new(template: &str) -> Result<Self> {
let hbs = new_renderer(template)?;
Ok(Self { hbs })
impl JsonExtractor {
fn new(texecutor: TExecutorEnum) -> Self {
Self { texecutor }
}
}

impl Transformer for JsonViaTemplate {
impl Transformer for JsonExtractor {
async fn transform(&self, op: &Operator, entry: &Entry) -> Result<Vec<Vec<u8>>> {
let bytes = op.read(entry.path()).await?;
let metadata = extract_metadata(op, entry);
Expand All @@ -107,24 +103,23 @@ impl Transformer for JsonViaTemplate {
"metadata" : metadata,
"content": content,
});
let output = self.hbs.render("tpl", &data)?;
Ok(vec![output.into_bytes()])
let output = self.texecutor.execute(data)?;
Ok(vec![output])
}
}

#[derive(Debug)]
pub(crate) struct CsvRowViaTemplate {
hbs: Handlebars<'static>,
pub(crate) struct CsvRowExtractor {
texecutor: TExecutorEnum,
}

impl CsvRowViaTemplate {
fn new(template: &str) -> Result<Self> {
let hbs = new_renderer(template)?;
Ok(Self { hbs })
impl CsvRowExtractor {
fn new(texecutor: TExecutorEnum) -> Self {
Self { texecutor }
}
}

impl Transformer for CsvRowViaTemplate {
impl Transformer for CsvRowExtractor {
async fn transform(&self, op: &Operator, entry: &Entry) -> Result<Vec<Vec<u8>>> {
use csv::Reader;

Expand All @@ -143,21 +138,13 @@ impl Transformer for CsvRowViaTemplate {
"metadata" : metadata.clone(),
"content": content,
});
let output = self.hbs.render("tpl", &data)?;
out.push(output.into_bytes());
let output = self.texecutor.execute(data)?;
out.push(output);
}
Ok(out)
}
}

fn new_renderer(template: &str) -> Result<Handlebars<'static>> {
let mut hbs = Handlebars::new();
hbs.set_dev_mode(false);
hbs.set_strict_mode(true);
hbs.register_template_string("tpl", template)?;
Ok(hbs)
}

fn extract_metadata(op: &Operator, entry: &Entry) -> Value {
json!({
"name": entry.name(),
Expand All @@ -171,6 +158,8 @@ fn extract_metadata(op: &Operator, entry: &Entry) -> Value {
mod tests {
use std::path::Path;

use crate::sources::opendal::texecutors::Hbs;

use super::*;
use assert2::{check, let_assert};
use chrono::prelude::*;
Expand Down Expand Up @@ -212,7 +201,8 @@ mod tests {
#[tokio::test]
async fn csv_row_via_template_works() {
let (op, entry) = provide_op_entry("cdevents.").await;
let sut = CsvRowViaTemplate::new(r#"{{content.env}}"#).unwrap();
let sut =
CsvRowExtractor::new(TExecutorEnum::from(Hbs::new(r#"{{content.env}}"#).unwrap()));
let_assert!(Ok(actual) = sut.transform(&op, &entry).await);
check!(actual.len() == 3);
check!(actual[0] == "dev".as_bytes());
Expand Down
32 changes: 31 additions & 1 deletion demos/example_01/cdviz-collector.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,34 @@ polling_interval = "10s"
parameters = { root = "./cdevents" }
recursive = false
path_patterns = ["*.json"]
transformer = { type = "json_content_as_is" }
transformer = { extractor = "json" }

[sources.cdevents_csv]
type = "opendal"
kind = "fs"
polling_interval = "10s"
parameters = { root = "./cdevents" }
recursive = false
path_patterns = ["cdevents.csv"]
transformer = { extractor = "csv_row", transform = { format = "hbs", content = """
{
"context": {
"version": "0.4.0-draft",
"id": "{{ content.uuid }}",
"source": "/event/source/123",
"type": "dev.cdevents.service.deployed.0.1.1",
"timestamp": "{{ content.timestamp }}"
},
"subject": {
"id": "{{ content.id }}",
"source": "/event/source/123",
"type": "service",
"content": {
"environment": {
"id": "{{ content.env }}"
},
"artifactId": "{{ content.artifact_id }}"
}
}
}
""" } }

0 comments on commit 5326412

Please sign in to comment.