Skip to content

Commit

Permalink
feat(cdviz-collector): allow to define transformers by reference (name)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidB committed Oct 28, 2024
1 parent e49484d commit 045b2b1
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 75 deletions.
15 changes: 8 additions & 7 deletions cdviz-collector/examples/assets/cdviz-collector.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ parameters = { root = "./opendal_fs" }

[sources.cdevents_local_csv]
enabled = true
transformer_refs = ["service_deployed"]

[sources.cdevents_local_csv.extractor]
type = "opendal"
Expand All @@ -27,7 +28,13 @@ recursive = false
path_patterns = ["cdevents.csv"]
parser = "csv_row"

[[sources.cdevents_local_csv.transformers]]
[sources.cdevents_webhook]
enabled = true
# type = "http"
# host = "0.0.0.0"
# port = 8080

[transformers.service_deployed]
type = "hbs"
template = """
{
Expand Down Expand Up @@ -55,9 +62,3 @@ template = """
}
}
"""

[sources.cdevents_webhook.extractor]
enabled = true
# type = "http"
# host = "0.0.0.0"
# port = 8080
15 changes: 13 additions & 2 deletions cdviz-collector/src/assets/cdviz-collector.base.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ pool_connections_max = 10

[sources.cdevents_local_json]
enabled = false
transformers = []

[sources.cdevents_local_json.extractor]
type = "opendal"
Expand All @@ -28,8 +27,20 @@ recursive = true
path_patterns = ["**/*.json"]
parser = "json"

[sources.cdevents_webhook.extractor]
[sources.cdevents_webhook]
enabled = false

[sources.cdevents_webhook.extractor]
type = "http"
host = "0.0.0.0"
port = 8080

[transformers.passthrough]
type = "passthrough"

[transformers.log]
type = "log"
target = "transformers::log"

[transformers.discard_all]
type = "discard_all"
79 changes: 79 additions & 0 deletions cdviz-collector/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use crate::{errors, sinks, sources};

use figment::{
providers::{Env, Format, Serialized, Toml},
Figment,
};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, path::PathBuf};

#[derive(Clone, Debug, Deserialize, Serialize, Default)]
pub(crate) struct Config {
pub(crate) sources: HashMap<String, sources::Config>,
pub(crate) sinks: HashMap<String, sinks::Config>,
// extractors: HashMap<String, sources::extractors::Config>,
pub(crate) transformers: HashMap<String, sources::transformers::Config>,
}

impl Config {
pub fn from_file(config_file: Option<PathBuf>) -> errors::Result<Self> {
if let Some(ref config_file) = config_file {
if !config_file.exists() {
return Err(errors::Error::ConfigNotFound {
path: config_file.to_string_lossy().to_string(),
});
}
}
let config_file_base = include_str!("assets/cdviz-collector.base.toml");

let mut figment = Figment::from(Serialized::defaults(Config::default()))
.merge(Toml::string(config_file_base));
if let Some(config_file) = config_file {
figment = figment.merge(Toml::file(config_file.as_path()));
}
let mut config: Config =
figment.merge(Env::prefixed("CDVIZ_COLLECTOR__").split("__")).extract()?;

// resolve transformers references
config.sources.iter_mut().try_for_each(|(_name, source_config)| {
source_config.resolve_transformers(&config.transformers)
})?;

Ok(config)
}
}

#[cfg(test)]
mod tests {
use super::*;
use figment::Jail;
use rstest::*;

#[rstest]
fn read_base_config_only() {
Jail::expect_with(|_jail| {
let config: Config = Config::from_file(None).unwrap();
assert!(!config.sinks.get("debug").unwrap().is_enabled());
Ok(())
});
}

#[rstest]
fn read_base_config_with_env_override() {
Jail::expect_with(|jail| {
jail.set_env("CDVIZ_COLLECTOR__SINKS__DEBUG__ENABLED", "true");
let config: Config = Config::from_file(None).unwrap();
assert!(config.sinks.get("debug").unwrap().is_enabled());
Ok(())
});
}

#[rstest]
fn read_samples_config(#[files("./**/cdviz-collector.toml")] path: PathBuf) {
Jail::expect_with(|_jail| {
assert!(path.exists());
let _config: Config = Config::from_file(Some(path)).unwrap();
Ok(())
});
}
}
2 changes: 2 additions & 0 deletions cdviz-collector/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub(crate) type Result<T> = std::result::Result<T, Error>;
pub(crate) enum Error {
#[error("config file not found: {path}")]
ConfigNotFound { path: String },
#[error("config of transformer not found: {0}")]
ConfigTransformerNotFound(String),
#[error("no source found (configured or started)")]
NoSource,
#[error("no sink found (configured or started)")]
Expand Down
67 changes: 3 additions & 64 deletions cdviz-collector/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
mod config;
mod errors;
mod pipes;
mod sinks;
mod sources;

use std::{collections::HashMap, path::PathBuf};
use std::path::PathBuf;

use cdevents_sdk::CDEvent;
use clap::Parser;
use clap_verbosity_flag::Verbosity;
use errors::{Error, Result};
use figment::{
providers::{Env, Format, Serialized, Toml},
Figment,
};
use futures::future::TryJoinAll;
use serde::{Deserialize, Serialize};
// use time::OffsetDateTime;
use tokio::sync::broadcast;

Expand All @@ -38,14 +34,6 @@ pub(crate) struct Cli {
verbose: clap_verbosity_flag::Verbosity,
}

#[derive(Clone, Debug, Deserialize, Serialize, Default)]
pub(crate) struct Config {
sources: HashMap<String, sources::Config>,
sinks: HashMap<String, sinks::Config>,
// extractors: HashMap<String, sources::extractors::Config>,
// transformers: HashMap<String, sources::transformers::Config>,
}

type Sender<T> = tokio::sync::broadcast::Sender<T>;
type Receiver<T> = tokio::sync::broadcast::Receiver<T>;

Expand Down Expand Up @@ -78,25 +66,6 @@ fn init_log(verbose: Verbosity) -> Result<()> {
Ok(())
}

fn read_config(config_file: Option<PathBuf>) -> Result<Config> {
if let Some(ref config_file) = config_file {
if !config_file.exists() {
return Err(errors::Error::ConfigNotFound {
path: config_file.to_string_lossy().to_string(),
});
}
}
let config_file_base = include_str!("assets/cdviz-collector.base.toml");

let mut figment = Figment::from(Serialized::defaults(Config::default()))
.merge(Toml::string(config_file_base));
if let Some(config_file) = config_file {
figment = figment.merge(Toml::file(config_file.as_path()));
}
let config: Config = figment.merge(Env::prefixed("CDVIZ_COLLECTOR__").split("__")).extract()?;
Ok(config)
}

//TODO add garcefull shutdown
//TODO use logfmt
//TODO use verbosity to configure tracing & log, but allow override and finer control with RUST_LOG & CDVIZ_COLLECTOR_LOG (higher priority)
Expand All @@ -110,7 +79,7 @@ fn read_config(config_file: Option<PathBuf>) -> Result<Config> {
async fn main() -> Result<()> {
let cli = Cli::parse();
init_log(cli.verbose)?;
let config = read_config(cli.config)?;
let config = config::Config::from_file(cli.config)?;

if let Some(dir) = cli.directory {
std::env::set_current_dir(dir)?;
Expand Down Expand Up @@ -161,8 +130,6 @@ async fn main() -> Result<()> {
#[cfg(test)]
mod tests {
use super::*;
use figment::Jail;
use rstest::*;

impl proptest::arbitrary::Arbitrary for Message {
type Parameters = ();
Expand All @@ -173,32 +140,4 @@ mod tests {
(any::<CDEvent>()).prop_map(Message::from).boxed()
}
}

#[rstest]
fn read_base_config_only() {
Jail::expect_with(|_jail| {
let config: Config = read_config(None).unwrap();
assert!(!config.sinks.get("debug").unwrap().is_enabled());
Ok(())
});
}

#[rstest]
fn read_base_config_with_env_override() {
Jail::expect_with(|jail| {
jail.set_env("CDVIZ_COLLECTOR__SINKS__DEBUG__ENABLED", "true");
let config: Config = read_config(None).unwrap();
assert!(config.sinks.get("debug").unwrap().is_enabled());
Ok(())
});
}

#[rstest]
fn read_samples_config(#[files("./**/cdviz-collector.toml")] path: PathBuf) {
Jail::expect_with(|_jail| {
assert!(path.exists());
let _config: Config = read_config(Some(path)).unwrap();
Ok(())
});
}
}
22 changes: 21 additions & 1 deletion cdviz-collector/src/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod opendal;
mod send_cdevents;
pub(crate) mod transformers;

use crate::errors::Result;
use crate::errors::{Error, Result};
use crate::pipes::Pipe;
use crate::{Message, Sender};
use async_trait::async_trait;
Expand All @@ -34,13 +34,33 @@ pub(crate) struct Config {
#[serde(default)]
extractor: extractors::Config,
#[serde(default)]
transformer_refs: Vec<String>,
#[serde(default)]
transformers: Vec<transformers::Config>,
}

impl Config {
pub fn is_enabled(&self) -> bool {
self.enabled
}

pub fn resolve_transformers(
&mut self,
configs: &HashMap<String, transformers::Config>,
) -> Result<()> {
let mut transformers = self
.transformer_refs
.iter()
.map(|name| {
configs
.get(name)
.cloned()
.ok_or_else(|| Error::ConfigTransformerNotFound(name.to_string()))
})
.collect::<Result<Vec<_>>>()?;
self.transformers.append(&mut transformers);
Ok(())
}
}

pub(crate) fn start(_name: String, config: Config, tx: Sender<Message>) -> JoinHandle<Result<()>> {
Expand Down
2 changes: 1 addition & 1 deletion cdviz-collector/src/sources/transformers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub(crate) enum Config {
Passthrough,
#[serde(alias = "log")]
Log(log::Config),
#[serde(alias = "discrad_all")]
#[serde(alias = "discard_all")]
DiscardAll,
#[serde(alias = "hbs")]
Hbs { template: String },
Expand Down

0 comments on commit 045b2b1

Please sign in to comment.