From 5bf14362c5432fc25a1c7d810236f29a7fdeec74 Mon Sep 17 00:00:00 2001 From: Shachar Langbeheim Date: Tue, 19 Mar 2024 19:25:06 +0200 Subject: [PATCH] Separate connection info from protobuf. (#1088) * Separate connection info from protobuf. This will allow the FFI clients to remove their dependency on protobuf. Pros: * faster compile times * no need for protobuf dependency in wrappers. * Update glide-core/src/client/types.rs Co-authored-by: Aaron <69273634+aaron-congo@users.noreply.github.com> * round --------- Co-authored-by: Shachar Langbeheim <98546660+shachlanAmazon@users.noreply.github.com> Co-authored-by: Aaron <69273634+aaron-congo@users.noreply.github.com> --- .github/workflows/lint-rust/action.yml | 5 + .github/workflows/rust.yml | 4 +- benchmarks/rust/src/main.rs | 33 ++-- csharp/lib/src/lib.rs | 36 ++-- glide-core/Cargo.toml | 23 ++- glide-core/benches/memory_benchmark.rs | 4 +- glide-core/build.rs | 8 +- glide-core/src/client/mod.rs | 113 +++++------- .../src/client/reconnecting_connection.rs | 2 +- glide-core/src/client/standalone_client.rs | 22 ++- glide-core/src/client/types.rs | 168 ++++++++++++++++++ glide-core/src/lib.rs | 5 + glide-core/src/retry_strategies.rs | 6 +- glide-core/src/socket_listener.rs | 2 +- glide-core/tests/test_client.rs | 7 +- glide-core/tests/test_socket_listener.rs | 4 +- glide-core/tests/test_standalone_client.rs | 4 +- glide-core/tests/utilities/cluster.rs | 2 +- glide-core/tests/utilities/mod.rs | 2 +- java/Cargo.toml | 2 +- node/rust-client/Cargo.toml | 2 +- python/Cargo.toml | 2 +- 22 files changed, 309 insertions(+), 147 deletions(-) create mode 100644 glide-core/src/client/types.rs diff --git a/.github/workflows/lint-rust/action.yml b/.github/workflows/lint-rust/action.yml index e1cbad7d14..ec1531e877 100644 --- a/.github/workflows/lint-rust/action.yml +++ b/.github/workflows/lint-rust/action.yml @@ -30,6 +30,11 @@ runs: working-directory: ${{ inputs.cargo-toml-folder }} shell: bash + # We run clippy without features + - run: cargo clippy --all-targets -- -D warnings + working-directory: ${{ inputs.cargo-toml-folder }} + shell: bash + - run: | cargo update cargo install cargo-deny diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 536af28ac9..87b1a17dc1 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -56,11 +56,11 @@ jobs: - name: Run tests working-directory: ./glide-core - run: cargo test -- --nocapture --test-threads=1 # TODO remove the concurrency limit after we fix test flakyness. + run: cargo test --all-features -- --nocapture --test-threads=1 # TODO remove the concurrency limit after we fix test flakyness. - name: Run logger tests working-directory: ./logger_core - run: cargo test -- --nocapture --test-threads=1 + run: cargo test --all-features -- --nocapture --test-threads=1 - name: Check features working-directory: ./glide-core diff --git a/benchmarks/rust/src/main.rs b/benchmarks/rust/src/main.rs index daacb0a44a..8503375195 100644 --- a/benchmarks/rust/src/main.rs +++ b/benchmarks/rust/src/main.rs @@ -11,10 +11,7 @@ static GLOBAL: Jemalloc = Jemalloc; use clap::Parser; use futures::{self, future::join_all, stream, StreamExt}; -use glide_core::{ - client::Client, - connection_request::{ConnectionRequest, NodeAddress, TlsMode}, -}; +use glide_core::client::{Client, ConnectionRequest, NodeAddress, TlsMode}; use rand::{thread_rng, Rng}; use serde_json::Value; use std::{ @@ -223,19 +220,21 @@ fn generate_random_string(length: usize) -> String { } async fn get_connection(args: &Args) -> Client { - let mut connection_request = ConnectionRequest::new(); - connection_request.tls_mode = if args.tls { - TlsMode::SecureTls - } else { - TlsMode::NoTls - } - .into(); - let mut address_info: NodeAddress = NodeAddress::new(); - address_info.host = args.host.clone().into(); - address_info.port = args.port; - connection_request.addresses.push(address_info); - connection_request.request_timeout = 2000; - connection_request.cluster_mode_enabled = args.cluster_mode_enabled; + let address_info: NodeAddress = NodeAddress { + host: args.host.clone(), + port: args.port as u16, + }; + let connection_request = ConnectionRequest { + addresses: vec![address_info], + cluster_mode_enabled: args.cluster_mode_enabled, + request_timeout: Some(2000), + tls_mode: if args.tls { + Some(TlsMode::SecureTls) + } else { + Some(TlsMode::NoTls) + }, + ..Default::default() + }; glide_core::client::Client::new(connection_request) .await diff --git a/csharp/lib/src/lib.rs b/csharp/lib/src/lib.rs index 495d959598..8baa6d0155 100644 --- a/csharp/lib/src/lib.rs +++ b/csharp/lib/src/lib.rs @@ -1,8 +1,8 @@ /** * Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ -use glide_core::connection_request; -use glide_core::{client::Client as GlideClient, connection_request::NodeAddress}; +use glide_core::client; +use glide_core::client::Client as GlideClient; use redis::{Cmd, FromRedisValue, RedisResult}; use std::{ ffi::{c_void, CStr, CString}, @@ -26,25 +26,21 @@ pub struct Client { runtime: Runtime, } -fn create_connection_request( - host: String, - port: u32, - use_tls: bool, -) -> connection_request::ConnectionRequest { - let mut address_info = NodeAddress::new(); - address_info.host = host.to_string().into(); - address_info.port = port; - let addresses_info = vec![address_info]; - let mut connection_request = connection_request::ConnectionRequest::new(); - connection_request.addresses = addresses_info; - connection_request.tls_mode = if use_tls { - connection_request::TlsMode::SecureTls - } else { - connection_request::TlsMode::NoTls +fn create_connection_request(host: String, port: u32, use_tls: bool) -> client::ConnectionRequest { + let address_info = client::NodeAddress { + host, + port: port as u16, + }; + let addresses = vec![address_info]; + client::ConnectionRequest { + addresses, + tls_mode: if use_tls { + Some(client::TlsMode::SecureTls) + } else { + Some(client::TlsMode::NoTls) + }, + ..Default::default() } - .into(); - - connection_request } fn create_client_internal( diff --git a/glide-core/Cargo.toml b/glide-core/Cargo.toml index 55a0e82fee..9055b71d28 100644 --- a/glide-core/Cargo.toml +++ b/glide-core/Cargo.toml @@ -8,27 +8,30 @@ authors = ["Amazon Web Services"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -bytes = "^1.3" +bytes = { version = "^1.3", optional = true } futures = "^0.3" redis = { path = "../submodules/redis-rs/redis", features = ["aio", "tokio-comp", "tokio-rustls-comp", "connection-manager","cluster", "cluster-async"] } -signal-hook = "^0.3" -signal-hook-tokio = {version = "^0.3", features = ["futures-v0_3"] } +signal-hook = { version = "^0.3", optional = true } +signal-hook-tokio = {version = "^0.3", features = ["futures-v0_3"], optional = true } tokio = { version = "1", features = ["macros", "time"] } logger_core = {path = "../logger_core"} dispose = "0.5.0" -tokio-util = {version = "^0.7", features = ["rt"]} -num_cpus = "^1.15" +tokio-util = {version = "^0.7", features = ["rt"], optional = true} +num_cpus = { version = "^1.15", optional = true } tokio-retry = "0.3.0" -protobuf = {version= "3", features = ["bytes", "with-bytes"]} -integer-encoding = "4.0.0" +protobuf = { version= "3", features = ["bytes", "with-bytes"], optional = true } +integer-encoding = { version = "4.0.0", optional = true } thiserror = "1" -rand = "0.8.5" +rand = { version = "0.8.5", optional = true } futures-intrusive = "0.5.0" -directories = "4.0" +directories = { version = "4.0", optional = true } once_cell = "1.18.0" arcstr = "1.1.5" sha1_smol = "1.0.0" +[features] +socket-layer = ["directories", "integer-encoding", "num_cpus", "signal-hook", "signal-hook-tokio", "protobuf", "tokio-util", "bytes", "rand"] + [dev-dependencies] rsevents = "0.3.1" socket2 = "^0.5" @@ -40,6 +43,8 @@ ctor = "0.2.2" redis = { path = "../submodules/redis-rs/redis", features = ["tls-rustls-insecure"] } iai-callgrind = "0.9" tokio = { version = "1", features = ["rt-multi-thread"] } +glide-core = { path = ".", features = ["socket-layer"] } # always enable this feature in tests. + [build-dependencies] protobuf-codegen = "3" diff --git a/glide-core/benches/memory_benchmark.rs b/glide-core/benches/memory_benchmark.rs index 44b15a6350..c6e307bae2 100644 --- a/glide-core/benches/memory_benchmark.rs +++ b/glide-core/benches/memory_benchmark.rs @@ -26,7 +26,9 @@ where { let runtime = Builder::new_current_thread().enable_all().build().unwrap(); runtime.block_on(async { - let client = Client::new(create_connection_request()).await.unwrap(); + let client = Client::new(create_connection_request().into()) + .await + .unwrap(); f(client).await; }); } diff --git a/glide-core/build.rs b/glide-core/build.rs index 40d734c507..9d41cd2491 100644 --- a/glide-core/build.rs +++ b/glide-core/build.rs @@ -2,7 +2,8 @@ * Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ -fn main() { +#[cfg(feature = "socket-layer")] +fn build_protobuf() { let customization_options = protobuf_codegen::Customize::default() .lite_runtime(false) .tokio_bytes(true) @@ -16,3 +17,8 @@ fn main() { .customize(customization_options) .run_from_script(); } + +fn main() { + #[cfg(feature = "socket-layer")] + build_protobuf(); +} diff --git a/glide-core/src/client/mod.rs b/glide-core/src/client/mod.rs index 138e352045..645bef1118 100644 --- a/glide-core/src/client/mod.rs +++ b/glide-core/src/client/mod.rs @@ -1,9 +1,8 @@ /** * Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ -use crate::connection_request::{ - connection_request, ConnectionRequest, NodeAddress, ProtocolVersion, ReadFrom, TlsMode, -}; +mod types; + use crate::scripts_container::get_script; use futures::FutureExt; use logger_core::log_info; @@ -16,6 +15,7 @@ pub use standalone_client::StandaloneClient; use std::io; use std::ops::Deref; use std::time::Duration; +pub use types::*; use self::value_conversion::{convert_to_expected_type, expected_type_for_cmd}; mod reconnecting_connection; @@ -34,41 +34,28 @@ pub(super) fn get_port(address: &NodeAddress) -> u16 { if address.port == 0 { DEFAULT_PORT } else { - address.port as u16 - } -} - -fn chars_to_string_option(chars: &::protobuf::Chars) -> Option { - if chars.is_empty() { - None - } else { - Some(chars.to_string()) - } -} - -pub fn convert_to_redis_protocol(protocol: ProtocolVersion) -> redis::ProtocolVersion { - match protocol { - ProtocolVersion::RESP3 => redis::ProtocolVersion::RESP3, - ProtocolVersion::RESP2 => redis::ProtocolVersion::RESP2, + address.port } } pub(super) fn get_redis_connection_info( connection_request: &ConnectionRequest, ) -> redis::RedisConnectionInfo { - let protocol = convert_to_redis_protocol(connection_request.protocol.enum_value_or_default()); - match connection_request.authentication_info.0.as_ref() { + let protocol = connection_request.protocol.unwrap_or_default(); + let db = connection_request.database_id; + let client_name = connection_request.client_name.clone(); + match &connection_request.authentication_info { Some(info) => redis::RedisConnectionInfo { - db: connection_request.database_id as i64, - username: chars_to_string_option(&info.username), - password: chars_to_string_option(&info.password), + db, + username: info.username.clone(), + password: info.password.clone(), protocol, - client_name: chars_to_string_option(&connection_request.client_name), + client_name, }, None => redis::RedisConnectionInfo { - db: connection_request.database_id as i64, + db, protocol, - client_name: chars_to_string_option(&connection_request.client_name), + client_name, ..Default::default() }, } @@ -264,34 +251,29 @@ fn eval_cmd>(hash: &str, keys: Vec, args: Vec) -> C cmd } -fn to_duration(time_in_millis: u32, default: Duration) -> Duration { - if time_in_millis > 0 { - Duration::from_millis(time_in_millis as u64) - } else { - default - } +fn to_duration(time_in_millis: Option, default: Duration) -> Duration { + time_in_millis + .map(|val| Duration::from_millis(val as u64)) + .unwrap_or(default) } async fn create_cluster_client( request: ConnectionRequest, ) -> RedisResult { // TODO - implement timeout for each connection attempt - let tls_mode = request.tls_mode.enum_value_or_default(); + let tls_mode = request.tls_mode.unwrap_or_default(); let redis_connection_info = get_redis_connection_info(&request); let initial_nodes: Vec<_> = request .addresses .into_iter() .map(|address| get_connection_info(&address, tls_mode, redis_connection_info.clone())) .collect(); - let read_from = request.read_from.enum_value().unwrap_or(ReadFrom::Primary); - let read_from_replicas = !matches!(read_from, ReadFrom::Primary,); // TODO - implement different read from replica strategies. + let read_from = request.read_from.unwrap_or_default(); + let read_from_replicas = !matches!(read_from, ReadFrom::Primary); // TODO - implement different read from replica strategies. let periodic_checks = match request.periodic_checks { - Some(periodic_checks) => match periodic_checks { - connection_request::Periodic_checks::PeriodicChecksManualInterval(interval) => { - Some(Duration::from_secs(interval.duration_in_sec.into())) - } - connection_request::Periodic_checks::PeriodicChecksDisabled(_) => None, - }, + Some(PeriodicCheck::Disabled) => None, + Some(PeriodicCheck::Enabled) => Some(DEFAULT_PERIODIC_CHECKS_INTERVAL), + Some(PeriodicCheck::ManualInterval(interval)) => Some(interval), None => Some(DEFAULT_PERIODIC_CHECKS_INTERVAL), }; let mut builder = redis::cluster::ClusterClientBuilder::new(initial_nodes) @@ -302,9 +284,7 @@ async fn create_cluster_client( if let Some(interval_duration) = periodic_checks { builder = builder.periodic_topology_checks(interval_duration); } - builder = builder.use_protocol(convert_to_redis_protocol( - request.protocol.enum_value_or_default(), - )); + builder = builder.use_protocol(request.protocol.unwrap_or_default()); if let Some(client_name) = redis_connection_info.client_name { builder = builder.client_name(client_name); } @@ -347,8 +327,11 @@ impl std::fmt::Display for ConnectionError { } } -fn format_non_zero_value(name: &'static str, value: u32) -> String { - if value > 0 { +fn format_optional_value(name: &'static str, value: Option) -> String +where + T: std::fmt::Display, +{ + if let Some(value) = value { format!("\n{name}: {value}") } else { String::new() @@ -364,7 +347,6 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String { .join(", "); let tls_mode = request .tls_mode - .enum_value() .map(|tls_mode| { format!( "\nTLS mode: {}", @@ -381,54 +363,47 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String { } else { "\nStandalone mode" }; - let request_timeout = format_non_zero_value("Request timeout", request.request_timeout); - let database_id = format_non_zero_value("database ID", request.database_id); + let request_timeout = format_optional_value("Request timeout", request.request_timeout); + let database_id = format!("\ndatabase ID: {}", request.database_id); let rfr_strategy = request .read_from - .enum_value() .map(|rfr| { format!( "\nRead from Replica mode: {}", match rfr { ReadFrom::Primary => "Only primary", ReadFrom::PreferReplica => "Prefer replica", - ReadFrom::LowestLatency => "Lowest latency", - ReadFrom::AZAffinity => "Availability zone affinity", } ) }) .unwrap_or_default(); - let connection_retry_strategy = request.connection_retry_strategy.0.as_ref().map(|strategy| + let connection_retry_strategy = request.connection_retry_strategy.as_ref().map(|strategy| format!("\nreconnect backoff strategy: number of increasing duration retries: {}, base: {}, factor: {}", strategy.number_of_retries, strategy.exponent_base, strategy.factor)).unwrap_or_default(); let protocol = request .protocol - .enum_value() .map(|protocol| format!("\nProtocol: {protocol:?}")) .unwrap_or_default(); - let client_name = chars_to_string_option(&request.client_name) + let client_name = request + .client_name + .as_ref() .map(|client_name| format!("\nClient name: {client_name}")) .unwrap_or_default(); let periodic_checks = if request.cluster_mode_enabled { match request.periodic_checks { - Some(ref periodic_checks) => match periodic_checks { - connection_request::Periodic_checks::PeriodicChecksManualInterval(interval) => { - format!( - "\nPeriodic Checks: Enabled with manual interval of {:?}s", - interval.duration_in_sec - ) - } - connection_request::Periodic_checks::PeriodicChecksDisabled(_) => { - "\nPeriodic Checks: Disabled".to_string() - } - }, - None => format!( + Some(PeriodicCheck::Disabled) => "\nPeriodic Checks: Disabled".to_string(), + Some(PeriodicCheck::Enabled) => format!( "\nPeriodic Checks: Enabled with default interval of {:?}", DEFAULT_PERIODIC_CHECKS_INTERVAL ), + Some(PeriodicCheck::ManualInterval(interval)) => format!( + "\nPeriodic Checks: Enabled with manual interval of {:?}s", + interval.as_secs() + ), + None => String::new(), } } else { - "".to_string() + String::new() }; format!( diff --git a/glide-core/src/client/reconnecting_connection.rs b/glide-core/src/client/reconnecting_connection.rs index f6964ec556..c039d347bd 100644 --- a/glide-core/src/client/reconnecting_connection.rs +++ b/glide-core/src/client/reconnecting_connection.rs @@ -1,7 +1,7 @@ /** * Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ -use crate::connection_request::{NodeAddress, TlsMode}; +use super::{NodeAddress, TlsMode}; use crate::retry_strategies::RetryStrategy; use futures_intrusive::sync::ManualResetEvent; use logger_core::{log_debug, log_trace, log_warn}; diff --git a/glide-core/src/client/standalone_client.rs b/glide-core/src/client/standalone_client.rs index f2a337e07d..79246a7b76 100644 --- a/glide-core/src/client/standalone_client.rs +++ b/glide-core/src/client/standalone_client.rs @@ -3,13 +3,12 @@ */ use super::get_redis_connection_info; use super::reconnecting_connection::ReconnectingConnection; -use crate::connection_request::{ConnectionRequest, NodeAddress, TlsMode}; +use super::{ConnectionRequest, NodeAddress, TlsMode}; use crate::retry_strategies::RetryStrategy; use futures::{future, stream, StreamExt}; #[cfg(standalone_heartbeat)] use logger_core::log_debug; use logger_core::log_warn; -use protobuf::EnumOrUnknown; use redis::cluster_routing::{self, is_readonly_cmd, ResponsePolicy, Routable, RoutingInfo}; use redis::{RedisError, RedisResult, Value}; use std::sync::atomic::AtomicUsize; @@ -92,10 +91,10 @@ impl StandaloneClient { if connection_request.addresses.is_empty() { return Err(StandaloneClientConnectionError::NoAddressesProvided); } - let retry_strategy = RetryStrategy::new(&connection_request.connection_retry_strategy.0); let redis_connection_info = get_redis_connection_info(&connection_request); + let retry_strategy = RetryStrategy::new(connection_request.connection_retry_strategy); - let tls_mode = connection_request.tls_mode.enum_value_or_default(); + let tls_mode = connection_request.tls_mode; let node_count = connection_request.addresses.len(); let mut stream = stream::iter(connection_request.addresses.iter()) .map(|address| async { @@ -103,7 +102,7 @@ impl StandaloneClient { address, &retry_strategy, &redis_connection_info, - tls_mode, + tls_mode.unwrap_or(TlsMode::NoTls), ) .await .map_err(|err| (format!("{}:{}", address.host, address.port), err)) @@ -153,7 +152,7 @@ impl StandaloneClient { ), ); } - let read_from = get_read_from(&connection_request.read_from); + let read_from = get_read_from(connection_request.read_from); #[cfg(standalone_heartbeat)] for node in nodes.iter() { @@ -405,13 +404,12 @@ async fn get_connection_and_replication_info( } } -fn get_read_from(read_from: &EnumOrUnknown) -> ReadFrom { - match read_from.enum_value_or_default() { - crate::connection_request::ReadFrom::Primary => ReadFrom::Primary, - crate::connection_request::ReadFrom::PreferReplica => ReadFrom::PreferReplica { +fn get_read_from(read_from: Option) -> ReadFrom { + match read_from { + Some(super::ReadFrom::Primary) => ReadFrom::Primary, + Some(super::ReadFrom::PreferReplica) => ReadFrom::PreferReplica { latest_read_replica_index: Default::default(), }, - crate::connection_request::ReadFrom::LowestLatency => todo!(), - crate::connection_request::ReadFrom::AZAffinity => todo!(), + None => ReadFrom::Primary, } } diff --git a/glide-core/src/client/types.rs b/glide-core/src/client/types.rs new file mode 100644 index 0000000000..f942f64174 --- /dev/null +++ b/glide-core/src/client/types.rs @@ -0,0 +1,168 @@ +/* + * Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 + */ + +use std::time::Duration; + +#[cfg(feature = "socket-layer")] +use crate::connection_request as protobuf; + +#[derive(Default)] +pub struct ConnectionRequest { + pub read_from: Option, + pub client_name: Option, + pub authentication_info: Option, + pub database_id: i64, + pub protocol: Option, + pub tls_mode: Option, + pub addresses: Vec, + pub cluster_mode_enabled: bool, + pub request_timeout: Option, + pub connection_retry_strategy: Option, + pub periodic_checks: Option, +} + +pub struct AuthenticationInfo { + pub username: Option, + pub password: Option, +} + +#[derive(Default, Debug)] +pub enum PeriodicCheck { + #[default] + Enabled, + Disabled, + ManualInterval(Duration), +} + +#[derive(Debug)] +pub struct NodeAddress { + pub host: String, + pub port: u16, +} + +impl ::std::fmt::Display for NodeAddress { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + write!(f, "Host: `{}`, Port: {}", self.host, self.port) + } +} + +#[derive(PartialEq, Eq, Clone, Copy, Default)] +pub enum ReadFrom { + #[default] + Primary, + PreferReplica, +} + +#[derive(PartialEq, Eq, Clone, Copy, Default)] +pub enum TlsMode { + #[default] + NoTls, + InsecureTls, + SecureTls, +} + +pub struct ConnectionRetryStrategy { + pub exponent_base: u32, + pub factor: u32, + pub number_of_retries: u32, +} + +#[cfg(feature = "socket-layer")] +fn chars_to_string_option(chars: &::protobuf::Chars) -> Option { + if chars.is_empty() { + None + } else { + Some(chars.to_string()) + } +} + +#[cfg(feature = "socket-layer")] +fn none_if_zero(value: u32) -> Option { + if value == 0 { + None + } else { + Some(value) + } +} + +#[cfg(feature = "socket-layer")] +impl From for ConnectionRequest { + fn from(value: protobuf::ConnectionRequest) -> Self { + let read_from = value.read_from.enum_value().ok().map(|val| match val { + protobuf::ReadFrom::Primary => ReadFrom::Primary, + protobuf::ReadFrom::PreferReplica => ReadFrom::PreferReplica, + protobuf::ReadFrom::LowestLatency => todo!(), + protobuf::ReadFrom::AZAffinity => todo!(), + }); + + let client_name = chars_to_string_option(&value.client_name); + let authentication_info = value.authentication_info.0.and_then(|authentication_info| { + let password = chars_to_string_option(&authentication_info.password); + let username = chars_to_string_option(&authentication_info.username); + if password.is_none() && username.is_none() { + return None; + } + + Some(AuthenticationInfo { password, username }) + }); + + let database_id = value.database_id as i64; + let protocol = value.protocol.enum_value().ok().map(|val| match val { + protobuf::ProtocolVersion::RESP3 => redis::ProtocolVersion::RESP3, + protobuf::ProtocolVersion::RESP2 => redis::ProtocolVersion::RESP2, + }); + + let tls_mode = value.tls_mode.enum_value().ok().map(|val| match val { + protobuf::TlsMode::NoTls => TlsMode::NoTls, + protobuf::TlsMode::SecureTls => TlsMode::SecureTls, + protobuf::TlsMode::InsecureTls => TlsMode::InsecureTls, + }); + + let addresses = value + .addresses + .into_iter() + .map(|addr| NodeAddress { + host: addr.host.to_string(), + port: addr.port as u16, + }) + .collect(); + let cluster_mode_enabled = value.cluster_mode_enabled; + let request_timeout = none_if_zero(value.request_timeout); + let connection_retry_strategy = + value + .connection_retry_strategy + .0 + .map(|strategy| ConnectionRetryStrategy { + exponent_base: strategy.exponent_base, + factor: strategy.factor, + number_of_retries: strategy.number_of_retries, + }); + let periodic_checks = value + .periodic_checks + .map(|periodic_check| match periodic_check { + protobuf::connection_request::Periodic_checks::PeriodicChecksManualInterval( + interval, + ) => PeriodicCheck::ManualInterval(Duration::from_secs( + interval.duration_in_sec.into(), + )), + protobuf::connection_request::Periodic_checks::PeriodicChecksDisabled(_) => { + PeriodicCheck::Disabled + } + }); + + ConnectionRequest { + read_from, + client_name, + authentication_info, + database_id, + protocol, + tls_mode, + addresses, + cluster_mode_enabled, + request_timeout, + connection_retry_strategy, + periodic_checks, + } + } +} diff --git a/glide-core/src/lib.rs b/glide-core/src/lib.rs index aa25cb89b1..bd194f008f 100644 --- a/glide-core/src/lib.rs +++ b/glide-core/src/lib.rs @@ -2,11 +2,16 @@ * Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ +#[cfg(feature = "socket-layer")] include!(concat!(env!("OUT_DIR"), "/protobuf/mod.rs")); pub mod client; mod retry_strategies; +#[cfg(feature = "socket-layer")] pub mod rotating_buffer; +#[cfg(feature = "socket-layer")] mod socket_listener; +#[cfg(feature = "socket-layer")] pub use socket_listener::*; pub mod errors; pub mod scripts_container; +pub use client::ConnectionRequest; diff --git a/glide-core/src/retry_strategies.rs b/glide-core/src/retry_strategies.rs index 01e32a9da2..4dd5d7edb7 100644 --- a/glide-core/src/retry_strategies.rs +++ b/glide-core/src/retry_strategies.rs @@ -1,7 +1,7 @@ /** * Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ -use crate::connection_request::ConnectionRetryStrategy; +use crate::client::ConnectionRetryStrategy; use std::time::Duration; use tokio_retry::strategy::{jitter, ExponentialBackoff}; @@ -13,7 +13,7 @@ pub(super) struct RetryStrategy { } impl RetryStrategy { - pub(super) fn new(data: &Option>) -> Self { + pub(super) fn new(data: Option) -> Self { match data { Some(ref strategy) => get_exponential_backoff( strategy.exponent_base, @@ -55,6 +55,7 @@ pub(crate) fn get_exponential_backoff( } } +#[cfg(feature = "socket-layer")] pub(crate) fn get_fixed_interval_backoff( fixed_interval: u32, number_of_retries: u32, @@ -66,6 +67,7 @@ pub(crate) fn get_fixed_interval_backoff( mod tests { use super::*; + #[cfg(feature = "socket-layer")] #[test] fn test_fixed_intervals_with_jitter() { let retries = 3; diff --git a/glide-core/src/socket_listener.rs b/glide-core/src/socket_listener.rs index efc18a3392..626804b8a6 100644 --- a/glide-core/src/socket_listener.rs +++ b/glide-core/src/socket_listener.rs @@ -578,7 +578,7 @@ async fn create_client( writer: &Rc, request: ConnectionRequest, ) -> Result { - let client = match Client::new(request).await { + let client = match Client::new(request.into()).await { Ok(client) => client, Err(err) => return Err(ClientCreationError::ConnectionError(err)), }; diff --git a/glide-core/tests/test_client.rs b/glide-core/tests/test_client.rs index 88f249d283..2186a00a26 100644 --- a/glide-core/tests/test_client.rs +++ b/glide-core/tests/test_client.rs @@ -40,10 +40,9 @@ pub(crate) mod shared_client_tests { // TODO - this is a patch, handling the situation where the new server // still isn't available to connection. This should be fixed in [RedisServer]. let client = repeat_try_create(|| async { - Client::new(create_connection_request( - &[connection_addr.clone()], - &configuration, - )) + Client::new( + create_connection_request(&[connection_addr.clone()], &configuration).into(), + ) .await .ok() }) diff --git a/glide-core/tests/test_socket_listener.rs b/glide-core/tests/test_socket_listener.rs index 39c5fb2447..816d0e2992 100644 --- a/glide-core/tests/test_socket_listener.rs +++ b/glide-core/tests/test_socket_listener.rs @@ -1,6 +1,8 @@ -/** +/* * Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ + +#![cfg(feature = "socket-layer")] use glide_core::*; use rsevents::{Awaitable, EventState, ManualResetEvent}; use std::io::prelude::*; diff --git a/glide-core/tests/test_standalone_client.rs b/glide-core/tests/test_standalone_client.rs index c0d60d77e2..1d00ed5bbe 100644 --- a/glide-core/tests/test_standalone_client.rs +++ b/glide-core/tests/test_standalone_client.rs @@ -168,7 +168,7 @@ mod standalone_client_tests { connection_request.read_from = config.read_from.into(); block_on_all(async { - let mut client = StandaloneClient::create_client(connection_request) + let mut client = StandaloneClient::create_client(connection_request.into()) .await .unwrap(); for mock in mocks.drain(1..config.number_of_replicas_dropped_after_connection + 1) { @@ -279,7 +279,7 @@ mod standalone_client_tests { create_connection_request(addresses.as_slice(), &Default::default()); block_on_all(async { - let mut client = StandaloneClient::create_client(connection_request) + let mut client = StandaloneClient::create_client(connection_request.into()) .await .unwrap(); diff --git a/glide-core/tests/utilities/cluster.rs b/glide-core/tests/utilities/cluster.rs index 284bc85178..6b524a9c51 100644 --- a/glide-core/tests/utilities/cluster.rs +++ b/glide-core/tests/utilities/cluster.rs @@ -257,7 +257,7 @@ pub async fn setup_test_basics_internal(mut configuration: TestConfiguration) -> configuration.request_timeout = configuration.request_timeout.or(Some(10000)); let connection_request = create_connection_request(&addresses, &configuration); - let client = Client::new(connection_request).await.unwrap(); + let client = Client::new(connection_request.into()).await.unwrap(); ClusterTestBasics { cluster, client } } diff --git a/glide-core/tests/utilities/mod.rs b/glide-core/tests/utilities/mod.rs index 6b5a6991eb..f675b62c6b 100644 --- a/glide-core/tests/utilities/mod.rs +++ b/glide-core/tests/utilities/mod.rs @@ -689,7 +689,7 @@ pub(crate) async fn setup_test_basics_internal(configuration: &TestConfiguration let mut connection_request = create_connection_request(&[connection_addr], configuration); connection_request.cluster_mode_enabled = false; connection_request.protocol = configuration.protocol.into(); - let client = StandaloneClient::create_client(connection_request) + let client = StandaloneClient::create_client(connection_request.into()) .await .unwrap(); diff --git a/java/Cargo.toml b/java/Cargo.toml index d5c30d1bc4..15d26a30e4 100644 --- a/java/Cargo.toml +++ b/java/Cargo.toml @@ -11,7 +11,7 @@ crate-type = ["cdylib"] [dependencies] redis = { path = "../submodules/redis-rs/redis", features = ["aio", "tokio-comp", "connection-manager", "tokio-rustls-comp"] } -glide-core = { path = "../glide-core" } +glide-core = { path = "../glide-core", features = ["socket-layer"] } tokio = { version = "^1", features = ["rt", "macros", "rt-multi-thread", "time"] } logger_core = {path = "../logger_core"} tracing-subscriber = "0.3.16" diff --git a/node/rust-client/Cargo.toml b/node/rust-client/Cargo.toml index 789860c237..0d9c99c95b 100644 --- a/node/rust-client/Cargo.toml +++ b/node/rust-client/Cargo.toml @@ -12,7 +12,7 @@ crate-type = ["cdylib"] [dependencies] redis = { path = "../../submodules/redis-rs/redis", features = ["aio", "tokio-comp", "tokio-rustls-comp"] } -glide-core = { path = "../../glide-core" } +glide-core = { path = "../../glide-core", features = ["socket-layer"] } tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread", "time"] } napi = {version = "2.14", features = ["napi4", "napi6"] } napi-derive = "2.14" diff --git a/python/Cargo.toml b/python/Cargo.toml index d8a0e3c6a8..842526ed8d 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -13,7 +13,7 @@ crate-type = ["cdylib"] [dependencies] pyo3 = { version = "^0.20", features = ["extension-module", "num-bigint"] } redis = { path = "../submodules/redis-rs/redis", features = ["aio", "tokio-comp", "connection-manager","tokio-rustls-comp"] } -glide-core = { path = "../glide-core" } +glide-core = { path = "../glide-core", features = ["socket-layer"] } logger_core = {path = "../logger_core"} [package.metadata.maturin]