From 1b4c0a4b86fdb981fb0927a126d68f68d1a3d536 Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Sat, 7 Dec 2024 16:00:07 +0200 Subject: [PATCH] fix: repartitioned reads of CSV with custom line terminator (#13677) --- .../core/src/datasource/physical_plan/csv.rs | 4 ++- .../core/src/datasource/physical_plan/json.rs | 2 +- .../core/src/datasource/physical_plan/mod.rs | 11 +++--- .../sqllogictest/test_files/csv_files.slt | 36 ++++++++++++++----- 4 files changed, 38 insertions(+), 15 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 0c41f69c7691..c54c663dca7d 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -612,11 +612,13 @@ impl FileOpener for CsvOpener { } let store = Arc::clone(&self.config.object_store); + let terminator = self.config.terminator; Ok(Box::pin(async move { // Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries) - let calculated_range = calculate_range(&file_meta, &store).await?; + let calculated_range = + calculate_range(&file_meta, &store, terminator).await?; let range = match calculated_range { RangeCalculation::Range(None) => None, diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index c07e8ca74543..5c70968fbb42 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -273,7 +273,7 @@ impl FileOpener for JsonOpener { let file_compression_type = self.file_compression_type.to_owned(); Ok(Box::pin(async move { - let calculated_range = calculate_range(&file_meta, &store).await?; + let calculated_range = calculate_range(&file_meta, &store, None).await?; let range = match calculated_range { RangeCalculation::Range(None) => None, diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 449b7bb43519..3146d124d9f1 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -426,9 +426,11 @@ enum RangeCalculation { async fn calculate_range( file_meta: &FileMeta, store: &Arc, + terminator: Option, ) -> Result { let location = file_meta.location(); let file_size = file_meta.object_meta.size; + let newline = terminator.unwrap_or(b'\n'); match file_meta.range { None => Ok(RangeCalculation::Range(None)), @@ -436,13 +438,13 @@ async fn calculate_range( let (start, end) = (start as usize, end as usize); let start_delta = if start != 0 { - find_first_newline(store, location, start - 1, file_size).await? + find_first_newline(store, location, start - 1, file_size, newline).await? } else { 0 }; let end_delta = if end != file_size { - find_first_newline(store, location, end - 1, file_size).await? + find_first_newline(store, location, end - 1, file_size, newline).await? } else { 0 }; @@ -462,7 +464,7 @@ async fn calculate_range( /// within an object, such as a file, in an object store. /// /// This function scans the contents of the object starting from the specified `start` position -/// up to the `end` position, looking for the first occurrence of a newline (`'\n'`) character. +/// up to the `end` position, looking for the first occurrence of a newline character. /// It returns the position of the first newline relative to the start of the range. /// /// Returns a `Result` wrapping a `usize` that represents the position of the first newline character found within the specified range. If no newline is found, it returns the length of the scanned data, effectively indicating the end of the range. @@ -474,6 +476,7 @@ async fn find_first_newline( location: &Path, start: usize, end: usize, + newline: u8, ) -> Result { let options = GetOptions { range: Some(GetRange::Bounded(start..end)), @@ -486,7 +489,7 @@ async fn find_first_newline( let mut index = 0; while let Some(chunk) = result_stream.next().await.transpose()? { - if let Some(position) = chunk.iter().position(|&byte| byte == b'\n') { + if let Some(position) = chunk.iter().position(|&byte| byte == newline) { return Ok(index + position); } diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt index 01d0f4ac39bd..5906c6a19bb8 100644 --- a/datafusion/sqllogictest/test_files/csv_files.slt +++ b/datafusion/sqllogictest/test_files/csv_files.slt @@ -350,15 +350,33 @@ col2 TEXT LOCATION '../core/tests/data/cr_terminator.csv' OPTIONS ('format.terminator' E'\r', 'format.has_header' 'true'); -# TODO: It should be passed but got the error: External error: query failed: DataFusion error: Object Store error: Generic LocalFileSystem error: Requested range was invalid -# See the issue: https://github.com/apache/datafusion/issues/12328 -# query TT -# select * from stored_table_with_cr_terminator; -# ---- -# id0 value0 -# id1 value1 -# id2 value2 -# id3 value3 +# Check single-thread reading of CSV with custom line terminator +statement ok +SET datafusion.optimizer.repartition_file_min_size = 10485760; + +query TT +select * from stored_table_with_cr_terminator; +---- +id0 value0 +id1 value1 +id2 value2 +id3 value3 + +# Check repartitioned reading of CSV with custom line terminator +statement ok +SET datafusion.optimizer.repartition_file_min_size = 1; + +query TT +select * from stored_table_with_cr_terminator order by col1; +---- +id0 value0 +id1 value1 +id2 value2 +id3 value3 + +# Reset repartition_file_min_size to default value +statement ok +SET datafusion.optimizer.repartition_file_min_size = 10485760; statement ok drop table stored_table_with_cr_terminator;