diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs index d55e0b8a1505..23c506761a9d 100644 --- a/src/common/meta/src/kv_backend/postgres.rs +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -56,7 +56,7 @@ const RANGE_SCAN_FULL_RANGE: &str = const FULL_TABLE_DELETE: &str = "DELETE FROM greptime_metakv RETURNING k,v"; -const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE K = $1 RETURNING k,v;"; +pub const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE K = $1 RETURNING k,v;"; const PREFIX_DELETE: &str = "DELETE FROM greptime_metakv WHERE k LIKE $1 RETURNING k,v;"; diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 2f88794d2b6c..485e9ac0c042 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -17,7 +17,9 @@ use std::sync::Arc; use std::time::{self, Duration}; use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS}; -use common_meta::kv_backend::postgres::{CAS, POINT_GET, PREFIX_SCAN, PUT_IF_NOT_EXISTS}; +use common_meta::kv_backend::postgres::{ + CAS, POINT_DELETE, POINT_GET, PREFIX_SCAN, PUT_IF_NOT_EXISTS, +}; use common_telemetry::{error, info, warn}; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; @@ -170,8 +172,22 @@ impl Election for PgElection { .as_secs_f64() + CANDIDATE_LEASE_SECS as f64, }; - self.put_value_with_lease(&key, &value_with_lease).await?; + let res = self.put_value_with_lease(&key, &value_with_lease).await?; + // May registered before, check if the lease is expired. If not, just renew the lease. + if !res { + let prev = self.get_value_with_lease(&key).await?; + if prev.expire_time + < time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() + { + self.delete_value(&key).await?; + self.put_value_with_lease(&key, &value_with_lease).await?; + } + } + // Renew the lease let mut keep_alive_interval = tokio::time::interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS)); @@ -430,7 +446,7 @@ impl PgElection { } } - // Returns `true` if the insertion is successful + /// Returns `true` if the insertion is successful async fn put_value_with_lease(&self, key: &Vec, value: &ValueWithLease) -> Result { let value = serde_json::to_string(value) .with_context(|_| SerializeToJsonSnafu { @@ -447,6 +463,18 @@ impl PgElection { Ok(res.is_empty()) } + /// Returns `true` if the deletion is successful. + /// Caution: Should only delete the key if the lease is expired. + async fn delete_value(&self, key: &Vec) -> Result { + let res = self + .client + .query(POINT_DELETE, &[key]) + .await + .context(PostgresExecutionSnafu)?; + + Ok(res.len() == 1) + } + async fn keep_alive( &self, key: &Vec, @@ -507,6 +535,8 @@ impl PgElection { .unwrap_or_default() .as_secs_f64(); if leader_value_with_lease.expire_time <= now { + // Invalidate preivous leader + self.delete_value(key).await?; return Ok(()); } }