diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs
index 52a1cba982e1..de289f689e2b 100644
--- a/src/datanode/src/store.rs
+++ b/src/datanode/src/store.rs
@@ -28,7 +28,7 @@ use common_telemetry::{info, warn};
 use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
 use object_store::services::Fs;
 use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
-use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder, OBJECT_CACHE_DIR};
+use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
 use snafu::prelude::*;
 
 use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
@@ -147,12 +147,10 @@ async fn build_cache_layer(
     };
 
     // Enable object cache by default
-    // Set the cache_path to be `${data_home}/object_cache/read/{name}` by default
+    // Set the cache_path to be `${data_home}/{name}` by default
     // if it's not present
     if cache_path.is_none() {
-        let object_cache_path = join_dir(data_home, OBJECT_CACHE_DIR);
-        let read_cache_path = join_dir(&object_cache_path, "read");
-        let read_cache_path = join_dir(&read_cache_path, &name.to_lowercase());
+        let read_cache_path = join_dir(data_home, &name.to_lowercase());
         tokio::fs::create_dir_all(Path::new(&read_cache_path))
             .await
             .context(CreateDirSnafu {
diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs
index eb112530cad7..2b30bd9de608 100644
--- a/src/mito2/src/cache/file_cache.rs
+++ b/src/mito2/src/cache/file_cache.rs
@@ -37,8 +37,10 @@ use crate::sst::file::FileId;
 use crate::sst::parquet::helper::fetch_byte_ranges;
 use crate::sst::parquet::metadata::MetadataLoader;
 
-/// Subdirectory of cached files.
-const FILE_DIR: &str = "files/";
+/// Subdirectory of cached files for write.
+///
+/// This must contain three layers, corresponding to [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
+const FILE_DIR: &str = "greptimedb/object_cache/write/";
 
 /// A file cache manages files on local store and evict files based
 /// on size.
diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs
index 963089c60aed..7a1574c850ae 100644
--- a/src/mito2/src/config.rs
+++ b/src/mito2/src/config.rs
@@ -20,8 +20,6 @@ use std::time::Duration;
 
 use common_base::readable_size::ReadableSize;
 use common_telemetry::warn;
-use object_store::util::join_dir;
-use object_store::OBJECT_CACHE_DIR;
 use serde::{Deserialize, Serialize};
 use serde_with::serde_as;
@@ -97,7 +95,7 @@ pub struct MitoConfig {
     pub selector_result_cache_size: ReadableSize,
     /// Whether to enable the experimental write cache.
     pub enable_experimental_write_cache: bool,
-    /// File system path for write cache, defaults to `{data_home}/object_cache/write`.
+    /// File system root path for the write cache, defaults to `{data_home}`.
     pub experimental_write_cache_path: String,
     /// Capacity for write cache.
     pub experimental_write_cache_size: ReadableSize,
@@ -234,8 +232,7 @@ impl MitoConfig {
         // Sets write cache path if it is empty.
         if self.experimental_write_cache_path.trim().is_empty() {
-            let object_cache_path = join_dir(data_home, OBJECT_CACHE_DIR);
-            self.experimental_write_cache_path = join_dir(&object_cache_path, "write");
+            self.experimental_write_cache_path = data_home.to_string();
         }
 
         self.index.sanitize(data_home, &self.inverted_index)?;
diff --git a/src/object-store/src/layers/lru_cache.rs b/src/object-store/src/layers/lru_cache.rs
index 197a222162be..4ed12c164ddd 100644
--- a/src/object-store/src/layers/lru_cache.rs
+++ b/src/object-store/src/layers/lru_cache.rs
@@ -117,9 +117,7 @@ impl LayeredAccess for LruCacheAccess {
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         let result = self.inner.write(path, args).await;
 
-        self.read_cache
-            .invalidate_entries_with_prefix(format!("{:x}", md5::compute(path)))
-            .await;
+        self.read_cache.invalidate_entries_with_prefix(path).await;
 
         result
     }
@@ -127,9 +125,7 @@ impl LayeredAccess for LruCacheAccess {
     async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
         let result = self.inner.delete(path, args).await;
 
-        self.read_cache
-            .invalidate_entries_with_prefix(format!("{:x}", md5::compute(path)))
-            .await;
+        self.read_cache.invalidate_entries_with_prefix(path).await;
 
         result
     }
@@ -147,7 +143,7 @@ impl LayeredAccess for LruCacheAccess {
         let result = self.inner.blocking_write(path, args);
 
         self.read_cache
-            .blocking_invalidate_entries_with_prefix(format!("{:x}", md5::compute(path)));
+            .blocking_invalidate_entries_with_prefix(path);
 
         result
     }
diff --git a/src/object-store/src/layers/lru_cache/read_cache.rs b/src/object-store/src/layers/lru_cache/read_cache.rs
index 874b17280d9c..3f42789a5c47 100644
--- a/src/object-store/src/layers/lru_cache/read_cache.rs
+++ b/src/object-store/src/layers/lru_cache/read_cache.rs
@@ -28,6 +28,10 @@ use crate::metrics::{
 };
 
 const RECOVER_CACHE_LIST_CONCURRENT: usize = 8;
+/// Subdirectory of cached files for read.
+///
+/// This must contain three layers, corresponding to [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
+const READ_CACHE_DIR: &str = "greptimedb/object_cache/read";
 
 /// Cache value for read file
 #[derive(Debug, Clone, PartialEq, Eq, Copy)]
@@ -56,12 +60,20 @@ fn can_cache(path: &str) -> bool {
 /// Generate a unique cache key for the read path and range.
 fn read_cache_key(path: &str, args: &OpRead) -> String {
     format!(
-        "{:x}.cache-{}",
+        "{READ_CACHE_DIR}/{:x}.cache-{}",
         md5::compute(path),
         args.range().to_header()
     )
 }
 
+fn read_cache_root() -> String {
+    format!("/{READ_CACHE_DIR}")
+}
+
+fn read_cache_key_prefix(path: &str) -> String {
+    format!("{READ_CACHE_DIR}/{:x}", md5::compute(path))
+}
+
 /// Local read cache for files in object storage
 #[derive(Debug)]
 pub(crate) struct ReadCache {
@@ -125,16 +137,18 @@ impl ReadCache {
         (self.mem_cache.entry_count(), self.mem_cache.weighted_size())
     }
 
-    /// Invalidate all cache items which key starts with `prefix`.
-    pub(crate) async fn invalidate_entries_with_prefix(&self, prefix: String) {
+    /// Invalidate all cache items belonging to the specified path.
+    pub(crate) async fn invalidate_entries_with_prefix(&self, path: &str) {
+        let prefix = read_cache_key_prefix(path);
         // Safety: always ok when building cache with `support_invalidation_closures`.
         self.mem_cache
             .invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix))
             .ok();
     }
 
-    /// Blocking version of `invalidate_entries_with_prefix`.
-    pub(crate) fn blocking_invalidate_entries_with_prefix(&self, prefix: String) {
+    /// Blocking version of [`invalidate_entries_with_prefix`](Self::invalidate_entries_with_prefix).
+    pub(crate) fn blocking_invalidate_entries_with_prefix(&self, path: &str) {
+        let prefix = read_cache_key_prefix(path);
         // Safety: always ok when building cache with `support_invalidation_closures`.
         self.mem_cache
             .invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix))
@@ -145,8 +159,9 @@ impl ReadCache {
     /// Return entry count and total approximate entry size in bytes.
     pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> {
         let op = OperatorBuilder::new(self.file_cache.clone()).finish();
+        let root = read_cache_root();
         let mut entries = op
-            .list_with(&root)
+            .list_with(&root)
            .metakey(Metakey::ContentLength | Metakey::ContentType)
             .concurrent(RECOVER_CACHE_LIST_CONCURRENT)
             .await?;
@@ -157,7 +172,7 @@ impl ReadCache {
             OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
             OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64);
             // ignore root path
-            if entry.path() != "/" {
+            if entry.path() != &root {
                 self.mem_cache
                     .insert(read_key.to_string(), ReadResult::Success(size as u32))
                     .await;
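
With these changes, read-cache entries land under `{data_home}/greptimedb/object_cache/read/` and write-cache files under `{experimental_write_cache_path}/greptimedb/object_cache/write/`, where the write cache path now defaults to `{data_home}`. The standalone sketch below is not part of the patch: it mirrors the `read_cache_key`/`read_cache_key_prefix` logic to show why passing a plain object path is now enough for invalidation, since every range-keyed entry for a path shares the `greptimedb/object_cache/read/{md5(path)}` prefix. It assumes only the `md5` crate; the helper names `cache_key`/`cache_key_prefix`, the sample path, and the byte-range strings are illustrative.

// Standalone sketch (not part of the patch): mirrors the new read-cache key
// layout to illustrate prefix-based invalidation. Assumes the `md5` crate.

const READ_CACHE_DIR: &str = "greptimedb/object_cache/read";

/// Key for one cached byte range of `path` (mirrors `read_cache_key`).
fn cache_key(path: &str, range_header: &str) -> String {
    format!("{READ_CACHE_DIR}/{:x}.cache-{}", md5::compute(path), range_header)
}

/// Common prefix of every key derived from `path` (mirrors `read_cache_key_prefix`).
fn cache_key_prefix(path: &str) -> String {
    format!("{READ_CACHE_DIR}/{:x}", md5::compute(path))
}

fn main() {
    let path = "data/table/1/sst/xyz.parquet"; // hypothetical object path
    let keys = [
        cache_key(path, "bytes=0-4095"),
        cache_key(path, "bytes=4096-8191"),
    ];
    let prefix = cache_key_prefix(path);
    // All range entries for `path` start with the same prefix, so a write or
    // delete on `path` can drop them with a single `starts_with` check, which
    // is what `invalidate_entries_with_prefix(path)` relies on.
    assert!(keys.iter().all(|k| k.starts_with(&prefix)));
    println!("prefix: {prefix}");
}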