From 93be81c0410a32f8955769599090de0f4f11e73f Mon Sep 17 00:00:00 2001 From: Lanqing Yang Date: Wed, 14 Aug 2024 15:49:32 -0700 Subject: [PATCH] feat: implement postgres kvbackend (#4421) --- .../actions/setup-postgres-cluster/action.yml | 30 + .github/workflows/develop.yml | 8 + Cargo.lock | 3 + Cargo.toml | 1 + src/cmd/src/metasrv.rs | 10 + src/common/meta/Cargo.toml | 2 + src/common/meta/src/error.rs | 35 +- src/common/meta/src/kv_backend.rs | 2 + src/common/meta/src/kv_backend/postgres.rs | 626 ++++++++++++++++++ src/meta-srv/Cargo.toml | 3 + src/meta-srv/src/bootstrap.rs | 34 +- src/meta-srv/src/error.rs | 20 + src/meta-srv/src/metasrv.rs | 17 + tests-integration/Cargo.toml | 2 +- .../postgres/docker-compose-standalone.yml | 12 + tests/runner/src/env.rs | 4 +- 16 files changed, 801 insertions(+), 8 deletions(-) create mode 100644 .github/actions/setup-postgres-cluster/action.yml create mode 100644 src/common/meta/src/kv_backend/postgres.rs create mode 100644 tests-integration/fixtures/postgres/docker-compose-standalone.yml diff --git a/.github/actions/setup-postgres-cluster/action.yml b/.github/actions/setup-postgres-cluster/action.yml new file mode 100644 index 000000000000..5f93a6a63e56 --- /dev/null +++ b/.github/actions/setup-postgres-cluster/action.yml @@ -0,0 +1,30 @@ +name: Setup PostgreSQL +description: Deploy PostgreSQL on Kubernetes +inputs: + postgres-replicas: + default: 1 + description: "Number of PostgreSQL replicas" + namespace: + default: "postgres-namespace" + postgres-version: + default: "14.2" + description: "PostgreSQL version" + storage-size: + default: "1Gi" + description: "Storage size for PostgreSQL" + +runs: + using: composite + steps: + - name: Install PostgreSQL + shell: bash + run: | + helm upgrade \ + --install postgresql oci://registry-1.docker.io/bitnamicharts/postgresql \ + --set replicaCount=${{ inputs.postgres-replicas }} \ + --set image.tag=${{ inputs.postgres-version }} \ + --set persistence.size=${{ inputs.storage-size }} \ 
+ --set postgresql.username=greptimedb \ + --set postgresql.password=admin \ + --create-namespace \ + -n ${{ inputs.namespace }} diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index 13bbc41db125..ac4a6f64dcbd 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -338,6 +338,8 @@ jobs: uses: ./.github/actions/setup-kafka-cluster - name: Setup Etcd cluser uses: ./.github/actions/setup-etcd-cluster + - name: Setup Postgres cluster + uses: ./.github/actions/setup-postgres-cluster # Prepares for fuzz tests - uses: arduino/setup-protoc@v3 with: @@ -476,6 +478,8 @@ jobs: uses: ./.github/actions/setup-kafka-cluster - name: Setup Etcd cluser uses: ./.github/actions/setup-etcd-cluster + - name: Setup Postgres cluster + uses: ./.github/actions/setup-postgres-cluster # Prepares for fuzz tests - uses: arduino/setup-protoc@v3 with: @@ -702,6 +706,9 @@ jobs: - name: Setup minio working-directory: tests-integration/fixtures/minio run: docker compose -f docker-compose-standalone.yml up -d --wait + - name: Setup postgres server + working-directory: tests-integration/fixtures/postgres + run: docker compose -f docker-compose-standalone.yml up -d --wait - name: Run nextest cases run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard env: @@ -718,6 +725,7 @@ jobs: GT_MINIO_REGION: us-west-2 GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000 GT_ETCD_ENDPOINTS: http://127.0.0.1:2379 + GT_POSTGRES_ENDPOINTS: postgres://greptimedb:admin@127.0.0.1:5432/postgres GT_KAFKA_ENDPOINTS: 127.0.0.1:9092 GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093 UNITTEST_LOG_DIR: "__unittest_logs" diff --git a/Cargo.lock b/Cargo.lock index acd6a6bd8b73..12a4fdca9890 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2094,6 +2094,7 @@ dependencies = [ "strum 0.25.0", "table", "tokio", + "tokio-postgres", "tonic 0.11.0", "typetag", "uuid", @@ -6129,6 +6130,7 @@ dependencies = [ "api", "async-trait", "chrono", + "clap 4.5.7", "client",
"common-base", "common-catalog", @@ -6170,6 +6172,7 @@ dependencies = [ "store-api", "table", "tokio", + "tokio-postgres", "tokio-stream", "toml 0.8.14", "tonic 0.11.0", diff --git a/Cargo.toml b/Cargo.toml index d2fb1632cdbf..4b5fb9369e38 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -174,6 +174,7 @@ sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "5 strum = { version = "0.25", features = ["derive"] } tempfile = "3" tokio = { version = "1.36", features = ["full"] } +tokio-postgres = "0.7" tokio-stream = { version = "0.1" } tokio-util = { version = "0.7", features = ["io-util", "compat"] } toml = "0.8.8" diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index b072b7d183b5..44463d063b59 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -22,6 +22,7 @@ use common_telemetry::info; use common_telemetry::logging::TracingOptions; use common_version::{short_version, version}; use meta_srv::bootstrap::MetasrvInstance; +use meta_srv::metasrv::BackendImpl; use snafu::ResultExt; use tracing_appender::non_blocking::WorkerGuard; @@ -137,6 +138,9 @@ struct StartCommand { /// The max operations per txn #[clap(long)] max_txn_ops: Option, + /// The database backend. + #[clap(long, value_enum)] + backend: Option, } impl StartCommand { @@ -219,6 +223,12 @@ impl StartCommand { opts.max_txn_ops = max_txn_ops; } + if let Some(backend) = &self.backend { + opts.backend.clone_from(backend); + } else { + opts.backend = BackendImpl::default() + } + // Disable dashboard in metasrv. 
opts.http.disable_dashboard = true; diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 347ccc3b5f2b..28f518190a56 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true [features] testing = [] +pg_kvbackend = ["dep:tokio-postgres"] [lints] workspace = true @@ -56,6 +57,7 @@ store-api.workspace = true strum.workspace = true table.workspace = true tokio.workspace = true +tokio-postgres = { workspace = true, optional = true } tonic.workspace = true typetag = "0.2" diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 6d436edae03e..4667a9ef8914 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -643,6 +643,15 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to parse {} from str to utf8", name))] + StrFromUtf8 { + name: String, + #[snafu(source)] + error: std::str::Utf8Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Value not exists"))] ValueNotExist { #[snafu(implicit)] @@ -651,6 +660,24 @@ pub enum Error { #[snafu(display("Failed to get cache"))] GetCache { source: Arc }, + + #[cfg(feature = "pg_kvbackend")] + #[snafu(display("Failed to execute via Postgres"))] + PostgresExecution { + #[snafu(source)] + error: tokio_postgres::Error, + #[snafu(implicit)] + location: Location, + }, + + #[cfg(feature = "pg_kvbackend")] + #[snafu(display("Failed to connect to Postgres"))] + ConnectPostgres { + #[snafu(source)] + error: tokio_postgres::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -704,7 +731,8 @@ impl ErrorExt for Error { | UnexpectedLogicalRouteTable { .. } | ProcedureOutput { .. } | FromUtf8 { .. } - | MetadataCorruption { .. } => StatusCode::Unexpected, + | MetadataCorruption { .. } + | StrFromUtf8 { .. } => StatusCode::Unexpected, SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } | RenameTable { .. 
} => { StatusCode::Internal @@ -749,6 +777,11 @@ impl ErrorExt for Error { | ParseNum { .. } | InvalidRole { .. } | EmptyDdlTasks { .. } => StatusCode::InvalidArguments, + + #[cfg(feature = "pg_kvbackend")] + PostgresExecution { .. } => StatusCode::Internal, + #[cfg(feature = "pg_kvbackend")] + ConnectPostgres { .. } => StatusCode::Internal, } } diff --git a/src/common/meta/src/kv_backend.rs b/src/common/meta/src/kv_backend.rs index 1bc04e7089d0..ba9db2ec2a44 100644 --- a/src/common/meta/src/kv_backend.rs +++ b/src/common/meta/src/kv_backend.rs @@ -31,6 +31,8 @@ use crate::rpc::KeyValue; pub mod chroot; pub mod etcd; pub mod memory; +#[cfg(feature = "pg_kvbackend")] +pub mod postgres; pub mod test; pub mod txn; diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs new file mode 100644 index 000000000000..8add65cd49c4 --- /dev/null +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -0,0 +1,626 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::any::Any; +use std::borrow::Cow; +use std::sync::Arc; + +use snafu::ResultExt; +use tokio_postgres::types::ToSql; +use tokio_postgres::{Client, NoTls}; + +use super::{KvBackend, TxnService}; +use crate::error::{ConnectPostgresSnafu, Error, PostgresExecutionSnafu, Result, StrFromUtf8Snafu}; +use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse}; +use crate::kv_backend::KvBackendRef; +use crate::rpc::store::{ + BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, + BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, + DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, +}; +use crate::rpc::KeyValue; + +/// Postgres backend store for metasrv +pub struct PgStore { + // TODO: Consider using sqlx crate. + client: Client, +} + +const EMPTY: &[u8] = &[0]; + +// TODO: allow users to configure metadata table name. +const METADKV_CREATION: &str = + "CREATE TABLE IF NOT EXISTS greptime_metakv(k varchar PRIMARY KEY, v varchar)"; + +const FULL_TABLE_SCAN: &str = "SELECT k, v FROM greptime_metakv $1 ORDER BY K"; + +const POINT_GET: &str = "SELECT k, v FROM greptime_metakv WHERE k = $1"; + +const PREFIX_SCAN: &str = "SELECT k, v FROM greptime_metakv WHERE k LIKE $1 ORDER BY K"; + +const RANGE_SCAN_LEFT_BOUNDED: &str = "SELECT k, v FROM greptime_metakv WHERE k >= $1 ORDER BY K"; + +const RANGE_SCAN_FULL_RANGE: &str = + "SELECT k, v FROM greptime_metakv WHERE k >= $1 AND K < $2 ORDER BY K"; + +const FULL_TABLE_DELETE: &str = "DELETE FROM greptime_metakv RETURNING k,v"; + +const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE K = $1 RETURNING k,v;"; + +const PREFIX_DELETE: &str = "DELETE FROM greptime_metakv WHERE k LIKE $1 RETURNING k,v;"; + +const RANGE_DELETE_LEFT_BOUNDED: &str = "DELETE FROM greptime_metakv WHERE k >= $1 RETURNING k,v;"; + +const RANGE_DELETE_FULL_RANGE: &str = + "DELETE FROM greptime_metakv WHERE k >= $1 AND K < $2 RETURNING k,v;"; 
+ +const CAS: &str = r#" +WITH prev AS ( + SELECT k,v FROM greptime_metakv WHERE k = $1 AND v = $2 +), update AS ( +UPDATE greptime_metakv +SET k=$1, +v=$2 +WHERE + k=$1 AND v=$3 +) + +SELECT k, v FROM prev; +"#; + +const PUT_IF_NOT_EXISTS: &str = r#" +WITH prev AS ( + select k,v from greptime_metakv where k = $1 +), insert AS ( + INSERT INTO greptime_metakv + VALUES ($1, $2) + ON CONFLICT (k) DO NOTHING +) + +SELECT k, v FROM prev;"#; + +impl PgStore { + /// Create pgstore impl of KvBackendRef from url. + pub async fn with_url(url: &str) -> Result { + // TODO: support tls. + let (client, conn) = tokio_postgres::connect(url, NoTls) + .await + .context(ConnectPostgresSnafu)?; + tokio::spawn(async move { conn.await.context(ConnectPostgresSnafu) }); + Self::with_pg_client(client).await + } + + /// Create pgstore impl of KvBackendRef from tokio-postgres client. + pub async fn with_pg_client(client: Client) -> Result { + // This step ensures the postgres metadata backend is ready to use. + // We check if greptime_metakv table exists, and we will create a new table + // if it does not exist. 
+ client + .execute(METADKV_CREATION, &[]) + .await + .context(PostgresExecutionSnafu)?; + Ok(Arc::new(Self { client })) + } + + async fn put_if_not_exists(&self, key: &str, value: &str) -> Result { + let res = self + .client + .query(PUT_IF_NOT_EXISTS, &[&key, &value]) + .await + .context(PostgresExecutionSnafu)?; + Ok(res.is_empty()) + } +} + +fn select_range_template(req: &RangeRequest) -> &str { + if req.range_end.is_empty() { + return POINT_GET; + } + if req.key == EMPTY && req.range_end == EMPTY { + FULL_TABLE_SCAN + } else if req.range_end == EMPTY { + RANGE_SCAN_LEFT_BOUNDED + } else if is_prefix_range(&req.key, &req.range_end) { + PREFIX_SCAN + } else { + RANGE_SCAN_FULL_RANGE + } +} + +fn select_range_delete_template(req: &DeleteRangeRequest) -> &str { + if req.range_end.is_empty() { + return POINT_DELETE; + } + if req.key == EMPTY && req.range_end == EMPTY { + FULL_TABLE_DELETE + } else if req.range_end == EMPTY { + RANGE_DELETE_LEFT_BOUNDED + } else if is_prefix_range(&req.key, &req.range_end) { + PREFIX_DELETE + } else { + RANGE_DELETE_FULL_RANGE + } +} + +// Generate dynamic parameterized sql for batch get. +fn generate_batch_get_query(key_len: usize) -> String { + let in_placeholders: Vec = (1..=key_len).map(|i| format!("${}", i)).collect(); + let in_clause = in_placeholders.join(", "); + format!( + "SELECT k, v FROM greptime_metakv WHERE k in ({});", + in_clause + ) +} + +// Generate dynamic parameterized sql for batch delete. +fn generate_batch_delete_query(key_len: usize) -> String { + let in_placeholders: Vec = (1..=key_len).map(|i| format!("${}", i)).collect(); + let in_clause = in_placeholders.join(", "); + format!( + "DELETE FROM greptime_metakv WHERE k in ({}) RETURNING k, v;", + in_clause + ) +} + +// Generate dynamic parameterized sql for batch upsert. 
+fn generate_batch_upsert_query(kv_len: usize) -> String { + let in_placeholders: Vec = (1..=kv_len).map(|i| format!("${}", i)).collect(); + let in_clause = in_placeholders.join(", "); + let mut param_index = kv_len + 1; + let mut values_placeholders = Vec::new(); + for _ in 0..kv_len { + values_placeholders.push(format!("(${0}, ${1})", param_index, param_index + 1)); + param_index += 2; + } + let values_clause = values_placeholders.join(", "); + + format!( + r#" + WITH prev AS ( + SELECT k,v FROM greptime_metakv WHERE k IN ({in_clause}) + ), update AS ( + INSERT INTO greptime_metakv (k, v) VALUES + {values_clause} + ON CONFLICT ( + k + ) DO UPDATE SET + v = excluded.v + ) + + SELECT k, v FROM prev; + "# + ) +} + +// Trim null byte at the end and convert bytes to string. +fn process_bytes<'a>(data: &'a [u8], name: &str) -> Result<&'a str> { + let mut len = data.len(); + // remove trailing null bytes to avoid error in postgres encoding. + while len > 0 && data[len - 1] == 0 { + len -= 1; + } + let res = std::str::from_utf8(&data[0..len]).context(StrFromUtf8Snafu { name })?; + Ok(res) +} + +#[async_trait::async_trait] +impl KvBackend for PgStore { + fn name(&self) -> &str { + "Postgres" + } + + fn as_any(&self) -> &dyn Any { + self + } + + async fn range(&self, req: RangeRequest) -> Result { + let mut params = vec![]; + let template = select_range_template(&req); + if req.key != EMPTY { + let key = process_bytes(&req.key, "rangeKey")?; + if template == PREFIX_SCAN { + let prefix = format!("{key}%"); + params.push(Cow::Owned(prefix)) + } else { + params.push(Cow::Borrowed(key)) + } + } + if template == RANGE_SCAN_FULL_RANGE && req.range_end != EMPTY { + let range_end = process_bytes(&req.range_end, "rangeEnd")?; + params.push(Cow::Borrowed(range_end)); + } + let limit = req.limit as usize; + let limit_cause = match limit > 0 { + true => format!(" LIMIT {};", limit + 1), + false => ";".to_string(), + }; + let template = format!("{}{}", template, limit_cause); + let 
params: Vec<&(dyn ToSql + Sync)> = params + .iter() + .map(|x| match x { + Cow::Borrowed(borrowed) => borrowed as &(dyn ToSql + Sync), + Cow::Owned(owned) => owned as &(dyn ToSql + Sync), + }) + .collect(); + let res = self + .client + .query(&template, ¶ms) + .await + .context(PostgresExecutionSnafu)?; + let kvs: Vec = res + .into_iter() + .map(|r| { + let key: String = r.get(0); + if req.keys_only { + return KeyValue { + key: key.into_bytes(), + value: vec![], + }; + } + let value: String = r.get(1); + KeyValue { + key: key.into_bytes(), + value: value.into_bytes(), + } + }) + .collect(); + if limit == 0 || limit > kvs.len() { + return Ok(RangeResponse { kvs, more: false }); + } + let (filtered_kvs, _) = kvs.split_at(limit); + Ok(RangeResponse { + kvs: filtered_kvs.to_vec(), + more: kvs.len() > limit, + }) + } + + async fn put(&self, req: PutRequest) -> Result { + let kv = KeyValue { + key: req.key, + value: req.value, + }; + let mut res = self + .batch_put(BatchPutRequest { + kvs: vec![kv], + prev_kv: req.prev_kv, + }) + .await?; + + if !res.prev_kvs.is_empty() { + return Ok(PutResponse { + prev_kv: Some(res.prev_kvs.remove(0)), + }); + } + Ok(PutResponse { prev_kv: None }) + } + + async fn batch_put(&self, req: BatchPutRequest) -> Result { + let mut in_params = Vec::with_capacity(req.kvs.len()); + let mut values_params = Vec::with_capacity(req.kvs.len() * 2); + + for kv in &req.kvs { + let processed_key = process_bytes(&kv.key, "BatchPutRequestKey")?; + in_params.push(processed_key); + + let processed_value = process_bytes(&kv.value, "BatchPutRequestValue")?; + values_params.push(processed_key); + values_params.push(processed_value); + } + in_params.extend(values_params); + let params: Vec<&(dyn ToSql + Sync)> = + in_params.iter().map(|x| x as &(dyn ToSql + Sync)).collect(); + + let query = generate_batch_upsert_query(req.kvs.len()); + let res = self + .client + .query(&query, ¶ms) + .await + .context(PostgresExecutionSnafu)?; + if req.prev_kv { + let kvs: Vec 
= res + .into_iter() + .map(|r| { + let key: String = r.get(0); + let value: String = r.get(1); + KeyValue { + key: key.into_bytes(), + value: value.into_bytes(), + } + }) + .collect(); + if !kvs.is_empty() { + return Ok(BatchPutResponse { prev_kvs: kvs }); + } + } + Ok(BatchPutResponse { prev_kvs: vec![] }) + } + + async fn batch_get(&self, req: BatchGetRequest) -> Result { + if req.keys.is_empty() { + return Ok(BatchGetResponse { kvs: vec![] }); + } + let query = generate_batch_get_query(req.keys.len()); + let value_params = req + .keys + .iter() + .map(|k| process_bytes(k, "BatchGetRequestKey")) + .collect::>>()?; + let params: Vec<&(dyn ToSql + Sync)> = value_params + .iter() + .map(|x| x as &(dyn ToSql + Sync)) + .collect(); + let res = self + .client + .query(&query, ¶ms) + .await + .context(PostgresExecutionSnafu)?; + let kvs: Vec = res + .into_iter() + .map(|r| { + let key: String = r.get(0); + let value: String = r.get(1); + KeyValue { + key: key.into_bytes(), + value: value.into_bytes(), + } + }) + .collect(); + Ok(BatchGetResponse { kvs }) + } + + async fn delete_range(&self, req: DeleteRangeRequest) -> Result { + let mut params = vec![]; + let template = select_range_delete_template(&req); + if req.key != EMPTY { + let key = process_bytes(&req.key, "deleteRangeKey")?; + if template == PREFIX_DELETE { + let prefix = format!("{key}%"); + params.push(Cow::Owned(prefix)); + } else { + params.push(Cow::Borrowed(key)); + } + } + if template == RANGE_DELETE_FULL_RANGE && req.range_end != EMPTY { + let range_end = process_bytes(&req.range_end, "deleteRangeEnd")?; + params.push(Cow::Borrowed(range_end)); + } + let params: Vec<&(dyn ToSql + Sync)> = params + .iter() + .map(|x| match x { + Cow::Borrowed(borrowed) => borrowed as &(dyn ToSql + Sync), + Cow::Owned(owned) => owned as &(dyn ToSql + Sync), + }) + .collect(); + + let res = self + .client + .query(template, ¶ms) + .await + .context(PostgresExecutionSnafu)?; + let deleted = res.len() as i64; + if 
!req.prev_kv { + return Ok({ + DeleteRangeResponse { + deleted, + prev_kvs: vec![], + } + }); + } + let kvs: Vec = res + .into_iter() + .map(|r| { + let key: String = r.get(0); + let value: String = r.get(1); + KeyValue { + key: key.into_bytes(), + value: value.into_bytes(), + } + }) + .collect(); + Ok(DeleteRangeResponse { + deleted, + prev_kvs: kvs, + }) + } + + async fn batch_delete(&self, req: BatchDeleteRequest) -> Result { + if req.keys.is_empty() { + return Ok(BatchDeleteResponse { prev_kvs: vec![] }); + } + let query = generate_batch_delete_query(req.keys.len()); + let value_params = req + .keys + .iter() + .map(|k| process_bytes(k, "BatchDeleteRequestKey")) + .collect::>>()?; + let params: Vec<&(dyn ToSql + Sync)> = value_params + .iter() + .map(|x| x as &(dyn ToSql + Sync)) + .collect(); + let res = self + .client + .query(&query, ¶ms) + .await + .context(PostgresExecutionSnafu)?; + if !req.prev_kv { + return Ok(BatchDeleteResponse { prev_kvs: vec![] }); + } + let kvs: Vec = res + .into_iter() + .map(|r| { + let key: String = r.get(0); + let value: String = r.get(1); + KeyValue { + key: key.into_bytes(), + value: value.into_bytes(), + } + }) + .collect(); + Ok(BatchDeleteResponse { prev_kvs: kvs }) + } + + async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result { + let key = process_bytes(&req.key, "CASKey")?; + let value = process_bytes(&req.value, "CASValue")?; + if req.expect.is_empty() { + let put_res = self.put_if_not_exists(key, value).await?; + return Ok(CompareAndPutResponse { + success: put_res, + prev_kv: None, + }); + } + let expect = process_bytes(&req.expect, "CASExpect")?; + + let res = self + .client + .query(CAS, &[&key, &value, &expect]) + .await + .context(PostgresExecutionSnafu)?; + match res.is_empty() { + true => Ok(CompareAndPutResponse { + success: false, + prev_kv: None, + }), + false => { + let mut kvs: Vec = res + .into_iter() + .map(|r| { + let key: String = r.get(0); + let value: String = r.get(1); + KeyValue { + 
key: key.into_bytes(), + value: value.into_bytes(), + } + }) + .collect(); + Ok(CompareAndPutResponse { + success: true, + prev_kv: Some(kvs.remove(0)), + }) + } + } + } +} + +#[async_trait::async_trait] +impl TxnService for PgStore { + type Error = Error; + + async fn txn(&self, _txn: KvTxn) -> Result { + // TODO: implement txn for pg kv backend. + unimplemented!() + } + + fn max_txn_ops(&self) -> usize { + unreachable!("postgres backend does not support max_txn_ops!") + } +} + +fn is_prefix_range(start: &[u8], end: &[u8]) -> bool { + if start.len() != end.len() { + return false; + } + let l = start.len(); + let same_prefix = start[0..l - 1] == end[0..l - 1]; + if let (Some(rhs), Some(lhs)) = (start.last(), end.last()) { + return same_prefix && (*rhs + 1) == *lhs; + } + false +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::kv_backend::test::{ + prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix, + test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix, + test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix, + unprepare_kv, + }; + + async fn build_pg_kv_backend() -> Option { + let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default(); + if endpoints.is_empty() { + return None; + } + + let (client, connection) = tokio_postgres::connect(&endpoints, NoTls).await.unwrap(); + tokio::spawn(connection); + let _ = client.execute(METADKV_CREATION, &[]).await; + Some(PgStore { client }) + } + + #[tokio::test] + async fn test_put() { + if let Some(kv_backend) = build_pg_kv_backend().await { + let prefix = b"put/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } + } + + #[tokio::test] + async fn test_range() { + if let Some(kv_backend) = build_pg_kv_backend().await { + let prefix = b"range/"; + prepare_kv_with_prefix(&kv_backend, 
prefix.to_vec()).await; + test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } + } + + #[tokio::test] + async fn test_range_2() { + if let Some(kv_backend) = build_pg_kv_backend().await { + test_kv_range_2_with_prefix(kv_backend, b"range2/".to_vec()).await; + } + } + + #[tokio::test] + async fn test_batch_get() { + if let Some(kv_backend) = build_pg_kv_backend().await { + let prefix = b"batchGet/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_compare_and_put() { + if let Some(kv_backend) = build_pg_kv_backend().await { + let kv_backend = Arc::new(kv_backend); + test_kv_compare_and_put_with_prefix(kv_backend, b"compareAndPut/".to_vec()).await; + } + } + + #[tokio::test] + async fn test_delete_range() { + if let Some(kv_backend) = build_pg_kv_backend().await { + let prefix = b"deleteRange/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_delete_range_with_prefix(kv_backend, prefix.to_vec()).await; + } + } + + #[tokio::test] + async fn test_batch_delete() { + if let Some(kv_backend) = build_pg_kv_backend().await { + let prefix = b"batchDelete/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_batch_delete_with_prefix(kv_backend, prefix.to_vec()).await; + } + } +} diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 458832b34d43..2e3216d075d0 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true [features] mock = [] +pg_kvbackend = ["dep:tokio-postgres"] [lints] workspace = true @@ -13,6 +14,7 @@ workspace = true [dependencies] api.workspace = true async-trait = "0.1" +clap.workspace = true client.workspace = true common-base.workspace = true common-catalog.workspace = true @@ -52,6 +54,7 @@ 
snafu.workspace = true store-api.workspace = true table.workspace = true tokio.workspace = true +tokio-postgres = { workspace = true, optional = true } tokio-stream = { workspace = true, features = ["net"] } toml.workspace = true tonic.workspace = true diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index b860db6a24f2..dcfac253f380 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -24,6 +24,8 @@ use common_config::Configurable; use common_meta::kv_backend::chroot::ChrootKvBackend; use common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::memory::MemoryKvBackend; +#[cfg(feature = "pg_kvbackend")] +use common_meta::kv_backend::postgres::PgStore; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_telemetry::info; use etcd_client::Client; @@ -33,17 +35,23 @@ use servers::export_metrics::ExportMetricsTask; use servers::http::{HttpServer, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; use servers::server::Server; +#[cfg(feature = "pg_kvbackend")] +use snafu::OptionExt; use snafu::ResultExt; use tokio::net::TcpListener; use tokio::sync::mpsc::{self, Receiver, Sender}; +#[cfg(feature = "pg_kvbackend")] +use tokio_postgres::NoTls; use tonic::transport::server::{Router, TcpIncoming}; use crate::election::etcd::EtcdElection; +#[cfg(feature = "pg_kvbackend")] +use crate::error::InvalidArgumentsSnafu; use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu}; use crate::lock::etcd::EtcdLock; use crate::lock::memory::MemLock; use crate::metasrv::builder::MetasrvBuilder; -use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef}; +use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef}; use crate::selector::lease_based::LeaseBasedSelector; use crate::selector::load_based::LoadBasedSelector; use crate::selector::round_robin::RoundRobinSelector; @@ -185,14 +193,14 @@ pub async fn metasrv_builder( plugins: Plugins, kv_backend: Option, ) 
-> Result { - let (kv_backend, election, lock) = match (kv_backend, opts.use_memory_store) { + let (kv_backend, election, lock) = match (kv_backend, &opts.backend) { (Some(kv_backend), _) => (kv_backend, None, Some(Arc::new(MemLock::default()) as _)), - (None, true) => ( + (None, BackendImpl::MemoryStore) => ( Arc::new(MemoryKvBackend::new()) as _, None, Some(Arc::new(MemLock::default()) as _), ), - (None, false) => { + (None, BackendImpl::EtcdStore) => { let etcd_client = create_etcd_client(opts).await?; let kv_backend = { let etcd_backend = @@ -222,6 +230,13 @@ pub async fn metasrv_builder( )?), ) } + #[cfg(feature = "pg_kvbackend")] + (None, BackendImpl::PostgresStore) => { + let pg_client = create_postgres_client(opts).await?; + let kv_backend = PgStore::with_pg_client(pg_client).await.unwrap(); + // TODO: implement locking and leader election for pg backend. + (kv_backend, None, None) + } }; let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef; @@ -253,3 +268,14 @@ async fn create_etcd_client(opts: &MetasrvOptions) -> Result { .await .context(error::ConnectEtcdSnafu) } + +#[cfg(feature = "pg_kvbackend")] +async fn create_postgres_client(opts: &MetasrvOptions) -> Result { + let postgres_url = opts.store_addrs.first().context(InvalidArgumentsSnafu { + err_msg: "empty store addrs", + })?; + let (client, _) = tokio_postgres::connect(postgres_url, NoTls) + .await + .context(error::ConnectPostgresSnafu)?; + Ok(client) +} diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 8c5312fe9195..a41dd6387237 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -865,6 +865,22 @@ pub enum Error { location: Location, source: BoxedError, }, + + #[cfg(feature = "pg_kvbackend")] + #[snafu(display("Failed to execute via postgres"))] + PostgresExecution { + #[snafu(implicit)] + location: Location, + }, + + #[cfg(feature = "pg_kvbackend")] + #[snafu(display("Failed to connect to PostgresSQL"))] + ConnectPostgres { + 
#[snafu(source)] + error: tokio_postgres::Error, + #[snafu(implicit)] + location: Location, + }, } impl Error { @@ -1003,6 +1019,10 @@ impl ErrorExt for Error { Error::Other { source, .. } => source.status_code(), Error::LookupPeer { source, .. } => source.status_code(), + #[cfg(feature = "pg_kvbackend")] + Error::ConnectPostgres { .. } => StatusCode::Internal, + #[cfg(feature = "pg_kvbackend")] + Error::PostgresExecution { .. } => StatusCode::Internal, } } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 138490fb8c04..d2a80734084c 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -19,6 +19,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; +use clap::ValueEnum; use common_base::readable_size::ReadableSize; use common_base::Plugins; use common_config::Configurable; @@ -65,6 +66,19 @@ pub const TABLE_ID_SEQ: &str = "table_id"; pub const FLOW_ID_SEQ: &str = "flow_id"; pub const METASRV_HOME: &str = "/tmp/metasrv"; +// The datastores that implements metadata kvbackend. +#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)] +pub enum BackendImpl { + // Etcd as metadata storage. + #[default] + EtcdStore, + // In memory metadata storage - mostly used for testing. + MemoryStore, + #[cfg(feature = "pg_kvbackend")] + // Postgres as metadata storage. + PostgresStore, +} + #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(default)] pub struct MetasrvOptions { @@ -114,6 +128,8 @@ pub struct MetasrvOptions { pub max_txn_ops: usize, /// The tracing options. pub tracing: TracingOptions, + /// The datastore for kv metadata. 
+ pub backend: BackendImpl, } impl Default for MetasrvOptions { @@ -146,6 +162,7 @@ impl Default for MetasrvOptions { store_key_prefix: String::new(), max_txn_ops: 128, tracing: TracingOptions::default(), + backend: BackendImpl::EtcdStore, } } } diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 3dbe3f79f216..aa5f74540d14 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -90,5 +90,5 @@ rand.workspace = true script.workspace = true session = { workspace = true, features = ["testing"] } store-api.workspace = true -tokio-postgres = "0.7" +tokio-postgres = { workspace = true } url = "2.3" diff --git a/tests-integration/fixtures/postgres/docker-compose-standalone.yml b/tests-integration/fixtures/postgres/docker-compose-standalone.yml new file mode 100644 index 000000000000..4f1ca8e00778 --- /dev/null +++ b/tests-integration/fixtures/postgres/docker-compose-standalone.yml @@ -0,0 +1,12 @@ +version: '3.9' +services: + postgres: + image: postgres:14-alpine + ports: + - 5432:5432 + volumes: + - ~/apps/postgres:/var/lib/postgresql/data + environment: + - POSTGRES_USER=greptimedb + - POSTGRES_DB=postgres + - POSTGRES_PASSWORD=admin diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index b0d4c733f071..d8ed5713f52f 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -245,8 +245,8 @@ impl Env { DEFAULT_LOG_LEVEL.to_string(), subcommand.to_string(), "start".to_string(), - "--use-memory-store".to_string(), - "true".to_string(), + "--backend".to_string(), + "memory-store".to_string(), "--enable-region-failover".to_string(), "false".to_string(), "--http-addr=127.0.0.1:5002".to_string(),