Skip to content

Commit

Permalink
🚧 cdviz-watcher watch local folder
Browse files Browse the repository at this point in the history
  • Loading branch information
davidB committed Feb 1, 2024
1 parent fd66c8b commit eea3ead
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 11 deletions.
13 changes: 13 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
resolver = "2"
members = [
"cdviz-collector",
"cdviz-watcher",
# "examples/*",
]

Expand All @@ -16,6 +17,18 @@ publish = false

[workspace.dependencies]
rstest = "0.18"
clap = { version = "4", features = ["derive", "env"] }
init-tracing-opentelemetry = { version = "0.16", features = [
"otlp",
"tracing_subscriber_ext",
] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
time = "0.3"
thiserror = "1"
tokio = { version = "1.0", features = ["full"] }
tracing = "0.1"
tracing-opentelemetry-instrumentation-sdk = { version = "0.16" }

# [profile.dev.package.insta]
# opt-level = 3
Expand Down
19 changes: 8 additions & 11 deletions cdviz-collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,10 @@ publish.workspace = true
[dependencies]
axum = "0.7"
axum-tracing-opentelemetry = { version = "0.16" }
clap = { version = "4", features = ["derive", "env"] }
init-tracing-opentelemetry = { version = "0.16", features = [
"otlp",
"tracing_subscriber_ext",
] }
clap = { workspace = true }
init-tracing-opentelemetry = { workspace = true }
# cloudevents-sdk = { version = "0.7", features = ["axum"] } // not compatible with axum 0.7
serde_json = "1.0"
serde_json = { workspace = true }
sqlx = { version = "0.7", features = [
"postgres",
"runtime-tokio",
Expand All @@ -30,11 +27,11 @@ sqlx = { version = "0.7", features = [
"json",
"migrate", # for test but simpler to declare once
], default-features = false }
time = "0.3"
thiserror = "1"
tokio = { version = "1.0", features = ["full"] }
tracing = "0.1"
tracing-opentelemetry-instrumentation-sdk = { version = "0.16" }
time = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry-instrumentation-sdk = { workspace = true }

[dev-dependencies]
axum-test = "14"
Expand Down
31 changes: 31 additions & 0 deletions cdviz-watcher/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "cdviz-watcher"
description = "Watch resources and push cdevents"
keywords = ["cd"]
categories = []
edition.workspace = true
version.workspace = true
authors.workspace = true
repository.workspace = true
license.workspace = true
publish.workspace = true

[dependencies]
clap = { workspace = true }
enum_dispatch = "0.3"
init-tracing-opentelemetry = { workspace = true }
futures = "0.3"
notify = "6"
reqwest = "0.11"
reqwest-middleware = "0.2"
reqwest-tracing = "0.4"
serde = { workspace = true }
serde_json = { workspace = true }
time = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }

# [dev-dependencies]
# rstest = { workspace = true }
# tracing-subscriber = "0.3"
21 changes: 21 additions & 0 deletions cdviz-watcher/src/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use thiserror::Error;

pub type Result<T> = std::result::Result<T, Error>;

#[derive(Error, Debug)]
pub enum Error {
#[error("no watcher found (configured or started)")]
NoWatcher,
#[error("no sink found (configured or started)")]
NoSink,
#[error(transparent)]
WatchDirectory(#[from] notify::Error),
#[error(transparent)]
InitTracing(#[from] init_tracing_opentelemetry::Error),
#[error(transparent)]
Http(#[from] reqwest_middleware::Error),
#[error(transparent)]
Json(#[from] serde_json::Error),
// #[error(transparent)]
// Other(#[from] anyhow::Error),
}
46 changes: 46 additions & 0 deletions cdviz-watcher/src/http_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use reqwest::Url;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
//use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
use crate::errors::Result;
use crate::{CDEvent, Sink};
use reqwest_tracing::TracingMiddleware;

#[derive(Debug)]
pub(crate) struct HttpSink {
client: ClientWithMiddleware,
dest: Url,
}

impl HttpSink {
pub(crate) fn new(url: Url) -> Self {
// Retry up to 3 times with increasing intervals between attempts.
//let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
let client = ClientBuilder::new(reqwest::Client::new())
// Trace HTTP requests. See the tracing crate to make use of these traces.
.with(TracingMiddleware::default())
// Retry failed requests.
//.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build();
Self { dest: url, client }
}
}

impl Sink for HttpSink {
async fn send(&self, cdevent: &CDEvent) -> Result<()> {
let json = serde_json::to_value(cdevent)?;
let resp = self
.client
.post(self.dest.clone())
.json(&json)
.send()
.await?;
if !resp.status().is_success() {
tracing::warn!(
?cdevent,
http_status = ?resp.status(),
"failed to send event"
)
}
Ok(())
}
}
86 changes: 86 additions & 0 deletions cdviz-watcher/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
mod errors;
mod http_sink;
mod settings;
mod watch_directory;

use clap::Parser;
use enum_dispatch::enum_dispatch;
use errors::Result;
use http_sink::HttpSink;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::mpsc;

#[derive(Clone, Debug, Serialize, Deserialize)]
struct CDEvent {
json: Value,
}

#[tokio::main]
async fn main() -> Result<()> {
let settings = settings::Settings::parse();

// very opinionated init of tracing, look as is source to make your own
//TODO use logfmt format (with traceid,...) see [tracing-logfmt-otel](https://github.com/elkowar/tracing-logfmt-otel)
init_tracing_opentelemetry::tracing_subscriber_ext::init_subscribers()?;
let (tx, mut rx) = mpsc::channel::<CDEvent>(32);

let mut watchers_count = 0;
let _watch_directory_guard = if let Some(directory) = settings.watch_directory {
let w = watch_directory::watch(tx, directory).await?;
watchers_count += 1;
Some(w)
} else {
None
};
if watchers_count < 1 {
tracing::error!("no watcher configured or started");
return Err(errors::Error::NoWatcher);
}

let mut sinks = vec![];

if settings.sink_debug {
sinks.push(SinkEnum::from(DebugSink {}));
}

if let Some(url) = settings.sink_http {
sinks.push(SinkEnum::from(HttpSink::new(url)));
}

if sinks.len() < 1 {
tracing::error!("no sink configured or started");
return Err(errors::Error::NoSink);
}

while let Some(message) = rx.recv().await {
for sink in sinks.iter() {
if let Err(e) = sink.send(&message).await {
tracing::warn!(?e, ?sink, "failed to send to sink");
}
}
}
Ok(())
}

#[enum_dispatch(SinkEnum)]
trait Sink {
async fn send(&self, cdevent: &CDEvent) -> Result<()>;
}

#[enum_dispatch]
#[derive(Debug)]
enum SinkEnum {
DebugSink,
HttpSink,
}

#[derive(Debug)]
struct DebugSink;

impl Sink for DebugSink {
async fn send(&self, cdevent: &CDEvent) -> Result<()> {
tracing::debug!(?cdevent, "sending");
Ok(())
}
}
19 changes: 19 additions & 0 deletions cdviz-watcher/src/settings.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use std::path::PathBuf;

use reqwest::Url;

#[derive(Debug, Clone, clap::Parser)]
pub struct Settings {
/// watch a local file system directory
/// (create on event per valid cdevents json file)
#[clap(long)]
pub watch_directory: Option<PathBuf>,

/// push cdevents as json to this url
#[clap(long)]
pub sink_http: Option<Url>,

/// push cdevents to log
#[clap(long)]
pub sink_debug: bool,
}
76 changes: 76 additions & 0 deletions cdviz-watcher/src/watch_directory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use crate::{errors::Result, CDEvent};
use notify::{
event::{AccessKind, AccessMode},
Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
};
use std::{
fs,
path::{Path, PathBuf},
};
use tokio::sync::mpsc::Sender;

// based on https://github.com/notify-rs/notify/blob/main/examples/async_monitor.rs
pub(crate) async fn watch<P: AsRef<Path>>(
tx: Sender<CDEvent>,
path: P,
) -> Result<Box<dyn Watcher>> {
// Automatically select the best implementation for your platform.
// You can also access each implementation directly e.g. INotifyWatcher.
let mut watcher = RecommendedWatcher::new(
move |res| {
futures::executor::block_on(async {
//dbg!(&res);
if let Ok(event) = res {
if let Some(cdevents) = maybe_to_cdevents(event) {
for cdevent in cdevents {
let sent = tx.send(cdevent).await;
if sent.is_err() {
tracing::warn!(?sent);
}
}
}
} else {
tracing::warn!(?res);
}
})
},
Config::default(),
)?;

// Add a path to be watched. All files and directories at that path and
// below will be monitored for changes.
watcher.watch(path.as_ref(), RecursiveMode::Recursive)?;
tracing::info!(path = ?path.as_ref(), "start watching directory");
Ok(Box::new(watcher))
}

fn maybe_to_cdevents(event: Event) -> Option<Vec<CDEvent>> {
// Access is called after creation or modification of a file
if event.kind == EventKind::Access(AccessKind::Close(AccessMode::Write)) {
let v: Vec<CDEvent> = event
.paths
.into_iter()
.filter(|p| p.is_file() && (p.extension().unwrap_or_default() == "json"))
.filter_map(maybe_to_cdevent)
.collect();
if v.is_empty() {
None
} else {
Some(v)
}
} else {
None
}
}

fn maybe_to_cdevent(p: PathBuf) -> Option<CDEvent> {
fs::read_to_string(p)
.map_err(|error| tracing::warn!(?error))
.ok()
.and_then(|txt| {
serde_json::from_str(&txt)
.map_err(|error| tracing::warn!(?error))
.ok()
})
.map(|json| CDEvent { json })
}

0 comments on commit eea3ead

Please sign in to comment.