From 1b0b9add9095a667a2be053e9be52ff132d7b8a9 Mon Sep 17 00:00:00 2001 From: Yohan Wal Date: Thu, 2 Jan 2025 14:33:21 +0800 Subject: [PATCH] feat: use connection pool for pg kv backend in preparation for txn (#5260) * feat: use connection pool * chore: follow review comments * fix: create table before test --- Cargo.lock | 37 +++++++++ Cargo.toml | 2 + src/common/meta/Cargo.toml | 2 + src/common/meta/src/error.rs | 20 +++-- src/common/meta/src/kv_backend/postgres.rs | 95 +++++++++++++++------- src/meta-srv/Cargo.toml | 2 + src/meta-srv/src/bootstrap.rs | 41 +++++++--- src/meta-srv/src/error.rs | 26 +++++- 8 files changed, 174 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6c7e56398e4b..842928528b35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2192,6 +2192,8 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datatypes", + "deadpool", + "deadpool-postgres", "derive_builder 0.12.0", "etcd-client", "futures", @@ -3313,6 +3315,39 @@ dependencies = [ "sqlparser_derive 0.1.1", ] +[[package]] +name = "deadpool" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490" +dependencies = [ + "async-trait", + "deadpool-runtime", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-postgres" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda39fa1cfff190d8924d447ad04fd22772c250438ca5ce1dfb3c80621c05aaa" +dependencies = [ + "deadpool", + "tokio", + "tokio-postgres", + "tracing", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" +dependencies = [ + "tokio", +] + [[package]] name = "debugid" version = "0.8.0" @@ -6531,6 +6566,8 @@ dependencies = [ "common-wal", "dashmap", "datatypes", + "deadpool", + "deadpool-postgres", "derive_builder 0.12.0", "etcd-client", "futures", diff --git a/Cargo.toml b/Cargo.toml index 22dc3e75aaa8..2f1c6b0fb1a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -118,6 +118,8 @@ datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" } datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" } datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" } +deadpool = "0.10" +deadpool-postgres = "0.12" derive_builder = "0.12" dotenv = "0.15" etcd-client = "0.13" diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 647cea839f4a..00da3cacca94 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -35,6 +35,8 @@ common-wal.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true datatypes.workspace = true +deadpool.workspace = true +deadpool-postgres.workspace = true derive_builder.workspace = true etcd-client.workspace = true futures.workspace = true diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index ff77882de807..82b591d139a6 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -667,10 +667,18 @@ pub enum Error { }, #[cfg(feature = "pg_kvbackend")] - #[snafu(display("Failed to connect to Postgres"))] - ConnectPostgres { + #[snafu(display("Failed to create connection pool for Postgres"))] + CreatePostgresPool { #[snafu(source)] - error: tokio_postgres::Error, + error: deadpool_postgres::CreatePoolError, + #[snafu(implicit)] + location: Location, + }, + + #[cfg(feature = "pg_kvbackend")] + #[snafu(display("Failed to get Postgres connection from pool: {}", reason))] + GetPostgresConnection { + reason: String, #[snafu(implicit)] location: Location, }, @@ -786,9 +794,9 @@ impl ErrorExt for Error { | EmptyDdlTasks { .. } => StatusCode::InvalidArguments, #[cfg(feature = "pg_kvbackend")] - PostgresExecution { .. } => StatusCode::Internal, - #[cfg(feature = "pg_kvbackend")] - ConnectPostgres { .. } => StatusCode::Internal, + PostgresExecution { .. } | CreatePostgresPool { .. } | GetPostgresConnection { .. } => { + StatusCode::Internal + } Error::DatanodeTableInfoNotFound { .. } => StatusCode::Internal, } } diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs index f67f527871ea..f2416671e229 100644 --- a/src/common/meta/src/kv_backend/postgres.rs +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -16,15 +16,17 @@ use std::any::Any; use std::borrow::Cow; use std::sync::Arc; -use common_telemetry::error; +use deadpool_postgres::{Config, Pool, Runtime}; use snafu::ResultExt; use tokio_postgres::types::ToSql; -use tokio_postgres::{Client, NoTls}; +use tokio_postgres::NoTls; -use super::{KvBackend, TxnService}; -use crate::error::{ConnectPostgresSnafu, Error, PostgresExecutionSnafu, Result, StrFromUtf8Snafu}; +use crate::error::{ + CreatePostgresPoolSnafu, Error, GetPostgresConnectionSnafu, PostgresExecutionSnafu, Result, + StrFromUtf8Snafu, +}; use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse}; -use crate::kv_backend::KvBackendRef; +use crate::kv_backend::{KvBackend, KvBackendRef, TxnService}; use crate::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, @@ -34,8 +36,7 @@ use crate::rpc::KeyValue; /// Posgres backend store for metasrv pub struct PgStore { - // TODO: Consider using sqlx crate. - client: Client, + pool: Pool, } const EMPTY: &[u8] = &[0]; @@ -94,33 +95,49 @@ SELECT k, v FROM prev;"#; impl PgStore { /// Create pgstore impl of KvBackendRef from url. pub async fn with_url(url: &str) -> Result { - // TODO: support tls. - let (client, conn) = tokio_postgres::connect(url, NoTls) - .await - .context(ConnectPostgresSnafu)?; - tokio::spawn(async move { - if let Err(e) = conn.await { - error!(e; "connection error"); - } - }); - Self::with_pg_client(client).await + let mut cfg = Config::new(); + cfg.url = Some(url.to_string()); + let pool = cfg + .create_pool(Some(Runtime::Tokio1), NoTls) + .context(CreatePostgresPoolSnafu)?; + Self::with_pg_pool(pool).await } /// Create pgstore impl of KvBackendRef from tokio-postgres client. - pub async fn with_pg_client(client: Client) -> Result { + pub async fn with_pg_pool(pool: Pool) -> Result { // This step ensures the postgres metadata backend is ready to use. // We check if greptime_metakv table exists, and we will create a new table // if it does not exist. + let client = match pool.get().await { + Ok(client) => client, + Err(e) => { + return GetPostgresConnectionSnafu { + reason: e.to_string(), + } + .fail(); + } + }; client .execute(METADKV_CREATION, &[]) .await .context(PostgresExecutionSnafu)?; - Ok(Arc::new(Self { client })) + Ok(Arc::new(Self { pool })) + } + + async fn get_client(&self) -> Result> { + match self.pool.get().await { + Ok(client) => Ok(client), + Err(e) => GetPostgresConnectionSnafu { + reason: e.to_string(), + } + .fail(), + } } async fn put_if_not_exists(&self, key: &str, value: &str) -> Result { let res = self - .client + .get_client() + .await? .query(PUT_IF_NOT_EXISTS, &[&key, &value]) .await .context(PostgresExecutionSnafu)?; @@ -259,7 +276,8 @@ impl KvBackend for PgStore { }) .collect(); let res = self - .client + .get_client() + .await? .query(&template, ¶ms) .await .context(PostgresExecutionSnafu)?; @@ -327,8 +345,10 @@ impl KvBackend for PgStore { in_params.iter().map(|x| x as &(dyn ToSql + Sync)).collect(); let query = generate_batch_upsert_query(req.kvs.len()); + let res = self - .client + .get_client() + .await? .query(&query, ¶ms) .await .context(PostgresExecutionSnafu)?; @@ -365,8 +385,10 @@ impl KvBackend for PgStore { .iter() .map(|x| x as &(dyn ToSql + Sync)) .collect(); + let res = self - .client + .get_client() + .await? .query(&query, ¶ms) .await .context(PostgresExecutionSnafu)?; @@ -409,7 +431,8 @@ impl KvBackend for PgStore { .collect(); let res = self - .client + .get_client() + .await? .query(template, ¶ms) .await .context(PostgresExecutionSnafu)?; @@ -453,8 +476,10 @@ impl KvBackend for PgStore { .iter() .map(|x| x as &(dyn ToSql + Sync)) .collect(); + let res = self - .client + .get_client() + .await? .query(&query, ¶ms) .await .context(PostgresExecutionSnafu)?; @@ -488,7 +513,8 @@ impl KvBackend for PgStore { let expect = process_bytes(&req.expect, "CASExpect")?; let res = self - .client + .get_client() + .await? .query(CAS, &[&key, &value, &expect]) .await .context(PostgresExecutionSnafu)?; @@ -560,10 +586,19 @@ mod tests { return None; } - let (client, connection) = tokio_postgres::connect(&endpoints, NoTls).await.unwrap(); - tokio::spawn(connection); - let _ = client.execute(METADKV_CREATION, &[]).await; - Some(PgStore { client }) + let mut cfg = Config::new(); + cfg.url = Some(endpoints); + let pool = cfg + .create_pool(Some(Runtime::Tokio1), NoTls) + .context(CreatePostgresPoolSnafu) + .unwrap(); + let client = pool.get().await.unwrap(); + client + .execute(METADKV_CREATION, &[]) + .await + .context(PostgresExecutionSnafu) + .unwrap(); + Some(PgStore { pool }) } #[tokio::test] diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 13975ff95091..8fcc9379e631 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -34,6 +34,8 @@ common-version.workspace = true common-wal.workspace = true dashmap.workspace = true datatypes.workspace = true +deadpool.workspace = true +deadpool-postgres.workspace = true derive_builder.workspace = true etcd-client.workspace = true futures.workspace = true diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 1c504387c7a5..11b29f288506 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -29,6 +29,8 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; #[cfg(feature = "pg_kvbackend")] use common_telemetry::error; use common_telemetry::info; +#[cfg(feature = "pg_kvbackend")] +use deadpool_postgres::{Config, Runtime}; use etcd_client::Client; use futures::future; use servers::configurator::ConfiguratorRef; @@ -51,9 +53,6 @@ use crate::election::etcd::EtcdElection; use crate::election::postgres::PgElection; #[cfg(feature = "pg_kvbackend")] use crate::election::CANDIDATE_LEASE_SECS; -#[cfg(feature = "pg_kvbackend")] -use crate::error::InvalidArgumentsSnafu; -use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu}; use crate::metasrv::builder::MetasrvBuilder; use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef}; use crate::selector::lease_based::LeaseBasedSelector; @@ -86,14 +85,14 @@ impl MetasrvInstance { let httpsrv = Arc::new( HttpServerBuilder::new(opts.http.clone()) .with_metrics_handler(MetricsHandler) - .with_greptime_config_options(opts.to_toml().context(TomlFormatSnafu)?) + .with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?) .build(), ); let metasrv = Arc::new(metasrv); // put metasrv into plugins for later use plugins.insert::>(metasrv.clone()); let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins)) - .context(InitExportMetricsTaskSnafu)?; + .context(error::InitExportMetricsTaskSnafu)?; Ok(MetasrvInstance { metasrv, httpsrv, @@ -108,7 +107,7 @@ impl MetasrvInstance { self.metasrv.try_start().await?; if let Some(t) = self.export_metrics_task.as_ref() { - t.start(None).context(InitExportMetricsTaskSnafu)? + t.start(None).context(error::InitExportMetricsTaskSnafu)? } let (tx, rx) = mpsc::channel::<()>(1); @@ -229,10 +228,11 @@ pub async fn metasrv_builder( } #[cfg(feature = "pg_kvbackend")] (None, BackendImpl::PostgresStore) => { - let pg_client = create_postgres_client(opts).await?; - let kv_backend = PgStore::with_pg_client(pg_client) + let pool = create_postgres_pool(opts).await?; + let kv_backend = PgStore::with_pg_pool(pool) .await .context(error::KvBackendSnafu)?; + // Client for election should be created separately since we need a different session keep-alive idle time. let election_client = create_postgres_client(opts).await?; let election = PgElection::with_pg_client( opts.server_addr.clone(), @@ -287,9 +287,12 @@ async fn create_etcd_client(opts: &MetasrvOptions) -> Result { #[cfg(feature = "pg_kvbackend")] async fn create_postgres_client(opts: &MetasrvOptions) -> Result { - let postgres_url = opts.store_addrs.first().context(InvalidArgumentsSnafu { - err_msg: "empty store addrs", - })?; + let postgres_url = opts + .store_addrs + .first() + .context(error::InvalidArgumentsSnafu { + err_msg: "empty store addrs", + })?; let (client, connection) = tokio_postgres::connect(postgres_url, NoTls) .await .context(error::ConnectPostgresSnafu)?; @@ -301,3 +304,19 @@ async fn create_postgres_client(opts: &MetasrvOptions) -> Result Result { + let postgres_url = opts + .store_addrs + .first() + .context(error::InvalidArgumentsSnafu { + err_msg: "empty store addrs", + })?; + let mut cfg = Config::new(); + cfg.url = Some(postgres_url.to_string()); + let pool = cfg + .create_pool(Some(Runtime::Tokio1), NoTls) + .context(error::CreatePostgresPoolSnafu)?; + Ok(pool) +} diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index ddc9d3658bad..25c949c55c4d 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -704,7 +704,7 @@ pub enum Error { }, #[cfg(feature = "pg_kvbackend")] - #[snafu(display("Failed to connect to PostgresSQL"))] + #[snafu(display("Failed to connect to Postgres"))] ConnectPostgres { #[snafu(source)] error: tokio_postgres::Error, @@ -712,6 +712,23 @@ pub enum Error { location: Location, }, + #[cfg(feature = "pg_kvbackend")] + #[snafu(display("Failed to create connection pool for Postgres"))] + CreatePostgresPool { + #[snafu(source)] + error: deadpool_postgres::CreatePoolError, + #[snafu(implicit)] + location: Location, + }, + + #[cfg(feature = "pg_kvbackend")] + #[snafu(display("Failed to get connection from Postgres pool: {}", reason))] + GetPostgresConnection { + reason: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Handler not found: {}", name))] HandlerNotFound { name: String, @@ -843,9 +860,10 @@ impl ErrorExt for Error { Error::Other { source, .. } => source.status_code(), Error::LookupPeer { source, .. } => source.status_code(), #[cfg(feature = "pg_kvbackend")] - Error::ConnectPostgres { .. } => StatusCode::Internal, - #[cfg(feature = "pg_kvbackend")] - Error::PostgresExecution { .. } => StatusCode::Internal, + Error::CreatePostgresPool { .. } + | Error::GetPostgresConnection { .. } + | Error::PostgresExecution { .. } + | Error::ConnectPostgres { .. } => StatusCode::Internal, } }