refactor: unify read and write cache path
Signed-off-by: Ruihang Xia <[email protected]>
waynexia committed Dec 24, 2024
commit b367342 · 1 parent e5772ad
Showing 5 changed files with 34 additions and 26 deletions.
src/datanode/src/store.rs (8 changes: 3 additions & 5 deletions)
@@ -28,7 +28,7 @@ use common_telemetry::{info, warn};
 use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
 use object_store::services::Fs;
 use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
-use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder, OBJECT_CACHE_DIR};
+use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
 use snafu::prelude::*;

 use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
@@ -147,12 +147,10 @@ async fn build_cache_layer(
     };

     // Enable object cache by default
-    // Set the cache_path to be `${data_home}/object_cache/read/{name}` by default
+    // Set the cache_path to be `${data_home}/{name}` by default
     // if it's not present
     if cache_path.is_none() {
-        let object_cache_path = join_dir(data_home, OBJECT_CACHE_DIR);
-        let read_cache_path = join_dir(&object_cache_path, "read");
-        let read_cache_path = join_dir(&read_cache_path, &name.to_lowercase());
+        let read_cache_path = join_dir(data_home, &name.to_lowercase());
         tokio::fs::create_dir_all(Path::new(&read_cache_path))
             .await
             .context(CreateDirSnafu {
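The net effect of this hunk: the default read cache directory drops its `object_cache/read` segment, which moves into the cache key prefix instead (see `READ_CACHE_DIR` in read_cache.rs below). A minimal before/after sketch, with a simplified stand-in for `object_store::util::join_dir`:

```rust
// Simplified stand-in for object_store::util::join_dir; assumption:
// it joins two path segments (the real helper also normalizes dirs).
fn join_dir(base: &str, sub: &str) -> String {
    format!("{}/{}", base.trim_end_matches('/'), sub.trim_start_matches('/'))
}

fn main() {
    let data_home = "/var/lib/greptimedb"; // illustrative data_home
    let name = "S3"; // object store provider name

    // Before: ${data_home}/object_cache/read/{name}
    let old = join_dir(
        &join_dir(&join_dir(data_home, "object_cache"), "read"),
        &name.to_lowercase(),
    );
    assert_eq!(old, "/var/lib/greptimedb/object_cache/read/s3");

    // After: ${data_home}/{name}; the object_cache/read layers now live
    // inside the cache keys themselves (READ_CACHE_DIR below).
    let new = join_dir(data_home, &name.to_lowercase());
    assert_eq!(new, "/var/lib/greptimedb/s3");
}
```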
src/mito2/src/cache/file_cache.rs (6 changes: 4 additions & 2 deletions)
@@ -37,8 +37,10 @@ use crate::sst::file::FileId;
 use crate::sst::parquet::helper::fetch_byte_ranges;
 use crate::sst::parquet::metadata::MetadataLoader;

-/// Subdirectory of cached files.
-const FILE_DIR: &str = "files/";
+/// Subdirectory of cached files for write.
+///
+/// This must contain three layers, corresponding to [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
+const FILE_DIR: &str = "greptimedb/object_cache/write/";

 /// A file cache manages files on local store and evict files based
 /// on size.
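Why the path must embed three directory layers: the new doc comment ties `FILE_DIR` to `build_prometheus_metrics_layer`, which evidently expects a fixed directory depth. A hedged sketch of a resulting cache-file path (the `{file_id}.parquet` naming is an illustrative assumption, not mito2's actual cache key scheme):

```rust
// FILE_DIR as introduced by this commit; the file-name scheme below is
// an illustrative assumption, not mito2's actual cache key format.
const FILE_DIR: &str = "greptimedb/object_cache/write/";

fn cache_file_path(file_id: &str) -> String {
    format!("{FILE_DIR}{file_id}.parquet")
}

fn main() {
    let path = cache_file_path("4711-0000");
    assert_eq!(path, "greptimedb/object_cache/write/4711-0000.parquet");
    // Exactly three directory layers precede the file name.
    assert_eq!(path.matches('/').count(), 3);
}
```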
src/mito2/src/config.rs (7 changes: 2 additions & 5 deletions)
@@ -20,8 +20,6 @@ use std::time::Duration;

 use common_base::readable_size::ReadableSize;
 use common_telemetry::warn;
-use object_store::util::join_dir;
-use object_store::OBJECT_CACHE_DIR;
 use serde::{Deserialize, Serialize};
 use serde_with::serde_as;

@@ -97,7 +95,7 @@ pub struct MitoConfig {
     pub selector_result_cache_size: ReadableSize,
     /// Whether to enable the experimental write cache.
     pub enable_experimental_write_cache: bool,
-    /// File system path for write cache, defaults to `{data_home}/object_cache/write`.
+    /// File system path for write cache dir's root, defaults to `{data_home}`.
     pub experimental_write_cache_path: String,
     /// Capacity for write cache.
     pub experimental_write_cache_size: ReadableSize,
@@ -234,8 +232,7 @@ impl MitoConfig {

         // Sets write cache path if it is empty.
         if self.experimental_write_cache_path.trim().is_empty() {
-            let object_cache_path = join_dir(data_home, OBJECT_CACHE_DIR);
-            self.experimental_write_cache_path = join_dir(&object_cache_path, "write");
+            self.experimental_write_cache_path = data_home.to_string();
         }

         self.index.sanitize(data_home, &self.inverted_index)?;
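A minimal sketch of the new fallback in `sanitize`, using a hypothetical trimmed-down stand-in for `MitoConfig`: a blank path now defaults to `data_home` itself, and `FILE_DIR` in file_cache.rs supplies the `greptimedb/object_cache/write` layers underneath it.

```rust
// Hypothetical trimmed-down stand-in for MitoConfig, showing only the
// write-cache path fallback changed by this commit.
struct Config {
    experimental_write_cache_path: String,
}

impl Config {
    fn sanitize(&mut self, data_home: &str) {
        // Sets write cache path if it is empty (as in the diff above).
        if self.experimental_write_cache_path.trim().is_empty() {
            self.experimental_write_cache_path = data_home.to_string();
        }
    }
}

fn main() {
    let mut config = Config {
        experimental_write_cache_path: "  ".to_string(),
    };
    config.sanitize("/var/lib/greptimedb");
    assert_eq!(config.experimental_write_cache_path, "/var/lib/greptimedb");
}
```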
src/object-store/src/layers/lru_cache.rs (10 changes: 3 additions & 7 deletions)
@@ -117,19 +117,15 @@ impl<I: Access, C: Access> LayeredAccess for LruCacheAccess<I, C> {
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         let result = self.inner.write(path, args).await;

-        self.read_cache
-            .invalidate_entries_with_prefix(format!("{:x}", md5::compute(path)))
-            .await;
+        self.read_cache.invalidate_entries_with_prefix(path).await;

         result
     }

     async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
         let result = self.inner.delete(path, args).await;

-        self.read_cache
-            .invalidate_entries_with_prefix(format!("{:x}", md5::compute(path)))
-            .await;
+        self.read_cache.invalidate_entries_with_prefix(path).await;

         result
     }
@@ -147,7 +143,7 @@ impl<I: Access, C: Access> LayeredAccess for LruCacheAccess<I, C> {
         let result = self.inner.blocking_write(path, args);

         self.read_cache
-            .blocking_invalidate_entries_with_prefix(format!("{:x}", md5::compute(path)));
+            .blocking_invalidate_entries_with_prefix(path);

         result
     }
src/object-store/src/layers/lru_cache/read_cache.rs (29 changes: 22 additions & 7 deletions)
@@ -28,6 +28,10 @@ use crate::metrics::{
 };

 const RECOVER_CACHE_LIST_CONCURRENT: usize = 8;
+/// Subdirectory of cached files for read.
+///
+/// This must contain three layers, corresponding to [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
+const READ_CACHE_DIR: &str = "greptimedb/object_cache/read";

 /// Cache value for read file
 #[derive(Debug, Clone, PartialEq, Eq, Copy)]
@@ -56,12 +60,20 @@ fn can_cache(path: &str) -> bool {
 /// Generate a unique cache key for the read path and range.
 fn read_cache_key(path: &str, args: &OpRead) -> String {
     format!(
-        "{:x}.cache-{}",
+        "{READ_CACHE_DIR}/{:x}.cache-{}",
         md5::compute(path),
         args.range().to_header()
     )
 }

+fn read_cache_root() -> String {
+    format!("/{READ_CACHE_DIR}")
+}
+
+fn read_cache_key_prefix(path: &str) -> String {
+    format!("{READ_CACHE_DIR}/{:x}", md5::compute(path))
+}
+
 /// Local read cache for files in object storage
 #[derive(Debug)]
 pub(crate) struct ReadCache<C> {
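These two helpers are what let the lru_cache.rs callers above pass the raw `path`: every key built by `read_cache_key` starts with `read_cache_key_prefix(path)`, so prefix invalidation drops all cached ranges of a file. A self-contained sketch (requires the `md5` crate, which the real code already uses; the range-header string is an illustrative stand-in for `args.range().to_header()`):

```rust
// Copies of the two key helpers from this diff, made self-contained.
const READ_CACHE_DIR: &str = "greptimedb/object_cache/read";

fn read_cache_key(path: &str, range_header: &str) -> String {
    format!(
        "{READ_CACHE_DIR}/{:x}.cache-{}",
        md5::compute(path),
        range_header
    )
}

fn read_cache_key_prefix(path: &str) -> String {
    format!("{READ_CACHE_DIR}/{:x}", md5::compute(path))
}

fn main() {
    let path = "data/my_table/some_file.parquet";
    let key = read_cache_key(path, "bytes=0-4095");
    // Invalidating by this prefix removes every cached range of `path`.
    assert!(key.starts_with(&read_cache_key_prefix(path)));
}
```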
Expand Down Expand Up @@ -125,16 +137,18 @@ impl<C: Access> ReadCache<C> {
(self.mem_cache.entry_count(), self.mem_cache.weighted_size())
}

/// Invalidate all cache items which key starts with `prefix`.
pub(crate) async fn invalidate_entries_with_prefix(&self, prefix: String) {
/// Invalidate all cache items belong to the specific path.
pub(crate) async fn invalidate_entries_with_prefix(&self, path: &str) {
let prefix = read_cache_key_prefix(path);
// Safety: always ok when building cache with `support_invalidation_closures`.
self.mem_cache
.invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix))
.ok();
}

/// Blocking version of `invalidate_entries_with_prefix`.
pub(crate) fn blocking_invalidate_entries_with_prefix(&self, prefix: String) {
/// Blocking version of [`invalidate_entries_with_prefix`](Self::invalidate_entries_with_prefix).
pub(crate) fn blocking_invalidate_entries_with_prefix(&self, path: &str) {
let prefix = read_cache_key_prefix(path);
// Safety: always ok when building cache with `support_invalidation_closures`.
self.mem_cache
.invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix))
@@ -145,8 +159,9 @@ impl<C: Access> ReadCache<C> {
     /// Return entry count and total approximate entry size in bytes.
     pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> {
         let op = OperatorBuilder::new(self.file_cache.clone()).finish();
+        let root = read_cache_root();
         let mut entries = op
-            .list_with("/")
+            .list_with(&root)
             .metakey(Metakey::ContentLength | Metakey::ContentType)
             .concurrent(RECOVER_CACHE_LIST_CONCURRENT)
             .await?;
@@ -157,7 +172,7 @@
             OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
             OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64);
             // ignore root path
-            if entry.path() != "/" {
+            if entry.path() != &root {
                 self.mem_cache
                     .insert(read_key.to_string(), ReadResult::Success(size as u32))
                     .await;

(CI note — GitHub Actions / Clippy, check failure on line 175 of src/object-store/src/layers/lru_cache/read_cache.rs: "taken reference of right operand" on `entry.path() != &root`.)
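Recovery is now scoped to the read-cache root rather than `/`, so files under the write cache's `greptimedb/object_cache/write` are no longer swept into the read cache's bookkeeping. A plain-strings sketch of the root-skipping filter (listed paths here stand in for OpenDAL entries and are illustrative):

```rust
// Plain-string sketch: list under the read cache root and skip the
// root entry itself, keeping only real cache files.
const READ_CACHE_DIR: &str = "greptimedb/object_cache/read";

fn read_cache_root() -> String {
    format!("/{READ_CACHE_DIR}")
}

fn main() {
    let root = read_cache_root();
    // Paths a list operation might return (illustrative).
    let listed = vec![
        "/greptimedb/object_cache/read".to_string(),
        "/greptimedb/object_cache/read/0a1b2c.cache-bytes=0-4095".to_string(),
        "/greptimedb/object_cache/read/0a1b2c.cache-bytes=4096-8191".to_string(),
    ];
    let recovered: Vec<String> = listed.into_iter().filter(|p| *p != root).collect();
    assert_eq!(recovered.len(), 2);
}
```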
