Skip to content

Commit

Permalink
feat: etcd election componet for pg_kv_backend
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Dec 9, 2024
1 parent 903da8f commit 4533155
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 46 deletions.
2 changes: 1 addition & 1 deletion src/cmd/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ impl StartCommand {
.await
.context(StartMetaServerSnafu)?;

let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins.clone(), None)
let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins.clone())
.await
.context(error::BuildMetaServerSnafu)?;
let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?;
Expand Down
97 changes: 53 additions & 44 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use crate::election::etcd::EtcdElection;
use crate::error::InvalidArgumentsSnafu;
use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu};
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef};
use crate::metasrv::{BackendImpl, ElectionRef, Metasrv, MetasrvOptions, SelectorRef};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::load_based::LoadBasedSelector;
use crate::selector::round_robin::RoundRobinSelector;
Expand Down Expand Up @@ -201,45 +201,19 @@ pub fn router(metasrv: Arc<Metasrv>) -> Router {
router.add_service(admin::make_admin_service(metasrv))
}

pub async fn metasrv_builder(
opts: &MetasrvOptions,
plugins: Plugins,
kv_backend: Option<KvBackendRef>,
) -> Result<MetasrvBuilder> {
let (kv_backend, election) = match (kv_backend, &opts.backend) {
(Some(kv_backend), _) => (kv_backend, None),
(None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None),
(None, BackendImpl::EtcdStore) => {
let etcd_client = create_etcd_client(opts).await?;
let kv_backend = {
let etcd_backend =
EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
if !opts.store_key_prefix.is_empty() {
Arc::new(ChrootKvBackend::new(
opts.store_key_prefix.clone().into_bytes(),
etcd_backend,
))
} else {
etcd_backend
}
};
(
kv_backend,
Some(
EtcdElection::with_etcd_client(
&opts.server_addr,
etcd_client.clone(),
opts.store_key_prefix.clone(),
)
.await?,
),
)
pub async fn metasrv_builder(opts: &MetasrvOptions, plugins: Plugins) -> Result<MetasrvBuilder> {
let (kv_backend, election) = match &opts.backend {
BackendImpl::MemoryStore => (Arc::new(MemoryKvBackend::new()) as _, None), // Only for test
BackendImpl::EtcdStore => {
let kv_backend = create_etcd_backend(opts).await?;
let election_client = create_etcd_election_client(opts).await?;
(kv_backend, Some(election_client))
}
#[cfg(feature = "pg_kvbackend")]
(None, BackendImpl::PostgresStore) => {
let pg_client = create_postgres_client(opts).await?;
let kv_backend = PgStore::with_pg_client(pg_client).await.unwrap();
(kv_backend, None)
BackendImpl::PostgresStore => {
let kv_backend = create_postgres_backend(opts).await?;
let election_client = create_etcd_election_client(opts).await?;
(kv_backend, Some(election_client))
}
};

Expand All @@ -260,25 +234,60 @@ pub async fn metasrv_builder(
.plugins(plugins))
}

async fn create_etcd_client(opts: &MetasrvOptions) -> Result<Client> {
async fn create_etcd_election_client(opts: &MetasrvOptions) -> Result<ElectionRef> {
let election_endpoints = if !opts.election_addrs.is_empty() {
&opts.election_addrs
} else {
&opts.store_addrs
}
.iter()
.map(|x| x.trim())
.filter(|x| !x.is_empty())
.collect::<Vec<_>>();
let etcd_client = Client::connect(&election_endpoints, None)
.await
.context(error::ConnectEtcdSnafu)?;
EtcdElection::with_etcd_client(
&opts.server_addr,
etcd_client,
opts.store_key_prefix.clone(),
)
.await
}

async fn create_etcd_backend(opts: &MetasrvOptions) -> Result<KvBackendRef> {
let etcd_endpoints = opts
.store_addrs
.iter()
.map(|x| x.trim())
.filter(|x| !x.is_empty())
.collect::<Vec<_>>();
Client::connect(&etcd_endpoints, None)
let etcd_client = Client::connect(&etcd_endpoints, None)
.await
.context(error::ConnectEtcdSnafu)
.context(error::ConnectEtcdSnafu)?;
let backend = EtcdStore::with_etcd_client(etcd_client, opts.max_txn_ops);
Ok(chroot_kv_backend(opts, backend))
}

#[cfg(feature = "pg_kvbackend")]
async fn create_postgres_client(opts: &MetasrvOptions) -> Result<tokio_postgres::Client> {
async fn create_postgres_backend(opts: &MetasrvOptions) -> Result<KvBackendRef> {
let postgres_url = opts.store_addrs.first().context(InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
let (client, _) = tokio_postgres::connect(postgres_url, NoTls)
let (pg_client, _) = tokio_postgres::connect(postgres_url, NoTls)
.await
.context(error::ConnectPostgresSnafu)?;
Ok(client)
let backend = PgStore::with_pg_client(pg_client).await.unwrap();
Ok(chroot_kv_backend(opts, backend))
}

fn chroot_kv_backend(opts: &MetasrvOptions, backend: KvBackendRef) -> KvBackendRef {
if opts.store_key_prefix.is_empty() {
backend
} else {
Arc::new(ChrootKvBackend::new(
opts.store_key_prefix.clone().into_bytes(),
backend,
))
}
}
11 changes: 10 additions & 1 deletion src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,12 @@ pub struct MetasrvOptions {
pub bind_addr: String,
/// The address the server advertises to the clients.
pub server_addr: String,
/// The address of the store, e.g., etcd.
/// The addresses of the metadata store, e.g., etcd.
pub store_addrs: Vec<String>,
/// The addresses for the election service, which is used to elect the leader for Metasrvs.
/// If not specified, Metasrv will use the `store_addrs` as election service addresses.
/// For example, etcd can serve as both the metadata store and election service.
pub election_addrs: Vec<String>,
/// The type of selector.
pub selector: SelectorType,
/// Whether to use the memory store.
Expand Down Expand Up @@ -147,6 +151,7 @@ impl Default for MetasrvOptions {
bind_addr: "127.0.0.1:3002".to_string(),
server_addr: "127.0.0.1:3002".to_string(),
store_addrs: vec!["127.0.0.1:2379".to_string()],
election_addrs: vec!["127.0.0.1:2379".to_string()],
selector: SelectorType::default(),
use_memory_store: false,
enable_region_failover: false,
Expand Down Expand Up @@ -470,6 +475,10 @@ impl Metasrv {
});
}
} else {
warn!(
"Ensure only one instance of metasrv is running, as there is no election service."
);

if let Err(e) = self.wal_options_allocator.start().await {
error!(e; "Failed to start wal options allocator");
}
Expand Down

0 comments on commit 4533155

Please sign in to comment.