From 7805814c44b44591a026f6ef0c7f67daaf75f3f4 Mon Sep 17 00:00:00 2001 From: barshaul Date: Wed, 31 Jul 2024 14:26:32 +0000 Subject: [PATCH 1/3] Refactored the connections map from HashMap to DashMap to minimize write lock contention on the connection container --- Cargo.lock | 15 ++++ redis/Cargo.toml | 5 +- .../cluster_async/connections_container.rs | 85 ++++++++++--------- redis/src/cluster_async/mod.rs | 23 ++--- 4 files changed, 77 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ce94dfeaa..a6a8c9b7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -652,6 +652,20 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" +[[package]] +name = "dashmap" +version = "6.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.3", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "derivative" version = "2.2.0" @@ -1637,6 +1651,7 @@ dependencies = [ "combine", "crc16", "criterion", + "dashmap", "derivative", "dispose", "fast-math", diff --git a/redis/Cargo.toml b/redis/Cargo.toml index 0d836a698..d9711e695 100644 --- a/redis/Cargo.toml +++ b/redis/Cargo.toml @@ -63,6 +63,9 @@ crc16 = { version = "0.4", optional = true } rand = { version = "0.8", optional = true } derivative = { version = "2.2.0", optional = true } +# Only needed for async cluster +dashmap = { version = "6.0.1", optional = true } + # Only needed for async_std support async-std = { version = "1.8.0", optional = true } async-trait = { version = "0.1.24", optional = true } @@ -124,7 +127,7 @@ tokio-native-tls-comp = ["tokio-comp", "tls-native-tls", "tokio-native-tls"] tokio-rustls-comp = ["tokio-comp", "tls-rustls", "tokio-rustls"] connection-manager = ["futures", "aio", "tokio-retry"] streams = [] -cluster-async = ["cluster", "futures", "futures-util"] +cluster-async = ["cluster", "futures", "futures-util", "dashmap"] keep-alive = ["socket2"] sentinel = ["rand"] tcp_nodelay = [] diff --git a/redis/src/cluster_async/connections_container.rs b/redis/src/cluster_async/connections_container.rs index 2a438aa77..8e8e83bbe 100644 --- a/redis/src/cluster_async/connections_container.rs +++ b/redis/src/cluster_async/connections_container.rs @@ -1,14 +1,13 @@ use crate::cluster_async::ConnectionFuture; +use crate::cluster_routing::{Route, SlotAddr}; +use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue}; +use crate::cluster_topology::TopologyHash; use arcstr::ArcStr; +use dashmap::DashMap; use futures::FutureExt; use rand::seq::IteratorRandom; -use std::collections::HashMap; use std::net::IpAddr; -use crate::cluster_routing::{Route, SlotAddr}; -use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue}; -use crate::cluster_topology::TopologyHash; - /// A struct that encapsulates a network connection along with its associated IP address. #[derive(Clone, Eq, PartialEq, Debug)] pub struct ConnectionWithIp { @@ -86,11 +85,12 @@ pub(crate) enum ConnectionType { PreferManagement, } -pub(crate) struct ConnectionsMap(pub(crate) HashMap>); +pub(crate) struct ConnectionsMap(pub(crate) DashMap>); impl std::fmt::Display for ConnectionsMap { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - for (address, node) in self.0.iter() { + for item in self.0.iter() { + let (address, node) = (item.key(), item.value()); match node.user_connection.ip { Some(ip) => writeln!(f, "{address} - {ip}")?, None => writeln!(f, "{address}")?, @@ -101,7 +101,7 @@ impl std::fmt::Display for ConnectionsMap { } pub(crate) struct ConnectionsContainer { - connection_map: HashMap>, + connection_map: DashMap>, pub(crate) slot_map: SlotMap, read_from_replica_strategy: ReadFromReplicaStrategy, topology_hash: TopologyHash, @@ -213,9 +213,10 @@ where pub(crate) fn all_node_connections( &self, ) -> impl Iterator> + '_ { - self.connection_map - .iter() - .map(move |(address, node)| (address.clone(), node.user_connection.conn.clone())) + self.connection_map.iter().map(move |item| { + let (node, address) = (item.key(), item.value()); + (node.clone(), address.user_connection.conn.clone()) + }) } pub(crate) fn all_primary_connections( @@ -228,16 +229,19 @@ where } pub(crate) fn node_for_address(&self, address: &str) -> Option> { - self.connection_map.get(address).cloned() + self.connection_map + .get(address) + .map(|item| item.value().clone()) } pub(crate) fn connection_for_address( &self, address: &str, ) -> Option> { - self.connection_map - .get_key_value(address) - .map(|(address, conn)| (address.clone(), conn.user_connection.conn.clone())) + self.connection_map.get(address).map(|item| { + let (address, conn) = (item.key(), item.value()); + (address.clone(), conn.user_connection.conn.clone()) + }) } pub(crate) fn random_connections( @@ -249,14 +253,15 @@ where .iter() .choose_multiple(&mut rand::thread_rng(), amount) .into_iter() - .map(move |(address, node)| { + .map(move |item| { + let (address, node) = (item.key(), item.value()); let conn = node.get_connection(&conn_type); (address.clone(), conn) }) } pub(crate) fn replace_or_add_connection_for_address( - &mut self, + &self, address: impl Into, node: ClusterNode, ) -> ArcStr { @@ -265,8 +270,10 @@ where address } - pub(crate) fn remove_node(&mut self, address: &ArcStr) -> Option> { - self.connection_map.remove(address) + pub(crate) fn remove_node(&self, address: &ArcStr) -> Option> { + self.connection_map + .remove(address) + .map(|(_key, value)| value) } pub(crate) fn len(&self) -> usize { @@ -302,13 +309,13 @@ mod tests { } } } - fn remove_nodes(container: &mut ConnectionsContainer, addresss: &[&str]) { + fn remove_nodes(container: &ConnectionsContainer, addresss: &[&str]) { for address in addresss { container.remove_node(&(*address).into()); } } - fn remove_all_connections(container: &mut ConnectionsContainer) { + fn remove_all_connections(container: &ConnectionsContainer) { remove_nodes( container, &[ @@ -366,7 +373,7 @@ mod tests { ], ReadFromReplicaStrategy::AlwaysFromPrimary, // this argument shouldn't matter, since we overload the RFR strategy. ); - let mut connection_map = HashMap::new(); + let connection_map = DashMap::new(); connection_map.insert( "primary1".into(), create_cluster_node(1, use_management_connections), @@ -514,7 +521,7 @@ mod tests { #[test] fn get_replica_connection_for_replica_route_if_some_but_not_all_replicas_were_removed() { - let mut container = create_container(); + let container = create_container(); container.remove_node(&"replica3-2".into()); assert_eq!( @@ -540,8 +547,8 @@ mod tests { #[test] fn get_primary_connection_for_replica_route_if_all_replicas_were_removed() { - let mut container = create_container(); - remove_nodes(&mut container, &["replica2-1", "replica3-1", "replica3-2"]); + let container = create_container(); + remove_nodes(&container, &["replica2-1", "replica3-1", "replica3-2"]); assert_eq!( 2, @@ -593,7 +600,7 @@ mod tests { #[test] fn get_connection_by_address_returns_none_if_connection_was_removed() { - let mut container = create_container(); + let container = create_container(); container.remove_node(&"primary1".into()); assert!(container.connection_for_address("primary1").is_none()); @@ -601,7 +608,7 @@ mod tests { #[test] fn get_connection_by_address_returns_added_connection() { - let mut container = create_container(); + let container = create_container(); let address = container.replace_or_add_connection_for_address( "foobar", ClusterNode::new_only_with_user_conn(4), @@ -630,8 +637,8 @@ mod tests { #[test] fn get_random_connections_returns_none_if_all_connections_were_removed() { - let mut container = create_container(); - remove_all_connections(&mut container); + let container = create_container(); + remove_all_connections(&container); assert_eq!( 0, @@ -643,8 +650,8 @@ mod tests { #[test] fn get_random_connections_returns_added_connection() { - let mut container = create_container(); - remove_all_connections(&mut container); + let container = create_container(); + remove_all_connections(&container); let address = container.replace_or_add_connection_for_address( "foobar", ClusterNode::new_only_with_user_conn(4), @@ -694,7 +701,7 @@ mod tests { #[test] fn get_all_user_connections_returns_added_connection() { - let mut container = create_container(); + let container = create_container(); container.replace_or_add_connection_for_address( "foobar", ClusterNode::new_only_with_user_conn(4), @@ -711,7 +718,7 @@ mod tests { #[test] fn get_all_user_connections_does_not_return_removed_connection() { - let mut container = create_container(); + let container = create_container(); container.remove_node(&"primary1".into()); let mut connections: Vec<_> = container @@ -738,7 +745,7 @@ mod tests { #[test] fn get_all_primaries_does_not_return_removed_connection() { - let mut container = create_container(); + let container = create_container(); container.remove_node(&"primary1".into()); let mut connections: Vec<_> = container @@ -752,7 +759,7 @@ mod tests { #[test] fn len_is_adjusted_on_removals_and_additions() { - let mut container = create_container(); + let container = create_container(); assert_eq!(container.len(), 6); @@ -769,7 +776,7 @@ mod tests { #[test] fn len_is_not_adjusted_on_removals_of_nonexisting_connections_or_additions_of_existing_connections( ) { - let mut container = create_container(); + let container = create_container(); assert_eq!(container.len(), 6); @@ -785,7 +792,7 @@ mod tests { #[test] fn remove_node_returns_connection_if_it_exists() { - let mut container = create_container(); + let container = create_container(); let connection = container.remove_node(&"primary1".into()); assert_eq!(connection, Some(ClusterNode::new_only_with_user_conn(1))); @@ -796,7 +803,7 @@ mod tests { #[test] fn test_is_empty() { - let mut container = create_container(); + let container = create_container(); assert!(!container.is_empty()); container.remove_node(&"primary1".into()); @@ -829,7 +836,7 @@ mod tests { #[test] fn is_primary_returns_false_for_removed_node() { - let mut container = create_container(); + let container = create_container(); let address = "primary1".into(); container.remove_node(&address); diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 0a4d56e99..03ff87180 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -39,6 +39,7 @@ use crate::{ }; #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] use async_std::task::{spawn, JoinHandle}; +use dashmap::DashMap; #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] use futures::executor::block_on; use std::{ @@ -1090,10 +1091,10 @@ where .buffer_unordered(initial_nodes.len()) .fold( ( - ConnectionsMap(HashMap::with_capacity(initial_nodes.len())), + ConnectionsMap(DashMap::with_capacity(initial_nodes.len())), None, ), - |mut connections: (ConnectionMap, Option), addr_conn_res| async move { + |connections: (ConnectionMap, Option), addr_conn_res| async move { match addr_conn_res { Ok((addr, node)) => { connections.0 .0.insert(addr.into(), node); @@ -1156,14 +1157,14 @@ where conn_type: RefreshConnectionType, ) { info!("Started refreshing connections to {:?}", addresses); - let mut connections_container = inner.conn_lock.write().await; + let connections_container = inner.conn_lock.read().await; let cluster_params = &inner.cluster_params; let subscriptions_by_address = &inner.subscriptions_by_address; let push_sender = &inner.push_sender; stream::iter(addresses.into_iter()) .fold( - &mut *connections_container, + &*connections_container, |connections_container, address| async move { let node_option = connections_container.remove_node(&address); @@ -1476,12 +1477,12 @@ where drop(subs_by_address_guard); if !addrs_to_refresh.is_empty() { - let mut conns_write_guard = inner.conn_lock.write().await; + let conns_read_guard = inner.conn_lock.read().await; // have to remove or otherwise the refresh_connection wont trigger node recreation for addr_to_refresh in addrs_to_refresh.iter() { - conns_write_guard.remove_node(addr_to_refresh); + conns_read_guard.remove_node(addr_to_refresh); } - drop(conns_write_guard); + drop(conns_read_guard); // immediately trigger connection reestablishment Self::refresh_connections( inner.clone(), @@ -1609,8 +1610,8 @@ where .await; let new_connections: ConnectionMap = stream::iter(addresses_and_connections_iter) .fold( - ConnectionsMap(HashMap::with_capacity(nodes_len)), - |mut connections, (addr, node)| async { + ConnectionsMap(DashMap::with_capacity(nodes_len)), + |connections, (addr, node)| async { let mut cluster_params = inner.cluster_params.clone(); let subs_guard = inner.subscriptions_by_address.read().await; cluster_params.pubsub_subscriptions = @@ -1923,7 +1924,7 @@ where { Ok(node) => { let connection_clone = node.user_connection.conn.clone().await; - let mut connections = core.conn_lock.write().await; + let connections = core.conn_lock.read().await; let address = connections.replace_or_add_connection_for_address(addr, node); drop(connections); (address, connection_clone) @@ -1993,7 +1994,7 @@ where if !is_primary { // If the connection is a replica, remove the connection and retry. // The connection will be established again on the next call to refresh slots once the replica is no longer in loading state. - core.conn_lock.write().await.remove_node(&address); + core.conn_lock.read().await.remove_node(&address); } else { // If the connection is primary, just sleep and retry let sleep_duration = core.cluster_params.retry_params.wait_time_for_retry(retry); From ac45f15dca737acc11a3e15354e9d3dcdbe8d9f2 Mon Sep 17 00:00:00 2001 From: barshaul Date: Mon, 5 Aug 2024 14:09:10 +0000 Subject: [PATCH 2/3] Addressing PR comments --- redis/Cargo.toml | 2 +- redis/src/cluster_async/connections_container.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/redis/Cargo.toml b/redis/Cargo.toml index d9711e695..fd79ff079 100644 --- a/redis/Cargo.toml +++ b/redis/Cargo.toml @@ -64,7 +64,7 @@ rand = { version = "0.8", optional = true } derivative = { version = "2.2.0", optional = true } # Only needed for async cluster -dashmap = { version = "6.0.1", optional = true } +dashmap = { version = "6.0", optional = true } # Only needed for async_std support async-std = { version = "1.8.0", optional = true } diff --git a/redis/src/cluster_async/connections_container.rs b/redis/src/cluster_async/connections_container.rs index 8e8e83bbe..98391edd9 100644 --- a/redis/src/cluster_async/connections_container.rs +++ b/redis/src/cluster_async/connections_container.rs @@ -309,8 +309,8 @@ mod tests { } } } - fn remove_nodes(container: &ConnectionsContainer, addresss: &[&str]) { - for address in addresss { + fn remove_nodes(container: &ConnectionsContainer, addresses: &[&str]) { + for address in addresses { container.remove_node(&(*address).into()); } } From 39c00aa550441135441591129efba8d22422fbc8 Mon Sep 17 00:00:00 2001 From: barshaul Date: Mon, 5 Aug 2024 14:35:15 +0000 Subject: [PATCH 3/3] Replaced the connection map key from ArcStr to String --- .../cluster_async/connections_container.rs | 15 ++++--- redis/src/cluster_async/mod.rs | 40 +++++++++---------- 2 files changed, 26 insertions(+), 29 deletions(-) diff --git a/redis/src/cluster_async/connections_container.rs b/redis/src/cluster_async/connections_container.rs index 98391edd9..e0b2fc87d 100644 --- a/redis/src/cluster_async/connections_container.rs +++ b/redis/src/cluster_async/connections_container.rs @@ -2,7 +2,6 @@ use crate::cluster_async::ConnectionFuture; use crate::cluster_routing::{Route, SlotAddr}; use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue}; use crate::cluster_topology::TopologyHash; -use arcstr::ArcStr; use dashmap::DashMap; use futures::FutureExt; use rand::seq::IteratorRandom; @@ -85,7 +84,7 @@ pub(crate) enum ConnectionType { PreferManagement, } -pub(crate) struct ConnectionsMap(pub(crate) DashMap>); +pub(crate) struct ConnectionsMap(pub(crate) DashMap>); impl std::fmt::Display for ConnectionsMap { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -101,7 +100,7 @@ impl std::fmt::Display for ConnectionsMap { } pub(crate) struct ConnectionsContainer { - connection_map: DashMap>, + connection_map: DashMap>, pub(crate) slot_map: SlotMap, read_from_replica_strategy: ReadFromReplicaStrategy, topology_hash: TopologyHash, @@ -118,7 +117,7 @@ impl Default for ConnectionsContainer { } } -pub(crate) type ConnectionAndAddress = (ArcStr, Connection); +pub(crate) type ConnectionAndAddress = (String, Connection); impl ConnectionsContainer where @@ -139,7 +138,7 @@ where } /// Returns true if the address represents a known primary node. - pub(crate) fn is_primary(&self, address: &ArcStr) -> bool { + pub(crate) fn is_primary(&self, address: &String) -> bool { self.connection_for_address(address).is_some() && self .slot_map @@ -262,15 +261,15 @@ where pub(crate) fn replace_or_add_connection_for_address( &self, - address: impl Into, + address: impl Into, node: ClusterNode, - ) -> ArcStr { + ) -> String { let address = address.into(); self.connection_map.insert(address.clone(), node); address } - pub(crate) fn remove_node(&self, address: &ArcStr) -> Option> { + pub(crate) fn remove_node(&self, address: &String) -> Option> { self.connection_map .remove(address) .map(|(_key, value)| value) diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 03ff87180..55e1aeff8 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -82,7 +82,6 @@ use std::time::Duration; #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] use crate::aio::{async_std::AsyncStd, RedisRuntime}; -use arcstr::ArcStr; #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] use backoff_std_async::future::retry; #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] @@ -382,7 +381,7 @@ pub(crate) struct InnerCore { slot_refresh_state: SlotRefreshState, initial_nodes: Vec, push_sender: Option>, - subscriptions_by_address: RwLock>, + subscriptions_by_address: RwLock>, unassigned_subscriptions: RwLock, } @@ -517,7 +516,7 @@ pub(crate) enum InternalSingleNodeRouting { SpecificNode(Route), ByAddress(String), Connection { - address: ArcStr, + address: String, conn: ConnectionFuture, }, Redirect { @@ -614,14 +613,14 @@ pub(crate) enum Response { } pub(crate) enum OperationTarget { - Node { address: ArcStr }, + Node { address: String }, FanOut, NotFound, } type OperationResult = Result; -impl From for OperationTarget { - fn from(address: ArcStr) -> Self { +impl From for OperationTarget { + fn from(address: String) -> Self { OperationTarget::Node { address } } } @@ -762,12 +761,12 @@ enum Next { }, RetryBusyLoadingError { request: PendingRequest, - address: ArcStr, + address: String, }, Reconnect { // if not set, then a reconnect should happen without sending a request afterwards request: Option>, - target: ArcStr, + target: String, }, RefreshSlots { // if not set, then a slot refresh should happen without sending a request afterwards @@ -944,7 +943,7 @@ impl Request { } enum ConnectionCheck { - Found((ArcStr, ConnectionFuture)), + Found((String, ConnectionFuture)), OnlyAddress(String), RandomConnection, } @@ -1097,7 +1096,7 @@ where |connections: (ConnectionMap, Option), addr_conn_res| async move { match addr_conn_res { Ok((addr, node)) => { - connections.0 .0.insert(addr.into(), node); + connections.0 .0.insert(addr, node); (connections.0, None) } Err(e) => (connections.0, Some(e.to_string())), @@ -1153,7 +1152,7 @@ where async fn refresh_connections( inner: Arc>, - addresses: Vec, + addresses: Vec, conn_type: RefreshConnectionType, ) { info!("Started refreshing connections to {:?}", addresses); @@ -1201,7 +1200,7 @@ where } async fn aggregate_results( - receivers: Vec<(Option, oneshot::Receiver>)>, + receivers: Vec<(Option, oneshot::Receiver>)>, routing: &MultipleNodeRoutingInfo, response_policy: Option, ) -> RedisResult { @@ -1405,7 +1404,7 @@ where return; } - let mut addrs_to_refresh: HashSet = HashSet::new(); + let mut addrs_to_refresh: HashSet = HashSet::new(); let mut subs_by_address_guard = inner.subscriptions_by_address.write().await; let mut unassigned_subs_guard = inner.unassigned_subscriptions.write().await; let conns_read_guard = inner.conn_lock.read().await; @@ -1614,8 +1613,7 @@ where |connections, (addr, node)| async { let mut cluster_params = inner.cluster_params.clone(); let subs_guard = inner.subscriptions_by_address.read().await; - cluster_params.pubsub_subscriptions = - subs_guard.get(&ArcStr::from(addr.as_str())).cloned(); + cluster_params.pubsub_subscriptions = subs_guard.get(addr).cloned(); drop(subs_guard); let node = get_or_create_conn( addr, @@ -1674,7 +1672,7 @@ where Item = Option<(Arc, ConnectionAndAddress>)>, >, ) -> ( - Vec<(Option, Receiver>)>, + Vec<(Option, Receiver>)>, Vec>>, ) { iterator @@ -1786,7 +1784,7 @@ where pipeline: Arc, offset: usize, count: usize, - conn: impl Future>, + conn: impl Future>, ) -> OperationResult { trace!("try_pipeline_request"); let (address, mut conn) = conn.await.map_err(|err| (OperationTarget::NotFound, err))?; @@ -1834,7 +1832,7 @@ where routing: InternalSingleNodeRouting, core: Core, cmd: Option>, - ) -> RedisResult<(ArcStr, C)> { + ) -> RedisResult<(String, C)> { let read_guard = core.conn_lock.read().await; let mut asking = false; @@ -1986,7 +1984,7 @@ where async fn handle_loading_error( core: Core, info: RequestInfo, - address: ArcStr, + address: String, retry: u32, ) -> OperationResult { let is_primary = core.conn_lock.read().await.is_primary(&address); @@ -2138,7 +2136,7 @@ where enum PollFlushAction { None, RebuildSlots, - Reconnect(Vec), + Reconnect(Vec), ReconnectFromInitialConnections, } @@ -2270,7 +2268,7 @@ async fn calculate_topology_from_random_nodes<'a, C>( crate::cluster_slotmap::SlotMap, crate::cluster_topology::TopologyHash, )>, - Vec, + Vec, ) where C: ConnectionLike + Connect + Clone + Send + Sync + 'static,