Skip to content

Commit

Permalink
Partitioning fixes (apache#9207)
Browse files Browse the repository at this point in the history
* remove hive partition segments from segments

* add test

* avoid allocating vec when there is no glob, remove unused variable

* a few more small fixes

* tests for standard cases and small refactor
  • Loading branch information
esheppa authored Feb 14, 2024
1 parent 3e01174 commit 528fdfb
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 26 deletions.
56 changes: 54 additions & 2 deletions datafusion/core/src/datasource/listing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ mod tests {
use datafusion_execution::object_store::{
DefaultObjectStoreRegistry, ObjectStoreRegistry,
};
use object_store::local::LocalFileSystem;
use std::sync::Arc;
use object_store::{local::LocalFileSystem, path::Path};
use std::{ops::Not, sync::Arc};
use url::Url;

#[test]
Expand Down Expand Up @@ -185,4 +185,56 @@ mod tests {
let url = ListingTableUrl::parse("../").unwrap();
sut.get_store(url.as_ref()).unwrap();
}

#[test]
fn test_url_contains() {
let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();

// standard case with default config
assert!(url.contains(
&Path::parse("/var/data/mytable/data.parquet").unwrap(),
true
));

// standard case with `ignore_subdirectory` set to false
assert!(url.contains(
&Path::parse("/var/data/mytable/data.parquet").unwrap(),
false
));

// as per documentation, when `ignore_subdirectory` is true, we should ignore files that aren't
// a direct child of the `url`
assert!(url
.contains(
&Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
true
)
.not());

// when we set `ignore_subdirectory` to false, we should not ignore the file
assert!(url.contains(
&Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
false
));

// as above, `ignore_subdirectory` is false, so we include the file
assert!(url.contains(
&Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
false
));

// in this case, we include the file even when `ignore_subdirectory` is true because the
// path segment is a hive partition which doesn't count as a subdirectory for the purposes
// of `Url::contains`
assert!(url.contains(
&Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
true
));

// testing an empty path with default config
assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true));

// testing an empty path with `ignore_subdirectory` set to false
assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
}
}
51 changes: 27 additions & 24 deletions datafusion/core/src/datasource/listing/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use glob::Pattern;
use itertools::Itertools;
use log::debug;
use object_store::path::Path;
use object_store::path::DELIMITER;
use object_store::{ObjectMeta, ObjectStore};
use std::sync::Arc;
use url::Url;
Expand Down Expand Up @@ -189,34 +190,37 @@ impl ListingTableUrl {

/// Returns `true` if `path` matches this [`ListingTableUrl`]
pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
match self.strip_prefix(path) {
Some(mut segments) => match &self.glob {
Some(glob) => {
if ignore_subdirectory {
segments
.next()
.map_or(false, |file_name| glob.matches(file_name))
} else {
let stripped = segments.join("/");
glob.matches(&stripped)
}
}
None => {
if ignore_subdirectory {
let has_subdirectory = segments.collect::<Vec<_>>().len() > 1;
!has_subdirectory
} else {
true
}
let Some(all_segments) = self.strip_prefix(path) else {
return false;
};

// remove any segments that contain `=` as they are allowed even
// when ignore subdirectories is `true`.
let mut segments = all_segments.filter(|s| !s.contains('='));

match &self.glob {
Some(glob) => {
if ignore_subdirectory {
segments
.next()
.map_or(false, |file_name| glob.matches(file_name))
} else {
let stripped = segments.join(DELIMITER);
glob.matches(&stripped)
}
},
None => false,
}
// where we are ignoring subdirectories, we require
// the path to be either empty, or contain just the
// final file name segment.
None if ignore_subdirectory => segments.count() <= 1,
// in this case, any valid path at or below the url is allowed
None => true,
}
}

/// Returns `true` if `path` refers to a collection of objects
pub fn is_collection(&self) -> bool {
self.url.as_str().ends_with('/')
self.url.as_str().ends_with(DELIMITER)
}

/// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
Expand All @@ -225,7 +229,6 @@ impl ListingTableUrl {
&'a self,
path: &'b Path,
) -> Option<impl Iterator<Item = &'b str> + 'a> {
use object_store::path::DELIMITER;
let mut stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?;
if !stripped.is_empty() && !self.prefix.as_ref().is_empty() {
stripped = stripped.strip_prefix(DELIMITER)?;
Expand All @@ -234,7 +237,7 @@ impl ListingTableUrl {
}

/// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
pub(crate) async fn list_all_files<'a>(
pub async fn list_all_files<'a>(
&'a self,
ctx: &'a SessionState,
store: &'a dyn ObjectStore,
Expand Down

0 comments on commit 528fdfb

Please sign in to comment.