Skip to content

Commit

Permalink
✨ (cdviz-collector) opendal source supports path patterns and recursive listing
Browse files Browse the repository at this point in the history
  • Loading branch information
davidB committed Apr 6, 2024
1 parent 0a9d879 commit 2796770
Show file tree
Hide file tree
Showing 10 changed files with 221 additions and 126 deletions.
3 changes: 2 additions & 1 deletion cdviz-collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ clap-verbosity-flag = "2.2.0"
enum_dispatch = "0.3"
figment = { version = "0.10", features = ["toml", "env"] }
futures = "0.3"
globset = { version = "0.4", optional = true }
humantime-serde = "1.1.1"
init-tracing-opentelemetry = { version = "0.18", features = [
"otlp",
Expand Down Expand Up @@ -67,7 +68,7 @@ tracing-subscriber = "0.3"
default = ["source_http", "source_opendal", "sink_db"]
sink_db = ["dep:sqlx"]
source_http = ["dep:axum", "dep:axum-tracing-opentelemetry"]
source_opendal = ["dep:opendal"]
source_opendal = ["dep:opendal", "dep:globset"]

[package.metadata.release]
pre-release-commit-message = "🚀 (cargo-release) version {{version}}"
Expand Down
2 changes: 2 additions & 0 deletions cdviz-collector/cdviz-collector.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ type = "opendal"
kind = "fs"
polling_interval = "10s"
parameters = { root = "../cdevents-spec/examples" }
recursive = true
path_patterns = ["**/*.json"]

[sources.cdevents_webhook]
type = "http"
Expand Down
3 changes: 3 additions & 0 deletions cdviz-collector/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub(crate) enum Error {
#[cfg(feature = "source_opendal")]
#[error(transparent)]
Opendal(#[from] opendal::Error),
#[cfg(feature = "source_opendal")]
#[error(transparent)]
GlobPattern(#[from] globset::Error),
#[error(transparent)]
BusSend(#[from] tokio::sync::broadcast::error::SendError<Message>),
#[error(transparent)]
Expand Down
2 changes: 1 addition & 1 deletion cdviz-collector/src/sources/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct AppState {
}

impl Source for HttpSource {
async fn run(&self, tx: Sender<Message>) -> Result<()> {
async fn run(&mut self, tx: Sender<Message>) -> Result<()> {
let app_state = AppState { tx };

let app = app().with_state(app_state);
Expand Down
4 changes: 2 additions & 2 deletions cdviz-collector/src/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ enum SourceEnum {

#[enum_dispatch(SourceEnum)]
trait Source {
async fn run(&self, tx: Sender<Message>) -> Result<()>;
async fn run(&mut self, tx: Sender<Message>) -> Result<()>;
}

pub(crate) fn start(_name: String, config: Config, tx: Sender<Message>) -> JoinHandle<Result<()>> {
tokio::spawn(async move {
let source = SourceEnum::try_from(config)?;
let mut source = SourceEnum::try_from(config)?;
source.run(tx).await?;
Ok(())
})
Expand Down
2 changes: 1 addition & 1 deletion cdviz-collector/src/sources/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl TryFrom<Config> for NoopSource {
pub(crate) struct NoopSource {}

impl Source for NoopSource {
async fn run(&self, _tx: Sender<Message>) -> Result<()> {
async fn run(&mut self, _tx: Sender<Message>) -> Result<()> {
loop {
sleep(Duration::MAX).await;
}
Expand Down
121 changes: 0 additions & 121 deletions cdviz-collector/src/sources/opendal.rs

This file was deleted.

108 changes: 108 additions & 0 deletions cdviz-collector/src/sources/opendal/filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use crate::errors::Result;
use chrono::DateTime;
use chrono::Utc;
use globset::GlobSet;
use opendal::Entry;
use opendal::EntryMode;

/// Selection criteria applied to storage entries: a last-modified time window
/// combined with an optional set of path glob patterns (see `accept`).
#[derive(Debug, Clone)]
pub(crate) struct Filter {
/// Exclusive lower bound: an entry is accepted only if its last-modified
/// timestamp is strictly greater than this value.
ts_after: DateTime<Utc>,
/// Inclusive upper bound: an entry is accepted only if its last-modified
/// timestamp is less than or equal to this value.
ts_before: DateTime<Utc>,
/// Compiled glob patterns matched against the entry path; `None` means
/// every path matches.
path_patterns: Option<GlobSet>,
}

impl Filter {
    /// Creates a filter whose first time window spans from the minimum
    /// representable UTC timestamp up to "now", restricted to `path_patterns`
    /// (`None` accepts every path).
    pub(crate) fn from_patterns(path_patterns: Option<GlobSet>) -> Self {
        Self {
            path_patterns,
            ts_after: DateTime::<Utc>::MIN_UTC,
            ts_before: Utc::now(),
        }
    }

    /// Returns `true` when `entry` is a non-empty file, last modified inside
    /// the current window (`ts_after` exclusive, `ts_before` inclusive), whose
    /// path matches the configured patterns.
    ///
    /// Files without a last-modified timestamp are skipped with a warning;
    /// entries that are not files are rejected silently.
    pub(crate) fn accept(&self, entry: &Entry) -> bool {
        let metadata = entry.metadata();
        if metadata.mode() != EntryMode::FILE {
            return false;
        }

        let Some(modified) = metadata.last_modified() else {
            tracing::warn!(
                path = entry.path(),
                "can not read last modified timestamp, skip"
            );
            return false;
        };

        // Keep the checks short-circuiting in this order: size and pattern are
        // only inspected for entries that fall inside the time window.
        let in_window = modified > self.ts_after && modified <= self.ts_before;
        in_window
            && metadata.content_length() > 0
            && is_match(&self.path_patterns, entry.path())
    }

    /// Slides the window forward: the previous upper bound becomes the new
    /// (exclusive) lower bound and the upper bound is reset to "now".
    pub(crate) fn jump_to_next_ts_window(&mut self) {
        let previous_upper = std::mem::replace(&mut self.ts_before, Utc::now());
        self.ts_after = previous_upper;
    }
}

/// Tests `path` against an optional glob set.
///
/// Returns `true` when `pattern` is `None` (no restriction configured),
/// otherwise the result of matching `path` against the set.
#[inline]
fn is_match<P>(pattern: &Option<GlobSet>, path: P) -> bool
where
    P: AsRef<std::path::Path>,
{
    match pattern {
        Some(globs) => globs.is_match(path),
        None => true,
    }
}

pub(crate) fn globset_from(patterns: &[String]) -> Result<Option<GlobSet>> {
if patterns.is_empty() {
Ok(None)
} else {
let mut builder = globset::GlobSetBuilder::new();
for pattern in patterns {
let glob = globset::GlobBuilder::new(pattern.as_str())
.literal_separator(true)
.build()?;
builder.add(glob);
}
Ok(Some(builder.build()?))
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use rstest::*;

    /// Builds the optional glob set for `patterns` through `globset_from`,
    /// panicking if any pattern is invalid.
    fn compile(patterns: &[&str]) -> Option<GlobSet> {
        let owned: Vec<String> = patterns.iter().map(|raw| (*raw).to_string()).collect();
        globset_from(&owned).unwrap()
    }

    // Paths that must be accepted, including the "no pattern" case.
    #[rstest]
    #[case(vec![], "foo.json")]
    #[case(vec!["*"], "foo.json")]
    #[case(vec!["**"], "foo.json")]
    #[case(vec!["*.json"], "foo.json")]
    #[case(vec!["*.csv", "*.json"], "foo.json")]
    #[case(vec!["**/*.json"], "foo.json")]
    #[case(vec!["**/*.json"], "bar/foo.json")]
    fn test_patterns_accept(#[case] patterns: Vec<&str>, #[case] path: &str) {
        let globs = compile(&patterns);
        assert!(is_match(&globs, path));
    }

    // Paths that must be rejected: wrong extension, or a `/` in the path that
    // `*.json` does not match.
    #[rstest]
    #[case(vec!["*.json"], "foo.csv")]
    #[case(vec!["*.json"], "foo.jsonl")]
    #[case(vec!["*.json"], "bar/foo.json")]
    #[case(vec!["*.json"], "/foo.json")]
    fn test_patterns_reject(#[case] patterns: Vec<&str>, #[case] path: &str) {
        let globs = compile(&patterns);
        assert!(!is_match(&globs, path));
    }
}
Loading

0 comments on commit 2796770

Please sign in to comment.