Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Nov 25, 2024
1 parent 84c299f commit 1a2032c
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 74 deletions.
14 changes: 13 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ option(VELOX_ENABLE_PARQUET "Enable Parquet support" OFF)
option(VELOX_ENABLE_ARROW "Enable Arrow support" OFF)
option(VELOX_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" OFF)
option(VELOX_ENABLE_CCACHE "Use ccache if installed." ON)
option(VELOX_ENABLE_COMPRESSION_LZ4 "Enable Lz4 compression support." OFF)

option(VELOX_BUILD_TEST_UTILS "Builds Velox test utilities" OFF)
option(VELOX_BUILD_VECTOR_TEST_UTILS "Builds Velox vector test utilities" OFF)
Expand Down Expand Up @@ -150,6 +151,10 @@ if(${VELOX_BUILD_MINIMAL} OR ${VELOX_BUILD_MINIMAL_WITH_DWIO})
set(VELOX_ENABLE_SUBSTRAIT OFF)
endif()

# Force LZ4 compression support on whenever DWIO (minimal-with-DWIO build) or
# the Hive connector is being built, overriding the option's OFF default.
if(${VELOX_BUILD_MINIMAL_WITH_DWIO} OR ${VELOX_ENABLE_HIVE_CONNECTOR})
set(VELOX_ENABLE_COMPRESSION_LZ4 ON)
endif()

if(${VELOX_BUILD_TESTING})
# Enable all components to build testing binaries
set(VELOX_ENABLE_PRESTO_FUNCTIONS ON)
Expand All @@ -161,6 +166,7 @@ if(${VELOX_BUILD_TESTING})
set(VELOX_ENABLE_SPARK_FUNCTIONS ON)
set(VELOX_ENABLE_EXAMPLES ON)
set(VELOX_ENABLE_PARQUET ON)
set(VELOX_ENABLE_COMPRESSION_LZ4 ON)
endif()

if(${VELOX_ENABLE_BENCHMARKS})
Expand Down Expand Up @@ -270,6 +276,10 @@ if(VELOX_ENABLE_PARQUET)
set(VELOX_ENABLE_ARROW ON)
endif()

# Expose the build option to C++ as the VELOX_ENABLE_COMPRESSION_LZ4
# preprocessor macro so sources can guard LZ4-specific code with #ifdef.
if(VELOX_ENABLE_COMPRESSION_LZ4)
add_definitions(-DVELOX_ENABLE_COMPRESSION_LZ4)
endif()

# make buildPartitionBounds_ a vector int64 instead of int32 to avoid integer
# overflow
if(${VELOX_ENABLE_INT64_BUILD_PARTITION_BOUND})
Expand Down Expand Up @@ -450,7 +460,9 @@ velox_resolve_dependency(glog)
velox_set_source(fmt)
velox_resolve_dependency(fmt 9.0.0)

find_package(lz4 REQUIRED)
# Only look for lz4 when LZ4 support is enabled; REQUIRED makes configuration
# fail if the package cannot be found in that case.
if(VELOX_ENABLE_COMPRESSION_LZ4)
find_package(lz4 REQUIRED)
endif()

if(${VELOX_BUILD_MINIMAL_WITH_DWIO} OR ${VELOX_ENABLE_HIVE_CONNECTOR})
# DWIO needs all sorts of stream compression libraries.
Expand Down
18 changes: 11 additions & 7 deletions velox/common/compression/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif()

velox_add_library(
velox_common_compression
Compression.cpp
LzoDecompressor.cpp
Lz4Compression.cpp
HadoopCompressionFormat.cpp)
set(VELOX_COMMON_COMPRESSION_SRCS Compression.cpp LzoDecompressor.cpp)
set(VELOX_COMMON_COMPRESSION_LINK_LIBS velox_status Folly::folly)

# The LZ4 codec sources (including the Hadoop framing helper) and the lz4 link
# dependency are added to the library only when LZ4 support is enabled.
if(VELOX_ENABLE_COMPRESSION_LZ4)
list(APPEND VELOX_COMMON_COMPRESSION_SRCS Lz4Compression.cpp
HadoopCompressionFormat.cpp)
list(APPEND VELOX_COMMON_COMPRESSION_LINK_LIBS lz4::lz4)
endif()

velox_add_library(velox_common_compression ${VELOX_COMMON_COMPRESSION_SRCS})
velox_link_libraries(
velox_common_compression
PUBLIC velox_status Folly::folly lz4::lz4
PUBLIC ${VELOX_COMMON_COMPRESSION_LINK_LIBS}
PRIVATE velox_exception)
24 changes: 14 additions & 10 deletions velox/common/compression/Compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

#include "velox/common/compression/Compression.h"
#include "velox/common/base/Exceptions.h"
#ifdef VELOX_ENABLE_COMPRESSION_LZ4
#include "velox/common/compression/Lz4Compression.h"
#endif

#include <folly/Conv.h>

Expand Down Expand Up @@ -109,8 +111,10 @@ bool Codec::supportsGetUncompressedLength(CompressionKind kind) {

// Reports whether a streaming compressor is available for `kind`.
// LZ4 is the only kind that answers true, and only when the build defines
// VELOX_ENABLE_COMPRESSION_LZ4; every other kind yields false.
bool Codec::supportsStreamingCompression(CompressionKind kind) {
#ifdef VELOX_ENABLE_COMPRESSION_LZ4
  return kind == CompressionKind::CompressionKind_LZ4;
#else
  // No streaming-capable codec is compiled in; keep the parameter referenced.
  static_cast<void>(kind);
  return false;
#endif
}
Expand All @@ -121,7 +125,7 @@ bool Codec::supportsCompressFixedLength(CompressionKind kind) {
return false;
}

folly::Expected<std::unique_ptr<Codec>, Status> Codec::create(
Expected<std::unique_ptr<Codec>> Codec::create(
CompressionKind kind,
const CodecOptions& codecOptions) {
if (!isAvailable(kind)) {
Expand All @@ -137,6 +141,7 @@ folly::Expected<std::unique_ptr<Codec>, Status> Codec::create(
auto compressionLevel = codecOptions.compressionLevel;
std::unique_ptr<Codec> codec;
switch (kind) {
#ifdef VELOX_ENABLE_COMPRESSION_LZ4
case CompressionKind::CompressionKind_LZ4:
if (auto options = dynamic_cast<const Lz4CodecOptions*>(&codecOptions)) {
switch (options->type) {
Expand All @@ -154,6 +159,7 @@ folly::Expected<std::unique_ptr<Codec>, Status> Codec::create(
// By default, create LZ4 Frame codec.
codec = makeLz4FrameCodec(compressionLevel);
break;
#endif
default:
break;
}
Expand All @@ -169,7 +175,7 @@ folly::Expected<std::unique_ptr<Codec>, Status> Codec::create(
return codec;
}

folly::Expected<std::unique_ptr<Codec>, Status> Codec::create(
Expected<std::unique_ptr<Codec>> Codec::create(
CompressionKind kind,
int32_t compressionLevel) {
return create(kind, CodecOptions{compressionLevel});
Expand All @@ -178,13 +184,11 @@ folly::Expected<std::unique_ptr<Codec>, Status> Codec::create(
bool Codec::isAvailable(CompressionKind kind) {
switch (kind) {
case CompressionKind::CompressionKind_NONE:
return true;
#ifdef VELOX_ENABLE_COMPRESSION_LZ4
case CompressionKind::CompressionKind_LZ4:
return true;
case CompressionKind::CompressionKind_SNAPPY:
case CompressionKind::CompressionKind_GZIP:
case CompressionKind::CompressionKind_ZLIB:
case CompressionKind::CompressionKind_ZSTD:
case CompressionKind::CompressionKind_LZO:
#endif
default:
return false;
}
Expand All @@ -196,7 +200,7 @@ std::optional<uint64_t> Codec::getUncompressedLength(
return std::nullopt;
}

folly::Expected<uint64_t, Status> Codec::compressFixedLength(
Expected<uint64_t> Codec::compressFixedLength(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
Expand All @@ -205,13 +209,13 @@ folly::Expected<uint64_t, Status> Codec::compressFixedLength(
Status::Invalid("'{}' doesn't support fixed-length compression", name()));
}

folly::Expected<std::shared_ptr<StreamingCompressor>, Status>
Expected<std::shared_ptr<StreamingCompressor>>
Codec::makeStreamingCompressor() {
// Base-class implementation of this virtual: reports streaming compression
// as unsupported by returning an Invalid status that embeds name().
return folly::makeUnexpected(Status::Invalid(
"Streaming compression is unsupported with {} format.", name()));
}

folly::Expected<std::shared_ptr<StreamingDecompressor>, Status>
Expected<std::shared_ptr<StreamingDecompressor>>
Codec::makeStreamingDecompressor() {
return folly::makeUnexpected(Status::Invalid(
"Streaming decompression is unsupported with {} format.", name()));
Expand Down
22 changes: 11 additions & 11 deletions velox/common/compression/Compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class StreamingCompressor {
/// Compress some input.
/// If CompressResult.outputTooSmall is true on return, then a larger output
/// buffer should be supplied.
virtual folly::Expected<CompressResult, Status> compress(
virtual Expected<CompressResult> compress(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
Expand All @@ -105,15 +105,15 @@ class StreamingCompressor {
/// Flush part of the compressed output.
/// If FlushResult.outputTooSmall is true on return, flush() should be called
/// again with a larger buffer.
virtual folly::Expected<FlushResult, Status> flush(
virtual Expected<FlushResult> flush(
uint8_t* output,
uint64_t outputLength) = 0;

/// End compressing, doing whatever is necessary to end the stream.
/// If EndResult.outputTooSmall is true on return, end() should be called
/// again with a larger buffer. Otherwise, the StreamingCompressor should not
/// be used anymore. end() will flush the compressed output.
virtual folly::Expected<EndResult, Status> end(
virtual Expected<EndResult> end(
uint8_t* output,
uint64_t outputLength) = 0;
};
Expand All @@ -131,7 +131,7 @@ class StreamingDecompressor {
/// Decompress some input.
/// If outputTooSmall is true on return, a larger output buffer needs
/// to be supplied.
virtual folly::Expected<DecompressResult, Status> decompress(
virtual Expected<DecompressResult> decompress(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
Expand All @@ -158,12 +158,12 @@ class Codec {
virtual ~Codec() = default;

// Create a codec for the given compression algorithm with CodecOptions.
static folly::Expected<std::unique_ptr<Codec>, Status> create(
static Expected<std::unique_ptr<Codec>> create(
CompressionKind kind,
const CodecOptions& codecOptions = CodecOptions{});

// Create a codec for the given compression algorithm and compression level.
static folly::Expected<std::unique_ptr<Codec>, Status> create(
static Expected<std::unique_ptr<Codec>> create(
CompressionKind kind,
int32_t compressionLevel);

Expand Down Expand Up @@ -202,7 +202,7 @@ class Codec {
/// Note: One-shot compression is not always compatible with streaming
/// decompression. Depending on the codec (e.g. LZ4), different formats may
/// be used.
virtual folly::Expected<uint64_t, Status> compress(
virtual Expected<uint64_t> compress(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
Expand All @@ -214,7 +214,7 @@ class Codec {
/// Note: One-shot decompression is not always compatible with streaming
/// compression. Depending on the codec (e.g. LZ4), different formats may
/// be used.
virtual folly::Expected<uint64_t, Status> decompress(
virtual Expected<uint64_t> decompress(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
Expand All @@ -229,7 +229,7 @@ class Codec {
/// function. This is useful when fixed-length compression blocks are required
/// by the caller.
/// Note: Only the Gzip and Zstd codecs support this function.
virtual folly::Expected<uint64_t, Status> compressFixedLength(
virtual Expected<uint64_t> compressFixedLength(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
Expand All @@ -247,11 +247,11 @@ class Codec {
const uint8_t* input) const;

// Create a streaming compressor instance.
virtual folly::Expected<std::shared_ptr<StreamingCompressor>, Status>
virtual Expected<std::shared_ptr<StreamingCompressor>>
makeStreamingCompressor();

// Create a streaming decompressor instance.
virtual folly::Expected<std::shared_ptr<StreamingDecompressor>, Status>
virtual Expected<std::shared_ptr<StreamingDecompressor>>
makeStreamingDecompressor();

// This Codec's compression type.
Expand Down
10 changes: 0 additions & 10 deletions velox/common/compression/HadoopCompressionFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,6 @@ bool HadoopCompressionFormat::tryDecompressHadoop(
uint8_t* output,
uint64_t outputLength,
uint64_t& actualDecompressedSize) {
// Parquet files written with the Hadoop Lz4RawCodec use their own framing.
// The input buffer can contain an arbitrary number of "frames", each
// with the following structure:
// - bytes 0..3: big-endian uint32_t representing the frame decompressed
// size
// - bytes 4..7: big-endian uint32_t representing the frame compressed size
// - bytes 8...: frame compressed data
//
// The Hadoop Lz4Codec source code can be found here:
// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
uint64_t totalDecompressedSize = 0;

while (inputLength >= kPrefixLength) {
Expand Down
15 changes: 14 additions & 1 deletion velox/common/compression/HadoopCompressionFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,29 @@

namespace facebook::velox::common {

/// Parquet files written with the Hadoop compression codecs use their own
/// framing.
/// The input buffer can contain an arbitrary number of "frames", each
/// with the following structure:
/// - bytes 0..3: big-endian uint32_t representing the frame decompressed
/// size
/// - bytes 4..7: big-endian uint32_t representing the frame compressed size
/// - bytes 8...: frame compressed data
class HadoopCompressionFormat {
protected:
/// Try to decompress input data in Hadoop's compression format.
/// Returns true if decompression is successful, false otherwise.
bool tryDecompressHadoop(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
uint64_t outputLength,
uint64_t& actualDecompressedSize);

virtual folly::Expected<uint64_t, Status> decompressInternal(
/// Called by tryDecompressHadoop to decompress a single frame and
/// should be implemented based on the specific compression format.
/// E.g. Lz4HadoopCodec uses Lz4RawCodec::decompress to decompress a frame.
virtual Expected<uint64_t> decompressInternal(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
Expand Down
Loading

0 comments on commit 1a2032c

Please sign in to comment.