diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index 8939453f9dd9..e901dcb721bd 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -697,7 +697,7 @@ jobs: working-directory: tests-integration/fixtures/postgres run: docker compose -f docker-compose-standalone.yml up -d --wait - name: Run nextest cases - run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard + run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard -F pg_kvbackend env: CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=lld" RUST_BACKTRACE: 1 diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs index 8add65cd49c4..f67f527871ea 100644 --- a/src/common/meta/src/kv_backend/postgres.rs +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -16,6 +16,7 @@ use std::any::Any; use std::borrow::Cow; use std::sync::Arc; +use common_telemetry::error; use snafu::ResultExt; use tokio_postgres::types::ToSql; use tokio_postgres::{Client, NoTls}; @@ -97,7 +98,11 @@ impl PgStore { let (client, conn) = tokio_postgres::connect(url, NoTls) .await .context(ConnectPostgresSnafu)?; - tokio::spawn(async move { conn.await.context(ConnectPostgresSnafu) }); + tokio::spawn(async move { + if let Err(e) = conn.await { + error!(e; "connection error"); + } + }); Self::with_pg_client(client).await } diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 71430ee992af..13975ff95091 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -6,7 +6,7 @@ license.workspace = true [features] mock = [] -pg_kvbackend = ["dep:tokio-postgres"] +pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend"] [lints] workspace = true @@ -14,6 +14,7 @@ workspace = true [dependencies] api.workspace = true async-trait = "0.1" +chrono.workspace = true clap.workspace = true client.workspace = true common-base.workspace = true @@ -55,7 +56,7 
@@ snafu.workspace = true store-api.workspace = true table.workspace = true tokio.workspace = true -tokio-postgres = { workspace = true, optional = true } +tokio-postgres = { workspace = true, optional = true, features = ["with-chrono-0_4"] } tokio-stream = { workspace = true, features = ["net"] } toml.workspace = true tonic.workspace = true diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 85770e1f3d4d..b4408db7acde 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -26,6 +26,8 @@ use common_meta::kv_backend::memory::MemoryKvBackend; #[cfg(feature = "pg_kvbackend")] use common_meta::kv_backend::postgres::PgStore; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; +#[cfg(feature = "pg_kvbackend")] +use common_telemetry::error; use common_telemetry::info; use etcd_client::Client; use futures::future; @@ -224,8 +226,9 @@ pub async fn metasrv_builder( #[cfg(feature = "pg_kvbackend")] (None, BackendImpl::PostgresStore) => { let pg_client = create_postgres_client(opts).await?; - let kv_backend = PgStore::with_pg_client(pg_client).await.unwrap(); - // TODO(jeremy, weny): implement election for postgres + let kv_backend = PgStore::with_pg_client(pg_client) + .await + .context(error::KvBackendSnafu)?; (kv_backend, None) } }; @@ -275,8 +278,14 @@ async fn create_postgres_client(opts: &MetasrvOptions) -> Result), - StepDown(Arc), + Elected(Arc), + StepDown(Arc), +} + +/// LeaderKey is a key that represents the leader of metasrv. +/// The structure is corresponding to [etcd_client::LeaderKey]. +pub trait LeaderKey: Send + Sync + Debug { + /// The name in byte. name is the election identifier that corresponds to the leadership key. + fn name(&self) -> &[u8]; + + /// The key in byte. key is an opaque key representing the ownership of the election. If the key + /// is deleted, then leadership is lost. + fn key(&self) -> &[u8]; + + /// The creation revision of the key. 
+ fn revision(&self) -> i64; + + /// The lease ID of the election leader. + fn lease_id(&self) -> i64; } impl fmt::Display for LeaderChangeMessage { @@ -47,8 +69,8 @@ impl fmt::Display for LeaderChangeMessage { write!(f, "LeaderKey {{ ")?; write!(f, "name: {}", String::from_utf8_lossy(leader_key.name()))?; write!(f, ", key: {}", String::from_utf8_lossy(leader_key.key()))?; - write!(f, ", rev: {}", leader_key.rev())?; - write!(f, ", lease: {}", leader_key.lease())?; + write!(f, ", rev: {}", leader_key.revision())?; + write!(f, ", lease: {}", leader_key.lease_id())?; write!(f, " }})") } } @@ -65,7 +87,7 @@ pub trait Election: Send + Sync { /// initialization operations can be performed. /// /// note: a new leader will only return true on the first call. - fn in_infancy(&self) -> bool; + fn in_leader_infancy(&self) -> bool; /// Registers a candidate for the election. async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()>; diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index fef7e928a783..5f2cf3342007 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -18,18 +18,41 @@ use std::time::Duration; use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS}; use common_telemetry::{error, info, warn}; -use etcd_client::{Client, GetOptions, LeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions}; +use etcd_client::{ + Client, GetOptions, LeaderKey as EtcdLeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions, +}; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::Receiver; use tokio::time::{timeout, MissedTickBehavior}; -use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY}; +use crate::election::{ + Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY, + KEEP_ALIVE_INTERVAL_SECS, 
+}; use crate::error; use crate::error::Result; use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; +impl LeaderKey for EtcdLeaderKey { + fn name(&self) -> &[u8] { + self.name() + } + + fn key(&self) -> &[u8] { + self.key() + } + + fn revision(&self) -> i64 { + self.rev() + } + + fn lease_id(&self) -> i64 { + self.lease() + } +} + pub struct EtcdElection { leader_value: String, client: Client, @@ -75,15 +98,15 @@ impl EtcdElection { LeaderChangeMessage::Elected(key) => { info!( "[{leader_ident}] is elected as leader: {:?}, lease: {}", - key.name_str(), - key.lease() + String::from_utf8_lossy(key.name()), + key.lease_id() ); } LeaderChangeMessage::StepDown(key) => { warn!( "[{leader_ident}] is stepping down: {:?}, lease: {}", - key.name_str(), - key.lease() + String::from_utf8_lossy(key.name()), + key.lease_id() ); } }, @@ -126,16 +149,13 @@ impl Election for EtcdElection { self.is_leader.load(Ordering::Relaxed) } - fn in_infancy(&self) -> bool { + fn in_leader_infancy(&self) -> bool { self.infancy .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) .is_ok() } async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> { - const CANDIDATE_LEASE_SECS: u64 = 600; - const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2; - let mut lease_client = self.client.lease_client(); let res = lease_client .grant(CANDIDATE_LEASE_SECS as i64, None) @@ -239,7 +259,7 @@ impl Election for EtcdElection { // The keep alive operation MUST be done in `META_KEEP_ALIVE_INTERVAL_SECS`. 
match timeout( keep_lease_duration, - self.keep_alive(&mut keeper, &mut receiver, leader), + self.keep_alive(&mut keeper, &mut receiver, leader.clone()), ) .await { @@ -303,7 +323,7 @@ impl EtcdElection { &self, keeper: &mut LeaseKeeper, receiver: &mut LeaseKeepAliveStream, - leader: &LeaderKey, + leader: EtcdLeaderKey, ) -> Result<()> { keeper.keep_alive().await.context(error::EtcdFailedSnafu)?; if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? { @@ -324,7 +344,7 @@ impl EtcdElection { if let Err(e) = self .leader_watcher - .send(LeaderChangeMessage::Elected(Arc::new(leader.clone()))) + .send(LeaderChangeMessage::Elected(Arc::new(leader))) { error!(e; "Failed to send leader change message"); } diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs new file mode 100644 index 000000000000..805b9b8bd208 --- /dev/null +++ b/src/meta-srv/src/election/postgres.rs @@ -0,0 +1,519 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use common_time::Timestamp; +use itertools::Itertools; +use snafu::{ensure, OptionExt, ResultExt}; +use tokio::sync::broadcast; +use tokio_postgres::Client; + +use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY}; +use crate::error::{ + DeserializeFromJsonSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu, UnexpectedSnafu, +}; +use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; + +// Separator between value and expire time. +const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#; + +// SQL to put a value with expire time. Parameters: key, value, LEASE_SEP, expire_time +const PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME: &str = r#" +WITH prev AS ( + SELECT k, v FROM greptime_metakv WHERE k = $1 +), insert AS ( + INSERT INTO greptime_metakv + VALUES($1, $2 || $3 || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS')) + ON CONFLICT (k) DO NOTHING +) + +SELECT k, v FROM prev; +"#; + +// SQL to update a value with expire time. Parameters: key, prev_value_with_lease, updated_value, LEASE_SEP, expire_time +const CAS_WITH_EXPIRE_TIME: &str = r#" +UPDATE greptime_metakv +SET k=$1, +v=$3 || $4 || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $5, 'YYYY-MM-DD HH24:MI:SS.MS') +WHERE + k=$1 AND v=$2 +"#; + +const GET_WITH_CURRENT_TIMESTAMP: &str = r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM greptime_metakv WHERE k = $1"#; + +const PREFIX_GET_WITH_CURRENT_TIMESTAMP: &str = r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM greptime_metakv WHERE k LIKE $1"#; + +const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE k = $1 RETURNING k,v;"; + +/// Parse the value and expire time from the given string. The value should be in the format "value || LEASE_SEP || expire_time". 
+fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> { + let (value, expire_time) = value + .split(LEASE_SEP) + .collect_tuple() + .context(UnexpectedSnafu { + violated: format!( + "Invalid value {}, expect node info || {} || expire time", + value, LEASE_SEP + ), + })?; + // Given expire_time is in the format 'YYYY-MM-DD HH24:MI:SS.MS' + let expire_time = match Timestamp::from_str(expire_time, None) { + Ok(ts) => ts, + Err(_) => UnexpectedSnafu { + violated: format!("Invalid timestamp: {}", expire_time), + } + .fail()?, + }; + Ok((value.to_string(), expire_time)) +} + +/// PostgreSQL implementation of Election. +/// TODO(CookiePie): Currently only supports candidate registration. Add election logic. +pub struct PgElection { + leader_value: String, + client: Client, + is_leader: AtomicBool, + leader_infancy: AtomicBool, + leader_watcher: broadcast::Sender, + store_key_prefix: String, + candidate_lease_ttl_secs: u64, +} + +impl PgElection { + pub async fn with_pg_client( + leader_value: String, + client: Client, + store_key_prefix: String, + candidate_lease_ttl_secs: u64, + ) -> Result { + let (tx, _) = broadcast::channel(100); + Ok(Arc::new(Self { + leader_value, + client, + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(false), + leader_watcher: tx, + store_key_prefix, + candidate_lease_ttl_secs, + })) + } + + fn _election_key(&self) -> String { + format!("{}{}", self.store_key_prefix, ELECTION_KEY) + } + + fn candidate_root(&self) -> String { + format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT) + } + + fn candidate_key(&self) -> String { + format!("{}{}", self.candidate_root(), self.leader_value) + } +} + +#[async_trait::async_trait] +impl Election for PgElection { + type Leader = LeaderValue; + + fn is_leader(&self) -> bool { + self.is_leader.load(Ordering::Relaxed) + } + + fn in_leader_infancy(&self) -> bool { + self.leader_infancy + .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() +
} + + /// TODO(CookiePie): Split the candidate registration and keep alive logic into separate methods, so that upper layers can call them separately. + async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> { + let key = self.candidate_key(); + let node_info = + serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu { + input: format!("{node_info:?}"), + })?; + let res = self.put_value_with_lease(&key, &node_info).await?; + // The candidate may have registered before; if so, just update the lease. + if !res { + self.delete_value(&key).await?; + self.put_value_with_lease(&key, &node_info).await?; + } + + // Check if the current lease has expired and renew the lease. + let mut keep_alive_interval = + tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl_secs / 2)); + loop { + let _ = keep_alive_interval.tick().await; + + let (_, prev_expire_time, current_time, origin) = self + .get_value_with_lease(&key, true) + .await? + .unwrap_or_default(); + + ensure!( + prev_expire_time > current_time, + UnexpectedSnafu { + violated: format!( + "Candidate lease expired, key: {:?}", + String::from_utf8_lossy(&key.into_bytes()) + ), + } + ); + + // Safety: origin is Some since we are using `get_value_with_lease` with `true`.
+ let origin = origin.unwrap(); + self.update_value_with_lease(&key, &origin, &node_info) + .await?; + } + } + + async fn all_candidates(&self) -> Result> { + let key_prefix = self.candidate_root(); + let (mut candidates, current) = self.get_value_with_lease_by_prefix(&key_prefix).await?; + // Remove expired candidates + candidates.retain(|c| c.1 > current); + let mut valid_candidates = Vec::with_capacity(candidates.len()); + for (c, _) in candidates { + let node_info: MetasrvNodeInfo = + serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu { + input: format!("{:?}", c), + })?; + valid_candidates.push(node_info); + } + Ok(valid_candidates) + } + + async fn campaign(&self) -> Result<()> { + todo!() + } + + async fn leader(&self) -> Result { + todo!() + } + + async fn resign(&self) -> Result<()> { + todo!() + } + + fn subscribe_leader_change(&self) -> broadcast::Receiver { + self.leader_watcher.subscribe() + } +} + +impl PgElection { + /// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned. + async fn get_value_with_lease( + &self, + key: &String, + with_origin: bool, + ) -> Result)>> { + let res = self + .client + .query(GET_WITH_CURRENT_TIMESTAMP, &[&key]) + .await + .context(PostgresExecutionSnafu)?; + + if res.is_empty() { + Ok(None) + } else { + // Safety: Checked if res is empty above. + let current_time_str = res[0].get(1); + let current_time = match Timestamp::from_str(current_time_str, None) { + Ok(ts) => ts, + Err(_) => UnexpectedSnafu { + violated: format!("Invalid timestamp: {}", current_time_str), + } + .fail()?, + }; + // Safety: Checked if res is empty above. 
+ let value_and_expire_time = res[0].get(0); + let (value, expire_time) = parse_value_and_expire_time(value_and_expire_time)?; + + if with_origin { + Ok(Some(( + value, + expire_time, + current_time, + Some(value_and_expire_time.to_string()), + ))) + } else { + Ok(Some((value, expire_time, current_time, None))) + } + } + } + + /// Returns all values and expire time with the given key prefix. Also returns the current time. + async fn get_value_with_lease_by_prefix( + &self, + key_prefix: &str, + ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> { + let key_prefix = format!("{}%", key_prefix); + let res = self + .client + .query(PREFIX_GET_WITH_CURRENT_TIMESTAMP, &[&key_prefix]) + .await + .context(PostgresExecutionSnafu)?; + + let mut values_with_leases = vec![]; + let mut current = Timestamp::default(); + for row in res { + let current_time_str = row.get(1); + current = match Timestamp::from_str(current_time_str, None) { + Ok(ts) => ts, + Err(_) => UnexpectedSnafu { + violated: format!("Invalid timestamp: {}", current_time_str), + } + .fail()?, + }; + + let value_and_expire_time = row.get(0); + let (value, expire_time) = parse_value_and_expire_time(value_and_expire_time)?; + + values_with_leases.push((value, expire_time)); + } + Ok((values_with_leases, current)) + } + + async fn update_value_with_lease(&self, key: &str, prev: &str, updated: &str) -> Result<()> { + let res = self + .client + .execute( + CAS_WITH_EXPIRE_TIME, + &[ + &key, + &prev, + &updated, + &LEASE_SEP, + &(self.candidate_lease_ttl_secs as f64), + ], + ) + .await + .context(PostgresExecutionSnafu)?; + + ensure!( + res == 1, + UnexpectedSnafu { + violated: format!("Failed to update key: {}", key), + } + ); + + Ok(()) + } + + /// Returns `true` if the insertion is successful + async fn put_value_with_lease(&self, key: &str, value: &str) -> Result { + let res = self + .client + .query( + PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME, + &[ + &key, + &value, + &LEASE_SEP, + &(self.candidate_lease_ttl_secs as 
f64), + ], + ) + .await + .context(PostgresExecutionSnafu)?; + Ok(res.is_empty()) + } + + /// Returns `true` if the deletion is successful. + /// Caution: Should only delete the key if the lease is expired. + async fn delete_value(&self, key: &String) -> Result { + let res = self + .client + .query(POINT_DELETE, &[&key]) + .await + .context(PostgresExecutionSnafu)?; + + Ok(res.len() == 1) + } +} + +#[cfg(test)] +mod tests { + use std::env; + + use tokio_postgres::{Client, NoTls}; + + use super::*; + use crate::error::PostgresExecutionSnafu; + + async fn create_postgres_client() -> Result { + let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default(); + if endpoint.is_empty() { + return UnexpectedSnafu { + violated: "Postgres endpoint is empty".to_string(), + } + .fail(); + } + let (client, connection) = tokio_postgres::connect(&endpoint, NoTls) + .await + .context(PostgresExecutionSnafu)?; + tokio::spawn(async move { + connection.await.context(PostgresExecutionSnafu).unwrap(); + }); + Ok(client) + } + + #[tokio::test] + async fn test_postgres_crud() { + let client = create_postgres_client().await.unwrap(); + + let key = "test_key".to_string(); + let value = "test_value".to_string(); + + let (tx, _) = broadcast::channel(100); + let pg_election = PgElection { + leader_value: "test_leader".to_string(), + client, + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: "test_prefix".to_string(), + candidate_lease_ttl_secs: 10, + }; + + let res = pg_election + .put_value_with_lease(&key, &value) + .await + .unwrap(); + assert!(res); + + let (value, _, _, prev) = pg_election + .get_value_with_lease(&key, true) + .await + .unwrap() + .unwrap(); + assert_eq!(value, value); + + let prev = prev.unwrap(); + pg_election + .update_value_with_lease(&key, &prev, &value) + .await + .unwrap(); + + let res = pg_election.delete_value(&key).await.unwrap(); + assert!(res); + + let res = 
pg_election.get_value_with_lease(&key, false).await.unwrap(); + assert!(res.is_none()); + + for i in 0..10 { + let key = format!("test_key_{}", i); + let value = format!("test_value_{}", i); + pg_election + .put_value_with_lease(&key, &value) + .await + .unwrap(); + } + + let key_prefix = "test_key".to_string(); + let (res, _) = pg_election + .get_value_with_lease_by_prefix(&key_prefix) + .await + .unwrap(); + assert_eq!(res.len(), 10); + + for i in 0..10 { + let key = format!("test_key_{}", i); + let res = pg_election.delete_value(&key).await.unwrap(); + assert!(res); + } + + let (res, current) = pg_election + .get_value_with_lease_by_prefix(&key_prefix) + .await + .unwrap(); + assert!(res.is_empty()); + assert!(current == Timestamp::default()); + } + + async fn candidate(leader_value: String, candidate_lease_ttl_secs: u64) { + let client = create_postgres_client().await.unwrap(); + + let (tx, _) = broadcast::channel(100); + let pg_election = PgElection { + leader_value, + client, + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: "test_prefix".to_string(), + candidate_lease_ttl_secs, + }; + + let node_info = MetasrvNodeInfo { + addr: "test_addr".to_string(), + version: "test_version".to_string(), + git_commit: "test_git_commit".to_string(), + start_time_ms: 0, + }; + pg_election.register_candidate(&node_info).await.unwrap(); + } + + #[tokio::test] + async fn test_candidate_registration() { + let leader_value_prefix = "test_leader".to_string(); + let candidate_lease_ttl_secs = 5; + let mut handles = vec![]; + for i in 0..10 { + let leader_value = format!("{}{}", leader_value_prefix, i); + let handle = tokio::spawn(candidate(leader_value, candidate_lease_ttl_secs)); + handles.push(handle); + } + // Wait for candidates to register themselves and renew their leases at least once.
+ tokio::time::sleep(Duration::from_secs(6)).await; + + let client = create_postgres_client().await.unwrap(); + + let (tx, _) = broadcast::channel(100); + let leader_value = "test_leader".to_string(); + let pg_election = PgElection { + leader_value, + client, + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: "test_prefix".to_string(), + candidate_lease_ttl_secs, + }; + + let candidates = pg_election.all_candidates().await.unwrap(); + assert_eq!(candidates.len(), 10); + + for handle in handles { + handle.abort(); + } + + // Wait for the candidate leases to expire. + tokio::time::sleep(Duration::from_secs(5)).await; + let candidates = pg_election.all_candidates().await.unwrap(); + assert!(candidates.is_empty()); + + // Garbage collection + for i in 0..10 { + let key = format!( + "{}{}{}{}", + "test_prefix", CANDIDATES_ROOT, leader_value_prefix, i + ); + let res = pg_election.delete_value(&key).await.unwrap(); + assert!(res); + } + } +} diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 1c529f06d606..ddc9d3658bad 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -697,6 +697,8 @@ pub enum Error { #[cfg(feature = "pg_kvbackend")] #[snafu(display("Failed to execute via postgres"))] PostgresExecution { + #[snafu(source)] + error: tokio_postgres::Error, #[snafu(implicit)] location: Location, }, diff --git a/src/meta-srv/src/handler/on_leader_start_handler.rs b/src/meta-srv/src/handler/on_leader_start_handler.rs index dccb8d3d60f9..97e1704343b8 100644 --- a/src/meta-srv/src/handler/on_leader_start_handler.rs +++ b/src/meta-srv/src/handler/on_leader_start_handler.rs @@ -36,7 +36,7 @@ impl HeartbeatHandler for OnLeaderStartHandler { return Ok(HandleControl::Continue); }; - if election.in_infancy() { + if election.in_leader_infancy() { ctx.is_infancy = true; // TODO(weny): Unifies the multiple leader state between Context and Metasrv. 
// we can't ensure the in-memory kv has already been reset in the outside loop.