By default match fields by position #80

Merged (2 commits, Nov 28, 2024)

1 change: 1 addition & 0 deletions README.md
@@ -193,6 +193,7 @@ Alternatively, you can use the following environment variables when starting pos

`pg_parquet` supports the following options in the `COPY FROM` command:
- `format parquet`: you need to specify this option to read or write Parquet files that do not end with the `.parquet[.<compression>]` extension,
- `match_by <string>`: method used to match Parquet file fields to PostgreSQL table columns. The available methods are `position` (the default) and `name`. Match by `name` matches columns by their names rather than by their position in the schema, which is useful when the field order differs between the Parquet file and the table but the names match (see the example below).
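
For illustration, a minimal sketch of both modes; the table name `orders` and the file path are hypothetical placeholders:

```sql
-- Match Parquet file fields to table columns by their names.
COPY orders FROM '/tmp/orders.parquet' WITH (format parquet, match_by 'name');

-- The default matches by position, so this is equivalent to omitting match_by.
COPY orders FROM '/tmp/orders.parquet' WITH (format parquet, match_by 'position');
```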

## Configuration
There is currently only one GUC parameter to enable/disable the `pg_parquet`:
1 change: 1 addition & 0 deletions src/arrow_parquet.rs
@@ -1,6 +1,7 @@
pub(crate) mod arrow_to_pg;
pub(crate) mod arrow_utils;
pub(crate) mod compression;
pub(crate) mod match_by;
pub(crate) mod parquet_reader;
pub(crate) mod parquet_writer;
pub(crate) mod pg_to_arrow;
2 changes: 1 addition & 1 deletion src/arrow_parquet/arrow_to_pg.rs
@@ -163,7 +163,7 @@ impl ArrowToPgAttributeContext {
};

let attributes =
collect_attributes_for(CollectAttributesFor::Struct, attribute_tupledesc);
collect_attributes_for(CollectAttributesFor::Other, attribute_tupledesc);

// we only cast the top-level attributes, which already covers the nested attributes
let cast_to_types = None;
20 changes: 20 additions & 0 deletions src/arrow_parquet/match_by.rs
@@ -0,0 +1,20 @@
use std::str::FromStr;

#[derive(Debug, Clone, Copy, Default, PartialEq)]
pub(crate) enum MatchBy {
#[default]
Position,
Name,
}

impl FromStr for MatchBy {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"position" => Ok(MatchBy::Position),
"name" => Ok(MatchBy::Name),
_ => Err(format!("unrecognized match_by method: {}", s)),
}
}
}
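
Only `position` and `name` are recognized by this `FromStr` implementation; any other value is rejected. A sketch of the expected behavior, with a hypothetical table and path (the exact error wording may be surfaced slightly differently by the COPY hook):

```sql
COPY dst FROM '/tmp/data.parquet' WITH (format parquet, match_by 'index');
-- expected to fail with an error like:
-- ERROR:  unrecognized match_by method: index
```
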
37 changes: 29 additions & 8 deletions src/arrow_parquet/parquet_reader.rs
@@ -16,14 +16,19 @@ use url::Url;

use crate::{
arrow_parquet::{
arrow_to_pg::to_pg_datum, schema_parser::parquet_schema_string_from_attributes,
arrow_to_pg::to_pg_datum,
schema_parser::{
error_if_copy_from_match_by_position_with_generated_columns,
parquet_schema_string_from_attributes,
},
},
pgrx_utils::{collect_attributes_for, CollectAttributesFor},
type_compat::{geometry::reset_postgis_context, map::reset_map_context},
};

use super::{
arrow_to_pg::{collect_arrow_to_pg_attribute_contexts, ArrowToPgAttributeContext},
match_by::MatchBy,
schema_parser::{
ensure_file_schema_match_tupledesc_schema, parse_arrow_schema_from_attributes,
},
@@ -38,15 +43,18 @@ pub(crate) struct ParquetReaderContext {
parquet_reader: ParquetRecordBatchStream<ParquetObjectReader>,
attribute_contexts: Vec<ArrowToPgAttributeContext>,
binary_out_funcs: Vec<PgBox<FmgrInfo>>,
match_by: MatchBy,
}

impl ParquetReaderContext {
pub(crate) fn new(uri: Url, tupledesc: &PgTupleDesc) -> Self {
pub(crate) fn new(uri: Url, match_by: MatchBy, tupledesc: &PgTupleDesc) -> Self {
// Postgis and Map contexts are used throughout reading the parquet file.
// We need to reset them to avoid reading the stale data. (e.g. extension could be dropped)
reset_postgis_context();
reset_map_context();

error_if_copy_from_match_by_position_with_generated_columns(tupledesc, match_by);

let parquet_reader = parquet_reader_from_uri(&uri);

let parquet_file_schema = parquet_reader.schema();
@@ -69,6 +77,7 @@ impl ParquetReaderContext {
parquet_file_schema.clone(),
tupledesc_schema.clone(),
&attributes,
match_by,
);

let attribute_contexts = collect_arrow_to_pg_attribute_contexts(
@@ -85,6 +94,7 @@ impl ParquetReaderContext {
attribute_contexts,
parquet_reader,
binary_out_funcs,
match_by,
started: false,
finished: false,
}
@@ -116,15 +126,23 @@ impl ParquetReaderContext {
fn record_batch_to_tuple_datums(
record_batch: RecordBatch,
attribute_contexts: &[ArrowToPgAttributeContext],
match_by: MatchBy,
) -> Vec<Option<Datum>> {
let mut datums = vec![];

for attribute_context in attribute_contexts {
for (attribute_idx, attribute_context) in attribute_contexts.iter().enumerate() {
let name = attribute_context.name();

let column_array = record_batch
.column_by_name(name)
.unwrap_or_else(|| panic!("column {} not found", name));
let column_array = match match_by {
MatchBy::Position => record_batch
.columns()
.get(attribute_idx)
.unwrap_or_else(|| panic!("column {} not found", name)),

MatchBy::Name => record_batch
.column_by_name(name)
.unwrap_or_else(|| panic!("column {} not found", name)),
};

let datum = if attribute_context.needs_cast() {
// should fail instead of returning None if the cast fails at runtime
@@ -181,8 +199,11 @@ impl ParquetReaderContext {
self.buffer.extend_from_slice(&attnum_len_bytes);

// convert the columnar arrays in record batch to tuple datums
let tuple_datums =
Self::record_batch_to_tuple_datums(record_batch, &self.attribute_contexts);
let tuple_datums = Self::record_batch_to_tuple_datums(
record_batch,
&self.attribute_contexts,
self.match_by,
);

// write the tuple datums to the ParquetReader's internal buffer in PG copy format
for (datum, out_func) in tuple_datums.into_iter().zip(self.binary_out_funcs.iter())
2 changes: 1 addition & 1 deletion src/arrow_parquet/pg_to_arrow.rs
@@ -148,7 +148,7 @@ impl PgToArrowAttributeContext {
};

let attributes =
collect_attributes_for(CollectAttributesFor::Struct, &attribute_tupledesc);
collect_attributes_for(CollectAttributesFor::Other, &attribute_tupledesc);

collect_pg_to_arrow_attribute_contexts(&attributes, &fields)
});
65 changes: 56 additions & 9 deletions src/arrow_parquet/schema_parser.rs
@@ -16,7 +16,7 @@ use pgrx::{check_for_interrupts, prelude::*, PgTupleDesc};
use crate::{
pgrx_utils::{
array_element_typoid, collect_attributes_for, domain_array_base_elem_typoid, is_array_type,
is_composite_type, tuple_desc, CollectAttributesFor,
is_composite_type, is_generated_attribute, tuple_desc, CollectAttributesFor,
},
type_compat::{
geometry::is_postgis_geometry_type,
@@ -27,6 +27,8 @@ use crate::{
},
};

use super::match_by::MatchBy;

pub(crate) fn parquet_schema_string_from_attributes(
attributes: &[FormData_pg_attribute],
) -> String {
@@ -95,7 +97,7 @@ fn parse_struct_schema(tupledesc: PgTupleDesc, elem_name: &str, field_id: &mut i

let mut child_fields: Vec<Arc<Field>> = vec![];

let attributes = collect_attributes_for(CollectAttributesFor::Struct, &tupledesc);
let attributes = collect_attributes_for(CollectAttributesFor::Other, &tupledesc);

for attribute in attributes {
if attribute.is_dropped() {
@@ -342,28 +344,73 @@ fn adjust_map_entries_field(field: FieldRef) -> FieldRef {
Arc::new(entries_field)
}

pub(crate) fn error_if_copy_from_match_by_position_with_generated_columns(
tupledesc: &PgTupleDesc,
match_by: MatchBy,
) {
// match_by 'name' can handle generated columns
if let MatchBy::Name = match_by {
return;
}

let attributes = collect_attributes_for(CollectAttributesFor::Other, tupledesc);

for attribute in attributes {
if is_generated_attribute(&attribute) {
ereport!(
PgLogLevel::ERROR,
PgSqlErrorCode::ERRCODE_FEATURE_NOT_SUPPORTED,
"COPY FROM parquet with generated columns is not supported",
"Try COPY FROM parquet WITH (match_by 'name'). \"
It works only if the column names match with parquet file's.",
);
}
}
}
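
A sketch of the case this guard targets, using hypothetical names: with the default positional matching, a table containing a generated column is rejected, while `match_by 'name'` is expected to work as long as the Parquet file's field names match the table's column names:

```sql
-- Hypothetical table with a generated column.
CREATE TABLE sales (price float8, qty int4, total float8 GENERATED ALWAYS AS (price * qty) STORED);

-- Expected to fail: COPY FROM parquet with generated columns is not supported (match by position).
COPY sales FROM '/tmp/sales.parquet' WITH (format parquet);

-- Expected to work when the file's field names match the table's column names.
COPY sales FROM '/tmp/sales.parquet' WITH (format parquet, match_by 'name');
```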

// ensure_file_schema_match_tupledesc_schema throws an error if the file's schema does not match the table schema.
// If the file's arrow schema is castable to the table's arrow schema, it returns a vector of Option<DataType>
// to cast to for each field.
pub(crate) fn ensure_file_schema_match_tupledesc_schema(
file_schema: Arc<Schema>,
tupledesc_schema: Arc<Schema>,
attributes: &[FormData_pg_attribute],
match_by: MatchBy,
) -> Vec<Option<DataType>> {
let mut cast_to_types = Vec::new();

if match_by == MatchBy::Position
&& tupledesc_schema.fields().len() != file_schema.fields().len()
{
panic!(
"column count mismatch between table and parquet file. \
parquet file has {} columns, but table has {} columns",
file_schema.fields().len(),
tupledesc_schema.fields().len()
);
}

for (tupledesc_schema_field, attribute) in
tupledesc_schema.fields().iter().zip(attributes.iter())
{
let field_name = tupledesc_schema_field.name();

let file_schema_field = file_schema.column_with_name(field_name);
let file_schema_field = match match_by {
MatchBy::Position => file_schema.field(attribute.attnum as usize - 1),

if file_schema_field.is_none() {
panic!("column \"{}\" is not found in parquet file", field_name);
}
MatchBy::Name => {
let file_schema_field = file_schema.column_with_name(field_name);

if file_schema_field.is_none() {
panic!("column \"{}\" is not found in parquet file", field_name);
}

let (_, file_schema_field) = file_schema_field.unwrap();

file_schema_field
}
};

let (_, file_schema_field) = file_schema_field.unwrap();
let file_schema_field = Arc::new(file_schema_field.clone());

let from_type = file_schema_field.data_type();
Expand All @@ -378,7 +425,7 @@ pub(crate) fn ensure_file_schema_match_tupledesc_schema(
if !is_coercible(from_type, to_type, attribute.atttypid, attribute.atttypmod) {
panic!(
"type mismatch for column \"{}\" between table and parquet file.\n\n\
table has \"{}\"\n\nparquet file has \"{}\"",
table has \"{}\"\n\nparquet file has \"{}\"",
field_name, to_type, from_type
);
}
@@ -413,7 +460,7 @@ fn is_coercible(from_type: &DataType, to_type: &DataType, to_typoid: Oid, to_typ

let tupledesc = tuple_desc(to_typoid, to_typmod);

let attributes = collect_attributes_for(CollectAttributesFor::Struct, &tupledesc);
let attributes = collect_attributes_for(CollectAttributesFor::Other, &tupledesc);

for (from_field, (to_field, to_attribute)) in from_fields
.iter()
8 changes: 5 additions & 3 deletions src/parquet_copy_hook/copy_from.rs
@@ -20,8 +20,8 @@ use crate::{
};

use super::copy_utils::{
copy_stmt_attribute_list, copy_stmt_create_namespace_item, copy_stmt_create_parse_state,
create_filtered_tupledesc_for_relation,
copy_from_stmt_match_by, copy_stmt_attribute_list, copy_stmt_create_namespace_item,
copy_stmt_create_parse_state, create_filtered_tupledesc_for_relation,
};

// stack to store parquet reader contexts for COPY FROM.
@@ -131,9 +131,11 @@ pub(crate) fn execute_copy_from(

let tupledesc = create_filtered_tupledesc_for_relation(p_stmt, &relation);

let match_by = copy_from_stmt_match_by(p_stmt);

unsafe {
// parquet reader context is used throughout the COPY FROM operation.
let parquet_reader_context = ParquetReaderContext::new(uri, &tupledesc);
let parquet_reader_context = ParquetReaderContext::new(uri, match_by, &tupledesc);
push_parquet_reader_context(parquet_reader_context);

// makes sure to set binary format
21 changes: 20 additions & 1 deletion src/parquet_copy_hook/copy_utils.rs
@@ -15,6 +15,7 @@ use url::Url;

use crate::arrow_parquet::{
compression::{all_supported_compressions, PgParquetCompression},
match_by::MatchBy,
parquet_writer::{DEFAULT_ROW_GROUP_SIZE, DEFAULT_ROW_GROUP_SIZE_BYTES},
uri_utils::parse_uri,
};
@@ -109,7 +110,7 @@ pub(crate) fn validate_copy_to_options(p_stmt: &PgBox<PlannedStmt>, uri: &Url) {
}

pub(crate) fn validate_copy_from_options(p_stmt: &PgBox<PlannedStmt>) {
validate_copy_option_names(p_stmt, &["format", "freeze"]);
validate_copy_option_names(p_stmt, &["format", "match_by", "freeze"]);

let format_option = copy_stmt_get_option(p_stmt, "format");

@@ -253,6 +254,24 @@ pub(crate) fn copy_from_stmt_create_option_list(p_stmt: &PgBox<PlannedStmt>) ->
new_copy_options
}

pub(crate) fn copy_from_stmt_match_by(p_stmt: &PgBox<PlannedStmt>) -> MatchBy {
let match_by_option = copy_stmt_get_option(p_stmt, "match_by");

if match_by_option.is_null() {
MatchBy::default()
} else {
let match_by = unsafe { defGetString(match_by_option.as_ptr()) };

let match_by = unsafe {
CStr::from_ptr(match_by)
.to_str()
.expect("match_by option is not a valid CString")
};

MatchBy::from_str(match_by).unwrap_or_else(|e| panic!("{}", e))
}
}

pub(crate) fn copy_stmt_get_option(
p_stmt: &PgBox<PlannedStmt>,
option_name: &str,