
GH-44345: [C++][Parquet] Fully support arrow decimal32/64 in Parquet #45351

Open
wants to merge 5 commits into base: main

Conversation

@curioustien commented on Jan 25, 2025

Rationale for this change

As described in #44345, Arrow decimal32/64 are not yet fully supported in Parquet. This change adds full support by performing the correct conversions between Arrow decimal32/64 and the Parquet decimal type.

What changes are included in this PR?

A few changes in this PR:

  • Support correct schema conversion between Parquet and arrow decimal32/64/128/256
  • Support writing arrow decimal32/64 to Parquet
  • Support reading Parquet decimal to arrow decimal32/64
  • Enforce the right decimal conversion based on the precision value
  • Allow decimal32/64 in Arrow compute vector hash which is needed for some of the existing Parquet tests
  • Support converting pyarrow parquet decimal32/64 to pandas

Are these changes tested?

Yes

Are there any user-facing changes?

Yes. After this change, any decimal in Parquet will be converted to the corresponding Arrow decimal type based on its precision.
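
For illustration, here is a minimal sketch of that precision-based mapping. It is not the PR's actual code; it assumes the usual Arrow maximum precisions (9, 18, 38, and 76 digits for decimal32/64/128/256) and mirrors what arrow::smallest_decimal() is described as doing later in this thread:

    #include <arrow/api.h>

    // Illustrative only: pick the narrowest Arrow decimal type that can hold a
    // Parquet decimal with the given precision and scale.
    std::shared_ptr<arrow::DataType> DecimalTypeForPrecision(int32_t precision,
                                                             int32_t scale) {
      if (precision <= arrow::Decimal32Type::kMaxPrecision) {         // assumed 9
        return arrow::decimal32(precision, scale);
      } else if (precision <= arrow::Decimal64Type::kMaxPrecision) {  // assumed 18
        return arrow::decimal64(precision, scale);
      } else if (precision <= arrow::Decimal128Type::kMaxPrecision) { // assumed 38
        return arrow::decimal128(precision, scale);
      }
      return arrow::decimal256(precision, scale);
    }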

@mapleFU (Member) left a comment:

Thanks! I think the legacy Decimal128/Decimal256 write behavior should not be changed.

    if (maybe_type.ok()) {
      arrow_type = maybe_type.ValueOrDie();

    if (precision <= Decimal32Type::kMaxPrecision) {

Member:

Can we add a comment noting that the literal can be cast to the corresponding type if the real reader type is a wider decimal type?

Comment on lines -2051 to -2052
    WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, Int32Type)
    WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, Int32Type)

Member:

Why not

      WRITE_SERIALIZE_CASE(DECIMAL32, Decimal32Type, Int32Type)
      WRITE_SERIALIZE_CASE(DECIMAL64, Decimal64Type, Int32Type)
      WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, Int32Type)
      WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, Int32Type)

Member:

Perhaps we don't need WRITE_SERIALIZE_CASE(DECIMAL64, Decimal64Type, Int32Type)?

Member:

Perhaps we don't need

I think we do need it; decimal64 is just a decimal type and doesn't limit the precision, right?

Member:

I'm open to this but suspicious of the value of it.

@@ -2220,8 +2229,7 @@ Status TypedColumnWriterImpl<Int64Type>::WriteArrowDense(
    WRITE_SERIALIZE_CASE(UINT64, UInt64Type, Int64Type)
    WRITE_ZERO_COPY_CASE(TIME64, Time64Type, Int64Type)
    WRITE_ZERO_COPY_CASE(DURATION, DurationType, Int64Type)
    WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, Int64Type)
    WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, Int64Type)
    WRITE_SERIALIZE_CASE(DECIMAL64, Decimal64Type, Int64Type)

Member:

ditto

Comment on lines +2394 to +2395
    scratch_i32 = reinterpret_cast<int32_t*>(scratch_buffer->mutable_data());
    scratch_i64 = reinterpret_cast<int64_t*>(scratch_buffer->mutable_data());

Member:

Why split this into two parts?

Author:

I'm not sure how to handle this code in a clean way. The main reason I had to do this is the int64_t* scratch pointer we're using. IIUC, the current code constructs the big-endian decimal array in this scratch space, advancing the scratch pointer through memory as it goes.

If you look at the current logic, it uses the byte_width to determine how many words to consume from the input to construct each decimal. The int64_t* scratch pointer works for decimal64, decimal128, and decimal256, but it doesn't work for decimal32, whose values are only 32 bits wide, so I had to create another pointer of type int32_t*.

I may be misunderstanding how this code works, so feel free to correct me here.
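
A minimal standalone sketch of the issue described above; it is not the actual column_writer.cc code, and the buffer and helper names are made up for illustration:

    #include <cstdint>
    #include <iostream>
    #include <vector>

    // Fill a scratch buffer with consecutive values, stepping by byte_width.
    // This loosely mirrors why the writer needs an int32_t* cursor for decimal32
    // and an int64_t* cursor for the wider decimal types: the cursor must
    // advance by exactly one value width per step.
    void FillScratch(uint8_t* scratch, int byte_width, int num_values) {
      if (byte_width == 4) {
        int32_t* cursor = reinterpret_cast<int32_t*>(scratch);
        for (int i = 0; i < num_values; ++i) *cursor++ = i;  // decimal32-style path
      } else {
        int64_t* cursor = reinterpret_cast<int64_t*>(scratch);
        for (int i = 0; i < num_values; ++i) *cursor++ = i;  // decimal64/128/256-style path
      }
    }

    int main() {
      std::vector<uint8_t> scratch(64, 0);
      FillScratch(scratch.data(), /*byte_width=*/4, /*num_values=*/4);
      // The second value sits 4 bytes in, which a 64-bit cursor would have skipped.
      std::cout << reinterpret_cast<int32_t*>(scratch.data())[1] << std::endl;
      return 0;
    }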

@@ -364,9 +364,9 @@ def test_byte_stream_split():

 def test_store_decimal_as_integer(tempdir):
     arr_decimal_1_9 = pa.array(list(map(Decimal, range(100))),
-                               type=pa.decimal128(5, 2))
+                               type=pa.decimal32(5, 2))

Member:

This should not be changed; instead, we should add new tests here.

Author:

It's a little tricky to keep this test the same unless we cast the result to a wider decimal. On the writer side, we can keep the same Arrow-to-Parquet behavior while adding support for decimal32/64.

However, on the reader side, the Parquet decimal format only carries precision and scale, without any knowledge of the different Arrow types (which is the correct behavior here). Therefore, to convert from Parquet to Arrow, we look at the precision and map to decimal32/64/128/256 accordingly.

For this test, which round-trips data by writing to Parquet and reading it back, the correct end result when reading the data is decimal32. I can modify this test case to cast the returned decimal to a wider decimal if that's what you meant.
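
For reference, a self-contained C++ sketch of the round trip being discussed, assuming the precision-based read behavior described above (the final default is still under discussion further down the thread); error handling is abbreviated, and this is not code from the PR:

    #include <arrow/api.h>
    #include <arrow/io/api.h>
    #include <parquet/arrow/reader.h>
    #include <parquet/arrow/writer.h>
    #include <iostream>
    #include <memory>

    arrow::Status RoundTrip() {
      // Build a single-column table typed as decimal128(5, 2); 12345 represents 123.45.
      auto write_type = arrow::decimal128(5, 2);
      arrow::Decimal128Builder builder(write_type);
      ARROW_RETURN_NOT_OK(builder.Append(arrow::Decimal128(12345)));
      ARROW_ASSIGN_OR_RAISE(auto array, builder.Finish());
      auto table = arrow::Table::Make(
          arrow::schema({arrow::field("d", write_type)}), {array});

      // Write to an in-memory Parquet file.
      ARROW_ASSIGN_OR_RAISE(auto sink, arrow::io::BufferOutputStream::Create());
      ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
          *table, arrow::default_memory_pool(), sink, /*chunk_size=*/1024));
      ARROW_ASSIGN_OR_RAISE(auto buffer, sink->Finish());

      // Read it back; with precision-based conversion the column is expected to
      // come back as decimal32(5, 2), because precision 5 fits in 32 bits.
      std::unique_ptr<parquet::arrow::FileReader> reader;
      ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(
          std::make_shared<arrow::io::BufferReader>(buffer),
          arrow::default_memory_pool(), &reader));
      std::shared_ptr<arrow::Table> result;
      ARROW_RETURN_NOT_OK(reader->ReadTable(&result));
      std::cout << result->schema()->field(0)->type()->ToString() << std::endl;
      return arrow::Status::OK();
    }

    int main() { return RoundTrip().ok() ? 0 : 1; }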

Member:

So can we provide a type function and test all possible types here?

@@ -555,6 +555,7 @@ KernelInit GetHashInit(Type::type type_id) {
    case Type::DATE32:
    case Type::TIME32:
    case Type::INTERVAL_MONTHS:
    case Type::DECIMAL32:

Member:

Are the changes to the compute kernels required to support Parquet? I can't see why, but I might be missing something. Otherwise, we should move adding decimal32 and decimal64 support to those compute kernels to a different PR and leave this one with only the required Parquet changes.

Member:

OK, I see now; the description says this is required for some tests:
Allow decimal32/64 in Arrow compute vector hash which is needed for some of the existing Parquet tests

Author:

I'm down to split this change into another PR, which can cover this support with more tests on the Arrow compute side. But yes, there are a few tests in Parquet that hit the Arrow vector hash kernel code path.

@curioustien (Author):

Quick update:
Finding a good way to convert these decimal types and getting all the tests to pass is taking longer than I thought. I'll probably need a few more days.

@curioustien (Author):

@wgtmac @mapleFU I'm facing an implementation blocker on keeping the same write/read behavior for decimal128/256 between Arrow and Parquet, because I can't find a clean way to do it. As I mentioned in this previous comment #45351 (comment) on my original implementation, the current Parquet reading logic looks at the decimal precision to determine how to convert the Parquet decimal logical type to an Arrow decimal type. Since we introduced decimal32/64 in Arrow, I had to change this logic to include these types based on the precision.

Therefore, whenever we want to cast a decimal32 to decimal128, we need to force the schema to convert to a bigger decimal. I found the arrow Field.MergeWith method, which could do the job (I used it in one of the schema tests here). However, when I moved on to the reader/writer tests, I found that these schema fields can only be accessed through the manifest within FileReader, and it is read-only. Therefore, if I want to force the schema conversion, I'll have to either:

  1. Change this manifest() method so that we can manipulate the schema fields
  2. Add a new property in ArrowReaderProperties so that we can propagate this schema conversion logic

I can't really come up with other options here. Both options require changes to some important classes, so I want to get some alignment before I proceed. Option 2 probably makes the most sense here. I would also need to propagate this new property to pyarrow if we want the same behavior in Python.

What are your thoughts on this problem? Are there any other alternatives I should try?

@mapleFU (Member) commented on Feb 6, 2025

This is really a problem. Currently Arrow has an option, store_schema(), which stores the Arrow schema in the Parquet file.

What about having newly written decimal32/decimal64 read back as decimal32/decimal64, while legacy data goes the legacy way?
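
For context, store_schema() is configured through ArrowWriterProperties; a minimal sketch of enabling it on the write path (everything around it is assumed, not taken from this PR):

    #include <parquet/arrow/writer.h>
    #include <parquet/properties.h>

    // Sketch: build ArrowWriterProperties with store_schema() enabled, so the
    // original Arrow schema is embedded in the Parquet file metadata and can be
    // restored when the file is read back.
    std::shared_ptr<parquet::ArrowWriterProperties> MakeArrowWriterProperties() {
      return parquet::ArrowWriterProperties::Builder().store_schema()->build();
    }

The resulting properties object can then be passed as the arrow_properties argument of parquet::arrow::WriteTable or FileWriter::Open.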

@curioustien (Author):

@mapleFU Thanks for the pointer on store_schema(). I think we can leverage this option.

What about having newly written decimal32/decimal64 read back as decimal32/decimal64, while legacy data goes the legacy way?

I'm quite confused by this comment. Could you elaborate? Did you mean that if store_schema() is enabled, then we'll convert things correctly, and otherwise we keep the legacy code and always convert to either decimal128 or decimal256? That kind of defeats the purpose of having decimal32 and decimal64 in Arrow, though, unless users know to specify the store_schema() flag.

@wgtmac (Member) commented on Feb 7, 2025

If store_schema() is enabled, reading the Parquet file should just use the restored Arrow type. This is simple. However, if it is not used, I would prefer to add a new option to ArrowReaderProperties to advise the reader to use the decimal type created by smallest_decimal(int32_t precision, int32_t scale).
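
A rough sketch of what such an option could look like; the property name and methods below are hypothetical and not part of the existing ArrowReaderProperties API:

    // Hypothetical addition to parquet::ArrowReaderProperties (illustrative only).
    class ArrowReaderProperties {
     public:
      // When true, a Parquet decimal is mapped to the narrowest Arrow decimal
      // type, e.g. via arrow::smallest_decimal(precision, scale); when false,
      // the legacy behavior of reading into decimal128/decimal256 is kept.
      void set_use_smallest_decimal(bool use) { use_smallest_decimal_ = use; }
      bool use_smallest_decimal() const { return use_smallest_decimal_; }

     private:
      bool use_smallest_decimal_ = false;  // default keeps legacy behavior
    };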

@curioustien (Author):

If store_schema() is enabled, reading the Parquet file should just use the restored Arrow type. This is simple. However, if it is not used, I would prefer to add a new option to ArrowReaderProperties to advise the reader to use the decimal type created by smallest_decimal(int32_t precision, int32_t scale).

This makes sense to me. I'll proceed with this implementation then. Thanks for the discussion
