Skip to content

Commit

Permalink
draft of cluster scan impl
Browse files Browse the repository at this point in the history
  • Loading branch information
avifenesh committed Jun 24, 2024
1 parent 2390e71 commit 251af1c
Show file tree
Hide file tree
Showing 17 changed files with 378 additions and 15 deletions.
41 changes: 39 additions & 2 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,20 @@
*/
mod types;

use crate::cluster_scan_container::insert_cluster_scan_cursor;
use crate::scripts_container::get_script;
use futures::FutureExt;
use logger_core::log_info;
use redis::aio::ConnectionLike;
use redis::cluster_async::ClusterConnection;
use redis::cluster_routing::{Routable, RoutingInfo, SingleNodeRoutingInfo};
use redis::{Cmd, ErrorKind, PushInfo, Value};
use redis::{RedisError, RedisResult};
use redis::{Cmd, ErrorKind, ObjectType, PushInfo, RedisError, RedisResult, ScanStateRC, Value};
pub use standalone_client::StandaloneClient;
use std::io;
use std::ops::Deref;
use std::time::Duration;
pub use types::*;
use value_conversion::ExpectedReturnType;

use self::value_conversion::{convert_to_expected_type, expected_type_for_cmd, get_value_type};
mod reconnecting_connection;
Expand Down Expand Up @@ -217,6 +218,42 @@ fn get_request_timeout(cmd: &Cmd, default_timeout: Duration) -> RedisResult<Opti
}

impl Client {
pub async fn cluster_scan<'a>(
&'a mut self,
scan_state_cursor: &'a ScanStateRC,
match_pattern: &'a Option<&str>,
count: Option<usize>,
object_type: Option<ObjectType>,
) -> RedisResult<Value> {
match self.internal_client {
ClientWrapper::Standalone(_) => {
unreachable!("Cluster scan is not supported in standalone mode")
}
ClientWrapper::Cluster { ref mut client } => {
let (cursor, keys) = client
.cluster_scan(
scan_state_cursor.clone(),
*match_pattern,
count,
object_type,
)
.await?;

let cluster_hash = if cursor.is_finished() {
"finished".to_string()
} else {
insert_cluster_scan_cursor(cursor)
};
convert_to_expected_type(
Value::Array(vec![Value::SimpleString(cluster_hash), Value::Array(keys)]),
Some(ExpectedReturnType::ClusterScanReturnType {
cursor: &ExpectedReturnType::BulkString,
keys: &ExpectedReturnType::ArrayOfStrings,
}),
)
}
}
}
pub fn send_command<'a>(
&'a mut self,
cmd: &'a Cmd,
Expand Down
29 changes: 29 additions & 0 deletions glide-core/src/client/value_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ pub(crate) enum ExpectedReturnType<'a> {
KeyWithMemberAndScore,
FunctionStatsReturnType,
GeoSearchReturnType,
ClusterScanReturnType {
cursor: &'a ExpectedReturnType<'a>,
keys: &'a ExpectedReturnType<'a>,
},
}

pub(crate) fn convert_to_expected_type(
Expand All @@ -44,6 +48,31 @@ pub(crate) fn convert_to_expected_type(
};

match expected {
ExpectedReturnType::ClusterScanReturnType {
cursor,
keys,
} => match value {
Value::Nil => Ok(value),
Value::Array(array) => {
if array.len() != 2 {
return Err((
ErrorKind::TypeError,
"Array must contain exactly two elements",
)
.into());
}

let cursor = convert_to_expected_type(array[0].clone(), Some(*cursor))?;
let keys = convert_to_expected_type(array[1].clone(), Some(*keys))?;
Ok(Value::Array(vec![cursor, keys]))
},
_ => Err((
ErrorKind::TypeError,
"Response couldn't be converted to map",
format!("(response was {:?})", get_value_type(&value)),
)
.into()),
}
ExpectedReturnType::Map {
key_type,
value_type,
Expand Down
40 changes: 40 additions & 0 deletions glide-core/src/cluster_scan_container.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0
*/
use logger_core::log_info;
use once_cell::sync::Lazy;
use redis::{RedisResult, ScanStateRC};
use sha1_smol::Sha1;
use std::{collections::HashMap, sync::Mutex};

static CONTAINER: Lazy<Mutex<HashMap<String, ScanStateRC>>> =
Lazy::new(|| Mutex::new(HashMap::new()));

pub fn insert_cluster_scan_cursor(scan_state: ScanStateRC) -> String {
let hash = Sha1::new();
let hash = hash.digest().to_string();
CONTAINER
.lock()
.unwrap()
.insert(hash.clone(), scan_state.into());
hash
}

pub fn get_cluster_scan_cursor(hash: String) -> RedisResult<ScanStateRC> {
let scan_state_rc = CONTAINER.lock().unwrap().get(&hash).cloned();
match scan_state_rc {
Some(scan_state_rc) => Ok(scan_state_rc.into()),
None => Err(redis::RedisError::from((
redis::ErrorKind::ResponseError,
"Invalid scan_state_cursor hash",
))),
}
}

pub fn remove_scan_state_cursor(hash: String) {
log_info(
"scan_state_cursor lifetime",
format!("Removed scan_state_cursor with hash: `{hash}`"),
);
CONTAINER.lock().unwrap().remove(&hash);
}
1 change: 1 addition & 0 deletions glide-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ pub use socket_listener::*;
pub mod errors;
pub mod scripts_container;
pub use client::ConnectionRequest;
pub mod cluster_scan_container;
pub mod request_type;
19 changes: 18 additions & 1 deletion glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,15 @@ enum RequestType {
FunctionRestore = 197;
}

enum ObjectType {
StringType = 0;
ListType = 1;
SetType = 2;
ZSetType = 3;
HashType = 5;
None = 6;
}

message Command {
message ArgsArray {
repeated bytes args = 1;
Expand All @@ -260,13 +269,21 @@ message Transaction {
repeated Command commands = 1;
}

message ClusterScan {
string cursor = 1;
optional string match_pattern = 2;
optional int64 count = 3;
optional ObjectType object_type = 4;
}

message RedisRequest {
uint32 callback_idx = 1;

oneof command {
Command single_command = 2;
Transaction transaction = 3;
ScriptInvocation script_invocation = 4;
ClusterScan cluster_scan = 5;
}
Routes route = 5;
Routes route = 6;
}
52 changes: 49 additions & 3 deletions glide-core/src/socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
*/
use super::rotating_buffer::RotatingBuffer;
use crate::client::Client;
use crate::cluster_scan_container::get_cluster_scan_cursor;
use crate::connection_request::ConnectionRequest;
use crate::errors::{error_message, error_type, RequestErrorType};
use crate::redis_request::{
command, redis_request, Command, RedisRequest, Routes, ScriptInvocation, SlotTypes, Transaction,
command, redis_request, ClusterScan, Command, ObjectType, RedisRequest, Routes,
ScriptInvocation, SlotTypes, Transaction,
};
use crate::response;
use crate::response::Response;
Expand All @@ -20,8 +22,7 @@ use redis::cluster_routing::{
MultipleNodeRoutingInfo, Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr,
};
use redis::cluster_routing::{ResponsePolicy, Routable};
use redis::RedisError;
use redis::{Cmd, PushInfo, Value};
use redis::{Cmd, PushInfo, RedisError, ScanStateRC, Value};
use std::cell::Cell;
use std::rc::Rc;
use std::{env, str};
Expand Down Expand Up @@ -311,6 +312,47 @@ async fn send_command(
.map_err(|err| err.into())
}

async fn cluster_scan(cluster_scan: ClusterScan, mut client: Client) -> ClientUsageResult<Value> {
let cursor = cluster_scan.cursor.into();
let cluster_scan_cursor = if cursor == String::new() {
ScanStateRC::new()
} else {
get_cluster_scan_cursor(cursor)?
};

let match_pattern_string: String;
let match_pattern = match cluster_scan.match_pattern {
Some(pattern) => {
match_pattern_string = pattern.to_string();
Some(match_pattern_string.as_str())
}
None => None,
};
let count = match cluster_scan.count {
Some(count) => Some(count as usize),
None => None,
};
let object_type = match cluster_scan.object_type {
Some(object_type) => match object_type.enum_value_or(ObjectType::None) {
ObjectType::StringType => Some(redis::ObjectType::String),
ObjectType::ListType => Some(redis::ObjectType::List),
ObjectType::SetType => Some(redis::ObjectType::Set),
ObjectType::ZSetType => Some(redis::ObjectType::ZSet),
ObjectType::HashType => Some(redis::ObjectType::Hash),
_ => None,
},
None => None,
};

let result = client
.cluster_scan(&cluster_scan_cursor, &match_pattern, count, object_type)
.await;
match result {
Ok(result) => Ok(result.into()),
Err(err) => Err(err.into()),
}
}

async fn invoke_script(
script: ScriptInvocation,
mut client: Client,
Expand Down Expand Up @@ -415,6 +457,10 @@ fn handle_request(request: RedisRequest, client: Client, writer: Rc<Writer>) {
task::spawn_local(async move {
let result = match request.command {
Some(action) => match action {
redis_request::Command::ClusterScan(cluster_scan_command) => {
cluster_scan(cluster_scan_command, client).await
}

redis_request::Command::SingleCommand(command) => {
match get_redis_command(&command) {
Ok(cmd) => match get_route(request.route.0, Some(&cmd)) {
Expand Down
1 change: 1 addition & 0 deletions python/DEVELOPER.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ source "$HOME/.cargo/env"
rustc --version
# Install protobuf compiler
PB_REL="https://github.com/protocolbuffers/protobuf/releases"
# For other arch the signature of the should be protoc-<version>-<os>-<arch>.zip, e.g. protoc-3.20.3-linux-aarch_64.zip for ARM64.
curl -LO $PB_REL/download/v3.20.3/protoc-3.20.3-linux-x86_64.zip
unzip protoc-3.20.3-linux-x86_64.zip -d $HOME/.local
export PATH="$PATH:$HOME/.local/bin"
Expand Down
3 changes: 2 additions & 1 deletion python/python/glide/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
SlotType,
)

from .glide import Script
from .glide import ClusterScanCursor, Script

__all__ = [
# Client
Expand Down Expand Up @@ -163,6 +163,7 @@
"TrimByMaxLen",
"TrimByMinId",
"UpdateOptions",
"ClusterScanCursor"
# Logger
"Logger",
"LogLevel",
Expand Down
54 changes: 52 additions & 2 deletions python/python/glide/async_commands/cluster_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

from __future__ import annotations

from typing import Dict, List, Mapping, Optional, cast
from typing import Dict, List, Mapping, Optional, Tuple, cast

from glide.async_commands.command_args import Limit, OrderBy
from glide.async_commands.command_args import Limit, ObjectType, OrderBy
from glide.async_commands.core import (
CoreCommands,
FlushMode,
Expand All @@ -16,6 +16,8 @@
from glide.protobuf.redis_request_pb2 import RequestType
from glide.routes import Route

from ..glide import ClusterScanCursor


class ClusterCommands(CoreCommands):
async def custom_command(
Expand Down Expand Up @@ -554,3 +556,51 @@ async def copy(
bool,
await self._execute_command(RequestType.Copy, args),
)
async def cluster_scan(
self,
cursor: ClusterScanCursor,
match: Optional[str] = None,
count: Optional[int] = None,
type: Optional[ObjectType] = None,
) -> Tuple[ClusterScanCursor, List[str]]:
"""
Incrementally iterates over the keys in the Redis Cluster.
The method returns a tuple containing the next cursor and a list of keys.
This command is similar to the SCAN command, but it is designed to work in a Redis Cluster environment.
It do so by iterating over the keys in the cluster, one node at a time, while maintaining a consistent view of
the slots that are being scanned.
See https://valkey.io/commands/scan/ for more details.
Args:
cursor (ClusterScanCursor): The cursor object wrapping the scan state - when starting a new scan
creation of new empty ClusterScanCursor is needed `ClusterScanCursor()`.
match (Optional[str]): A pattern to match keys against.
count (Optional[int]): The number of keys to return in a single iteration - the amount returned can vary and
not obligated to return exactly count.
This param is just a hint to the server of how much steps to do in each iteration.
type (Optional[ObjectType]): The type of object to scan for (STRING, LIST, SET, ZSET, HASH).
Returns:
Tuple[str, List[str]]: A tuple containing the next cursor and a list of keys.
Examples:
>>> let cursor = ClusterScanCursor()
all_keys = []
while not cursor.is_finished():
cursor, keys = await client.cluster_scan(cursor, match="my_key*", count=10)
all_keys.extend(keys)
>>> let cursor = ClusterScanCursor()
all_keys = []
while not cursor.is_finished():
cursor, keys = await client.cluster_scan(cursor, type=ClusterScanObjectType.STRING)
all_keys.extend(keys)
"""
response = await self._cluster_scan(cursor, match, count, type)
casted_response = cast(
Tuple[str, List[str]],
response,
)
cursor = ClusterScanCursor(casted_response[0])
return cursor, casted_response[1]
Loading

0 comments on commit 251af1c

Please sign in to comment.