Skip to content

Commit

Permalink
Refined slot refresh logic to accept RedirectNode, enabling targeted …
Browse files Browse the repository at this point in the history
…updates driven by moved redirects.
  • Loading branch information
barshaul committed Jul 30, 2024
1 parent de53b2b commit 070dc47
Showing 1 changed file with 52 additions and 10 deletions.
62 changes: 52 additions & 10 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,23 @@ impl From<ArcStr> for OperationTarget {
}
}

/// Represents a node to which a `MOVED` or `ASK` error redirects.
pub(crate) struct RedirectNode {
/// The address of the redirect node.
pub _address: String,
/// The slot of the redirect node.
pub _slot: u16,
}

impl RedirectNode {
pub(crate) fn from_option_tuple(option: Option<(&str, u16)>) -> Option<Self> {
option.map(|(address, slot)| RedirectNode {
_address: address.to_string(),
_slot: slot,
})
}
}

struct Message<C> {
cmd: CmdArg<C>,
sender: oneshot::Sender<RedisResult<Response>>,
Expand Down Expand Up @@ -772,6 +789,7 @@ enum Next<C> {
// if not set, then a slot refresh should happen without sending a request afterwards
request: Option<PendingRequest<C>>,
sleep_duration: Option<Duration>,
moved_redirect: Option<RedirectNode>,
},
ReconnectToInitialNodes {
// if not set, then a reconnect should happen without sending a request afterwards
Expand Down Expand Up @@ -816,6 +834,7 @@ impl<C> Future for Request<C> {
Next::RefreshSlots {
request: None,
sleep_duration: None,
moved_redirect: RedirectNode::from_option_tuple(err.redirect_node()),
}
.into()
} else if matches!(err.retry_method(), crate::types::RetryMethod::Reconnect) {
Expand Down Expand Up @@ -861,6 +880,7 @@ impl<C> Future for Request<C> {
return Next::RefreshSlots {
request: Some(request),
sleep_duration: Some(sleep_duration),
moved_redirect: None,
}
.into();
}
Expand All @@ -879,13 +899,15 @@ impl<C> Future for Request<C> {
}
crate::types::RetryMethod::MovedRedirect => {
let mut request = this.request.take().unwrap();
let redirect_node = err.redirect_node();
request.info.set_redirect(
err.redirect_node()
.map(|(node, _slot)| Redirect::Moved(node.to_string())),
);
Next::RefreshSlots {
request: Some(request),
sleep_duration: None,
moved_redirect: RedirectNode::from_option_tuple(redirect_node),
}
.into()
}
Expand Down Expand Up @@ -994,6 +1016,7 @@ where
Self::refresh_slots_and_subscriptions_with_retries(
connection.inner.clone(),
&RefreshPolicy::NotThrottable,
None,
)
.await?;

Expand Down Expand Up @@ -1142,6 +1165,7 @@ where
if let Err(err) = Self::refresh_slots_and_subscriptions_with_retries(
inner.clone(),
&RefreshPolicy::Throttable,
None,
)
.await
{
Expand Down Expand Up @@ -1313,6 +1337,7 @@ where
async fn refresh_slots_and_subscriptions_with_retries(
inner: Arc<InnerCore<C>>,
policy: &RefreshPolicy,
moved_redirect: Option<RedirectNode>,
) -> RedisResult<()> {
let SlotRefreshState {
in_progress,
Expand All @@ -1326,7 +1351,7 @@ where
{
return Ok(());
}
let mut skip_slots_refresh = false;
let mut should_refresh_slots = true;
if *policy == RefreshPolicy::Throttable {
// Check if the current slot refresh is triggered before the wait duration has passed
let last_run_rlock = last_run.read().await;
Expand All @@ -1345,13 +1370,13 @@ where
if passed_time <= wait_duration {
debug!("Skipping slot refresh as the wait duration hasn't yet passed. Passed time = {:?},
Wait duration = {:?}", passed_time, wait_duration);
skip_slots_refresh = true;
should_refresh_slots = false;
}
}
}

let mut res = Ok(());
if !skip_slots_refresh {
if should_refresh_slots {
let retry_strategy = ExponentialBackoff {
initial_interval: DEFAULT_REFRESH_SLOTS_RETRY_INITIAL_INTERVAL,
max_interval: DEFAULT_REFRESH_SLOTS_RETRY_MAX_INTERVAL,
Expand All @@ -1364,6 +1389,10 @@ where
Self::refresh_slots(inner.clone(), curr_retry)
})
.await;
} else if moved_redirect.is_some() {
// Update relevant slots in the slots map based on the moved_redirect address,
// rather than refreshing all slots by querying the cluster nodes for their topology view.
Self::update_slots_for_redirect_change(inner.clone(), moved_redirect).await;
}
in_progress.store(false, Ordering::Relaxed);

Expand All @@ -1372,13 +1401,22 @@ where
res
}

/// Update relevant slots in the slots map based on the moved_redirect address
pub(crate) async fn update_slots_for_redirect_change(
_inner: Arc<InnerCore<C>>,
_moved_redirect: Option<RedirectNode>,
) {
// TODO: Add implementation
}

pub(crate) async fn check_topology_and_refresh_if_diff(
inner: Arc<InnerCore<C>>,
policy: &RefreshPolicy,
) -> bool {
let topology_changed = Self::check_for_topology_diff(inner.clone()).await;
if topology_changed {
let _ = Self::refresh_slots_and_subscriptions_with_retries(inner.clone(), policy).await;
let _ = Self::refresh_slots_and_subscriptions_with_retries(inner.clone(), policy, None)
.await;
}
topology_changed
}
Expand Down Expand Up @@ -1969,6 +2007,7 @@ where
*future = Box::pin(Self::refresh_slots_and_subscriptions_with_retries(
self.inner.clone(),
&RefreshPolicy::Throttable,
None,
));
Poll::Ready(Err(err))
}
Expand Down Expand Up @@ -2064,9 +2103,10 @@ where
Next::RefreshSlots {
request,
sleep_duration,
moved_redirect,
} => {
poll_flush_action =
poll_flush_action.change_state(PollFlushAction::RebuildSlots);
poll_flush_action = poll_flush_action
.change_state(PollFlushAction::RebuildSlots(moved_redirect));
if let Some(request) = request {
let future: RequestState<
Pin<Box<dyn Future<Output = OperationResult> + Send>>,
Expand Down Expand Up @@ -2136,7 +2176,7 @@ where

enum PollFlushAction {
None,
RebuildSlots,
RebuildSlots(Option<RedirectNode>),
Reconnect(Vec<ArcStr>),
ReconnectFromInitialConnections,
}
Expand All @@ -2151,8 +2191,9 @@ impl PollFlushAction {
PollFlushAction::ReconnectFromInitialConnections
}

(PollFlushAction::RebuildSlots, _) | (_, PollFlushAction::RebuildSlots) => {
PollFlushAction::RebuildSlots
(PollFlushAction::RebuildSlots(moved_redirect), _)
| (_, PollFlushAction::RebuildSlots(moved_redirect)) => {
PollFlushAction::RebuildSlots(moved_redirect)
}

(PollFlushAction::Reconnect(mut addrs), PollFlushAction::Reconnect(new_addrs)) => {
Expand Down Expand Up @@ -2213,11 +2254,12 @@ where

match ready!(self.poll_complete(cx)) {
PollFlushAction::None => return Poll::Ready(Ok(())),
PollFlushAction::RebuildSlots => {
PollFlushAction::RebuildSlots(moved_redirect) => {
self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
ClusterConnInner::refresh_slots_and_subscriptions_with_retries(
self.inner.clone(),
&RefreshPolicy::Throttable,
moved_redirect,
),
)));
}
Expand Down

0 comments on commit 070dc47

Please sign in to comment.