feat: init PgElection with candidate registration #5209

Merged · 23 commits · Dec 26, 2024
2 changes: 1 addition & 1 deletion .github/workflows/develop.yml
@@ -697,7 +697,7 @@ jobs:
working-directory: tests-integration/fixtures/postgres
run: docker compose -f docker-compose-standalone.yml up -d --wait
- name: Run nextest cases
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard -F pg_kvbackend
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=lld"
RUST_BACKTRACE: 1
7 changes: 6 additions & 1 deletion src/common/meta/src/kv_backend/postgres.rs
@@ -16,6 +16,7 @@ use std::any::Any;
use std::borrow::Cow;
use std::sync::Arc;

use common_telemetry::error;
use snafu::ResultExt;
use tokio_postgres::types::ToSql;
use tokio_postgres::{Client, NoTls};
@@ -97,7 +98,11 @@ impl PgStore {
let (client, conn) = tokio_postgres::connect(url, NoTls)
.await
.context(ConnectPostgresSnafu)?;
tokio::spawn(async move { conn.await.context(ConnectPostgresSnafu) });
tokio::spawn(async move {
if let Err(e) = conn.await {
error!(e; "connection error");
}
});
Self::with_pg_client(client).await
}

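The change above matters because `tokio_postgres::connect` hands back a `(Client, Connection)` pair, and the `Connection` future must be polled on its own task for any queries to make progress; previously its error was wrapped in a `Result` that was silently dropped, now it is logged. A minimal sketch of that standard tokio_postgres pattern, outside GreptimeDB's error types (the connection string is a placeholder):

```rust
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    // `connect` returns a client plus a `Connection` future that performs the
    // actual I/O; it must be driven on its own task or queries never complete.
    let (client, connection) =
        tokio_postgres::connect("host=localhost user=postgres", NoTls).await?;

    // Drive the connection in the background and surface any error
    // instead of discarding it.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("postgres connection error: {e}");
        }
    });

    let row = client.query_one("SELECT 1::INT4", &[]).await?;
    let one: i32 = row.get(0);
    assert_eq!(one, 1);
    Ok(())
}
```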
5 changes: 3 additions & 2 deletions src/meta-srv/Cargo.toml
@@ -6,14 +6,15 @@ license.workspace = true

[features]
mock = []
pg_kvbackend = ["dep:tokio-postgres"]
pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend"]

[lints]
workspace = true

[dependencies]
api.workspace = true
async-trait = "0.1"
chrono.workspace = true
clap.workspace = true
client.workspace = true
common-base.workspace = true
@@ -55,7 +56,7 @@ snafu.workspace = true
store-api.workspace = true
table.workspace = true
tokio.workspace = true
tokio-postgres = { workspace = true, optional = true }
tokio-postgres = { workspace = true, optional = true, features = ["with-chrono-0_4"] }
tokio-stream = { workspace = true, features = ["net"] }
toml.workspace = true
tonic.workspace = true
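The new `with-chrono-0_4` feature on `tokio-postgres`, together with the added `chrono` dependency, enables converting Postgres timestamp columns to and from `chrono` types, presumably for the lease-expiry bookkeeping in the Postgres election. A minimal sketch of what the feature unlocks; the helper and query are illustrative, not GreptimeDB's actual code:

```rust
use chrono::{DateTime, Utc};
use tokio_postgres::Client;

// Hypothetical helper: reads the database clock as a chrono timestamp.
// `DateTime<Utc>` maps to TIMESTAMPTZ only when tokio-postgres is built
// with the `with-chrono-0_4` feature.
async fn db_now(client: &Client) -> Result<DateTime<Utc>, tokio_postgres::Error> {
    let row = client.query_one("SELECT NOW()", &[]).await?;
    Ok(row.get(0))
}
```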
15 changes: 12 additions & 3 deletions src/meta-srv/src/bootstrap.rs
@@ -26,6 +26,8 @@ use common_meta::kv_backend::memory::MemoryKvBackend;
#[cfg(feature = "pg_kvbackend")]
use common_meta::kv_backend::postgres::PgStore;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
#[cfg(feature = "pg_kvbackend")]
use common_telemetry::error;
use common_telemetry::info;
use etcd_client::Client;
use futures::future;
@@ -224,8 +226,9 @@ pub async fn metasrv_builder(
#[cfg(feature = "pg_kvbackend")]
(None, BackendImpl::PostgresStore) => {
let pg_client = create_postgres_client(opts).await?;
let kv_backend = PgStore::with_pg_client(pg_client).await.unwrap();
// TODO(jeremy, weny): implement election for postgres
let kv_backend = PgStore::with_pg_client(pg_client)
.await
.context(error::KvBackendSnafu)?;
(kv_backend, None)
}
};
@@ -275,8 +278,14 @@ async fn create_postgres_client(opts: &MetasrvOptions) -> Result<tokio_postgres:
let postgres_url = opts.store_addrs.first().context(InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
let (client, _) = tokio_postgres::connect(postgres_url, NoTls)
let (client, connection) = tokio_postgres::connect(postgres_url, NoTls)
.await
.context(error::ConnectPostgresSnafu)?;

tokio::spawn(async move {
if let Err(e) = connection.await {
error!(e; "connection error");
}
});
Ok(client)
}
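Replacing the earlier `unwrap()` with `.context(error::KvBackendSnafu)?` turns a startup panic into a typed error. A generic sketch of the snafu context-selector pattern, using a hypothetical error enum rather than meta-srv's real one:

```rust
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    // Hypothetical variant standing in for the real `KvBackendSnafu`:
    // it wraps the underlying error instead of panicking on it.
    #[snafu(display("Failed to create kv backend"))]
    KvBackend { source: std::io::Error },
}

fn open_backend() -> Result<(), Error> {
    // `.context(...)` converts the source error into our error type,
    // replacing what used to be an `unwrap()`.
    std::fs::read("backend.conf").context(KvBackendSnafu)?;
    Ok(())
}
```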
36 changes: 29 additions & 7 deletions src/meta-srv/src/election.rs
@@ -13,11 +13,12 @@
// limitations under the License.

pub mod etcd;
#[cfg(feature = "pg_kvbackend")]
pub mod postgres;

use std::fmt;
use std::fmt::{self, Debug};
use std::sync::Arc;

use etcd_client::LeaderKey;
use tokio::sync::broadcast::Receiver;

use crate::error::Result;
@@ -26,10 +27,31 @@ use crate::metasrv::MetasrvNodeInfo;
pub const ELECTION_KEY: &str = "__metasrv_election";
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";

pub(crate) const CANDIDATE_LEASE_SECS: u64 = 600;
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;

/// Messages sent when the leader changes.
#[derive(Debug, Clone)]
pub enum LeaderChangeMessage {
Elected(Arc<LeaderKey>),
StepDown(Arc<LeaderKey>),
Elected(Arc<dyn LeaderKey>),
StepDown(Arc<dyn LeaderKey>),
}

/// A key that represents the leader of metasrv.
/// This trait corresponds to [etcd_client::LeaderKey].
pub trait LeaderKey: Send + Sync + Debug {
/// The name in bytes; the election identifier that corresponds to the leadership key.
fn name(&self) -> &[u8];

/// The key in bytes; an opaque key representing the ownership of the election. If the key
/// is deleted, then leadership is lost.
fn key(&self) -> &[u8];

/// The creation revision of the key.
fn revision(&self) -> i64;

/// The lease ID of the election leader.
fn lease_id(&self) -> i64;
}

impl fmt::Display for LeaderChangeMessage {
@@ -47,8 +69,8 @@ impl fmt::Display for LeaderChangeMessage {
write!(f, "LeaderKey {{ ")?;
write!(f, "name: {}", String::from_utf8_lossy(leader_key.name()))?;
write!(f, ", key: {}", String::from_utf8_lossy(leader_key.key()))?;
write!(f, ", rev: {}", leader_key.rev())?;
write!(f, ", lease: {}", leader_key.lease())?;
write!(f, ", rev: {}", leader_key.revision())?;
write!(f, ", lease: {}", leader_key.lease_id())?;
write!(f, " }})")
}
}
@@ -65,7 +87,7 @@ pub trait Election: Send + Sync {
/// initialization operations can be performed.
///
/// note: a new leader will only return true on the first call.
fn in_infancy(&self) -> bool;
fn in_leader_infancy(&self) -> bool;

/// Registers a candidate for the election.
async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()>;
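With `LeaderChangeMessage` now carrying `Arc<dyn LeaderKey>` instead of the concrete `etcd_client::LeaderKey`, any election backend can publish leader changes over the same broadcast channel. A sketch of how a Postgres-backed key might implement the trait, assuming the `LeaderKey` trait from the diff above is in scope; the struct and its fields are hypothetical, not the ones this PR's `PgElection` uses:

```rust
use std::sync::Arc;

// Hypothetical Postgres-backed leader key; field names are illustrative.
#[derive(Debug, Clone)]
struct PgLeaderKey {
    name: Vec<u8>,
    key: Vec<u8>,
    revision: i64,
    lease_id: i64,
}

impl LeaderKey for PgLeaderKey {
    fn name(&self) -> &[u8] {
        &self.name
    }

    fn key(&self) -> &[u8] {
        &self.key
    }

    fn revision(&self) -> i64 {
        self.revision
    }

    fn lease_id(&self) -> i64 {
        self.lease_id
    }
}

// Both etcd and Postgres keys can then flow through the same message type:
// let _msg = LeaderChangeMessage::Elected(Arc::new(pg_leader_key));
```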
46 changes: 33 additions & 13 deletions src/meta-srv/src/election/etcd.rs
@@ -18,18 +18,41 @@ use std::time::Duration;

use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use common_telemetry::{error, info, warn};
use etcd_client::{Client, GetOptions, LeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions};
use etcd_client::{
Client, GetOptions, LeaderKey as EtcdLeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions,
};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tokio::time::{timeout, MissedTickBehavior};

use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY};
use crate::election::{
Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY,
KEEP_ALIVE_INTERVAL_SECS,
};
use crate::error;
use crate::error::Result;
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};

impl LeaderKey for EtcdLeaderKey {
fn name(&self) -> &[u8] {
self.name()
}

fn key(&self) -> &[u8] {
self.key()
}

fn revision(&self) -> i64 {
self.rev()
}

fn lease_id(&self) -> i64 {
self.lease()
}
}

pub struct EtcdElection {
leader_value: String,
client: Client,
@@ -75,15 +98,15 @@ impl EtcdElection {
LeaderChangeMessage::Elected(key) => {
info!(
"[{leader_ident}] is elected as leader: {:?}, lease: {}",
key.name_str(),
key.lease()
String::from_utf8_lossy(key.name()),
key.lease_id()
);
}
LeaderChangeMessage::StepDown(key) => {
warn!(
"[{leader_ident}] is stepping down: {:?}, lease: {}",
key.name_str(),
key.lease()
String::from_utf8_lossy(key.name()),
key.lease_id()
);
}
},
@@ -126,16 +149,13 @@ impl Election for EtcdElection {
self.is_leader.load(Ordering::Relaxed)
}

fn in_infancy(&self) -> bool {
fn in_leader_infancy(&self) -> bool {
self.infancy
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
}

async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
const CANDIDATE_LEASE_SECS: u64 = 600;
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;

let mut lease_client = self.client.lease_client();
let res = lease_client
.grant(CANDIDATE_LEASE_SECS as i64, None)
@@ -239,7 +259,7 @@ impl Election for EtcdElection {
// The keep alive operation MUST be done in `META_KEEP_ALIVE_INTERVAL_SECS`.
match timeout(
keep_lease_duration,
self.keep_alive(&mut keeper, &mut receiver, leader),
self.keep_alive(&mut keeper, &mut receiver, leader.clone()),
)
.await
{
Expand Down Expand Up @@ -303,7 +323,7 @@ impl EtcdElection {
&self,
keeper: &mut LeaseKeeper,
receiver: &mut LeaseKeepAliveStream,
leader: &LeaderKey,
leader: EtcdLeaderKey,
) -> Result<()> {
keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
@@ -324,7 +344,7 @@

if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::Elected(Arc::new(leader.clone())))
.send(LeaderChangeMessage::Elected(Arc::new(leader)))
{
error!(e; "Failed to send leader change message");
}
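The candidate lease and keep-alive constants were hoisted out of `register_candidate` into `election.rs` so both the etcd and Postgres elections can share them; renewing at half the lease leaves one full interval of slack before a registration expires. An illustrative keep-alive loop under those constants; the `renew` closure stands in for the backend-specific lease call and is not part of this PR:

```rust
use std::time::Duration;
use tokio::time::{interval, MissedTickBehavior};

// Values matching the constants hoisted into election.rs.
const CANDIDATE_LEASE_SECS: u64 = 600;
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;

async fn keep_candidate_alive(mut renew: impl FnMut() -> bool) {
    let mut ticker = interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS));
    // Don't try to "catch up" on missed ticks; just renew on the next one.
    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        ticker.tick().await;
        // Stop renewing once the backend reports the lease is gone.
        if !renew() {
            break;
        }
    }
}
```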