Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core + Py: add allow_non_covered_slots to ClusterScan and related commands #2814

Merged
merged 4 commits into from
Dec 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 37 additions & 113 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,13 @@ pub mod testing {
use crate::{
client::GlideConnectionOptions,
cluster_routing::{Routable, RoutingInfo, ShardUpdateResult},
cluster_slotmap::SlotMap,
cluster_topology::{
calculate_topology, get_slot, SlotRefreshState, DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES,
DEFAULT_REFRESH_SLOTS_RETRY_BASE_DURATION_MILLIS, DEFAULT_REFRESH_SLOTS_RETRY_BASE_FACTOR,
SLOT_SIZE,
},
cmd,
commands::cluster_scan::{cluster_scan, ClusterScanArgs, ObjectType, ScanStateRC},
FromRedisValue, InfoDict, ToRedisArgs,
commands::cluster_scan::{cluster_scan, ClusterScanArgs, ScanStateRC},
FromRedisValue, InfoDict,
};
use dashmap::DashMap;
use std::{
Expand Down Expand Up @@ -111,7 +109,7 @@ use crate::types::RetryMethod;

pub(crate) const MUTEX_READ_ERR: &str = "Failed to obtain read lock. Poisoned mutex?";
const MUTEX_WRITE_ERR: &str = "Failed to obtain write lock. Poisoned mutex?";
/// This represents an async Redis Cluster connection. It stores the
/// This represents an async Cluster connection. It stores the
/// underlying connections maintained for each node in the cluster, as well
/// as common parameters for connecting to nodes and executing commands.
#[derive(Clone)]
Expand Down Expand Up @@ -142,79 +140,18 @@ where
})
}

/// Special handling for `SCAN` command, using `cluster_scan`.
/// If you wish to use a match pattern, use [`cluster_scan_with_pattern`].
/// Perform a `SCAN` command on a Redis cluster, using scan state object in order to handle changes in topology
/// and make sure that all keys that were in the cluster from start to end of the scan are scanned.
/// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance.
///
/// # Arguments
///
/// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`],
/// for each subsequent iteration use the returned [`ScanStateRC`].
/// * `count` - An optional count of keys requested,
/// the amount returned can vary and not obligated to return exactly count.
/// * `object_type` - An optional [`ObjectType`] enum of requested key redis type.
///
/// # Returns
///
/// A [`ScanStateRC`] for the updated state of the scan and the vector of keys that were found in the scan.
/// structure of returned value:
/// `Ok((ScanStateRC, Vec<Value>))`
///
/// When the scan is finished [`ScanStateRC`] will be None, and can be checked by calling `scan_state_wrapper.is_finished()`.
///
/// # Example
/// ```rust,no_run
/// use redis::cluster::ClusterClient;
/// use redis::{ScanStateRC, FromRedisValue, from_redis_value, Value, ObjectType};
///
/// async fn scan_all_cluster() -> Vec<String> {
/// let nodes = vec!["redis://127.0.0.1/"];
/// let client = ClusterClient::new(nodes).unwrap();
/// let mut connection = client.get_async_connection(None).await.unwrap();
/// let mut scan_state_rc = ScanStateRC::new();
/// let mut keys: Vec<String> = vec![];
/// loop {
/// let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) =
/// connection.cluster_scan(scan_state_rc, None, None).await.unwrap();
/// scan_state_rc = next_cursor;
/// let mut scan_keys = scan_keys
/// .into_iter()
/// .map(|v| from_redis_value(&v).unwrap())
/// .collect::<Vec<String>>(); // Change the type of `keys` to `Vec<String>`
/// keys.append(&mut scan_keys);
/// if scan_state_rc.is_finished() {
/// break;
/// }
/// }
/// keys
/// }
/// ```
pub async fn cluster_scan(
&mut self,
scan_state_rc: ScanStateRC,
count: Option<usize>,
object_type: Option<ObjectType>,
) -> RedisResult<(ScanStateRC, Vec<Value>)> {
let cluster_scan_args = ClusterScanArgs::new(scan_state_rc, None, count, object_type);
self.route_cluster_scan(cluster_scan_args).await
}

/// Special handling for `SCAN` command, using `cluster_scan_with_pattern`.
/// It is a special case of [`cluster_scan`], with an additional match pattern.
/// Perform a `SCAN` command on a Redis cluster, using scan state object in order to handle changes in topology
/// Perform a `SCAN` command on a cluster, using scan state object in order to handle changes in topology
/// and make sure that all keys that were in the cluster from start to end of the scan are scanned.
/// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance.
///
/// # Arguments
///
/// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`],
/// for each subsequent iteration use the returned [`ScanStateRC`].
/// * `match_pattern` - A match pattern of requested keys.
/// * `count` - An optional count of keys requested,
/// the amount returned can vary and not obligated to return exactly count.
/// * `object_type` - An optional [`ObjectType`] enum of requested key redis type.
/// * `cluster_scan_args` - A [`ClusterScanArgs`] struct containing the arguments for the cluster scan command - match pattern, count,
/// object type and the allow_non_covered_slots flag.
///
/// # Returns
///
Expand All @@ -227,17 +164,18 @@ where
/// # Example
/// ```rust,no_run
/// use redis::cluster::ClusterClient;
/// use redis::{ScanStateRC, FromRedisValue, from_redis_value, Value, ObjectType};
/// use redis::{ScanStateRC, from_redis_value, Value, ObjectType, ClusterScanArgs};
///
/// async fn scan_all_cluster() -> Vec<String> {
/// let nodes = vec!["redis://127.0.0.1/"];
/// let client = ClusterClient::new(nodes).unwrap();
/// let mut connection = client.get_async_connection(None).await.unwrap();
/// let mut scan_state_rc = ScanStateRC::new();
/// let mut keys: Vec<String> = vec![];
/// let cluster_scan_args = ClusterScanArgs::builder().with_count(1000).with_object_type(ObjectType::String).build();
/// loop {
/// let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) =
/// connection.cluster_scan_with_pattern(scan_state_rc, b"my_key", None, None).await.unwrap();
/// connection.cluster_scan(scan_state_rc, cluster_scan_args.clone()).await.unwrap();
/// scan_state_rc = next_cursor;
/// let mut scan_keys = scan_keys
/// .into_iter()
Expand All @@ -251,19 +189,12 @@ where
/// keys
/// }
/// ```
pub async fn cluster_scan_with_pattern<K: ToRedisArgs>(
pub async fn cluster_scan(
&mut self,
scan_state_rc: ScanStateRC,
match_pattern: K,
count: Option<usize>,
object_type: Option<ObjectType>,
mut cluster_scan_args: ClusterScanArgs,
) -> RedisResult<(ScanStateRC, Vec<Value>)> {
let cluster_scan_args = ClusterScanArgs::new(
scan_state_rc,
Some(match_pattern.to_redis_args().concat()),
count,
object_type,
);
cluster_scan_args.set_scan_state_cursor(scan_state_rc);
self.route_cluster_scan(cluster_scan_args).await
}

Expand All @@ -279,18 +210,18 @@ where
sender,
})
.await
.map_err(|_| {
.map_err(|e| {
RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to send command",
format!("Cluster: Error occurred while trying to send SCAN command to internal send task. {e:?}"),
))
})?;
receiver
.await
.unwrap_or_else(|_| {
.unwrap_or_else(|e| {
Err(RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to receive command",
format!("Cluster: Failed to receive SCAN command response from internal send task. {e:?}"),
)))
})
.map(|response| match response {
Expand All @@ -316,18 +247,20 @@ where
sender,
})
.await
.map_err(|_| {
.map_err(|e| {
RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to send command",
format!("Cluster: Error occurred while trying to send command to internal sender. {e:?}"),
))
})?;
receiver
.await
.unwrap_or_else(|_| {
.unwrap_or_else(|e| {
Err(RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"redis_cluster: Unable to receive command",
format!(
"Cluster: Failed to receive command response from internal sender. {e:?}"
),
)))
})
.map(|response| match response {
Expand Down Expand Up @@ -489,21 +422,8 @@ where
.map_err(|_| RedisError::from((ErrorKind::ClientError, MUTEX_WRITE_ERR)))
}

// return address of node for slot
pub(crate) async fn get_address_from_slot(
&self,
slot: u16,
slot_addr: SlotAddr,
) -> Option<Arc<String>> {
self.conn_lock
.read()
.expect(MUTEX_READ_ERR)
.slot_map
.get_node_address_for_slot(slot, slot_addr)
}

// return epoch of node
pub(crate) async fn get_address_epoch(&self, node_address: &str) -> Result<u64, RedisError> {
pub(crate) async fn address_epoch(&self, node_address: &str) -> Result<u64, RedisError> {
let command = cmd("CLUSTER").arg("INFO").to_owned();
let node_conn = self
.conn_lock
Expand Down Expand Up @@ -541,14 +461,26 @@ where
}
}

// return slots of node
pub(crate) async fn get_slots_of_address(&self, node_address: Arc<String>) -> Vec<u16> {
/// return slots of node
pub(crate) async fn slots_of_address(&self, node_address: Arc<String>) -> Vec<u16> {
self.conn_lock
.read()
.expect(MUTEX_READ_ERR)
.slot_map
.get_slots_of_node(node_address)
}

/// Get connection for address
pub(crate) async fn connection_for_address(
&self,
address: &str,
) -> Option<ConnectionFuture<C>> {
self.conn_lock
.read()
.expect(MUTEX_READ_ERR)
.connection_for_address(address)
.map(|(_, conn)| conn)
}
}

pub(crate) struct ClusterConnInner<C> {
Expand Down Expand Up @@ -1884,14 +1816,6 @@ where
Self::refresh_slots_inner(inner, curr_retry).await
}

pub(crate) fn check_if_all_slots_covered(slot_map: &SlotMap) -> bool {
let mut slots_covered = 0;
for (end, slots) in slot_map.slots.iter() {
slots_covered += end.saturating_sub(slots.start).saturating_add(1);
}
slots_covered == SLOT_SIZE
}

// Query a node to discover slot-> master mappings
async fn refresh_slots_inner(inner: Arc<InnerCore<C>>, curr_retry: usize) -> RedisResult<()> {
let num_of_nodes = inner.conn_lock.read().expect(MUTEX_READ_ERR).len();
Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/cluster_slotmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl SlotMap {
.collect()
}

pub(crate) fn get_node_address_for_slot(
pub(crate) fn node_address_for_slot(
&self,
slot: u16,
slot_addr: SlotAddr,
Expand Down
Loading
Loading