Skip to content

Commit

Permalink
feat: refactoring LruCacheLayer with list_with_metakey and concurrent…
Browse files Browse the repository at this point in the history
…_stat_in_list (GreptimeTeam#4596)

* use list_with_metakey and concurrent_stat_in_list

* change concurrent in recover_cache like before

* remove stat funcation

* use 8 concurrent

* use const value

* fmt code

* Apply suggestions from code review

---------

Co-authored-by: ozewr <[email protected]>
Co-authored-by: Weny Xu <[email protected]>
  • Loading branch information
3 people authored and CookiePieWw committed Sep 17, 2024
1 parent 36ec621 commit 0a36b79
Showing 1 changed file with 14 additions and 15 deletions.
29 changes: 14 additions & 15 deletions src/object-store/src/layers/lru_cache/read_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ use common_telemetry::debug;
use futures::FutureExt;
use moka::future::Cache;
use moka::notification::ListenerFuture;
use opendal::raw::oio::{List, Read, Reader, Write};
use opendal::raw::{Access, OpDelete, OpList, OpRead, OpStat, OpWrite, RpRead};
use opendal::{Error as OpendalError, ErrorKind, Result};
use opendal::raw::oio::{Read, Reader, Write};
use opendal::raw::{Access, OpDelete, OpRead, OpStat, OpWrite, RpRead};
use opendal::{Error as OpendalError, ErrorKind, Metakey, OperatorBuilder, Result};

use crate::metrics::{
OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT,
OBJECT_STORE_LRU_CACHE_MISS, OBJECT_STORE_READ_ERROR,
};

const RECOVER_CACHE_LIST_CONCURRENT: usize = 8;

/// Cache value for read file
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
enum ReadResult {
Expand Down Expand Up @@ -142,19 +144,16 @@ impl<C: Access> ReadCache<C> {
/// Recover existing cache items from `file_cache` to `mem_cache`.
/// Return entry count and total approximate entry size in bytes.
pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> {
let (_, mut pager) = self.file_cache.list("/", OpList::default()).await?;

while let Some(entry) = pager.next().await? {
let op = OperatorBuilder::new(self.file_cache.clone()).finish();
let mut entries = op
.list_with("/")
.metakey(Metakey::ContentLength | Metakey::ContentType)
.concurrent(RECOVER_CACHE_LIST_CONCURRENT)
.await?;

while let Some(entry) = entries.pop() {
let read_key = entry.path();

// We can't retrieve the metadata from `[opendal::raw::oio::Entry]` directly,
// because it's private field.
let size = {
let stat = self.file_cache.stat(read_key, OpStat::default()).await?;

stat.into_metadata().content_length()
};

let size = entry.metadata().content_length();
OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64);
self.mem_cache
Expand Down

0 comments on commit 0a36b79

Please sign in to comment.