Skip to content

Commit

Permalink
reviewing sac example (#258)
Browse files Browse the repository at this point in the history
* reviewing sac example

* updating README
  • Loading branch information
DanielePalaia authored Nov 21, 2024
1 parent 30337e5 commit 481d0f1
Show file tree
Hide file tree
Showing 7 changed files with 280 additions and 133 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Welcome to the documentation for the RabbitMQ Stream Rust Client. This guide pro
- [Publishing Messages](#publishing-messages)
- [Consuming Messages](#consuming-messages)
- [Super Stream](#super-stream)
- [Single Active Consumer](#single-active-consumer)
- [Filtering](#filtering)
5. [Examples](#examples)
6. [Development](#development)
Expand Down Expand Up @@ -171,6 +172,19 @@ See the [Super Stream Producer Example using Routing key mode](./examples/supers

See the [Super Stream Consumer Example](./examples/superstreams/receive_super_stream.rs)

## Single active consumer

The client supports the single-active-consumer feature:

[single-active-consumer feature](https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams)

See the Java doc for further information (Same concepts apply here):

[Single-Active-Consumer Java doc](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#single-active-consumer)

See the Rust full example here:

[Single-Active-Consumer-Full-Example](/examples/single_active_consumer)

## Filtering

Expand Down
11 changes: 11 additions & 0 deletions examples/single_active_consumer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[workspace]

[package]
name = "single_active_consumer"
version = "0.1.0"
edition = "2021"

[dependencies]
futures = "0.3.31"
tokio = "1.41.1"
rabbitmq-stream-client = { path = "../../" }
81 changes: 69 additions & 12 deletions examples/single_active_consumer/README.md
Original file line number Diff line number Diff line change
@@ -1,22 +1,79 @@
Single active consumer
Super stream example
---

This is an example to enable single active consumer functionality for superstream:
https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams
https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams
[Super Streams Documentation](https://www.rabbitmq.com/streams.html#super-streams) for more details.
[Super Streams blog post](https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-super-streams)

This folder contains a consumer and a super-stream consumer configured to enable it.
You can use the example in the super-stream folder to produce messages for a super-stream.

You can then run the single_active_consumer_super_stream.rs in this folder.
Assuming the super-stream is composed by three streams, you can see that the Consumer will consume messages from all the streams part of the superstream.
This example shows how to use the Super Stream feature in RabbitMQ 3.11.0.

You can then run another consumer in parallel.
now you'll see that one of the two consumers will consume from 2 streams while the other on one stream.
Then run the producer in one terminal:

If you run another you'll see that every Consumer will read from a single stream.
$ cargo run --release -- --producer

If you then stop one of the Consumer you'll notice that the related stream is now read from on the Consumer which is still running.

And the consumer in another terminal:

$ cargo run --release -- --consumer

You should see the consumer receiving the messages from the producer.

It would be something like:
```bash
$ cargo run -- --producer
Starting SuperStream Producer
Super Stream Producer connected to RabbitMQ
Super Stream Producer sent 0 messages to invoices
Super Stream Producer sent 1 messages to invoices
Super Stream Producer sent 2 messages to invoices
Super Stream Producer sent 3 messages to invoices
```

```bash
$ cargo run --release -- --consumer my_first_consumer
Starting SuperStream Consumer my_first_consumer
Super Stream Consumer connected to RabbitMQ. ConsumerName my_first_consumer
Consumer Name my_first_consumer: Got message: super_stream_message_1 from stream: invoices-1 with offset: 33
Consumer Name my_first_consumer: Got message: super_stream_message_2 from stream: invoices-2 with offset: 34
Consumer Name my_first_consumer: Got message: super_stream_message_3 from stream: invoices-0 with offset: 37
Consumer Name my_first_consumer: Got message: super_stream_message_4 from stream: invoices-0 with offset: 36
Consumer Name my_first_consumer: Got message: super_stream_message_5 from stream: invoices-1 with offset: 39
Consumer Name my_first_consumer: Got message: super_stream_message_6 from stream: invoices-2 with offset: 40
Consumer Name my_first_consumer: Got message: super_stream_message_7 from stream: invoices-0 with offset: 41
Consumer Name my_first_consumer: Got message: super_stream_message_8 from stream: invoices-1 with offset: 42
Consumer Name my_first_consumer: Got message: super_stream_message_9 from stream: invoices-2 with offset: 43
Consumer Name my_first_consumer: Got message: super_stream_message_10 from stream: invoices-1 with offset: 44
```

To see the Single active consumer in action, run another consumer:

$ cargo run --release -- --consumer my_second_consumer

You should see the second consumer receiving the part of the messages from the producer. In thi case only the messages coming from the `invoices-1`.

It should be something like:
```bash
$ cargo run --release -- --consumer my_second_consumer
Starting SuperStream Consumer my_second_consumer
Super Stream Consumer connected to RabbitMQ. ConsumerName my_second_consumer
Consumer Name my_second_consumer: Got message: super_stream_message_64 from stream: invoices-1 with offset: 86
Consumer Name my_second_consumer: Got message: super_stream_message_65 from stream: invoices-1 with offset: 87
Consumer Name my_second_consumer: Got message: super_stream_message_66 from stream: invoices-1 with offset: 88
Consumer Name my_second_consumer: Got message: super_stream_message_67 from stream: invoices-1 with offset: 89
Consumer Name my_second_consumer: Got message: super_stream_message_68 from stream: invoices-1 with offset: 90
Consumer Name my_second_consumer: Got message: super_stream_message_69 from stream: invoices-1 with offset: 90
Consumer Name my_second_consumer: Got message: super_stream_message_70 from stream: invoices-1 with offset: 90
```
and the first consumer should be receiving the rest of the messages:
```bash
Consumer Name my_first_consumer: Got message: super_stream_message_88 from stream: invoices-0 with offset: 92
Consumer Name my_first_consumer: Got message: super_stream_message_87 from stream: invoices-2 with offset: 93
Consumer Name my_first_consumer: Got message: super_stream_message_89 from stream: invoices-2 with offset: 95
Consumer Name my_first_consumer: Got message: super_stream_message_90 from stream: invoices-0 with offset: 97
Consumer Name my_first_consumer: Got message: super_stream_message_91 from stream: invoices-0 with offset: 96
Consumer Name my_first_consumer: Got message: super_stream_message_92 from stream: invoices-2 with offset: 99
Consumer Name my_first_consumer: Got message: super_stream_message_93 from stream: invoices-2 with offset: 101
```



Expand Down
97 changes: 0 additions & 97 deletions examples/single_active_consumer/single_active_consumer.rs

This file was deleted.

45 changes: 45 additions & 0 deletions examples/single_active_consumer/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
mod send_super_stream;
mod single_active_consumer_super_stream;

use std::env;

static SUPER_STREAM: &str = "invoices";

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut args = env::args().skip(1);
while let Some(arg) = args.next() {
match &arg[..] {
"-h" | "--help" => help(),
"--consumer" => {
let mut consumer_name = String::from("");
let next = args.next().take();
if next.is_some() {
println!("is some");
consumer_name = next.clone().take().unwrap();
}
println!("Starting SuperStream Consumer {}", consumer_name);
single_active_consumer_super_stream::start_consumer(consumer_name).await?;
}

"--producer" => {
println!("Starting SuperStream Producer");
send_super_stream::start_producer().await?
},

arg if arg.starts_with("-") => {
eprintln!("Unknown argument: {}", arg);
}

_ => {
eprintln!("Unknown argument: {}", arg);
help();
}
}
}
Ok(())
}

fn help() {
println!("--consumer or --producer")
}
116 changes: 116 additions & 0 deletions examples/single_active_consumer/src/send_super_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use rabbitmq_stream_client::error::StreamCreateError;
use rabbitmq_stream_client::types::{
ByteCapacity, HashRoutingMurmurStrategy, Message, ResponseCode, RoutingStrategy,
};
use std::convert::TryInto;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;
use tokio::time;

fn hash_strategy_value_extractor(message: &Message) -> String {
message
.application_properties()
.unwrap()
.get("id")
.unwrap()
.clone()
.try_into()
.unwrap()
}

pub async fn start_producer() -> Result<(), Box<dyn std::error::Error>> {
use rabbitmq_stream_client::Environment;
let environment = Environment::builder().build().await?;
println!("Super Stream Producer connected to RabbitMQ");
let confirmed_messages = Arc::new(AtomicU32::new(0));
let notify_on_send = Arc::new(Notify::new());
let _ = environment
.stream_creator()
.max_length(ByteCapacity::GB(5))
.create_super_stream(crate::SUPER_STREAM, 3, None)
.await;

let create_response = environment
.stream_creator()
.max_length(ByteCapacity::GB(5))
.create_super_stream(crate::SUPER_STREAM, 3, None)
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!(
"[Super Stream Producer] Error creating stream: {:?} {:?}",
stream, err
);
}
}
}
}

let super_stream_producer = environment
.super_stream_producer(RoutingStrategy::HashRoutingStrategy(
HashRoutingMurmurStrategy {
routing_extractor: &hash_strategy_value_extractor,
},
))
.client_provided_name("rust stream producer - sac example")
.build(crate::SUPER_STREAM)
.await;

match super_stream_producer {
Ok(mut producer) => {
println!("[Super Stream Producer] Successfully created super stream producer");
let mut idx = 0;
loop {
let counter = confirmed_messages.clone();
let notifier = notify_on_send.clone();
let msg = Message::builder()
.body(format!("super stream message_{}", idx))
.application_properties()
.insert("id", idx.to_string())
.message_builder()
.build();

let send_result = producer
.send(msg, move |_| {
let inner_counter = counter.clone();
let inner_notifier = notifier.clone();
async move {
if inner_counter.fetch_add(1, Ordering::Relaxed) == idx - 1 {
inner_notifier.notify_one();
}
}
})
.await;

match send_result {
Ok(_) => {
idx += 1;
println!(
"[Super Stream Producer] Message {} sent to {}",
idx,
crate::SUPER_STREAM
);
}
Err(err) => {
println!(
"[Super Stream Producer] Failed to send message. error: {}",
err
);
}
}

time::sleep(time::Duration::from_millis(1_000)).await;
}
}
Err(err) => {
println!("Failed to create super stream producer. error {}", err);
Ok(())
}
}
}
Loading

0 comments on commit 481d0f1

Please sign in to comment.