Commit
Merge branch 'release-1.2'
ikolomi committed Dec 29, 2024
2 parents 2d6ae0b + 0bcca10 commit ec6fa62
Showing 38 changed files with 1,153 additions and 360 deletions.
15 changes: 6 additions & 9 deletions .github/workflows/create-test-matrices/action.yml
@@ -41,12 +41,11 @@ runs:
echo 'Select server engines to run tests against'
if [[ "${{ github.event_name }}" == "pull_request" || "${{ github.event_name }}" == "push" || "${{ inputs.run-full-matrix }}" == "false" ]]; then
echo 'Pick engines marked as `"run": "always"` only - on PR, push or manually triggered job which does not require full matrix'
jq -c '[.[] | select(.run == "always")]' < .github/json_matrices/engine-matrix.json | awk '{ printf "engine-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT
jq -c '[.[] | select(.run == "always")]' < .github/json_matrices/engine-matrix.json | awk '{ printf "engine-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT
else
echo 'Pick all engines - on cron (schedule) or if manually triggered job requires a full matrix'
jq -c . < .github/json_matrices/engine-matrix.json | awk '{ printf "engine-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT
jq -c . < .github/json_matrices/engine-matrix.json | awk '{ printf "engine-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT
fi
cat $GITHUB_OUTPUT
- name: Load host matrix
id: load-host-matrix
@@ -57,12 +56,11 @@ runs:
echo 'Select runners (VMs) to run tests on'
if [[ "${{ github.event_name }}" == "pull_request" || "${{ github.event_name }}" == "push" || "${{ inputs.run-full-matrix }}" == "false" ]]; then
echo 'Pick runners marked as '"run": "always"' only - on PR, push or manually triggered job which does not require full matrix'
jq -c '[.[] | select(.run == "always")]' < .github/json_matrices/build-matrix.json | awk '{ printf "host-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT
jq -c '[.[] | select(.run == "always")]' < .github/json_matrices/build-matrix.json | awk '{ printf "host-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT
else
echo 'Pick all runners assigned for the chosen client (language) - on cron (schedule) or if manually triggered job requires a full matrix'
jq -c "[.[] | select(.languages? and any(.languages[] == \"${{ inputs.language-name }}\"; .) and $CONDITION)]" < .github/json_matrices/build-matrix.json | awk '{ printf "host-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT
jq -c "[.[] | select(.languages? and any(.languages[] == \"${{ inputs.language-name }}\"; .) and $CONDITION)]" < .github/json_matrices/build-matrix.json | awk '{ printf "host-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT
fi
cat $GITHUB_OUTPUT
- name: Create language version matrix
id: create-lang-version-matrix
@@ -72,9 +70,8 @@ runs:
echo 'Select language (framework/SDK) versions to run tests on'
if [[ "${{ github.event_name }}" == "pull_request" || "${{ github.event_name }}" == "push" || "${{ inputs.run-full-matrix }}" == "false" ]]; then
echo 'Pick language versions listed in 'always-run-versions' only - on PR, push or manually triggered job which does not require full matrix'
jq -c '[.[] | select(.language == "${{ inputs.language-name }}") | .["always-run-versions"]][0] // []' < .github/json_matrices/supported-languages-versions.json | awk '{ printf "version-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT
jq -c '[.[] | select(.language == "${{ inputs.language-name }}") | .["always-run-versions"]][0] // []' < .github/json_matrices/supported-languages-versions.json | awk '{ printf "version-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT
else
echo 'Pick language versions listed in 'versions' - on cron (schedule) or if manually triggered job requires a full matrix'
jq -c '[.[] | select(.language == "${{ inputs.language-name }}") | .versions][0]' < .github/json_matrices/supported-languages-versions.json | awk '{ printf "version-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT
jq -c '[.[] | select(.language == "${{ inputs.language-name }}") | .versions][0]' < .github/json_matrices/supported-languages-versions.json | awk '{ printf "version-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT
fi
cat $GITHUB_OUTPUT
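The awk change in all three steps above swaps `$1` for `$0` when appending to `$GITHUB_OUTPUT`. In awk, `$1` is only the first whitespace-separated field, so if the compacted JSON emitted by `jq -c` contains a space anywhere (for example inside a quoted string value), the resulting `engine-matrix=`/`host-matrix=`/`version-matrix=` line is cut off at that space; `$0` is the entire input line, so the full JSON document is written out. For instance, piping the (hypothetical) line `[{"name":"valkey 8.0"}]` through `awk '{ printf "engine-matrix=%s\n", $1 }'` yields only `engine-matrix=[{"name":"valkey`, while the `$0` variant preserves the whole array. The trailing `cat $GITHUB_OUTPUT` debug lines are removed in the same change.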
19 changes: 18 additions & 1 deletion CHANGELOG.md
@@ -1,4 +1,5 @@
#### Changes

* Node, Python, Java: Add allow uncovered slots scanning flag option in cluster scan ([#2814](https://github.com/valkey-io/valkey-glide/pull/2814), [#2815](https://github.com/valkey-io/valkey-glide/pull/2815), [#2860](https://github.com/valkey-io/valkey-glide/pull/2860))
* Go: Add HINCRBY command ([#2847](https://github.com/valkey-io/valkey-glide/pull/2847))
* Go: Add HINCRBYFLOAT command ([#2846](https://github.com/valkey-io/valkey-glide/pull/2846))
@@ -19,6 +20,23 @@

#### Operational Enhancements

## 1.2.1 (2024-12-29)

#### Changes
* Node, Python, Java: Add allow uncovered slots scanning flag option in cluster scan ([#2814](https://github.com/valkey-io/valkey-glide/pull/2814), [#2815](https://github.com/valkey-io/valkey-glide/pull/2815), [#2860](https://github.com/valkey-io/valkey-glide/pull/2860))
* Java: Bump protobuf (protoc) version ([#2561](https://github.com/valkey-io/valkey-glide/pull/2561), [#2802](https://github.com/valkey-io/valkey-glide/pull/2802))
* Java: bump `netty` version ([#2777](https://github.com/valkey-io/valkey-glide/pull/2777))
* Node: Remove native package references for MacOs x64 architecture ([#2799](https://github.com/valkey-io/valkey-glide/issues/2799))
* Node, Python, Java: Add connection timeout to client configuration ([#2823](https://github.com/valkey-io/valkey-glide/issues/2823))

#### Breaking Changes

#### Fixes
* Core: Fix RESP2 multi-node response from cluster ([#2381](https://github.com/valkey-io/valkey-glide/pull/2381))
* Core: Ensure cluster client creation fails when engine is < 7.0 and sharded subscriptions are configured ([#2819](https://github.com/valkey-io/valkey-glide/pull/2819))

#### Operational Enhancements

## 1.2.0 (2024-11-27)

#### Changes
@@ -117,7 +135,6 @@

#### Breaking Changes

**None**

#### Fixes

1 change: 1 addition & 0 deletions glide-core/Cargo.toml
@@ -41,6 +41,7 @@ nanoid = "0.4.0"
async-trait = { version = "0.1.24" }
serde_json = "1"
serde = { version = "1", features = ["derive"] }
versions = "6.3"

[features]
socket-layer = [
5 changes: 5 additions & 0 deletions glide-core/redis-rs/redis/src/client.rs
@@ -89,6 +89,11 @@ pub struct GlideConnectionOptions {
/// If ReadFromReplica strategy is set to AZAffinity, this parameter will be set to 'true'.
/// In this case, an INFO command will be triggered in the connection's setup to update the connection's 'availability_zone' property.
pub discover_az: bool,
/// Connection timeout duration.
///
/// This optional field sets the maximum duration to wait when attempting to establish
/// a connection. If `None`, the connection will use `DEFAULT_CONNECTION_TIMEOUT`.
pub connection_timeout: Option<Duration>,
}

/// To enable async support you need to enable the feature: `tokio-comp`
@@ -193,6 +193,7 @@ where
push_sender: None,
disconnect_notifier,
discover_az,
connection_timeout: Some(params.connection_timeout),
},
)
.await
1 change: 1 addition & 0 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
@@ -1094,6 +1094,7 @@ where
push_sender,
disconnect_notifier,
discover_az,
connection_timeout: Some(cluster_params.connection_timeout),
};

let connections = Self::create_initial_connections(
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/cluster_routing.rs
@@ -623,6 +623,7 @@ fn base_routing(cmd: &[u8]) -> RouteBy {
| b"FUNCTION STATS" => RouteBy::AllNodes,

b"DBSIZE"
| b"DEBUG"
| b"FLUSHALL"
| b"FLUSHDB"
| b"FT._ALIASLIST"
@@ -717,7 +718,6 @@ fn base_routing(cmd: &[u8]) -> RouteBy {
| b"COMMAND LIST"
| b"COMMAND"
| b"CONFIG GET"
| b"DEBUG"
| b"ECHO"
| b"FUNCTION LIST"
| b"LASTSAVE"
69 changes: 62 additions & 7 deletions glide-core/src/client/mod.rs
@@ -13,7 +13,8 @@ use redis::cluster_routing::{
};
use redis::cluster_slotmap::ReadFromReplicaStrategy;
use redis::{
ClusterScanArgs, Cmd, ErrorKind, PushInfo, RedisError, RedisResult, ScanStateRC, Value,
ClusterScanArgs, Cmd, ErrorKind, FromRedisValue, PushInfo, RedisError, RedisResult,
ScanStateRC, Value,
};
pub use standalone_client::StandaloneClient;
use std::io;
@@ -26,14 +27,17 @@ use self::value_conversion::{convert_to_expected_type, expected_type_for_cmd, ge
mod reconnecting_connection;
mod standalone_client;
mod value_conversion;
use redis::InfoDict;
use tokio::sync::mpsc;
use versions::Versioning;

pub const HEARTBEAT_SLEEP_DURATION: Duration = Duration::from_secs(1);
pub const DEFAULT_RETRIES: u32 = 3;
/// Note: If you change the default value, make sure to change the documentation in *all* wrappers.
pub const DEFAULT_RESPONSE_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL: Duration = Duration::from_secs(60);
pub const INTERNAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250);
/// Note: If you change the default value, make sure to change the documentation in *all* wrappers.
pub const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250);
pub const FINISHED_SCAN_CURSOR: &str = "finished";

/// The value of 1000 for the maximum number of inflight requests is determined based on Little's Law in queuing theory:
@@ -568,8 +572,9 @@ async fn create_cluster_client(
Some(PeriodicCheck::ManualInterval(interval)) => Some(interval),
None => Some(DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL),
};
let connection_timeout = to_duration(request.connection_timeout, DEFAULT_CONNECTION_TIMEOUT);
let mut builder = redis::cluster::ClusterClientBuilder::new(initial_nodes)
.connection_timeout(INTERNAL_CONNECTION_TIMEOUT)
.connection_timeout(connection_timeout)
.retries(DEFAULT_RETRIES);
let read_from_strategy = request.read_from.unwrap_or_default();
builder = builder.read_from(match read_from_strategy {
@@ -592,15 +597,63 @@
};
builder = builder.tls(tls);
}
if let Some(pubsub_subscriptions) = redis_connection_info.pubsub_subscriptions {
if let Some(pubsub_subscriptions) = redis_connection_info.pubsub_subscriptions.clone() {
builder = builder.pubsub_subscriptions(pubsub_subscriptions);
}

// Always use with Glide
builder = builder.periodic_connections_checks(CONNECTION_CHECKS_INTERVAL);

let client = builder.build()?;
client.get_async_connection(push_sender).await
let mut con = client.get_async_connection(push_sender).await?;

// This validation ensures that sharded subscriptions are not applied to Redis engines older than version 7.0,
// preventing scenarios where the client becomes inoperable or, worse, unaware that sharded pubsub messages are not being received.
// The issue arises because `client.get_async_connection()` might succeed even if the engine does not support sharded pubsub.
// For example, initial connections may exclude the target node for sharded subscriptions, allowing the creation to succeed,
// but subsequent resubscription tasks will fail when `setup_connection()` cannot establish a connection to the node.
//
// One approach to handle this would be to check the engine version inside `setup_connection()` and skip applying sharded subscriptions.
// However, this approach would leave the application unaware that the subscriptions were not applied, requiring the user to analyze logs to identify the issue.
// Instead, we explicitly check the engine version here and fail the connection creation if it is incompatible with sharded subscriptions.

if let Some(pubsub_subscriptions) = redis_connection_info.pubsub_subscriptions {
if pubsub_subscriptions.contains_key(&redis::PubSubSubscriptionKind::Sharded) {
let info_res = con
.route_command(
redis::cmd("INFO").arg("SERVER"),
RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random),
)
.await?;
let info_dict: InfoDict = FromRedisValue::from_redis_value(&info_res)?;
match info_dict.get::<String>("redis_version") {
Some(version) => match (Versioning::new(version), Versioning::new("7.0")) {
(Some(server_ver), Some(min_ver)) => {
if server_ver < min_ver {
return Err(RedisError::from((
ErrorKind::InvalidClientConfig,
"Sharded subscriptions provided, but the engine version is < 7.0",
)));
}
}
_ => {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Failed to parse engine version",
)))
}
},
_ => {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Could not determine engine version from INFO result",
)))
}
}
}
}

Ok(con)
}

#[derive(thiserror::Error)]
@@ -667,6 +720,8 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String {
"\nStandalone mode"
};
let request_timeout = format_optional_value("Request timeout", request.request_timeout);
let connection_timeout =
format_optional_value("Connection timeout", request.connection_timeout);
let database_id = format!("\ndatabase ID: {}", request.database_id);
let rfr_strategy = request
.read_from
@@ -723,7 +778,7 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String {
);

format!(
"\nAddresses: {addresses}{tls_mode}{cluster_mode}{request_timeout}{rfr_strategy}{connection_retry_strategy}{database_id}{protocol}{client_name}{periodic_checks}{pubsub_subscriptions}{inflight_requests_limit}",
"\nAddresses: {addresses}{tls_mode}{cluster_mode}{request_timeout}{connection_timeout}{rfr_strategy}{connection_retry_strategy}{database_id}{protocol}{client_name}{periodic_checks}{pubsub_subscriptions}{inflight_requests_limit}",
)
}

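The engine-version guard above relies on the `versions` crate added to `glide-core/Cargo.toml` earlier in this diff. Below is a minimal sketch of the comparison it performs, with the `INFO SERVER` round-trip and error mapping stripped out (the real code routes the command to a random node and fails cluster client creation when the version cannot be parsed):

```rust
use versions::Versioning;

/// Returns Some(true) when the reported engine version is at least 7.0 and can
/// therefore serve sharded pub/sub; None when either version string fails to parse.
fn supports_sharded_pubsub(engine_version: &str) -> Option<bool> {
    let server = Versioning::new(engine_version)?;
    let minimum = Versioning::new("7.0")?;
    Some(server >= minimum)
}

fn main() {
    // Sharded subscriptions against a pre-7.0 engine must fail client creation.
    assert_eq!(supports_sharded_pubsub("6.2.14"), Some(false));
    // 7.x and newer engines pass the check.
    assert_eq!(supports_sharded_pubsub("7.2.5"), Some(true));
}
```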
20 changes: 17 additions & 3 deletions glide-core/src/client/reconnecting_connection.rs
@@ -18,7 +18,7 @@ use tokio::task;
use tokio::time::timeout;
use tokio_retry2::{Retry, RetryError};

use super::{run_with_timeout, DEFAULT_CONNECTION_ATTEMPT_TIMEOUT};
use super::{run_with_timeout, DEFAULT_CONNECTION_TIMEOUT};

/// The reason behind the call to `reconnect()`
#[derive(PartialEq, Eq, Debug, Clone)]
@@ -71,7 +71,11 @@ async fn get_multiplexed_connection(
connection_options: &GlideConnectionOptions,
) -> RedisResult<MultiplexedConnection> {
run_with_timeout(
Some(DEFAULT_CONNECTION_ATTEMPT_TIMEOUT),
Some(
connection_options
.connection_timeout
.unwrap_or(DEFAULT_CONNECTION_TIMEOUT),
),
client.get_multiplexed_async_connection(connection_options.clone()),
)
.await
@@ -113,6 +117,7 @@ async fn create_connection(
retry_strategy: RetryStrategy,
push_sender: Option<mpsc::UnboundedSender<PushInfo>>,
discover_az: bool,
connection_timeout: Duration,
) -> Result<ReconnectingConnection, (ReconnectingConnection, RedisError)> {
let client = &connection_backend.connection_info;
let connection_options = GlideConnectionOptions {
Expand All @@ -121,6 +126,7 @@ async fn create_connection(
TokioDisconnectNotifier::new(),
)),
discover_az,
connection_timeout: Some(connection_timeout),
};
let action = || async {
get_multiplexed_connection(client, &connection_options)
@@ -206,6 +212,7 @@ impl ReconnectingConnection {
tls_mode: TlsMode,
push_sender: Option<mpsc::UnboundedSender<PushInfo>>,
discover_az: bool,
connection_timeout: Duration,
) -> Result<ReconnectingConnection, (ReconnectingConnection, RedisError)> {
log_debug(
"connection creation",
@@ -218,7 +225,14 @@
connection_available_signal: ManualResetEvent::new(true),
client_dropped_flagged: AtomicBool::new(false),
};
create_connection(backend, connection_retry_strategy, push_sender, discover_az).await
create_connection(
backend,
connection_retry_strategy,
push_sender,
discover_az,
connection_timeout,
)
.await
}

pub(crate) fn node_address(&self) -> String {
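With this change, `get_multiplexed_connection` bounds the connection attempt by the caller-supplied timeout instead of the hard-coded `DEFAULT_CONNECTION_ATTEMPT_TIMEOUT`, falling back to `DEFAULT_CONNECTION_TIMEOUT` when none was provided. A rough sketch of the same idea using `tokio::time::timeout` directly (`run_with_timeout` is glide-core's own helper; this approximation assumes a tokio runtime with the `time`, `rt`, and `macros` features):

```rust
use std::time::Duration;
use tokio::time::timeout;

/// Mirrors DEFAULT_CONNECTION_TIMEOUT (250 ms) from glide-core/src/client/mod.rs.
const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250);

/// Bound a connection attempt by the configured timeout, falling back to the
/// default when the caller did not provide one.
async fn connect_with_timeout<F, T>(
    connection_timeout: Option<Duration>,
    connect: F,
) -> Result<T, String>
where
    F: std::future::Future<Output = Result<T, String>>,
{
    let limit = connection_timeout.unwrap_or(DEFAULT_CONNECTION_TIMEOUT);
    match timeout(limit, connect).await {
        Ok(result) => result, // the attempt finished (successfully or not) in time
        Err(_) => Err(format!("connection attempt timed out after {limit:?}")),
    }
}

#[tokio::main]
async fn main() {
    // An attempt that resolves immediately completes well within the 250 ms default.
    let ok = connect_with_timeout(None, async { Ok::<_, String>("connected") }).await;
    assert_eq!(ok, Ok("connected"));
}
```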
10 changes: 10 additions & 0 deletions glide-core/src/client/standalone_client.rs
@@ -2,6 +2,7 @@

use super::get_redis_connection_info;
use super::reconnecting_connection::{ReconnectReason, ReconnectingConnection};
use super::{to_duration, DEFAULT_CONNECTION_TIMEOUT};
use super::{ConnectionRequest, NodeAddress, TlsMode};
use crate::client::types::ReadFrom as ClientReadFrom;
use crate::retry_strategies::RetryStrategy;
@@ -15,6 +16,7 @@ use redis::{PushInfo, RedisError, RedisResult, Value};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use telemetrylib::Telemetry;
use tokio::sync::mpsc;
use tokio::task;
@@ -130,6 +132,11 @@ impl StandaloneClient {
Some(ClientReadFrom::AZAffinity(_))
);

let connection_timeout = to_duration(
connection_request.connection_timeout,
DEFAULT_CONNECTION_TIMEOUT,
);

let mut stream = stream::iter(connection_request.addresses.iter())
.map(|address| async {
get_connection_and_replication_info(
@@ -143,6 +150,7 @@
tls_mode.unwrap_or(TlsMode::NoTls),
&push_sender,
discover_az,
connection_timeout,
)
.await
.map_err(|err| (format!("{}:{}", address.host, address.port), err))
@@ -552,6 +560,7 @@ async fn get_connection_and_replication_info(
tls_mode: TlsMode,
push_sender: &Option<mpsc::UnboundedSender<PushInfo>>,
discover_az: bool,
connection_timeout: Duration,
) -> Result<(ReconnectingConnection, Value), (ReconnectingConnection, RedisError)> {
let result = ReconnectingConnection::new(
address,
Expand All @@ -560,6 +569,7 @@ async fn get_connection_and_replication_info(
tls_mode,
push_sender.clone(),
discover_az,
connection_timeout,
)
.await;
let reconnecting_connection = match result {
3 changes: 3 additions & 0 deletions glide-core/src/client/types.rs
@@ -20,6 +20,7 @@ pub struct ConnectionRequest {
pub addresses: Vec<NodeAddress>,
pub cluster_mode_enabled: bool,
pub request_timeout: Option<u32>,
pub connection_timeout: Option<u32>,
pub connection_retry_strategy: Option<ConnectionRetryStrategy>,
pub periodic_checks: Option<PeriodicCheck>,
pub pubsub_subscriptions: Option<redis::PubSubSubscriptionInfo>,
@@ -147,6 +148,7 @@ impl From<protobuf::ConnectionRequest> for ConnectionRequest {
.collect();
let cluster_mode_enabled = value.cluster_mode_enabled;
let request_timeout = none_if_zero(value.request_timeout);
let connection_timeout = none_if_zero(value.connection_timeout);
let connection_retry_strategy =
value
.connection_retry_strategy
@@ -214,6 +216,7 @@ impl From<protobuf::ConnectionRequest> for ConnectionRequest {
addresses,
cluster_mode_enabled,
request_timeout,
connection_timeout,
connection_retry_strategy,
periodic_checks,
pubsub_subscriptions,
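The protobuf field is passed through `none_if_zero`, so a wrapper that leaves the connection timeout unset (0) gets the library default rather than a zero-length timeout. A sketch of that helper's assumed behavior (the helper exists in the codebase; this implementation is illustrative only):

```rust
/// Assumed behavior of the none_if_zero helper used above: a protobuf value of 0
/// means "not configured", so it maps to None and the client later falls back to
/// DEFAULT_CONNECTION_TIMEOUT via to_duration.
fn none_if_zero(value: u32) -> Option<u32> {
    (value != 0).then_some(value)
}

fn main() {
    assert_eq!(none_if_zero(0), None);        // unset -> library default applies
    assert_eq!(none_if_zero(500), Some(500)); // an explicit timeout is preserved
}
```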