From 30b35f293a06ef1361bb841dd25a089af06c6d37 Mon Sep 17 00:00:00 2001 From: GilboaAWS Date: Mon, 6 Jan 2025 11:13:47 +0000 Subject: [PATCH] Fixing bug - Refresh addresses started to take place Signed-off-by: GilboaAWS --- .../redis-rs/redis/src/cluster_async/mod.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index b2a00026d7..7bda70fbb4 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -1405,7 +1405,7 @@ where .conn_lock .read() .expect(MUTEX_READ_ERR) - .refresh_addresses_done + .refresh_addresses_started .insert(address_clone_for_task.clone()); let mut cluster_params = inner_clone @@ -1443,6 +1443,10 @@ where // This strategy effectively creates a synchronization point at the beginning of poll_flush, where the connection map is // updated atomically for the next batch of operations. This approach balances the need for up-to-date connection information // with the requirement for consistent request handling within each processing cycle. + debug!( + "Succeeded to refresh connection for node {}.", + address_clone_for_task + ); let connection_container = inner_clone.conn_lock.read().expect(MUTEX_READ_ERR); if let Some(mut refresh_state) = connection_container @@ -2434,6 +2438,7 @@ where } fn update_refreshed_connection(&mut self) { + trace!("update_refreshed_connection started"); loop { let connections_container = self.inner.conn_lock.read().expect(MUTEX_WRITE_ERR); @@ -2464,17 +2469,17 @@ where if let Some(mut refresh_state) = connections_container.refresh_operations.get_mut(&address) { - info!( - "update_refreshed_connection: Update conn for addr: {}", - address - ); - // Take the node_conn out of RefreshState, replacing it with None if let Some(node_conn) = mem::take(&mut refresh_state.node_conn) { - info!("update_refreshed_connection: replacing/adding the conn"); + debug!( + "update_refreshed_connection: replacing/adding the connection: {}", + address + ); // Move the node_conn to the function connections_container .replace_or_add_connection_for_address(address.clone(), node_conn); + } else { + debug!("update_refreshed_connection: reconnection failed, no connection for address: {}", address); } } // Remove this entry from refresh_ops_map