|
1 | 1 | use cdevents_sdk::cloudevents::BuilderExt;
|
2 | 2 | use cloudevents::{EventBuilder, EventBuilderV10};
|
| 3 | +use cloudevents::binding::reqwest::RequestBuilderExt; |
3 | 4 | use reqwest::Url;
|
4 | 5 | use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
|
5 | 6 | use serde::{Deserialize, Serialize};
|
@@ -48,25 +49,34 @@ impl Sink for HttpSink {
|
48 | 49 | async fn send(&self, msg: &Message) -> Result<()> {
|
49 | 50 | let cd_event = msg.cdevent.clone();
|
50 | 51 | // convert CdEvent to cloudevents
|
51 |
| - let event_result = EventBuilderV10::new().with_cdevent(cd_event).unwrap().build(); |
52 |
| - |
| 52 | + let event_result = EventBuilderV10::new().with_cdevent(cd_event.clone()); |
53 | 53 | match event_result {
|
54 |
| - Ok(value) => { |
55 |
| - let resp = self |
56 |
| - .client |
| 54 | + Ok(event_builder) => { |
| 55 | + let event_result = event_builder.build(); |
| 56 | + let value = event_result.map_err(|e| { |
| 57 | + tracing::warn!(error = ?e, "Failed to build event") |
| 58 | + })?; |
| 59 | + reqwest::Client::new() |
57 | 60 | .post(self.dest.clone())
|
58 |
| - .json(&value) |
| 61 | + .event(value) |
| 62 | + .map_err(|e| { |
| 63 | + tracing::warn!(error = ?e, "Failed to build request-builder") |
| 64 | + })? |
| 65 | + .header("Access-Control-Allow-Origin", "*") |
59 | 66 | .send()
|
60 |
| - .await?; |
61 |
| - if !resp.status().is_success() { |
62 |
| - tracing::warn!( |
63 |
| - cdevent = ?serde_json::to_value(&value)?, |
64 |
| - http_status = ?resp.status(), |
65 |
| - "failed to send event") |
66 |
| - } |
| 67 | + .await |
| 68 | + .map_err(|e| { |
| 69 | + tracing::warn!(error = ?e, "Failed to get response") |
| 70 | + })?; |
67 | 71 | }
|
68 | 72 | Err(err) => {
|
69 | 73 | tracing::warn!(error = ?err, "Failed to convert to cloudevents");
|
| 74 | + // In error case, send the original event |
| 75 | + self.client |
| 76 | + .post(self.dest.clone()) |
| 77 | + .json(&cd_event) |
| 78 | + .send() |
| 79 | + .await?; |
70 | 80 | }
|
71 | 81 | };
|
72 | 82 | Ok(())
|
|
0 commit comments