Skip to content

Commit

Permalink
[redis-rs][core] Move connection refresh to the background
Browse files Browse the repository at this point in the history
Signed-off-by: GilboaAWS <[email protected]>
  • Loading branch information
GilboaAWS committed Jan 5, 2025
1 parent ffc679a commit bf2cbe2
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ use crate::cluster_async::ConnectionFuture;
use crate::cluster_routing::{Route, ShardAddrs, SlotAddr};
use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue};
use crate::cluster_topology::TopologyHash;
use dashmap::DashMap;
use dashmap::{DashMap, DashSet};
use futures::FutureExt;
use rand::seq::IteratorRandom;
use std::net::IpAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use telemetrylib::Telemetry;

use tokio::task::JoinHandle;

/// Count the number of connections in a connections_map object
macro_rules! count_connections {
($conn_map:expr) => {{
Expand Down Expand Up @@ -121,6 +123,12 @@ pub(crate) enum ConnectionType {

pub(crate) struct ConnectionsMap<Connection>(pub(crate) DashMap<String, ClusterNode<Connection>>);

pub(crate) struct RefreshState<Connection> {
pub handle: JoinHandle<()>, // The currect running refresh task
pub node_conn: Option<ClusterNode<Connection>> // The refreshed connection after the task is done
}


impl<Connection> std::fmt::Display for ConnectionsMap<Connection> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for item in self.0.iter() {
Expand All @@ -139,6 +147,14 @@ pub(crate) struct ConnectionsContainer<Connection> {
pub(crate) slot_map: SlotMap,
read_from_replica_strategy: ReadFromReplicaStrategy,
topology_hash: TopologyHash,


// Holds all the failed addresses that started a refresh task.
pub(crate) refresh_addresses_started: DashSet<String>,
// Follow the refresh ops on the connections
pub(crate) refresh_operations: DashMap<String, RefreshState<Connection>>,
// Holds all the refreshed addresses that are ready to be inserted into the connection_map
pub(crate) refresh_addresses_done: DashSet<String>,
}

impl<Connection> Drop for ConnectionsContainer<Connection> {
Expand All @@ -155,6 +171,9 @@ impl<Connection> Default for ConnectionsContainer<Connection> {
slot_map: Default::default(),
read_from_replica_strategy: ReadFromReplicaStrategy::AlwaysFromPrimary,
topology_hash: 0,
refresh_addresses_started: DashSet::new(),
refresh_operations: DashMap::new(),
refresh_addresses_done: DashSet::new(),
}
}
}
Expand Down Expand Up @@ -182,6 +201,9 @@ where
slot_map,
read_from_replica_strategy,
topology_hash,
refresh_addresses_started: DashSet::new(),
refresh_operations: DashMap::new(),
refresh_addresses_done: DashSet::new(),
}
}

Expand Down Expand Up @@ -572,6 +594,9 @@ mod tests {
connection_map,
read_from_replica_strategy: ReadFromReplicaStrategy::AZAffinity("use-1a".to_string()),
topology_hash: 0,
refresh_addresses_started: DashSet::new(),
refresh_operations: DashMap::new(),
refresh_addresses_done: DashSet::new(),
}
}

Expand Down Expand Up @@ -628,6 +653,9 @@ mod tests {
connection_map,
read_from_replica_strategy: strategy,
topology_hash: 0,
refresh_addresses_started: DashSet::new(),
refresh_operations: DashMap::new(),
refresh_addresses_done: DashSet::new(),
}
}

Expand Down
Loading

0 comments on commit bf2cbe2

Please sign in to comment.