diff --git a/README.md b/README.md
index 0f1814e..0f02462 100644
--- a/README.md
+++ b/README.md
@@ -45,6 +45,7 @@ Welcome to the documentation for the RabbitMQ Stream Rust Client. This guide pro
   - [Publishing Messages](#publishing-messages)
   - [Consuming Messages](#consuming-messages)
   - [Super Stream](#super-stream)
+  - [Single Active Consumer](#single-active-consumer)
   - [Filtering](#filtering)
 5. [Examples](#examples)
 6. [Development](#development)
@@ -171,6 +172,19 @@ See the [Super Stream Producer Example using Routing key mode](./examples/supers
 
 See the [Super Stream Consumer Example](./examples/superstreams/receive_super_stream.rs)
 
+## Single active consumer
+
+The client supports the single-active-consumer feature:
+
+[Single active consumer for streams](https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams)
+
+See the Java client documentation for further information (the same concepts apply here):
+
+[Single Active Consumer (Java client documentation)](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#single-active-consumer)
+
+See the full Rust example here:
+
+[Single active consumer example](/examples/single_active_consumer)
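+
+A minimal sketch of what enabling it looks like on a super stream consumer (the stream and consumer-group names below are illustrative, and the snippet is condensed from the full example linked above):
+
+```rust
+use rabbitmq_stream_client::{types::OffsetSpecification, Environment};
+
+let environment = Environment::builder().build().await?;
+let mut consumer = environment
+    .super_stream_consumer()
+    // a consumer name (the consumer group) is mandatory when single active consumer is enabled
+    .name("consumer-group-1")
+    .offset(OffsetSpecification::First)
+    .enable_single_active_consumer(true)
+    .consumer_update(move |active, message_context| async move {
+        // called when this consumer becomes active (or stops being active):
+        // return the offset specification it should resume from
+        println!(
+            "consumer_update: active: {} on stream: {}",
+            active,
+            message_context.stream()
+        );
+        OffsetSpecification::First
+    })
+    .build("invoices")
+    .await?;
+```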
 
 ## Filtering
 
diff --git a/examples/single_active_consumer/Cargo.toml b/examples/single_active_consumer/Cargo.toml
new file mode 100644
index 0000000..a570acd
--- /dev/null
+++ b/examples/single_active_consumer/Cargo.toml
@@ -0,0 +1,11 @@
+[workspace]
+
+[package]
+name = "single_active_consumer"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+futures = "0.3.31"
+tokio = { version = "1.41.1", features = ["full"] }
+rabbitmq-stream-client = { path = "../../" }
diff --git a/examples/single_active_consumer/README.md b/examples/single_active_consumer/README.md
index 29523a8..b4cac14 100644
--- a/examples/single_active_consumer/README.md
+++ b/examples/single_active_consumer/README.md
@@ -1,22 +1,79 @@
-Single active consumer
+Single active consumer example
 ---
-This is an example to enable single active consumer functionality for superstream:
-https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams
-https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams
-
-This folder contains a consumer and a super-stream consumer configured to enable it.
-You can use the example in the super-stream folder to produce messages for a super-stream.
-You can then run the single_active_consumer_super_stream.rs in this folder.
-Assuming the super-stream is composed by three streams, you can see that the Consumer will consume messages from all the streams part of the superstream.
-
-You can then run another consumer in parallel.
-now you'll see that one of the two consumers will consume from 2 streams while the other on one stream.
-
-If you run another you'll see that every Consumer will read from a single stream.
-
-If you then stop one of the Consumer you'll notice that the related stream is now read from on the Consumer which is still running.
+
+This example shows how to use the single active consumer feature on a super stream (RabbitMQ 3.11+).
+
+See the [single active consumer blog post](https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams),
+the [Super Streams documentation](https://www.rabbitmq.com/streams.html#super-streams), and the
+[Super Streams blog post](https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-super-streams) for more details.
+
+Run the producer in one terminal:
+
+    $ cargo run --release -- --producer
+
+And the consumer in another terminal:
+
+    $ cargo run --release -- --consumer
+
+You should see the consumer receiving the messages from the producer.
+
+It would be something like:
+```bash
+$ cargo run -- --producer
+Starting SuperStream Producer
+Super Stream Producer connected to RabbitMQ
+Super Stream Producer sent 0 messages to invoices
+Super Stream Producer sent 1 messages to invoices
+Super Stream Producer sent 2 messages to invoices
+Super Stream Producer sent 3 messages to invoices
+```
+
+```bash
+$ cargo run --release -- --consumer my_first_consumer
+Starting SuperStream Consumer my_first_consumer
+Super Stream Consumer connected to RabbitMQ. ConsumerName my_first_consumer
+Consumer Name my_first_consumer: Got message: super_stream_message_1 from stream: invoices-1 with offset: 33
+Consumer Name my_first_consumer: Got message: super_stream_message_2 from stream: invoices-2 with offset: 34
+Consumer Name my_first_consumer: Got message: super_stream_message_3 from stream: invoices-0 with offset: 37
+Consumer Name my_first_consumer: Got message: super_stream_message_4 from stream: invoices-0 with offset: 36
+Consumer Name my_first_consumer: Got message: super_stream_message_5 from stream: invoices-1 with offset: 39
+Consumer Name my_first_consumer: Got message: super_stream_message_6 from stream: invoices-2 with offset: 40
+Consumer Name my_first_consumer: Got message: super_stream_message_7 from stream: invoices-0 with offset: 41
+Consumer Name my_first_consumer: Got message: super_stream_message_8 from stream: invoices-1 with offset: 42
+Consumer Name my_first_consumer: Got message: super_stream_message_9 from stream: invoices-2 with offset: 43
+Consumer Name my_first_consumer: Got message: super_stream_message_10 from stream: invoices-1 with offset: 44
+```
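+
+The producer spreads messages across the three partitions (`invoices-0`, `invoices-1`, `invoices-2`) by hashing the `id` application property of each message. Condensed from `src/send_super_stream.rs`, the relevant part looks like this:
+
+```rust
+use rabbitmq_stream_client::types::{HashRoutingMurmurStrategy, Message, RoutingStrategy};
+use std::convert::TryInto;
+
+// route each message by its "id" application property
+fn hash_strategy_value_extractor(message: &Message) -> String {
+    message
+        .application_properties()
+        .unwrap()
+        .get("id")
+        .unwrap()
+        .clone()
+        .try_into()
+        .unwrap()
+}
+
+let producer = environment
+    .super_stream_producer(RoutingStrategy::HashRoutingStrategy(
+        HashRoutingMurmurStrategy {
+            routing_extractor: &hash_strategy_value_extractor,
+        },
+    ))
+    .build("invoices")
+    .await?;
+```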
+
+To see the single active consumer in action, run another consumer:
+
+    $ cargo run --release -- --consumer my_second_consumer
+
+You should see the second consumer receiving part of the messages from the producer. In this case, only the messages coming from the `invoices-1` stream.
+
+It should be something like:
+```bash
+$ cargo run --release -- --consumer my_second_consumer
+Starting SuperStream Consumer my_second_consumer
+Super Stream Consumer connected to RabbitMQ. ConsumerName my_second_consumer
+Consumer Name my_second_consumer: Got message: super_stream_message_64 from stream: invoices-1 with offset: 86
+Consumer Name my_second_consumer: Got message: super_stream_message_65 from stream: invoices-1 with offset: 87
+Consumer Name my_second_consumer: Got message: super_stream_message_66 from stream: invoices-1 with offset: 88
+Consumer Name my_second_consumer: Got message: super_stream_message_67 from stream: invoices-1 with offset: 89
+Consumer Name my_second_consumer: Got message: super_stream_message_68 from stream: invoices-1 with offset: 90
+Consumer Name my_second_consumer: Got message: super_stream_message_69 from stream: invoices-1 with offset: 90
+Consumer Name my_second_consumer: Got message: super_stream_message_70 from stream: invoices-1 with offset: 90
+```
+
+The first consumer should now receive the rest of the messages:
+```bash
+Consumer Name my_first_consumer: Got message: super_stream_message_88 from stream: invoices-0 with offset: 92
+Consumer Name my_first_consumer: Got message: super_stream_message_87 from stream: invoices-2 with offset: 93
+Consumer Name my_first_consumer: Got message: super_stream_message_89 from stream: invoices-2 with offset: 95
+Consumer Name my_first_consumer: Got message: super_stream_message_90 from stream: invoices-0 with offset: 97
+Consumer Name my_first_consumer: Got message: super_stream_message_91 from stream: invoices-0 with offset: 96
+Consumer Name my_first_consumer: Got message: super_stream_message_92 from stream: invoices-2 with offset: 99
+Consumer Name my_first_consumer: Got message: super_stream_message_93 from stream: invoices-2 with offset: 101
+```
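+
+When a consumer becomes the active one for a stream (or stops being it), the client invokes the `consumer_update` callback registered by the example, which returns the offset to resume from. Condensed from `src/single_active_consumer_super_stream.rs`:
+
+```rust
+.consumer_update(move |active, message_context| async move {
+    let name = message_context.name();
+    let stream = message_context.stream();
+    let client = message_context.client();
+
+    println!(
+        "single active consumer: is active: {} on stream: {} with consumer_name: {}",
+        active, stream, name
+    );
+
+    // resume from the last stored offset, or from the beginning if nothing was stored yet
+    match client.query_offset(name, stream.as_str()).await {
+        Ok(stored_offset) => OffsetSpecification::Offset(stored_offset),
+        Err(_) => OffsetSpecification::First,
+    }
+})
+```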
diff --git a/examples/single_active_consumer/single_active_consumer.rs b/examples/single_active_consumer/single_active_consumer.rs
deleted file mode 100644
index 24b4705..0000000
--- a/examples/single_active_consumer/single_active_consumer.rs
+++ /dev/null
@@ -1,97 +0,0 @@
-use futures::StreamExt;
-use rabbitmq_stream_client::error::StreamCreateError;
-use rabbitmq_stream_client::types::{
-    ByteCapacity, OffsetSpecification, ResponseCode,
-};
-
-
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn std::error::Error>> {
-    use rabbitmq_stream_client::Environment;
-    let environment = Environment::builder().build().await?;
-    let message_count = 1000000;
-    let stream = "hello-rust-super-stream-2";
-
-    let create_response = environment
-        .stream_creator()
-        .max_length(ByteCapacity::GB(5))
-        .create(stream)
-        .await;
-
-    if let Err(e) = create_response {
-        if let StreamCreateError::Create { stream, status } = e {
-            match status {
-                // we can ignore this error because the stream already exists
-                ResponseCode::StreamAlreadyExists => {}
-                err => {
-                    println!("Error creating stream: {:?} {:?}", stream, err);
-                }
-            }
-        }
-    }
-    println!(
-        "Super stream consumer example, consuming messages from the super stream {}",
-        stream
-    );
-
-    let mut consumer = environment
-        .consumer()
-        // Mandatory if sac is enabled
-        .name("consumer-group-1")
-        .offset(OffsetSpecification::First)
-        .enable_single_active_consumer(true)
-        .client_provided_name("my super stream consumer for hello rust")
-        .consumer_update(move |active, message_context| async move {
-            let name = message_context.name();
-            let stream = message_context.stream();
-            let client = message_context.client();
-
-            println!(
-                "single active consumer: is active: {} on stream: {} with consumer_name: {}",
-                active, stream, name
-            );
-            let stored_offset = client.query_offset(name, stream.as_str()).await;
-
-            if let Err(e) = stored_offset {
-                return OffsetSpecification::First;
-            }
-
-            let stored_offset_u = stored_offset.unwrap();
-            println!("restarting from stored_offset: {}", stored_offset_u);
-            OffsetSpecification::Offset(stored_offset_u)
-
-        })
-        .build(stream)
-        .await
-        .unwrap();
-
-    for i in 0..message_count {
-        let delivery = consumer.next().await.unwrap();
-        {
-            let delivery = delivery.unwrap();
-            println!(
-                "Got message: {:#?} from stream: {} with offset: {}",
-                delivery
-                    .message()
-                    .data()
-                    .map(|data| String::from_utf8(data.to_vec()).unwrap())
-                    .unwrap(),
-                delivery.stream(),
-                delivery.offset()
-            );
-
-            //store an offset
-            if i == 10000 {
-                let _ = consumer
-                    .store_offset(i)
-                    .await
-                    .unwrap_or_else(|e| println!("Err: {}", e));
-            }
-        }
-    }
-
-    println!("Stopping consumer...");
-    let _ = consumer.handle().close().await;
-    println!("consumer stopped");
-    Ok(())
-}
\ No newline at end of file
diff --git a/examples/single_active_consumer/src/main.rs b/examples/single_active_consumer/src/main.rs
new file mode 100644
index 0000000..696348a
--- /dev/null
+++ b/examples/single_active_consumer/src/main.rs
@@ -0,0 +1,45 @@
+mod send_super_stream;
+mod single_active_consumer_super_stream;
+
+use std::env;
+
+static SUPER_STREAM: &str = "invoices";
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let mut args = env::args().skip(1);
+    while let Some(arg) = args.next() {
+        match &arg[..] {
+            "-h" | "--help" => help(),
+            "--consumer" => {
+                // an optional consumer name can follow, e.g. `--consumer my_first_consumer`
+                let consumer_name = args.next().unwrap_or_default();
+                println!("Starting SuperStream Consumer {}", consumer_name);
+                single_active_consumer_super_stream::start_consumer(consumer_name).await?;
+            }
+
+            "--producer" => {
+                println!("Starting SuperStream Producer");
+                send_super_stream::start_producer().await?
+            }
+
+            arg if arg.starts_with("-") => {
+                eprintln!("Unknown argument: {}", arg);
+            }
+
+            _ => {
+                eprintln!("Unknown argument: {}", arg);
+                help();
+            }
+        }
+    }
+    Ok(())
+}
+
+fn help() {
+    println!("Usage: cargo run -- --producer | --consumer [consumer-name]")
+}
diff --git a/examples/single_active_consumer/src/send_super_stream.rs b/examples/single_active_consumer/src/send_super_stream.rs
new file mode 100644
index 0000000..975197b
--- /dev/null
+++ b/examples/single_active_consumer/src/send_super_stream.rs
@@ -0,0 +1,116 @@
+use rabbitmq_stream_client::error::StreamCreateError;
+use rabbitmq_stream_client::types::{
+    ByteCapacity, HashRoutingMurmurStrategy, Message, ResponseCode, RoutingStrategy,
+};
+use std::convert::TryInto;
+use std::sync::atomic::{AtomicU32, Ordering};
+use std::sync::Arc;
+use tokio::sync::Notify;
+use tokio::time;
+
+// extract the routing value: messages are partitioned by their "id" application property
+fn hash_strategy_value_extractor(message: &Message) -> String {
+    message
+        .application_properties()
+        .unwrap()
+        .get("id")
+        .unwrap()
+        .clone()
+        .try_into()
+        .unwrap()
+}
+
+pub async fn start_producer() -> Result<(), Box<dyn std::error::Error>> {
+    use rabbitmq_stream_client::Environment;
+    let environment = Environment::builder().build().await?;
+    println!("Super Stream Producer connected to RabbitMQ");
+    let confirmed_messages = Arc::new(AtomicU32::new(0));
+    let notify_on_send = Arc::new(Notify::new());
+
+    let create_response = environment
+        .stream_creator()
+        .max_length(ByteCapacity::GB(5))
+        .create_super_stream(crate::SUPER_STREAM, 3, None)
+        .await;
+
+    if let Err(e) = create_response {
+        if let StreamCreateError::Create { stream, status } = e {
+            match status {
+                // we can ignore this error because the stream already exists
+                ResponseCode::StreamAlreadyExists => {}
+                err => {
+                    println!(
+                        "[Super Stream Producer] Error creating stream: {:?} {:?}",
+                        stream, err
+                    );
+                }
+            }
+        }
+    }
+
+    let super_stream_producer = environment
+        .super_stream_producer(RoutingStrategy::HashRoutingStrategy(
+            HashRoutingMurmurStrategy {
+                routing_extractor: &hash_strategy_value_extractor,
+            },
+        ))
+        .client_provided_name("rust stream producer - sac example")
+        .build(crate::SUPER_STREAM)
+        .await;
+
+    match super_stream_producer {
+        Ok(mut producer) => {
+            println!("[Super Stream Producer] Successfully created super stream producer");
+            let mut idx = 0;
+            loop {
+                let counter = confirmed_messages.clone();
+                let notifier = notify_on_send.clone();
+                let msg = Message::builder()
+                    .body(format!("super_stream_message_{}", idx))
+                    .application_properties()
+                    .insert("id", idx.to_string())
+                    .message_builder()
+                    .build();
+
+                let send_result = producer
+                    .send(msg, move |_| {
+                        let inner_counter = counter.clone();
+                        let inner_notifier = notifier.clone();
+                        async move {
+                            // the previous count equals idx once every message sent so far
+                            // has been confirmed by the broker
+                            if inner_counter.fetch_add(1, Ordering::Relaxed) == idx {
+                                inner_notifier.notify_one();
+                            }
+                        }
+                    })
+                    .await;
+
+                match send_result {
+                    Ok(_) => {
+                        println!(
+                            "[Super Stream Producer] Message {} sent to {}",
+                            idx,
+                            crate::SUPER_STREAM
+                        );
+                        idx += 1;
+                    }
+                    Err(err) => {
+                        println!(
+                            "[Super Stream Producer] Failed to send message. error: {}",
+                            err
+                        );
+                    }
+                }
+
+                time::sleep(time::Duration::from_millis(1_000)).await;
+            }
+        }
+        Err(err) => {
+            println!("Failed to create super stream producer. error: {}", err);
+            Ok(())
+        }
+    }
+}
error {}", err); + Ok(()) + } + } +} diff --git a/examples/single_active_consumer/single_active_consumer_super_stream.rs b/examples/single_active_consumer/src/single_active_consumer_super_stream.rs similarity index 68% rename from examples/single_active_consumer/single_active_consumer_super_stream.rs rename to examples/single_active_consumer/src/single_active_consumer_super_stream.rs index c66b9aa..cd58fd9 100644 --- a/examples/single_active_consumer/single_active_consumer_super_stream.rs +++ b/examples/single_active_consumer/src/single_active_consumer_super_stream.rs @@ -3,19 +3,19 @@ use rabbitmq_stream_client::error::StreamCreateError; use rabbitmq_stream_client::types::{ ByteCapacity, OffsetSpecification, ResponseCode, SuperStreamConsumer, }; -use std::collections::HashMap; -#[tokio::main] -async fn main() -> Result<(), Box> { +pub async fn start_consumer(consumer_name: String) -> Result<(), Box> { use rabbitmq_stream_client::Environment; let environment = Environment::builder().build().await?; - let message_count = 1000000; - let super_stream = "hello-rust-super-stream"; + let mut consumer_name_tmp = String::from(""); + + + println!("Super Stream Consumer connected to RabbitMQ. ConsumerName {}", consumer_name); let create_response = environment .stream_creator() .max_length(ByteCapacity::GB(5)) - .create_super_stream(super_stream, 3, None) + .create_super_stream(crate::SUPER_STREAM, 3, None) .await; if let Err(e) = create_response { @@ -31,7 +31,7 @@ async fn main() -> Result<(), Box> { } println!( "Super stream consumer example, consuming messages from the super stream {}", - super_stream + crate::SUPER_STREAM ); let mut super_stream_consumer: SuperStreamConsumer = environment @@ -41,7 +41,7 @@ async fn main() -> Result<(), Box> { .offset(OffsetSpecification::First) .enable_single_active_consumer(true) .client_provided_name("my super stream consumer for hello rust") - .consumer_update(move |active, message_context| async move { + .consumer_update(move |active, message_context| async move { let name = message_context.name(); let stream = message_context.stream(); let client = message_context.client(); @@ -52,24 +52,24 @@ async fn main() -> Result<(), Box> { ); let stored_offset = client.query_offset(name, stream.as_str()).await; - if let Err(e) = stored_offset { + if let Err(_) = stored_offset { return OffsetSpecification::First; } let stored_offset_u = stored_offset.unwrap(); - println!("stored_offset_u {}", stored_offset_u.clone()); + println!("offset: {} stored", stored_offset_u.clone()); OffsetSpecification::Offset(stored_offset_u) - }) - .build(super_stream) + .build(crate::SUPER_STREAM) .await .unwrap(); - for _ in 0..message_count { + loop { let delivery = super_stream_consumer.next().await.unwrap(); { let delivery = delivery.unwrap(); println!( - "Got message: {:#?} from stream: {} with offset: {}", + "Consumer name {}: Got message: {:#?} from stream: {} with offset: {}", + consumer_name, delivery .message() .data() @@ -80,15 +80,16 @@ async fn main() -> Result<(), Box> { ); // Store an offset for every consumer - if delivery.consumer_name().is_some() && delivery.offset() == 1000 { - super_stream_consumer.client().store_offset(delivery.consumer_name().unwrap().as_str(), delivery.stream().as_str(), delivery.offset()).await; - } - + // store the offset each time a message is consumed + // that is not a best practice, but it is done here for demonstration purposes + super_stream_consumer + .client() + .store_offset( + delivery.consumer_name().unwrap().as_str(), + 
delivery.stream().as_str(), + delivery.offset(), + ) + .await?; } } - - println!("Stopping super stream consumer..."); - let _ = super_stream_consumer.handle().close().await; - println!("Super stream consumer stopped"); - Ok(()) -} \ No newline at end of file +}