Skip to content

Commit

Permalink
Implementing callback support and consumer_update response
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielePalaia committed Nov 11, 2024
1 parent 13faec7 commit 68ef2ad
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 39 deletions.
8 changes: 8 additions & 0 deletions protocol/src/commands/consumer_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ impl ConsumerUpdateCommand {
active,
}
}

Check warning on line 29 in protocol/src/commands/consumer_update.rs

View check run for this annotation

Codecov / codecov/patch

protocol/src/commands/consumer_update.rs#L23-L29

Added lines #L23 - L29 were not covered by tests

pub fn get_correlation_id(&self) -> u32 {
self.correlation_id
}

Check warning on line 33 in protocol/src/commands/consumer_update.rs

View check run for this annotation

Codecov / codecov/patch

protocol/src/commands/consumer_update.rs#L31-L33

Added lines #L31 - L33 were not covered by tests

pub fn is_active(&self) -> u8 {
self.active
}

Check warning on line 37 in protocol/src/commands/consumer_update.rs

View check run for this annotation

Codecov / codecov/patch

protocol/src/commands/consumer_update.rs#L35-L37

Added lines #L35 - L37 were not covered by tests
}

impl Encoder for ConsumerUpdateCommand {
Expand Down
5 changes: 0 additions & 5 deletions protocol/src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,4 @@ mod tests {
fn request_route_command() {
request_encode_decode_test::<SuperStreamRouteRequest>()
}

#[test]
fn request_consumer_update_request_command() {
request_encode_decode_test::<ConsumerUpdateRequestCommand>()
}
}
2 changes: 1 addition & 1 deletion src/client/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ where
match result {
Ok(item) => match item.correlation_id() {
Some(correlation_id) => match item.kind_ref() {
ResponseKind::ConsumerUpdate(consumer_update) => state.notify(item).await,
ResponseKind::ConsumerUpdate(_) => state.notify(item).await,

Check warning on line 171 in src/client/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

src/client/dispatcher.rs#L171

Added line #L171 was not covered by tests
_ => state.dispatch(correlation_id, item).await,
},
None => state.notify(item).await,
Expand Down
12 changes: 12 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub use options::ClientOptions;
use rabbitmq_stream_protocol::{
commands::{
close::{CloseRequest, CloseResponse},
consumer_update_request::ConsumerUpdateRequestCommand,
create_stream::CreateStreamCommand,
create_super_stream::CreateSuperStreamCommand,
credit::CreditCommand,
Expand Down Expand Up @@ -851,4 +852,15 @@ impl Client {

Ok(config)
}

pub async fn consumer_update(
&self,
correlation_id: u32,
offset_specification: OffsetSpecification,
) -> RabbitMQStreamResult<GenericResponse> {
self.send_and_receive(|_| {
ConsumerUpdateRequestCommand::new(correlation_id, 1, offset_specification)
})
.await
}

Check warning on line 865 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L856-L865

Added lines #L856 - L865 were not covered by tests
}
87 changes: 71 additions & 16 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use rabbitmq_stream_protocol::{
commands::subscribe::OffsetSpecification, message::Message, ResponseKind,
};

use core::option::Option::None;

use tokio::sync::mpsc::{channel, Receiver, Sender};
use tracing::trace;

Expand All @@ -26,13 +28,13 @@ use crate::{
Client, ClientOptions, Environment, MetricsCollector,
};
use futures::{task::AtomicWaker, Stream};
use rabbitmq_stream_protocol::commands::consumer_update::ConsumerUpdateCommand;
use rand::rngs::StdRng;
use rand::{seq::SliceRandom, SeedableRng};

type FilterPredicate = Option<Arc<dyn Fn(&Message) -> bool + Send + Sync>>;

type ConsumerUpdateListener = Option<Arc<dyn Fn(bool, &MessageContext) -> u64 + Send + Sync>>;
pub type ConsumerUpdateListener =
Arc<dyn Fn(u8, &MessageContext) -> OffsetSpecification + Send + Sync>;

/// API for consuming RabbitMQ stream messages
pub struct Consumer {
Expand All @@ -43,6 +45,7 @@ pub struct Consumer {
}

struct ConsumerInternal {
name: Option<String>,
client: Client,
stream: String,
offset_specification: OffsetSpecification,
Expand All @@ -52,6 +55,7 @@ struct ConsumerInternal {
waker: AtomicWaker,
metrics_collector: Arc<dyn MetricsCollector>,
filter_configuration: Option<FilterConfiguration>,
consumer_update_listener: Option<ConsumerUpdateListener>,
}

impl ConsumerInternal {
Expand Down Expand Up @@ -86,22 +90,17 @@ impl FilterConfiguration {
}

pub struct MessageContext {
consumer: Consumer,
subscriber_name: String,
reference: String,
consumer_name: Option<String>,
stream: String,
}

impl MessageContext {
pub fn get_consumer(self) -> Consumer {
self.consumer
pub fn get_name(self) -> Option<String> {
self.consumer_name
}

Check warning on line 100 in src/consumer.rs

View check run for this annotation

Codecov / codecov/patch

src/consumer.rs#L98-L100

Added lines #L98 - L100 were not covered by tests

pub fn get_subscriber_name(self) -> String {
self.subscriber_name
}

pub fn get_reference(self) -> String {
self.reference
pub fn get_stream(self) -> String {
self.stream
}

Check warning on line 104 in src/consumer.rs

View check run for this annotation

Codecov / codecov/patch

src/consumer.rs#L102-L104

Added lines #L102 - L104 were not covered by tests
}

Expand All @@ -111,6 +110,7 @@ pub struct ConsumerBuilder {
pub(crate) environment: Environment,
pub(crate) offset_specification: OffsetSpecification,
pub(crate) filter_configuration: Option<FilterConfiguration>,
pub(crate) consumer_update_listener: Option<ConsumerUpdateListener>,
pub(crate) client_provided_name: String,
pub(crate) properties: HashMap<String, String>,
}
Expand Down Expand Up @@ -172,6 +172,7 @@ impl ConsumerBuilder {
let subscription_id = 1;
let (tx, rx) = channel(10000);
let consumer = Arc::new(ConsumerInternal {
name: self.consumer_name.clone(),
subscription_id,
stream: stream.to_string(),
client: client.clone(),
Expand All @@ -181,6 +182,7 @@ impl ConsumerBuilder {
waker: AtomicWaker::new(),
metrics_collector: collector,
filter_configuration: self.filter_configuration.clone(),
consumer_update_listener: self.consumer_update_listener.clone(),
});
let msg_handler = ConsumerMessageHandler(consumer.clone());
client.set_handler(msg_handler).await;
Expand Down Expand Up @@ -213,7 +215,7 @@ impl ConsumerBuilder {

if response.is_ok() {
Ok(Consumer {
name: self.consumer_name,
name: self.consumer_name.clone(),
receiver: rx,
internal: consumer,
})
Expand Down Expand Up @@ -245,6 +247,26 @@ impl ConsumerBuilder {
self
}

pub fn consumer_update(
mut self,
consumer_update_listener: impl Fn(u8, &MessageContext) -> OffsetSpecification
+ Send
+ Sync
+ 'static,
) -> Self {
let f = Arc::new(consumer_update_listener);
self.consumer_update_listener = Some(f);
self
}

Check warning on line 260 in src/consumer.rs

View check run for this annotation

Codecov / codecov/patch

src/consumer.rs#L250-L260

Added lines #L250 - L260 were not covered by tests

pub fn consumer_update_arc(
mut self,
consumer_update_listener: Option<crate::consumer::ConsumerUpdateListener>,
) -> Self {
self.consumer_update_listener = consumer_update_listener;
self
}

pub fn properties(mut self, properties: HashMap<String, String>) -> Self {
self.properties = properties;
self
Expand Down Expand Up @@ -386,8 +408,41 @@ impl MessageHandler for ConsumerMessageHandler {
// TODO handle credit fail
let _ = self.0.client.credit(self.0.subscription_id, 1).await;
self.0.metrics_collector.consume(len as u64).await;
} else {
println!("other message arrived");
} else if let ResponseKind::ConsumerUpdate(consumer_update) = response.kind_ref() {
trace!("Received a ConsumerUpdate message");

Check warning on line 412 in src/consumer.rs

View check run for this annotation

Codecov / codecov/patch

src/consumer.rs#L412

Added line #L412 was not covered by tests
// If no callback is provided by the user we will restart from Next by protocol
// We need to respond to the server too
if self.0.consumer_update_listener.is_none() {
trace!("User defined callback is not provided");
let offset_specification = OffsetSpecification::Next;
let _ = self
.0
.client
.consumer_update(
consumer_update.get_correlation_id(),
offset_specification,
)
.await;

Check warning on line 425 in src/consumer.rs

View check run for this annotation

Codecov / codecov/patch

src/consumer.rs#L415-L425

Added lines #L415 - L425 were not covered by tests
} else {
// Otherwise the Offset specification is returned by the user callback
let is_active = consumer_update.is_active();
let message_context = MessageContext {
consumer_name: self.0.name.clone(),
stream: self.0.stream.clone(),
};
let consumer_update_listener_callback =
self.0.consumer_update_listener.clone().unwrap();
let offset_specification =
consumer_update_listener_callback(is_active, &message_context);
let _ = self
.0
.client
.consumer_update(
consumer_update.get_correlation_id(),
offset_specification,
)
.await;

Check warning on line 444 in src/consumer.rs

View check run for this annotation

Codecov / codecov/patch

src/consumer.rs#L428-L444

Added lines #L428 - L444 were not covered by tests
}
}
}
Some(Err(err)) => {
Expand Down
2 changes: 2 additions & 0 deletions src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl Environment {
environment: self.clone(),
offset_specification: OffsetSpecification::Next,
filter_configuration: None,
consumer_update_listener: None,
client_provided_name: String::from("rust-stream-consumer"),
properties: HashMap::new(),
}
Expand All @@ -84,6 +85,7 @@ impl Environment {
environment: self.clone(),
offset_specification: OffsetSpecification::Next,
filter_configuration: None,
consumer_update_listener: None,
client_provided_name: String::from("rust-super-stream-consumer"),
properties: HashMap::new(),
}
Expand Down
6 changes: 4 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,16 @@ pub type RabbitMQStreamResult<T> = Result<T, error::ClientError>;

pub use crate::client::{Client, ClientOptions, MetricsCollector};

pub use crate::consumer::{Consumer, ConsumerBuilder, ConsumerHandle, FilterConfiguration};
pub use crate::consumer::{
Consumer, ConsumerBuilder, ConsumerHandle, FilterConfiguration, MessageContext,
};
pub use crate::environment::{Environment, EnvironmentBuilder, TlsConfiguration};
pub use crate::producer::{Dedup, NoDedup, Producer, ProducerBuilder};
pub mod types {

pub use crate::byte_capacity::ByteCapacity;
pub use crate::client::{Broker, MessageResult, StreamMetadata};
pub use crate::consumer::Delivery;
pub use crate::consumer::{Delivery, MessageContext};
pub use crate::offset_specification::OffsetSpecification;
pub use crate::stream_creator::StreamCreator;
pub use crate::superstream::HashRoutingMurmurStrategy;
Expand Down
20 changes: 18 additions & 2 deletions src/superstream_consumer.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::client::Client;
use crate::consumer::Delivery;
use crate::consumer::{ConsumerUpdateListener, Delivery};
use crate::error::{ConsumerCloseError, ConsumerDeliveryError};
use crate::superstream::DefaultSuperStreamMetadata;
use crate::{error::ConsumerCreateError, ConsumerHandle, Environment, FilterConfiguration};
use crate::{
error::ConsumerCreateError, ConsumerHandle, Environment, FilterConfiguration, MessageContext,
};
use futures::task::AtomicWaker;
use futures::{Stream, StreamExt};
use rabbitmq_stream_protocol::commands::subscribe::OffsetSpecification;
Expand Down Expand Up @@ -33,6 +35,7 @@ pub struct SuperStreamConsumerBuilder {
pub(crate) environment: Environment,
pub(crate) offset_specification: OffsetSpecification,
pub(crate) filter_configuration: Option<FilterConfiguration>,
pub(crate) consumer_update_listener: Option<ConsumerUpdateListener>,
pub(crate) client_provided_name: String,
pub(crate) properties: HashMap<String, String>,
}
Expand Down Expand Up @@ -64,6 +67,7 @@ impl SuperStreamConsumerBuilder {
.offset(self.offset_specification.clone())
.client_provided_name(self.client_provided_name.as_str())
.filter_input(self.filter_configuration.clone())
.consumer_update_arc(self.consumer_update_listener.clone())
.properties(self.properties.clone())
.build(partition.as_str())
.await
Expand Down Expand Up @@ -101,6 +105,18 @@ impl SuperStreamConsumerBuilder {
self
}

pub fn consumer_update(
mut self,
consumer_update_listener: impl Fn(u8, &MessageContext) -> OffsetSpecification
+ Send
+ Sync
+ 'static,
) -> Self {
let f = Arc::new(consumer_update_listener);
self.consumer_update_listener = Some(f);
self
}

Check warning on line 118 in src/superstream_consumer.rs

View check run for this annotation

Codecov / codecov/patch

src/superstream_consumer.rs#L108-L118

Added lines #L108 - L118 were not covered by tests

pub fn client_provided_name(mut self, name: &str) -> Self {
self.client_provided_name = String::from(name);
self
Expand Down
13 changes: 0 additions & 13 deletions tests/integration/client_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,16 +457,3 @@ async fn client_test_route_test() {
test.partitions.get(0).unwrap()
);
}

#[tokio::test(flavor = "multi_thread")]
async fn client_consumer_update_request_test() {
let test = TestClient::create().await;

let response = test
.client
.consumer_update(OffsetSpecification::Next)
.await
.unwrap();

assert_eq!(&ResponseCode::Ok, response.code());
}

0 comments on commit 68ef2ad

Please sign in to comment.