From dcbec3d9952d74a0e51c442f919b3d193f3cc351 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Wed, 8 Jan 2025 14:48:09 +0800 Subject: [PATCH 1/9] feat: configurable table name --- config/config.md | 1 + config/metasrv.example.toml | 3 + src/meta-srv/src/bootstrap.rs | 3 +- src/meta-srv/src/election/postgres.rs | 199 ++++++++++++++++++-------- src/meta-srv/src/metasrv.rs | 8 ++ 5 files changed, 150 insertions(+), 64 deletions(-) diff --git a/config/config.md b/config/config.md index 759d34364c2c..f17578864d8c 100644 --- a/config/config.md +++ b/config/config.md @@ -296,6 +296,7 @@ | `store_addrs` | Array | -- | Store server address default to etcd store.
For postgres store, the format is:
"password=password dbname=postgres user=postgres host=localhost port=5432"
For etcd store, the format is:
"127.0.0.1:2379" | | `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. | | `backend` | String | `etcd_store` | The datastore for meta server.
Available values:
- `etcd_store` (default value)
- `memory_store`
- `postgres_store` | +| `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Effect when using a RDS kvbackend. | | `selector` | String | `round_robin` | Datanode selector type.
- `round_robin` (default value)
- `lease_based`
- `load_based`
For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". | | `use_memory_store` | Bool | `false` | Store data in memory. | | `enable_region_failover` | Bool | `false` | Whether to enable region failover.
This feature is only available on GreptimeDB running on cluster mode and
- Using Remote WAL
- Using shared storage (e.g., s3). | diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index aaf5569a6d69..996ac3c124dd 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -24,6 +24,9 @@ store_key_prefix = "" ## - `postgres_store` backend = "etcd_store" +## Table name in RDS to store metadata. Effect when using a RDS kvbackend. +meta_table_name = "greptime_metakv" + ## Datanode selector type. ## - `round_robin` (default value) ## - `lease_based` diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 9ed6ed66c3ec..18d3ad52b02e 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -230,7 +230,7 @@ pub async fn metasrv_builder( (None, BackendImpl::PostgresStore) => { let pool = create_postgres_pool(opts).await?; // TODO(CookiePie): use table name from config. - let kv_backend = PgStore::with_pg_pool(pool, "greptime_metakv", opts.max_txn_ops) + let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops) .await .context(error::KvBackendSnafu)?; // Client for election should be created separately since we need a different session keep-alive idle time. @@ -240,6 +240,7 @@ pub async fn metasrv_builder( election_client, opts.store_key_prefix.clone(), CANDIDATE_LEASE_SECS, + opts.meta_table_name.clone(), ) .await?; (kv_backend, Some(election)) diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 192fa682bf5c..115747ef7264 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -35,50 +35,117 @@ use crate::error::{ }; use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; -// TODO(CookiePie): The lock id should be configurable. -const CAMPAIGN: &str = "SELECT pg_try_advisory_lock({})"; -const STEP_DOWN: &str = "SELECT pg_advisory_unlock({})"; -// Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive. -// Either the leader reconnects and step down or the session expires and the lock is released. -const SET_IDLE_SESSION_TIMEOUT: &str = "SET idle_session_timeout = '10s';"; - // Separator between value and expire time. const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#; -// SQL to put a value with expire time. Parameters: key, value, LEASE_SEP, expire_time -const PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME: &str = r#" -WITH prev AS ( - SELECT k, v FROM greptime_metakv WHERE k = $1 -), insert AS ( - INSERT INTO greptime_metakv - VALUES($1, convert_to($2 || $3 || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')) - ON CONFLICT (k) DO NOTHING -) +struct ElectionSqlFactory { + // TODO(CookiePie): The lock id should be configurable. + lock_id: i64, + table_name: String, +} -SELECT k, v FROM prev; -"#; +impl ElectionSqlFactory { + fn new(lock_id: i64, table_name: String) -> Self { + Self { + lock_id, + table_name, + } + } -// SQL to update a value with expire time. Parameters: key, prev_value_with_lease, updated_value, LEASE_SEP, expire_time -const CAS_WITH_EXPIRE_TIME: &str = r#" -UPDATE greptime_metakv -SET k=$1, -v=convert_to($3 || $4 || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $5, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8') -WHERE - k=$1 AND v=$2 -"#; + /// Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive. + /// Either the leader reconnects and step down or the session expires and the lock is released. + fn set_idle_session_timeout_sql(&self) -> String { + "SET idle_session_timeout = '10s';".to_string() + } -const GET_WITH_CURRENT_TIMESTAMP: &str = r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM greptime_metakv WHERE k = $1"#; + fn campaign_sql(&self) -> String { + format!("SELECT pg_try_advisory_lock({})", self.lock_id) + } -const PREFIX_GET_WITH_CURRENT_TIMESTAMP: &str = r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM greptime_metakv WHERE k LIKE $1"#; + fn step_down_sql(&self) -> String { + format!("SELECT pg_advisory_unlock({})", self.lock_id) + } -const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE k = $1 RETURNING k,v;"; + /// SQL to put a value with expire time. + /// + /// Parameters for the query: + /// `$1`: key, + /// `$2`: value, + /// `$3`: lease time in seconds + /// + /// Returns: + /// If the key already exists, return the previous value. + fn put_value_with_lease_sql(&self) -> String { + format!( + r#"WITH prev AS ( + SELECT k, v FROM {} WHERE k = $1 + ), insert AS ( + INSERT INTO {} + VALUES($1, convert_to($2 || {} || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')) + ON CONFLICT (k) DO NOTHING + ) + SELECT k, v FROM prev; + "#, + self.table_name, self.table_name, LEASE_SEP + ) + } -fn campaign_sql(lock_id: u64) -> String { - CAMPAIGN.replace("{}", &lock_id.to_string()) -} + /// SQL to update a value with expire time. + /// + /// Parameters for the query: + /// `$1`: key, + /// `$2`: previous value, + /// `$3`: updated value, + /// `$4`: lease time in seconds + fn update_value_with_lease_sql(&self) -> String { + format!( + r#"UPDATE {} + SET v = convert_to($3 || {} || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8') + WHERE k = $1 AND v = $2"#, + self.table_name, LEASE_SEP + ) + } -fn step_down_sql(lock_id: u64) -> String { - STEP_DOWN.replace("{}", &lock_id.to_string()) + /// SQL to get a value with expire time. + /// + /// Parameters: + /// `$1`: key + fn get_value_with_lease_sql(&self) -> String { + format!( + r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {} WHERE k = $1"#, + self.table_name + ) + } + + /// SQL to get all values with expire time with the given key prefix. + /// + /// Parameters: + /// `$1`: key prefix like 'prefix%' + /// + /// Returns: + /// column 0: value, + /// column 1: current timestamp + fn get_value_with_lease_by_prefix_sql(&self) -> String { + format!( + r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {} WHERE k LIKE $1"#, + self.table_name + ) + } + + /// SQL to delete a value. + /// + /// Parameters: + /// `$1`: key + /// + /// Returns: + /// column 0: key deleted, + /// column 1: value deleted + fn delete_value_sql(&self) -> String { + format!( + "DELETE FROM {} WHERE k = $1 RETURNING k,v;", + self.table_name + ) + } } /// Parse the value and expire time from the given string. The value should be in the format "value || LEASE_SEP || expire_time". @@ -138,7 +205,7 @@ pub struct PgElection { leader_watcher: broadcast::Sender, store_key_prefix: String, candidate_lease_ttl_secs: u64, - lock_id: u64, + sql_factory: ElectionSqlFactory, } impl PgElection { @@ -147,10 +214,13 @@ impl PgElection { client: Client, store_key_prefix: String, candidate_lease_ttl_secs: u64, + table_name: String, ) -> Result { + // TODO(CookiePie): The lock id should be configurable. + let sql_factory = ElectionSqlFactory::new(28319, table_name.clone()); // Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock. client - .execute(SET_IDLE_SESSION_TIMEOUT, &[]) + .execute(&sql_factory.set_idle_session_timeout_sql(), &[]) .await .context(PostgresExecutionSnafu)?; @@ -163,8 +233,7 @@ impl PgElection { leader_watcher: tx, store_key_prefix, candidate_lease_ttl_secs, - // TODO(CookiePie): The lock id should be configurable. - lock_id: 28319, + sql_factory, })) } @@ -276,7 +345,7 @@ impl Election for PgElection { loop { let res = self .client - .query(&campaign_sql(self.lock_id), &[]) + .query(&self.sql_factory.campaign_sql(), &[]) .await .context(PostgresExecutionSnafu)?; if let Some(row) = res.first() { @@ -336,7 +405,10 @@ impl PgElection { let key = key.as_bytes().to_vec(); let res = self .client - .query(GET_WITH_CURRENT_TIMESTAMP, &[&key as &(dyn ToSql + Sync)]) + .query( + &self.sql_factory.get_value_with_lease_sql(), + &[&key as &(dyn ToSql + Sync)], + ) .await .context(PostgresExecutionSnafu)?; @@ -379,7 +451,7 @@ impl PgElection { let res = self .client .query( - PREFIX_GET_WITH_CURRENT_TIMESTAMP, + &self.sql_factory.get_value_with_lease_by_prefix_sql(), &[(&key_prefix as &(dyn ToSql + Sync))], ) .await @@ -411,12 +483,11 @@ impl PgElection { let res = self .client .execute( - CAS_WITH_EXPIRE_TIME, + &self.sql_factory.update_value_with_lease_sql(), &[ &key as &(dyn ToSql + Sync), &prev as &(dyn ToSql + Sync), &updated, - &LEASE_SEP, &(self.candidate_lease_ttl_secs as f64), ], ) @@ -445,12 +516,11 @@ impl PgElection { let params: Vec<&(dyn ToSql + Sync)> = vec![ &key as &(dyn ToSql + Sync), &value as &(dyn ToSql + Sync), - &LEASE_SEP, &lease_ttl_secs, ]; let res = self .client - .query(PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME, ¶ms) + .query(&self.sql_factory.put_value_with_lease_sql(), ¶ms) .await .context(PostgresExecutionSnafu)?; Ok(res.is_empty()) @@ -462,7 +532,10 @@ impl PgElection { let key = key.as_bytes().to_vec(); let res = self .client - .query(POINT_DELETE, &[&key as &(dyn ToSql + Sync)]) + .query( + &self.sql_factory.delete_value_sql(), + &[&key as &(dyn ToSql + Sync)], + ) .await .context(PostgresExecutionSnafu)?; @@ -574,7 +647,7 @@ impl PgElection { { self.delete_value(&key).await?; self.client - .query(&step_down_sql(self.lock_id), &[]) + .query(&self.sql_factory.step_down_sql(), &[]) .await .context(PostgresExecutionSnafu)?; if let Err(e) = self @@ -686,7 +759,7 @@ mod tests { leader_watcher: tx, store_key_prefix: uuid::Uuid::new_v4().to_string(), candidate_lease_ttl_secs: 10, - lock_id: 28319, + sql_factory: ElectionSqlFactory::new(28319, "greptime_metakv".to_string()), }; let res = pg_election @@ -760,7 +833,7 @@ mod tests { leader_watcher: tx, store_key_prefix, candidate_lease_ttl_secs, - lock_id: 28319, + sql_factory: ElectionSqlFactory::new(28319, "greptime_metakv".to_string()), }; let node_info = MetasrvNodeInfo { @@ -802,7 +875,7 @@ mod tests { leader_watcher: tx, store_key_prefix: store_key_prefix.clone(), candidate_lease_ttl_secs, - lock_id: 28319, + sql_factory: ElectionSqlFactory::new(28319, "greptime_metakv".to_string()), }; let candidates = pg_election.all_candidates().await.unwrap(); @@ -843,7 +916,7 @@ mod tests { leader_watcher: tx, store_key_prefix: uuid::Uuid::new_v4().to_string(), candidate_lease_ttl_secs, - lock_id: 28320, + sql_factory: ElectionSqlFactory::new(28320, "greptime_metakv".to_string()), }; leader_pg_election.elected().await.unwrap(); @@ -952,13 +1025,13 @@ mod tests { leader_watcher: tx, store_key_prefix, candidate_lease_ttl_secs, - lock_id: 28321, + sql_factory: ElectionSqlFactory::new(28321, "greptime_metakv".to_string()), }; // Step 1: No leader exists, campaign and elected. let res = leader_pg_election .client - .query(&campaign_sql(leader_pg_election.lock_id), &[]) + .query(&leader_pg_election.sql_factory.campaign_sql(), &[]) .await .unwrap(); let res: bool = res[0].get(0); @@ -989,7 +1062,7 @@ mod tests { // Step 2: As a leader, renew the lease. let res = leader_pg_election .client - .query(&campaign_sql(leader_pg_election.lock_id), &[]) + .query(&leader_pg_election.sql_factory.campaign_sql(), &[]) .await .unwrap(); let res: bool = res[0].get(0); @@ -1009,7 +1082,7 @@ mod tests { let res = leader_pg_election .client - .query(&campaign_sql(leader_pg_election.lock_id), &[]) + .query(&leader_pg_election.sql_factory.campaign_sql(), &[]) .await .unwrap(); let res: bool = res[0].get(0); @@ -1037,7 +1110,7 @@ mod tests { // Step 4: Re-campaign and elected. let res = leader_pg_election .client - .query(&campaign_sql(leader_pg_election.lock_id), &[]) + .query(&leader_pg_election.sql_factory.campaign_sql(), &[]) .await .unwrap(); let res: bool = res[0].get(0); @@ -1094,7 +1167,7 @@ mod tests { // Step 6: Re-campaign and elected. let res = leader_pg_election .client - .query(&campaign_sql(leader_pg_election.lock_id), &[]) + .query(&leader_pg_election.sql_factory.campaign_sql(), &[]) .await .unwrap(); let res: bool = res[0].get(0); @@ -1125,7 +1198,7 @@ mod tests { // Step 7: Something wrong, the leader key changed by others. let res = leader_pg_election .client - .query(&campaign_sql(leader_pg_election.lock_id), &[]) + .query(&leader_pg_election.sql_factory.campaign_sql(), &[]) .await .unwrap(); let res: bool = res[0].get(0); @@ -1162,7 +1235,7 @@ mod tests { // Clean up leader_pg_election .client - .query(&step_down_sql(leader_pg_election.lock_id), &[]) + .query(&leader_pg_election.sql_factory.step_down_sql(), &[]) .await .unwrap(); } @@ -1183,7 +1256,7 @@ mod tests { leader_watcher: tx, store_key_prefix: store_key_prefix.clone(), candidate_lease_ttl_secs, - lock_id: 28322, + sql_factory: ElectionSqlFactory::new(28322, "greptime_metakv".to_string()), }; let leader_client = create_postgres_client().await.unwrap(); @@ -1196,12 +1269,12 @@ mod tests { leader_watcher: tx, store_key_prefix, candidate_lease_ttl_secs, - lock_id: 28322, + sql_factory: ElectionSqlFactory::new(28322, "greptime_metakv".to_string()), }; leader_pg_election .client - .query(&campaign_sql(leader_pg_election.lock_id), &[]) + .query(&leader_pg_election.sql_factory.campaign_sql(), &[]) .await .unwrap(); leader_pg_election.elected().await.unwrap(); @@ -1242,7 +1315,7 @@ mod tests { // Clean up leader_pg_election .client - .query(&step_down_sql(leader_pg_election.lock_id), &[]) + .query(&leader_pg_election.sql_factory.step_down_sql(), &[]) .await .unwrap(); } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index e688b25e04bd..6f63635a4491 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -71,6 +71,9 @@ pub const TABLE_ID_SEQ: &str = "table_id"; pub const FLOW_ID_SEQ: &str = "flow_id"; pub const METASRV_HOME: &str = "/tmp/metasrv"; +#[cfg(feature = "pg_kvbackend")] +pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv"; + // The datastores that implements metadata kvbackend. #[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)] #[serde(rename_all = "snake_case")] @@ -140,6 +143,9 @@ pub struct MetasrvOptions { pub tracing: TracingOptions, /// The datastore for kv metadata. pub backend: BackendImpl, + #[cfg(feature = "pg_kvbackend")] + /// Table name of rds kv backend. + pub meta_table_name: String, } impl Default for MetasrvOptions { @@ -174,6 +180,8 @@ impl Default for MetasrvOptions { flush_stats_factor: 3, tracing: TracingOptions::default(), backend: BackendImpl::EtcdStore, + #[cfg(feature = "pg_kvbackend")] + meta_table_name: DEFAULT_META_TABLE_NAME.to_string(), } } } From 61c141b4dd9fdd264107e923cce5f2554f0a6268 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Wed, 8 Jan 2025 15:34:40 +0800 Subject: [PATCH 2/9] fix: election sql --- src/meta-srv/src/election/postgres.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 115747ef7264..963e36369289 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -81,7 +81,7 @@ impl ElectionSqlFactory { SELECT k, v FROM {} WHERE k = $1 ), insert AS ( INSERT INTO {} - VALUES($1, convert_to($2 || {} || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')) + VALUES($1, convert_to($2 || '{}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')) ON CONFLICT (k) DO NOTHING ) SELECT k, v FROM prev; @@ -100,7 +100,7 @@ impl ElectionSqlFactory { fn update_value_with_lease_sql(&self) -> String { format!( r#"UPDATE {} - SET v = convert_to($3 || {} || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8') + SET v = convert_to($3 || '{}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8') WHERE k = $1 AND v = $2"#, self.table_name, LEASE_SEP ) From d5b881f704b4fa21c7d34f8cd22694408fb24c30 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Wed, 8 Jan 2025 15:36:09 +0800 Subject: [PATCH 3/9] feat: configurable lock_id --- src/meta-srv/src/bootstrap.rs | 1 + src/meta-srv/src/election/postgres.rs | 9 ++++----- src/meta-srv/src/metasrv.rs | 7 +++++++ 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 18d3ad52b02e..46e5f7115906 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -241,6 +241,7 @@ pub async fn metasrv_builder( opts.store_key_prefix.clone(), CANDIDATE_LEASE_SECS, opts.meta_table_name.clone(), + opts.meta_election_lock_id, ) .await?; (kv_backend, Some(election)) diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 963e36369289..f43a26a748d6 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -39,13 +39,12 @@ use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#; struct ElectionSqlFactory { - // TODO(CookiePie): The lock id should be configurable. - lock_id: i64, + lock_id: u64, table_name: String, } impl ElectionSqlFactory { - fn new(lock_id: i64, table_name: String) -> Self { + fn new(lock_id: u64, table_name: String) -> Self { Self { lock_id, table_name, @@ -215,9 +214,9 @@ impl PgElection { store_key_prefix: String, candidate_lease_ttl_secs: u64, table_name: String, + lock_id: u64, ) -> Result { - // TODO(CookiePie): The lock id should be configurable. - let sql_factory = ElectionSqlFactory::new(28319, table_name.clone()); + let sql_factory = ElectionSqlFactory::new(lock_id, table_name.clone()); // Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock. client .execute(&sql_factory.set_idle_session_timeout_sql(), &[]) diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 6f63635a4491..f6c3d637777e 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -73,6 +73,8 @@ pub const METASRV_HOME: &str = "/tmp/metasrv"; #[cfg(feature = "pg_kvbackend")] pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv"; +#[cfg(feature = "pg_kvbackend")] +pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1; // The datastores that implements metadata kvbackend. #[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)] @@ -146,6 +148,9 @@ pub struct MetasrvOptions { #[cfg(feature = "pg_kvbackend")] /// Table name of rds kv backend. pub meta_table_name: String, + #[cfg(feature = "pg_kvbackend")] + /// Lock id for meta kv election. Only effect when using pg_kvbackend. + pub meta_election_lock_id: u64, } impl Default for MetasrvOptions { @@ -182,6 +187,8 @@ impl Default for MetasrvOptions { backend: BackendImpl::EtcdStore, #[cfg(feature = "pg_kvbackend")] meta_table_name: DEFAULT_META_TABLE_NAME.to_string(), + #[cfg(feature = "pg_kvbackend")] + meta_election_lock_id: DEFAULT_META_ELECTION_LOCK_ID, } } } From dc2975c61076f6ae2ddced09ae8ceefd8db765e2 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Wed, 8 Jan 2025 15:41:51 +0800 Subject: [PATCH 4/9] chore: update config file --- config/config.md | 1 + config/metasrv.example.toml | 3 +++ 2 files changed, 4 insertions(+) diff --git a/config/config.md b/config/config.md index f17578864d8c..e7f357b995d0 100644 --- a/config/config.md +++ b/config/config.md @@ -297,6 +297,7 @@ | `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. | | `backend` | String | `etcd_store` | The datastore for meta server.
Available values:
- `etcd_store` (default value)
- `memory_store`
- `postgres_store` | | `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Effect when using a RDS kvbackend. | +| `meta_election_lock_id` | Integer | `1` | Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend | | `selector` | String | `round_robin` | Datanode selector type.
- `round_robin` (default value)
- `lease_based`
- `load_based`
For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". | | `use_memory_store` | Bool | `false` | Store data in memory. | | `enable_region_failover` | Bool | `false` | Whether to enable region failover.
This feature is only available on GreptimeDB running on cluster mode and
- Using Remote WAL
- Using shared storage (e.g., s3). | diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 996ac3c124dd..8434a63a6079 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -27,6 +27,9 @@ backend = "etcd_store" ## Table name in RDS to store metadata. Effect when using a RDS kvbackend. meta_table_name = "greptime_metakv" +## Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend +meta_election_lock_id = 1 + ## Datanode selector type. ## - `round_robin` (default value) ## - `lease_based` From b6721056e75697534699ecf440aa818ab1d54889 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Wed, 8 Jan 2025 16:50:28 +0800 Subject: [PATCH 5/9] perf: useless allocation --- src/meta-srv/src/election/postgres.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index f43a26a748d6..775f11f86a87 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -401,7 +401,7 @@ impl PgElection { key: &str, with_origin: bool, ) -> Result)>> { - let key = key.as_bytes().to_vec(); + let key = key.as_bytes(); let res = self .client .query( @@ -477,8 +477,8 @@ impl PgElection { } async fn update_value_with_lease(&self, key: &str, prev: &str, updated: &str) -> Result<()> { - let key = key.as_bytes().to_vec(); - let prev = prev.as_bytes().to_vec(); + let key = key.as_bytes(); + let prev = prev.as_bytes(); let res = self .client .execute( @@ -496,7 +496,7 @@ impl PgElection { ensure!( res == 1, UnexpectedSnafu { - violated: format!("Failed to update key: {}", String::from_utf8_lossy(&key)), + violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)), } ); @@ -510,7 +510,7 @@ impl PgElection { value: &str, lease_ttl_secs: u64, ) -> Result { - let key = key.as_bytes().to_vec(); + let key = key.as_bytes(); let lease_ttl_secs = lease_ttl_secs as f64; let params: Vec<&(dyn ToSql + Sync)> = vec![ &key as &(dyn ToSql + Sync), @@ -528,7 +528,7 @@ impl PgElection { /// Returns `true` if the deletion is successful. /// Caution: Should only delete the key if the lease is expired. async fn delete_value(&self, key: &str) -> Result { - let key = key.as_bytes().to_vec(); + let key = key.as_bytes(); let res = self .client .query( From c2e6fff082c69e9410ebf73aa49b1d6fa9dac0b3 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Wed, 8 Jan 2025 17:11:15 +0800 Subject: [PATCH 6/9] perf: useless allocation --- src/meta-srv/src/bootstrap.rs | 2 +- src/meta-srv/src/election/postgres.rs | 169 ++++++++++++++------------ 2 files changed, 95 insertions(+), 76 deletions(-) diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 46e5f7115906..aa06205aa4b1 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -240,7 +240,7 @@ pub async fn metasrv_builder( election_client, opts.store_key_prefix.clone(), CANDIDATE_LEASE_SECS, - opts.meta_table_name.clone(), + &opts.meta_table_name, opts.meta_election_lock_id, ) .await?; diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 775f11f86a87..5e18582aa8f4 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -38,23 +38,81 @@ use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; // Separator between value and expire time. const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#; -struct ElectionSqlFactory { +struct ElectionSqlFactory<'a> { lock_id: u64, - table_name: String, + table_name: &'a str, } -impl ElectionSqlFactory { - fn new(lock_id: u64, table_name: String) -> Self { +struct ElectionSqlSet { + campaign: String, + step_down: String, + // SQL to put a value with expire time. + // + // Parameters for the query: + // `$1`: key, + // `$2`: value, + // `$3`: lease time in seconds + // + // Returns: + // If the key already exists, return the previous value. + put_value_with_lease: String, + // SQL to update a value with expire time. + // + // Parameters for the query: + // `$1`: key, + // `$2`: previous value, + // `$3`: updated value, + // `$4`: lease time in seconds + update_value_with_lease: String, + // SQL to get a value with expire time. + // + // Parameters: + // `$1`: key + get_value_with_lease: String, + // SQL to get all values with expire time with the given key prefix. + // + // Parameters: + // `$1`: key prefix like 'prefix%' + // + // Returns: + // column 0: value, + // column 1: current timestamp + get_value_with_lease_by_prefix: String, + // SQL to delete a value. + // + // Parameters: + // `$1`: key + // + // Returns: + // column 0: key deleted, + // column 1: value deleted + delete_value: String, +} + +impl<'a> ElectionSqlFactory<'a> { + fn new(lock_id: u64, table_name: &'a str) -> Self { Self { lock_id, table_name, } } - /// Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive. - /// Either the leader reconnects and step down or the session expires and the lock is released. - fn set_idle_session_timeout_sql(&self) -> String { - "SET idle_session_timeout = '10s';".to_string() + fn build(self) -> ElectionSqlSet { + ElectionSqlSet { + campaign: self.campaign_sql(), + step_down: self.step_down_sql(), + put_value_with_lease: self.put_value_with_lease_sql(), + update_value_with_lease: self.update_value_with_lease_sql(), + get_value_with_lease: self.get_value_with_lease_sql(), + get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(), + delete_value: self.delete_value_sql(), + } + } + + // Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive. + // Either the leader reconnects and step down or the session expires and the lock is released. + fn set_idle_session_timeout_sql(&self) -> &str { + "SET idle_session_timeout = '10s';" } fn campaign_sql(&self) -> String { @@ -65,15 +123,6 @@ impl ElectionSqlFactory { format!("SELECT pg_advisory_unlock({})", self.lock_id) } - /// SQL to put a value with expire time. - /// - /// Parameters for the query: - /// `$1`: key, - /// `$2`: value, - /// `$3`: lease time in seconds - /// - /// Returns: - /// If the key already exists, return the previous value. fn put_value_with_lease_sql(&self) -> String { format!( r#"WITH prev AS ( @@ -89,13 +138,6 @@ impl ElectionSqlFactory { ) } - /// SQL to update a value with expire time. - /// - /// Parameters for the query: - /// `$1`: key, - /// `$2`: previous value, - /// `$3`: updated value, - /// `$4`: lease time in seconds fn update_value_with_lease_sql(&self) -> String { format!( r#"UPDATE {} @@ -105,10 +147,6 @@ impl ElectionSqlFactory { ) } - /// SQL to get a value with expire time. - /// - /// Parameters: - /// `$1`: key fn get_value_with_lease_sql(&self) -> String { format!( r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {} WHERE k = $1"#, @@ -116,14 +154,6 @@ impl ElectionSqlFactory { ) } - /// SQL to get all values with expire time with the given key prefix. - /// - /// Parameters: - /// `$1`: key prefix like 'prefix%' - /// - /// Returns: - /// column 0: value, - /// column 1: current timestamp fn get_value_with_lease_by_prefix_sql(&self) -> String { format!( r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {} WHERE k LIKE $1"#, @@ -131,14 +161,6 @@ impl ElectionSqlFactory { ) } - /// SQL to delete a value. - /// - /// Parameters: - /// `$1`: key - /// - /// Returns: - /// column 0: key deleted, - /// column 1: value deleted fn delete_value_sql(&self) -> String { format!( "DELETE FROM {} WHERE k = $1 RETURNING k,v;", @@ -204,7 +226,7 @@ pub struct PgElection { leader_watcher: broadcast::Sender, store_key_prefix: String, candidate_lease_ttl_secs: u64, - sql_factory: ElectionSqlFactory, + sql_set: ElectionSqlSet, } impl PgElection { @@ -213,13 +235,13 @@ impl PgElection { client: Client, store_key_prefix: String, candidate_lease_ttl_secs: u64, - table_name: String, + table_name: &str, lock_id: u64, ) -> Result { - let sql_factory = ElectionSqlFactory::new(lock_id, table_name.clone()); + let sql_factory = ElectionSqlFactory::new(lock_id, table_name); // Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock. client - .execute(&sql_factory.set_idle_session_timeout_sql(), &[]) + .execute(sql_factory.set_idle_session_timeout_sql(), &[]) .await .context(PostgresExecutionSnafu)?; @@ -232,7 +254,7 @@ impl PgElection { leader_watcher: tx, store_key_prefix, candidate_lease_ttl_secs, - sql_factory, + sql_set: sql_factory.build(), })) } @@ -344,7 +366,7 @@ impl Election for PgElection { loop { let res = self .client - .query(&self.sql_factory.campaign_sql(), &[]) + .query(&self.sql_set.campaign, &[]) .await .context(PostgresExecutionSnafu)?; if let Some(row) = res.first() { @@ -405,7 +427,7 @@ impl PgElection { let res = self .client .query( - &self.sql_factory.get_value_with_lease_sql(), + &self.sql_set.get_value_with_lease, &[&key as &(dyn ToSql + Sync)], ) .await @@ -450,7 +472,7 @@ impl PgElection { let res = self .client .query( - &self.sql_factory.get_value_with_lease_by_prefix_sql(), + &self.sql_set.get_value_with_lease_by_prefix, &[(&key_prefix as &(dyn ToSql + Sync))], ) .await @@ -482,7 +504,7 @@ impl PgElection { let res = self .client .execute( - &self.sql_factory.update_value_with_lease_sql(), + &self.sql_set.update_value_with_lease, &[ &key as &(dyn ToSql + Sync), &prev as &(dyn ToSql + Sync), @@ -519,7 +541,7 @@ impl PgElection { ]; let res = self .client - .query(&self.sql_factory.put_value_with_lease_sql(), ¶ms) + .query(&self.sql_set.put_value_with_lease, ¶ms) .await .context(PostgresExecutionSnafu)?; Ok(res.is_empty()) @@ -531,10 +553,7 @@ impl PgElection { let key = key.as_bytes(); let res = self .client - .query( - &self.sql_factory.delete_value_sql(), - &[&key as &(dyn ToSql + Sync)], - ) + .query(&self.sql_set.delete_value, &[&key as &(dyn ToSql + Sync)]) .await .context(PostgresExecutionSnafu)?; @@ -646,7 +665,7 @@ impl PgElection { { self.delete_value(&key).await?; self.client - .query(&self.sql_factory.step_down_sql(), &[]) + .query(&self.sql_set.step_down, &[]) .await .context(PostgresExecutionSnafu)?; if let Err(e) = self @@ -758,7 +777,7 @@ mod tests { leader_watcher: tx, store_key_prefix: uuid::Uuid::new_v4().to_string(), candidate_lease_ttl_secs: 10, - sql_factory: ElectionSqlFactory::new(28319, "greptime_metakv".to_string()), + sql_set: ElectionSqlFactory::new(28319, "greptime_metakv").build(), }; let res = pg_election @@ -832,7 +851,7 @@ mod tests { leader_watcher: tx, store_key_prefix, candidate_lease_ttl_secs, - sql_factory: ElectionSqlFactory::new(28319, "greptime_metakv".to_string()), + sql_set: ElectionSqlFactory::new(28319, "greptime_metakv").build(), }; let node_info = MetasrvNodeInfo { @@ -874,7 +893,7 @@ mod tests { leader_watcher: tx, store_key_prefix: store_key_prefix.clone(), candidate_lease_ttl_secs, - sql_factory: ElectionSqlFactory::new(28319, "greptime_metakv".to_string()), + sql_set: ElectionSqlFactory::new(28319, "greptime_metakv").build(), }; let candidates = pg_election.all_candidates().await.unwrap(); @@ -915,7 +934,7 @@ mod tests { leader_watcher: tx, store_key_prefix: uuid::Uuid::new_v4().to_string(), candidate_lease_ttl_secs, - sql_factory: ElectionSqlFactory::new(28320, "greptime_metakv".to_string()), + sql_set: ElectionSqlFactory::new(28320, "greptime_metakv").build(), }; leader_pg_election.elected().await.unwrap(); @@ -1024,13 +1043,13 @@ mod tests { leader_watcher: tx, store_key_prefix, candidate_lease_ttl_secs, - sql_factory: ElectionSqlFactory::new(28321, "greptime_metakv".to_string()), + sql_set: ElectionSqlFactory::new(28321, "greptime_metakv").build(), }; // Step 1: No leader exists, campaign and elected. let res = leader_pg_election .client - .query(&leader_pg_election.sql_factory.campaign_sql(), &[]) + .query(&leader_pg_election.sql_set.campaign, &[]) .await .unwrap(); let res: bool = res[0].get(0); @@ -1061,7 +1080,7 @@ mod tests { // Step 2: As a leader, renew the lease. let res = leader_pg_election .client - .query(&leader_pg_election.sql_factory.campaign_sql(), &[]) + .query(&leader_pg_election.sql_set.campaign, &[]) .await .unwrap(); let res: bool = res[0].get(0); @@ -1081,7 +1100,7 @@ mod tests { let res = leader_pg_election .client - .query(&leader_pg_election.sql_factory.campaign_sql(), &[]) + .query(&leader_pg_election.sql_set.campaign, &[]) .await .unwrap(); let res: bool = res[0].get(0); @@ -1109,7 +1128,7 @@ mod tests { // Step 4: Re-campaign and elected. let res = leader_pg_election .client - .query(&leader_pg_election.sql_factory.campaign_sql(), &[]) + .query(&leader_pg_election.sql_set.campaign, &[]) .await .unwrap(); let res: bool = res[0].get(0); @@ -1166,7 +1185,7 @@ mod tests { // Step 6: Re-campaign and elected. let res = leader_pg_election .client - .query(&leader_pg_election.sql_factory.campaign_sql(), &[]) + .query(&leader_pg_election.sql_set.campaign, &[]) .await .unwrap(); let res: bool = res[0].get(0); @@ -1197,7 +1216,7 @@ mod tests { // Step 7: Something wrong, the leader key changed by others. let res = leader_pg_election .client - .query(&leader_pg_election.sql_factory.campaign_sql(), &[]) + .query(&leader_pg_election.sql_set.campaign, &[]) .await .unwrap(); let res: bool = res[0].get(0); @@ -1234,7 +1253,7 @@ mod tests { // Clean up leader_pg_election .client - .query(&leader_pg_election.sql_factory.step_down_sql(), &[]) + .query(&leader_pg_election.sql_set.step_down, &[]) .await .unwrap(); } @@ -1255,7 +1274,7 @@ mod tests { leader_watcher: tx, store_key_prefix: store_key_prefix.clone(), candidate_lease_ttl_secs, - sql_factory: ElectionSqlFactory::new(28322, "greptime_metakv".to_string()), + sql_set: ElectionSqlFactory::new(28322, "greptime_metakv").build(), }; let leader_client = create_postgres_client().await.unwrap(); @@ -1268,12 +1287,12 @@ mod tests { leader_watcher: tx, store_key_prefix, candidate_lease_ttl_secs, - sql_factory: ElectionSqlFactory::new(28322, "greptime_metakv".to_string()), + sql_set: ElectionSqlFactory::new(28322, "greptime_metakv").build(), }; leader_pg_election .client - .query(&leader_pg_election.sql_factory.campaign_sql(), &[]) + .query(&leader_pg_election.sql_set.campaign, &[]) .await .unwrap(); leader_pg_election.elected().await.unwrap(); @@ -1314,7 +1333,7 @@ mod tests { // Clean up leader_pg_election .client - .query(&leader_pg_election.sql_factory.step_down_sql(), &[]) + .query(&leader_pg_election.sql_set.step_down, &[]) .await .unwrap(); } From 99205f93f4bcceb6ec24ada421eba312f7a2c45c Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Wed, 8 Jan 2025 17:23:07 +0800 Subject: [PATCH 7/9] chore: remove unused type hint --- src/meta-srv/src/election/postgres.rs | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 5e18582aa8f4..2aee20d0f930 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -426,10 +426,7 @@ impl PgElection { let key = key.as_bytes(); let res = self .client - .query( - &self.sql_set.get_value_with_lease, - &[&key as &(dyn ToSql + Sync)], - ) + .query(&self.sql_set.get_value_with_lease, &[&key]) .await .context(PostgresExecutionSnafu)?; @@ -471,10 +468,7 @@ impl PgElection { let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec(); let res = self .client - .query( - &self.sql_set.get_value_with_lease_by_prefix, - &[(&key_prefix as &(dyn ToSql + Sync))], - ) + .query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix]) .await .context(PostgresExecutionSnafu)?; @@ -506,8 +500,8 @@ impl PgElection { .execute( &self.sql_set.update_value_with_lease, &[ - &key as &(dyn ToSql + Sync), - &prev as &(dyn ToSql + Sync), + &key, + &prev, &updated, &(self.candidate_lease_ttl_secs as f64), ], @@ -534,11 +528,7 @@ impl PgElection { ) -> Result { let key = key.as_bytes(); let lease_ttl_secs = lease_ttl_secs as f64; - let params: Vec<&(dyn ToSql + Sync)> = vec![ - &key as &(dyn ToSql + Sync), - &value as &(dyn ToSql + Sync), - &lease_ttl_secs, - ]; + let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs]; let res = self .client .query(&self.sql_set.put_value_with_lease, ¶ms) @@ -553,7 +543,7 @@ impl PgElection { let key = key.as_bytes(); let res = self .client - .query(&self.sql_set.delete_value, &[&key as &(dyn ToSql + Sync)]) + .query(&self.sql_set.delete_value, &[&key]) .await .context(PostgresExecutionSnafu)?; From 01460994ed125bdaf07408b03cc963db2279488f Mon Sep 17 00:00:00 2001 From: Yohan Wal <1035325592@qq.com> Date: Wed, 8 Jan 2025 17:27:46 +0800 Subject: [PATCH 8/9] Apply suggestions from code review Co-authored-by: Weny Xu --- config/metasrv.example.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 8434a63a6079..b2b748c7f671 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -25,9 +25,11 @@ store_key_prefix = "" backend = "etcd_store" ## Table name in RDS to store metadata. Effect when using a RDS kvbackend. +## **Only used when backend is `postgres_store`.** meta_table_name = "greptime_metakv" ## Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend +## Only used when backend is `postgres_store`. meta_election_lock_id = 1 ## Datanode selector type. From b8df128528d1bfdfcf17df1b74a23888fd875d82 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Wed, 8 Jan 2025 17:32:39 +0800 Subject: [PATCH 9/9] chore: update config file --- config/config.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/config.md b/config/config.md index e7f357b995d0..d206dece89e4 100644 --- a/config/config.md +++ b/config/config.md @@ -296,8 +296,8 @@ | `store_addrs` | Array | -- | Store server address default to etcd store.
For postgres store, the format is:
"password=password dbname=postgres user=postgres host=localhost port=5432"
For etcd store, the format is:
"127.0.0.1:2379" | | `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. | | `backend` | String | `etcd_store` | The datastore for meta server.
Available values:
- `etcd_store` (default value)
- `memory_store`
- `postgres_store` | -| `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Effect when using a RDS kvbackend. | -| `meta_election_lock_id` | Integer | `1` | Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend | +| `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Effect when using a RDS kvbackend.
**Only used when backend is `postgres_store`.** | +| `meta_election_lock_id` | Integer | `1` | Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend
Only used when backend is `postgres_store`. | | `selector` | String | `round_robin` | Datanode selector type.
- `round_robin` (default value)
- `lease_based`
- `load_based`
For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". | | `use_memory_store` | Bool | `false` | Store data in memory. | | `enable_region_failover` | Bool | `false` | Whether to enable region failover.
This feature is only available on GreptimeDB running on cluster mode and
- Using Remote WAL
- Using shared storage (e.g., s3). |