Skip to content

Commit

Permalink
chore: paginated query region stats (GreptimeTeam#4942)
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun authored Nov 7, 2024
1 parent 5d20acc commit 22f31f5
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/meta-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ common-grpc.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-telemetry.workspace = true
futures.workspace = true
futures-util.workspace = true
humantime-serde.workspace = true
rand.workspace = true
serde.workspace = true
Expand Down
28 changes: 19 additions & 9 deletions src/meta-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ mod cluster;
mod store;
mod util;

use std::sync::Arc;

use api::v1::meta::{ProcedureDetailResponse, Role};
use cluster::Client as ClusterClient;
use common_error::ext::BoxedError;
Expand All @@ -30,7 +32,8 @@ use common_meta::cluster::{
};
use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::error::{self as meta_error, ExternalSnafu, Result as MetaResult};
use common_meta::range_stream::PaginationStream;
use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::procedure::{
MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
Expand All @@ -40,8 +43,10 @@ use common_meta::rpc::store::{
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use common_meta::ClusterId;
use common_telemetry::info;
use futures::TryStreamExt;
use heartbeat::Client as HeartbeatClient;
use procedure::Client as ProcedureClient;
use snafu::{OptionExt, ResultExt};
Expand Down Expand Up @@ -314,16 +319,15 @@ impl ClusterInfo for MetaClient {
}

async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
let cluster_client = self.cluster_client()?;
let cluster_kv_backend = Arc::new(self.cluster_client()?);
let range_prefix = DatanodeStatKey::key_prefix_with_cluster_id(self.id.0);
let req = RangeRequest::new().with_prefix(range_prefix);
let mut datanode_stats = cluster_client
.range(req)
.await?
.kvs
.into_iter()
.map(|kv| DatanodeStatValue::try_from(kv.value).context(ConvertMetaRequestSnafu))
.collect::<Result<Vec<_>>>()?;
let stream = PaginationStream::new(cluster_kv_backend, req, 256, Arc::new(decode_stats))
.into_stream();
let mut datanode_stats = stream
.try_collect::<Vec<_>>()
.await
.context(ConvertMetaResponseSnafu)?;
let region_stats = datanode_stats
.iter_mut()
.flat_map(|datanode_stat| {
Expand All @@ -336,6 +340,12 @@ impl ClusterInfo for MetaClient {
}
}

fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
DatanodeStatValue::try_from(kv.value)
.map_err(BoxedError::new)
.context(ExternalSnafu)
}

impl MetaClient {
pub fn new(id: Id) -> Self {
Self {
Expand Down
55 changes: 54 additions & 1 deletion src/meta-client/src/client/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::future::Future;
use std::sync::Arc;

use api::greptime_proto::v1;
use api::v1::meta::cluster_client::ClusterClient;
use api::v1::meta::{MetasrvNodeInfo, MetasrvPeersRequest, ResponseHeader, Role};
use common_error::ext::BoxedError;
use common_grpc::channel_manager::ChannelManager;
use common_meta::rpc::store::{BatchGetRequest, BatchGetResponse, RangeRequest, RangeResponse};
use common_meta::error::{Error as MetaError, ExternalSnafu, Result as MetaResult};
use common_meta::kv_backend::{KvBackend, TxnService};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse,
};
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
Expand Down Expand Up @@ -79,6 +87,51 @@ impl Client {
}
}

impl TxnService for Client {
type Error = MetaError;
}

#[async_trait::async_trait]
impl KvBackend for Client {
fn name(&self) -> &str {
"ClusterClientKvBackend"
}

fn as_any(&self) -> &dyn Any {
self
}

async fn range(&self, req: RangeRequest) -> MetaResult<RangeResponse> {
self.range(req)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
}

async fn put(&self, _: PutRequest) -> MetaResult<PutResponse> {
unimplemented!("`put` is not supported in cluster client kv backend")
}

async fn batch_put(&self, _: BatchPutRequest) -> MetaResult<BatchPutResponse> {
unimplemented!("`batch_put` is not supported in cluster client kv backend")
}

async fn batch_get(&self, req: BatchGetRequest) -> MetaResult<BatchGetResponse> {
self.batch_get(req)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
}

async fn delete_range(&self, _: DeleteRangeRequest) -> MetaResult<DeleteRangeResponse> {
unimplemented!("`delete_range` is not supported in cluster client kv backend")
}

async fn batch_delete(&self, _: BatchDeleteRequest) -> MetaResult<BatchDeleteResponse> {
unimplemented!("`batch_delete` is not supported in cluster client kv backend")
}
}

#[derive(Debug)]
struct Inner {
id: Id,
Expand Down

0 comments on commit 22f31f5

Please sign in to comment.