From fdbcab9b69fe4ecc2f6027eb231c1368e224e3ce Mon Sep 17 00:00:00 2001 From: David Bernard Date: Sun, 13 Oct 2024 18:58:50 +0200 Subject: [PATCH] feat(cdviz-collector): replace "0" by a CID in `context.id` --- cdviz-collector/Cargo.toml | 3 + .../examples/assets/cdviz-collector.toml | 2 +- .../examples/assets/opendal_fs/cdevents.csv | 8 +-- cdviz-collector/src/errors.rs | 3 + cdviz-collector/src/sources/extractors.rs | 1 + cdviz-collector/src/sources/send_cdevents.rs | 65 ++++++++++++++++++- 6 files changed, 76 insertions(+), 6 deletions(-) diff --git a/cdviz-collector/Cargo.toml b/cdviz-collector/Cargo.toml index 189d3ce..c8aa2f7 100644 --- a/cdviz-collector/Cargo.toml +++ b/cdviz-collector/Cargo.toml @@ -19,6 +19,7 @@ bytes = { version = "1.7", optional = true } # cdevents-sdk = { git = "https://github.com/cdevents/sdk-rust" } cdevents-sdk = "0.1" chrono = { version = "0.4", features = ["serde"] } +cid = "0.11" clap = { version = "4", features = ["derive", "env"] } clap-verbosity-flag = "2.2" cloudevents-sdk = { version = "0.7", features = ["http-binding"] } @@ -40,6 +41,7 @@ init-tracing-opentelemetry = { version = "0.22", features = [ "tracing_subscriber_ext", "logfmt", ] } +multihash = "0.19" opendal = { version = "0.50", default-features = false, features = [ "services-fs", "services-s3", @@ -50,6 +52,7 @@ reqwest-tracing = { version = "0.5", optional = true } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_with = "3.11" +sha2 = "0.10" sqlx = { version = "0.8", features = [ "postgres", "runtime-tokio", diff --git a/cdviz-collector/examples/assets/cdviz-collector.toml b/cdviz-collector/examples/assets/cdviz-collector.toml index 7fa29e3..d071ea6 100644 --- a/cdviz-collector/examples/assets/cdviz-collector.toml +++ b/cdviz-collector/examples/assets/cdviz-collector.toml @@ -36,7 +36,7 @@ template = """ "body": { "context": { "version": "0.4.0-draft", - "id": "{{ body.uuid }}", + "id": "0", "source": "/event/source/123", "type": "dev.cdevents.service.deployed.0.1.1", "timestamp": "{{ body.timestamp }}" diff --git a/cdviz-collector/examples/assets/opendal_fs/cdevents.csv b/cdviz-collector/examples/assets/opendal_fs/cdevents.csv index 598e74b..12bd529 100644 --- a/cdviz-collector/examples/assets/opendal_fs/cdevents.csv +++ b/cdviz-collector/examples/assets/opendal_fs/cdevents.csv @@ -1,4 +1,4 @@ -timestamp,uuid,id,env,artifact_id -2024-03-20T14:27:05.315384Z,0fe3789b-6094-4cce-bb23-c949e1c4fdf1,1,dev,pkg:oci/myapp@sha256%3A0b31b1c02ff458ad9b7b81cbdf8f028bd54699fa151f221d1e8de6817db93427 -2024-03-20T15:27:05.315384Z,20222a04-71d0-4164-97b7-40a2f26d5951,2,stg,pkg:oci/myapp@sha256%3A0b31b1c02ff458ad9b7b81cbdf8f028bd54699fa151f221d1e8de6817db93427 -2024-03-20T16:27:05.315384Z,5111bde6-8cf7-4961-a4f5-f5621146fa2b,3,pro,pkg:oci/myapp@sha256%3A0b31b1c02ff458ad9b7b81cbdf8f028bd54699fa151f221d1e8de6817db93427 +timestamp,id,env,artifact_id +2024-03-20T14:27:05.315384Z,1,dev,pkg:oci/myapp@sha256%3A0b31b1c02ff458ad9b7b81cbdf8f028bd54699fa151f221d1e8de6817db93427 +2024-03-20T15:27:05.315384Z,2,stg,pkg:oci/myapp@sha256%3A0b31b1c02ff458ad9b7b81cbdf8f028bd54699fa151f221d1e8de6817db93427 +2024-03-20T16:27:05.315384Z,3,pro,pkg:oci/myapp@sha256%3A0b31b1c02ff458ad9b7b81cbdf8f028bd54699fa151f221d1e8de6817db93427 diff --git a/cdviz-collector/src/errors.rs b/cdviz-collector/src/errors.rs index 9708fb3..25f3307 100644 --- a/cdviz-collector/src/errors.rs +++ b/cdviz-collector/src/errors.rs @@ -56,6 +56,9 @@ pub(crate) enum Error { CloudEventMessage(#[from] cloudevents::message::Error), // #[error(transparent)] // ConfigTomlError(#[from] toml::de::Error), + #[error(transparent)] + MultiHash(#[from] multihash::Error), + #[error("{txt}")] Custom { txt: String }, // #[error(transparent)] diff --git a/cdviz-collector/src/sources/extractors.rs b/cdviz-collector/src/sources/extractors.rs index ec01f25..05cec34 100644 --- a/cdviz-collector/src/sources/extractors.rs +++ b/cdviz-collector/src/sources/extractors.rs @@ -17,6 +17,7 @@ pub(crate) enum Config { } impl Config { + //TODO include some metadata into the extractor like the source name pub(crate) fn make_extractor(&self, next: EventSourcePipe) -> Result> { let out: Box = match self { Config::Sleep => Box::new(SleepExtractor {}), diff --git a/cdviz-collector/src/sources/send_cdevents.rs b/cdviz-collector/src/sources/send_cdevents.rs index 1b0a1fa..a121058 100644 --- a/cdviz-collector/src/sources/send_cdevents.rs +++ b/cdviz-collector/src/sources/send_cdevents.rs @@ -2,10 +2,17 @@ use crate::errors::Result; use crate::pipes::Pipe; use crate::Message; use cdevents_sdk::CDEvent; +use cid::Cid; +use multihash::Multihash; +use serde_json::json; +use sha2::{Digest, Sha256}; use tokio::sync::broadcast::Sender; use super::EventSource; +const RAW: u64 = 0x55; +const SHA2_256: u64 = 0x12; + pub(crate) struct Processor { next: Sender, } @@ -19,9 +26,65 @@ impl Processor { impl Pipe for Processor { type Input = EventSource; fn send(&mut self, input: Self::Input) -> Result<()> { - let cdevent: CDEvent = serde_json::from_value(input.body)?; + let mut body = input.body; + set_id_zero_to_cid(&mut body)?; + // TODO if source is empty, set a default value based on configuration TBD + let cdevent: CDEvent = serde_json::from_value(body)?; + // TODO include headers into message self.next.send(cdevent.into())?; Ok(()) } } + +fn set_id_zero_to_cid(body: &mut serde_json::Value) -> Result<()> { + if body["context"]["id"] == json!("0") { + // Do not use multihash-codetable because one of it's transitive dependency raise + // an alert "unmaintained advisory detected" about `proc-macro-error` + // https://rustsec.org/advisories/RUSTSEC-2024-0370 + // let hash = Code::Sha2_256.digest(serde_json::to_string(&input.body)?.as_bytes()); + let mut hasher = Sha256::new(); + hasher.update(serde_json::to_string(&body)?.as_bytes()); + let hash = hasher.finalize(); + let mhash = Multihash::<64>::wrap(SHA2_256, hash.as_slice())?; + let cid = Cid::new_v1(RAW, mhash); + body["context"]["id"] = json!(cid.to_string()); + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_set_id_zero_to_cid() { + let mut body = json!({ + "context": { + "id": "0", + "source": "/event/source/123", + "type": "dev.cdevents.service.deployed.0.1.1", + "timestamp": "2023-03-20T14:27:05.315384Z" + }, + "subject": { + "id": "mySubject123", + "source": "/event/source/123", + "type": "service", + "content": { + "environment": { + "id": "test123" + }, + "artifactId": "pkg:oci/myapp@sha256%3A0b31b1c02ff458ad9b7b81cbdf8f028bd54699fa151f221d1e8de6817db93427" + } + } + }); + + set_id_zero_to_cid(&mut body).unwrap(); + + assert_eq!( + body["context"]["id"], + json!("bafkreid4ehbvqs3ae6l3htd35xhxbhbfehfkrq3gyf242s6nfcsnz2ueve") + ) + } +}