Skip to content

Commit

Permalink
fix: release advisory lock
Browse files Browse the repository at this point in the history
  • Loading branch information
CookiePieWw committed Dec 19, 2024
1 parent bb91eb5 commit 903865d
Showing 1 changed file with 81 additions and 59 deletions.
140 changes: 81 additions & 59 deletions src/meta-srv/src/election/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::error::{
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};

// Query that makes a non-blocking attempt to take the PostgreSQL advisory lock
// identified by key 1; the query yields a boolean.
const CAMPAIGN: &str = "SELECT pg_try_advisory_lock(1)";
// Query that releases the advisory lock identified by key 1. The caller in this
// file reads the first column of the result as a `bool` and treats `false`
// (or a missing row) as an unexpected error.
const UNLOCK: &str = "SELECT pg_advisory_unlock(1)";

#[derive(Debug, Serialize, Deserialize)]
struct ValueWithLease {
Expand Down Expand Up @@ -243,62 +244,76 @@ impl Election for PgElection {
.as_secs_f64()
+ META_LEASE_SECS as f64,
};
self.put_value_with_lease(&key, &leader_value_with_lease)
.await?;

let keep_lease_duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS);
let mut keep_alive_interval = tokio::time::interval(keep_lease_duration);
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
match timeout(
keep_lease_duration,
self.keep_alive(&key, leader_key.clone(), true),
)
.await
{
Ok(Ok(())) => {
let _ = keep_alive_interval.tick().await;
// If the leader value is successfully put, then we start the keep alive loop
if self
.put_value_with_lease(&key, &leader_value_with_lease)
.await?
{
let res = self
.client
.query(UNLOCK, &[])
.await
.context(PostgresExecutionSnafu)?;
match res.first() {
Some(row) => {
let unlocked: bool = row.get(0);
if !unlocked {
return UnexpectedSnafu {
violated: "Failed to unlock the advisory lock".to_string(),
}
.fail();
}
}
Ok(Err(err)) => {
error!(err; "Failed to keep alive");
break;
None => {
return UnexpectedSnafu {
violated: "Failed to unlock the advisory lock".to_string(),
}
.fail();
}
Err(_) => {
error!("Refresh lease timeout");
break;
}
let keep_lease_duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS);
let mut keep_alive_interval = tokio::time::interval(keep_lease_duration);
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
match timeout(
keep_lease_duration,
self.keep_alive(&key, leader_key.clone(), true),
)
.await
{
Ok(Ok(())) => {
let _ = keep_alive_interval.tick().await;
}
Ok(Err(err)) => {
error!(err; "Failed to keep alive");
break;
}
Err(_) => {
error!("Refresh lease timeout");
break;
}
}
}
}

// Step down
if self
.is_leader
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
// Step down
if self
.is_leader
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
error!(e; "Failed to send leader change message");
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
{
error!(e; "Failed to send leader change message");
}
}
} else {
// Failed to put the leader value, we fall back to the candidate and check the leadership
self.check_leadership(&key).await?;
}
} else {
// Not the leader, we check if the leader is still alive
let check_interval = Duration::from_secs(META_LEASE_SECS);
let mut check_interval = tokio::time::interval(check_interval);
loop {
let _ = check_interval.tick().await;
let leader_value_with_lease = self.get_value_with_lease(&key).await?;
let now = time::SystemTime::now()
.duration_since(time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64();
// If the leader is expired, we re-initiate the campaign
if leader_value_with_lease.expire_time <= now {
break;
}
}
self.check_leadership(&key).await?;
}
}

Expand Down Expand Up @@ -416,7 +431,7 @@ impl PgElection {
}

// Returns `true` if the insertion is successful
async fn put_value_with_lease(&self, key: &Vec<u8>, value: &ValueWithLease) -> Result<()> {
async fn put_value_with_lease(&self, key: &Vec<u8>, value: &ValueWithLease) -> Result<bool> {
let value = serde_json::to_string(value)
.with_context(|_| SerializeToJsonSnafu {
input: format!("{value:?}"),
Expand All @@ -429,17 +444,7 @@ impl PgElection {
.await
.context(PostgresExecutionSnafu)?;

ensure!(
res.is_empty(),
UnexpectedSnafu {
violated: format!(
"Failed to insert value from key: {:?}",
String::from_utf8_lossy(key)
),
}
);

Ok(())
Ok(res.is_empty())
}

async fn keep_alive(
Expand Down Expand Up @@ -489,4 +494,21 @@ impl PgElection {
}
Ok(())
}

/// Waits until the lease stored under `key` has expired.
///
/// The stored value is polled on a `tokio` interval of `META_LEASE_SECS`
/// seconds. After each tick the lease is fetched and its `expire_time` is
/// compared against the current wall-clock time (seconds since the Unix
/// epoch); the method returns `Ok(())` as soon as `expire_time <= now`.
///
/// # Errors
///
/// Propagates any error returned by `get_value_with_lease`.
async fn check_leadership(&self, key: &Vec<u8>) -> Result<()> {
    let mut ticker = tokio::time::interval(Duration::from_secs(META_LEASE_SECS));
    loop {
        ticker.tick().await;
        // Fetch the lease first, then sample the clock, so the comparison
        // uses a timestamp taken after the read completed.
        let lease = self.get_value_with_lease(key).await?;
        let now_secs = time::SystemTime::now()
            .duration_since(time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs_f64();
        if lease.expire_time <= now_secs {
            break Ok(());
        }
    }
}
}

0 comments on commit 903865d

Please sign in to comment.