
Support the custom terminator for the CSV file format #12263

Merged
merged 7 commits from feature/12215-csv-terminator into apache:main
Sep 5, 2024

Conversation


@goldmedal goldmedal commented Aug 30, 2024

Which issue does this PR close?

Closes #12215

Rationale for this change

What changes are included in this PR?

Now we can customize the record delimiter (terminator) when creating a DataFrame, like:

```rust
let df = session_ctx
    .read_csv("memory:///", CsvReadOptions::new().terminator(Some(b'\r')))
    .await
    .unwrap();
```

The default value is `None`, which means Arrow will use CRLF as the terminator:
https://docs.rs/csv-core/latest/csv_core/struct.ReaderBuilder.html#method.terminator
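As a rough illustration of what a single-byte record terminator means, here is a toy, quote-aware record splitter (a sketch only -- arrow-csv's actual parsing comes from csv-core, and this is not its implementation): records break on the terminator byte, except inside double-quoted fields.

```rust
/// Illustrative sketch only -- NOT arrow-csv's implementation. Splits raw
/// CSV bytes into records on a single-byte terminator, ignoring terminator
/// bytes that appear inside double-quoted fields.
fn split_records(data: &[u8], terminator: u8) -> Vec<Vec<u8>> {
    let mut records = Vec::new();
    let mut current = Vec::new();
    let mut in_quotes = false;
    for &b in data {
        if b == b'"' {
            in_quotes = !in_quotes;
            current.push(b);
        } else if b == terminator && !in_quotes {
            records.push(std::mem::take(&mut current));
        } else {
            current.push(b);
        }
    }
    if !current.is_empty() {
        records.push(current); // trailing unterminated record
    }
    records
}

fn main() {
    // With b'\r' as the terminator, b'\n' would be ordinary data.
    let records = split_records(b"a,b\r1,2\r3,4", b'\r');
    assert_eq!(records.len(), 3);
    assert_eq!(records[0], b"a,b".to_vec());
    println!("{} records", records.len());
}
```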

Are these changes tested?

Yes.

Are there any user-facing changes?

New API for the CSV Options.

@github-actions github-actions bot added core Core DataFusion crate common Related to common crate proto Related to proto crate labels Aug 30, 2024
@goldmedal goldmedal marked this pull request as ready for review August 30, 2024 15:04
@korowa korowa left a comment

Thank you @goldmedal

I've left some comments. Did I get it right that writing support is not implemented because it's not supported by arrow-csv (yet)?

@@ -1696,6 +1697,13 @@ impl CsvOptions {
self
}

/// The character that terminates a row.
/// - default to '\n'
Contributor
If it's None as stated above -- won't it be CRLF by default (which is slightly more than \n according to csv_core docs used internally by arrow-csv)?

Contributor Author

You're right. I missed updating this after my first design. Thanks.

@@ -493,6 +517,7 @@ impl CsvConfig {

impl CsvConfig {
fn open<R: Read>(&self, reader: R) -> Result<csv::Reader<R>> {
dbg!(&self.terminator);
Contributor

Should this line be cleaned up?

@@ -529,6 +556,7 @@ impl CsvOpener {
config: Arc<CsvConfig>,
file_compression_type: FileCompressionType,
) -> Self {
dbg!(&config);
Contributor

This one, probably, should be cleaned up too

Contributor Author

Thanks, I forgot to remove them 😢

@@ -112,6 +114,7 @@ impl CsvExecBuilder {
has_header: false,
delimiter: b',',
quote: b'"',
terminator: None,
Contributor

I wonder if this option should interact with newlines_in_values, or somehow affect execution of CsvExec (that is, if newlines_in_values is used anywhere to actually check input data -- I wasn't able to find such a usage, but I'm still not sure).

Contributor Author

I did some tests. I think newlines_in_values won't be affected by terminator: no matter how I change the terminator, the number of rows stays the same.

```rust
let session_ctx = SessionContext::new();
let store = object_store::memory::InMemory::new();

let data = bytes::Bytes::from("a,b\r1,\"2\na\"\r3,\"4\nb\"");
let path = object_store::path::Path::from("a.csv");
store.put(&path, data.into()).await.unwrap();

let url = Url::parse("memory://").unwrap();
session_ctx.register_object_store(&url, Arc::new(store));

let df = session_ctx
    .read_csv(
        "memory:///",
        CsvReadOptions::new()
            .newlines_in_values(true)
            .terminator(Some(b'\r')),
    )
    .await
    .unwrap();

let result = df.collect().await.unwrap();
assert_eq!(result[0].num_rows(), 2);
```
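The behavior in the test above can be illustrated with a toy, quote-aware boundary count (a sketch under my own assumptions, not arrow-csv's parser): terminator bytes inside quoted fields don't end a record, so the `\n` bytes in the quoted values never split rows.

```rust
/// Sketch only -- not arrow-csv's parser. Counts records by counting
/// terminator bytes that fall OUTSIDE double-quoted fields.
fn count_records(data: &[u8], terminator: u8) -> usize {
    let mut in_quotes = false;
    let mut boundaries = 0;
    for &b in data {
        if b == b'"' {
            in_quotes = !in_quotes;
        } else if b == terminator && !in_quotes {
            boundaries += 1;
        }
    }
    // The data does not end with a terminator, so the final record
    // is unterminated and must be counted as well.
    boundaries + 1
}

fn main() {
    let data = b"a,b\r1,\"2\na\"\r3,\"4\nb\"";
    // 3 records total: 1 header row + 2 data rows, matching num_rows() == 2.
    assert_eq!(count_records(data, b'\r'), 3);
    println!("data rows: {}", count_records(data, b'\r') - 1);
}
```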

@@ -1210,6 +1244,44 @@ mod tests {
crate::assert_batches_eq!(expected, &result);
}

#[tokio::test]
async fn test_terminator() {
Contributor

Perhaps there also should be added sqllogictests for reading files with customized line terminator (also checking that nothing is broken when terminator used along with newlines_in_values).

Contributor Author

I got two issues when creating the sqllogictest case:

1. I prepared a test file with lines terminated by CR. However, it seems that the object store doesn't support this by default:

   External error: query failed: DataFusion error: Object Store error: Generic LocalFileSystem error: Requested range was invalid

2. I tried to create an external table with the terminator config like:

```sql
statement ok
CREATE EXTERNAL TABLE stored_table_with_cf_terminator (
col1 TEXT,
col2 TEXT
) STORED AS CSV
LOCATION '../core/tests/data/cr_terminator.csv'
OPTIONS ('format.terminator' '\r', 'format.has_header' 'true');
```

However, there is an issue parsing the value:

External error: statement failed: DataFusion error: Invalid or Unsupported Configuration: Error parsing \r as u8. Non-ASCII string provided

It seems that \r is parsed as two separate ASCII characters rather than one. 🤔
I tried selecting an escape sequence directly:

```
> select '\t';
+-----------+
| Utf8("	") |
+-----------+
| 	         |
+-----------+
1 row(s) fetched.
Elapsed 0.003 seconds.
```

It works fine there, but it's not valid as a config value. Is there any way to correctly produce the \r character for the config value?

Contributor

@korowa korowa Sep 2, 2024

Yes, it works only for a single character -- the ConfigField implementation for u8 should probably be enhanced, as right now it doesn't accept escape sequences like \t, \n, etc.

(The same goes for the delimiter -- I wasn't able to read a tab-separated file via SQL 🤔)

Contributor Author

To solve the ConfigField issue, I added support for EscapedStringLiteral in 8a1c9c7. Now we can use E'\r' to specify the terminator. I think it also works for E'\t' or E'\n'.
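A minimal sketch of the kind of unescaping this requires (the function name and error message below are illustrative assumptions, not DataFusion's actual ConfigField code): a two-character escape sequence such as `\r` must be mapped to the single byte a u8 option expects, while a plain one-character ASCII string passes through unchanged.

```rust
// Hypothetical sketch, not DataFusion's implementation: turn an escaped
// string literal's contents (e.g. the two characters '\' and 'r' from
// E'\r') into the single byte needed by a u8 config option.
fn unescape_to_u8(s: &str) -> Result<u8, String> {
    let byte = match s {
        "\\r" => b'\r',
        "\\n" => b'\n',
        "\\t" => b'\t',
        _ => {
            let bytes = s.as_bytes();
            if bytes.len() == 1 && s.is_ascii() {
                bytes[0]
            } else {
                // Mirrors the shape of the error seen above; wording is illustrative.
                return Err(format!("Error parsing {s} as u8. Non-ASCII string provided"));
            }
        }
    };
    Ok(byte)
}

fn main() {
    assert_eq!(unescape_to_u8("\\r"), Ok(b'\r'));
    assert_eq!(unescape_to_u8(","), Ok(b','));
    assert!(unescape_to_u8("ab").is_err());
    println!("ok");
}
```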

@goldmedal
Contributor Author

> I've left some comments. Did I get it right that writing support is not implemented because it's not supported by arrow-csv (yet)?

Thanks @korowa for reviewing. I think a custom terminator for writing isn't supported by arrow-csv yet. I checked the WriterBuilder docs and the source code on the master branch, and couldn't find it.

@github-actions github-actions bot added sql SQL Planner sqllogictest SQL Logic Tests (.slt) labels Sep 3, 2024
Comment on lines 348 to 350
# TODO: It should be passed but got the error: External error: query failed: DataFusion error: Object Store error: Generic LocalFileSystem error: Requested range was invalid
# query TT
# select * from stored_table_with_cr_terminator;
Contributor Author

I still get this error in the sqllogictests, but I don't know why. The same case in datafusion/core/src/datasource/physical_plan/csv.rs works fine. I looked for any special object_store configuration in the sqllogictests but found nothing. 🤔

Contributor

I bet it is a bug and not related to this PR

What I think we should do is file a ticket tracking the issue and then merge this PR and work on figuring it out as a follow on.

Contributor Author

Thanks for the reminder about this bug. I'm not sure what the root cause is, but I can draft an issue and add a reference to this test. You can edit it when you're available. WDYT?

Contributor Author

I filed #12328 to trace this issue. Feel free to edit it or add more information.

@alamb
Contributor

alamb commented Sep 3, 2024

FYI @Blizzara

@alamb alamb left a comment

Thank you @goldmedal and @korowa -- I think this PR is looking very nice now.

To get this merged, I suggest we file the ticket about sql level failure and add a reference to the test. I can do this over the next day or two if no one beats me to it

.read_csv("memory:///", CsvReadOptions::new().terminator(Some(b'\n')))
.await.unwrap().collect().await {
Ok(_) => panic!("Expected error"),
Err(e) => assert_eq!(e.strip_backtrace(), "Arrow error: Csv error: incorrect number of fields for line 1, expected 2 got more than 2"),
Contributor

FYI you can also use unwrap_err here as well

Something like:

```rust
let e = session_ctx
    .read_csv("memory:///", CsvReadOptions::new().terminator(Some(b'\n')))
    .await.unwrap().collect().await.unwrap_err();
assert_eq!(e.strip_backtrace(), "Arrow error: Csv error: incorrect number of fields for line 1, expected 2 got more than 2");
```

Contributor Author

Thanks! It looks better.


@alamb
Contributor

alamb commented Sep 5, 2024

Thank you @goldmedal for filing #12328 👍

@alamb alamb merged commit 008c942 into apache:main Sep 5, 2024
24 checks passed
@goldmedal goldmedal deleted the feature/12215-csv-terminator branch September 5, 2024 12:27
@goldmedal
Contributor Author

Thanks @korowa and @alamb for reviewing

Labels
common Related to common crate core Core DataFusion crate proto Related to proto crate sql SQL Planner sqllogictest SQL Logic Tests (.slt)
Development

Successfully merging this pull request may close these issues.

Support record delimiter ("terminator" in Arrow terms) for CSV reading
3 participants