Skip to content

Commit 554577b

Browse files
committed
Bridge: update to use new rust lib, refactor poller input
This diff updates bridge's Poller input to be based on the stream exposed by the Polling Endpoint API.
1 parent 0308086 commit 554577b

File tree

7 files changed

+184
-93
lines changed

7 files changed

+184
-93
lines changed

bridge/Cargo.lock

+126-28
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bridge/svix-bridge-plugin-kafka/src/input.rs

+16-16
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ use rdkafka::{
99
Message as _,
1010
};
1111
use svix_bridge_types::{
12-
async_trait, svix::api::Svix, CreateMessageRequest, JsObject, SenderInput, SenderOutputOpts,
13-
TransformationConfig, TransformerInput, TransformerInputFormat, TransformerJob,
14-
TransformerOutput, TransformerTx,
12+
async_trait,
13+
svix::api::{MessageCreateOptions, Svix},
14+
CreateMessageRequest, JsObject, SenderInput, SenderOutputOpts, TransformationConfig,
15+
TransformerInput, TransformerInputFormat, TransformerJob, TransformerOutput, TransformerTx,
1516
};
1617
use tokio::task::spawn_blocking;
1718

@@ -69,27 +70,26 @@ impl KafkaConsumer {
6970
serde_json::from_slice(payload).map_err(Error::Deserialization)?
7071
};
7172

72-
let CreateMessageRequest {
73-
app_id,
74-
message,
75-
mut post_options,
76-
} = payload;
73+
let CreateMessageRequest { app_id, message } = payload;
7774

7875
let KafkaInputOpts::Inner {
7976
group_id, topic, ..
8077
} = &self.opts;
8178

82-
// If committing the message fails or the process crashes after posting the webhook but
83-
// before committing, this makes sure that the next run of this fn with the same kafka
84-
// message doesn't end up creating a duplicate webhook in svix.
85-
let idempotency_key = format!("svix_bridge_kafka_{group_id}_{topic}_{}", msg.offset());
86-
post_options
87-
.get_or_insert_with(Default::default)
88-
.idempotency_key = Some(idempotency_key);
79+
let options = MessageCreateOptions {
80+
with_content: None,
81+
// If committing the message fails or the process crashes after posting the webhook but
82+
// before committing, this makes sure that the next run of this fn with the same kafka
83+
// message doesn't end up creating a duplicate webhook in svix.
84+
idempotency_key: Some(format!(
85+
"svix_bridge_kafka_{group_id}_{topic}_{}",
86+
msg.offset()
87+
)),
88+
};
8989

9090
self.svix_client
9191
.message()
92-
.create(app_id, message, post_options.map(Into::into))
92+
.create(app_id, message, Some(options))
9393
.await?;
9494

9595
Ok(())

0 commit comments

Comments
 (0)