From 883ac2de6a9dd7b6a87d201821b007b8c4f5bceb Mon Sep 17 00:00:00 2001 From: lyang24 Date: Wed, 24 Jul 2024 00:01:37 -0700 Subject: [PATCH] feat: implement postgres kvbackend --- .../actions/setup-postgres-cluster/action.yml | 28 + .github/workflows/develop.yml | 1 + Cargo.lock | 16 +- Cargo.toml | 1 + src/common/meta/Cargo.toml | 1 + src/common/meta/src/error.rs | 20 +- src/common/meta/src/kv_backend.rs | 1 + src/common/meta/src/kv_backend/postgres.rs | 717 ++++++++++++++++++ src/meta-srv/Cargo.toml | 1 + src/meta-srv/src/bootstrap.rs | 27 +- src/meta-srv/src/error.rs | 22 +- src/meta-srv/src/metasrv.rs | 10 + 12 files changed, 830 insertions(+), 15 deletions(-) create mode 100644 .github/actions/setup-postgres-cluster/action.yml create mode 100644 src/common/meta/src/kv_backend/postgres.rs diff --git a/.github/actions/setup-postgres-cluster/action.yml b/.github/actions/setup-postgres-cluster/action.yml new file mode 100644 index 000000000000..8163c7be00d8 --- /dev/null +++ b/.github/actions/setup-postgres-cluster/action.yml @@ -0,0 +1,28 @@ +name: Setup PostgreSQL +description: Deploy PostgreSQL on Kubernetes +inputs: + postgres-replicas: + default: 1 + description: "Number of PostgreSQL replicas" + namespace: + default: "postgres-namespace" + postgres-version: + default: "14.2" + description: "PostgreSQL version" + storage-size: + default: "1Gi" + description: "Storage size for PostgreSQL" + +runs: + using: composite + steps: + - name: Install PostgreSQL + shell: bash + run: | + helm upgrade \ + --install postgresql bitnami/postgresql \ + --set replicaCount=${{ inputs.postgres-replicas }} \ + --set image.tag=${{ inputs.postgres-version }} \ + --set persistence.size=${{ inputs.storage-size }} \ + --create-namespace \ + -n ${{ inputs.namespace }} diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index 875b59c768ce..71fcb014ae96 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -681,6 +681,7 @@ jobs: GT_MINIO_REGION: us-west-2 GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000 GT_ETCD_ENDPOINTS: http://127.0.0.1:2379 + GT_POSTGRES_ENDPOINTS: http://127.0.0.1:5432 GT_KAFKA_ENDPOINTS: 127.0.0.1:9092 UNITTEST_LOG_DIR: "__unittest_logs" - name: Codecov upload diff --git a/Cargo.lock b/Cargo.lock index 1c7ddf1b404c..1f101eec3180 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2092,6 +2092,7 @@ dependencies = [ "strum 0.25.0", "table", "tokio", + "tokio-postgres", "tonic 0.11.0", "typetag", "uuid", @@ -6140,6 +6141,7 @@ dependencies = [ "store-api", "table", "tokio", + "tokio-postgres", "tokio-stream", "toml 0.8.14", "tonic 0.11.0", @@ -7926,11 +7928,11 @@ checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0" [[package]] name = "postgres-protocol" -version = "0.6.6" +version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49b6c5ef183cd3ab4ba005f1ca64c21e8bd97ce4699cfea9e8d9a2c4958ca520" +checksum = "acda0ebdebc28befa84bee35e651e4c5f09073d668c7aed4cf7e23c3cda84b23" dependencies = [ - "base64 0.21.7", + "base64 0.22.1", "byteorder", "bytes", "fallible-iterator", @@ -7944,9 +7946,9 @@ dependencies = [ [[package]] name = "postgres-types" -version = "0.2.6" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d2234cdee9408b523530a9b6d2d6b373d1db34f6a8e51dc03ded1828d7fb67c" +checksum = "02048d9e032fb3cc3413bbf7b83a15d84a5d419778e2628751896d856498eee9" dependencies = [ "array-init", "bytes", @@ -11906,9 +11908,9 @@ dependencies = [ [[package]] name = "tokio-postgres" -version = "0.7.10" +version = "0.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d340244b32d920260ae7448cb72b6e238bddc3d4f7603394e7dd46ed8e48f5b8" +checksum = "03adcf0147e203b6032c0b2d30be1415ba03bc348901f3ff1cc0df6a733e60c3" dependencies = [ "async-trait", "byteorder", diff --git a/Cargo.toml b/Cargo.toml index 79b527104b9c..69b0612f1ce0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -138,6 +138,7 @@ opentelemetry-proto = { version = "0.5", features = [ parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] } paste = "1.0" pin-project = "1.0" +tokio-postgres = "0.7.11" prometheus = { version = "0.13.3", features = ["process"] } promql-parser = { version = "0.4" } prost = "0.12" diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 347ccc3b5f2b..e1aae80cc14a 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -56,6 +56,7 @@ store-api.workspace = true strum.workspace = true table.workspace = true tokio.workspace = true +tokio-postgres.workspace = true tonic.workspace = true typetag = "0.2" diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index ccd887345c07..a0a016561027 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -644,6 +644,22 @@ pub enum Error { #[snafu(display("Failed to get cache"))] GetCache { source: Arc }, + + #[snafu(display("Failed to execute via Postgres"))] + PostgresFailed { + #[snafu(source)] + error: tokio_postgres::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to connect to Postgres"))] + ConnectPostgres { + #[snafu(source)] + error: tokio_postgres::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -655,10 +671,12 @@ impl ErrorExt for Error { IllegalServerState { .. } | EtcdTxnOpResponse { .. } | EtcdFailed { .. } + | PostgresFailed { .. } | EtcdTxnFailed { .. } | ConnectEtcd { .. } | MoveValues { .. } - | GetCache { .. } => StatusCode::Internal, + | GetCache { .. } + | ConnectPostgres { .. } => StatusCode::Internal, ValueNotExist { .. } => StatusCode::Unexpected, diff --git a/src/common/meta/src/kv_backend.rs b/src/common/meta/src/kv_backend.rs index 1bc04e7089d0..0d1b2ebd9375 100644 --- a/src/common/meta/src/kv_backend.rs +++ b/src/common/meta/src/kv_backend.rs @@ -31,6 +31,7 @@ use crate::rpc::KeyValue; pub mod chroot; pub mod etcd; pub mod memory; +pub mod postgres; pub mod test; pub mod txn; diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs new file mode 100644 index 000000000000..64d8d193f45d --- /dev/null +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -0,0 +1,717 @@ +use std::any::Any; +use std::sync::Arc; + +use snafu::ResultExt; +use tokio_postgres::{Client, NoTls}; + +use super::{KvBackend, TxnService}; +use crate::error::{ConnectPostgresSnafu, Error, FromUtf8Snafu, PostgresFailedSnafu, Result}; +use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse}; +use crate::kv_backend::KvBackendRef; +use crate::rpc::store::{ + BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, + BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, + RangeRequest, RangeResponse, +}; +use crate::rpc::KeyValue; + +pub struct PgStore { + client: Client, +} + +const METADKV_CREATION: &str = + "CREATE TABLE IF NOT EXISTS metakv(k varchar PRIMARY KEY, v varchar);"; + +impl PgStore { + pub async fn with_url(url: &str) -> Result { + let (client, conn) = tokio_postgres::connect(url, NoTls) + .await + .context(ConnectPostgresSnafu)?; + tokio::spawn(async move { conn.await.context(ConnectPostgresSnafu) }); + Self::with_pg_client(client).await + } + + pub async fn with_pg_client(client: Client) -> Result { + // This step ensures the postgres metadata backend is ready to use. + // We check if metakv table exists, and we will create a new table + // if it does not exist. + client + .execute(METADKV_CREATION, &[]) + .await + .context(PostgresFailedSnafu)?; + Ok(Arc::new(Self { client })) + } +} + +/* +SELECT k, v FROM metakv +WHERE k >= start_key and k <= end_key +*/ +fn generate_range_scan_query(req: RangeRequest) -> Result { + let mut sql = String::new(); + sql.push_str("SELECT k"); + if !req.keys_only { + sql.push_str(",v"); + } + sql.push_str(" FROM metakv "); + let start_key = String::from_utf8(req.key.clone()).context(FromUtf8Snafu { + name: "GetRequestKey", + })?; + if req.key != b"\0" || req.range_end != b"\0" { + match req.range_end.is_empty() { + true => sql.push_str(&format!("WHERE k = '{start_key}'")), + false => match is_prefix_range(req.key.clone(), &req.range_end) { + true => { + sql.push_str(&format!("WHERE k LIKE '{start_key}%'")); + } + false => { + sql.push_str(&format!("WHERE k >= '{start_key}'")); + if req.range_end != b"\0" { + let end_key = String::from_utf8(req.range_end).context(FromUtf8Snafu { + name: "GetRequestRangeEnd", + })?; + sql.push_str(&format!(" AND k < '{end_key}'")); + } + } + }, + } + } + if req.limit > 0 { + sql.push_str(&format!(" LIMIT {}", req.limit)) + } + sql = sql.trim_end().to_string(); + sql.push_str(";"); + Ok(sql) +} + +/* +SELECT k, v FROM metakv +WHERE k IN ('k1', 'k2', 'k3') +*/ +fn generate_batch_scan_query(req: BatchGetRequest) -> Result { + let keys = req + .keys + .iter() + .map(|k| { + String::from_utf8(k.to_vec()).context(FromUtf8Snafu { + name: "BatchGetRequestKey", + }) + }) + .collect::>>()?; + let mut sql = String::new(); + sql.push_str("SELECT k, v FROM metakv "); + let predicate = keys + .iter() + .map(|k| format!("'{k}'")) + .collect::>() + .join(","); + sql.push_str(&format!("WHERE k IN ({predicate})")); + sql.push_str(";"); + Ok(sql) +} + +/* +WITH prev AS ( + select k,v from metakv where k in ({keys_sql}) +), update AS ( +INSERT INTO metakv (k, v) VALUES + {} +ON CONFLICT ( + k +) DO UPDATE SET + v = excluded.v +) + +SELECT k, v FROM prev; +*/ +fn generate_batch_upsert_query(kvs: Vec) -> Result { + let keys = kvs + .iter() + .map(|kv| { + String::from_utf8(kv.key.to_vec()).context(FromUtf8Snafu { + name: "BatchPutRequestKey", + }) + }) + .collect::>>()?; + let vals = kvs + .iter() + .map(|kv| { + String::from_utf8(kv.value.to_vec()).context(FromUtf8Snafu { + name: "BatchPutRequestValue", + }) + }) + .collect::>>()?; + let keys_sql = keys + .iter() + .map(|k| format!("'{k}'")) + .collect::>() + .join(","); + let kvs_sql = keys + .iter() + .zip(vals.iter()) + .map(|kv| format!("('{}', '{}')", kv.0, kv.1)) + .collect::>() + .join(","); + + Ok(format!( + r#" + WITH prev AS ( + select k,v from metakv where k in ({keys_sql}) + ), update AS ( + INSERT INTO metakv (k, v) VALUES + {kvs_sql} + ON CONFLICT ( + k + ) DO UPDATE SET + v = excluded.v + ) + + SELECT k, v FROM prev; + "# + )) +} + +/* +DELETE FROM metakv WHERE k >= start_key and k < end_key RETURNING k, v; +*/ +fn generate_range_delete_query(req: DeleteRangeRequest) -> Result { + let mut sql = String::new(); + sql.push_str("DELETE FROM metakv "); + let start_key = String::from_utf8(req.key.clone()).context(FromUtf8Snafu { + name: "DeleteRangeRequestKey", + })?; + match req.range_end.is_empty() { + true => sql.push_str(&format!("WHERE k = '{start_key}'")), + false => match is_prefix_range(req.key.clone(), &req.range_end) { + true => sql.push_str(&format!("WHERE k like '{start_key}%'")), + false => { + sql.push_str(&format!("WHERE k >= '{start_key}'")); + if req.range_end != b"\0" { + let end_key = String::from_utf8(req.range_end).context(FromUtf8Snafu { + name: "DeleteRangeRequestRangeEnd", + })?; + sql.push_str(&format!(" AND k <= '{end_key}'")); + } + } + }, + } + if req.prev_kv { + sql.push_str(" RETURNING k, v"); + } + sql.push_str(";"); + Ok(sql) +} + +/* +DELETE FROM metakv WHERE k IN (kvs) RETURNING K, V; +*/ +fn generate_batch_delete_query(req: BatchDeleteRequest) -> Result { + let keys = req + .keys + .iter() + .map(|k| { + String::from_utf8(k.to_vec()).context(FromUtf8Snafu { + name: "BatchDeleteequestKey", + }) + }) + .collect::>>()?; + let mut sql = String::new(); + sql.push_str("DELETE FROM metakv "); + let predicate = keys + .iter() + .map(|k| format!("'{k}'")) + .collect::>() + .join(","); + sql.push_str(&format!("WHERE k IN ({predicate})")); + sql.push_str(" RETURNING k, v"); + sql.push_str(";"); + Ok(sql) +} + +#[async_trait::async_trait] +impl KvBackend for PgStore { + fn name(&self) -> &str { + "Postgres" + } + + fn as_any(&self) -> &dyn Any { + self + } + + async fn range(&self, req: RangeRequest) -> Result { + let key_only = req.keys_only; + let query = generate_range_scan_query(req)?; + let res = self + .client + .query(&query, &[]) + .await + .context(PostgresFailedSnafu)?; + let kvs: Vec = res + .into_iter() + .map(|r| { + let key: String = r.get(0); + if key_only { + return KeyValue { + key: key.into_bytes(), + value: vec![], + }; + } + let value: String = r.get(1); + KeyValue { + key: key.into_bytes(), + value: value.into_bytes(), + } + }) + .collect(); + Ok(RangeResponse { kvs, more: false }) + } + + async fn put(&self, req: PutRequest) -> Result { + let kvs = KeyValue { + key: req.key, + value: req.value, + }; + let query = generate_batch_upsert_query(vec![kvs])?; + let res = self + .client + .query(&query, &[]) + .await + .context(PostgresFailedSnafu)?; + if req.prev_kv { + let mut kvs: Vec = res + .into_iter() + .map(|r| { + let key: String = r.get(0); + let value: String = r.get(1); + KeyValue { + key: key.into_bytes(), + value: value.into_bytes(), + } + }) + .collect(); + if !kvs.is_empty() { + return Ok(PutResponse { + prev_kv: Some(kvs.remove(0)), + }); + } + } + Ok(PutResponse { prev_kv: None }) + } + + async fn batch_put(&self, req: BatchPutRequest) -> Result { + let query = generate_batch_upsert_query(req.kvs)?; + let res = self + .client + .query(&query, &[]) + .await + .context(PostgresFailedSnafu)?; + if req.prev_kv { + let kvs: Vec = res + .into_iter() + .map(|r| { + let key: String = r.get(0); + let value: String = r.get(1); + KeyValue { + key: key.into_bytes(), + value: value.into_bytes(), + } + }) + .collect(); + if !kvs.is_empty() { + return Ok(BatchPutResponse { prev_kvs: kvs }); + } + } + Ok(BatchPutResponse { prev_kvs: vec![] }) + } + + async fn batch_get(&self, req: BatchGetRequest) -> Result { + let query = generate_batch_scan_query(req)?; + let res = self + .client + .query(&query, &[]) + .await + .context(PostgresFailedSnafu)?; + let kvs: Vec = res + .into_iter() + .map(|r| { + let key: String = r.get(0); + let value: String = r.get(1); + KeyValue { + key: key.into_bytes(), + value: value.into_bytes(), + } + }) + .collect(); + Ok(BatchGetResponse { kvs }) + } + + async fn delete_range(&self, req: DeleteRangeRequest) -> Result { + let prev_kv = req.prev_kv; + let query = generate_range_delete_query(req)?; + let res = self + .client + .query(&query, &[]) + .await + .context(PostgresFailedSnafu)?; + let deleted = res.len() as i64; + if !prev_kv { + return Ok({ + DeleteRangeResponse { + deleted, + prev_kvs: vec![], + } + }); + } + let kvs: Vec = res + .into_iter() + .map(|r| { + let key: String = r.get(0); + let value: String = r.get(1); + KeyValue { + key: key.into_bytes(), + value: value.into_bytes(), + } + }) + .collect(); + Ok(DeleteRangeResponse { + deleted, + prev_kvs: kvs, + }) + } + + async fn batch_delete(&self, req: BatchDeleteRequest) -> Result { + let prev_kv = req.prev_kv; + let query = generate_batch_delete_query(req)?; + let res = self + .client + .query(&query, &[]) + .await + .context(PostgresFailedSnafu)?; + if !prev_kv { + return Ok(BatchDeleteResponse { prev_kvs: vec![] }); + } + let kvs: Vec = res + .into_iter() + .map(|r| { + let key: String = r.get(0); + let value: String = r.get(1); + KeyValue { + key: key.into_bytes(), + value: value.into_bytes(), + } + }) + .collect(); + Ok(BatchDeleteResponse { prev_kvs: kvs }) + } +} + +#[async_trait::async_trait] +impl TxnService for PgStore { + type Error = Error; + + async fn txn(&self, _txn: KvTxn) -> Result { + unimplemented!() + } + + fn max_txn_ops(&self) -> usize { + unimplemented!() + } +} + +fn is_prefix_range(mut start: Vec, end: &[u8]) -> bool { + if let Some(last_byte) = start.last_mut() { + *last_byte += 1; + } + start == end +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_generate_range_scan_query() { + // case 1: key and end range both present + let req = RangeRequest { + key: b"test_key".to_vec(), + range_end: b"test_range_end".to_vec(), + limit: 64, + keys_only: true, + }; + let query = generate_range_scan_query(req).unwrap(); + assert_eq!( + "SELECT k FROM metakv WHERE k >= 'test_key' AND k < 'test_range_end' LIMIT 64;", + query + ); + + // case 2: range_end == \0 we scan >= key + let req = RangeRequest { + key: b"test_key".to_vec(), + range_end: vec![b'\0'], + limit: 64, + keys_only: true, + }; + let query = generate_range_scan_query(req).unwrap(); + assert_eq!( + "SELECT k FROM metakv WHERE k >= 'test_key' LIMIT 64;", + query + ); + + // case 3: full table scan key == \0 and end range == \0 no limit + let req = RangeRequest { + key: vec![b'\0'], + range_end: vec![b'\0'], + limit: 0, + keys_only: true, + }; + let query = generate_range_scan_query(req).unwrap(); + assert_eq!("SELECT k FROM metakv;", query); + + // case 4: prefix scan + let start_key = b"a"; + let post_key = b"b"; + assert_eq!(is_prefix_range(start_key.clone().to_vec(), post_key), true); + let req = RangeRequest { + key: start_key.to_vec(), + range_end: post_key.to_vec(), + limit: 0, + keys_only: true, + }; + let query = generate_range_scan_query(req).unwrap(); + assert_eq!("SELECT k FROM metakv WHERE k LIKE 'a%';", query); + + // case 5: point get + let req = RangeRequest { + key: b"test_key".to_vec(), + range_end: vec![], + limit: 64, + keys_only: true, + }; + let query = generate_range_scan_query(req).unwrap(); + assert_eq!("SELECT k FROM metakv WHERE k = 'test_key' LIMIT 64;", query); + + // case 6: key and end range both present + let req = RangeRequest { + key: b"test_key".to_vec(), + range_end: b"test_range_end".to_vec(), + limit: 64, + keys_only: false, + }; + let query = generate_range_scan_query(req).unwrap(); + assert_eq!( + "SELECT k,v FROM metakv WHERE k >= 'test_key' AND k < 'test_range_end' LIMIT 64;", + query + ); + } + + #[test] + fn test_generate_batch_upsert_query() { + // case 1: single upsert + let kvs = vec![KeyValue { + key: b"a".to_vec(), + value: b"b".to_vec(), + }]; + let mut query = generate_batch_upsert_query(kvs).unwrap(); + query = query.trim().to_string(); + let expected = r#"WITH prev AS ( + select k,v from metakv where k in ('a') + ), update AS ( + INSERT INTO metakv (k, v) VALUES + ('a', 'b') + ON CONFLICT ( + k + ) DO UPDATE SET + v = excluded.v + ) + + SELECT k, v FROM prev;"#; + assert_eq!(expected, query); + + // case 2: multi-upsert + let kvs = vec![ + KeyValue { + key: b"a".to_vec(), + value: b"b".to_vec(), + }, + KeyValue { + key: b"c".to_vec(), + value: b"d".to_vec(), + }, + ]; + let mut query = generate_batch_upsert_query(kvs).unwrap(); + query = query.trim().to_string(); + let expected = r#"WITH prev AS ( + select k,v from metakv where k in ('a','c') + ), update AS ( + INSERT INTO metakv (k, v) VALUES + ('a', 'b'),('c', 'd') + ON CONFLICT ( + k + ) DO UPDATE SET + v = excluded.v + ) + + SELECT k, v FROM prev;"#; + assert_eq!(expected, query); + } + + #[test] + fn test_generate_range_delete_query() { + // case 1: delete range + let req = DeleteRangeRequest { + key: b"test_key".to_vec(), + range_end: b"test_range_end".to_vec(), + prev_kv: true, + }; + let query = generate_range_delete_query(req).unwrap(); + assert_eq!( + "DELETE FROM metakv WHERE k >= 'test_key' AND k <= 'test_range_end' RETURNING k, v;", + query + ); + + // case 2: range_end == \0 we delete >= key + let req = DeleteRangeRequest { + key: b"test_key".to_vec(), + range_end: vec![b'\0'], + prev_kv: true, + }; + let query = generate_range_delete_query(req).unwrap(); + assert_eq!( + "DELETE FROM metakv WHERE k >= 'test_key' RETURNING k, v;", + query + ); + + // case 3: prefix delete + let start_key = b"a"; + let post_key = b"b"; + assert_eq!(is_prefix_range(start_key.clone().to_vec(), post_key), true); + let req = DeleteRangeRequest { + key: start_key.to_vec(), + range_end: post_key.to_vec(), + prev_kv: true, + }; + let query = generate_range_delete_query(req).unwrap(); + assert_eq!( + "DELETE FROM metakv WHERE k like 'a%' RETURNING k, v;", + query + ); + + // case 4: point delete + let req = DeleteRangeRequest { + key: b"test_key".to_vec(), + range_end: vec![], + prev_kv: true, + }; + let query = generate_range_delete_query(req).unwrap(); + assert_eq!( + "DELETE FROM metakv WHERE k = 'test_key' RETURNING k, v;", + query + ); + + // case 4: no prev_kv + let req = DeleteRangeRequest { + key: b"test_key".to_vec(), + range_end: vec![], + prev_kv: false, + }; + let query = generate_range_delete_query(req).unwrap(); + assert_eq!("DELETE FROM metakv WHERE k = 'test_key';", query); + } + + #[test] + fn test_generate_batch_delete_query() { + let keys = vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()]; + let req = BatchDeleteRequest { + keys, + prev_kv: true, + }; + let query = generate_batch_delete_query(req).unwrap(); + assert_eq!( + "DELETE FROM metakv WHERE k IN ('a','b','c') RETURNING k, v;", + query + ); + } + + use crate::kv_backend::test::{ + prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix, + test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix, + test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix, + unprepare_kv, + }; + + async fn build_pg_kv_backend() -> Option { + // let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default(); + let endpoints = "postgres://lanqing@127.0.0.1:5432/postgres"; + if endpoints.is_empty() { + return None; + } + + let (client, connection) = tokio_postgres::connect(endpoints, NoTls).await.unwrap(); + tokio::spawn(async move { connection.await }); + client.execute(METADKV_CREATION, &[]).await.unwrap(); + Some(PgStore { client }) + } + + #[tokio::test] + async fn test_put() { + if let Some(kv_backend) = build_pg_kv_backend().await { + let prefix = b"put/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } + } + + #[tokio::test] + async fn test_range() { + if let Some(kv_backend) = build_pg_kv_backend().await { + let prefix = b"range/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } + } + + #[tokio::test] + async fn test_range_2() { + if let Some(kv_backend) = build_pg_kv_backend().await { + test_kv_range_2_with_prefix(kv_backend, b"range2/".to_vec()).await; + } + } + + #[tokio::test] + async fn test_batch_get() { + if let Some(kv_backend) = build_pg_kv_backend().await { + let prefix = b"batchGet/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_compare_and_put() { + if let Some(kv_backend) = build_pg_kv_backend().await { + let kv_backend = Arc::new(kv_backend); + test_kv_compare_and_put_with_prefix(kv_backend, b"compareAndPut/".to_vec()).await; + } + } + + #[tokio::test] + async fn test_delete_range() { + if let Some(kv_backend) = build_pg_kv_backend().await { + let prefix = b"deleteRange/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_delete_range_with_prefix(kv_backend, prefix.to_vec()).await; + } + } + + #[tokio::test] + async fn test_batch_delete() { + if let Some(kv_backend) = build_pg_kv_backend().await { + let prefix = b"batchDelete/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_batch_delete_with_prefix(kv_backend, prefix.to_vec()).await; + } + } +} diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 458832b34d43..fc0fd130cdd7 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -52,6 +52,7 @@ snafu.workspace = true store-api.workspace = true table.workspace = true tokio.workspace = true +tokio-postgres.workspace = true tokio-stream = { workspace = true, features = ["net"] } toml.workspace = true tonic.workspace = true diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index b860db6a24f2..55a26ce92aa0 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -24,6 +24,7 @@ use common_config::Configurable; use common_meta::kv_backend::chroot::ChrootKvBackend; use common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::memory::MemoryKvBackend; +use common_meta::kv_backend::postgres::PgStore; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_telemetry::info; use etcd_client::Client; @@ -33,17 +34,18 @@ use servers::export_metrics::ExportMetricsTask; use servers::http::{HttpServer, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; use servers::server::Server; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use tokio::net::TcpListener; use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio_postgres::NoTls; use tonic::transport::server::{Router, TcpIncoming}; use crate::election::etcd::EtcdElection; -use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu}; +use crate::error::{EmptyServerAddrSnafu, InitExportMetricsTaskSnafu, TomlFormatSnafu}; use crate::lock::etcd::EtcdLock; use crate::lock::memory::MemLock; use crate::metasrv::builder::MetasrvBuilder; -use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef}; +use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef}; use crate::selector::lease_based::LeaseBasedSelector; use crate::selector::load_based::LoadBasedSelector; use crate::selector::round_robin::RoundRobinSelector; @@ -185,14 +187,14 @@ pub async fn metasrv_builder( plugins: Plugins, kv_backend: Option, ) -> Result { - let (kv_backend, election, lock) = match (kv_backend, opts.use_memory_store) { + let (kv_backend, election, lock) = match (kv_backend, &opts.backend) { (Some(kv_backend), _) => (kv_backend, None, Some(Arc::new(MemLock::default()) as _)), - (None, true) => ( + (None, BackendImpl::MemoryStore) => ( Arc::new(MemoryKvBackend::new()) as _, None, Some(Arc::new(MemLock::default()) as _), ), - (None, false) => { + (None, BackendImpl::EtcdStore) => { let etcd_client = create_etcd_client(opts).await?; let kv_backend = { let etcd_backend = @@ -222,6 +224,11 @@ pub async fn metasrv_builder( )?), ) } + (None, BackendImpl::PostgresStore) => { + let pg_client = create_postgres_client(opts).await?; + let kv_backend = PgStore::with_pg_client(pg_client).await.unwrap(); + (kv_backend, None, None) + } }; let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef; @@ -253,3 +260,11 @@ async fn create_etcd_client(opts: &MetasrvOptions) -> Result { .await .context(error::ConnectEtcdSnafu) } + +async fn create_postgres_client(opts: &MetasrvOptions) -> Result { + let postgres_url = opts.store_addrs.first().context(EmptyServerAddrSnafu)?; + let (client, _) = tokio_postgres::connect(postgres_url, NoTls) + .await + .context(error::ConnectPostgresSnafu)?; + Ok(client) +} diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 6e9efc48570b..3ee56dfd82b6 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -851,6 +851,23 @@ pub enum Error { #[snafu(source(from(common_config::error::Error, Box::new)))] source: Box, }, + #[snafu(display("Failed to execute via postgres"))] + PostgresFailed { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to connect to PostgresSQL"))] + ConnectPostgres { + #[snafu(source)] + error: tokio_postgres::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Empty server address"))] + EmptyServerAddr { + #[snafu(implicit)] + location: Location, + }, } impl Error { @@ -902,7 +919,10 @@ impl ErrorExt for Error { | Error::Join { .. } | Error::WeightArray { .. } | Error::NotSetWeightArray { .. } - | Error::PeerUnavailable { .. } => StatusCode::Internal, + | Error::PeerUnavailable { .. } + | Error::ConnectPostgres { .. } + | Error::EmptyServerAddr { .. } + | Error::PostgresFailed { .. } => StatusCode::Internal, Error::Unsupported { .. } => StatusCode::Unsupported, diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index d92798a113b1..473fb8630351 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -65,6 +65,13 @@ pub const TABLE_ID_SEQ: &str = "table_id"; pub const FLOW_ID_SEQ: &str = "flow_id"; pub const METASRV_HOME: &str = "/tmp/metasrv"; +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum BackendImpl { + EtcdStore, + MemoryStore, + PostgresStore, +} + #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(default)] pub struct MetasrvOptions { @@ -114,6 +121,8 @@ pub struct MetasrvOptions { pub max_txn_ops: usize, /// The tracing options. pub tracing: TracingOptions, + /// The datastore for kv metadata. + pub backend: BackendImpl, } impl Default for MetasrvOptions { @@ -146,6 +155,7 @@ impl Default for MetasrvOptions { store_key_prefix: String::new(), max_txn_ops: 128, tracing: TracingOptions::default(), + backend: BackendImpl::EtcdStore, } } }