-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
82f93ad
commit 1019da2
Showing
6 changed files
with
266 additions
and
133 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
[workspace] | ||
|
||
[package] | ||
name = "single_active_consumer" | ||
version = "0.1.0" | ||
edition = "2021" | ||
|
||
[dependencies] | ||
futures = "0.3.31" | ||
tokio = "1.41.1" | ||
rabbitmq-stream-client = { path = "../../" } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
mod send_super_stream; | ||
mod single_active_consumer_super_stream; | ||
|
||
use std::env; | ||
|
||
static SUPER_STREAM: &str = "invoices"; | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
let mut args = env::args().skip(1); | ||
while let Some(arg) = args.next() { | ||
match &arg[..] { | ||
"-h" | "--help" => help(), | ||
"--consumer" => { | ||
let mut consumer_name = String::from(""); | ||
let next = args.next().take(); | ||
if next.is_some() { | ||
println!("is some"); | ||
consumer_name = next.clone().take().unwrap(); | ||
} | ||
println!("Starting SuperStream Consumer {}", consumer_name); | ||
single_active_consumer_super_stream::start_consumer(consumer_name).await?; | ||
} | ||
|
||
"--producer" => { | ||
println!("Starting SuperStream Producer"); | ||
send_super_stream::start_producer().await? | ||
}, | ||
|
||
arg if arg.starts_with("-") => { | ||
eprintln!("Unknown argument: {}", arg); | ||
} | ||
|
||
_ => { | ||
eprintln!("Unknown argument: {}", arg); | ||
help(); | ||
} | ||
} | ||
} | ||
Ok(()) | ||
} | ||
|
||
fn help() { | ||
println!("--consumer or --producer") | ||
} |
116 changes: 116 additions & 0 deletions
116
examples/single_active_consumer/src/send_super_stream.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
use rabbitmq_stream_client::error::StreamCreateError; | ||
use rabbitmq_stream_client::types::{ | ||
ByteCapacity, HashRoutingMurmurStrategy, Message, ResponseCode, RoutingStrategy, | ||
}; | ||
use std::convert::TryInto; | ||
use std::sync::atomic::{AtomicU32, Ordering}; | ||
use std::sync::Arc; | ||
use tokio::sync::Notify; | ||
use tokio::time; | ||
|
||
fn hash_strategy_value_extractor(message: &Message) -> String { | ||
message | ||
.application_properties() | ||
.unwrap() | ||
.get("id") | ||
.unwrap() | ||
.clone() | ||
.try_into() | ||
.unwrap() | ||
} | ||
|
||
pub async fn start_producer() -> Result<(), Box<dyn std::error::Error>> { | ||
use rabbitmq_stream_client::Environment; | ||
let environment = Environment::builder().build().await?; | ||
println!("Super Stream Producer connected to RabbitMQ"); | ||
let confirmed_messages = Arc::new(AtomicU32::new(0)); | ||
let notify_on_send = Arc::new(Notify::new()); | ||
let _ = environment | ||
.stream_creator() | ||
.max_length(ByteCapacity::GB(5)) | ||
.create_super_stream(crate::SUPER_STREAM, 3, None) | ||
.await; | ||
|
||
let create_response = environment | ||
.stream_creator() | ||
.max_length(ByteCapacity::GB(5)) | ||
.create_super_stream(crate::SUPER_STREAM, 3, None) | ||
.await; | ||
|
||
if let Err(e) = create_response { | ||
if let StreamCreateError::Create { stream, status } = e { | ||
match status { | ||
// we can ignore this error because the stream already exists | ||
ResponseCode::StreamAlreadyExists => {} | ||
err => { | ||
println!( | ||
"[Super Stream Producer] Error creating stream: {:?} {:?}", | ||
stream, err | ||
); | ||
} | ||
} | ||
} | ||
} | ||
|
||
let super_stream_producer = environment | ||
.super_stream_producer(RoutingStrategy::HashRoutingStrategy( | ||
HashRoutingMurmurStrategy { | ||
routing_extractor: &hash_strategy_value_extractor, | ||
}, | ||
)) | ||
.client_provided_name("rust stream producer - sac example") | ||
.build(crate::SUPER_STREAM) | ||
.await; | ||
|
||
match super_stream_producer { | ||
Ok(mut producer) => { | ||
println!("[Super Stream Producer] Successfully created super stream producer"); | ||
let mut idx = 0; | ||
loop { | ||
let counter = confirmed_messages.clone(); | ||
let notifier = notify_on_send.clone(); | ||
let msg = Message::builder() | ||
.body(format!("super stream message_{}", idx)) | ||
.application_properties() | ||
.insert("id", idx.to_string()) | ||
.message_builder() | ||
.build(); | ||
|
||
let send_result = producer | ||
.send(msg, move |_| { | ||
let inner_counter = counter.clone(); | ||
let inner_notifier = notifier.clone(); | ||
async move { | ||
if inner_counter.fetch_add(1, Ordering::Relaxed) == idx - 1 { | ||
inner_notifier.notify_one(); | ||
} | ||
} | ||
}) | ||
.await; | ||
|
||
match send_result { | ||
Ok(_) => { | ||
idx += 1; | ||
println!( | ||
"[Super Stream Producer] Message {} sent to {}", | ||
idx, | ||
crate::SUPER_STREAM | ||
); | ||
} | ||
Err(err) => { | ||
println!( | ||
"[Super Stream Producer] Failed to send message. error: {}", | ||
err | ||
); | ||
} | ||
} | ||
|
||
time::sleep(time::Duration::from_millis(1_000)).await; | ||
} | ||
} | ||
Err(err) => { | ||
println!("Failed to create super stream producer. error {}", err); | ||
Ok(()) | ||
} | ||
} | ||
} |
Oops, something went wrong.