Skip to content

Commit

Permalink
feat: rename the old style files automatically
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jan 4, 2024
1 parent 746b4bc commit 8b863c1
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/object-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ opendal = { version = "0.44", features = [
"layers-tracing",
] }
prometheus.workspace = true
regex.workspace = true
snafu.workspace = true
uuid.workspace = true

Expand Down
87 changes: 81 additions & 6 deletions src/object-store/src/layers/lru_cache/read_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,27 @@
use std::sync::Arc;

use common_telemetry::logging::debug;
use common_telemetry::warn;
use futures::FutureExt;
use lazy_static::lazy_static;
use moka::future::Cache;
use moka::notification::ListenerFuture;
use opendal::raw::oio::{ListExt, Read, ReadExt, Reader, WriteExt};
use opendal::raw::{Accessor, OpDelete, OpList, OpRead, OpStat, OpWrite, RpRead};
use opendal::raw::{
Accessor, BytesRange, OpDelete, OpList, OpRead, OpRename, OpStat, OpWrite, RpRead,
};
use opendal::{Error as OpendalError, ErrorKind, Result};
use regex::Regex;

use crate::metrics::{
OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT,
OBJECT_STORE_LRU_CACHE_MISS, OBJECT_STORE_READ_ERROR,
};

lazy_static! {
    /// Matches legacy (old style) cache file names of the form
    /// `<prefix>-bytes=<start>-`, i.e. a range with no end offset
    /// (e.g. `abc.cache-bytes=7-`). New style names carry an explicit end
    /// offset (`...-bytes=7-14`) and therefore do not match.
    /// Group 1 captures the prefix, group 2 the start offset.
    static ref OLD_CACHE_FILE_STYLE: Regex = Regex::new(r#"^(.*)-bytes=(\d+)-\s*$"#).unwrap();
}

/// Cache value for read file
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
enum ReadResult {
Expand All @@ -53,13 +62,33 @@ fn can_cache(path: &str) -> bool {

/// Generate an unique cache key for the read path and range.
fn read_cache_key(path: &str, args: &OpRead) -> String {
format!(
"{:x}.cache-{}",
md5::compute(path),
args.range().to_header()
read_cache_key_inner(
&format!("{:x}.cache", md5::compute(path)),
&args.range().to_header(),
)
}

/// Joins a cache-key prefix and a byte-range header into the final cache key
/// (`<prefix>-<range>`).
fn read_cache_key_inner(prefix: &str, range: &str) -> String {
    format!("{prefix}-{range}")
}

/// Components parsed out of an old style cache key `<prefix>-bytes=<start>-`.
#[derive(Debug, PartialEq, Eq)]
struct OldStyleCacheKey {
    /// Everything before the `-bytes=` marker (e.g. `<md5>.cache`).
    prefix: String,
    /// Start offset of the cached byte range.
    start: u64,
}

/// Parses an old style cache key of the form `<prefix>-bytes=<start>-`.
///
/// Returns the parsed [`OldStyleCacheKey`] only if the whole key matches the
/// old style and the start offset parses as a `u64`; returns `None` otherwise
/// (in particular for new style keys that carry an explicit end offset).
fn parse_cache_key(key: &str) -> Option<OldStyleCacheKey> {
    let captures = OLD_CACHE_FILE_STYLE.captures(key)?;
    // Group 2 is `\d+`, but it can still overflow `u64`, so parse fallibly.
    let start = captures[2].parse::<u64>().ok()?;
    Some(OldStyleCacheKey {
        start,
        prefix: captures[1].to_string(),
    })
}

/// Local read cache for files in object storage
#[derive(Clone, Debug)]
pub(crate) struct ReadCache<C: Clone> {
Expand Down Expand Up @@ -130,6 +159,24 @@ impl<C: Accessor + Clone> ReadCache<C> {
.ok();
}

/// Renames an old style cache file (`<prefix>-bytes=<start>-`) to the new
/// style, deriving the range's length from the file's `size`.
///
/// Returns `Some(new_read_key)` when `read_key` matched the old style,
/// `None` otherwise.
async fn rename_cache_file(&self, read_key: &str, size: u64) -> Option<String> {
    let OldStyleCacheKey { start, prefix } = parse_cache_key(read_key)?;

    let range = BytesRange::new(Some(start), Some(size));
    let new_read_key = read_cache_key_inner(&prefix, &range.to_header());

    // Best-effort rename: a failure is only logged and the new key is still
    // returned.
    // NOTE(review): on rename failure the on-disk file keeps its old name
    // while the caller receives the new key — confirm this mismatch is
    // intended (recovery would then track a key with no backing file).
    if let Err(err) = self
        .file_cache
        .rename(read_key, &new_read_key, OpRename::default())
        .await
    {
        warn!(err; "Failed to rename old style cache: {read_key} to {new_read_key}");
    }

    Some(new_read_key)
}

/// Recover existing cache items from `file_cache` to `mem_cache`.
/// Return entry count and total approximate entry size in bytes.
pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> {
Expand All @@ -146,10 +193,17 @@ impl<C: Accessor + Clone> ReadCache<C> {
stat.into_metadata().content_length()
};

let read_key = if let Some(new_read_key) = self.rename_cache_file(read_key, size).await
{
new_read_key
} else {
read_key.to_string()
};

OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64);
self.mem_cache
.insert(read_key.to_string(), ReadResult::Success(size as u32))
.insert(read_key, ReadResult::Success(size as u32))
.await;
}

Expand Down Expand Up @@ -300,4 +354,25 @@ mod tests {
assert!(!can_cache("test/__last_checkpoint"));
assert!(!can_cache("a/b/c/__last_checkpoint"));
}

#[test]
fn test_parse_cache_key() {
    // A well-formed old-style key yields its prefix and start offset.
    let parsed = parse_cache_key("6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=7-").unwrap();
    assert_eq!(
        parsed,
        OldStyleCacheKey {
            start: 7,
            prefix: "6d29752bdc6e4d5ba5483b96615d6c48.cache".to_string(),
        },
    );

    // New-style keys (explicit end offset), truncated ranges, non-numeric
    // offsets, and trailing garbage must all fail to parse.
    for invalid in [
        "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14",
        "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=",
        "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=x-y",
        "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=9-14.afjoadnda",
        "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=9-.afjoadnda",
    ] {
        assert!(parse_cache_key(invalid).is_none(), "{}", invalid);
    }
}
}
136 changes: 136 additions & 0 deletions src/object-store/tests/object_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,28 @@ async fn assert_cache_files(
Ok(())
}

/// Renames every cache file under `/` in `store` to the name at the same
/// index in `new_file_names`, panicking if a file that is not listed in
/// `file_names` is encountered. Order of directory entries is irrelevant.
async fn rename_cache_files(
    store: &Operator,
    file_names: &[&str],
    new_file_names: &[&str],
) -> Result<()> {
    for entry in store.list("/").await? {
        // Every on-disk entry must be one of the expected cache files.
        let index = file_names.iter().position(|&name| name == entry.name());
        assert!(index.is_some(), "file not found: {}", entry.name());

        store
            .rename(entry.path(), new_file_names[index.unwrap()])
            .await
            .unwrap();
    }

    Ok(())
}

#[tokio::test]
async fn test_object_store_cache_policy() -> Result<()> {
common_telemetry::init_default_ut_logging();
Expand Down Expand Up @@ -458,3 +480,117 @@ async fn test_object_store_cache_policy() -> Result<()> {

Ok(())
}

/// End-to-end test for the automatic rename of old style cache files:
/// new-style cache files are renamed on disk to the legacy open-ended
/// `...-bytes=<start>-` naming, then the cache layer is rebuilt; recovery is
/// expected to restore the new-style `...-bytes=<start>-<end>` names and the
/// in-memory LRU state.
#[tokio::test]
async fn test_object_store_cache_with_rename() -> Result<()> {
    common_telemetry::init_default_ut_logging();
    // create file storage
    let root_dir = create_temp_dir("test_object_store_cache_policy");
    let store = OperatorBuilder::new(
        Fs::default()
            .root(&root_dir.path().to_string_lossy())
            .atomic_write_dir(&root_dir.path().to_string_lossy())
            .build()
            .unwrap(),
    )
    .finish();

    // create file cache layer
    let cache_dir = create_temp_dir("test_object_store_cache_policy_cache");
    let atomic_temp_dir = create_temp_dir("test_object_store_cache_policy_cache_tmp");
    let mut builder = Fs::default();
    let _ = builder
        .root(&cache_dir.path().to_string_lossy())
        .atomic_write_dir(&atomic_temp_dir.path().to_string_lossy());
    let file_cache = Arc::new(builder.build().unwrap());
    let cache_store = OperatorBuilder::new(file_cache.clone()).finish();

    // create operator for cache dir to verify cache file
    // Capacity 38 = 15 ("Hello, object1!") + 8 ("object2!")
    // + 15 ("Hello, object2!"): all three entries fit exactly.
    let cache_layer = LruCacheLayer::new(Arc::new(file_cache.clone()), 38)
        .await
        .unwrap();
    let store = store.layer(cache_layer.clone());

    // create several object handler.
    // write data into object;
    let p1 = "test_file1";
    let p2 = "test_file2";
    store.write(p1, "Hello, object1!").await.unwrap();
    store.write(p2, "Hello, object2!").await.unwrap();

    // Try to read p1 and p2
    // Populates three cache entries: p1 full, p2 suffix (from byte 7), p2 full.
    let _ = store.read_with(p1).range(0..).await?;
    let _ = store.read(p1).await?;
    let _ = store.read_with(p2).range(0..).await?;
    let _ = store.read_with(p2).range(7..).await?;
    let _ = store.read(p2).await?;

    // (entry count, total bytes) — see the capacity note above.
    assert_eq!(cache_layer.read_cache_stat().await, (3, 38));
    assert_cache_files(
        &cache_store,
        &[
            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14",
            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-14",
            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-14",
        ],
        &["Hello, object1!", "object2!", "Hello, object2!"],
    )
    .await?;
    assert_lru_cache(
        &cache_layer,
        &[
            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14",
            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-14",
            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-14",
        ],
    )
    .await;
    // Downgrade the on-disk cache files to the legacy open-ended naming to
    // simulate a cache directory written by an older version.
    rename_cache_files(
        &cache_store,
        &[
            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14",
            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-14",
            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-14",
        ],
        &[
            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-",
            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-",
        ],
    )
    .await
    .unwrap();

    // Rebuilds
    // Rebuilding the layer over the same cache dir triggers recovery, which
    // should rename the legacy files back to the new style.
    let mut builder = Fs::default();
    let _ = builder
        .root(&cache_dir.path().to_string_lossy())
        .atomic_write_dir(&atomic_temp_dir.path().to_string_lossy());
    let file_cache = Arc::new(builder.build().unwrap());
    let cache_store = OperatorBuilder::new(file_cache.clone()).finish();

    // create operator for cache dir to verify cache file
    let cache_layer = LruCacheLayer::new(Arc::new(file_cache.clone()), 38)
        .await
        .unwrap();
    // The on-disk names and the recovered LRU keys must both be new style.
    assert_cache_files(
        &cache_store,
        &[
            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14",
            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-14",
            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-14",
        ],
        &["Hello, object1!", "object2!", "Hello, object2!"],
    )
    .await?;
    assert_lru_cache(
        &cache_layer,
        &[
            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14",
            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-14",
            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-14",
        ],
    )
    .await;
    Ok(())
}

0 comments on commit 8b863c1

Please sign in to comment.