diff --git a/redis-test/src/lib.rs b/redis-test/src/lib.rs index 49094e4262..62931fb426 100644 --- a/redis-test/src/lib.rs +++ b/redis-test/src/lib.rs @@ -289,6 +289,10 @@ impl AioConnectionLike for MockRedisConnection { fn get_db(&self) -> i64 { 0 } + + fn get_ip(&self) -> Option { + None + } } #[cfg(test)] diff --git a/redis/src/aio/connection.rs b/redis/src/aio/connection.rs index cdca0801c9..9cfa0c872b 100644 --- a/redis/src/aio/connection.rs +++ b/redis/src/aio/connection.rs @@ -35,6 +35,7 @@ pub struct Connection>> { // This flag is checked when attempting to send a command, and if it's raised, we attempt to // exit the pubsub state before executing the new request. pubsub: bool, + ip: Option, } fn assert_sync() {} @@ -52,6 +53,7 @@ impl Connection { decoder, db, pubsub, + ip, } = self; Connection { con: f(con), @@ -59,6 +61,7 @@ impl Connection { decoder, db, pubsub, + ip, } } } @@ -69,13 +72,18 @@ where { /// Constructs a new `Connection` out of a `AsyncRead + AsyncWrite` object /// and a `RedisConnectionInfo` - pub async fn new(connection_info: &RedisConnectionInfo, con: C) -> RedisResult { + pub async fn new( + connection_info: &RedisConnectionInfo, + con: C, + ip: Option, + ) -> RedisResult { let mut rv = Connection { con, buf: Vec::new(), decoder: combine::stream::Decoder::new(), db: connection_info.db, pubsub: false, + ip, }; authenticate(connection_info, &mut rv).await?; Ok(rv) @@ -171,7 +179,7 @@ where /// Constructs a new `Connection` out of a `async_std::io::AsyncRead + async_std::io::AsyncWrite` object /// and a `RedisConnectionInfo` pub async fn new_async_std(connection_info: &RedisConnectionInfo, con: C) -> RedisResult { - Connection::new(connection_info, async_std::AsyncStdWrapped::new(con)).await + Connection::new(connection_info, async_std::AsyncStdWrapped::new(con), None).await } } @@ -179,8 +187,8 @@ pub(crate) async fn connect(connection_info: &ConnectionInfo) -> RedisResult< where C: Unpin + RedisRuntime + AsyncRead + AsyncWrite + Send, { - let con = connect_simple::(connection_info).await?; - Connection::new(&connection_info.redis, con).await + let (con, ip) = connect_simple::(connection_info).await?; + Connection::new(&connection_info.redis, con, ip).await } impl ConnectionLike for Connection @@ -253,6 +261,10 @@ where fn get_db(&self) -> i64 { self.db } + + fn get_ip(&self) -> Option { + todo!() + } } /// Represents a `PubSub` connection. @@ -382,7 +394,7 @@ pub(crate) async fn get_socket_addrs( pub(crate) async fn connect_simple( connection_info: &ConnectionInfo, -) -> RedisResult { +) -> RedisResult<(T, Option)> { Ok(match connection_info.addr { ConnectionAddr::Tcp(ref host, port) => { let socket_addrs = get_socket_addrs(host, port).await?; @@ -395,7 +407,10 @@ pub(crate) async fn connect_simple( |acc, socket_addr| async move { match acc { ok @ Ok(_) => ok, - Err(_) => ::connect_tcp(socket_addr).await, + Err(_) => { + let conn = ::connect_tcp(socket_addr).await?; + Ok((conn, Some(socket_addr.ip().to_string()))) + } } }, ) @@ -406,7 +421,7 @@ pub(crate) async fn connect_simple( ConnectionAddr::TcpTls { ref host, port, - insecure, + insecure: _, } => { let socket_addrs = get_socket_addrs(host, port).await?; stream::iter(socket_addrs) @@ -418,7 +433,10 @@ pub(crate) async fn connect_simple( |acc, socket_addr| async move { match acc { ok @ Ok(_) => ok, - Err(_) => ::connect_tcp_tls(host, socket_addr, insecure).await, + Err(_) => { + let conn = ::connect_tcp(socket_addr).await?; + Ok((conn, Some(socket_addr.ip().to_string()))) + } } }, ) @@ -434,7 +452,7 @@ pub(crate) async fn connect_simple( } #[cfg(unix)] - ConnectionAddr::Unix(ref path) => ::connect_unix(path).await?, + ConnectionAddr::Unix(ref path) => (::connect_unix(path).await?, None), #[cfg(not(unix))] ConnectionAddr::Unix(_) => { diff --git a/redis/src/aio/connection_manager.rs b/redis/src/aio/connection_manager.rs index 32633ab2de..bf0f67d6a3 100644 --- a/redis/src/aio/connection_manager.rs +++ b/redis/src/aio/connection_manager.rs @@ -231,4 +231,8 @@ impl ConnectionLike for ConnectionManager { fn get_db(&self) -> i64 { self.client.connection_info().redis.db } + + fn get_ip(&self) -> Option { + None + } } diff --git a/redis/src/aio/mod.rs b/redis/src/aio/mod.rs index eaa29fd0b3..3237531a3e 100644 --- a/redis/src/aio/mod.rs +++ b/redis/src/aio/mod.rs @@ -72,6 +72,10 @@ pub trait ConnectionLike { /// also might be incorrect if the connection like object is not /// actually connected. fn get_db(&self) -> i64; + + /// Returns the connection's IP if it's a TCP connection and the connection is established, + /// otherwise returns None. + fn get_ip(&self) -> Option; } async fn authenticate(connection_info: &RedisConnectionInfo, con: &mut C) -> RedisResult<()> diff --git a/redis/src/aio/multiplexed_connection.rs b/redis/src/aio/multiplexed_connection.rs index 499b75dd24..6d45f0ec8a 100644 --- a/redis/src/aio/multiplexed_connection.rs +++ b/redis/src/aio/multiplexed_connection.rs @@ -1,7 +1,7 @@ use super::ConnectionLike; use crate::aio::authenticate; use crate::cmd::Cmd; -use crate::connection::RedisConnectionInfo; +use crate::connection::ConnectionInfo; #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))] use crate::parser::ValueCodec; use crate::types::{RedisError, RedisFuture, RedisResult, Value}; @@ -311,6 +311,7 @@ where pub struct MultiplexedConnection { pipeline: Pipeline, Value, RedisError>, db: i64, + ip: Option, } impl Debug for MultiplexedConnection { @@ -326,8 +327,9 @@ impl MultiplexedConnection { /// Constructs a new `MultiplexedConnection` out of a `AsyncRead + AsyncWrite` object /// and a `ConnectionInfo` pub async fn new( - connection_info: &RedisConnectionInfo, + connection_info: &ConnectionInfo, stream: C, + ip: Option, ) -> RedisResult<(Self, impl Future)> where C: Unpin + AsyncRead + AsyncWrite + Send + 'static, @@ -348,10 +350,11 @@ impl MultiplexedConnection { let driver = boxed(driver); let mut con = MultiplexedConnection { pipeline, - db: connection_info.db, + db: connection_info.redis.db, + ip, }; let driver = { - let auth = authenticate(connection_info, &mut con); + let auth = authenticate(&connection_info.redis, &mut con); futures_util::pin_mut!(auth); match futures_util::future::select(auth, driver).await { @@ -419,4 +422,8 @@ impl ConnectionLike for MultiplexedConnection { fn get_db(&self) -> i64 { self.db } + + fn get_ip(&self) -> Option { + self.ip.clone() + } } diff --git a/redis/src/client.rs b/redis/src/client.rs index dd700aa0a7..c7ad753738 100644 --- a/redis/src/client.rs +++ b/redis/src/client.rs @@ -72,7 +72,7 @@ impl Client { /// Returns an async connection from the client. #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))] pub async fn get_async_connection(&self) -> RedisResult { - let con = match Runtime::locate() { + let (con, ip) = match Runtime::locate() { #[cfg(feature = "tokio-comp")] Runtime::Tokio => { self.get_simple_async_connection::() @@ -85,7 +85,7 @@ impl Client { } }; - crate::aio::Connection::new(&self.connection_info.redis, con).await + crate::aio::Connection::new(&self.connection_info.redis, con, ip).await } /// Returns an async connection from the client. @@ -268,19 +268,21 @@ impl Client { where T: crate::aio::RedisRuntime, { - let con = self.get_simple_async_connection::().await?; - crate::aio::MultiplexedConnection::new(&self.connection_info.redis, con).await + let (con, ip) = self.get_simple_async_connection::().await?; + crate::aio::MultiplexedConnection::new(&self.connection_info, con, ip).await } async fn get_simple_async_connection( &self, - ) -> RedisResult>> + ) -> RedisResult<( + Pin>, + Option, + )> where T: crate::aio::RedisRuntime, { - Ok(crate::aio::connect_simple::(&self.connection_info) - .await? - .boxed()) + let (conn, ip) = crate::aio::connect_simple::(&self.connection_info).await?; + Ok((conn.boxed(), ip)) } #[cfg(feature = "connection-manager")] diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 19755fc812..75cee76d74 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -1110,6 +1110,26 @@ where } } + /// This function takes a node's address, examines if its host has encountered a DNS change, where the node's endpoint now leads to a different IP address. + /// If no socket addresses are discovered for the node's host address, or if it's a non-DNS address, it returns false. + /// In case the node's host address resolves to socket addresses and none of them match the current connection's IP, a DNS change is detected, resulting in a true return. + async fn is_dns_changed(addr: &str, curr_ip: &String) -> bool { + let (host, port) = match get_host_and_port_from_addr(addr) { + Some((host, port)) => (host, port), + None => return false, + }; + let updated_addresses = match get_socket_addrs(host, port).await { + Ok(socket_addrs) => socket_addrs, + Err(_) => return false, + }; + for socket_addr in updated_addresses { + if socket_addr.ip().to_string() == *curr_ip { + return false; + } + } + true + } + async fn get_or_create_conn( addr: &str, conn_option: Option>, @@ -1117,6 +1137,13 @@ where ) -> RedisResult { if let Some(conn) = conn_option { let mut conn = conn.await; + if let Some(ip) = conn.get_ip() { + // Check for a DNS change + if Self::is_dns_changed(addr, &ip).await { + // A DNS change is detected, create a new connection + return connect_and_check(addr, params.clone()).await; + } + }; match check_connection(&mut conn, params.connection_timeout.into()).await { Ok(_) => Ok(conn), Err(_) => connect_and_check(addr, params.clone()).await, @@ -1289,7 +1316,12 @@ where fn get_db(&self) -> i64 { 0 } + + fn get_ip(&self) -> Option { + None + } } + /// Implements the process of connecting to a Redis server /// and obtaining a connection handle. pub trait Connect: Sized { @@ -1338,7 +1370,6 @@ async fn check_connection(conn: &mut C, timeout: futures_time::time::Duration where C: ConnectionLike + Send + 'static, { - // TODO: Add a check to re-resolve DNS addresses to verify we that we have a connection to the right node crate::cmd("PING") .query_async::<_, String>(conn) .timeout(timeout) diff --git a/redis/tests/support/mock_cluster.rs b/redis/tests/support/mock_cluster.rs index f6c8e7746d..d5b0fd99e2 100644 --- a/redis/tests/support/mock_cluster.rs +++ b/redis/tests/support/mock_cluster.rs @@ -28,6 +28,7 @@ static HANDLERS: Lazy>> = Lazy::new(Default::def pub struct MockConnection { pub handler: Handler, pub port: u16, + pub ip: String, } #[cfg(feature = "cluster-async")] @@ -50,6 +51,7 @@ impl cluster_async::Connect for MockConnection { .unwrap_or_else(|| panic!("Handler `{name}` were not installed")) .clone(), port, + ip: name.clone(), })) } } @@ -73,6 +75,7 @@ impl cluster::Connect for MockConnection { .unwrap_or_else(|| panic!("Handler `{name}` were not installed")) .clone(), port, + ip: name.clone(), }) } @@ -225,6 +228,10 @@ impl aio::ConnectionLike for MockConnection { fn get_db(&self) -> i64 { 0 } + + fn get_ip(&self) -> Option { + None + } } impl redis::ConnectionLike for MockConnection { diff --git a/redis/tests/test_cluster_async.rs b/redis/tests/test_cluster_async.rs index 13e6264379..8f9eb17dfd 100644 --- a/redis/tests/test_cluster_async.rs +++ b/redis/tests/test_cluster_async.rs @@ -273,6 +273,10 @@ impl ConnectionLike for ErrorConnection { fn get_db(&self) -> i64 { self.inner.get_db() } + + fn get_ip(&self) -> Option { + None + } } #[test]