From 83c8a021d0db11118b4afe229732d6f5b0e7dcd7 Mon Sep 17 00:00:00 2001 From: Tim Nguyen <6283718+curioustien@users.noreply.github.com> Date: Sun, 19 Jan 2025 12:34:43 -0500 Subject: [PATCH] Support decimal32/64 in reader & vector kernels & tests --- cpp/src/arrow/compute/kernels/vector_hash.cc | 4 +- .../arrow/compute/kernels/vector_selection.cc | 3 +- .../parquet/arrow/arrow_reader_writer_test.cc | 170 +++++++++++------- cpp/src/parquet/arrow/arrow_schema_test.cc | 2 +- cpp/src/parquet/arrow/reader_internal.cc | 122 +++++++++---- cpp/src/parquet/arrow/test_util.h | 130 +++++++++++++- cpp/src/parquet/column_writer.cc | 42 +++-- 7 files changed, 358 insertions(+), 115 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/vector_hash.cc b/cpp/src/arrow/compute/kernels/vector_hash.cc index 5067298858132..0d13316001906 100644 --- a/cpp/src/arrow/compute/kernels/vector_hash.cc +++ b/cpp/src/arrow/compute/kernels/vector_hash.cc @@ -555,6 +555,7 @@ KernelInit GetHashInit(Type::type type_id) { case Type::DATE32: case Type::TIME32: case Type::INTERVAL_MONTHS: + case Type::DECIMAL32: return HashInit>; case Type::INT64: case Type::UINT64: @@ -564,6 +565,7 @@ KernelInit GetHashInit(Type::type type_id) { case Type::TIMESTAMP: case Type::DURATION: case Type::INTERVAL_DAY_TIME: + case Type::DECIMAL64: return HashInit>; case Type::BINARY: case Type::STRING: @@ -707,7 +709,7 @@ void AddHashKernels(VectorFunction* func, VectorKernel base, OutputType out_ty) DCHECK_OK(func->AddKernel(base)); } - for (auto t : {Type::DECIMAL128, Type::DECIMAL256}) { + for (auto t : {Type::DECIMAL32, Type::DECIMAL64, Type::DECIMAL128, Type::DECIMAL256}) { base.init = GetHashInit(t); base.signature = KernelSignature::Make({t}, out_ty); DCHECK_OK(func->AddKernel(base)); diff --git a/cpp/src/arrow/compute/kernels/vector_selection.cc b/cpp/src/arrow/compute/kernels/vector_selection.cc index b265673e23c86..b902352ee51e8 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection.cc @@ -308,7 +308,8 @@ std::shared_ptr MakeIndicesNonZeroFunction(std::string name, AddKernels(NumericTypes()); AddKernels({boolean()}); - for (const auto& ty : {Type::DECIMAL128, Type::DECIMAL256}) { + for (const auto& ty : + {Type::DECIMAL32, Type::DECIMAL64, Type::DECIMAL128, Type::DECIMAL256}) { kernel.signature = KernelSignature::Make({ty}, uint64()); DCHECK_OK(func->AddKernel(kernel)); } diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 47a00016b94b0..c43385b1565fd 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -181,6 +181,14 @@ std::shared_ptr get_logical_type(const DataType& type) { static_cast(type); return get_logical_type(*dict_type.value_type()); } + case ArrowId::DECIMAL32: { + const auto& dec_type = static_cast(type); + return LogicalType::Decimal(dec_type.precision(), dec_type.scale()); + } + case ArrowId::DECIMAL64: { + const auto& dec_type = static_cast(type); + return LogicalType::Decimal(dec_type.precision(), dec_type.scale()); + } case ArrowId::DECIMAL128: { const auto& dec_type = static_cast(type); return LogicalType::Decimal(dec_type.precision(), dec_type.scale()); @@ -206,9 +214,11 @@ ParquetType::type get_physical_type(const DataType& type) { case ArrowId::INT16: case ArrowId::UINT32: case ArrowId::INT32: + case ArrowId::DECIMAL32: return ParquetType::INT32; case ArrowId::UINT64: case ArrowId::INT64: + case ArrowId::DECIMAL64: return ParquetType::INT64; case ArrowId::FLOAT: return ParquetType::FLOAT; @@ -533,6 +543,8 @@ static std::shared_ptr MakeSimpleSchema(const DataType& type, case ::arrow::Type::HALF_FLOAT: byte_width = sizeof(::arrow::HalfFloatType::c_type); break; + case ::arrow::Type::DECIMAL32: + case ::arrow::Type::DECIMAL64: case ::arrow::Type::DECIMAL128: case ::arrow::Type::DECIMAL256: { const auto& decimal_type = static_cast(values_type); @@ -548,6 +560,8 @@ static std::shared_ptr MakeSimpleSchema(const DataType& type, case ::arrow::Type::HALF_FLOAT: byte_width = sizeof(::arrow::HalfFloatType::c_type); break; + case ::arrow::Type::DECIMAL32: + case ::arrow::Type::DECIMAL64: case ::arrow::Type::DECIMAL128: case ::arrow::Type::DECIMAL256: { const auto& decimal_type = static_cast(type); @@ -783,21 +797,55 @@ class TestReadDecimals : public ParquetIOTestBase { // The Decimal roundtrip tests always go through the FixedLenByteArray path, // check the ByteArray case manually. -TEST_F(TestReadDecimals, Decimal128ByteArray) { +TEST_F(TestReadDecimals, Decimal32ByteArray) { const std::vector> big_endian_decimals = { // 123456 {1, 226, 64}, // 987654 {15, 18, 6}, // -123456 - {255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 254, 29, 192}, + {255, 254, 29, 192}, }; auto expected = - ArrayFromJSON(::arrow::decimal128(6, 3), R"(["123.456", "987.654", "-123.456"])"); + ArrayFromJSON(::arrow::decimal32(6, 3), R"(["123.456", "987.654", "-123.456"])"); CheckReadFromByteArrays(LogicalType::Decimal(6, 3), big_endian_decimals, *expected); } +TEST_F(TestReadDecimals, Decimal64ByteArray) { + const std::vector> big_endian_decimals = { + // 123456 + {1, 226, 64}, + // 987654 + {15, 18, 6}, + // -123456 + {255, 254, 29, 192}, + // -123456 + {255, 255, 255, 255, 255, 254, 29, 192}, + }; + + auto expected = ArrayFromJSON(::arrow::decimal64(16, 3), + R"(["123.456", "987.654", "-123.456", "-123.456"])"); + CheckReadFromByteArrays(LogicalType::Decimal(16, 3), big_endian_decimals, *expected); +} + +TEST_F(TestReadDecimals, Decimal128ByteArray) { + const std::vector> big_endian_decimals = { + // 123456 + {1, 226, 64}, + // 987654 + {15, 18, 6}, + // -123456 + {255, 254, 29, 192}, + // -123456 + {255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 254, 29, 192}, + }; + + auto expected = ArrayFromJSON(::arrow::decimal128(20, 3), + R"(["123.456", "987.654", "-123.456", "-123.456"])"); + CheckReadFromByteArrays(LogicalType::Decimal(20, 3), big_endian_decimals, *expected); +} + TEST_F(TestReadDecimals, Decimal256ByteArray) { const std::vector> big_endian_decimals = { // 123456 @@ -805,12 +853,14 @@ TEST_F(TestReadDecimals, Decimal256ByteArray) { // 987654 {15, 18, 6}, // -123456 + {255, 254, 29, 192}, + // -123456 {255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 254, 29, 192}, }; - auto expected = - ArrayFromJSON(::arrow::decimal256(40, 3), R"(["123.456", "987.654", "-123.456"])"); + auto expected = ArrayFromJSON(::arrow::decimal256(40, 3), + R"(["123.456", "987.654", "-123.456", "-123.456"])"); CheckReadFromByteArrays(LogicalType::Decimal(40, 3), big_endian_decimals, *expected); } @@ -858,9 +908,9 @@ typedef ::testing::Types< ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType, ::arrow::BinaryType, ::arrow::FixedSizeBinaryType, ::arrow::HalfFloatType, - Decimal128WithPrecisionAndScale<1>, Decimal128WithPrecisionAndScale<5>, - Decimal128WithPrecisionAndScale<10>, Decimal128WithPrecisionAndScale<19>, - Decimal128WithPrecisionAndScale<23>, Decimal128WithPrecisionAndScale<27>, + Decimal32WithPrecisionAndScale<1>, Decimal32WithPrecisionAndScale<5>, + Decimal64WithPrecisionAndScale<10>, Decimal64WithPrecisionAndScale<18>, + Decimal128WithPrecisionAndScale<19>, Decimal128WithPrecisionAndScale<27>, Decimal128WithPrecisionAndScale<38>, Decimal256WithPrecisionAndScale<39>, Decimal256WithPrecisionAndScale<56>, Decimal256WithPrecisionAndScale<76>> TestTypes; @@ -903,8 +953,9 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) { std::shared_ptr table = MakeSimpleTable(values, false); this->ResetSink(); - ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, - values->length(), default_writer_properties())); + ASSERT_OK_NO_THROW(WriteTable( + *table, ::arrow::default_memory_pool(), this->sink_, values->length(), + ::parquet::WriterProperties::Builder().enable_store_decimal_as_integer()->build())); std::shared_ptr
out; std::unique_ptr reader; @@ -2944,7 +2995,7 @@ TEST(ArrowReadWrite, Decimal256) { using ::arrow::Decimal256; using ::arrow::field; - auto type = ::arrow::decimal256(8, 4); + auto type = ::arrow::decimal256(48, 4); const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678", "-9999.9999", "9999.9999"])"; @@ -2958,7 +3009,7 @@ TEST(ArrowReadWrite, DecimalStats) { using ::arrow::Decimal128; using ::arrow::field; - auto type = ::arrow::decimal128(/*precision=*/8, /*scale=*/0); + auto type = ::arrow::decimal128(/*precision=*/28, /*scale=*/0); const char* json = R"(["255", "128", null, "0", "1", "-127", "-128", "-129", "-255"])"; auto array = ::arrow::ArrayFromJSON(type, json); @@ -3447,8 +3498,8 @@ TEST(ArrowReadWrite, NestedRequiredOuterOptional) { types.push_back(::arrow::duration(::arrow::TimeUnit::MILLI)); types.push_back(::arrow::duration(::arrow::TimeUnit::MICRO)); types.push_back(::arrow::duration(::arrow::TimeUnit::NANO)); - types.push_back(::arrow::decimal128(3, 2)); - types.push_back(::arrow::decimal256(3, 2)); + types.push_back(::arrow::decimal32(3, 2)); + types.push_back(::arrow::decimal128(23, 2)); types.push_back(::arrow::fixed_size_binary(4)); // Note large variants of types appear to get converted back to regular on read types.push_back(::arrow::dictionary(::arrow::int32(), ::arrow::binary())); @@ -3500,9 +3551,8 @@ TEST(ArrowReadWrite, NestedRequiredOuterOptionalDecimal) { ByteArray("\x0f\x12\x06"), // 987654 }; const std::vector int32_values = {123456, 987654}; - const std::vector int64_values = {123456, 987654}; - const auto inner_type = ::arrow::decimal128(6, 3); + const auto inner_type = ::arrow::decimal32(6, 3); auto inner_field = ::arrow::field("inner", inner_type, /*nullable=*/false); auto type = ::arrow::struct_({inner_field}); auto field = ::arrow::field("outer", type, /*nullable=*/true); @@ -3512,7 +3562,7 @@ TEST(ArrowReadWrite, NestedRequiredOuterOptionalDecimal) { ::arrow::StructArray::Make({inner}, {inner_field}, null_bitmap)); auto table = ::arrow::Table::Make(::arrow::schema({field}), {array}); - for (const auto& encoding : {Type::BYTE_ARRAY, Type::INT32, Type::INT64}) { + for (const auto& encoding : {Type::BYTE_ARRAY, Type::INT32}) { // Manually write out file based on encoding type ARROW_SCOPED_TRACE("Encoding decimals as ", encoding); auto parquet_schema = GroupNode::Make( @@ -3543,12 +3593,6 @@ TEST(ArrowReadWrite, NestedRequiredOuterOptionalDecimal) { int32_values.data()); break; } - case Type::INT64: { - auto typed_writer = checked_cast(column_writer); - typed_writer->WriteBatch(4, def_levels.data(), /*rep_levels=*/nullptr, - int64_values.data()); - break; - } default: FAIL() << "Invalid encoding"; return; @@ -3562,11 +3606,11 @@ TEST(ArrowReadWrite, NestedRequiredOuterOptionalDecimal) { } } -TEST(ArrowReadWrite, Decimal256AsInt) { +TEST(ArrowReadWrite, Decimal32AsInt) { using ::arrow::Decimal256; using ::arrow::field; - auto type = ::arrow::decimal256(8, 4); + auto type = ::arrow::decimal32(8, 4); const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678", "-9999.9999", "9999.9999"])"; @@ -4059,7 +4103,7 @@ TEST(TestArrowReaderAdHoc, WriteBatchedNestedNullableStringColumn) { // ARROW-10493 std::vector> fields{ ::arrow::field("s", ::arrow::utf8(), /*nullable=*/true), - ::arrow::field("d", ::arrow::decimal128(4, 2), /*nullable=*/true), + ::arrow::field("d", ::arrow::decimal32(4, 2), /*nullable=*/true), ::arrow::field("b", ::arrow::boolean(), /*nullable=*/true), ::arrow::field("i8", ::arrow::int8(), /*nullable=*/true), ::arrow::field("i64", ::arrow::int64(), /*nullable=*/true)}; @@ -4222,25 +4266,47 @@ TEST_P(TestArrowReaderAdHocSparkAndHvr, ReadDecimals) { std::shared_ptr expected_array; - ::arrow::Decimal128Builder builder(decimal_type, pool); - - for (int32_t i = 0; i < expected_length; ++i) { - ::arrow::Decimal128 value((i + 1) * 100); - ASSERT_OK(builder.Append(value)); + if (decimal_type->id() == ::arrow::Decimal32Type::type_id) { + ::arrow::Decimal32Builder builder(decimal_type, pool); + for (int32_t i = 0; i < expected_length; ++i) { + ::arrow::Decimal32 value((i + 1) * 100); + ASSERT_OK(builder.Append(value)); + } + ASSERT_OK(builder.Finish(&expected_array)); + } else if (decimal_type->id() == ::arrow::Decimal64Type::type_id) { + ::arrow::Decimal64Builder builder(decimal_type, pool); + for (int32_t i = 0; i < expected_length; ++i) { + ::arrow::Decimal64 value((i + 1) * 100); + ASSERT_OK(builder.Append(value)); + } + ASSERT_OK(builder.Finish(&expected_array)); + } else if (decimal_type->id() == ::arrow::Decimal128Type::type_id) { + ::arrow::Decimal128Builder builder(decimal_type, pool); + for (int32_t i = 0; i < expected_length; ++i) { + ::arrow::Decimal128 value((i + 1) * 100); + ASSERT_OK(builder.Append(value)); + } + ASSERT_OK(builder.Finish(&expected_array)); + } else { + ::arrow::Decimal256Builder builder(decimal_type, pool); + for (int32_t i = 0; i < expected_length; ++i) { + ::arrow::Decimal256 value((i + 1) * 100); + ASSERT_OK(builder.Append(value)); + } + ASSERT_OK(builder.Finish(&expected_array)); } - ASSERT_OK(builder.Finish(&expected_array)); + AssertArraysEqual(*expected_array, *chunk); } INSTANTIATE_TEST_SUITE_P( ReadDecimals, TestArrowReaderAdHocSparkAndHvr, ::testing::Values( - std::make_tuple("int32_decimal.parquet", ::arrow::decimal128(4, 2)), - std::make_tuple("int64_decimal.parquet", ::arrow::decimal128(10, 2)), + std::make_tuple("int32_decimal.parquet", ::arrow::decimal32(4, 2)), + std::make_tuple("int64_decimal.parquet", ::arrow::decimal64(10, 2)), std::make_tuple("fixed_length_decimal.parquet", ::arrow::decimal128(25, 2)), - std::make_tuple("fixed_length_decimal_legacy.parquet", - ::arrow::decimal128(13, 2)), - std::make_tuple("byte_array_decimal.parquet", ::arrow::decimal128(4, 2)))); + std::make_tuple("fixed_length_decimal_legacy.parquet", ::arrow::decimal64(13, 2)), + std::make_tuple("byte_array_decimal.parquet", ::arrow::decimal32(4, 2)))); TEST(TestArrowReaderAdHoc, ReadFloat16Files) { using ::arrow::util::Float16; @@ -5162,33 +5228,17 @@ class TestIntegerAnnotateDecimalTypeParquetIO : public TestParquetIO { this->ReaderFromSink(&reader); this->ReadSingleColumnFile(std::move(reader), &out); - // Reader always read values as DECIMAL128 type - ASSERT_EQ(out->type()->id(), ::arrow::Type::DECIMAL128); - - if (values.type()->id() == ::arrow::Type::DECIMAL128) { - AssertArraysEqual(values, *out); - } else { - auto& expected_values = dynamic_cast(values); - auto& read_values = dynamic_cast(*out); - ASSERT_EQ(expected_values.length(), read_values.length()); - ASSERT_EQ(expected_values.null_count(), read_values.null_count()); - ASSERT_EQ(expected_values.length(), read_values.length()); - for (int64_t i = 0; i < expected_values.length(); ++i) { - ASSERT_EQ(expected_values.IsNull(i), read_values.IsNull(i)); - if (!expected_values.IsNull(i)) { - ASSERT_EQ(::arrow::Decimal256(expected_values.Value(i)).ToString(0), - ::arrow::Decimal128(read_values.Value(i)).ToString(0)); - } - } - } + ASSERT_EQ(out->type()->id(), TestType::type_id); + AssertArraysEqual(values, *out); } }; typedef ::testing::Types< - Decimal128WithPrecisionAndScale<1>, Decimal128WithPrecisionAndScale<5>, - Decimal128WithPrecisionAndScale<10>, Decimal128WithPrecisionAndScale<18>, - Decimal256WithPrecisionAndScale<1>, Decimal256WithPrecisionAndScale<5>, - Decimal256WithPrecisionAndScale<10>, Decimal256WithPrecisionAndScale<18>> + Decimal32WithPrecisionAndScale<1>, Decimal32WithPrecisionAndScale<5>, + Decimal64WithPrecisionAndScale<10>, Decimal64WithPrecisionAndScale<18>, + Decimal128WithPrecisionAndScale<19>, Decimal128WithPrecisionAndScale<27>, + Decimal128WithPrecisionAndScale<38>, Decimal256WithPrecisionAndScale<39>, + Decimal256WithPrecisionAndScale<56>, Decimal256WithPrecisionAndScale<76>> DecimalTestTypes; TYPED_TEST_SUITE(TestIntegerAnnotateDecimalTypeParquetIO, DecimalTestTypes); diff --git a/cpp/src/parquet/arrow/arrow_schema_test.cc b/cpp/src/parquet/arrow/arrow_schema_test.cc index 10431c3a11813..4de4e1d3299f9 100644 --- a/cpp/src/parquet/arrow/arrow_schema_test.cc +++ b/cpp/src/parquet/arrow/arrow_schema_test.cc @@ -67,7 +67,7 @@ const auto TIMESTAMP_MS = ::arrow::timestamp(TimeUnit::MILLI); const auto TIMESTAMP_US = ::arrow::timestamp(TimeUnit::MICRO); const auto TIMESTAMP_NS = ::arrow::timestamp(TimeUnit::NANO); const auto BINARY = ::arrow::binary(); -const auto DECIMAL_8_4 = std::make_shared<::arrow::Decimal128Type>(8, 4); +const auto DECIMAL_8_4 = std::make_shared<::arrow::Decimal32Type>(8, 4); class TestConvertParquetSchema : public ::testing::Test { public: diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index 9d3171ea1a95d..3b5f75232d635 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -69,6 +69,12 @@ using arrow::Decimal128Type; using arrow::Decimal256; using arrow::Decimal256Array; using arrow::Decimal256Type; +using arrow::Decimal32; +using arrow::Decimal32Array; +using arrow::Decimal32Type; +using arrow::Decimal64; +using arrow::Decimal64Array; +using arrow::Decimal64Type; using arrow::Field; using arrow::Int32Array; using arrow::ListArray; @@ -211,25 +217,35 @@ Status ExtractDecimalMinMaxFromBytesType(const Statistics& statistics, std::shared_ptr<::arrow::Scalar>* max) { const DecimalLogicalType& decimal_type = checked_cast(logical_type); - - Result> maybe_type = - Decimal128Type::Make(decimal_type.precision(), decimal_type.scale()); + auto precision = decimal_type.precision(); + auto scale = decimal_type.scale(); std::shared_ptr arrow_type; - if (maybe_type.ok()) { - arrow_type = maybe_type.ValueOrDie(); + + if (precision <= Decimal32Type::kMaxPrecision) { + ARROW_ASSIGN_OR_RAISE(arrow_type, Decimal32Type::Make(precision, scale)); + ARROW_ASSIGN_OR_RAISE( + *min, FromBigEndianString(statistics.EncodeMin(), arrow_type)); + ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString(statistics.EncodeMax(), + std::move(arrow_type))); + } else if (precision <= Decimal64Type::kMaxPrecision) { + ARROW_ASSIGN_OR_RAISE(arrow_type, Decimal64Type::Make(precision, scale)); + ARROW_ASSIGN_OR_RAISE( + *min, FromBigEndianString(statistics.EncodeMin(), arrow_type)); + ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString(statistics.EncodeMax(), + std::move(arrow_type))); + } else if (precision <= Decimal128Type::kMaxPrecision) { + ARROW_ASSIGN_OR_RAISE(arrow_type, Decimal128Type::Make(precision, scale)); ARROW_ASSIGN_OR_RAISE( *min, FromBigEndianString(statistics.EncodeMin(), arrow_type)); ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString(statistics.EncodeMax(), std::move(arrow_type))); - return Status::OK(); + } else { + ARROW_ASSIGN_OR_RAISE(arrow_type, Decimal256Type::Make(precision, scale)); + ARROW_ASSIGN_OR_RAISE( + *min, FromBigEndianString(statistics.EncodeMin(), arrow_type)); + ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString(statistics.EncodeMax(), + std::move(arrow_type))); } - // Fallback to see if Decimal256 can represent the type. - ARROW_ASSIGN_OR_RAISE( - arrow_type, Decimal256Type::Make(decimal_type.precision(), decimal_type.scale())); - ARROW_ASSIGN_OR_RAISE( - *min, FromBigEndianString(statistics.EncodeMin(), arrow_type)); - ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString(statistics.EncodeMax(), - std::move(arrow_type))); return Status::OK(); } @@ -570,7 +586,8 @@ Status TransferBinary(RecordReader* reader, MemoryPool* pool, } // ---------------------------------------------------------------------- -// INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128 || Decimal256 +// INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY +// -> Decimal32 || Decimal64 || Decimal128 || Decimal256 template Status RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width, @@ -583,6 +600,16 @@ Status RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width, template struct DecimalTypeTrait; +template <> +struct DecimalTypeTrait<::arrow::Decimal32Array> { + using value = ::arrow::Decimal32; +}; + +template <> +struct DecimalTypeTrait<::arrow::Decimal64Array> { + using value = ::arrow::Decimal64; +}; + template <> struct DecimalTypeTrait<::arrow::Decimal128Array> { using value = ::arrow::Decimal128; @@ -701,7 +728,7 @@ struct DecimalConverter { } }; -/// \brief Convert an Int32 or Int64 array into a Decimal128Array +/// \brief Convert an Int32 or Int64 array into a Decimal32Array or Decimal64Array /// The parquet spec allows systems to write decimals in int32, int64 if the values are /// small enough to fit in less 4 bytes or less than 8 bytes, respectively. /// This function implements the conversion from int32 and int64 arrays to decimal arrays. @@ -711,10 +738,10 @@ template < std::is_same::value>> static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, const std::shared_ptr& field, Datum* out) { - // Decimal128 and Decimal256 are only Arrow constructs. Parquet does not + // Decimal32 and Decimal64 are only Arrow constructs. Parquet does not // specifically distinguish between decimal byte widths. - DCHECK(field->type()->id() == ::arrow::Type::DECIMAL128 || - field->type()->id() == ::arrow::Type::DECIMAL256); + DCHECK(field->type()->id() == ::arrow::Type::DECIMAL32 || + field->type()->id() == ::arrow::Type::DECIMAL64); const int64_t length = reader->values_written(); @@ -737,11 +764,11 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, // sign/zero extend int32_t values, otherwise a no-op const auto value = static_cast(values[i]); - if constexpr (std::is_same_v) { - ::arrow::Decimal128 decimal(value); + if constexpr (std::is_same_v) { + ::arrow::Decimal32 decimal(value); decimal.ToBytes(out_ptr); } else { - ::arrow::Decimal256 decimal(value); + ::arrow::Decimal64 decimal(value); decimal.ToBytes(out_ptr); } } @@ -880,40 +907,63 @@ Status TransferColumnData(RecordReader* reader, } RETURN_NOT_OK(TransferHalfFloat(reader, pool, value_field, &result)); } break; - case ::arrow::Type::DECIMAL128: { + case ::arrow::Type::DECIMAL32: { switch (descr->physical_type()) { case ::parquet::Type::INT32: { - auto fn = DecimalIntegerTransfer; + auto fn = DecimalIntegerTransfer; + RETURN_NOT_OK(fn(reader, pool, value_field, &result)); + } break; + case ::parquet::Type::BYTE_ARRAY: { + auto fn = &TransferDecimal; + RETURN_NOT_OK(fn(reader, pool, value_field, &result)); + } break; + case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { + auto fn = &TransferDecimal; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); } break; + default: + return Status::Invalid( + "Physical type for decimal32 must be int32, byte array, or fixed length " + "binary"); + } + } break; + case ::arrow::Type::DECIMAL64: { + switch (descr->physical_type()) { case ::parquet::Type::INT64: { - auto fn = &DecimalIntegerTransfer; + auto fn = DecimalIntegerTransfer; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); } break; case ::parquet::Type::BYTE_ARRAY: { - auto fn = &TransferDecimal; + auto fn = &TransferDecimal; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); } break; case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { - auto fn = &TransferDecimal; + auto fn = &TransferDecimal; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); } break; default: return Status::Invalid( - "Physical type for decimal128 must be int32, int64, byte array, or fixed " - "length binary"); + "Physical type for decimal64 must be int64, byte array, or fixed length " + "binary"); } } break; - case ::arrow::Type::DECIMAL256: + case ::arrow::Type::DECIMAL128: { switch (descr->physical_type()) { - case ::parquet::Type::INT32: { - auto fn = DecimalIntegerTransfer; + case ::parquet::Type::BYTE_ARRAY: { + auto fn = &TransferDecimal; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); } break; - case ::parquet::Type::INT64: { - auto fn = &DecimalIntegerTransfer; + case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { + auto fn = &TransferDecimal; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); } break; + default: + return Status::Invalid( + "Physical type for decimal128 must be byte array, or fixed length binary"); + } + } break; + case ::arrow::Type::DECIMAL256: { + switch (descr->physical_type()) { case ::parquet::Type::BYTE_ARRAY: { auto fn = &TransferDecimal; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); @@ -924,11 +974,9 @@ Status TransferColumnData(RecordReader* reader, } break; default: return Status::Invalid( - "Physical type for decimal256 must be int32, int64, byte array, or fixed " - "length binary"); + "Physical type for decimal256 must be byte array, or fixed length binary"); } - break; - + } break; case ::arrow::Type::TIMESTAMP: { const ::arrow::TimestampType& timestamp_type = checked_cast<::arrow::TimestampType&>(*value_field->type()); diff --git a/cpp/src/parquet/arrow/test_util.h b/cpp/src/parquet/arrow/test_util.h index c8fcbbb65d1b6..d134aaf3dfe5a 100644 --- a/cpp/src/parquet/arrow/test_util.h +++ b/cpp/src/parquet/arrow/test_util.h @@ -47,9 +47,35 @@ using ::arrow::Array; using ::arrow::ChunkedArray; using ::arrow::Status; +template +struct Decimal32WithPrecisionAndScale { + static_assert(PRECISION >= ::arrow::Decimal32Type::kMinPrecision && + PRECISION <= ::arrow::Decimal32Type::kMaxPrecision, + "Invalid precision value"); + + using type = ::arrow::Decimal32Type; + static constexpr ::arrow::Type::type type_id = ::arrow::Decimal32Type::type_id; + static constexpr int32_t precision = PRECISION; + static constexpr int32_t scale = PRECISION - 1; +}; + +template +struct Decimal64WithPrecisionAndScale { + static_assert(PRECISION >= ::arrow::Decimal64Type::kMinPrecision && + PRECISION <= ::arrow::Decimal64Type::kMaxPrecision, + "Invalid precision value"); + + using type = ::arrow::Decimal64Type; + static constexpr ::arrow::Type::type type_id = ::arrow::Decimal64Type::type_id; + static constexpr int32_t precision = PRECISION; + static constexpr int32_t scale = PRECISION - 1; +}; + template struct Decimal128WithPrecisionAndScale { - static_assert(PRECISION >= 1 && PRECISION <= 38, "Invalid precision value"); + static_assert(PRECISION >= ::arrow::Decimal128Type::kMinPrecision && + PRECISION <= ::arrow::Decimal128Type::kMaxPrecision, + "Invalid precision value"); using type = ::arrow::Decimal128Type; static constexpr ::arrow::Type::type type_id = ::arrow::Decimal128Type::type_id; @@ -59,7 +85,9 @@ struct Decimal128WithPrecisionAndScale { template struct Decimal256WithPrecisionAndScale { - static_assert(PRECISION >= 1 && PRECISION <= 76, "Invalid precision value"); + static_assert(PRECISION >= ::arrow::Decimal256Type::kMinPrecision && + PRECISION <= ::arrow::Decimal256Type::kMaxPrecision, + "Invalid precision value"); using type = ::arrow::Decimal256Type; static constexpr ::arrow::Type::type type_id = ::arrow::Decimal256Type::type_id; @@ -154,6 +182,50 @@ static void random_decimals(int64_t n, uint32_t seed, int32_t precision, uint8_t std::memcpy(out, decimals->data()->GetValues(1, 0), byte_width * n); } +template +::arrow::enable_if_t< + std::is_same>::value, Status> +NonNullArray(size_t size, std::shared_ptr* out) { + constexpr int32_t kDecimalPrecision = precision; + constexpr int32_t kDecimalScale = Decimal32WithPrecisionAndScale::scale; + + const auto type = ::arrow::decimal32(kDecimalPrecision, kDecimalScale); + ::arrow::Decimal32Builder builder(type); + const int32_t byte_width = + static_cast(*type).byte_width(); + + constexpr int32_t seed = 0; + + ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); + random_decimals<::arrow::Decimal32Type::kByteWidth>(size, seed, kDecimalPrecision, + out_buf->mutable_data()); + + RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size)); + return builder.Finish(out); +} + +template +::arrow::enable_if_t< + std::is_same>::value, Status> +NonNullArray(size_t size, std::shared_ptr* out) { + constexpr int32_t kDecimalPrecision = precision; + constexpr int32_t kDecimalScale = Decimal64WithPrecisionAndScale::scale; + + const auto type = ::arrow::decimal64(kDecimalPrecision, kDecimalScale); + ::arrow::Decimal64Builder builder(type); + const int32_t byte_width = + static_cast(*type).byte_width(); + + constexpr int32_t seed = 0; + + ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); + random_decimals<::arrow::Decimal64Type::kByteWidth>(size, seed, kDecimalPrecision, + out_buf->mutable_data()); + + RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size)); + return builder.Finish(out); +} + template ::arrow::enable_if_t< std::is_same>::value, Status> @@ -341,6 +413,60 @@ ::arrow::enable_if_fixed_size_binary NullableArray( return builder.Finish(out); } +template +::arrow::enable_if_t< + std::is_same>::value, Status> +NullableArray(size_t size, size_t num_nulls, uint32_t seed, + std::shared_ptr<::arrow::Array>* out) { + std::vector valid_bytes(size, '\1'); + + for (size_t i = 0; i < num_nulls; ++i) { + valid_bytes[i * 2] = '\0'; + } + + constexpr int32_t kDecimalPrecision = precision; + constexpr int32_t kDecimalScale = Decimal32WithPrecisionAndScale::scale; + const auto type = ::arrow::decimal32(kDecimalPrecision, kDecimalScale); + const int32_t byte_width = + static_cast(*type).byte_width(); + + ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); + + random_decimals<::arrow::Decimal32Type::kByteWidth>(size, seed, precision, + out_buf->mutable_data()); + + ::arrow::Decimal32Builder builder(type); + RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data())); + return builder.Finish(out); +} + +template +::arrow::enable_if_t< + std::is_same>::value, Status> +NullableArray(size_t size, size_t num_nulls, uint32_t seed, + std::shared_ptr<::arrow::Array>* out) { + std::vector valid_bytes(size, '\1'); + + for (size_t i = 0; i < num_nulls; ++i) { + valid_bytes[i * 2] = '\0'; + } + + constexpr int32_t kDecimalPrecision = precision; + constexpr int32_t kDecimalScale = Decimal64WithPrecisionAndScale::scale; + const auto type = ::arrow::decimal64(kDecimalPrecision, kDecimalScale); + const int32_t byte_width = + static_cast(*type).byte_width(); + + ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); + + random_decimals<::arrow::Decimal64Type::kByteWidth>(size, seed, precision, + out_buf->mutable_data()); + + ::arrow::Decimal64Builder builder(type); + RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data())); + return builder.Finish(out); +} + template ::arrow::enable_if_t< std::is_same>::value, Status> diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 118dca7cf3d9b..defac2624239c 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -2377,7 +2377,7 @@ struct SerializeFunctor< } // Parquet's Decimal are stored with FixedLength values where the length is - // proportional to the precision. Arrow's Decimal are always stored with 16/32 + // proportional to the precision. Arrow's Decimal are always stored with 4/8/16/32 // bytes. Thus the internal FLBA pointer must be adjusted by the offset calculated // here. int32_t Offset(const Array& array) { @@ -2391,29 +2391,45 @@ struct SerializeFunctor< int64_t non_null_count = array.length() - array.null_count(); int64_t size = non_null_count * ArrowType::kByteWidth; scratch_buffer = AllocateBuffer(ctx->memory_pool, size); - scratch = reinterpret_cast(scratch_buffer->mutable_data()); + scratch_i32 = reinterpret_cast(scratch_buffer->mutable_data()); + scratch_i64 = reinterpret_cast(scratch_buffer->mutable_data()); } template FixedLenByteArray FixDecimalEndianness(const uint8_t* in, int64_t offset) { + static_assert(byte_width == ::arrow::Decimal32Type::kByteWidth || + byte_width == ::arrow::Decimal64Type::kByteWidth || + byte_width == ::arrow::Decimal128Type::kByteWidth || + byte_width == ::arrow::Decimal256Type::kByteWidth, + "only 4/8/16/32 byte Decimals supported"); + + if constexpr (byte_width == ::arrow::Decimal32Type::kByteWidth) { + const auto* u32_in = reinterpret_cast(in); + auto out = reinterpret_cast(scratch_i32) + offset; + *scratch_i32++ = ::arrow::bit_util::ToBigEndian(u32_in[0]); + return FixedLenByteArray(out); + } + const auto* u64_in = reinterpret_cast(in); - auto out = reinterpret_cast(scratch) + offset; - static_assert(byte_width == 16 || byte_width == 32, - "only 16 and 32 byte Decimals supported"); - if (byte_width == 32) { - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[3]); - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[2]); - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[1]); - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[0]); + auto out = reinterpret_cast(scratch_i64) + offset; + if constexpr (byte_width == ::arrow::Decimal64Type::kByteWidth) { + *scratch_i64++ = ::arrow::bit_util::ToBigEndian(u64_in[0]); + } else if constexpr (byte_width == ::arrow::Decimal128Type::kByteWidth) { + *scratch_i64++ = ::arrow::bit_util::ToBigEndian(u64_in[1]); + *scratch_i64++ = ::arrow::bit_util::ToBigEndian(u64_in[0]); } else { - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[1]); - *scratch++ = ::arrow::bit_util::ToBigEndian(u64_in[0]); + *scratch_i64++ = ::arrow::bit_util::ToBigEndian(u64_in[3]); + *scratch_i64++ = ::arrow::bit_util::ToBigEndian(u64_in[2]); + *scratch_i64++ = ::arrow::bit_util::ToBigEndian(u64_in[1]); + *scratch_i64++ = ::arrow::bit_util::ToBigEndian(u64_in[0]); } + return FixedLenByteArray(out); } std::shared_ptr scratch_buffer; - int64_t* scratch; + int32_t* scratch_i32; + int64_t* scratch_i64; }; // ----------------------------------------------------------------------