Merge pull request #338 from okkez/add-parquet-compressor
Add parquet compressor using columnify
ashie authored Apr 8, 2021
2 parents 6ab7c53 + 7ae7cf4 commit ab61912
Showing 2 changed files with 131 additions and 0 deletions.
48 changes: 48 additions & 0 deletions README.md
@@ -409,9 +409,57 @@ archive format on S3. You can use several formats:
* gzip_command (Need gzip command)
  * This compressor uses an external gzip command, so it can utilize
    CPU cores better than the built-in `gzip` compressor.
* parquet (Need columnify command)
  * This compressor uses an external [columnify](https://github.com/reproio/columnify) command.
  * Use the `<compress>` section to configure the columnify command's
    behavior (see the example configuration below).

See the `Use your compression algorithm` section for adding another format.
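As a rough illustration of what that section involves, the sketch below registers a hypothetical compressor that shells out to `xz`. It mirrors the interface the parquet compressor in this commit uses (`register_compressor`, `ext`, `content_type`, and `compress(chunk, tmp)`); the command name, content type, and error handling are illustrative only, not part of this commit.

```ruby
require "open3"

module Fluent::Plugin
  class S3Output
    # Hypothetical example: not part of this commit.
    class XzCompressor < Compressor
      S3Output.register_compressor("xz", self)

      # File extension appended to the S3 object key.
      def ext
        "xz".freeze
      end

      def content_type
        "application/x-xz".freeze
      end

      def compress(chunk, tmp)
        # Pipe the raw chunk bytes through the external xz command and
        # write the compressed stream into the destination temp file.
        out, status = Open3.capture2("xz", "-c",
                                     stdin_data: chunk.read, binmode: true)
        raise "xz failed: #{status.inspect}" unless status.success?
        tmp.binmode
        tmp.write(out)
      end
    end
  end
end
```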

**`<compress>`** (for parquet compressor only)

**parquet_compression_codec**

parquet compression codec.

* uncompressed
* snappy (default)
* gzip
* lzo (unsupported by columnify)
* brotli (unsupported by columnify)
* lz4 (unsupported by columnify)
* zstd

**parquet_page_size**

parquet file page size. default: 8192 bytes

**parquet_row_group_size**

parquet file row group size. default: 128 MB

**record_type**

record data format type.

* avro
* csv
* jsonl
* msgpack (default)
* tsv
* json (treated as `jsonl` internally)

**schema_type**

schema type.

* avro (default)
* bigquery

**schema_file (required)**

path to schema file.
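
Putting the above together, here is a minimal sketch of a configuration using this compressor. It assumes the compressor is selected with the `store_as` parameter covered above; the match pattern, bucket, path, and schema location are placeholders, and unrelated required parameters (credentials, buffer settings) are omitted:

```
<match s3.*>
  @type s3
  # bucket and path are placeholders
  s3_bucket example-bucket
  path logs/
  store_as parquet
  <compress>
    schema_type avro
    # placeholder path; point this at your Avro schema
    schema_file /etc/fluent/schema.avsc
    record_type msgpack
    parquet_compression_codec snappy
  </compress>
</match>
```

A matching minimal Avro schema file might look like the following (field names are illustrative):

```json
{
  "type": "record",
  "name": "LogRecord",
  "fields": [
    {"name": "time", "type": "long"},
    {"name": "message", "type": "string"}
  ]
}
```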

**`<format>` or format**

Change the format of each line in the S3 object. Supported formats are "out_file",
83 changes: 83 additions & 0 deletions lib/fluent/plugin/s3_compressor_parquet.rb
@@ -0,0 +1,83 @@
require "open3"

module Fluent::Plugin
class S3Output
class ParquetCompressor < Compressor
S3Output.register_compressor("parquet", self)

config_section :compress, multi: false do
desc "parquet compression codec"
config_param :parquet_compression_codec, :enum, list: [:uncompressed, :snappy, :gzip, :lzo, :brotli, :lz4, :zstd], default: :snappy
desc "parquet file page size"
config_param :parquet_page_size, :size, default: 8192
desc "parquet file row group size"
config_param :parquet_row_group_size, :size, default: 128 * 1024 * 1024
desc "record data format type"
config_param :record_type, :enum, list: [:avro, :csv, :jsonl, :msgpack, :tsv, :json], default: :msgpack
desc "schema type"
config_param :schema_type, :enum, list: [:avro, :bigquery], default: :avro
desc "path to schema file"
config_param :schema_file, :string
end

def configure(conf)
super
check_command("columnify", "-h")

if [:lzo, :brotli, :lz4].include?(@compress.parquet_compression_codec)
raise Fluent::ConfigError, "unsupported compression codec: #{@compress.parquet_compression_codec}"
end

@parquet_compression_codec = @compress.parquet_compression_codec.to_s.upcase
if @compress.record_type == :json
@record_type = :jsonl
else
@record_type = @compress.record_type
end
end

def ext
"parquet".freeze
end

def content_type
"application/octet-stream".freeze
end

def compress(chunk, tmp)
chunk_is_file = @buffer_type == "file"
path = if chunk_is_file
chunk.path
else
w = Tempfile.new("chunk-parquet-tmp")
w.binmode
chunk.write_to(w)
w.close
w.path
end
stdout, stderr, status = columnify(path, tmp.path)
unless status.success?
raise Fluent::UnrecoverableError, "failed to execute columnify command. stdout=#{stdout} stderr=#{stderr} status=#{status.inspect}"
end
ensure
unless chunk_is_file
w.close(true) rescue nil
end
end

private

def columnify(src_path, dst_path)
Open3.capture3("columnify",
"-parquetCompressionCodec", @parquet_compression_codec,
"-parquetPageSize", @compress.parquet_page_size.to_s,
"-parquetRowGroupSize", @compress.parquet_row_group_size.to_s,
"-recordType", @record_type.to_s,
"-schemaType", @compress.schema_type.to_s,
"-schemaFile", @compress.schema_file,
"-output", dst_path,
src_path)
end
end
end
end
