Cache initial reads in VortexFormat (#1633)
AdamGS authored Dec 10, 2024
1 parent 8a4f4af commit 02b1ef3
Showing 13 changed files with 255 additions and 51 deletions.
5 changes: 4 additions & 1 deletion .gitignore
@@ -204,4 +204,7 @@ benchmarks/.out
# Zed
.zed/

*.plan
*.plan

# Default samply profile
profile.json.gz
100 changes: 100 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -95,6 +95,7 @@ jiff = "0.1.8"
libfuzzer-sys = "0.4"
log = "0.4.21"
mimalloc = "0.1.42"
moka = "0.12"
num-traits = "0.2.18"
num_enum = "0.7.2"
object_store = "0.11.0"
2 changes: 1 addition & 1 deletion bench-vortex/Cargo.toml
@@ -46,14 +46,14 @@ mimalloc = { workspace = true }
object_store = { workspace = true, features = ["aws"] }
parquet = { workspace = true, features = ["async"] }
prettytable-rs = { workspace = true }
tabled = { workspace = true, features = ["std"] }
rand = { workspace = true }
rayon = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
simplelog = { workspace = true }
tabled = { workspace = true, features = ["std"] }
tar = { workspace = true }
tokio = { workspace = true, features = ["full"] }
uuid = { workspace = true, features = ["v4"] }
1 change: 0 additions & 1 deletion fuzz/Cargo.toml
@@ -14,7 +14,6 @@ rust-version = { workspace = true }
categories = { workspace = true }
readme = "README.md"


[package.metadata]
cargo-fuzz = true

3 changes: 3 additions & 0 deletions vortex-datafusion/Cargo.toml
@@ -21,6 +21,7 @@ path = "src/lib.rs"
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
datafusion = { workspace = true }
datafusion-common = { workspace = true }
@@ -31,8 +32,10 @@ datafusion-physical-plan = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
moka = { workspace = true, features = ["future", "sync"] }
object_store = { workspace = true }
pin-project = { workspace = true }
tokio = { workspace = true }
vortex-array = { workspace = true }
vortex-datetime-dtype = { workspace = true }
vortex-dtype = { workspace = true }
65 changes: 65 additions & 0 deletions vortex-datafusion/src/persistent/cache.rs
@@ -0,0 +1,65 @@
use std::sync::Arc;

use chrono::{DateTime, Utc};
use moka::future::Cache;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use vortex_error::{vortex_err, VortexError, VortexResult};
use vortex_file::{read_initial_bytes, InitialRead};
use vortex_io::ObjectStoreReadAt;

#[derive(Debug, Clone)]
pub struct InitialReadCache {
    inner: Cache<Key, InitialRead>,
}

impl Default for InitialReadCache {
    fn default() -> Self {
        let inner = Cache::builder()
            .weigher(|k: &Key, v: &InitialRead| {
                (k.location.as_ref().as_bytes().len() + v.buf.len()) as u32
            })
            .max_capacity(256 * (2 << 20))
            .eviction_listener(|k, _v, cause| {
                log::trace!("Removed {} due to {:?}", k.location, cause);
            })
            .build();

        Self { inner }
    }
}

#[derive(Hash, Eq, PartialEq, Debug)]
pub struct Key {
    location: Path,
    m_time: DateTime<Utc>,
}

impl From<&ObjectMeta> for Key {
    fn from(value: &ObjectMeta) -> Self {
        Self {
            location: value.location.clone(),
            m_time: value.last_modified,
        }
    }
}

impl InitialReadCache {
    pub async fn try_get(
        &self,
        object: &ObjectMeta,
        store: Arc<dyn ObjectStore>,
    ) -> VortexResult<InitialRead> {
        self.inner
            .try_get_with(Key::from(object), async {
                let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone());
                let initial_read = read_initial_bytes(&os_read_at, object.size as u64).await?;
                VortexResult::Ok(initial_read)
            })
            .await
            .map_err(|e: Arc<VortexError>| match Arc::try_unwrap(e) {
                Ok(e) => e,
                Err(e) => vortex_err!("{}", e.to_string()),
            })
    }
}
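
The cache is keyed on the object's path plus its last-modified time, so a rewritten object misses the cache and is re-read, and `try_get_with` collapses concurrent reads of the same key into a single request. The weigher charges each entry its path length plus the length of the cached initial-read buffer, against a weighted capacity of 256 * (2 << 20) bytes = 512 MiB. A minimal usage sketch follows; the `object_meta` and `object_store` bindings are assumptions standing in for the opener's context, not part of this diff:

// Hypothetical usage, not part of the commit: one shared cache, queried per file.
// Object storage is only touched on a miss.
let cache = InitialReadCache::default();
let initial_read = cache.try_get(&object_meta, object_store.clone()).await?;
// Repeated opens of the same (path, last_modified) pair are now served from memory.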
5 changes: 5 additions & 0 deletions vortex-datafusion/src/persistent/execution.rs
@@ -14,6 +14,7 @@ use datafusion_physical_plan::{
use itertools::Itertools;
use vortex_array::Context;

use super::cache::InitialReadCache;
use crate::persistent::opener::VortexFileOpener;

#[derive(Debug, Clone)]
@@ -24,6 +25,7 @@ pub struct VortexExec {
    plan_properties: PlanProperties,
    projected_statistics: Statistics,
    ctx: Arc<Context>,
    initial_read_cache: InitialReadCache,
}

impl VortexExec {
@@ -32,6 +34,7 @@ impl VortexExec {
        metrics: ExecutionPlanMetricsSet,
        predicate: Option<Arc<dyn PhysicalExpr>>,
        ctx: Arc<Context>,
        initial_read_cache: InitialReadCache,
    ) -> DFResult<Self> {
        let projected_schema = project_schema(
            &file_scan_config.file_schema,
@@ -60,6 +63,7 @@
            plan_properties,
            projected_statistics,
            ctx,
            initial_read_cache,
        })
    }

@@ -118,6 +122,7 @@ impl ExecutionPlan for VortexExec {
            object_store,
            projection: self.file_scan_config.projection.clone(),
            predicate: self.predicate.clone(),
            initial_read_cache: self.initial_read_cache.clone(),
            arrow_schema,
        };
        let stream = FileStream::new(&self.file_scan_config, partition, opener, &self.metrics)?;
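
The opener and format sources changed by this commit fall in the portion of the diff that is not rendered above. As a hedged sketch of the overall wiring (field names and argument order outside the rendered hunks are assumptions), the format would own the cache and hand a clone to `VortexExec::try_new`, which clones it again into each `VortexFileOpener` as shown in the hunk above:

// Hypothetical wiring sketch, not the rendered diff:
pub struct VortexFormat {
    context: Arc<Context>,
    initial_read_cache: InitialReadCache,
}

// ...when the format builds its physical plan:
let exec = VortexExec::try_new(
    file_scan_config,
    metrics,
    predicate,
    self.context.clone(),
    self.initial_read_cache.clone(),
)?;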
