Skip to content

Commit

Permalink
feat: implement kvs postgres backend
Browse files Browse the repository at this point in the history
  • Loading branch information
lyang24 committed Jul 24, 2024
1 parent 2ae2a66 commit 475b628
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 14 deletions.
16 changes: 9 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ opentelemetry-proto = { version = "0.5", features = [
parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
pin-project = "1.0"
tokio-postgres = "0.7.11"
prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { version = "0.4" }
prost = "0.12"
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ store-api.workspace = true
strum.workspace = true
table.workspace = true
tokio.workspace = true
tokio-postgres.workspace = true
tonic.workspace = true
typetag = "0.2"

Expand Down
9 changes: 9 additions & 0 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,14 @@ pub enum Error {

#[snafu(display("Failed to get cache"))]
GetCache { source: Arc<Error> },

#[snafu(display("Failed to execute via Postgres"))]
PostgresFailed {
#[snafu(source)]
error: tokio_postgres::Error,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -655,6 +663,7 @@ impl ErrorExt for Error {
IllegalServerState { .. }
| EtcdTxnOpResponse { .. }
| EtcdFailed { .. }
| PostgresFailed { .. }
| EtcdTxnFailed { .. }
| ConnectEtcd { .. }
| MoveValues { .. }
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/kv_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::rpc::KeyValue;
pub mod chroot;
pub mod etcd;
pub mod memory;
pub mod postgres;
pub mod test;
pub mod txn;

Expand Down
137 changes: 137 additions & 0 deletions src/common/meta/src/kv_backend/postgres.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use std::any::Any;
use std::sync::Arc;

use snafu::ResultExt;
use tokio_postgres::{Client, NoTls};

use super::{KvBackend, TxnService};
use crate::error::{Error, PostgresFailedSnafu, Result};
use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse,
};
use crate::rpc::KeyValue;

pub struct PgStore {
client: Client,
}

const METADKV_CREATION: &str =
"CREATE TABLE IF NOT EXISTS metakv(k varchar PRIMARY KEY, v varchar);";

impl PgStore {
pub async fn with_url(url: &str) -> Result<KvBackendRef> {
let (client, _) = tokio_postgres::connect(url, NoTls).await.unwrap();
Self::with_pg_client(client).await
}

pub async fn with_pg_client(client: Client) -> Result<KvBackendRef> {
// This step ensure the postgres metadata backend is ready to use by
// checking if metadata kv table exists, and create a new table
// if it does not exist.
client
.execute(METADKV_CREATION, &[])
.await
.context(PostgresFailedSnafu)?;
Ok(Arc::new(Self { client }))
}

/*
SELECT K, V FROM metakv
WHERE K >= start_key and k <= end_key
*/
async fn range_scan(
client: Client,
start_key: String,
end_key: Option<String>,
) -> Result<Vec<KeyValue>> {
todo!()
}

/*
SELECT K, V FROM metakv
WHERE K IN (keys)
*/
async fn batch_scan(client: Client, keys: Vec<String>) -> Result<Vec<KeyValue>> {
todo!()
}

/*
BEGIN;
SELECT K, V FROM metakv FOR UPDATE; --return k, v as previous kvs.
INSERT INTO metakv VALUES (kvs) ON CONFLICT DO UPDATE;
COMMIT;
*/
async fn batch_upsert(client: Client, kvs: Vec<KeyValue>) -> Result<Vec<KeyValue>> {
todo!()
}

/*
DELETE FROM metakv WHERE k >= start_key and k <= end_key RETURNING K, V;
*/
async fn range_delete(
client: Client,
start_key: String,
end_key: Option<String>,
) -> Result<Vec<KeyValue>> {
todo!()
}

/*
DELETE FROM metakv WHERE k IN (kvs) RETURNING K, V;
*/
async fn BATCH_delete(client: Client, kvs: Vec<KeyValue>) -> Result<Vec<KeyValue>> {
todo!()
}
}

#[async_trait::async_trait]
impl KvBackend for PgStore {
fn name(&self) -> &str {
"Postgres"
}

fn as_any(&self) -> &dyn Any {
self
}

async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
todo!()
}

async fn put(&self, req: PutRequest) -> Result<PutResponse> {
todo!()
}

async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
todo!()
}

async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
todo!()
}

async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
todo!()
}

async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
todo!()
}
}

#[async_trait::async_trait]
impl TxnService for PgStore {
type Error = Error;

async fn txn(&self, txn: KvTxn) -> Result<KvTxnResponse> {
todo!()
}

fn max_txn_ops(&self) -> usize {
todo!()
}
}
1 change: 1 addition & 0 deletions src/meta-srv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ snafu.workspace = true
store-api.workspace = true
table.workspace = true
tokio.workspace = true
tokio-postgres.workspace = true
tokio-stream = { workspace = true, features = ["net"] }
toml.workspace = true
tonic.workspace = true
Expand Down
28 changes: 22 additions & 6 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ use api::v1::meta::procedure_service_server::ProcedureServiceServer;
use api::v1::meta::store_server::StoreServer;
use common_base::Plugins;
use common_config::Configurable;
use common_meta::error::PostgresFailedSnafu;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::postgres::PgStore;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_telemetry::info;
use etcd_client::Client;
Expand All @@ -33,17 +35,18 @@ use servers::export_metrics::ExportMetricsTask;
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::server::Server;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use tokio::net::TcpListener;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio_postgres::NoTls;
use tonic::transport::server::{Router, TcpIncoming};

use crate::election::etcd::EtcdElection;
use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu};
use crate::error::{EmptyServerAddrSnafu, InitExportMetricsTaskSnafu, TomlFormatSnafu};
use crate::lock::etcd::EtcdLock;
use crate::lock::memory::MemLock;
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::load_based::LoadBasedSelector;
use crate::selector::round_robin::RoundRobinSelector;
Expand Down Expand Up @@ -185,14 +188,14 @@ pub async fn metasrv_builder(
plugins: Plugins,
kv_backend: Option<KvBackendRef>,
) -> Result<MetasrvBuilder> {
let (kv_backend, election, lock) = match (kv_backend, opts.use_memory_store) {
let (kv_backend, election, lock) = match (kv_backend, &opts.backend) {
(Some(kv_backend), _) => (kv_backend, None, Some(Arc::new(MemLock::default()) as _)),
(None, true) => (
(None, BackendImpl::MemoryStore) => (
Arc::new(MemoryKvBackend::new()) as _,
None,
Some(Arc::new(MemLock::default()) as _),
),
(None, false) => {
(None, BackendImpl::EtcdStore) => {
let etcd_client = create_etcd_client(opts).await?;
let kv_backend = {
let etcd_backend =
Expand Down Expand Up @@ -222,6 +225,11 @@ pub async fn metasrv_builder(
)?),
)
}
(None, BackendImpl::PostgresStore) => {
let pg_client = create_postgres_client(opts).await?;
let kv_backend = PgStore::with_pg_client(pg_client).await.unwrap();
(kv_backend, None, None)
}
};

let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef;
Expand Down Expand Up @@ -253,3 +261,11 @@ async fn create_etcd_client(opts: &MetasrvOptions) -> Result<Client> {
.await
.context(error::ConnectEtcdSnafu)
}

async fn create_postgres_client(opts: &MetasrvOptions) -> Result<tokio_postgres::Client> {
let postgres_url = opts.store_addrs.first().context(EmptyServerAddrSnafu)?;
let (client, _) = tokio_postgres::connect(postgres_url, NoTls)
.await
.context(error::ConnectPostgresSnafu)?;
Ok(client)
}
22 changes: 21 additions & 1 deletion src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,23 @@ pub enum Error {
#[snafu(source(from(common_config::error::Error, Box::new)))]
source: Box<common_config::error::Error>,
},
#[snafu(display("Failed to execute via postgres"))]
PostgresFailed {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to connect to PostgresSQL"))]
ConnectPostgres {
#[snafu(source)]
error: tokio_postgres::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Empty server address"))]
EmptyServerAddr {
#[snafu(implicit)]
location: Location,
},
}

impl Error {
Expand Down Expand Up @@ -902,7 +919,10 @@ impl ErrorExt for Error {
| Error::Join { .. }
| Error::WeightArray { .. }
| Error::NotSetWeightArray { .. }
| Error::PeerUnavailable { .. } => StatusCode::Internal,
| Error::PeerUnavailable { .. }
| Error::ConnectPostgres { .. }
| Error::EmptyServerAddr { .. }
| Error::PostgresFailed { .. } => StatusCode::Internal,

Error::Unsupported { .. } => StatusCode::Unsupported,

Expand Down
10 changes: 10 additions & 0 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ pub const TABLE_ID_SEQ: &str = "table_id";
pub const FLOW_ID_SEQ: &str = "flow_id";
pub const METASRV_HOME: &str = "/tmp/metasrv";

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum BackendImpl {
EtcdStore,
MemoryStore,
PostgresStore,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
Expand Down Expand Up @@ -114,6 +121,8 @@ pub struct MetasrvOptions {
pub max_txn_ops: usize,
/// The tracing options.
pub tracing: TracingOptions,
/// The datastore for kv metadata.
pub backend: BackendImpl,
}

impl Default for MetasrvOptions {
Expand Down Expand Up @@ -146,6 +155,7 @@ impl Default for MetasrvOptions {
store_key_prefix: String::new(),
max_txn_ops: 128,
tracing: TracingOptions::default(),
backend: BackendImpl::EtcdStore,
}
}
}
Expand Down

0 comments on commit 475b628

Please sign in to comment.