Skip to content

Commit

Permalink
fix: correct is_exceeded_size_limit behavior for in-memory store (#…
Browse files Browse the repository at this point in the history
…5082)

* fix: correct `is_exceeded_size_limit` behavior for in-memory store

* chore: rename `MetaClientExceededSizeLimit` to `ResponseExceededSizeLimit`
  • Loading branch information
WenyXu authored Dec 2, 2024
1 parent bcadce3 commit 5bdea1a
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 30 deletions.
22 changes: 15 additions & 7 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,13 @@ pub enum Error {
source: BoxedError,
},

#[snafu(display("The response exceeded size limit"))]
ResponseExceededSizeLimit {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},

#[snafu(display("Invalid heartbeat response"))]
InvalidHeartbeatResponse {
#[snafu(implicit)]
Expand Down Expand Up @@ -763,6 +770,7 @@ impl ErrorExt for Error {
| StopProcedureManager { source, .. } => source.status_code(),
RegisterProcedureLoader { source, .. } => source.status_code(),
External { source, .. } => source.status_code(),
ResponseExceededSizeLimit { source, .. } => source.status_code(),
OperateDatanode { source, .. } => source.status_code(),
Table { source, .. } => source.status_code(),
RetryLater { source, .. } => source.status_code(),
Expand Down Expand Up @@ -805,13 +813,13 @@ impl Error {

/// Returns true if the response exceeds the size limit.
pub fn is_exceeded_size_limit(&self) -> bool {
if let Error::EtcdFailed {
error: etcd_client::Error::GRpcStatus(status),
..
} = self
{
return status.code() == tonic::Code::OutOfRange;
match self {
Error::EtcdFailed {
error: etcd_client::Error::GRpcStatus(status),
..
} => status.code() == tonic::Code::OutOfRange,
Error::ResponseExceededSizeLimit { .. } => true,
_ => false,
}
false
}
}
51 changes: 49 additions & 2 deletions src/meta-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,24 +543,29 @@ impl MetaClient {
#[cfg(test)]
mod tests {
use api::v1::meta::{HeartbeatRequest, Peer};
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use rand::Rng;

use super::*;
use crate::{error, mocks};
use crate::error;
use crate::mocks::{self, MockMetaContext};

const TEST_KEY_PREFIX: &str = "__unit_test__meta__";

struct TestClient {
ns: String,
client: MetaClient,
meta_ctx: MockMetaContext,
}

impl TestClient {
async fn new(ns: impl Into<String>) -> Self {
// can also test with etcd: mocks::mock_client_with_etcdstore("127.0.0.1:2379").await;
let client = mocks::mock_client_with_memstore().await;
let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
Self {
ns: ns.into(),
client,
meta_ctx,
}
}

Expand All @@ -585,6 +590,15 @@ mod tests {
let res = self.client.delete_range(req).await;
let _ = res.unwrap();
}

#[allow(dead_code)]
fn kv_backend(&self) -> KvBackendRef {
self.meta_ctx.kv_backend.clone()
}

fn in_memory(&self) -> Option<ResettableKvBackendRef> {
self.meta_ctx.in_memory.clone()
}
}

async fn new_client(ns: impl Into<String>) -> TestClient {
Expand Down Expand Up @@ -940,4 +954,37 @@ mod tests {
);
}
}

fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
Ok(())
}

#[tokio::test]
async fn test_cluster_client_adaptive_range() {
let tx = new_client("test_cluster_client").await;
let in_memory = tx.in_memory().unwrap();
let cluster_client = tx.client.cluster_client().unwrap();
let mut rng = rand::thread_rng();

// Generates rough 10MB data, which is larger than the default grpc message size limit.
for i in 0..10 {
let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.gen()).collect();
in_memory
.put(
PutRequest::new()
.with_key(format!("__prefix/{i}").as_bytes())
.with_value(data.clone()),
)
.await
.unwrap();
}

let req = RangeRequest::new().with_prefix(b"__prefix/");
let stream =
PaginationStream::new(Arc::new(cluster_client), req, 10, Arc::new(mock_decoder))
.into_stream();

let res = stream.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(10, res.len());
}
}
16 changes: 11 additions & 5 deletions src/meta-client/src/client/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use api::v1::meta::cluster_client::ClusterClient;
use api::v1::meta::{MetasrvNodeInfo, MetasrvPeersRequest, ResponseHeader, Role};
use common_error::ext::BoxedError;
use common_grpc::channel_manager::ChannelManager;
use common_meta::error::{Error as MetaError, ExternalSnafu, Result as MetaResult};
use common_meta::error::{
Error as MetaError, ExternalSnafu, ResponseExceededSizeLimitSnafu, Result as MetaResult,
};
use common_meta::kv_backend::{KvBackend, TxnService};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
Expand Down Expand Up @@ -103,10 +105,14 @@ impl KvBackend for Client {
}

async fn range(&self, req: RangeRequest) -> MetaResult<RangeResponse> {
self.range(req)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
let resp = self.range(req).await;
match resp {
Ok(resp) => Ok(resp),
Err(err) if err.is_exceeded_size_limit() => {
Err(BoxedError::new(err)).context(ResponseExceededSizeLimitSnafu)
}
Err(err) => Err(BoxedError::new(err)).context(ExternalSnafu),
}
}

async fn put(&self, _: PutRequest) -> MetaResult<PutResponse> {
Expand Down
24 changes: 22 additions & 2 deletions src/meta-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ pub enum Error {
},

#[snafu(display("{}", msg))]
MetaServer { code: StatusCode, msg: String },
MetaServer {
code: StatusCode,
msg: String,
tonic_code: tonic::Code,
},

#[snafu(display("No leader, should ask leader first"))]
NoLeader {
Expand Down Expand Up @@ -127,6 +131,18 @@ impl ErrorExt for Error {
}
}

impl Error {
pub fn is_exceeded_size_limit(&self) -> bool {
matches!(
self,
Error::MetaServer {
tonic_code: tonic::Code::OutOfRange,
..
}
)
}
}

// FIXME(dennis): partial duplicated with src/client/src/error.rs
impl From<Status> for Error {
fn from(e: Status) -> Self {
Expand All @@ -149,6 +165,10 @@ impl From<Status> for Error {
let msg = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_MSG)
.unwrap_or_else(|| e.message().to_string());

Self::MetaServer { code, msg }
Self::MetaServer {
code,
msg,
tonic_code: e.code(),
}
}
}
44 changes: 35 additions & 9 deletions src/meta-client/src/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,57 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_grpc::channel_manager::ChannelManager;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use meta_srv::mocks as server_mock;
use meta_srv::mocks::MockInfo;

use crate::client::{MetaClient, MetaClientBuilder};

pub async fn mock_client_with_memstore() -> MetaClient {
let mock_info = server_mock::mock_with_memstore().await;
mock_client_by(mock_info).await
pub struct MockMetaContext {
pub kv_backend: KvBackendRef,
pub in_memory: Option<ResettableKvBackendRef>,
}

#[allow(dead_code)]
pub async fn mock_client_with_etcdstore(addr: &str) -> MetaClient {
let mock_info = server_mock::mock_with_etcdstore(addr).await;
mock_client_by(mock_info).await
pub async fn mock_client_with_memstore() -> (MetaClient, MockMetaContext) {
let MockInfo {
server_addr,
channel_manager,
kv_backend,
in_memory,
..
} = server_mock::mock_with_memstore().await;
(
mock_client_by(server_addr, channel_manager).await,
MockMetaContext {
kv_backend,
in_memory,
},
)
}

pub async fn mock_client_by(mock_info: MockInfo) -> MetaClient {
#[allow(dead_code)]
pub async fn mock_client_with_etcdstore(addr: &str) -> (MetaClient, MockMetaContext) {
let MockInfo {
server_addr,
channel_manager,
kv_backend,
in_memory,
..
} = mock_info;
} = server_mock::mock_with_etcdstore(addr).await;
(
mock_client_by(server_addr, channel_manager).await,
MockMetaContext {
kv_backend,
in_memory,
},
)
}

pub async fn mock_client_by(server_addr: String, channel_manager: ChannelManager) -> MetaClient {
let id = (1000u64, 2000u64);
let mut meta_client = MetaClientBuilder::datanode_default_options(id.0, id.1)
.enable_access_cluster_info()
.channel_manager(channel_manager)
.build();
meta_client.start(&[&server_addr]).await.unwrap();
Expand Down
25 changes: 20 additions & 5 deletions src/meta-srv/src/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::cluster_server::ClusterServer;
use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::procedure_service_server::ProcedureServiceServer;
use api::v1::meta::store_server::StoreServer;
Expand All @@ -23,7 +24,7 @@ use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use tonic::codec::CompressionEncoding;
use tower::service_fn;

Expand All @@ -36,35 +37,41 @@ pub struct MockInfo {
pub server_addr: String,
pub channel_manager: ChannelManager,
pub metasrv: Arc<Metasrv>,
pub kv_backend: KvBackendRef,
pub in_memory: Option<ResettableKvBackendRef>,
}

pub async fn mock_with_memstore() -> MockInfo {
let kv_backend = Arc::new(MemoryKvBackend::new());
mock(Default::default(), kv_backend, None, None).await
let in_memory = Arc::new(MemoryKvBackend::new());
mock(Default::default(), kv_backend, None, None, Some(in_memory)).await
}

pub async fn mock_with_etcdstore(addr: &str) -> MockInfo {
let kv_backend = EtcdStore::with_endpoints([addr], 128).await.unwrap();
mock(Default::default(), kv_backend, None, None).await
mock(Default::default(), kv_backend, None, None, None).await
}

pub async fn mock_with_memstore_and_selector(selector: SelectorRef) -> MockInfo {
let kv_backend = Arc::new(MemoryKvBackend::new());
mock(Default::default(), kv_backend, Some(selector), None).await
mock(Default::default(), kv_backend, Some(selector), None, None).await
}

pub async fn mock(
opts: MetasrvOptions,
kv_backend: KvBackendRef,
selector: Option<SelectorRef>,
datanode_clients: Option<Arc<NodeClients>>,
in_memory: Option<ResettableKvBackendRef>,
) -> MockInfo {
let server_addr = opts.server_addr.clone();
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));

table_metadata_manager.init().await.unwrap();

let builder = MetasrvBuilder::new().options(opts).kv_backend(kv_backend);
let builder = MetasrvBuilder::new()
.options(opts)
.kv_backend(kv_backend.clone());

let builder = match selector {
Some(s) => builder.selector(s),
Expand All @@ -76,6 +83,11 @@ pub async fn mock(
None => builder,
};

let builder = match &in_memory {
Some(in_memory) => builder.in_memory(in_memory.clone()),
None => builder,
};

let metasrv = builder.build().await.unwrap();
metasrv.try_start().await.unwrap();

Expand All @@ -89,6 +101,7 @@ pub async fn mock(
let router = add_compressed_service!(router, StoreServer::from_arc(service.clone()));
let router =
add_compressed_service!(router, ProcedureServiceServer::from_arc(service.clone()));
let router = add_compressed_service!(router, ClusterServer::from_arc(service.clone()));
router
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
Expand Down Expand Up @@ -126,5 +139,7 @@ pub async fn mock(
server_addr,
channel_manager,
metasrv,
kv_backend,
in_memory,
}
}
1 change: 1 addition & 0 deletions tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ impl GreptimeDbClusterBuilder {
self.kv_backend.clone(),
self.meta_selector.clone(),
Some(datanode_clients.clone()),
None,
)
.await;

Expand Down

0 comments on commit 5bdea1a

Please sign in to comment.