Skip to content

Commit

Permalink
feat(cdviz-collector): replace "0" by a CID in context.id
Browse files Browse the repository at this point in the history
  • Loading branch information
davidB committed Oct 13, 2024
1 parent f2b4daf commit fdbcab9
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 6 deletions.
3 changes: 3 additions & 0 deletions cdviz-collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ bytes = { version = "1.7", optional = true }
# cdevents-sdk = { git = "https://github.com/cdevents/sdk-rust" }
cdevents-sdk = "0.1"
chrono = { version = "0.4", features = ["serde"] }
cid = "0.11"
clap = { version = "4", features = ["derive", "env"] }
clap-verbosity-flag = "2.2"
cloudevents-sdk = { version = "0.7", features = ["http-binding"] }
Expand All @@ -40,6 +41,7 @@ init-tracing-opentelemetry = { version = "0.22", features = [
"tracing_subscriber_ext",
"logfmt",
] }
multihash = "0.19"
opendal = { version = "0.50", default-features = false, features = [
"services-fs",
"services-s3",
Expand All @@ -50,6 +52,7 @@ reqwest-tracing = { version = "0.5", optional = true }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_with = "3.11"
sha2 = "0.10"
sqlx = { version = "0.8", features = [
"postgres",
"runtime-tokio",
Expand Down
2 changes: 1 addition & 1 deletion cdviz-collector/examples/assets/cdviz-collector.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ template = """
"body": {
"context": {
"version": "0.4.0-draft",
"id": "{{ body.uuid }}",
"id": "0",
"source": "/event/source/123",
"type": "dev.cdevents.service.deployed.0.1.1",
"timestamp": "{{ body.timestamp }}"
Expand Down
8 changes: 4 additions & 4 deletions cdviz-collector/examples/assets/opendal_fs/cdevents.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
timestamp,uuid,id,env,artifact_id
2024-03-20T14:27:05.315384Z,0fe3789b-6094-4cce-bb23-c949e1c4fdf1,1,dev,pkg:oci/myapp@sha256%3A0b31b1c02ff458ad9b7b81cbdf8f028bd54699fa151f221d1e8de6817db93427
2024-03-20T15:27:05.315384Z,20222a04-71d0-4164-97b7-40a2f26d5951,2,stg,pkg:oci/myapp@sha256%3A0b31b1c02ff458ad9b7b81cbdf8f028bd54699fa151f221d1e8de6817db93427
2024-03-20T16:27:05.315384Z,5111bde6-8cf7-4961-a4f5-f5621146fa2b,3,pro,pkg:oci/myapp@sha256%3A0b31b1c02ff458ad9b7b81cbdf8f028bd54699fa151f221d1e8de6817db93427
timestamp,id,env,artifact_id
2024-03-20T14:27:05.315384Z,1,dev,pkg:oci/myapp@sha256%3A0b31b1c02ff458ad9b7b81cbdf8f028bd54699fa151f221d1e8de6817db93427
2024-03-20T15:27:05.315384Z,2,stg,pkg:oci/myapp@sha256%3A0b31b1c02ff458ad9b7b81cbdf8f028bd54699fa151f221d1e8de6817db93427
2024-03-20T16:27:05.315384Z,3,pro,pkg:oci/myapp@sha256%3A0b31b1c02ff458ad9b7b81cbdf8f028bd54699fa151f221d1e8de6817db93427
3 changes: 3 additions & 0 deletions cdviz-collector/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ pub(crate) enum Error {
CloudEventMessage(#[from] cloudevents::message::Error),
// #[error(transparent)]
// ConfigTomlError(#[from] toml::de::Error),
#[error(transparent)]
MultiHash(#[from] multihash::Error),

#[error("{txt}")]
Custom { txt: String },
// #[error(transparent)]
Expand Down
1 change: 1 addition & 0 deletions cdviz-collector/src/sources/extractors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub(crate) enum Config {
}

impl Config {
//TODO include some metadata into the extractor like the source name
pub(crate) fn make_extractor(&self, next: EventSourcePipe) -> Result<Box<dyn Extractor>> {
let out: Box<dyn Extractor> = match self {
Config::Sleep => Box::new(SleepExtractor {}),
Expand Down
65 changes: 64 additions & 1 deletion cdviz-collector/src/sources/send_cdevents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ use crate::errors::Result;
use crate::pipes::Pipe;
use crate::Message;
use cdevents_sdk::CDEvent;
use cid::Cid;
use multihash::Multihash;
use serde_json::json;
use sha2::{Digest, Sha256};
use tokio::sync::broadcast::Sender;

use super::EventSource;

const RAW: u64 = 0x55;
const SHA2_256: u64 = 0x12;

pub(crate) struct Processor {
next: Sender<Message>,
}
Expand All @@ -19,9 +26,65 @@ impl Processor {
impl Pipe for Processor {
type Input = EventSource;
fn send(&mut self, input: Self::Input) -> Result<()> {
let cdevent: CDEvent = serde_json::from_value(input.body)?;
let mut body = input.body;
set_id_zero_to_cid(&mut body)?;
// TODO if source is empty, set a default value based on configuration TBD
let cdevent: CDEvent = serde_json::from_value(body)?;

// TODO include headers into message
self.next.send(cdevent.into())?;
Ok(())
}
}

fn set_id_zero_to_cid(body: &mut serde_json::Value) -> Result<()> {
if body["context"]["id"] == json!("0") {
// Do not use multihash-codetable because one of it's transitive dependency raise
// an alert "unmaintained advisory detected" about `proc-macro-error`
// https://rustsec.org/advisories/RUSTSEC-2024-0370
// let hash = Code::Sha2_256.digest(serde_json::to_string(&input.body)?.as_bytes());
let mut hasher = Sha256::new();
hasher.update(serde_json::to_string(&body)?.as_bytes());
let hash = hasher.finalize();
let mhash = Multihash::<64>::wrap(SHA2_256, hash.as_slice())?;
let cid = Cid::new_v1(RAW, mhash);
body["context"]["id"] = json!(cid.to_string());
}
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;

#[test]
fn test_set_id_zero_to_cid() {
let mut body = json!({
"context": {
"id": "0",
"source": "/event/source/123",
"type": "dev.cdevents.service.deployed.0.1.1",
"timestamp": "2023-03-20T14:27:05.315384Z"
},
"subject": {
"id": "mySubject123",
"source": "/event/source/123",
"type": "service",
"content": {
"environment": {
"id": "test123"
},
"artifactId": "pkg:oci/myapp@sha256%3A0b31b1c02ff458ad9b7b81cbdf8f028bd54699fa151f221d1e8de6817db93427"
}
}
});

set_id_zero_to_cid(&mut body).unwrap();

assert_eq!(
body["context"]["id"],
json!("bafkreid4ehbvqs3ae6l3htd35xhxbhbfehfkrq3gyf242s6nfcsnz2ueve")
)
}
}

0 comments on commit fdbcab9

Please sign in to comment.