diff --git a/Cargo.lock b/Cargo.lock index 1c7ddf1b404c..1f101eec3180 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2092,6 +2092,7 @@ dependencies = [ "strum 0.25.0", "table", "tokio", + "tokio-postgres", "tonic 0.11.0", "typetag", "uuid", @@ -6140,6 +6141,7 @@ dependencies = [ "store-api", "table", "tokio", + "tokio-postgres", "tokio-stream", "toml 0.8.14", "tonic 0.11.0", @@ -7926,11 +7928,11 @@ checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0" [[package]] name = "postgres-protocol" -version = "0.6.6" +version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49b6c5ef183cd3ab4ba005f1ca64c21e8bd97ce4699cfea9e8d9a2c4958ca520" +checksum = "acda0ebdebc28befa84bee35e651e4c5f09073d668c7aed4cf7e23c3cda84b23" dependencies = [ - "base64 0.21.7", + "base64 0.22.1", "byteorder", "bytes", "fallible-iterator", @@ -7944,9 +7946,9 @@ dependencies = [ [[package]] name = "postgres-types" -version = "0.2.6" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d2234cdee9408b523530a9b6d2d6b373d1db34f6a8e51dc03ded1828d7fb67c" +checksum = "02048d9e032fb3cc3413bbf7b83a15d84a5d419778e2628751896d856498eee9" dependencies = [ "array-init", "bytes", @@ -11906,9 +11908,9 @@ dependencies = [ [[package]] name = "tokio-postgres" -version = "0.7.10" +version = "0.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d340244b32d920260ae7448cb72b6e238bddc3d4f7603394e7dd46ed8e48f5b8" +checksum = "03adcf0147e203b6032c0b2d30be1415ba03bc348901f3ff1cc0df6a733e60c3" dependencies = [ "async-trait", "byteorder", diff --git a/Cargo.toml b/Cargo.toml index 79b527104b9c..69b0612f1ce0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -138,6 +138,7 @@ opentelemetry-proto = { version = "0.5", features = [ parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] } paste = "1.0" pin-project = "1.0" +tokio-postgres = "0.7.11" prometheus = { version = "0.13.3", features = ["process"] } promql-parser = { version = "0.4" } prost = "0.12" diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 347ccc3b5f2b..e1aae80cc14a 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -56,6 +56,7 @@ store-api.workspace = true strum.workspace = true table.workspace = true tokio.workspace = true +tokio-postgres.workspace = true tonic.workspace = true typetag = "0.2" diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index ccd887345c07..546d8d866116 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -644,6 +644,14 @@ pub enum Error { #[snafu(display("Failed to get cache"))] GetCache { source: Arc }, + + #[snafu(display("Failed to execute via Postgres"))] + PostgresFailed { + #[snafu(source)] + error: tokio_postgres::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -655,6 +663,7 @@ impl ErrorExt for Error { IllegalServerState { .. } | EtcdTxnOpResponse { .. } | EtcdFailed { .. } + | PostgresFailed { .. } | EtcdTxnFailed { .. } | ConnectEtcd { .. } | MoveValues { .. } diff --git a/src/common/meta/src/kv_backend.rs b/src/common/meta/src/kv_backend.rs index 1bc04e7089d0..0d1b2ebd9375 100644 --- a/src/common/meta/src/kv_backend.rs +++ b/src/common/meta/src/kv_backend.rs @@ -31,6 +31,7 @@ use crate::rpc::KeyValue; pub mod chroot; pub mod etcd; pub mod memory; +pub mod postgres; pub mod test; pub mod txn; diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs new file mode 100644 index 000000000000..94ff8cd214da --- /dev/null +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -0,0 +1,137 @@ +use std::any::Any; +use std::sync::Arc; + +use snafu::ResultExt; +use tokio_postgres::{Client, NoTls}; + +use super::{KvBackend, TxnService}; +use crate::error::{Error, PostgresFailedSnafu, Result}; +use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse}; +use crate::kv_backend::KvBackendRef; +use crate::rpc::store::{ + BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, + BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, + RangeRequest, RangeResponse, +}; +use crate::rpc::KeyValue; + +pub struct PgStore { + client: Client, +} + +const METADKV_CREATION: &str = + "CREATE TABLE IF NOT EXISTS metakv(k varchar PRIMARY KEY, v varchar);"; + +impl PgStore { + pub async fn with_url(url: &str) -> Result { + let (client, _) = tokio_postgres::connect(url, NoTls).await.unwrap(); + Self::with_pg_client(client).await + } + + pub async fn with_pg_client(client: Client) -> Result { + // This step ensure the postgres metadata backend is ready to use by + // checking if metadata kv table exists, and create a new table + // if it does not exist. + client + .execute(METADKV_CREATION, &[]) + .await + .context(PostgresFailedSnafu)?; + Ok(Arc::new(Self { client })) + } + + /* + SELECT K, V FROM metakv + WHERE K >= start_key and k <= end_key + */ + async fn range_scan( + client: Client, + start_key: String, + end_key: Option, + ) -> Result> { + todo!() + } + + /* + SELECT K, V FROM metakv + WHERE K IN (keys) + */ + async fn batch_scan(client: Client, keys: Vec) -> Result> { + todo!() + } + + /* + BEGIN; + SELECT K, V FROM metakv FOR UPDATE; --return k, v as previous kvs. + INSERT INTO metakv VALUES (kvs) ON CONFLICT DO UPDATE; + COMMIT; + */ + async fn batch_upsert(client: Client, kvs: Vec) -> Result> { + todo!() + } + + /* + DELETE FROM metakv WHERE k >= start_key and k <= end_key RETURNING K, V; + */ + async fn range_delete( + client: Client, + start_key: String, + end_key: Option, + ) -> Result> { + todo!() + } + + /* + DELETE FROM metakv WHERE k IN (kvs) RETURNING K, V; + */ + async fn BATCH_delete(client: Client, kvs: Vec) -> Result> { + todo!() + } +} + +#[async_trait::async_trait] +impl KvBackend for PgStore { + fn name(&self) -> &str { + "Postgres" + } + + fn as_any(&self) -> &dyn Any { + self + } + + async fn range(&self, req: RangeRequest) -> Result { + todo!() + } + + async fn put(&self, req: PutRequest) -> Result { + todo!() + } + + async fn batch_put(&self, req: BatchPutRequest) -> Result { + todo!() + } + + async fn batch_get(&self, req: BatchGetRequest) -> Result { + todo!() + } + + async fn delete_range(&self, req: DeleteRangeRequest) -> Result { + todo!() + } + + async fn batch_delete(&self, req: BatchDeleteRequest) -> Result { + todo!() + } +} + +#[async_trait::async_trait] +impl TxnService for PgStore { + type Error = Error; + + async fn txn(&self, txn: KvTxn) -> Result { + todo!() + } + + fn max_txn_ops(&self) -> usize { + todo!() + } +} diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 458832b34d43..fc0fd130cdd7 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -52,6 +52,7 @@ snafu.workspace = true store-api.workspace = true table.workspace = true tokio.workspace = true +tokio-postgres.workspace = true tokio-stream = { workspace = true, features = ["net"] } toml.workspace = true tonic.workspace = true diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index b860db6a24f2..86aeba43605c 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -21,9 +21,11 @@ use api::v1::meta::procedure_service_server::ProcedureServiceServer; use api::v1::meta::store_server::StoreServer; use common_base::Plugins; use common_config::Configurable; +use common_meta::error::PostgresFailedSnafu; use common_meta::kv_backend::chroot::ChrootKvBackend; use common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::memory::MemoryKvBackend; +use common_meta::kv_backend::postgres::PgStore; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_telemetry::info; use etcd_client::Client; @@ -33,17 +35,18 @@ use servers::export_metrics::ExportMetricsTask; use servers::http::{HttpServer, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; use servers::server::Server; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use tokio::net::TcpListener; use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio_postgres::NoTls; use tonic::transport::server::{Router, TcpIncoming}; use crate::election::etcd::EtcdElection; -use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu}; +use crate::error::{EmptyServerAddrSnafu, InitExportMetricsTaskSnafu, TomlFormatSnafu}; use crate::lock::etcd::EtcdLock; use crate::lock::memory::MemLock; use crate::metasrv::builder::MetasrvBuilder; -use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef}; +use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef}; use crate::selector::lease_based::LeaseBasedSelector; use crate::selector::load_based::LoadBasedSelector; use crate::selector::round_robin::RoundRobinSelector; @@ -185,14 +188,14 @@ pub async fn metasrv_builder( plugins: Plugins, kv_backend: Option, ) -> Result { - let (kv_backend, election, lock) = match (kv_backend, opts.use_memory_store) { + let (kv_backend, election, lock) = match (kv_backend, &opts.backend) { (Some(kv_backend), _) => (kv_backend, None, Some(Arc::new(MemLock::default()) as _)), - (None, true) => ( + (None, BackendImpl::MemoryStore) => ( Arc::new(MemoryKvBackend::new()) as _, None, Some(Arc::new(MemLock::default()) as _), ), - (None, false) => { + (None, BackendImpl::EtcdStore) => { let etcd_client = create_etcd_client(opts).await?; let kv_backend = { let etcd_backend = @@ -222,6 +225,11 @@ pub async fn metasrv_builder( )?), ) } + (None, BackendImpl::PostgresStore) => { + let pg_client = create_postgres_client(opts).await?; + let kv_backend = PgStore::with_pg_client(pg_client).await.unwrap(); + (kv_backend, None, None) + } }; let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef; @@ -253,3 +261,11 @@ async fn create_etcd_client(opts: &MetasrvOptions) -> Result { .await .context(error::ConnectEtcdSnafu) } + +async fn create_postgres_client(opts: &MetasrvOptions) -> Result { + let postgres_url = opts.store_addrs.first().context(EmptyServerAddrSnafu)?; + let (client, _) = tokio_postgres::connect(postgres_url, NoTls) + .await + .context(error::ConnectPostgresSnafu)?; + Ok(client) +} diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 6e9efc48570b..3ee56dfd82b6 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -851,6 +851,23 @@ pub enum Error { #[snafu(source(from(common_config::error::Error, Box::new)))] source: Box, }, + #[snafu(display("Failed to execute via postgres"))] + PostgresFailed { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to connect to PostgresSQL"))] + ConnectPostgres { + #[snafu(source)] + error: tokio_postgres::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Empty server address"))] + EmptyServerAddr { + #[snafu(implicit)] + location: Location, + }, } impl Error { @@ -902,7 +919,10 @@ impl ErrorExt for Error { | Error::Join { .. } | Error::WeightArray { .. } | Error::NotSetWeightArray { .. } - | Error::PeerUnavailable { .. } => StatusCode::Internal, + | Error::PeerUnavailable { .. } + | Error::ConnectPostgres { .. } + | Error::EmptyServerAddr { .. } + | Error::PostgresFailed { .. } => StatusCode::Internal, Error::Unsupported { .. } => StatusCode::Unsupported, diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index d92798a113b1..473fb8630351 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -65,6 +65,13 @@ pub const TABLE_ID_SEQ: &str = "table_id"; pub const FLOW_ID_SEQ: &str = "flow_id"; pub const METASRV_HOME: &str = "/tmp/metasrv"; +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum BackendImpl { + EtcdStore, + MemoryStore, + PostgresStore, +} + #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(default)] pub struct MetasrvOptions { @@ -114,6 +121,8 @@ pub struct MetasrvOptions { pub max_txn_ops: usize, /// The tracing options. pub tracing: TracingOptions, + /// The datastore for kv metadata. + pub backend: BackendImpl, } impl Default for MetasrvOptions { @@ -146,6 +155,7 @@ impl Default for MetasrvOptions { store_key_prefix: String::new(), max_txn_ops: 128, tracing: TracingOptions::default(), + backend: BackendImpl::EtcdStore, } } }