Skip to content

Commit

Permalink
Standalone Client: Raise an error if more than one primary node is fo…
Browse files Browse the repository at this point in the history
…und (#1487)
  • Loading branch information
barshaul authored Jun 3, 2024
1 parent 5050741 commit 1403456
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 15 deletions.
16 changes: 16 additions & 0 deletions glide-core/src/client/reconnecting_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use futures_intrusive::sync::ManualResetEvent;
use logger_core::{log_debug, log_trace, log_warn};
use redis::aio::MultiplexedConnection;
use redis::{RedisConnectionInfo, RedisError, RedisResult};
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::Mutex;
Expand Down Expand Up @@ -46,6 +47,12 @@ pub(super) struct ReconnectingConnection {
inner: Arc<InnerReconnectingConnection>,
}

impl fmt::Debug for ReconnectingConnection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.node_address())
}
}

async fn get_multiplexed_connection(client: &redis::Client) -> RedisResult<MultiplexedConnection> {
run_with_timeout(
Some(DEFAULT_CONNECTION_ATTEMPT_TIMEOUT),
Expand Down Expand Up @@ -149,6 +156,15 @@ impl ReconnectingConnection {
create_connection(backend, connection_retry_strategy).await
}

fn node_address(&self) -> String {
self.inner
.backend
.connection_info
.get_connection_info()
.addr
.to_string()
}

pub(super) fn is_dropped(&self) -> bool {
self.inner
.backend
Expand Down
28 changes: 23 additions & 5 deletions glide-core/src/client/standalone_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ use std::sync::Arc;
#[cfg(standalone_heartbeat)]
use tokio::task;

#[derive(Debug)]
enum ReadFrom {
Primary,
PreferReplica {
latest_read_replica_index: Arc<std::sync::atomic::AtomicUsize>,
},
}

#[derive(Debug)]
struct DropWrapper {
/// Connection to the primary node in the client.
primary_index: usize,
Expand All @@ -38,14 +40,15 @@ impl Drop for DropWrapper {
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct StandaloneClient {
inner: Arc<DropWrapper>,
}

pub enum StandaloneClientConnectionError {
NoAddressesProvided,
FailedConnection(Vec<(Option<String>, RedisError)>),
PrimaryConflictFound(String),
}

impl std::fmt::Debug for StandaloneClientConnectionError {
Expand Down Expand Up @@ -80,6 +83,12 @@ impl std::fmt::Debug for StandaloneClientConnectionError {
};
Ok(())
}
StandaloneClientConnectionError::PrimaryConflictFound(found_primaries) => {
writeln!(
f,
"Primary conflict. More than one primary found in a Standalone setup: {found_primaries}"
)
}
}
}
}
Expand Down Expand Up @@ -116,11 +125,20 @@ impl StandaloneClient {
match result {
Ok((connection, replication_status)) => {
nodes.push(connection);
if primary_index.is_none()
&& redis::from_owned_redis_value::<String>(replication_status)
.is_ok_and(|val| val.contains("role:master"))
if redis::from_owned_redis_value::<String>(replication_status)
.is_ok_and(|val| val.contains("role:master"))
{
primary_index = Some(nodes.len() - 1);
if let Some(primary_index) = primary_index {
// More than one primary found
return Err(StandaloneClientConnectionError::PrimaryConflictFound(
format!(
"Primary nodes: {:?}, {:?}",
nodes.pop(),
nodes.get(primary_index)
),
));
}
primary_index = Some(nodes.len().saturating_sub(1));
}
}
Err((address, (connection, err))) => {
Expand Down
76 changes: 66 additions & 10 deletions glide-core/tests/test_standalone_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ mod utilities;

#[cfg(test)]
mod standalone_client_tests {
use std::collections::HashMap;

use crate::utilities::mocks::{Mock, ServerMock};

use super::*;
use glide_core::{client::StandaloneClient, connection_request::ReadFrom};
use glide_core::{
client::{ConnectionError, StandaloneClient},
connection_request::ReadFrom,
};
use redis::{FromRedisValue, Value};
use rstest::rstest;
use utilities::*;
Expand Down Expand Up @@ -87,10 +92,11 @@ mod standalone_client_tests {
});
}

fn create_primary_mock_with_replicas(replica_count: usize) -> Vec<ServerMock> {
let mut listeners: Vec<std::net::TcpListener> = (0..replica_count + 1)
.map(|_| get_listener_on_available_port())
.collect();
fn get_mock_addresses(mocks: &[ServerMock]) -> Vec<redis::ConnectionAddr> {
mocks.iter().flat_map(|mock| mock.get_addresses()).collect()
}

fn create_primary_responses() -> HashMap<String, Value> {
let mut primary_responses = std::collections::HashMap::new();
primary_responses.insert(
"*1\r\n$4\r\nPING\r\n".to_string(),
Expand All @@ -100,8 +106,10 @@ mod standalone_client_tests {
"*2\r\n$4\r\nINFO\r\n$11\r\nREPLICATION\r\n".to_string(),
Value::BulkString(b"role:master\r\nconnected_slaves:3\r\n".to_vec()),
);
let primary = ServerMock::new_with_listener(primary_responses, listeners.pop().unwrap());
let mut mocks = vec![primary];
primary_responses
}

fn create_replica_response() -> HashMap<String, Value> {
let mut replica_responses = std::collections::HashMap::new();
replica_responses.insert(
"*1\r\n$4\r\nPING\r\n".to_string(),
Expand All @@ -111,10 +119,32 @@ mod standalone_client_tests {
"*2\r\n$4\r\nINFO\r\n$11\r\nREPLICATION\r\n".to_string(),
Value::BulkString(b"role:slave\r\n".to_vec()),
);
replica_responses
}
fn create_primary_conflict_mock_two_primaries_one_replica() -> Vec<ServerMock> {
let mut listeners: Vec<std::net::TcpListener> =
(0..3).map(|_| get_listener_on_available_port()).collect();
let primary_1 =
ServerMock::new_with_listener(create_primary_responses(), listeners.pop().unwrap());
let primary_2 =
ServerMock::new_with_listener(create_primary_responses(), listeners.pop().unwrap());
let replica =
ServerMock::new_with_listener(create_replica_response(), listeners.pop().unwrap());
vec![primary_1, primary_2, replica]
}

fn create_primary_mock_with_replicas(replica_count: usize) -> Vec<ServerMock> {
let mut listeners: Vec<std::net::TcpListener> = (0..replica_count + 1)
.map(|_| get_listener_on_available_port())
.collect();
let primary =
ServerMock::new_with_listener(create_primary_responses(), listeners.pop().unwrap());
let mut mocks = vec![primary];

mocks.extend(
listeners
.into_iter()
.map(|listener| ServerMock::new_with_listener(replica_responses.clone(), listener)),
.map(|listener| ServerMock::new_with_listener(create_replica_response(), listener)),
);
mocks
}
Expand Down Expand Up @@ -156,8 +186,7 @@ mod standalone_client_tests {
}
}

let mut addresses: Vec<redis::ConnectionAddr> =
mocks.iter().flat_map(|mock| mock.get_addresses()).collect();
let mut addresses = get_mock_addresses(&mocks);

for i in 4 - config.number_of_missing_replicas..4 {
addresses.push(redis::ConnectionAddr::Tcp(
Expand Down Expand Up @@ -270,6 +299,33 @@ mod standalone_client_tests {
#[rstest]
#[serial_test::serial]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_primary_conflict_raises_error() {
let mocks = create_primary_conflict_mock_two_primaries_one_replica();
let addresses = get_mock_addresses(&mocks);
let connection_request =
create_connection_request(addresses.as_slice(), &Default::default());
block_on_all(async {
let client_res = StandaloneClient::create_client(connection_request.into())
.await
.map_err(ConnectionError::Standalone);
assert!(client_res.is_err());
let error = client_res.unwrap_err();
assert!(matches!(error, ConnectionError::Standalone(_),));
let primary_1_addr = addresses.first().unwrap().to_string();
let primary_2_addr = addresses.get(1).unwrap().to_string();
let replica_addr = addresses.get(2).unwrap().to_string();
let err_msg = error.to_string().to_ascii_lowercase();
assert!(
err_msg.contains("conflict")
&& err_msg.contains(&primary_1_addr)
&& err_msg.contains(&primary_2_addr)
&& !err_msg.contains(&replica_addr)
);
});
}

#[rstest]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_send_acl_request_to_all_nodes() {
let mocks = create_primary_mock_with_replicas(2);
let mut cmd = redis::cmd("ACL");
Expand Down

0 comments on commit 1403456

Please sign in to comment.