diff --git a/.github/workflows/create-test-matrices/action.yml b/.github/workflows/create-test-matrices/action.yml index 5bd777b5a1..e1a7186521 100644 --- a/.github/workflows/create-test-matrices/action.yml +++ b/.github/workflows/create-test-matrices/action.yml @@ -41,12 +41,11 @@ runs: echo 'Select server engines to run tests against' if [[ "${{ github.event_name }}" == "pull_request" || "${{ github.event_name }}" == "push" || "${{ inputs.run-full-matrix }}" == "false" ]]; then echo 'Pick engines marked as `"run": "always"` only - on PR, push or manually triggered job which does not require full matrix' - jq -c '[.[] | select(.run == "always")]' < .github/json_matrices/engine-matrix.json | awk '{ printf "engine-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT + jq -c '[.[] | select(.run == "always")]' < .github/json_matrices/engine-matrix.json | awk '{ printf "engine-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT else echo 'Pick all engines - on cron (schedule) or if manually triggered job requires a full matrix' - jq -c . < .github/json_matrices/engine-matrix.json | awk '{ printf "engine-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT + jq -c . 
< .github/json_matrices/engine-matrix.json | awk '{ printf "engine-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT fi - cat $GITHUB_OUTPUT - name: Load host matrix id: load-host-matrix @@ -57,12 +56,11 @@ runs: echo 'Select runners (VMs) to run tests on' if [[ "${{ github.event_name }}" == "pull_request" || "${{ github.event_name }}" == "push" || "${{ inputs.run-full-matrix }}" == "false" ]]; then echo 'Pick runners marked as '"run": "always"' only - on PR, push or manually triggered job which does not require full matrix' - jq -c '[.[] | select(.run == "always")]' < .github/json_matrices/build-matrix.json | awk '{ printf "host-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT + jq -c '[.[] | select(.run == "always")]' < .github/json_matrices/build-matrix.json | awk '{ printf "host-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT else echo 'Pick all runners assigned for the chosen client (language) - on cron (schedule) or if manually triggered job requires a full matrix' - jq -c "[.[] | select(.languages? and any(.languages[] == \"${{ inputs.language-name }}\"; .) and $CONDITION)]" < .github/json_matrices/build-matrix.json | awk '{ printf "host-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT + jq -c "[.[] | select(.languages? and any(.languages[] == \"${{ inputs.language-name }}\"; .) 
and $CONDITION)]" < .github/json_matrices/build-matrix.json | awk '{ printf "host-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT fi - cat $GITHUB_OUTPUT - name: Create language version matrix id: create-lang-version-matrix @@ -72,9 +70,8 @@ runs: echo 'Select language (framework/SDK) versions to run tests on' if [[ "${{ github.event_name }}" == "pull_request" || "${{ github.event_name }}" == "push" || "${{ inputs.run-full-matrix }}" == "false" ]]; then echo 'Pick language versions listed in 'always-run-versions' only - on PR, push or manually triggered job which does not require full matrix' - jq -c '[.[] | select(.language == "${{ inputs.language-name }}") | .["always-run-versions"]][0] // []' < .github/json_matrices/supported-languages-versions.json | awk '{ printf "version-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT + jq -c '[.[] | select(.language == "${{ inputs.language-name }}") | .["always-run-versions"]][0] // []' < .github/json_matrices/supported-languages-versions.json | awk '{ printf "version-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT else echo 'Pick language versions listed in 'versions' - on cron (schedule) or if manually triggered job requires a full matrix' - jq -c '[.[] | select(.language == "${{ inputs.language-name }}") | .versions][0]' < .github/json_matrices/supported-languages-versions.json | awk '{ printf "version-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT + jq -c '[.[] | select(.language == "${{ inputs.language-name }}") | .versions][0]' < .github/json_matrices/supported-languages-versions.json | awk '{ printf "version-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT fi - cat $GITHUB_OUTPUT diff --git a/.github/workflows/java-cd.yml b/.github/workflows/java-cd.yml index 5e2fd5886e..ed618f1708 100644 --- a/.github/workflows/java-cd.yml +++ b/.github/workflows/java-cd.yml @@ -225,6 +225,10 @@ jobs: host: ${{ fromJson(needs.load-platform-matrix.outputs.PLATFORM_MATRIX) }} runs-on: ${{ matrix.host.RUNNER }} steps: + - name: Setup self-hosted runner 
access + if: ${{matrix.host.TARGET == 'aarch64-unknown-linux-gnu' }} + run: sudo chown -R $USER:$USER /home/ubuntu/action-runner-ilia/_work/valkey-glide + - name: Checkout uses: actions/checkout@v4 diff --git a/.github/workflows/npm-cd.yml b/.github/workflows/npm-cd.yml index 24497e83cc..3e5f673a4c 100644 --- a/.github/workflows/npm-cd.yml +++ b/.github/workflows/npm-cd.yml @@ -308,23 +308,18 @@ jobs: if: ${{ matrix.build.TARGET == 'aarch64-unknown-linux-gnu' }} run: sudo chown -R $USER:$USER /home/ubuntu/actions-runner/_work/valkey-glide - - name: install Redis and git for alpine + - name: install redis and git for alpine if: ${{ contains(matrix.build.TARGET, 'musl') }} run: | apk update - apk add redis git + apk add git redis node -v - - name: install Redis and Python for ubuntu + - name: install Python for ubuntu if: ${{ contains(matrix.build.TARGET, 'linux-gnu') }} run: | sudo apt-get update - sudo apt-get install redis-server python3 - - - name: install Redis, Python for macos - if: ${{ contains(matrix.build.RUNNER, 'mac') }} - run: | - brew install redis python3 + sudo apt-get install python3 - name: Checkout if: ${{ matrix.build.TARGET != 'aarch64-unknown-linux-musl'}} @@ -339,6 +334,13 @@ jobs: npm-auth-token: ${{ secrets.NPM_AUTH_TOKEN }} arch: ${{ matrix.build.ARCH }} + - name: Install engine + if: ${{ !contains(matrix.build.TARGET, 'musl') }} + uses: ./.github/workflows/install-engine + with: + engine-version: "8.0" + target: ${{ matrix.build.target }} + - name: Setup node if: ${{ !contains(matrix.build.TARGET, 'musl') }} uses: actions/setup-node@v4 diff --git a/.github/workflows/pypi-cd.yml b/.github/workflows/pypi-cd.yml index 4ea517a818..28d8de8579 100644 --- a/.github/workflows/pypi-cd.yml +++ b/.github/workflows/pypi-cd.yml @@ -232,31 +232,12 @@ jobs: with: python-version: 3.12 - - name: Install engine Ubuntu ARM - if: ${{ matrix.build.TARGET == 'aarch64-unknown-linux-gnu' }} - shell: bash - # in self hosted runner we first want to check that engine 
is not already installed - run: | - if [[ $(`which redis-server`) == '' ]] - then - sudo apt-get update - sudo apt-get install -y redis-server - else - echo "Redis is already installed" - fi - - - name: Install engine Ubuntu x86 - if: ${{ matrix.build.TARGET == 'x86_64-unknown-linux-gnu' }} - shell: bash - run: | - sudo apt-get update - sudo apt-get install -y redis-server + - name: Install engine + uses: ./.github/workflows/install-engine + with: + engine-version: "8.0" + target: ${{ matrix.build.target }} - - name: Install engine MacOS - if: ${{ matrix.build.OS == 'macos' }} - shell: bash - run: | - brew install redis - name: Check if RC and set a distribution tag for the package shell: bash diff --git a/CHANGELOG.md b/CHANGELOG.md index f25bdb890c..142854a347 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,5 @@ #### Changes -* Node, Python, Java: Add allow uncovered slots scanning flag option in cluster scan ([#2814](https://github.com/valkey-io/valkey-glide/pull/2814), [#2815](https://github.com/valkey-io/valkey-glide/pull/2815), [#2860](https://github.com/valkey-io/valkey-glide/pull/2860)) + * Go: Add HINCRBY command ([#2847](https://github.com/valkey-io/valkey-glide/pull/2847)) * Go: Add HINCRBYFLOAT command ([#2846](https://github.com/valkey-io/valkey-glide/pull/2846)) * Go: Add SUNIONSTORE command ([#2805](https://github.com/valkey-io/valkey-glide/pull/2805)) @@ -7,7 +7,6 @@ * Java: bump `netty` version ([#2795](https://github.com/valkey-io/valkey-glide/pull/2795)) * Java: Bump protobuf (protoc) version ([#2796](https://github.com/valkey-io/valkey-glide/pull/2796), [#2800](https://github.com/valkey-io/valkey-glide/pull/2800)) * Go: Add `SInterStore` ([#2779](https://github.com/valkey-io/valkey-glide/issues/2779)) -* Node: Remove native package references for MacOs x64 architecture ([#2799](https://github.com/valkey-io/valkey-glide/issues/2799)) * Go: Add `ZIncrBy` command ([#2830](https://github.com/valkey-io/valkey-glide/pull/2830)) * Go: Add 
`SScan` and `SMove` ([#2789](https://github.com/valkey-io/valkey-glide/issues/2789)) * Go: Add `ZADD` ([#2813](https://github.com/valkey-io/valkey-glide/issues/2813)) @@ -19,6 +18,25 @@ #### Operational Enhancements +## 1.2.1 (2024-12-29) + +#### Changes + +* Node, Python, Java: Add allow uncovered slots scanning flag option in cluster scan ([#2814](https://github.com/valkey-io/valkey-glide/pull/2814), [#2815](https://github.com/valkey-io/valkey-glide/pull/2815), [#2860](https://github.com/valkey-io/valkey-glide/pull/2860)) +* Java: Bump protobuf (protoc) version ([#2561](https://github.com/valkey-io/valkey-glide/pull/2561), [#2802](https://github.com/valkey-io/valkey-glide/pull/2802)) +* Java: bump `netty` version ([#2777](https://github.com/valkey-io/valkey-glide/pull/2777)) +* Node: Remove native package references for MacOs x64 architecture ([#2799](https://github.com/valkey-io/valkey-glide/issues/2799)) +* Node, Python, Java: Add connection timeout to client configuration ([#2823](https://github.com/valkey-io/valkey-glide/issues/2823)) + +#### Breaking Changes + +#### Fixes + +* Core: Fix RESP2 multi-node response from cluster ([#2381](https://github.com/valkey-io/valkey-glide/pull/2381)) +* Core: Ensure cluster client creation fails when engine is < 7.0 and sharded subscriptions are configured ([#2819](https://github.com/valkey-io/valkey-glide/pull/2819)) + +#### Operational Enhancements + ## 1.2.0 (2024-11-27) #### Changes @@ -117,8 +135,6 @@ #### Breaking Changes -**None** - #### Fixes * Core: UDS Socket Handling Rework ([#2482](https://github.com/valkey-io/valkey-glide/pull/2482)) diff --git a/glide-core/Cargo.toml b/glide-core/Cargo.toml index bd12bb09c9..394c953f49 100644 --- a/glide-core/Cargo.toml +++ b/glide-core/Cargo.toml @@ -41,6 +41,7 @@ nanoid = "0.4.0" async-trait = { version = "0.1.24" } serde_json = "1" serde = { version = "1", features = ["derive"] } +versions = "6.3" [features] socket-layer = [ diff --git
a/glide-core/redis-rs/redis/src/client.rs b/glide-core/redis-rs/redis/src/client.rs index 6ac3f40bcf..2b97671110 100644 --- a/glide-core/redis-rs/redis/src/client.rs +++ b/glide-core/redis-rs/redis/src/client.rs @@ -89,6 +89,11 @@ pub struct GlideConnectionOptions { /// If ReadFromReplica strategy is set to AZAffinity, this parameter will be set to 'true'. /// In this case, an INFO command will be triggered in the connection's setup to update the connection's 'availability_zone' property. pub discover_az: bool, + /// Connection timeout duration. + /// + /// This optional field sets the maximum duration to wait when attempting to establish + /// a connection. If `None`, the connection will use `DEFAULT_CONNECTION_TIMEOUT`. + pub connection_timeout: Option, } /// To enable async support you need to enable the feature: `tokio-comp` diff --git a/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs b/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs index 4f9b3f0d4e..e5af8d1e50 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs @@ -193,6 +193,7 @@ where push_sender: None, disconnect_notifier, discover_az, + connection_timeout: Some(params.connection_timeout), }, ) .await diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 8164d09413..534fdd429e 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -1094,6 +1094,7 @@ where push_sender, disconnect_notifier, discover_az, + connection_timeout: Some(cluster_params.connection_timeout), }; let connections = Self::create_initial_connections( diff --git a/glide-core/redis-rs/redis/src/cluster_routing.rs b/glide-core/redis-rs/redis/src/cluster_routing.rs index 8bf11d19d4..011f5e08e6 100644 --- a/glide-core/redis-rs/redis/src/cluster_routing.rs +++ 
b/glide-core/redis-rs/redis/src/cluster_routing.rs @@ -623,6 +623,7 @@ fn base_routing(cmd: &[u8]) -> RouteBy { | b"FUNCTION STATS" => RouteBy::AllNodes, b"DBSIZE" + | b"DEBUG" | b"FLUSHALL" | b"FLUSHDB" | b"FT._ALIASLIST" @@ -717,7 +718,6 @@ fn base_routing(cmd: &[u8]) -> RouteBy { | b"COMMAND LIST" | b"COMMAND" | b"CONFIG GET" - | b"DEBUG" | b"ECHO" | b"FUNCTION LIST" | b"LASTSAVE" diff --git a/glide-core/src/client/mod.rs b/glide-core/src/client/mod.rs index bb86cac5e1..005a38a9ca 100644 --- a/glide-core/src/client/mod.rs +++ b/glide-core/src/client/mod.rs @@ -13,7 +13,8 @@ use redis::cluster_routing::{ }; use redis::cluster_slotmap::ReadFromReplicaStrategy; use redis::{ - ClusterScanArgs, Cmd, ErrorKind, PushInfo, RedisError, RedisResult, ScanStateRC, Value, + ClusterScanArgs, Cmd, ErrorKind, FromRedisValue, PushInfo, RedisError, RedisResult, + ScanStateRC, Value, }; pub use standalone_client::StandaloneClient; use std::io; @@ -26,14 +27,17 @@ use self::value_conversion::{convert_to_expected_type, expected_type_for_cmd, ge mod reconnecting_connection; mod standalone_client; mod value_conversion; +use redis::InfoDict; use tokio::sync::mpsc; +use versions::Versioning; pub const HEARTBEAT_SLEEP_DURATION: Duration = Duration::from_secs(1); pub const DEFAULT_RETRIES: u32 = 3; +/// Note: If you change the default value, make sure to change the documentation in *all* wrappers. pub const DEFAULT_RESPONSE_TIMEOUT: Duration = Duration::from_millis(250); -pub const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(250); pub const DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL: Duration = Duration::from_secs(60); -pub const INTERNAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250); +/// Note: If you change the default value, make sure to change the documentation in *all* wrappers. 
+pub const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250); pub const FINISHED_SCAN_CURSOR: &str = "finished"; /// The value of 1000 for the maximum number of inflight requests is determined based on Little's Law in queuing theory: @@ -568,8 +572,9 @@ async fn create_cluster_client( Some(PeriodicCheck::ManualInterval(interval)) => Some(interval), None => Some(DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL), }; + let connection_timeout = to_duration(request.connection_timeout, DEFAULT_CONNECTION_TIMEOUT); let mut builder = redis::cluster::ClusterClientBuilder::new(initial_nodes) - .connection_timeout(INTERNAL_CONNECTION_TIMEOUT) + .connection_timeout(connection_timeout) .retries(DEFAULT_RETRIES); let read_from_strategy = request.read_from.unwrap_or_default(); builder = builder.read_from(match read_from_strategy { @@ -592,7 +597,7 @@ async fn create_cluster_client( }; builder = builder.tls(tls); } - if let Some(pubsub_subscriptions) = redis_connection_info.pubsub_subscriptions { + if let Some(pubsub_subscriptions) = redis_connection_info.pubsub_subscriptions.clone() { builder = builder.pubsub_subscriptions(pubsub_subscriptions); } @@ -600,7 +605,55 @@ async fn create_cluster_client( builder = builder.periodic_connections_checks(CONNECTION_CHECKS_INTERVAL); let client = builder.build()?; - client.get_async_connection(push_sender).await + let mut con = client.get_async_connection(push_sender).await?; + + // This validation ensures that sharded subscriptions are not applied to Redis engines older than version 7.0, + // preventing scenarios where the client becomes inoperable or, worse, unaware that sharded pubsub messages are not being received. + // The issue arises because `client.get_async_connection()` might succeed even if the engine does not support sharded pubsub. 
+ // For example, initial connections may exclude the target node for sharded subscriptions, allowing the creation to succeed, + // but subsequent resubscription tasks will fail when `setup_connection()` cannot establish a connection to the node. + // + // One approach to handle this would be to check the engine version inside `setup_connection()` and skip applying sharded subscriptions. + // However, this approach would leave the application unaware that the subscriptions were not applied, requiring the user to analyze logs to identify the issue. + // Instead, we explicitly check the engine version here and fail the connection creation if it is incompatible with sharded subscriptions. + + if let Some(pubsub_subscriptions) = redis_connection_info.pubsub_subscriptions { + if pubsub_subscriptions.contains_key(&redis::PubSubSubscriptionKind::Sharded) { + let info_res = con + .route_command( + redis::cmd("INFO").arg("SERVER"), + RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random), + ) + .await?; + let info_dict: InfoDict = FromRedisValue::from_redis_value(&info_res)?; + match info_dict.get::("redis_version") { + Some(version) => match (Versioning::new(version), Versioning::new("7.0")) { + (Some(server_ver), Some(min_ver)) => { + if server_ver < min_ver { + return Err(RedisError::from(( + ErrorKind::InvalidClientConfig, + "Sharded subscriptions provided, but the engine version is < 7.0", + ))); + } + } + _ => { + return Err(RedisError::from(( + ErrorKind::ResponseError, + "Failed to parse engine version", + ))) + } + }, + _ => { + return Err(RedisError::from(( + ErrorKind::ResponseError, + "Could not determine engine version from INFO result", + ))) + } + } + } + } + + Ok(con) } #[derive(thiserror::Error)] @@ -667,6 +720,8 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String { "\nStandalone mode" }; let request_timeout = format_optional_value("Request timeout", request.request_timeout); + let connection_timeout = + format_optional_value("Connection 
timeout", request.connection_timeout); let database_id = format!("\ndatabase ID: {}", request.database_id); let rfr_strategy = request .read_from @@ -723,7 +778,7 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String { ); format!( - "\nAddresses: {addresses}{tls_mode}{cluster_mode}{request_timeout}{rfr_strategy}{connection_retry_strategy}{database_id}{protocol}{client_name}{periodic_checks}{pubsub_subscriptions}{inflight_requests_limit}", + "\nAddresses: {addresses}{tls_mode}{cluster_mode}{request_timeout}{connection_timeout}{rfr_strategy}{connection_retry_strategy}{database_id}{protocol}{client_name}{periodic_checks}{pubsub_subscriptions}{inflight_requests_limit}", ) } diff --git a/glide-core/src/client/reconnecting_connection.rs b/glide-core/src/client/reconnecting_connection.rs index c882dd29d6..197de503b9 100644 --- a/glide-core/src/client/reconnecting_connection.rs +++ b/glide-core/src/client/reconnecting_connection.rs @@ -18,7 +18,7 @@ use tokio::task; use tokio::time::timeout; use tokio_retry2::{Retry, RetryError}; -use super::{run_with_timeout, DEFAULT_CONNECTION_ATTEMPT_TIMEOUT}; +use super::{run_with_timeout, DEFAULT_CONNECTION_TIMEOUT}; /// The reason behind the call to `reconnect()` #[derive(PartialEq, Eq, Debug, Clone)] @@ -71,7 +71,11 @@ async fn get_multiplexed_connection( connection_options: &GlideConnectionOptions, ) -> RedisResult { run_with_timeout( - Some(DEFAULT_CONNECTION_ATTEMPT_TIMEOUT), + Some( + connection_options + .connection_timeout + .unwrap_or(DEFAULT_CONNECTION_TIMEOUT), + ), client.get_multiplexed_async_connection(connection_options.clone()), ) .await @@ -113,6 +117,7 @@ async fn create_connection( retry_strategy: RetryStrategy, push_sender: Option>, discover_az: bool, + connection_timeout: Duration, ) -> Result { let client = &connection_backend.connection_info; let connection_options = GlideConnectionOptions { @@ -121,6 +126,7 @@ async fn create_connection( TokioDisconnectNotifier::new(), )), discover_az, + 
connection_timeout: Some(connection_timeout), }; let action = || async { get_multiplexed_connection(client, &connection_options) @@ -206,6 +212,7 @@ impl ReconnectingConnection { tls_mode: TlsMode, push_sender: Option>, discover_az: bool, + connection_timeout: Duration, ) -> Result { log_debug( "connection creation", @@ -218,7 +225,14 @@ impl ReconnectingConnection { connection_available_signal: ManualResetEvent::new(true), client_dropped_flagged: AtomicBool::new(false), }; - create_connection(backend, connection_retry_strategy, push_sender, discover_az).await + create_connection( + backend, + connection_retry_strategy, + push_sender, + discover_az, + connection_timeout, + ) + .await } pub(crate) fn node_address(&self) -> String { diff --git a/glide-core/src/client/standalone_client.rs b/glide-core/src/client/standalone_client.rs index 5bc26999a8..c2c541c763 100644 --- a/glide-core/src/client/standalone_client.rs +++ b/glide-core/src/client/standalone_client.rs @@ -2,6 +2,7 @@ use super::get_redis_connection_info; use super::reconnecting_connection::{ReconnectReason, ReconnectingConnection}; +use super::{to_duration, DEFAULT_CONNECTION_TIMEOUT}; use super::{ConnectionRequest, NodeAddress, TlsMode}; use crate::client::types::ReadFrom as ClientReadFrom; use crate::retry_strategies::RetryStrategy; @@ -15,6 +16,7 @@ use redis::{PushInfo, RedisError, RedisResult, Value}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::time::Duration; use telemetrylib::Telemetry; use tokio::sync::mpsc; use tokio::task; @@ -130,6 +132,11 @@ impl StandaloneClient { Some(ClientReadFrom::AZAffinity(_)) ); + let connection_timeout = to_duration( + connection_request.connection_timeout, + DEFAULT_CONNECTION_TIMEOUT, + ); + let mut stream = stream::iter(connection_request.addresses.iter()) .map(|address| async { get_connection_and_replication_info( @@ -143,6 +150,7 @@ impl StandaloneClient { tls_mode.unwrap_or(TlsMode::NoTls), &push_sender, 
discover_az, + connection_timeout, ) .await .map_err(|err| (format!("{}:{}", address.host, address.port), err)) @@ -552,6 +560,7 @@ async fn get_connection_and_replication_info( tls_mode: TlsMode, push_sender: &Option>, discover_az: bool, + connection_timeout: Duration, ) -> Result<(ReconnectingConnection, Value), (ReconnectingConnection, RedisError)> { let result = ReconnectingConnection::new( address, @@ -560,6 +569,7 @@ async fn get_connection_and_replication_info( tls_mode, push_sender.clone(), discover_az, + connection_timeout, ) .await; let reconnecting_connection = match result { diff --git a/glide-core/src/client/types.rs b/glide-core/src/client/types.rs index a0053587c8..e2314a1ab6 100644 --- a/glide-core/src/client/types.rs +++ b/glide-core/src/client/types.rs @@ -20,6 +20,7 @@ pub struct ConnectionRequest { pub addresses: Vec, pub cluster_mode_enabled: bool, pub request_timeout: Option, + pub connection_timeout: Option, pub connection_retry_strategy: Option, pub periodic_checks: Option, pub pubsub_subscriptions: Option, @@ -147,6 +148,7 @@ impl From for ConnectionRequest { .collect(); let cluster_mode_enabled = value.cluster_mode_enabled; let request_timeout = none_if_zero(value.request_timeout); + let connection_timeout = none_if_zero(value.connection_timeout); let connection_retry_strategy = value .connection_retry_strategy @@ -214,6 +216,7 @@ impl From for ConnectionRequest { addresses, cluster_mode_enabled, request_timeout, + connection_timeout, connection_retry_strategy, periodic_checks, pubsub_subscriptions, diff --git a/glide-core/src/client/value_conversion.rs b/glide-core/src/client/value_conversion.rs index 48996fc76d..7eafe3f373 100644 --- a/glide-core/src/client/value_conversion.rs +++ b/glide-core/src/client/value_conversion.rs @@ -10,6 +10,11 @@ pub(crate) enum ExpectedReturnType<'a> { key_type: &'a Option>, value_type: &'a Option>, }, + // Second parameter is a function which returns true if value needs to be converted + SingleOrMultiNode( 
+ &'a Option>, + Option bool>, + ), MapOfStringToDouble, Double, Boolean, @@ -278,12 +283,6 @@ pub(crate) fn convert_to_expected_type( }, ExpectedReturnType::Lolwut => { match value { - // cluster (multi-node) response - go recursive - Value::Map(map) => convert_map_entries( - map, - Some(ExpectedReturnType::BulkString), - Some(ExpectedReturnType::Lolwut), - ), // RESP 2 response Value::BulkString(bytes) => { let text = std::str::from_utf8(&bytes).unwrap(); @@ -558,19 +557,7 @@ pub(crate) fn convert_to_expected_type( // Second part is converted as `Map[str, Map[str, int]]` ExpectedReturnType::FunctionStatsReturnType => match value { // TODO reuse https://github.com/Bit-Quill/glide-for-redis/pull/331 and https://github.com/valkey-io/valkey-glide/pull/1489 - Value::Map(map) => { - if map[0].0 == Value::BulkString(b"running_script".into()) { - // already a RESP3 response - do nothing - Ok(Value::Map(map)) - } else { - // cluster (multi-node) response - go recursive - convert_map_entries( - map, - Some(ExpectedReturnType::BulkString), - Some(ExpectedReturnType::FunctionStatsReturnType), - ) - } - } + Value::Map(map) => Ok(Value::Map(map)), Value::Array(mut array) if array.len() == 4 => { let mut result: Vec<(Value, Value)> = Vec::with_capacity(2); let running_script_info = array.remove(1); @@ -1144,6 +1131,19 @@ pub(crate) fn convert_to_expected_type( ) .into()) } + ExpectedReturnType::SingleOrMultiNode(value_type, value_checker) => match value { + Value::Map(ref map) => match value_checker { + Some(func) => { + if !map.is_empty() && func(map[0].clone().1) { + convert_to_expected_type(value, Some(ExpectedReturnType::Map { key_type: &None, value_type })) + } else { + Ok(value) + } + } + None => convert_to_expected_type(value, Some(ExpectedReturnType::Map { key_type: &None, value_type })), + } + _ => convert_to_expected_type(value, *value_type), + } } } @@ -1392,12 +1392,19 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option { // TODO use enum to avoid mistakes 
match command.as_slice() { - b"HGETALL" | b"CONFIG GET" | b"FT.CONFIG GET" | b"FT._ALIASLIST" | b"HELLO" => { + b"HGETALL" | b"FT.CONFIG GET" | b"FT._ALIASLIST" | b"HELLO" => { Some(ExpectedReturnType::Map { key_type: &None, value_type: &None, }) } + b"CONFIG GET" => Some(ExpectedReturnType::SingleOrMultiNode( + &Some(ExpectedReturnType::Map { + key_type: &None, + value_type: &None, + }), + Some(|val| matches!(val, Value::Array(_))), + )), b"XCLAIM" => { if cmd.position(b"JUSTID").is_some() { Some(ExpectedReturnType::ArrayOfStrings) @@ -1481,11 +1488,17 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option { None } } - b"LOLWUT" => Some(ExpectedReturnType::Lolwut), + b"LOLWUT" => Some(ExpectedReturnType::SingleOrMultiNode( + &Some(ExpectedReturnType::Lolwut), + None, + )), b"FUNCTION LIST" => Some(ExpectedReturnType::ArrayOfMaps(&Some( ExpectedReturnType::ArrayOfMaps(&Some(ExpectedReturnType::StringOrSet)), ))), - b"FUNCTION STATS" => Some(ExpectedReturnType::FunctionStatsReturnType), + b"FUNCTION STATS" => Some(ExpectedReturnType::SingleOrMultiNode( + &Some(ExpectedReturnType::FunctionStatsReturnType), + Some(|val| matches!(val, Value::Array(_))), + )), b"GEOSEARCH" => { if cmd.position(b"WITHDIST").is_some() || cmd.position(b"WITHHASH").is_some() @@ -1953,17 +1966,14 @@ mod tests { #[test] fn convert_lolwut() { - assert!(matches!( - expected_type_for_cmd(redis::cmd("LOLWUT").arg("version").arg("42")), - Some(ExpectedReturnType::Lolwut) - )); - let unconverted_string : String = "\x1b[0;97;107m \x1b[0m--\x1b[0;37;47m \x1b[0m--\x1b[0;90;100m \x1b[0m--\x1b[0;30;40m \x1b[0m".into(); let expected: String = "\u{2591}--\u{2592}--\u{2593}-- ".into(); + let mut cmd = redis::cmd("LOLWUT"); + let conversion_type = expected_type_for_cmd(cmd.arg("version").arg("42")); let converted_1 = convert_to_expected_type( Value::BulkString(unconverted_string.clone().into_bytes()), - Some(ExpectedReturnType::Lolwut), + conversion_type, ); assert_eq!( 
Value::BulkString(expected.clone().into_bytes()), @@ -1975,7 +1985,7 @@ mod tests { format: redis::VerbatimFormat::Text, text: unconverted_string.clone(), }, - Some(ExpectedReturnType::Lolwut), + conversion_type, ); assert_eq!( Value::BulkString(expected.clone().into_bytes()), @@ -1993,16 +2003,16 @@ mod tests { Value::BulkString(unconverted_string.clone().into_bytes()), ), ]), - Some(ExpectedReturnType::Lolwut), + conversion_type, ); assert_eq!( Value::Map(vec![ ( - Value::BulkString("node 1".into()), + Value::SimpleString("node 1".into()), Value::BulkString(expected.clone().into_bytes()) ), ( - Value::BulkString("node 2".into()), + Value::SimpleString("node 2".into()), Value::BulkString(expected.clone().into_bytes()) ), ]), @@ -2011,7 +2021,7 @@ mod tests { let converted_4 = convert_to_expected_type( Value::SimpleString(unconverted_string.clone()), - Some(ExpectedReturnType::Lolwut), + conversion_type, ); assert!(converted_4.is_err()); } @@ -2521,11 +2531,6 @@ mod tests { #[test] fn convert_function_stats() { - assert!(matches!( - expected_type_for_cmd(redis::cmd("FUNCTION").arg("STATS")), - Some(ExpectedReturnType::FunctionStatsReturnType) - )); - let resp2_response_non_empty_first_part_data = vec![ Value::BulkString(b"running_script".into()), Value::Array(vec![ @@ -2652,7 +2657,8 @@ mod tests { ), ]); - let conversion_type = Some(ExpectedReturnType::FunctionStatsReturnType); + let cmd = redis::cmd("FUNCTION STATS"); + let conversion_type = expected_type_for_cmd(&cmd); // resp2 -> resp3 conversion with non-empty `running_script` block assert_eq!( convert_to_expected_type( diff --git a/glide-core/src/protobuf/connection_request.proto b/glide-core/src/protobuf/connection_request.proto index 5f4db44b00..8e33b39da3 100644 --- a/glide-core/src/protobuf/connection_request.proto +++ b/glide-core/src/protobuf/connection_request.proto @@ -26,7 +26,7 @@ message AuthenticationInfo { enum ProtocolVersion { RESP3 = 0; - RESP2 = 1; + RESP2 = 1; } message 
PeriodicChecksManualInterval { @@ -71,6 +71,7 @@ message ConnectionRequest { PubSubSubscriptions pubsub_subscriptions = 13; uint32 inflight_requests_limit = 14; string client_az = 15; + uint32 connection_timeout = 16; } message ConnectionRetryStrategy { diff --git a/glide-core/tests/test_cluster_client.rs b/glide-core/tests/test_cluster_client.rs index 2943ab21bb..ec795481c6 100644 --- a/glide-core/tests/test_cluster_client.rs +++ b/glide-core/tests/test_cluster_client.rs @@ -7,13 +7,19 @@ mod cluster_client_tests { use std::collections::HashMap; use super::*; - use glide_core::connection_request::ReadFrom; + use cluster::{setup_cluster_with_replicas, LONG_CLUSTER_TEST_TIMEOUT}; + use glide_core::client::Client; + use glide_core::connection_request::{ + self, PubSubChannelsOrPatterns, PubSubSubscriptions, ReadFrom, + }; use redis::cluster_routing::{ MultipleNodeRoutingInfo, Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr, }; + use redis::InfoDict; use rstest::rstest; use utilities::cluster::{setup_test_basics_internal, SHORT_CLUSTER_TEST_TIMEOUT}; use utilities::*; + use versions::Versioning; fn count_primary_or_replica(value: &str) -> (u16, u16) { if value.contains("role:master") { @@ -214,4 +220,85 @@ mod cluster_client_tests { assert_eq!(replicas, 1); }); } + + #[rstest] + #[timeout(LONG_CLUSTER_TEST_TIMEOUT)] + fn test_fail_creation_with_unsupported_sharded_pubsub() { + block_on_all(async { + let mut test_basics = setup_cluster_with_replicas( + TestConfiguration { + cluster_mode: ClusterMode::Enabled, + shared_server: false, + ..Default::default() + }, + 0, + 3, + ) + .await; + + // get engine version + let cmd = redis::cmd("INFO"); + let info = test_basics + .client + .send_command( + &cmd, + Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)), + ) + .await + .unwrap(); + + let info_dict: InfoDict = redis::from_owned_redis_value(info).unwrap(); + match info_dict.get::("redis_version") { + Some(version) => match (Versioning::new(version), 
Versioning::new("7.0")) { + (Some(server_ver), Some(min_ver)) => { + if server_ver < min_ver { + // try to create client with initial nodes lacking the target sharded subscription node + let cluster = test_basics.cluster.unwrap(); + let mut addresses = cluster.get_server_addresses(); + addresses.truncate(1); + + let mut connection_request = + connection_request::ConnectionRequest::new(); + connection_request.addresses = + addresses.iter().map(get_address_info).collect(); + + connection_request.cluster_mode_enabled = true; + // Assumes the current implementation of the test cluster, where slots are distributed across nodes + // in a monotonically increasing order. + let mut last_slot_channel = PubSubChannelsOrPatterns::new(); + last_slot_channel + .channels_or_patterns + .push("last-slot-channel-{16383}".as_bytes().into()); + + let mut subs = PubSubSubscriptions::new(); + // First try to create a client with the Exact subscription + subs.channels_or_patterns_by_type + .insert(0, last_slot_channel.clone()); + connection_request.pubsub_subscriptions = + protobuf::MessageField::from_option(Some(subs.clone())); + + let _client = Client::new(connection_request.clone().into(), None) + .await + .unwrap(); + + // Now try to create a client with a Sharded subscription which should fail + subs.channels_or_patterns_by_type + .insert(2, last_slot_channel); + connection_request.pubsub_subscriptions = + protobuf::MessageField::from_option(Some(subs)); + + let client = Client::new(connection_request.into(), None).await; + assert!(client.is_err()); + } + } + _ => { + panic!("Failed to parse engine version"); + } + }, + _ => { + panic!("Could not determine engine version from INFO result"); + } + } + }); + } } diff --git a/java/client/src/main/java/glide/api/models/configuration/AdvancedBaseClientConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/AdvancedBaseClientConfiguration.java new file mode 100644 index 0000000000..5a28ee9fcf --- /dev/null +++ 
b/java/client/src/main/java/glide/api/models/configuration/AdvancedBaseClientConfiguration.java @@ -0,0 +1,22 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.models.configuration; + +import lombok.Getter; +import lombok.experimental.SuperBuilder; + +/** + * Advanced configuration settings class for creating a client. Shared settings for standalone and + * cluster clients. + */ +@Getter +@SuperBuilder +public abstract class AdvancedBaseClientConfiguration { + + /** + * The duration in milliseconds to wait for a TCP/TLS connection to complete. This applies both + * during initial client creation and any reconnections that may occur during request processing. + * **Note**: A high connection timeout may lead to prolonged blocking of the entire command + * pipeline. If not explicitly set, a default value of 250 milliseconds will be used. + */ + private final Integer connectionTimeout; +} diff --git a/java/client/src/main/java/glide/api/models/configuration/AdvancedGlideClientConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/AdvancedGlideClientConfiguration.java new file mode 100644 index 0000000000..0ce5f85958 --- /dev/null +++ b/java/client/src/main/java/glide/api/models/configuration/AdvancedGlideClientConfiguration.java @@ -0,0 +1,23 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.models.configuration; + +import glide.api.GlideClient; +import lombok.Getter; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +/** + * Represents advanced configuration settings for a Standalone {@link GlideClient} used in {@link + * GlideClientConfiguration}. + * + * @example + *
<pre>{@code
+ * AdvancedGlideClientConfiguration config = AdvancedGlideClientConfiguration.builder()
+ *     .connectionTimeout(500)
+ *     .build();
+ * }</pre>
+ */ +@Getter +@SuperBuilder +@ToString +public class AdvancedGlideClientConfiguration extends AdvancedBaseClientConfiguration {} diff --git a/java/client/src/main/java/glide/api/models/configuration/AdvancedGlideClusterClientConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/AdvancedGlideClusterClientConfiguration.java new file mode 100644 index 0000000000..ff02b18c4f --- /dev/null +++ b/java/client/src/main/java/glide/api/models/configuration/AdvancedGlideClusterClientConfiguration.java @@ -0,0 +1,23 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.models.configuration; + +import glide.api.GlideClusterClient; +import lombok.Getter; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +/** + * Represents advanced configuration settings for a Cluster {@link GlideClusterClient} used in + * {@link GlideClusterClientConfiguration}. + * + * @example + *
<pre>{@code
+ * AdvancedGlideClusterClientConfiguration config = AdvancedGlideClusterClientConfiguration.builder()
+ *     .connectionTimeout(500)
+ *     .build();
+ * }</pre>
+ */ +@Getter +@SuperBuilder +@ToString +public class AdvancedGlideClusterClientConfiguration extends AdvancedBaseClientConfiguration {} diff --git a/java/client/src/main/java/glide/api/models/configuration/BaseClientConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/BaseClientConfiguration.java index 7d9d5d5b68..7cd29a7cb8 100644 --- a/java/client/src/main/java/glide/api/models/configuration/BaseClientConfiguration.java +++ b/java/client/src/main/java/glide/api/models/configuration/BaseClientConfiguration.java @@ -49,7 +49,8 @@ public abstract class BaseClientConfiguration { * The duration in milliseconds that the client should wait for a request to complete. This * duration encompasses sending the request, awaiting for a response from the server, and any * required reconnections or retries. If the specified timeout is exceeded for a pending request, - * it will result in a timeout error. If not set, a default value will be used. + * it will result in a timeout error. If not explicitly set, a default value of 250 milliseconds + * will be used. */ private final Integer requestTimeout; diff --git a/java/client/src/main/java/glide/api/models/configuration/GlideClientConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/GlideClientConfiguration.java index 83d84e7c1f..4321f1dd39 100644 --- a/java/client/src/main/java/glide/api/models/configuration/GlideClientConfiguration.java +++ b/java/client/src/main/java/glide/api/models/configuration/GlideClientConfiguration.java @@ -24,6 +24,7 @@ * .clientName("GLIDE") * .subscriptionConfiguration(subscriptionConfiguration) * .inflightRequestsLimit(1000) + * .advancedConfiguration(AdvancedGlideClientConfiguration.builder().connectionTimeout(500).build()) * .build(); * } */ @@ -39,4 +40,7 @@ public class GlideClientConfiguration extends BaseClientConfiguration { /** Subscription configuration for the current client. 
*/ private final StandaloneSubscriptionConfiguration subscriptionConfiguration; + + /** Advanced configuration settings for the client. */ + private final AdvancedGlideClientConfiguration advancedConfiguration; } diff --git a/java/client/src/main/java/glide/api/models/configuration/GlideClusterClientConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/GlideClusterClientConfiguration.java index b1d1c7590c..f0b6d7789d 100644 --- a/java/client/src/main/java/glide/api/models/configuration/GlideClusterClientConfiguration.java +++ b/java/client/src/main/java/glide/api/models/configuration/GlideClusterClientConfiguration.java @@ -23,6 +23,7 @@ * .clientName("GLIDE") * .subscriptionConfiguration(subscriptionConfiguration) * .inflightRequestsLimit(1000) + * .advancedConfiguration(AdvancedGlideClusterClientConfiguration.builder().connectionTimeout(500).build()) * .build(); * } */ @@ -32,4 +33,7 @@ public class GlideClusterClientConfiguration extends BaseClientConfiguration { /** Subscription configuration for the current client. */ private final ClusterSubscriptionConfiguration subscriptionConfiguration; + + /** Advanced configuration settings for the client. 
*/ + private final AdvancedGlideClusterClientConfiguration advancedConfiguration; } diff --git a/java/client/src/main/java/glide/managers/ConnectionManager.java b/java/client/src/main/java/glide/managers/ConnectionManager.java index 99b383a9ed..443384d5a6 100644 --- a/java/client/src/main/java/glide/managers/ConnectionManager.java +++ b/java/client/src/main/java/glide/managers/ConnectionManager.java @@ -8,6 +8,7 @@ import connection_request.ConnectionRequestOuterClass.PubSubChannelsOrPatterns; import connection_request.ConnectionRequestOuterClass.PubSubSubscriptions; import connection_request.ConnectionRequestOuterClass.TlsMode; +import glide.api.models.configuration.AdvancedBaseClientConfiguration; import glide.api.models.configuration.BaseClientConfiguration; import glide.api.models.configuration.GlideClientConfiguration; import glide.api.models.configuration.GlideClusterClientConfiguration; @@ -171,6 +172,30 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderGlideClient( connectionRequestBuilder.setPubsubSubscriptions(subscriptionsBuilder.build()); } + if (configuration.getAdvancedConfiguration() != null) { + connectionRequestBuilder = + setupConnectionRequestBuilderAdvancedBaseConfiguration( + connectionRequestBuilder, configuration.getAdvancedConfiguration()); + } + + return connectionRequestBuilder; + } + + /** + * Configures the {@link ConnectionRequest.Builder} with settings from the provided {@link + * AdvancedBaseClientConfiguration}. + * + * @param connectionRequestBuilder The builder for the {@link ConnectionRequest}. + * @param configuration The advanced configuration settings. + * @return The updated {@link ConnectionRequest.Builder}. 
+ */ + private ConnectionRequest.Builder setupConnectionRequestBuilderAdvancedBaseConfiguration( + ConnectionRequest.Builder connectionRequestBuilder, + AdvancedBaseClientConfiguration configuration) { + if (configuration.getConnectionTimeout() != null) { + connectionRequestBuilder.setConnectionTimeout(configuration.getConnectionTimeout()); + } + return connectionRequestBuilder; } @@ -199,6 +224,12 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderGlideClusterClien connectionRequestBuilder.setPubsubSubscriptions(subscriptionsBuilder.build()); } + if (configuration.getAdvancedConfiguration() != null) { + connectionRequestBuilder = + setupConnectionRequestBuilderAdvancedBaseConfiguration( + connectionRequestBuilder, configuration.getAdvancedConfiguration()); + } + return connectionRequestBuilder; } diff --git a/java/integTest/src/test/java/glide/ConnectionTests.java b/java/integTest/src/test/java/glide/ConnectionTests.java index 2aec2e4e6b..45fea7065a 100644 --- a/java/integTest/src/test/java/glide/ConnectionTests.java +++ b/java/integTest/src/test/java/glide/ConnectionTests.java @@ -10,19 +10,33 @@ import static glide.api.models.configuration.RequestRoutingConfiguration.SlotType.PRIMARY; import static glide.api.models.configuration.RequestRoutingConfiguration.SlotType.REPLICA; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assumptions.assumeTrue; +import glide.api.BaseClient; import glide.api.GlideClient; import glide.api.GlideClusterClient; import glide.api.models.ClusterValue; import glide.api.models.commands.InfoOptions; +import glide.api.models.configuration.AdvancedGlideClientConfiguration; +import glide.api.models.configuration.AdvancedGlideClusterClientConfiguration; +import 
glide.api.models.configuration.BackoffStrategy; import glide.api.models.configuration.ReadFrom; import glide.api.models.configuration.RequestRoutingConfiguration; +import glide.api.models.exceptions.ClosingException; +import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Stream; import lombok.SneakyThrows; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; @Timeout(10) // seconds public class ConnectionTests { @@ -52,6 +66,35 @@ public GlideClusterClient createAzTestClient(String az) { .get(); } + @SneakyThrows + public BaseClient createConnectionTimeoutClient( + Boolean clusterMode, + int connectionTimeout, + int requestTimeout, + BackoffStrategy backoffStrategy) { + if (clusterMode) { + var advancedConfiguration = + AdvancedGlideClusterClientConfiguration.builder() + .connectionTimeout(connectionTimeout) + .build(); + return GlideClusterClient.createClient( + commonClusterClientConfig() + .advancedConfiguration(advancedConfiguration) + .requestTimeout(requestTimeout) + .build()) + .get(); + } + var advancedConfiguration = + AdvancedGlideClientConfiguration.builder().connectionTimeout(connectionTimeout).build(); + return GlideClient.createClient( + commonClientConfig() + .advancedConfiguration(advancedConfiguration) + .requestTimeout(requestTimeout) + .reconnectStrategy(backoffStrategy) + .build()) + .get(); + } + /** * Test that the client with AZ affinity strategy routes in a round-robin manner to all replicas * within the specified AZ. 
@@ -202,4 +245,76 @@ public void test_az_affinity_non_existing_az() { assertEquals(4, matchingEntries); azTestClient.close(); } + + @SneakyThrows + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void test_connection_timeout(boolean clusterMode) { + var backoffStrategy = + BackoffStrategy.builder().exponentBase(2).factor(100).numOfRetries(1).build(); + var client = createConnectionTimeoutClient(clusterMode, 250, 20000, backoffStrategy); + + // Runnable for long-running DEBUG SLEEP command + Runnable debugSleepTask = + () -> { + try { + if (client instanceof GlideClusterClient) { + ((GlideClusterClient) client) + .customCommand(new String[] {"DEBUG", "sleep", "7"}, ALL_NODES) + .get(); + } else if (client instanceof GlideClient) { + ((GlideClient) client).customCommand(new String[] {"DEBUG", "sleep", "7"}).get(); + } + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Error during DEBUG SLEEP command", e); + } + }; + + // Runnable for testing connection failure due to timeout + Runnable failToConnectTask = + () -> { + try { + Thread.sleep(1000); // Wait to ensure the debug sleep command is running + ExecutionException executionException = + assertThrows( + ExecutionException.class, + () -> createConnectionTimeoutClient(clusterMode, 100, 250, backoffStrategy)); + assertInstanceOf(ClosingException.class, executionException.getCause()); + assertTrue(executionException.getMessage().toLowerCase().contains("timed out")); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread was interrupted", e); + } + }; + + // Runnable for testing successful connection + Runnable connectToClientTask = + () -> { + try { + Thread.sleep(1000); // Wait to ensure the debug sleep command is running + var timeoutClient = + createConnectionTimeoutClient(clusterMode, 10000, 250, backoffStrategy); + assertEquals(timeoutClient.set("key", "value").get(), "OK"); + timeoutClient.close(); 
+ } catch (Exception e) { + throw new RuntimeException("Error during successful connection attempt", e); + } + }; + + // Execute all tasks concurrently + ExecutorService executorService = Executors.newFixedThreadPool(3); + try { + executorService.invokeAll( + List.of( + Executors.callable(debugSleepTask), + Executors.callable(failToConnectTask), + Executors.callable(connectToClientTask))); + } finally { + executorService.shutdown(); + // Clean up the main client + if (client != null) { + client.close(); + } + } + } } diff --git a/node/npm/glide/index.ts b/node/npm/glide/index.ts index c4dab9795b..a7fad935c1 100644 --- a/node/npm/glide/index.ts +++ b/node/npm/glide/index.ts @@ -213,6 +213,8 @@ function initialize() { Script, ObjectType, ClusterScanCursor, + AdvancedGlideClientConfiguration, + AdvancedGlideClusterClientConfiguration, BaseClientConfiguration, GlideClusterClientConfiguration, LevelOptions, @@ -290,6 +292,8 @@ function initialize() { GlideClient, GlideClusterClient, GlideClientConfiguration, + AdvancedGlideClientConfiguration, + AdvancedGlideClusterClientConfiguration, FunctionListOptions, FunctionListResponse, FunctionStatsSingleResponse, diff --git a/node/rust-client/src/lib.rs b/node/rust-client/src/lib.rs index 963f966f24..ffa5b5c47f 100644 --- a/node/rust-client/src/lib.rs +++ b/node/rust-client/src/lib.rs @@ -40,9 +40,13 @@ pub enum Level { pub const MAX_REQUEST_ARGS_LEN: u32 = MAX_REQUEST_ARGS_LENGTH as u32; #[napi] -pub const DEFAULT_TIMEOUT_IN_MILLISECONDS: u32 = +pub const DEFAULT_REQUEST_TIMEOUT_IN_MILLISECONDS: u32 = glide_core::client::DEFAULT_RESPONSE_TIMEOUT.as_millis() as u32; +#[napi] +pub const DEFAULT_CONNECTION_TIMEOUT_IN_MILLISECONDS: u32 = + glide_core::client::DEFAULT_CONNECTION_TIMEOUT.as_millis() as u32; + #[napi] pub const DEFAULT_INFLIGHT_REQUESTS_LIMIT: u32 = glide_core::client::DEFAULT_MAX_INFLIGHT_REQUESTS; diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index ded6ea58f3..a9aed75560 100644 --- 
a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -3,8 +3,9 @@ */ import { ClusterScanCursor, + DEFAULT_CONNECTION_TIMEOUT_IN_MILLISECONDS, DEFAULT_INFLIGHT_REQUESTS_LIMIT, - DEFAULT_TIMEOUT_IN_MILLISECONDS, + DEFAULT_REQUEST_TIMEOUT_IN_MILLISECONDS, Script, StartSocketConnection, getStatistics, @@ -605,7 +606,7 @@ export interface BaseClientConfiguration { * The duration in milliseconds that the client should wait for a request to complete. * This duration encompasses sending the request, awaiting for a response from the server, and any required reconnections or retries. * If the specified timeout is exceeded for a pending request, it will result in a timeout error. - * If not set, a default value will be used. + * If not explicitly set, a default value of 250 milliseconds will be used. * Value must be an integer. */ requestTimeout?: number; @@ -649,6 +650,33 @@ export interface BaseClientConfiguration { clientAz?: string; } +/** + * Represents advanced configuration settings for a client, including connection-related options. + * + * @remarks + * The `AdvancedBaseClientConfiguration` interface defines advanced configuration settings for managing the client's connection behavior. + * + * ### Connection Timeout + * + * - **Connection Timeout**: The `connectionTimeout` property specifies the duration (in milliseconds) the client should wait for a connection to be established. + * + * @example + * ```typescript + * const config: AdvancedBaseClientConfiguration = { + * connectionTimeout: 5000, // 5 seconds + * }; + * ``` + */ +export interface AdvancedBaseClientConfiguration { + /** + * The duration in milliseconds to wait for a TCP/TLS connection to complete. + * This applies both during initial client creation and any reconnections that may occur during request processing. + * **Note**: A high connection timeout may lead to prolonged blocking of the entire command pipeline. + * If not explicitly set, a default value of 250 milliseconds will be used. 
+ */ + connectionTimeout?: number; +} + /** * Enum of Valkey data types * `STRING` @@ -950,7 +978,7 @@ export class BaseClient { Logger.log("info", "Client lifetime", `construct client`); this.config = options; this.requestTimeout = - options?.requestTimeout ?? DEFAULT_TIMEOUT_IN_MILLISECONDS; + options?.requestTimeout ?? DEFAULT_REQUEST_TIMEOUT_IN_MILLISECONDS; this.socket = socket; this.socket .on("data", (data) => this.handleReadData(data)) @@ -7619,6 +7647,18 @@ export class BaseClient { }; } + /** + * @internal + */ + protected configureAdvancedConfigurationBase( + options: AdvancedBaseClientConfiguration, + request: connection_request.IConnectionRequest, + ) { + request.connectionTimeout = + options.connectionTimeout ?? + DEFAULT_CONNECTION_TIMEOUT_IN_MILLISECONDS; + } + /** * @internal */ diff --git a/node/src/GlideClient.ts b/node/src/GlideClient.ts index acec0c377f..fc9301bd75 100644 --- a/node/src/GlideClient.ts +++ b/node/src/GlideClient.ts @@ -4,6 +4,7 @@ import * as net from "net"; import { + AdvancedBaseClientConfiguration, BaseClient, BaseClientConfiguration, convertGlideRecordToRecord, @@ -171,8 +172,26 @@ export type GlideClientConfiguration = BaseClientConfiguration & { * Will be applied via SUBSCRIBE/PSUBSCRIBE commands during connection establishment. */ pubsubSubscriptions?: GlideClientConfiguration.PubSubSubscriptions; + /** + * Advanced configuration settings for the client. + */ + advancedConfiguration?: AdvancedGlideClientConfiguration; }; +/** + * Represents advanced configuration settings for creating a {@link GlideClient | GlideClient} used in {@link GlideClientConfiguration | GlideClientConfiguration}. + * + * + * @example + * ```typescript + * const config: AdvancedGlideClientConfiguration = { + * connectionTimeout: 500, // Set the connection timeout to 500ms + * }; + * ``` + */ +export type AdvancedGlideClientConfiguration = + AdvancedBaseClientConfiguration & {}; + /** * Client used for connection to standalone servers. 
* @@ -189,6 +208,14 @@ export class GlideClient extends BaseClient { configuration.databaseId = options.databaseId; configuration.connectionRetryStrategy = options.connectionBackoff; this.configurePubsub(options, configuration); + + if (options.advancedConfiguration) { + this.configureAdvancedConfigurationBase( + options.advancedConfiguration, + configuration, + ); + } + return configuration; } /** diff --git a/node/src/GlideClusterClient.ts b/node/src/GlideClusterClient.ts index 4e9aee579d..c12264f078 100644 --- a/node/src/GlideClusterClient.ts +++ b/node/src/GlideClusterClient.ts @@ -5,6 +5,7 @@ import { ClusterScanCursor, Script } from "glide-rs"; import * as net from "net"; import { + AdvancedBaseClientConfiguration, BaseClient, BaseClientConfiguration, Decoder, @@ -190,8 +191,26 @@ export type GlideClusterClientConfiguration = BaseClientConfiguration & { * Will be applied via SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE commands during connection establishment. */ pubsubSubscriptions?: GlideClusterClientConfiguration.PubSubSubscriptions; + /** + * Advanced configuration settings for the client. + */ + advancedConfiguration?: AdvancedGlideClusterClientConfiguration; }; +/** + * Represents advanced configuration settings for creating a {@link GlideClusterClient | GlideClusterClient} used in {@link GlideClusterClientConfiguration | GlideClusterClientConfiguration}. + * + * + * @example + * ```typescript + * const config: AdvancedGlideClusterClientConfiguration = { + * connectionTimeout: 500, // Set the connection timeout to 500ms + * }; + * ``` + */ +export type AdvancedGlideClusterClientConfiguration = + AdvancedBaseClientConfiguration & {}; + /** * If the command's routing is to one node we will get T as a response type, * otherwise, we will get a dictionary of address: nodeResponse, address is of type string and nodeResponse is of type T. 
@@ -504,6 +523,14 @@ export class GlideClusterClient extends BaseClient { } this.configurePubsub(options, configuration); + + if (options.advancedConfiguration) { + this.configureAdvancedConfigurationBase( + options.advancedConfiguration, + configuration, + ); + } + return configuration; } /** diff --git a/node/tests/GlideClient.test.ts b/node/tests/GlideClient.test.ts index 409c92f1d4..32dcde243d 100644 --- a/node/tests/GlideClient.test.ts +++ b/node/tests/GlideClient.test.ts @@ -998,6 +998,76 @@ describe("GlideClient", () => { }, ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "should handle connection timeout when client is blocked by long-running command (protocol: %p)", + async (protocol) => { + // Create a client configuration with a generous request timeout + const config = getClientConfigurationOption( + cluster.getAddresses(), + protocol, + { requestTimeout: 20000 }, // Long timeout to allow debugging operations (sleep for 7 seconds) + ); + + // Initialize the primary client + const client = await GlideClient.createClient(config); + + try { + // Run a long-running DEBUG SLEEP command using the first client (client) + const debugCommandPromise = client.customCommand( + ["DEBUG", "sleep", "7"], // Sleep for 7 seconds + ); + + // Function that tries to create a client with a short connection timeout (100ms) + const failToCreateClient = async () => { + await new Promise((resolve) => setTimeout(resolve, 1000)); // Wait for 1 second before retry + await expect( + GlideClient.createClient({ + connectionBackoff: { + exponentBase: 2, + factor: 100, + numberOfRetries: 1, + }, + advancedConfiguration: { connectionTimeout: 100 }, // 100ms connection timeout + ...config, // Include the rest of the config + }), + ).rejects.toThrowError(/timed out/i); // Ensure it throws a timeout error + }; + + // Function that verifies that a larger connection timeout allows connection + const connectWithLargeTimeout = async () => { + await new Promise((resolve) => 
setTimeout(resolve, 1000)); // Wait for 1 second before retry + const longerTimeoutClient = await GlideClient.createClient({ + connectionBackoff: { + exponentBase: 2, + factor: 100, + numberOfRetries: 1, + }, + advancedConfiguration: { connectionTimeout: 10000 }, // 10s connection timeout + ...config, // Include the rest of the config + }); + expect(await client.set("x", "y")).toEqual("OK"); + longerTimeoutClient.close(); // Close the client after successful connection + }; + + // Run both the long-running DEBUG SLEEP command and the client creation attempt in parallel + await Promise.all([ + debugCommandPromise, // Run the long-running command + failToCreateClient(), // Attempt to create the client with a short timeout + ]); + + // Run all tasks: fail short timeout, succeed with large timeout, and run the debug command + await Promise.all([ + debugCommandPromise, // Run the long-running command + connectWithLargeTimeout(), // Attempt to create the client with a short timeout + ]); + } finally { + // Clean up the test client and ensure everything is flushed and closed + client.close(); + } + }, + TIMEOUT, + ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( "function kill RW func %p", async (protocol) => { diff --git a/node/tests/GlideClusterClient.test.ts b/node/tests/GlideClusterClient.test.ts index 1e72051acb..f2553131f1 100644 --- a/node/tests/GlideClusterClient.test.ts +++ b/node/tests/GlideClusterClient.test.ts @@ -27,7 +27,6 @@ import { InfoOptions, ListDirection, ProtocolVersion, - ReadFrom, RequestError, Routes, ScoreFilter, @@ -328,6 +327,27 @@ describe("GlideClusterClient", () => { TIMEOUT, ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + `config get with wildcard and multi node route %p`, + async (protocol) => { + client = await GlideClusterClient.createClient( + getClientConfigurationOption(cluster.getAddresses(), protocol), + ); + const result = await client.configGet(["*file"], { + route: "allPrimaries", + }); + 
Object.values( + result as Record>, + ).forEach((resp) => { + const keys = Object.keys(resp); + expect(keys.length).toBeGreaterThan(5); + expect(keys).toContain("pidfile"); + expect(keys).toContain("logfile"); + }); + }, + TIMEOUT, + ); + describe.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( "Protocol is RESP2 = %s", (protocol) => { @@ -1977,6 +1997,68 @@ describe("GlideClusterClient", () => { TIMEOUT, ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "should handle connection timeout when client is blocked by long-running command (protocol: %p)", + async (protocol) => { + // Create a client configuration with a generous request timeout + const config = getClientConfigurationOption( + cluster.getAddresses(), + protocol, + { requestTimeout: 20000 }, // Long timeout to allow debugging operations (sleep for 7 seconds) + ); + + // Initialize the primary client + const client = await GlideClusterClient.createClient(config); + + try { + // Run a long-running DEBUG SLEEP command using the first client (client) + const debugCommandPromise = client.customCommand( + ["DEBUG", "sleep", "7"], + { route: "allNodes" }, // Sleep for 7 seconds + ); + + // Function that tries to create a client with a short connection timeout (100ms) + const failToCreateClient = async () => { + await new Promise((resolve) => setTimeout(resolve, 1000)); // Wait for 1 second before retry + await expect( + GlideClusterClient.createClient({ + advancedConfiguration: { connectionTimeout: 100 }, // 100ms connection timeout + ...config, // Include the rest of the config + }), + ).rejects.toThrowError(/timed out/i); // Ensure it throws a timeout error + }; + + // Function that verifies that a larger connection timeout allows connection + const connectWithLargeTimeout = async () => { + await new Promise((resolve) => setTimeout(resolve, 1000)); // Wait for 1 second before retry + const longerTimeoutClient = + await GlideClusterClient.createClient({ + advancedConfiguration: { 
connectionTimeout: 10000 }, // 10s connection timeout + ...config, // Include the rest of the config + }); + expect(await client.set("x", "y")).toEqual("OK"); + longerTimeoutClient.close(); // Close the client after successful connection + }; + + // Run both the long-running DEBUG SLEEP command and the client creation attempt in parallel + await Promise.all([ + debugCommandPromise, // Run the long-running command + failToCreateClient(), // Attempt to create the client with a short timeout + ]); + + // Run all tasks: fail short timeout, succeed with large timeout, and run the debug command + await Promise.all([ + debugCommandPromise, // Run the long-running command + connectWithLargeTimeout(), // Attempt to create the client with a short timeout + ]); + } finally { + // Clean up the test client and ensure everything is flushed and closed + client.close(); + } + }, + TIMEOUT, + ); + it.each([ [ProtocolVersion.RESP2, 5], [ProtocolVersion.RESP2, 100], @@ -2024,62 +2106,81 @@ describe("GlideClusterClient", () => { } }, ); - describe("GlideClusterClient - AZAffinity Read Strategy Test", () => { + + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "should return valid statistics using protocol %p", + async (protocol) => { + let glideClientForTesting; + + try { + // Create a GlideClusterClient instance for testing + glideClientForTesting = await GlideClusterClient.createClient( + getClientConfigurationOption( + cluster.getAddresses(), + protocol, + { + requestTimeout: 2000, + }, + ), + ); + + // Fetch statistics using get_statistics method + const stats = glideClientForTesting.getStatistics(); + + // Assertions to check if stats object has correct structure + expect(typeof stats).toBe("object"); + expect(stats).toHaveProperty("total_connections"); + expect(stats).toHaveProperty("total_clients"); + expect(Object.keys(stats)).toHaveLength(2); + } finally { + // Ensure the client is properly closed + glideClientForTesting?.close(); + } + }, + ); + + 
describe("AZAffinity Read Strategy Tests", () => { async function getNumberOfReplicas( azClient: GlideClusterClient, ): Promise { - const replicationInfo = await azClient.customCommand([ - "INFO", - "REPLICATION", - ]); - - if (Array.isArray(replicationInfo)) { - // Handle array response from cluster (CME Mode) - let totalReplicas = 0; - - for (const node of replicationInfo) { - const nodeInfo = node as { - key: string; - value: string | string[] | null; - }; - - if (typeof nodeInfo.value === "string") { - const lines = nodeInfo.value.split(/\r?\n/); - const connectedReplicasLine = lines.find( - (line) => - line.startsWith("connected_slaves:") || - line.startsWith("connected_replicas:"), - ); + const replicationInfo = (await azClient.info({ + sections: [InfoOptions.Replication], + })) as Record; + let totalReplicas = 0; + Object.values(replicationInfo).forEach((nodeInfo) => { + const lines = nodeInfo.split(/\r?\n/); + const connectedReplicasLine = lines.find( + (line) => + line.startsWith("connected_slaves:") || + line.startsWith("connected_replicas:"), + ); - if (connectedReplicasLine) { - const parts = connectedReplicasLine.split(":"); - const numReplicas = parseInt(parts[1], 10); + if (connectedReplicasLine) { + const parts = connectedReplicasLine.split(":"); + const numReplicas = parseInt(parts[1], 10); - if (!isNaN(numReplicas)) { - // Sum up replicas from each primary node - totalReplicas += numReplicas; - } - } + if (!isNaN(numReplicas)) { + // Sum up replicas from each primary node + totalReplicas += numReplicas; } } + }); - if (totalReplicas > 0) { - return totalReplicas; - } - - throw new Error( - "Could not find replica information in any node's response", - ); + if (totalReplicas > 0) { + return totalReplicas; } throw new Error( - "Unexpected response format from INFO REPLICATION command", + "Could not find replica information in any node's response", ); } it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( "should route GET commands to all 
replicas with the same AZ using protocol %p", async (protocol) => { + // Skip test if version is below 8.0.0 + if (cluster.checkIfServerVersionLessThan("8.0.0")) return; + const az = "us-east-1a"; const GET_CALLS_PER_REPLICA = 3; @@ -2095,21 +2196,9 @@ describe("GlideClusterClient", () => { protocol, ), ); - - // Skip test if version is below 8.0.0 - if (cluster.checkIfServerVersionLessThan("8.0.0")) { - console.log( - "Skipping test: requires Valkey 8.0.0 or higher", - ); - return; - } - - await client_for_config_set.customCommand([ - "CONFIG", - "RESETSTAT", - ]); - await client_for_config_set.customCommand( - ["CONFIG", "SET", "availability-zone", az], + await client_for_config_set.configResetStat(); + await client_for_config_set.configSet( + { "availability-zone": az }, { route: "allNodes" }, ); @@ -2134,46 +2223,22 @@ describe("GlideClusterClient", () => { azCluster.getAddresses(), protocol, { - readFrom: "AZAffinity" as ReadFrom, + readFrom: "AZAffinity", clientAz: az, }, ), ); - const azs = await client_for_testing_az.customCommand( - ["CONFIG", "GET", "availability-zone"], + const azs = (await client_for_testing_az.configGet( + ["availability-zone"], { route: "allNodes" }, - ); + )) as Record>; - if (Array.isArray(azs)) { - const allAZsMatch = azs.every((node) => { - const nodeResponse = node as { - key: string; - value: string | number; - }; - - if (protocol === ProtocolVersion.RESP2) { - // RESP2: Direct array format ["availability-zone", "us-east-1a"] - return ( - Array.isArray(nodeResponse.value) && - nodeResponse.value[1] === az - ); - } else { - // RESP3: Nested object format [{ key: "availability-zone", value: "us-east-1a" }] - return ( - Array.isArray(nodeResponse.value) && - nodeResponse.value[0]?.key === - "availability-zone" && - nodeResponse.value[0]?.value === az - ); - } - }); - expect(allAZsMatch).toBe(true); - } else { - throw new Error( - "Unexpected response format from CONFIG GET command", - ); - } + 
Object.values(azs).forEach((nodeResponse) => + expect(nodeResponse["availability-zone"]).toEqual( + "us-east-1a", + ), + ); // Stage 3: Set test data and perform GET operations await client_for_testing_az.set("foo", "testvalue"); @@ -2183,52 +2248,40 @@ describe("GlideClusterClient", () => { } // Stage 4: Verify GET commands were routed correctly - const info_result = - await client_for_testing_az.customCommand( - ["INFO", "ALL"], // Get both replication and commandstats info - { route: "allNodes" }, - ); - - if (Array.isArray(info_result)) { - const matching_entries_count = info_result.filter( - (node) => { - const nodeInfo = node as { - key: string; - value: string | string[] | null; - }; - const infoStr = - nodeInfo.value?.toString() || ""; - - // Check if this is a replica node AND it has the expected number of GET calls - const isReplicaNode = - infoStr.includes("role:slave") || - infoStr.includes("role:replica"); - - return ( - isReplicaNode && - infoStr.includes(get_cmdstat) - ); - }, - ).length; - - expect(matching_entries_count).toBe(n_replicas); // Should expect 12 as the cluster was created with 3 primary and 4 replicas, totalling 12 replica nodes - } else { - throw new Error( - "Unexpected response format from INFO command", - ); - } + const info_result = (await client_for_testing_az.info( + { sections: [InfoOptions.All], route: "allNodes" }, // Get both replication and commandstats info + )) as Record; + + const matching_entries_count = Object.values( + info_result, + ).filter((infoStr) => { + // Check if this is a replica node AND it has the expected number of GET calls + const isReplicaNode = + infoStr.includes("role:slave") || + infoStr.includes("role:replica"); + + return isReplicaNode && infoStr.includes(get_cmdstat); + }).length; + + expect(matching_entries_count).toBe(n_replicas); // Should expect 12 as the cluster was created with 3 primary and 4 replicas, totalling 12 replica nodes } finally { // Cleanup - await client_for_config_set?.close(); 
- await client_for_testing_az?.close(); + await client_for_config_set?.configSet( + { "availability-zone": "" }, + { route: "allNodes" }, + ); + client_for_config_set?.close(); + client_for_testing_az?.close(); } }, ); - }); - describe("GlideClusterClient - AZAffinity Routing to 1 replica", () => { + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( "should route commands to single replica with AZ using protocol %p", async (protocol) => { + // Skip test if version is below 8.0.0 + if (cluster.checkIfServerVersionLessThan("8.0.0")) return; + const az = "us-east-1a"; const GET_CALLS = 3; const get_cmdstat = `calls=${GET_CALLS}`; @@ -2245,26 +2298,15 @@ describe("GlideClusterClient", () => { ), ); - // Skip test if version is below 8.0.0 - if (cluster.checkIfServerVersionLessThan("8.0.0")) { - console.log( - "Skipping test: requires Valkey 8.0.0 or higher", - ); - return; - } - - await client_for_config_set.customCommand( - ["CONFIG", "SET", "availability-zone", ""], + await client_for_config_set.configSet( + { "availability-zone": "" }, { route: "allNodes" }, ); - await client_for_config_set.customCommand([ - "CONFIG", - "RESETSTAT", - ]); + await client_for_config_set.configResetStat(); - await client_for_config_set.customCommand( - ["CONFIG", "SET", "availability-zone", az], + await client_for_config_set.configSet( + { "availability-zone": az }, { route: { type: "replicaSlotId", id: 12182 } }, ); @@ -2287,73 +2329,54 @@ describe("GlideClusterClient", () => { } // Stage 4: Verify GET commands were routed correctly - const info_result = - await client_for_testing_az.customCommand( - ["INFO", "ALL"], - { route: "allNodes" }, - ); + const info_result = (await client_for_testing_az.info({ + sections: [InfoOptions.All], + route: "allNodes", + })) as Record; // Process the info_result to check that only one replica has the GET calls - if (Array.isArray(info_result)) { - // Count the number of nodes where both get_cmdstat and az are present - const 
matching_entries_count = info_result.filter( - (node) => { - const nodeInfo = node as { - key: string; - value: string | string[] | null; - }; - const infoStr = - nodeInfo.value?.toString() || ""; - return ( - infoStr.includes(get_cmdstat) && - infoStr.includes(`availability_zone:${az}`) - ); - }, - ).length; - - expect(matching_entries_count).toBe(1); - - // Check that only one node has the availability zone set to az - const changed_az_count = info_result.filter((node) => { - const nodeInfo = node as { - key: string; - value: string | string[] | null; - }; - const infoStr = nodeInfo.value?.toString() || ""; + const matching_entries_count = Object.values( + info_result, + ).filter((infoStr) => { + return ( + infoStr.includes(get_cmdstat) && + infoStr.includes(`availability_zone:${az}`) + ); + }).length; + + expect(matching_entries_count).toBe(1); + + // Check that only one node has the availability zone set to az + const changed_az_count = Object.values(info_result).filter( + (infoStr) => { return infoStr.includes(`availability_zone:${az}`); - }).length; + }, + ).length; - expect(changed_az_count).toBe(1); - } else { - throw new Error( - "Unexpected response format from INFO command", - ); - } + expect(changed_az_count).toBe(1); } finally { - await client_for_config_set?.close(); - await client_for_testing_az?.close(); + await client_for_config_set?.configSet( + { "availability-zone": "" }, + { route: "allNodes" }, + ); + client_for_config_set?.close(); + client_for_testing_az?.close(); } }, ); - }); - describe("GlideClusterClient - AZAffinity with Non-existing AZ", () => { + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( "should route commands to a replica when AZ does not exist using protocol %p", async (protocol) => { + // Skip test if version is below 8.0.0 + if (cluster.checkIfServerVersionLessThan("8.0.0")) return; + const GET_CALLS = 4; const replica_calls = 1; const get_cmdstat = `cmdstat_get:calls=${replica_calls}`; let client_for_testing_az; 
try { - // Skip test if server version is below 8.0.0 - if (azCluster.checkIfServerVersionLessThan("8.0.0")) { - console.log( - "Skipping test: requires Valkey 8.0.0 or higher", - ); - return; - } - // Create a client configured for AZAffinity with a non-existing AZ client_for_testing_az = await GlideClusterClient.createClient( @@ -2369,10 +2392,9 @@ describe("GlideClusterClient", () => { ); // Reset command stats on all nodes - await client_for_testing_az.customCommand( - ["CONFIG", "RESETSTAT"], - { route: "allNodes" }, - ); + await client_for_testing_az.configResetStat({ + route: "allNodes", + }); // Issue GET commands for (let i = 0; i < GET_CALLS; i++) { @@ -2380,76 +2402,23 @@ describe("GlideClusterClient", () => { } // Fetch command stats from all nodes - const info_result = - await client_for_testing_az.customCommand( - ["INFO", "COMMANDSTATS"], - { route: "allNodes" }, - ); + const info_result = (await client_for_testing_az.info({ + sections: [InfoOptions.Commandstats], + route: "allNodes", + })) as Record; // Inline matching logic - let matchingEntriesCount = 0; - - if ( - typeof info_result === "object" && - info_result !== null - ) { - const nodeResponses = Object.values(info_result); - - for (const response of nodeResponses) { - if ( - response && - typeof response === "object" && - "value" in response && - response.value.includes(get_cmdstat) - ) { - matchingEntriesCount++; - } - } - } else { - throw new Error( - "Unexpected response format from INFO command", - ); - } + const matchingEntriesCount = Object.values( + info_result, + ).filter((nodeResponses) => { + return nodeResponses.includes(get_cmdstat); + }).length; // Validate that only one replica handled the GET calls expect(matchingEntriesCount).toBe(4); } finally { // Cleanup: Close the client after test execution - await client_for_testing_az?.close(); - } - }, - ); - }); - describe("GlideClusterClient - Get Statistics", () => { - it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( - 
"should return valid statistics using protocol %p", - async (protocol) => { - let glideClientForTesting; - - try { - // Create a GlideClusterClient instance for testing - glideClientForTesting = - await GlideClusterClient.createClient( - getClientConfigurationOption( - cluster.getAddresses(), - protocol, - { - requestTimeout: 2000, - }, - ), - ); - - // Fetch statistics using get_statistics method - const stats = await glideClientForTesting.getStatistics(); - - // Assertions to check if stats object has correct structure - expect(typeof stats).toBe("object"); - expect(stats).toHaveProperty("total_connections"); - expect(stats).toHaveProperty("total_clients"); - expect(Object.keys(stats)).toHaveLength(2); - } finally { - // Ensure the client is properly closed - await glideClientForTesting?.close(); + client_for_testing_az?.close(); } }, ); diff --git a/python/python/glide/__init__.py b/python/python/glide/__init__.py index 8f6ceac47b..f2ecc3da4e 100644 --- a/python/python/glide/__init__.py +++ b/python/python/glide/__init__.py @@ -111,6 +111,8 @@ TTransaction, ) from glide.config import ( + AdvancedGlideClientConfiguration, + AdvancedGlideClusterClientConfiguration, BackoffStrategy, GlideClientConfiguration, GlideClusterClientConfiguration, @@ -176,6 +178,8 @@ "TGlideClient", "TTransaction", # Config + "AdvancedGlideClientConfiguration", + "AdvancedGlideClusterClientConfiguration", "GlideClientConfiguration", "GlideClusterClientConfiguration", "BackoffStrategy", diff --git a/python/python/glide/config.py b/python/python/glide/config.py index b33c037cbf..fc1acda94c 100644 --- a/python/python/glide/config.py +++ b/python/python/glide/config.py @@ -129,6 +129,28 @@ class PeriodicChecksStatus(Enum): """ +class AdvancedBaseClientConfiguration: + """ + Represents the advanced configuration settings for a base Glide client. + + Args: + connection_timeout (Optional[int]): The duration in milliseconds to wait for a TCP/TLS connection to complete. 
+ This applies both during initial client creation and any reconnections that may occur during request processing. + **Note**: A high connection timeout may lead to prolonged blocking of the entire command pipeline. + If not explicitly set, a default value of 250 milliseconds will be used. + """ + + def __init__(self, connection_timeout: Optional[int] = None): + self.connection_timeout = connection_timeout + + def _create_a_protobuf_conn_request( + self, request: ConnectionRequest + ) -> ConnectionRequest: + if self.connection_timeout: + request.connection_timeout = self.connection_timeout + return request + + class BaseClientConfiguration: def __init__( self, @@ -141,6 +163,7 @@ def __init__( protocol: ProtocolVersion = ProtocolVersion.RESP3, inflight_requests_limit: Optional[int] = None, client_az: Optional[str] = None, + advanced_config: Optional[AdvancedBaseClientConfiguration] = None, ): """ Represents the configuration settings for a Glide client. @@ -163,12 +186,14 @@ def __init__( read_from (ReadFrom): If not set, `PRIMARY` will be used. request_timeout (Optional[int]): The duration in milliseconds that the client should wait for a request to complete. This duration encompasses sending the request, awaiting for a response from the server, and any required reconnections or retries. - If the specified timeout is exceeded for a pending request, it will result in a timeout error. If not set, a default value will be used. + If the specified timeout is exceeded for a pending request, it will result in a timeout error. If not explicitly set, a default value of 250 milliseconds will be used. client_name (Optional[str]): Client name to be used for the client. Will be used with CLIENT SETNAME command during connection establishment. inflight_requests_limit (Optional[int]): The maximum number of concurrent requests allowed to be in-flight (sent but not yet completed). 
This limit is used to control the memory usage and prevent the client from overwhelming the server or getting stuck in case of a queue backlog. If not set, a default value will be used. - + client_az (Optional[str]): Availability Zone of the client. + If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits. + advanced_config (Optional[AdvancedBaseClientConfiguration]): Advanced configuration settings for the client. """ self.addresses = addresses self.use_tls = use_tls @@ -179,6 +204,7 @@ def __init__( self.protocol = protocol self.inflight_requests_limit = inflight_requests_limit self.client_az = client_az + self.advanced_config = advanced_config if read_from == ReadFrom.AZ_AFFINITY and not client_az: raise ValueError( @@ -218,6 +244,8 @@ def _create_a_protobuf_conn_request( request.inflight_requests_limit = self.inflight_requests_limit if self.client_az: request.client_az = self.client_az + if self.advanced_config: + self.advanced_config._create_a_protobuf_conn_request(request) return request @@ -230,6 +258,16 @@ def _get_pubsub_callback_and_context( return None, None +class AdvancedGlideClientConfiguration(AdvancedBaseClientConfiguration): + """ + Represents the advanced configuration settings for a Standalone Glide client. + """ + + def __init__(self, connection_timeout: Optional[int] = None): + + super().__init__(connection_timeout) + + class GlideClientConfiguration(BaseClientConfiguration): """ Represents the configuration settings for a Standalone Glide client. @@ -249,7 +287,7 @@ class GlideClientConfiguration(BaseClientConfiguration): request_timeout (Optional[int]): The duration in milliseconds that the client should wait for a request to complete. This duration encompasses sending the request, awaiting for a response from the server, and any required reconnections or retries. If the specified timeout is exceeded for a pending request, it will result in a timeout error. 
- If not set, a default value will be used. + If not explicitly set, a default value of 250 milliseconds will be used. reconnect_strategy (Optional[BackoffStrategy]): Strategy used to determine how and when to reconnect, in case of connection failures. If not set, a default backoff strategy will be used. @@ -261,7 +299,9 @@ class GlideClientConfiguration(BaseClientConfiguration): inflight_requests_limit (Optional[int]): The maximum number of concurrent requests allowed to be in-flight (sent but not yet completed). This limit is used to control the memory usage and prevent the client from overwhelming the server or getting stuck in case of a queue backlog. If not set, a default value will be used. - + client_az (Optional[str]): Availability Zone of the client. + If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits. + advanced_config (Optional[AdvancedGlideClientConfiguration]): Advanced configuration settings for the client, see `AdvancedGlideClientConfiguration`. """ class PubSubChannelModes(IntEnum): @@ -308,6 +348,7 @@ def __init__( pubsub_subscriptions: Optional[PubSubSubscriptions] = None, inflight_requests_limit: Optional[int] = None, client_az: Optional[str] = None, + advanced_config: Optional[AdvancedGlideClientConfiguration] = None, ): super().__init__( addresses=addresses, @@ -319,6 +360,7 @@ def __init__( protocol=protocol, inflight_requests_limit=inflight_requests_limit, client_az=client_az, + advanced_config=advanced_config, ) self.reconnect_strategy = reconnect_strategy self.database_id = database_id @@ -375,6 +417,15 @@ def _get_pubsub_callback_and_context( return None, None +class AdvancedGlideClusterClientConfiguration(AdvancedBaseClientConfiguration): + """ + Represents the advanced configuration settings for a Glide Cluster client. 
+ """ + + def __init__(self, connection_timeout: Optional[int] = None): + super().__init__(connection_timeout) + + class GlideClusterClientConfiguration(BaseClientConfiguration): """ Represents the configuration settings for a Cluster Glide client. @@ -392,7 +443,7 @@ class GlideClusterClientConfiguration(BaseClientConfiguration): read_from (ReadFrom): If not set, `PRIMARY` will be used. request_timeout (Optional[int]): The duration in milliseconds that the client should wait for a request to complete. This duration encompasses sending the request, awaiting for a response from the server, and any required reconnections or retries. - If the specified timeout is exceeded for a pending request, it will result in a timeout error. If not set, a default value will be used. + If the specified timeout is exceeded for a pending request, it will result in a timeout error. If not explicitly set, a default value of 250 milliseconds will be used. client_name (Optional[str]): Client name to be used for the client. Will be used with CLIENT SETNAME command during connection establishment. protocol (ProtocolVersion): The version of the RESP protocol to communicate with the server. periodic_checks (Union[PeriodicChecksStatus, PeriodicChecksManualInterval]): Configure the periodic topology checks. @@ -404,7 +455,9 @@ class GlideClusterClientConfiguration(BaseClientConfiguration): inflight_requests_limit (Optional[int]): The maximum number of concurrent requests allowed to be in-flight (sent but not yet completed). This limit is used to control the memory usage and prevent the client from overwhelming the server or getting stuck in case of a queue backlog. If not set, a default value will be used. - + client_az (Optional[str]): Availability Zone of the client. + If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits. 
+ advanced_config (Optional[AdvancedGlideClusterClientConfiguration]) : Advanced configuration settings for the client, see `AdvancedGlideClusterClientConfiguration`. Notes: @@ -459,6 +512,7 @@ def __init__( pubsub_subscriptions: Optional[PubSubSubscriptions] = None, inflight_requests_limit: Optional[int] = None, client_az: Optional[str] = None, + advanced_config: Optional[AdvancedGlideClusterClientConfiguration] = None, ): super().__init__( addresses=addresses, @@ -470,6 +524,7 @@ def __init__( protocol=protocol, inflight_requests_limit=inflight_requests_limit, client_az=client_az, + advanced_config=advanced_config, ) self.periodic_checks = periodic_checks self.pubsub_subscriptions = pubsub_subscriptions diff --git a/python/python/tests/conftest.py b/python/python/tests/conftest.py index 0ab5c9d6e9..b2a97b4d0a 100644 --- a/python/python/tests/conftest.py +++ b/python/python/tests/conftest.py @@ -5,6 +5,9 @@ import pytest from glide.config import ( + AdvancedGlideClientConfiguration, + AdvancedGlideClusterClientConfiguration, + BackoffStrategy, GlideClientConfiguration, GlideClusterClientConfiguration, NodeAddress, @@ -242,7 +245,8 @@ async def create_client( addresses: Optional[List[NodeAddress]] = None, client_name: Optional[str] = None, protocol: ProtocolVersion = ProtocolVersion.RESP3, - timeout: Optional[int] = 1000, + request_timeout: Optional[int] = 1000, + connection_timeout: Optional[int] = 1000, cluster_mode_pubsub: Optional[ GlideClusterClientConfiguration.PubSubSubscriptions ] = None, @@ -252,6 +256,7 @@ async def create_client( inflight_requests_limit: Optional[int] = None, read_from: ReadFrom = ReadFrom.PRIMARY, client_az: Optional[str] = None, + reconnect_strategy: Optional[BackoffStrategy] = None, valkey_cluster: Optional[ValkeyCluster] = None, ) -> Union[GlideClient, GlideClusterClient]: # Create async socket client @@ -268,11 +273,12 @@ async def create_client( credentials=credentials, client_name=client_name, protocol=protocol, - 
request_timeout=timeout, + request_timeout=request_timeout, pubsub_subscriptions=cluster_mode_pubsub, inflight_requests_limit=inflight_requests_limit, read_from=read_from, client_az=client_az, + advanced_config=AdvancedGlideClusterClientConfiguration(connection_timeout), ) return await GlideClusterClient.create(cluster_config) else: @@ -286,11 +292,13 @@ async def create_client( database_id=database_id, client_name=client_name, protocol=protocol, - request_timeout=timeout, + request_timeout=request_timeout, pubsub_subscriptions=standalone_mode_pubsub, inflight_requests_limit=inflight_requests_limit, read_from=read_from, client_az=client_az, + advanced_config=AdvancedGlideClientConfiguration(connection_timeout), + reconnect_strategy=reconnect_strategy, ) return await GlideClient.create(config) @@ -343,7 +351,7 @@ async def test_teardown(request, cluster_mode: bool, protocol: ProtocolVersion): try: # Try connecting without credentials client = await create_client( - request, cluster_mode, protocol=protocol, timeout=2000 + request, cluster_mode, protocol=protocol, request_timeout=2000 ) await client.custom_command(["FLUSHALL"]) await client.close() @@ -356,7 +364,7 @@ async def test_teardown(request, cluster_mode: bool, protocol: ProtocolVersion): request, cluster_mode, protocol=protocol, - timeout=2000, + request_timeout=2000, credentials=credentials, ) try: diff --git a/python/python/tests/test_api_export.py b/python/python/tests/test_api_export.py index 996a8821a3..e169950da0 100644 --- a/python/python/tests/test_api_export.py +++ b/python/python/tests/test_api_export.py @@ -55,6 +55,7 @@ def _get_export_rename_map(): "FtSearchKeywords", # ClassDef "FtAggregateKeywords", # ClassDef "FtProfileKeywords", # ClassDef + "AdvancedBaseClientConfiguration", # ClassDef ] diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index 23064b2f3d..3db7e965db 100644 --- a/python/python/tests/test_async_client.py +++ 
b/python/python/tests/test_async_client.py @@ -72,6 +72,7 @@ ) from glide.async_commands.transaction import ClusterTransaction, Transaction from glide.config import ( + BackoffStrategy, GlideClientConfiguration, GlideClusterClientConfiguration, ProtocolVersion, @@ -128,7 +129,7 @@ async def test_register_client_name_and_version(self, glide_client: TGlideClient @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_send_and_receive_large_values(self, request, cluster_mode, protocol): glide_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=5000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=5000 ) length = 2**25 # 33mb key = "0" * length @@ -302,6 +303,90 @@ async def test_statistics(self, glide_client: TGlideClient): assert "total_clients" in stats assert len(stats) == 2 + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_connection_timeout( + self, + request, + cluster_mode: bool, + protocol: ProtocolVersion, + ): + + client = await create_client( + request, + cluster_mode, + protocol=protocol, + request_timeout=2000, + connection_timeout=2000, + ) + assert isinstance(client, (GlideClient, GlideClusterClient)) + + assert await client.set("key", "value") == "OK" + + await client.close() + + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_connection_timeout_when_client_is_blocked( + self, + request, + cluster_mode: bool, + protocol: ProtocolVersion, + ): + client = await create_client( + request, + cluster_mode, + protocol=protocol, + request_timeout=20000, # 20 seconds timeout + ) + + async def run_debug_sleep(): + """ + Run a long-running DEBUG SLEEP command. 
+ """ + command = ["DEBUG", "sleep", "7"] + if isinstance(client, GlideClusterClient): + await client.custom_command(command, AllNodes()) + else: + await client.custom_command(command) + + async def fail_to_connect_to_client(): + # try to connect with a small timeout connection + await asyncio.sleep(1) + with pytest.raises(ClosingError) as e: + await create_client( + request, + cluster_mode, + protocol=protocol, + connection_timeout=100, # 100 ms + reconnect_strategy=BackoffStrategy( + 1, 100, 2 + ), # needs to be configured so that we wont be connected within 7 seconds bc of default retries + ) + assert "timed out" in str(e) + + async def connect_to_client(): + # Create a second client with a connection timeout of 7 seconds + await asyncio.sleep(1) + timeout_client = await create_client( + request, + cluster_mode, + protocol=protocol, + connection_timeout=10000, # 10-second connection timeout + reconnect_strategy=BackoffStrategy(1, 100, 2), + ) + + # Ensure the second client can connect and perform a simple operation + assert await timeout_client.set("key", "value") == "OK" + await timeout_client.close() + + # Run tests + await asyncio.gather(run_debug_sleep(), fail_to_connect_to_client()) + await asyncio.gather(run_debug_sleep(), connect_to_client()) + + # Clean up the main client + await client.close() + @pytest.mark.asyncio class TestCommands: @@ -770,6 +855,18 @@ async def test_config_get_set(self, glide_client: TGlideClient): == OK ) + @pytest.mark.parametrize("cluster_mode", [True]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_config_get_with_wildcard_and_multi_node_route( + self, glide_client: GlideClusterClient + ): + result = await glide_client.config_get(["*file"], AllPrimaries()) + assert isinstance(result, Dict) + for resp in result.values(): + assert len(resp) > 5 + assert b"pidfile" in resp + assert b"logfile" in resp + @pytest.mark.parametrize("cluster_mode", [True, False]) 
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_decr_decrby_existing_key(self, glide_client: TGlideClient): @@ -5412,7 +5509,10 @@ async def test_xread_edge_cases_and_failures( ) test_client = await create_client( - request=request, protocol=protocol, cluster_mode=cluster_mode, timeout=900 + request=request, + protocol=protocol, + cluster_mode=cluster_mode, + request_timeout=900, ) # ensure command doesn't time out even if timeout > request timeout assert ( @@ -5805,7 +5905,10 @@ async def test_xreadgroup_edge_cases_and_failures( ) test_client = await create_client( - request=request, protocol=protocol, cluster_mode=cluster_mode, timeout=900 + request=request, + protocol=protocol, + cluster_mode=cluster_mode, + request_timeout=900, ) timeout_key = f"{{testKey}}:{get_random_string(10)}" timeout_group_name = get_random_string(10) @@ -8333,11 +8436,11 @@ async def test_function_stats_running_script( # create a second client to run fcall test_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=30000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=30000 ) test_client2 = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=30000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=30000 ) async def endless_fcall_route_call(): @@ -8462,7 +8565,7 @@ async def test_function_kill_no_write( # create a second client to run fcall test_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=15000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=15000 ) async def endless_fcall_route_call(): @@ -8517,7 +8620,7 @@ async def test_function_kill_write_is_unkillable( # create a second client to run fcall - and give it a long timeout test_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=15000 + request, 
cluster_mode=cluster_mode, protocol=protocol, request_timeout=15000 ) # call fcall to run the function loaded function @@ -10365,7 +10468,7 @@ async def test_script_binary(self, glide_client: TGlideClient): @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_script_large_keys_no_args(self, request, cluster_mode, protocol): glide_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=5000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=5000 ) length = 2**13 # 8kb key = "0" * length @@ -10377,7 +10480,7 @@ async def test_script_large_keys_no_args(self, request, cluster_mode, protocol): @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_script_large_args_no_keys(self, request, cluster_mode, protocol): glide_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=5000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=5000 ) length = 2**12 # 4kb arg1 = "0" * length @@ -10393,7 +10496,7 @@ async def test_script_large_args_no_keys(self, request, cluster_mode, protocol): @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_script_large_keys_and_args(self, request, cluster_mode, protocol): glide_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=5000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=5000 ) length = 2**12 # 4kb key = "0" * length @@ -10477,7 +10580,7 @@ async def test_script_kill_route( # Create a second client to run the script test_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=30000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=30000 ) await script_kill_tests(glide_client, test_client, route) @@ -10493,7 +10596,7 @@ async def 
test_script_kill_no_route( ): # Create a second client to run the script test_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=30000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=30000 ) await script_kill_tests(glide_client, test_client) @@ -10505,12 +10608,12 @@ async def test_script_kill_unkillable( ): # Create a second client to run the script test_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=30000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=30000 ) # Create a second client to kill the script test_client2 = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=15000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=15000 ) # Add test for script_kill with writing script diff --git a/python/python/tests/test_config.py b/python/python/tests/test_config.py index 3b22adb09c..2476d8ec0f 100644 --- a/python/python/tests/test_config.py +++ b/python/python/tests/test_config.py @@ -1,16 +1,22 @@ # Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 from glide.config import ( + AdvancedGlideClientConfiguration, + AdvancedGlideClusterClientConfiguration, BaseClientConfiguration, + GlideClientConfiguration, GlideClusterClientConfiguration, NodeAddress, PeriodicChecksManualInterval, PeriodicChecksStatus, + ProtocolVersion, ReadFrom, ) +from glide.glide_client import GlideClient, GlideClusterClient from glide.protobuf.connection_request_pb2 import ConnectionRequest from glide.protobuf.connection_request_pb2 import ReadFrom as ProtobufReadFrom from glide.protobuf.connection_request_pb2 import TlsMode +from tests.conftest import create_client def test_default_client_config(): @@ -67,3 +73,24 @@ def test_convert_config_with_azaffinity_to_protobuf(): assert request.tls_mode is TlsMode.SecureTls assert request.read_from == ProtobufReadFrom.AZAffinity assert 
request.client_az == az + + +def test_connection_timeout_in_protobuf_request(): + connection_timeout = 5000 # in milliseconds + config = GlideClientConfiguration( + [NodeAddress("127.0.0.1")], + advanced_config=AdvancedGlideClientConfiguration(connection_timeout), + ) + request = config._create_a_protobuf_conn_request() + + assert isinstance(request, ConnectionRequest) + assert request.connection_timeout == connection_timeout + + config = GlideClusterClientConfiguration( + [NodeAddress("127.0.0.1")], + advanced_config=AdvancedGlideClusterClientConfiguration(connection_timeout), + ) + request = config._create_a_protobuf_conn_request(cluster_mode=True) + + assert isinstance(request, ConnectionRequest) + assert request.connection_timeout == connection_timeout diff --git a/python/python/tests/test_pubsub.py b/python/python/tests/test_pubsub.py index 6069104ed7..60baf383b2 100644 --- a/python/python/tests/test_pubsub.py +++ b/python/python/tests/test_pubsub.py @@ -66,7 +66,7 @@ async def create_two_clients_with_pubsub( cluster_mode_pubsub=cluster_mode_pubsub1, standalone_mode_pubsub=standalone_mode_pubsub1, protocol=protocol, - timeout=timeout, + request_timeout=timeout, ) try: client2 = await create_client( @@ -75,7 +75,7 @@ async def create_two_clients_with_pubsub( cluster_mode_pubsub=cluster_mode_pubsub2, standalone_mode_pubsub=standalone_mode_pubsub2, protocol=protocol, - timeout=timeout, + request_timeout=timeout, ) except Exception as e: await client1.close() diff --git a/python/python/tests/test_read_from_strategy.py b/python/python/tests/test_read_from_strategy.py index 03f3f8e9ae..cddb1e6f10 100644 --- a/python/python/tests/test_read_from_strategy.py +++ b/python/python/tests/test_read_from_strategy.py @@ -46,7 +46,7 @@ async def test_routing_with_az_affinity_strategy_to_1_replica( cluster_mode, # addresses=multiple_replicas_cluster.nodes_addr, protocol=protocol, - timeout=2000, + request_timeout=2000, ) # Reset the availability zone for all nodes @@ -67,7 
+67,7 @@ async def test_routing_with_az_affinity_strategy_to_1_replica( cluster_mode, protocol=protocol, read_from=ReadFrom.AZ_AFFINITY, - timeout=2000, + request_timeout=2000, client_az=az, ) @@ -113,7 +113,7 @@ async def test_routing_by_slot_to_replica_with_az_affinity_strategy_to_all_repli cluster_mode, # addresses=multiple_replicas_cluster.nodes_addr, protocol=protocol, - timeout=2000, + request_timeout=2000, ) assert await client_for_config_set.config_resetstat() == OK await client_for_config_set.custom_command( @@ -125,7 +125,7 @@ async def test_routing_by_slot_to_replica_with_az_affinity_strategy_to_all_repli cluster_mode, protocol=protocol, read_from=ReadFrom.AZ_AFFINITY, - timeout=2000, + request_timeout=2000, client_az=az, ) azs = await client_for_testing_az.custom_command( @@ -181,7 +181,7 @@ async def test_az_affinity_non_existing_az( # addresses=multiple_replicas_cluster.nodes_addr, protocol=protocol, read_from=ReadFrom.AZ_AFFINITY, - timeout=2000, + request_timeout=2000, client_az="non-existing-az", ) assert await client_for_testing_az.config_resetstat() == OK @@ -217,5 +217,5 @@ async def test_az_affinity_requires_client_az( cluster_mode=cluster_mode, protocol=protocol, read_from=ReadFrom.AZ_AFFINITY, - timeout=2000, + request_timeout=2000, ) diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index 2ec2eee854..7affca711b 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -972,7 +972,7 @@ async def test_can_return_null_on_watch_transaction_failures( @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_transaction_large_values(self, request, cluster_mode, protocol): glide_client = await create_client( - request, cluster_mode=cluster_mode, protocol=protocol, timeout=5000 + request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=5000 ) length = 2**25 # 33mb key = "0" * length diff --git 
a/utils/cluster_manager.py b/utils/cluster_manager.py index 36e23723d2..604001d196 100755 --- a/utils/cluster_manager.py +++ b/utils/cluster_manager.py @@ -341,6 +341,40 @@ def start_server( node_folder = f"{cluster_folder}/{port}" Path(node_folder).mkdir(exist_ok=True) + # Determine which server to use by checking `valkey-server` and `redis-server` + def get_server_command() -> str: + """Return the first of valkey-server / redis-server that `which` can locate.""" + for server in ["valkey-server", "redis-server"]: + try: + result = subprocess.run( + ["which", server], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + if result.returncode == 0: + return server + except Exception as e: + logging.error(f"Error checking {server}: {e}") + raise Exception( + "Neither valkey-server nor redis-server found in the system.") + + def get_server_version(server_name): + """Parse `<server_name> --version` output into a (major, minor, patch) tuple of ints.""" + result = subprocess.run( + [server_name, "--version"], capture_output=True, text=True + ) + version_output = result.stdout + version_match = re.search( + r"(?:Redis|Valkey) server v=(\d+\.\d+\.\d+)", version_output, re.IGNORECASE + ) + if version_match: + return tuple(map(int, version_match.group(1).split("."))) + raise Exception("Unable to determine server version.") + + server_name = get_server_command() + server_version = get_server_version(server_name) + # Define command arguments logfile = f"{node_folder}/server.log" cmd_args = [ @@ -358,6 +392,8 @@ def start_server( "--protected-mode", "no", ] + if server_version >= (7, 0, 0): + cmd_args.extend(["--enable-debug-command", "yes"]) if load_module: if len(load_module) == 0: raise ValueError(