Skip to content

Commit

Permalink
Merge branch 'main' into add_clientname_support
Browse files Browse the repository at this point in the history
  • Loading branch information
ikolomi authored Dec 10, 2023
2 parents 2627187 + 46e8a9d commit 85bd7b2
Show file tree
Hide file tree
Showing 35 changed files with 1,479 additions and 474 deletions.
11 changes: 8 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@ test:
@REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test -p redis --no-default-features -- --nocapture --test-threads=1

@echo "===================================================================="
@echo "Testing Connection Type TCP with all features"
@echo "Testing Connection Type TCP with all features and RESP2"
@echo "===================================================================="
@REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test -p redis --all-features -- --nocapture --test-threads=1 --skip test_module

@echo "===================================================================="
@echo "Testing Connection Type TCP with all features and RESP3"
@echo "===================================================================="
@REDISRS_SERVER_TYPE=tcp RESP3=true cargo test -p redis --all-features -- --nocapture --test-threads=1 --skip test_module

@echo "===================================================================="
@echo "Testing Connection Type TCP with all features and Rustls support"
@echo "===================================================================="
Expand All @@ -36,12 +41,12 @@ test:
@echo "===================================================================="
@echo "Testing async-std with Rustls"
@echo "===================================================================="
@REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test -p redis --features=async-std-rustls-comp,cluster-async -- --nocapture --test-threads=1
@REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test -p redis --features=async-std-rustls-comp,cluster-async -- --nocapture --test-threads=1 --skip test_module

@echo "===================================================================="
@echo "Testing async-std with native-TLS"
@echo "===================================================================="
@REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test -p redis --features=async-std-native-tls-comp,cluster-async -- --nocapture --test-threads=1
@REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test -p redis --features=async-std-native-tls-comp,cluster-async -- --nocapture --test-threads=1 --skip test_module

@echo "===================================================================="
@echo "Testing redis-test"
Expand Down
20 changes: 12 additions & 8 deletions redis-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::collections::VecDeque;
use std::iter::FromIterator;
use std::sync::{Arc, Mutex};

use redis::{Cmd, ConnectionLike, ErrorKind, Pipeline, RedisError, RedisResult, Value};
use redis::{Cmd, ConnectionLike, ErrorKind, Pipeline, PushKind, RedisError, RedisResult, Value};

#[cfg(feature = "aio")]
use futures::{future, FutureExt};
Expand All @@ -45,26 +45,26 @@ pub trait IntoRedisValue {

impl IntoRedisValue for String {
fn into_redis_value(self) -> Value {
Value::Data(self.as_bytes().to_vec())
Value::BulkString(self.as_bytes().to_vec())
}
}

impl IntoRedisValue for &str {
fn into_redis_value(self) -> Value {
Value::Data(self.as_bytes().to_vec())
Value::BulkString(self.as_bytes().to_vec())
}
}

#[cfg(feature = "bytes")]
impl IntoRedisValue for bytes::Bytes {
fn into_redis_value(self) -> Value {
Value::Data(self.to_vec())
Value::BulkString(self.to_vec())
}
}

impl IntoRedisValue for Vec<u8> {
fn into_redis_value(self) -> Value {
Value::Data(self)
Value::BulkString(self)
}
}

Expand Down Expand Up @@ -257,6 +257,10 @@ impl ConnectionLike for MockRedisConnection {
fn is_open(&self) -> bool {
true
}

fn execute_push_message(&mut self, _kind: PushKind, _data: Vec<Value>) {
// TODO - implement handling RESP3 push messages
}
}

#[cfg(feature = "aio")]
Expand Down Expand Up @@ -311,7 +315,7 @@ mod tests {
cmd("SET").arg("bar").arg("foo").execute(&mut conn);
assert_eq!(
cmd("GET").arg("bar").query(&mut conn),
Ok(Value::Data(b"foo".as_ref().into()))
Ok(Value::BulkString(b"foo".as_ref().into()))
);
}

Expand Down Expand Up @@ -402,10 +406,10 @@ mod tests {
fn pipeline_atomic_test() {
let mut conn = MockRedisConnection::new(vec![MockCmd::with_values(
pipe().atomic().cmd("GET").arg("foo").cmd("GET").arg("bar"),
Ok(vec![Value::Bulk(
Ok(vec![Value::Array(
vec!["hello", "world"]
.into_iter()
.map(|x| Value::Data(x.as_bytes().into()))
.map(|x| Value::BulkString(x.as_bytes().into()))
.collect(),
)]),
)]);
Expand Down
2 changes: 2 additions & 0 deletions redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,12 @@ serde_json = { version = "1.0.82", optional = true }

# Optional aHash support
ahash = { version = "0.8.6", optional = true }
num-bigint = "0.4.3"

tracing = "0.1"
futures-time = { version = "3.0.0", optional = true }
arcstr = "1.1.5"
ordered-float = "4.1.1"

[features]
default = ["acl", "streams", "geospatial", "script", "keep-alive"]
Expand Down
8 changes: 4 additions & 4 deletions redis/benches/bench_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,12 @@ fn bench_decode_simple(b: &mut Bencher, input: &[u8]) {
b.iter(|| redis::parse_redis_value(input).unwrap());
}
fn bench_decode(c: &mut Criterion) {
let value = Value::Bulk(vec![
let value = Value::Array(vec![
Value::Okay,
Value::Status("testing".to_string()),
Value::Bulk(vec![]),
Value::SimpleString("testing".to_string()),
Value::Array(vec![]),
Value::Nil,
Value::Data(vec![b'a'; 10]),
Value::BulkString(vec![b'a'; 10]),
Value::Int(7512182390),
]);

Expand Down
2 changes: 1 addition & 1 deletion redis/examples/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ fn read_records(client: &redis::Client) -> RedisResult<()> {
for StreamId { id, map } in ids {
println!("\tID {id}");
for (n, s) in map {
if let Value::Data(bytes) = s {
if let Value::BulkString(bytes) = s {
println!("\t\t{}: {}", n, String::from_utf8(bytes).expect("utf8"))
} else {
panic!("Weird data")
Expand Down
30 changes: 15 additions & 15 deletions redis/src/acl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ impl FromRedisValue for AclInfo {
let flags = flags
.as_sequence()
.ok_or_else(|| {
not_convertible_error!(flags, "Expect a bulk response of ACL flags")
not_convertible_error!(flags, "Expect an array response of ACL flags")
})?
.iter()
.map(|flag| match flag {
Value::Data(flag) => match flag.as_slice() {
Value::BulkString(flag) => match flag.as_slice() {
b"on" => Ok(Rule::On),
b"off" => Ok(Rule::Off),
b"allkeys" => Ok(Rule::AllKeys),
Expand All @@ -181,14 +181,14 @@ impl FromRedisValue for AclInfo {
let passwords = passwords
.as_sequence()
.ok_or_else(|| {
not_convertible_error!(flags, "Expect a bulk response of ACL flags")
not_convertible_error!(flags, "Expect an array response of ACL flags")
})?
.iter()
.map(|pass| Ok(Rule::AddHashedPass(String::from_redis_value(pass)?)))
.collect::<RedisResult<_>>()?;

let commands = match commands {
Value::Data(cmd) => std::str::from_utf8(cmd)?,
Value::BulkString(cmd) => std::str::from_utf8(cmd)?,
_ => {
return Err(not_convertible_error!(
commands,
Expand Down Expand Up @@ -281,18 +281,18 @@ mod tests {

#[test]
fn test_from_redis_value() {
let redis_value = Value::Bulk(vec![
Value::Data("flags".into()),
Value::Bulk(vec![
Value::Data("on".into()),
Value::Data("allchannels".into()),
let redis_value = Value::Array(vec![
Value::BulkString("flags".into()),
Value::Array(vec![
Value::BulkString("on".into()),
Value::BulkString("allchannels".into()),
]),
Value::Data("passwords".into()),
Value::Bulk(vec![]),
Value::Data("commands".into()),
Value::Data("-@all +get".into()),
Value::Data("keys".into()),
Value::Bulk(vec![Value::Data("pat:*".into())]),
Value::BulkString("passwords".into()),
Value::Array(vec![]),
Value::BulkString("commands".into()),
Value::BulkString("-@all +get".into()),
Value::BulkString("keys".into()),
Value::Array(vec![Value::BulkString("pat:*".into())]),
]);
let acl_info = AclInfo::from_redis_value(&redis_value).expect("Parse successfully");

Expand Down
109 changes: 82 additions & 27 deletions redis/src/aio/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use super::async_std;
use super::ConnectionLike;
use super::{setup_connection, AsyncStream, RedisRuntime};
use crate::cmd::{cmd, Cmd};
use crate::connection::{ConnectionAddr, ConnectionInfo, Msg, RedisConnectionInfo};
use crate::connection::{
resp2_is_pub_sub_state_cleared, resp3_is_pub_sub_state_cleared, ConnectionAddr, ConnectionInfo,
Msg, RedisConnectionInfo,
};
#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
use crate::parser::ValueCodec;
use crate::types::{ErrorKind, FromRedisValue, RedisError, RedisFuture, RedisResult, Value};
Expand Down Expand Up @@ -36,6 +39,9 @@ pub struct Connection<C = Pin<Box<dyn AsyncStream + Send + Sync>>> {
// This flag is checked when attempting to send a command, and if it's raised, we attempt to
// exit the pubsub state before executing the new request.
pubsub: bool,

// Flag indicating whether resp3 mode is enabled.
resp3: bool,
}

fn assert_sync<T: Sync>() {}
Expand All @@ -53,13 +59,15 @@ impl<C> Connection<C> {
decoder,
db,
pubsub,
resp3,
} = self;
Connection {
con: f(con),
buf,
decoder,
db,
pubsub,
resp3,
}
}
}
Expand All @@ -77,6 +85,7 @@ where
decoder: combine::stream::Decoder::new(),
db: connection_info.db,
pubsub: false,
resp3: connection_info.use_resp3,
};
setup_connection(connection_info, &mut rv).await?;
Ok(rv)
Expand Down Expand Up @@ -143,17 +152,32 @@ where
// messages are received until the _subscription count_ in the responses reach zero.
let mut received_unsub = false;
let mut received_punsub = false;
loop {
let res: (Vec<u8>, (), isize) = from_redis_value(&self.read_response().await?)?;

match res.0.first() {
Some(&b'u') => received_unsub = true,
Some(&b'p') => received_punsub = true,
_ => (),
if self.resp3 {
while let Value::Push { kind, data } = from_redis_value(&self.read_response().await?)? {
if data.len() >= 2 {
if let Value::Int(num) = data[1] {
if resp3_is_pub_sub_state_cleared(
&mut received_unsub,
&mut received_punsub,
&kind,
num as isize,
) {
break;
}
}
}
}

if received_unsub && received_punsub && res.2 == 0 {
break;
} else {
loop {
let res: (Vec<u8>, (), isize) = from_redis_value(&self.read_response().await?)?;
if resp2_is_pub_sub_state_cleared(
&mut received_unsub,
&mut received_punsub,
&res.0,
res.2,
) {
break;
}
}
}

Expand Down Expand Up @@ -199,7 +223,17 @@ where
self.buf.clear();
cmd.write_packed_command(&mut self.buf);
self.con.write_all(&self.buf).await?;
self.read_response().await
if cmd.is_no_response() {
return Ok(Value::Nil);
}
loop {
match self.read_response().await? {
Value::Push { .. } => {
//self.execute_push_message(kind, data) //TODO
}
val => return Ok(val),
}
}
})
.boxed()
}
Expand Down Expand Up @@ -231,18 +265,28 @@ where
}

let mut rv = Vec::with_capacity(count);
for _ in 0..count {
let mut count = count;
let mut idx = 0;
while idx < count {
let response = self.read_response().await;
match response {
Ok(item) => {
rv.push(item);
// RESP3 can insert push data between command replies
if let Value::Push { .. } = item {
// if that is the case we have to extend the loop and handle push data
count += 1;
// self.execute_push_message(kind, data); //TODO
} else {
rv.push(item);
}
}
Err(err) => {
if first_err.is_none() {
first_err = Some(err);
}
}
}
idx += 1;
}

if let Some(err) = first_err {
Expand Down Expand Up @@ -275,31 +319,42 @@ where

/// Subscribes to a new channel.
pub async fn subscribe<T: ToRedisArgs>(&mut self, channel: T) -> RedisResult<()> {
cmd("SUBSCRIBE").arg(channel).query_async(&mut self.0).await
let mut cmd = cmd("SUBSCRIBE");
cmd.arg(channel);
if self.0.resp3 {
cmd.set_no_response(true);
}
cmd.query_async(&mut self.0).await
}

/// Subscribes to a new channel with a pattern.
pub async fn psubscribe<T: ToRedisArgs>(&mut self, pchannel: T) -> RedisResult<()> {
cmd("PSUBSCRIBE")
.arg(pchannel)
.query_async(&mut self.0)
.await
let mut cmd = cmd("PSUBSCRIBE");
cmd.arg(pchannel);
if self.0.resp3 {
cmd.set_no_response(true);
}
cmd.query_async(&mut self.0).await
}

/// Unsubscribes from a channel.
pub async fn unsubscribe<T: ToRedisArgs>(&mut self, channel: T) -> RedisResult<()> {
cmd("UNSUBSCRIBE")
.arg(channel)
.query_async(&mut self.0)
.await
let mut cmd = cmd("UNSUBSCRIBE");
cmd.arg(channel);
if self.0.resp3 {
cmd.set_no_response(true);
}
cmd.query_async(&mut self.0).await
}

/// Unsubscribes from a channel with a pattern.
pub async fn punsubscribe<T: ToRedisArgs>(&mut self, pchannel: T) -> RedisResult<()> {
cmd("PUNSUBSCRIBE")
.arg(pchannel)
.query_async(&mut self.0)
.await
let mut cmd = cmd("PUNSUBSCRIBE");
cmd.arg(pchannel);
if self.0.resp3 {
cmd.set_no_response(true);
}
cmd.query_async(&mut self.0).await
}

/// Returns [`Stream`] of [`Msg`]s from this [`PubSub`]s subscriptions.
Expand Down
Loading

0 comments on commit 85bd7b2

Please sign in to comment.