diff --git a/Cargo.lock b/Cargo.lock index 68eeaf20ff34..9a7a6b3eb834 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3225,6 +3225,7 @@ dependencies = [ "arrow-flight", "async-trait", "bytes", + "cache", "catalog", "client", "common-base", diff --git a/src/cache/src/lib.rs b/src/cache/src/lib.rs index 4adf0ff1ff33..f267a74346d9 100644 --- a/src/cache/src/lib.rs +++ b/src/cache/src/lib.rs @@ -19,9 +19,9 @@ use std::time::Duration; use catalog::kvbackend::new_table_cache; use common_meta::cache::{ - new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache, - new_table_route_cache, new_view_info_cache, CacheRegistry, CacheRegistryBuilder, - LayeredCacheRegistryBuilder, + new_schema_cache, new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache, + new_table_route_cache, new_table_schema_cache, new_view_info_cache, CacheRegistry, + CacheRegistryBuilder, LayeredCacheRegistryBuilder, }; use common_meta::kv_backend::KvBackendRef; use moka::future::CacheBuilder; @@ -37,9 +37,47 @@ pub const TABLE_INFO_CACHE_NAME: &str = "table_info_cache"; pub const VIEW_INFO_CACHE_NAME: &str = "view_info_cache"; pub const TABLE_NAME_CACHE_NAME: &str = "table_name_cache"; pub const TABLE_CACHE_NAME: &str = "table_cache"; +pub const SCHEMA_CACHE_NAME: &str = "schema_cache"; +pub const TABLE_SCHEMA_NAME_CACHE_NAME: &str = "table_schema_name_cache"; pub const TABLE_FLOWNODE_SET_CACHE_NAME: &str = "table_flownode_set_cache"; pub const TABLE_ROUTE_CACHE_NAME: &str = "table_route_cache"; +/// Builds cache registry for datanode, including: +/// - Schema cache. +/// - Table id to schema name cache. +pub fn build_datanode_cache_registry(kv_backend: KvBackendRef) -> CacheRegistry { + // Builds table id schema name cache that never expires. + let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY).build(); + let table_id_schema_cache = Arc::new(new_table_schema_cache( + TABLE_SCHEMA_NAME_CACHE_NAME.to_string(), + cache, + kv_backend.clone(), + )); + + // Builds schema cache + let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY) + .time_to_live(DEFAULT_CACHE_TTL) + .time_to_idle(DEFAULT_CACHE_TTI) + .build(); + let schema_cache = Arc::new(new_schema_cache( + SCHEMA_CACHE_NAME.to_string(), + cache, + kv_backend.clone(), + )); + + CacheRegistryBuilder::default() + .add_cache(table_id_schema_cache) + .add_cache(schema_cache) + .build() +} + +/// Builds cache registry for frontend and datanode, including: +/// - Table info cache +/// - Table name cache +/// - Table route cache +/// - Table flow node cache +/// - View cache +/// - Schema cache pub fn build_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegistry { // Builds table info cache let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY) @@ -95,12 +133,30 @@ pub fn build_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegist kv_backend.clone(), )); + // Builds schema cache + let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY) + .time_to_live(DEFAULT_CACHE_TTL) + .time_to_idle(DEFAULT_CACHE_TTI) + .build(); + let schema_cache = Arc::new(new_schema_cache( + SCHEMA_CACHE_NAME.to_string(), + cache, + kv_backend.clone(), + )); + + let table_id_schema_cache = Arc::new(new_table_schema_cache( + TABLE_SCHEMA_NAME_CACHE_NAME.to_string(), + CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY).build(), + kv_backend, + )); CacheRegistryBuilder::default() .add_cache(table_info_cache) .add_cache(table_name_cache) .add_cache(table_route_cache) .add_cache(view_info_cache) .add_cache(table_flownode_set_cache) + .add_cache(schema_cache) + .add_cache(table_id_schema_cache) .build() } diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 187bdbd7aa3a..811ed826ad49 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -16,10 +16,12 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; +use cache::build_datanode_cache_registry; use catalog::kvbackend::MetaKvBackend; use clap::Parser; use common_base::Plugins; use common_config::Configurable; +use common_meta::cache::LayeredCacheRegistryBuilder; use common_telemetry::logging::TracingOptions; use common_telemetry::{info, warn}; use common_version::{short_version, version}; @@ -300,9 +302,17 @@ impl StartCommand { client: meta_client.clone(), }); + // Builds cache registry for datanode. + let layered_cache_registry = Arc::new( + LayeredCacheRegistryBuilder::default() + .add_cache_registry(build_datanode_cache_registry(meta_backend.clone())) + .build(), + ); + let mut datanode = DatanodeBuilder::new(opts.clone(), plugins) .with_meta_client(meta_client) .with_kv_backend(meta_backend) + .with_cache_registry(layered_cache_registry) .build() .await .context(StartDatanodeSnafu)?; diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index 18fbf5184661..a2b6b41c019a 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -22,6 +22,7 @@ use common_base::Plugins; use common_config::Configurable; use common_grpc::channel_manager::ChannelConfig; use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; +use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_meta::key::flow::FlowMetadataManager; @@ -30,7 +31,6 @@ use common_telemetry::info; use common_telemetry::logging::TracingOptions; use common_version::{short_version, version}; use flow::{FlownodeBuilder, FlownodeInstance, FrontendInvoker}; -use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; use meta_client::{MetaClientOptions, MetaClientType}; use servers::Mode; use snafu::{OptionExt, ResultExt}; @@ -288,9 +288,7 @@ impl StartCommand { let executor = HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler), - Arc::new(InvalidateTableCacheHandler::new( - layered_cache_registry.clone(), - )), + Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())), ]); let heartbeat_task = flow::heartbeat::HeartbeatTask::new( diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index dc4645dfa1f0..d90a286fc451 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -24,13 +24,13 @@ use common_base::Plugins; use common_config::Configurable; use common_grpc::channel_manager::ChannelConfig; use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; +use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_telemetry::info; use common_telemetry::logging::TracingOptions; use common_time::timezone::set_default_timezone; use common_version::{short_version, version}; -use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; use frontend::heartbeat::HeartbeatTask; use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance as FeInstance}; @@ -328,9 +328,7 @@ impl StartCommand { let executor = HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler), - Arc::new(InvalidateTableCacheHandler::new( - layered_cache_registry.clone(), - )), + Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())), ]); let heartbeat_task = HeartbeatTask::new( diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 251957dd285b..d7e816166b03 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -497,6 +497,7 @@ impl StartCommand { let datanode = DatanodeBuilder::new(dn_opts, plugins.clone()) .with_kv_backend(kv_backend.clone()) + .with_cache_registry(layered_cache_registry.clone()) .build() .await .context(StartDatanodeSnafu)?; diff --git a/src/common/meta/src/cache.rs b/src/common/meta/src/cache.rs index 52dae1a094af..9ce996b27d59 100644 --- a/src/common/meta/src/cache.rs +++ b/src/common/meta/src/cache.rs @@ -24,7 +24,8 @@ pub use registry::{ LayeredCacheRegistryBuilder, LayeredCacheRegistryRef, }; pub use table::{ - new_table_info_cache, new_table_name_cache, new_table_route_cache, new_view_info_cache, - TableInfoCache, TableInfoCacheRef, TableNameCache, TableNameCacheRef, TableRoute, - TableRouteCache, TableRouteCacheRef, ViewInfoCache, ViewInfoCacheRef, + new_schema_cache, new_table_info_cache, new_table_name_cache, new_table_route_cache, + new_table_schema_cache, new_view_info_cache, SchemaCache, SchemaCacheRef, TableInfoCache, + TableInfoCacheRef, TableNameCache, TableNameCacheRef, TableRoute, TableRouteCache, + TableRouteCacheRef, TableSchemaCache, TableSchemaCacheRef, ViewInfoCache, ViewInfoCacheRef, }; diff --git a/src/common/meta/src/cache/table.rs b/src/common/meta/src/cache/table.rs index 82a3ad98df33..87ad9a5a16a3 100644 --- a/src/common/meta/src/cache/table.rs +++ b/src/common/meta/src/cache/table.rs @@ -12,12 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod schema; mod table_info; mod table_name; mod table_route; +mod table_schema; mod view_info; +pub use schema::{new_schema_cache, SchemaCache, SchemaCacheRef}; pub use table_info::{new_table_info_cache, TableInfoCache, TableInfoCacheRef}; pub use table_name::{new_table_name_cache, TableNameCache, TableNameCacheRef}; pub use table_route::{new_table_route_cache, TableRoute, TableRouteCache, TableRouteCacheRef}; +pub use table_schema::{new_table_schema_cache, TableSchemaCache, TableSchemaCacheRef}; pub use view_info::{new_view_info_cache, ViewInfoCache, ViewInfoCacheRef}; diff --git a/src/common/meta/src/cache/table/schema.rs b/src/common/meta/src/cache/table/schema.rs new file mode 100644 index 000000000000..8016c85eaa67 --- /dev/null +++ b/src/common/meta/src/cache/table/schema.rs @@ -0,0 +1,73 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use futures_util::future::BoxFuture; +use moka::future::Cache; +use snafu::OptionExt; + +use crate::cache::{CacheContainer, Initializer}; +use crate::error::ValueNotExistSnafu; +use crate::instruction::CacheIdent; +use crate::key::schema_name::{SchemaManager, SchemaName, SchemaNameKey, SchemaNameValue}; +use crate::kv_backend::KvBackendRef; + +pub type SchemaCache = CacheContainer, CacheIdent>; +pub type SchemaCacheRef = Arc; + +/// Constructs a [SchemaCache]. +pub fn new_schema_cache( + name: String, + cache: Cache>, + kv_backend: KvBackendRef, +) -> SchemaCache { + let schema_manager = SchemaManager::new(kv_backend.clone()); + let init = init_factory(schema_manager); + + CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter)) +} + +fn init_factory(schema_manager: SchemaManager) -> Initializer> { + Arc::new(move |schema_name| { + let manager = schema_manager.clone(); + Box::pin(async move { + let schema_value = manager + .get(SchemaNameKey { + catalog: &schema_name.catalog_name, + schema: &schema_name.schema_name, + }) + .await? + .context(ValueNotExistSnafu)? + .into_inner(); + Ok(Some(Arc::new(schema_value))) + }) + }) +} + +fn invalidator<'a>( + cache: &'a Cache>, + ident: &'a CacheIdent, +) -> BoxFuture<'a, crate::error::Result<()>> { + Box::pin(async move { + if let CacheIdent::SchemaName(schema_name) = ident { + cache.invalidate(schema_name).await + } + Ok(()) + }) +} + +fn filter(ident: &CacheIdent) -> bool { + matches!(ident, CacheIdent::SchemaName(_)) +} diff --git a/src/common/meta/src/cache/table/table_schema.rs b/src/common/meta/src/cache/table/table_schema.rs new file mode 100644 index 000000000000..a0cc567a7303 --- /dev/null +++ b/src/common/meta/src/cache/table/table_schema.rs @@ -0,0 +1,76 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Cache for table id to schema name mapping. + +use std::sync::Arc; + +use futures_util::future::BoxFuture; +use moka::future::Cache; +use snafu::OptionExt; +use store_api::storage::TableId; + +use crate::cache::{CacheContainer, Initializer}; +use crate::error; +use crate::instruction::CacheIdent; +use crate::key::schema_name::SchemaName; +use crate::key::table_info::TableInfoManager; +use crate::kv_backend::KvBackendRef; + +pub type TableSchemaCache = CacheContainer, CacheIdent>; +pub type TableSchemaCacheRef = Arc; + +/// Constructs a [TableSchemaCache]. +pub fn new_table_schema_cache( + name: String, + cache: Cache>, + kv_backend: KvBackendRef, +) -> TableSchemaCache { + let table_info_manager = TableInfoManager::new(kv_backend); + let init = init_factory(table_info_manager); + + CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter)) +} + +fn init_factory(table_info_manager: TableInfoManager) -> Initializer> { + Arc::new(move |table_id| { + let table_info_manager = table_info_manager.clone(); + Box::pin(async move { + let raw_table_info = table_info_manager + .get(*table_id) + .await? + .context(error::ValueNotExistSnafu)? + .into_inner() + .table_info; + + Ok(Some(Arc::new(SchemaName { + catalog_name: raw_table_info.catalog_name, + schema_name: raw_table_info.schema_name, + }))) + }) + }) +} + +/// Never invalidates table id schema cache. +fn invalidator<'a>( + _cache: &'a Cache>, + _ident: &'a CacheIdent, +) -> BoxFuture<'a, error::Result<()>> { + Box::pin(std::future::ready(Ok(()))) +} + +/// Never invalidates table id schema cache. +fn filter(_ident: &CacheIdent) -> bool { + false +} diff --git a/src/common/meta/src/heartbeat/handler.rs b/src/common/meta/src/heartbeat/handler.rs index d80e3b8486c9..afa71f0edfd8 100644 --- a/src/common/meta/src/heartbeat/handler.rs +++ b/src/common/meta/src/heartbeat/handler.rs @@ -21,6 +21,7 @@ use common_telemetry::error; use crate::error::Result; use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef}; +pub mod invalidate_table_cache; pub mod parse_mailbox_message; #[cfg(test)] mod tests; diff --git a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs b/src/common/meta/src/heartbeat/handler/invalidate_table_cache.rs similarity index 74% rename from src/frontend/src/heartbeat/handler/invalidate_table_cache.rs rename to src/common/meta/src/heartbeat/handler/invalidate_table_cache.rs index 9b949b4ea32a..2012b616f22a 100644 --- a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs +++ b/src/common/meta/src/heartbeat/handler/invalidate_table_cache.rs @@ -13,21 +13,22 @@ // limitations under the License. use async_trait::async_trait; -use common_meta::cache_invalidator::{CacheInvalidatorRef, Context}; -use common_meta::error::Result as MetaResult; -use common_meta::heartbeat::handler::{ +use common_telemetry::debug; + +use crate::cache_invalidator::{CacheInvalidatorRef, Context}; +use crate::error::Result as MetaResult; +use crate::heartbeat::handler::{ HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, }; -use common_meta::instruction::Instruction; -use common_telemetry::debug; +use crate::instruction::Instruction; #[derive(Clone)] -pub struct InvalidateTableCacheHandler { +pub struct InvalidateCacheHandler { cache_invalidator: CacheInvalidatorRef, } #[async_trait] -impl HeartbeatResponseHandler for InvalidateTableCacheHandler { +impl HeartbeatResponseHandler for InvalidateCacheHandler { fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool { matches!( ctx.incoming_message.as_ref(), @@ -37,13 +38,10 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler { async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult { let Some((_, Instruction::InvalidateCaches(caches))) = ctx.incoming_message.take() else { - unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'") + unreachable!("InvalidateCacheHandler: should be guarded by 'is_acceptable'") }; - debug!( - "InvalidateTableCacheHandler: invalidating caches: {:?}", - caches - ); + debug!("InvalidateCacheHandler: invalidating caches: {:?}", caches); // Invalidate local cache always success let _ = self @@ -55,7 +53,7 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler { } } -impl InvalidateTableCacheHandler { +impl InvalidateCacheHandler { pub fn new(cache_invalidator: CacheInvalidatorRef) -> Self { Self { cache_invalidator } } diff --git a/src/common/meta/src/key/schema_metadata_manager.rs b/src/common/meta/src/key/schema_metadata_manager.rs index 23ad86d2ab45..ad6673e7137a 100644 --- a/src/common/meta/src/key/schema_metadata_manager.rs +++ b/src/common/meta/src/key/schema_metadata_manager.rs @@ -19,41 +19,39 @@ use std::sync::Arc; use snafu::OptionExt; use store_api::storage::TableId; +use crate::cache::{SchemaCacheRef, TableSchemaCacheRef}; use crate::error::TableInfoNotFoundSnafu; -use crate::key::schema_name::{SchemaManager, SchemaNameKey}; -use crate::key::table_info::{TableInfoManager, TableInfoManagerRef}; -use crate::kv_backend::KvBackendRef; use crate::{error, SchemaOptions}; pub type SchemaMetadataManagerRef = Arc; pub struct SchemaMetadataManager { - table_info_manager: TableInfoManagerRef, - schema_manager: SchemaManager, + table_id_schema_cache: TableSchemaCacheRef, + schema_cache: SchemaCacheRef, #[cfg(any(test, feature = "testing"))] - kv_backend: KvBackendRef, + kv_backend: crate::kv_backend::KvBackendRef, } impl SchemaMetadataManager { /// Creates a new database meta #[cfg(not(any(test, feature = "testing")))] - pub fn new(kv_backend: KvBackendRef) -> Self { - let table_info_manager = Arc::new(TableInfoManager::new(kv_backend.clone())); - let schema_manager = SchemaManager::new(kv_backend); + pub fn new(table_id_schema_cache: TableSchemaCacheRef, schema_cache: SchemaCacheRef) -> Self { Self { - table_info_manager, - schema_manager, + table_id_schema_cache, + schema_cache, } } /// Creates a new database meta #[cfg(any(test, feature = "testing"))] - pub fn new(kv_backend: KvBackendRef) -> Self { - let table_info_manager = Arc::new(TableInfoManager::new(kv_backend.clone())); - let schema_manager = SchemaManager::new(kv_backend.clone()); + pub fn new( + kv_backend: crate::kv_backend::KvBackendRef, + table_id_schema_cache: TableSchemaCacheRef, + schema_cache: SchemaCacheRef, + ) -> Self { Self { - table_info_manager, - schema_manager, + table_id_schema_cache, + schema_cache, kv_backend, } } @@ -62,23 +60,16 @@ impl SchemaMetadataManager { pub async fn get_schema_options_by_table_id( &self, table_id: TableId, - ) -> error::Result> { - let table_info = self - .table_info_manager + ) -> error::Result>> { + let schema_name = self + .table_id_schema_cache .get(table_id) .await? .with_context(|| TableInfoNotFoundSnafu { table: format!("table id: {}", table_id), })?; - let key = SchemaNameKey::new( - &table_info.table_info.catalog_name, - &table_info.table_info.schema_name, - ); - self.schema_manager - .get(key) - .await - .map(|v| v.map(|v| v.into_inner())) + self.schema_cache.get_by_ref(&schema_name).await } #[cfg(any(test, feature = "testing"))] @@ -100,17 +91,19 @@ impl SchemaMetadataManager { meta: Default::default(), table_type: TableType::Base, }); - let (txn, _) = self - .table_info_manager + let table_info_manager = + crate::key::table_info::TableInfoManager::new(self.kv_backend.clone()); + let (txn, _) = table_info_manager .build_create_txn(table_id, &value) .unwrap(); let resp = self.kv_backend.txn(txn).await.unwrap(); assert!(resp.succeeded, "Failed to create table metadata"); - let key = SchemaNameKey { + let key = crate::key::schema_name::SchemaNameKey { catalog: catalog_name, schema: schema_name, }; - self.schema_manager + + crate::key::schema_name::SchemaManager::new(self.kv_backend.clone()) .create(key, schema_value, false) .await .expect("Failed to create schema metadata"); diff --git a/src/common/meta/src/key/schema_name.rs b/src/common/meta/src/key/schema_name.rs index 15cb37c4d1a8..1ec8c17eb5a1 100644 --- a/src/common/meta/src/key/schema_name.rs +++ b/src/common/meta/src/key/schema_name.rs @@ -171,6 +171,7 @@ impl<'a> TryFrom<&'a str> for SchemaNameKey<'a> { } } +#[derive(Clone)] pub struct SchemaManager { kv_backend: KvBackendRef, } diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index c7ef0a724289..9f500050d30b 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -65,6 +65,7 @@ toml.workspace = true tonic.workspace = true [dev-dependencies] +cache.workspace = true client.workspace = true common-meta = { workspace = true, features = ["testing"] } common-query.workspace = true diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 604fee7dfdde..c89c007082bf 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -18,11 +18,11 @@ use std::path::Path; use std::sync::Arc; use std::time::Duration; -use catalog::kvbackend::CachedKvBackendBuilder; use catalog::memory::MemoryCatalogManager; use common_base::Plugins; use common_error::ext::BoxedError; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; +use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef}; use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue}; use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; @@ -57,9 +57,9 @@ use tokio::sync::Notify; use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig}; use crate::error::{ - self, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu, - MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu, ShutdownServerSnafu, - StartServerSnafu, + self, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, MissingCacheSnafu, + MissingKvBackendSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu, + ShutdownServerSnafu, StartServerSnafu, }; use crate::event_listener::{ new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef, @@ -160,6 +160,7 @@ pub struct DatanodeBuilder { plugins: Plugins, meta_client: Option, kv_backend: Option, + cache_registry: Option>, } impl DatanodeBuilder { @@ -171,6 +172,7 @@ impl DatanodeBuilder { plugins, meta_client: None, kv_backend: None, + cache_registry: None, } } @@ -181,6 +183,13 @@ impl DatanodeBuilder { } } + pub fn with_cache_registry(self, cache_registry: Arc) -> Self { + Self { + cache_registry: Some(cache_registry), + ..self + } + } + pub fn with_kv_backend(self, kv_backend: KvBackendRef) -> Self { Self { kv_backend: Some(kv_backend), @@ -209,10 +218,16 @@ impl DatanodeBuilder { (Box::new(NoopRegionServerEventListener) as _, None) }; - let cached_kv_backend = Arc::new(CachedKvBackendBuilder::new(kv_backend.clone()).build()); + let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?; + let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?; + let table_id_schema_cache: TableSchemaCacheRef = + cache_registry.get().context(MissingCacheSnafu)?; - let schema_metadata_manager = - Arc::new(SchemaMetadataManager::new(cached_kv_backend.clone())); + let schema_metadata_manager = Arc::new(SchemaMetadataManager::new( + kv_backend.clone(), + table_id_schema_cache, + schema_cache, + )); let region_server = self .new_region_server(schema_metadata_manager, region_event_listener) .await?; @@ -248,7 +263,7 @@ impl DatanodeBuilder { &self.opts, region_server.clone(), meta_client, - cached_kv_backend, + cache_registry, ) .await?, ) @@ -591,7 +606,9 @@ mod tests { use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; + use cache::build_datanode_cache_registry; use common_base::Plugins; + use common_meta::cache::LayeredCacheRegistryBuilder; use common_meta::key::datanode_table::DatanodeTableManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::KvBackendRef; @@ -628,13 +645,21 @@ mod tests { mock_region_server.register_engine(mock_region.clone()); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let layered_cache_registry = Arc::new( + LayeredCacheRegistryBuilder::default() + .add_cache_registry(build_datanode_cache_registry(kv_backend)) + .build(), + ); + let builder = DatanodeBuilder::new( DatanodeOptions { node_id: Some(0), ..Default::default() }, Plugins::default(), - ); + ) + .with_cache_registry(layered_cache_registry); let kv = Arc::new(MemoryKvBackend::default()) as _; setup_table_datanode(&kv).await; diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 0b36245924b4..9fbd46e16009 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -364,6 +364,12 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Cache not found in registry"))] + MissingCache { + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -431,6 +437,7 @@ impl ErrorExt for Error { ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => { StatusCode::RegionBusy } + MissingCache { .. } => StatusCode::Internal, } } diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 3bd2ba2ef1e0..a42edd029229 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -18,9 +18,10 @@ use std::sync::Arc; use std::time::Duration; use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat}; -use catalog::kvbackend::CachedKvBackend; +use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::datanode::REGION_STATISTIC_KEY; use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS; +use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::{ HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef, @@ -40,7 +41,6 @@ use crate::alive_keeper::RegionAliveKeeper; use crate::config::DatanodeOptions; use crate::error::{self, MetaClientInitSnafu, Result}; use crate::event_listener::RegionServerEventReceiver; -use crate::heartbeat::handler::cache_invalidator::InvalidateSchemaCacheHandler; use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT}; use crate::region_server::RegionServer; @@ -72,7 +72,7 @@ impl HeartbeatTask { opts: &DatanodeOptions, region_server: RegionServer, meta_client: MetaClientRef, - cache_kv_backend: Arc, + cache_invalidator: CacheInvalidatorRef, ) -> Result { let region_alive_keeper = Arc::new(RegionAliveKeeper::new( region_server.clone(), @@ -82,7 +82,7 @@ impl HeartbeatTask { region_alive_keeper.clone(), Arc::new(ParseMailboxMessageHandler), Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())), - Arc::new(InvalidateSchemaCacheHandler::new(cache_kv_backend)), + Arc::new(InvalidateCacheHandler::new(cache_invalidator)), ])); Ok(Self { diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index d902ae98ea79..b5c99e57eec1 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -24,7 +24,6 @@ use futures::future::BoxFuture; use snafu::OptionExt; use store_api::storage::RegionId; -pub(crate) mod cache_invalidator; mod close_region; mod downgrade_region; mod open_region; diff --git a/src/datanode/src/heartbeat/handler/cache_invalidator.rs b/src/datanode/src/heartbeat/handler/cache_invalidator.rs deleted file mode 100644 index 09f4c7b72179..000000000000 --- a/src/datanode/src/heartbeat/handler/cache_invalidator.rs +++ /dev/null @@ -1,167 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Schema cache invalidator handler - -use std::sync::Arc; - -use async_trait::async_trait; -use catalog::kvbackend::CachedKvBackend; -use common_meta::cache_invalidator::KvCacheInvalidator; -use common_meta::heartbeat::handler::{ - HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, -}; -use common_meta::instruction::{CacheIdent, Instruction}; -use common_meta::key::schema_name::SchemaNameKey; -use common_meta::key::MetadataKey; -use common_telemetry::debug; - -#[derive(Clone)] -pub(crate) struct InvalidateSchemaCacheHandler { - cached_kv_backend: Arc, -} - -#[async_trait] -impl HeartbeatResponseHandler for InvalidateSchemaCacheHandler { - fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool { - matches!( - ctx.incoming_message.as_ref(), - Some((_, Instruction::InvalidateCaches(_))) - ) - } - - async fn handle( - &self, - ctx: &mut HeartbeatResponseHandlerContext, - ) -> common_meta::error::Result { - let Some((_, Instruction::InvalidateCaches(caches))) = ctx.incoming_message.take() else { - unreachable!("InvalidateSchemaCacheHandler: should be guarded by 'is_acceptable'") - }; - - debug!( - "InvalidateSchemaCacheHandler: invalidating caches: {:?}", - caches - ); - - for cache in caches { - let CacheIdent::SchemaName(schema_name) = cache else { - continue; - }; - let key: SchemaNameKey = (&schema_name).into(); - let key_bytes = key.to_bytes(); - // invalidate cache - self.cached_kv_backend.invalidate_key(&key_bytes).await; - } - - Ok(HandleControl::Done) - } -} - -impl InvalidateSchemaCacheHandler { - pub fn new(cached_kv_backend: Arc) -> Self { - Self { cached_kv_backend } - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - use std::time::Duration; - - use api::v1::meta::HeartbeatResponse; - use catalog::kvbackend::CachedKvBackendBuilder; - use common_meta::heartbeat::handler::{ - HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, - }; - use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; - use common_meta::instruction::{CacheIdent, Instruction}; - use common_meta::key::schema_name::{SchemaName, SchemaNameKey, SchemaNameValue}; - use common_meta::key::{MetadataKey, SchemaMetadataManager}; - use common_meta::kv_backend::memory::MemoryKvBackend; - use common_meta::kv_backend::KvBackend; - use common_meta::rpc::store::PutRequest; - - use crate::heartbeat::handler::cache_invalidator::InvalidateSchemaCacheHandler; - - #[tokio::test] - async fn test_invalidate_schema_cache_handler() { - let inner_kv = Arc::new(MemoryKvBackend::default()); - let cached_kv = Arc::new(CachedKvBackendBuilder::new(inner_kv.clone()).build()); - let schema_metadata_manager = SchemaMetadataManager::new(cached_kv.clone()); - - let schema_name = "test_schema"; - let catalog_name = "test_catalog"; - schema_metadata_manager - .register_region_table_info( - 1, - "test_table", - schema_name, - catalog_name, - Some(SchemaNameValue { - ttl: Some(Duration::from_secs(1)), - }), - ) - .await; - - schema_metadata_manager - .get_schema_options_by_table_id(1) - .await - .unwrap(); - - let schema_key = SchemaNameKey::new(catalog_name, schema_name).to_bytes(); - let new_schema_value = SchemaNameValue { - ttl: Some(Duration::from_secs(3)), - } - .try_as_raw_value() - .unwrap(); - inner_kv - .put(PutRequest { - key: schema_key.clone(), - value: new_schema_value, - prev_kv: false, - }) - .await - .unwrap(); - - let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new( - InvalidateSchemaCacheHandler::new(cached_kv), - )])); - - let (tx, _) = tokio::sync::mpsc::channel(8); - let mailbox = Arc::new(HeartbeatMailbox::new(tx)); - - // removes a valid key - let response = HeartbeatResponse::default(); - let mut ctx: HeartbeatResponseHandlerContext = - HeartbeatResponseHandlerContext::new(mailbox, response); - ctx.incoming_message = Some(( - MessageMeta::new_test(1, "hi", "foo", "bar"), - Instruction::InvalidateCaches(vec![CacheIdent::SchemaName(SchemaName { - catalog_name: catalog_name.to_string(), - schema_name: schema_name.to_string(), - })]), - )); - executor.handle(ctx).await.unwrap(); - - assert_eq!( - Some(Duration::from_secs(3)), - SchemaNameValue::try_from_raw_value( - &inner_kv.get(&schema_key).await.unwrap().unwrap().value - ) - .unwrap() - .unwrap() - .ttl - ); - } -} diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index db0761e56dcd..efa5c32286bb 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#[cfg(test)] +mod tests; + use std::sync::Arc; use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer}; @@ -34,8 +37,6 @@ use crate::error::Result; use crate::frontend::FrontendOptions; use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT}; -pub mod handler; - /// The frontend heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background. #[derive(Clone)] pub struct HeartbeatTask { diff --git a/src/frontend/src/heartbeat/handler.rs b/src/frontend/src/heartbeat/handler.rs deleted file mode 100644 index 24f15d115e66..000000000000 --- a/src/frontend/src/heartbeat/handler.rs +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub mod invalidate_table_cache; - -#[cfg(test)] -pub(crate) mod tests; diff --git a/src/frontend/src/heartbeat/handler/tests.rs b/src/frontend/src/heartbeat/tests.rs similarity index 96% rename from src/frontend/src/heartbeat/handler/tests.rs rename to src/frontend/src/heartbeat/tests.rs index 1791f6e91fcd..c44d3a15d150 100644 --- a/src/frontend/src/heartbeat/handler/tests.rs +++ b/src/frontend/src/heartbeat/tests.rs @@ -17,6 +17,7 @@ use std::sync::{Arc, Mutex}; use api::v1::meta::HeartbeatResponse; use common_meta::cache_invalidator::KvCacheInvalidator; +use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler; use common_meta::heartbeat::handler::{ HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, }; @@ -29,8 +30,6 @@ use partition::manager::TableRouteCacheInvalidator; use table::metadata::TableId; use tokio::sync::mpsc; -use super::invalidate_table_cache::InvalidateTableCacheHandler; - #[derive(Default)] pub struct MockKvCacheInvalidator { inner: Mutex, i32>>, @@ -85,7 +84,7 @@ async fn test_invalidate_table_cache_handler() { }); let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new( - InvalidateTableCacheHandler::new(backend.clone()), + InvalidateCacheHandler::new(backend.clone()), )])); let (tx, _) = mpsc::channel(8); @@ -124,7 +123,7 @@ async fn test_invalidate_schema_key_handler() { }); let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new( - InvalidateTableCacheHandler::new(backend.clone()), + InvalidateCacheHandler::new(backend.clone()), )])); let (tx, _) = mpsc::channel(8); diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 57337d8f74ae..44aa03a67df8 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -679,12 +679,10 @@ fn get_expired_ssts( #[cfg(test)] mod tests { - use common_meta::key::SchemaMetadataManager; - use common_meta::kv_backend::memory::MemoryKvBackend; - use common_meta::kv_backend::KvBackendRef; use tokio::sync::oneshot; use super::*; + use crate::test_util::mock_schema_metadata_manager; use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler}; use crate::test_util::version_util::{apply_edit, VersionControlBuilder}; @@ -694,10 +692,7 @@ mod tests { let (tx, _rx) = mpsc::channel(4); let mut scheduler = env.mock_compaction_scheduler(tx); let mut builder = VersionControlBuilder::new(); - let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(Arc::new( - MemoryKvBackend::new(), - ) - as KvBackendRef)); + let schema_metadata_manager = mock_schema_metadata_manager(); schema_metadata_manager .register_region_table_info( builder.region_id().table_id(), @@ -760,10 +755,8 @@ mod tests { let mut builder = VersionControlBuilder::new(); let purger = builder.file_purger(); let region_id = builder.region_id(); - let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(Arc::new( - MemoryKvBackend::new(), - ) - as KvBackendRef)); + + let schema_metadata_manager = mock_schema_metadata_manager(); schema_metadata_manager .register_region_table_info( builder.region_id().table_id(), diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index ac3cff1578de..dec175e76ff6 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -35,6 +35,8 @@ use api::v1::{OpType, Row, Rows, SemanticType}; use common_base::readable_size::ReadableSize; use common_base::Plugins; use common_datasource::compression::CompressionType; +use common_meta::cache::{new_schema_cache, new_table_info_cache, new_table_schema_cache}; +use common_meta::key::schema_name::{SchemaName, SchemaNameValue}; use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::KvBackendRef; @@ -47,6 +49,7 @@ use datatypes::schema::ColumnSchema; use log_store::kafka::log_store::KafkaLogStore; use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::test_util::log_store_util; +use moka::future::{Cache, CacheBuilder}; use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef}; use object_store::services::Fs; use object_store::ObjectStore; @@ -210,43 +213,37 @@ impl Default for TestEnv { impl TestEnv { /// Returns a new env with empty prefix for test. pub fn new() -> TestEnv { + let schema_metadata_manager = mock_schema_metadata_manager(); TestEnv { data_home: create_temp_dir(""), log_store: None, log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory), object_store_manager: None, - schema_metadata_manager: Arc::new(SchemaMetadataManager::new(Arc::new( - MemoryKvBackend::new(), - ) - as KvBackendRef)), + schema_metadata_manager, } } /// Returns a new env with specific `prefix` for test. pub fn with_prefix(prefix: &str) -> TestEnv { + let schema_metadata_manager = mock_schema_metadata_manager(); TestEnv { data_home: create_temp_dir(prefix), log_store: None, log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory), object_store_manager: None, - schema_metadata_manager: Arc::new(SchemaMetadataManager::new(Arc::new( - MemoryKvBackend::new(), - ) - as KvBackendRef)), + schema_metadata_manager, } } /// Returns a new env with specific `data_home` for test. pub fn with_data_home(data_home: TempDir) -> TestEnv { + let schema_metadata_manager = mock_schema_metadata_manager(); TestEnv { data_home, log_store: None, log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory), object_store_manager: None, - schema_metadata_manager: Arc::new(SchemaMetadataManager::new(Arc::new( - MemoryKvBackend::new(), - ) - as KvBackendRef)), + schema_metadata_manager, } } @@ -1154,3 +1151,22 @@ pub async fn reopen_region( .unwrap(); } } + +pub(crate) fn mock_schema_metadata_manager() -> Arc { + let kv_backend = Arc::new(MemoryKvBackend::new()); + let table_schema_cache = Arc::new(new_table_schema_cache( + "table_schema_name_cache".to_string(), + CacheBuilder::default().build(), + kv_backend.clone(), + )); + let schema_cache = Arc::new(new_schema_cache( + "schema_cache".to_string(), + CacheBuilder::default().build(), + kv_backend.clone(), + )); + Arc::new(SchemaMetadataManager::new( + kv_backend as KvBackendRef, + table_schema_cache, + schema_cache, + )) +} diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 2402c301ca5d..8bdb8299f7c4 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -19,7 +19,10 @@ use std::time::Duration; use api::v1::region::region_server::RegionServer; use arrow_flight::flight_service_server::FlightServiceServer; -use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; +use cache::{ + build_datanode_cache_registry, build_fundamental_cache_registry, + with_default_composite_cache_registry, +}; use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; use client::client_manager::NodeClients; use client::Client; @@ -27,6 +30,7 @@ use cmd::DistributedInformationExtension; use common_base::Plugins; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; +use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_meta::kv_backend::chroot::ChrootKvBackend; @@ -42,7 +46,6 @@ use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; use datanode::config::{DatanodeOptions, ObjectStoreConfig}; use datanode::datanode::{Datanode, DatanodeBuilder, ProcedureConfig}; use frontend::frontend::FrontendOptions; -use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; use frontend::heartbeat::HeartbeatTask; use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance as FeInstance}; @@ -328,8 +331,15 @@ impl GreptimeDbClusterBuilder { client: meta_client.clone(), }); + let layered_cache_registry = Arc::new( + LayeredCacheRegistryBuilder::default() + .add_cache_registry(build_datanode_cache_registry(meta_backend.clone())) + .build(), + ); + let mut datanode = DatanodeBuilder::new(opts, Plugins::default()) .with_kv_backend(meta_backend) + .with_cache_registry(layered_cache_registry) .with_meta_client(meta_client) .build() .await @@ -382,7 +392,7 @@ impl GreptimeDbClusterBuilder { let handlers_executor = HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler), - Arc::new(InvalidateTableCacheHandler::new(cache_registry.clone())), + Arc::new(InvalidateCacheHandler::new(cache_registry.clone())), ]); let options = FrontendOptions::default(); diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 47095ecc65b3..cc2458fa99cb 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -14,7 +14,10 @@ use std::sync::Arc; -use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; +use cache::{ + build_datanode_cache_registry, build_fundamental_cache_registry, + with_default_composite_cache_registry, +}; use catalog::information_schema::NoopInformationExtension; use catalog::kvbackend::KvBackendCatalogManager; use cmd::error::StartFlownodeSnafu; @@ -125,8 +128,15 @@ impl GreptimeDbStandaloneBuilder { ) -> GreptimeDbStandalone { let plugins = self.plugin.clone().unwrap_or_default(); + let layered_cache_registry = Arc::new( + LayeredCacheRegistryBuilder::default() + .add_cache_registry(build_datanode_cache_registry(kv_backend.clone())) + .build(), + ); + let datanode = DatanodeBuilder::new(opts.datanode_options(), plugins.clone()) .with_kv_backend(kv_backend.clone()) + .with_cache_registry(layered_cache_registry) .build() .await .unwrap();