Skip to content

Commit

Permalink
adding basic test
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielePalaia committed Nov 11, 2024
1 parent 68ef2ad commit fc9578d
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 5 deletions.
4 changes: 2 additions & 2 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ impl MessageContext {
self.consumer_name
}

pub fn get_stream(self) -> String {
self.stream
pub fn get_stream(&self) -> String {
self.stream.clone()
}
}

Expand Down
129 changes: 126 additions & 3 deletions tests/integration/consumer_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::Duration;

use crate::common::TestEnvironment;
use crate::common::{consumer_update_handler, TestEnvironment};
use fake::{Fake, Faker};
use futures::StreamExt;
use rabbitmq_stream_client::{
Expand All @@ -11,10 +11,10 @@ use rabbitmq_stream_client::{
types::{Delivery, Message, OffsetSpecification, SuperStreamConsumer},
Consumer, FilterConfiguration, NoDedup, Producer,
};
use tokio::task;
use tokio::time::sleep;
use std::collections::HashMap;

use crate::producer_test::routing_key_strategy_value_extractor;
use rabbitmq_stream_client::types::MessageContext;
use rabbitmq_stream_client::types::{
HashRoutingMurmurStrategy, RoutingKeyRoutingStrategy, RoutingStrategy,
};
Expand Down Expand Up @@ -667,3 +667,126 @@ async fn consumer_test_with_filtering_match_unfiltered() {

assert!(repsonse_length == filtering_response_length);
}

#[tokio::test(flavor = "multi_thread")]
async fn super_stream_single_active_consumer_test() {
let env = TestEnvironment::create_super_stream().await;

let message_count = 1000;
let mut super_stream_producer = env
.env
.super_stream_producer(RoutingStrategy::HashRoutingStrategy(
HashRoutingMurmurStrategy {
routing_extractor: &hash_strategy_value_extractor,
},
))
.client_provided_name("test super stream consumer ")
.build(&env.super_stream)
.await
.unwrap();

let mut properties = HashMap::new();

properties.insert("single-active-consumer".to_string(), "true".to_string());
properties.insert("name".to_string(), "consumer-group-1".to_string());
properties.insert("super-stream".to_string(), env.super_stream.clone());

let mut super_stream_consumer: SuperStreamConsumer = env
.env
.super_stream_consumer()
.offset(OffsetSpecification::First)
.properties(properties)
.build(&env.super_stream)
.await
.unwrap();

for n in 0..message_count {
let msg = Message::builder().body(format!("message{}", n)).build();
let _ = super_stream_producer
.send(msg, |confirmation_status| async move {})
.await
.unwrap();
}

let mut received_messages = 0;
let handle = super_stream_consumer.handle();

while let _ = super_stream_consumer.next().await.unwrap() {
received_messages = received_messages + 1;
if received_messages == message_count {
break;
}
}

assert!(received_messages == message_count);

super_stream_producer.close().await.unwrap();
_ = handle.close().await;
}

#[tokio::test(flavor = "multi_thread")]
async fn super_stream_single_active_consumer_test_with_callback() {
let env = TestEnvironment::create_super_stream().await;

let message_count = 1000;
let mut super_stream_producer = env
.env
.super_stream_producer(RoutingStrategy::HashRoutingStrategy(
HashRoutingMurmurStrategy {
routing_extractor: &hash_strategy_value_extractor,
},
))
.client_provided_name("test super stream consumer ")
.build(&env.super_stream)
.await
.unwrap();

let mut properties = HashMap::new();

properties.insert("single-active-consumer".to_string(), "true".to_string());
properties.insert("name".to_string(), "consumer-group-1".to_string());
properties.insert("super-stream".to_string(), env.super_stream.clone());

let mut is_active: u8 = 0;
let mut stream: String = String::from("");

let mut super_stream_consumer: SuperStreamConsumer = env
.env
.super_stream_consumer()
.offset(OffsetSpecification::First)
.properties(properties)
.consumer_update(move |active, message_context| {
let mut stream_int = stream.clone();
async move {
is_active = active;
stream_int = message_context.get_stream().clone();
};
OffsetSpecification::First
})
.build(&env.super_stream)
.await
.unwrap();

for n in 0..message_count {
let msg = Message::builder().body(format!("message{}", n)).build();
let _ = super_stream_producer
.send(msg, |confirmation_status| async move {})
.await
.unwrap();
}

let mut received_messages = 0;
let handle = super_stream_consumer.handle();

while let _ = super_stream_consumer.next().await.unwrap() {
received_messages = received_messages + 1;
if received_messages == message_count {
break;
}
}

assert!(received_messages == message_count);

super_stream_producer.close().await.unwrap();
_ = handle.close().await;
}

0 comments on commit fc9578d

Please sign in to comment.