Skip to content

Commit

Permalink
Core + Python: Enable cluster periodic checks by default, add python …
Browse files Browse the repository at this point in the history
…configuration
  • Loading branch information
barshaul committed Mar 10, 2024
1 parent b1ef52b commit 8048ec7
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 7 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
* Python, Node: Added PTTL command ([#1036](https://github.com/aws/glide-for-redis/pull/1036), [#1082](https://github.com/aws/glide-for-redis/pull/1082))
* Node: Added HVAL command ([#1022](https://github.com/aws/glide-for-redis/pull/1022))
* Node: Added PERSIST command ([#1023](https://github.com/aws/glide-for-redis/pull/1023))
* Core: Enabled Cluster Mode periodic checks by default ([#1089](https://github.com/aws/glide-for-redis/pull/1089))

#### Features

* Python: Allow chaining function calls on transaction. ([#987](https://github.com/aws/glide-for-redis/pull/987))
* Python: Added Cluster Mode configuration for periodic checks interval ([#1089](https://github.com/aws/glide-for-redis/pull/1089))


## 0.2.0 (2024-02-11)

Expand Down
39 changes: 36 additions & 3 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0
*/
use crate::connection_request::{
ConnectionRequest, NodeAddress, ProtocolVersion, ReadFrom, TlsMode,
connection_request, ConnectionRequest, NodeAddress, ProtocolVersion, ReadFrom, TlsMode,
};
use crate::scripts_container::get_script;
use futures::FutureExt;
Expand All @@ -26,6 +26,7 @@ pub const HEARTBEAT_SLEEP_DURATION: Duration = Duration::from_secs(1);

pub const DEFAULT_RESPONSE_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_PERIODIC_CHECKS_INTERVAL: Duration = Duration::from_secs(60);
pub const INTERNAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250);

pub(super) fn get_port(address: &NodeAddress) -> u16 {
Expand Down Expand Up @@ -284,11 +285,23 @@ async fn create_cluster_client(
.collect();
let read_from = request.read_from.enum_value().unwrap_or(ReadFrom::Primary);
let read_from_replicas = !matches!(read_from, ReadFrom::Primary,); // TODO - implement different read from replica strategies.
let periodic_checks = match request.periodic_checks {
Some(periodic_checks) => match periodic_checks {
connection_request::Periodic_checks::PeriodicChecksManualInterval(interval) => {
Some(Duration::from_secs(interval.duration_in_sec.into()))
}
connection_request::Periodic_checks::PeriodicChecksDisabled(_) => None,
},
None => Some(DEFAULT_PERIODIC_CHECKS_INTERVAL),
};
let mut builder = redis::cluster::ClusterClientBuilder::new(initial_nodes)
.connection_timeout(INTERNAL_CONNECTION_TIMEOUT);
if read_from_replicas {
builder = builder.read_from_replicas();
}
if let Some(interval_duration) = periodic_checks {
builder = builder.periodic_topology_checks(interval_duration);
}
builder = builder.use_protocol(convert_to_redis_protocol(
request.protocol.enum_value_or_default(),
));
Expand Down Expand Up @@ -388,7 +401,6 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String {
let connection_retry_strategy = request.connection_retry_strategy.0.as_ref().map(|strategy|
format!("\nreconnect backoff strategy: number of increasing duration retries: {}, base: {}, factor: {}",
strategy.number_of_retries, strategy.exponent_base, strategy.factor)).unwrap_or_default();

let protocol = request
.protocol
.enum_value()
Expand All @@ -397,9 +409,30 @@ fn sanitized_request_string(request: &ConnectionRequest) -> String {
let client_name = chars_to_string_option(&request.client_name)
.map(|client_name| format!("\nClient name: {client_name}"))
.unwrap_or_default();
let periodic_checks = if request.cluster_mode_enabled {
match request.periodic_checks {
Some(ref periodic_checks) => match periodic_checks {
connection_request::Periodic_checks::PeriodicChecksManualInterval(interval) => {
format!(
"\nPeriodic Checks: Enabled with manual interval of {:?}s",
interval.duration_in_sec
)
}
connection_request::Periodic_checks::PeriodicChecksDisabled(_) => {
"\nPeriodic Checks: Disabled".to_string()
}
},
None => format!(
"\nPeriodic Checks: Enabled with default interval of {:?}",
DEFAULT_PERIODIC_CHECKS_INTERVAL
),
}
} else {
"".to_string()
};

format!(
"\nAddresses: {addresses}{tls_mode}{cluster_mode}{request_timeout}{rfr_strategy}{connection_retry_strategy}{database_id}{protocol}{client_name}",
"\nAddresses: {addresses}{tls_mode}{cluster_mode}{request_timeout}{rfr_strategy}{connection_retry_strategy}{database_id}{protocol}{client_name}{periodic_checks}",
)
}

Expand Down
11 changes: 11 additions & 0 deletions glide-core/src/protobuf/connection_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ enum ProtocolVersion {
RESP2 = 1;
}

message PeriodicChecksManualInterval {
uint32 duration_in_sec = 1;
}

message PeriodicChecksDisabled {
}

// IMPORTANT - if you add fields here, you probably need to add them also in client/mod.rs:`sanitized_request_string`.
message ConnectionRequest {
repeated NodeAddress addresses = 1;
Expand All @@ -41,6 +48,10 @@ message ConnectionRequest {
uint32 database_id = 8;
ProtocolVersion protocol = 9;
string client_name = 10;
oneof periodic_checks {
PeriodicChecksManualInterval periodic_checks_manual_interval = 11;
PeriodicChecksDisabled periodic_checks_disabled = 12;
}
}

message ConnectionRetryStrategy {
Expand Down
55 changes: 52 additions & 3 deletions python/python/glide/config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0

from enum import Enum
from typing import List, Optional
from typing import List, Optional, Union

from glide.protobuf.connection_request_pb2 import ConnectionRequest
from glide.protobuf.connection_request_pb2 import ProtocolVersion as SentProtocolVersion
Expand Down Expand Up @@ -92,6 +92,33 @@ def __init__(
self.username = username


class PeriodicChecksManualInterval:
def __init__(self, duration_in_sec: int) -> None:
"""
Represents a manually configured interval for periodic checks.
Args:
duration_in_sec (int): The duration in seconds for the interval between periodic checks.
"""
self.duration_in_sec = duration_in_sec


class PeriodicChecksStatus(Enum):
"""
Represents the cluster's periodic checks status.
To configure specific interval, see PeriodicChecksManualInterval.
"""

ENABLED_DEFAULT_INTERVAL = 0
"""
Periodic checks are enabled with the default interval between checks.
"""
DISABLED = 1
"""
Periodic checks are disabled.
"""


class BaseClientConfiguration:
def __init__(
self,
Expand Down Expand Up @@ -191,7 +218,7 @@ class RedisClientConfiguration(BaseClientConfiguration):
reconnect_strategy (Optional[BackoffStrategy]): Strategy used to determine how and when to reconnect, in case of
connection failures.
If not set, a default backoff strategy will be used.
database_id (Optional[Int]): index of the logical database to connect to.
database_id (Optional[int]): index of the logical database to connect to.
client_name (Optional[str]): Client name to be used for the client. Will be used with CLIENT SETNAME command during connection establishment.
protocol (ProtocolVersion): The version of the Redis RESP protocol to communicate with the server.
"""
Expand Down Expand Up @@ -224,7 +251,7 @@ def _create_a_protobuf_conn_request(
self, cluster_mode: bool = False
) -> ConnectionRequest:
assert cluster_mode is False
request = super()._create_a_protobuf_conn_request(False)
request = super()._create_a_protobuf_conn_request(cluster_mode)
if self.reconnect_strategy:
request.connection_retry_strategy.number_of_retries = (
self.reconnect_strategy.num_of_retries
Expand Down Expand Up @@ -259,6 +286,10 @@ class ClusterClientConfiguration(BaseClientConfiguration):
If the specified timeout is exceeded for a pending request, it will result in a timeout error. If not set, a default value will be used.
client_name (Optional[str]): Client name to be used for the client. Will be used with CLIENT SETNAME command during connection establishment.
protocol (ProtocolVersion): The version of the Redis RESP protocol to communicate with the server.
periodic_checks (Union[PeriodicChecksStatus, PeriodicChecksManualInterval]): Configure the periodic topology checks.
These checks evaluate changes in the cluster's topology, triggering a slot refresh when detected.
Periodic checks ensure a quick and efficient process by querying a limited number of nodes.
Defaults to PeriodicChecksStatus.ENABLED_DEFAULT_INTERVAL.
Notes:
Currently, the reconnection strategy in cluster mode is not configurable, and exponential backoff
Expand All @@ -274,6 +305,9 @@ def __init__(
request_timeout: Optional[int] = None,
client_name: Optional[str] = None,
protocol: ProtocolVersion = ProtocolVersion.RESP3,
periodic_checks: Union[
PeriodicChecksStatus, PeriodicChecksManualInterval
] = PeriodicChecksStatus.ENABLED_DEFAULT_INTERVAL,
):
super().__init__(
addresses=addresses,
Expand All @@ -284,3 +318,18 @@ def __init__(
client_name=client_name,
protocol=protocol,
)
self.periodic_checks = periodic_checks

def _create_a_protobuf_conn_request(
self, cluster_mode: bool = False
) -> ConnectionRequest:
assert cluster_mode is True
request = super()._create_a_protobuf_conn_request(cluster_mode)
if type(self.periodic_checks) is PeriodicChecksManualInterval:
request.periodic_checks_manual_interval.duration_in_sec = (
self.periodic_checks.duration_in_sec
)
elif self.periodic_checks == PeriodicChecksStatus.DISABLED:
request.periodic_checks_disabled.SetInParent()

return request
26 changes: 25 additions & 1 deletion python/python/tests/test_config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
# Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0

from glide.config import BaseClientConfiguration, NodeAddress, ReadFrom
from glide.config import (
BaseClientConfiguration,
ClusterClientConfiguration,
NodeAddress,
PeriodicChecksManualInterval,
PeriodicChecksStatus,
ReadFrom,
)
from glide.protobuf.connection_request_pb2 import ConnectionRequest
from glide.protobuf.connection_request_pb2 import ReadFrom as ProtobufReadFrom
from glide.protobuf.connection_request_pb2 import TlsMode
Expand Down Expand Up @@ -28,3 +35,20 @@ def test_convert_to_protobuf():
assert request.tls_mode is TlsMode.SecureTls
assert request.read_from == ProtobufReadFrom.PreferReplica
assert request.client_name == "TEST_CLIENT_NAME"


def test_periodic_checks_interval_to_protobuf():
config = ClusterClientConfiguration(
[NodeAddress("127.0.0.1")],
)
request = config._create_a_protobuf_conn_request(cluster_mode=True)
assert not request.HasField("periodic_checks_disabled")
assert not request.HasField("periodic_checks_manual_interval")

config.periodic_checks = PeriodicChecksStatus.DISABLED
request = config._create_a_protobuf_conn_request(cluster_mode=True)
assert request.HasField("periodic_checks_disabled")

config.periodic_checks = PeriodicChecksManualInterval(30)
request = config._create_a_protobuf_conn_request(cluster_mode=True)
assert request.periodic_checks_manual_interval.duration_in_sec == 30

0 comments on commit 8048ec7

Please sign in to comment.