From 789f324cda78fde264d6b95d5f06c029eea139c9 Mon Sep 17 00:00:00 2001 From: avifenesh Date: Sun, 7 Jul 2024 12:34:03 +0000 Subject: [PATCH] added bytes support --- redis/src/cluster_async/mod.rs | 74 +++++++++++++++++++++++++++--- redis/src/commands/cluster_scan.rs | 10 ++-- redis/tests/test_cluster_scan.rs | 43 ++++++++--------- 3 files changed, 91 insertions(+), 36 deletions(-) diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index d07ce645c..833fc01ce 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -33,7 +33,7 @@ use crate::{ cluster_topology::SLOT_SIZE, cmd, commands::cluster_scan::{cluster_scan, ClusterScanArgs, ObjectType, ScanStateRC}, - FromRedisValue, InfoDict, + FromRedisValue, InfoDict, ToRedisArgs, }; #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] use async_std::task::{spawn, JoinHandle}; @@ -139,7 +139,8 @@ where }) } - // Special handling for `SCAN` command, using cluster_scan + /// Special handling for `SCAN` command, using `cluster_scan`. + /// If you wish to use a match pattern, use [`cluster_scan_with_pattern`]. /// Perform a `SCAN` command on a Redis cluster, using scan state object in order to handle changes in topology /// and make sure that all keys that were in the cluster from start to end of the scan are scanned. /// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance. @@ -147,8 +148,7 @@ where /// # Arguments /// /// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`], - /// for each subsequent iteration use the returned [`ScanStateRC`]. - /// * `match_pattern` - An optional match pattern of requested keys. + /// for each subsequent iteration use the returned [`ScanStateRC`]. /// * `count` - An optional count of keys requested, /// the amount returned can vary and not obligated to return exactly count. /// * `object_type` - An optional [`ObjectType`] enum of requested key redis type. @@ -174,7 +174,7 @@ where /// let mut keys: Vec = vec![]; /// loop { /// let (next_cursor, scan_keys): (ScanStateRC, Vec) = - /// connection.cluster_scan(scan_state_rc, None, None, None).await.unwrap(); + /// connection.cluster_scan(scan_state_rc, None, None).await.unwrap(); /// scan_state_rc = next_cursor; /// let mut scan_keys = scan_keys /// .into_iter() @@ -191,13 +191,73 @@ where pub async fn cluster_scan( &mut self, scan_state_rc: ScanStateRC, - match_pattern: Option<&str>, + count: Option, + object_type: Option, + ) -> RedisResult<(ScanStateRC, Vec)> { + let cluster_scan_args = ClusterScanArgs::new(scan_state_rc, None, count, object_type); + self.route_cluster_scan(cluster_scan_args).await + } + + /// Special handling for `SCAN` command, using `cluster_scan_with_pattern`. + /// It is a special case of [`cluster_scan`], with an additional match pattern. + /// Perform a `SCAN` command on a Redis cluster, using scan state object in order to handle changes in topology + /// and make sure that all keys that were in the cluster from start to end of the scan are scanned. + /// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance. + /// + /// # Arguments + /// + /// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`], + /// for each subsequent iteration use the returned [`ScanStateRC`]. + /// * `match_pattern` - A match pattern of requested keys. + /// * `count` - An optional count of keys requested, + /// the amount returned can vary and not obligated to return exactly count. + /// * `object_type` - An optional [`ObjectType`] enum of requested key redis type. + /// + /// # Returns + /// + /// A [`ScanStateRC`] for the updated state of the scan and the vector of keys that were found in the scan. + /// structure of returned value: + /// `Ok((ScanStateRC, Vec))` + /// + /// When the scan is finished [`ScanStateRC`] will be None, and can be checked by calling `scan_state_wrapper.is_finished()`. + /// + /// # Example + /// ```rust,no_run + /// use redis::cluster::ClusterClient; + /// use redis::{ScanStateRC, FromRedisValue, from_redis_value, Value, ObjectType}; + /// + /// async fn scan_all_cluster() -> Vec { + /// let nodes = vec!["redis://127.0.0.1/"]; + /// let client = ClusterClient::new(nodes).unwrap(); + /// let mut connection = client.get_async_connection(None).await.unwrap(); + /// let mut scan_state_rc = ScanStateRC::new(); + /// let mut keys: Vec = vec![]; + /// loop { + /// let (next_cursor, scan_keys): (ScanStateRC, Vec) = + /// connection.cluster_scan_with_pattern(scan_state_rc, b"my_key", None, None).await.unwrap(); + /// scan_state_rc = next_cursor; + /// let mut scan_keys = scan_keys + /// .into_iter() + /// .map(|v| from_redis_value(&v).unwrap()) + /// .collect::>(); // Change the type of `keys` to `Vec` + /// keys.append(&mut scan_keys); + /// if scan_state_rc.is_finished() { + /// break; + /// } + /// } + /// keys + /// } + /// ``` + pub async fn cluster_scan_with_pattern( + &mut self, + scan_state_rc: ScanStateRC, + match_pattern: K, count: Option, object_type: Option, ) -> RedisResult<(ScanStateRC, Vec)> { let cluster_scan_args = ClusterScanArgs::new( scan_state_rc, - match_pattern.map(|s| s.to_string()), + Some(match_pattern.to_redis_args().concat()), count, object_type, ); diff --git a/redis/src/commands/cluster_scan.rs b/redis/src/commands/cluster_scan.rs index ef05ea033..83881f4a7 100644 --- a/redis/src/commands/cluster_scan.rs +++ b/redis/src/commands/cluster_scan.rs @@ -31,10 +31,10 @@ const BITS_ARRAY_SIZE: usize = NUM_OF_SLOTS / BITS_PER_U64; const END_OF_SCAN: u16 = NUM_OF_SLOTS as u16 + 1; type SlotsBitsArray = [u64; BITS_ARRAY_SIZE]; -#[derive(Debug, Clone)] +#[derive(Clone)] pub(crate) struct ClusterScanArgs { pub(crate) scan_state_cursor: ScanStateRC, - match_pattern: Option, + match_pattern: Option>, count: Option, object_type: Option, } @@ -59,7 +59,7 @@ pub enum ObjectType { impl ClusterScanArgs { pub(crate) fn new( scan_state_cursor: ScanStateRC, - match_pattern: Option, + match_pattern: Option>, count: Option, object_type: Option, ) -> Self { @@ -487,7 +487,7 @@ where async fn send_scan( scan_state: &ScanState, core: &C, - match_pattern: Option, + match_pattern: Option>, count: Option, object_type: Option, ) -> RedisResult @@ -518,7 +518,7 @@ where async fn retry_scan( scan_state: &ScanState, core: &C, - match_pattern: Option, + match_pattern: Option>, count: Option, object_type: Option, ) -> RedisResult<(RedisResult, ScanState)> diff --git a/redis/tests/test_cluster_scan.rs b/redis/tests/test_cluster_scan.rs index 348d1a6cc..a4bb85625 100644 --- a/redis/tests/test_cluster_scan.rs +++ b/redis/tests/test_cluster_scan.rs @@ -66,7 +66,7 @@ mod test_cluster_scan_async { let mut keys: Vec = vec![]; loop { let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc, None, None, None) + .cluster_scan(scan_state_rc, None, None) .await .unwrap(); scan_state_rc = next_cursor; @@ -112,7 +112,7 @@ mod test_cluster_scan_async { loop { count += 1; let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc, None, None, None) + .cluster_scan(scan_state_rc, None, None) .await .unwrap(); scan_state_rc = next_cursor; @@ -183,9 +183,8 @@ mod test_cluster_scan_async { let mut result: RedisResult = Ok(Value::Nil); loop { count += 1; - let scan_response: RedisResult<(ScanStateRC, Vec)> = connection - .cluster_scan(scan_state_rc, None, None, None) - .await; + let scan_response: RedisResult<(ScanStateRC, Vec)> = + connection.cluster_scan(scan_state_rc, None, None).await; let (next_cursor, scan_keys) = match scan_response { Ok((cursor, keys)) => (cursor, keys), Err(e) => { @@ -256,9 +255,8 @@ mod test_cluster_scan_async { let mut count = 0; loop { count += 1; - let scan_response: RedisResult<(ScanStateRC, Vec)> = connection - .cluster_scan(scan_state_rc, None, None, None) - .await; + let scan_response: RedisResult<(ScanStateRC, Vec)> = + connection.cluster_scan(scan_state_rc, None, None).await; if scan_response.is_err() { println!("error: {:?}", scan_response); } @@ -427,9 +425,8 @@ mod test_cluster_scan_async { let mut count = 0; loop { count += 1; - let scan_response: RedisResult<(ScanStateRC, Vec)> = connection - .cluster_scan(scan_state_rc, None, None, None) - .await; + let scan_response: RedisResult<(ScanStateRC, Vec)> = + connection.cluster_scan(scan_state_rc, None, None).await; if scan_response.is_err() { println!("error: {:?}", scan_response); } @@ -497,7 +494,7 @@ mod test_cluster_scan_async { let mut keys: Vec = vec![]; loop { let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc, None, None, None) + .cluster_scan(scan_state_rc, None, None) .await .unwrap(); scan_state_rc = next_cursor; @@ -557,7 +554,7 @@ mod test_cluster_scan_async { let mut keys: Vec = vec![]; loop { let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc, None, None, None) + .cluster_scan(scan_state_rc, None, None) .await .unwrap(); scan_state_rc = next_cursor; @@ -625,7 +622,7 @@ mod test_cluster_scan_async { let mut keys: Vec = vec![]; loop { let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc, Some("key:pattern:*"), None, None) + .cluster_scan_with_pattern(scan_state_rc, "key:pattern:*", None, None) .await .unwrap(); scan_state_rc = next_cursor; @@ -683,7 +680,7 @@ mod test_cluster_scan_async { let mut keys: Vec = vec![]; loop { let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc, None, None, Some(ObjectType::Set)) + .cluster_scan(scan_state_rc, None, Some(ObjectType::Set)) .await .unwrap(); scan_state_rc = next_cursor; @@ -736,11 +733,11 @@ mod test_cluster_scan_async { let mut comparing_times = 0; loop { let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc.clone(), None, Some(100), None) + .cluster_scan(scan_state_rc.clone(), Some(100), None) .await .unwrap(); let (_, scan_without_count_keys): (ScanStateRC, Vec) = connection - .cluster_scan(scan_state_rc, None, Some(100), None) + .cluster_scan(scan_state_rc, Some(100), None) .await .unwrap(); if !scan_keys.is_empty() && !scan_without_count_keys.is_empty() { @@ -795,9 +792,8 @@ mod test_cluster_scan_async { let mut count = 0; loop { count += 1; - let scan_response: RedisResult<(ScanStateRC, Vec)> = connection - .cluster_scan(scan_state_rc, None, None, None) - .await; + let scan_response: RedisResult<(ScanStateRC, Vec)> = + connection.cluster_scan(scan_state_rc, None, None).await; if scan_response.is_err() { println!("error: {:?}", scan_response); } @@ -810,7 +806,7 @@ mod test_cluster_scan_async { if count == 5 { drop(cluster); let scan_response: RedisResult<(ScanStateRC, Vec)> = connection - .cluster_scan(scan_state_rc.clone(), None, None, None) + .cluster_scan(scan_state_rc.clone(), None, None) .await; assert!(scan_response.is_err()); break; @@ -819,9 +815,8 @@ mod test_cluster_scan_async { cluster = TestClusterContext::new(3, 0); connection = cluster.async_connection(None).await; loop { - let scan_response: RedisResult<(ScanStateRC, Vec)> = connection - .cluster_scan(scan_state_rc, None, None, None) - .await; + let scan_response: RedisResult<(ScanStateRC, Vec)> = + connection.cluster_scan(scan_state_rc, None, None).await; if scan_response.is_err() { println!("error: {:?}", scan_response); }