diff --git a/Makefile b/Makefile index 32829f4f09..f397017eb0 100644 --- a/Makefile +++ b/Makefile @@ -9,10 +9,15 @@ test: @REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test -p redis --no-default-features -- --nocapture --test-threads=1 @echo "====================================================================" - @echo "Testing Connection Type TCP with all features" + @echo "Testing Connection Type TCP with all features and RESP2" @echo "====================================================================" @REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test -p redis --all-features -- --nocapture --test-threads=1 --skip test_module + @echo "====================================================================" + @echo "Testing Connection Type TCP with all features and RESP3" + @echo "====================================================================" + @REDISRS_SERVER_TYPE=tcp RESP3=true cargo test -p redis --all-features -- --nocapture --test-threads=1 --skip test_module + @echo "====================================================================" @echo "Testing Connection Type TCP with all features and Rustls support" @echo "====================================================================" @@ -36,12 +41,12 @@ test: @echo "====================================================================" @echo "Testing async-std with Rustls" @echo "====================================================================" - @REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test -p redis --features=async-std-rustls-comp,cluster-async -- --nocapture --test-threads=1 + @REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test -p redis --features=async-std-rustls-comp,cluster-async -- --nocapture --test-threads=1 --skip test_module @echo "====================================================================" @echo "Testing async-std with native-TLS" @echo "====================================================================" - @REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test -p redis --features=async-std-native-tls-comp,cluster-async -- --nocapture --test-threads=1 + @REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test -p redis --features=async-std-native-tls-comp,cluster-async -- --nocapture --test-threads=1 --skip test_module @echo "====================================================================" @echo "Testing redis-test" diff --git a/redis-test/src/lib.rs b/redis-test/src/lib.rs index 49094e4262..4fd7965de0 100644 --- a/redis-test/src/lib.rs +++ b/redis-test/src/lib.rs @@ -27,7 +27,7 @@ use std::collections::VecDeque; use std::iter::FromIterator; use std::sync::{Arc, Mutex}; -use redis::{Cmd, ConnectionLike, ErrorKind, Pipeline, RedisError, RedisResult, Value}; +use redis::{Cmd, ConnectionLike, ErrorKind, Pipeline, PushKind, RedisError, RedisResult, Value}; #[cfg(feature = "aio")] use futures::{future, FutureExt}; @@ -45,26 +45,26 @@ pub trait IntoRedisValue { impl IntoRedisValue for String { fn into_redis_value(self) -> Value { - Value::Data(self.as_bytes().to_vec()) + Value::BulkString(self.as_bytes().to_vec()) } } impl IntoRedisValue for &str { fn into_redis_value(self) -> Value { - Value::Data(self.as_bytes().to_vec()) + Value::BulkString(self.as_bytes().to_vec()) } } #[cfg(feature = "bytes")] impl IntoRedisValue for bytes::Bytes { fn into_redis_value(self) -> Value { - Value::Data(self.to_vec()) + Value::BulkString(self.to_vec()) } } impl IntoRedisValue for Vec { fn into_redis_value(self) -> Value { - Value::Data(self) + Value::BulkString(self) } } @@ -257,6 +257,10 @@ impl ConnectionLike for MockRedisConnection { fn is_open(&self) -> bool { true } + + fn execute_push_message(&mut self, _kind: PushKind, _data: Vec) { + // TODO - implement handling RESP3 push messages + } } #[cfg(feature = "aio")] @@ -311,7 +315,7 @@ mod tests { cmd("SET").arg("bar").arg("foo").execute(&mut conn); assert_eq!( cmd("GET").arg("bar").query(&mut conn), - Ok(Value::Data(b"foo".as_ref().into())) + Ok(Value::BulkString(b"foo".as_ref().into())) ); } @@ -402,10 +406,10 @@ mod tests { fn pipeline_atomic_test() { let mut conn = MockRedisConnection::new(vec![MockCmd::with_values( pipe().atomic().cmd("GET").arg("foo").cmd("GET").arg("bar"), - Ok(vec![Value::Bulk( + Ok(vec![Value::Array( vec!["hello", "world"] .into_iter() - .map(|x| Value::Data(x.as_bytes().into())) + .map(|x| Value::BulkString(x.as_bytes().into())) .collect(), )]), )]); diff --git a/redis/Cargo.toml b/redis/Cargo.toml index 1bbd1dd5bc..11da8f562e 100644 --- a/redis/Cargo.toml +++ b/redis/Cargo.toml @@ -88,10 +88,12 @@ serde_json = { version = "1.0.82", optional = true } # Optional aHash support ahash = { version = "0.8.6", optional = true } +num-bigint = "0.4.3" tracing = "0.1" futures-time = { version = "3.0.0", optional = true } arcstr = "1.1.5" +ordered-float = "4.1.1" [features] default = ["acl", "streams", "geospatial", "script", "keep-alive"] diff --git a/redis/benches/bench_basic.rs b/redis/benches/bench_basic.rs index cfe5073674..a0e0943a0a 100644 --- a/redis/benches/bench_basic.rs +++ b/redis/benches/bench_basic.rs @@ -254,12 +254,12 @@ fn bench_decode_simple(b: &mut Bencher, input: &[u8]) { b.iter(|| redis::parse_redis_value(input).unwrap()); } fn bench_decode(c: &mut Criterion) { - let value = Value::Bulk(vec![ + let value = Value::Array(vec![ Value::Okay, - Value::Status("testing".to_string()), - Value::Bulk(vec![]), + Value::SimpleString("testing".to_string()), + Value::Array(vec![]), Value::Nil, - Value::Data(vec![b'a'; 10]), + Value::BulkString(vec![b'a'; 10]), Value::Int(7512182390), ]); diff --git a/redis/examples/streams.rs b/redis/examples/streams.rs index d22c0601ef..0fb0fb4b63 100644 --- a/redis/examples/streams.rs +++ b/redis/examples/streams.rs @@ -220,7 +220,7 @@ fn read_records(client: &redis::Client) -> RedisResult<()> { for StreamId { id, map } in ids { println!("\tID {id}"); for (n, s) in map { - if let Value::Data(bytes) = s { + if let Value::BulkString(bytes) = s { println!("\t\t{}: {}", n, String::from_utf8(bytes).expect("utf8")) } else { panic!("Weird data") diff --git a/redis/src/acl.rs b/redis/src/acl.rs index 2e2e984a7c..ef85877ba6 100644 --- a/redis/src/acl.rs +++ b/redis/src/acl.rs @@ -159,11 +159,11 @@ impl FromRedisValue for AclInfo { let flags = flags .as_sequence() .ok_or_else(|| { - not_convertible_error!(flags, "Expect a bulk response of ACL flags") + not_convertible_error!(flags, "Expect an array response of ACL flags") })? .iter() .map(|flag| match flag { - Value::Data(flag) => match flag.as_slice() { + Value::BulkString(flag) => match flag.as_slice() { b"on" => Ok(Rule::On), b"off" => Ok(Rule::Off), b"allkeys" => Ok(Rule::AllKeys), @@ -181,14 +181,14 @@ impl FromRedisValue for AclInfo { let passwords = passwords .as_sequence() .ok_or_else(|| { - not_convertible_error!(flags, "Expect a bulk response of ACL flags") + not_convertible_error!(flags, "Expect an array response of ACL flags") })? .iter() .map(|pass| Ok(Rule::AddHashedPass(String::from_redis_value(pass)?))) .collect::>()?; let commands = match commands { - Value::Data(cmd) => std::str::from_utf8(cmd)?, + Value::BulkString(cmd) => std::str::from_utf8(cmd)?, _ => { return Err(not_convertible_error!( commands, @@ -281,18 +281,18 @@ mod tests { #[test] fn test_from_redis_value() { - let redis_value = Value::Bulk(vec![ - Value::Data("flags".into()), - Value::Bulk(vec![ - Value::Data("on".into()), - Value::Data("allchannels".into()), + let redis_value = Value::Array(vec![ + Value::BulkString("flags".into()), + Value::Array(vec![ + Value::BulkString("on".into()), + Value::BulkString("allchannels".into()), ]), - Value::Data("passwords".into()), - Value::Bulk(vec![]), - Value::Data("commands".into()), - Value::Data("-@all +get".into()), - Value::Data("keys".into()), - Value::Bulk(vec![Value::Data("pat:*".into())]), + Value::BulkString("passwords".into()), + Value::Array(vec![]), + Value::BulkString("commands".into()), + Value::BulkString("-@all +get".into()), + Value::BulkString("keys".into()), + Value::Array(vec![Value::BulkString("pat:*".into())]), ]); let acl_info = AclInfo::from_redis_value(&redis_value).expect("Parse successfully"); diff --git a/redis/src/aio/connection.rs b/redis/src/aio/connection.rs index 88af088c46..2542a03f3e 100644 --- a/redis/src/aio/connection.rs +++ b/redis/src/aio/connection.rs @@ -3,7 +3,10 @@ use super::async_std; use super::ConnectionLike; use super::{setup_connection, AsyncStream, RedisRuntime}; use crate::cmd::{cmd, Cmd}; -use crate::connection::{ConnectionAddr, ConnectionInfo, Msg, RedisConnectionInfo}; +use crate::connection::{ + resp2_is_pub_sub_state_cleared, resp3_is_pub_sub_state_cleared, ConnectionAddr, ConnectionInfo, + Msg, RedisConnectionInfo, +}; #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))] use crate::parser::ValueCodec; use crate::types::{ErrorKind, FromRedisValue, RedisError, RedisFuture, RedisResult, Value}; @@ -36,6 +39,9 @@ pub struct Connection>> { // This flag is checked when attempting to send a command, and if it's raised, we attempt to // exit the pubsub state before executing the new request. pubsub: bool, + + // Flag indicating whether resp3 mode is enabled. + resp3: bool, } fn assert_sync() {} @@ -53,6 +59,7 @@ impl Connection { decoder, db, pubsub, + resp3, } = self; Connection { con: f(con), @@ -60,6 +67,7 @@ impl Connection { decoder, db, pubsub, + resp3, } } } @@ -77,6 +85,7 @@ where decoder: combine::stream::Decoder::new(), db: connection_info.db, pubsub: false, + resp3: connection_info.use_resp3, }; setup_connection(connection_info, &mut rv).await?; Ok(rv) @@ -143,17 +152,32 @@ where // messages are received until the _subscription count_ in the responses reach zero. let mut received_unsub = false; let mut received_punsub = false; - loop { - let res: (Vec, (), isize) = from_redis_value(&self.read_response().await?)?; - - match res.0.first() { - Some(&b'u') => received_unsub = true, - Some(&b'p') => received_punsub = true, - _ => (), + if self.resp3 { + while let Value::Push { kind, data } = from_redis_value(&self.read_response().await?)? { + if data.len() >= 2 { + if let Value::Int(num) = data[1] { + if resp3_is_pub_sub_state_cleared( + &mut received_unsub, + &mut received_punsub, + &kind, + num as isize, + ) { + break; + } + } + } } - - if received_unsub && received_punsub && res.2 == 0 { - break; + } else { + loop { + let res: (Vec, (), isize) = from_redis_value(&self.read_response().await?)?; + if resp2_is_pub_sub_state_cleared( + &mut received_unsub, + &mut received_punsub, + &res.0, + res.2, + ) { + break; + } } } @@ -199,7 +223,17 @@ where self.buf.clear(); cmd.write_packed_command(&mut self.buf); self.con.write_all(&self.buf).await?; - self.read_response().await + if cmd.is_no_response() { + return Ok(Value::Nil); + } + loop { + match self.read_response().await? { + Value::Push { .. } => { + //self.execute_push_message(kind, data) //TODO + } + val => return Ok(val), + } + } }) .boxed() } @@ -231,11 +265,20 @@ where } let mut rv = Vec::with_capacity(count); - for _ in 0..count { + let mut count = count; + let mut idx = 0; + while idx < count { let response = self.read_response().await; match response { Ok(item) => { - rv.push(item); + // RESP3 can insert push data between command replies + if let Value::Push { .. } = item { + // if that is the case we have to extend the loop and handle push data + count += 1; + // self.execute_push_message(kind, data); //TODO + } else { + rv.push(item); + } } Err(err) => { if first_err.is_none() { @@ -243,6 +286,7 @@ where } } } + idx += 1; } if let Some(err) = first_err { @@ -275,31 +319,42 @@ where /// Subscribes to a new channel. pub async fn subscribe(&mut self, channel: T) -> RedisResult<()> { - cmd("SUBSCRIBE").arg(channel).query_async(&mut self.0).await + let mut cmd = cmd("SUBSCRIBE"); + cmd.arg(channel); + if self.0.resp3 { + cmd.set_no_response(true); + } + cmd.query_async(&mut self.0).await } /// Subscribes to a new channel with a pattern. pub async fn psubscribe(&mut self, pchannel: T) -> RedisResult<()> { - cmd("PSUBSCRIBE") - .arg(pchannel) - .query_async(&mut self.0) - .await + let mut cmd = cmd("PSUBSCRIBE"); + cmd.arg(pchannel); + if self.0.resp3 { + cmd.set_no_response(true); + } + cmd.query_async(&mut self.0).await } /// Unsubscribes from a channel. pub async fn unsubscribe(&mut self, channel: T) -> RedisResult<()> { - cmd("UNSUBSCRIBE") - .arg(channel) - .query_async(&mut self.0) - .await + let mut cmd = cmd("UNSUBSCRIBE"); + cmd.arg(channel); + if self.0.resp3 { + cmd.set_no_response(true); + } + cmd.query_async(&mut self.0).await } /// Unsubscribes from a channel with a pattern. pub async fn punsubscribe(&mut self, pchannel: T) -> RedisResult<()> { - cmd("PUNSUBSCRIBE") - .arg(pchannel) - .query_async(&mut self.0) - .await + let mut cmd = cmd("PUNSUBSCRIBE"); + cmd.arg(pchannel); + if self.0.resp3 { + cmd.set_no_response(true); + } + cmd.query_async(&mut self.0).await } /// Returns [`Stream`] of [`Msg`]s from this [`PubSub`]s subscriptions. diff --git a/redis/src/aio/mod.rs b/redis/src/aio/mod.rs index 83207fb95c..b64f06d3b6 100644 --- a/redis/src/aio/mod.rs +++ b/redis/src/aio/mod.rs @@ -1,6 +1,6 @@ //! Adds async IO support to redis. use crate::cmd::{cmd, Cmd}; -use crate::connection::RedisConnectionInfo; +use crate::connection::{get_resp3_hello_command_error, RedisConnectionInfo}; use crate::types::{ErrorKind, RedisFuture, RedisResult, Value}; use ::tokio::io::{AsyncRead, AsyncWrite}; use async_trait::async_trait; @@ -84,7 +84,13 @@ async fn setup_connection(connection_info: &RedisConnectionInfo, con: &mut C) where C: ConnectionLike, { - if let Some(password) = &connection_info.password { + if connection_info.use_resp3 { + let hello_cmd = resp3_hello(connection_info); + let val: RedisResult = hello_cmd.query_async(con).await; + if let Err(err) = val { + return Err(get_resp3_hello_command_error(err)); + } + } else if let Some(password) = &connection_info.password { let mut command = cmd("AUTH"); if let Some(username) = &connection_info.username { command.arg(username); @@ -152,4 +158,5 @@ mod connection_manager; #[cfg(feature = "connection-manager")] pub use connection_manager::*; mod runtime; +use crate::commands::resp3_hello; pub(super) use runtime::*; diff --git a/redis/src/aio/multiplexed_connection.rs b/redis/src/aio/multiplexed_connection.rs index dfb2f006ff..8e933b90c9 100644 --- a/redis/src/aio/multiplexed_connection.rs +++ b/redis/src/aio/multiplexed_connection.rs @@ -129,11 +129,11 @@ where match result { Ok(item) => { entry.buffer = Some(match entry.buffer.take() { - Some(Value::Bulk(mut values)) => { + Some(Value::Array(mut values)) => { values.push(item); - Value::Bulk(values) + Value::Array(values) } - Some(value) => Value::Bulk(vec![value, item]), + Some(value) => Value::Array(vec![value, item]), None => item, }); } @@ -154,7 +154,7 @@ where let entry = self_.in_flight.pop_front().unwrap(); let response = match entry.first_err { Some(err) => Err(err), - None => Ok(entry.buffer.unwrap_or(Value::Bulk(vec![]))), + None => Ok(entry.buffer.unwrap_or(Value::Array(vec![]))), }; // `Err` means that the receiver was dropped in which case it does not @@ -396,7 +396,7 @@ impl MultiplexedConnection { })?; match value { - Value::Bulk(mut values) => { + Value::Array(mut values) => { values.drain(..offset); Ok(values) } diff --git a/redis/src/client.rs b/redis/src/client.rs index 98822d6305..8f4d7cfdb9 100644 --- a/redis/src/client.rs +++ b/redis/src/client.rs @@ -1,5 +1,6 @@ use std::time::Duration; +#[cfg(feature = "aio")] use std::net::IpAddr; #[cfg(feature = "aio")] use std::net::SocketAddr; @@ -481,6 +482,7 @@ impl Client { #[cfg(feature = "aio")] use crate::aio::Runtime; +use crate::types::PushKind; impl ConnectionLike for Client { fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult { @@ -516,6 +518,10 @@ impl ConnectionLike for Client { false } } + + fn execute_push_message(&mut self, _kind: PushKind, _data: Vec) { + // TODO - implement handling RESP3 push messages + } } #[cfg(test)] diff --git a/redis/src/cluster.rs b/redis/src/cluster.rs index 5bf5bdf047..b1a1c561e1 100644 --- a/redis/src/cluster.rs +++ b/redis/src/cluster.rs @@ -54,12 +54,12 @@ use crate::connection::{ }; use crate::parser::parse_redis_value; use crate::types::{ErrorKind, HashMap, RedisError, RedisResult, Value}; -use crate::IntoConnectionInfo; pub use crate::TlsMode; // Pub for backwards compatibility use crate::{ cluster_client::ClusterParams, cluster_routing::{Redirect, Route, RoutingInfo}, }; +use crate::{IntoConnectionInfo, PushKind}; pub use crate::cluster_client::{ClusterClient, ClusterClientBuilder}; pub use crate::cluster_pipeline::{cluster_pipe, ClusterPipeline}; @@ -131,7 +131,7 @@ impl From for Value { fn from(value: Output) -> Self { match value { Output::Single(value) => value, - Output::Multi(values) => Value::Bulk(values), + Output::Multi(values) => Value::Array(values), } } } @@ -593,11 +593,11 @@ where .map(|(index, result)| { let addr = addresses[index]; result.map(|val| { - Value::Bulk(vec![Value::Data(addr.as_bytes().to_vec()), val]) + Value::Array(vec![Value::BulkString(addr.as_bytes().to_vec()), val]) }) }) .collect::>>()?; - Ok(Value::Bulk(results)) + Ok(Value::Array(results)) } } } @@ -847,6 +847,10 @@ impl ConnectionLike for ClusterConnection { } true } + + fn execute_push_message(&mut self, _kind: PushKind, _data: Vec) { + // TODO - implement handling RESP3 push messages + } } #[derive(Debug)] diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 6db0e2808a..a74ec91798 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -741,13 +741,13 @@ where // TODO - once Value::Error is merged, we can use join_all and report separate errors and also pass successes. future::try_join_all(receivers.into_iter().map(|(addr, receiver)| async move { let result = convert_result(receiver.await)?; - Ok(Value::Bulk(vec![ - Value::Data(addr.as_bytes().to_vec()), + Ok(Value::Array(vec![ + Value::BulkString(addr.as_bytes().to_vec()), result, ])) })) .await - .map(Value::Bulk) + .map(Value::Array) } } } diff --git a/redis/src/cluster_pipeline.rs b/redis/src/cluster_pipeline.rs index 14f4fd9291..71e5d8fda1 100644 --- a/redis/src/cluster_pipeline.rs +++ b/redis/src/cluster_pipeline.rs @@ -120,7 +120,7 @@ impl ClusterPipeline { from_redis_value( &(if self.commands.is_empty() { - Value::Bulk(vec![]) + Value::Array(vec![]) } else { self.make_pipeline_results(con.execute_pipeline(self)?) }), diff --git a/redis/src/cluster_routing.rs b/redis/src/cluster_routing.rs index 67fd23b12f..f8e5f5658d 100644 --- a/redis/src/cluster_routing.rs +++ b/redis/src/cluster_routing.rs @@ -134,7 +134,7 @@ pub(crate) fn logical_aggregate(values: Vec, op: LogicalAggregateOp) -> R }; let results = values.into_iter().try_fold(Vec::new(), |acc, curr| { let values = match curr { - Value::Bulk(values) => values, + Value::Array(values) => values, _ => { return RedisResult::Err( ( @@ -167,7 +167,7 @@ pub(crate) fn logical_aggregate(values: Vec, op: LogicalAggregateOp) -> R } Ok(acc) })?; - Ok(Value::Bulk( + Ok(Value::Array( results .into_iter() .map(|result| Value::Int(result as i64)) @@ -180,14 +180,14 @@ pub(crate) fn combine_array_results(values: Vec) -> RedisResult { for value in values { match value { - Value::Bulk(values) => results.extend(values), + Value::Array(values) => results.extend(values), _ => { return Err((ErrorKind::TypeError, "expected array of values as response").into()); } } } - Ok(Value::Bulk(results)) + Ok(Value::Array(results)) } /// Combines multiple call results in the `values` field, each assume to be an array of results, @@ -201,7 +201,7 @@ pub(crate) fn combine_and_sort_array_results<'a>( let mut results = Vec::new(); results.resize( values.iter().fold(0, |acc, value| match value { - Value::Bulk(values) => values.len() + acc, + Value::Array(values) => values.len() + acc, _ => 0, }), Value::Nil, @@ -210,7 +210,7 @@ pub(crate) fn combine_and_sort_array_results<'a>( for (key_indices, value) in sorting_order.into_iter().zip(values) { match value { - Value::Bulk(values) => { + Value::Array(values) => { assert_eq!(values.len(), key_indices.len()); for (index, value) in key_indices.iter().zip(values) { results[*index] = value; @@ -222,7 +222,7 @@ pub(crate) fn combine_and_sort_array_results<'a>( } } - Ok(Value::Bulk(results)) + Ok(Value::Array(results)) } fn get_route(is_readonly: bool, key: &[u8]) -> Route { @@ -564,8 +564,8 @@ impl Routable for Cmd { impl Routable for Value { fn arg_idx(&self, idx: usize) -> Option<&[u8]> { match self { - Value::Bulk(args) => match args.get(idx) { - Some(Value::Data(ref data)) => Some(&data[..]), + Value::Array(args) => match args.get(idx) { + Some(Value::BulkString(ref data)) => Some(&data[..]), _ => None, }, _ => None, @@ -574,8 +574,8 @@ impl Routable for Value { fn position(&self, candidate: &[u8]) -> Option { match self { - Value::Bulk(args) => args.iter().position(|a| match a { - Value::Data(d) => d.eq_ignore_ascii_case(candidate), + Value::Array(args) => args.iter().position(|a| match a { + Value::BulkString(d) => d.eq_ignore_ascii_case(candidate), _ => false, }), _ => None, @@ -1004,12 +1004,12 @@ mod tests { #[test] fn test_combining_results_into_single_array() { - let res1 = Value::Bulk(vec![Value::Nil, Value::Okay]); - let res2 = Value::Bulk(vec![ - Value::Data("1".as_bytes().to_vec()), - Value::Data("4".as_bytes().to_vec()), + let res1 = Value::Array(vec![Value::Nil, Value::Okay]); + let res2 = Value::Array(vec![ + Value::BulkString("1".as_bytes().to_vec()), + Value::BulkString("4".as_bytes().to_vec()), ]); - let res3 = Value::Bulk(vec![Value::Status("2".to_string()), Value::Int(3)]); + let res3 = Value::Array(vec![Value::SimpleString("2".to_string()), Value::Int(3)]); let results = super::combine_and_sort_array_results( vec![res1, res2, res3], [vec![0, 5], vec![1, 4], vec![2, 3]].iter(), @@ -1017,12 +1017,12 @@ mod tests { assert_eq!( results.unwrap(), - Value::Bulk(vec![ + Value::Array(vec![ Value::Nil, - Value::Data("1".as_bytes().to_vec()), - Value::Status("2".to_string()), + Value::BulkString("1".as_bytes().to_vec()), + Value::SimpleString("2".to_string()), Value::Int(3), - Value::Data("4".as_bytes().to_vec()), + Value::BulkString("4".as_bytes().to_vec()), Value::Okay, ]) ); diff --git a/redis/src/cluster_topology.rs b/redis/src/cluster_topology.rs index 150ec16650..a9192cd18e 100644 --- a/redis/src/cluster_topology.rs +++ b/redis/src/cluster_topology.rs @@ -226,9 +226,9 @@ pub(crate) fn parse_slots(raw_slot_resp: &Value, tls: Option) -> RedisR // Parse response. let mut result = Vec::with_capacity(2); - if let Value::Bulk(items) = raw_slot_resp { + if let Value::Array(items) = raw_slot_resp { let mut iter = items.iter(); - while let Some(Value::Bulk(item)) = iter.next() { + while let Some(Value::Array(item)) = iter.next() { if item.len() < 3 { continue; } @@ -249,12 +249,12 @@ pub(crate) fn parse_slots(raw_slot_resp: &Value, tls: Option) -> RedisR .iter() .skip(2) .filter_map(|node| { - if let Value::Bulk(node) = node { + if let Value::Array(node) = node { if node.len() < 2 { return None; } - let ip = if let Value::Data(ref ip) = node[0] { + let ip = if let Value::BulkString(ref ip) = node[0] { String::from_utf8_lossy(ip) } else { return None; @@ -423,54 +423,54 @@ mod tests { } fn get_view(view_type: &ViewType) -> Value { match view_type { - ViewType::SingleNodeViewFullCoverage => Value::Bulk(vec![Value::Bulk(vec![ + ViewType::SingleNodeViewFullCoverage => Value::Array(vec![Value::Array(vec![ Value::Int(0_i64), Value::Int(16383_i64), - Value::Bulk(vec![ - Value::Data("node1".as_bytes().to_vec()), + Value::Array(vec![ + Value::BulkString("node1".as_bytes().to_vec()), Value::Int(6379_i64), ]), ])]), - ViewType::SingleNodeViewMissingSlots => Value::Bulk(vec![Value::Bulk(vec![ + ViewType::SingleNodeViewMissingSlots => Value::Array(vec![Value::Array(vec![ Value::Int(0_i64), Value::Int(4000_i64), - Value::Bulk(vec![ - Value::Data("node1".as_bytes().to_vec()), + Value::Array(vec![ + Value::BulkString("node1".as_bytes().to_vec()), Value::Int(6379_i64), ]), ])]), - ViewType::TwoNodesViewFullCoverage => Value::Bulk(vec![ - Value::Bulk(vec![ + ViewType::TwoNodesViewFullCoverage => Value::Array(vec![ + Value::Array(vec![ Value::Int(0_i64), Value::Int(4000_i64), - Value::Bulk(vec![ - Value::Data("node1".as_bytes().to_vec()), + Value::Array(vec![ + Value::BulkString("node1".as_bytes().to_vec()), Value::Int(6379_i64), ]), ]), - Value::Bulk(vec![ + Value::Array(vec![ Value::Int(4001_i64), Value::Int(16383_i64), - Value::Bulk(vec![ - Value::Data("node2".as_bytes().to_vec()), + Value::Array(vec![ + Value::BulkString("node2".as_bytes().to_vec()), Value::Int(6380_i64), ]), ]), ]), - ViewType::TwoNodesViewMissingSlots => Value::Bulk(vec![ - Value::Bulk(vec![ + ViewType::TwoNodesViewMissingSlots => Value::Array(vec![ + Value::Array(vec![ Value::Int(0_i64), Value::Int(3000_i64), - Value::Bulk(vec![ - Value::Data("node3".as_bytes().to_vec()), + Value::Array(vec![ + Value::BulkString("node3".as_bytes().to_vec()), Value::Int(6381_i64), ]), ]), - Value::Bulk(vec![ + Value::Array(vec![ Value::Int(4001_i64), Value::Int(16383_i64), - Value::Bulk(vec![ - Value::Data("node4".as_bytes().to_vec()), + Value::Array(vec![ + Value::BulkString("node4".as_bytes().to_vec()), Value::Int(6382_i64), ]), ]), diff --git a/redis/src/cmd.rs b/redis/src/cmd.rs index 2223f8607c..d0b57e3491 100644 --- a/redis/src/cmd.rs +++ b/redis/src/cmd.rs @@ -28,6 +28,8 @@ pub struct Cmd { // Arg::Simple contains the offset that marks the end of the argument args: Vec>, cursor: Option, + // If it's true command's response won't be read from socket. Useful for Pub/Sub. + no_response: bool, } /// Represents a redis iterator. @@ -324,6 +326,7 @@ impl Cmd { data: vec![], args: vec![], cursor: None, + no_response: false, } } @@ -333,6 +336,7 @@ impl Cmd { data: Vec::with_capacity(size_of_data), args: Vec::with_capacity(arg_count), cursor: None, + no_response: false, } } @@ -564,6 +568,19 @@ impl Cmd { } Some(&self.data[start..end]) } + + /// Client won't read and wait for results. Currently only used for Pub/Sub commands in RESP3. + #[inline] + pub fn set_no_response(&mut self, nr: bool) -> &mut Cmd { + self.no_response = nr; + self + } + + /// Check whether command's result will be waited for. + #[inline] + pub fn is_no_response(&self) -> bool { + self.no_response + } } /// Shortcut function to creating a command with a single argument. diff --git a/redis/src/commands/mod.rs b/redis/src/commands/mod.rs index 86a6dac535..1080850cb4 100644 --- a/redis/src/commands/mod.rs +++ b/redis/src/commands/mod.rs @@ -29,6 +29,7 @@ use crate::streams; #[cfg(feature = "acl")] use crate::acl; +use crate::RedisConnectionInfo; implement_commands! { 'a @@ -2155,3 +2156,20 @@ impl ToRedisArgs for SetOptions { } } } + +/// Creates HELLO command for RESP3 with RedisConnectionInfo +pub fn resp3_hello(connection_info: &RedisConnectionInfo) -> Cmd{ + let mut hello_cmd = cmd("HELLO"); + hello_cmd.arg("3"); + if connection_info.password.is_some() { + let username:&str = match connection_info.username.as_ref() { + None => "default", + Some(username) => username + }; + hello_cmd + .arg("AUTH") + .arg(username) + .arg(connection_info.password.as_ref().unwrap()); + } + hello_cmd +} diff --git a/redis/src/connection.rs b/redis/src/connection.rs index 982508225d..92e249dfa1 100644 --- a/redis/src/connection.rs +++ b/redis/src/connection.rs @@ -11,14 +11,17 @@ use crate::cmd::{cmd, pipe, Cmd}; use crate::parser::Parser; use crate::pipeline::Pipeline; use crate::types::{ - from_redis_value, ErrorKind, FromRedisValue, RedisError, RedisResult, ToRedisArgs, Value, + from_redis_value, ErrorKind, FromRedisValue, PushKind, RedisError, RedisResult, ToRedisArgs, + Value, }; #[cfg(unix)] use crate::types::HashMap; #[cfg(unix)] use std::os::unix::net::UnixStream; +use std::vec::IntoIter; +use crate::commands::resp3_hello; #[cfg(all(feature = "tls-native-tls", not(feature = "tls-rustls")))] use native_tls::{TlsConnector, TlsStream}; @@ -222,6 +225,8 @@ pub struct RedisConnectionInfo { pub username: Option, /// Optionally a password that should be used for connection. pub password: Option, + /// Use RESP 3 mode, Redis 6 or newer is required. + pub use_resp3: bool, } impl FromStr for ConnectionInfo { @@ -346,6 +351,7 @@ fn url_to_tcp_connection_info(url: url::Url) -> RedisResult { } else { ConnectionAddr::Tcp(host, port) }; + let query: HashMap<_, _> = url.query_pairs().collect(); Ok(ConnectionInfo { addr, redis: RedisConnectionInfo { @@ -377,6 +383,10 @@ fn url_to_tcp_connection_info(url: url::Url) -> RedisResult { }, None => None, }, + use_resp3: match query.get("resp3") { + Some(v) => v == "true", + _ => false, + }, }, }) } @@ -399,6 +409,10 @@ fn url_to_unix_connection_info(url: url::Url) -> RedisResult { }, username: query.get("user").map(|username| username.to_string()), password: query.get("pass").map(|password| password.to_string()), + use_resp3: match query.get("resp3") { + Some(v) => v == "true", + _ => false, + }, }, }) } @@ -486,6 +500,9 @@ pub struct Connection { /// This flag is checked when attempting to send a command, and if it's raised, we attempt to /// exit the pubsub state before executing the new request. pubsub: bool, + + // Flag indicating whether resp3 mode is enabled. + resp3: bool, } /// Represents a pubsub connection. @@ -937,12 +954,18 @@ fn setup_connection( parser: Parser::new(), db: connection_info.db, pubsub: false, + resp3: connection_info.use_resp3, }; - if connection_info.password.is_some() { + if connection_info.use_resp3 { + let hello_cmd = resp3_hello(connection_info); + let val: RedisResult = hello_cmd.query(&mut rv); + if let Err(err) = val { + return Err(get_resp3_hello_command_error(err)); + } + } else if connection_info.password.is_some() { connect_auth(&mut rv, connection_info)?; } - if connection_info.db != 0 { match cmd("SELECT") .arg(connection_info.db) @@ -1017,6 +1040,9 @@ pub trait ConnectionLike { /// sockets the connection is open until writing a command failed with a /// `BrokenPipe` error. fn is_open(&self) -> bool; + + /// Executes received push message from server. + fn execute_push_message(&mut self, kind: PushKind, data: Vec); } /// A connection is an object that represents a single redis connection. It @@ -1111,17 +1137,32 @@ impl Connection { // messages are received until the _subscription count_ in the responses reach zero. let mut received_unsub = false; let mut received_punsub = false; - loop { - let res: (Vec, (), isize) = from_redis_value(&self.recv_response()?)?; - - match res.0.first() { - Some(&b'u') => received_unsub = true, - Some(&b'p') => received_punsub = true, - _ => (), + if self.resp3 { + while let Value::Push { kind, data } = from_redis_value(&self.recv_response()?)? { + if data.len() >= 2 { + if let Value::Int(num) = data[1] { + if resp3_is_pub_sub_state_cleared( + &mut received_unsub, + &mut received_punsub, + &kind, + num as isize, + ) { + break; + } + } + } } - - if received_unsub && received_punsub && res.2 == 0 { - break; + } else { + loop { + let res: (Vec, (), isize) = from_redis_value(&self.recv_response()?)?; + if resp2_is_pub_sub_state_cleared( + &mut received_unsub, + &mut received_punsub, + &res.0, + res.2, + ) { + break; + } } } @@ -1186,13 +1227,36 @@ impl Connection { } impl ConnectionLike for Connection { + /// Sends a [Cmd] into the TCP socket and reads a single response from it. + fn req_command(&mut self, cmd: &Cmd) -> RedisResult { + let pcmd = cmd.get_packed_command(); + if self.pubsub { + self.exit_pubsub()?; + } + + self.con.send_bytes(&pcmd)?; + if cmd.is_no_response() { + return Ok(Value::Nil); + } + loop { + match self.read_response()? { + Value::Push { kind, data } => self.execute_push_message(kind, data), + val => return Ok(val), + } + } + } fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult { if self.pubsub { self.exit_pubsub()?; } self.con.send_bytes(cmd)?; - self.read_response() + loop { + match self.read_response()? { + Value::Push { kind, data } => self.execute_push_message(kind, data), + val => return Ok(val), + } + } } fn req_packed_commands( @@ -1207,7 +1271,9 @@ impl ConnectionLike for Connection { self.con.send_bytes(cmd)?; let mut rv = vec![]; let mut first_err = None; - for idx in 0..(offset + count) { + let mut count = count; + let mut idx = 0; + while idx < (offset + count) { // When processing a transaction, some responses may be errors. // We need to keep processing the rest of the responses in that case, // so bailing early with `?` would not be correct. @@ -1215,7 +1281,12 @@ impl ConnectionLike for Connection { let response = self.read_response(); match response { Ok(item) => { - if idx >= offset { + // RESP3 can insert push data between command replies + if let Value::Push { kind, data } = item { + // if that is the case we have to extend the loop and handle push data + count += 1; + self.execute_push_message(kind, data); + } else if idx >= offset { rv.push(item); } } @@ -1225,6 +1296,7 @@ impl ConnectionLike for Connection { } } } + idx += 1; } first_err.map_or(Ok(rv), Err) @@ -1234,12 +1306,17 @@ impl ConnectionLike for Connection { self.db } + fn check_connection(&mut self) -> bool { + cmd("PING").query::(self).is_ok() + } + fn is_open(&self) -> bool { self.con.is_open() } - fn check_connection(&mut self) -> bool { - cmd("PING").query::(self).is_ok() + /// Executes received push message from server. + fn execute_push_message(&mut self, _kind: PushKind, _data: Vec) { + // TODO - implement handling RESP3 push message } } @@ -1280,6 +1357,11 @@ where fn is_open(&self) -> bool { self.deref().is_open() } + + /// Executes received push message from server. + fn execute_push_message(&mut self, _kind: PushKind, _data: Vec) { + // TODO - implement handling RESP3 push messages + } } /// The pubsub object provides convenient access to the redis pubsub @@ -1311,8 +1393,11 @@ impl<'a> PubSub<'a> { } } - fn cache_messages_until_received_response(&mut self, cmd: &Cmd) -> RedisResult<()> { - let mut response = self.con.req_packed_command(&cmd.get_packed_command())?; + fn cache_messages_until_received_response(&mut self, cmd: &mut Cmd) -> RedisResult<()> { + if self.con.resp3 { + cmd.set_no_response(true); + } + let mut response = cmd.query(self.con)?; loop { if let Some(msg) = Msg::from_value(&response) { self.waiting_messages.push_back(msg); @@ -1333,11 +1418,6 @@ impl<'a> PubSub<'a> { self.cache_messages_until_received_response(cmd("PSUBSCRIBE").arg(pchannel)) } - /// Unsubscribes from a channel. - pub fn unsubscribe(&mut self, channel: T) -> RedisResult<()> { - self.cache_messages_until_received_response(cmd("UNSUBSCRIBE").arg(channel)) - } - /// Unsubscribes from a channel with a pattern. pub fn punsubscribe(&mut self, pchannel: T) -> RedisResult<()> { self.cache_messages_until_received_response(cmd("PUNSUBSCRIBE").arg(pchannel)) @@ -1382,25 +1462,39 @@ impl<'a> Drop for PubSub<'a> { /// connection. It only contains actual message data. impl Msg { /// Tries to convert provided [`Value`] into [`Msg`]. + #[allow(clippy::unnecessary_to_owned)] pub fn from_value(value: &Value) -> Option { - let raw_msg: Vec = from_redis_value(value).ok()?; - let mut iter = raw_msg.into_iter(); - let msg_type: String = from_redis_value(&iter.next()?).ok()?; let mut pattern = None; let payload; let channel; - if msg_type == "message" { - channel = iter.next()?; - payload = iter.next()?; - } else if msg_type == "pmessage" { - pattern = Some(iter.next()?); - channel = iter.next()?; - payload = iter.next()?; + if let Value::Push { kind, data } = value { + let mut iter: IntoIter = data.to_vec().into_iter(); + if kind == &PushKind::Message { + channel = iter.next()?; + payload = iter.next()?; + } else if kind == &PushKind::PMessage { + pattern = Some(iter.next()?); + channel = iter.next()?; + payload = iter.next()?; + } else { + return None; + } } else { - return None; - } - + let raw_msg: Vec = from_redis_value(value).ok()?; + let mut iter = raw_msg.into_iter(); + let msg_type: String = from_redis_value(&iter.next()?).ok()?; + if msg_type == "message" { + channel = iter.next()?; + payload = iter.next()?; + } else if msg_type == "pmessage" { + pattern = Some(iter.next()?); + channel = iter.next()?; + payload = iter.next()?; + } else { + return None; + } + }; Some(Msg { payload, channel, @@ -1419,7 +1513,7 @@ impl Msg { /// not happen) then the return value is `"?"`. pub fn get_channel_name(&self) -> &str { match self.channel { - Value::Data(ref bytes) => from_utf8(bytes).unwrap_or("?"), + Value::BulkString(ref bytes) => from_utf8(bytes).unwrap_or("?"), _ => "?", } } @@ -1434,7 +1528,7 @@ impl Msg { /// in the raw bytes in it. pub fn get_payload_bytes(&self) -> &[u8] { match self.payload { - Value::Data(ref bytes) => bytes, + Value::BulkString(ref bytes) => bytes, _ => b"", } } @@ -1518,6 +1612,51 @@ pub fn transaction< } } } +//TODO: for both clearing logic support sharded channels. + +/// Common logic for clearing subscriptions in RESP2 async/sync +pub fn resp2_is_pub_sub_state_cleared( + received_unsub: &mut bool, + received_punsub: &mut bool, + kind: &[u8], + num: isize, +) -> bool { + match kind.first() { + Some(&b'u') => *received_unsub = true, + Some(&b'p') => *received_punsub = true, + _ => (), + }; + *received_unsub && *received_punsub && num == 0 +} + +/// Common logic for clearing subscriptions in RESP3 async/sync +pub fn resp3_is_pub_sub_state_cleared( + received_unsub: &mut bool, + received_punsub: &mut bool, + kind: &PushKind, + num: isize, +) -> bool { + match kind { + PushKind::Unsubscribe => *received_unsub = true, + PushKind::PUnsubscribe => *received_punsub = true, + _ => (), + }; + *received_unsub && *received_punsub && num == 0 +} + +/// Common logic for checking real cause of hello3 command error +pub fn get_resp3_hello_command_error(err: RedisError) -> RedisError { + if let Some(detail) = err.detail() { + if detail.starts_with("unknown command `HELLO`") { + return ( + ErrorKind::RESP3NotSupported, + "Redis Server doesn't support HELLO command therefore resp3 cannot be used", + ) + .into(); + } + } + err +} #[cfg(test)] mod tests { @@ -1568,6 +1707,7 @@ mod tests { db: 2, username: Some("%johndoe%".to_string()), password: Some("#@<>$".to_string()), + use_resp3: false, }, }, ), @@ -1634,6 +1774,7 @@ mod tests { db: 0, username: None, password: None, + use_resp3: false, }, }, ), @@ -1645,6 +1786,7 @@ mod tests { db: 1, username: None, password: None, + use_resp3: false, }, }, ), @@ -1659,6 +1801,7 @@ mod tests { db: 2, username: Some("%johndoe%".to_string()), password: Some("#@<>$".to_string()), + use_resp3: false, }, }, ), @@ -1673,6 +1816,7 @@ mod tests { db: 2, username: Some("%johndoe%".to_string()), password: Some("&?= *+".to_string()), + use_resp3: false, }, }, ), diff --git a/redis/src/geo.rs b/redis/src/geo.rs index fd1ac47c49..6195264a7c 100644 --- a/redis/src/geo.rs +++ b/redis/src/geo.rs @@ -263,7 +263,7 @@ impl FromRedisValue for RadiusSearchResult { } // Try to parse the result from multitple values - if let Value::Bulk(ref items) = *v { + if let Value::Array(ref items) = *v { if let Some(result) = RadiusSearchResult::parse_multi_values(items) { return Ok(result); } diff --git a/redis/src/lib.rs b/redis/src/lib.rs index ff97aff034..0e2ae31a91 100644 --- a/redis/src/lib.rs +++ b/redis/src/lib.rs @@ -405,6 +405,8 @@ pub use crate::types::{ // low level values Value, + PushKind, + VerbatimFormat }; #[cfg(feature = "aio")] diff --git a/redis/src/parser.rs b/redis/src/parser.rs index 8740830e41..7bb8276dc3 100644 --- a/redis/src/parser.rs +++ b/redis/src/parser.rs @@ -1,9 +1,12 @@ +use std::vec::IntoIter; use std::{ io::{self, Read}, str, }; -use crate::types::{make_extension_error, ErrorKind, RedisError, RedisResult, Value}; +use crate::types::{ + make_extension_error, ErrorKind, PushKind, RedisError, RedisResult, Value, VerbatimFormat, +}; use combine::{ any, @@ -17,6 +20,7 @@ use combine::{ stream::{PointerOffset, RangeStream, StreamErrorFor}, ParseError, Parser as _, }; +use num_bigint::BigInt; struct ResultExtend(Result); @@ -55,6 +59,46 @@ where const MAX_RECURSE_DEPTH: usize = 100; +fn err_parser(line: &str) -> RedisError { + let desc = "An error was signalled by the server"; + let mut pieces = line.splitn(2, ' '); + let kind = match pieces.next().unwrap() { + "ERR" => ErrorKind::ResponseError, + "EXECABORT" => ErrorKind::ExecAbortError, + "LOADING" => ErrorKind::BusyLoadingError, + "NOSCRIPT" => ErrorKind::NoScriptError, + "MOVED" => ErrorKind::Moved, + "ASK" => ErrorKind::Ask, + "TRYAGAIN" => ErrorKind::TryAgain, + "CLUSTERDOWN" => ErrorKind::ClusterDown, + "CROSSSLOT" => ErrorKind::CrossSlot, + "MASTERDOWN" => ErrorKind::MasterDown, + "READONLY" => ErrorKind::ReadOnly, + "NOTBUSY" => ErrorKind::NotBusy, + code => return make_extension_error(code, pieces.next()), + }; + match pieces.next() { + Some(detail) => RedisError::from((kind, desc, detail.to_string())), + None => RedisError::from((kind, desc)), + } +} + +pub fn get_push_kind(kind: String) -> PushKind { + match kind.as_str() { + "invalidate" => PushKind::Invalidate, + "message" => PushKind::Message, + "pmessage" => PushKind::PMessage, + "smessage" => PushKind::SMessage, + "unsubscribe" => PushKind::Unsubscribe, + "punsubscribe" => PushKind::PUnsubscribe, + "sunsubscribe" => PushKind::SUnsubscribe, + "subscribe" => PushKind::Subscribe, + "psubscribe" => PushKind::PSubscribe, + "ssubscribe" => PushKind::SSubscribe, + _ => PushKind::Other(kind), + } +} + fn value<'a, I>( count: Option, ) -> impl combine::Parser, PartialState = AnySendSyncPartialState> @@ -83,12 +127,12 @@ where ) }; - let status = || { + let simple_string = || { line().map(|line| { if line == "OK" { Value::Okay } else { - Value::Status(line.into()) + Value::SimpleString(line.into()) } }) }; @@ -108,58 +152,175 @@ where combine::value(Value::Nil).left() } else { take(*size as usize) - .map(|bs: &[u8]| Value::Data(bs.to_vec())) + .map(|bs: &[u8]| Value::BulkString(bs.to_vec())) .skip(crlf()) .right() } }) }; + let blob = || { + int().then_partial(move |size| { + take(*size as usize) + .map(|bs: &[u8]| String::from_utf8_lossy(bs).to_string()) + .skip(crlf()) + }) + }; - let bulk = || { + let array = || { int().then_partial(move |&mut length| { if length < 0 { combine::value(Value::Nil).map(Ok).left() } else { let length = length as usize; combine::count_min_max(length, length, value(Some(count + 1))) - .map(|result: ResultExtend<_, _>| result.0.map(Value::Bulk)) + .map(|result: ResultExtend<_, _>| result.0.map(Value::Array)) .right() } }) }; - let error = || { - line().map(|line: &str| { - let desc = "An error was signalled by the server"; - let mut pieces = line.splitn(2, ' '); - let kind = match pieces.next().unwrap() { - "ERR" => ErrorKind::ResponseError, - "EXECABORT" => ErrorKind::ExecAbortError, - "LOADING" => ErrorKind::BusyLoadingError, - "NOSCRIPT" => ErrorKind::NoScriptError, - "MOVED" => ErrorKind::Moved, - "ASK" => ErrorKind::Ask, - "TRYAGAIN" => ErrorKind::TryAgain, - "CLUSTERDOWN" => ErrorKind::ClusterDown, - "CROSSSLOT" => ErrorKind::CrossSlot, - "MASTERDOWN" => ErrorKind::MasterDown, - "READONLY" => ErrorKind::ReadOnly, - "NOTBUSY" => ErrorKind::NotBusy, - code => return make_extension_error(code, pieces.next()), - }; - match pieces.next() { - Some(detail) => RedisError::from((kind, desc, detail.to_string())), - None => RedisError::from((kind, desc)), + let error = || line().map(err_parser); + let map = || { + int().then_partial(move |&mut kv_length| { + let length = kv_length as usize * 2; + combine::count_min_max(length, length, value(Some(count + 1))).map( + move |result: ResultExtend, _>| { + let mut it: IntoIter = result.0?.into_iter(); + let mut x = vec![]; + for _ in 0..kv_length { + if let (Some(k), Some(v)) = (it.next(), it.next()) { + x.push((k, v)) + } + } + Ok(Value::Map(x)) + }, + ) + }) + }; + let attribute = || { + int().then_partial(move |&mut kv_length| { + // + 1 is for data! + let length = kv_length as usize * 2 + 1; + combine::count_min_max(length, length, value(Some(count + 1))).map( + move |result: ResultExtend, _>| { + let mut it: IntoIter = result.0?.into_iter(); + let mut attributes = vec![]; + for _ in 0..kv_length { + if let (Some(k), Some(v)) = (it.next(), it.next()) { + attributes.push((k, v)) + } + } + Ok(Value::Attribute { + data: Box::new(it.next().unwrap()), + attributes, + }) + }, + ) + }) + }; + let set = || { + int().then_partial(move |&mut length| { + let length = length as usize; + combine::count_min_max(length, length, value(Some(count + 1))) + .map(|result: ResultExtend<_, _>| result.0.map(Value::Set)) + }) + }; + let push = || { + int().then_partial(move |&mut length| { + if length <= 0 { + combine::value(Value::Push { + kind: PushKind::Other("".to_string()), + data: vec![], + }) + .map(Ok) + .left() + } else { + let length = length as usize; + combine::count_min_max(length, length, value(Some(count + 1))) + .map(|result: ResultExtend, _>| { + let mut it: IntoIter = result.0?.into_iter(); + let first = it.next().unwrap_or(Value::Nil); + if let Value::BulkString(kind) = first { + Ok(Value::Push { + kind: get_push_kind(String::from_utf8(kind)?), + data: it.collect(), + }) + } else if let Value::SimpleString(kind) = first { + Ok(Value::Push { + kind: get_push_kind(kind), + data: it.collect(), + }) + } else { + Err(RedisError::from(( + ErrorKind::ResponseError, + "parse error", + ))) + } + }) + .right() } }) }; - + let null = || line().map(|_| Ok(Value::Nil)); + let double = || { + line().and_then(|line| match line.trim().parse::() { + Err(_) => Err(StreamErrorFor::::message_static_message( + "Expected double, got garbage", + )), + Ok(value) => Ok(value), + }) + }; + let boolean = || { + line().and_then(|line: &str| match line { + "t" => Ok(true), + "f" => Ok(false), + _ => Err(StreamErrorFor::::message_static_message( + "Expected boolean, got garbage", + )), + }) + }; + let blob_error = || blob().map(|line| err_parser(&line)); + let verbatim = || { + blob().map(|line| { + if let Some((format, text)) = line.split_once(':') { + let format = match format { + "txt" => VerbatimFormat::Text, + "mkd" => VerbatimFormat::Markdown, + x => VerbatimFormat::Unknown(x.to_string()), + }; + Ok(Value::VerbatimString { + format, + text: text.to_string(), + }) + } else { + Err(RedisError::from((ErrorKind::ResponseError, "parse error"))) + } + }) + }; + let big_number = || { + line().and_then(|line| match BigInt::parse_bytes(line.as_bytes(), 10) { + None => Err(StreamErrorFor::::message_static_message( + "Expected bigint, got garbage", + )), + Some(value) => Ok(value), + }) + }; combine::dispatch!(b; - b'+' => status().map(Ok), + b'+' => simple_string().map(Ok), b':' => int().map(|i| Ok(Value::Int(i))), b'$' => data().map(Ok), - b'*' => bulk(), + b'*' => array(), + b'%' => map(), + b'|' => attribute(), + b'~' => set(), b'-' => error().map(Err), + b'_' => null(), + b',' => double().map(|i| Ok(Value::Double(i.into()))), + b'#' => boolean().map(|b| Ok(Value::Boolean(b))), + b'!' => blob_error().map(Err), + b'=' => verbatim(), + b'(' => big_number().map(|i| Ok(Value::BigNumber(i))), + b'>' => push(), b => combine::unexpected_any(combine::error::Token(b)) ) }) @@ -334,7 +495,6 @@ pub fn parse_redis_value(bytes: &[u8]) -> RedisResult { #[cfg(test)] mod tests { - use super::*; #[cfg(feature = "aio")] @@ -352,6 +512,116 @@ mod tests { assert_eq!(codec.decode_eof(&mut bytes), Ok(None)); } + #[test] + fn decode_resp3_double() { + let val = parse_redis_value(b",1.23\r\n").unwrap(); + assert_eq!(val, Value::Double(1.23.into())); + let val = parse_redis_value(b",nan\r\n").unwrap(); + if let Value::Double(val) = val { + assert!(val.is_sign_positive()); + assert!(val.is_nan()); + } else { + panic!("expected double"); + } + // -nan is supported prior to redis 7.2 + let val = parse_redis_value(b",-nan\r\n").unwrap(); + if let Value::Double(val) = val { + assert!(val.is_sign_negative()); + assert!(val.is_nan()); + } else { + panic!("expected double"); + } + //Allow doubles in scientific E notation + let val = parse_redis_value(b",2.67923e+8\r\n").unwrap(); + assert_eq!(val, Value::Double(267923000.0.into())); + let val = parse_redis_value(b",2.67923E+8\r\n").unwrap(); + assert_eq!(val, Value::Double(267923000.0.into())); + let val = parse_redis_value(b",-2.67923E+8\r\n").unwrap(); + assert_eq!(val, Value::Double((-267923000.0).into())); + let val = parse_redis_value(b",2.1E-2\r\n").unwrap(); + assert_eq!(val, Value::Double(0.021.into())); + + let val = parse_redis_value(b",-inf\r\n").unwrap(); + assert_eq!(val, Value::Double((-f64::INFINITY).into())); + let val = parse_redis_value(b",-inf\r\n").unwrap(); + assert_eq!(val, Value::Double(f64::NEG_INFINITY.into())); + let val = parse_redis_value(b",inf\r\n").unwrap(); + assert_eq!(val, Value::Double(f64::INFINITY.into())); + } + + #[test] + fn decode_resp3_map() { + let val = parse_redis_value(b"%2\r\n+first\r\n:1\r\n+second\r\n:2\r\n").unwrap(); + let mut v = val.as_map_iter().unwrap(); + assert_eq!( + (&Value::SimpleString("first".to_string()), &Value::Int(1)), + v.next().unwrap() + ); + assert_eq!( + (&Value::SimpleString("second".to_string()), &Value::Int(2)), + v.next().unwrap() + ); + } + + #[test] + fn decode_resp3_boolean() { + let val = parse_redis_value(b"#t\r\n").unwrap(); + assert_eq!(val, Value::Boolean(true)); + let val = parse_redis_value(b"#f\r\n").unwrap(); + assert_eq!(val, Value::Boolean(false)); + let val = parse_redis_value(b"#x\r\n"); + assert!(val.is_err()); + let val = parse_redis_value(b"#\r\n"); + assert!(val.is_err()); + } + + #[test] + fn decode_resp3_blob_error() { + let val = parse_redis_value(b"!21\r\nSYNTAX invalid syntax\r\n"); + assert_eq!( + val.err(), + Some(make_extension_error("SYNTAX", Some("invalid syntax"))) + ) + } + + #[test] + fn decode_resp3_big_number() { + let val = parse_redis_value(b"(3492890328409238509324850943850943825024385\r\n").unwrap(); + assert_eq!( + val, + Value::BigNumber( + BigInt::parse_bytes(b"3492890328409238509324850943850943825024385", 10).unwrap() + ) + ); + } + + #[test] + fn decode_resp3_set() { + let val = parse_redis_value(b"~5\r\n+orange\r\n+apple\r\n#t\r\n:100\r\n:999\r\n").unwrap(); + let v = val.as_sequence().unwrap(); + assert_eq!(Value::SimpleString("orange".to_string()), v[0]); + assert_eq!(Value::SimpleString("apple".to_string()), v[1]); + assert_eq!(Value::Boolean(true), v[2]); + assert_eq!(Value::Int(100), v[3]); + assert_eq!(Value::Int(999), v[4]); + } + + #[test] + fn decode_resp3_push() { + let val = parse_redis_value(b">3\r\n+message\r\n+somechannel\r\n+this is the message\r\n") + .unwrap(); + if let Value::Push { ref kind, ref data } = val { + assert_eq!(&PushKind::Message, kind); + assert_eq!(Value::SimpleString("somechannel".to_string()), data[0]); + assert_eq!( + Value::SimpleString("this is the message".to_string()), + data[1] + ); + } else { + panic!("Expected Value::Push") + } + } + #[test] fn test_max_recursion_depth() { let bytes = b"*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n*1\r\n"; diff --git a/redis/src/pipeline.rs b/redis/src/pipeline.rs index 803793d709..6ead125f43 100644 --- a/redis/src/pipeline.rs +++ b/redis/src/pipeline.rs @@ -96,7 +96,7 @@ impl Pipeline { )?; match resp.pop() { Some(Value::Nil) => Ok(Value::Nil), - Some(Value::Bulk(items)) => Ok(self.make_pipeline_results(items)), + Some(Value::Array(items)) => Ok(self.make_pipeline_results(items)), _ => fail!(( ErrorKind::ResponseError, "Invalid response when parsing multi response" @@ -131,7 +131,7 @@ impl Pipeline { } from_redis_value( &(if self.commands.is_empty() { - Value::Bulk(vec![]) + Value::Array(vec![]) } else if self.transaction_mode { self.execute_transaction(con)? } else { @@ -161,7 +161,7 @@ impl Pipeline { .await?; match resp.pop() { Some(Value::Nil) => Ok(Value::Nil), - Some(Value::Bulk(items)) => Ok(self.make_pipeline_results(items)), + Some(Value::Array(items)) => Ok(self.make_pipeline_results(items)), _ => Err(( ErrorKind::ResponseError, "Invalid response when parsing multi response", @@ -178,7 +178,7 @@ impl Pipeline { C: crate::aio::ConnectionLike, { let v = if self.commands.is_empty() { - return from_redis_value(&Value::Bulk(vec![])); + return from_redis_value(&Value::Array(vec![])); } else if self.transaction_mode { self.execute_transaction_async(con).await? } else { @@ -311,7 +311,7 @@ macro_rules! implement_pipeline_commands { rv.push(result); } } - Value::Bulk(rv) + Value::Array(rv) } } diff --git a/redis/src/sentinel.rs b/redis/src/sentinel.rs index 3834371849..9a32ac8898 100644 --- a/redis/src/sentinel.rs +++ b/redis/src/sentinel.rs @@ -58,6 +58,7 @@ //! db: 1, //! username: Some(String::from("foo")), //! password: Some(String::from("bar")), +//! use_resp3: false, //! }), //! }), //! ) @@ -93,6 +94,7 @@ //! db: 0, //! username: Some(String::from("user")), //! password: Some(String::from("pass")), +//! use_resp3: false, //! }), //! }), //! redis::sentinel::SentinelServerType::Master, diff --git a/redis/src/streams.rs b/redis/src/streams.rs index 9f5149195d..6bc5c844da 100644 --- a/redis/src/streams.rs +++ b/redis/src/streams.rs @@ -425,10 +425,10 @@ pub struct StreamId { } impl StreamId { - /// Converts a `Value::Bulk` into a `StreamId`. - fn from_bulk_value(v: &Value) -> RedisResult { + /// Converts a `Value::Array` into a `StreamId`. + fn from_array_value(v: &Value) -> RedisResult { let mut stream_id = StreamId::default(); - if let Value::Bulk(ref values) = *v { + if let Value::Array(ref values) = *v { if let Some(v) = values.get(0) { stream_id.id = from_redis_value(v)?; } @@ -559,11 +559,11 @@ impl FromRedisValue for StreamPendingCountReply { fn from_redis_value(v: &Value) -> RedisResult { let mut reply = StreamPendingCountReply::default(); match v { - Value::Bulk(outer_tuple) => { + Value::Array(outer_tuple) => { for outer in outer_tuple { match outer { - Value::Bulk(inner_tuple) => match &inner_tuple[..] { - [Value::Data(id_bytes), Value::Data(consumer_bytes), Value::Int(last_delivered_ms_u64), Value::Int(times_delivered_u64)] => + Value::Array(inner_tuple) => match &inner_tuple[..] { + [Value::BulkString(id_bytes), Value::BulkString(consumer_bytes), Value::Int(last_delivered_ms_u64), Value::Int(times_delivered_u64)] => { let id = String::from_utf8(id_bytes.to_vec())?; let consumer = String::from_utf8(consumer_bytes.to_vec())?; @@ -614,10 +614,10 @@ impl FromRedisValue for StreamInfoStreamReply { reply.length = from_redis_value(v)?; } if let Some(v) = &map.get("first-entry") { - reply.first_entry = StreamId::from_bulk_value(v)?; + reply.first_entry = StreamId::from_array_value(v)?; } if let Some(v) = &map.get("last-entry") { - reply.last_entry = StreamId::from_bulk_value(v)?; + reply.last_entry = StreamId::from_array_value(v)?; } Ok(reply) } diff --git a/redis/src/types.rs b/redis/src/types.rs index 1e53f161b5..a880126bd1 100644 --- a/redis/src/types.rs +++ b/redis/src/types.rs @@ -11,6 +11,8 @@ use std::string::FromUtf8Error; #[cfg(feature = "ahash")] pub(crate) use ahash::{AHashMap as HashMap, AHashSet as HashSet}; +use num_bigint::BigInt; +use ordered_float::OrderedFloat; #[cfg(not(feature = "ahash"))] pub(crate) use std::collections::{HashMap, HashSet}; use std::ops::Deref; @@ -133,6 +135,10 @@ pub enum ErrorKind { #[cfg(feature = "json")] /// Error Serializing a struct to JSON form Serialize, + + /// Redis Servers prior to v6.0.0 doesn't support RESP3. + /// Try disabling resp3 option + RESP3NotSupported, } /// Internal low-level redis value enum. @@ -145,29 +151,141 @@ pub enum Value { /// is why this library generally treats integers and strings /// the same for all numeric responses. Int(i64), - /// An arbitary binary data. - Data(Vec), - /// A bulk response of more data. This is generally used by redis + /// An arbitrary binary data, usually represents a binary-safe string. + BulkString(Vec), + /// A response containing an array with more data. This is generally used by redis /// to express nested structures. - Bulk(Vec), - /// A status response. - Status(String), + Array(Vec), + /// A simple string response, without line breaks and not binary safe. + SimpleString(String), /// A status response which represents the string "OK". Okay, + /// Unordered key,value list from the server. Use `as_map_iter` function. + Map(Vec<(Value, Value)>), + /// Attribute value from the server. Client will give data instead of whole Attribute type. + Attribute { + /// Data that attributes belong to. + data: Box, + /// Key,Value list of attributes. + attributes: Vec<(Value, Value)>, + }, + /// Unordered set value from the server. + Set(Vec), + /// A floating number response from the server. + Double(OrderedFloat), + /// A boolean response from the server. + Boolean(bool), + /// First String is format and other is the string + VerbatimString { + /// Text's format type + format: VerbatimFormat, + /// Remaining string check format before using! + text: String, + }, + /// Very large number that out of the range of the signed 64 bit numbers + BigNumber(BigInt), + /// Push data from the server. + Push { + /// Push Kind + kind: PushKind, + /// Remaining data from push message + data: Vec, + }, +} + +/// `VerbatimString`'s format types defined by spec +#[derive(PartialEq, Eq, Clone, Debug, Hash)] +pub enum VerbatimFormat { + /// Unknown type to catch future formats. + Unknown(String), + /// `mkd` format + Markdown, + /// `txt` format + Text, +} + +/// `Push` type's currently known kinds. +#[derive(PartialEq, Eq, Clone, Debug, Hash)] +pub enum PushKind { + /// Other kind to catch future kinds. + Other(String), + /// `invalidate` is received when a key is changed/deleted. + Invalidate, + /// `message` is received when pubsub message published by another client. + Message, + /// `pmessage` is received when pubsub message published by another client and client subscribed to topic via pattern. + PMessage, + /// `smessage` is received when pubsub message published by another client and client subscribed to it with sharding. + SMessage, + /// `unsubscribe` is received when client unsubscribed from a channel. + Unsubscribe, + /// `punsubscribe` is received when client unsubscribed from a pattern. + PUnsubscribe, + /// `sunsubscribe` is received when client unsubscribed from a shard channel. + SUnsubscribe, + /// `subscribe` is received when client subscribed to a channel. + Subscribe, + /// `psubscribe` is received when client subscribed to a pattern. + PSubscribe, + /// `ssubscribe` is received when client subscribed to a shard channel. + SSubscribe, +} + +impl fmt::Display for VerbatimFormat { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + VerbatimFormat::Markdown => write!(f, "mkd"), + VerbatimFormat::Unknown(val) => write!(f, "{val}"), + VerbatimFormat::Text => write!(f, "txt"), + } + } +} + +impl fmt::Display for PushKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + PushKind::Other(kind) => write!(f, "{}", kind), + PushKind::Invalidate => write!(f, "invalidate"), + PushKind::Message => write!(f, "message"), + PushKind::PMessage => write!(f, "pmessage"), + PushKind::SMessage => write!(f, "smessage"), + PushKind::Unsubscribe => write!(f, "unsubscribe"), + PushKind::PUnsubscribe => write!(f, "punsubscribe"), + PushKind::SUnsubscribe => write!(f, "sunsubscribe"), + PushKind::Subscribe => write!(f, "subscribe"), + PushKind::PSubscribe => write!(f, "psubscribe"), + PushKind::SSubscribe => write!(f, "ssubscribe"), + } + } } -pub struct MapIter<'a>(std::slice::Iter<'a, Value>); +pub struct MapIter<'a> { + bulk: Option>, + map: Option>, +} impl<'a> Iterator for MapIter<'a> { type Item = (&'a Value, &'a Value); fn next(&mut self) -> Option { - Some((self.0.next()?, self.0.next()?)) + if let Some(m) = &mut self.map { + let (k, v) = m.next()?; + return Some((k, v)); + } else if let Some(bulk) = &mut self.bulk { + return Some((bulk.next()?, bulk.next()?)); + } + None } fn size_hint(&self) -> (usize, Option) { - let (low, high) = self.0.size_hint(); - (low / 2, high.map(|h| h / 2)) + if let Some(ref m) = self.map { + return m.size_hint(); + } + if let Some(ref bulk) = self.bulk { + let (low, high) = bulk.size_hint(); + return (low / 2, high.map(|h| h / 2)); + } + (0, None) } } @@ -180,23 +298,23 @@ impl<'a> Iterator for MapIter<'a> { /// types. impl Value { /// Checks if the return value looks like it fulfils the cursor - /// protocol. That means the result is a bulk item of length - /// two with the first one being a cursor and the second a - /// bulk response. + /// protocol. That means the result is an array item of length + /// two with the first one being a cursor and the second an + /// array response. pub fn looks_like_cursor(&self) -> bool { match *self { - Value::Bulk(ref items) => { + Value::Array(ref items) => { if items.len() != 2 { return false; } match items[0] { - Value::Data(_) => {} + Value::BulkString(_) => {} _ => { return false; } }; match items[1] { - Value::Bulk(_) => {} + Value::Array(_) => {} _ => { return false; } @@ -210,7 +328,8 @@ impl Value { /// Returns an `&[Value]` if `self` is compatible with a sequence type pub fn as_sequence(&self) -> Option<&[Value]> { match self { - Value::Bulk(items) => Some(&items[..]), + Value::Array(items) => Some(&items[..]), + Value::Set(items) => Some(&items[..]), Value::Nil => Some(&[]), _ => None, } @@ -219,7 +338,14 @@ impl Value { /// Returns an iterator of `(&Value, &Value)` if `self` is compatible with a map type pub fn as_map_iter(&self) -> Option> { match self { - Value::Bulk(items) => Some(MapIter(items.iter())), + Value::Array(items) => Some(MapIter { + bulk: Some(items.iter()), + map: None, + }), + Value::Map(items) => Some(MapIter { + bulk: None, + map: Some(items.iter()), + }), _ => None, } } @@ -230,24 +356,29 @@ impl fmt::Debug for Value { match *self { Value::Nil => write!(fmt, "nil"), Value::Int(val) => write!(fmt, "int({val:?})"), - Value::Data(ref val) => match from_utf8(val) { + Value::BulkString(ref val) => match from_utf8(val) { Ok(x) => write!(fmt, "string-data('{x:?}')"), Err(_) => write!(fmt, "binary-data({val:?})"), }, - Value::Bulk(ref values) => { - write!(fmt, "bulk(")?; - let mut is_first = true; - for val in values.iter() { - if !is_first { - write!(fmt, ", ")?; - } - write!(fmt, "{val:?}")?; - is_first = false; - } - write!(fmt, ")") - } + Value::Array(ref values) => write!(fmt, "bulk({values:?})"), + Value::Push { ref kind, ref data } => write!(fmt, "push({kind:?}, {data:?})"), Value::Okay => write!(fmt, "ok"), - Value::Status(ref s) => write!(fmt, "status({s:?})"), + Value::SimpleString(ref s) => write!(fmt, "status({s:?})"), + Value::Map(ref values) => write!(fmt, "map({values:?})"), + Value::Attribute { + ref data, + attributes: _, + } => write!(fmt, "attribute({data:?})"), + Value::Set(ref values) => write!(fmt, "set({values:?})"), + Value::Double(ref d) => write!(fmt, "double({d:?})"), + Value::Boolean(ref b) => write!(fmt, "boolean({b:?})"), + Value::VerbatimString { + ref format, + ref text, + } => { + write!(fmt, "verbatim-string({:?},{:?})", format, text) + } + Value::BigNumber(ref m) => write!(fmt, "big-number({:?})", m), } } } @@ -504,6 +635,7 @@ impl RedisError { ErrorKind::NotBusy => "not busy", #[cfg(feature = "json")] ErrorKind::Serialize => "serializing", + ErrorKind::RESP3NotSupported => "resp3 is not supported by server", } } @@ -655,6 +787,7 @@ impl RedisError { ErrorKind::NotBusy => false, #[cfg(feature = "json")] ErrorKind::Serialize => false, + ErrorKind::RESP3NotSupported => false, } } } @@ -714,7 +847,7 @@ impl InfoDict { let mut p = line.splitn(2, ':'); let k = unwrap_or!(p.next(), continue).to_string(); let v = unwrap_or!(p.next(), continue).to_string(); - map.insert(k, Value::Status(v)); + map.insert(k, Value::SimpleString(v)); } InfoDict { map } } @@ -1219,25 +1352,46 @@ pub trait FromRedisValue: Sized { /// Convert bytes to a single element vector. fn from_byte_vec(_vec: &[u8]) -> Option> { - Self::from_redis_value(&Value::Data(_vec.into())) + Self::from_redis_value(&Value::BulkString(_vec.into())) .map(|rv| vec![rv]) .ok() } } +fn get_inner_value(v: &Value) -> &Value { + if let Value::Attribute { + data, + attributes: _, + } = v + { + data.as_ref() + } else { + v + } +} + macro_rules! from_redis_value_for_num_internal { ($t:ty, $v:expr) => {{ - let v = $v; + let v = if let Value::Attribute { + data, + attributes: _, + } = $v + { + data + } else { + $v + }; match *v { Value::Int(val) => Ok(val as $t), - Value::Status(ref s) => match s.parse::<$t>() { + Value::SimpleString(ref s) => match s.parse::<$t>() { Ok(rv) => Ok(rv), Err(_) => invalid_type_error!(v, "Could not convert from string."), }, - Value::Data(ref bytes) => match from_utf8(bytes)?.parse::<$t>() { + Value::BulkString(ref bytes) => match from_utf8(bytes)?.parse::<$t>() { Ok(rv) => Ok(rv), Err(_) => invalid_type_error!(v, "Could not convert from string."), }, + Value::Double(val) => Ok(val.into_inner() as $t), _ => invalid_type_error!(v, "Response type not convertible to numeric."), } }}; @@ -1280,10 +1434,11 @@ from_redis_value_for_num!(usize); impl FromRedisValue for bool { fn from_redis_value(v: &Value) -> RedisResult { + let v = get_inner_value(v); match *v { Value::Nil => Ok(false), Value::Int(val) => Ok(val != 0), - Value::Status(ref s) => { + Value::SimpleString(ref s) => { if &s[..] == "1" { Ok(true) } else if &s[..] == "0" { @@ -1292,7 +1447,7 @@ impl FromRedisValue for bool { invalid_type_error!(v, "Response status not valid boolean"); } } - Value::Data(ref bytes) => { + Value::BulkString(ref bytes) => { if bytes == b"1" { Ok(true) } else if bytes == b"0" { @@ -1301,6 +1456,7 @@ impl FromRedisValue for bool { invalid_type_error!(v, "Response type not bool compatible."); } } + Value::Boolean(b) => Ok(b), Value::Okay => Ok(true), _ => invalid_type_error!(v, "Response type not bool compatible."), } @@ -1309,10 +1465,11 @@ impl FromRedisValue for bool { impl FromRedisValue for CString { fn from_redis_value(v: &Value) -> RedisResult { + let v = get_inner_value(v); match *v { - Value::Data(ref bytes) => Ok(CString::new(bytes.as_slice())?), + Value::BulkString(ref bytes) => Ok(CString::new(bytes.as_slice())?), Value::Okay => Ok(CString::new("OK")?), - Value::Status(ref val) => Ok(CString::new(val.as_bytes())?), + Value::SimpleString(ref val) => Ok(CString::new(val.as_bytes())?), _ => invalid_type_error!(v, "Response type not CString compatible."), } } @@ -1320,10 +1477,16 @@ impl FromRedisValue for CString { impl FromRedisValue for String { fn from_redis_value(v: &Value) -> RedisResult { + let v = get_inner_value(v); match *v { - Value::Data(ref bytes) => Ok(from_utf8(bytes)?.to_string()), + Value::BulkString(ref bytes) => Ok(from_utf8(bytes)?.to_string()), Value::Okay => Ok("OK".to_string()), - Value::Status(ref val) => Ok(val.to_string()), + Value::SimpleString(ref val) => Ok(val.to_string()), + Value::VerbatimString { + format: _, + ref text, + } => Ok(text.to_string()), + Value::Double(ref val) => Ok(val.to_string()), _ => invalid_type_error!(v, "Response type not string compatible."), } } @@ -1344,20 +1507,35 @@ macro_rules! from_vec_from_redis_value { match v { // All binary data except u8 will try to parse into a single element vector. // u8 has its own implementation of from_byte_vec. - Value::Data(bytes) => match FromRedisValue::from_byte_vec(bytes) { + Value::BulkString(bytes) => match FromRedisValue::from_byte_vec(bytes) { Some(x) => Ok($convert(x)), None => invalid_type_error!( v, format!("Conversion to {} failed.", std::any::type_name::<$Type>()) ), }, - Value::Bulk(items) => FromRedisValue::from_redis_values(items).map($convert), + Value::Array(items) => FromRedisValue::from_redis_values(items).map($convert), + Value::Set(ref items) => FromRedisValue::from_redis_values(items).map($convert), + Value::Map(ref items) => { + let mut n: Vec = vec![]; + for item in items { + match FromRedisValue::from_redis_value(&Value::Map(vec![item.clone()])) { + Ok(v) => { + n.push(v); + } + Err(e) => { + return Err(e); + } + } + } + Ok($convert(n)) + } Value::Nil => Ok($convert(Vec::new())), _ => invalid_type_error!(v, "Response type not vector compatible."), } } } - }; + } } from_vec_from_redis_value!( Vec); @@ -1368,6 +1546,7 @@ impl for std::collections::HashMap { fn from_redis_value(v: &Value) -> RedisResult> { + let v = get_inner_value(v); match *v { Value::Nil => Ok(Default::default()), _ => v @@ -1384,6 +1563,7 @@ impl #[cfg(feature = "ahash")] impl FromRedisValue for ahash::AHashMap { fn from_redis_value(v: &Value) -> RedisResult> { + let v = get_inner_value(v); match *v { Value::Nil => Ok(ahash::AHashMap::with_hasher(Default::default())), _ => v @@ -1402,6 +1582,7 @@ where K: Ord, { fn from_redis_value(v: &Value) -> RedisResult> { + let v = get_inner_value(v); v.as_map_iter() .ok_or_else(|| invalid_type_error_inner!(v, "Response type not btreemap compatible"))? .map(|(k, v)| Ok((from_redis_value(k)?, from_redis_value(v)?))) @@ -1413,6 +1594,7 @@ impl FromRedisValue for std::collections::HashSet { fn from_redis_value(v: &Value) -> RedisResult> { + let v = get_inner_value(v); let items = v .as_sequence() .ok_or_else(|| invalid_type_error_inner!(v, "Response type not hashset compatible"))?; @@ -1423,6 +1605,7 @@ impl FromRedisValue #[cfg(feature = "ahash")] impl FromRedisValue for ahash::AHashSet { fn from_redis_value(v: &Value) -> RedisResult> { + let v = get_inner_value(v); let items = v .as_sequence() .ok_or_else(|| invalid_type_error_inner!(v, "Response type not hashset compatible"))?; @@ -1435,6 +1618,7 @@ where T: Ord, { fn from_redis_value(v: &Value) -> RedisResult> { + let v = get_inner_value(v); let items = v .as_sequence() .ok_or_else(|| invalid_type_error_inner!(v, "Response type not btreeset compatible"))?; @@ -1463,13 +1647,14 @@ macro_rules! from_redis_value_for_tuple { // variables are unused. #[allow(non_snake_case, unused_variables)] fn from_redis_value(v: &Value) -> RedisResult<($($name,)*)> { + let v = get_inner_value(v); match *v { - Value::Bulk(ref items) => { + Value::Array(ref items) => { // hacky way to count the tuple size let mut n = 0; $(let $name = (); n += 1;)* if items.len() != n { - invalid_type_error!(v, "Bulk response of wrong dimension") + invalid_type_error!(v, "Array response of wrong dimension") } // this is pretty ugly too. The { i += 1; i - 1} is rust's @@ -1478,6 +1663,28 @@ macro_rules! from_redis_value_for_tuple { Ok(($({let $name = (); from_redis_value( &items[{ i += 1; i - 1 }])?},)*)) } + + Value::Map(ref items) => { + // hacky way to count the tuple size + let mut n = 0; + $(let $name = (); n += 1;)* + if n != 2 { + invalid_type_error!(v, "Map response of wrong dimension") + } + + let mut flatten_items = vec![]; + for (k,v) in items { + flatten_items.push(k); + flatten_items.push(v); + } + + // this is pretty ugly too. The { i += 1; i - 1} is rust's + // postfix increment :) + let mut i = 0; + Ok(($({let $name = (); from_redis_value( + &flatten_items[{ i += 1; i - 1 }])?},)*)) + } + _ => invalid_type_error!(v, "Not a bulk response") } } @@ -1487,20 +1694,36 @@ macro_rules! from_redis_value_for_tuple { // hacky way to count the tuple size let mut n = 0; $(let $name = (); n += 1;)* - if items.len() % n != 0 { - invalid_type_error!(items, "Bulk response of wrong dimension") - } - - // this is pretty ugly too. The { i += 1; i - 1} is rust's - // postfix increment :) let mut rv = vec![]; if items.len() == 0 { return Ok(rv) } - for chunk in items.chunks_exact(n) { + //It's uglier then before! + for item in items { + match item { + Value::Array(ch) => { + if let [$($name),*] = &ch[..] { + rv.push(($(from_redis_value(&$name)?),*),) + } else { + unreachable!() + }; + }, + _ => {}, + + } + } + if !rv.is_empty(){ + return Ok(rv); + } + + if let [$($name),*] = items{ + rv.push(($(from_redis_value($name)?),*),); + return Ok(rv); + } + for chunk in items.chunks_exact(n) { match chunk { [$($name),*] => rv.push(($(from_redis_value($name)?),*),), - _ => unreachable!(), + _ => {}, } } Ok(rv) @@ -1521,6 +1744,7 @@ from_redis_value_for_tuple! { T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, impl FromRedisValue for InfoDict { fn from_redis_value(v: &Value) -> RedisResult { + let v = get_inner_value(v); let s: String = from_redis_value(v)?; Ok(InfoDict::new(&s)) } @@ -1528,6 +1752,7 @@ impl FromRedisValue for InfoDict { impl FromRedisValue for Option { fn from_redis_value(v: &Value) -> RedisResult> { + let v = get_inner_value(v); if *v == Value::Nil { return Ok(None); } @@ -1538,8 +1763,9 @@ impl FromRedisValue for Option { #[cfg(feature = "bytes")] impl FromRedisValue for bytes::Bytes { fn from_redis_value(v: &Value) -> RedisResult { + let v = get_inner_value(v); match v { - Value::Data(bytes_vec) => Ok(bytes::Bytes::copy_from_slice(bytes_vec.as_ref())), + Value::BulkString(bytes_vec) => Ok(bytes::Bytes::copy_from_slice(bytes_vec.as_ref())), _ => invalid_type_error!(v, "Not binary data"), } } diff --git a/redis/tests/parser.rs b/redis/tests/parser.rs index 9acead79ba..581766e5a2 100644 --- a/redis/tests/parser.rs +++ b/redis/tests/parser.rs @@ -1,5 +1,6 @@ use std::{io, pin::Pin}; +use ordered_float::OrderedFloat; use redis::Value; use { futures::{ @@ -27,8 +28,10 @@ impl ::quickcheck::Arbitrary for ArbitraryValue { match self.0 { Value::Nil | Value::Okay => Box::new(None.into_iter()), Value::Int(i) => Box::new(i.shrink().map(Value::Int).map(ArbitraryValue)), - Value::Data(ref xs) => Box::new(xs.shrink().map(Value::Data).map(ArbitraryValue)), - Value::Bulk(ref xs) => { + Value::BulkString(ref xs) => { + Box::new(xs.shrink().map(Value::BulkString).map(ArbitraryValue)) + } + Value::Array(ref xs) | Value::Set(ref xs) => { let ys = xs .iter() .map(|x| ArbitraryValue(x.clone())) @@ -36,13 +39,56 @@ impl ::quickcheck::Arbitrary for ArbitraryValue { Box::new( ys.shrink() .map(|xs| xs.into_iter().map(|x| x.0).collect()) - .map(Value::Bulk) + .map(Value::Array) .map(ArbitraryValue), ) } - Value::Status(ref status) => { - Box::new(status.shrink().map(Value::Status).map(ArbitraryValue)) + Value::Map(ref _xs) => Box::new(vec![ArbitraryValue(Value::Map(vec![]))].into_iter()), + Value::Attribute { + ref data, + ref attributes, + } => Box::new( + vec![ArbitraryValue(Value::Attribute { + data: data.clone(), + attributes: attributes.clone(), + })] + .into_iter(), + ), + Value::Push { ref kind, ref data } => { + let mut ys = data + .iter() + .map(|x| ArbitraryValue(x.clone())) + .collect::>(); + ys.insert(0, ArbitraryValue(Value::SimpleString(kind.to_string()))); + Box::new( + ys.shrink() + .map(|xs| xs.into_iter().map(|x| x.0).collect()) + .map(Value::Array) + .map(ArbitraryValue), + ) + } + Value::SimpleString(ref status) => { + Box::new(status.shrink().map(Value::SimpleString).map(ArbitraryValue)) + } + Value::Double(i) => Box::new( + i.shrink() + .map(|f| Value::Double(OrderedFloat(f))) + .map(ArbitraryValue), + ), + Value::Boolean(i) => Box::new(i.shrink().map(Value::Boolean).map(ArbitraryValue)), + Value::BigNumber(ref i) => { + Box::new(vec![ArbitraryValue(Value::BigNumber(i.clone()))].into_iter()) } + Value::VerbatimString { + ref format, + ref text, + } => Box::new( + vec![ArbitraryValue(Value::VerbatimString { + format: format.clone(), + text: text.clone(), + })] + .into_iter(), + ), } } } @@ -55,13 +101,13 @@ fn arbitrary_value(g: &mut Gen, recursive_size: usize) -> Value { match u8::arbitrary(g) % 6 { 0 => Value::Nil, 1 => Value::Int(Arbitrary::arbitrary(g)), - 2 => Value::Data(Arbitrary::arbitrary(g)), + 2 => Value::BulkString(Arbitrary::arbitrary(g)), 3 => { let size = { let s = g.size(); usize::arbitrary(g) % s }; - Value::Bulk( + Value::Array( (0..size) .map(|_| arbitrary_value(g, recursive_size / size)) .collect(), @@ -73,18 +119,18 @@ fn arbitrary_value(g: &mut Gen, recursive_size: usize) -> Value { usize::arbitrary(g) % s }; - let mut status = String::with_capacity(size); + let mut string = String::with_capacity(size); for _ in 0..size { let c = char::arbitrary(g); if c.is_ascii_alphabetic() { - status.push(c); + string.push(c); } } - if status == "OK" { + if string == "OK" { Value::Okay } else { - Value::Status(status) + Value::SimpleString(string) } } 5 => Value::Okay, diff --git a/redis/tests/support/cluster.rs b/redis/tests/support/cluster.rs index 1245d1bf35..edb63825d2 100644 --- a/redis/tests/support/cluster.rs +++ b/redis/tests/support/cluster.rs @@ -422,8 +422,9 @@ impl TestClusterContext { .unwrap(); // subsequent unauthenticated command should fail: - let mut con = client.get_connection().unwrap(); - assert!(redis::cmd("PING").query::<()>(&mut con).is_err()); + if let Ok(mut con) = client.get_connection() { + assert!(redis::cmd("PING").query::<()>(&mut con).is_err()); + } } } diff --git a/redis/tests/support/mock_cluster.rs b/redis/tests/support/mock_cluster.rs index ccdf28eea8..a5fd37587a 100644 --- a/redis/tests/support/mock_cluster.rs +++ b/redis/tests/support/mock_cluster.rs @@ -21,6 +21,7 @@ use redis::{aio, cluster_async, RedisFuture}; #[cfg(feature = "cluster-async")] use futures::future; +use redis::PushKind; #[cfg(feature = "cluster-async")] use tokio::runtime::Runtime; @@ -114,18 +115,18 @@ pub fn contains_slice(xs: &[u8], ys: &[u8]) -> bool { pub fn respond_startup(name: &str, cmd: &[u8]) -> Result<(), RedisResult> { if contains_slice(cmd, b"PING") { - Err(Ok(Value::Status("OK".into()))) + Err(Ok(Value::SimpleString("OK".into()))) } else if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") { - Err(Ok(Value::Bulk(vec![Value::Bulk(vec![ + Err(Ok(Value::Array(vec![Value::Array(vec![ Value::Int(0), Value::Int(16383), - Value::Bulk(vec![ - Value::Data(name.as_bytes().to_vec()), + Value::Array(vec![ + Value::BulkString(name.as_bytes().to_vec()), Value::Int(6379), ]), ])]))) } else if contains_slice(cmd, b"READONLY") { - Err(Ok(Value::Status("OK".into()))) + Err(Ok(Value::SimpleString("OK".into()))) } else { Ok(()) } @@ -168,21 +169,21 @@ pub fn create_topology_from_config(name: &str, slots_config: Vec) let mut config = vec![ Value::Int(slot_config.slot_range.start as i64), Value::Int(slot_config.slot_range.end as i64), - Value::Bulk(vec![ - Value::Data(name.as_bytes().to_vec()), + Value::Array(vec![ + Value::BulkString(name.as_bytes().to_vec()), Value::Int(slot_config.primary_port as i64), ]), ]; config.extend(slot_config.replica_ports.into_iter().map(|replica_port| { - Value::Bulk(vec![ - Value::Data(name.as_bytes().to_vec()), + Value::Array(vec![ + Value::BulkString(name.as_bytes().to_vec()), Value::Int(replica_port as i64), ]) })); - Value::Bulk(config) + Value::Array(config) }) .collect(); - Value::Bulk(slots_vec) + Value::Array(slots_vec) } pub fn respond_startup_with_replica_using_config( @@ -203,12 +204,12 @@ pub fn respond_startup_with_replica_using_config( }, ]); if contains_slice(cmd, b"PING") { - Err(Ok(Value::Status("OK".into()))) + Err(Ok(Value::SimpleString("OK".into()))) } else if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") { let slots = create_topology_from_config(name, slots_config); Err(Ok(slots)) } else if contains_slice(cmd, b"READONLY") { - Err(Ok(Value::Status("OK".into()))) + Err(Ok(Value::SimpleString("OK".into()))) } else { Ok(()) } @@ -252,9 +253,9 @@ impl redis::ConnectionLike for MockConnection { match res { Err(err) => Err(err), Ok(res) => { - if let Value::Bulk(results) = res { + if let Value::Array(results) = res { match results.into_iter().nth(offset) { - Some(Value::Bulk(res)) => Ok(res), + Some(Value::Array(res)) => Ok(res), _ => Err((ErrorKind::ResponseError, "non-array response").into()), } } else { @@ -280,6 +281,10 @@ impl redis::ConnectionLike for MockConnection { fn is_open(&self) -> bool { true } + + fn execute_push_message(&mut self, _kind: PushKind, _data: Vec) { + // TODO - implement handling RESP3 push messages + } } pub struct MockEnv { diff --git a/redis/tests/support/mod.rs b/redis/tests/support/mod.rs index fe17471dd7..9c2fdb5fbf 100644 --- a/redis/tests/support/mod.rs +++ b/redis/tests/support/mod.rs @@ -12,7 +12,7 @@ use std::{ }; use futures::Future; -use redis::{ConnectionAddr, InfoDict, Value}; +use redis::{ConnectionAddr, InfoDict, RedisConnectionInfo, Value}; #[cfg(feature = "tls-rustls")] use redis::{ClientTlsConfig, TlsCertificates}; @@ -20,6 +20,10 @@ use redis::{ClientTlsConfig, TlsCertificates}; use socket2::{Domain, Socket, Type}; use tempfile::TempDir; +fn use_resp3() -> bool { + env::var("RESP3").unwrap_or_default() == "true" +} + pub fn current_thread_runtime() -> tokio::runtime::Runtime { let mut builder = tokio::runtime::Builder::new_current_thread(); @@ -284,7 +288,10 @@ impl RedisServer { pub fn connection_info(&self) -> redis::ConnectionInfo { redis::ConnectionInfo { addr: self.client_addr().clone(), - redis: Default::default(), + redis: RedisConnectionInfo { + use_resp3: use_resp3(), + ..Default::default() + }, } } @@ -325,6 +332,7 @@ impl Drop for RedisServer { pub struct TestContext { pub server: RedisServer, pub client: redis::Client, + pub use_resp3: bool, } pub(crate) fn is_tls_enabled() -> bool { @@ -388,7 +396,11 @@ impl TestContext { } redis::cmd("FLUSHDB").execute(&mut con); - TestContext { server, client } + TestContext { + server, + client, + use_resp3: use_resp3(), + } } pub fn with_modules(modules: &[Module], mtls_enabled: bool) -> TestContext { @@ -425,7 +437,11 @@ impl TestContext { } redis::cmd("FLUSHDB").execute(&mut con); - TestContext { server, client } + TestContext { + server, + client, + use_resp3: use_resp3(), + } } pub fn connection(&self) -> redis::Connection { @@ -474,6 +490,27 @@ impl TestContext { } } +fn encode_iter(values: &Vec, writer: &mut W, prefix: &str) -> io::Result<()> +where + W: io::Write, +{ + write!(writer, "{}{}\r\n", prefix, values.len())?; + for val in values.iter() { + encode_value(val, writer)?; + } + Ok(()) +} +fn encode_map(values: &Vec<(Value, Value)>, writer: &mut W, prefix: &str) -> io::Result<()> +where + W: io::Write, +{ + write!(writer, "{}{}\r\n", prefix, values.len())?; + for (k, v) in values.iter() { + encode_value(k, writer)?; + encode_value(v, writer)?; + } + Ok(()) +} pub fn encode_value(value: &Value, writer: &mut W) -> io::Result<()> where W: io::Write, @@ -482,20 +519,48 @@ where match *value { Value::Nil => write!(writer, "$-1\r\n"), Value::Int(val) => write!(writer, ":{val}\r\n"), - Value::Data(ref val) => { + Value::BulkString(ref val) => { write!(writer, "${}\r\n", val.len())?; writer.write_all(val)?; writer.write_all(b"\r\n") } - Value::Bulk(ref values) => { - write!(writer, "*{}\r\n", values.len())?; - for val in values.iter() { + Value::Array(ref values) => encode_iter(values, writer, "*"), + Value::Okay => write!(writer, "+OK\r\n"), + Value::SimpleString(ref s) => write!(writer, "+{s}\r\n"), + Value::Map(ref values) => encode_map(values, writer, "%"), + Value::Attribute { + ref data, + ref attributes, + } => { + encode_map(attributes, writer, "|")?; + encode_value(data, writer)?; + Ok(()) + } + Value::Set(ref values) => encode_iter(values, writer, "~"), + // Value::Nil => write!(writer, "_\r\n"), //TODO is it okey to use $-1 in resp3 ? + Value::Double(val) => write!(writer, ",{}\r\n", val), + Value::Boolean(v) => { + if v { + write!(writer, "#t\r\n") + } else { + write!(writer, "#f\r\n") + } + } + Value::VerbatimString { + ref format, + ref text, + } => { + // format is always 3 bytes + write!(writer, "={}\r\n{}:{}\r\n", 3 + text.len(), format, text) + } + Value::BigNumber(ref val) => write!(writer, "({}\r\n", val), + Value::Push { ref kind, ref data } => { + write!(writer, ">{}\r\n+{kind}\r\n", data.len() + 1)?; + for val in data.iter() { encode_value(val, writer)?; } Ok(()) } - Value::Okay => write!(writer, "+OK\r\n"), - Value::Status(ref s) => write!(writer, "+{s}\r\n"), } } diff --git a/redis/tests/test_async.rs b/redis/tests/test_async.rs index 01fc85df14..d9ca2b7642 100644 --- a/redis/tests/test_async.rs +++ b/redis/tests/test_async.rs @@ -97,6 +97,30 @@ fn test_pipeline_transaction() { .unwrap(); } +#[test] +fn test_client_tracking_doesnt_block_execution() { + //It checks if the library distinguish a push-type message from the others and continues its normal operation. + let ctx = TestContext::new(); + block_on_all(async move { + let mut con = ctx.async_connection().await.unwrap(); + let mut pipe = redis::pipe(); + pipe.cmd("CLIENT") + .arg("TRACKING") + .arg("ON") + .ignore() + .cmd("GET") + .arg("key_1") + .ignore() + .cmd("SET") + .arg("key_1") + .arg(42) + .ignore(); + let _: RedisResult<()> = pipe.query_async(&mut con).await; + let num: i32 = con.get("key_1").await.unwrap(); + assert_eq!(num, 42); + }); +} + #[test] fn test_pipeline_transaction_with_errors() { use redis::RedisError; @@ -447,6 +471,7 @@ async fn invalid_password_issue_343() { db: 0, username: None, password: Some("asdcasc".to_string()), + use_resp3: false, }, }; let client = redis::Client::open(coninfo).unwrap(); diff --git a/redis/tests/test_basic.rs b/redis/tests/test_basic.rs index c00c334f46..48da016985 100644 --- a/redis/tests/test_basic.rs +++ b/redis/tests/test_basic.rs @@ -103,6 +103,43 @@ fn test_key_type() { assert_eq!(hash_key_type, "hash"); } +#[test] +fn test_client_tracking_doesnt_block_execution() { + //It checks if the library distinguish a push-type message from the others and continues its normal operation. + let ctx = TestContext::new(); + let mut con = ctx.connection(); + let (k1, k2): (i32, i32) = redis::pipe() + .cmd("CLIENT") + .arg("TRACKING") + .arg("ON") + .ignore() + .cmd("GET") + .arg("key_1") + .ignore() + .cmd("SET") + .arg("key_1") + .arg(42) + .ignore() + .cmd("SET") + .arg("key_2") + .arg(43) + .ignore() + .cmd("GET") + .arg("key_1") + .cmd("GET") + .arg("key_2") + .cmd("SET") + .arg("key_1") + .arg(45) + .ignore() + .query(&mut con) + .unwrap(); + assert_eq!(k1, 42); + assert_eq!(k2, 43); + let num: i32 = con.get("key_1").unwrap(); + assert_eq!(num, 45); +} + #[test] fn test_incr() { let ctx = TestContext::new(); @@ -169,7 +206,7 @@ fn test_info() { let info: redis::InfoDict = redis::cmd("INFO").query(&mut con).unwrap(); assert_eq!( info.find(&"role"), - Some(&redis::Value::Status("master".to_string())) + Some(&redis::Value::SimpleString("master".to_string())) ); assert_eq!(info.get("role"), Some("master".to_string())); assert_eq!(info.get("loading"), Some(false)); @@ -1107,6 +1144,15 @@ fn test_zunionstore_weights() { ("two".to_string(), "10".to_string()) ]) ); + // test converting to double + assert_eq!( + con.zrange_withscores("out", 0, -1), + Ok(vec![ + ("one".to_string(), 5.0), + ("three".to_string(), 9.0), + ("two".to_string(), 10.0) + ]) + ); // zunionstore_min_weights assert_eq!( @@ -1204,11 +1250,19 @@ fn test_zrandmember() { let results: Vec = con.zrandmember(setname, Some(-5)).unwrap(); assert_eq!(results.len(), 5); - let results: Vec = con.zrandmember_withscores(setname, 5).unwrap(); - assert_eq!(results.len(), 10); + if !ctx.use_resp3 { + let results: Vec = con.zrandmember_withscores(setname, 5).unwrap(); + assert_eq!(results.len(), 10); - let results: Vec = con.zrandmember_withscores(setname, -5).unwrap(); - assert_eq!(results.len(), 10); + let results: Vec = con.zrandmember_withscores(setname, -5).unwrap(); + assert_eq!(results.len(), 10); + } + + let results: Vec<(String, f64)> = con.zrandmember_withscores(setname, 5).unwrap(); + assert_eq!(results.len(), 5); + + let results: Vec<(String, f64)> = con.zrandmember_withscores(setname, -5).unwrap(); + assert_eq!(results.len(), 5); } #[test] diff --git a/redis/tests/test_cluster.rs b/redis/tests/test_cluster.rs index e7048e2941..a02dad9092 100644 --- a/redis/tests/test_cluster.rs +++ b/redis/tests/test_cluster.rs @@ -311,7 +311,7 @@ fn test_cluster_retries() { match requests.fetch_add(1, atomic::Ordering::SeqCst) { 0..=4 => Err(parse_redis_value(b"-TRYAGAIN mock\r\n")), - _ => Err(Ok(Value::Data(b"123".to_vec()))), + _ => Err(Ok(Value::BulkString(b"123".to_vec()))), } }, ); @@ -373,7 +373,7 @@ fn test_cluster_move_error_when_new_node_is_added() { started.store(true, atomic::Ordering::SeqCst); if contains_slice(cmd, b"PING") { - return Err(Ok(Value::Status("OK".into()))); + return Err(Ok(Value::SimpleString("OK".into()))); } let i = requests.fetch_add(1, atomic::Ordering::SeqCst); @@ -382,20 +382,20 @@ fn test_cluster_move_error_when_new_node_is_added() { // Respond that the key exists on a node that does not yet have a connection: 0 => Err(parse_redis_value(b"-MOVED 123\r\n")), // Respond with the new masters - 1 => Err(Ok(Value::Bulk(vec![ - Value::Bulk(vec![ + 1 => Err(Ok(Value::Array(vec![ + Value::Array(vec![ Value::Int(0), Value::Int(1), - Value::Bulk(vec![ - Value::Data(name.as_bytes().to_vec()), + Value::Array(vec![ + Value::BulkString(name.as_bytes().to_vec()), Value::Int(6379), ]), ]), - Value::Bulk(vec![ + Value::Array(vec![ Value::Int(2), Value::Int(16383), - Value::Bulk(vec![ - Value::Data(name.as_bytes().to_vec()), + Value::Array(vec![ + Value::BulkString(name.as_bytes().to_vec()), Value::Int(6380), ]), ]), @@ -403,7 +403,7 @@ fn test_cluster_move_error_when_new_node_is_added() { _ => { // Check that the correct node receives the request after rebuilding assert_eq!(port, 6380); - Err(Ok(Value::Data(b"123".to_vec()))) + Err(Ok(Value::BulkString(b"123".to_vec()))) } } }); @@ -442,7 +442,7 @@ fn test_cluster_ask_redirect() { } 2 => { assert!(contains_slice(cmd, b"GET")); - Err(Ok(Value::Data(b"123".to_vec()))) + Err(Ok(Value::BulkString(b"123".to_vec()))) } _ => panic!("Node should not be called now"), }, @@ -475,7 +475,7 @@ fn test_cluster_ask_error_when_new_node_is_added() { started.store(true, atomic::Ordering::SeqCst); if contains_slice(cmd, b"PING") { - return Err(Ok(Value::Status("OK".into()))); + return Err(Ok(Value::SimpleString("OK".into()))); } let i = requests.fetch_add(1, atomic::Ordering::SeqCst); @@ -493,7 +493,7 @@ fn test_cluster_ask_error_when_new_node_is_added() { 2 => { assert_eq!(port, 6380); assert!(contains_slice(cmd, b"GET")); - Err(Ok(Value::Data(b"123".to_vec()))) + Err(Ok(Value::BulkString(b"123".to_vec()))) } _ => { panic!("Unexpected request: {:?}", cmd); @@ -524,7 +524,7 @@ fn test_cluster_replica_read() { respond_startup_with_replica(name, cmd)?; match port { - 6380 => Err(Ok(Value::Data(b"123".to_vec()))), + 6380 => Err(Ok(Value::BulkString(b"123".to_vec()))), _ => panic!("Wrong node"), } }, @@ -546,7 +546,7 @@ fn test_cluster_replica_read() { move |cmd: &[u8], port| { respond_startup_with_replica(name, cmd)?; match port { - 6379 => Err(Ok(Value::Status("OK".into()))), + 6379 => Err(Ok(Value::SimpleString("OK".into()))), _ => panic!("Wrong node"), } }, @@ -556,7 +556,7 @@ fn test_cluster_replica_read() { .arg("test") .arg("123") .query::>(&mut connection); - assert_eq!(value, Ok(Some(Value::Status("OK".to_owned())))); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); } #[test] @@ -581,7 +581,7 @@ fn test_cluster_io_error() { std::io::ErrorKind::ConnectionReset, "mock-io-error", )))), - _ => Err(Ok(Value::Data(b"123".to_vec()))), + _ => Err(Ok(Value::BulkString(b"123".to_vec()))), }, } }, @@ -654,7 +654,7 @@ fn test_cluster_fan_out( respond_startup_with_replica_using_config(name, received_cmd, slots_config.clone())?; if received_cmd == packed_cmd { ports_clone.lock().unwrap().push(port); - return Err(Ok(Value::Status("OK".into()))); + return Err(Ok(Value::SimpleString("OK".into()))); } Ok(()) }, @@ -747,13 +747,15 @@ fn test_cluster_split_multi_shard_command_and_combine_arrays_of_values() { .iter() .filter_map(|expected_key| { if cmd_str.contains(expected_key) { - Some(Value::Data(format!("{expected_key}-{port}").into_bytes())) + Some(Value::BulkString( + format!("{expected_key}-{port}").into_bytes(), + )) } else { None } }) .collect(); - Err(Ok(Value::Bulk(results))) + Err(Ok(Value::Array(results))) }, ); @@ -781,15 +783,15 @@ fn test_cluster_route_correctly_on_packed_transaction_with_single_node_requests( respond_startup_with_replica_using_config(name, received_cmd, None)?; if port == 6381 { let results = vec![ - Value::Data("OK".as_bytes().to_vec()), - Value::Data("QUEUED".as_bytes().to_vec()), - Value::Data("QUEUED".as_bytes().to_vec()), - Value::Bulk(vec![ - Value::Data("OK".as_bytes().to_vec()), - Value::Data("bar".as_bytes().to_vec()), + Value::BulkString("OK".as_bytes().to_vec()), + Value::BulkString("QUEUED".as_bytes().to_vec()), + Value::BulkString("QUEUED".as_bytes().to_vec()), + Value::Array(vec![ + Value::BulkString("OK".as_bytes().to_vec()), + Value::BulkString("bar".as_bytes().to_vec()), ]), ]; - return Err(Ok(Value::Bulk(results))); + return Err(Ok(Value::Array(results))); } Err(Err(RedisError::from(std::io::Error::new( std::io::ErrorKind::ConnectionReset, @@ -804,8 +806,8 @@ fn test_cluster_route_correctly_on_packed_transaction_with_single_node_requests( assert_eq!( result, vec![ - Value::Data("OK".as_bytes().to_vec()), - Value::Data("bar".as_bytes().to_vec()), + Value::BulkString("OK".as_bytes().to_vec()), + Value::BulkString("bar".as_bytes().to_vec()), ] ); } @@ -817,15 +819,15 @@ fn test_cluster_route_correctly_on_packed_transaction_with_single_node_requests2 pipeline.atomic().set("foo", "bar").get("foo"); let packed_pipeline = pipeline.get_packed_pipeline(); let results = vec![ - Value::Data("OK".as_bytes().to_vec()), - Value::Data("QUEUED".as_bytes().to_vec()), - Value::Data("QUEUED".as_bytes().to_vec()), - Value::Bulk(vec![ - Value::Data("OK".as_bytes().to_vec()), - Value::Data("bar".as_bytes().to_vec()), + Value::BulkString("OK".as_bytes().to_vec()), + Value::BulkString("QUEUED".as_bytes().to_vec()), + Value::BulkString("QUEUED".as_bytes().to_vec()), + Value::Array(vec![ + Value::BulkString("OK".as_bytes().to_vec()), + Value::BulkString("bar".as_bytes().to_vec()), ]), ]; - let expected_result = Value::Bulk(results); + let expected_result = Value::Array(results); let cloned_result = expected_result.clone(); let MockEnv { diff --git a/redis/tests/test_cluster_async.rs b/redis/tests/test_cluster_async.rs index 9aa7f4fde2..e3326be25a 100644 --- a/redis/tests/test_cluster_async.rs +++ b/redis/tests/test_cluster_async.rs @@ -130,7 +130,7 @@ fn test_async_cluster_route_info_to_nodes() { let cluster = TestClusterContext::new(12, 1); let split_to_addresses_and_info = |res| -> (Vec, Vec) { - if let Value::Bulk(values) = res { + if let Value::Array(values) = res { let mut pairs: Vec<_> = values .into_iter() .map(|value| redis::from_redis_value::<(String, String)>(&value).unwrap()) @@ -471,7 +471,7 @@ fn test_async_cluster_retries() { match requests.fetch_add(1, atomic::Ordering::SeqCst) { 0..=4 => Err(parse_redis_value(b"-TRYAGAIN mock\r\n")), - _ => Err(Ok(Value::Data(b"123".to_vec()))), + _ => Err(Ok(Value::BulkString(b"123".to_vec()))), } }, ); @@ -565,13 +565,13 @@ fn test_async_cluster_move_error_when_new_node_is_added() { started.store(true, atomic::Ordering::SeqCst); if contains_slice(cmd, b"PING") { - return Err(Ok(Value::Status("OK".into()))); + return Err(Ok(Value::SimpleString("OK".into()))); } let i = requests.fetch_add(1, atomic::Ordering::SeqCst); let is_get_cmd = contains_slice(cmd, b"GET"); - let get_response = Err(Ok(Value::Data(b"123".to_vec()))); + let get_response = Err(Ok(Value::BulkString(b"123".to_vec()))); match i { // Respond that the key exists on a node that does not yet have a connection: 0 => Err(parse_redis_value( @@ -585,20 +585,20 @@ fn test_async_cluster_move_error_when_new_node_is_added() { .get(&port) .unwrap() .swap(true, Ordering::SeqCst)); - Err(Ok(Value::Bulk(vec![ - Value::Bulk(vec![ + Err(Ok(Value::Array(vec![ + Value::Array(vec![ Value::Int(0), Value::Int(1), - Value::Bulk(vec![ - Value::Data(name.as_bytes().to_vec()), + Value::Array(vec![ + Value::BulkString(name.as_bytes().to_vec()), Value::Int(6379), ]), ]), - Value::Bulk(vec![ + Value::Array(vec![ Value::Int(2), Value::Int(16383), - Value::Bulk(vec![ - Value::Data(name.as_bytes().to_vec()), + Value::Array(vec![ + Value::BulkString(name.as_bytes().to_vec()), Value::Int(6380), ]), ]), @@ -649,12 +649,12 @@ fn test_cluster_refresh_topology_after_moved_assert_get_succeed_and_expected_ret started.store(true, atomic::Ordering::SeqCst); if contains_slice(cmd, b"PING") { - return Err(Ok(Value::Status("OK".into()))); + return Err(Ok(Value::SimpleString("OK".into()))); } let i = requests.fetch_add(1, atomic::Ordering::SeqCst); let is_get_cmd = contains_slice(cmd, b"GET"); - let get_response = Err(Ok(Value::Data(b"123".to_vec()))); + let get_response = Err(Ok(Value::BulkString(b"123".to_vec()))); let moved_node = ports[0]; match i { // Respond that the key exists on a node that does not yet have a connection: @@ -729,7 +729,7 @@ fn test_cluster_refresh_topology_in_client_init_get_succeed( let is_started = started.load(atomic::Ordering::SeqCst); if !is_started { if contains_slice(cmd, b"PING") { - return Err(Ok(Value::Status("OK".into()))); + return Err(Ok(Value::SimpleString("OK".into()))); } else if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") { let view_index = get_node_view_index(slots_config_vec.len(), &ports, port); return Err(Ok(create_topology_from_config( @@ -737,16 +737,16 @@ fn test_cluster_refresh_topology_in_client_init_get_succeed( slots_config_vec[view_index].clone(), ))); } else if contains_slice(cmd, b"READONLY") { - return Err(Ok(Value::Status("OK".into()))); + return Err(Ok(Value::SimpleString("OK".into()))); } } started.store(true, atomic::Ordering::SeqCst); if contains_slice(cmd, b"PING") { - return Err(Ok(Value::Status("OK".into()))); + return Err(Ok(Value::SimpleString("OK".into()))); } let is_get_cmd = contains_slice(cmd, b"GET"); - let get_response = Err(Ok(Value::Data(b"123".to_vec()))); + let get_response = Err(Ok(Value::BulkString(b"123".to_vec()))); { assert!(is_get_cmd, "{:?}", std::str::from_utf8(cmd)); get_response @@ -878,7 +878,7 @@ fn test_async_cluster_ask_redirect() { } 2 => { assert!(contains_slice(cmd, b"GET")); - Err(Ok(Value::Data(b"123".to_vec()))) + Err(Ok(Value::BulkString(b"123".to_vec()))) } _ => panic!("Node should not be called now"), }, @@ -967,7 +967,7 @@ fn test_async_cluster_ask_error_when_new_node_is_added() { started.store(true, atomic::Ordering::SeqCst); if contains_slice(cmd, b"PING") { - return Err(Ok(Value::Status("OK".into()))); + return Err(Ok(Value::SimpleString("OK".into()))); } let i = requests.fetch_add(1, atomic::Ordering::SeqCst); @@ -985,7 +985,7 @@ fn test_async_cluster_ask_error_when_new_node_is_added() { 2 => { assert_eq!(port, 6380); assert!(contains_slice(cmd, b"GET")); - Err(Ok(Value::Data(b"123".to_vec()))) + Err(Ok(Value::BulkString(b"123".to_vec()))) } _ => { panic!("Unexpected request: {:?}", cmd); @@ -1020,7 +1020,7 @@ fn test_async_cluster_replica_read() { move |cmd: &[u8], port| { respond_startup_with_replica(name, cmd)?; match port { - 6380 => Err(Ok(Value::Data(b"123".to_vec()))), + 6380 => Err(Ok(Value::BulkString(b"123".to_vec()))), _ => panic!("Wrong node"), } }, @@ -1047,7 +1047,7 @@ fn test_async_cluster_replica_read() { move |cmd: &[u8], port| { respond_startup_with_replica(name, cmd)?; match port { - 6379 => Err(Ok(Value::Status("OK".into()))), + 6379 => Err(Ok(Value::SimpleString("OK".into()))), _ => panic!("Wrong node"), } }, @@ -1059,7 +1059,7 @@ fn test_async_cluster_replica_read() { .arg("123") .query_async::<_, Option>(&mut connection), ); - assert_eq!(value, Ok(Some(Value::Status("OK".to_owned())))); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); } fn test_cluster_fan_out( @@ -1090,7 +1090,7 @@ fn test_cluster_fan_out( respond_startup_with_replica_using_config(name, received_cmd, slots_config.clone())?; if received_cmd == packed_cmd { ports_clone.lock().unwrap().push(port); - return Err(Ok(Value::Status("OK".into()))); + return Err(Ok(Value::SimpleString("OK".into()))); } Ok(()) }, @@ -1267,14 +1267,14 @@ fn test_cluster_fan_out_and_aggregate_logical_array_response() { respond_startup_with_replica_using_config(name, received_cmd, None)?; if port == 6381 { - return Err(Ok(Value::Bulk(vec![ + return Err(Ok(Value::Array(vec![ Value::Int(0), Value::Int(0), Value::Int(1), Value::Int(1), ]))); } else if port == 6379 { - return Err(Ok(Value::Bulk(vec![ + return Err(Ok(Value::Array(vec![ Value::Int(0), Value::Int(1), Value::Int(0), @@ -1437,7 +1437,7 @@ fn test_cluster_fan_out_and_return_one_succeeded_ignoring_empty_values() { move |received_cmd: &[u8], port| { respond_startup_with_replica_using_config(name, received_cmd, None)?; if port == 6381 { - return Err(Ok(Value::Data("foo".as_bytes().to_vec()))); + return Err(Ok(Value::BulkString("foo".as_bytes().to_vec()))); } Err(Ok(Value::Nil)) }, @@ -1466,7 +1466,9 @@ fn test_cluster_fan_out_and_return_map_of_results_for_special_response_policy() name, move |received_cmd: &[u8], port| { respond_startup_with_replica_using_config(name, received_cmd, None)?; - Err(Ok(Value::Data(format!("latency: {port}").into_bytes()))) + Err(Ok(Value::BulkString( + format!("latency: {port}").into_bytes(), + ))) }, ); @@ -1503,7 +1505,7 @@ fn test_cluster_fan_out_and_combine_arrays_of_values() { name, move |received_cmd: &[u8], port| { respond_startup_with_replica_using_config(name, received_cmd, None)?; - Err(Ok(Value::Bulk(vec![Value::Data( + Err(Ok(Value::Array(vec![Value::BulkString( format!("key:{port}").into_bytes(), )]))) }, @@ -1542,13 +1544,15 @@ fn test_cluster_split_multi_shard_command_and_combine_arrays_of_values() { .iter() .filter_map(|expected_key| { if cmd_str.contains(expected_key) { - Some(Value::Data(format!("{expected_key}-{port}").into_bytes())) + Some(Value::BulkString( + format!("{expected_key}-{port}").into_bytes(), + )) } else { None } }) .collect(); - Err(Ok(Value::Bulk(results))) + Err(Ok(Value::Array(results))) }, ); @@ -1588,13 +1592,15 @@ fn test_cluster_handle_asking_error_in_split_multi_shard_command() { .iter() .filter_map(|expected_key| { if cmd_str.contains(expected_key) { - Some(Value::Data(format!("{expected_key}-{port}").into_bytes())) + Some(Value::BulkString( + format!("{expected_key}-{port}").into_bytes(), + )) } else { None } }) .collect(); - Err(Ok(Value::Bulk(results))) + Err(Ok(Value::Array(results))) }, ); @@ -1660,7 +1666,7 @@ fn test_async_cluster_io_error() { std::io::ErrorKind::ConnectionReset, "mock-io-error", )))), - _ => Err(Ok(Value::Data(b"123".to_vec()))), + _ => Err(Ok(Value::BulkString(b"123".to_vec()))), }, } }, @@ -1845,7 +1851,7 @@ fn get_queried_node_id_if_master(cluster_nodes_output: Value) -> Option // Returns the node ID of the connection that was queried for CLUSTER NODES (using the 'myself' flag), if it's a master. // Otherwise, returns None. match cluster_nodes_output { - Value::Data(val) => match from_utf8(&val) { + Value::BulkString(val) => match from_utf8(&val) { Ok(str_res) => { let parts: Vec<&str> = str_res.split('\n').collect(); for node_entry in parts { diff --git a/redis/tests/test_module_json.rs b/redis/tests/test_module_json.rs index 2d06c7cab8..296e8e6620 100644 --- a/redis/tests/test_module_json.rs +++ b/redis/tests/test_module_json.rs @@ -68,7 +68,7 @@ fn test_module_json_arr_append() { let json_append: RedisResult = con.json_arr_append(TEST_KEY, "$..a", &3i64); - assert_eq!(json_append, Ok(Bulk(vec![Int(2i64), Int(3i64), Nil]))); + assert_eq!(json_append, Ok(Array(vec![Int(2i64), Int(3i64), Nil]))); } #[test] @@ -86,7 +86,7 @@ fn test_module_json_arr_index() { let json_arrindex: RedisResult = con.json_arr_index(TEST_KEY, "$..a", &2i64); - assert_eq!(json_arrindex, Ok(Bulk(vec![Int(1i64), Int(-1i64)]))); + assert_eq!(json_arrindex, Ok(Array(vec![Int(1i64), Int(-1i64)]))); let update_initial: RedisResult = con.json_set( TEST_KEY, @@ -99,7 +99,7 @@ fn test_module_json_arr_index() { let json_arrindex_2: RedisResult = con.json_arr_index_ss(TEST_KEY, "$..a", &2i64, &0, &0); - assert_eq!(json_arrindex_2, Ok(Bulk(vec![Int(1i64), Nil]))); + assert_eq!(json_arrindex_2, Ok(Array(vec![Int(1i64), Nil]))); } #[test] @@ -117,7 +117,7 @@ fn test_module_json_arr_insert() { let json_arrinsert: RedisResult = con.json_arr_insert(TEST_KEY, "$..a", 0, &1i64); - assert_eq!(json_arrinsert, Ok(Bulk(vec![Int(2), Int(3)]))); + assert_eq!(json_arrinsert, Ok(Array(vec![Int(2), Int(3)]))); let update_initial: RedisResult = con.json_set( TEST_KEY, @@ -129,7 +129,7 @@ fn test_module_json_arr_insert() { let json_arrinsert_2: RedisResult = con.json_arr_insert(TEST_KEY, "$..a", 0, &1i64); - assert_eq!(json_arrinsert_2, Ok(Bulk(vec![Int(5), Nil]))); + assert_eq!(json_arrinsert_2, Ok(Array(vec![Int(5), Nil]))); } #[test] @@ -147,7 +147,7 @@ fn test_module_json_arr_len() { let json_arrlen: RedisResult = con.json_arr_len(TEST_KEY, "$..a"); - assert_eq!(json_arrlen, Ok(Bulk(vec![Int(1), Int(2)]))); + assert_eq!(json_arrlen, Ok(Array(vec![Int(1), Int(2)]))); let update_initial: RedisResult = con.json_set( TEST_KEY, @@ -159,7 +159,7 @@ fn test_module_json_arr_len() { let json_arrlen_2: RedisResult = con.json_arr_len(TEST_KEY, "$..a"); - assert_eq!(json_arrlen_2, Ok(Bulk(vec![Int(4), Nil]))); + assert_eq!(json_arrlen_2, Ok(Array(vec![Int(4), Nil]))); } #[test] @@ -179,10 +179,10 @@ fn test_module_json_arr_pop() { assert_eq!( json_arrpop, - Ok(Bulk(vec![ + Ok(Array(vec![ // convert string 3 to its ascii value as bytes - Data(Vec::from("3".as_bytes())), - Data(Vec::from("4".as_bytes())) + BulkString(Vec::from("3".as_bytes())), + BulkString(Vec::from("4".as_bytes())) ])) ); @@ -198,7 +198,11 @@ fn test_module_json_arr_pop() { assert_eq!( json_arrpop_2, - Ok(Bulk(vec![Data(Vec::from("\"bar\"".as_bytes())), Nil, Nil])) + Ok(Array(vec![ + BulkString(Vec::from("\"bar\"".as_bytes())), + Nil, + Nil + ])) ); } @@ -217,7 +221,7 @@ fn test_module_json_arr_trim() { let json_arrtrim: RedisResult = con.json_arr_trim(TEST_KEY, "$..a", 1, 1); - assert_eq!(json_arrtrim, Ok(Bulk(vec![Int(0), Int(1)]))); + assert_eq!(json_arrtrim, Ok(Array(vec![Int(0), Int(1)]))); let update_initial: RedisResult = con.json_set( TEST_KEY, @@ -229,7 +233,7 @@ fn test_module_json_arr_trim() { let json_arrtrim_2: RedisResult = con.json_arr_trim(TEST_KEY, "$..a", 1, 1); - assert_eq!(json_arrtrim_2, Ok(Bulk(vec![Int(1), Nil]))); + assert_eq!(json_arrtrim_2, Ok(Array(vec![Int(1), Nil]))); } #[test] @@ -327,9 +331,9 @@ fn test_module_json_mget() { assert_eq!( json_mget, - Ok(Bulk(vec![ - Data(Vec::from("[1,3]".as_bytes())), - Data(Vec::from("[4,6]".as_bytes())) + Ok(Array(vec![ + BulkString(Vec::from("[1,3]".as_bytes())), + BulkString(Vec::from("[4,6]".as_bytes())) ])) ); } @@ -375,11 +379,11 @@ fn test_module_json_obj_keys() { assert_eq!( json_objkeys, - Ok(Bulk(vec![ + Ok(Array(vec![ Nil, - Bulk(vec![ - Data(Vec::from("b".as_bytes())), - Data(Vec::from("c".as_bytes())) + Array(vec![ + BulkString(Vec::from("b".as_bytes())), + BulkString(Vec::from("c".as_bytes())) ]) ])) ); @@ -400,7 +404,7 @@ fn test_module_json_obj_len() { let json_objlen: RedisResult = con.json_obj_len(TEST_KEY, "$..a"); - assert_eq!(json_objlen, Ok(Bulk(vec![Nil, Int(2)]))); + assert_eq!(json_objlen, Ok(Array(vec![Nil, Int(2)]))); } #[test] @@ -428,7 +432,7 @@ fn test_module_json_str_append() { let json_strappend: RedisResult = con.json_str_append(TEST_KEY, "$..a", "\"baz\""); - assert_eq!(json_strappend, Ok(Bulk(vec![Int(6), Int(8), Nil]))); + assert_eq!(json_strappend, Ok(Array(vec![Int(6), Int(8), Nil]))); let json_get_check: RedisResult = con.json_get(TEST_KEY, "$"); @@ -453,7 +457,7 @@ fn test_module_json_str_len() { let json_strlen: RedisResult = con.json_str_len(TEST_KEY, "$..a"); - assert_eq!(json_strlen, Ok(Bulk(vec![Int(3), Int(5), Nil]))); + assert_eq!(json_strlen, Ok(Array(vec![Int(3), Int(5), Nil]))); } #[test] @@ -466,10 +470,10 @@ fn test_module_json_toggle() { assert_eq!(set_initial, Ok(true)); let json_toggle_a: RedisResult = con.json_toggle(TEST_KEY, "$.bool"); - assert_eq!(json_toggle_a, Ok(Bulk(vec![Int(0)]))); + assert_eq!(json_toggle_a, Ok(Array(vec![Int(0)]))); let json_toggle_b: RedisResult = con.json_toggle(TEST_KEY, "$.bool"); - assert_eq!(json_toggle_b, Ok(Bulk(vec![Int(1)]))); + assert_eq!(json_toggle_b, Ok(Array(vec![Int(1)]))); } #[test] @@ -489,20 +493,20 @@ fn test_module_json_type() { assert_eq!( json_type_a, - Ok(Bulk(vec![Data(Vec::from("string".as_bytes()))])) + Ok(Array(vec![BulkString(Vec::from("string".as_bytes()))])) ); let json_type_b: RedisResult = con.json_type(TEST_KEY, "$..a"); assert_eq!( json_type_b, - Ok(Bulk(vec![ - Data(Vec::from("integer".as_bytes())), - Data(Vec::from("boolean".as_bytes())) + Ok(Array(vec![ + BulkString(Vec::from("integer".as_bytes())), + BulkString(Vec::from("boolean".as_bytes())) ])) ); let json_type_c: RedisResult = con.json_type(TEST_KEY, "$..dummy"); - assert_eq!(json_type_c, Ok(Bulk(vec![]))); + assert_eq!(json_type_c, Ok(Array(vec![]))); } diff --git a/redis/tests/test_types.rs b/redis/tests/test_types.rs index 593c236d25..1c8452b897 100644 --- a/redis/tests/test_types.rs +++ b/redis/tests/test_types.rs @@ -23,7 +23,7 @@ fn test_is_single_arg() { fn test_info_dict() { use redis::{FromRedisValue, InfoDict, Value}; - let d: InfoDict = FromRedisValue::from_redis_value(&Value::Status( + let d: InfoDict = FromRedisValue::from_redis_value(&Value::SimpleString( "# this is a comment\nkey1:foo\nkey2:42\n".into(), )) .unwrap(); @@ -37,16 +37,17 @@ fn test_info_dict() { fn test_i32() { use redis::{ErrorKind, FromRedisValue, Value}; - let i = FromRedisValue::from_redis_value(&Value::Status("42".into())); + let i = FromRedisValue::from_redis_value(&Value::SimpleString("42".into())); assert_eq!(i, Ok(42i32)); let i = FromRedisValue::from_redis_value(&Value::Int(42)); assert_eq!(i, Ok(42i32)); - let i = FromRedisValue::from_redis_value(&Value::Data("42".into())); + let i = FromRedisValue::from_redis_value(&Value::BulkString("42".into())); assert_eq!(i, Ok(42i32)); - let bad_i: Result = FromRedisValue::from_redis_value(&Value::Status("42x".into())); + let bad_i: Result = + FromRedisValue::from_redis_value(&Value::SimpleString("42x".into())); assert_eq!(bad_i.unwrap_err().kind(), ErrorKind::TypeError); } @@ -54,10 +55,10 @@ fn test_i32() { fn test_u32() { use redis::{ErrorKind, FromRedisValue, Value}; - let i = FromRedisValue::from_redis_value(&Value::Status("42".into())); + let i = FromRedisValue::from_redis_value(&Value::SimpleString("42".into())); assert_eq!(i, Ok(42u32)); - let bad_i: Result = FromRedisValue::from_redis_value(&Value::Status("-1".into())); + let bad_i: Result = FromRedisValue::from_redis_value(&Value::SimpleString("-1".into())); assert_eq!(bad_i.unwrap_err().kind(), ErrorKind::TypeError); } @@ -65,23 +66,23 @@ fn test_u32() { fn test_vec() { use redis::{FromRedisValue, Value}; - let v = FromRedisValue::from_redis_value(&Value::Bulk(vec![ - Value::Data("1".into()), - Value::Data("2".into()), - Value::Data("3".into()), + let v = FromRedisValue::from_redis_value(&Value::Array(vec![ + Value::BulkString("1".into()), + Value::BulkString("2".into()), + Value::BulkString("3".into()), ])); assert_eq!(v, Ok(vec![1i32, 2, 3])); let content: &[u8] = b"\x01\x02\x03\x04"; let content_vec: Vec = Vec::from(content); - let v = FromRedisValue::from_redis_value(&Value::Data(content_vec.clone())); + let v = FromRedisValue::from_redis_value(&Value::BulkString(content_vec.clone())); assert_eq!(v, Ok(content_vec)); let content: &[u8] = b"1"; let content_vec: Vec = Vec::from(content); - let v = FromRedisValue::from_redis_value(&Value::Data(content_vec.clone())); + let v = FromRedisValue::from_redis_value(&Value::BulkString(content_vec.clone())); assert_eq!(v, Ok(vec![b'1'])); - let v = FromRedisValue::from_redis_value(&Value::Data(content_vec)); + let v = FromRedisValue::from_redis_value(&Value::BulkString(content_vec)); assert_eq!(v, Ok(vec![1_u16])); } @@ -89,28 +90,28 @@ fn test_vec() { fn test_box_slice() { use redis::{FromRedisValue, Value}; - let v = FromRedisValue::from_redis_value(&Value::Bulk(vec![ - Value::Data("1".into()), - Value::Data("2".into()), - Value::Data("3".into()), + let v = FromRedisValue::from_redis_value(&Value::Array(vec![ + Value::BulkString("1".into()), + Value::BulkString("2".into()), + Value::BulkString("3".into()), ])); assert_eq!(v, Ok(vec![1i32, 2, 3].into_boxed_slice())); let content: &[u8] = b"\x01\x02\x03\x04"; let content_vec: Vec = Vec::from(content); - let v = FromRedisValue::from_redis_value(&Value::Data(content_vec.clone())); + let v = FromRedisValue::from_redis_value(&Value::BulkString(content_vec.clone())); assert_eq!(v, Ok(content_vec.into_boxed_slice())); let content: &[u8] = b"1"; let content_vec: Vec = Vec::from(content); - let v = FromRedisValue::from_redis_value(&Value::Data(content_vec.clone())); + let v = FromRedisValue::from_redis_value(&Value::BulkString(content_vec.clone())); assert_eq!(v, Ok(vec![b'1'].into_boxed_slice())); - let v = FromRedisValue::from_redis_value(&Value::Data(content_vec)); + let v = FromRedisValue::from_redis_value(&Value::BulkString(content_vec)); assert_eq!(v, Ok(vec![1_u16].into_boxed_slice())); assert_eq!( Box::<[i32]>::from_redis_value( - &Value::Data("just a string".into()) + &Value::BulkString("just a string".into()) ).unwrap_err().to_string(), "Response was of incompatible type - TypeError: \"Conversion to alloc::boxed::Box<[i32]> failed.\" (response was string-data('\"just a string\"'))", ); @@ -121,28 +122,28 @@ fn test_arc_slice() { use redis::{FromRedisValue, Value}; use std::sync::Arc; - let v = FromRedisValue::from_redis_value(&Value::Bulk(vec![ - Value::Data("1".into()), - Value::Data("2".into()), - Value::Data("3".into()), + let v = FromRedisValue::from_redis_value(&Value::Array(vec![ + Value::BulkString("1".into()), + Value::BulkString("2".into()), + Value::BulkString("3".into()), ])); assert_eq!(v, Ok(Arc::from(vec![1i32, 2, 3]))); let content: &[u8] = b"\x01\x02\x03\x04"; let content_vec: Vec = Vec::from(content); - let v = FromRedisValue::from_redis_value(&Value::Data(content_vec.clone())); + let v = FromRedisValue::from_redis_value(&Value::BulkString(content_vec.clone())); assert_eq!(v, Ok(Arc::from(content_vec))); let content: &[u8] = b"1"; let content_vec: Vec = Vec::from(content); - let v = FromRedisValue::from_redis_value(&Value::Data(content_vec.clone())); + let v = FromRedisValue::from_redis_value(&Value::BulkString(content_vec.clone())); assert_eq!(v, Ok(Arc::from(vec![b'1']))); - let v = FromRedisValue::from_redis_value(&Value::Data(content_vec)); + let v = FromRedisValue::from_redis_value(&Value::BulkString(content_vec)); assert_eq!(v, Ok(Arc::from(vec![1_u16]))); assert_eq!( Arc::<[i32]>::from_redis_value( - &Value::Data("just a string".into()) + &Value::BulkString("just a string".into()) ).unwrap_err().to_string(), "Response was of incompatible type - TypeError: \"Conversion to alloc::sync::Arc<[i32]> failed.\" (response was string-data('\"just a string\"'))", ); @@ -152,7 +153,7 @@ fn test_arc_slice() { fn test_single_bool_vec() { use redis::{FromRedisValue, Value}; - let v = FromRedisValue::from_redis_value(&Value::Data("1".into())); + let v = FromRedisValue::from_redis_value(&Value::BulkString("1".into())); assert_eq!(v, Ok(vec![true])); } @@ -161,7 +162,7 @@ fn test_single_bool_vec() { fn test_single_i32_vec() { use redis::{FromRedisValue, Value}; - let v = FromRedisValue::from_redis_value(&Value::Data("1".into())); + let v = FromRedisValue::from_redis_value(&Value::BulkString("1".into())); assert_eq!(v, Ok(vec![1i32])); } @@ -170,7 +171,7 @@ fn test_single_i32_vec() { fn test_single_u32_vec() { use redis::{FromRedisValue, Value}; - let v = FromRedisValue::from_redis_value(&Value::Data("42".into())); + let v = FromRedisValue::from_redis_value(&Value::BulkString("42".into())); assert_eq!(v, Ok(vec![42u32])); } @@ -179,7 +180,7 @@ fn test_single_u32_vec() { fn test_single_string_vec() { use redis::{FromRedisValue, Value}; - let v = FromRedisValue::from_redis_value(&Value::Data("1".into())); + let v = FromRedisValue::from_redis_value(&Value::BulkString("1".into())); assert_eq!(v, Ok(vec!["1".to_string()])); } @@ -188,10 +189,10 @@ fn test_single_string_vec() { fn test_tuple() { use redis::{FromRedisValue, Value}; - let v = FromRedisValue::from_redis_value(&Value::Bulk(vec![Value::Bulk(vec![ - Value::Data("1".into()), - Value::Data("2".into()), - Value::Data("3".into()), + let v = FromRedisValue::from_redis_value(&Value::Array(vec![Value::Array(vec![ + Value::BulkString("1".into()), + Value::BulkString("2".into()), + Value::BulkString("3".into()), ])])); assert_eq!(v, Ok(((1i32, 2, 3,),))); @@ -206,13 +207,13 @@ fn test_hashmap() { type Hm = HashMap; - let v: Result = FromRedisValue::from_redis_value(&Value::Bulk(vec![ - Value::Data("a".into()), - Value::Data("1".into()), - Value::Data("b".into()), - Value::Data("2".into()), - Value::Data("c".into()), - Value::Data("3".into()), + let v: Result = FromRedisValue::from_redis_value(&Value::Array(vec![ + Value::BulkString("a".into()), + Value::BulkString("1".into()), + Value::BulkString("b".into()), + Value::BulkString("2".into()), + Value::BulkString("c".into()), + Value::BulkString("3".into()), ])); let mut e: Hm = HashMap::new(); e.insert("a".into(), 1); @@ -222,13 +223,13 @@ fn test_hashmap() { type Hasher = BuildHasherDefault; type HmHasher = HashMap; - let v: Result = FromRedisValue::from_redis_value(&Value::Bulk(vec![ - Value::Data("a".into()), - Value::Data("1".into()), - Value::Data("b".into()), - Value::Data("2".into()), - Value::Data("c".into()), - Value::Data("3".into()), + let v: Result = FromRedisValue::from_redis_value(&Value::Array(vec![ + Value::BulkString("a".into()), + Value::BulkString("1".into()), + Value::BulkString("b".into()), + Value::BulkString("2".into()), + Value::BulkString("c".into()), + Value::BulkString("3".into()), ])); let fnv = Hasher::default(); @@ -243,22 +244,23 @@ fn test_hashmap() { fn test_bool() { use redis::{ErrorKind, FromRedisValue, Value}; - let v = FromRedisValue::from_redis_value(&Value::Data("1".into())); + let v = FromRedisValue::from_redis_value(&Value::BulkString("1".into())); assert_eq!(v, Ok(true)); - let v = FromRedisValue::from_redis_value(&Value::Data("0".into())); + let v = FromRedisValue::from_redis_value(&Value::BulkString("0".into())); assert_eq!(v, Ok(false)); - let v: Result = FromRedisValue::from_redis_value(&Value::Data("garbage".into())); + let v: Result = FromRedisValue::from_redis_value(&Value::BulkString("garbage".into())); assert_eq!(v.unwrap_err().kind(), ErrorKind::TypeError); - let v = FromRedisValue::from_redis_value(&Value::Status("1".into())); + let v = FromRedisValue::from_redis_value(&Value::SimpleString("1".into())); assert_eq!(v, Ok(true)); - let v = FromRedisValue::from_redis_value(&Value::Status("0".into())); + let v = FromRedisValue::from_redis_value(&Value::SimpleString("0".into())); assert_eq!(v, Ok(false)); - let v: Result = FromRedisValue::from_redis_value(&Value::Status("garbage".into())); + let v: Result = + FromRedisValue::from_redis_value(&Value::SimpleString("garbage".into())); assert_eq!(v.unwrap_err().kind(), ErrorKind::TypeError); let v = FromRedisValue::from_redis_value(&Value::Okay); @@ -284,10 +286,11 @@ fn test_bytes() { let content_vec: Vec = Vec::from(content); let content_bytes = Bytes::from_static(content); - let v: RedisResult = FromRedisValue::from_redis_value(&Value::Data(content_vec)); + let v: RedisResult = FromRedisValue::from_redis_value(&Value::BulkString(content_vec)); assert_eq!(v, Ok(content_bytes)); - let v: RedisResult = FromRedisValue::from_redis_value(&Value::Status("garbage".into())); + let v: RedisResult = + FromRedisValue::from_redis_value(&Value::SimpleString("garbage".into())); assert_eq!(v.unwrap_err().kind(), ErrorKind::TypeError); let v: RedisResult = FromRedisValue::from_redis_value(&Value::Okay); @@ -311,18 +314,18 @@ fn test_cstring() { let content: &[u8] = b"\x01\x02\x03\x04"; let content_vec: Vec = Vec::from(content); - let v: RedisResult = FromRedisValue::from_redis_value(&Value::Data(content_vec)); + let v: RedisResult = FromRedisValue::from_redis_value(&Value::BulkString(content_vec)); assert_eq!(v, Ok(CString::new(content).unwrap())); let v: RedisResult = - FromRedisValue::from_redis_value(&Value::Status("garbage".into())); + FromRedisValue::from_redis_value(&Value::SimpleString("garbage".into())); assert_eq!(v, Ok(CString::new("garbage").unwrap())); let v: RedisResult = FromRedisValue::from_redis_value(&Value::Okay); assert_eq!(v, Ok(CString::new("OK").unwrap())); let v: RedisResult = - FromRedisValue::from_redis_value(&Value::Status("gar\0bage".into())); + FromRedisValue::from_redis_value(&Value::SimpleString("gar\0bage".into())); assert_eq!(v.unwrap_err().kind(), ErrorKind::TypeError); let v: RedisResult = FromRedisValue::from_redis_value(&Value::Nil); @@ -378,3 +381,30 @@ fn test_types_to_redis_args() { .to_redis_args() .is_empty()); } + +#[test] +fn test_attributes() { + use redis::{parse_redis_value, FromRedisValue, Value}; + let bytes: &[u8] = b"*3\r\n:1\r\n:2\r\n|1\r\n+ttl\r\n:3600\r\n:3\r\n"; + let val = parse_redis_value(bytes).unwrap(); + { + // The case user doesn't expect attributes from server + let x: Vec = redis::FromRedisValue::from_redis_value(&val).unwrap(); + assert_eq!(x, vec![1, 2, 3]); + } + { + // The case user wants raw value from server + let x: Value = FromRedisValue::from_redis_value(&val).unwrap(); + assert_eq!( + x, + Value::Array(vec![ + Value::Int(1), + Value::Int(2), + Value::Attribute { + data: Box::new(Value::Int(3)), + attributes: vec![(Value::SimpleString("ttl".to_string()), Value::Int(3600))] + } + ]) + ) + } +}