Support the custom terminator for the CSV file format #12263

Merged · 7 commits · Sep 5, 2024

Changes from 2 commits
1 change: 1 addition & 0 deletions datafusion-examples/examples/csv_opener.rs
@@ -47,6 +47,7 @@ async fn main() -> Result<()> {
true,
b',',
b'"',
None,
object_store,
Some(b'#'),
);
13 changes: 13 additions & 0 deletions datafusion/common/src/config.rs
@@ -1628,6 +1628,7 @@ config_namespace! {
pub has_header: Option<bool>, default = None
pub delimiter: u8, default = b','
pub quote: u8, default = b'"'
pub terminator: Option<u8>, default = None
pub escape: Option<u8>, default = None
pub double_quote: Option<bool>, default = None
/// Specifies whether newlines in (quoted) values are supported.
@@ -1696,6 +1697,13 @@ impl CsvOptions {
self
}

/// The character that terminates a row.
/// - default to '\n'
Contributor:
If it's None as stated above -- won't it be CRLF by default (which is slightly more than \n according to csv_core docs used internally by arrow-csv)?

Contributor Author:
You're right. I missed updating this from my first design. Thanks.
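
For reference, a minimal sketch (not part of this diff) of how the optional terminator maps onto arrow-csv's ReaderBuilder, which is what CsvConfig::builder() does further down in this PR: when the option is None the builder is left untouched, so csv_core's default applies (per its docs, \r, \n, or \r\n all end a record), and only an explicit byte overrides it. The schema and sample data here are made up for illustration.

    use std::io::Cursor;
    use std::sync::Arc;

    use arrow::csv::ReaderBuilder;
    use arrow::datatypes::{DataType, Field, Schema};

    /// Count the rows produced for `data` with an optional custom terminator.
    fn rows_with(terminator: Option<u8>, data: &str) -> arrow::error::Result<usize> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
        ]));

        let mut builder = ReaderBuilder::new(schema).with_header(false);
        if let Some(t) = terminator {
            // Same pattern as CsvConfig::builder() in this PR: only set when provided.
            builder = builder.with_terminator(t);
        }

        let reader = builder.build(Cursor::new(data.as_bytes().to_vec()))?;
        let mut rows = 0;
        for batch in reader {
            rows += batch?.num_rows();
        }
        Ok(rows)
    }

    // rows_with(None, "1,2\n3,4\n")        -> 2 (csv_core's default line handling)
    // rows_with(Some(b'\r'), "1,2\r3,4\r") -> 2 (custom CR terminator)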

pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
self.terminator = terminator;
self
}

/// The escape character in a row.
/// - default is None
pub fn with_escape(mut self, escape: Option<u8>) -> Self {
@@ -1742,6 +1750,11 @@ impl CsvOptions {
self.quote
}

/// The terminator character.
pub fn terminator(&self) -> Option<u8> {
self.terminator
}

/// The escape character.
pub fn escape(&self) -> Option<u8> {
self.escape
8 changes: 8 additions & 0 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -247,6 +247,13 @@ impl CsvFormat {
self
}

/// The character used to indicate the end of a row.
/// - default to None (CRLF)
pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
self.options.terminator = terminator;
self
}

/// Specifies whether newlines in (quoted) values are supported.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
@@ -359,6 +366,7 @@ impl FileFormat for CsvFormat {
.with_has_header(has_header)
.with_delimeter(self.options.delimiter)
.with_quote(self.options.quote)
.with_terminator(self.options.terminator)
.with_escape(self.options.escape)
.with_comment(self.options.comment)
.with_newlines_in_values(newlines_in_values)
10 changes: 10 additions & 0 deletions datafusion/core/src/datasource/file_format/options.rs
@@ -59,6 +59,8 @@ pub struct CsvReadOptions<'a> {
pub delimiter: u8,
/// An optional quote character. Defaults to `b'"'`.
pub quote: u8,
/// An optional terminator character. Defaults to None (CRLF).
pub terminator: Option<u8>,
/// An optional escape character. Defaults to None.
pub escape: Option<u8>,
/// If enabled, lines beginning with this byte are ignored.
@@ -102,6 +104,7 @@ impl<'a> CsvReadOptions<'a> {
schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD,
delimiter: b',',
quote: b'"',
terminator: None,
escape: None,
newlines_in_values: false,
file_extension: DEFAULT_CSV_EXTENSION,
@@ -136,6 +139,12 @@ impl<'a> CsvReadOptions<'a> {
self
}

/// Specify terminator to use for CSV read
pub fn terminator(mut self, terminator: Option<u8>) -> Self {
self.terminator = terminator;
self
}

/// Specify delimiter to use for CSV read
pub fn escape(mut self, escape: u8) -> Self {
self.escape = Some(escape);
@@ -511,6 +520,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_delimiter(self.delimiter)
.with_quote(self.quote)
.with_escape(self.escape)
.with_terminator(self.terminator)
.with_newlines_in_values(self.newlines_in_values)
.with_schema_infer_max_rec(self.schema_infer_max_records)
.with_file_compression_type(self.file_compression_type.to_owned());
78 changes: 76 additions & 2 deletions datafusion/core/src/datasource/physical_plan/csv.rs
@@ -77,6 +77,7 @@ pub struct CsvExec {
has_header: bool,
delimiter: u8,
quote: u8,
terminator: Option<u8>,
escape: Option<u8>,
comment: Option<u8>,
newlines_in_values: bool,
@@ -98,6 +99,7 @@ pub struct CsvExecBuilder {
has_header: bool,
delimiter: u8,
quote: u8,
terminator: Option<u8>,
escape: Option<u8>,
comment: Option<u8>,
newlines_in_values: bool,
@@ -112,6 +114,7 @@ impl CsvExecBuilder {
has_header: false,
delimiter: b',',
quote: b'"',
terminator: None,
Contributor:
I wonder if this option should have any interactions with newlines_in_values, or somehow affect execution of CsvExec (if newlines_in_values is used anywhere to actually check input data -- I wasn't able to find such a usage, but I'm still not sure).

Contributor Author:
I did some tests. I think newlines_in_values won't be impacted by the terminator. No matter how I change the terminator, the number of rows is the same.

        let session_ctx = SessionContext::new();
        let store = object_store::memory::InMemory::new();

        let data = bytes::Bytes::from("a,b\r1,\"2\na\"\r3,\"4\nb\"");
        let path = object_store::path::Path::from("a.csv");
        store.put(&path, data.into()).await.unwrap();

        let url = Url::parse("memory://").unwrap();
        session_ctx.register_object_store(&url, Arc::new(store));

        let df = session_ctx
            .read_csv("memory:///", CsvReadOptions::new().newlines_in_values(true).terminator(Some(b'\r')))
            .await
            .unwrap();

        let result = df.collect().await.unwrap();
        assert_eq!(result[0].num_rows(), 2);

escape: None,
comment: None,
newlines_in_values: false,
@@ -143,6 +146,14 @@ impl CsvExecBuilder {
self
}

/// Set the line terminator. If not set, the default is CRLF.
///
/// The default is None.
pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
self.terminator = terminator;
self
}

/// Set the escape character.
///
/// The default is `None` (i.e. quotes cannot be escaped).
@@ -191,6 +202,7 @@ impl CsvExecBuilder {
has_header,
delimiter,
quote,
terminator,
escape,
comment,
newlines_in_values,
@@ -210,6 +222,7 @@ impl CsvExecBuilder {
has_header,
delimiter,
quote,
terminator,
escape,
newlines_in_values,
metrics: ExecutionPlanMetricsSet::new(),
@@ -229,6 +242,7 @@ impl CsvExec {
has_header: bool,
delimiter: u8,
quote: u8,
terminator: Option<u8>,
escape: Option<u8>,
comment: Option<u8>,
newlines_in_values: bool,
@@ -238,6 +252,7 @@ impl CsvExec {
.with_has_header(has_header)
.with_delimeter(delimiter)
.with_quote(quote)
.with_terminator(terminator)
.with_escape(escape)
.with_comment(comment)
.with_newlines_in_values(newlines_in_values)
@@ -270,6 +285,11 @@ impl CsvExec {
self.quote
}

/// The line terminator
pub fn terminator(&self) -> Option<u8> {
self.terminator
}

/// Lines beginning with this byte are ignored.
pub fn comment(&self) -> Option<u8> {
self.comment
@@ -406,10 +426,10 @@ impl ExecutionPlan for CsvExec {
delimiter: self.delimiter,
quote: self.quote,
escape: self.escape,
terminator: self.terminator,
object_store,
comment: self.comment,
});

let opener = CsvOpener {
config,
file_compression_type: self.file_compression_type.to_owned(),
@@ -441,6 +461,7 @@ impl ExecutionPlan for CsvExec {
delimiter: self.delimiter,
quote: self.quote,
escape: self.escape,
terminator: self.terminator,
comment: self.comment,
newlines_in_values: self.newlines_in_values,
metrics: self.metrics.clone(),
@@ -459,6 +480,7 @@ pub struct CsvConfig {
has_header: bool,
delimiter: u8,
quote: u8,
terminator: Option<u8>,
escape: Option<u8>,
object_store: Arc<dyn ObjectStore>,
comment: Option<u8>,
@@ -474,6 +496,7 @@ impl CsvConfig {
has_header: bool,
delimiter: u8,
quote: u8,
terminator: Option<u8>,
object_store: Arc<dyn ObjectStore>,
comment: Option<u8>,
) -> Self {
@@ -484,6 +507,7 @@ impl CsvConfig {
has_header,
delimiter,
quote,
terminator,
escape: None,
object_store,
comment,
@@ -493,6 +517,7 @@ impl CsvConfig {

impl CsvConfig {
fn open<R: Read>(&self, reader: R) -> Result<csv::Reader<R>> {
dbg!(&self.terminator);
Contributor:
Should this line be cleaned up?

Ok(self.builder().build(reader)?)
}

@@ -502,7 +527,9 @@ impl CsvConfig {
.with_batch_size(self.batch_size)
.with_header(self.has_header)
.with_quote(self.quote);

if let Some(terminator) = self.terminator {
builder = builder.with_terminator(terminator);
}
if let Some(proj) = &self.file_projection {
builder = builder.with_projection(proj.clone());
}
@@ -529,6 +556,7 @@ impl CsvOpener {
config: Arc<CsvConfig>,
file_compression_type: FileCompressionType,
) -> Self {
dbg!(&config);
Contributor:
This one should probably be cleaned up too.

Contributor Author:
Thanks, I missed removing them 😢

Self {
config,
file_compression_type,
@@ -775,6 +803,7 @@ mod tests {
.with_has_header(true)
.with_delimeter(b',')
.with_quote(b'"')
.with_terminator(None)
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
@@ -844,6 +873,7 @@ mod tests {
.with_has_header(true)
.with_delimeter(b',')
.with_quote(b'"')
.with_terminator(None)
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
@@ -913,6 +943,7 @@ mod tests {
.with_has_header(true)
.with_delimeter(b',')
.with_quote(b'"')
.with_terminator(None)
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
@@ -979,6 +1010,7 @@ mod tests {
.with_has_header(true)
.with_delimeter(b',')
.with_quote(b'"')
.with_terminator(None)
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
@@ -1044,6 +1076,7 @@ mod tests {
.with_has_header(true)
.with_delimeter(b',')
.with_quote(b'"')
.with_terminator(None)
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
@@ -1139,6 +1172,7 @@ mod tests {
.with_has_header(true)
.with_delimeter(b',')
.with_quote(b'"')
.with_terminator(None)
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
@@ -1210,6 +1244,44 @@ mod tests {
crate::assert_batches_eq!(expected, &result);
}

#[tokio::test]
async fn test_terminator() {
Contributor:
Perhaps sqllogictests should also be added for reading files with a customized line terminator (also checking that nothing is broken when the terminator is used along with newlines_in_values).

Contributor Author:
I got two issues when creating the sqllogictest case:

  1. I prepared a test file with lines terminated by CR. However, it seems that the object store doesn't support this by default.
External error: query failed: DataFusion error: Object Store error: Generic LocalFileSystem error: Requested range was invalid
  2. I tried to create an external table with the terminator config like:
statement ok
CREATE EXTERNAL TABLE stored_table_with_cf_terminator (
col1 TEXT,
col2 TEXT
) STORED AS CSV
LOCATION '../core/tests/data/cr_terminator.csv'
OPTIONS ('format.terminator' '\r', 'format.has_header' 'true');

However, there are some issues parsing the value.

External error: statement failed: DataFusion error: Invalid or Unsupported Configuration: Error parsing \r as u8. Non-ASCII string provided

It seems that \r is parsed as two separate ASCII characters instead of one. 🤔
I tried selecting an escaped character directly:

> select '\t';
+-----------+
| Utf8("	") |
+-----------+
| 	         |
+-----------+
1 row(s) fetched. 
Elapsed 0.003 seconds.

It works fine, but it's not valid as the config value. Is there any way to correctly produce the \r character for the config value?

Contributor (@korowa, Sep 2, 2024):
Yes, it works only for a single character -- probably the ConfigField implementation for u8 should be enhanced, as right now it doesn't accept escape sequences like \t, \n, etc.

(The same goes for the delimiter -- I wasn't able to read a tab-separated file via SQL 🤔)

Contributor Author:
To solve the ConfigField issue, I added support for EscapedStringLiteral in 8a1c9c7. Now we can use E'\r' to set the terminator. I think it also works for E'\t' or E'\n'.
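
For illustration, a hedged sketch of what that could look like from Rust once E'...' literals are accepted for the option value (the table name and file path just echo the earlier attempt and are not taken verbatim from the PR):

    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    async fn register_cr_terminated_csv(ctx: &SessionContext) -> Result<()> {
        // The raw string keeps `\r` as two characters; DataFusion's E'...'
        // escaped string literal then turns it into a single CR byte.
        ctx.sql(
            r#"CREATE EXTERNAL TABLE stored_table_with_cr_terminator (
                   col1 TEXT,
                   col2 TEXT
               ) STORED AS CSV
               LOCATION '../core/tests/data/cr_terminator.csv'
               OPTIONS ('format.terminator' E'\r', 'format.has_header' 'true')"#,
        )
        .await?
        .collect()
        .await?;
        Ok(())
    }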

let session_ctx = SessionContext::new();
let store = object_store::memory::InMemory::new();

let data = bytes::Bytes::from("a,b\r1,2\r3,4");
let path = object_store::path::Path::from("a.csv");
store.put(&path, data.into()).await.unwrap();

let url = Url::parse("memory://").unwrap();
session_ctx.register_object_store(&url, Arc::new(store));

let df = session_ctx
.read_csv("memory:///", CsvReadOptions::new().terminator(Some(b'\r')))
.await
.unwrap();

let result = df.collect().await.unwrap();

let expected = [
"+---+---+",
"| a | b |",
"+---+---+",
"| 1 | 2 |",
"| 3 | 4 |",
"+---+---+",
];

crate::assert_batches_eq!(expected, &result);

match session_ctx
.read_csv("memory:///", CsvReadOptions::new().terminator(Some(b'\n')))
.await.unwrap().collect().await {
Ok(_) => panic!("Expected error"),
Err(e) => assert_eq!(e.strip_backtrace(), "Arrow error: Csv error: incorrect number of fields for line 1, expected 2 got more than 2"),
Contributor:
FYI you can use unwrap_err here as well

Something like

let e = session_ctx
            .read_csv("memory:///", CsvReadOptions::new().terminator(Some(b'\n')))
            .await.unwrap().collect().await.unwrap_err();
assert_eq!(e.strip_backtrace(), "Arrow error: Csv error: incorrect number of fields for line 1, expected 2 got more than 2");

Contributor Author:
Thanks! It looks better.

}
}

#[tokio::test]
async fn write_csv_results_error_handling() -> Result<()> {
let ctx = SessionContext::new();
@@ -1365,6 +1437,7 @@ mod tests {
has_header,
delimiter,
quote,
terminator,
escape,
comment,
newlines_in_values,
@@ -1374,6 +1447,7 @@
assert_eq!(has_header, default_options.has_header.unwrap_or(false));
assert_eq!(delimiter, default_options.delimiter);
assert_eq!(quote, default_options.quote);
assert_eq!(terminator, default_options.terminator);
assert_eq!(escape, default_options.escape);
assert_eq!(comment, default_options.comment);
assert_eq!(
1 change: 1 addition & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
@@ -423,6 +423,7 @@ message CsvOptions {
bytes comment = 13; // Optional comment character as a byte
bytes double_quote = 14; // Indicates if quotes are doubled
bytes newlines_in_values = 15; // Indicates if newlines are supported in values
bytes terminator = 16; // Optional terminator character as a byte
}

// Options controlling CSV format
1 change: 1 addition & 0 deletions datafusion/proto-common/src/from_proto/mod.rs
@@ -863,6 +863,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
has_header: proto_opts.has_header.first().map(|h| *h != 0),
delimiter: proto_opts.delimiter[0],
quote: proto_opts.quote[0],
terminator: proto_opts.terminator.first().copied(),
escape: proto_opts.escape.first().copied(),
double_quote: proto_opts.has_header.first().map(|h| *h != 0),
newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
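
As a side note, the proto change above implies a small encoding convention for the optional byte: the bytes field is empty for None and holds a single byte for Some, which is what `proto_opts.terminator.first().copied()` decodes. A hedged sketch of that round trip (helper names are illustrative, not DataFusion APIs):

    /// Encode an optional terminator byte the way the proto field stores it.
    fn terminator_to_proto(terminator: Option<u8>) -> Vec<u8> {
        terminator.map(|t| vec![t]).unwrap_or_default()
    }

    /// Decode it back, mirroring `proto_opts.terminator.first().copied()`.
    fn terminator_from_proto(field: &[u8]) -> Option<u8> {
        field.first().copied()
    }

    #[test]
    fn terminator_round_trips_through_proto_bytes() {
        for t in [None, Some(b'\r'), Some(b'\n')] {
            assert_eq!(terminator_from_proto(&terminator_to_proto(t)), t);
        }
    }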