diff --git a/src/catalog/src/kvbackend/client.rs b/src/catalog/src/kvbackend/client.rs index 9535ab7c77ad..e0ccb1cc4175 100644 --- a/src/catalog/src/kvbackend/client.rs +++ b/src/catalog/src/kvbackend/client.rs @@ -364,6 +364,10 @@ impl KvBackend for MetaKvBackend { "MetaKvBackend" } + fn as_any(&self) -> &dyn Any { + self + } + async fn range(&self, req: RangeRequest) -> Result { self.client .range(req) @@ -372,17 +376,12 @@ impl KvBackend for MetaKvBackend { .context(ExternalSnafu) } - async fn get(&self, key: &[u8]) -> Result> { - let mut response = self - .client - .range(RangeRequest::new().with_key(key)) + async fn put(&self, req: PutRequest) -> Result { + self.client + .put(req) .await .map_err(BoxedError::new) - .context(ExternalSnafu)?; - Ok(response.take_kvs().get_mut(0).map(|kv| KeyValue { - key: kv.take_key(), - value: kv.take_value(), - })) + .context(ExternalSnafu) } async fn batch_put(&self, req: BatchPutRequest) -> Result { @@ -393,51 +392,52 @@ impl KvBackend for MetaKvBackend { .context(ExternalSnafu) } - async fn put(&self, req: PutRequest) -> Result { + async fn batch_get(&self, req: BatchGetRequest) -> Result { self.client - .put(req) + .batch_get(req) .await .map_err(BoxedError::new) .context(ExternalSnafu) } - async fn delete_range(&self, req: DeleteRangeRequest) -> Result { + async fn compare_and_put( + &self, + request: CompareAndPutRequest, + ) -> Result { self.client - .delete_range(req) + .compare_and_put(request) .await .map_err(BoxedError::new) .context(ExternalSnafu) } - async fn batch_delete(&self, req: BatchDeleteRequest) -> Result { + async fn delete_range(&self, req: DeleteRangeRequest) -> Result { self.client - .batch_delete(req) + .delete_range(req) .await .map_err(BoxedError::new) .context(ExternalSnafu) } - async fn batch_get(&self, req: BatchGetRequest) -> Result { + async fn batch_delete(&self, req: BatchDeleteRequest) -> Result { self.client - .batch_get(req) + .batch_delete(req) .await .map_err(BoxedError::new) .context(ExternalSnafu) } - async fn compare_and_put( - &self, - request: CompareAndPutRequest, - ) -> Result { - self.client - .compare_and_put(request) + async fn get(&self, key: &[u8]) -> Result> { + let mut response = self + .client + .range(RangeRequest::new().with_key(key)) .await .map_err(BoxedError::new) - .context(ExternalSnafu) - } - - fn as_any(&self) -> &dyn Any { - self + .context(ExternalSnafu)?; + Ok(response.take_kvs().get_mut(0).map(|kv| KeyValue { + key: kv.take_key(), + value: kv.take_value(), + })) } } diff --git a/src/common/meta/src/cluster.rs b/src/common/meta/src/cluster.rs new file mode 100644 index 000000000000..5a96c095927f --- /dev/null +++ b/src/common/meta/src/cluster.rs @@ -0,0 +1,300 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::str::FromStr; + +use common_error::ext::ErrorExt; +use lazy_static::lazy_static; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, OptionExt, ResultExt}; + +use crate::error::{ + DecodeJsonSnafu, EncodeJsonSnafu, Error, FromUtf8Snafu, InvalidNodeInfoKeySnafu, + InvalidRoleSnafu, ParseNumSnafu, Result, +}; +use crate::peer::Peer; + +const CLUSTER_NODE_INFO_PREFIX: &str = "__meta_cluster_node_info"; + +lazy_static! { + static ref CLUSTER_NODE_INFO_PREFIX_PATTERN: Regex = Regex::new(&format!( + "^{CLUSTER_NODE_INFO_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$" + )) + .unwrap(); +} + +/// [ClusterInfo] provides information about the cluster. +#[async_trait::async_trait] +pub trait ClusterInfo { + type Error: ErrorExt; + + /// List all nodes by role in the cluster. If `role` is `None`, list all nodes. + async fn list_nodes( + &self, + role: Option, + ) -> std::result::Result, Self::Error>; + + // TODO(jeremy): Other info, like region status, etc. +} + +/// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-{cluster_id}-{role}-{node_id}`. +#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)] +pub struct NodeInfoKey { + /// The cluster id. + pub cluster_id: u64, + /// The role of the node. It can be [Role::Datanode], [Role::Frontend], or [Role::Metasrv]. + pub role: Role, + /// The node id. + pub node_id: u64, +} + +impl NodeInfoKey { + pub fn key_prefix_with_cluster_id(cluster_id: u64) -> String { + format!("{}-{}-", CLUSTER_NODE_INFO_PREFIX, cluster_id) + } + + pub fn key_prefix_with_role(cluster_id: u64, role: Role) -> String { + format!( + "{}-{}-{}-", + CLUSTER_NODE_INFO_PREFIX, + cluster_id, + i32::from(role) + ) + } +} + +/// The information of a node in the cluster. +#[derive(Debug, Serialize, Deserialize)] +pub struct NodeInfo { + /// The peer information. [node_id, address] + pub peer: Peer, + /// Last activity time in milliseconds. + pub last_activity_ts: i64, + /// The status of the node. Different roles have different node status. + pub status: NodeStatus, +} + +#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)] +pub enum Role { + Datanode, + Frontend, + Metasrv, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum NodeStatus { + Datanode(DatanodeStatus), + Frontend(FrontendStatus), + Metasrv(MetasrvStatus), +} + +/// The status of a datanode. +#[derive(Debug, Serialize, Deserialize)] +pub struct DatanodeStatus { + /// The read capacity units during this period. + pub rcus: i64, + /// The write capacity units during this period. + pub wcus: i64, + /// How many leader regions on this node. + pub leader_regions: usize, + /// How many follower regions on this node. + pub follower_regions: usize, +} + +/// The status of a frontend. +#[derive(Debug, Serialize, Deserialize)] +pub struct FrontendStatus {} + +/// The status of a metasrv. +#[derive(Debug, Serialize, Deserialize)] +pub struct MetasrvStatus { + pub is_leader: bool, +} + +impl FromStr for NodeInfoKey { + type Err = Error; + + fn from_str(key: &str) -> Result { + let caps = CLUSTER_NODE_INFO_PREFIX_PATTERN + .captures(key) + .context(InvalidNodeInfoKeySnafu { key })?; + + ensure!(caps.len() == 4, InvalidNodeInfoKeySnafu { key }); + + let cluster_id = caps[1].to_string(); + let role = caps[2].to_string(); + let node_id = caps[3].to_string(); + let cluster_id: u64 = cluster_id.parse().context(ParseNumSnafu { + err_msg: format!("invalid cluster_id: {cluster_id}"), + })?; + let role: i32 = role.parse().context(ParseNumSnafu { + err_msg: format!("invalid role {role}"), + })?; + let role = Role::try_from(role)?; + let node_id: u64 = node_id.parse().context(ParseNumSnafu { + err_msg: format!("invalid node_id: {node_id}"), + })?; + + Ok(Self { + cluster_id, + role, + node_id, + }) + } +} + +impl TryFrom> for NodeInfoKey { + type Error = Error; + + fn try_from(bytes: Vec) -> Result { + String::from_utf8(bytes) + .context(FromUtf8Snafu { + name: "NodeInfoKey", + }) + .map(|x| x.parse())? + } +} + +impl From for Vec { + fn from(key: NodeInfoKey) -> Self { + format!( + "{}-{}-{}-{}", + CLUSTER_NODE_INFO_PREFIX, + key.cluster_id, + i32::from(key.role), + key.node_id + ) + .into_bytes() + } +} + +impl FromStr for NodeInfo { + type Err = Error; + + fn from_str(value: &str) -> Result { + serde_json::from_str(value).context(DecodeJsonSnafu) + } +} + +impl TryFrom> for NodeInfo { + type Error = Error; + + fn try_from(bytes: Vec) -> Result { + String::from_utf8(bytes) + .context(FromUtf8Snafu { name: "NodeInfo" }) + .map(|x| x.parse())? + } +} + +impl TryFrom for Vec { + type Error = Error; + + fn try_from(info: NodeInfo) -> Result { + Ok(serde_json::to_string(&info) + .context(EncodeJsonSnafu)? + .into_bytes()) + } +} + +impl From for i32 { + fn from(role: Role) -> Self { + match role { + Role::Datanode => 0, + Role::Frontend => 1, + Role::Metasrv => 2, + } + } +} + +impl TryFrom for Role { + type Error = Error; + + fn try_from(role: i32) -> Result { + match role { + 0 => Ok(Self::Datanode), + 1 => Ok(Self::Frontend), + 2 => Ok(Self::Metasrv), + _ => InvalidRoleSnafu { role }.fail(), + } + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use crate::cluster::Role::{Datanode, Frontend}; + use crate::cluster::{DatanodeStatus, NodeInfo, NodeInfoKey, NodeStatus}; + use crate::peer::Peer; + + #[test] + fn test_node_info_key_round_trip() { + let key = NodeInfoKey { + cluster_id: 1, + role: Datanode, + node_id: 2, + }; + + let key_bytes: Vec = key.into(); + let new_key: NodeInfoKey = key_bytes.try_into().unwrap(); + + assert_eq!(1, new_key.cluster_id); + assert_eq!(Datanode, new_key.role); + assert_eq!(2, new_key.node_id); + } + + #[test] + fn test_node_info_round_trip() { + let node_info = NodeInfo { + peer: Peer { + id: 1, + addr: "127.0.0.1".to_string(), + }, + last_activity_ts: 123, + status: NodeStatus::Datanode(DatanodeStatus { + rcus: 1, + wcus: 2, + leader_regions: 3, + follower_regions: 4, + }), + }; + + let node_info_bytes: Vec = node_info.try_into().unwrap(); + let new_node_info: NodeInfo = node_info_bytes.try_into().unwrap(); + + assert_matches!( + new_node_info, + NodeInfo { + peer: Peer { id: 1, .. }, + last_activity_ts: 123, + status: NodeStatus::Datanode(DatanodeStatus { + rcus: 1, + wcus: 2, + leader_regions: 3, + follower_regions: 4, + }), + } + ); + } + + #[test] + fn test_node_info_key_prefix() { + let prefix = NodeInfoKey::key_prefix_with_cluster_id(1); + assert_eq!(prefix, "__meta_cluster_node_info-1-"); + + let prefix = NodeInfoKey::key_prefix_with_role(2, Frontend); + assert_eq!(prefix, "__meta_cluster_node_info-2-1-"); + } +} diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index a4728676bafd..87ee269946cf 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -406,6 +406,28 @@ pub enum Error { #[snafu(display("Create logical tables invalid arguments: {}", err_msg))] CreateLogicalTablesInvalidArguments { err_msg: String, location: Location }, + + #[snafu(display("Invalid node info key: {}", key))] + InvalidNodeInfoKey { key: String, location: Location }, + + #[snafu(display("Failed to parse number: {}", err_msg))] + ParseNum { + err_msg: String, + #[snafu(source)] + error: std::num::ParseIntError, + location: Location, + }, + + #[snafu(display("Invalid role: {}", role))] + InvalidRole { role: i32, location: Location }, + + #[snafu(display("Failed to parse {} from utf8", name))] + FromUtf8 { + name: String, + #[snafu(source)] + error: std::string::FromUtf8Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -452,6 +474,7 @@ impl ErrorExt for Error { | EmptyTopicPool { .. } | UnexpectedLogicalRouteTable { .. } | ProcedureOutput { .. } + | FromUtf8 { .. } | MetadataCorruption { .. } => StatusCode::Unexpected, SendMessage { .. } @@ -486,6 +509,9 @@ impl ErrorExt for Error { ParseProcedureId { .. } | InvalidNumTopics { .. } | SchemaNotFound { .. } + | InvalidNodeInfoKey { .. } + | ParseNum { .. } + | InvalidRole { .. } | EmptyDdlTasks { .. } => StatusCode::InvalidArguments, } } diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 7f515b79cd55..3737bd94a7e4 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -18,6 +18,7 @@ #![feature(let_chains)] pub mod cache_invalidator; +pub mod cluster; pub mod datanode_manager; pub mod ddl; pub mod ddl_manager; diff --git a/src/common/meta/src/rpc.rs b/src/common/meta/src/rpc.rs index 978a43cd25b3..a11c5164b87b 100644 --- a/src/common/meta/src/rpc.rs +++ b/src/common/meta/src/rpc.rs @@ -17,7 +17,6 @@ pub mod lock; pub mod procedure; pub mod router; pub mod store; -pub mod util; use std::fmt::{Display, Formatter}; diff --git a/src/common/meta/src/rpc/store.rs b/src/common/meta/src/rpc/store.rs index 3156a8b29639..f2b43a9c3a28 100644 --- a/src/common/meta/src/rpc/store.rs +++ b/src/common/meta/src/rpc/store.rs @@ -26,9 +26,9 @@ use api::v1::meta::{ ResponseHeader as PbResponseHeader, }; -use crate::error; use crate::error::Result; -use crate::rpc::{util, KeyValue}; +use crate::rpc::KeyValue; +use crate::{error, util}; pub fn to_range(key: Vec, range_end: Vec) -> (Bound>, Bound>) { match (&key[..], &range_end[..]) { diff --git a/src/common/meta/src/rpc/util.rs b/src/common/meta/src/rpc/util.rs deleted file mode 100644 index 3df5a8630ce4..000000000000 --- a/src/common/meta/src/rpc/util.rs +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use api::v1::meta::ResponseHeader; - -use crate::error; -use crate::error::Result; - -#[inline] -pub fn check_response_header(header: Option<&ResponseHeader>) -> Result<()> { - if let Some(header) = header { - if let Some(error) = &header.error { - let code = error.code; - let err_msg = &error.err_msg; - return error::IllegalServerStateSnafu { code, err_msg }.fail(); - } - } - - Ok(()) -} - -/// Get prefix end key of `key`. -#[inline] -pub fn get_prefix_end_key(key: &[u8]) -> Vec { - for (i, v) in key.iter().enumerate().rev() { - if *v < 0xFF { - let mut end = Vec::from(&key[..=i]); - end[i] = *v + 1; - return end; - } - } - - // next prefix does not exist (e.g., 0xffff); - vec![0] -} diff --git a/src/common/meta/src/util.rs b/src/common/meta/src/util.rs index 7a823aba9d30..e7a8eba3039c 100644 --- a/src/common/meta/src/util.rs +++ b/src/common/meta/src/util.rs @@ -12,8 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use api::v1::meta::ResponseHeader; + +use crate::error::{IllegalServerStateSnafu, Result}; + /// Get prefix end key of `key`. -#[inline] pub fn get_prefix_end_key(key: &[u8]) -> Vec { for (i, v) in key.iter().enumerate().rev() { if *v < 0xFF { @@ -27,8 +30,19 @@ pub fn get_prefix_end_key(key: &[u8]) -> Vec { vec![0] } +pub fn check_response_header(header: Option<&ResponseHeader>) -> Result<()> { + if let Some(header) = header { + if let Some(error) = &header.error { + let code = error.code; + let err_msg = &error.err_msg; + return IllegalServerStateSnafu { code, err_msg }.fail(); + } + } + + Ok(()) +} + /// Get next prefix key of `key`. -#[inline] pub fn get_next_prefix_key(key: &[u8]) -> Vec { let mut next = Vec::with_capacity(key.len() + 1); next.extend_from_slice(key); diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index bb9a0e8609a3..d623bbc3c311 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -144,12 +144,14 @@ impl Instance { let channel_manager = ChannelManager::with_config(channel_config); let ddl_channel_manager = ChannelManager::with_config(ddl_channel_config); - let cluster_id = 0; // TODO(jeremy): read from config - let mut meta_client = MetaClientBuilder::new(cluster_id, 0, Role::Frontend) + let cluster_id = 0; // It is currently a reserved field and has not been enabled. + let member_id = 0; // Frontend does not need a member id. + let mut meta_client = MetaClientBuilder::new(cluster_id, member_id, Role::Frontend) .enable_router() .enable_store() .enable_heartbeat() .enable_procedure() + .enable_access_cluster_info() .channel_manager(channel_manager) .ddl_channel_manager(ddl_channel_manager) .build(); diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 94122b81a7c6..b37e74fe8831 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -18,9 +18,12 @@ mod load_balance; mod lock; mod procedure; +mod cluster; mod store; +mod util; use api::v1::meta::Role; +use cluster::Client as ClusterClient; use common_error::ext::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::ddl::{ExecutorContext, ProcedureExecutor}; @@ -50,6 +53,7 @@ pub type Id = (u64, u64); const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3; const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3; +const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3; #[derive(Clone, Debug, Default)] pub struct MetaClientBuilder { @@ -60,6 +64,7 @@ pub struct MetaClientBuilder { enable_store: bool, enable_lock: bool, enable_procedure: bool, + enable_access_cluster_info: bool, channel_manager: Option, ddl_channel_manager: Option, heartbeat_channel_manager: Option, @@ -109,6 +114,13 @@ impl MetaClientBuilder { } } + pub fn enable_access_cluster_info(self) -> Self { + Self { + enable_access_cluster_info: true, + ..self + } + } + pub fn channel_manager(self, channel_manager: ChannelManager) -> Self { Self { channel_manager: Some(channel_manager), @@ -159,7 +171,7 @@ impl MetaClientBuilder { client.lock = Some(LockClient::new(self.id, self.role, mgr.clone())); } if self.enable_procedure { - let mgr = self.ddl_channel_manager.unwrap_or(mgr); + let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone()); client.procedure = Some(ProcedureClient::new( self.id, self.role, @@ -167,6 +179,14 @@ impl MetaClientBuilder { DEFAULT_SUBMIT_DDL_MAX_RETRY, )); } + if self.enable_access_cluster_info { + client.cluster = Some(ClusterClient::new( + self.id, + self.role, + mgr, + DEFAULT_CLUSTER_CLIENT_MAX_RETRY, + )) + } client } @@ -180,6 +200,7 @@ pub struct MetaClient { store: Option, lock: Option, procedure: Option, + cluster: Option, } #[async_trait::async_trait] @@ -254,9 +275,13 @@ impl MetaClient { info!("Lock client started"); } if let Some(client) = &mut self.procedure { - client.start(urls).await?; + client.start(urls.clone()).await?; info!("DDL client started"); } + if let Some(client) = &mut self.cluster { + client.start(urls).await?; + info!("Cluster client started"); + } Ok(()) } @@ -493,7 +518,6 @@ mod tests { let _ = meta_client.heartbeat_client().unwrap(); assert!(meta_client.store_client().is_err()); meta_client.start(urls).await.unwrap(); - assert!(meta_client.heartbeat_client().unwrap().is_started().await); let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode) .enable_router() @@ -508,7 +532,6 @@ mod tests { assert!(meta_client.heartbeat_client().is_err()); let _ = meta_client.store_client().unwrap(); meta_client.start(urls).await.unwrap(); - assert!(meta_client.store_client().unwrap().is_started().await); let mut meta_client = MetaClientBuilder::new(1, 2, Role::Datanode) .enable_heartbeat() @@ -520,8 +543,6 @@ mod tests { let _ = meta_client.heartbeat_client().unwrap(); let _ = meta_client.store_client().unwrap(); meta_client.start(urls).await.unwrap(); - assert!(meta_client.heartbeat_client().unwrap().is_started().await); - assert!(meta_client.store_client().unwrap().is_started().await); } #[tokio::test] diff --git a/src/meta-client/src/client/cluster.rs b/src/meta-client/src/client/cluster.rs new file mode 100644 index 000000000000..3e9568d06a87 --- /dev/null +++ b/src/meta-client/src/client/cluster.rs @@ -0,0 +1,242 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::future::Future; +use std::sync::Arc; + +use api::greptime_proto::v1; +use api::v1::meta::cluster_client::ClusterClient; +use api::v1::meta::{ResponseHeader, Role}; +use common_grpc::channel_manager::ChannelManager; +use common_meta::cluster; +use common_meta::cluster::{ClusterInfo, NodeInfo, NodeInfoKey}; +use common_meta::rpc::store::{BatchGetRequest, BatchGetResponse, RangeRequest, RangeResponse}; +use common_telemetry::{info, warn}; +use snafu::{ensure, ResultExt}; +use tokio::sync::RwLock; +use tonic::transport::Channel; +use tonic::Status; + +use crate::client::ask_leader::AskLeader; +use crate::client::{util, Id}; +use crate::error::{ + ConvertMetaResponseSnafu, CreateChannelSnafu, Error, IllegalGrpcClientStateSnafu, Result, + RetryTimesExceededSnafu, +}; + +#[derive(Clone, Debug)] +pub struct Client { + inner: Arc>, +} + +impl Client { + pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self { + let inner = Arc::new(RwLock::new(Inner { + id, + role, + channel_manager, + ask_leader: None, + max_retry, + })); + + Self { inner } + } + + pub async fn start(&mut self, urls: A) -> Result<()> + where + U: AsRef, + A: AsRef<[U]>, + { + let mut inner = self.inner.write().await; + inner.start(urls).await + } + + pub async fn range(&self, req: RangeRequest) -> Result { + let inner = self.inner.read().await; + inner.range(req).await + } + + #[allow(dead_code)] + pub async fn batch_get(&self, req: BatchGetRequest) -> Result { + let inner = self.inner.read().await; + inner.batch_get(req).await + } +} + +#[async_trait::async_trait] +impl ClusterInfo for Client { + type Error = Error; + + async fn list_nodes(&self, role: Option) -> Result> { + let cluster_id = self.inner.read().await.id.0; + let key_prefix = match role { + None => NodeInfoKey::key_prefix_with_cluster_id(cluster_id), + Some(role) => NodeInfoKey::key_prefix_with_role(cluster_id, role), + }; + + let req = RangeRequest::new().with_prefix(key_prefix); + + let res = self.range(req).await?; + + res.kvs + .into_iter() + .map(|kv| NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)) + .collect::>>() + } +} + +#[derive(Debug)] +struct Inner { + id: Id, + role: Role, + channel_manager: ChannelManager, + ask_leader: Option, + max_retry: usize, +} + +impl Inner { + async fn start(&mut self, urls: A) -> Result<()> + where + U: AsRef, + A: AsRef<[U]>, + { + ensure!( + !self.is_started(), + IllegalGrpcClientStateSnafu { + err_msg: "Cluster client already started", + } + ); + + let peers = urls + .as_ref() + .iter() + .map(|url| url.as_ref().to_string()) + .collect::>(); + self.ask_leader = Some(AskLeader::new( + self.id, + self.role, + peers, + self.channel_manager.clone(), + self.max_retry, + )); + + Ok(()) + } + + fn make_client(&self, addr: impl AsRef) -> Result> { + let channel = self.channel_manager.get(addr).context(CreateChannelSnafu)?; + + Ok(ClusterClient::new(channel)) + } + + #[inline] + fn is_started(&self) -> bool { + self.ask_leader.is_some() + } + + fn ask_leader(&self) -> Result<&AskLeader> { + ensure!( + self.is_started(), + IllegalGrpcClientStateSnafu { + err_msg: "Cluster client not start" + } + ); + + Ok(self.ask_leader.as_ref().unwrap()) + } + + async fn with_retry(&self, task: &str, body_fn: F, get_header: H) -> Result + where + R: Future>, + F: Fn(ClusterClient) -> R, + H: Fn(&T) -> &Option, + { + let ask_leader = self.ask_leader()?; + let mut times = 0; + + while times < self.max_retry { + if let Some(leader) = &ask_leader.get_leader() { + let client = self.make_client(leader)?; + match body_fn(client).await { + Ok(res) => { + if util::is_not_leader(get_header(&res)) { + warn!("Failed to {task} to {leader}, not a leader"); + let leader = ask_leader.ask_leader().await?; + info!("Cluster client updated to new leader addr: {leader}"); + times += 1; + continue; + } + return Ok(res); + } + Err(status) => { + // The leader may be unreachable. + if util::is_unreachable(&status) { + warn!("Failed to {task} to {leader}, source: {status}"); + let leader = ask_leader.ask_leader().await?; + info!("Cluster client updated to new leader addr: {leader}"); + times += 1; + continue; + } else { + return Err(Error::from(status)); + } + } + } + } else if let Err(err) = ask_leader.ask_leader().await { + return Err(err); + } + } + + RetryTimesExceededSnafu { + msg: "Failed to {task}", + times: self.max_retry, + } + .fail() + } + + async fn range(&self, request: RangeRequest) -> Result { + self.with_retry( + "range", + move |mut client| { + let inner_req = tonic::Request::new(v1::meta::RangeRequest::from(request.clone())); + + async move { client.range(inner_req).await.map(|res| res.into_inner()) } + }, + |res| &res.header, + ) + .await? + .try_into() + .context(ConvertMetaResponseSnafu) + } + + async fn batch_get(&self, request: BatchGetRequest) -> Result { + self.with_retry( + "batch_get", + move |mut client| { + let inner_req = + tonic::Request::new(v1::meta::BatchGetRequest::from(request.clone())); + + async move { + client + .batch_get(inner_req) + .await + .map(|res| res.into_inner()) + } + }, + |res| &res.header, + ) + .await? + .try_into() + .context(ConvertMetaResponseSnafu) + } +} diff --git a/src/meta-client/src/client/heartbeat.rs b/src/meta-client/src/client/heartbeat.rs index 8b873e48da1b..47984360b44b 100644 --- a/src/meta-client/src/client/heartbeat.rs +++ b/src/meta-client/src/client/heartbeat.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use api::v1::meta::heartbeat_client::HeartbeatClient; use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, RequestHeader, Role}; use common_grpc::channel_manager::ChannelManager; -use common_meta::rpc::util; +use common_meta::util; use common_telemetry::info; use common_telemetry::tracing_context::TracingContext; use snafu::{ensure, OptionExt, ResultExt}; @@ -128,11 +128,6 @@ impl Client { inner.ask_leader().await?; inner.heartbeat().await } - - pub async fn is_started(&self) -> bool { - let inner = self.inner.read().await; - inner.is_started() - } } #[derive(Debug)] @@ -267,17 +262,6 @@ impl Inner { mod test { use super::*; - #[tokio::test] - async fn test_start_client() { - let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default(), 3); - assert!(!client.is_started().await); - client - .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) - .await - .unwrap(); - assert!(client.is_started().await); - } - #[tokio::test] async fn test_already_start() { let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default(), 3); @@ -285,7 +269,6 @@ mod test { .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) .await .unwrap(); - assert!(client.is_started().await); let res = client.start(&["127.0.0.1:1002"]).await; assert!(res.is_err()); assert!(matches!( diff --git a/src/meta-client/src/client/lock.rs b/src/meta-client/src/client/lock.rs index 04c4aaa34c9c..66fe077c2286 100644 --- a/src/meta-client/src/client/lock.rs +++ b/src/meta-client/src/client/lock.rs @@ -53,11 +53,6 @@ impl Client { inner.start(urls).await } - pub async fn is_started(&self) -> bool { - let inner = self.inner.read().await; - inner.is_started() - } - pub async fn lock(&self, req: LockRequest) -> Result { let inner = self.inner.read().await; inner.lock(req).await @@ -155,17 +150,6 @@ impl Inner { mod tests { use super::*; - #[tokio::test] - async fn test_start_client() { - let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default()); - assert!(!client.is_started().await); - client - .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) - .await - .unwrap(); - assert!(client.is_started().await); - } - #[tokio::test] async fn test_already_start() { let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default()); @@ -173,7 +157,6 @@ mod tests { .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) .await .unwrap(); - assert!(client.is_started().await); let res = client.start(&["127.0.0.1:1002"]).await; assert!(res.is_err()); assert!(matches!( diff --git a/src/meta-client/src/client/procedure.rs b/src/meta-client/src/client/procedure.rs index 638f19bb8a67..20cd5385a872 100644 --- a/src/meta-client/src/client/procedure.rs +++ b/src/meta-client/src/client/procedure.rs @@ -18,8 +18,8 @@ use std::time::Duration; use api::v1::meta::procedure_service_client::ProcedureServiceClient; use api::v1::meta::{ - DdlTaskRequest, DdlTaskResponse, ErrorCode, MigrateRegionRequest, MigrateRegionResponse, - ProcedureId, ProcedureStateResponse, QueryProcedureRequest, ResponseHeader, Role, + DdlTaskRequest, DdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse, ProcedureId, + ProcedureStateResponse, QueryProcedureRequest, ResponseHeader, Role, }; use common_grpc::channel_manager::ChannelManager; use common_telemetry::tracing_context::TracingContext; @@ -27,10 +27,10 @@ use common_telemetry::{info, warn}; use snafu::{ensure, ResultExt}; use tokio::sync::RwLock; use tonic::transport::Channel; -use tonic::{Code, Status}; +use tonic::Status; use crate::client::ask_leader::AskLeader; -use crate::client::Id; +use crate::client::{util, Id}; use crate::error; use crate::error::Result; @@ -61,11 +61,6 @@ impl Client { inner.start(urls).await } - pub async fn is_started(&self) -> bool { - let inner = self.inner.read().await; - inner.is_started() - } - pub async fn submit_ddl_task(&self, req: DdlTaskRequest) -> Result { let inner = self.inner.read().await; inner.submit_ddl_task(req).await @@ -173,7 +168,7 @@ impl Inner { let client = self.make_client(leader)?; match body_fn(client).await { Ok(res) => { - if is_not_leader(get_header(&res)) { + if util::is_not_leader(get_header(&res)) { warn!("Failed to {task} to {leader}, not a leader"); let leader = ask_leader.ask_leader().await?; info!("DDL client updated to new leader addr: {leader}"); @@ -184,7 +179,7 @@ impl Inner { } Err(status) => { // The leader may be unreachable. - if is_unreachable(&status) { + if util::is_unreachable(&status) { warn!("Failed to {task} to {leader}, source: {status}"); let leader = ask_leader.ask_leader().await?; info!("Procedure client updated to new leader addr: {leader}"); @@ -282,17 +277,3 @@ impl Inner { .await } } - -fn is_unreachable(status: &Status) -> bool { - status.code() == Code::Unavailable || status.code() == Code::DeadlineExceeded -} - -fn is_not_leader(header: &Option) -> bool { - if let Some(header) = header { - if let Some(err) = header.error.as_ref() { - return err.code == ErrorCode::NotLeader as i32; - } - } - - false -} diff --git a/src/meta-client/src/client/store.rs b/src/meta-client/src/client/store.rs index a03c88fbf1bb..e63f7ade1f5a 100644 --- a/src/meta-client/src/client/store.rs +++ b/src/meta-client/src/client/store.rs @@ -57,11 +57,6 @@ impl Client { inner.start(urls).await } - pub async fn is_started(&self) -> bool { - let inner = self.inner.read().await; - inner.is_started() - } - pub async fn range(&self, req: RangeRequest) -> Result { let inner = self.inner.read().await; inner.range(req).await @@ -254,17 +249,6 @@ impl Inner { mod test { use super::*; - #[tokio::test] - async fn test_start_client() { - let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default()); - assert!(!client.is_started().await); - client - .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) - .await - .unwrap(); - assert!(client.is_started().await); - } - #[tokio::test] async fn test_already_start() { let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default()); @@ -272,7 +256,6 @@ mod test { .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) .await .unwrap(); - assert!(client.is_started().await); let res = client.start(&["127.0.0.1:1002"]).await; assert!(res.is_err()); assert!(matches!( diff --git a/src/meta-client/src/client/util.rs b/src/meta-client/src/client/util.rs new file mode 100644 index 000000000000..758e1f9de1a3 --- /dev/null +++ b/src/meta-client/src/client/util.rs @@ -0,0 +1,32 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::{ErrorCode, ResponseHeader}; +use tonic::{Code, Status}; + +pub(crate) fn is_unreachable(status: &Status) -> bool { + status.code() == Code::Unavailable || status.code() == Code::DeadlineExceeded +} + +pub(crate) fn is_not_leader(header: &Option) -> bool { + let Some(header) = header else { + return false; + }; + + let Some(err) = header.error.as_ref() else { + return false; + }; + + err.code == ErrorCode::NotLeader as i32 +} diff --git a/src/meta-srv/src/cluster.rs b/src/meta-srv/src/cluster.rs index 2dbc0c1ea66c..0c8def1c0cfe 100644 --- a/src/meta-srv/src/cluster.rs +++ b/src/meta-srv/src/cluster.rs @@ -105,6 +105,21 @@ impl KvBackend for MetaPeerClient { .fail() } + // MetaPeerClient does not support mutable methods listed below. + async fn put(&self, _req: PutRequest) -> Result { + error::UnsupportedSnafu { + operation: "put".to_string(), + } + .fail() + } + + async fn batch_put(&self, _req: BatchPutRequest) -> Result { + error::UnsupportedSnafu { + operation: "batch put".to_string(), + } + .fail() + } + // Get kv information from the leader's in_mem kv store async fn batch_get(&self, req: BatchGetRequest) -> Result { if self.is_leader() { @@ -139,21 +154,6 @@ impl KvBackend for MetaPeerClient { .fail() } - // MetaPeerClient does not support mutable methods listed below. - async fn put(&self, _req: PutRequest) -> Result { - error::UnsupportedSnafu { - operation: "put".to_string(), - } - .fail() - } - - async fn batch_put(&self, _req: BatchPutRequest) -> Result { - error::UnsupportedSnafu { - operation: "batch put".to_string(), - } - .fail() - } - async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result { error::UnsupportedSnafu { operation: "compare and put".to_string(), @@ -175,13 +175,6 @@ impl KvBackend for MetaPeerClient { .fail() } - async fn delete(&self, _key: &[u8], _prev_kv: bool) -> Result> { - error::UnsupportedSnafu { - operation: "delete".to_string(), - } - .fail() - } - async fn put_conditionally( &self, _key: Vec, @@ -193,6 +186,13 @@ impl KvBackend for MetaPeerClient { } .fail() } + + async fn delete(&self, _key: &[u8], _prev_kv: bool) -> Result> { + error::UnsupportedSnafu { + operation: "delete".to_string(), + } + .fail() + } } impl MetaPeerClient { diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index ced24419f3ed..5db066c34deb 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -654,6 +654,18 @@ pub enum Error { err_msg: String, source: common_meta::error::Error, }, + + #[snafu(display("Failed to save cluster info"))] + SaveClusterInfo { + location: Location, + source: common_meta::error::Error, + }, + + #[snafu(display("Invalid cluster info format"))] + InvalidClusterInfoFormat { + location: Location, + source: common_meta::error::Error, + }, } impl Error { @@ -746,6 +758,8 @@ impl ErrorExt for Error { | Error::MigrationAbort { .. } | Error::MigrationRunning { .. } => StatusCode::Unexpected, Error::TableNotFound { .. } => StatusCode::TableNotFound, + Error::SaveClusterInfo { source, .. } + | Error::InvalidClusterInfoFormat { source, .. } => source.status_code(), Error::InvalidateTableCache { source, .. } => source.status_code(), Error::RequestDatanode { source, .. } => source.status_code(), Error::InvalidCatalogValue { source, .. } diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 4de3d5530dd8..93d99b682500 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -41,6 +41,7 @@ use crate::service::mailbox::{ }; pub mod check_leader_handler; +pub mod collect_cluster_info_handler; pub mod collect_stats_handler; pub mod failure_handler; pub mod filter_inactive_region_stats; diff --git a/src/meta-srv/src/handler/collect_cluster_info_handler.rs b/src/meta-srv/src/handler/collect_cluster_info_handler.rs new file mode 100644 index 000000000000..48edc4504075 --- /dev/null +++ b/src/meta-srv/src/handler/collect_cluster_info_handler.rs @@ -0,0 +1,143 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::{HeartbeatRequest, Role}; +use common_meta::cluster; +use common_meta::cluster::{DatanodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus}; +use common_meta::peer::Peer; +use common_meta::rpc::store::PutRequest; +use snafu::ResultExt; +use store_api::region_engine::RegionRole; + +use crate::error::{InvalidClusterInfoFormatSnafu, SaveClusterInfoSnafu}; +use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; +use crate::metasrv::Context; +use crate::Result; + +/// The handler to collect cluster info from the heartbeat request of frontend. +pub struct CollectFrontendClusterInfoHandler; + +#[async_trait::async_trait] +impl HeartbeatHandler for CollectFrontendClusterInfoHandler { + fn is_acceptable(&self, role: Role) -> bool { + role == Role::Frontend + } + + async fn handle( + &self, + req: &HeartbeatRequest, + ctx: &mut Context, + _acc: &mut HeartbeatAccumulator, + ) -> Result { + let Some((key, peer)) = extract_base_info(req, Role::Frontend) else { + return Ok(HandleControl::Continue); + }; + + let value = NodeInfo { + peer, + last_activity_ts: common_time::util::current_time_millis(), + status: NodeStatus::Frontend(FrontendStatus {}), + }; + + save_to_mem_store(key, value, ctx).await?; + + Ok(HandleControl::Continue) + } +} + +/// The handler to collect cluster info from the heartbeat request of datanode. +pub struct CollectDatanodeClusterInfoHandler; + +#[async_trait::async_trait] +impl HeartbeatHandler for CollectDatanodeClusterInfoHandler { + fn is_acceptable(&self, role: Role) -> bool { + role == Role::Datanode + } + + async fn handle( + &self, + req: &HeartbeatRequest, + ctx: &mut Context, + acc: &mut HeartbeatAccumulator, + ) -> Result { + let Some((key, peer)) = extract_base_info(req, Role::Datanode) else { + return Ok(HandleControl::Continue); + }; + + let Some(stat) = &acc.stat else { + return Ok(HandleControl::Continue); + }; + + let leader_regions = stat + .region_stats + .iter() + .filter(|s| s.role == RegionRole::Leader) + .count(); + let follower_regions = stat.region_stats.len() - leader_regions; + + let value = NodeInfo { + peer, + last_activity_ts: stat.timestamp_millis, + status: NodeStatus::Datanode(DatanodeStatus { + rcus: stat.rcus, + wcus: stat.wcus, + leader_regions, + follower_regions, + }), + }; + + save_to_mem_store(key, value, ctx).await?; + + Ok(HandleControl::Continue) + } +} + +fn extract_base_info(req: &HeartbeatRequest, role: Role) -> Option<(NodeInfoKey, Peer)> { + let HeartbeatRequest { header, peer, .. } = req; + let Some(header) = &header else { + return None; + }; + let Some(peer) = &peer else { + return None; + }; + + Some(( + NodeInfoKey { + cluster_id: header.cluster_id, + role: match role { + Role::Datanode => cluster::Role::Datanode, + Role::Frontend => cluster::Role::Frontend, + }, + node_id: peer.id, + }, + Peer::from(peer.clone()), + )) +} + +async fn save_to_mem_store(key: NodeInfoKey, value: NodeInfo, ctx: &mut Context) -> Result<()> { + let key = key.into(); + let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?; + let put_req = PutRequest { + key, + value, + ..Default::default() + }; + + ctx.in_memory + .put(put_req) + .await + .context(SaveClusterInfoSnafu)?; + + Ok(()) +} diff --git a/src/meta-srv/src/handler/on_leader_start_handler.rs b/src/meta-srv/src/handler/on_leader_start_handler.rs index 58f70005aa8e..58751833d173 100644 --- a/src/meta-srv/src/handler/on_leader_start_handler.rs +++ b/src/meta-srv/src/handler/on_leader_start_handler.rs @@ -22,8 +22,8 @@ pub struct OnLeaderStartHandler; #[async_trait::async_trait] impl HeartbeatHandler for OnLeaderStartHandler { - fn is_acceptable(&self, role: Role) -> bool { - role == Role::Datanode + fn is_acceptable(&self, _: Role) -> bool { + true } async fn handle( diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index 143982ba8020..8bcc7e9a138d 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -22,7 +22,7 @@ pub struct ResponseHeaderHandler; #[async_trait::async_trait] impl HeartbeatHandler for ResponseHeaderHandler { - fn is_acceptable(&self, _role: Role) -> bool { + fn is_acceptable(&self, _: Role) -> bool { true } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index fe327bd5898f..d406589599b5 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -40,6 +40,9 @@ use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; use crate::error::{self, Result}; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::handler::check_leader_handler::CheckLeaderHandler; +use crate::handler::collect_cluster_info_handler::{ + CollectDatanodeClusterInfoHandler, CollectFrontendClusterInfoHandler, +}; use crate::handler::collect_stats_handler::CollectStatsHandler; use crate::handler::failure_handler::RegionFailureHandler; use crate::handler::filter_inactive_region_stats::FilterInactiveRegionStatsHandler; @@ -298,6 +301,8 @@ impl MetaSrvBuilder { group.add_handler(CheckLeaderHandler).await; group.add_handler(OnLeaderStartHandler).await; group.add_handler(CollectStatsHandler).await; + group.add_handler(CollectDatanodeClusterInfoHandler).await; + group.add_handler(CollectFrontendClusterInfoHandler).await; group.add_handler(MailboxHandler).await; group.add_handler(region_lease_handler).await; group.add_handler(FilterInactiveRegionStatsHandler).await; diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index b4eb412856e1..bc28e8e1305f 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -348,6 +348,7 @@ impl GreptimeDbClusterBuilder { .enable_heartbeat() .channel_manager(meta_srv.channel_manager) .enable_procedure() + .enable_access_cluster_info() .build(); meta_client.start(&[&meta_srv.server_addr]).await.unwrap(); let meta_client = Arc::new(meta_client);