Skip to content

Commit

Permalink
improved test
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielePalaia committed Nov 11, 2024
1 parent fc9578d commit 5040aa4
Showing 1 changed file with 73 additions and 11 deletions.
84 changes: 73 additions & 11 deletions tests/integration/consumer_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::Duration;

use crate::common::{consumer_update_handler, TestEnvironment};
use crate::common::TestEnvironment;
use fake::{Fake, Faker};
use futures::StreamExt;
use rabbitmq_stream_client::{
Expand All @@ -20,6 +20,8 @@ use rabbitmq_stream_client::types::{
};
use rabbitmq_stream_protocol::ResponseCode;
use std::sync::atomic::{AtomicU32, Ordering};
use tokio::sync::Notify;
use tokio::task;
use {std::sync::Arc, std::sync::Mutex};

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -686,6 +688,7 @@ async fn super_stream_single_active_consumer_test() {
.unwrap();

let mut properties = HashMap::new();
let notify_received_messages = Arc::new(Notify::new());

properties.insert("single-active-consumer".to_string(), "true".to_string());
properties.insert("name".to_string(), "consumer-group-1".to_string());
Expand All @@ -695,7 +698,25 @@ async fn super_stream_single_active_consumer_test() {
.env
.super_stream_consumer()
.offset(OffsetSpecification::First)
.properties(properties)
.properties(properties.clone())
.build(&env.super_stream)
.await
.unwrap();

let mut super_stream_consumer_2: SuperStreamConsumer = env
.env
.super_stream_consumer()
.offset(OffsetSpecification::First)
.properties(properties.clone())
.build(&env.super_stream)
.await
.unwrap();

let mut super_stream_consumer_3: SuperStreamConsumer = env
.env
.super_stream_consumer()
.offset(OffsetSpecification::First)
.properties(properties.clone())
.build(&env.super_stream)
.await
.unwrap();
Expand All @@ -708,20 +729,61 @@ async fn super_stream_single_active_consumer_test() {
.unwrap();
}

let mut received_messages = 0;
let handle = super_stream_consumer.handle();
let mut received_messages = Arc::new(AtomicU32::new(1));
let handle_consumer_1 = super_stream_consumer.handle();
let handle_consumer_2 = super_stream_consumer_2.handle();
let handle_consumer_3 = super_stream_consumer_3.handle();

let received_message_outer = received_messages.clone();
let notify_received_messages_outer = notify_received_messages.clone();
task::spawn(async move {
let received_messages_int = received_message_outer.clone();
let notify_received_messages_inner = notify_received_messages_outer.clone();
while let _ = super_stream_consumer.next().await.unwrap() {
let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed);
if message_count == rec_msg {
notify_received_messages_inner.notify_one();
break;
}
}
});

while let _ = super_stream_consumer.next().await.unwrap() {
received_messages = received_messages + 1;
if received_messages == message_count {
break;
let received_message_outer = received_messages.clone();
let notify_received_messages_outer = notify_received_messages.clone();
task::spawn(async move {
let received_messages_int = received_message_outer.clone();
let notify_received_messages_inner = notify_received_messages_outer.clone();
while let _ = super_stream_consumer_2.next().await.unwrap() {
let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed);
if message_count == rec_msg {
notify_received_messages_inner.notify_one();
break;
}
}
}
});

assert!(received_messages == message_count);
let received_message_outer = received_messages.clone();
let notify_received_messages_outer = notify_received_messages.clone();
task::spawn(async move {
let received_messages_int = received_message_outer.clone();
let notify_received_messages_inner = notify_received_messages_outer.clone();
while let _ = super_stream_consumer_3.next().await.unwrap() {
let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed);
if message_count == rec_msg {
notify_received_messages_inner.notify_one();
break;
}
}
});

notify_received_messages.notified().await;

assert!(received_messages.load(Ordering::Relaxed) == message_count + 1);

super_stream_producer.close().await.unwrap();
_ = handle.close().await;
_ = handle_consumer_1.close().await;
_ = handle_consumer_2.close().await;
_ = handle_consumer_3.close().await;
}

#[tokio::test(flavor = "multi_thread")]
Expand Down

0 comments on commit 5040aa4

Please sign in to comment.