diff --git a/Cargo.lock b/Cargo.lock index 001c2821f332..6d406716eef1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6468,6 +6468,7 @@ dependencies = [ "common-telemetry", "datatypes", "futures", + "futures-util", "humantime-serde", "meta-srv", "rand", diff --git a/src/meta-client/Cargo.toml b/src/meta-client/Cargo.toml index 73042744990b..4d22fe4bd31d 100644 --- a/src/meta-client/Cargo.toml +++ b/src/meta-client/Cargo.toml @@ -15,6 +15,8 @@ common-grpc.workspace = true common-macro.workspace = true common-meta.workspace = true common-telemetry.workspace = true +futures.workspace = true +futures-util.workspace = true humantime-serde.workspace = true rand.workspace = true serde.workspace = true diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index ed6fdf13fba3..723bb099ddd8 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -21,6 +21,8 @@ mod cluster; mod store; mod util; +use std::sync::Arc; + use api::v1::meta::{ProcedureDetailResponse, Role}; use cluster::Client as ClusterClient; use common_error::ext::BoxedError; @@ -30,7 +32,8 @@ use common_meta::cluster::{ }; use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat}; use common_meta::ddl::{ExecutorContext, ProcedureExecutor}; -use common_meta::error::{self as meta_error, Result as MetaResult}; +use common_meta::error::{self as meta_error, ExternalSnafu, Result as MetaResult}; +use common_meta::range_stream::PaginationStream; use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::procedure::{ MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse, @@ -40,8 +43,10 @@ use common_meta::rpc::store::{ BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, }; +use common_meta::rpc::KeyValue; use common_meta::ClusterId; use common_telemetry::info; +use futures::TryStreamExt; use heartbeat::Client as HeartbeatClient; use procedure::Client as ProcedureClient; use snafu::{OptionExt, ResultExt}; @@ -314,16 +319,15 @@ impl ClusterInfo for MetaClient { } async fn list_region_stats(&self) -> Result> { - let cluster_client = self.cluster_client()?; + let cluster_kv_backend = Arc::new(self.cluster_client()?); let range_prefix = DatanodeStatKey::key_prefix_with_cluster_id(self.id.0); let req = RangeRequest::new().with_prefix(range_prefix); - let mut datanode_stats = cluster_client - .range(req) - .await? - .kvs - .into_iter() - .map(|kv| DatanodeStatValue::try_from(kv.value).context(ConvertMetaRequestSnafu)) - .collect::>>()?; + let stream = PaginationStream::new(cluster_kv_backend, req, 256, Arc::new(decode_stats)) + .into_stream(); + let mut datanode_stats = stream + .try_collect::>() + .await + .context(ConvertMetaResponseSnafu)?; let region_stats = datanode_stats .iter_mut() .flat_map(|datanode_stat| { @@ -336,6 +340,12 @@ impl ClusterInfo for MetaClient { } } +fn decode_stats(kv: KeyValue) -> MetaResult { + DatanodeStatValue::try_from(kv.value) + .map_err(BoxedError::new) + .context(ExternalSnafu) +} + impl MetaClient { pub fn new(id: Id) -> Self { Self { diff --git a/src/meta-client/src/client/cluster.rs b/src/meta-client/src/client/cluster.rs index 60ce52ecb69b..b1c7ff1089a1 100644 --- a/src/meta-client/src/client/cluster.rs +++ b/src/meta-client/src/client/cluster.rs @@ -12,14 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; use std::future::Future; use std::sync::Arc; use api::greptime_proto::v1; use api::v1::meta::cluster_client::ClusterClient; use api::v1::meta::{MetasrvNodeInfo, MetasrvPeersRequest, ResponseHeader, Role}; +use common_error::ext::BoxedError; use common_grpc::channel_manager::ChannelManager; -use common_meta::rpc::store::{BatchGetRequest, BatchGetResponse, RangeRequest, RangeResponse}; +use common_meta::error::{Error as MetaError, ExternalSnafu, Result as MetaResult}; +use common_meta::kv_backend::{KvBackend, TxnService}; +use common_meta::rpc::store::{ + BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, + BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, + RangeRequest, RangeResponse, +}; use common_telemetry::{info, warn}; use snafu::{ensure, ResultExt}; use tokio::sync::RwLock; @@ -79,6 +87,51 @@ impl Client { } } +impl TxnService for Client { + type Error = MetaError; +} + +#[async_trait::async_trait] +impl KvBackend for Client { + fn name(&self) -> &str { + "ClusterClientKvBackend" + } + + fn as_any(&self) -> &dyn Any { + self + } + + async fn range(&self, req: RangeRequest) -> MetaResult { + self.range(req) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu) + } + + async fn put(&self, _: PutRequest) -> MetaResult { + unimplemented!("`put` is not supported in cluster client kv backend") + } + + async fn batch_put(&self, _: BatchPutRequest) -> MetaResult { + unimplemented!("`batch_put` is not supported in cluster client kv backend") + } + + async fn batch_get(&self, req: BatchGetRequest) -> MetaResult { + self.batch_get(req) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu) + } + + async fn delete_range(&self, _: DeleteRangeRequest) -> MetaResult { + unimplemented!("`delete_range` is not supported in cluster client kv backend") + } + + async fn batch_delete(&self, _: BatchDeleteRequest) -> MetaResult { + unimplemented!("`batch_delete` is not supported in cluster client kv backend") + } +} + #[derive(Debug)] struct Inner { id: Id,