diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index f1d7c5d3a537..458d7cf515e5 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -135,10 +135,26 @@ Writer::Writer( stream_(std::make_shared( std::move(sink), *generalPool_, - bufferGrowRatio_)), - schema_(schema) { + bufferGrowRatio_)) { arrowContext_ = std::make_shared(); arrowContext_->properties = getArrowParquetWriterOptions(options); + arrowContext_->schema = schema; + + if (arrowContext_->schema) { + // If the input iterator is empty, the writer will do nothing and build a + // empty file. We should at least write the parquet magic header so the + // reader can regonize it is a valid parquet file. So, we initialize the + // writer at first even there is no data. + auto arrowProperties = ::parquet::ArrowWriterProperties::Builder().build(); + PARQUET_ASSIGN_OR_THROW( + arrowContext_->writer, + ::parquet::arrow::FileWriter::Open( + *arrowContext_->schema.get(), + arrow::default_memory_pool(), + stream_, + arrowContext_->properties, + arrowProperties)); + } } Writer::Writer( @@ -200,20 +216,23 @@ void Writer::flush() { */ void Writer::write(const VectorPtr& data) { ArrowArray array; - ArrowSchema schema; exportToArrow(data, array, generalPool_.get()); - exportToArrow(data, schema); std::shared_ptr recordBatch; - if (schema_) { + if (arrowContext_->schema) { PARQUET_ASSIGN_OR_THROW( - recordBatch, arrow::ImportRecordBatch(&array, schema_)); + recordBatch, arrow::ImportRecordBatch(&array, arrowContext_->schema)); } else { + ArrowSchema schema; + exportToArrow(data, schema); PARQUET_ASSIGN_OR_THROW( recordBatch, arrow::ImportRecordBatch(&array, &schema)); } - if (!arrowContext_->schema) { - arrowContext_->schema = recordBatch->schema(); + if (arrowContext_->stagingChunks.empty()) { + if (!arrowContext_->schema) { + arrowContext_->schema = recordBatch->schema(); + } + for (int colIdx = 0; colIdx < arrowContext_->schema->num_fields(); colIdx++) { arrowContext_->stagingChunks.push_back( diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index 9361453e63c6..731e96b2da49 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -93,8 +93,6 @@ class Writer : public dwio::common::Writer { std::shared_ptr stream_; std::shared_ptr arrowContext_; - - std::shared_ptr schema_; }; class ParquetWriterFactory : public dwio::common::WriterFactory {