Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge 1 2 1 to main #2886

Merged
merged 19 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions .github/workflows/create-test-matrices/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,11 @@ runs:
echo 'Select server engines to run tests against'
if [[ "${{ github.event_name }}" == "pull_request" || "${{ github.event_name }}" == "push" || "${{ inputs.run-full-matrix }}" == "false" ]]; then
echo 'Pick engines marked as `"run": "always"` only - on PR, push or manually triggered job which does not require full matrix'
jq -c '[.[] | select(.run == "always")]' < .github/json_matrices/engine-matrix.json | awk '{ printf "engine-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT
jq -c '[.[] | select(.run == "always")]' < .github/json_matrices/engine-matrix.json | awk '{ printf "engine-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT
else
echo 'Pick all engines - on cron (schedule) or if manually triggered job requires a full matrix'
jq -c . < .github/json_matrices/engine-matrix.json | awk '{ printf "engine-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT
jq -c . < .github/json_matrices/engine-matrix.json | awk '{ printf "engine-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT
fi
cat $GITHUB_OUTPUT

- name: Load host matrix
id: load-host-matrix
Expand All @@ -57,12 +56,11 @@ runs:
echo 'Select runners (VMs) to run tests on'
if [[ "${{ github.event_name }}" == "pull_request" || "${{ github.event_name }}" == "push" || "${{ inputs.run-full-matrix }}" == "false" ]]; then
echo 'Pick runners marked as '"run": "always"' only - on PR, push or manually triggered job which does not require full matrix'
jq -c '[.[] | select(.run == "always")]' < .github/json_matrices/build-matrix.json | awk '{ printf "host-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT
jq -c '[.[] | select(.run == "always")]' < .github/json_matrices/build-matrix.json | awk '{ printf "host-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT
else
echo 'Pick all runners assigned for the chosen client (language) - on cron (schedule) or if manually triggered job requires a full matrix'
jq -c "[.[] | select(.languages? and any(.languages[] == \"${{ inputs.language-name }}\"; .) and $CONDITION)]" < .github/json_matrices/build-matrix.json | awk '{ printf "host-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT
jq -c "[.[] | select(.languages? and any(.languages[] == \"${{ inputs.language-name }}\"; .) and $CONDITION)]" < .github/json_matrices/build-matrix.json | awk '{ printf "host-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT
fi
cat $GITHUB_OUTPUT

- name: Create language version matrix
id: create-lang-version-matrix
Expand All @@ -72,9 +70,8 @@ runs:
echo 'Select language (framework/SDK) versions to run tests on'
if [[ "${{ github.event_name }}" == "pull_request" || "${{ github.event_name }}" == "push" || "${{ inputs.run-full-matrix }}" == "false" ]]; then
echo 'Pick language versions listed in 'always-run-versions' only - on PR, push or manually triggered job which does not require full matrix'
jq -c '[.[] | select(.language == "${{ inputs.language-name }}") | .["always-run-versions"]][0] // []' < .github/json_matrices/supported-languages-versions.json | awk '{ printf "version-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT
jq -c '[.[] | select(.language == "${{ inputs.language-name }}") | .["always-run-versions"]][0] // []' < .github/json_matrices/supported-languages-versions.json | awk '{ printf "version-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT
else
echo 'Pick language versions listed in 'versions' - on cron (schedule) or if manually triggered job requires a full matrix'
jq -c '[.[] | select(.language == "${{ inputs.language-name }}") | .versions][0]' < .github/json_matrices/supported-languages-versions.json | awk '{ printf "version-matrix=%s\n", $1 }' | tee -a $GITHUB_OUTPUT
jq -c '[.[] | select(.language == "${{ inputs.language-name }}") | .versions][0]' < .github/json_matrices/supported-languages-versions.json | awk '{ printf "version-matrix=%s\n", $0 }' | tee -a $GITHUB_OUTPUT
fi
cat $GITHUB_OUTPUT
4 changes: 4 additions & 0 deletions .github/workflows/java-cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ jobs:
host: ${{ fromJson(needs.load-platform-matrix.outputs.PLATFORM_MATRIX) }}
runs-on: ${{ matrix.host.RUNNER }}
steps:
- name: Setup self-hosted runner access
if: ${{matrix.host.TARGET == 'aarch64-unknown-linux-gnu' }}
run: sudo chown -R $USER:$USER /home/ubuntu/action-runner-ilia/_work/valkey-glide

- name: Checkout
uses: actions/checkout@v4

Expand Down
20 changes: 11 additions & 9 deletions .github/workflows/npm-cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -308,23 +308,18 @@ jobs:
if: ${{ matrix.build.TARGET == 'aarch64-unknown-linux-gnu' }}
run: sudo chown -R $USER:$USER /home/ubuntu/actions-runner/_work/valkey-glide

- name: install Redis and git for alpine
- name: install redis and git for alpine
if: ${{ contains(matrix.build.TARGET, 'musl') }}
run: |
apk update
apk add redis git
apk add git redis
node -v
- name: install Redis and Python for ubuntu
- name: install Python for ubuntu
if: ${{ contains(matrix.build.TARGET, 'linux-gnu') }}
run: |
sudo apt-get update
sudo apt-get install redis-server python3
- name: install Redis, Python for macos
if: ${{ contains(matrix.build.RUNNER, 'mac') }}
run: |
brew install redis python3
sudo apt-get install python3
- name: Checkout
if: ${{ matrix.build.TARGET != 'aarch64-unknown-linux-musl'}}
Expand All @@ -339,6 +334,13 @@ jobs:
npm-auth-token: ${{ secrets.NPM_AUTH_TOKEN }}
arch: ${{ matrix.build.ARCH }}

- name: Install engine
if: ${{ !contains(matrix.build.TARGET, 'musl') }}
uses: ./.github/workflows/install-engine
with:
engine-version: "8.0"
target: ${{ matrix.build.target }}

- name: Setup node
if: ${{ !contains(matrix.build.TARGET, 'musl') }}
uses: actions/setup-node@v4
Expand Down
29 changes: 5 additions & 24 deletions .github/workflows/pypi-cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -232,31 +232,12 @@ jobs:
with:
python-version: 3.12

- name: Install engine Ubuntu ARM
if: ${{ matrix.build.TARGET == 'aarch64-unknown-linux-gnu' }}
shell: bash
# in self hosted runner we first want to check that engine is not already installed
run: |
if [[ $(`which redis-server`) == '' ]]
then
sudo apt-get update
sudo apt-get install -y redis-server
else
echo "Redis is already installed"
fi
- name: Install engine Ubuntu x86
if: ${{ matrix.build.TARGET == 'x86_64-unknown-linux-gnu' }}
shell: bash
run: |
sudo apt-get update
sudo apt-get install -y redis-server
- name: Install engine
uses: ./.github/workflows/install-engine
with:
engine-version: "8.0"
target: ${{ matrix.build.target }}

- name: Install engine MacOS
if: ${{ matrix.build.OS == 'macos' }}
shell: bash
run: |
brew install redis

- name: Check if RC and set a distribution tag for the package
shell: bash
Expand Down
24 changes: 20 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
#### Changes
* Node, Python, Java: Add allow uncovered slots scanning flag option in cluster scan ([#2814](https://github.com/valkey-io/valkey-glide/pull/2814), [#2815](https://github.com/valkey-io/valkey-glide/pull/2815), [#2860](https://github.com/valkey-io/valkey-glide/pull/2860))

* Go: Add HINCRBY command ([#2847](https://github.com/valkey-io/valkey-glide/pull/2847))
* Go: Add HINCRBYFLOAT command ([#2846](https://github.com/valkey-io/valkey-glide/pull/2846))
* Go: Add SUNIONSTORE command ([#2805](https://github.com/valkey-io/valkey-glide/pull/2805))
* Go: Add SUNION ([#2787](https://github.com/valkey-io/valkey-glide/pull/2787))
* Java: bump `netty` version ([#2795](https://github.com/valkey-io/valkey-glide/pull/2795))
* Java: Bump protobuf (protoc) version ([#2796](https://github.com/valkey-io/valkey-glide/pull/2796), [#2800](https://github.com/valkey-io/valkey-glide/pull/2800))
* Go: Add `SInterStore` ([#2779](https://github.com/valkey-io/valkey-glide/issues/2779))
* Node: Remove native package references for MacOs x64 architecture ([#2799](https://github.com/valkey-io/valkey-glide/issues/2799))
* Go: Add `ZIncrBy` command ([#2830](https://github.com/valkey-io/valkey-glide/pull/2830))
* Go: Add `SScan` and `SMove` ([#2789](https://github.com/valkey-io/valkey-glide/issues/2789))
* Go: Add `ZADD` ([#2813](https://github.com/valkey-io/valkey-glide/issues/2813))
Expand All @@ -19,6 +18,25 @@

#### Operational Enhancements

## 1.2.1 (2024-12-29)

#### Changes

* Node, Python, Java: Add allow uncovered slots scanning flag option in cluster scan ([#2814](https://github.com/valkey-io/valkey-glide/pull/2814), [#2815](https://github.com/valkey-io/valkey-glide/pull/2815), [#2860](https://github.com/valkey-io/valkey-glide/pull/2860))
* Java: Bump protobuf (protoc) version ([#2561](https://github.com/valkey-io/valkey-glide/pull/2561), [#2802](https://github.com/valkey-io/valkey-glide/pull/2802))
* Java: bump `netty` version ([#2777](https://github.com/valkey-io/valkey-glide/pull/2777))
* Node: Remove native package references for MacOs x64 architecture ([#2799](https://github.com/valkey-io/valkey-glide/issues/2799))
* Node, Python, Java: Add connection timeout to client configuration ([#2823](https://github.com/valkey-io/valkey-glide/issues/2823))

#### Breaking Changes

#### Fixes

* Core: Fix RESP2 multi-node response from cluster ([#2381](https://github.com/valkey-io/valkey-glide/pull/2381))
* Core: Ensure cluster client creation fail when engine is < 7.0 and sharded subscriptions are configured ([#2819](https://github.com/valkey-io/valkey-glide/pull/2819))

#### Operational Enhancements

## 1.2.0 (2024-11-27)

#### Changes
Expand Down Expand Up @@ -117,8 +135,6 @@

#### Breaking Changes

**None**

#### Fixes

* Core: UDS Socket Handling Rework ([#2482](https://github.com/valkey-io/valkey-glide/pull/2482))
Expand Down
1 change: 1 addition & 0 deletions glide-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ nanoid = "0.4.0"
async-trait = { version = "0.1.24" }
serde_json = "1"
serde = { version = "1", features = ["derive"] }
versions = "6.3"

[features]
socket-layer = [
Expand Down
5 changes: 5 additions & 0 deletions glide-core/redis-rs/redis/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ pub struct GlideConnectionOptions {
/// If ReadFromReplica strategy is set to AZAffinity, this parameter will be set to 'true'.
/// In this case, an INFO command will be triggered in the connection's setup to update the connection's 'availability_zone' property.
pub discover_az: bool,
/// Connection timeout duration.
///
/// This optional field sets the maximum duration to wait when attempting to establish
/// a connection. If `None`, the connection will use `DEFAULT_CONNECTION_TIMEOUT`.
pub connection_timeout: Option<Duration>,
}

/// To enable async support you need to enable the feature: `tokio-comp`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ where
push_sender: None,
disconnect_notifier,
discover_az,
connection_timeout: Some(params.connection_timeout),
},
)
.await
Expand Down
1 change: 1 addition & 0 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,7 @@ where
push_sender,
disconnect_notifier,
discover_az,
connection_timeout: Some(cluster_params.connection_timeout),
};

let connections = Self::create_initial_connections(
Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ fn base_routing(cmd: &[u8]) -> RouteBy {
| b"FUNCTION STATS" => RouteBy::AllNodes,

b"DBSIZE"
| b"DEBUG"
| b"FLUSHALL"
| b"FLUSHDB"
| b"FT._ALIASLIST"
Expand Down Expand Up @@ -717,7 +718,6 @@ fn base_routing(cmd: &[u8]) -> RouteBy {
| b"COMMAND LIST"
| b"COMMAND"
| b"CONFIG GET"
| b"DEBUG"
| b"ECHO"
| b"FUNCTION LIST"
| b"LASTSAVE"
Expand Down
69 changes: 62 additions & 7 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use redis::cluster_routing::{
};
use redis::cluster_slotmap::ReadFromReplicaStrategy;
use redis::{
ClusterScanArgs, Cmd, ErrorKind, PushInfo, RedisError, RedisResult, ScanStateRC, Value,
ClusterScanArgs, Cmd, ErrorKind, FromRedisValue, PushInfo, RedisError, RedisResult,
ScanStateRC, Value,
};
pub use standalone_client::StandaloneClient;
use std::io;
Expand All @@ -26,14 +27,17 @@ use self::value_conversion::{convert_to_expected_type, expected_type_for_cmd, ge
mod reconnecting_connection;
mod standalone_client;
mod value_conversion;
use redis::InfoDict;
use tokio::sync::mpsc;
use versions::Versioning;

pub const HEARTBEAT_SLEEP_DURATION: Duration = Duration::from_secs(1);
pub const DEFAULT_RETRIES: u32 = 3;
/// Note: If you change the default value, make sure to change the documentation in *all* wrappers.
pub const DEFAULT_RESPONSE_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL: Duration = Duration::from_secs(60);
pub const INTERNAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250);
/// Note: If you change the default value, make sure to change the documentation in *all* wrappers.
pub const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250);
pub const FINISHED_SCAN_CURSOR: &str = "finished";

/// The value of 1000 for the maximum number of inflight requests is determined based on Little's Law in queuing theory:
Expand Down Expand Up @@ -568,8 +572,9 @@ async fn create_cluster_client(
Some(PeriodicCheck::ManualInterval(interval)) => Some(interval),
None => Some(DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL),
};
let connection_timeout = to_duration(request.connection_timeout, DEFAULT_CONNECTION_TIMEOUT);
let mut builder = redis::cluster::ClusterClientBuilder::new(initial_nodes)
.connection_timeout(INTERNAL_CONNECTION_TIMEOUT)
.connection_timeout(connection_timeout)
.retries(DEFAULT_RETRIES);
let read_from_strategy = request.read_from.unwrap_or_default();
builder = builder.read_from(match read_from_strategy {
Expand All @@ -592,15 +597,63 @@ async fn create_cluster_client(
};
builder = builder.tls(tls);
}
if let Some(pubsub_subscriptions) = redis_connection_info.pubsub_subscriptions {
if let Some(pubsub_subscriptions) = redis_connection_info.pubsub_subscriptions.clone() {
builder = builder.pubsub_subscriptions(pubsub_subscriptions);
}

// Always use with Glide
builder = builder.periodic_connections_checks(CONNECTION_CHECKS_INTERVAL);

let client = builder.build()?;
client.get_async_connection(push_sender).await
let mut con = client.get_async_connection(push_sender).await?;

// This validation ensures that sharded subscriptions are not applied to Redis engines older than version 7.0,
// preventing scenarios where the client becomes inoperable or, worse, unaware that sharded pubsub messages are not being received.
// The issue arises because `client.get_async_connection()` might succeed even if the engine does not support sharded pubsub.
// For example, initial connections may exclude the target node for sharded subscriptions, allowing the creation to succeed,
// but subsequent resubscription tasks will fail when `setup_connection()` cannot establish a connection to the node.
//
// One approach to handle this would be to check the engine version inside `setup_connection()` and skip applying sharded subscriptions.
// However, this approach would leave the application unaware that the subscriptions were not applied, requiring the user to analyze logs to identify the issue.
// Instead, we explicitly check the engine version here and fail the connection creation if it is incompatible with sharded subscriptions.

if let Some(pubsub_subscriptions) = redis_connection_info.pubsub_subscriptions {
if pubsub_subscriptions.contains_key(&redis::PubSubSubscriptionKind::Sharded) {
let info_res = con
.route_command(
redis::cmd("INFO").arg("SERVER"),
RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random),
)
.await?;
let info_dict: InfoDict = FromRedisValue::from_redis_value(&info_res)?;
match info_dict.get::<String>("redis_version") {
Some(version) => match (Versioning::new(version), Versioning::new("7.0")) {
(Some(server_ver), Some(min_ver)) => {
if server_ver < min_ver {
return Err(RedisError::from((
ErrorKind::InvalidClientConfig,
"Sharded subscriptions provided, but the engine version is < 7.0",
)));
}
}
_ => {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Failed to parse engine version",
)))
}
},
_ => {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Could not determine engine version from INFO result",
)))
}
}
}
}

Ok(con)
}

#[derive(thiserror::Error)]
Expand Down Expand Up @@ -667,6 +720,8 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String {
"\nStandalone mode"
};
let request_timeout = format_optional_value("Request timeout", request.request_timeout);
let connection_timeout =
format_optional_value("Connection timeout", request.connection_timeout);
let database_id = format!("\ndatabase ID: {}", request.database_id);
let rfr_strategy = request
.read_from
Expand Down Expand Up @@ -723,7 +778,7 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String {
);

format!(
"\nAddresses: {addresses}{tls_mode}{cluster_mode}{request_timeout}{rfr_strategy}{connection_retry_strategy}{database_id}{protocol}{client_name}{periodic_checks}{pubsub_subscriptions}{inflight_requests_limit}",
"\nAddresses: {addresses}{tls_mode}{cluster_mode}{request_timeout}{connection_timeout}{rfr_strategy}{connection_retry_strategy}{database_id}{protocol}{client_name}{periodic_checks}{pubsub_subscriptions}{inflight_requests_limit}",
)
}

Expand Down
Loading
Loading