Skip to content

Commit

Permalink
Coerce types on read
Browse files Browse the repository at this point in the history
`COPY FROM parquet` is too strict when matching Postgres tupledesc schema to the schema from parquet file.
e.g. `INT32` type in the parquet schema cannot be read into a Postgres column with `int64` type.
We can avoid this situation by adding a `is_coercible(from_type, to_type)` check while matching the expected schema
from the parquet file.

With that we can coerce as shown below from parquet source type to Postgres destination types:
- INT16 => {int32, int64}
- INT32 => {int64}
- UINT16 => {int16, int32, int64}
- UINT32 => {int32, int64}
- UINT64 => {int64}
- FLOAT32 => {double}

As we use arrow as intermediate format, it might be the case that `LargeUtf8` or `LargeBinary` types are used by the external writer instead of `Utf8` and `Binary`.
That is why we also need to support below coercions for arrow source types:
- `Utf8 | LargeUtf8` => {text}
- `Binary | LargeBinary` => {bytea}

Closes #67.
  • Loading branch information
aykut-bozkurt committed Nov 11, 2024
1 parent 518a5ac commit ef6f07d
Show file tree
Hide file tree
Showing 12 changed files with 922 additions and 173 deletions.
475 changes: 312 additions & 163 deletions src/arrow_parquet/arrow_to_pg.rs

Large diffs are not rendered by default.

27 changes: 26 additions & 1 deletion src/arrow_parquet/arrow_to_pg/bytea.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use arrow::array::{Array, BinaryArray};
use arrow::array::{Array, BinaryArray, LargeBinaryArray};

use super::{ArrowArrayToPgType, ArrowToPgAttributeContext};

Expand All @@ -13,6 +13,16 @@ impl ArrowArrayToPgType<Vec<u8>> for BinaryArray {
}
}

impl ArrowArrayToPgType<Vec<u8>> for LargeBinaryArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<u8>> {
if self.is_null(0) {
None

Check warning on line 19 in src/arrow_parquet/arrow_to_pg/bytea.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/bytea.rs#L19

Added line #L19 was not covered by tests
} else {
Some(self.value(0).to_vec())
}
}
}

// Bytea[]
impl ArrowArrayToPgType<Vec<Option<Vec<u8>>>> for BinaryArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<Vec<u8>>>> {
Expand All @@ -28,3 +38,18 @@ impl ArrowArrayToPgType<Vec<Option<Vec<u8>>>> for BinaryArray {
Some(vals)
}
}

impl ArrowArrayToPgType<Vec<Option<Vec<u8>>>> for LargeBinaryArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<Vec<u8>>>> {
let mut vals = vec![];
for val in self.iter() {
if let Some(val) = val {
vals.push(Some(val.to_vec()));
} else {
vals.push(None);
}

Check warning on line 50 in src/arrow_parquet/arrow_to_pg/bytea.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/bytea.rs#L43-L50

Added lines #L43 - L50 were not covered by tests
}

Some(vals)
}

Check warning on line 54 in src/arrow_parquet/arrow_to_pg/bytea.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/bytea.rs#L53-L54

Added lines #L53 - L54 were not covered by tests
}
28 changes: 27 additions & 1 deletion src/arrow_parquet/arrow_to_pg/char.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use arrow::array::{Array, StringArray};
use arrow::array::{Array, LargeStringArray, StringArray};

use super::{ArrowArrayToPgType, ArrowToPgAttributeContext};

Expand All @@ -15,6 +15,18 @@ impl ArrowArrayToPgType<i8> for StringArray {
}
}

impl ArrowArrayToPgType<i8> for LargeStringArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<i8> {
if self.is_null(0) {
None

Check warning on line 21 in src/arrow_parquet/arrow_to_pg/char.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/char.rs#L19-L21

Added lines #L19 - L21 were not covered by tests
} else {
let val = self.value(0);
let val: i8 = val.chars().next().expect("unexpected ascii char") as i8;
Some(val)

Check warning on line 25 in src/arrow_parquet/arrow_to_pg/char.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/char.rs#L23-L25

Added lines #L23 - L25 were not covered by tests
}
}

Check warning on line 27 in src/arrow_parquet/arrow_to_pg/char.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/char.rs#L27

Added line #L27 was not covered by tests
}

// Char[]
impl ArrowArrayToPgType<Vec<Option<i8>>> for StringArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i8>>> {
Expand All @@ -29,3 +41,17 @@ impl ArrowArrayToPgType<Vec<Option<i8>>> for StringArray {
Some(vals)
}
}

impl ArrowArrayToPgType<Vec<Option<i8>>> for LargeStringArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i8>>> {
let mut vals = vec![];
for val in self.iter() {
let val = val.map(|val| {
let val: i8 = val.chars().next().expect("unexpected ascii char") as i8;
val
});
vals.push(val);
}
Some(vals)
}

Check warning on line 56 in src/arrow_parquet/arrow_to_pg/char.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/char.rs#L46-L56

Added lines #L46 - L56 were not covered by tests
}
28 changes: 27 additions & 1 deletion src/arrow_parquet/arrow_to_pg/fallback_to_text.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use arrow::array::{Array, StringArray};
use arrow::array::{Array, LargeStringArray, StringArray};

use crate::type_compat::fallback_to_text::FallbackToText;

Expand All @@ -17,6 +17,18 @@ impl ArrowArrayToPgType<FallbackToText> for StringArray {
}
}

impl ArrowArrayToPgType<FallbackToText> for LargeStringArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<FallbackToText> {
if self.is_null(0) {
None

Check warning on line 23 in src/arrow_parquet/arrow_to_pg/fallback_to_text.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/fallback_to_text.rs#L21-L23

Added lines #L21 - L23 were not covered by tests
} else {
let text_repr = self.value(0).to_string();
let val = FallbackToText(text_repr);
Some(val)

Check warning on line 27 in src/arrow_parquet/arrow_to_pg/fallback_to_text.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/fallback_to_text.rs#L25-L27

Added lines #L25 - L27 were not covered by tests
}
}

Check warning on line 29 in src/arrow_parquet/arrow_to_pg/fallback_to_text.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/fallback_to_text.rs#L29

Added line #L29 was not covered by tests
}

// Text[] representation of any type
impl ArrowArrayToPgType<Vec<Option<FallbackToText>>> for StringArray {
fn to_pg_type(
Expand All @@ -31,3 +43,17 @@ impl ArrowArrayToPgType<Vec<Option<FallbackToText>>> for StringArray {
Some(vals)
}
}

impl ArrowArrayToPgType<Vec<Option<FallbackToText>>> for LargeStringArray {
fn to_pg_type(
self,
_context: &ArrowToPgAttributeContext,
) -> Option<Vec<Option<FallbackToText>>> {
let mut vals = vec![];
for val in self.iter() {
let val = val.map(|val| FallbackToText(val.to_string()));
vals.push(val);
}
Some(vals)
}

Check warning on line 58 in src/arrow_parquet/arrow_to_pg/fallback_to_text.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/fallback_to_text.rs#L48-L58

Added lines #L48 - L58 were not covered by tests
}
21 changes: 21 additions & 0 deletions src/arrow_parquet/arrow_to_pg/float4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ impl ArrowArrayToPgType<f32> for Float32Array {
}
}

impl ArrowArrayToPgType<f64> for Float32Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<f64> {
if self.is_null(0) {
None

Check warning on line 20 in src/arrow_parquet/arrow_to_pg/float4.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/float4.rs#L20

Added line #L20 was not covered by tests
} else {
let val = self.value(0) as _;
Some(val)
}
}
}

// Float4[]
impl ArrowArrayToPgType<Vec<Option<f32>>> for Float32Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<f32>>> {
Expand All @@ -24,3 +35,13 @@ impl ArrowArrayToPgType<Vec<Option<f32>>> for Float32Array {
Some(vals)
}
}

impl ArrowArrayToPgType<Vec<Option<f64>>> for Float32Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<f64>>> {
let mut vals = vec![];
for val in self.iter() {
vals.push(val.map(|val| val as _));
}
Some(vals)
}

Check warning on line 46 in src/arrow_parquet/arrow_to_pg/float4.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/float4.rs#L40-L46

Added lines #L40 - L46 were not covered by tests
}
27 changes: 26 additions & 1 deletion src/arrow_parquet/arrow_to_pg/geometry.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use arrow::array::{Array, BinaryArray};
use arrow::array::{Array, BinaryArray, LargeBinaryArray};

use crate::type_compat::geometry::Geometry;

Expand All @@ -15,6 +15,16 @@ impl ArrowArrayToPgType<Geometry> for BinaryArray {
}
}

impl ArrowArrayToPgType<Geometry> for LargeBinaryArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Geometry> {
if self.is_null(0) {
None

Check warning on line 21 in src/arrow_parquet/arrow_to_pg/geometry.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/geometry.rs#L19-L21

Added lines #L19 - L21 were not covered by tests
} else {
Some(self.value(0).to_vec().into())

Check warning on line 23 in src/arrow_parquet/arrow_to_pg/geometry.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/geometry.rs#L23

Added line #L23 was not covered by tests
}
}

Check warning on line 25 in src/arrow_parquet/arrow_to_pg/geometry.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/geometry.rs#L25

Added line #L25 was not covered by tests
}

// Geometry[]
impl ArrowArrayToPgType<Vec<Option<Geometry>>> for BinaryArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<Geometry>>> {
Expand All @@ -30,3 +40,18 @@ impl ArrowArrayToPgType<Vec<Option<Geometry>>> for BinaryArray {
Some(vals)
}
}

impl ArrowArrayToPgType<Vec<Option<Geometry>>> for LargeBinaryArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<Geometry>>> {
let mut vals = vec![];
for val in self.iter() {
if let Some(val) = val {
vals.push(Some(val.to_vec().into()));
} else {
vals.push(None);
}

Check warning on line 52 in src/arrow_parquet/arrow_to_pg/geometry.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/geometry.rs#L45-L52

Added lines #L45 - L52 were not covered by tests
}

Some(vals)
}

Check warning on line 56 in src/arrow_parquet/arrow_to_pg/geometry.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/geometry.rs#L55-L56

Added lines #L55 - L56 were not covered by tests
}
107 changes: 106 additions & 1 deletion src/arrow_parquet/arrow_to_pg/int2.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use arrow::array::{Array, Int16Array};
use arrow::array::{Array, Int16Array, UInt16Array};

use super::{ArrowArrayToPgType, ArrowToPgAttributeContext};

Expand All @@ -14,6 +14,61 @@ impl ArrowArrayToPgType<i16> for Int16Array {
}
}

impl ArrowArrayToPgType<i32> for Int16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<i32> {
if self.is_null(0) {
None

Check warning on line 20 in src/arrow_parquet/arrow_to_pg/int2.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/int2.rs#L20

Added line #L20 was not covered by tests
} else {
let val = self.value(0) as _;
Some(val)
}
}
}

impl ArrowArrayToPgType<i64> for Int16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<i64> {
if self.is_null(0) {
None

Check warning on line 31 in src/arrow_parquet/arrow_to_pg/int2.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/int2.rs#L31

Added line #L31 was not covered by tests
} else {
let val = self.value(0) as _;
Some(val)
}
}
}

impl ArrowArrayToPgType<i16> for UInt16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<i16> {
if self.is_null(0) {
None

Check warning on line 42 in src/arrow_parquet/arrow_to_pg/int2.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/int2.rs#L42

Added line #L42 was not covered by tests
} else {
let val = self.value(0) as _;
Some(val)
}
}
}

impl ArrowArrayToPgType<i32> for UInt16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<i32> {
if self.is_null(0) {
None

Check warning on line 53 in src/arrow_parquet/arrow_to_pg/int2.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/int2.rs#L53

Added line #L53 was not covered by tests
} else {
let val = self.value(0) as _;
Some(val)
}
}
}

impl ArrowArrayToPgType<i64> for UInt16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<i64> {
if self.is_null(0) {
None

Check warning on line 64 in src/arrow_parquet/arrow_to_pg/int2.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/int2.rs#L64

Added line #L64 was not covered by tests
} else {
let val = self.value(0) as _;
Some(val)
}
}
}

// Int2[]
impl ArrowArrayToPgType<Vec<Option<i16>>> for Int16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i16>>> {
Expand All @@ -24,3 +79,53 @@ impl ArrowArrayToPgType<Vec<Option<i16>>> for Int16Array {
Some(vals)
}
}

impl ArrowArrayToPgType<Vec<Option<i32>>> for Int16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i32>>> {
let mut vals = vec![];
for val in self.iter() {
vals.push(val.map(|val| val as _));
}
Some(vals)
}

Check warning on line 90 in src/arrow_parquet/arrow_to_pg/int2.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/int2.rs#L84-L90

Added lines #L84 - L90 were not covered by tests
}

impl ArrowArrayToPgType<Vec<Option<i64>>> for Int16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i64>>> {
let mut vals = vec![];
for val in self.iter() {
vals.push(val.map(|val| val as _));
}
Some(vals)
}

Check warning on line 100 in src/arrow_parquet/arrow_to_pg/int2.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/int2.rs#L94-L100

Added lines #L94 - L100 were not covered by tests
}

impl ArrowArrayToPgType<Vec<Option<i16>>> for UInt16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i16>>> {
let mut vals = vec![];
for val in self.iter() {
vals.push(val.map(|val| val as _));
}
Some(vals)
}

Check warning on line 110 in src/arrow_parquet/arrow_to_pg/int2.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/int2.rs#L104-L110

Added lines #L104 - L110 were not covered by tests
}

impl ArrowArrayToPgType<Vec<Option<i32>>> for UInt16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i32>>> {
let mut vals = vec![];
for val in self.iter() {
vals.push(val.map(|val| val as _));
}
Some(vals)
}

Check warning on line 120 in src/arrow_parquet/arrow_to_pg/int2.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/int2.rs#L114-L120

Added lines #L114 - L120 were not covered by tests
}

impl ArrowArrayToPgType<Vec<Option<i64>>> for UInt16Array {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i64>>> {
let mut vals = vec![];
for val in self.iter() {
vals.push(val.map(|val| val as _));
}
Some(vals)
}

Check warning on line 130 in src/arrow_parquet/arrow_to_pg/int2.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/arrow_to_pg/int2.rs#L124-L130

Added lines #L124 - L130 were not covered by tests
}
Loading

0 comments on commit ef6f07d

Please sign in to comment.