From 82f93ad3e55523eabe58671f747513b0c4389a4b Mon Sep 17 00:00:00 2001 From: Daniele Date: Thu, 21 Nov 2024 12:25:33 +0100 Subject: [PATCH] Single active consumer implementation (#248) * implementing Consumer_Update command * implementing ConsumerUpdateRequest command * SAC: starting implementation * Implementing callback support and consumer_update response * adding basic test * improved test * expand unit test scope * adding example * Adding README * enabling naming for super_stream consumers and setting up sac properties internally * expanding test * few improvements and test for simple SAC * making consumer_update callback able to call async methods * making Delivery export client in order to use store_offset and review super_stream example --- examples/single_active_consumer/README.md | 23 ++ .../single_active_consumer.rs | 97 ++++++ .../single_active_consumer_super_stream.rs | 94 ++++++ protocol/src/commands/consumer_update.rs | 88 ++++++ .../src/commands/consumer_update_request.rs | 86 ++++++ protocol/src/commands/mod.rs | 2 + protocol/src/protocol.rs | 1 + protocol/src/request/mod.rs | 22 +- protocol/src/request/shims.rs | 9 +- protocol/src/response/mod.rs | 53 +++- src/client/dispatcher.rs | 7 +- src/client/mod.rs | 15 +- src/consumer.rs | 135 +++++++- src/environment.rs | 5 + src/error.rs | 3 + src/lib.rs | 6 +- src/superstream_consumer.rs | 61 +++- tests/integration/consumer_test.rs | 291 +++++++++++++++++- 18 files changed, 962 insertions(+), 36 deletions(-) create mode 100644 examples/single_active_consumer/README.md create mode 100644 examples/single_active_consumer/single_active_consumer.rs create mode 100644 examples/single_active_consumer/single_active_consumer_super_stream.rs create mode 100644 protocol/src/commands/consumer_update.rs create mode 100644 protocol/src/commands/consumer_update_request.rs diff --git a/examples/single_active_consumer/README.md b/examples/single_active_consumer/README.md new file mode 100644 index 0000000..29523a8 --- /dev/null +++ b/examples/single_active_consumer/README.md @@ -0,0 +1,23 @@ +Single active consumer +--- + +This is an example to enable single active consumer functionality for superstream: +https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams +https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams + +This folder contains a consumer and a super-stream consumer configured to enable it. +You can use the example in the super-stream folder to produce messages for a super-stream. + +You can then run the single_active_consumer_super_stream.rs in this folder. +Assuming the super-stream is composed by three streams, you can see that the Consumer will consume messages from all the streams part of the superstream. + +You can then run another consumer in parallel. +now you'll see that one of the two consumers will consume from 2 streams while the other on one stream. + +If you run another you'll see that every Consumer will read from a single stream. + +If you then stop one of the Consumer you'll notice that the related stream is now read from on the Consumer which is still running. + + + + diff --git a/examples/single_active_consumer/single_active_consumer.rs b/examples/single_active_consumer/single_active_consumer.rs new file mode 100644 index 0000000..24b4705 --- /dev/null +++ b/examples/single_active_consumer/single_active_consumer.rs @@ -0,0 +1,97 @@ +use futures::StreamExt; +use rabbitmq_stream_client::error::StreamCreateError; +use rabbitmq_stream_client::types::{ + ByteCapacity, OffsetSpecification, ResponseCode, +}; + + +#[tokio::main] +async fn main() -> Result<(), Box> { + use rabbitmq_stream_client::Environment; + let environment = Environment::builder().build().await?; + let message_count = 1000000; + let stream = "hello-rust-super-stream-2"; + + let create_response = environment + .stream_creator() + .max_length(ByteCapacity::GB(5)) + .create(stream) + .await; + + if let Err(e) = create_response { + if let StreamCreateError::Create { stream, status } = e { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); + } + } + } + } + println!( + "Super stream consumer example, consuming messages from the super stream {}", + stream + ); + + let mut consumer = environment + .consumer() + // Mandatory if sac is enabled + .name("consumer-group-1") + .offset(OffsetSpecification::First) + .enable_single_active_consumer(true) + .client_provided_name("my super stream consumer for hello rust") + .consumer_update(move |active, message_context| async move { + let name = message_context.name(); + let stream = message_context.stream(); + let client = message_context.client(); + + println!( + "single active consumer: is active: {} on stream: {} with consumer_name: {}", + active, stream, name + ); + let stored_offset = client.query_offset(name, stream.as_str()).await; + + if let Err(e) = stored_offset { + return OffsetSpecification::First; + } + + let stored_offset_u = stored_offset.unwrap(); + println!("restarting from stored_offset: {}", stored_offset_u); + OffsetSpecification::Offset(stored_offset_u) + + }) + .build(stream) + .await + .unwrap(); + + for i in 0..message_count { + let delivery = consumer.next().await.unwrap(); + { + let delivery = delivery.unwrap(); + println!( + "Got message: {:#?} from stream: {} with offset: {}", + delivery + .message() + .data() + .map(|data| String::from_utf8(data.to_vec()).unwrap()) + .unwrap(), + delivery.stream(), + delivery.offset() + ); + + //store an offset + if i == 10000 { + let _ = consumer + .store_offset(i) + .await + .unwrap_or_else(|e| println!("Err: {}", e)); + } + } + } + + println!("Stopping consumer..."); + let _ = consumer.handle().close().await; + println!("consumer stopped"); + Ok(()) +} \ No newline at end of file diff --git a/examples/single_active_consumer/single_active_consumer_super_stream.rs b/examples/single_active_consumer/single_active_consumer_super_stream.rs new file mode 100644 index 0000000..c66b9aa --- /dev/null +++ b/examples/single_active_consumer/single_active_consumer_super_stream.rs @@ -0,0 +1,94 @@ +use futures::StreamExt; +use rabbitmq_stream_client::error::StreamCreateError; +use rabbitmq_stream_client::types::{ + ByteCapacity, OffsetSpecification, ResponseCode, SuperStreamConsumer, +}; +use std::collections::HashMap; + +#[tokio::main] +async fn main() -> Result<(), Box> { + use rabbitmq_stream_client::Environment; + let environment = Environment::builder().build().await?; + let message_count = 1000000; + let super_stream = "hello-rust-super-stream"; + + let create_response = environment + .stream_creator() + .max_length(ByteCapacity::GB(5)) + .create_super_stream(super_stream, 3, None) + .await; + + if let Err(e) = create_response { + if let StreamCreateError::Create { stream, status } = e { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); + } + } + } + } + println!( + "Super stream consumer example, consuming messages from the super stream {}", + super_stream + ); + + let mut super_stream_consumer: SuperStreamConsumer = environment + .super_stream_consumer() + // Mandatory if sac is enabled + .name("consumer-group-1") + .offset(OffsetSpecification::First) + .enable_single_active_consumer(true) + .client_provided_name("my super stream consumer for hello rust") + .consumer_update(move |active, message_context| async move { + let name = message_context.name(); + let stream = message_context.stream(); + let client = message_context.client(); + + println!( + "single active consumer: is active: {} on stream: {} with consumer_name: {}", + active, stream, name + ); + let stored_offset = client.query_offset(name, stream.as_str()).await; + + if let Err(e) = stored_offset { + return OffsetSpecification::First; + } + let stored_offset_u = stored_offset.unwrap(); + println!("stored_offset_u {}", stored_offset_u.clone()); + OffsetSpecification::Offset(stored_offset_u) + + }) + .build(super_stream) + .await + .unwrap(); + + for _ in 0..message_count { + let delivery = super_stream_consumer.next().await.unwrap(); + { + let delivery = delivery.unwrap(); + println!( + "Got message: {:#?} from stream: {} with offset: {}", + delivery + .message() + .data() + .map(|data| String::from_utf8(data.to_vec()).unwrap()) + .unwrap(), + delivery.stream(), + delivery.offset() + ); + + // Store an offset for every consumer + if delivery.consumer_name().is_some() && delivery.offset() == 1000 { + super_stream_consumer.client().store_offset(delivery.consumer_name().unwrap().as_str(), delivery.stream().as_str(), delivery.offset()).await; + } + + } + } + + println!("Stopping super stream consumer..."); + let _ = super_stream_consumer.handle().close().await; + println!("Super stream consumer stopped"); + Ok(()) +} \ No newline at end of file diff --git a/protocol/src/commands/consumer_update.rs b/protocol/src/commands/consumer_update.rs new file mode 100644 index 0000000..6d07472 --- /dev/null +++ b/protocol/src/commands/consumer_update.rs @@ -0,0 +1,88 @@ +use std::io::Write; + +#[cfg(test)] +use fake::Fake; + +use crate::{ + codec::{Decoder, Encoder}, + error::{DecodeError, EncodeError}, + protocol::commands::COMMAND_CONSUMER_UPDATE, +}; + +use super::Command; + +#[cfg_attr(test, derive(fake::Dummy))] +#[derive(PartialEq, Eq, Debug)] +pub struct ConsumerUpdateCommand { + pub(crate) correlation_id: u32, + subscription_id: u8, + active: u8, +} + +impl ConsumerUpdateCommand { + pub fn new(correlation_id: u32, subscription_id: u8, active: u8) -> Self { + Self { + correlation_id, + subscription_id, + active, + } + } + + pub fn get_correlation_id(&self) -> u32 { + self.correlation_id + } + + pub fn is_active(&self) -> u8 { + self.active + } +} + +impl Encoder for ConsumerUpdateCommand { + fn encoded_size(&self) -> u32 { + self.correlation_id.encoded_size() + + self.subscription_id.encoded_size() + + self.active.encoded_size() + } + + fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> { + self.correlation_id.encode(writer)?; + self.subscription_id.encode(writer)?; + self.active.encode(writer)?; + Ok(()) + } +} + +impl Decoder for ConsumerUpdateCommand { + fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> { + let (input, correlation_id) = u32::decode(input)?; + let (input, subscription_id) = u8::decode(input)?; + let (input, active) = u8::decode(input)?; + + Ok(( + input, + ConsumerUpdateCommand { + correlation_id, + subscription_id, + active, + }, + )) + } +} + +impl Command for ConsumerUpdateCommand { + fn key(&self) -> u16 { + COMMAND_CONSUMER_UPDATE + } +} + +#[cfg(test)] +mod tests { + use crate::commands::tests::command_encode_decode_test; + + use super::ConsumerUpdateCommand; + + #[test] + fn consumer_update_response_test() { + command_encode_decode_test::(); + } +} diff --git a/protocol/src/commands/consumer_update_request.rs b/protocol/src/commands/consumer_update_request.rs new file mode 100644 index 0000000..38c1f5f --- /dev/null +++ b/protocol/src/commands/consumer_update_request.rs @@ -0,0 +1,86 @@ +use std::io::Write; + +#[cfg(test)] +use fake::Fake; + +use crate::{ + codec::{Decoder, Encoder}, + error::{DecodeError, EncodeError}, + protocol::commands::COMMAND_CONSUMER_UPDATE_REQUEST, +}; + +use crate::commands::subscribe::OffsetSpecification; + +use super::Command; + +#[cfg_attr(test, derive(fake::Dummy))] +#[derive(PartialEq, Eq, Debug)] +pub struct ConsumerUpdateRequestCommand { + pub(crate) correlation_id: u32, + response_code: u16, + offset_specification: OffsetSpecification, +} + +impl ConsumerUpdateRequestCommand { + pub fn new( + correlation_id: u32, + response_code: u16, + offset_specification: OffsetSpecification, + ) -> Self { + Self { + correlation_id, + response_code, + offset_specification, + } + } +} + +impl Encoder for ConsumerUpdateRequestCommand { + fn encoded_size(&self) -> u32 { + self.correlation_id.encoded_size() + + self.response_code.encoded_size() + + self.offset_specification.encoded_size() + } + + fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> { + self.correlation_id.encode(writer)?; + self.response_code.encode(writer)?; + self.offset_specification.encode(writer)?; + Ok(()) + } +} + +impl Decoder for ConsumerUpdateRequestCommand { + fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> { + let (input, correlation_id) = u32::decode(input)?; + let (input, response_code) = u16::decode(input)?; + let (input, offset_specification) = OffsetSpecification::decode(input)?; + + Ok(( + input, + ConsumerUpdateRequestCommand { + correlation_id, + response_code, + offset_specification, + }, + )) + } +} + +impl Command for ConsumerUpdateRequestCommand { + fn key(&self) -> u16 { + COMMAND_CONSUMER_UPDATE_REQUEST + } +} + +#[cfg(test)] +mod tests { + use crate::commands::tests::command_encode_decode_test; + + use super::ConsumerUpdateRequestCommand; + + #[test] + fn consumer_update_request_test() { + command_encode_decode_test::(); + } +} diff --git a/protocol/src/commands/mod.rs b/protocol/src/commands/mod.rs index f6ecf1f..c8f8517 100644 --- a/protocol/src/commands/mod.rs +++ b/protocol/src/commands/mod.rs @@ -1,6 +1,8 @@ use crate::protocol::version::PROTOCOL_VERSION; pub mod close; +pub mod consumer_update; +pub mod consumer_update_request; pub mod create_stream; pub mod create_super_stream; pub mod credit; diff --git a/protocol/src/protocol.rs b/protocol/src/protocol.rs index 5dcb0d3..2c41cbe 100644 --- a/protocol/src/protocol.rs +++ b/protocol/src/protocol.rs @@ -32,6 +32,7 @@ pub mod commands { pub const COMMAND_STREAMS_STATS: u16 = 28; pub const COMMAND_CREATE_SUPER_STREAM: u16 = 29; pub const COMMAND_DELETE_SUPER_STREAM: u16 = 30; + pub const COMMAND_CONSUMER_UPDATE_REQUEST: u16 = 32794; } // server responses diff --git a/protocol/src/request/mod.rs b/protocol/src/request/mod.rs index 1eea47b..fe48803 100644 --- a/protocol/src/request/mod.rs +++ b/protocol/src/request/mod.rs @@ -3,9 +3,9 @@ use std::io::Write; use crate::{ codec::{decoder::read_u32, Decoder, Encoder}, commands::{ - close::CloseRequest, create_stream::CreateStreamCommand, - create_super_stream::CreateSuperStreamCommand, credit::CreditCommand, - declare_publisher::DeclarePublisherCommand, delete::Delete, + close::CloseRequest, consumer_update_request::ConsumerUpdateRequestCommand, + create_stream::CreateStreamCommand, create_super_stream::CreateSuperStreamCommand, + credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete, delete_publisher::DeletePublisherCommand, delete_super_stream::DeleteSuperStreamCommand, exchange_command_versions::ExchangeCommandVersionsRequest, heart_beat::HeartBeatCommand, metadata::MetadataCommand, open::OpenCommand, peer_properties::PeerPropertiesCommand, @@ -68,6 +68,7 @@ pub enum RequestKind { DeleteSuperStream(DeleteSuperStreamCommand), SuperStreamPartitions(SuperStreamPartitionsRequest), SuperStreamRoute(SuperStreamRouteRequest), + ConsumerUpdateRequest(ConsumerUpdateRequestCommand), } impl Encoder for RequestKind { @@ -105,6 +106,9 @@ impl Encoder for RequestKind { super_stream_partitions.encoded_size() } RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encoded_size(), + RequestKind::ConsumerUpdateRequest(consumer_update_request) => { + consumer_update_request.encoded_size() + } } } @@ -142,6 +146,9 @@ impl Encoder for RequestKind { super_stream_partition.encode(writer) } RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encode(writer), + RequestKind::ConsumerUpdateRequest(consumer_update_request) => { + consumer_update_request.encode(writer) + } } } } @@ -222,6 +229,9 @@ impl Decoder for Request { COMMAND_ROUTE => { SuperStreamRouteRequest::decode(input).map(|(i, kind)| (i, kind.into()))? } + COMMAND_CONSUMER_UPDATE_REQUEST => { + ConsumerUpdateRequestCommand::decode(input).map(|(i, kind)| (i, kind.into()))? + } n => return Err(DecodeError::UnsupportedResponseType(n)), }; Ok((input, Request { header, kind: cmd })) @@ -234,9 +244,9 @@ mod tests { use crate::{ codec::{Decoder, Encoder}, commands::{ - close::CloseRequest, create_stream::CreateStreamCommand, - create_super_stream::CreateSuperStreamCommand, credit::CreditCommand, - declare_publisher::DeclarePublisherCommand, delete::Delete, + close::CloseRequest, consumer_update_request::ConsumerUpdateRequestCommand, + create_stream::CreateStreamCommand, create_super_stream::CreateSuperStreamCommand, + credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete, delete_publisher::DeletePublisherCommand, delete_super_stream::DeleteSuperStreamCommand, exchange_command_versions::ExchangeCommandVersionsRequest, diff --git a/protocol/src/request/shims.rs b/protocol/src/request/shims.rs index 102efe6..2d8c73b 100644 --- a/protocol/src/request/shims.rs +++ b/protocol/src/request/shims.rs @@ -2,7 +2,8 @@ use crate::commands::create_super_stream::CreateSuperStreamCommand; use crate::commands::delete_super_stream::DeleteSuperStreamCommand; use crate::{ commands::{ - close::CloseRequest, create_stream::CreateStreamCommand, credit::CreditCommand, + close::CloseRequest, consumer_update_request::ConsumerUpdateRequestCommand, + create_stream::CreateStreamCommand, credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete, delete_publisher::DeletePublisherCommand, exchange_command_versions::ExchangeCommandVersionsRequest, heart_beat::HeartBeatCommand, @@ -164,3 +165,9 @@ impl From for RequestKind { RequestKind::SuperStreamRoute(cmd) } } + +impl From for RequestKind { + fn from(cmd: ConsumerUpdateRequestCommand) -> Self { + RequestKind::ConsumerUpdateRequest(cmd) + } +} diff --git a/protocol/src/response/mod.rs b/protocol/src/response/mod.rs index 0f394c1..9c1e920 100644 --- a/protocol/src/response/mod.rs +++ b/protocol/src/response/mod.rs @@ -6,9 +6,10 @@ use crate::{ Decoder, }, commands::{ - close::CloseResponse, credit::CreditResponse, deliver::DeliverCommand, - exchange_command_versions::ExchangeCommandVersionsResponse, generic::GenericResponse, - heart_beat::HeartbeatResponse, metadata::MetadataResponse, + close::CloseResponse, consumer_update::ConsumerUpdateCommand, + consumer_update_request::ConsumerUpdateRequestCommand, credit::CreditResponse, + deliver::DeliverCommand, exchange_command_versions::ExchangeCommandVersionsResponse, + generic::GenericResponse, heart_beat::HeartbeatResponse, metadata::MetadataResponse, metadata_update::MetadataUpdateCommand, open::OpenResponse, peer_properties::PeerPropertiesResponse, publish_confirm::PublishConfirm, publish_error::PublishErrorResponse, query_offset::QueryOffsetResponse, @@ -72,6 +73,8 @@ pub enum ResponseKind { ExchangeCommandVersions(ExchangeCommandVersionsResponse), SuperStreamPartitions(SuperStreamPartitionsResponse), SuperStreamRoute(SuperStreamRouteResponse), + ConsumerUpdate(ConsumerUpdateCommand), + ConsumerUpdateRequest(ConsumerUpdateRequestCommand), } impl Response { @@ -107,6 +110,12 @@ impl Response { ResponseKind::SuperStreamRoute(super_stream_route_command) => { Some(super_stream_route_command.correlation_id) } + ResponseKind::ConsumerUpdate(consumer_update_command) => { + Some(consumer_update_command.correlation_id) + } + ResponseKind::ConsumerUpdateRequest(consumer_update_request_command) => { + Some(consumer_update_request_command.correlation_id) + } } } @@ -151,6 +160,7 @@ impl Decoder for Response { | COMMAND_CREATE_STREAM | COMMAND_CREATE_SUPER_STREAM | COMMAND_DELETE_SUPER_STREAM + | COMMAND_CONSUMER_UPDATE_REQUEST | COMMAND_DELETE_STREAM => { GenericResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Generic(kind)))? } @@ -188,6 +198,8 @@ impl Decoder for Response { .map(|(remaining, kind)| (remaining, ResponseKind::SuperStreamPartitions(kind)))?, COMMAND_ROUTE => SuperStreamRouteResponse::decode(input) .map(|(remaining, kind)| (remaining, ResponseKind::SuperStreamRoute(kind)))?, + COMMAND_CONSUMER_UPDATE => ConsumerUpdateCommand::decode(input) + .map(|(remaining, kind)| (remaining, ResponseKind::ConsumerUpdate(kind)))?, n => return Err(DecodeError::UnsupportedResponseType(n)), }; Ok((input, Response { header, kind })) @@ -219,7 +231,8 @@ mod tests { use crate::{ codec::{Decoder, Encoder}, commands::{ - close::CloseResponse, deliver::DeliverCommand, + close::CloseResponse, consumer_update::ConsumerUpdateCommand, + consumer_update_request::ConsumerUpdateRequestCommand, deliver::DeliverCommand, exchange_command_versions::ExchangeCommandVersionsResponse, generic::GenericResponse, heart_beat::HeartbeatResponse, metadata::MetadataResponse, metadata_update::MetadataUpdateCommand, open::OpenResponse, @@ -232,11 +245,11 @@ mod tests { }, protocol::{ commands::{ - COMMAND_CLOSE, COMMAND_DELIVER, COMMAND_HEARTBEAT, COMMAND_METADATA, - COMMAND_METADATA_UPDATE, COMMAND_OPEN, COMMAND_PARTITIONS, COMMAND_PEER_PROPERTIES, - COMMAND_PUBLISH_CONFIRM, COMMAND_PUBLISH_ERROR, COMMAND_QUERY_OFFSET, - COMMAND_QUERY_PUBLISHER_SEQUENCE, COMMAND_ROUTE, COMMAND_SASL_AUTHENTICATE, - COMMAND_SASL_HANDSHAKE, COMMAND_TUNE, + COMMAND_CLOSE, COMMAND_CONSUMER_UPDATE, COMMAND_CONSUMER_UPDATE_REQUEST, + COMMAND_DELIVER, COMMAND_HEARTBEAT, COMMAND_METADATA, COMMAND_METADATA_UPDATE, + COMMAND_OPEN, COMMAND_PARTITIONS, COMMAND_PEER_PROPERTIES, COMMAND_PUBLISH_CONFIRM, + COMMAND_PUBLISH_ERROR, COMMAND_QUERY_OFFSET, COMMAND_QUERY_PUBLISHER_SEQUENCE, + COMMAND_ROUTE, COMMAND_SASL_AUTHENTICATE, COMMAND_SASL_HANDSHAKE, COMMAND_TUNE, }, version::PROTOCOL_VERSION, }, @@ -244,6 +257,7 @@ mod tests { types::Header, ResponseCode, }; + impl Encoder for ResponseKind { fn encoded_size(&self) -> u32 { match self { @@ -273,6 +287,12 @@ mod tests { ResponseKind::SuperStreamRoute(super_stream_response) => { super_stream_response.encoded_size() } + ResponseKind::ConsumerUpdate(consumer_update_response) => { + consumer_update_response.encoded_size() + } + ResponseKind::ConsumerUpdateRequest(consumer_update_request_response) => { + consumer_update_request_response.encoded_size() + } } } @@ -307,6 +327,12 @@ mod tests { ResponseKind::SuperStreamRoute(super_stream_command_versions) => { super_stream_command_versions.encode(writer) } + ResponseKind::ConsumerUpdate(consumer_update_command_version) => { + consumer_update_command_version.encode(writer) + } + ResponseKind::ConsumerUpdateRequest(consumer_update_request_command_version) => { + consumer_update_request_command_version.encode(writer) + } } } } @@ -494,4 +520,13 @@ mod tests { COMMAND_ROUTE ); } + + #[test] + fn consumer_update_response_test() { + response_test!( + ConsumerUpdateCommand, + ResponseKind::ConsumerUpdate, + COMMAND_CONSUMER_UPDATE + ); + } } diff --git a/src/client/dispatcher.rs b/src/client/dispatcher.rs index c8b1291..1abcf43 100644 --- a/src/client/dispatcher.rs +++ b/src/client/dispatcher.rs @@ -1,5 +1,5 @@ use futures::Stream; -use rabbitmq_stream_protocol::Response; +use rabbitmq_stream_protocol::{Response, ResponseKind}; use std::sync::{ atomic::{AtomicBool, AtomicU32, Ordering}, Arc, @@ -167,7 +167,10 @@ where while let Some(result) = stream.next().await { match result { Ok(item) => match item.correlation_id() { - Some(correlation_id) => state.dispatch(correlation_id, item).await, + Some(correlation_id) => match item.kind_ref() { + ResponseKind::ConsumerUpdate(_) => state.notify(item).await, + _ => state.dispatch(correlation_id, item).await, + }, None => state.notify(item).await, }, Err(e) => { diff --git a/src/client/mod.rs b/src/client/mod.rs index dc242d6..98bab85 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -34,6 +34,7 @@ use tokio_rustls::{rustls, TlsConnector}; use tokio_util::codec::Framed; use tracing::trace; +use crate::{error::ClientError, RabbitMQStreamResult}; pub use message::ClientMessage; pub use metadata::{Broker, StreamMetadata}; pub use metrics::MetricsCollector; @@ -41,6 +42,7 @@ pub use options::ClientOptions; use rabbitmq_stream_protocol::{ commands::{ close::{CloseRequest, CloseResponse}, + consumer_update_request::ConsumerUpdateRequestCommand, create_stream::CreateStreamCommand, create_super_stream::CreateSuperStreamCommand, credit::CreditCommand, @@ -71,8 +73,6 @@ use rabbitmq_stream_protocol::{ FromResponse, Request, Response, ResponseCode, ResponseKind, }; -use crate::{error::ClientError, RabbitMQStreamResult}; - pub use self::handler::{MessageHandler, MessageResult}; use self::{ channel::{channel, ChannelReceiver, ChannelSender}, @@ -852,4 +852,15 @@ impl Client { Ok(config) } + + pub async fn consumer_update( + &self, + correlation_id: u32, + offset_specification: OffsetSpecification, + ) -> RabbitMQStreamResult { + self.send_and_receive(|_| { + ConsumerUpdateRequestCommand::new(correlation_id, 1, offset_specification) + }) + .await + } } diff --git a/src/consumer.rs b/src/consumer.rs index 519cc54..8a41bb1 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -15,6 +15,9 @@ use rabbitmq_stream_protocol::{ commands::subscribe::OffsetSpecification, message::Message, ResponseKind, }; +use core::option::Option::None; +use futures::FutureExt; +use std::future::Future; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tracing::trace; @@ -25,12 +28,15 @@ use crate::{ error::{ConsumerCloseError, ConsumerCreateError, ConsumerDeliveryError}, Client, ClientOptions, Environment, MetricsCollector, }; -use futures::{task::AtomicWaker, Stream}; +use futures::{future::BoxFuture, task::AtomicWaker, Stream}; use rand::rngs::StdRng; use rand::{seq::SliceRandom, SeedableRng}; type FilterPredicate = Option bool + Send + Sync>>; +pub type ConsumerUpdateListener = + Arc BoxFuture<'static, OffsetSpecification> + Send + Sync>; + /// API for consuming RabbitMQ stream messages pub struct Consumer { // Mandatory in case of manual offset tracking @@ -40,6 +46,7 @@ pub struct Consumer { } struct ConsumerInternal { + name: Option, client: Client, stream: String, offset_specification: OffsetSpecification, @@ -49,6 +56,7 @@ struct ConsumerInternal { waker: AtomicWaker, metrics_collector: Arc, filter_configuration: Option, + consumer_update_listener: Option, } impl ConsumerInternal { @@ -82,21 +90,50 @@ impl FilterConfiguration { } } +#[derive(Clone)] +pub struct MessageContext { + name: String, + stream: String, + client: Client, +} + +impl MessageContext { + pub fn name(&self) -> String { + self.name.clone() + } + + pub fn stream(&self) -> String { + self.stream.clone() + } + + pub fn client(&self) -> Client { + self.client.clone() + } +} + /// Builder for [`Consumer`] pub struct ConsumerBuilder { pub(crate) consumer_name: Option, pub(crate) environment: Environment, pub(crate) offset_specification: OffsetSpecification, pub(crate) filter_configuration: Option, + pub(crate) consumer_update_listener: Option, pub(crate) client_provided_name: String, pub(crate) properties: HashMap, + pub(crate) is_single_active_consumer: bool, } impl ConsumerBuilder { pub async fn build(mut self, stream: &str) -> Result { + if (self.is_single_active_consumer + || self.properties.contains_key("single-active-consumer")) + && self.consumer_name.is_none() + { + return Err(ConsumerCreateError::SingleActiveConsumerNotSupported); + } + // Connect to the user specified node first, then look for a random replica to connect to instead. // This is recommended for load balancing purposes - let mut opt_with_client_provided_name = self.environment.options.client_options.clone(); opt_with_client_provided_name.client_provided_name = self.client_provided_name.clone(); @@ -149,6 +186,7 @@ impl ConsumerBuilder { let subscription_id = 1; let (tx, rx) = channel(10000); let consumer = Arc::new(ConsumerInternal { + name: self.consumer_name.clone(), subscription_id, stream: stream.to_string(), client: client.clone(), @@ -158,6 +196,7 @@ impl ConsumerBuilder { waker: AtomicWaker::new(), metrics_collector: collector, filter_configuration: self.filter_configuration.clone(), + consumer_update_listener: self.consumer_update_listener.clone(), }); let msg_handler = ConsumerMessageHandler(consumer.clone()); client.set_handler(msg_handler).await; @@ -178,6 +217,13 @@ impl ConsumerBuilder { ); } + if self.is_single_active_consumer { + self.properties + .insert("single-active-consumer".to_string(), "true".to_string()); + self.properties + .insert("name".to_string(), self.consumer_name.clone().unwrap()); + } + let response = client .subscribe( subscription_id, @@ -190,7 +236,7 @@ impl ConsumerBuilder { if response.is_ok() { Ok(Consumer { - name: self.consumer_name, + name: self.consumer_name.clone(), receiver: rx, internal: consumer, }) @@ -217,11 +263,41 @@ impl ConsumerBuilder { self } + pub fn name_optional(mut self, consumer_name: Option) -> Self { + self.consumer_name = consumer_name; + self + } + + pub fn enable_single_active_consumer(mut self, is_single_active_consumer: bool) -> Self { + self.is_single_active_consumer = is_single_active_consumer; + self + } + pub fn filter_input(mut self, filter_configuration: Option) -> Self { self.filter_configuration = filter_configuration; self } + pub fn consumer_update( + mut self, + consumer_update_listener: impl Fn(u8, MessageContext) -> Fut + Send + Sync + 'static, + ) -> Self + where + Fut: Future + Send + Sync + 'static, + { + let f = Arc::new(move |a, b| consumer_update_listener(a, b).boxed()); + self.consumer_update_listener = Some(f); + self + } + + pub fn consumer_update_arc( + mut self, + consumer_update_listener: Option, + ) -> Self { + self.consumer_update_listener = consumer_update_listener; + self + } + pub fn properties(mut self, properties: HashMap) -> Self { self.properties = properties; self @@ -317,27 +393,27 @@ impl MessageHandler for ConsumerMessageHandler { async fn handle_message(&self, item: MessageResult) -> crate::RabbitMQStreamResult<()> { match item { Some(Ok(response)) => { - if let ResponseKind::Deliver(delivery) = response.kind() { + if let ResponseKind::Deliver(delivery) = response.kind_ref() { let mut offset = delivery.chunk_first_offset; let len = delivery.messages.len(); + let d = delivery.clone(); trace!("Got delivery with messages {}", len); // // client filter let messages = match &self.0.filter_configuration { Some(filter_input) => { if let Some(f) = &filter_input.predicate { - delivery - .messages + d.messages .into_iter() .filter(|message| f(message)) .collect::>() } else { - delivery.messages + d.messages } } - None => delivery.messages, + None => d.messages, }; for message in messages { @@ -351,6 +427,7 @@ impl MessageHandler for ConsumerMessageHandler { .0 .sender .send(Ok(Delivery { + name: self.0.name.clone(), stream: self.0.stream.clone(), subscription_id: self.0.subscription_id, message, @@ -363,6 +440,42 @@ impl MessageHandler for ConsumerMessageHandler { // TODO handle credit fail let _ = self.0.client.credit(self.0.subscription_id, 1).await; self.0.metrics_collector.consume(len as u64).await; + } else if let ResponseKind::ConsumerUpdate(consumer_update) = response.kind_ref() { + trace!("Received a ConsumerUpdate message"); + // If no callback is provided by the user we will restart from Next by protocol + // We need to respond to the server too + if self.0.consumer_update_listener.is_none() { + trace!("User defined callback is not provided"); + let offset_specification = OffsetSpecification::Next; + let _ = self + .0 + .client + .consumer_update( + consumer_update.get_correlation_id(), + offset_specification, + ) + .await; + } else { + // Otherwise the Offset specification is returned by the user callback + let is_active = consumer_update.is_active(); + let message_context = MessageContext { + name: self.0.name.clone().unwrap(), + stream: self.0.stream.clone(), + client: self.0.client.clone(), + }; + let consumer_update_listener_callback = + self.0.consumer_update_listener.clone().unwrap(); + let offset_specification = + consumer_update_listener_callback(is_active, message_context).await; + let _ = self + .0 + .client + .consumer_update( + consumer_update.get_correlation_id(), + offset_specification, + ) + .await; + } } } Some(Err(err)) => { @@ -377,9 +490,11 @@ impl MessageHandler for ConsumerMessageHandler { Ok(()) } } + /// Envelope from incoming message #[derive(Debug)] pub struct Delivery { + name: Option, stream: String, subscription_id: u8, message: Message, @@ -406,4 +521,8 @@ impl Delivery { pub fn offset(&self) -> u64 { self.offset } + + pub fn consumer_name(&self) -> Option { + self.name.clone() + } } diff --git a/src/environment.rs b/src/environment.rs index 290415d..859cd84 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -74,18 +74,23 @@ impl Environment { environment: self.clone(), offset_specification: OffsetSpecification::Next, filter_configuration: None, + consumer_update_listener: None, client_provided_name: String::from("rust-stream-consumer"), properties: HashMap::new(), + is_single_active_consumer: false, } } pub fn super_stream_consumer(&self) -> SuperStreamConsumerBuilder { SuperStreamConsumerBuilder { + super_stream_consumer_name: None, environment: self.clone(), offset_specification: OffsetSpecification::Next, filter_configuration: None, + consumer_update_listener: None, client_provided_name: String::from("rust-super-stream-consumer"), properties: HashMap::new(), + is_single_active_consumer: false, } } diff --git a/src/error.rs b/src/error.rs index 250f82e..f138a69 100644 --- a/src/error.rs +++ b/src/error.rs @@ -159,6 +159,9 @@ pub enum ConsumerCreateError { #[error("Filtering is not supported by the broker (requires RabbitMQ 3.13+ and stream_filtering feature flag activated)")] FilteringNotSupport, + + #[error("if you set single active consumer a consumer and super_stream consumer name need to be setup")] + SingleActiveConsumerNotSupported, } #[derive(Error, Debug)] diff --git a/src/lib.rs b/src/lib.rs index 2fe8c66..67d15e6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -87,14 +87,16 @@ pub type RabbitMQStreamResult = Result; pub use crate::client::{Client, ClientOptions, MetricsCollector}; -pub use crate::consumer::{Consumer, ConsumerBuilder, ConsumerHandle, FilterConfiguration}; +pub use crate::consumer::{ + Consumer, ConsumerBuilder, ConsumerHandle, FilterConfiguration, MessageContext, +}; pub use crate::environment::{Environment, EnvironmentBuilder, TlsConfiguration}; pub use crate::producer::{Dedup, NoDedup, Producer, ProducerBuilder}; pub mod types { pub use crate::byte_capacity::ByteCapacity; pub use crate::client::{Broker, MessageResult, StreamMetadata}; - pub use crate::consumer::Delivery; + pub use crate::consumer::{Delivery, MessageContext}; pub use crate::offset_specification::OffsetSpecification; pub use crate::stream_creator::StreamCreator; pub use crate::superstream::HashRoutingMurmurStrategy; diff --git a/src/superstream_consumer.rs b/src/superstream_consumer.rs index 708dd42..1ddce9c 100644 --- a/src/superstream_consumer.rs +++ b/src/superstream_consumer.rs @@ -1,12 +1,17 @@ use crate::client::Client; -use crate::consumer::Delivery; +use crate::consumer::{ConsumerUpdateListener, Delivery}; use crate::error::{ConsumerCloseError, ConsumerDeliveryError}; use crate::superstream::DefaultSuperStreamMetadata; -use crate::{error::ConsumerCreateError, ConsumerHandle, Environment, FilterConfiguration}; +use crate::{ + error::ConsumerCreateError, ConsumerHandle, Environment, FilterConfiguration, MessageContext, +}; use futures::task::AtomicWaker; -use futures::{Stream, StreamExt}; +use futures::FutureExt; +use futures::Stream; +use futures::StreamExt; use rabbitmq_stream_protocol::commands::subscribe::OffsetSpecification; use std::collections::HashMap; +use std::future::Future; use std::pin::Pin; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::{Relaxed, SeqCst}; @@ -30,20 +35,30 @@ struct SuperStreamConsumerInternal { /// Builder for [`Consumer`] pub struct SuperStreamConsumerBuilder { + pub(crate) super_stream_consumer_name: Option, pub(crate) environment: Environment, pub(crate) offset_specification: OffsetSpecification, pub(crate) filter_configuration: Option, + pub(crate) consumer_update_listener: Option, pub(crate) client_provided_name: String, + pub(crate) is_single_active_consumer: bool, pub(crate) properties: HashMap, } impl SuperStreamConsumerBuilder { pub async fn build( - self, + &mut self, super_stream: &str, ) -> Result { // Connect to the user specified node first, then look for a random replica to connect to instead. // This is recommended for load balancing purposes. + if (self.is_single_active_consumer + || self.properties.contains_key("single-active-consumer")) + && self.super_stream_consumer_name.is_none() + { + return Err(ConsumerCreateError::SingleActiveConsumerNotSupported); + } + let client = self.environment.create_client().await?; let (tx, rx) = channel(10000); @@ -55,16 +70,24 @@ impl SuperStreamConsumerBuilder { }; let partitions = super_stream_metadata.partitions().await; + if self.is_single_active_consumer { + self.properties + .insert("super-stream".to_string(), super_stream.to_string()); + } + let mut handlers = Vec::::new(); for partition in partitions.into_iter() { let tx_cloned = tx.clone(); let mut consumer = self .environment .consumer() + .name_optional(self.super_stream_consumer_name.clone()) .offset(self.offset_specification.clone()) .client_provided_name(self.client_provided_name.as_str()) .filter_input(self.filter_configuration.clone()) + .consumer_update_arc(self.consumer_update_listener.clone()) .properties(self.properties.clone()) + .enable_single_active_consumer(self.is_single_active_consumer) .build(partition.as_str()) .await .unwrap(); @@ -96,15 +119,41 @@ impl SuperStreamConsumerBuilder { self } + pub fn name(mut self, consumer_name: &str) -> Self { + self.super_stream_consumer_name = Some(String::from(consumer_name)); + self + } + + pub fn enable_single_active_consumer(mut self, is_single_active_consumer: bool) -> Self { + self.is_single_active_consumer = is_single_active_consumer; + self + } + pub fn filter_input(mut self, filter_configuration: Option) -> Self { self.filter_configuration = filter_configuration; self } + pub fn consumer_update( + mut self, + consumer_update_listener: impl Fn(u8, MessageContext) -> Fut + Send + Sync + 'static, + ) -> Self + where + Fut: Future + Send + Sync + 'static, + { + let f = Arc::new(move |a, b| consumer_update_listener(a, b).boxed()); + self.consumer_update_listener = Some(f); + self + } pub fn client_provided_name(mut self, name: &str) -> Self { self.client_provided_name = String::from(name); self } + + pub fn properties(mut self, properties: HashMap) -> Self { + self.properties = properties; + self + } } impl Stream for SuperStreamConsumer { @@ -129,6 +178,10 @@ impl SuperStreamConsumer { pub fn handle(&self) -> SuperStreamConsumerHandle { SuperStreamConsumerHandle(self.internal.clone()) } + + pub fn client(&self) -> Client { + self.internal.client.clone() + } } impl SuperStreamConsumerInternal { diff --git a/tests/integration/consumer_test.rs b/tests/integration/consumer_test.rs index f4f1899..9b8d7db 100644 --- a/tests/integration/consumer_test.rs +++ b/tests/integration/consumer_test.rs @@ -11,15 +11,17 @@ use rabbitmq_stream_client::{ types::{Delivery, Message, OffsetSpecification, SuperStreamConsumer}, Consumer, FilterConfiguration, NoDedup, Producer, }; -use tokio::task; -use tokio::time::sleep; +use std::collections::HashMap; use crate::producer_test::routing_key_strategy_value_extractor; +use rabbitmq_stream_client::types::MessageContext; use rabbitmq_stream_client::types::{ HashRoutingMurmurStrategy, RoutingKeyRoutingStrategy, RoutingStrategy, }; use rabbitmq_stream_protocol::ResponseCode; use std::sync::atomic::{AtomicU32, Ordering}; +use tokio::sync::Notify; +use tokio::task; use {std::sync::Arc, std::sync::Mutex}; #[tokio::test(flavor = "multi_thread")] @@ -667,3 +669,288 @@ async fn consumer_test_with_filtering_match_unfiltered() { assert!(repsonse_length == filtering_response_length); } + +#[tokio::test(flavor = "multi_thread")] +async fn super_stream_single_active_consumer_test() { + let env = TestEnvironment::create_super_stream().await; + + let message_count = 1000; + let mut super_stream_producer = env + .env + .super_stream_producer(RoutingStrategy::HashRoutingStrategy( + HashRoutingMurmurStrategy { + routing_extractor: &hash_strategy_value_extractor, + }, + )) + .client_provided_name("test super stream consumer ") + .build(&env.super_stream) + .await + .unwrap(); + + let notify_received_messages = Arc::new(Notify::new()); + + let mut super_stream_consumer: SuperStreamConsumer = env + .env + .super_stream_consumer() + .name("super-stream-with-sac-enabled") + .enable_single_active_consumer(true) + .offset(OffsetSpecification::First) + .build(&env.super_stream) + .await + .unwrap(); + + let mut super_stream_consumer_2: SuperStreamConsumer = env + .env + .super_stream_consumer() + .name("super-stream-with-sac-enabled") + .enable_single_active_consumer(true) + .offset(OffsetSpecification::First) + .build(&env.super_stream) + .await + .unwrap(); + + let mut super_stream_consumer_3: SuperStreamConsumer = env + .env + .super_stream_consumer() + .name("super-stream-with-sac-enabled") + .enable_single_active_consumer(true) + .offset(OffsetSpecification::First) + .build(&env.super_stream) + .await + .unwrap(); + + for n in 0..message_count { + let msg = Message::builder().body(format!("message{}", n)).build(); + let _ = super_stream_producer + .send(msg, |confirmation_status| async move {}) + .await + .unwrap(); + } + + let mut received_messages = Arc::new(AtomicU32::new(1)); + let handle_consumer_1 = super_stream_consumer.handle(); + let handle_consumer_2 = super_stream_consumer_2.handle(); + let handle_consumer_3 = super_stream_consumer_3.handle(); + + let received_message_outer = received_messages.clone(); + let notify_received_messages_outer = notify_received_messages.clone(); + task::spawn(async move { + let received_messages_int = received_message_outer.clone(); + let notify_received_messages_inner = notify_received_messages_outer.clone(); + while let _ = super_stream_consumer.next().await.unwrap() { + let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); + if message_count == rec_msg { + notify_received_messages_inner.notify_one(); + break; + } + } + }); + + let received_message_outer = received_messages.clone(); + let notify_received_messages_outer = notify_received_messages.clone(); + task::spawn(async move { + let received_messages_int = received_message_outer.clone(); + let notify_received_messages_inner = notify_received_messages_outer.clone(); + while let _ = super_stream_consumer_2.next().await.unwrap() { + let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); + if message_count == rec_msg { + notify_received_messages_inner.notify_one(); + break; + } + } + }); + + let received_message_outer = received_messages.clone(); + let notify_received_messages_outer = notify_received_messages.clone(); + task::spawn(async move { + let received_messages_int = received_message_outer.clone(); + let notify_received_messages_inner = notify_received_messages_outer.clone(); + while let _ = super_stream_consumer_3.next().await.unwrap() { + let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); + if message_count == rec_msg { + notify_received_messages_inner.notify_one(); + break; + } + } + }); + + notify_received_messages.notified().await; + + assert!(received_messages.load(Ordering::Relaxed) == message_count + 1); + + super_stream_producer.close().await.unwrap(); + _ = handle_consumer_1.close().await; + _ = handle_consumer_2.close().await; + _ = handle_consumer_3.close().await; +} + +#[tokio::test(flavor = "multi_thread")] +async fn super_stream_single_active_consumer_test_with_callback() { + let env = TestEnvironment::create_super_stream().await; + let super_stream_consumer_name = "super-stream-with-sac-enabled"; + + let message_count = 1000; + let mut super_stream_producer = env + .env + .super_stream_producer(RoutingStrategy::HashRoutingStrategy( + HashRoutingMurmurStrategy { + routing_extractor: &hash_strategy_value_extractor, + }, + )) + .client_provided_name("test super stream consumer ") + .build(&env.super_stream) + .await + .unwrap(); + + let notify_received_messages = Arc::new(Notify::new()); + + let mut result_stream_name_1 = Arc::new(Mutex::new(String::from(""))); + let mut result_stream_name_2 = Arc::new(Mutex::new(String::from(""))); + let mut result_stream_name_3 = Arc::new(Mutex::new(String::from(""))); + + let mut result_name_1 = Arc::new(Mutex::new(String::from(""))); + let mut result_name_2 = Arc::new(Mutex::new(String::from(""))); + let mut result_name_3 = Arc::new(Mutex::new(String::from(""))); + + let mut result_stream_name_outer = result_stream_name_1.clone(); + let mut result_stream_name_2_outer = result_stream_name_2.clone(); + let mut result_stream_name_3_outer = result_stream_name_3.clone(); + + let mut result_name_1_outer = result_name_1.clone(); + let mut result_name_2_outer = result_name_2.clone(); + let mut result_name_3_outer = result_name_3.clone(); + + let mut super_stream_consumer: SuperStreamConsumer = env + .env + .super_stream_consumer() + .name(super_stream_consumer_name) + .enable_single_active_consumer(true) + .offset(OffsetSpecification::First) + .consumer_update(move |active, message_context| { + let mut result_stream_name_int = result_stream_name_outer.clone(); + let mut result_consumer_name_int = result_name_1_outer.clone(); + async move { + *result_stream_name_int.lock().unwrap() = message_context.stream().clone(); + *result_consumer_name_int.lock().unwrap() = message_context.name().clone(); + + OffsetSpecification::First + } + }) + .build(&env.super_stream) + .await + .unwrap(); + + let mut super_stream_consumer_2: SuperStreamConsumer = env + .env + .super_stream_consumer() + .name("super-stream-with-sac-enabled") + .enable_single_active_consumer(true) + .offset(OffsetSpecification::First) + .consumer_update(move |active, message_context| { + let mut result_stream_name_int = result_stream_name_2_outer.clone(); + let mut result_consumer_name_int = result_name_2_outer.clone(); + async move { + *result_stream_name_int.lock().unwrap() = message_context.stream().clone(); + *result_consumer_name_int.lock().unwrap() = message_context.name().clone(); + OffsetSpecification::First + } + }) + .build(&env.super_stream) + .await + .unwrap(); + + let mut super_stream_consumer_3: SuperStreamConsumer = env + .env + .super_stream_consumer() + .name("super-stream-with-sac-enabled") + .enable_single_active_consumer(true) + .offset(OffsetSpecification::First) + .consumer_update(move |active, message_context| { + let mut result_stream_name_int = result_stream_name_3_outer.clone(); + let mut result_consumer_name_int = result_name_3_outer.clone(); + async move { + *result_stream_name_int.lock().unwrap() = message_context.stream().clone(); + *result_consumer_name_int.lock().unwrap() = message_context.name().clone(); + OffsetSpecification::First + } + }) + .build(&env.super_stream) + .await + .unwrap(); + + for n in 0..message_count { + let msg = Message::builder().body(format!("message{}", n)).build(); + let _ = super_stream_producer + .send(msg, |confirmation_status| async move {}) + .await + .unwrap(); + } + + let mut received_messages = Arc::new(AtomicU32::new(1)); + let handle_consumer_1 = super_stream_consumer.handle(); + let handle_consumer_2 = super_stream_consumer_2.handle(); + let handle_consumer_3 = super_stream_consumer_3.handle(); + + let received_message_outer = received_messages.clone(); + let notify_received_messages_outer = notify_received_messages.clone(); + task::spawn(async move { + let received_messages_int = received_message_outer.clone(); + let notify_received_messages_inner = notify_received_messages_outer.clone(); + while let _ = super_stream_consumer.next().await.unwrap() { + let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); + if message_count == rec_msg { + notify_received_messages_inner.notify_one(); + break; + } + } + }); + + let received_message_outer = received_messages.clone(); + let notify_received_messages_outer = notify_received_messages.clone(); + task::spawn(async move { + let received_messages_int = received_message_outer.clone(); + let notify_received_messages_inner = notify_received_messages_outer.clone(); + while let _ = super_stream_consumer_2.next().await.unwrap() { + let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); + if message_count == rec_msg { + notify_received_messages_inner.notify_one(); + break; + } + } + }); + + let received_message_outer = received_messages.clone(); + let notify_received_messages_outer = notify_received_messages.clone(); + task::spawn(async move { + let received_messages_int = received_message_outer.clone(); + let notify_received_messages_inner = notify_received_messages_outer.clone(); + while let _ = super_stream_consumer_3.next().await.unwrap() { + let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); + if message_count == rec_msg { + notify_received_messages_inner.notify_one(); + break; + } + } + }); + + notify_received_messages.notified().await; + + assert!(received_messages.load(Ordering::Relaxed) == message_count + 1); + assert!(env + .partitions + .contains(&(*result_stream_name_1.clone().lock().unwrap()))); + assert!(env + .partitions + .contains(&(*result_stream_name_2.clone().lock().unwrap()))); + assert!(env + .partitions + .contains(&(*result_stream_name_3.clone().lock().unwrap()))); + assert!(super_stream_consumer_name == *result_name_1.clone().lock().unwrap()); + assert!(super_stream_consumer_name == *result_name_2.clone().lock().unwrap()); + assert!(super_stream_consumer_name == *result_name_3.clone().lock().unwrap()); + + super_stream_producer.close().await.unwrap(); + _ = handle_consumer_1.close().await; + _ = handle_consumer_2.close().await; + _ = handle_consumer_3.close().await; +}