From 02b1ef3b8c168e3e5a468d398f8830b13cea5e17 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 10 Dec 2024 17:57:29 +0000 Subject: [PATCH] Cache initial reads in `VortexFormat` (#1633) --- .gitignore | 5 +- Cargo.lock | 100 ++++++++++++++++++ Cargo.toml | 1 + bench-vortex/Cargo.toml | 2 +- fuzz/Cargo.toml | 1 - vortex-datafusion/Cargo.toml | 3 + vortex-datafusion/src/persistent/cache.rs | 65 ++++++++++++ vortex-datafusion/src/persistent/execution.rs | 5 + vortex-datafusion/src/persistent/format.rs | 29 +++-- vortex-datafusion/src/persistent/mod.rs | 1 + vortex-datafusion/src/persistent/opener.rs | 68 +++++++----- vortex-file/src/read/builder/initial_read.rs | 13 +-- vortex-file/src/read/builder/mod.rs | 13 ++- 13 files changed, 255 insertions(+), 51 deletions(-) create mode 100644 vortex-datafusion/src/persistent/cache.rs diff --git a/.gitignore b/.gitignore index 8a22636953..4311a05e04 100644 --- a/.gitignore +++ b/.gitignore @@ -204,4 +204,7 @@ benchmarks/.out # Zed .zed/ -*.plan \ No newline at end of file +*.plan + +# Default samply profile +profile.json.gz diff --git a/Cargo.lock b/Cargo.lock index 787ee4b57a..0f89976411 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -391,6 +391,17 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "async-lock" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "async-task" version = "4.7.1" @@ -1707,6 +1718,27 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "event-listener" +version = "5.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fastlanes" version = "0.1.8" @@ -2703,6 +2735,30 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "moka" +version = "0.12.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32cf62eb4dd975d2dde76432fb1075c49e3ee2331cf36f1f8fd4b66550d32b6f" +dependencies = [ + "async-lock", + "async-trait", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "event-listener", + "futures-util", + "once_cell", + "parking_lot", + "quanta", + "rustc_version", + "smallvec", + "tagptr", + "thiserror 1.0.69", + "triomphe", + "uuid", +] + [[package]] name = "multimap" version = "0.10.0" @@ -3026,6 +3082,12 @@ version = "1.5.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fecab3723493c7851f292cb060f3ee1c42f19b8d749345d0d7eaf3fd19aa62d" +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.3" @@ -3451,6 +3513,21 @@ dependencies = [ "vortex", ] +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quick-xml" version = "0.36.2" @@ -3552,6 +3629,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "raw-cpuid" +version = "11.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "rayon" version = "1.10.0" @@ -4216,6 +4302,12 @@ dependencies = [ "papergrid", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tar" version = "0.4.43" @@ -4531,6 +4623,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "triomphe" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3" + [[package]] name = "try-lock" version = "0.2.5" @@ -4758,6 +4856,7 @@ dependencies = [ "arrow-array", "arrow-schema", "async-trait", + "bytes", "chrono", "datafusion", "datafusion-common", @@ -4768,6 +4867,7 @@ dependencies = [ "futures", "itertools 0.13.0", "log", + "moka", "object_store", "pin-project", "tempfile", diff --git a/Cargo.toml b/Cargo.toml index 74407ee34f..2e9b226ae0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -95,6 +95,7 @@ jiff = "0.1.8" libfuzzer-sys = "0.4" log = "0.4.21" mimalloc = "0.1.42" +moka = "0.12" num-traits = "0.2.18" num_enum = "0.7.2" object_store = "0.11.0" diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml index f90b838868..ec0630dac4 100644 --- a/bench-vortex/Cargo.toml +++ b/bench-vortex/Cargo.toml @@ -46,7 +46,6 @@ mimalloc = { workspace = true } object_store = { workspace = true, features = ["aws"] } parquet = { workspace = true, features = ["async"] } prettytable-rs = { workspace = true } -tabled = { workspace = true, features = ["std"] } rand = { workspace = true } rayon = { workspace = true } regex = { workspace = true } @@ -54,6 +53,7 @@ reqwest = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } simplelog = { workspace = true } +tabled = { workspace = true, features = ["std"] } tar = { workspace = true } tokio = { workspace = true, features = ["full"] } uuid = { workspace = true, features = ["v4"] } diff --git a/fuzz/Cargo.toml b/fuzz/Cargo.toml index 51a4fe0823..4460e73d9b 100644 --- a/fuzz/Cargo.toml +++ b/fuzz/Cargo.toml @@ -14,7 +14,6 @@ rust-version = { workspace = true } categories = { workspace = true } readme = "README.md" - [package.metadata] cargo-fuzz = true diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index d41d18a61b..cd3641d90e 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -21,6 +21,7 @@ path = "src/lib.rs" arrow-array = { workspace = true } arrow-schema = { workspace = true } async-trait = { workspace = true } +bytes = { workspace = true } chrono = { workspace = true } datafusion = { workspace = true } datafusion-common = { workspace = true } @@ -31,8 +32,10 @@ datafusion-physical-plan = { workspace = true } futures = { workspace = true } itertools = { workspace = true } log = { workspace = true } +moka = { workspace = true, features = ["future", "sync"] } object_store = { workspace = true } pin-project = { workspace = true } +tokio = { workspace = true } vortex-array = { workspace = true } vortex-datetime-dtype = { workspace = true } vortex-dtype = { workspace = true } diff --git a/vortex-datafusion/src/persistent/cache.rs b/vortex-datafusion/src/persistent/cache.rs new file mode 100644 index 0000000000..55f959f678 --- /dev/null +++ b/vortex-datafusion/src/persistent/cache.rs @@ -0,0 +1,65 @@ +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use moka::future::Cache; +use object_store::path::Path; +use object_store::{ObjectMeta, ObjectStore}; +use vortex_error::{vortex_err, VortexError, VortexResult}; +use vortex_file::{read_initial_bytes, InitialRead}; +use vortex_io::ObjectStoreReadAt; + +#[derive(Debug, Clone)] +pub struct InitialReadCache { + inner: Cache, +} + +impl Default for InitialReadCache { + fn default() -> Self { + let inner = Cache::builder() + .weigher(|k: &Key, v: &InitialRead| { + (k.location.as_ref().as_bytes().len() + v.buf.len()) as u32 + }) + .max_capacity(256 * (2 << 20)) + .eviction_listener(|k, _v, cause| { + log::trace!("Removed {} due to {:?}", k.location, cause); + }) + .build(); + + Self { inner } + } +} + +#[derive(Hash, Eq, PartialEq, Debug)] +pub struct Key { + location: Path, + m_time: DateTime, +} + +impl From<&ObjectMeta> for Key { + fn from(value: &ObjectMeta) -> Self { + Self { + location: value.location.clone(), + m_time: value.last_modified, + } + } +} + +impl InitialReadCache { + pub async fn try_get( + &self, + object: &ObjectMeta, + store: Arc, + ) -> VortexResult { + self.inner + .try_get_with(Key::from(object), async { + let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone()); + let initial_read = read_initial_bytes(&os_read_at, object.size as u64).await?; + VortexResult::Ok(initial_read) + }) + .await + .map_err(|e: Arc| match Arc::try_unwrap(e) { + Ok(e) => e, + Err(e) => vortex_err!("{}", e.to_string()), + }) + } +} diff --git a/vortex-datafusion/src/persistent/execution.rs b/vortex-datafusion/src/persistent/execution.rs index 8298f89271..073d21686b 100644 --- a/vortex-datafusion/src/persistent/execution.rs +++ b/vortex-datafusion/src/persistent/execution.rs @@ -14,6 +14,7 @@ use datafusion_physical_plan::{ use itertools::Itertools; use vortex_array::Context; +use super::cache::InitialReadCache; use crate::persistent::opener::VortexFileOpener; #[derive(Debug, Clone)] @@ -24,6 +25,7 @@ pub struct VortexExec { plan_properties: PlanProperties, projected_statistics: Statistics, ctx: Arc, + initial_read_cache: InitialReadCache, } impl VortexExec { @@ -32,6 +34,7 @@ impl VortexExec { metrics: ExecutionPlanMetricsSet, predicate: Option>, ctx: Arc, + initial_read_cache: InitialReadCache, ) -> DFResult { let projected_schema = project_schema( &file_scan_config.file_schema, @@ -60,6 +63,7 @@ impl VortexExec { plan_properties, projected_statistics, ctx, + initial_read_cache, }) } @@ -118,6 +122,7 @@ impl ExecutionPlan for VortexExec { object_store, projection: self.file_scan_config.projection.clone(), predicate: self.predicate.clone(), + initial_read_cache: self.initial_read_cache.clone(), arrow_schema, }; let stream = FileStream::new(&self.file_scan_config, partition, opener, &self.metrics)?; diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index e22ecc2147..d46c6b161a 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -22,11 +22,12 @@ use vortex_array::arrow::infer_schema; use vortex_array::Context; use vortex_file::metadata::fetch_metadata; use vortex_file::{ - read_initial_bytes, LayoutContext, LayoutDeserializer, LayoutMessageCache, RelativeLayoutCache, - Scan, VORTEX_FILE_EXTENSION, + LayoutContext, LayoutDeserializer, LayoutMessageCache, RelativeLayoutCache, Scan, + VORTEX_FILE_EXTENSION, }; use vortex_io::{IoDispatcher, ObjectStoreReadAt}; +use super::cache::InitialReadCache; use super::execution::VortexExec; use super::statistics::{array_to_col_statistics, uncompressed_col_size}; use crate::can_be_pushed_down; @@ -34,12 +35,14 @@ use crate::can_be_pushed_down; #[derive(Debug, Default)] pub struct VortexFormat { context: Arc, + initial_read_cache: InitialReadCache, } impl VortexFormat { pub fn new(context: &Context) -> Self { Self { context: Arc::new(context.clone()), + initial_read_cache: InitialReadCache::default(), } } } @@ -74,8 +77,8 @@ impl FileFormat for VortexFormat { ) -> DFResult { let mut file_schemas = Vec::default(); for o in objects { - let os_read_at = ObjectStoreReadAt::new(store.clone(), o.location.clone()); - let initial_read = read_initial_bytes(&os_read_at, o.size as u64).await?; + let initial_read = self.initial_read_cache.try_get(o, store.clone()).await?; + let lazy_dtype = initial_read.lazy_dtype(); let s = infer_schema(lazy_dtype.value()?)?; file_schemas.push(s); @@ -93,8 +96,11 @@ impl FileFormat for VortexFormat { table_schema: SchemaRef, object: &ObjectMeta, ) -> DFResult { - let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone()); - let initial_read = read_initial_bytes(&os_read_at, object.size as u64).await?; + let initial_read = self + .initial_read_cache + .try_get(object, store.clone()) + .await?; + let layout = initial_read.fb_layout(); let row_count = layout.row_count(); @@ -112,18 +118,18 @@ impl FileFormat for VortexFormat { relative_message_cache, )?; + let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone()); let io = IoDispatcher::default(); let mut stats = Statistics::new_unknown(&table_schema); stats.num_rows = Precision::Exact(row_count as usize); - let metadata_table = - fetch_metadata(os_read_at, io.into(), root_layout, layout_message_cache).await?; - - if let Some(metadata) = metadata_table { + if let Some(metadata_table) = + fetch_metadata(os_read_at, io.into(), root_layout, layout_message_cache).await? + { let mut column_statistics = Vec::with_capacity(table_schema.fields().len()); let mut total_size = 0_u64; - for col_stats in metadata.into_iter() { + for col_stats in metadata_table.into_iter() { let col_stats = match col_stats { Some(array) => { let col_metadata_array = StructArray::try_from(array)?; @@ -157,6 +163,7 @@ impl FileFormat for VortexFormat { metrics, filters.cloned(), self.context.clone(), + self.initial_read_cache.clone(), )? .into_arc(); diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs index da33dd239c..a6bfa18fae 100644 --- a/vortex-datafusion/src/persistent/mod.rs +++ b/vortex-datafusion/src/persistent/mod.rs @@ -1,3 +1,4 @@ +mod cache; pub mod config; pub mod execution; pub mod format; diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 27dc1b3cd7..cb0d433375 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -12,52 +12,62 @@ use vortex_expr::datafusion::convert_expr_to_vortex; use vortex_file::{LayoutContext, LayoutDeserializer, Projection, RowFilter, VortexReadBuilder}; use vortex_io::{IoDispatcher, ObjectStoreReadAt}; +use super::cache::InitialReadCache; + /// Share an IO dispatcher across all DataFusion instances. static IO_DISPATCHER: LazyLock> = LazyLock::new(|| Arc::new(IoDispatcher::default())); +#[derive(Clone)] pub struct VortexFileOpener { pub ctx: Arc, pub object_store: Arc, pub projection: Option>, pub predicate: Option>, pub arrow_schema: SchemaRef, + pub(crate) initial_read_cache: InitialReadCache, } impl FileOpener for VortexFileOpener { fn open(&self, file_meta: FileMeta) -> DFResult { - let read_at = - ObjectStoreReadAt::new(self.object_store.clone(), file_meta.location().clone()); + let this = self.clone(); + let f = async move { + let read_at = + ObjectStoreReadAt::new(this.object_store.clone(), file_meta.location().clone()); + let initial_read = this + .initial_read_cache + .try_get(&file_meta.object_meta, this.object_store.clone()) + .await?; - let mut builder = VortexReadBuilder::new( - read_at, - LayoutDeserializer::new(self.ctx.clone(), Arc::new(LayoutContext::default())), - ) - .with_io_dispatcher(IO_DISPATCHER.clone()) - .with_file_size(file_meta.object_meta.size as u64); + let mut builder = VortexReadBuilder::new( + read_at, + LayoutDeserializer::new(this.ctx.clone(), Arc::new(LayoutContext::default())), + ) + .with_io_dispatcher(IO_DISPATCHER.clone()) + .with_file_size(file_meta.object_meta.size as u64) + .with_initial_read(initial_read); - // We split the predicate and filter out the conjunction members that we can't push down - let row_filter = self - .predicate - .as_ref() - .map(|filter_expr| { - split_conjunction(filter_expr) - .into_iter() - .filter_map(|e| convert_expr_to_vortex(e.clone()).ok()) - .collect::>() - }) - .filter(|conjunction| !conjunction.is_empty()) - .map(RowFilter::from_conjunction); + // We split the predicate and filter out the conjunction members that we can't push down + let row_filter = this + .predicate + .as_ref() + .map(|filter_expr| { + split_conjunction(filter_expr) + .into_iter() + .filter_map(|e| convert_expr_to_vortex(e.clone()).ok()) + .collect::>() + }) + .filter(|conjunction| !conjunction.is_empty()) + .map(RowFilter::from_conjunction); - if let Some(row_filter) = row_filter { - builder = builder.with_row_filter(row_filter); - } + if let Some(row_filter) = row_filter { + builder = builder.with_row_filter(row_filter); + } - if let Some(projection) = self.projection.as_ref() { - builder = builder.with_projection(Projection::new(projection)); - } + if let Some(projection) = this.projection.as_ref() { + builder = builder.with_projection(Projection::new(projection)); + } - Ok(async { Ok(Box::pin( builder .build() @@ -67,6 +77,8 @@ impl FileOpener for VortexFileOpener { .map_err(|e| e.into()), ) as _) } - .boxed()) + .boxed(); + + Ok(f) } } diff --git a/vortex-file/src/read/builder/initial_read.rs b/vortex-file/src/read/builder/initial_read.rs index 58eab07ce2..1c2fe4363f 100644 --- a/vortex-file/src/read/builder/initial_read.rs +++ b/vortex-file/src/read/builder/initial_read.rs @@ -2,13 +2,13 @@ use core::ops::Range; use bytes::Bytes; use flatbuffers::{root, root_unchecked}; -use vortex_error::{vortex_bail, vortex_err, VortexResult}; +use vortex_error::{vortex_bail, vortex_err, VortexResult, VortexUnwrap}; use vortex_flatbuffers::{footer, message}; use vortex_io::VortexReadAt; use crate::{LazyDType, EOF_SIZE, INITIAL_READ_SIZE, MAGIC_BYTES, VERSION}; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct InitialRead { /// The bytes from the initial read of the file, which is assumed (for now) to be sufficiently /// large to contain the schema and layout. @@ -68,7 +68,7 @@ pub async fn read_initial_bytes( let read_size = INITIAL_READ_SIZE.min(file_size as usize); let initial_read_offset = file_size - read_size as u64; - let buf = read + let buf: Bytes = read .read_byte_range(initial_read_offset, read_size as u64) .await?; @@ -89,11 +89,8 @@ pub async fn read_initial_bytes( } // The footer MUST fit in the initial read. - let ps_size = u16::from_le_bytes( - buf[eof_loc + 2..eof_loc + 4] - .try_into() - .map_err(|e| vortex_err!("Footer size was not a u16 {e}"))?, - ) as usize; + let ps_size = + u16::from_le_bytes(buf[eof_loc + 2..eof_loc + 4].try_into().vortex_unwrap()) as usize; if ps_size > eof_loc { vortex_bail!( "Malformed file, postscript of size {} is too large to fit in initial read of size {} (file size {})", diff --git a/vortex-file/src/read/builder/mod.rs b/vortex-file/src/read/builder/mod.rs index 9d7ee83c05..73bf33dc63 100644 --- a/vortex-file/src/read/builder/mod.rs +++ b/vortex-file/src/read/builder/mod.rs @@ -6,6 +6,7 @@ use vortex_error::VortexResult; use vortex_expr::Select; use vortex_io::{IoDispatcher, VortexReadAt}; +use super::InitialRead; use crate::read::cache::{LayoutMessageCache, RelativeLayoutCache}; use crate::read::context::LayoutDeserializer; use crate::read::filtering::RowFilter; @@ -69,6 +70,7 @@ pub struct VortexReadBuilder { row_mask: Option, row_filter: Option, io_dispatcher: Option>, + initial_read: Option, } impl VortexReadBuilder { @@ -81,6 +83,7 @@ impl VortexReadBuilder { row_mask: None, row_filter: None, io_dispatcher: None, + initial_read: None, } } @@ -114,9 +117,17 @@ impl VortexReadBuilder { self } + pub fn with_initial_read(mut self, initial_read: InitialRead) -> Self { + self.initial_read = Some(initial_read); + self + } + pub async fn build(self) -> VortexResult> { // we do a large enough initial read to get footer, layout, and schema - let initial_read = read_initial_bytes(&self.read_at, self.file_size().await?).await?; + let initial_read = match self.initial_read { + Some(r) => r, + None => read_initial_bytes(&self.read_at, self.file_size().await?).await?, + }; let layout = initial_read.fb_layout();