From 0aba4b1d6539f65d01edc0cdd68ad9369518a1c3 Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Mon, 11 Nov 2024 10:50:48 +0100 Subject: [PATCH] adding basic test --- src/consumer.rs | 4 +- tests/integration/common.rs | 9 ++ tests/integration/consumer_test.rs | 129 ++++++++++++++++++++++++++++- 3 files changed, 137 insertions(+), 5 deletions(-) diff --git a/src/consumer.rs b/src/consumer.rs index f651b94..4eb3567 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -99,8 +99,8 @@ impl MessageContext { self.consumer_name } - pub fn get_stream(self) -> String { - self.stream + pub fn get_stream(&self) -> String { + self.stream.clone() } } diff --git a/tests/integration/common.rs b/tests/integration/common.rs index ef36ab9..37c82b5 100644 --- a/tests/integration/common.rs +++ b/tests/integration/common.rs @@ -1,8 +1,10 @@ use std::{collections::HashMap, future::Future, sync::Arc}; use fake::{Fake, Faker}; +use rabbitmq_stream_client::types::MessageContext; use rabbitmq_stream_client::{Client, ClientOptions, Environment}; use rabbitmq_stream_protocol::commands::generic::GenericResponse; +use rabbitmq_stream_protocol::commands::subscribe::OffsetSpecification; use rabbitmq_stream_protocol::ResponseCode; use tokio::sync::Semaphore; @@ -174,3 +176,10 @@ pub async fn create_generic_super_stream( return (response, partitions); } + +pub fn consumer_update_handler( + is_active: u8, + message_context: &MessageContext, +) -> OffsetSpecification { + OffsetSpecification::First +} diff --git a/tests/integration/consumer_test.rs b/tests/integration/consumer_test.rs index f4f1899..0f26aac 100644 --- a/tests/integration/consumer_test.rs +++ b/tests/integration/consumer_test.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use crate::common::TestEnvironment; +use crate::common::{consumer_update_handler, TestEnvironment}; use fake::{Fake, Faker}; use futures::StreamExt; use rabbitmq_stream_client::{ @@ -11,10 +11,10 @@ use rabbitmq_stream_client::{ types::{Delivery, Message, OffsetSpecification, SuperStreamConsumer}, Consumer, FilterConfiguration, NoDedup, Producer, }; -use tokio::task; -use tokio::time::sleep; +use std::collections::HashMap; use crate::producer_test::routing_key_strategy_value_extractor; +use rabbitmq_stream_client::types::MessageContext; use rabbitmq_stream_client::types::{ HashRoutingMurmurStrategy, RoutingKeyRoutingStrategy, RoutingStrategy, }; @@ -667,3 +667,126 @@ async fn consumer_test_with_filtering_match_unfiltered() { assert!(repsonse_length == filtering_response_length); } + +#[tokio::test(flavor = "multi_thread")] +async fn super_stream_single_active_consumer_test() { + let env = TestEnvironment::create_super_stream().await; + + let message_count = 1000; + let mut super_stream_producer = env + .env + .super_stream_producer(RoutingStrategy::HashRoutingStrategy( + HashRoutingMurmurStrategy { + routing_extractor: &hash_strategy_value_extractor, + }, + )) + .client_provided_name("test super stream consumer ") + .build(&env.super_stream) + .await + .unwrap(); + + let mut properties = HashMap::new(); + + properties.insert("single-active-consumer".to_string(), "true".to_string()); + properties.insert("name".to_string(), "consumer-group-1".to_string()); + properties.insert("super-stream".to_string(), env.super_stream.clone()); + + let mut super_stream_consumer: SuperStreamConsumer = env + .env + .super_stream_consumer() + .offset(OffsetSpecification::First) + .properties(properties) + .build(&env.super_stream) + .await + .unwrap(); + + for n in 0..message_count { + let msg = Message::builder().body(format!("message{}", n)).build(); + let _ = super_stream_producer + .send(msg, |confirmation_status| async move {}) + .await + .unwrap(); + } + + let mut received_messages = 0; + let handle = super_stream_consumer.handle(); + + while let _ = super_stream_consumer.next().await.unwrap() { + received_messages = received_messages + 1; + if received_messages == message_count { + break; + } + } + + assert!(received_messages == message_count); + + super_stream_producer.close().await.unwrap(); + _ = handle.close().await; +} + +#[tokio::test(flavor = "multi_thread")] +async fn super_stream_single_active_consumer_test_with_callback() { + let env = TestEnvironment::create_super_stream().await; + + let message_count = 1000; + let mut super_stream_producer = env + .env + .super_stream_producer(RoutingStrategy::HashRoutingStrategy( + HashRoutingMurmurStrategy { + routing_extractor: &hash_strategy_value_extractor, + }, + )) + .client_provided_name("test super stream consumer ") + .build(&env.super_stream) + .await + .unwrap(); + + let mut properties = HashMap::new(); + + properties.insert("single-active-consumer".to_string(), "true".to_string()); + properties.insert("name".to_string(), "consumer-group-1".to_string()); + properties.insert("super-stream".to_string(), env.super_stream.clone()); + + let mut is_active: u8 = 0; + let mut stream: String = String::from(""); + + let mut super_stream_consumer: SuperStreamConsumer = env + .env + .super_stream_consumer() + .offset(OffsetSpecification::First) + .properties(properties) + .consumer_update(move |active, message_context| { + let mut stream_int = stream.clone(); + async move { + is_active = active; + stream_int = message_context.get_stream().clone(); + }; + OffsetSpecification::First + }) + .build(&env.super_stream) + .await + .unwrap(); + + for n in 0..message_count { + let msg = Message::builder().body(format!("message{}", n)).build(); + let _ = super_stream_producer + .send(msg, |confirmation_status| async move {}) + .await + .unwrap(); + } + + let mut received_messages = 0; + let handle = super_stream_consumer.handle(); + + while let _ = super_stream_consumer.next().await.unwrap() { + received_messages = received_messages + 1; + if received_messages == message_count { + break; + } + } + + assert!(received_messages == message_count); + + super_stream_producer.close().await.unwrap(); + _ = handle.close().await; +}