-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support the custom terminator for the CSV file format #12263
Changes from 2 commits
dd1110b
b98980b
f2ff11f
8a1c9c7
6116f9e
7b60e54
4597f04
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,6 +47,7 @@ async fn main() -> Result<()> { | |
true, | ||
b',', | ||
b'"', | ||
None, | ||
object_store, | ||
Some(b'#'), | ||
); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,6 +77,7 @@ pub struct CsvExec { | |
has_header: bool, | ||
delimiter: u8, | ||
quote: u8, | ||
terminator: Option<u8>, | ||
escape: Option<u8>, | ||
comment: Option<u8>, | ||
newlines_in_values: bool, | ||
|
@@ -98,6 +99,7 @@ pub struct CsvExecBuilder { | |
has_header: bool, | ||
delimiter: u8, | ||
quote: u8, | ||
terminator: Option<u8>, | ||
escape: Option<u8>, | ||
comment: Option<u8>, | ||
newlines_in_values: bool, | ||
|
@@ -112,6 +114,7 @@ impl CsvExecBuilder { | |
has_header: false, | ||
delimiter: b',', | ||
quote: b'"', | ||
terminator: None, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if this option should have any interactions with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did some tests. I think that let session_ctx = SessionContext::new();
let store = object_store::memory::InMemory::new();
let data = bytes::Bytes::from("a,b\r1,\"2\na\"\r3,\"4\nb\"");
let path = object_store::path::Path::from("a.csv");
store.put(&path, data.into()).await.unwrap();
let url = Url::parse("memory://").unwrap();
session_ctx.register_object_store(&url, Arc::new(store));
let df = session_ctx
.read_csv("memory:///", CsvReadOptions::new().newlines_in_values(true).terminator(Some(b'\r')))
.await
.unwrap();
let result = df.collect().await.unwrap();
assert_eq!(result[0].num_rows(), 2); |
||
escape: None, | ||
comment: None, | ||
newlines_in_values: false, | ||
|
@@ -143,6 +146,14 @@ impl CsvExecBuilder { | |
self | ||
} | ||
|
||
/// Set the line terminator. If not set, the default is CRLF. | ||
/// | ||
/// The default is None. | ||
pub fn with_terminator(mut self, terminator: Option<u8>) -> Self { | ||
self.terminator = terminator; | ||
self | ||
} | ||
|
||
/// Set the escape character. | ||
/// | ||
/// The default is `None` (i.e. quotes cannot be escaped). | ||
|
@@ -191,6 +202,7 @@ impl CsvExecBuilder { | |
has_header, | ||
delimiter, | ||
quote, | ||
terminator, | ||
escape, | ||
comment, | ||
newlines_in_values, | ||
|
@@ -210,6 +222,7 @@ impl CsvExecBuilder { | |
has_header, | ||
delimiter, | ||
quote, | ||
terminator, | ||
escape, | ||
newlines_in_values, | ||
metrics: ExecutionPlanMetricsSet::new(), | ||
|
@@ -229,6 +242,7 @@ impl CsvExec { | |
has_header: bool, | ||
delimiter: u8, | ||
quote: u8, | ||
terminator: Option<u8>, | ||
escape: Option<u8>, | ||
comment: Option<u8>, | ||
newlines_in_values: bool, | ||
|
@@ -238,6 +252,7 @@ impl CsvExec { | |
.with_has_header(has_header) | ||
.with_delimeter(delimiter) | ||
.with_quote(quote) | ||
.with_terminator(terminator) | ||
.with_escape(escape) | ||
.with_comment(comment) | ||
.with_newlines_in_values(newlines_in_values) | ||
|
@@ -270,6 +285,11 @@ impl CsvExec { | |
self.quote | ||
} | ||
|
||
/// The line terminator | ||
pub fn terminator(&self) -> Option<u8> { | ||
self.terminator | ||
} | ||
|
||
/// Lines beginning with this byte are ignored. | ||
pub fn comment(&self) -> Option<u8> { | ||
self.comment | ||
|
@@ -406,10 +426,10 @@ impl ExecutionPlan for CsvExec { | |
delimiter: self.delimiter, | ||
quote: self.quote, | ||
escape: self.escape, | ||
terminator: self.terminator, | ||
object_store, | ||
comment: self.comment, | ||
}); | ||
|
||
let opener = CsvOpener { | ||
config, | ||
file_compression_type: self.file_compression_type.to_owned(), | ||
|
@@ -441,6 +461,7 @@ impl ExecutionPlan for CsvExec { | |
delimiter: self.delimiter, | ||
quote: self.quote, | ||
escape: self.escape, | ||
terminator: self.terminator, | ||
comment: self.comment, | ||
newlines_in_values: self.newlines_in_values, | ||
metrics: self.metrics.clone(), | ||
|
@@ -459,6 +480,7 @@ pub struct CsvConfig { | |
has_header: bool, | ||
delimiter: u8, | ||
quote: u8, | ||
terminator: Option<u8>, | ||
escape: Option<u8>, | ||
object_store: Arc<dyn ObjectStore>, | ||
comment: Option<u8>, | ||
|
@@ -474,6 +496,7 @@ impl CsvConfig { | |
has_header: bool, | ||
delimiter: u8, | ||
quote: u8, | ||
terminator: Option<u8>, | ||
object_store: Arc<dyn ObjectStore>, | ||
comment: Option<u8>, | ||
) -> Self { | ||
|
@@ -484,6 +507,7 @@ impl CsvConfig { | |
has_header, | ||
delimiter, | ||
quote, | ||
terminator, | ||
escape: None, | ||
object_store, | ||
comment, | ||
|
@@ -493,6 +517,7 @@ impl CsvConfig { | |
|
||
impl CsvConfig { | ||
fn open<R: Read>(&self, reader: R) -> Result<csv::Reader<R>> { | ||
dbg!(&self.terminator); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this line be cleaned up? |
||
Ok(self.builder().build(reader)?) | ||
} | ||
|
||
|
@@ -502,7 +527,9 @@ impl CsvConfig { | |
.with_batch_size(self.batch_size) | ||
.with_header(self.has_header) | ||
.with_quote(self.quote); | ||
|
||
if let Some(terminator) = self.terminator { | ||
builder = builder.with_terminator(terminator); | ||
} | ||
if let Some(proj) = &self.file_projection { | ||
builder = builder.with_projection(proj.clone()); | ||
} | ||
|
@@ -529,6 +556,7 @@ impl CsvOpener { | |
config: Arc<CsvConfig>, | ||
file_compression_type: FileCompressionType, | ||
) -> Self { | ||
dbg!(&config); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one, probably, should be cleaned up too There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks, I missed to remove them 😢 |
||
Self { | ||
config, | ||
file_compression_type, | ||
|
@@ -775,6 +803,7 @@ mod tests { | |
.with_has_header(true) | ||
.with_delimeter(b',') | ||
.with_quote(b'"') | ||
.with_terminator(None) | ||
.with_escape(None) | ||
.with_comment(None) | ||
.with_newlines_in_values(false) | ||
|
@@ -844,6 +873,7 @@ mod tests { | |
.with_has_header(true) | ||
.with_delimeter(b',') | ||
.with_quote(b'"') | ||
.with_terminator(None) | ||
.with_escape(None) | ||
.with_comment(None) | ||
.with_newlines_in_values(false) | ||
|
@@ -913,6 +943,7 @@ mod tests { | |
.with_has_header(true) | ||
.with_delimeter(b',') | ||
.with_quote(b'"') | ||
.with_terminator(None) | ||
.with_escape(None) | ||
.with_comment(None) | ||
.with_newlines_in_values(false) | ||
|
@@ -979,6 +1010,7 @@ mod tests { | |
.with_has_header(true) | ||
.with_delimeter(b',') | ||
.with_quote(b'"') | ||
.with_terminator(None) | ||
.with_escape(None) | ||
.with_comment(None) | ||
.with_newlines_in_values(false) | ||
|
@@ -1044,6 +1076,7 @@ mod tests { | |
.with_has_header(true) | ||
.with_delimeter(b',') | ||
.with_quote(b'"') | ||
.with_terminator(None) | ||
.with_escape(None) | ||
.with_comment(None) | ||
.with_newlines_in_values(false) | ||
|
@@ -1139,6 +1172,7 @@ mod tests { | |
.with_has_header(true) | ||
.with_delimeter(b',') | ||
.with_quote(b'"') | ||
.with_terminator(None) | ||
.with_escape(None) | ||
.with_comment(None) | ||
.with_newlines_in_values(false) | ||
|
@@ -1210,6 +1244,44 @@ mod tests { | |
crate::assert_batches_eq!(expected, &result); | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_terminator() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps there also should be added sqllogictests for reading files with customized line terminator (also checking that nothing is broken when terminator used along with newlines_in_values). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I got two issues when creating the sqllogictests case:
However, there are some issues for parsing the value.
It seems that
It works fine but it's not valid for the config value. Is there any way to correctly create There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it works only for single character -- probably ConfigField implementation for u8 should be enhanced, as right now it doesn't accept escape sequences like (same works for the delitmiter -- I wasn't able to read tab-separated file via SQL 🤔 ) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To solve the ConfigField issue, I support |
||
let session_ctx = SessionContext::new(); | ||
let store = object_store::memory::InMemory::new(); | ||
|
||
let data = bytes::Bytes::from("a,b\r1,2\r3,4"); | ||
let path = object_store::path::Path::from("a.csv"); | ||
store.put(&path, data.into()).await.unwrap(); | ||
|
||
let url = Url::parse("memory://").unwrap(); | ||
session_ctx.register_object_store(&url, Arc::new(store)); | ||
|
||
let df = session_ctx | ||
.read_csv("memory:///", CsvReadOptions::new().terminator(Some(b'\r'))) | ||
.await | ||
.unwrap(); | ||
|
||
let result = df.collect().await.unwrap(); | ||
|
||
let expected = [ | ||
"+---+---+", | ||
"| a | b |", | ||
"+---+---+", | ||
"| 1 | 2 |", | ||
"| 3 | 4 |", | ||
"+---+---+", | ||
]; | ||
|
||
crate::assert_batches_eq!(expected, &result); | ||
|
||
match session_ctx | ||
.read_csv("memory:///", CsvReadOptions::new().terminator(Some(b'\n'))) | ||
.await.unwrap().collect().await { | ||
Ok(_) => panic!("Expected error"), | ||
Err(e) => assert_eq!(e.strip_backtrace(), "Arrow error: Csv error: incorrect number of fields for line 1, expected 2 got more than 2"), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FYI you can also use Something like let e = session_ctx
.read_csv("memory:///", CsvReadOptions::new().terminator(Some(b'\n')))
.await.unwrap().collect().await.unwrap_err();
assert_eq!(e.strip_backtrace(), "Arrow error: Csv error: incorrect number of fields for line 1, expected 2 got more than 2"), There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks! It looks better. |
||
} | ||
} | ||
|
||
#[tokio::test] | ||
async fn write_csv_results_error_handling() -> Result<()> { | ||
let ctx = SessionContext::new(); | ||
|
@@ -1365,6 +1437,7 @@ mod tests { | |
has_header, | ||
delimiter, | ||
quote, | ||
terminator, | ||
escape, | ||
comment, | ||
newlines_in_values, | ||
|
@@ -1374,6 +1447,7 @@ mod tests { | |
assert_eq!(has_header, default_options.has_header.unwrap_or(false)); | ||
assert_eq!(delimiter, default_options.delimiter); | ||
assert_eq!(quote, default_options.quote); | ||
assert_eq!(terminator, default_options.terminator); | ||
assert_eq!(escape, default_options.escape); | ||
assert_eq!(comment, default_options.comment); | ||
assert_eq!( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it's None as stated above -- won't it be CRLF by default (which is slightly more than
\n
according to csv_core docs used internally by arrow-csv)?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. I missed modifying this from my first version design. Thanks.