diff --git a/glide-core/src/client/mod.rs b/glide-core/src/client/mod.rs index 2abcd06aa4..aa545c5f9c 100644 --- a/glide-core/src/client/mod.rs +++ b/glide-core/src/client/mod.rs @@ -3,19 +3,20 @@ */ mod types; +use crate::cluster_scan_container::insert_cluster_scan_cursor; use crate::scripts_container::get_script; use futures::FutureExt; use logger_core::log_info; use redis::aio::ConnectionLike; use redis::cluster_async::ClusterConnection; use redis::cluster_routing::{Routable, RoutingInfo, SingleNodeRoutingInfo}; -use redis::{Cmd, ErrorKind, PushInfo, Value}; -use redis::{RedisError, RedisResult}; +use redis::{Cmd, ErrorKind, ObjectType, PushInfo, RedisError, RedisResult, ScanStateRC, Value}; pub use standalone_client::StandaloneClient; use std::io; use std::ops::Deref; use std::time::Duration; pub use types::*; +use value_conversion::ExpectedReturnType; use self::value_conversion::{convert_to_expected_type, expected_type_for_cmd, get_value_type}; mod reconnecting_connection; @@ -217,6 +218,42 @@ fn get_request_timeout(cmd: &Cmd, default_timeout: Duration) -> RedisResult( + &'a mut self, + scan_state_cursor: &'a ScanStateRC, + match_pattern: &'a Option<&str>, + count: Option, + object_type: Option, + ) -> RedisResult { + match self.internal_client { + ClientWrapper::Standalone(_) => { + unreachable!("Cluster scan is not supported in standalone mode") + } + ClientWrapper::Cluster { ref mut client } => { + let (cursor, keys) = client + .cluster_scan( + scan_state_cursor.clone(), + *match_pattern, + count, + object_type, + ) + .await?; + + let cluster_hash = if cursor.is_finished() { + "finished".to_string() + } else { + insert_cluster_scan_cursor(cursor) + }; + convert_to_expected_type( + Value::Array(vec![Value::SimpleString(cluster_hash), Value::Array(keys)]), + Some(ExpectedReturnType::ClusterScanReturnType { + cursor: &ExpectedReturnType::BulkString, + keys: &ExpectedReturnType::ArrayOfStrings, + }), + ) + } + } + } pub fn send_command<'a>( &'a mut self, cmd: &'a Cmd, diff --git a/glide-core/src/client/value_conversion.rs b/glide-core/src/client/value_conversion.rs index 6b8e93a69a..86ccafd939 100644 --- a/glide-core/src/client/value_conversion.rs +++ b/glide-core/src/client/value_conversion.rs @@ -33,6 +33,10 @@ pub(crate) enum ExpectedReturnType<'a> { KeyWithMemberAndScore, FunctionStatsReturnType, GeoSearchReturnType, + ClusterScanReturnType { + cursor: &'a ExpectedReturnType<'a>, + keys: &'a ExpectedReturnType<'a>, + }, } pub(crate) fn convert_to_expected_type( @@ -44,6 +48,31 @@ pub(crate) fn convert_to_expected_type( }; match expected { + ExpectedReturnType::ClusterScanReturnType { + cursor, + keys, + } => match value { + Value::Nil => Ok(value), + Value::Array(array) => { + if array.len() != 2 { + return Err(( + ErrorKind::TypeError, + "Array must contain exactly two elements", + ) + .into()); + } + + let cursor = convert_to_expected_type(array[0].clone(), Some(*cursor))?; + let keys = convert_to_expected_type(array[1].clone(), Some(*keys))?; + Ok(Value::Array(vec![cursor, keys])) + }, + _ => Err(( + ErrorKind::TypeError, + "Response couldn't be converted to map", + format!("(response was {:?})", get_value_type(&value)), + ) + .into()), + } ExpectedReturnType::Map { key_type, value_type, diff --git a/glide-core/src/cluster_scan_container.rs b/glide-core/src/cluster_scan_container.rs new file mode 100644 index 0000000000..810fa7902f --- /dev/null +++ b/glide-core/src/cluster_scan_container.rs @@ -0,0 +1,40 @@ +/** + * Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 + */ +use logger_core::log_info; +use once_cell::sync::Lazy; +use redis::{RedisResult, ScanStateRC}; +use sha1_smol::Sha1; +use std::{collections::HashMap, sync::Mutex}; + +static CONTAINER: Lazy>> = + Lazy::new(|| Mutex::new(HashMap::new())); + +pub fn insert_cluster_scan_cursor(scan_state: ScanStateRC) -> String { + let hash = Sha1::new(); + let hash = hash.digest().to_string(); + CONTAINER + .lock() + .unwrap() + .insert(hash.clone(), scan_state.into()); + hash +} + +pub fn get_cluster_scan_cursor(hash: String) -> RedisResult { + let scan_state_rc = CONTAINER.lock().unwrap().get(&hash).cloned(); + match scan_state_rc { + Some(scan_state_rc) => Ok(scan_state_rc.into()), + None => Err(redis::RedisError::from(( + redis::ErrorKind::ResponseError, + "Invalid scan_state_cursor hash", + ))), + } +} + +pub fn remove_scan_state_cursor(hash: String) { + log_info( + "scan_state_cursor lifetime", + format!("Removed scan_state_cursor with hash: `{hash}`"), + ); + CONTAINER.lock().unwrap().remove(&hash); +} diff --git a/glide-core/src/lib.rs b/glide-core/src/lib.rs index f904928be1..34ab3b6ee1 100644 --- a/glide-core/src/lib.rs +++ b/glide-core/src/lib.rs @@ -15,4 +15,5 @@ pub use socket_listener::*; pub mod errors; pub mod scripts_container; pub use client::ConnectionRequest; +pub mod cluster_scan_container; pub mod request_type; diff --git a/glide-core/src/protobuf/redis_request.proto b/glide-core/src/protobuf/redis_request.proto index 1362911ffd..9e04d5f104 100644 --- a/glide-core/src/protobuf/redis_request.proto +++ b/glide-core/src/protobuf/redis_request.proto @@ -238,6 +238,15 @@ enum RequestType { FunctionRestore = 197; } +enum ObjectType { + StringType = 0; + ListType = 1; + SetType = 2; + ZSetType = 3; + HashType = 5; + None = 6; +} + message Command { message ArgsArray { repeated bytes args = 1; @@ -260,6 +269,13 @@ message Transaction { repeated Command commands = 1; } +message ClusterScan { + string cursor = 1; + optional string match_pattern = 2; + optional int64 count = 3; + optional ObjectType object_type = 4; +} + message RedisRequest { uint32 callback_idx = 1; @@ -267,6 +283,7 @@ message RedisRequest { Command single_command = 2; Transaction transaction = 3; ScriptInvocation script_invocation = 4; + ClusterScan cluster_scan = 5; } - Routes route = 5; + Routes route = 6; } diff --git a/glide-core/src/socket_listener.rs b/glide-core/src/socket_listener.rs index 2c9f91d753..e99a86487f 100644 --- a/glide-core/src/socket_listener.rs +++ b/glide-core/src/socket_listener.rs @@ -3,10 +3,12 @@ */ use super::rotating_buffer::RotatingBuffer; use crate::client::Client; +use crate::cluster_scan_container::get_cluster_scan_cursor; use crate::connection_request::ConnectionRequest; use crate::errors::{error_message, error_type, RequestErrorType}; use crate::redis_request::{ - command, redis_request, Command, RedisRequest, Routes, ScriptInvocation, SlotTypes, Transaction, + command, redis_request, ClusterScan, Command, ObjectType, RedisRequest, Routes, + ScriptInvocation, SlotTypes, Transaction, }; use crate::response; use crate::response::Response; @@ -20,8 +22,7 @@ use redis::cluster_routing::{ MultipleNodeRoutingInfo, Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr, }; use redis::cluster_routing::{ResponsePolicy, Routable}; -use redis::RedisError; -use redis::{Cmd, PushInfo, Value}; +use redis::{Cmd, PushInfo, RedisError, ScanStateRC, Value}; use std::cell::Cell; use std::rc::Rc; use std::{env, str}; @@ -311,6 +312,47 @@ async fn send_command( .map_err(|err| err.into()) } +async fn cluster_scan(cluster_scan: ClusterScan, mut client: Client) -> ClientUsageResult { + let cursor = cluster_scan.cursor.into(); + let cluster_scan_cursor = if cursor == String::new() { + ScanStateRC::new() + } else { + get_cluster_scan_cursor(cursor)? + }; + + let match_pattern_string: String; + let match_pattern = match cluster_scan.match_pattern { + Some(pattern) => { + match_pattern_string = pattern.to_string(); + Some(match_pattern_string.as_str()) + } + None => None, + }; + let count = match cluster_scan.count { + Some(count) => Some(count as usize), + None => None, + }; + let object_type = match cluster_scan.object_type { + Some(object_type) => match object_type.enum_value_or(ObjectType::None) { + ObjectType::StringType => Some(redis::ObjectType::String), + ObjectType::ListType => Some(redis::ObjectType::List), + ObjectType::SetType => Some(redis::ObjectType::Set), + ObjectType::ZSetType => Some(redis::ObjectType::ZSet), + ObjectType::HashType => Some(redis::ObjectType::Hash), + _ => None, + }, + None => None, + }; + + let result = client + .cluster_scan(&cluster_scan_cursor, &match_pattern, count, object_type) + .await; + match result { + Ok(result) => Ok(result.into()), + Err(err) => Err(err.into()), + } +} + async fn invoke_script( script: ScriptInvocation, mut client: Client, @@ -415,6 +457,10 @@ fn handle_request(request: RedisRequest, client: Client, writer: Rc) { task::spawn_local(async move { let result = match request.command { Some(action) => match action { + redis_request::Command::ClusterScan(cluster_scan_command) => { + cluster_scan(cluster_scan_command, client).await + } + redis_request::Command::SingleCommand(command) => { match get_redis_command(&command) { Ok(cmd) => match get_route(request.route.0, Some(&cmd)) { diff --git a/python/DEVELOPER.md b/python/DEVELOPER.md index 58918ce352..32ab7531bb 100644 --- a/python/DEVELOPER.md +++ b/python/DEVELOPER.md @@ -33,6 +33,7 @@ source "$HOME/.cargo/env" rustc --version # Install protobuf compiler PB_REL="https://github.com/protocolbuffers/protobuf/releases" +# For other arch the signature of the should be protoc---.zip, e.g. protoc-3.20.3-linux-aarch_64.zip for ARM64. curl -LO $PB_REL/download/v3.20.3/protoc-3.20.3-linux-x86_64.zip unzip protoc-3.20.3-linux-x86_64.zip -d $HOME/.local export PATH="$PATH:$HOME/.local/bin" diff --git a/python/python/glide/__init__.py b/python/python/glide/__init__.py index c8e8b0a60b..e0654d949c 100644 --- a/python/python/glide/__init__.py +++ b/python/python/glide/__init__.py @@ -91,7 +91,7 @@ SlotType, ) -from .glide import Script +from .glide import ClusterScanCursor, Script __all__ = [ # Client @@ -163,6 +163,7 @@ "TrimByMaxLen", "TrimByMinId", "UpdateOptions", + "ClusterScanCursor" # Logger "Logger", "LogLevel", diff --git a/python/python/glide/async_commands/cluster_commands.py b/python/python/glide/async_commands/cluster_commands.py index 65957bc5f7..d164ee5e53 100644 --- a/python/python/glide/async_commands/cluster_commands.py +++ b/python/python/glide/async_commands/cluster_commands.py @@ -2,9 +2,9 @@ from __future__ import annotations -from typing import Dict, List, Mapping, Optional, cast +from typing import Dict, List, Mapping, Optional, Tuple, cast -from glide.async_commands.command_args import Limit, OrderBy +from glide.async_commands.command_args import Limit, ObjectType, OrderBy from glide.async_commands.core import ( CoreCommands, FlushMode, @@ -16,6 +16,8 @@ from glide.protobuf.redis_request_pb2 import RequestType from glide.routes import Route +from ..glide import ClusterScanCursor + class ClusterCommands(CoreCommands): async def custom_command( @@ -554,3 +556,51 @@ async def copy( bool, await self._execute_command(RequestType.Copy, args), ) + async def cluster_scan( + self, + cursor: ClusterScanCursor, + match: Optional[str] = None, + count: Optional[int] = None, + type: Optional[ObjectType] = None, + ) -> Tuple[ClusterScanCursor, List[str]]: + """ + Incrementally iterates over the keys in the Redis Cluster. + The method returns a tuple containing the next cursor and a list of keys. + + This command is similar to the SCAN command, but it is designed to work in a Redis Cluster environment. + It do so by iterating over the keys in the cluster, one node at a time, while maintaining a consistent view of + the slots that are being scanned. + + See https://valkey.io/commands/scan/ for more details. + + Args: + cursor (ClusterScanCursor): The cursor object wrapping the scan state - when starting a new scan + creation of new empty ClusterScanCursor is needed `ClusterScanCursor()`. + match (Optional[str]): A pattern to match keys against. + count (Optional[int]): The number of keys to return in a single iteration - the amount returned can vary and + not obligated to return exactly count. + This param is just a hint to the server of how much steps to do in each iteration. + type (Optional[ObjectType]): The type of object to scan for (STRING, LIST, SET, ZSET, HASH). + + Returns: + Tuple[str, List[str]]: A tuple containing the next cursor and a list of keys. + + Examples: + >>> let cursor = ClusterScanCursor() + all_keys = [] + while not cursor.is_finished(): + cursor, keys = await client.cluster_scan(cursor, match="my_key*", count=10) + all_keys.extend(keys) + >>> let cursor = ClusterScanCursor() + all_keys = [] + while not cursor.is_finished(): + cursor, keys = await client.cluster_scan(cursor, type=ClusterScanObjectType.STRING) + all_keys.extend(keys) + """ + response = await self._cluster_scan(cursor, match, count, type) + casted_response = cast( + Tuple[str, List[str]], + response, + ) + cursor = ClusterScanCursor(casted_response[0]) + return cursor, casted_response[1] diff --git a/python/python/glide/async_commands/command_args.py b/python/python/glide/async_commands/command_args.py index 39d3e4982c..1ff062f14a 100644 --- a/python/python/glide/async_commands/command_args.py +++ b/python/python/glide/async_commands/command_args.py @@ -63,3 +63,31 @@ class ListDirection(Enum): """ RIGHT: Represents the option that elements should be popped from or added to the right side of a list. """ + + +class ObjectType(Enum): + """ + Represents a string object in Redis. + """ + + STRING = 0 + + """ + Represents a list object in Redis. + """ + LIST = 1 + + """ + Represents a set object in Redis. + """ + SET = 2 + + """ + Represents a sorted set object in Redis. + """ + ZSET = 3 + + """ + Represents a hash object in Redis. + """ + HASH = 4 diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index c131127d6e..811fd0661d 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -26,7 +26,7 @@ _create_bitfield_args, _create_bitfield_read_only_args, ) -from glide.async_commands.command_args import Limit, ListDirection, OrderBy +from glide.async_commands.command_args import Limit, ListDirection, ObjectType, OrderBy from glide.async_commands.sorted_set import ( AggregationType, GeoSearchByBox, @@ -54,7 +54,7 @@ from glide.protobuf.redis_request_pb2 import RequestType from glide.routes import Route -from ..glide import Script +from ..glide import ClusterScanCursor, Script class ConditionalChange(Enum): @@ -357,6 +357,14 @@ async def _execute_script( route: Optional[Route] = None, ) -> TResult: ... + async def _cluster_scan( + self, + cursor: ClusterScanCursor, + match: Optional[str] = ..., + count: Optional[int] = ..., + type: Optional[ObjectType] = ..., + ) -> TResult: ... + async def set( self, key: str, diff --git a/python/python/glide/constants.py b/python/python/glide/constants.py index 6c2cf47148..72114b1481 100644 --- a/python/python/glide/constants.py +++ b/python/python/glide/constants.py @@ -1,10 +1,11 @@ # Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 -from typing import Dict, List, Literal, Mapping, Optional, Set, TypeVar, Union +from typing import Dict, List, Literal, Mapping, Optional, Set, TypeVar, Union, Tuple from glide.protobuf.connection_request_pb2 import ConnectionRequest from glide.protobuf.redis_request_pb2 import RedisRequest from glide.routes import ByAddressRoute, RandomNode, SlotIdRoute, SlotKeyRoute +from .glide import ClusterScanCursor OK: str = "OK" DEFAULT_READ_BYTES_SIZE: int = pow(2, 16) @@ -22,6 +23,7 @@ float, Set[T], List[T], + Tuple[ClusterScanCursor, List[str]] ] TRequest = Union[RedisRequest, ConnectionRequest] # When routing to a single node, response will be T diff --git a/python/python/glide/glide.pyi b/python/python/glide/glide.pyi index d155757bbd..f090240973 100644 --- a/python/python/glide/glide.pyi +++ b/python/python/glide/glide.pyi @@ -20,6 +20,12 @@ class Script: def get_hash(self) -> str: ... def __del__(self) -> None: ... +class ClusterScanCursor: + def __init__(self, cursor: Optional[str]) -> None: ... + def get_cursor(self) -> ClusterScanCursor: ... + def is_finished(self) -> bool: ... + def __del__(self) -> None: ... + def start_socket_listener_external(init_callback: Callable) -> None: ... def value_from_pointer(pointer: int) -> TResult: ... def create_leaked_value(message: str) -> int: ... diff --git a/python/python/glide/redis_client.py b/python/python/glide/redis_client.py index edb9fd4122..ce99eec61f 100644 --- a/python/python/glide/redis_client.py +++ b/python/python/glide/redis_client.py @@ -6,6 +6,7 @@ import async_timeout from glide.async_commands.cluster_commands import ClusterCommands +from glide.async_commands.command_args import ObjectType from glide.async_commands.core import CoreCommands from glide.async_commands.standalone_commands import StandaloneCommands from glide.config import BaseClientConfiguration @@ -29,6 +30,7 @@ from .glide import ( DEFAULT_TIMEOUT_IN_MILLISECONDS, + ClusterScanCursor, start_socket_listener_external, value_from_pointer, ) @@ -467,6 +469,30 @@ class RedisClusterClient(BaseRedisClient, ClusterCommands): https://github.com/aws/babushka/wiki/Python-wrapper#redis-cluster """ + async def _cluster_scan( + self, + cursor: ClusterScanCursor, + match: Optional[str] = None, + count: Optional[int] = None, + type: Optional[ObjectType] = None, + ) -> Tuple[ClusterScanCursor, List[str]]: + if self._is_closed: + raise ClosingError( + "Unable to execute requests; the client is closed. Please create a new client." + ) + request = RedisRequest() + request.callback_idx = self._get_callback_index() + cursor = cursor.get_cursor() + if cursor is not None: + request.cluster_scan.cursor = cursor + if match is not None: + request.cluster_scan.match_pattern = match + if count is not None: + request.cluster_scan.count = count + if type is not None: + request.cluster_scan.object_type = type.value + return await self._write_request_await_response(request) + def _get_protobuf_conn_request(self) -> ConnectionRequest: return self.config._create_a_protobuf_conn_request(cluster_mode=True) diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index 20be6a28f6..8fa1a67e97 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -11,7 +11,7 @@ from typing import Any, Dict, List, Union, cast import pytest -from glide import ClosingError, RequestError, Script +from glide import ClosingError, ClusterScanCursor, RequestError, Script from glide.async_commands.bitmap import ( BitFieldGet, BitFieldIncrBy, @@ -26,7 +26,7 @@ SignedEncoding, UnsignedEncoding, ) -from glide.async_commands.command_args import Limit, ListDirection, OrderBy +from glide.async_commands.command_args import Limit, ListDirection, ObjectType, OrderBy from glide.async_commands.core import ( ConditionalChange, ExpireOptions, @@ -5752,6 +5752,41 @@ async def test_copy_database(self, redis_client: RedisClient): await redis_client.copy(source, destination, -1, replace=True) finally: assert await redis_client.select(0) == OK + @pytest.mark.parametrize("cluster_mode", [True]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_simple_cluster_scan(self, redis_client: RedisClusterClient): + expected_keys = [f"key:{i}" for i in range(100)] + for key in expected_keys: + await redis_client.set(key, "value") + cursor = ClusterScanCursor(None) + keys = [] + while True: + result = await redis_client.cluster_scan(cursor) + cursor = result[0] + keys.extend(result[1]) + if cursor.is_finished(): + break + assert set(expected_keys) == set(keys) + + @pytest.mark.parametrize("cluster_mode", [True]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_cluster_scan_with_object_type( + self, redis_client: RedisClusterClient + ): + expected_keys = [f"key:{i}" for i in range(100)] + for key in expected_keys: + await redis_client.set(key, "value") + unexpected_keys = [f"key:{i}" for i in range(100, 200)] + for key in unexpected_keys: + await redis_client.sadd(key, ["value"]) + cursor = ClusterScanCursor(None) + keys = [] + while not cursor.is_finished(): + result = await redis_client.cluster_scan(cursor, type=ObjectType.STRING) + cursor = result[0] + keys.extend(result[1]) + assert set(expected_keys) == set(keys) + assert not set(unexpected_keys).intersection(set(keys)) class TestMultiKeyCommandCrossSlot: diff --git a/python/src/lib.rs b/python/src/lib.rs index 4380b064c9..02f5f1cdde 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -29,6 +29,41 @@ impl Level { } } +/// This struct is used to keep track of the cursor of a cluster scan. +/// We want to avoid passing the cursor between layers of the application, +/// So we keep the state in the container and only pass the hash of the cursor. +/// The cursor is stored in the container and can be retrieved using the hash. +/// The cursor is removed from the container when the object is deleted (dropped). +#[pyclass] +pub struct ClusterScanCursor { + cursor: String, +} + +#[pymethods] +impl ClusterScanCursor { + #[new] + fn new(new_cursor: Option) -> Self { + match new_cursor { + Some(cursor) => ClusterScanCursor { cursor }, + None => ClusterScanCursor { + cursor: String::new(), + }, + } + } + + fn get_cursor(&self) -> String { + self.cursor.clone() + } + + fn __del__(&mut self) { + glide_core::cluster_scan_container::remove_scan_state_cursor(self.cursor.clone()); + } + + fn is_finished(&self) -> bool { + self.cursor == "finished" + } +} + #[pyclass] pub struct Script { hash: String, @@ -56,6 +91,7 @@ impl Script { fn glide(_py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::