Skip to content

Commit

Permalink
feat: use connection pool for pg kv backend in preparation for txn (#…
Browse files Browse the repository at this point in the history
…5260)

* feat: use connection pool

* chore: follow review comments

* fix: create table before test
  • Loading branch information
CookiePieWw authored Jan 2, 2025
1 parent 2b89970 commit 1b0b9ad
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 51 deletions.
37 changes: 37 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
deadpool = "0.10"
deadpool-postgres = "0.12"
derive_builder = "0.12"
dotenv = "0.15"
etcd-client = "0.13"
Expand Down
2 changes: 2 additions & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ common-wal.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes.workspace = true
deadpool.workspace = true
deadpool-postgres.workspace = true
derive_builder.workspace = true
etcd-client.workspace = true
futures.workspace = true
Expand Down
20 changes: 14 additions & 6 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,10 +667,18 @@ pub enum Error {
},

#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to connect to Postgres"))]
ConnectPostgres {
#[snafu(display("Failed to create connection pool for Postgres"))]
CreatePostgresPool {
#[snafu(source)]
error: tokio_postgres::Error,
error: deadpool_postgres::CreatePoolError,
#[snafu(implicit)]
location: Location,
},

#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to get Postgres connection from pool: {}", reason))]
GetPostgresConnection {
reason: String,
#[snafu(implicit)]
location: Location,
},
Expand Down Expand Up @@ -786,9 +794,9 @@ impl ErrorExt for Error {
| EmptyDdlTasks { .. } => StatusCode::InvalidArguments,

#[cfg(feature = "pg_kvbackend")]
PostgresExecution { .. } => StatusCode::Internal,
#[cfg(feature = "pg_kvbackend")]
ConnectPostgres { .. } => StatusCode::Internal,
PostgresExecution { .. } | CreatePostgresPool { .. } | GetPostgresConnection { .. } => {
StatusCode::Internal
}
Error::DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
}
}
Expand Down
95 changes: 65 additions & 30 deletions src/common/meta/src/kv_backend/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@ use std::any::Any;
use std::borrow::Cow;
use std::sync::Arc;

use common_telemetry::error;
use deadpool_postgres::{Config, Pool, Runtime};
use snafu::ResultExt;
use tokio_postgres::types::ToSql;
use tokio_postgres::{Client, NoTls};
use tokio_postgres::NoTls;

use super::{KvBackend, TxnService};
use crate::error::{ConnectPostgresSnafu, Error, PostgresExecutionSnafu, Result, StrFromUtf8Snafu};
use crate::error::{
CreatePostgresPoolSnafu, Error, GetPostgresConnectionSnafu, PostgresExecutionSnafu, Result,
StrFromUtf8Snafu,
};
use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse};
use crate::kv_backend::KvBackendRef;
use crate::kv_backend::{KvBackend, KvBackendRef, TxnService};
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
Expand All @@ -34,8 +36,7 @@ use crate::rpc::KeyValue;

/// Posgres backend store for metasrv
pub struct PgStore {
// TODO: Consider using sqlx crate.
client: Client,
pool: Pool,
}

const EMPTY: &[u8] = &[0];
Expand Down Expand Up @@ -94,33 +95,49 @@ SELECT k, v FROM prev;"#;
impl PgStore {
/// Create pgstore impl of KvBackendRef from url.
pub async fn with_url(url: &str) -> Result<KvBackendRef> {
// TODO: support tls.
let (client, conn) = tokio_postgres::connect(url, NoTls)
.await
.context(ConnectPostgresSnafu)?;
tokio::spawn(async move {
if let Err(e) = conn.await {
error!(e; "connection error");
}
});
Self::with_pg_client(client).await
let mut cfg = Config::new();
cfg.url = Some(url.to_string());
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)
.context(CreatePostgresPoolSnafu)?;
Self::with_pg_pool(pool).await
}

/// Create pgstore impl of KvBackendRef from tokio-postgres client.
pub async fn with_pg_client(client: Client) -> Result<KvBackendRef> {
pub async fn with_pg_pool(pool: Pool) -> Result<KvBackendRef> {
// This step ensures the postgres metadata backend is ready to use.
// We check if greptime_metakv table exists, and we will create a new table
// if it does not exist.
let client = match pool.get().await {
Ok(client) => client,
Err(e) => {
return GetPostgresConnectionSnafu {
reason: e.to_string(),
}
.fail();
}
};
client
.execute(METADKV_CREATION, &[])
.await
.context(PostgresExecutionSnafu)?;
Ok(Arc::new(Self { client }))
Ok(Arc::new(Self { pool }))
}

async fn get_client(&self) -> Result<deadpool::managed::Object<deadpool_postgres::Manager>> {
match self.pool.get().await {
Ok(client) => Ok(client),
Err(e) => GetPostgresConnectionSnafu {
reason: e.to_string(),
}
.fail(),
}
}

async fn put_if_not_exists(&self, key: &str, value: &str) -> Result<bool> {
let res = self
.client
.get_client()
.await?
.query(PUT_IF_NOT_EXISTS, &[&key, &value])
.await
.context(PostgresExecutionSnafu)?;
Expand Down Expand Up @@ -259,7 +276,8 @@ impl KvBackend for PgStore {
})
.collect();
let res = self
.client
.get_client()
.await?
.query(&template, &params)
.await
.context(PostgresExecutionSnafu)?;
Expand Down Expand Up @@ -327,8 +345,10 @@ impl KvBackend for PgStore {
in_params.iter().map(|x| x as &(dyn ToSql + Sync)).collect();

let query = generate_batch_upsert_query(req.kvs.len());

let res = self
.client
.get_client()
.await?
.query(&query, &params)
.await
.context(PostgresExecutionSnafu)?;
Expand Down Expand Up @@ -365,8 +385,10 @@ impl KvBackend for PgStore {
.iter()
.map(|x| x as &(dyn ToSql + Sync))
.collect();

let res = self
.client
.get_client()
.await?
.query(&query, &params)
.await
.context(PostgresExecutionSnafu)?;
Expand Down Expand Up @@ -409,7 +431,8 @@ impl KvBackend for PgStore {
.collect();

let res = self
.client
.get_client()
.await?
.query(template, &params)
.await
.context(PostgresExecutionSnafu)?;
Expand Down Expand Up @@ -453,8 +476,10 @@ impl KvBackend for PgStore {
.iter()
.map(|x| x as &(dyn ToSql + Sync))
.collect();

let res = self
.client
.get_client()
.await?
.query(&query, &params)
.await
.context(PostgresExecutionSnafu)?;
Expand Down Expand Up @@ -488,7 +513,8 @@ impl KvBackend for PgStore {
let expect = process_bytes(&req.expect, "CASExpect")?;

let res = self
.client
.get_client()
.await?
.query(CAS, &[&key, &value, &expect])
.await
.context(PostgresExecutionSnafu)?;
Expand Down Expand Up @@ -560,10 +586,19 @@ mod tests {
return None;
}

let (client, connection) = tokio_postgres::connect(&endpoints, NoTls).await.unwrap();
tokio::spawn(connection);
let _ = client.execute(METADKV_CREATION, &[]).await;
Some(PgStore { client })
let mut cfg = Config::new();
cfg.url = Some(endpoints);
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)
.context(CreatePostgresPoolSnafu)
.unwrap();
let client = pool.get().await.unwrap();
client
.execute(METADKV_CREATION, &[])
.await
.context(PostgresExecutionSnafu)
.unwrap();
Some(PgStore { pool })
}

#[tokio::test]
Expand Down
2 changes: 2 additions & 0 deletions src/meta-srv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ common-version.workspace = true
common-wal.workspace = true
dashmap.workspace = true
datatypes.workspace = true
deadpool.workspace = true
deadpool-postgres.workspace = true
derive_builder.workspace = true
etcd-client.workspace = true
futures.workspace = true
Expand Down
Loading

0 comments on commit 1b0b9ad

Please sign in to comment.