diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py index a223d6559ed..6aa234003ba 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -299,12 +299,19 @@ def test_parquet_read_round_trip_binary_as_string(std_input_path, read_func, rea @pytest.mark.parametrize('compress', parquet_compress_options) @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_parquet_compress_read_round_trip(spark_tmp_path, compress, v1_enabled_list, reader_confs): +@pytest.mark.parametrize('cpu_decompress', [True, False]) +def test_parquet_compress_read_round_trip(spark_tmp_path, compress, v1_enabled_list, reader_confs, cpu_decompress): data_path = spark_tmp_path + '/PARQUET_DATA' with_cpu_session( lambda spark : binary_op_df(spark, long_gen).write.parquet(data_path), conf={'spark.sql.parquet.compression.codec': compress}) all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list}) + if cpu_decompress: + all_confs = copy_and_update(all_confs, { + 'spark.rapids.sql.format.parquet.decompressCpu' : 'true', + 'spark.rapids.sql.format.parquet.decompressCpu.snappy' : 'true', + 'spark.rapids.sql.format.parquet.decompressCpu.zstd' : 'true' + }) assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.read.parquet(data_path), conf=all_confs) diff --git a/jenkins/databricks/install_deps.py b/jenkins/databricks/install_deps.py index 11e2162957e..23453912827 100644 --- a/jenkins/databricks/install_deps.py +++ b/jenkins/databricks/install_deps.py @@ -135,6 +135,8 @@ def define_deps(spark_version, scala_version): f'{prefix_ws_sp_mvn_hadoop}--org.apache.avro--avro-mapred--org.apache.avro__avro-mapred__*.jar'), Artifact('org.apache.avro', 'avro', f'{prefix_ws_sp_mvn_hadoop}--org.apache.avro--avro--org.apache.avro__avro__*.jar'), + Artifact('com.github.luben', 'zstd-jni', + f'{prefix_ws_sp_mvn_hadoop}--com.github.luben--zstd-jni--com.github.luben__zstd-jni__*.jar'), ] # Parquet diff --git a/scala2.13/shim-deps/databricks/pom.xml b/scala2.13/shim-deps/databricks/pom.xml index 9d6ff787ef1..484e2896f61 100644 --- a/scala2.13/shim-deps/databricks/pom.xml +++ b/scala2.13/shim-deps/databricks/pom.xml @@ -231,6 +231,12 @@ ${spark.version} compile + + com.github.luben + zstd-jni + ${spark.version} + compile + org.apache.arrow arrow-format diff --git a/shim-deps/databricks/pom.xml b/shim-deps/databricks/pom.xml index edfa3d6f896..5f36e529aa7 100644 --- a/shim-deps/databricks/pom.xml +++ b/shim-deps/databricks/pom.xml @@ -231,6 +231,12 @@ ${spark.version} compile + + com.github.luben + zstd-jni + ${spark.version} + compile + org.apache.arrow arrow-format diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquetReader.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquetReader.java index 47b649af6ed..c61f7c6b6f7 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquetReader.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquetReader.java @@ -25,6 +25,7 @@ import scala.collection.Seq; +import com.nvidia.spark.rapids.CpuCompressionConfig$; import com.nvidia.spark.rapids.DateTimeRebaseCorrected$; import com.nvidia.spark.rapids.GpuMetric; import com.nvidia.spark.rapids.GpuParquetUtils; @@ -144,6 +145,7 @@ public org.apache.iceberg.io.CloseableIterator iterator() { partReaderSparkSchema, debugDumpPrefix, debugDumpAlways, maxBatchSizeRows, maxBatchSizeBytes, targetBatchSizeBytes, useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, + CpuCompressionConfig$.MODULE$.disabled(), metrics, DateTimeRebaseCorrected$.MODULE$, // dateRebaseMode DateTimeRebaseCorrected$.MODULE$, // timestampRebaseMode diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java index 9c36fe76020..b32e5e755cb 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java @@ -352,7 +352,8 @@ protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles, return new MultiFileCloudParquetPartitionReader(conf, pFiles, this::filterParquetBlocks, caseSensitive, parquetDebugDumpPrefix, parquetDebugDumpAlways, maxBatchSizeRows, maxBatchSizeBytes, targetBatchSizeBytes, maxGpuColumnSizeBytes, - useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, metrics, partitionSchema, + useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, + CpuCompressionConfig$.MODULE$.disabled(), metrics, partitionSchema, numThreads, maxNumFileProcessed, false, // ignoreMissingFiles false, // ignoreCorruptFiles @@ -411,7 +412,7 @@ protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles, JavaConverters.asJavaCollection(filteredInfo.parquetBlockMeta.blocks()).stream() .map(b -> ParquetSingleDataBlockMeta.apply( filteredInfo.parquetBlockMeta.filePath(), - ParquetDataBlock.apply(b), + ParquetDataBlock.apply(b, CpuCompressionConfig$.MODULE$.disabled()), InternalRow.empty(), ParquetSchemaWrapper.apply(filteredInfo.parquetBlockMeta.schema()), filteredInfo.parquetBlockMeta.readSchema(), @@ -431,6 +432,7 @@ protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles, caseSensitive, parquetDebugDumpPrefix, parquetDebugDumpAlways, maxBatchSizeRows, maxBatchSizeBytes, targetBatchSizeBytes, maxGpuColumnSizeBytes, useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, + CpuCompressionConfig$.MODULE$.disabled(), metrics, partitionSchema, numThreads, false, // ignoreMissingFiles false, // ignoreCorruptFiles diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index e38dab50d72..03eb48de6fb 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids -import java.io.{Closeable, EOFException, FileNotFoundException, IOException, OutputStream} +import java.io.{Closeable, EOFException, FileNotFoundException, InputStream, IOException, OutputStream} import java.net.URI import java.nio.ByteBuffer import java.nio.channels.SeekableByteChannel @@ -31,6 +31,7 @@ import scala.collection.mutable.ArrayBuffer import scala.language.implicitConversions import ai.rapids.cudf._ +import com.github.luben.zstd.ZstdDecompressCtx import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.GpuMetric._ import com.nvidia.spark.rapids.ParquetPartitionReader.{CopyRange, LocalCopy} @@ -47,6 +48,7 @@ import org.apache.parquet.bytes.BytesUtils import org.apache.parquet.bytes.BytesUtils.readIntLittleEndian import org.apache.parquet.column.ColumnDescriptor import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.format.Util import org.apache.parquet.format.converter.ParquetMetadataConverter import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat} import org.apache.parquet.hadoop.ParquetFileWriter.MAGIC @@ -54,6 +56,7 @@ import org.apache.parquet.hadoop.metadata._ import org.apache.parquet.io.{InputFile, SeekableInputStream} import org.apache.parquet.schema.{DecimalMetadata, GroupType, MessageType, OriginalType, PrimitiveType, Type} import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName +import org.xerial.snappy.Snappy import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast @@ -1106,6 +1109,7 @@ case class GpuParquetMultiFilePartitionReaderFactory( }.getOrElse(rapidsConf.getMultithreadedReaderKeepOrder) private val alluxioReplacementTaskTime = AlluxioCfgUtils.enabledAlluxioReplacementAlgoTaskTime(rapidsConf) + private val compressCfg = CpuCompressionConfig.forParquet(rapidsConf) // We can't use the coalescing files reader when InputFileName, InputFileBlockStart, // or InputFileBlockLength because we are combining all the files into a single buffer @@ -1137,7 +1141,7 @@ case class GpuParquetMultiFilePartitionReaderFactory( new MultiFileCloudParquetPartitionReader(conf, files, filterFunc, isCaseSensitive, debugDumpPrefix, debugDumpAlways, maxReadBatchSizeRows, maxReadBatchSizeBytes, targetBatchSizeBytes, maxGpuColumnSizeBytes, - useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, + useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, compressCfg, metrics, partitionSchema, numThreads, maxNumFileProcessed, ignoreMissingFiles, ignoreCorruptFiles, readUseFieldId, alluxioPathReplacementMap.getOrElse(Map.empty), alluxioReplacementTaskTime, queryUsesInputFile, keepReadsInOrderFromConf, combineConf) @@ -1244,7 +1248,7 @@ case class GpuParquetMultiFilePartitionReaderFactory( clippedBlocks ++= singleFileInfo.blocks.map(block => ParquetSingleDataBlockMeta( singleFileInfo.filePath, - ParquetDataBlock(block), + ParquetDataBlock(block, compressCfg), metaAndFile.file.partitionValues, ParquetSchemaWrapper(singleFileInfo.schema), singleFileInfo.readSchema, @@ -1262,7 +1266,7 @@ case class GpuParquetMultiFilePartitionReaderFactory( new MultiFileParquetPartitionReader(conf, files, clippedBlocks.toSeq, isCaseSensitive, debugDumpPrefix, debugDumpAlways, maxReadBatchSizeRows, maxReadBatchSizeBytes, targetBatchSizeBytes, maxGpuColumnSizeBytes, - useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, + useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, compressCfg, metrics, partitionSchema, numThreads, ignoreMissingFiles, ignoreCorruptFiles, readUseFieldId) } @@ -1307,6 +1311,7 @@ case class GpuParquetPartitionReaderFactory( private val readUseFieldId = ParquetSchemaClipShims.useFieldId(sqlConf) private val footerReadType = GpuParquetScan.footerReaderHeuristic( rapidsConf.parquetReaderFooterType, dataSchema, readDataSchema, readUseFieldId) + private val compressCfg = CpuCompressionConfig.forParquet(rapidsConf) override def supportColumnarReads(partition: InputPartition): Boolean = true @@ -1335,12 +1340,29 @@ case class GpuParquetPartitionReaderFactory( new ParquetPartitionReader(conf, file, singleFileInfo.filePath, singleFileInfo.blocks, singleFileInfo.schema, isCaseSensitive, readDataSchema, debugDumpPrefix, debugDumpAlways, maxReadBatchSizeRows, maxReadBatchSizeBytes, targetSizeBytes, - useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, + useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, compressCfg, metrics, singleFileInfo.dateRebaseMode, singleFileInfo.timestampRebaseMode, singleFileInfo.hasInt96Timestamps, readUseFieldId) } } +case class CpuCompressionConfig( + decompressSnappyCpu: Boolean, + decompressZstdCpu: Boolean) { + val decompressAnyCpu: Boolean = decompressSnappyCpu || decompressZstdCpu +} + +object CpuCompressionConfig { + def forParquet(conf: RapidsConf): CpuCompressionConfig = { + val cpuEnable = conf.parquetDecompressCpu + CpuCompressionConfig( + decompressSnappyCpu = cpuEnable && conf.parquetDecompressCpuSnappy, + decompressZstdCpu = cpuEnable && conf.parquetDecompressCpuZstd) + } + + def disabled(): CpuCompressionConfig = CpuCompressionConfig(false, false) +} + trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics with MultiFileReaderFunctions { // the size of Parquet magic (at start+end) and footer length values @@ -1353,6 +1375,8 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics def isSchemaCaseSensitive: Boolean + def compressCfg: CpuCompressionConfig + val copyBufferSize = conf.getInt("parquet.read.allocation.size", 8 * 1024 * 1024) def checkIfNeedToSplitBlocks(currentDateRebaseMode: DateTimeRebaseMode, @@ -1418,13 +1442,8 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics schema: MessageType, handleCoalesceFiles: Boolean): Long = { // start with the size of Parquet magic (at start+end) and footer length values - var size: Long = PARQUET_META_SIZE - - // Calculate the total amount of column data that will be copied - // NOTE: Avoid using block.getTotalByteSize here as that is the - // uncompressed size rather than the size in the file. - size += currentChunkedBlocks.flatMap(_.getColumns.asScala.map(_.getTotalSize)).sum - + val headerSize: Long = PARQUET_META_SIZE + val blocksSize = ParquetPartitionReader.computeOutputSize(currentChunkedBlocks, compressCfg) val footerSize = calculateParquetFooterSize(currentChunkedBlocks, schema) val extraMemory = if (handleCoalesceFiles) { val numCols = currentChunkedBlocks.head.getColumns().size() @@ -1432,8 +1451,7 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics } else { 0 } - val totalSize = size + footerSize + extraMemory - totalSize + headerSize + blocksSize + footerSize + extraMemory } protected def writeFooter( @@ -1532,7 +1550,7 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics * metadata but with the file offsets updated to reflect the new position of the column data * as written to the output. * - * @param in the input stream for the original Parquet file + * @param filePath the path to the Parquet file * @param out the output stream to receive the data * @param blocks block metadata from the original file that will appear in the computed file * @param realStartOffset starting file offset of the first block @@ -1575,6 +1593,258 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics computeBlockMetaData(blocks, realStartOffset) } + private class BufferedFileInput( + filePath: Path, + blocks: Seq[BlockMetaData], + metrics: Map[String, GpuMetric]) extends InputStream { + private[this] val in = filePath.getFileSystem(conf).open(filePath) + private[this] val buffer: Array[Byte] = new Array[Byte](copyBufferSize) + private[this] var bufferSize: Int = 0 + private[this] var bufferFilePos: Long = in.getPos + private[this] var bufferPos: Int = 0 + private[this] val columnIter = blocks.flatMap(_.getColumns.asScala).iterator + private[this] var currentColumn: Option[ColumnChunkMetaData] = None + private[this] val readTime: GpuMetric = metrics.getOrElse(READ_FS_TIME, NoopMetric) + + override def read(): Int = { + while (bufferPos == bufferSize) { + fillBuffer() + } + val result = buffer(bufferPos) + bufferPos += 1 + result + } + + override def read(b: Array[Byte]): Int = read(b, 0, b.length) + + override def read(dest: Array[Byte], off: Int, len: Int): Int = { + var bytesLeft = len + while (bytesLeft > 0) { + if (bufferPos == bufferSize) { + fillBuffer() + } + val numBytes = Math.min(bytesLeft, bufferSize - bufferPos) + System.arraycopy(buffer, bufferPos, dest, off + len - bytesLeft, numBytes) + bufferPos += numBytes + bytesLeft -= numBytes + } + len + } + + def read(out: HostMemoryOutputStream, len: Long): Unit = { + var bytesLeft = len + while (bytesLeft > 0) { + if (bufferPos == bufferSize) { + fillBuffer() + } + // downcast is safe because bufferSize is an int + val numBytes = Math.min(bytesLeft, bufferSize - bufferPos).toInt + out.write(buffer, bufferPos, numBytes) + bufferPos += numBytes + bytesLeft -= numBytes + } + } + + def read(out: HostMemoryBuffer, len: Long): Unit = { + var bytesLeft = len + while (bytesLeft > 0) { + if (bufferPos == bufferSize) { + fillBuffer() + } + // downcast is safe because bufferSize is an int + val numBytes = Math.min(bytesLeft, bufferSize - bufferPos).toInt + out.setBytes(len - bytesLeft, buffer, bufferPos, numBytes) + bufferPos += numBytes + bytesLeft -= numBytes + } + } + + override def skip(n: Long): Long = { + seek(getPos + n) + n + } + + def getPos: Long = bufferFilePos + bufferPos + + def seek(desiredPos: Long): Unit = { + require(desiredPos >= getPos, "Only supports seeking forward") + val posDiff = desiredPos - bufferFilePos + if (posDiff >= 0 && posDiff < bufferSize) { + bufferPos = posDiff.toInt + } else { + in.seek(desiredPos) + bufferFilePos = desiredPos + bufferSize = 0 + bufferPos = 0 + } + } + + override def close(): Unit = { + readTime.ns { + in.close() + } + } + + private def fillBuffer(): Unit = { + // TODO: Add FileCache support https://github.com/NVIDIA/spark-rapids/issues/11775 + var bytesToCopy = currentColumn.map { c => + Math.max(0, c.getStartingPos + c.getTotalSize - getPos) + }.getOrElse(0L) + var done = bytesToCopy >= buffer.length + while (!done && columnIter.hasNext) { + val column = columnIter.next() + currentColumn = Some(column) + done = if (getPos + bytesToCopy == column.getStartingPos) { + bytesToCopy += column.getTotalSize + bytesToCopy >= buffer.length + } else { + true + } + } + if (bytesToCopy <= 0) { + throw new EOFException("read beyond column data range") + } + bufferFilePos = in.getPos + bufferPos = 0 + bufferSize = Math.min(bytesToCopy, buffer.length).toInt + readTime.ns { + in.readFully(buffer, 0, bufferSize) + } + } + } + + /** + * Copies the data corresponding to the clipped blocks in the original file and compute the + * block metadata for the output. The output blocks will contain the same column chunk + * metadata but with the file offsets updated to reflect the new position of the column data + * as written to the output. + * + * @param filePath the path to the Parquet file + * @param out the output stream to receive the data + * @param blocks block metadata from the original file that will appear in the computed file + * @param realStartOffset starting file offset of the first block + * @return updated block metadata corresponding to the output + */ + protected def copyAndUncompressBlocksData( + filePath: Path, + out: HostMemoryOutputStream, + blocks: Seq[BlockMetaData], + realStartOffset: Long, + metrics: Map[String, GpuMetric], + compressCfg: CpuCompressionConfig): Seq[BlockMetaData] = { + val outStartPos = out.getPos + val writeTime = metrics.getOrElse(WRITE_BUFFER_TIME, NoopMetric) + withResource(new BufferedFileInput(filePath, blocks, metrics)) { in => + val newBlocks = blocks.map { block => + val newColumns = block.getColumns.asScala.map { column => + var columnTotalSize = column.getTotalSize + var columnCodec = column.getCodec + val columnStartingPos = realStartOffset + out.getPos - outStartPos + val columnDictOffset = if (column.getDictionaryPageOffset > 0) { + column.getDictionaryPageOffset + columnStartingPos - column.getStartingPos + } else { + 0 + } + writeTime.ns { + columnCodec match { + case CompressionCodecName.SNAPPY if compressCfg.decompressSnappyCpu => + val columnStartPos = out.getPos + decompressSnappy(in, out, column) + columnCodec = CompressionCodecName.UNCOMPRESSED + columnTotalSize = out.getPos - columnStartPos + case CompressionCodecName.ZSTD if compressCfg.decompressZstdCpu => + val columnStartPos = out.getPos + decompressZstd(in, out, column) + columnCodec = CompressionCodecName.UNCOMPRESSED + columnTotalSize = out.getPos - columnStartPos + case _ => + in.seek(column.getStartingPos) + in.read(out, columnTotalSize) + } + } + ColumnChunkMetaData.get( + column.getPath, + column.getPrimitiveType, + columnCodec, + column.getEncodingStats, + column.getEncodings, + column.getStatistics, + columnStartingPos, + columnDictOffset, + column.getValueCount, + columnTotalSize, + columnTotalSize) + } + GpuParquetUtils.newBlockMeta(block.getRowCount, newColumns.toSeq) + } + newBlocks + } + } + + private def decompressSnappy( + in: BufferedFileInput, + out: HostMemoryOutputStream, + column: ColumnChunkMetaData): Unit = { + val endPos = column.getStartingPos + column.getTotalSize + in.seek(column.getStartingPos) + var inData: Option[HostMemoryBuffer] = None + try { + while (in.getPos != endPos) { + val pageHeader = Util.readPageHeader(in) + val compressedSize = pageHeader.getCompressed_page_size + val uncompressedSize = pageHeader.getUncompressed_page_size + pageHeader.unsetCrc() + pageHeader.setCompressed_page_size(uncompressedSize) + Util.writePageHeader(pageHeader, out) + if (inData.map(_.getLength).getOrElse(0L) < compressedSize) { + inData.foreach(_.close()) + inData = Some(HostMemoryBuffer.allocate(compressedSize, false)) + } + inData.foreach { compressedBuffer => + in.read(compressedBuffer, compressedSize) + val bbIn = compressedBuffer.asByteBuffer(0, compressedSize) + val bbOut = out.writeAsByteBuffer(uncompressedSize) + Snappy.uncompress(bbIn, bbOut) + } + } + } finally { + inData.foreach(_.close()) + } + } + + private def decompressZstd( + in: BufferedFileInput, + out: HostMemoryOutputStream, + column: ColumnChunkMetaData): Unit = { + val endPos = column.getStartingPos + column.getTotalSize + in.seek(column.getStartingPos) + var inData: Option[HostMemoryBuffer] = None + try { + withResource(new ZstdDecompressCtx()) { ctx => + while (in.getPos != endPos) { + val pageHeader = Util.readPageHeader(in) + val compressedSize = pageHeader.getCompressed_page_size + val uncompressedSize = pageHeader.getUncompressed_page_size + pageHeader.unsetCrc() + pageHeader.setCompressed_page_size(uncompressedSize) + Util.writePageHeader(pageHeader, out) + if (inData.map(_.getLength).getOrElse(0L) < compressedSize) { + inData.foreach(_.close()) + inData = Some(HostMemoryBuffer.allocate(compressedSize, false)) + } + inData.foreach { compressedBuffer => + in.read(compressedBuffer, compressedSize) + val bbIn = compressedBuffer.asByteBuffer(0, compressedSize) + val bbOut = out.writeAsByteBuffer(uncompressedSize) + ctx.decompress(bbOut, bbIn) + } + } + } + } finally { + inData.foreach(_.close()) + } + } + private def copyRemoteBlocksData( remoteCopies: Seq[CopyRange], filePath: Path, @@ -1666,7 +1936,11 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics closeOnExcept(HostMemoryBuffer.allocate(estTotalSize)) { hmb => val out = new HostMemoryOutputStream(hmb) out.write(ParquetPartitionReader.PARQUET_MAGIC) - val outputBlocks = copyBlocksData(filePath, out, blocks, out.getPos, metrics) + val outputBlocks = if (compressCfg.decompressAnyCpu) { + copyAndUncompressBlocksData(filePath, out, blocks, out.getPos, metrics, compressCfg) + } else { + copyBlocksData(filePath, out, blocks, out.getPos, metrics) + } val footerPos = out.getPos writeFooter(out, outputBlocks, clippedSchema) BytesUtils.writeIntLittleEndian(out, (out.getPos - footerPos).toInt) @@ -1802,7 +2076,7 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics block.asInstanceOf[ParquetDataBlock].dataBlock implicit def toDataBlockBase(blocks: Seq[BlockMetaData]): Seq[DataBlockBase] = - blocks.map(ParquetDataBlock) + blocks.map(b => ParquetDataBlock(b, compressCfg)) implicit def toBlockMetaDataSeq(blocks: Seq[DataBlockBase]): Seq[BlockMetaData] = blocks.map(_.asInstanceOf[ParquetDataBlock].dataBlock) @@ -1814,10 +2088,14 @@ private case class ParquetSchemaWrapper(schema: MessageType) extends SchemaBase } // Parquet BlockMetaData wrapper -private case class ParquetDataBlock(dataBlock: BlockMetaData) extends DataBlockBase { +private case class ParquetDataBlock( + dataBlock: BlockMetaData, + compressCfg: CpuCompressionConfig) extends DataBlockBase { override def getRowCount: Long = dataBlock.getRowCount override def getReadDataSize: Long = dataBlock.getTotalByteSize - override def getBlockSize: Long = dataBlock.getColumns.asScala.map(_.getTotalSize).sum + override def getBlockSize: Long = { + ParquetPartitionReader.computeOutputSize(dataBlock, compressCfg) + } } /** Parquet extra information containing rebase modes and whether there is int96 timestamp */ @@ -1876,6 +2154,7 @@ class MultiFileParquetPartitionReader( maxGpuColumnSizeBytes: Long, useChunkedReader: Boolean, maxChunkedReaderMemoryUsageSizeBytes: Long, + override val compressCfg: CpuCompressionConfig, override val execMetrics: Map[String, GpuMetric], partitionSchema: StructType, numThreads: Int, @@ -1900,7 +2179,8 @@ class MultiFileParquetPartitionReader( file: Path, outhmb: HostMemoryBuffer, blocks: ArrayBuffer[DataBlockBase], - offset: Long) + offset: Long, + compressCfg: CpuCompressionConfig) extends Callable[(Seq[DataBlockBase], Long)] { override def call(): (Seq[DataBlockBase], Long) = { @@ -1909,7 +2189,11 @@ class MultiFileParquetPartitionReader( val startBytesRead = fileSystemBytesRead() val outputBlocks = withResource(outhmb) { _ => withResource(new HostMemoryOutputStream(outhmb)) { out => - copyBlocksData(file, out, blocks.toSeq, offset, metrics) + if (compressCfg.decompressAnyCpu) { + copyAndUncompressBlocksData(file, out, blocks.toSeq, offset, metrics, compressCfg) + } else { + copyBlocksData(file, out, blocks.toSeq, offset, metrics) + } } } val bytesRead = fileSystemBytesRead() - startBytesRead @@ -1961,7 +2245,7 @@ class MultiFileParquetPartitionReader( blocks: ArrayBuffer[DataBlockBase], offset: Long, batchContext: BatchContext): Callable[(Seq[DataBlockBase], Long)] = { - new ParquetCopyBlocksRunner(taskContext, file, outhmb, blocks, offset) + new ParquetCopyBlocksRunner(taskContext, file, outhmb, blocks, offset, compressCfg) } override final def getFileFormatShortName: String = "Parquet" @@ -2072,6 +2356,7 @@ class MultiFileCloudParquetPartitionReader( maxGpuColumnSizeBytes: Long, useChunkedReader: Boolean, maxChunkedReaderMemoryUsageSizeBytes: Long, + override val compressCfg: CpuCompressionConfig, override val execMetrics: Map[String, GpuMetric], partitionSchema: StructType, numThreads: Int, @@ -2761,6 +3046,7 @@ class ParquetPartitionReader( targetBatchSizeBytes: Long, useChunkedReader: Boolean, maxChunkedReaderMemoryUsageSizeBytes: Long, + override val compressCfg: CpuCompressionConfig, override val execMetrics: Map[String, GpuMetric], dateRebaseMode: DateTimeRebaseMode, timestampRebaseMode: DateTimeRebaseMode, @@ -2873,26 +3159,34 @@ object ParquetPartitionReader { length: Long, outputOffset: Long) extends CopyItem - /** - * Build a new BlockMetaData - * - * @param rowCount the number of rows in this block - * @param columns the new column chunks to reference in the new BlockMetaData - * @return the new BlockMetaData - */ - private[rapids] def newParquetBlock( - rowCount: Long, - columns: Seq[ColumnChunkMetaData]): BlockMetaData = { - val block = new BlockMetaData - block.setRowCount(rowCount) + private[rapids] def computeOutputSize( + blocks: Seq[BlockMetaData], + compressCfg: CpuCompressionConfig): Long = { + blocks.map { block => + computeOutputSize(block, compressCfg) + }.sum + } - var totalSize: Long = 0 - columns.foreach { column => - block.addColumn(column) - totalSize += column.getTotalUncompressedSize + private[rapids] def computeOutputSize( + block: BlockMetaData, + compressCfg: CpuCompressionConfig): Long = { + if (compressCfg.decompressAnyCpu) { + block.getColumns.asScala.map { c => + if ((c.getCodec == CompressionCodecName.SNAPPY && compressCfg.decompressSnappyCpu) + || (c.getCodec == CompressionCodecName.ZSTD && compressCfg.decompressZstdCpu)) { + // Page headers need to be rewritten when CPU decompresses, and that may + // increase the size of the page header. Guess how many pages there may be + // and add a fudge factor per page to try to avoid a late realloc+copy. + // NOTE: Avoid using block.getTotalByteSize as that is the + // uncompressed size rather than the size in the file. + val estimatedPageCount = (c.getTotalUncompressedSize / (1024 * 1024)) + 1 + c.getTotalUncompressedSize + estimatedPageCount * 8 + } else { + c.getTotalSize + } + }.sum + } else { + block.getColumns.asScala.map(_.getTotalSize).sum } - block.setTotalByteSize(totalSize) - - block } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostMemoryStreams.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostMemoryStreams.scala index 08fe5be50b2..4be11b13254 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostMemoryStreams.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostMemoryStreams.scala @@ -54,6 +54,12 @@ class HostMemoryOutputStream(val buffer: HostMemoryBuffer) extends OutputStream pos += numBytes } + def writeAsByteBuffer(length: Int): ByteBuffer = { + val bb = buffer.asByteBuffer(pos, length) + pos += length + bb + } + def getPos: Long = pos def seek(newPos: Long): Unit = { @@ -132,6 +138,12 @@ trait HostMemoryInputStreamMixIn extends InputStream { } } + def readByteBuffer(length: Int): ByteBuffer = { + val bb = hmb.asByteBuffer(pos, length) + pos += length + bb + } + override def skip(count: Long): Long = { val oldPos = pos pos = Math.min(pos + count, hmbLength) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index ab7a788d205..406aeb0365b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1120,6 +1120,31 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") .checkValues(RapidsReaderType.values.map(_.toString)) .createWithDefault(RapidsReaderType.AUTO.toString) + val PARQUET_DECOMPRESS_CPU = + conf("spark.rapids.sql.format.parquet.decompressCpu") + .doc("If true then the CPU is eligible to decompress Parquet data rather than the GPU. " + + s"See other spark.rapids.sql.format.parquet.decompressCpu.* configuration settings " + + "to control this for specific compression codecs.") + .internal() + .booleanConf + .createWithDefault(false) + + val PARQUET_DECOMPRESS_CPU_SNAPPY = + conf("spark.rapids.sql.format.parquet.decompressCpu.snappy") + .doc(s"If true and $PARQUET_DECOMPRESS_CPU is true then the CPU decompresses " + + "Parquet Snappy data rather than the GPU") + .internal() + .booleanConf + .createWithDefault(true) + + val PARQUET_DECOMPRESS_CPU_ZSTD = + conf("spark.rapids.sql.format.parquet.decompressCpu.zstd") + .doc(s"If true and $PARQUET_DECOMPRESS_CPU is true then the CPU decompresses " + + "Parquet Zstandard data rather than the GPU") + .internal() + .booleanConf + .createWithDefault(true) + val READER_MULTITHREADED_COMBINE_THRESHOLD = conf("spark.rapids.sql.reader.multithreaded.combine.sizeBytes") .doc("The target size in bytes to combine multiple small files together when using the " + @@ -2960,6 +2985,12 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isParquetMultiThreadReadEnabled: Boolean = isParquetAutoReaderEnabled || RapidsReaderType.withName(get(PARQUET_READER_TYPE)) == RapidsReaderType.MULTITHREADED + lazy val parquetDecompressCpu: Boolean = get(PARQUET_DECOMPRESS_CPU) + + lazy val parquetDecompressCpuSnappy: Boolean = get(PARQUET_DECOMPRESS_CPU_SNAPPY) + + lazy val parquetDecompressCpuZstd: Boolean = get(PARQUET_DECOMPRESS_CPU_ZSTD) + lazy val maxNumParquetFilesParallel: Int = get(PARQUET_MULTITHREAD_READ_MAX_NUM_FILES_PARALLEL) lazy val isParquetReadEnabled: Boolean = get(ENABLE_PARQUET_READ)