From c885e1f72cf65168d00a7d74dba1cefb3e572a51 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 31 Aug 2024 11:47:51 +0800 Subject: [PATCH 1/2] feat: remove files from the write cache in purger --- src/mito2/src/cache/file_cache.rs | 1 - src/mito2/src/cache/write_cache.rs | 12 ++++++++++++ src/mito2/src/sst/file_purger.rs | 26 +++++++++++++++++++++++++- 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 6e902490c0dc..9aebcc48e988 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -179,7 +179,6 @@ impl FileCache { } } - #[allow(unused)] /// Removes a file from the cache explicitly. pub(crate) async fn remove(&self, key: IndexKey) { let file_path = self.cache_file_path(key); diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index ff856fa4cca8..822ff4a29e1b 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -169,6 +169,11 @@ impl WriteCache { Ok(Some(sst_info)) } + /// Removes a file from the cache by `index_key`. + pub(crate) async fn remove(&self, index_key: IndexKey) { + self.file_cache.remove(index_key).await + } + /// Downloads a file in `remote_path` from the remote object store to the local cache /// (specified by `index_key`). pub(crate) async fn download( @@ -424,6 +429,13 @@ mod tests { .await .unwrap(); assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec()); + + // Removes the file from the cache. + let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet); + write_cache.remove(sst_index_key).await; + assert!(!write_cache.file_cache.contains_key(&sst_index_key)); + write_cache.remove(index_key).await; + assert!(!write_cache.file_cache.contains_key(&index_key)); } #[tokio::test] diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 9e6c6c89e8eb..76c7a7150328 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use common_telemetry::{error, info}; use crate::access_layer::AccessLayerRef; +use crate::cache::file_cache::{FileType, IndexKey}; use crate::cache::CacheManagerRef; use crate::schedule::scheduler::SchedulerRef; use crate::sst::file::FileMeta; @@ -77,9 +78,10 @@ impl FilePurger for LocalFilePurger { cache.remove_parquet_meta_data(file_meta.region_id, file_meta.file_id); } + let cache_manager = self.cache_manager.clone(); if let Err(e) = self.scheduler.schedule(Box::pin(async move { if let Err(e) = sst_layer.delete_sst(&file_meta).await { - error!(e; "Failed to delete SST file, file_id: {}, region: {}", + error!(e; "Failed to delete SST file, file_id: {}, region: {}", file_meta.file_id, file_meta.region_id); } else { info!( @@ -87,6 +89,28 @@ impl FilePurger for LocalFilePurger { file_meta.file_id, file_meta.region_id ); } + + if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) + { + // Removes the inverted index from the cache. + if file_meta.inverted_index_available() { + write_cache + .remove(IndexKey::new( + file_meta.region_id, + file_meta.file_id, + FileType::Puffin, + )) + .await; + } + // Remove the SST file from the cache. + write_cache + .remove(IndexKey::new( + file_meta.region_id, + file_meta.file_id, + FileType::Parquet, + )) + .await; + } })) { error!(e; "Failed to schedule the file purge request"); } From 4a9d0b02eaa169cdb38550812b2f089e3f14e0ce Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 31 Aug 2024 12:04:21 +0800 Subject: [PATCH 2/2] chore: fix typo --- src/catalog/src/kvbackend/manager.rs | 2 +- src/datanode/src/region_server.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 6b44b2459c46..d39e1abdb9f8 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -313,7 +313,7 @@ struct SystemCatalog { catalog_cache: Cache>, pg_catalog_cache: Cache>, - // system_schema_provier for default catalog + // system_schema_provider for default catalog information_schema_provider: Arc, pg_catalog_provider: Arc, backend: KvBackendRef, diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 20441b86f657..f6cc479d6a17 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -860,7 +860,7 @@ impl RegionServerInner { // complains "higher-ranked lifetime error". Rust can't prove some future is legit. // Possible related issue: https://github.com/rust-lang/rust/issues/102211 // - // The walkaround is to put the async functions in the `common_runtime::spawn_global`. Or like + // The workaround is to put the async functions in the `common_runtime::spawn_global`. Or like // it here, collect the values first then use later separately. let regions = self