Skip to content

Commit

Permalink
[feat] Support cache ListFiles result cache in session level (#7620)
Browse files Browse the repository at this point in the history
* support cache list files under path in session

* add test

* fix clippy

* fix clippy2

* fix comment

* Update datafusion/execution/src/cache/cache_manager.rs

Co-authored-by: Andrew Lamb <[email protected]>

* fix fmt

* fix api change

* fix api change2

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
Ted-Jiang and alamb authored Oct 7, 2023
1 parent 0809f45 commit 093b775
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 50 deletions.
16 changes: 11 additions & 5 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::{error::Result, scalar::ScalarValue};

use super::PartitionedFile;
use crate::datasource::listing::ListingTableUrl;
use crate::execution::context::SessionState;
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::{Column, DFField, DFSchema, DataFusionError};
use datafusion_expr::expr::ScalarUDF;
Expand Down Expand Up @@ -315,6 +316,7 @@ async fn prune_partitions(
/// `filters` might contain expressions that can be resolved only at the
/// file level (e.g. Parquet row group pruning).
pub async fn pruned_partition_list<'a>(
ctx: &'a SessionState,
store: &'a dyn ObjectStore,
table_path: &'a ListingTableUrl,
filters: &'a [Expr],
Expand All @@ -325,7 +327,8 @@ pub async fn pruned_partition_list<'a>(
if partition_cols.is_empty() {
return Ok(Box::pin(
table_path
.list_all_files(store, file_extension)
.list_all_files(ctx, store, file_extension)
.await?
.map_ok(|object_meta| object_meta.into()),
));
}
Expand Down Expand Up @@ -422,7 +425,7 @@ mod tests {
use futures::StreamExt;

use crate::logical_expr::{case, col, lit};
use crate::test::object_store::make_test_store;
use crate::test::object_store::make_test_store_and_state;

use super::*;

Expand Down Expand Up @@ -468,12 +471,13 @@ mod tests {

#[tokio::test]
async fn test_pruned_partition_list_empty() {
let store = make_test_store(&[
let (store, state) = make_test_store_and_state(&[
("tablepath/mypartition=val1/notparquetfile", 100),
("tablepath/file.parquet", 100),
]);
let filter = Expr::eq(col("mypartition"), lit("val1"));
let pruned = pruned_partition_list(
&state,
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter],
Expand All @@ -490,13 +494,14 @@ mod tests {

#[tokio::test]
async fn test_pruned_partition_list() {
let store = make_test_store(&[
let (store, state) = make_test_store_and_state(&[
("tablepath/mypartition=val1/file.parquet", 100),
("tablepath/mypartition=val2/file.parquet", 100),
("tablepath/mypartition=val1/other=val3/file.parquet", 100),
]);
let filter = Expr::eq(col("mypartition"), lit("val1"));
let pruned = pruned_partition_list(
&state,
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter],
Expand Down Expand Up @@ -532,7 +537,7 @@ mod tests {

#[tokio::test]
async fn test_pruned_partition_list_multi() {
let store = make_test_store(&[
let (store, state) = make_test_store_and_state(&[
("tablepath/part1=p1v1/file.parquet", 100),
("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100),
("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100),
Expand All @@ -544,6 +549,7 @@ mod tests {
// filter3 cannot be resolved at partition pruning
let filter3 = Expr::eq(col("part2"), col("other"));
let pruned = pruned_partition_list(
&state,
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter1, filter2, filter3],
Expand Down
8 changes: 6 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ impl ListingTableConfig {
.table_paths
.get(0)
.unwrap()
.list_all_files(store.as_ref(), "")
.list_all_files(state, store.as_ref(), "")
.await?
.next()
.await
.ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;
Expand Down Expand Up @@ -507,7 +508,8 @@ impl ListingOptions {
let store = state.runtime_env().object_store(table_path)?;

let files: Vec<_> = table_path
.list_all_files(store.as_ref(), &self.file_extension)
.list_all_files(state, store.as_ref(), &self.file_extension)
.await?
.try_collect()
.await?;

Expand Down Expand Up @@ -844,6 +846,7 @@ impl TableProvider for ListingTable {
let store = state.runtime_env().object_store(table_path)?;

let file_list_stream = pruned_partition_list(
state,
store.as_ref(),
table_path,
&[],
Expand Down Expand Up @@ -933,6 +936,7 @@ impl ListingTable {
// list files (with partitions)
let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
pruned_partition_list(
ctx,
store.as_ref(),
table_path,
filters,
Expand Down
34 changes: 26 additions & 8 deletions datafusion/core/src/datasource/listing/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
use std::fs;

use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::context::SessionState;
use datafusion_common::{DataFusionError, Result};
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use glob::Pattern;
use itertools::Itertools;
use log::debug;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use percent_encoding;
use std::sync::Arc;
use url::Url;

/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
Expand Down Expand Up @@ -185,28 +188,43 @@ impl ListingTableUrl {
}

/// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
pub(crate) fn list_all_files<'a>(
pub(crate) async fn list_all_files<'a>(
&'a self,
ctx: &'a SessionState,
store: &'a dyn ObjectStore,
file_extension: &'a str,
) -> BoxStream<'a, Result<ObjectMeta>> {
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
// If the prefix is a file, use a head request, otherwise list
let is_dir = self.url.as_str().ends_with('/');
let list = match is_dir {
true => futures::stream::once(store.list(Some(&self.prefix)))
.try_flatten()
.boxed(),
true => match ctx.runtime_env().cache_manager.get_list_files_cache() {
None => futures::stream::once(store.list(Some(&self.prefix)))
.try_flatten()
.boxed(),
Some(cache) => {
if let Some(res) = cache.get(&self.prefix) {
debug!("Hit list all files cache");
futures::stream::iter(res.as_ref().clone().into_iter().map(Ok))
.boxed()
} else {
let list_res = store.list(Some(&self.prefix)).await;
let vec = list_res?.try_collect::<Vec<ObjectMeta>>().await?;
cache.put(&self.prefix, Arc::new(vec.clone()));
futures::stream::iter(vec.into_iter().map(Ok)).boxed()
}
}
},
false => futures::stream::once(store.head(&self.prefix)).boxed(),
};

list.map_err(Into::into)
Ok(list
.try_filter(move |meta| {
let path = &meta.location;
let extension_match = path.as_ref().ends_with(file_extension);
let glob_match = self.contains(path);
futures::future::ready(extension_match && glob_match)
})
.boxed()
.map_err(DataFusionError::ObjectStore)
.boxed())
}

/// Returns this [`ListingTableUrl`] as a string
Expand Down
15 changes: 12 additions & 3 deletions datafusion/core/src/test/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.
//! Object store implementation used for testing
use crate::execution::context::SessionState;
use crate::prelude::SessionContext;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnv;
use futures::FutureExt;
use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore};
use std::sync::Arc;
Expand All @@ -25,11 +28,11 @@ use url::Url;
pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
let url = Url::parse("test://").unwrap();
ctx.runtime_env()
.register_object_store(&url, make_test_store(files));
.register_object_store(&url, make_test_store_and_state(files).0);
}

/// Create a test object store with the provided files
pub fn make_test_store(files: &[(&str, u64)]) -> Arc<dyn ObjectStore> {
pub fn make_test_store_and_state(files: &[(&str, u64)]) -> (Arc<InMemory>, SessionState) {
let memory = InMemory::new();

for (name, size) in files {
Expand All @@ -40,7 +43,13 @@ pub fn make_test_store(files: &[(&str, u64)]) -> Arc<dyn ObjectStore> {
.unwrap();
}

Arc::new(memory)
(
Arc::new(memory),
SessionState::new_with_config_rt(
SessionConfig::default(),
Arc::new(RuntimeEnv::default()),
),
)
}

/// Helper method to fetch the file size and date at given path and create a `ObjectMeta`
Expand Down
Loading

0 comments on commit 093b775

Please sign in to comment.