Skip to content

Commit

Permalink
Defer file creation to write (#8539)
Browse files (browse the repository at this point in the history)
* Defer file creation to write

* Format

* Remove INSERT_MODE

* Format

* Add ticket link
  • Loading branch information
tustvold authored Dec 14, 2023
1 parent 1042095 commit a971f1e
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 33 deletions.
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ impl ListingTableUrl {
/// Get object store for specified input_url.
/// If input_url is not actually a URL, we assume it is a local file path.
/// If we have a local path, create it if it does not exist so ListingTableUrl::parse works.
#[deprecated(note = "Use parse")]
pub fn parse_create_local_if_not_exists(
s: impl AsRef<str>,
is_directory: bool,
Expand Down
17 changes: 5 additions & 12 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,19 +148,18 @@ impl TableProviderFactory for ListingTableFactory {
.unwrap_or(false)
};

let create_local_path = statement_options
.take_bool_option("create_local_path")?
.unwrap_or(false);
let single_file = statement_options
.take_bool_option("single_file")?
.unwrap_or(false);

// Backwards compatibility
// Backwards compatibility (#8547)
if let Some(s) = statement_options.take_str_option("insert_mode") {
if !s.eq_ignore_ascii_case("append_new_files") {
return plan_err!("Unknown or unsupported insert mode {s}. Only append_to_file supported");
return plan_err!("Unknown or unsupported insert mode {s}. Only append_new_files supported");
}
}
statement_options.take_bool_option("create_local_path")?;

let file_type = file_format.file_type();

// Use remaining options and session state to build FileTypeWriterOptions
Expand Down Expand Up @@ -199,13 +198,7 @@ impl TableProviderFactory for ListingTableFactory {
FileType::AVRO => file_type_writer_options,
};

let table_path = match create_local_path {
true => ListingTableUrl::parse_create_local_if_not_exists(
&cmd.location,
!single_file,
),
false => ListingTableUrl::parse(&cmd.location),
}?;
let table_path = ListingTableUrl::parse(&cmd.location)?;

let options = ListingOptions::new(file_format)
.with_collect_stat(state.config().collect_statistics())
Expand Down
10 changes: 8 additions & 2 deletions datafusion/core/src/datasource/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,21 @@ impl StreamConfig {
match &self.encoding {
StreamEncoding::Csv => {
let header = self.header && !self.location.exists();
let file = OpenOptions::new().append(true).open(&self.location)?;
let file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.location)?;
let writer = arrow::csv::WriterBuilder::new()
.with_header(header)
.build(file);

Ok(Box::new(writer))
}
StreamEncoding::Json => {
let file = OpenOptions::new().append(true).open(&self.location)?;
let file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.location)?;
Ok(Box::new(arrow::json::LineDelimitedWriter::new(file)))
}
}
Expand Down
6 changes: 1 addition & 5 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,11 +571,7 @@ impl DefaultPhysicalPlanner {
copy_options,
}) => {
let input_exec = self.create_initial_plan(input, session_state).await?;

// TODO: make this behavior configurable via options (should copy to create path/file as needed?)
// TODO: add additional configurable options for if existing files should be overwritten or
// appended to
let parsed_url = ListingTableUrl::parse_create_local_if_not_exists(output_url, !*single_file_output)?;
let parsed_url = ListingTableUrl::parse(output_url)?;
let object_store_url = parsed_url.object_store();

let schema: Schema = (**input.schema()).clone().into();
Expand Down
18 changes: 9 additions & 9 deletions datafusion/sqllogictest/test_files/insert_to_external.slt
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ CREATE EXTERNAL TABLE dictionary_encoded_parquet_partitioned(
b varchar,
)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned'
LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned/'
PARTITIONED BY (b)
OPTIONS(
create_local_path 'true',
Expand Down Expand Up @@ -292,7 +292,7 @@ statement ok
CREATE EXTERNAL TABLE
directory_test(a bigint, b bigint)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q0'
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q0/'
OPTIONS(
create_local_path 'true',
);
Expand All @@ -312,7 +312,7 @@ statement ok
CREATE EXTERNAL TABLE
table_without_values(field1 BIGINT NULL, field2 BIGINT NULL)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q1'
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q1/'
OPTIONS (create_local_path 'true');

query TT
Expand Down Expand Up @@ -378,7 +378,7 @@ statement ok
CREATE EXTERNAL TABLE
table_without_values(field1 BIGINT NULL, field2 BIGINT NULL)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q2'
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q2/'
OPTIONS (create_local_path 'true');

query TT
Expand Down Expand Up @@ -423,7 +423,7 @@ statement ok
CREATE EXTERNAL TABLE
table_without_values(c1 varchar NULL)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q3'
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q3/'
OPTIONS (create_local_path 'true');

# verify that the sort order of the insert query is maintained into the
Expand Down Expand Up @@ -462,7 +462,7 @@ statement ok
CREATE EXTERNAL TABLE
table_without_values(id BIGINT, name varchar)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q4'
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q4/'
OPTIONS (create_local_path 'true');

query IT
Expand Down Expand Up @@ -505,7 +505,7 @@ statement ok
CREATE EXTERNAL TABLE
table_without_values(field1 BIGINT NOT NULL, field2 BIGINT NULL)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q5'
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q5/'
OPTIONS (create_local_path 'true');

query II
Expand Down Expand Up @@ -555,7 +555,7 @@ CREATE EXTERNAL TABLE test_column_defaults(
d text default lower('DEFAULT_TEXT'),
e timestamp default now()
) STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q6'
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q6/'
OPTIONS (create_local_path 'true');

# fill in all column values
Expand Down Expand Up @@ -608,5 +608,5 @@ CREATE EXTERNAL TABLE test_column_defaults(
a int,
b int default a+1
) STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q7'
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q7/'
OPTIONS (create_local_path 'true');
8 changes: 3 additions & 5 deletions docs/source/user-guide/sql/write_options.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,9 @@ The following special options are specific to the `COPY` command.

The following special options are specific to creating an external table.

| Option | Description | Default Value |
| ----------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ---------------------------------------------------------------------------- |
| SINGLE_FILE | If true, indicates that this external table is backed by a single file. INSERT INTO queries will append to this file. | false |
| CREATE_LOCAL_PATH | If true, the folder or file backing this table will be created on the local file system if it does not already exist when running INSERT INTO queries. | false |
| INSERT_MODE | Determines if INSERT INTO queries should append to existing files or append new files to an existing directory. Valid values are append_to_file, append_new_files, and error. Note that "error" will block inserting data into this table. | CSV and JSON default to append_to_file. Parquet defaults to append_new_files |
| Option | Description | Default Value |
| ----------- | --------------------------------------------------------------------------------------------------------------------- | ------------- |
| SINGLE_FILE | If true, indicates that this external table is backed by a single file. INSERT INTO queries will append to this file. | false |

### JSON Format Specific Options

Expand Down

0 comments on commit a971f1e

Please sign in to comment.