From 4533155c40bf4c731158e87f231420098b3a49f7 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Mon, 9 Dec 2024 18:19:13 +0800 Subject: [PATCH] feat: etcd election componet for pg_kv_backend --- src/cmd/src/metasrv.rs | 2 +- src/meta-srv/src/bootstrap.rs | 97 +++++++++++++++++++---------------- src/meta-srv/src/metasrv.rs | 11 +++- 3 files changed, 64 insertions(+), 46 deletions(-) diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index b1167903671a..b2898813f95e 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -280,7 +280,7 @@ impl StartCommand { .await .context(StartMetaServerSnafu)?; - let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins.clone(), None) + let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins.clone()) .await .context(error::BuildMetaServerSnafu)?; let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?; diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 47afa0ab416b..8dd8027c1064 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -49,7 +49,7 @@ use crate::election::etcd::EtcdElection; use crate::error::InvalidArgumentsSnafu; use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu}; use crate::metasrv::builder::MetasrvBuilder; -use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef}; +use crate::metasrv::{BackendImpl, ElectionRef, Metasrv, MetasrvOptions, SelectorRef}; use crate::selector::lease_based::LeaseBasedSelector; use crate::selector::load_based::LoadBasedSelector; use crate::selector::round_robin::RoundRobinSelector; @@ -201,45 +201,19 @@ pub fn router(metasrv: Arc) -> Router { router.add_service(admin::make_admin_service(metasrv)) } -pub async fn metasrv_builder( - opts: &MetasrvOptions, - plugins: Plugins, - kv_backend: Option, -) -> Result { - let (kv_backend, election) = match (kv_backend, &opts.backend) { - (Some(kv_backend), _) => (kv_backend, None), - (None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None), - (None, BackendImpl::EtcdStore) => { - let etcd_client = create_etcd_client(opts).await?; - let kv_backend = { - let etcd_backend = - EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops); - if !opts.store_key_prefix.is_empty() { - Arc::new(ChrootKvBackend::new( - opts.store_key_prefix.clone().into_bytes(), - etcd_backend, - )) - } else { - etcd_backend - } - }; - ( - kv_backend, - Some( - EtcdElection::with_etcd_client( - &opts.server_addr, - etcd_client.clone(), - opts.store_key_prefix.clone(), - ) - .await?, - ), - ) +pub async fn metasrv_builder(opts: &MetasrvOptions, plugins: Plugins) -> Result { + let (kv_backend, election) = match &opts.backend { + BackendImpl::MemoryStore => (Arc::new(MemoryKvBackend::new()) as _, None), // Only for test + BackendImpl::EtcdStore => { + let kv_backend = create_etcd_backend(opts).await?; + let election_client = create_etcd_election_client(opts).await?; + (kv_backend, Some(election_client)) } #[cfg(feature = "pg_kvbackend")] - (None, BackendImpl::PostgresStore) => { - let pg_client = create_postgres_client(opts).await?; - let kv_backend = PgStore::with_pg_client(pg_client).await.unwrap(); - (kv_backend, None) + BackendImpl::PostgresStore => { + let kv_backend = create_postgres_backend(opts).await?; + let election_client = create_etcd_election_client(opts).await?; + (kv_backend, Some(election_client)) } }; @@ -260,25 +234,60 @@ pub async fn metasrv_builder( .plugins(plugins)) } -async fn create_etcd_client(opts: &MetasrvOptions) -> Result { +async fn create_etcd_election_client(opts: &MetasrvOptions) -> Result { + let election_endpoints = if !opts.election_addrs.is_empty() { + &opts.election_addrs + } else { + &opts.store_addrs + } + .iter() + .map(|x| x.trim()) + .filter(|x| !x.is_empty()) + .collect::>(); + let etcd_client = Client::connect(&election_endpoints, None) + .await + .context(error::ConnectEtcdSnafu)?; + EtcdElection::with_etcd_client( + &opts.server_addr, + etcd_client, + opts.store_key_prefix.clone(), + ) + .await +} + +async fn create_etcd_backend(opts: &MetasrvOptions) -> Result { let etcd_endpoints = opts .store_addrs .iter() .map(|x| x.trim()) .filter(|x| !x.is_empty()) .collect::>(); - Client::connect(&etcd_endpoints, None) + let etcd_client = Client::connect(&etcd_endpoints, None) .await - .context(error::ConnectEtcdSnafu) + .context(error::ConnectEtcdSnafu)?; + let backend = EtcdStore::with_etcd_client(etcd_client, opts.max_txn_ops); + Ok(chroot_kv_backend(opts, backend)) } #[cfg(feature = "pg_kvbackend")] -async fn create_postgres_client(opts: &MetasrvOptions) -> Result { +async fn create_postgres_backend(opts: &MetasrvOptions) -> Result { let postgres_url = opts.store_addrs.first().context(InvalidArgumentsSnafu { err_msg: "empty store addrs", })?; - let (client, _) = tokio_postgres::connect(postgres_url, NoTls) + let (pg_client, _) = tokio_postgres::connect(postgres_url, NoTls) .await .context(error::ConnectPostgresSnafu)?; - Ok(client) + let backend = PgStore::with_pg_client(pg_client).await.unwrap(); + Ok(chroot_kv_backend(opts, backend)) +} + +fn chroot_kv_backend(opts: &MetasrvOptions, backend: KvBackendRef) -> KvBackendRef { + if opts.store_key_prefix.is_empty() { + backend + } else { + Arc::new(ChrootKvBackend::new( + opts.store_key_prefix.clone().into_bytes(), + backend, + )) + } } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 716b85f83485..0c48a2b5ff0b 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -91,8 +91,12 @@ pub struct MetasrvOptions { pub bind_addr: String, /// The address the server advertises to the clients. pub server_addr: String, - /// The address of the store, e.g., etcd. + /// The addresses of the metadata store, e.g., etcd. pub store_addrs: Vec, + /// The addresses for the election service, which is used to elect the leader for Metasrvs. + /// If not specified, Metasrv will use the `store_addrs` as election service addresses. + /// For example, etcd can serve as both the metadata store and election service. + pub election_addrs: Vec, /// The type of selector. pub selector: SelectorType, /// Whether to use the memory store. @@ -147,6 +151,7 @@ impl Default for MetasrvOptions { bind_addr: "127.0.0.1:3002".to_string(), server_addr: "127.0.0.1:3002".to_string(), store_addrs: vec!["127.0.0.1:2379".to_string()], + election_addrs: vec!["127.0.0.1:2379".to_string()], selector: SelectorType::default(), use_memory_store: false, enable_region_failover: false, @@ -470,6 +475,10 @@ impl Metasrv { }); } } else { + warn!( + "Ensure only one instance of metasrv is running, as there is no election service." + ); + if let Err(e) = self.wal_options_allocator.start().await { error!(e; "Failed to start wal options allocator"); }