diff --git a/redis/src/aio/multiplexed_connection.rs b/redis/src/aio/multiplexed_connection.rs
index 1067bc2df..450b52f0a 100644
--- a/redis/src/aio/multiplexed_connection.rs
+++ b/redis/src/aio/multiplexed_connection.rs
@@ -349,7 +349,7 @@ where
         &mut self,
         item: SinkItem,
         timeout: Duration,
-    ) -> Result<Value, Option<RedisError>> {
+    ) -> Result<Value, RedisError> {
         self.send_recv(item, None, timeout).await
     }
 
@@ -359,7 +359,7 @@ where
         // If `None`, this is a single request, not a pipeline of multiple requests.
        pipeline_response_count: Option<usize>,
         timeout: Duration,
-    ) -> Result<Value, Option<RedisError>> {
+    ) -> Result<Value, RedisError> {
         let (sender, receiver) = oneshot::channel();
 
         self.sender
@@ -369,15 +369,25 @@ where
                 output: sender,
             })
             .await
-            .map_err(|_| None)?;
+            .map_err(|err| {
+                // If an error occurs here, it means the request never reached the server.
+                // Since the server did not receive the data, it is safe to retry the request.
+                RedisError::from((
+                    crate::ErrorKind::IoErrorRetrySafe,
+                    "Failed to send the request to the server",
+                    format!("{err}"),
+                ))
+            })?;
 
         match Runtime::locate().timeout(timeout, receiver).await {
-            Ok(Ok(result)) => result.map_err(Some),
+            Ok(Ok(result)) => result,
             Ok(Err(_)) => {
                 // The `sender` was dropped which likely means that the stream part
-                // failed for one reason or another
-                Err(None)
+                // failed for one reason or another.
+                // Since we don't know if the server received the request, retrying it isn't safe.
+                // Hence, we return an IoError instead of an IoErrorRetrySafe.
+                Err(RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
             }
-            Err(elapsed) => Err(Some(elapsed.into())),
+            Err(elapsed) => Err(elapsed.into()),
         }
     }
 
@@ -503,10 +513,7 @@ impl MultiplexedConnection {
         let result = self
             .pipeline
             .send_single(cmd.get_packed_command(), self.response_timeout)
-            .await
-            .map_err(|err| {
-                err.unwrap_or_else(|| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
-            });
+            .await;
         if self.protocol != ProtocolVersion::RESP2 {
             if let Err(e) = &result {
                 if e.is_connection_dropped() {
@@ -537,10 +544,7 @@ impl MultiplexedConnection {
                 Some(offset + count),
                 self.response_timeout,
             )
-            .await
-            .map_err(|err| {
-                err.unwrap_or_else(|| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
-            });
+            .await;
         if self.protocol != ProtocolVersion::RESP2 {
             if let Err(e) = &result {
diff --git a/redis/src/cluster.rs b/redis/src/cluster.rs
index 5c0702d85..9a55e3513 100644
--- a/redis/src/cluster.rs
+++ b/redis/src/cluster.rs
@@ -771,7 +771,8 @@ where
                         .wait_time_for_retry(retries);
                     thread::sleep(sleep_time);
                 }
-                crate::types::RetryMethod::Reconnect => {
+                crate::types::RetryMethod::Reconnect
+                | crate::types::RetryMethod::ReconnectAndRetry => {
                     if *self.auto_reconnect.borrow() {
                         if let Ok(mut conn) = self.connect(&addr) {
                             if conn.check_connection() {
diff --git a/redis/src/cluster_async/connections_container.rs b/redis/src/cluster_async/connections_container.rs
index 2bfbb8b93..d4288c93e 100644
--- a/redis/src/cluster_async/connections_container.rs
+++ b/redis/src/cluster_async/connections_container.rs
@@ -255,16 +255,22 @@ where
         &self,
         amount: usize,
         conn_type: ConnectionType,
-    ) -> impl Iterator<Item = ConnectionAndAddress<Connection>> + '_ {
-        self.connection_map
-            .iter()
-            .choose_multiple(&mut rand::thread_rng(), amount)
-            .into_iter()
-            .map(move |item| {
-                let (address, node) = (item.key(), item.value());
-                let conn = node.get_connection(&conn_type);
-                (address.clone(), conn)
-            })
+    ) -> Option<impl Iterator<Item = ConnectionAndAddress<Connection>> + '_> {
+        if self.connection_map.is_empty() {
+            None
+        } else {
+            Some(
+                self.connection_map
+                    .iter()
+                    .choose_multiple(&mut rand::thread_rng(), amount)
+                    .into_iter()
+                    .map(move |item| {
+                        let (address, node) = (item.key(), item.value());
+                        let conn = node.get_connection(&conn_type);
+                        (address.clone(), conn)
+                    }),
+            )
+        }
     }
 
     pub(crate) fn replace_or_add_connection_for_address(
@@ -633,6 +639,7 @@ mod tests {
 
         let random_connections: HashSet<_> = container
             .random_connections(3, ConnectionType::User)
+            .expect("No connections found")
             .map(|pair| pair.1)
             .collect();
 
@@ -651,6 +658,7 @@ mod tests {
             0,
             container
                 .random_connections(1, ConnectionType::User)
+                .expect("No connections found")
                 .count()
         );
     }
@@ -665,6 +673,7 @@ mod tests {
         );
         let random_connections: Vec<_> = container
             .random_connections(1, ConnectionType::User)
+            .expect("No connections found")
             .collect();
 
         assert_eq!(vec![(address, 4)], random_connections);
@@ -675,6 +684,7 @@ mod tests {
         let container = create_container();
         let mut random_connections: Vec<_> = container
             .random_connections(1000, ConnectionType::User)
+            .expect("No connections found")
             .map(|pair| pair.1)
             .collect();
         random_connections.sort();
@@ -687,6 +697,7 @@ mod tests {
         let container = create_container_with_strategy(ReadFromReplicaStrategy::RoundRobin, true);
         let mut random_connections: Vec<_> = container
             .random_connections(1000, ConnectionType::PreferManagement)
+            .expect("No connections found")
             .map(|pair| pair.1)
             .collect();
         random_connections.sort();
diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs
index d5dba5fe0..e415c7643 100644
--- a/redis/src/cluster_async/mod.rs
+++ b/redis/src/cluster_async/mod.rs
@@ -817,6 +817,7 @@ enum Next {
         // if not set, then a reconnect should happen without sending a request afterwards
         request: Option<PendingRequest<C>>,
         target: String,
+        should_retry: bool,
     },
     RefreshSlots {
         // if not set, then a slot refresh should happen without sending a request afterwards
         request: Option<PendingRequest<C>>,
@@ -860,6 +861,7 @@ impl<C> Future for Request<C> {
                     let request = this.request.as_mut().unwrap();
                     // TODO - would be nice if we didn't need to repeat this code twice, with & without retries.
                     if request.retry >= this.retry_params.number_of_retries {
+                        let retry_method = err.retry_method();
                         let next = if err.kind() == ErrorKind::AllConnectionsUnavailable {
                             Next::ReconnectToInitialNodes { request: None }.into()
                         } else if matches!(err.retry_method(), crate::types::RetryMethod::MovedRedirect)
@@ -870,11 +872,18 @@ impl<C> Future for Request<C> {
                                 sleep_duration: None,
                             }
                             .into()
-                        } else if matches!(err.retry_method(), crate::types::RetryMethod::Reconnect) {
+                        } else if matches!(retry_method, crate::types::RetryMethod::Reconnect)
+                            || matches!(retry_method, crate::types::RetryMethod::ReconnectAndRetry)
+                        {
                             if let OperationTarget::Node { address } = target {
+                                let should_retry = matches!(
+                                    retry_method,
+                                    crate::types::RetryMethod::ReconnectAndRetry
+                                );
                                 Next::Reconnect {
                                     request: None,
                                     target: address,
+                                    should_retry,
                                 }
                                 .into()
                             } else {
@@ -949,14 +958,20 @@ impl<C> Future for Request<C> {
                 });
                 self.poll(cx)
             }
-            crate::types::RetryMethod::Reconnect => {
+            crate::types::RetryMethod::Reconnect
+            | crate::types::RetryMethod::ReconnectAndRetry => {
                 let mut request = this.request.take().unwrap();
                 // TODO should we reset the redirect here?
                 request.info.reset_routing();
                 warn!("disconnected from {:?}", address);
+                let should_retry = matches!(
+                    err.retry_method(),
+                    crate::types::RetryMethod::ReconnectAndRetry
+                );
                 Next::Reconnect {
                     request: Some(request),
                     target: address,
+                    should_retry,
                 }
                 .into()
             }
@@ -1201,8 +1216,11 @@ where
         Ok(connections.0)
     }
 
-    fn reconnect_to_initial_nodes(&mut self) -> impl Future<Output = ()> {
-        let inner = self.inner.clone();
+    // Reconnect to the initial nodes provided by the user in the creation of the client,
+    // and try to refresh the slots based on the initial connections.
+    // Used when all cluster connections are unavailable.
+    fn reconnect_to_initial_nodes(inner: Arc<InnerCore<C>>) -> impl Future<Output = ()> {
+        let inner = inner.clone();
         async move {
             let connection_map = match Self::create_initial_connections(
                 &inner.initial_nodes,
@@ -1708,7 +1726,9 @@ where
             Self::refresh_slots_inner(inner, curr_retry)
                 .await
                 .map_err(|err| {
-                    if curr_retry > DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES {
+                    if curr_retry > DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES
+                        || err.kind() == ErrorKind::AllConnectionsUnavailable
+                    {
                         BackoffError::Permanent(err)
                     } else {
                         BackoffError::from(err)
@@ -1817,7 +1837,6 @@ where
         core: Core<C>,
         response_policy: Option<ResponsePolicy>,
     ) -> OperationResult {
-        trace!("execute_on_multiple_nodes");
         let connections_container = core.conn_lock.read().await;
         if connections_container.is_empty() {
             return OperationResult::Err((
@@ -2101,14 +2120,20 @@ where
             }
             ConnectionCheck::RandomConnection => {
                 let read_guard = core.conn_lock.read().await;
-                let (random_address, random_conn_future) = read_guard
-                    .random_connections(1, ConnectionType::User)
-                    .next()
-                    .ok_or(RedisError::from((
+                let random_conns_option = read_guard.random_connections(1, ConnectionType::User);
+                if let Some(mut random_connections) = random_conns_option {
+                    let (random_address, random_conn_future) =
+                        random_connections.next().ok_or(RedisError::from((
                         ErrorKind::AllConnectionsUnavailable,
                         "No random connection found",
-                    )))?;
-                return Ok((random_address, random_conn_future.await));
+                        )))?;
+                    return Ok((random_address, random_conn_future.await));
+                } else {
+                    return Err(RedisError::from((
+                        ErrorKind::AllConnectionsUnavailable,
+                        "No random connection found",
+                    )));
+                }
             }
         };
 
@@ -2123,29 +2148,42 @@ where
             ConnectionState::PollComplete => return Poll::Ready(Ok(())),
             ConnectionState::Recover(future) => future,
         };
-        match recover_future {
+        let (next_state, poll) = match recover_future {
             RecoverFuture::RecoverSlots(ref mut future) => match ready!(future.as_mut().poll(cx)) {
                 Ok(_) => {
                     trace!("Recovered!");
-                    self.state = ConnectionState::PollComplete;
-                    Poll::Ready(Ok(()))
+                    (Some(ConnectionState::PollComplete), Poll::Ready(Ok(())))
                 }
                 Err(err) => {
                     trace!("Recover slots failed!");
-                    *future = Box::pin(Self::refresh_slots_and_subscriptions_with_retries(
-                        self.inner.clone(),
-                        &RefreshPolicy::Throttable,
-                    ));
-                    Poll::Ready(Err(err))
+
+                    let next_state = if err.kind() == ErrorKind::AllConnectionsUnavailable {
+                        Some(ConnectionState::Recover(RecoverFuture::Reconnect(
+                            Box::pin(ClusterConnInner::reconnect_to_initial_nodes(
+                                self.inner.clone(),
+                            )),
+                        )))
+                    } else {
+                        Some(ConnectionState::Recover(RecoverFuture::RecoverSlots(
+                            Box::pin(Self::refresh_slots_and_subscriptions_with_retries(
+                                self.inner.clone(),
+                                &RefreshPolicy::Throttable,
+                            )),
+                        )))
+                    };
+                    (next_state, Poll::Ready(Err(err)))
                 }
             },
             RecoverFuture::Reconnect(ref mut future) => {
                 ready!(future.as_mut().poll(cx));
                 trace!("Reconnected connections");
-                self.state = ConnectionState::PollComplete;
-                Poll::Ready(Ok(()))
+                (Some(ConnectionState::PollComplete), Poll::Ready(Ok(())))
             }
+        };
+        if let Some(state) = next_state {
+            self.state = state;
         }
+        poll
     }
 
     async fn handle_loading_error(
@@ -2255,12 +2293,16 @@ where
                     }
                 }
                 Next::Reconnect {
-                    request, target, ..
+                    request,
+                    target,
+                    should_retry,
                 } => {
                     poll_flush_action =
                         poll_flush_action.change_state(PollFlushAction::Reconnect(vec![target]));
                     if let Some(request) = request {
-                        self.inner.pending_requests.lock().unwrap().push(request);
+                        if should_retry {
+                            self.inner.pending_requests.lock().unwrap().push(request);
+                        }
                     }
                 }
                 Next::ReconnectToInitialNodes { request } => {
@@ -2399,7 +2441,7 @@ where
             }
             PollFlushAction::ReconnectFromInitialConnections => {
                 self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
-                    self.reconnect_to_initial_nodes(),
+                    ClusterConnInner::reconnect_to_initial_nodes(self.inner.clone()),
                 )));
             }
         }
@@ -2441,8 +2483,20 @@ async fn calculate_topology_from_random_nodes<'a, C>(
 where
     C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
 {
-    let requested_nodes =
-        read_guard.random_connections(num_of_nodes_to_query, ConnectionType::PreferManagement);
+    let requested_nodes = match read_guard
+        .random_connections(num_of_nodes_to_query, ConnectionType::PreferManagement)
+    {
+        Some(random_conns) => random_conns,
+        None => {
+            return (
+                Err(RedisError::from((
+                    ErrorKind::AllConnectionsUnavailable,
+                    "No available connections to refresh slots from",
+                ))),
+                vec![],
+            )
+        }
+    };
     let topology_join_results =
         futures::future::join_all(requested_nodes.map(|(addr, conn)| async move {
             let mut conn: C = conn.await;
diff --git a/redis/src/types.rs b/redis/src/types.rs
index a024f16a7..b47eedd13 100644
--- a/redis/src/types.rs
+++ b/redis/src/types.rs
@@ -118,6 +118,8 @@ pub enum ErrorKind {
     /// not native to the system. This is usually the case if
     /// the cause is another error.
     IoError,
+    /// An I/O error that is considered safe to retry, as the request was not received by the server
+    IoErrorRetrySafe,
     /// An error raised that was identified on the client before execution.
     ClientError,
     /// An extension error. This is an error created by the server
@@ -802,6 +804,7 @@ impl fmt::Debug for RedisError {
 
 pub(crate) enum RetryMethod {
     Reconnect,
+    ReconnectAndRetry,
     NoRetry,
     RetryImmediately,
     WaitAndRetry,
@@ -870,6 +873,7 @@ impl RedisError {
             ErrorKind::CrossSlot => "cross-slot",
             ErrorKind::MasterDown => "master down",
             ErrorKind::IoError => "I/O error",
+            ErrorKind::IoErrorRetrySafe => "I/O error - Request wasn't received by the server",
             ErrorKind::ExtensionError => "extension error",
             ErrorKind::ClientError => "client error",
             ErrorKind::ReadOnly => "read-only",
@@ -957,6 +961,7 @@ impl RedisError {
     pub fn is_unrecoverable_error(&self) -> bool {
         match self.retry_method() {
             RetryMethod::Reconnect => true,
+            RetryMethod::ReconnectAndRetry => true,
 
             RetryMethod::NoRetry => false,
             RetryMethod::RetryImmediately => false,
@@ -1064,12 +1069,14 @@ impl RedisError {
                     io::ErrorKind::PermissionDenied => RetryMethod::NoRetry,
                     io::ErrorKind::Unsupported => RetryMethod::NoRetry,
+                    io::ErrorKind::TimedOut => RetryMethod::NoRetry,
 
                     _ => RetryMethod::RetryImmediately,
                 },
                 _ => RetryMethod::RetryImmediately,
             },
 
             ErrorKind::NotAllSlotsCovered => RetryMethod::NoRetry,
+            ErrorKind::IoErrorRetrySafe => RetryMethod::ReconnectAndRetry,
         }
     }
 }
diff --git a/redis/tests/test_cluster_async.rs b/redis/tests/test_cluster_async.rs
index b690ed87b..f64863581 100644
--- a/redis/tests/test_cluster_async.rs
+++ b/redis/tests/test_cluster_async.rs
@@ -2514,8 +2514,8 @@ mod cluster_async {
             match port {
                 6380 => panic!("Node should not be called"),
                 _ => match completed.fetch_add(1, Ordering::SeqCst) {
-                    0..=1 => Err(Err(RedisError::from(std::io::Error::new(
-                        std::io::ErrorKind::ConnectionReset,
+                    0..=1 => Err(Err(RedisError::from((
+                        ErrorKind::IoErrorRetrySafe,
                         "mock-io-error",
                     )))),
                     _ => Err(Ok(Value::BulkString(b"123".to_vec()))),
@@ -2569,6 +2569,79 @@ mod cluster_async {
         assert_eq!(completed.load(Ordering::SeqCst), 1);
     }
 
+    #[test]
+    fn test_async_cluster_non_retryable_io_error_should_not_retry() {
+        let name = "test_async_cluster_non_retryable_io_error_should_not_retry";
+        let requests = atomic::AtomicUsize::new(0);
+        let MockEnv {
+            runtime,
+            async_connection: mut connection,
+            ..
+        } = MockEnv::with_client_builder(
+            ClusterClient::builder(vec![&*format!("redis://{name}")]).retries(3),
+            name,
+            move |cmd: &[u8], _port| {
+                respond_startup_two_nodes(name, cmd)?;
+                let i = requests.fetch_add(1, atomic::Ordering::SeqCst);
+                match i {
+                    // Return an IO error that is not marked as retry-safe:
+                    0 => Err(Err(RedisError::from((ErrorKind::IoError, "io-error")))),
+                    _ => {
+                        panic!("Expected not to be retried!")
+                    }
+                }
+            },
+        );
+        runtime
+            .block_on(async move {
+                let res = cmd("INCR")
+                    .arg("foo")
+                    .query_async::<_, Option<i32>>(&mut connection)
+                    .await;
+                assert!(res.is_err());
+                let err = res.unwrap_err();
+                assert!(err.is_io_error());
+                Ok::<_, RedisError>(())
+            })
+            .unwrap();
+    }
+
+    #[test]
+    fn test_async_cluster_retry_safe_io_error_should_be_retried() {
+        let name = "test_async_cluster_retry_safe_io_error_should_be_retried";
+        let requests = atomic::AtomicUsize::new(0);
+        let MockEnv {
+            runtime,
+            async_connection: mut connection,
+            ..
+        } = MockEnv::with_client_builder(
+            ClusterClient::builder(vec![&*format!("redis://{name}")]).retries(3),
+            name,
+            move |cmd: &[u8], _port| {
+                respond_startup_two_nodes(name, cmd)?;
+                let i = requests.fetch_add(1, atomic::Ordering::SeqCst);
+                match i {
+                    0 => Err(Err(RedisError::from((
+                        ErrorKind::IoErrorRetrySafe,
+                        "server didn't receive the request, safe to retry",
+                    )))),
+                    _ => Err(Ok(Value::Int(1))),
+                }
+            },
+        );
+        runtime
+            .block_on(async move {
+                let res = cmd("INCR")
+                    .arg("foo")
+                    .query_async::<_, i32>(&mut connection)
+                    .await;
+                assert!(res.is_ok());
+                let value = res.unwrap();
+                assert_eq!(value, 1);
+                Ok::<_, RedisError>(())
+            })
+            .unwrap();
+    }
+
     #[test]
     fn test_async_cluster_read_from_primary() {
         let name = "node";
@@ -3151,10 +3224,17 @@ mod cluster_async {
         };
 
         // wait for new topology discovery
+        let max_requests = 10;
+        let mut i = 0;
+        let mut cmd = redis::cmd("INFO");
+        cmd.arg("SERVER");
         loop {
-            let mut cmd = redis::cmd("INFO");
-            cmd.arg("SERVER");
-            let res = publishing_con
+            if i == max_requests {
+                panic!("Failed to recover and discover new topology");
+            }
+            i += 1;
+
+            if let Ok(res) = publishing_con
                 .route_command(
                     &cmd,
                     RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route::new(
@@ -3162,21 +3242,21 @@ mod cluster_async {
                         SlotAddr::Master,
                     ))),
                 )
-                .await;
-            assert!(res.is_ok());
-            let res = res.unwrap();
-            match res {
-                Value::VerbatimString { format: _, text } => {
-                    if text.contains(format!("tcp_port:{}", last_server_port).as_str()) {
-                        // new topology rediscovered
-                        break;
+                .await
+            {
+                match res {
+                    Value::VerbatimString { format: _, text } => {
+                        if text.contains(format!("tcp_port:{}", last_server_port).as_str()) {
+                            // new topology rediscovered
+                            break;
+                        }
+                    }
+                    _ => {
+                        panic!("Wrong return type for INFO SERVER command: {:?}", res);
                     }
                 }
-                _ => {
-                    panic!("Wrong return type for INFO SERVER command: {:?}", res);
-                }
+                sleep(futures_time::time::Duration::from_secs(1)).await;
             }
-            sleep(futures_time::time::Duration::from_secs(1)).await;
         }
 
         // sleep for one one cycle of topology refresh
@@ -3214,8 +3294,9 @@ mod cluster_async {
         }
 
         if use_sharded {
+            let mut cmd = redis::cmd("SPUBLISH");
             // validate SPUBLISH
-            let result = cmd("SPUBLISH")
+            let result = cmd
                 .arg("test_channel_?")
                 .arg("test_message")
                 .query_async(&mut publishing_con)
@@ -3714,9 +3795,26 @@ mod cluster_async {
                     false,
                 );
 
-                let result = connection.req_packed_command(&cmd).await.unwrap();
-                assert_eq!(result, Value::SimpleString("PONG".to_string()));
-                Ok::<_, RedisError>(())
+                let max_requests = 5;
+                let mut i = 0;
+                let mut last_err = None;
+                loop {
+                    if i == max_requests {
+                        break;
+                    }
+                    i += 1;
+                    match connection.req_packed_command(&cmd).await {
+                        Ok(result) => {
+                            assert_eq!(result, Value::SimpleString("PONG".to_string()));
+                            return Ok::<_, RedisError>(());
+                        }
+                        Err(err) => {
+                            last_err = Some(err);
+                            let _ = sleep(futures_time::time::Duration::from_secs(1)).await;
+                        }
+                    }
+                }
+                panic!("Failed to recover after all nodes went down. Last error: {last_err:?}");
Last error: {last_err:?}"); }) .unwrap(); } @@ -3742,19 +3840,37 @@ mod cluster_async { ); let cmd = cmd("PING"); - // explicitly route to all primaries and request all succeeded - let result = connection - .route_command( - &cmd, - RoutingInfo::MultiNode(( - MultipleNodeRoutingInfo::AllMasters, - Some(redis::cluster_routing::ResponsePolicy::AllSucceeded), - )), - ) - .await; - assert!(result.is_ok()); - Ok::<_, RedisError>(()) + let max_requests = 5; + let mut i = 0; + let mut last_err = None; + loop { + if i == max_requests { + break; + } + i += 1; + // explicitly route to all primaries and request all succeeded + match connection + .route_command( + &cmd, + RoutingInfo::MultiNode(( + MultipleNodeRoutingInfo::AllMasters, + Some(redis::cluster_routing::ResponsePolicy::AllSucceeded), + )), + ) + .await + { + Ok(result) => { + assert_eq!(result, Value::SimpleString("PONG".to_string())); + return Ok::<_, RedisError>(()); + } + Err(err) => { + last_err = Some(err); + let _ = sleep(futures_time::time::Duration::from_secs(1)).await; + } + } + } + panic!("Failed to recover after all nodes went down. Last error: {last_err:?}"); }) .unwrap(); } @@ -3825,7 +3941,10 @@ mod cluster_async { if connect_attempt > 5 { panic!("Too many pings!"); } - Err(Err(broken_pipe_error())) + Err(Err(RedisError::from(( + ErrorKind::IoErrorRetrySafe, + "mock-io-error", + )))) } else { respond_startup_two_nodes(name, cmd)?; let past_get_attempts = get_attempts.fetch_add(1, Ordering::Relaxed); @@ -3833,7 +3952,10 @@ mod cluster_async { if past_get_attempts == 0 { // Error once with io-error, ensure connection is reestablished w/out calling // other node (i.e., not doing a full slot rebuild) - Err(Err(broken_pipe_error())) + Err(Err(RedisError::from(( + ErrorKind::IoErrorRetrySafe, + "mock-io-error", + )))) } else { Err(Ok(Value::BulkString(b"123".to_vec()))) } @@ -3936,21 +4058,23 @@ mod cluster_async { } async fn kill_connection(killer_connection: &mut ClusterConnection, connection_to_kill: &str) { + let default_routing = RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode( + Route::new(0, SlotAddr::Master), + )); + kill_connection_with_routing(killer_connection, connection_to_kill, default_routing).await; + } + + async fn kill_connection_with_routing( + killer_connection: &mut ClusterConnection, + connection_to_kill: &str, + routing: RoutingInfo, + ) { let mut cmd = redis::cmd("CLIENT"); cmd.arg("KILL"); cmd.arg("ID"); cmd.arg(connection_to_kill); - // Kill the management connection in the primary node that holds slot 0 - assert!(killer_connection - .route_command( - &cmd, - RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route::new( - 0, - SlotAddr::Master, - )),), - ) - .await - .is_ok()); + // Kill the management connection for the routing node + assert!(killer_connection.route_command(&cmd, routing).await.is_ok()); } #[test]