From 2f59d3ca144ecfebb5b90a8b4c14b08176598424 Mon Sep 17 00:00:00 2001 From: Yanxin Xiang Date: Tue, 16 Apr 2024 18:33:54 -0500 Subject: [PATCH] init --- datafusion/execution/Cargo.toml | 1 + .../execution/src/cache/cache_manager.rs | 15 +++- datafusion/execution/src/cache/cache_unit.rs | 90 ++++++++++++++++++- 3 files changed, 102 insertions(+), 4 deletions(-) diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index 84c878bf10dc..df81522ad654 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -46,3 +46,4 @@ parking_lot = { workspace = true } rand = { workspace = true } tempfile = { workspace = true } url = { workspace = true } +sequence_trie = "*" \ No newline at end of file diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index 97529263688b..e5aa833dd0f7 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -15,13 +15,15 @@ // specific language governing permissions and limitations // under the License. +use super::cache_unit::{self, DefaultFileStatisticsCache}; use crate::cache::CacheAccessor; use datafusion_common::{Result, Statistics}; use object_store::path::Path; use object_store::ObjectMeta; +use parking_lot::Mutex; +use parking_lot::RwLock; use std::fmt::{Debug, Formatter}; use std::sync::Arc; - /// The cache of listing files statistics. /// if set [`CacheManagerConfig::with_files_statistics_cache`] /// Will avoid infer same file statistics repeatedly during the session lifetime, @@ -73,7 +75,7 @@ impl CacheManager { } } -#[derive(Clone, Default)] +#[derive(Clone)] pub struct CacheManagerConfig { /// Enable cache of files statistics when listing files. /// Avoid get same file statistics repeatedly in same datafusion session. @@ -87,7 +89,14 @@ pub struct CacheManagerConfig { /// Default is disable. pub list_files_cache: Option, } - +impl Default for CacheManagerConfig { + fn default() -> Self { + Self { + table_files_statistics_cache: None, + list_files_cache: Some(Arc::new(cache_unit::TrieFileCache::default())), + } + } +} impl CacheManagerConfig { pub fn with_files_statistics_cache( mut self, diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs index 25f9b9fa4d68..1348b9798fcf 100644 --- a/datafusion/execution/src/cache/cache_unit.rs +++ b/datafusion/execution/src/cache/cache_unit.rs @@ -15,10 +15,12 @@ // specific language governing permissions and limitations // under the License. +use core::panic; use std::sync::Arc; use crate::cache::CacheAccessor; +use datafusion_common::tree_node::TreeNode; use datafusion_common::Statistics; use dashmap::DashMap; @@ -156,10 +158,74 @@ impl CacheAccessor>> for DefaultListFilesCache { "DefaultListFilesCache".to_string() } } +use parking_lot::RwLock; +use sequence_trie::SequenceTrie; +#[derive(Default)] +pub struct TrieFileCache { + stores: RwLock>>>, +} +impl TrieFileCache { + fn path_to_char_seq(path: &Path) -> Vec { + path.filename().unwrap_or_default().chars().collect() + } +} + +impl CacheAccessor>> for TrieFileCache { + type Extra = ObjectMeta; + + fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option>> { + panic!("Not supported DefaultListFilesCache put_with_extra") + } + fn get(&self, k: &Path) -> Option>> { + let key_seq = Self::path_to_char_seq(k); + self.stores.read().get(&key_seq).map(|x| x.clone()) + } + fn put( + &self, + key: &Path, + value: Arc>, + ) -> Option>> { + let key_seq = Self::path_to_char_seq(key); + self.stores.write().insert(&key_seq, value) + } + + fn put_with_extra( + &self, + _key: &Path, + _value: Arc>, + _e: &Self::Extra, + ) -> Option>> { + panic!("Not supported TrieFileCache put_with_extra") + } + + fn remove(&mut self, key: &Path) -> Option>> { + let key_seq = Self::path_to_char_seq(key); + self.stores.write().remove(&key_seq); + Some(Arc::new(Default::default())) + } + + fn contains_key(&self, k: &Path) -> bool { + let key_seq = Self::path_to_char_seq(k); + self.stores.read().get(&key_seq).is_some() + } + fn len(&self) -> usize { + panic!("did not implement len") + } + + fn clear(&self) { + self.stores.write().children().clear() + } + + fn name(&self) -> String { + "TrieFileCache".to_string() + } +} #[cfg(test)] mod tests { - use crate::cache::cache_unit::{DefaultFileStatisticsCache, DefaultListFilesCache}; + use crate::cache::cache_unit::{ + DefaultFileStatisticsCache, DefaultListFilesCache, TrieFileCache, + }; use crate::cache::CacheAccessor; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use chrono::DateTime; @@ -232,4 +298,26 @@ mod tests { meta.clone() ); } + + #[test] + fn test_trie_cache() { + let meta = ObjectMeta { + location: Path::from("test"), + last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00") + .unwrap() + .into(), + size: 1024, + e_tag: None, + version: None, + }; + + let cache = TrieFileCache::default(); + assert!(cache.get(&meta.location).is_none()); + + cache.put(&meta.location, vec![meta.clone()].into()); + assert_eq!( + cache.get(&meta.location).unwrap().first().unwrap().clone(), + meta.clone() + ); + } }