Skip to content

Commit

Permalink
add test for configuring columns (#103)
Browse files Browse the repository at this point in the history
  • Loading branch information
rebasedming authored Aug 26, 2024
1 parent be9e230 commit 1a7381f
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 5 deletions.
10 changes: 9 additions & 1 deletion src/duckdb/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ pub enum CsvOption {
Quote,
#[strum(serialize = "sample_size")]
SampleSize,
#[strum(serialize = "select")]
Select,
#[strum(serialize = "sep")]
Sep,
#[strum(serialize = "skip")]
Expand Down Expand Up @@ -122,6 +124,7 @@ impl CsvOption {
Self::PreserveCasing => false,
Self::Quote => false,
Self::SampleSize => false,
Self::Select => false,
Self::Sep => false,
Self::Skip => false,
Self::Timestampformat => false,
Expand Down Expand Up @@ -305,7 +308,12 @@ pub fn create_view(
.collect::<Vec<String>>()
.join(", ");

Ok(format!("CREATE VIEW IF NOT EXISTS {schema_name}.{table_name} AS SELECT * FROM read_csv({create_csv_str})"))
let default_select = "*".to_string();
let select = table_options
.get(CsvOption::Select.as_ref())
.unwrap_or(&default_select);

Ok(format!("CREATE VIEW IF NOT EXISTS {schema_name}.{table_name} AS SELECT {select} FROM read_csv({create_csv_str})"))
}

#[cfg(test)]
Expand Down
10 changes: 9 additions & 1 deletion src/duckdb/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@ pub enum DeltaOption {
Files,
#[strum(serialize = "preserve_casing")]
PreserveCasing,
#[strum(serialize = "select")]
Select,
}

impl DeltaOption {
pub fn is_required(&self) -> bool {
match self {
Self::Files => true,
Self::PreserveCasing => false,
Self::Select => false,
}
}
}
Expand All @@ -48,8 +51,13 @@ pub fn create_view(
.ok_or_else(|| anyhow!("files option is required"))?
);

let default_select = "*".to_string();
let select = table_options
.get(DeltaOption::Select.as_ref())
.unwrap_or(&default_select);

Ok(format!(
"CREATE VIEW IF NOT EXISTS {schema_name}.{table_name} AS SELECT * FROM delta_scan({files})"
"CREATE VIEW IF NOT EXISTS {schema_name}.{table_name} AS SELECT {select} FROM delta_scan({files})"
))
}

Expand Down
10 changes: 9 additions & 1 deletion src/duckdb/iceberg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub enum IcebergOption {
Files,
#[strum(serialize = "preserve_casing")]
PreserveCasing,
#[strum(serialize = "select")]
Select,
}

impl IcebergOption {
Expand All @@ -35,6 +37,7 @@ impl IcebergOption {
Self::AllowMovedPaths => false,
Self::Files => true,
Self::PreserveCasing => false,
Self::Select => false,
}
}
}
Expand All @@ -61,7 +64,12 @@ pub fn create_view(
.collect::<Vec<String>>()
.join(", ");

Ok(format!("CREATE VIEW IF NOT EXISTS {schema_name}.{table_name} AS SELECT * FROM iceberg_scan({create_iceberg_str})"))
let default_select = "*".to_string();
let select = table_options
.get(IcebergOption::Select.as_ref())
.unwrap_or(&default_select);

Ok(format!("CREATE VIEW IF NOT EXISTS {schema_name}.{table_name} AS SELECT {select} FROM iceberg_scan({create_iceberg_str})"))
}

#[cfg(test)]
Expand Down
10 changes: 9 additions & 1 deletion src/duckdb/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pub enum ParquetOption {
PreserveCasing,
#[strum(serialize = "union_by_name")]
UnionByName,
#[strum(serialize = "select")]
Select,
// TODO: EncryptionConfig
}

Expand All @@ -55,6 +57,7 @@ impl ParquetOption {
Self::HiveTypes => false,
Self::HiveTypesAutocast => false,
Self::PreserveCasing => false,
Self::Select => false,
Self::UnionByName => false,
}
}
Expand Down Expand Up @@ -114,7 +117,12 @@ pub fn create_view(
.collect::<Vec<String>>()
.join(", ");

Ok(format!("CREATE VIEW IF NOT EXISTS {schema_name}.{table_name} AS SELECT * FROM read_parquet({create_parquet_str})"))
let default_select = "*".to_string();
let select = table_options
.get(ParquetOption::Select.as_ref())
.unwrap_or(&default_select);

Ok(format!("CREATE VIEW IF NOT EXISTS {schema_name}.{table_name} AS SELECT {select} FROM read_parquet({create_parquet_str})"))
}

#[cfg(test)]
Expand Down
31 changes: 30 additions & 1 deletion tests/table_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ async fn test_table_with_custom_schema(mut conn: PgConnection, tempdir: TempDir)
// test non-quoted schema name
"CREATE SCHEMA MY_SCHEMA".to_string().execute(&mut conn);
match
format!( "CREATE FOREIGN TABLE MY_SCHEMA.MyTable () SERVER parquet_server OPTIONS (files '{}', preserve_casing 'true')",
format!("CREATE FOREIGN TABLE MY_SCHEMA.MyTable () SERVER parquet_server OPTIONS (files '{}', preserve_casing 'true')",
parquet_path.to_str().unwrap()
).execute_result(&mut conn) {
Ok(_) => {}
Expand All @@ -219,3 +219,32 @@ async fn test_table_with_custom_schema(mut conn: PgConnection, tempdir: TempDir)

Ok(())
}

#[rstest]
async fn test_configure_columns(mut conn: PgConnection, tempdir: TempDir) -> Result<()> {
let stored_batch = primitive_record_batch()?;
let parquet_path = tempdir.path().join("test_arrow_types.parquet");

let parquet_file = File::create(&parquet_path)?;

let mut writer = ArrowWriter::try_new(parquet_file, stored_batch.schema(), None).unwrap();
writer.write(&stored_batch)?;
writer.close()?;

primitive_setup_fdw_local_file_listing(
parquet_path.as_path().to_str().unwrap(),
"primitive_table",
)
.execute(&mut conn);

format!(
r#"CREATE FOREIGN TABLE primitive () SERVER parquet_server OPTIONS (files '{}', select 'boolean_col AS bool_col, 2020 as year')"#,
parquet_path.to_str().unwrap()
).execute(&mut conn);

let retrieved_batch: Vec<(bool, i32)> =
"SELECT bool_col, year FROM primitive LIMIT 1".fetch(&mut conn);
assert_eq!(retrieved_batch, vec![(true, 2020)]);

Ok(())
}

0 comments on commit 1a7381f

Please sign in to comment.