diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 12510eb84b10..2f88794d2b6c 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -37,6 +37,7 @@ use crate::error::{ use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; const CAMPAIGN: &str = "SELECT pg_try_advisory_lock(1)"; +const UNLOCK: &str = "SELECT pg_advisory_unlock(1)"; #[derive(Debug, Serialize, Deserialize)] struct ValueWithLease { @@ -243,62 +244,76 @@ impl Election for PgElection { .as_secs_f64() + META_LEASE_SECS as f64, }; - self.put_value_with_lease(&key, &leader_value_with_lease) - .await?; - - let keep_lease_duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS); - let mut keep_alive_interval = tokio::time::interval(keep_lease_duration); - keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - loop { - match timeout( - keep_lease_duration, - self.keep_alive(&key, leader_key.clone(), true), - ) - .await - { - Ok(Ok(())) => { - let _ = keep_alive_interval.tick().await; + // If the leader value is successfully put, then we start the keep alive loop + if self + .put_value_with_lease(&key, &leader_value_with_lease) + .await? + { + let res = self + .client + .query(UNLOCK, &[]) + .await + .context(PostgresExecutionSnafu)?; + match res.first() { + Some(row) => { + let unlocked: bool = row.get(0); + if !unlocked { + return UnexpectedSnafu { + violated: "Failed to unlock the advisory lock".to_string(), + } + .fail(); + } } - Ok(Err(err)) => { - error!(err; "Failed to keep alive"); - break; + None => { + return UnexpectedSnafu { + violated: "Failed to unlock the advisory lock".to_string(), + } + .fail(); } - Err(_) => { - error!("Refresh lease timeout"); - break; + } + let keep_lease_duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS); + let mut keep_alive_interval = tokio::time::interval(keep_lease_duration); + keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + loop { + match timeout( + keep_lease_duration, + self.keep_alive(&key, leader_key.clone(), true), + ) + .await + { + Ok(Ok(())) => { + let _ = keep_alive_interval.tick().await; + } + Ok(Err(err)) => { + error!(err; "Failed to keep alive"); + break; + } + Err(_) => { + error!("Refresh lease timeout"); + break; + } } } - } - // Step down - if self - .is_leader - .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) - .is_ok() - { - if let Err(e) = self - .leader_watcher - .send(LeaderChangeMessage::StepDown(Arc::new(leader_key))) + // Step down + if self + .is_leader + .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() { - error!(e; "Failed to send leader change message"); + if let Err(e) = self + .leader_watcher + .send(LeaderChangeMessage::StepDown(Arc::new(leader_key))) + { + error!(e; "Failed to send leader change message"); + } } + } else { + // Failed to put the leader value, we fall back to the candidate and check the leadership + self.check_leadership(&key).await?; } } else { - // Not the leader, we check if the leader is still alive - let check_interval = Duration::from_secs(META_LEASE_SECS); - let mut check_interval = tokio::time::interval(check_interval); - loop { - let _ = check_interval.tick().await; - let leader_value_with_lease = self.get_value_with_lease(&key).await?; - let now = time::SystemTime::now() - .duration_since(time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs_f64(); - // If the leader is expired, we re-initiate the campaign - if leader_value_with_lease.expire_time <= now { - break; - } - } + self.check_leadership(&key).await?; } } @@ -416,7 +431,7 @@ impl PgElection { } // Returns `true` if the insertion is successful - async fn put_value_with_lease(&self, key: &Vec, value: &ValueWithLease) -> Result<()> { + async fn put_value_with_lease(&self, key: &Vec, value: &ValueWithLease) -> Result { let value = serde_json::to_string(value) .with_context(|_| SerializeToJsonSnafu { input: format!("{value:?}"), @@ -429,17 +444,7 @@ impl PgElection { .await .context(PostgresExecutionSnafu)?; - ensure!( - res.is_empty(), - UnexpectedSnafu { - violated: format!( - "Failed to insert value from key: {:?}", - String::from_utf8_lossy(key) - ), - } - ); - - Ok(()) + Ok(res.is_empty()) } async fn keep_alive( @@ -489,4 +494,21 @@ impl PgElection { } Ok(()) } + + // Check if the leader is still valid + async fn check_leadership(&self, key: &Vec) -> Result<()> { + let check_interval = Duration::from_secs(META_LEASE_SECS); + let mut check_interval = tokio::time::interval(check_interval); + loop { + let _ = check_interval.tick().await; + let leader_value_with_lease = self.get_value_with_lease(key).await?; + let now = time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64(); + if leader_value_with_lease.expire_time <= now { + return Ok(()); + } + } + } }