From eea3eadaf81c492156be26e7f89287db85c50733 Mon Sep 17 00:00:00 2001 From: David Bernard Date: Tue, 9 Jan 2024 23:33:07 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=A7=20cdviz-watcher=20watch=20local=20?= =?UTF-8?q?folder?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.toml | 13 +++++ cdviz-collector/Cargo.toml | 19 +++--- cdviz-watcher/Cargo.toml | 31 ++++++++++ cdviz-watcher/src/errors.rs | 21 +++++++ cdviz-watcher/src/http_sink.rs | 46 +++++++++++++++ cdviz-watcher/src/main.rs | 86 ++++++++++++++++++++++++++++ cdviz-watcher/src/settings.rs | 19 ++++++ cdviz-watcher/src/watch_directory.rs | 76 ++++++++++++++++++++++++ 8 files changed, 300 insertions(+), 11 deletions(-) create mode 100644 cdviz-watcher/Cargo.toml create mode 100644 cdviz-watcher/src/errors.rs create mode 100644 cdviz-watcher/src/http_sink.rs create mode 100644 cdviz-watcher/src/main.rs create mode 100644 cdviz-watcher/src/settings.rs create mode 100644 cdviz-watcher/src/watch_directory.rs diff --git a/Cargo.toml b/Cargo.toml index e8878aa..18942c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ resolver = "2" members = [ "cdviz-collector", + "cdviz-watcher", # "examples/*", ] @@ -16,6 +17,18 @@ publish = false [workspace.dependencies] rstest = "0.18" +clap = { version = "4", features = ["derive", "env"] } +init-tracing-opentelemetry = { version = "0.16", features = [ + "otlp", + "tracing_subscriber_ext", +] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +time = "0.3" +thiserror = "1" +tokio = { version = "1.0", features = ["full"] } +tracing = "0.1" +tracing-opentelemetry-instrumentation-sdk = { version = "0.16" } # [profile.dev.package.insta] # opt-level = 3 diff --git a/cdviz-collector/Cargo.toml b/cdviz-collector/Cargo.toml index 334c9eb..5d10d51 100644 --- a/cdviz-collector/Cargo.toml +++ b/cdviz-collector/Cargo.toml @@ -14,13 +14,10 @@ publish.workspace = true [dependencies] axum = "0.7" axum-tracing-opentelemetry = { version = "0.16" } -clap = { version = "4", features = ["derive", "env"] } -init-tracing-opentelemetry = { version = "0.16", features = [ - "otlp", - "tracing_subscriber_ext", -] } +clap = { workspace = true } +init-tracing-opentelemetry = { workspace = true } # cloudevents-sdk = { version = "0.7", features = ["axum"] } // not compatible with axum 0.7 -serde_json = "1.0" +serde_json = { workspace = true } sqlx = { version = "0.7", features = [ "postgres", "runtime-tokio", @@ -30,11 +27,11 @@ sqlx = { version = "0.7", features = [ "json", "migrate", # for test but simpler to declare once ], default-features = false } -time = "0.3" -thiserror = "1" -tokio = { version = "1.0", features = ["full"] } -tracing = "0.1" -tracing-opentelemetry-instrumentation-sdk = { version = "0.16" } +time = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-opentelemetry-instrumentation-sdk = { workspace = true } [dev-dependencies] axum-test = "14" diff --git a/cdviz-watcher/Cargo.toml b/cdviz-watcher/Cargo.toml new file mode 100644 index 0000000..4f01662 --- /dev/null +++ b/cdviz-watcher/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "cdviz-watcher" +description = "Watch resources and push cdevents" +keywords = ["cd"] +categories = [] +edition.workspace = true +version.workspace = true +authors.workspace = true +repository.workspace = true +license.workspace = true +publish.workspace = true + +[dependencies] +clap = { workspace = true } +enum_dispatch = "0.3" +init-tracing-opentelemetry = { workspace = true } +futures = "0.3" +notify = "6" +reqwest = "0.11" +reqwest-middleware = "0.2" +reqwest-tracing = "0.4" +serde = { workspace = true } +serde_json = { workspace = true } +time = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } + +# [dev-dependencies] +# rstest = { workspace = true } +# tracing-subscriber = "0.3" diff --git a/cdviz-watcher/src/errors.rs b/cdviz-watcher/src/errors.rs new file mode 100644 index 0000000..bba7283 --- /dev/null +++ b/cdviz-watcher/src/errors.rs @@ -0,0 +1,21 @@ +use thiserror::Error; + +pub type Result = std::result::Result; + +#[derive(Error, Debug)] +pub enum Error { + #[error("no watcher found (configured or started)")] + NoWatcher, + #[error("no sink found (configured or started)")] + NoSink, + #[error(transparent)] + WatchDirectory(#[from] notify::Error), + #[error(transparent)] + InitTracing(#[from] init_tracing_opentelemetry::Error), + #[error(transparent)] + Http(#[from] reqwest_middleware::Error), + #[error(transparent)] + Json(#[from] serde_json::Error), + // #[error(transparent)] + // Other(#[from] anyhow::Error), +} diff --git a/cdviz-watcher/src/http_sink.rs b/cdviz-watcher/src/http_sink.rs new file mode 100644 index 0000000..7fff054 --- /dev/null +++ b/cdviz-watcher/src/http_sink.rs @@ -0,0 +1,46 @@ +use reqwest::Url; +use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; +//use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff}; +use crate::errors::Result; +use crate::{CDEvent, Sink}; +use reqwest_tracing::TracingMiddleware; + +#[derive(Debug)] +pub(crate) struct HttpSink { + client: ClientWithMiddleware, + dest: Url, +} + +impl HttpSink { + pub(crate) fn new(url: Url) -> Self { + // Retry up to 3 times with increasing intervals between attempts. + //let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3); + let client = ClientBuilder::new(reqwest::Client::new()) + // Trace HTTP requests. See the tracing crate to make use of these traces. + .with(TracingMiddleware::default()) + // Retry failed requests. + //.with(RetryTransientMiddleware::new_with_policy(retry_policy)) + .build(); + Self { dest: url, client } + } +} + +impl Sink for HttpSink { + async fn send(&self, cdevent: &CDEvent) -> Result<()> { + let json = serde_json::to_value(cdevent)?; + let resp = self + .client + .post(self.dest.clone()) + .json(&json) + .send() + .await?; + if !resp.status().is_success() { + tracing::warn!( + ?cdevent, + http_status = ?resp.status(), + "failed to send event" + ) + } + Ok(()) + } +} diff --git a/cdviz-watcher/src/main.rs b/cdviz-watcher/src/main.rs new file mode 100644 index 0000000..2b4b6ff --- /dev/null +++ b/cdviz-watcher/src/main.rs @@ -0,0 +1,86 @@ +mod errors; +mod http_sink; +mod settings; +mod watch_directory; + +use clap::Parser; +use enum_dispatch::enum_dispatch; +use errors::Result; +use http_sink::HttpSink; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use tokio::sync::mpsc; + +#[derive(Clone, Debug, Serialize, Deserialize)] +struct CDEvent { + json: Value, +} + +#[tokio::main] +async fn main() -> Result<()> { + let settings = settings::Settings::parse(); + + // very opinionated init of tracing, look as is source to make your own + //TODO use logfmt format (with traceid,...) see [tracing-logfmt-otel](https://github.com/elkowar/tracing-logfmt-otel) + init_tracing_opentelemetry::tracing_subscriber_ext::init_subscribers()?; + let (tx, mut rx) = mpsc::channel::(32); + + let mut watchers_count = 0; + let _watch_directory_guard = if let Some(directory) = settings.watch_directory { + let w = watch_directory::watch(tx, directory).await?; + watchers_count += 1; + Some(w) + } else { + None + }; + if watchers_count < 1 { + tracing::error!("no watcher configured or started"); + return Err(errors::Error::NoWatcher); + } + + let mut sinks = vec![]; + + if settings.sink_debug { + sinks.push(SinkEnum::from(DebugSink {})); + } + + if let Some(url) = settings.sink_http { + sinks.push(SinkEnum::from(HttpSink::new(url))); + } + + if sinks.len() < 1 { + tracing::error!("no sink configured or started"); + return Err(errors::Error::NoSink); + } + + while let Some(message) = rx.recv().await { + for sink in sinks.iter() { + if let Err(e) = sink.send(&message).await { + tracing::warn!(?e, ?sink, "failed to send to sink"); + } + } + } + Ok(()) +} + +#[enum_dispatch(SinkEnum)] +trait Sink { + async fn send(&self, cdevent: &CDEvent) -> Result<()>; +} + +#[enum_dispatch] +#[derive(Debug)] +enum SinkEnum { + DebugSink, + HttpSink, +} + +#[derive(Debug)] +struct DebugSink; + +impl Sink for DebugSink { + async fn send(&self, cdevent: &CDEvent) -> Result<()> { + tracing::debug!(?cdevent, "sending"); + Ok(()) + } +} diff --git a/cdviz-watcher/src/settings.rs b/cdviz-watcher/src/settings.rs new file mode 100644 index 0000000..b2824c6 --- /dev/null +++ b/cdviz-watcher/src/settings.rs @@ -0,0 +1,19 @@ +use std::path::PathBuf; + +use reqwest::Url; + +#[derive(Debug, Clone, clap::Parser)] +pub struct Settings { + /// watch a local file system directory + /// (create on event per valid cdevents json file) + #[clap(long)] + pub watch_directory: Option, + + /// push cdevents as json to this url + #[clap(long)] + pub sink_http: Option, + + /// push cdevents to log + #[clap(long)] + pub sink_debug: bool, +} diff --git a/cdviz-watcher/src/watch_directory.rs b/cdviz-watcher/src/watch_directory.rs new file mode 100644 index 0000000..3c30209 --- /dev/null +++ b/cdviz-watcher/src/watch_directory.rs @@ -0,0 +1,76 @@ +use crate::{errors::Result, CDEvent}; +use notify::{ + event::{AccessKind, AccessMode}, + Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher, +}; +use std::{ + fs, + path::{Path, PathBuf}, +}; +use tokio::sync::mpsc::Sender; + +// based on https://github.com/notify-rs/notify/blob/main/examples/async_monitor.rs +pub(crate) async fn watch>( + tx: Sender, + path: P, +) -> Result> { + // Automatically select the best implementation for your platform. + // You can also access each implementation directly e.g. INotifyWatcher. + let mut watcher = RecommendedWatcher::new( + move |res| { + futures::executor::block_on(async { + //dbg!(&res); + if let Ok(event) = res { + if let Some(cdevents) = maybe_to_cdevents(event) { + for cdevent in cdevents { + let sent = tx.send(cdevent).await; + if sent.is_err() { + tracing::warn!(?sent); + } + } + } + } else { + tracing::warn!(?res); + } + }) + }, + Config::default(), + )?; + + // Add a path to be watched. All files and directories at that path and + // below will be monitored for changes. + watcher.watch(path.as_ref(), RecursiveMode::Recursive)?; + tracing::info!(path = ?path.as_ref(), "start watching directory"); + Ok(Box::new(watcher)) +} + +fn maybe_to_cdevents(event: Event) -> Option> { + // Access is called after creation or modification of a file + if event.kind == EventKind::Access(AccessKind::Close(AccessMode::Write)) { + let v: Vec = event + .paths + .into_iter() + .filter(|p| p.is_file() && (p.extension().unwrap_or_default() == "json")) + .filter_map(maybe_to_cdevent) + .collect(); + if v.is_empty() { + None + } else { + Some(v) + } + } else { + None + } +} + +fn maybe_to_cdevent(p: PathBuf) -> Option { + fs::read_to_string(p) + .map_err(|error| tracing::warn!(?error)) + .ok() + .and_then(|txt| { + serde_json::from_str(&txt) + .map_err(|error| tracing::warn!(?error)) + .ok() + }) + .map(|json| CDEvent { json }) +}