Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: remove files from the write cache in purger #4655

Merged
merged 2 commits into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ struct SystemCatalog {
catalog_cache: Cache<String, Arc<InformationSchemaProvider>>,
pg_catalog_cache: Cache<String, Arc<PGCatalogProvider>>,

// system_schema_provier for default catalog
// system_schema_provider for default catalog
information_schema_provider: Arc<InformationSchemaProvider>,
pg_catalog_provider: Arc<PGCatalogProvider>,
backend: KvBackendRef,
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ impl RegionServerInner {
// complains "higher-ranked lifetime error". Rust can't prove some future is legit.
// Possible related issue: https://github.com/rust-lang/rust/issues/102211
//
// The walkaround is to put the async functions in the `common_runtime::spawn_global`. Or like
// The workaround is to put the async functions in the `common_runtime::spawn_global`. Or like
// it here, collect the values first then use later separately.

let regions = self
Expand Down
1 change: 0 additions & 1 deletion src/mito2/src/cache/file_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ impl FileCache {
}
}

#[allow(unused)]
/// Removes a file from the cache explicitly.
pub(crate) async fn remove(&self, key: IndexKey) {
let file_path = self.cache_file_path(key);
Expand Down
12 changes: 12 additions & 0 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ impl WriteCache {
Ok(Some(sst_info))
}

/// Removes a file from the cache by `index_key`.
pub(crate) async fn remove(&self, index_key: IndexKey) {
self.file_cache.remove(index_key).await
}

/// Downloads a file in `remote_path` from the remote object store to the local cache
/// (specified by `index_key`).
pub(crate) async fn download(
Expand Down Expand Up @@ -424,6 +429,13 @@ mod tests {
.await
.unwrap();
assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());

// Removes the file from the cache.
let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
write_cache.remove(sst_index_key).await;
assert!(!write_cache.file_cache.contains_key(&sst_index_key));
write_cache.remove(index_key).await;
assert!(!write_cache.file_cache.contains_key(&index_key));
}

#[tokio::test]
Expand Down
26 changes: 25 additions & 1 deletion src/mito2/src/sst/file_purger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;
use common_telemetry::{error, info};

use crate::access_layer::AccessLayerRef;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::CacheManagerRef;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::FileMeta;
Expand Down Expand Up @@ -77,16 +78,39 @@ impl FilePurger for LocalFilePurger {
cache.remove_parquet_meta_data(file_meta.region_id, file_meta.file_id);
}

let cache_manager = self.cache_manager.clone();
if let Err(e) = self.scheduler.schedule(Box::pin(async move {
if let Err(e) = sst_layer.delete_sst(&file_meta).await {
error!(e; "Failed to delete SST file, file_id: {}, region: {}",
error!(e; "Failed to delete SST file, file_id: {}, region: {}",
file_meta.file_id, file_meta.region_id);
} else {
info!(
"Successfully deleted SST file, file_id: {}, region: {}",
file_meta.file_id, file_meta.region_id
);
}

if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache())
{
// Removes the inverted index from the cache.
if file_meta.inverted_index_available() {
write_cache
.remove(IndexKey::new(
file_meta.region_id,
file_meta.file_id,
FileType::Puffin,
))
.await;
}
// Remove the SST file from the cache.
write_cache
.remove(IndexKey::new(
file_meta.region_id,
file_meta.file_id,
FileType::Parquet,
))
.await;
}
})) {
error!(e; "Failed to schedule the file purge request");
}
Expand Down