Skip to content

Commit

Permalink
Support row group skip for Parquet decimal
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Nov 28, 2024
1 parent e8fe5b9 commit 0d472e4
Show file tree
Hide file tree
Showing 21 changed files with 219 additions and 83 deletions.
6 changes: 4 additions & 2 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,8 @@ bool testFilters(
const std::unordered_map<std::string, std::optional<std::string>>&
partitionKeys,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
partitionKeysHandle) {
partitionKeysHandle,
dwio::common::FileFormat fileFormat) {
const auto totalRows = reader->numberOfRows();
const auto& fileTypeWithId = reader->typeWithId();
const auto& rowType = reader->rowType();
Expand Down Expand Up @@ -722,7 +723,8 @@ bool testFilters(
child->filter(),
columnStats.get(),
totalRows.value(),
typeWithId->type())) {
typeWithId->type(),
fileFormat)) {
VLOG(1) << "Skipping " << filePath
<< " based on stats and filter for column "
<< child->fieldName();
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/HiveConnectorUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ bool testFilters(
const std::unordered_map<std::string, std::optional<std::string>>&
partitionKey,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
partitionKeysHandle);
partitionKeysHandle,
dwio::common::FileFormat fileFormat);

std::unique_ptr<dwio::common::BufferedInput> createBufferedInput(
const FileHandle& fileHandle,
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ bool SplitReader::checkIfSplitIsEmpty(
baseReader_.get(),
hiveSplit_->filePath,
hiveSplit_->partitionKeys,
*partitionKeys_)) {
*partitionKeys_,
hiveSplit_->fileFormat)) {
++runtimeStats.skippedSplits;
runtimeStats.skippedSplitBytes += hiveSplit_->length;
emptySplit_ = true;
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ PositionalDeleteFileReader::PositionalDeleteFileReader(
deleteReader.get(),
deleteSplit_->filePath,
deleteSplit_->partitionKeys,
{})) {
{},
deleteSplit_->fileFormat)) {
// We only count the number of base splits skipped as skippedSplits runtime
// statistics in Velox. Skipped delta splits are only counted as skipped
// bytes.
Expand Down
65 changes: 42 additions & 23 deletions velox/dwio/common/ScanSpec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "velox/dwio/common/ScanSpec.h"

#include "velox/core/Expressions.h"
#include "velox/dwio/common/Options.h"
#include "velox/dwio/common/Statistics.h"

namespace facebook::velox::common {
Expand Down Expand Up @@ -167,36 +168,46 @@ void ScanSpec::moveAdaptationFrom(ScanSpec& other) {
}

namespace {

// Test the filter against integer column statistics. T could be int64_t or
// int128_t.
template <typename T>
bool testIntFilter(
common::Filter* filter,
dwio::common::IntegerColumnStatistics* intStats,
dwio::common::IntegerColumnStatistics<T>* intStats,
bool mayHaveNull) {
if (!intStats) {
return true;
}

if (intStats->getMinimum().has_value() &&
intStats->getMaximum().has_value()) {
return filter->testInt64Range(
intStats->getMinimum().value(),
intStats->getMaximum().value(),
mayHaveNull);
const auto minValue = intStats->getMinimum().value();
const auto maxValue = intStats->getMaximum().value();
if constexpr (std::is_same_v<T, int64_t>) {
return filter->testInt64Range(minValue, maxValue, mayHaveNull);
}
return filter->testInt128Range(minValue, maxValue, mayHaveNull);
}

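// Only the min value is available; the upper bound is left open.
// NOTE(review): the open upper bound is taken from int64_t's limits even when
// T is int128_t; it looks like it should be std::numeric_limits<T>::max() so
// that int128 ranges are not clamped. Confirm.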
if (intStats->getMinimum().has_value()) {
return filter->testInt64Range(
intStats->getMinimum().value(),
std::numeric_limits<int64_t>::max(),
mayHaveNull);
const auto minValue = intStats->getMinimum().value();
const auto maxValue = std::numeric_limits<int64_t>::max();
if constexpr (std::is_same_v<T, int64_t>) {
return filter->testInt64Range(minValue, maxValue, mayHaveNull);
}
return filter->testInt128Range(minValue, maxValue, mayHaveNull);
}

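// Only the max value is available; the lower bound is left open.
// NOTE(review): the open lower bound is taken from int64_t's limits even when
// T is int128_t; it looks like it should be std::numeric_limits<T>::min() so
// that int128 ranges are not clamped. Confirm.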
if (intStats->getMaximum().has_value()) {
return filter->testInt64Range(
std::numeric_limits<int64_t>::min(),
intStats->getMaximum().value(),
mayHaveNull);
const auto minValue = std::numeric_limits<int64_t>::min();
const auto maxValue = intStats->getMaximum().value();
if constexpr (std::is_same_v<T, int64_t>) {
return filter->testInt64Range(minValue, maxValue, mayHaveNull);
}
return filter->testInt128Range(minValue, maxValue, mayHaveNull);
}

return true;
Expand Down Expand Up @@ -292,7 +303,8 @@ bool testFilter(
common::Filter* filter,
dwio::common::ColumnStatistics* stats,
uint64_t totalRows,
const TypePtr& type) {
const TypePtr& type,
dwio::common::FileFormat fileFormat) {
bool mayHaveNull{true};

// Has-null statistics is often not set. Hence, we supplement it with
Expand All @@ -315,20 +327,27 @@ bool testFilter(
if (mayHaveNull && filter->testNull()) {
return true;
}
if (type->isDecimal()) {
// The min and max value in the metadata for decimal type in Parquet can be
// stored in different physical types, including int32, int64 and
// fixed_len_byte_array. The loading of them is not supported in Metadata.

if (fileFormat != dwio::common::FileFormat::PARQUET && type->isDecimal()) {
// For non-Parquet files, row group skipping based on stats is not supported
// for decimal columns.
return true;
}

switch (type->kind()) {
case TypeKind::BIGINT:
case TypeKind::INTEGER:
case TypeKind::TINYINT:
case TypeKind::SMALLINT:
case TypeKind::TINYINT: {
case TypeKind::INTEGER:
case TypeKind::BIGINT: {
auto* intStats =
dynamic_cast<dwio::common::IntegerColumnStatistics<int64_t>*>(stats);
return testIntFilter<int64_t>(filter, intStats, mayHaveNull);
}
case TypeKind::HUGEINT: {
VELOX_CHECK(type->isLongDecimal());
auto* intStats =
dynamic_cast<dwio::common::IntegerColumnStatistics*>(stats);
return testIntFilter(filter, intStats, mayHaveNull);
dynamic_cast<dwio::common::IntegerColumnStatistics<int128_t>*>(stats);
return testIntFilter<int128_t>(filter, intStats, mayHaveNull);
}
case TypeKind::REAL:
case TypeKind::DOUBLE: {
Expand Down
6 changes: 4 additions & 2 deletions velox/dwio/common/ScanSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ namespace facebook {
namespace velox {
namespace dwio::common {
class ColumnStatistics;
}
enum class FileFormat;
} // namespace dwio::common
namespace common {

// Describes the filtering and value extraction for a
Expand Down Expand Up @@ -475,7 +476,8 @@ bool testFilter(
common::Filter* filter,
dwio::common::ColumnStatistics* stats,
uint64_t totalRows,
const TypePtr& type);
const TypePtr& type,
dwio::common::FileFormat fileFormat);

} // namespace common
} // namespace velox
Expand Down
32 changes: 16 additions & 16 deletions velox/dwio/common/Statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,30 +309,30 @@ class DoubleColumnStatistics : public virtual ColumnStatistics {
std::optional<double> sum_;
};

/**
* Statistics for all of the integer columns, such as byte, short, int, and
* long.
*/
/// Statistics for all of the integer columns, such as byte, short, int, long,
/// and int128_t. T is the type of the min, max and sum statistics values. It
/// defaults to int64_t and can be set to int128_t for huge int statistics.
template <typename T = int64_t>
class IntegerColumnStatistics : public virtual ColumnStatistics {
public:
IntegerColumnStatistics(
std::optional<uint64_t> valueCount,
std::optional<bool> hasNull,
std::optional<uint64_t> rawSize,
std::optional<uint64_t> size,
std::optional<int64_t> min,
std::optional<int64_t> max,
std::optional<int64_t> sum)
std::optional<T> min,
std::optional<T> max,
std::optional<T> sum)
: ColumnStatistics(valueCount, hasNull, rawSize, size),
min_(min),
max_(max),
sum_(sum) {}

IntegerColumnStatistics(
const ColumnStatistics& colStats,
std::optional<int64_t> min,
std::optional<int64_t> max,
std::optional<int64_t> sum)
std::optional<T> min,
std::optional<T> max,
std::optional<T> sum)
: ColumnStatistics(colStats), min_(min), max_(max), sum_(sum) {}

~IntegerColumnStatistics() override = default;
Expand All @@ -341,23 +341,23 @@ class IntegerColumnStatistics : public virtual ColumnStatistics {
* Get optional smallest value in the column. Only defined if
* getNumberOfValues is non-zero.
*/
std::optional<int64_t> getMinimum() const {
std::optional<T> getMinimum() const {
return min_;
}

/**
* Get optional largest value in the column. Only defined if getNumberOfValues
* is non-zero.
*/
std::optional<int64_t> getMaximum() const {
std::optional<T> getMaximum() const {
return max_;
}

/**
* Get optional sum of the column. Only valid if getNumberOfValues is non-zero
* and sum doesn't overflow
*/
std::optional<int64_t> getSum() const {
std::optional<T> getSum() const {
return sum_;
}

Expand All @@ -375,9 +375,9 @@ class IntegerColumnStatistics : public virtual ColumnStatistics {
protected:
IntegerColumnStatistics() {}

std::optional<int64_t> min_;
std::optional<int64_t> max_;
std::optional<int64_t> sum_;
std::optional<T> min_;
std::optional<T> max_;
std::optional<T> sum_;
};

/**
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/tests/utils/DataSetBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class DataSetBuilder {
if (counter % 100 < repeats) {
numbers->set(row, T(counter % repeats));
} else if (counter % 100 > 90 && row > 0) {
numbers->copy(numbers, row - 1, row, 1);
numbers->copy(numbers, row, row - 1, 1);
} else {
int64_t value;
if (rareFrequency && counter % rareFrequency == 0) {
Expand Down
4 changes: 3 additions & 1 deletion velox/dwio/common/tests/utils/E2EFilterTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,9 @@ void E2EFilterTestBase::testRowGroupSkip(
// Makes a row group skipping filter for the first bigint, hugeint or varchar
// column.
for (auto& field : filterable) {
VectorPtr child = getChildBySubfield(batches[0].get(), Subfield(field));
if (child->type() == BIGINT() || child->typeKind() == TypeKind::VARCHAR) {
if (child->typeKind() == TypeKind::BIGINT ||
child->typeKind() == TypeKind::HUGEINT ||
child->typeKind() == TypeKind::VARCHAR) {
specs.emplace_back();
specs.back().field = field;
specs.back().isForRowGroupSkip = true;
Expand Down
7 changes: 7 additions & 0 deletions velox/dwio/common/tests/utils/FilterGenerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,13 @@ class ColumnStats : public AbstractColumnStats {
}
}
}
if (max == std::numeric_limits<T>::max()) {
return std::make_unique<velox::common::AlwaysFalse>();
}
max += 1;
if constexpr (std::is_same_v<T, int128_t>) {
return std::make_unique<velox::common::HugeintRange>(max, max, false);
}
return std::make_unique<velox::common::BigintRange>(
getIntegerValue(max), getIntegerValue(max), false);
}
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/common/Statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ std::unique_ptr<ColumnStatistics> buildColumnStatisticsFromProto(
if (!stats.hasNumberOfValues() || stats.numberOfValues() > 0) {
if (stats.hasIntStatistics()) {
const auto& intStats = stats.intStatistics();
return std::make_unique<IntegerColumnStatistics>(
return std::make_unique<IntegerColumnStatistics<>>(
colStats,
intStats.hasMinimum() ? std::optional(intStats.minimum())
: std::nullopt,
Expand Down
9 changes: 7 additions & 2 deletions velox/dwio/dwrf/reader/DwrfData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,11 @@ void DwrfData::filterRowGroups(
ColumnStatisticsWrapper(&entry.statistics()), *dwrfContext);
if (filter &&
!testFilter(
filter, columnStats.get(), rowGroupSize, fileType_->type())) {
filter,
columnStats.get(),
rowGroupSize,
fileType_->type(),
dwio::common::FileFormat::DWRF)) {
VLOG(1) << "Drop stride " << i << " on " << scanSpec.toString();
bits::setBit(result.filterResult.data(), i);
continue;
Expand All @@ -183,7 +187,8 @@ void DwrfData::filterRowGroups(
metadataFilter,
columnStats.get(),
rowGroupSize,
fileType_->type())) {
fileType_->type(),
dwio::common::FileFormat::DWRF)) {
bits::setBit(
result.metadataFilterResults[metadataFiltersStartIndex + j]
.second.data(),
Expand Down
Loading

0 comments on commit 0d472e4

Please sign in to comment.