Skip to content

Commit

Permalink
✨ (cdviz-collectopr) http sink sends cloudevents #21 (#57)
Browse files Browse the repository at this point in the history
* added cloud-event converter in the sink
* reworked http request in sink for cloud-event

---------

Signed-off-by: Hergy Fongue <[email protected]>
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
Co-authored-by: David Bernard <[email protected]>
  • Loading branch information
3 people authored May 1, 2024
1 parent f0c0b98 commit 4d7da34
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 14 deletions.
1 change: 1 addition & 0 deletions cdviz-collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ publish = false
axum = { version = "0.7", optional = true }
axum-tracing-opentelemetry = { version = "0.18", optional = true }
cdevents-sdk = { git = "https://github.com/cdevents/sdk-rust" }
cloudevents-sdk = { version = "0.7"}
# cloudevents-sdk = { version = "0.7", features = ["axum"] } // not compatible with axum 0.7
chrono = "0.4"
clap = { version = "4", features = ["derive", "env"] }
Expand Down
49 changes: 35 additions & 14 deletions cdviz-collector/src/sinks/http.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use cdevents_sdk::cloudevents::BuilderExt;
use cloudevents::{EventBuilder, EventBuilderV10};
use cloudevents::binding::reqwest::RequestBuilderExt;
use reqwest::Url;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -44,20 +47,38 @@ impl HttpSink {
impl Sink for HttpSink {
//TODO use cloudevents
async fn send(&self, msg: &Message) -> Result<()> {
let json = serde_json::to_value(&msg.cdevent)?;
let resp = self
.client
.post(self.dest.clone())
.json(&json)
.send()
.await?;
if !resp.status().is_success() {
tracing::warn!(
cdevent = ?msg.cdevent,
http_status = ?resp.status(),
"failed to send event"
)
}
let cd_event = msg.cdevent.clone();
// convert CdEvent to cloudevents
let event_result = EventBuilderV10::new().with_cdevent(cd_event.clone());
match event_result {
Ok(event_builder) => {
let event_result = event_builder.build();
let value = event_result.map_err(|e| {
tracing::warn!(error = ?e, "Failed to build event")
})?;
reqwest::Client::new()
.post(self.dest.clone())
.event(value)
.map_err(|e| {
tracing::warn!(error = ?e, "Failed to build request-builder")
})?
.header("Access-Control-Allow-Origin", "*")
.send()
.await
.map_err(|e| {
tracing::warn!(error = ?e, "Failed to get response")
})?;
}
Err(err) => {
tracing::warn!(error = ?err, "Failed to convert to cloudevents");
// In error case, send the original event
self.client
.post(self.dest.clone())
.json(&cd_event)
.send()
.await?;
}
};
Ok(())
}
}

0 comments on commit 4d7da34

Please sign in to comment.