diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 855e112645de..ff77882de807 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -425,6 +425,13 @@ pub enum Error { source: BoxedError, }, + #[snafu(display("The response exceeded size limit"))] + ResponseExceededSizeLimit { + #[snafu(implicit)] + location: Location, + source: BoxedError, + }, + #[snafu(display("Invalid heartbeat response"))] InvalidHeartbeatResponse { #[snafu(implicit)] @@ -763,6 +770,7 @@ impl ErrorExt for Error { | StopProcedureManager { source, .. } => source.status_code(), RegisterProcedureLoader { source, .. } => source.status_code(), External { source, .. } => source.status_code(), + ResponseExceededSizeLimit { source, .. } => source.status_code(), OperateDatanode { source, .. } => source.status_code(), Table { source, .. } => source.status_code(), RetryLater { source, .. } => source.status_code(), @@ -805,13 +813,13 @@ impl Error { /// Returns true if the response exceeds the size limit. pub fn is_exceeded_size_limit(&self) -> bool { - if let Error::EtcdFailed { - error: etcd_client::Error::GRpcStatus(status), - .. - } = self - { - return status.code() == tonic::Code::OutOfRange; + match self { + Error::EtcdFailed { + error: etcd_client::Error::GRpcStatus(status), + .. + } => status.code() == tonic::Code::OutOfRange, + Error::ResponseExceededSizeLimit { .. } => true, + _ => false, } - false } } diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 723bb099ddd8..d0008a7e81b1 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -543,24 +543,29 @@ impl MetaClient { #[cfg(test)] mod tests { use api::v1::meta::{HeartbeatRequest, Peer}; + use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; + use rand::Rng; use super::*; - use crate::{error, mocks}; + use crate::error; + use crate::mocks::{self, MockMetaContext}; const TEST_KEY_PREFIX: &str = "__unit_test__meta__"; struct TestClient { ns: String, client: MetaClient, + meta_ctx: MockMetaContext, } impl TestClient { async fn new(ns: impl Into) -> Self { // can also test with etcd: mocks::mock_client_with_etcdstore("127.0.0.1:2379").await; - let client = mocks::mock_client_with_memstore().await; + let (client, meta_ctx) = mocks::mock_client_with_memstore().await; Self { ns: ns.into(), client, + meta_ctx, } } @@ -585,6 +590,15 @@ mod tests { let res = self.client.delete_range(req).await; let _ = res.unwrap(); } + + #[allow(dead_code)] + fn kv_backend(&self) -> KvBackendRef { + self.meta_ctx.kv_backend.clone() + } + + fn in_memory(&self) -> Option { + self.meta_ctx.in_memory.clone() + } } async fn new_client(ns: impl Into) -> TestClient { @@ -940,4 +954,37 @@ mod tests { ); } } + + fn mock_decoder(_kv: KeyValue) -> MetaResult<()> { + Ok(()) + } + + #[tokio::test] + async fn test_cluster_client_adaptive_range() { + let tx = new_client("test_cluster_client").await; + let in_memory = tx.in_memory().unwrap(); + let cluster_client = tx.client.cluster_client().unwrap(); + let mut rng = rand::thread_rng(); + + // Generates rough 10MB data, which is larger than the default grpc message size limit. + for i in 0..10 { + let data: Vec = (0..1024 * 1024).map(|_| rng.gen()).collect(); + in_memory + .put( + PutRequest::new() + .with_key(format!("__prefix/{i}").as_bytes()) + .with_value(data.clone()), + ) + .await + .unwrap(); + } + + let req = RangeRequest::new().with_prefix(b"__prefix/"); + let stream = + PaginationStream::new(Arc::new(cluster_client), req, 10, Arc::new(mock_decoder)) + .into_stream(); + + let res = stream.try_collect::>().await.unwrap(); + assert_eq!(10, res.len()); + } } diff --git a/src/meta-client/src/client/cluster.rs b/src/meta-client/src/client/cluster.rs index 72b2307790fa..c7edbcc8d39e 100644 --- a/src/meta-client/src/client/cluster.rs +++ b/src/meta-client/src/client/cluster.rs @@ -21,7 +21,9 @@ use api::v1::meta::cluster_client::ClusterClient; use api::v1::meta::{MetasrvNodeInfo, MetasrvPeersRequest, ResponseHeader, Role}; use common_error::ext::BoxedError; use common_grpc::channel_manager::ChannelManager; -use common_meta::error::{Error as MetaError, ExternalSnafu, Result as MetaResult}; +use common_meta::error::{ + Error as MetaError, ExternalSnafu, ResponseExceededSizeLimitSnafu, Result as MetaResult, +}; use common_meta::kv_backend::{KvBackend, TxnService}; use common_meta::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, @@ -103,10 +105,14 @@ impl KvBackend for Client { } async fn range(&self, req: RangeRequest) -> MetaResult { - self.range(req) - .await - .map_err(BoxedError::new) - .context(ExternalSnafu) + let resp = self.range(req).await; + match resp { + Ok(resp) => Ok(resp), + Err(err) if err.is_exceeded_size_limit() => { + Err(BoxedError::new(err)).context(ResponseExceededSizeLimitSnafu) + } + Err(err) => Err(BoxedError::new(err)).context(ExternalSnafu), + } } async fn put(&self, _: PutRequest) -> MetaResult { diff --git a/src/meta-client/src/error.rs b/src/meta-client/src/error.rs index 33a960d9f22f..a4f8663368b4 100644 --- a/src/meta-client/src/error.rs +++ b/src/meta-client/src/error.rs @@ -31,7 +31,11 @@ pub enum Error { }, #[snafu(display("{}", msg))] - MetaServer { code: StatusCode, msg: String }, + MetaServer { + code: StatusCode, + msg: String, + tonic_code: tonic::Code, + }, #[snafu(display("No leader, should ask leader first"))] NoLeader { @@ -127,6 +131,18 @@ impl ErrorExt for Error { } } +impl Error { + pub fn is_exceeded_size_limit(&self) -> bool { + matches!( + self, + Error::MetaServer { + tonic_code: tonic::Code::OutOfRange, + .. + } + ) + } +} + // FIXME(dennis): partial duplicated with src/client/src/error.rs impl From for Error { fn from(e: Status) -> Self { @@ -149,6 +165,10 @@ impl From for Error { let msg = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_MSG) .unwrap_or_else(|| e.message().to_string()); - Self::MetaServer { code, msg } + Self::MetaServer { + code, + msg, + tonic_code: e.code(), + } } } diff --git a/src/meta-client/src/mocks.rs b/src/meta-client/src/mocks.rs index cb102bc024f5..2643c44927ac 100644 --- a/src/meta-client/src/mocks.rs +++ b/src/meta-client/src/mocks.rs @@ -12,31 +12,57 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_grpc::channel_manager::ChannelManager; +use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use meta_srv::mocks as server_mock; use meta_srv::mocks::MockInfo; use crate::client::{MetaClient, MetaClientBuilder}; -pub async fn mock_client_with_memstore() -> MetaClient { - let mock_info = server_mock::mock_with_memstore().await; - mock_client_by(mock_info).await +pub struct MockMetaContext { + pub kv_backend: KvBackendRef, + pub in_memory: Option, } -#[allow(dead_code)] -pub async fn mock_client_with_etcdstore(addr: &str) -> MetaClient { - let mock_info = server_mock::mock_with_etcdstore(addr).await; - mock_client_by(mock_info).await +pub async fn mock_client_with_memstore() -> (MetaClient, MockMetaContext) { + let MockInfo { + server_addr, + channel_manager, + kv_backend, + in_memory, + .. + } = server_mock::mock_with_memstore().await; + ( + mock_client_by(server_addr, channel_manager).await, + MockMetaContext { + kv_backend, + in_memory, + }, + ) } -pub async fn mock_client_by(mock_info: MockInfo) -> MetaClient { +#[allow(dead_code)] +pub async fn mock_client_with_etcdstore(addr: &str) -> (MetaClient, MockMetaContext) { let MockInfo { server_addr, channel_manager, + kv_backend, + in_memory, .. - } = mock_info; + } = server_mock::mock_with_etcdstore(addr).await; + ( + mock_client_by(server_addr, channel_manager).await, + MockMetaContext { + kv_backend, + in_memory, + }, + ) +} +pub async fn mock_client_by(server_addr: String, channel_manager: ChannelManager) -> MetaClient { let id = (1000u64, 2000u64); let mut meta_client = MetaClientBuilder::datanode_default_options(id.0, id.1) + .enable_access_cluster_info() .channel_manager(channel_manager) .build(); meta_client.start(&[&server_addr]).await.unwrap(); diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index 4991c2ab8a4d..cf9144dc3900 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use std::time::Duration; +use api::v1::meta::cluster_server::ClusterServer; use api::v1::meta::heartbeat_server::HeartbeatServer; use api::v1::meta::procedure_service_server::ProcedureServiceServer; use api::v1::meta::store_server::StoreServer; @@ -23,7 +24,7 @@ use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::memory::MemoryKvBackend; -use common_meta::kv_backend::KvBackendRef; +use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use tonic::codec::CompressionEncoding; use tower::service_fn; @@ -36,21 +37,24 @@ pub struct MockInfo { pub server_addr: String, pub channel_manager: ChannelManager, pub metasrv: Arc, + pub kv_backend: KvBackendRef, + pub in_memory: Option, } pub async fn mock_with_memstore() -> MockInfo { let kv_backend = Arc::new(MemoryKvBackend::new()); - mock(Default::default(), kv_backend, None, None).await + let in_memory = Arc::new(MemoryKvBackend::new()); + mock(Default::default(), kv_backend, None, None, Some(in_memory)).await } pub async fn mock_with_etcdstore(addr: &str) -> MockInfo { let kv_backend = EtcdStore::with_endpoints([addr], 128).await.unwrap(); - mock(Default::default(), kv_backend, None, None).await + mock(Default::default(), kv_backend, None, None, None).await } pub async fn mock_with_memstore_and_selector(selector: SelectorRef) -> MockInfo { let kv_backend = Arc::new(MemoryKvBackend::new()); - mock(Default::default(), kv_backend, Some(selector), None).await + mock(Default::default(), kv_backend, Some(selector), None, None).await } pub async fn mock( @@ -58,13 +62,16 @@ pub async fn mock( kv_backend: KvBackendRef, selector: Option, datanode_clients: Option>, + in_memory: Option, ) -> MockInfo { let server_addr = opts.server_addr.clone(); let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); table_metadata_manager.init().await.unwrap(); - let builder = MetasrvBuilder::new().options(opts).kv_backend(kv_backend); + let builder = MetasrvBuilder::new() + .options(opts) + .kv_backend(kv_backend.clone()); let builder = match selector { Some(s) => builder.selector(s), @@ -76,6 +83,11 @@ pub async fn mock( None => builder, }; + let builder = match &in_memory { + Some(in_memory) => builder.in_memory(in_memory.clone()), + None => builder, + }; + let metasrv = builder.build().await.unwrap(); metasrv.try_start().await.unwrap(); @@ -89,6 +101,7 @@ pub async fn mock( let router = add_compressed_service!(router, StoreServer::from_arc(service.clone())); let router = add_compressed_service!(router, ProcedureServiceServer::from_arc(service.clone())); + let router = add_compressed_service!(router, ClusterServer::from_arc(service.clone())); router .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await @@ -126,5 +139,7 @@ pub async fn mock( server_addr, channel_manager, metasrv, + kv_backend, + in_memory, } } diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 0705f91fd233..2402c301ca5d 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -190,6 +190,7 @@ impl GreptimeDbClusterBuilder { self.kv_backend.clone(), self.meta_selector.clone(), Some(datanode_clients.clone()), + None, ) .await;