diff --git a/glide-core/src/client/reconnecting_connection.rs b/glide-core/src/client/reconnecting_connection.rs index d79a59c574..ac33f6c005 100644 --- a/glide-core/src/client/reconnecting_connection.rs +++ b/glide-core/src/client/reconnecting_connection.rs @@ -7,6 +7,7 @@ use futures_intrusive::sync::ManualResetEvent; use logger_core::{log_debug, log_trace, log_warn}; use redis::aio::MultiplexedConnection; use redis::{RedisConnectionInfo, RedisError, RedisResult}; +use std::fmt; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::sync::Mutex; @@ -46,6 +47,12 @@ pub(super) struct ReconnectingConnection { inner: Arc, } +impl fmt::Debug for ReconnectingConnection { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.node_address()) + } +} + async fn get_multiplexed_connection(client: &redis::Client) -> RedisResult { run_with_timeout( Some(DEFAULT_CONNECTION_ATTEMPT_TIMEOUT), @@ -149,6 +156,15 @@ impl ReconnectingConnection { create_connection(backend, connection_retry_strategy).await } + fn node_address(&self) -> String { + self.inner + .backend + .connection_info + .get_connection_info() + .addr + .to_string() + } + pub(super) fn is_dropped(&self) -> bool { self.inner .backend diff --git a/glide-core/src/client/standalone_client.rs b/glide-core/src/client/standalone_client.rs index 79246a7b76..736155bbf0 100644 --- a/glide-core/src/client/standalone_client.rs +++ b/glide-core/src/client/standalone_client.rs @@ -16,6 +16,7 @@ use std::sync::Arc; #[cfg(standalone_heartbeat)] use tokio::task; +#[derive(Debug)] enum ReadFrom { Primary, PreferReplica { @@ -23,6 +24,7 @@ enum ReadFrom { }, } +#[derive(Debug)] struct DropWrapper { /// Connection to the primary node in the client. primary_index: usize, @@ -38,7 +40,7 @@ impl Drop for DropWrapper { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct StandaloneClient { inner: Arc, } @@ -46,6 +48,7 @@ pub struct StandaloneClient { pub enum StandaloneClientConnectionError { NoAddressesProvided, FailedConnection(Vec<(Option, RedisError)>), + PrimaryConflictFound(String), } impl std::fmt::Debug for StandaloneClientConnectionError { @@ -80,6 +83,12 @@ impl std::fmt::Debug for StandaloneClientConnectionError { }; Ok(()) } + StandaloneClientConnectionError::PrimaryConflictFound(found_primaries) => { + writeln!( + f, + "Primary conflict. More than one primary found in a Standalone setup: {found_primaries}" + ) + } } } } @@ -116,11 +125,20 @@ impl StandaloneClient { match result { Ok((connection, replication_status)) => { nodes.push(connection); - if primary_index.is_none() - && redis::from_owned_redis_value::(replication_status) - .is_ok_and(|val| val.contains("role:master")) + if redis::from_owned_redis_value::(replication_status) + .is_ok_and(|val| val.contains("role:master")) { - primary_index = Some(nodes.len() - 1); + if let Some(primary_index) = primary_index { + // More than one primary found + return Err(StandaloneClientConnectionError::PrimaryConflictFound( + format!( + "Primary nodes: {:?}, {:?}", + nodes.pop(), + nodes.get(primary_index) + ), + )); + } + primary_index = Some(nodes.len().saturating_sub(1)); } } Err((address, (connection, err))) => { diff --git a/glide-core/tests/test_standalone_client.rs b/glide-core/tests/test_standalone_client.rs index 04a87a4cb6..aa7f3b6609 100644 --- a/glide-core/tests/test_standalone_client.rs +++ b/glide-core/tests/test_standalone_client.rs @@ -5,10 +5,15 @@ mod utilities; #[cfg(test)] mod standalone_client_tests { + use std::collections::HashMap; + use crate::utilities::mocks::{Mock, ServerMock}; use super::*; - use glide_core::{client::StandaloneClient, connection_request::ReadFrom}; + use glide_core::{ + client::{ConnectionError, StandaloneClient}, + connection_request::ReadFrom, + }; use redis::{FromRedisValue, Value}; use rstest::rstest; use utilities::*; @@ -87,10 +92,11 @@ mod standalone_client_tests { }); } - fn create_primary_mock_with_replicas(replica_count: usize) -> Vec { - let mut listeners: Vec = (0..replica_count + 1) - .map(|_| get_listener_on_available_port()) - .collect(); + fn get_mock_addresses(mocks: &[ServerMock]) -> Vec { + mocks.iter().flat_map(|mock| mock.get_addresses()).collect() + } + + fn create_primary_responses() -> HashMap { let mut primary_responses = std::collections::HashMap::new(); primary_responses.insert( "*1\r\n$4\r\nPING\r\n".to_string(), @@ -100,8 +106,10 @@ mod standalone_client_tests { "*2\r\n$4\r\nINFO\r\n$11\r\nREPLICATION\r\n".to_string(), Value::BulkString(b"role:master\r\nconnected_slaves:3\r\n".to_vec()), ); - let primary = ServerMock::new_with_listener(primary_responses, listeners.pop().unwrap()); - let mut mocks = vec![primary]; + primary_responses + } + + fn create_replica_response() -> HashMap { let mut replica_responses = std::collections::HashMap::new(); replica_responses.insert( "*1\r\n$4\r\nPING\r\n".to_string(), @@ -111,10 +119,32 @@ mod standalone_client_tests { "*2\r\n$4\r\nINFO\r\n$11\r\nREPLICATION\r\n".to_string(), Value::BulkString(b"role:slave\r\n".to_vec()), ); + replica_responses + } + fn create_primary_conflict_mock_two_primaries_one_replica() -> Vec { + let mut listeners: Vec = + (0..3).map(|_| get_listener_on_available_port()).collect(); + let primary_1 = + ServerMock::new_with_listener(create_primary_responses(), listeners.pop().unwrap()); + let primary_2 = + ServerMock::new_with_listener(create_primary_responses(), listeners.pop().unwrap()); + let replica = + ServerMock::new_with_listener(create_replica_response(), listeners.pop().unwrap()); + vec![primary_1, primary_2, replica] + } + + fn create_primary_mock_with_replicas(replica_count: usize) -> Vec { + let mut listeners: Vec = (0..replica_count + 1) + .map(|_| get_listener_on_available_port()) + .collect(); + let primary = + ServerMock::new_with_listener(create_primary_responses(), listeners.pop().unwrap()); + let mut mocks = vec![primary]; + mocks.extend( listeners .into_iter() - .map(|listener| ServerMock::new_with_listener(replica_responses.clone(), listener)), + .map(|listener| ServerMock::new_with_listener(create_replica_response(), listener)), ); mocks } @@ -156,8 +186,7 @@ mod standalone_client_tests { } } - let mut addresses: Vec = - mocks.iter().flat_map(|mock| mock.get_addresses()).collect(); + let mut addresses = get_mock_addresses(&mocks); for i in 4 - config.number_of_missing_replicas..4 { addresses.push(redis::ConnectionAddr::Tcp( @@ -270,6 +299,33 @@ mod standalone_client_tests { #[rstest] #[serial_test::serial] #[timeout(SHORT_STANDALONE_TEST_TIMEOUT)] + fn test_primary_conflict_raises_error() { + let mocks = create_primary_conflict_mock_two_primaries_one_replica(); + let addresses = get_mock_addresses(&mocks); + let connection_request = + create_connection_request(addresses.as_slice(), &Default::default()); + block_on_all(async { + let client_res = StandaloneClient::create_client(connection_request.into()) + .await + .map_err(ConnectionError::Standalone); + assert!(client_res.is_err()); + let error = client_res.unwrap_err(); + assert!(matches!(error, ConnectionError::Standalone(_),)); + let primary_1_addr = addresses.first().unwrap().to_string(); + let primary_2_addr = addresses.get(1).unwrap().to_string(); + let replica_addr = addresses.get(2).unwrap().to_string(); + let err_msg = error.to_string().to_ascii_lowercase(); + assert!( + err_msg.contains("conflict") + && err_msg.contains(&primary_1_addr) + && err_msg.contains(&primary_2_addr) + && !err_msg.contains(&replica_addr) + ); + }); + } + + #[rstest] + #[timeout(SHORT_STANDALONE_TEST_TIMEOUT)] fn test_send_acl_request_to_all_nodes() { let mocks = create_primary_mock_with_replicas(2); let mut cmd = redis::cmd("ACL");