Skip to content

fix: repartitioned reads of CSV with custom line terminator #13677

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,11 +612,13 @@ impl FileOpener for CsvOpener {
}

let store = Arc::clone(&self.config.object_store);
let terminator = self.config.terminator;

Ok(Box::pin(async move {
// Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries)

let calculated_range = calculate_range(&file_meta, &store).await?;
let calculated_range =
calculate_range(&file_meta, &store, terminator).await?;

let range = match calculated_range {
RangeCalculation::Range(None) => None,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl FileOpener for JsonOpener {
let file_compression_type = self.file_compression_type.to_owned();

Ok(Box::pin(async move {
let calculated_range = calculate_range(&file_meta, &store).await?;
let calculated_range = calculate_range(&file_meta, &store, None).await?;

let range = match calculated_range {
RangeCalculation::Range(None) => None,
Expand Down
11 changes: 7 additions & 4 deletions datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,23 +426,25 @@ enum RangeCalculation {
async fn calculate_range(
file_meta: &FileMeta,
store: &Arc<dyn ObjectStore>,
terminator: Option<u8>,
) -> Result<RangeCalculation> {
let location = file_meta.location();
let file_size = file_meta.object_meta.size;
let newline = terminator.unwrap_or(b'\n');

match file_meta.range {
None => Ok(RangeCalculation::Range(None)),
Some(FileRange { start, end }) => {
let (start, end) = (start as usize, end as usize);

let start_delta = if start != 0 {
find_first_newline(store, location, start - 1, file_size).await?
find_first_newline(store, location, start - 1, file_size, newline).await?
} else {
0
};

let end_delta = if end != file_size {
find_first_newline(store, location, end - 1, file_size).await?
find_first_newline(store, location, end - 1, file_size, newline).await?
} else {
0
};
Expand All @@ -462,7 +464,7 @@ async fn calculate_range(
/// within an object, such as a file, in an object store.
///
/// This function scans the contents of the object starting from the specified `start` position
/// up to the `end` position, looking for the first occurrence of a newline (`'\n'`) character.
/// up to the `end` position, looking for the first occurrence of the `newline` byte (the line terminator supplied by the caller).
/// It returns the position of the first newline relative to the start of the range.
///
/// Returns a `Result` wrapping a `usize` that represents the position of the first newline character found within the specified range. If no newline is found, it returns the length of the scanned data, effectively indicating the end of the range.
Expand All @@ -474,6 +476,7 @@ async fn find_first_newline(
location: &Path,
start: usize,
end: usize,
newline: u8,
) -> Result<usize> {
let options = GetOptions {
range: Some(GetRange::Bounded(start..end)),
Expand All @@ -486,7 +489,7 @@ async fn find_first_newline(
let mut index = 0;

while let Some(chunk) = result_stream.next().await.transpose()? {
if let Some(position) = chunk.iter().position(|&byte| byte == b'\n') {
if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
return Ok(index + position);
}

Expand Down
36 changes: 27 additions & 9 deletions datafusion/sqllogictest/test_files/csv_files.slt
Original file line number Diff line number Diff line change
Expand Up @@ -350,15 +350,33 @@ col2 TEXT
LOCATION '../core/tests/data/cr_terminator.csv'
OPTIONS ('format.terminator' E'\r', 'format.has_header' 'true');

# TODO: This query should pass, but it currently fails with the error: External error: query failed: DataFusion error: Object Store error: Generic LocalFileSystem error: Requested range was invalid
# See the issue: https://github.com/apache/datafusion/issues/12328
# query TT
# select * from stored_table_with_cr_terminator;
# ----
# id0 value0
# id1 value1
# id2 value2
# id3 value3
# Check single-thread reading of CSV with custom line terminator
statement ok
SET datafusion.optimizer.repartition_file_min_size = 10485760;

query TT
select * from stored_table_with_cr_terminator;
----
id0 value0
id1 value1
id2 value2
id3 value3

# Check repartitioned reading of CSV with custom line terminator
statement ok
SET datafusion.optimizer.repartition_file_min_size = 1;

query TT
select * from stored_table_with_cr_terminator order by col1;
----
id0 value0
id1 value1
id2 value2
id3 value3

# Reset repartition_file_min_size to default value
statement ok
SET datafusion.optimizer.repartition_file_min_size = 10485760;

statement ok
drop table stored_table_with_cr_terminator;
Loading