From 4182365c9dc48840e958cc700ba26ff615d768af Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 24 Dec 2024 04:43:09 +0000 Subject: [PATCH 1/2] chore: remove unnecessary arc --- src/common/meta/src/key/catalog_name.rs | 5 +++-- src/common/meta/src/key/datanode_table.rs | 3 +-- src/common/meta/src/key/flow/flow_name.rs | 4 +--- src/common/meta/src/key/flow/flow_route.rs | 4 +--- src/common/meta/src/key/flow/flownode_flow.rs | 4 +--- src/common/meta/src/key/flow/table_flow.rs | 2 +- src/common/meta/src/key/schema_name.rs | 5 ++--- src/common/meta/src/key/table_name.rs | 2 +- src/common/meta/src/range_stream.rs | 11 +++++------ src/common/meta/src/state_store.rs | 4 +--- src/meta-client/src/client.rs | 7 +++---- src/meta-srv/src/service/store/cached_kv.rs | 2 +- 12 files changed, 21 insertions(+), 32 deletions(-) diff --git a/src/common/meta/src/key/catalog_name.rs b/src/common/meta/src/key/catalog_name.rs index 18ca096412a2..dddbeed3d21c 100644 --- a/src/common/meta/src/key/catalog_name.rs +++ b/src/common/meta/src/key/catalog_name.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::fmt::Display; -use std::sync::Arc; use common_catalog::consts::DEFAULT_CATALOG_NAME; use futures::stream::BoxStream; @@ -146,7 +145,7 @@ impl CatalogManager { self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, - Arc::new(catalog_decoder), + catalog_decoder, ) .into_stream(); @@ -156,6 +155,8 @@ impl CatalogManager { #[cfg(test)] mod tests { + use std::sync::Arc; + use super::*; use crate::kv_backend::memory::MemoryKvBackend; diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs index a0f0e9e511b8..a8226e631bc5 100644 --- a/src/common/meta/src/key/datanode_table.rs +++ b/src/common/meta/src/key/datanode_table.rs @@ -14,7 +14,6 @@ use std::collections::HashMap; use std::fmt::Display; -use std::sync::Arc; use futures::stream::BoxStream; use serde::{Deserialize, Serialize}; @@ -166,7 +165,7 @@ impl DatanodeTableManager { self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, - Arc::new(datanode_table_value_decoder), + datanode_table_value_decoder, ) .into_stream(); diff --git a/src/common/meta/src/key/flow/flow_name.rs b/src/common/meta/src/key/flow/flow_name.rs index 79c87c7360ea..cac2e29633f4 100644 --- a/src/common/meta/src/key/flow/flow_name.rs +++ b/src/common/meta/src/key/flow/flow_name.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use futures::stream::BoxStream; use lazy_static::lazy_static; use regex::Regex; @@ -201,7 +199,7 @@ impl FlowNameManager { self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, - Arc::new(flow_name_decoder), + flow_name_decoder, ) .into_stream(); diff --git a/src/common/meta/src/key/flow/flow_route.rs b/src/common/meta/src/key/flow/flow_route.rs index 47ee94ce9543..c8d81c5e2a26 100644 --- a/src/common/meta/src/key/flow/flow_route.rs +++ b/src/common/meta/src/key/flow/flow_route.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use futures::stream::BoxStream; use lazy_static::lazy_static; use regex::Regex; @@ -179,7 +177,7 @@ impl FlowRouteManager { self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, - Arc::new(flow_route_decoder), + flow_route_decoder, ) .into_stream(); diff --git a/src/common/meta/src/key/flow/flownode_flow.rs b/src/common/meta/src/key/flow/flownode_flow.rs index 552abfcdbe45..6d987c7f4a46 100644 --- a/src/common/meta/src/key/flow/flownode_flow.rs +++ b/src/common/meta/src/key/flow/flownode_flow.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use futures::stream::BoxStream; use futures::TryStreamExt; use lazy_static::lazy_static; @@ -179,7 +177,7 @@ impl FlownodeFlowManager { self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, - Arc::new(flownode_flow_key_decoder), + flownode_flow_key_decoder, ) .into_stream(); diff --git a/src/common/meta/src/key/flow/table_flow.rs b/src/common/meta/src/key/flow/table_flow.rs index c4f47cde514a..4aa4ab060afc 100644 --- a/src/common/meta/src/key/flow/table_flow.rs +++ b/src/common/meta/src/key/flow/table_flow.rs @@ -206,7 +206,7 @@ impl TableFlowManager { self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, - Arc::new(table_flow_decoder), + table_flow_decoder, ) .into_stream(); diff --git a/src/common/meta/src/key/schema_name.rs b/src/common/meta/src/key/schema_name.rs index 35413433a445..46acac8d2bc6 100644 --- a/src/common/meta/src/key/schema_name.rs +++ b/src/common/meta/src/key/schema_name.rs @@ -14,7 +14,6 @@ use std::collections::HashMap; use std::fmt::Display; -use std::sync::Arc; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_time::DatabaseTimeToLive; @@ -283,7 +282,7 @@ impl SchemaManager { self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, - Arc::new(schema_decoder), + schema_decoder, ) .into_stream(); @@ -308,7 +307,7 @@ impl<'a> From<&'a SchemaName> for SchemaNameKey<'a> { #[cfg(test)] mod tests { - use std::time::Duration; + use std::{sync::Arc, time::Duration}; use super::*; use crate::kv_backend::memory::MemoryKvBackend; diff --git a/src/common/meta/src/key/table_name.rs b/src/common/meta/src/key/table_name.rs index e508c5e87764..a504c4a0b89f 100644 --- a/src/common/meta/src/key/table_name.rs +++ b/src/common/meta/src/key/table_name.rs @@ -269,7 +269,7 @@ impl TableNameManager { self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, - Arc::new(table_decoder), + table_decoder, ) .into_stream(); diff --git a/src/common/meta/src/range_stream.rs b/src/common/meta/src/range_stream.rs index be54865281b3..367f081b63d2 100644 --- a/src/common/meta/src/range_stream.rs +++ b/src/common/meta/src/range_stream.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use async_stream::try_stream; use common_telemetry::debug; use futures::Stream; @@ -148,7 +146,7 @@ impl PaginationStreamFactory { } pub struct PaginationStream { - decoder_fn: Arc>, + decoder_fn: fn(KeyValue) -> Result, factory: PaginationStreamFactory, } @@ -158,7 +156,7 @@ impl PaginationStream { kv: KvBackendRef, req: RangeRequest, page_size: usize, - decoder_fn: Arc>, + decoder_fn: fn(KeyValue) -> Result, ) -> Self { Self { decoder_fn, @@ -191,6 +189,7 @@ mod tests { use std::assert_matches::assert_matches; use std::collections::BTreeMap; + use std::sync::Arc; use futures::TryStreamExt; @@ -250,7 +249,7 @@ mod tests { ..Default::default() }, DEFAULT_PAGE_SIZE, - Arc::new(decoder), + decoder, ) .into_stream(); let kv = stream.try_collect::>().await.unwrap(); @@ -290,7 +289,7 @@ mod tests { ..Default::default() }, 2, - Arc::new(decoder), + decoder, ); let kv = stream .into_stream() diff --git a/src/common/meta/src/state_store.rs b/src/common/meta/src/state_store.rs index 89d5dfd0ff0b..a9c3f14a3ebc 100644 --- a/src/common/meta/src/state_store.rs +++ b/src/common/meta/src/state_store.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use async_trait::async_trait; use common_error::ext::BoxedError; use common_procedure::error::{DeleteStatesSnafu, ListStateSnafu, PutStateSnafu}; @@ -171,7 +169,7 @@ impl StateStore for KvStateStore { self.kv_backend.clone(), req, self.max_num_per_range_request.unwrap_or_default(), - Arc::new(decode_kv), + decode_kv, ) .into_stream(); diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index ebe0e94e4861..ee7aebba527a 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -326,8 +326,8 @@ impl ClusterInfo for MetaClient { let cluster_kv_backend = Arc::new(self.cluster_client()?); let range_prefix = DatanodeStatKey::key_prefix_with_cluster_id(self.id.0); let req = RangeRequest::new().with_prefix(range_prefix); - let stream = PaginationStream::new(cluster_kv_backend, req, 256, Arc::new(decode_stats)) - .into_stream(); + let stream = + PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream(); let mut datanode_stats = stream .try_collect::>() .await @@ -994,8 +994,7 @@ mod tests { let req = RangeRequest::new().with_prefix(b"__prefix/"); let stream = - PaginationStream::new(Arc::new(cluster_client), req, 10, Arc::new(mock_decoder)) - .into_stream(); + PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream(); let res = stream.try_collect::>().await.unwrap(); assert_eq!(10, res.len()); diff --git a/src/meta-srv/src/service/store/cached_kv.rs b/src/meta-srv/src/service/store/cached_kv.rs index 0f90ecddea33..b26c2a558f1d 100644 --- a/src/meta-srv/src/service/store/cached_kv.rs +++ b/src/meta-srv/src/service/store/cached_kv.rs @@ -102,7 +102,7 @@ impl LeaderCachedKvBackend { self.store.clone(), RangeRequest::new().with_prefix(prefix.as_bytes()), DEFAULT_PAGE_SIZE, - Arc::new(Ok), + Ok, ) .into_stream(); From 244fecc5a53fbad2bcff66b3a4780b16dcb27e54 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 24 Dec 2024 06:46:32 +0000 Subject: [PATCH 2/2] chore: remove unnecessary box --- src/catalog/src/kvbackend/table_cache.rs | 2 +- src/common/meta/src/cache/container.rs | 44 ++++++++++++++----- .../meta/src/cache/flow/table_flownode.rs | 2 +- src/common/meta/src/cache/registry.rs | 22 ++++++++-- src/common/meta/src/cache/table/schema.rs | 2 +- src/common/meta/src/cache/table/table_info.rs | 2 +- src/common/meta/src/cache/table/table_name.rs | 2 +- .../meta/src/cache/table/table_route.rs | 2 +- .../meta/src/cache/table/table_schema.rs | 2 +- src/common/meta/src/cache/table/view_info.rs | 2 +- src/common/meta/src/key/schema_name.rs | 3 +- 11 files changed, 62 insertions(+), 23 deletions(-) diff --git a/src/catalog/src/kvbackend/table_cache.rs b/src/catalog/src/kvbackend/table_cache.rs index 93980d1a0612..c890960bd61f 100644 --- a/src/catalog/src/kvbackend/table_cache.rs +++ b/src/catalog/src/kvbackend/table_cache.rs @@ -38,7 +38,7 @@ pub fn new_table_cache( ) -> TableCache { let init = init_factory(table_info_cache, table_name_cache); - CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter)) + CacheContainer::new(name, cache, Box::new(invalidator), init, filter) } fn init_factory( diff --git a/src/common/meta/src/cache/container.rs b/src/common/meta/src/cache/container.rs index c32506534f90..289c2c920841 100644 --- a/src/common/meta/src/cache/container.rs +++ b/src/common/meta/src/cache/container.rs @@ -43,7 +43,7 @@ pub struct CacheContainer { cache: Cache, invalidator: Invalidator, initializer: Initializer, - token_filter: TokenFilter, + token_filter: fn(&CacheToken) -> bool, } impl CacheContainer @@ -58,7 +58,7 @@ where cache: Cache, invalidator: Invalidator, initializer: Initializer, - token_filter: TokenFilter, + token_filter: fn(&CacheToken) -> bool, ) -> Self { Self { name, @@ -206,10 +206,13 @@ mod tests { name: &'a str, } + fn always_true_filter(_: &String) -> bool { + true + } + #[tokio::test] async fn test_get() { let cache: Cache = CacheBuilder::new(128).build(); - let filter: TokenFilter = Box::new(|_| true); let counter = Arc::new(AtomicI32::new(0)); let moved_counter = counter.clone(); let init: Initializer = Arc::new(move |_| { @@ -219,7 +222,13 @@ mod tests { let invalidator: Invalidator = Box::new(|_, _| Box::pin(async { Ok(()) })); - let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter); + let adv_cache = CacheContainer::new( + "test".to_string(), + cache, + invalidator, + init, + always_true_filter, + ); let key = NameKey { name: "key" }; let value = adv_cache.get(key).await.unwrap().unwrap(); assert_eq!(value, "hi"); @@ -233,7 +242,6 @@ mod tests { #[tokio::test] async fn test_get_by_ref() { let cache: Cache = CacheBuilder::new(128).build(); - let filter: TokenFilter = Box::new(|_| true); let counter = Arc::new(AtomicI32::new(0)); let moved_counter = counter.clone(); let init: Initializer = Arc::new(move |_| { @@ -243,7 +251,13 @@ mod tests { let invalidator: Invalidator = Box::new(|_, _| Box::pin(async { Ok(()) })); - let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter); + let adv_cache = CacheContainer::new( + "test".to_string(), + cache, + invalidator, + init, + always_true_filter, + ); let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); assert_eq!(value, "hi"); let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); @@ -257,13 +271,18 @@ mod tests { #[tokio::test] async fn test_get_value_not_exits() { let cache: Cache = CacheBuilder::new(128).build(); - let filter: TokenFilter = Box::new(|_| true); let init: Initializer = Arc::new(move |_| Box::pin(async { error::ValueNotExistSnafu {}.fail() })); let invalidator: Invalidator = Box::new(|_, _| Box::pin(async { Ok(()) })); - let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter); + let adv_cache = CacheContainer::new( + "test".to_string(), + cache, + invalidator, + init, + always_true_filter, + ); let value = adv_cache.get_by_ref("foo").await.unwrap(); assert!(value.is_none()); } @@ -271,7 +290,6 @@ mod tests { #[tokio::test] async fn test_invalidate() { let cache: Cache = CacheBuilder::new(128).build(); - let filter: TokenFilter = Box::new(|_| true); let counter = Arc::new(AtomicI32::new(0)); let moved_counter = counter.clone(); let init: Initializer = Arc::new(move |_| { @@ -285,7 +303,13 @@ mod tests { }) }); - let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter); + let adv_cache = CacheContainer::new( + "test".to_string(), + cache, + invalidator, + init, + always_true_filter, + ); let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); assert_eq!(value, "hi"); let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs index 684478c1510e..50a47aade1e5 100644 --- a/src/common/meta/src/cache/flow/table_flownode.rs +++ b/src/common/meta/src/cache/flow/table_flownode.rs @@ -45,7 +45,7 @@ pub fn new_table_flownode_set_cache( let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend)); let init = init_factory(table_flow_manager); - CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter)) + CacheContainer::new(name, cache, Box::new(invalidator), init, filter) } fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer { diff --git a/src/common/meta/src/cache/registry.rs b/src/common/meta/src/cache/registry.rs index e51fb7e6732e..20e378cf199a 100644 --- a/src/common/meta/src/cache/registry.rs +++ b/src/common/meta/src/cache/registry.rs @@ -151,12 +151,15 @@ mod tests { use crate::cache::*; use crate::instruction::CacheIdent; + fn always_true_filter(_: &CacheIdent) -> bool { + true + } + fn test_cache( name: &str, invalidator: Invalidator, ) -> CacheContainer { let cache: Cache = CacheBuilder::new(128).build(); - let filter: TokenFilter = Box::new(|_| true); let counter = Arc::new(AtomicI32::new(0)); let moved_counter = counter.clone(); let init: Initializer = Arc::new(move |_| { @@ -164,7 +167,13 @@ mod tests { Box::pin(async { Ok(Some("hi".to_string())) }) }); - CacheContainer::new(name.to_string(), cache, invalidator, init, filter) + CacheContainer::new( + name.to_string(), + cache, + invalidator, + init, + always_true_filter, + ) } fn test_i32_cache( @@ -172,7 +181,6 @@ mod tests { invalidator: Invalidator, ) -> CacheContainer { let cache: Cache = CacheBuilder::new(128).build(); - let filter: TokenFilter = Box::new(|_| true); let counter = Arc::new(AtomicI32::new(0)); let moved_counter = counter.clone(); let init: Initializer = Arc::new(move |_| { @@ -180,7 +188,13 @@ mod tests { Box::pin(async { Ok(Some("foo".to_string())) }) }); - CacheContainer::new(name.to_string(), cache, invalidator, init, filter) + CacheContainer::new( + name.to_string(), + cache, + invalidator, + init, + always_true_filter, + ) } #[tokio::test] diff --git a/src/common/meta/src/cache/table/schema.rs b/src/common/meta/src/cache/table/schema.rs index 8016c85eaa67..bcf81d4fe6e6 100644 --- a/src/common/meta/src/cache/table/schema.rs +++ b/src/common/meta/src/cache/table/schema.rs @@ -36,7 +36,7 @@ pub fn new_schema_cache( let schema_manager = SchemaManager::new(kv_backend.clone()); let init = init_factory(schema_manager); - CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter)) + CacheContainer::new(name, cache, Box::new(invalidator), init, filter) } fn init_factory(schema_manager: SchemaManager) -> Initializer> { diff --git a/src/common/meta/src/cache/table/table_info.rs b/src/common/meta/src/cache/table/table_info.rs index 2f8d188d3dd7..c3444516a58e 100644 --- a/src/common/meta/src/cache/table/table_info.rs +++ b/src/common/meta/src/cache/table/table_info.rs @@ -41,7 +41,7 @@ pub fn new_table_info_cache( let table_info_manager = Arc::new(TableInfoManager::new(kv_backend)); let init = init_factory(table_info_manager); - CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter)) + CacheContainer::new(name, cache, Box::new(invalidator), init, filter) } fn init_factory(table_info_manager: TableInfoManagerRef) -> Initializer> { diff --git a/src/common/meta/src/cache/table/table_name.rs b/src/common/meta/src/cache/table/table_name.rs index 926e4de66f63..540da5e5f4ba 100644 --- a/src/common/meta/src/cache/table/table_name.rs +++ b/src/common/meta/src/cache/table/table_name.rs @@ -41,7 +41,7 @@ pub fn new_table_name_cache( let table_name_manager = Arc::new(TableNameManager::new(kv_backend)); let init = init_factory(table_name_manager); - CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter)) + CacheContainer::new(name, cache, Box::new(invalidator), init, filter) } fn init_factory(table_name_manager: TableNameManagerRef) -> Initializer { diff --git a/src/common/meta/src/cache/table/table_route.rs b/src/common/meta/src/cache/table/table_route.rs index 840e52f8ae1c..f75926592728 100644 --- a/src/common/meta/src/cache/table/table_route.rs +++ b/src/common/meta/src/cache/table/table_route.rs @@ -65,7 +65,7 @@ pub fn new_table_route_cache( let table_info_manager = Arc::new(TableRouteManager::new(kv_backend)); let init = init_factory(table_info_manager); - CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter)) + CacheContainer::new(name, cache, Box::new(invalidator), init, filter) } fn init_factory( diff --git a/src/common/meta/src/cache/table/table_schema.rs b/src/common/meta/src/cache/table/table_schema.rs index a0cc567a7303..99ece65683a8 100644 --- a/src/common/meta/src/cache/table/table_schema.rs +++ b/src/common/meta/src/cache/table/table_schema.rs @@ -40,7 +40,7 @@ pub fn new_table_schema_cache( let table_info_manager = TableInfoManager::new(kv_backend); let init = init_factory(table_info_manager); - CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter)) + CacheContainer::new(name, cache, Box::new(invalidator), init, filter) } fn init_factory(table_info_manager: TableInfoManager) -> Initializer> { diff --git a/src/common/meta/src/cache/table/view_info.rs b/src/common/meta/src/cache/table/view_info.rs index 4a5c391f42e6..6a85493d420d 100644 --- a/src/common/meta/src/cache/table/view_info.rs +++ b/src/common/meta/src/cache/table/view_info.rs @@ -40,7 +40,7 @@ pub fn new_view_info_cache( let view_info_manager = Arc::new(ViewInfoManager::new(kv_backend)); let init = init_factory(view_info_manager); - CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter)) + CacheContainer::new(name, cache, Box::new(invalidator), init, filter) } fn init_factory(view_info_manager: ViewInfoManagerRef) -> Initializer> { diff --git a/src/common/meta/src/key/schema_name.rs b/src/common/meta/src/key/schema_name.rs index 46acac8d2bc6..41d4f503b12f 100644 --- a/src/common/meta/src/key/schema_name.rs +++ b/src/common/meta/src/key/schema_name.rs @@ -307,7 +307,8 @@ impl<'a> From<&'a SchemaName> for SchemaNameKey<'a> { #[cfg(test)] mod tests { - use std::{sync::Arc, time::Duration}; + use std::sync::Arc; + use std::time::Duration; use super::*; use crate::kv_backend::memory::MemoryKvBackend;