diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs index 8add65cd49c4..23c506761a9d 100644 --- a/src/common/meta/src/kv_backend/postgres.rs +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -45,9 +45,9 @@ const METADKV_CREATION: &str = const FULL_TABLE_SCAN: &str = "SELECT k, v FROM greptime_metakv $1 ORDER BY K"; -const POINT_GET: &str = "SELECT k, v FROM greptime_metakv WHERE k = $1"; +pub const POINT_GET: &str = "SELECT k, v FROM greptime_metakv WHERE k = $1"; -const PREFIX_SCAN: &str = "SELECT k, v FROM greptime_metakv WHERE k LIKE $1 ORDER BY K"; +pub const PREFIX_SCAN: &str = "SELECT k, v FROM greptime_metakv WHERE k LIKE $1 ORDER BY K"; const RANGE_SCAN_LEFT_BOUNDED: &str = "SELECT k, v FROM greptime_metakv WHERE k >= $1 ORDER BY K"; @@ -56,7 +56,7 @@ const RANGE_SCAN_FULL_RANGE: &str = const FULL_TABLE_DELETE: &str = "DELETE FROM greptime_metakv RETURNING k,v"; -const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE K = $1 RETURNING k,v;"; +pub const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE K = $1 RETURNING k,v;"; const PREFIX_DELETE: &str = "DELETE FROM greptime_metakv WHERE k LIKE $1 RETURNING k,v;"; @@ -65,7 +65,7 @@ const RANGE_DELETE_LEFT_BOUNDED: &str = "DELETE FROM greptime_metakv WHERE k >= const RANGE_DELETE_FULL_RANGE: &str = "DELETE FROM greptime_metakv WHERE k >= $1 AND K < $2 RETURNING k,v;"; -const CAS: &str = r#" +pub const CAS: &str = r#" WITH prev AS ( SELECT k,v FROM greptime_metakv WHERE k = $1 AND v = $2 ), update AS ( @@ -79,7 +79,7 @@ WHERE SELECT k, v FROM prev; "#; -const PUT_IF_NOT_EXISTS: &str = r#" +pub const PUT_IF_NOT_EXISTS: &str = r#" WITH prev AS ( select k,v from greptime_metakv where k = $1 ), insert AS ( diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 85770e1f3d4d..4d12caa41fec 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -46,6 +46,8 @@ use tonic::transport::server::{Router, TcpIncoming}; use crate::election::etcd::EtcdElection; #[cfg(feature = "pg_kvbackend")] +use crate::election::postgres::PgElection; +#[cfg(feature = "pg_kvbackend")] use crate::error::InvalidArgumentsSnafu; use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu}; use crate::metasrv::builder::MetasrvBuilder; @@ -224,9 +226,17 @@ pub async fn metasrv_builder( #[cfg(feature = "pg_kvbackend")] (None, BackendImpl::PostgresStore) => { let pg_client = create_postgres_client(opts).await?; - let kv_backend = PgStore::with_pg_client(pg_client).await.unwrap(); - // TODO(jeremy, weny): implement election for postgres - (kv_backend, None) + let kv_backend = PgStore::with_pg_client(pg_client) + .await + .context(error::KvBackendSnafu)?; + let election_client = create_postgres_client(opts).await?; + let election = PgElection::with_pg_client( + opts.server_addr.clone(), + election_client, + opts.store_key_prefix.clone(), + ) + .await?; + (kv_backend, Some(election)) } }; diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index d73f453a88f0..4db7814a2846 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -13,11 +13,12 @@ // limitations under the License. pub mod etcd; +#[cfg(feature = "pg_kvbackend")] +pub mod postgres; -use std::fmt; +use std::fmt::{self, Debug}; use std::sync::Arc; -use etcd_client::LeaderKey; use tokio::sync::broadcast::Receiver; use crate::error::Result; @@ -26,10 +27,20 @@ use crate::metasrv::MetasrvNodeInfo; pub const ELECTION_KEY: &str = "__metasrv_election"; pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/"; +const CANDIDATE_LEASE_SECS: u64 = 600; +const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2; + #[derive(Debug, Clone)] pub enum LeaderChangeMessage { - Elected(Arc), - StepDown(Arc), + Elected(Arc), + StepDown(Arc), +} + +pub trait LeaderKey: Send + Sync + Debug { + fn name(&self) -> &[u8]; + fn key(&self) -> &[u8]; + fn rev(&self) -> i64; + fn lease(&self) -> i64; } impl fmt::Display for LeaderChangeMessage { diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index fef7e928a783..274a6a7e4792 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -18,18 +18,41 @@ use std::time::Duration; use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS}; use common_telemetry::{error, info, warn}; -use etcd_client::{Client, GetOptions, LeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions}; +use etcd_client::{ + Client, GetOptions, LeaderKey as EtcdLeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions, +}; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::Receiver; use tokio::time::{timeout, MissedTickBehavior}; -use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY}; +use crate::election::{ + Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY, + KEEP_ALIVE_INTERVAL_SECS, +}; use crate::error; use crate::error::Result; use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; +impl LeaderKey for EtcdLeaderKey { + fn name(&self) -> &[u8] { + self.name() + } + + fn key(&self) -> &[u8] { + self.key() + } + + fn rev(&self) -> i64 { + self.rev() + } + + fn lease(&self) -> i64 { + self.lease() + } +} + pub struct EtcdElection { leader_value: String, client: Client, @@ -75,14 +98,14 @@ impl EtcdElection { LeaderChangeMessage::Elected(key) => { info!( "[{leader_ident}] is elected as leader: {:?}, lease: {}", - key.name_str(), + String::from_utf8_lossy(key.name()), key.lease() ); } LeaderChangeMessage::StepDown(key) => { warn!( "[{leader_ident}] is stepping down: {:?}, lease: {}", - key.name_str(), + String::from_utf8_lossy(key.name()), key.lease() ); } @@ -133,9 +156,6 @@ impl Election for EtcdElection { } async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> { - const CANDIDATE_LEASE_SECS: u64 = 600; - const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2; - let mut lease_client = self.client.lease_client(); let res = lease_client .grant(CANDIDATE_LEASE_SECS as i64, None) @@ -239,7 +259,7 @@ impl Election for EtcdElection { // The keep alive operation MUST be done in `META_KEEP_ALIVE_INTERVAL_SECS`. match timeout( keep_lease_duration, - self.keep_alive(&mut keeper, &mut receiver, leader), + self.keep_alive(&mut keeper, &mut receiver, leader.clone()), ) .await { @@ -262,12 +282,9 @@ impl Election for EtcdElection { .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) .is_ok() { - if let Err(e) = self - .leader_watcher + self.leader_watcher .send(LeaderChangeMessage::StepDown(Arc::new(leader.clone()))) - { - error!(e; "Failed to send leader change message"); - } + .context(error::SendLeaderChangeSnafu)?; } } @@ -303,7 +320,7 @@ impl EtcdElection { &self, keeper: &mut LeaseKeeper, receiver: &mut LeaseKeepAliveStream, - leader: &LeaderKey, + leader: EtcdLeaderKey, ) -> Result<()> { keeper.keep_alive().await.context(error::EtcdFailedSnafu)?; if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? { @@ -322,12 +339,9 @@ impl EtcdElection { { self.infancy.store(true, Ordering::Relaxed); - if let Err(e) = self - .leader_watcher - .send(LeaderChangeMessage::Elected(Arc::new(leader.clone()))) - { - error!(e; "Failed to send leader change message"); - } + self.leader_watcher + .send(LeaderChangeMessage::Elected(Arc::new(leader))) + .context(error::SendLeaderChangeSnafu)?; } } diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs new file mode 100644 index 000000000000..c1150e30e3f1 --- /dev/null +++ b/src/meta-srv/src/election/postgres.rs @@ -0,0 +1,301 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::{self, Duration}; + +use common_meta::kv_backend::postgres::{ + CAS, POINT_DELETE, POINT_GET, PREFIX_SCAN, PUT_IF_NOT_EXISTS, +}; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, ResultExt}; +use tokio::sync::broadcast; +use tokio_postgres::Client; + +use crate::election::{ + Election, LeaderChangeMessage, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY, + KEEP_ALIVE_INTERVAL_SECS, +}; +use crate::error::{ + DeserializeFromJsonSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu, UnexpectedSnafu, +}; +use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; + +/// Value with a expire time. The expire time is in seconds since UNIX epoch. +#[derive(Debug, Serialize, Deserialize, Default)] +struct ValueWithLease { + value: String, + expire_time: f64, +} + +/// PostgreSql implementation of Election. +pub struct PgElection { + leader_value: String, + client: Client, + is_leader: AtomicBool, + infancy: AtomicBool, + leader_watcher: broadcast::Sender, + store_key_prefix: String, +} + +impl PgElection { + pub async fn with_pg_client( + leader_value: String, + client: Client, + store_key_prefix: String, + ) -> Result { + let (tx, _) = broadcast::channel(100); + Ok(Arc::new(Self { + leader_value, + client, + is_leader: AtomicBool::new(false), + infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix, + })) + } + + fn _election_key(&self) -> String { + format!("{}{}", self.store_key_prefix, ELECTION_KEY) + } + + fn candidate_root(&self) -> String { + format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT) + } + + fn candidate_key(&self) -> String { + format!("{}{}", self.candidate_root(), self.leader_value) + } +} + +#[async_trait::async_trait] +impl Election for PgElection { + type Leader = LeaderValue; + + fn subscribe_leader_change(&self) -> broadcast::Receiver { + self.leader_watcher.subscribe() + } + + fn is_leader(&self) -> bool { + self.is_leader.load(Ordering::Relaxed) + } + + fn in_infancy(&self) -> bool { + self.infancy + .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + } + + async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> { + let key = self.candidate_key().into_bytes(); + let node_info = + serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu { + input: format!("{node_info:?}"), + })?; + let value_with_lease = ValueWithLease { + value: node_info, + expire_time: time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() + + CANDIDATE_LEASE_SECS as f64, + }; + let res = self.put_value_with_lease(&key, &value_with_lease).await?; + // May registered before, check if the lease expired. If so, delete and re-register. + if !res { + let prev = self.get_value_with_lease(&key).await?.unwrap_or_default(); + if prev.expire_time + <= time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() + { + self.delete_value(&key).await?; + self.put_value_with_lease(&key, &value_with_lease).await?; + } + } + + // Check if the current lease has expired and renew the lease. + let mut keep_alive_interval = + tokio::time::interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS)); + loop { + let _ = keep_alive_interval.tick().await; + + let prev = self.get_value_with_lease(&key).await?.unwrap_or_default(); + let now = time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64(); + + ensure!( + prev.expire_time > now, + UnexpectedSnafu { + violated: format!( + "Candidate lease expired, key: {:?}", + String::from_utf8_lossy(&key) + ), + } + ); + + let updated = ValueWithLease { + value: prev.value.clone(), + expire_time: now + CANDIDATE_LEASE_SECS as f64, + }; + self.update_value_with_lease(&key, &prev, &updated).await?; + } + } + + async fn all_candidates(&self) -> Result> { + let key_prefix = self.candidate_root().into_bytes(); + let mut candidates = self.get_value_with_lease_by_prefix(&key_prefix).await?; + let now = time::SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64(); + // Remove expired candidates + candidates.retain(|c| c.expire_time > now); + let mut valid_candidates = Vec::with_capacity(candidates.len()); + for c in candidates { + let node_info: MetasrvNodeInfo = + serde_json::from_str(&c.value).with_context(|_| DeserializeFromJsonSnafu { + input: format!("{:?}", c.value), + })?; + valid_candidates.push(node_info); + } + Ok(valid_candidates) + } + + async fn campaign(&self) -> Result<()> { + todo!() + } + + async fn leader(&self) -> Result { + todo!() + } + + async fn resign(&self) -> Result<()> { + todo!() + } +} + +impl PgElection { + async fn get_value_with_lease(&self, key: &Vec) -> Result> { + let prev = self + .client + .query(POINT_GET, &[&key]) + .await + .context(PostgresExecutionSnafu)?; + + if let Some(row) = prev.first() { + let value: String = row.get(0); + let value_with_lease: ValueWithLease = + serde_json::from_str(&value).with_context(|_| DeserializeFromJsonSnafu { + input: format!("{value:?}"), + })?; + Ok(Some(value_with_lease)) + } else { + Ok(None) + } + } + + async fn get_value_with_lease_by_prefix( + &self, + key_prefix: &Vec, + ) -> Result> { + let prev = self + .client + .query(PREFIX_SCAN, &[key_prefix]) + .await + .context(PostgresExecutionSnafu)?; + + let mut res = Vec::new(); + for row in prev { + let value: String = row.get(0); + let value_with_lease: ValueWithLease = + serde_json::from_str(&value).with_context(|_| DeserializeFromJsonSnafu { + input: format!("{value:?}"), + })?; + res.push(value_with_lease); + } + + Ok(res) + } + + async fn update_value_with_lease( + &self, + key: &Vec, + prev: &ValueWithLease, + updated: &ValueWithLease, + ) -> Result<()> { + let prev = serde_json::to_string(prev) + .with_context(|_| SerializeToJsonSnafu { + input: format!("{prev:?}"), + })? + .into_bytes(); + + let updated = serde_json::to_string(updated) + .with_context(|_| SerializeToJsonSnafu { + input: format!("{updated:?}"), + })? + .into_bytes(); + + let res = self + .client + .query(CAS, &[key, &prev, &updated]) + .await + .context(PostgresExecutionSnafu)?; + + // CAS operation will return the updated value if the operation is successful + match res.is_empty() { + false => Ok(()), + true => UnexpectedSnafu { + violated: format!( + "Failed to update value from key: {:?}", + String::from_utf8_lossy(key) + ), + } + .fail(), + } + } + + /// Returns `true` if the insertion is successful + async fn put_value_with_lease(&self, key: &Vec, value: &ValueWithLease) -> Result { + let value = serde_json::to_string(value) + .with_context(|_| SerializeToJsonSnafu { + input: format!("{value:?}"), + })? + .into_bytes(); + + let res = self + .client + .query(PUT_IF_NOT_EXISTS, &[key, &value]) + .await + .context(PostgresExecutionSnafu)?; + + Ok(res.is_empty()) + } + + /// Returns `true` if the deletion is successful. + /// Caution: Should only delete the key if the lease is expired. + async fn delete_value(&self, key: &Vec) -> Result { + let res = self + .client + .query(POINT_DELETE, &[key]) + .await + .context(PostgresExecutionSnafu)?; + + Ok(res.len() == 1) + } +} diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 1c529f06d606..60f04eccff8a 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -24,6 +24,7 @@ use table::metadata::TableId; use tokio::sync::mpsc::error::SendError; use tonic::codegen::http; +use crate::election::LeaderChangeMessage; use crate::metasrv::SelectTarget; use crate::pubsub::Message; @@ -697,6 +698,8 @@ pub enum Error { #[cfg(feature = "pg_kvbackend")] #[snafu(display("Failed to execute via postgres"))] PostgresExecution { + #[snafu(source)] + error: tokio_postgres::Error, #[snafu(implicit)] location: Location, }, @@ -717,6 +720,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to send leader change message"))] + SendLeaderChange { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: tokio::sync::broadcast::error::SendError, + }, + #[snafu(display("Flow state handler error"))] FlowStateHandler { #[snafu(implicit)] @@ -765,6 +776,7 @@ impl ErrorExt for Error { | Error::StartGrpc { .. } | Error::NoEnoughAvailableNode { .. } | Error::PublishMessage { .. } + | Error::SendLeaderChange { .. } | Error::Join { .. } | Error::PeerUnavailable { .. } | Error::ExceededDeadline { .. }