diff --git a/config/config.md b/config/config.md index d3353930b163..db2f6e010286 100644 --- a/config/config.md +++ b/config/config.md @@ -421,7 +421,7 @@ | `storage` | -- | -- | The data storage options. | | `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. | | `storage.type` | String | `File` | The storage type used to store the data.
- `File`: the data is stored in the local file system.
- `S3`: the data is stored in S3 object storage.
- `Gcs`: the data is stored in Google Cloud Storage.
- `Azblob`: the data is stored in Azure Blob Storage.
- `Oss`: the data is stored in Aliyun OSS. | -| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3'. It is configured by default when using object storage, and configuring it is recommended for better performance.
A local file directory, defaults to `{data_home}/object_cache/read`. An empty string means disabling. | +| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3'. It is configured by default when using object storage, and configuring it is recommended for better performance.
A local file directory, defaults to `{data_home}`. An empty string means disabling. | | `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. | | `storage.bucket` | String | Unset | The S3 bucket name.
**It's only used when the storage type is `S3`, `Oss`, or `Gcs`**. | | `storage.root` | String | Unset | The S3 data will be stored under the specified prefix, for example, `s3://${bucket}/${root}`.
**It's only used when the storage type is `S3`, `Oss`, or `Azblob`**. | @@ -460,7 +460,7 @@ | `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 disables the cache.
If not set, it defaults to 1/8 of the OS memory. | | `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for the time series selector (e.g. `last_value()`). Setting it to 0 disables the cache.
If not set, it defaults to 1/16 of the OS memory, with a maximum of 512MB. | | `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache; it is enabled by default when using object storage, and enabling it is recommended for better performance. | -| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/object_cache/write`. | +| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. | | `region_engine.mito.experimental_write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. | | `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. | | `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 90a4d69b2e89..1b062a4b3af1 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -294,7 +294,7 @@ data_home = "/tmp/greptimedb/" type = "File" ## Read cache configuration for object storage such as 'S3'. It is configured by default when using object storage, and configuring it is recommended for better performance. -## A local file directory, defaults to `{data_home}/object_cache/read`. An empty string means disabling. +## A local file directory, defaults to `{data_home}`. An empty string means disabling. ## @toml2docs:none-default #+ cache_path = "" @@ -478,7 +478,7 @@ auto_flush_interval = "1h" ## Whether to enable the experimental write cache; it is enabled by default when using object storage, and enabling it is recommended for better performance. enable_experimental_write_cache = false -## File system path for write cache, defaults to `{data_home}/object_cache/write`. +## File system path for write cache, defaults to `{data_home}`. experimental_write_cache_path = "" ## Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger.
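With both defaults collapsing to `{data_home}`, the on-disk layout becomes `{data_home}/cache/object/read` for the read cache and `{data_home}/cache/object/write` for the write cache; the nested directories are supplied by the cache implementations themselves (see the `READ_CACHE_DIR` and `FILE_DIR` constants further down). A minimal sketch of how the two defaults resolve; `resolve_read_cache_root` and `resolve_write_cache_root` are hypothetical helpers written for illustration, not part of this patch:

```rust
/// Resolve the read cache root. In the config, an unset `storage.cache_path`
/// means "use the default" and an empty string means "disable the read cache".
fn resolve_read_cache_root(data_home: &str, cache_path: Option<&str>) -> Option<String> {
    match cache_path {
        None => Some(data_home.to_string()), // unset: default to the data home
        Some("") => None,                    // empty string: read cache disabled
        Some(p) => Some(p.to_string()),      // explicit path wins
    }
}

/// Resolve the write cache root. A blank `experimental_write_cache_path`
/// falls back to the data home, mirroring `MitoConfig::sanitize`.
fn resolve_write_cache_root(data_home: &str, write_cache_path: &str) -> String {
    if write_cache_path.trim().is_empty() {
        data_home.to_string()
    } else {
        write_cache_path.to_string()
    }
}

fn main() {
    assert_eq!(
        resolve_read_cache_root("/tmp/greptimedb/", None).as_deref(),
        Some("/tmp/greptimedb/")
    );
    assert_eq!(resolve_read_cache_root("/tmp/greptimedb/", Some("")), None);
    assert_eq!(resolve_write_cache_root("/tmp/greptimedb/", ""), "/tmp/greptimedb/");
    // The caches then nest `cache/object/read/` and `cache/object/write/`
    // under these roots, e.g. /tmp/greptimedb/cache/object/read/.
}
```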
diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index 52a1cba982e1..64cacd4e8583 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -28,7 +28,7 @@ use common_telemetry::{info, warn}; use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer}; use object_store::services::Fs; use object_store::util::{join_dir, normalize_dir, with_instrument_layers}; -use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder, OBJECT_CACHE_DIR}; +use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder}; use snafu::prelude::*; use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; @@ -147,12 +147,10 @@ async fn build_cache_layer( }; // Enable object cache by default - // Set the cache_path to be `${data_home}/object_cache/read/{name}` by default + // Set the cache_path to be `${data_home}` by default // if it's not present if cache_path.is_none() { - let object_cache_path = join_dir(data_home, OBJECT_CACHE_DIR); - let read_cache_path = join_dir(&object_cache_path, "read"); - let read_cache_path = join_dir(&read_cache_path, &name.to_lowercase()); + let read_cache_path = data_home.to_string(); tokio::fs::create_dir_all(Path::new(&read_cache_path)) .await .context(CreateDirSnafu { diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index eb112530cad7..51c20f742839 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -37,8 +37,10 @@ use crate::sst::file::FileId; use crate::sst::parquet::helper::fetch_byte_ranges; use crate::sst::parquet::metadata::MetadataLoader; -/// Subdirectory of cached files. -const FILE_DIR: &str = "files/"; +/// Subdirectory of cached files for the write cache. +/// +/// This must contain three path levels, matching the path label extraction in [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer). +const FILE_DIR: &str = "cache/object/write/"; /// A file cache manages files on the local store and evicts files based /// on size. diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 8a431f22a63d..fc9972de5305 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -20,7 +20,6 @@ use std::time::Duration; use common_base::readable_size::ReadableSize; use common_telemetry::{debug, info}; use futures::AsyncWriteExt; -use object_store::manager::ObjectStoreManagerRef; use object_store::ObjectStore; use snafu::ResultExt; @@ -44,10 +43,6 @@ use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY}; pub struct WriteCache { /// Local file cache. file_cache: FileCacheRef, - /// Object store manager. - #[allow(unused)] - /// TODO: Remove unused after implementing async write cache - object_store_manager: ObjectStoreManagerRef, /// Puffin manager factory for index. puffin_manager_factory: PuffinManagerFactory, /// Intermediate manager for index. @@ -61,7 +56,6 @@ impl WriteCache { /// `object_store_manager` for all object stores. pub async fn new( local_store: ObjectStore, - object_store_manager: ObjectStoreManagerRef, cache_capacity: ReadableSize, ttl: Option, puffin_manager_factory: PuffinManagerFactory, @@ -72,7 +66,6 @@ impl WriteCache { Ok(Self { file_cache, - object_store_manager, puffin_manager_factory, intermediate_manager, }) @@ -81,7 +74,6 @@ impl WriteCache { /// Creates a write cache based on local fs.
pub async fn new_fs( cache_dir: &str, - object_store_manager: ObjectStoreManagerRef, cache_capacity: ReadableSize, ttl: Option, puffin_manager_factory: PuffinManagerFactory, @@ -92,7 +84,6 @@ impl WriteCache { let local_store = new_fs_cache_store(cache_dir).await?; Self::new( local_store, - object_store_manager, cache_capacity, ttl, puffin_manager_factory, diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 963089c60aed..7a1574c850ae 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -20,8 +20,6 @@ use std::time::Duration; use common_base::readable_size::ReadableSize; use common_telemetry::warn; -use object_store::util::join_dir; -use object_store::OBJECT_CACHE_DIR; use serde::{Deserialize, Serialize}; use serde_with::serde_as; @@ -97,7 +95,7 @@ pub struct MitoConfig { pub selector_result_cache_size: ReadableSize, /// Whether to enable the experimental write cache. pub enable_experimental_write_cache: bool, - /// File system path for write cache, defaults to `{data_home}/object_cache/write`. + /// File system path for the write cache's root directory, defaults to `{data_home}`. pub experimental_write_cache_path: String, /// Capacity for write cache. pub experimental_write_cache_size: ReadableSize, @@ -234,8 +232,7 @@ impl MitoConfig { // Sets write cache path if it is empty. if self.experimental_write_cache_path.trim().is_empty() { - let object_cache_path = join_dir(data_home, OBJECT_CACHE_DIR); - self.experimental_write_cache_path = join_dir(&object_cache_path, "write"); + self.experimental_write_cache_path = data_home.to_string(); } self.index.sanitize(data_home, &self.inverted_index)?; diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 314e886ba9ca..14b4bb4a9109 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -644,16 +644,9 @@ impl TestEnv { .unwrap(); let object_store_manager = self.get_object_store_manager().unwrap(); - let write_cache = WriteCache::new( - local_store, - object_store_manager, - capacity, - None, - puffin_mgr, - intm_mgr, - ) - .await - .unwrap(); + let write_cache = WriteCache::new(local_store, capacity, None, puffin_mgr, intm_mgr) + .await + .unwrap(); Arc::new(write_cache) } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 233ab9f056b1..62ad59f0701a 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -157,7 +157,6 @@ impl WorkerGroup { let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges)); let write_cache = write_cache_from_config( &config, - object_store_manager.clone(), puffin_manager_factory.clone(), intermediate_manager.clone(), ) @@ -303,7 +302,6 @@ impl WorkerGroup { .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _)); let write_cache = write_cache_from_config( &config, - object_store_manager.clone(), puffin_manager_factory.clone(), intermediate_manager.clone(), ) @@ -364,7 +362,6 @@ fn region_id_to_index(id: RegionId, num_workers: usize) -> usize { async fn write_cache_from_config( config: &MitoConfig, - object_store_manager: ObjectStoreManagerRef, puffin_manager_factory: PuffinManagerFactory, intermediate_manager: IntermediateManager, ) -> Result> { @@ -383,7 +380,6 @@ let cache = WriteCache::new_fs( &config.experimental_write_cache_path, - object_store_manager, config.experimental_write_cache_size, config.experimental_write_cache_ttl, puffin_manager_factory, diff --git a/src/object-store/src/layers.rs b/src/object-store/src/layers.rs index
20108ab63c52..8383fd237952 100644 --- a/src/object-store/src/layers.rs +++ b/src/object-store/src/layers.rs @@ -25,14 +25,14 @@ mod prometheus { static PROMETHEUS_LAYER: OnceLock> = OnceLock::new(); + /// This logic tries to extract the parent path from the object storage operation. + /// The function relies on the assumption that the region path is built from the + /// pattern `/catalog/schema/table_id/...` OR `greptimedb/object_cache//...` + /// + /// We'll get the data/catalog/schema from the path. pub fn build_prometheus_metrics_layer(with_path_label: bool) -> PrometheusLayer { PROMETHEUS_LAYER .get_or_init(|| { - // This logical tries to extract parent path from the object storage operation - // the function also relies on assumption that the region path is built from - // pattern `/catalog/schema/table_id/....` - // - // We'll get the data/catalog/schema from path. let path_level = if with_path_label { 3 } else { 0 }; let layer = PrometheusLayer::builder() diff --git a/src/object-store/src/layers/lru_cache.rs b/src/object-store/src/layers/lru_cache.rs index 197a222162be..95e9349452cf 100644 --- a/src/object-store/src/layers/lru_cache.rs +++ b/src/object-store/src/layers/lru_cache.rs @@ -117,9 +117,7 @@ impl LayeredAccess for LruCacheAccess { async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { let result = self.inner.write(path, args).await; - self.read_cache - .invalidate_entries_with_prefix(format!("{:x}", md5::compute(path))) - .await; + self.read_cache.invalidate_entries_with_prefix(path); result } @@ -127,9 +125,7 @@ impl LayeredAccess for LruCacheAccess { async fn delete(&self, path: &str, args: OpDelete) -> Result { let result = self.inner.delete(path, args).await; - self.read_cache - .invalidate_entries_with_prefix(format!("{:x}", md5::compute(path))) - .await; + self.read_cache.invalidate_entries_with_prefix(path); result } @@ -146,8 +142,7 @@ impl LayeredAccess for LruCacheAccess { fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { let result = self.inner.blocking_write(path, args); - self.read_cache - .blocking_invalidate_entries_with_prefix(format!("{:x}", md5::compute(path))); + self.read_cache.invalidate_entries_with_prefix(path); result } diff --git a/src/object-store/src/layers/lru_cache/read_cache.rs b/src/object-store/src/layers/lru_cache/read_cache.rs index 874b17280d9c..1e3cf61615f5 100644 --- a/src/object-store/src/layers/lru_cache/read_cache.rs +++ b/src/object-store/src/layers/lru_cache/read_cache.rs @@ -20,7 +20,7 @@ use moka::future::Cache; use moka::notification::ListenerFuture; use opendal::raw::oio::{Read, Reader, Write}; use opendal::raw::{Access, OpDelete, OpRead, OpStat, OpWrite, RpRead}; -use opendal::{Error as OpendalError, ErrorKind, Metakey, OperatorBuilder, Result}; +use opendal::{EntryMode, Error as OpendalError, ErrorKind, Metakey, OperatorBuilder, Result}; use crate::metrics::{ OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT, @@ -28,6 +28,10 @@ use crate::metrics::{ }; const RECOVER_CACHE_LIST_CONCURRENT: usize = 8; +/// Subdirectory of cached files for the read cache. +/// +/// This must contain three path levels, matching the path label extraction in [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
+const READ_CACHE_DIR: &str = "cache/object/read"; /// Cache value for read file #[derive(Debug, Clone, PartialEq, Eq, Copy)] @@ -56,12 +60,20 @@ fn can_cache(path: &str) -> bool { /// Generate a unique cache key for the read path and range. fn read_cache_key(path: &str, args: &OpRead) -> String { format!( - "{:x}.cache-{}", + "{READ_CACHE_DIR}/{:x}.cache-{}", md5::compute(path), args.range().to_header() ) } +fn read_cache_root() -> String { format!("/{READ_CACHE_DIR}/") } + +fn read_cache_key_prefix(path: &str) -> String { format!("{READ_CACHE_DIR}/{:x}", md5::compute(path)) } + /// Local read cache for files in object storage #[derive(Debug)] pub(crate) struct ReadCache { @@ -125,16 +137,9 @@ impl ReadCache { (self.mem_cache.entry_count(), self.mem_cache.weighted_size()) } - /// Invalidate all cache items which key starts with `prefix`. - pub(crate) async fn invalidate_entries_with_prefix(&self, prefix: String) { - // Safety: always ok when building cache with `support_invalidation_closures`. - self.mem_cache - .invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix)) - .ok(); - } - - /// Blocking version of `invalidate_entries_with_prefix`. - pub(crate) fn blocking_invalidate_entries_with_prefix(&self, prefix: String) { + /// Invalidate all cache items that belong to the given path. + pub(crate) fn invalidate_entries_with_prefix(&self, path: &str) { + let prefix = read_cache_key_prefix(path); // Safety: always ok when building cache with `support_invalidation_closures`. self.mem_cache .invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix)) @@ -145,8 +150,9 @@ impl ReadCache { /// Return entry count and total approximate entry size in bytes. pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> { let op = OperatorBuilder::new(self.file_cache.clone()).finish(); + let root = read_cache_root(); let mut entries = op - .list_with("/") + .list_with(&root) .metakey(Metakey::ContentLength | Metakey::ContentType) .concurrent(RECOVER_CACHE_LIST_CONCURRENT) .await?; @@ -157,7 +163,7 @@ OBJECT_STORE_LRU_CACHE_ENTRIES.inc(); OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64); // ignore root path - if entry.path() != "/" { + if entry.metadata().mode() == EntryMode::FILE { self.mem_cache .insert(read_key.to_string(), ReadResult::Success(size as u32)) .await; diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index 7e81b965fbed..d34fb57ab7d8 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -27,6 +27,9 @@ use opendal::raw::{Access, OpList, OpRead}; use opendal::services::{Azblob, Gcs, Oss}; use opendal::{EntryMode, OperatorBuilder}; +/// Duplicate of the constant in `src/layers/lru_cache/read_cache.rs` +const READ_CACHE_DIR: &str = "cache/object/read"; + async fn test_object_crud(store: &ObjectStore) -> Result<()> { // Create object handler. // Write data into object; @@ -267,7 +270,8 @@ async fn test_file_backend_with_lru_cache() -> Result<()> { async fn assert_lru_cache(cache_layer: &LruCacheLayer, file_names: &[&str]) { for file_name in file_names { - assert!(cache_layer.contains_file(file_name).await, "{file_name}"); + let file_path = format!("{READ_CACHE_DIR}/{file_name}"); + assert!(cache_layer.contains_file(&file_path).await, "{file_path:?}"); } }
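For reference, here is a self-contained sketch of the new read-cache key scheme; it assumes only the `md5` crate, which the patch itself uses, and the example object path is arbitrary. Every key is rooted under `cache/object/read/`, so the prometheus metrics layer sees a stable three-segment label, and all range-specific keys for one object path share that path's md5 digest as a common prefix, which is what lets the simplified `invalidate_entries_with_prefix(path)` drop them with a single `starts_with` check:

```rust
/// Mirrors the patched constant in `read_cache.rs`.
const READ_CACHE_DIR: &str = "cache/object/read";

/// Cache key for one (path, byte range) pair; the `range` string stands in
/// for `OpRead::range().to_header()` in the real code.
fn read_cache_key(path: &str, range: &str) -> String {
    format!("{READ_CACHE_DIR}/{:x}.cache-{}", md5::compute(path), range)
}

/// Prefix shared by every cached range of `path`; write/delete invalidation
/// drops all in-memory entries that start with it.
fn read_cache_key_prefix(path: &str) -> String {
    format!("{READ_CACHE_DIR}/{:x}", md5::compute(path))
}

fn main() {
    let path = "data/my_catalog/my_schema/1024/file.parquet"; // arbitrary example path
    let key = read_cache_key(path, "bytes=0-4095");
    let prefix = read_cache_key_prefix(path);
    // One `starts_with` check invalidates every cached range of the object,
    // no matter how many byte ranges were cached for it.
    assert!(key.starts_with(&prefix));
    println!("{key}");
}
```

The leading `cache/object/read/` segment also explains the test change above: `assert_lru_cache` now prepends `READ_CACHE_DIR` to each file name before probing `contains_file`.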