Skip to content

Commit

Permalink
🚧 read cdevents json from an opendal compatible storage
Browse files Browse the repository at this point in the history
Signed-off-by: David Bernard <[email protected]>
  • Loading branch information
davidB committed Feb 8, 2024
1 parent ecb395f commit 8cae44d
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 89 deletions.
4 changes: 3 additions & 1 deletion cdviz-sensors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ publish.workspace = true

[dependencies]
clap = { workspace = true }
cdevents-sdk = { git = "https://github.com/cdevents/sdk-rust" }
chrono = "0.4"
enum_dispatch = "0.3"
init-tracing-opentelemetry = { workspace = true }
futures = "0.3"
notify = "6"
opendal = "0.45"
reqwest = "0.11"
reqwest-middleware = "0.2"
reqwest-tracing = "0.4"
Expand Down
24 changes: 22 additions & 2 deletions cdviz-sensors/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use cdevents_sdk::CDEvent;
use thiserror::Error;

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -8,14 +9,33 @@ pub enum Error {
NoWatcher,
#[error("no sink found (configured or started)")]
NoSink,
#[error(transparent)]
WatchDirectory(#[from] notify::Error),
// #[error(transparent)]
// WatchDirectory(#[from] notify::Error),
#[error(transparent)]
InitTracing(#[from] init_tracing_opentelemetry::Error),
#[error(transparent)]
Http(#[from] reqwest_middleware::Error),
#[error(transparent)]
Json(#[from] serde_json::Error),
#[error(transparent)]
Opendal(#[from] opendal::Error),
// #[error(transparent)]
// Other(#[from] anyhow::Error),
#[error(transparent)]
MspcSendError(#[from] tokio::sync::mpsc::error::SendError<CDEvent>),
#[error("{txt}")]
Custom { txt: String },
}

fn to_err<T>(txt: T) -> Error
where
T: Into<String>,
{
Error::Custom { txt: txt.into() }
}

impl From<String> for Error {
fn from(value: String) -> Self {
to_err(value)
}
}
17 changes: 7 additions & 10 deletions cdviz-sensors/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
mod errors;
mod http_sink;
mod settings;
mod watch_directory;
mod sources;

use cdevents_sdk::CDEvent;
use clap::Parser;
use enum_dispatch::enum_dispatch;
use errors::Result;
use http_sink::HttpSink;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sources::opendal_list;
use tokio::sync::mpsc;

#[derive(Clone, Debug, Serialize, Deserialize)]
struct CDEvent {
json: Value,
}

#[tokio::main]
async fn main() -> Result<()> {
let settings = settings::Settings::parse();
Expand All @@ -27,9 +22,11 @@ async fn main() -> Result<()> {

let mut watchers_count = 0;
let _watch_directory_guard = if let Some(directory) = settings.watch_directory {
let w = watch_directory::watch(tx, directory).await?;
// let w = watch_directory::watch(tx, directory).await?;
let w = opendal_list::Source::from_local_path(directory)?;
let h = w.start(tx.clone()).await;
watchers_count += 1;
Some(w)
Some(h)
} else {
None
};
Expand Down
1 change: 1 addition & 0 deletions cdviz-sensors/src/sources/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod opendal_list;
114 changes: 114 additions & 0 deletions cdviz-sensors/src/sources/opendal_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use std::collections::HashMap;
use std::path::Path;
use std::time::Duration;

use crate::errors::{Error, Result};
use cdevents_sdk::CDEvent;
use chrono::DateTime;
use chrono::Utc;
use futures::TryStreamExt;
use opendal::services::Fs;
use opendal::Entry;
use opendal::EntryMode;
use opendal::Metakey;
use opendal::Operator;
use opendal::Scheme;
use tokio::sync::mpsc::Sender;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tracing::instrument;

pub struct Source {
op: Operator,
poll_interval: Duration,
}

impl Source {
pub fn from_local_path<P>(p: P) -> Result<Self>
where
P: AsRef<Path> + std::fmt::Debug,
{
let mut builder = Fs::default();
builder.root(
p.as_ref()
.to_str()
.ok_or_else(|| Error::from(format!("failed to convert into str: {:?}", p)))?,
);
let op: Operator = Operator::new(builder)?.finish();

Ok(Self {
op,
poll_interval: Duration::from_secs(5),
})
}
pub fn from_config() -> Result<Self> {
let map = HashMap::from([
// Set the root for fs, all operations will happen under this root.
//
// NOTE: the root must be absolute path.
("root".to_string(), "/tmp".to_string()),
]);

// Build an `Operator` to start operating the storage.
let op: Operator = Operator::via_map(Scheme::Fs, map)?;

Ok(Self {
op,
poll_interval: Duration::from_secs(5),
})
}

pub async fn start(self, tx: Sender<CDEvent>) -> JoinHandle<Result<()>> {
tokio::spawn(async move {
// Process each socket concurrently.
run_in_loop(tx, self.op, self.poll_interval).await
})
}
}

pub async fn run_in_loop(tx: Sender<CDEvent>, op: Operator, poll_interval: Duration) -> Result<()> {
let mut after = DateTime::<Utc>::MIN_UTC;
loop {
let before = Utc::now();
if let Err(err) = run_once(&tx, &op, (&after, &before)).await {
tracing::warn!(?err, after =? after, before =? before, scheme =? op.info().scheme(), root =? op.info().root(), "fail on scanning");
}
after = before;
sleep(poll_interval).await;
}
}

#[instrument]
pub async fn run_once(
tx: &Sender<CDEvent>,
op: &Operator,
ts_beetween: (&DateTime<Utc>, &DateTime<Utc>),
) -> Result<()> {
let (after, before) = ts_beetween;
// TODO convert into arg of instrument
tracing::debug!(after =? after, before =? before, scheme =? op.info().scheme(), root =? op.info().root(), "scanning");
let mut lister = op
.lister_with("")
// Make sure content-length and last-modified been fetched.
.metakey(Metakey::ContentLength | Metakey::LastModified)
.await?;
while let Some(entry) = lister.try_next().await? {
let meta = entry.metadata();
if meta.mode() == EntryMode::FILE {
if let Some(last) = meta.last_modified() {
if &last > after && &last <= before && meta.content_length() > 0 {
if let Err(err) = process_entry(tx, op, &entry).await {
tracing::warn!(?err, path = entry.path(), "fail to process, skip")
}
}
}
}
}
Ok(())
}

async fn process_entry(tx: &Sender<CDEvent>, op: &Operator, entry: &Entry) -> Result<()> {
let read = op.read(entry.path()).await?;
let cdevent: CDEvent = serde_json::from_slice::<CDEvent>(&read)?;
tx.send(cdevent).await.map_err(Error::from)
}
76 changes: 0 additions & 76 deletions cdviz-sensors/src/watch_directory.rs

This file was deleted.

0 comments on commit 8cae44d

Please sign in to comment.