Skip to content

Commit

Permalink
SAC: starting implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielePalaia committed Nov 8, 2024
1 parent 5a0bd3e commit 13faec7
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 18 deletions.
2 changes: 1 addition & 1 deletion protocol/src/commands/consumer_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ mod tests {
use super::ConsumerUpdateCommand;

#[test]
fn subscribe_request_test() {
fn consumer_update_response_test() {
command_encode_decode_test::<ConsumerUpdateCommand>();
}
}
2 changes: 1 addition & 1 deletion protocol/src/commands/consumer_update_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ mod tests {
use super::ConsumerUpdateRequestCommand;

#[test]
fn subscribe_request_test() {
fn consumer_update_request_test() {
command_encode_decode_test::<ConsumerUpdateRequestCommand>();
}
}
9 changes: 0 additions & 9 deletions protocol/src/response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,13 +529,4 @@ mod tests {
COMMAND_CONSUMER_UPDATE
);
}

#[test]
fn consumer_update_request_response_test() {
response_test!(
ConsumerUpdateRequestCommand,
ResponseKind::ConsumerUpdateRequest,
COMMAND_CONSUMER_UPDATE_REQUEST
);
}
}
7 changes: 5 additions & 2 deletions src/client/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use futures::Stream;
use rabbitmq_stream_protocol::Response;
use rabbitmq_stream_protocol::{Response, ResponseKind};
use std::sync::{
atomic::{AtomicBool, AtomicU32, Ordering},
Arc,
Expand Down Expand Up @@ -167,7 +167,10 @@ where
while let Some(result) = stream.next().await {
match result {
Ok(item) => match item.correlation_id() {
Some(correlation_id) => state.dispatch(correlation_id, item).await,
Some(correlation_id) => match item.kind_ref() {
ResponseKind::ConsumerUpdate(consumer_update) => state.notify(item).await,
_ => state.dispatch(correlation_id, item).await,
},
None => state.notify(item).await,
},
Err(e) => {
Expand Down
36 changes: 31 additions & 5 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ use crate::{
Client, ClientOptions, Environment, MetricsCollector,
};
use futures::{task::AtomicWaker, Stream};
use rabbitmq_stream_protocol::commands::consumer_update::ConsumerUpdateCommand;
use rand::rngs::StdRng;
use rand::{seq::SliceRandom, SeedableRng};

type FilterPredicate = Option<Arc<dyn Fn(&Message) -> bool + Send + Sync>>;

type ConsumerUpdateListener = Option<Arc<dyn Fn(bool, &MessageContext) -> u64 + Send + Sync>>;

/// API for consuming RabbitMQ stream messages
pub struct Consumer {
// Mandatory in case of manual offset tracking
Expand Down Expand Up @@ -82,6 +85,26 @@ impl FilterConfiguration {
}
}

pub struct MessageContext {
consumer: Consumer,
subscriber_name: String,
reference: String,
}

impl MessageContext {
pub fn get_consumer(self) -> Consumer {
self.consumer
}

pub fn get_subscriber_name(self) -> String {
self.subscriber_name
}

pub fn get_reference(self) -> String {
self.reference
}
}

/// Builder for [`Consumer`]
pub struct ConsumerBuilder {
pub(crate) consumer_name: Option<String>,
Expand Down Expand Up @@ -317,27 +340,27 @@ impl MessageHandler for ConsumerMessageHandler {
async fn handle_message(&self, item: MessageResult) -> crate::RabbitMQStreamResult<()> {
match item {
Some(Ok(response)) => {
if let ResponseKind::Deliver(delivery) = response.kind() {
if let ResponseKind::Deliver(delivery) = response.kind_ref() {
let mut offset = delivery.chunk_first_offset;

let len = delivery.messages.len();
let d = delivery.clone();
trace!("Got delivery with messages {}", len);

// // client filter
let messages = match &self.0.filter_configuration {
Some(filter_input) => {
if let Some(f) = &filter_input.predicate {
delivery
.messages
d.messages
.into_iter()
.filter(|message| f(message))
.collect::<Vec<Message>>()
} else {
delivery.messages
d.messages
}
}

None => delivery.messages,
None => d.messages,
};

for message in messages {
Expand All @@ -363,6 +386,8 @@ impl MessageHandler for ConsumerMessageHandler {
// TODO handle credit fail
let _ = self.0.client.credit(self.0.subscription_id, 1).await;
self.0.metrics_collector.consume(len as u64).await;
} else {
println!("other message arrived");
}
}
Some(Err(err)) => {
Expand All @@ -377,6 +402,7 @@ impl MessageHandler for ConsumerMessageHandler {
Ok(())
}
}

/// Envelope from incoming message
#[derive(Debug)]
pub struct Delivery {
Expand Down
5 changes: 5 additions & 0 deletions src/superstream_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ impl SuperStreamConsumerBuilder {
self.client_provided_name = String::from(name);
self
}

pub fn properties(mut self, properties: HashMap<String, String>) -> Self {
self.properties = properties;
self
}
}

impl Stream for SuperStreamConsumer {
Expand Down

0 comments on commit 13faec7

Please sign in to comment.