diff --git a/cdviz-collector/Cargo.toml b/cdviz-collector/Cargo.toml
index bd4a0db..2a606f7 100644
--- a/cdviz-collector/Cargo.toml
+++ b/cdviz-collector/Cargo.toml
@@ -22,6 +22,7 @@ clap-verbosity-flag = "2.2.0"
 enum_dispatch = "0.3"
 figment = { version = "0.10", features = ["toml", "env"] }
 futures = "0.3"
+globset = { version = "0.4", optional = true }
 humantime-serde = "1.1.1"
 init-tracing-opentelemetry = { version = "0.18", features = [
     "otlp",
@@ -67,7 +68,7 @@ tracing-subscriber = "0.3"
 default = ["source_http", "source_opendal", "sink_db"]
 sink_db = ["dep:sqlx"]
 source_http = ["dep:axum", "dep:axum-tracing-opentelemetry"]
-source_opendal = ["dep:opendal"]
+source_opendal = ["dep:opendal", "dep:globset"]
 
 [package.metadata.release]
 pre-release-commit-message = "🚀 (cargo-release) version {{version}}"
diff --git a/cdviz-collector/cdviz-collector.toml b/cdviz-collector/cdviz-collector.toml
index c4544f3..03f9a3a 100644
--- a/cdviz-collector/cdviz-collector.toml
+++ b/cdviz-collector/cdviz-collector.toml
@@ -12,6 +12,8 @@ type = "opendal"
 kind = "fs"
 polling_interval = "10s"
 parameters = { root = "../cdevents-spec/examples" }
+recursive = true
+path_patterns = ["**/*.json"]
 
 [sources.cdevents_webhook]
 type = "http"
diff --git a/cdviz-collector/src/errors.rs b/cdviz-collector/src/errors.rs
index 47c604c..b80ec99 100644
--- a/cdviz-collector/src/errors.rs
+++ b/cdviz-collector/src/errors.rs
@@ -26,6 +26,9 @@ pub(crate) enum Error {
     #[cfg(feature = "source_opendal")]
     #[error(transparent)]
     Opendal(#[from] opendal::Error),
+    #[cfg(feature = "source_opendal")]
+    #[error(transparent)]
+    GlobPattern(#[from] globset::Error),
     #[error(transparent)]
     BusSend(#[from] tokio::sync::broadcast::error::SendError<Message>),
     #[error(transparent)]
diff --git a/cdviz-collector/src/sources/http.rs b/cdviz-collector/src/sources/http.rs
index 641e252..0c04ae6 100644
--- a/cdviz-collector/src/sources/http.rs
+++ b/cdviz-collector/src/sources/http.rs
@@ -49,7 +49,7 @@ struct AppState {
 }
 
 impl Source for HttpSource {
-    async fn run(&self, tx: Sender) -> Result<()> {
+    async fn run(&mut self, tx: Sender) -> Result<()> {
         let app_state = AppState { tx };
 
         let app = app().with_state(app_state);
diff --git a/cdviz-collector/src/sources/mod.rs b/cdviz-collector/src/sources/mod.rs
index 4b8fde1..bc8dc84 100644
--- a/cdviz-collector/src/sources/mod.rs
+++ b/cdviz-collector/src/sources/mod.rs
@@ -55,12 +55,12 @@ enum SourceEnum {
 #[enum_dispatch(SourceEnum)]
 trait Source {
-    async fn run(&self, tx: Sender) -> Result<()>;
+    async fn run(&mut self, tx: Sender) -> Result<()>;
 }
 
 pub(crate) fn start(_name: String, config: Config, tx: Sender) -> JoinHandle<Result<()>> {
     tokio::spawn(async move {
-        let source = SourceEnum::try_from(config)?;
+        let mut source = SourceEnum::try_from(config)?;
         source.run(tx).await?;
         Ok(())
     })
 }
diff --git a/cdviz-collector/src/sources/noop.rs b/cdviz-collector/src/sources/noop.rs
index f24e8be..df43be9 100644
--- a/cdviz-collector/src/sources/noop.rs
+++ b/cdviz-collector/src/sources/noop.rs
@@ -21,7 +21,7 @@ impl TryFrom<Config> for NoopSource {
 pub(crate) struct NoopSource {}
 impl Source for NoopSource {
-    async fn run(&self, _tx: Sender) -> Result<()> {
+    async fn run(&mut self, _tx: Sender) -> Result<()> {
         loop {
             sleep(Duration::MAX).await;
         }
     }
diff --git a/cdviz-collector/src/sources/opendal.rs b/cdviz-collector/src/sources/opendal.rs
deleted file mode 100644
index d8ca9aa..0000000
--- a/cdviz-collector/src/sources/opendal.rs
+++ /dev/null
@@ -1,121 +0,0 @@
-use crate::errors::{Error, Result};
-use crate::{Message, Sender};
-use cdevents_sdk::CDEvent;
-use chrono::DateTime;
-use chrono::Utc;
-use futures::TryStreamExt;
-use opendal::Entry;
-use opendal::EntryMode;
-use opendal::Metakey;
-use opendal::Operator;
-use opendal::Scheme;
-use serde::Deserialize;
-use serde::Serialize;
-use serde_with::{serde_as, DisplayFromStr};
-use std::collections::HashMap;
-use std::time::Duration;
-use tokio::time::sleep;
-use tracing::instrument;
-
-use super::Source;
-
-#[serde_as]
-#[derive(Debug, Deserialize, Serialize)]
-pub(crate) struct Config {
-    #[serde(with = "humantime_serde")]
-    polling_interval: Duration,
-    #[serde_as(as = "DisplayFromStr")]
-    kind: Scheme,
-    parameters: HashMap<String, String>,
-}
-
-impl TryFrom<Config> for OpendalSource {
-    type Error = crate::errors::Error;
-
-    fn try_from(value: Config) -> Result<Self> {
-        let op: Operator = Operator::via_map(value.kind, value.parameters)?;
-        Ok(Self {
-            op,
-            polling_interval: value.polling_interval,
-        })
-    }
-}
-
-pub(crate) struct OpendalSource {
-    op: Operator,
-    polling_interval: Duration,
-}
-
-// impl OpendalSource {
-//     pub(crate) fn from_local_path<P>(p: P) -> Result<Self>
-//     where
-//         P: AsRef<Path> + std::fmt::Debug,
-//     {
-//         let mut builder = Fs::default();
-//         builder.root(
-//             p.as_ref()
-//                 .to_str()
-//                 .ok_or_else(|| Error::from(format!("failed to convert into str: {:?}", p)))?,
-//         );
-//         let op: Operator = Operator::new(builder)?.finish();
-
-//         Ok(Self {
-//             op,
-//             polling_interval: Duration::from_secs(5),
-//         })
-//     }
-// }
-
-impl Source for OpendalSource {
-    async fn run(&self, tx: Sender) -> Result<()> {
-        let mut after = DateTime::<Utc>::MIN_UTC;
-        loop {
-            let before = Utc::now();
-            if let Err(err) = run_once(&tx, &self.op, (&after, &before)).await {
-                tracing::warn!(?err, after =? after, before =? before, scheme =? self.op.info().scheme(), root =? self.op.info().root(), "fail during scanning");
-            }
-            after = before;
-            sleep(self.polling_interval).await;
-        }
-    }
-}
-
-#[instrument]
-pub(crate) async fn run_once(
-    tx: &Sender,
-    op: &Operator,
-    ts_beetween: (&DateTime<Utc>, &DateTime<Utc>),
-) -> Result<()> {
-    let (after, before) = ts_beetween;
-    // TODO convert into arg of instrument
-    tracing::debug!(after =? after, before =? before, scheme =? op.info().scheme(), root =? op.info().root(), "scanning");
-    let mut lister = op
-        .lister_with("")
-        // Make sure content-length and last-modified been fetched.
-        .metakey(Metakey::ContentLength | Metakey::LastModified)
-        .await?;
-    while let Some(entry) = lister.try_next().await? {
-        let meta = entry.metadata();
-        if meta.mode() == EntryMode::FILE {
-            if let Some(last) = meta.last_modified() {
-                if &last > after && &last <= before && meta.content_length() > 0 {
-                    if let Err(err) = process_entry(tx, op, &entry).await {
-                        tracing::warn!(?err, path = entry.path(), "fail to process, skip")
-                    }
-                }
-            } else {
-                tracing::warn!(
-                    path = entry.path(),
-                    "can not read last modified timestamp, skip"
-                )
-            }
-        }
-    }
-    Ok(())
-}
-
-async fn process_entry(tx: &Sender, op: &Operator, entry: &Entry) -> Result<usize> {
-    let read = op.read(entry.path()).await?;
-    let cdevent: CDEvent = serde_json::from_slice::<CDEvent>(&read)?;
-    tx.send(cdevent.into()).map_err(Error::from)
-}
diff --git a/cdviz-collector/src/sources/opendal/filter.rs b/cdviz-collector/src/sources/opendal/filter.rs
new file mode 100644
index 0000000..5b89f7c
--- /dev/null
+++ b/cdviz-collector/src/sources/opendal/filter.rs
@@ -0,0 +1,108 @@
+use crate::errors::Result;
+use chrono::DateTime;
+use chrono::Utc;
+use globset::GlobSet;
+use opendal::Entry;
+use opendal::EntryMode;
+
+#[derive(Debug, Clone)]
+pub(crate) struct Filter {
+    ts_after: DateTime<Utc>,
+    ts_before: DateTime<Utc>,
+    path_patterns: Option<GlobSet>,
+}
+
+impl Filter {
+    pub(crate) fn from_patterns(path_patterns: Option<GlobSet>) -> Self {
+        Filter {
+            ts_after: DateTime::<Utc>::MIN_UTC,
+            ts_before: Utc::now(),
+            path_patterns,
+        }
+    }
+
+    pub(crate) fn accept(&self, entry: &Entry) -> bool {
+        let meta = entry.metadata();
+        if meta.mode() == EntryMode::FILE {
+            if let Some(last) = meta.last_modified() {
+                last > self.ts_after
+                    && last <= self.ts_before
+                    && meta.content_length() > 0
+                    && is_match(&self.path_patterns, entry.path())
+            } else {
+                tracing::warn!(
+                    path = entry.path(),
+                    "can not read last modified timestamp, skip"
+                );
+                false
+            }
+        } else {
+            false
+        }
+    }
+
+    pub(crate) fn jump_to_next_ts_window(&mut self) {
+        self.ts_after = self.ts_before;
+        self.ts_before = Utc::now();
+    }
+}
+
+#[inline]
+fn is_match<P>(pattern: &Option<GlobSet>, path: P) -> bool
+where
+    P: AsRef<std::path::Path>,
+{
+    pattern.as_ref().map(|p| p.is_match(path)).unwrap_or(true)
+}
+
+pub(crate) fn globset_from(patterns: &[String]) -> Result<Option<GlobSet>> {
+    if patterns.is_empty() {
+        Ok(None)
+    } else {
+        let mut builder = globset::GlobSetBuilder::new();
+        for pattern in patterns {
+            let glob = globset::GlobBuilder::new(pattern.as_str())
+                .literal_separator(true)
+                .build()?;
+            builder.add(glob);
+        }
+        Ok(Some(builder.build()?))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use rstest::*;
+
+    #[rstest]
+    #[case(vec![], "foo.json")]
+    #[case(vec!["*"], "foo.json")]
+    #[case(vec!["**"], "foo.json")]
+    #[case(vec!["*.json"], "foo.json")]
+    #[case(vec!["*.csv", "*.json"], "foo.json")]
+    #[case(vec!["**/*.json"], "foo.json")]
+    #[case(vec!["**/*.json"], "bar/foo.json")]
+    fn test_patterns_accept(#[case] patterns: Vec<&str>, #[case] path: &str) {
+        let patterns = patterns
+            .into_iter()
+            .map(String::from)
+            .collect::<Vec<String>>();
+        let globset = globset_from(&patterns).unwrap();
+        assert!(is_match(&globset, path));
+    }
+
+    #[rstest]
+    #[case(vec!["*.json"], "foo.csv")]
+    #[case(vec!["*.json"], "foo.jsonl")]
+    #[case(vec!["*.json"], "bar/foo.json")]
+    #[case(vec!["*.json"], "/foo.json")]
+    fn test_patterns_reject(#[case] patterns: Vec<&str>, #[case] path: &str) {
+        let patterns = patterns
+            .into_iter()
+            .map(String::from)
+            .collect::<Vec<String>>();
+        let globset = globset_from(&patterns).unwrap();
+        assert!(!is_match(&globset, path));
+    }
+}
diff --git a/cdviz-collector/src/sources/opendal/mod.rs b/cdviz-collector/src/sources/opendal/mod.rs
new file mode 100644
index 0000000..e466789
--- /dev/null
+++ b/cdviz-collector/src/sources/opendal/mod.rs
@@ -0,0 +1,100 @@
+mod filter;
+
+use crate::errors::{Error, Result};
+use crate::{Message, Sender};
+use cdevents_sdk::CDEvent;
+use filter::{globset_from, Filter};
+use futures::TryStreamExt;
+use opendal::Entry;
+use opendal::Metakey;
+use opendal::Operator;
+use opendal::Scheme;
+use serde::Deserialize;
+use serde::Serialize;
+use serde_with::{serde_as, DisplayFromStr};
+use std::collections::HashMap;
+use std::time::Duration;
+use tokio::time::sleep;
+use tracing::instrument;
+
+use super::Source;
+
+//TODO add persistance for state (time window to not reprocess same file after restart)
+//TODO add transformer: identity, csv+template -> bunch, jsonl
+
+#[serde_as]
+#[derive(Debug, Deserialize, Serialize, Default)]
+pub(crate) struct Config {
+    #[serde(with = "humantime_serde")]
+    polling_interval: Duration,
+    #[serde_as(as = "DisplayFromStr")]
+    kind: Scheme,
+    parameters: HashMap<String, String>,
+    recursive: bool,
+    path_patterns: Vec<String>,
+}
+
+impl TryFrom<Config> for OpendalSource {
+    type Error = crate::errors::Error;
+
+    fn try_from(value: Config) -> Result<Self> {
+        let op: Operator = Operator::via_map(value.kind, value.parameters)?;
+        let filter = Filter::from_patterns(globset_from(&value.path_patterns)?);
+        Ok(Self {
+            op,
+            polling_interval: value.polling_interval,
+            recursive: value.recursive,
+            filter,
+        })
+    }
+}
+
+pub(crate) struct OpendalSource {
+    op: Operator,
+    polling_interval: Duration,
+    recursive: bool,
+    filter: Filter,
+}
+
+impl Source for OpendalSource {
+    async fn run(&mut self, tx: Sender) -> Result<()> {
+        loop {
+            if let Err(err) = run_once(&tx, &self.op, &self.filter, self.recursive).await {
+                tracing::warn!(?err, filter = ?self.filter, scheme =? self.op.info().scheme(), root =? self.op.info().root(), "fail during scanning");
self.op.info().root(), "fail during scanning"); + } + sleep(self.polling_interval).await; + self.filter.jump_to_next_ts_window(); + } + } +} + +#[instrument] +pub(crate) async fn run_once( + tx: &Sender, + op: &Operator, + filter: &Filter, + recursive: bool, +) -> Result<()> { + // TODO convert into arg of instrument + tracing::debug!(filter=? filter, scheme =? op.info().scheme(), root =? op.info().root(), "scanning"); + let mut lister = op + .lister_with("") + .recursive(recursive) + // Make sure content-length and last-modified been fetched. + .metakey(Metakey::ContentLength | Metakey::LastModified) + .await?; + while let Some(entry) = lister.try_next().await? { + if filter.accept(&entry) { + if let Err(err) = process_entry(tx, op, &entry).await { + tracing::warn!(?err, path = entry.path(), "fail to process, skip") + } + } + } + Ok(()) +} + +async fn process_entry(tx: &Sender, op: &Operator, entry: &Entry) -> Result { + let read = op.read(entry.path()).await?; + let cdevent: CDEvent = serde_json::from_slice::(&read)?; + tx.send(cdevent.into()).map_err(Error::from) +} diff --git a/demos/example_01/cdviz-collector.toml b/demos/example_01/cdviz-collector.toml index e232f99..10d3f97 100644 --- a/demos/example_01/cdviz-collector.toml +++ b/demos/example_01/cdviz-collector.toml @@ -12,3 +12,5 @@ type = "opendal" kind = "fs" polling_interval = "10s" parameters = { root = "./cdevents" } +recursive = false +path_patterns = ["*.json"]