diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py
index a223d6559ed..6aa234003ba 100644
--- a/integration_tests/src/main/python/parquet_test.py
+++ b/integration_tests/src/main/python/parquet_test.py
@@ -299,12 +299,19 @@ def test_parquet_read_round_trip_binary_as_string(std_input_path, read_func, rea
@pytest.mark.parametrize('compress', parquet_compress_options)
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
-def test_parquet_compress_read_round_trip(spark_tmp_path, compress, v1_enabled_list, reader_confs):
+@pytest.mark.parametrize('cpu_decompress', [True, False])
+def test_parquet_compress_read_round_trip(spark_tmp_path, compress, v1_enabled_list, reader_confs, cpu_decompress):
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
lambda spark : binary_op_df(spark, long_gen).write.parquet(data_path),
conf={'spark.sql.parquet.compression.codec': compress})
all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
+ if cpu_decompress:
+ all_confs = copy_and_update(all_confs, {
+ 'spark.rapids.sql.format.parquet.decompressCpu' : 'true',
+ 'spark.rapids.sql.format.parquet.decompressCpu.snappy' : 'true',
+ 'spark.rapids.sql.format.parquet.decompressCpu.zstd' : 'true'
+ })
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path),
conf=all_confs)
diff --git a/jenkins/databricks/install_deps.py b/jenkins/databricks/install_deps.py
index 11e2162957e..23453912827 100644
--- a/jenkins/databricks/install_deps.py
+++ b/jenkins/databricks/install_deps.py
@@ -135,6 +135,8 @@ def define_deps(spark_version, scala_version):
f'{prefix_ws_sp_mvn_hadoop}--org.apache.avro--avro-mapred--org.apache.avro__avro-mapred__*.jar'),
Artifact('org.apache.avro', 'avro',
f'{prefix_ws_sp_mvn_hadoop}--org.apache.avro--avro--org.apache.avro__avro__*.jar'),
+ Artifact('com.github.luben', 'zstd-jni',
+ f'{prefix_ws_sp_mvn_hadoop}--com.github.luben--zstd-jni--com.github.luben__zstd-jni__*.jar'),
]
# Parquet
diff --git a/scala2.13/shim-deps/databricks/pom.xml b/scala2.13/shim-deps/databricks/pom.xml
index 9d6ff787ef1..484e2896f61 100644
--- a/scala2.13/shim-deps/databricks/pom.xml
+++ b/scala2.13/shim-deps/databricks/pom.xml
@@ -231,6 +231,12 @@
       <version>${spark.version}</version>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>com.github.luben</groupId>
+      <artifactId>zstd-jni</artifactId>
+      <version>${spark.version}</version>
+      <scope>compile</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.arrow</groupId>
       <artifactId>arrow-format</artifactId>
diff --git a/shim-deps/databricks/pom.xml b/shim-deps/databricks/pom.xml
index edfa3d6f896..5f36e529aa7 100644
--- a/shim-deps/databricks/pom.xml
+++ b/shim-deps/databricks/pom.xml
@@ -231,6 +231,12 @@
       <version>${spark.version}</version>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>com.github.luben</groupId>
+      <artifactId>zstd-jni</artifactId>
+      <version>${spark.version}</version>
+      <scope>compile</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.arrow</groupId>
       <artifactId>arrow-format</artifactId>
diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquetReader.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquetReader.java
index 47b649af6ed..c61f7c6b6f7 100644
--- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquetReader.java
+++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquetReader.java
@@ -25,6 +25,7 @@
import scala.collection.Seq;
+import com.nvidia.spark.rapids.CpuCompressionConfig$;
import com.nvidia.spark.rapids.DateTimeRebaseCorrected$;
import com.nvidia.spark.rapids.GpuMetric;
import com.nvidia.spark.rapids.GpuParquetUtils;
@@ -144,6 +145,7 @@ public org.apache.iceberg.io.CloseableIterator iterator() {
partReaderSparkSchema, debugDumpPrefix, debugDumpAlways,
maxBatchSizeRows, maxBatchSizeBytes, targetBatchSizeBytes, useChunkedReader,
maxChunkedReaderMemoryUsageSizeBytes,
+ CpuCompressionConfig$.MODULE$.disabled(),
metrics,
DateTimeRebaseCorrected$.MODULE$, // dateRebaseMode
DateTimeRebaseCorrected$.MODULE$, // timestampRebaseMode
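
Note: the Iceberg path above keeps CPU decompression disabled by passing the Scala companion object's `disabled()` value through Java interop. As a point of reference, a minimal Scala sketch of the value being constructed (`CpuCompressionConfig` itself is added later in this diff, in GpuParquetScan.scala):

```scala
import com.nvidia.spark.rapids.CpuCompressionConfig

// Equivalent of the Java expression CpuCompressionConfig$.MODULE$.disabled():
// both Snappy and Zstandard CPU decompression are off, so decompressAnyCpu is
// false and these readers keep using the existing GPU decompression path.
val icebergCompressCfg = CpuCompressionConfig.disabled()
assert(!icebergCompressCfg.decompressAnyCpu)
```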
diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java
index 9c36fe76020..b32e5e755cb 100644
--- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java
+++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java
@@ -352,7 +352,8 @@ protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles,
return new MultiFileCloudParquetPartitionReader(conf, pFiles,
this::filterParquetBlocks, caseSensitive, parquetDebugDumpPrefix, parquetDebugDumpAlways,
maxBatchSizeRows, maxBatchSizeBytes, targetBatchSizeBytes, maxGpuColumnSizeBytes,
- useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, metrics, partitionSchema,
+ useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes,
+ CpuCompressionConfig$.MODULE$.disabled(), metrics, partitionSchema,
numThreads, maxNumFileProcessed,
false, // ignoreMissingFiles
false, // ignoreCorruptFiles
@@ -411,7 +412,7 @@ protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles,
JavaConverters.asJavaCollection(filteredInfo.parquetBlockMeta.blocks()).stream()
.map(b -> ParquetSingleDataBlockMeta.apply(
filteredInfo.parquetBlockMeta.filePath(),
- ParquetDataBlock.apply(b),
+ ParquetDataBlock.apply(b, CpuCompressionConfig$.MODULE$.disabled()),
InternalRow.empty(),
ParquetSchemaWrapper.apply(filteredInfo.parquetBlockMeta.schema()),
filteredInfo.parquetBlockMeta.readSchema(),
@@ -431,6 +432,7 @@ protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles,
caseSensitive, parquetDebugDumpPrefix, parquetDebugDumpAlways,
maxBatchSizeRows, maxBatchSizeBytes, targetBatchSizeBytes, maxGpuColumnSizeBytes,
useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes,
+ CpuCompressionConfig$.MODULE$.disabled(),
metrics, partitionSchema, numThreads,
false, // ignoreMissingFiles
false, // ignoreCorruptFiles
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
index e38dab50d72..03eb48de6fb 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -16,7 +16,7 @@
package com.nvidia.spark.rapids
-import java.io.{Closeable, EOFException, FileNotFoundException, IOException, OutputStream}
+import java.io.{Closeable, EOFException, FileNotFoundException, InputStream, IOException, OutputStream}
import java.net.URI
import java.nio.ByteBuffer
import java.nio.channels.SeekableByteChannel
@@ -31,6 +31,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions
import ai.rapids.cudf._
+import com.github.luben.zstd.ZstdDecompressCtx
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.ParquetPartitionReader.{CopyRange, LocalCopy}
@@ -47,6 +48,7 @@ import org.apache.parquet.bytes.BytesUtils
import org.apache.parquet.bytes.BytesUtils.readIntLittleEndian
import org.apache.parquet.column.ColumnDescriptor
import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.Util
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat}
import org.apache.parquet.hadoop.ParquetFileWriter.MAGIC
@@ -54,6 +56,7 @@ import org.apache.parquet.hadoop.metadata._
import org.apache.parquet.io.{InputFile, SeekableInputStream}
import org.apache.parquet.schema.{DecimalMetadata, GroupType, MessageType, OriginalType, PrimitiveType, Type}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.xerial.snappy.Snappy
import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
@@ -1106,6 +1109,7 @@ case class GpuParquetMultiFilePartitionReaderFactory(
}.getOrElse(rapidsConf.getMultithreadedReaderKeepOrder)
private val alluxioReplacementTaskTime =
AlluxioCfgUtils.enabledAlluxioReplacementAlgoTaskTime(rapidsConf)
+ private val compressCfg = CpuCompressionConfig.forParquet(rapidsConf)
// We can't use the coalescing files reader when InputFileName, InputFileBlockStart,
// or InputFileBlockLength because we are combining all the files into a single buffer
@@ -1137,7 +1141,7 @@ case class GpuParquetMultiFilePartitionReaderFactory(
new MultiFileCloudParquetPartitionReader(conf, files, filterFunc, isCaseSensitive,
debugDumpPrefix, debugDumpAlways, maxReadBatchSizeRows, maxReadBatchSizeBytes,
targetBatchSizeBytes, maxGpuColumnSizeBytes,
- useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes,
+ useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, compressCfg,
metrics, partitionSchema, numThreads, maxNumFileProcessed, ignoreMissingFiles,
ignoreCorruptFiles, readUseFieldId, alluxioPathReplacementMap.getOrElse(Map.empty),
alluxioReplacementTaskTime, queryUsesInputFile, keepReadsInOrderFromConf, combineConf)
@@ -1244,7 +1248,7 @@ case class GpuParquetMultiFilePartitionReaderFactory(
clippedBlocks ++= singleFileInfo.blocks.map(block =>
ParquetSingleDataBlockMeta(
singleFileInfo.filePath,
- ParquetDataBlock(block),
+ ParquetDataBlock(block, compressCfg),
metaAndFile.file.partitionValues,
ParquetSchemaWrapper(singleFileInfo.schema),
singleFileInfo.readSchema,
@@ -1262,7 +1266,7 @@ case class GpuParquetMultiFilePartitionReaderFactory(
new MultiFileParquetPartitionReader(conf, files, clippedBlocks.toSeq, isCaseSensitive,
debugDumpPrefix, debugDumpAlways, maxReadBatchSizeRows, maxReadBatchSizeBytes,
targetBatchSizeBytes, maxGpuColumnSizeBytes,
- useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes,
+ useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, compressCfg,
metrics, partitionSchema, numThreads, ignoreMissingFiles, ignoreCorruptFiles, readUseFieldId)
}
@@ -1307,6 +1311,7 @@ case class GpuParquetPartitionReaderFactory(
private val readUseFieldId = ParquetSchemaClipShims.useFieldId(sqlConf)
private val footerReadType = GpuParquetScan.footerReaderHeuristic(
rapidsConf.parquetReaderFooterType, dataSchema, readDataSchema, readUseFieldId)
+ private val compressCfg = CpuCompressionConfig.forParquet(rapidsConf)
override def supportColumnarReads(partition: InputPartition): Boolean = true
@@ -1335,12 +1340,29 @@ case class GpuParquetPartitionReaderFactory(
new ParquetPartitionReader(conf, file, singleFileInfo.filePath, singleFileInfo.blocks,
singleFileInfo.schema, isCaseSensitive, readDataSchema, debugDumpPrefix, debugDumpAlways,
maxReadBatchSizeRows, maxReadBatchSizeBytes, targetSizeBytes,
- useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes,
+ useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, compressCfg,
metrics, singleFileInfo.dateRebaseMode,
singleFileInfo.timestampRebaseMode, singleFileInfo.hasInt96Timestamps, readUseFieldId)
}
}
+case class CpuCompressionConfig(
+ decompressSnappyCpu: Boolean,
+ decompressZstdCpu: Boolean) {
+ val decompressAnyCpu: Boolean = decompressSnappyCpu || decompressZstdCpu
+}
+
+object CpuCompressionConfig {
+ def forParquet(conf: RapidsConf): CpuCompressionConfig = {
+ val cpuEnable = conf.parquetDecompressCpu
+ CpuCompressionConfig(
+ decompressSnappyCpu = cpuEnable && conf.parquetDecompressCpuSnappy,
+ decompressZstdCpu = cpuEnable && conf.parquetDecompressCpuZstd)
+ }
+
+ def disabled(): CpuCompressionConfig = CpuCompressionConfig(false, false)
+}
+
trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
with MultiFileReaderFunctions {
// the size of Parquet magic (at start+end) and footer length values
@@ -1353,6 +1375,8 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
def isSchemaCaseSensitive: Boolean
+ def compressCfg: CpuCompressionConfig
+
val copyBufferSize = conf.getInt("parquet.read.allocation.size", 8 * 1024 * 1024)
def checkIfNeedToSplitBlocks(currentDateRebaseMode: DateTimeRebaseMode,
@@ -1418,13 +1442,8 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
schema: MessageType,
handleCoalesceFiles: Boolean): Long = {
// start with the size of Parquet magic (at start+end) and footer length values
- var size: Long = PARQUET_META_SIZE
-
- // Calculate the total amount of column data that will be copied
- // NOTE: Avoid using block.getTotalByteSize here as that is the
- // uncompressed size rather than the size in the file.
- size += currentChunkedBlocks.flatMap(_.getColumns.asScala.map(_.getTotalSize)).sum
-
+ val headerSize: Long = PARQUET_META_SIZE
+ val blocksSize = ParquetPartitionReader.computeOutputSize(currentChunkedBlocks, compressCfg)
val footerSize = calculateParquetFooterSize(currentChunkedBlocks, schema)
val extraMemory = if (handleCoalesceFiles) {
val numCols = currentChunkedBlocks.head.getColumns().size()
@@ -1432,8 +1451,7 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
} else {
0
}
- val totalSize = size + footerSize + extraMemory
- totalSize
+ headerSize + blocksSize + footerSize + extraMemory
}
protected def writeFooter(
@@ -1532,7 +1550,7 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
* metadata but with the file offsets updated to reflect the new position of the column data
* as written to the output.
*
- * @param in the input stream for the original Parquet file
+ * @param filePath the path to the Parquet file
* @param out the output stream to receive the data
* @param blocks block metadata from the original file that will appear in the computed file
* @param realStartOffset starting file offset of the first block
@@ -1575,6 +1593,258 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
computeBlockMetaData(blocks, realStartOffset)
}
+ private class BufferedFileInput(
+ filePath: Path,
+ blocks: Seq[BlockMetaData],
+ metrics: Map[String, GpuMetric]) extends InputStream {
+ private[this] val in = filePath.getFileSystem(conf).open(filePath)
+ private[this] val buffer: Array[Byte] = new Array[Byte](copyBufferSize)
+ private[this] var bufferSize: Int = 0
+ private[this] var bufferFilePos: Long = in.getPos
+ private[this] var bufferPos: Int = 0
+ private[this] val columnIter = blocks.flatMap(_.getColumns.asScala).iterator
+ private[this] var currentColumn: Option[ColumnChunkMetaData] = None
+ private[this] val readTime: GpuMetric = metrics.getOrElse(READ_FS_TIME, NoopMetric)
+
+    override def read(): Int = {
+      while (bufferPos == bufferSize) {
+        fillBuffer()
+      }
+      // mask to an unsigned value to honor the InputStream.read contract of 0-255
+      val result = buffer(bufferPos) & 0xFF
+      bufferPos += 1
+      result
+    }
+
+ override def read(b: Array[Byte]): Int = read(b, 0, b.length)
+
+ override def read(dest: Array[Byte], off: Int, len: Int): Int = {
+ var bytesLeft = len
+ while (bytesLeft > 0) {
+ if (bufferPos == bufferSize) {
+ fillBuffer()
+ }
+ val numBytes = Math.min(bytesLeft, bufferSize - bufferPos)
+ System.arraycopy(buffer, bufferPos, dest, off + len - bytesLeft, numBytes)
+ bufferPos += numBytes
+ bytesLeft -= numBytes
+ }
+ len
+ }
+
+ def read(out: HostMemoryOutputStream, len: Long): Unit = {
+ var bytesLeft = len
+ while (bytesLeft > 0) {
+ if (bufferPos == bufferSize) {
+ fillBuffer()
+ }
+ // downcast is safe because bufferSize is an int
+ val numBytes = Math.min(bytesLeft, bufferSize - bufferPos).toInt
+ out.write(buffer, bufferPos, numBytes)
+ bufferPos += numBytes
+ bytesLeft -= numBytes
+ }
+ }
+
+ def read(out: HostMemoryBuffer, len: Long): Unit = {
+ var bytesLeft = len
+ while (bytesLeft > 0) {
+ if (bufferPos == bufferSize) {
+ fillBuffer()
+ }
+ // downcast is safe because bufferSize is an int
+ val numBytes = Math.min(bytesLeft, bufferSize - bufferPos).toInt
+ out.setBytes(len - bytesLeft, buffer, bufferPos, numBytes)
+ bufferPos += numBytes
+ bytesLeft -= numBytes
+ }
+ }
+
+ override def skip(n: Long): Long = {
+ seek(getPos + n)
+ n
+ }
+
+ def getPos: Long = bufferFilePos + bufferPos
+
+ def seek(desiredPos: Long): Unit = {
+ require(desiredPos >= getPos, "Only supports seeking forward")
+ val posDiff = desiredPos - bufferFilePos
+ if (posDiff >= 0 && posDiff < bufferSize) {
+ bufferPos = posDiff.toInt
+ } else {
+ in.seek(desiredPos)
+ bufferFilePos = desiredPos
+ bufferSize = 0
+ bufferPos = 0
+ }
+ }
+
+ override def close(): Unit = {
+ readTime.ns {
+ in.close()
+ }
+ }
+
+ private def fillBuffer(): Unit = {
+ // TODO: Add FileCache support https://github.com/NVIDIA/spark-rapids/issues/11775
+ var bytesToCopy = currentColumn.map { c =>
+ Math.max(0, c.getStartingPos + c.getTotalSize - getPos)
+ }.getOrElse(0L)
+ var done = bytesToCopy >= buffer.length
+ while (!done && columnIter.hasNext) {
+ val column = columnIter.next()
+ currentColumn = Some(column)
+ done = if (getPos + bytesToCopy == column.getStartingPos) {
+ bytesToCopy += column.getTotalSize
+ bytesToCopy >= buffer.length
+ } else {
+ true
+ }
+ }
+ if (bytesToCopy <= 0) {
+ throw new EOFException("read beyond column data range")
+ }
+ bufferFilePos = in.getPos
+ bufferPos = 0
+ bufferSize = Math.min(bytesToCopy, buffer.length).toInt
+ readTime.ns {
+ in.readFully(buffer, 0, bufferSize)
+ }
+ }
+ }
+
+  /**
+   * Copies the data corresponding to the clipped blocks in the original file, decompressing
+   * Snappy and/or Zstandard column chunks on the CPU when enabled, and computes the block
+   * metadata for the output. The output blocks will contain the same column chunk metadata
+   * but with the codec, sizes, and file offsets updated to reflect the column data as
+   * written to the output.
+   *
+   * @param filePath the path to the Parquet file
+   * @param out the output stream to receive the data
+   * @param blocks block metadata from the original file that will appear in the computed file
+   * @param realStartOffset starting file offset of the first block
+   * @param metrics metrics to update while copying and decompressing
+   * @param compressCfg configuration controlling which codecs are decompressed on the CPU
+   * @return updated block metadata corresponding to the output
+   */
+ protected def copyAndUncompressBlocksData(
+ filePath: Path,
+ out: HostMemoryOutputStream,
+ blocks: Seq[BlockMetaData],
+ realStartOffset: Long,
+ metrics: Map[String, GpuMetric],
+ compressCfg: CpuCompressionConfig): Seq[BlockMetaData] = {
+ val outStartPos = out.getPos
+ val writeTime = metrics.getOrElse(WRITE_BUFFER_TIME, NoopMetric)
+ withResource(new BufferedFileInput(filePath, blocks, metrics)) { in =>
+ val newBlocks = blocks.map { block =>
+ val newColumns = block.getColumns.asScala.map { column =>
+ var columnTotalSize = column.getTotalSize
+ var columnCodec = column.getCodec
+ val columnStartingPos = realStartOffset + out.getPos - outStartPos
+ val columnDictOffset = if (column.getDictionaryPageOffset > 0) {
+ column.getDictionaryPageOffset + columnStartingPos - column.getStartingPos
+ } else {
+ 0
+ }
+ writeTime.ns {
+ columnCodec match {
+ case CompressionCodecName.SNAPPY if compressCfg.decompressSnappyCpu =>
+ val columnStartPos = out.getPos
+ decompressSnappy(in, out, column)
+ columnCodec = CompressionCodecName.UNCOMPRESSED
+ columnTotalSize = out.getPos - columnStartPos
+ case CompressionCodecName.ZSTD if compressCfg.decompressZstdCpu =>
+ val columnStartPos = out.getPos
+ decompressZstd(in, out, column)
+ columnCodec = CompressionCodecName.UNCOMPRESSED
+ columnTotalSize = out.getPos - columnStartPos
+ case _ =>
+ in.seek(column.getStartingPos)
+ in.read(out, columnTotalSize)
+ }
+ }
+ ColumnChunkMetaData.get(
+ column.getPath,
+ column.getPrimitiveType,
+ columnCodec,
+ column.getEncodingStats,
+ column.getEncodings,
+ column.getStatistics,
+ columnStartingPos,
+ columnDictOffset,
+ column.getValueCount,
+ columnTotalSize,
+ columnTotalSize)
+ }
+ GpuParquetUtils.newBlockMeta(block.getRowCount, newColumns.toSeq)
+ }
+ newBlocks
+ }
+ }
+
+ private def decompressSnappy(
+ in: BufferedFileInput,
+ out: HostMemoryOutputStream,
+ column: ColumnChunkMetaData): Unit = {
+ val endPos = column.getStartingPos + column.getTotalSize
+ in.seek(column.getStartingPos)
+ var inData: Option[HostMemoryBuffer] = None
+ try {
+ while (in.getPos != endPos) {
+ val pageHeader = Util.readPageHeader(in)
+ val compressedSize = pageHeader.getCompressed_page_size
+ val uncompressedSize = pageHeader.getUncompressed_page_size
+ pageHeader.unsetCrc()
+ pageHeader.setCompressed_page_size(uncompressedSize)
+ Util.writePageHeader(pageHeader, out)
+ if (inData.map(_.getLength).getOrElse(0L) < compressedSize) {
+ inData.foreach(_.close())
+ inData = Some(HostMemoryBuffer.allocate(compressedSize, false))
+ }
+ inData.foreach { compressedBuffer =>
+ in.read(compressedBuffer, compressedSize)
+ val bbIn = compressedBuffer.asByteBuffer(0, compressedSize)
+ val bbOut = out.writeAsByteBuffer(uncompressedSize)
+ Snappy.uncompress(bbIn, bbOut)
+ }
+ }
+ } finally {
+ inData.foreach(_.close())
+ }
+ }
+
+ private def decompressZstd(
+ in: BufferedFileInput,
+ out: HostMemoryOutputStream,
+ column: ColumnChunkMetaData): Unit = {
+ val endPos = column.getStartingPos + column.getTotalSize
+ in.seek(column.getStartingPos)
+ var inData: Option[HostMemoryBuffer] = None
+ try {
+ withResource(new ZstdDecompressCtx()) { ctx =>
+ while (in.getPos != endPos) {
+ val pageHeader = Util.readPageHeader(in)
+ val compressedSize = pageHeader.getCompressed_page_size
+ val uncompressedSize = pageHeader.getUncompressed_page_size
+ pageHeader.unsetCrc()
+ pageHeader.setCompressed_page_size(uncompressedSize)
+ Util.writePageHeader(pageHeader, out)
+ if (inData.map(_.getLength).getOrElse(0L) < compressedSize) {
+ inData.foreach(_.close())
+ inData = Some(HostMemoryBuffer.allocate(compressedSize, false))
+ }
+ inData.foreach { compressedBuffer =>
+ in.read(compressedBuffer, compressedSize)
+ val bbIn = compressedBuffer.asByteBuffer(0, compressedSize)
+ val bbOut = out.writeAsByteBuffer(uncompressedSize)
+ ctx.decompress(bbOut, bbIn)
+ }
+ }
+ }
+ } finally {
+ inData.foreach(_.close())
+ }
+ }
+
private def copyRemoteBlocksData(
remoteCopies: Seq[CopyRange],
filePath: Path,
@@ -1666,7 +1936,11 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
closeOnExcept(HostMemoryBuffer.allocate(estTotalSize)) { hmb =>
val out = new HostMemoryOutputStream(hmb)
out.write(ParquetPartitionReader.PARQUET_MAGIC)
- val outputBlocks = copyBlocksData(filePath, out, blocks, out.getPos, metrics)
+ val outputBlocks = if (compressCfg.decompressAnyCpu) {
+ copyAndUncompressBlocksData(filePath, out, blocks, out.getPos, metrics, compressCfg)
+ } else {
+ copyBlocksData(filePath, out, blocks, out.getPos, metrics)
+ }
val footerPos = out.getPos
writeFooter(out, outputBlocks, clippedSchema)
BytesUtils.writeIntLittleEndian(out, (out.getPos - footerPos).toInt)
@@ -1802,7 +2076,7 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
block.asInstanceOf[ParquetDataBlock].dataBlock
implicit def toDataBlockBase(blocks: Seq[BlockMetaData]): Seq[DataBlockBase] =
- blocks.map(ParquetDataBlock)
+ blocks.map(b => ParquetDataBlock(b, compressCfg))
implicit def toBlockMetaDataSeq(blocks: Seq[DataBlockBase]): Seq[BlockMetaData] =
blocks.map(_.asInstanceOf[ParquetDataBlock].dataBlock)
@@ -1814,10 +2088,14 @@ private case class ParquetSchemaWrapper(schema: MessageType) extends SchemaBase
}
// Parquet BlockMetaData wrapper
-private case class ParquetDataBlock(dataBlock: BlockMetaData) extends DataBlockBase {
+private case class ParquetDataBlock(
+ dataBlock: BlockMetaData,
+ compressCfg: CpuCompressionConfig) extends DataBlockBase {
override def getRowCount: Long = dataBlock.getRowCount
override def getReadDataSize: Long = dataBlock.getTotalByteSize
- override def getBlockSize: Long = dataBlock.getColumns.asScala.map(_.getTotalSize).sum
+ override def getBlockSize: Long = {
+ ParquetPartitionReader.computeOutputSize(dataBlock, compressCfg)
+ }
}
/** Parquet extra information containing rebase modes and whether there is int96 timestamp */
@@ -1876,6 +2154,7 @@ class MultiFileParquetPartitionReader(
maxGpuColumnSizeBytes: Long,
useChunkedReader: Boolean,
maxChunkedReaderMemoryUsageSizeBytes: Long,
+ override val compressCfg: CpuCompressionConfig,
override val execMetrics: Map[String, GpuMetric],
partitionSchema: StructType,
numThreads: Int,
@@ -1900,7 +2179,8 @@ class MultiFileParquetPartitionReader(
file: Path,
outhmb: HostMemoryBuffer,
blocks: ArrayBuffer[DataBlockBase],
- offset: Long)
+ offset: Long,
+ compressCfg: CpuCompressionConfig)
extends Callable[(Seq[DataBlockBase], Long)] {
override def call(): (Seq[DataBlockBase], Long) = {
@@ -1909,7 +2189,11 @@ class MultiFileParquetPartitionReader(
val startBytesRead = fileSystemBytesRead()
val outputBlocks = withResource(outhmb) { _ =>
withResource(new HostMemoryOutputStream(outhmb)) { out =>
- copyBlocksData(file, out, blocks.toSeq, offset, metrics)
+ if (compressCfg.decompressAnyCpu) {
+ copyAndUncompressBlocksData(file, out, blocks.toSeq, offset, metrics, compressCfg)
+ } else {
+ copyBlocksData(file, out, blocks.toSeq, offset, metrics)
+ }
}
}
val bytesRead = fileSystemBytesRead() - startBytesRead
@@ -1961,7 +2245,7 @@ class MultiFileParquetPartitionReader(
blocks: ArrayBuffer[DataBlockBase],
offset: Long,
batchContext: BatchContext): Callable[(Seq[DataBlockBase], Long)] = {
- new ParquetCopyBlocksRunner(taskContext, file, outhmb, blocks, offset)
+ new ParquetCopyBlocksRunner(taskContext, file, outhmb, blocks, offset, compressCfg)
}
override final def getFileFormatShortName: String = "Parquet"
@@ -2072,6 +2356,7 @@ class MultiFileCloudParquetPartitionReader(
maxGpuColumnSizeBytes: Long,
useChunkedReader: Boolean,
maxChunkedReaderMemoryUsageSizeBytes: Long,
+ override val compressCfg: CpuCompressionConfig,
override val execMetrics: Map[String, GpuMetric],
partitionSchema: StructType,
numThreads: Int,
@@ -2761,6 +3046,7 @@ class ParquetPartitionReader(
targetBatchSizeBytes: Long,
useChunkedReader: Boolean,
maxChunkedReaderMemoryUsageSizeBytes: Long,
+ override val compressCfg: CpuCompressionConfig,
override val execMetrics: Map[String, GpuMetric],
dateRebaseMode: DateTimeRebaseMode,
timestampRebaseMode: DateTimeRebaseMode,
@@ -2873,26 +3159,34 @@ object ParquetPartitionReader {
length: Long,
outputOffset: Long) extends CopyItem
- /**
- * Build a new BlockMetaData
- *
- * @param rowCount the number of rows in this block
- * @param columns the new column chunks to reference in the new BlockMetaData
- * @return the new BlockMetaData
- */
- private[rapids] def newParquetBlock(
- rowCount: Long,
- columns: Seq[ColumnChunkMetaData]): BlockMetaData = {
- val block = new BlockMetaData
- block.setRowCount(rowCount)
+ private[rapids] def computeOutputSize(
+ blocks: Seq[BlockMetaData],
+ compressCfg: CpuCompressionConfig): Long = {
+ blocks.map { block =>
+ computeOutputSize(block, compressCfg)
+ }.sum
+ }
- var totalSize: Long = 0
- columns.foreach { column =>
- block.addColumn(column)
- totalSize += column.getTotalUncompressedSize
+ private[rapids] def computeOutputSize(
+ block: BlockMetaData,
+ compressCfg: CpuCompressionConfig): Long = {
+ if (compressCfg.decompressAnyCpu) {
+ block.getColumns.asScala.map { c =>
+ if ((c.getCodec == CompressionCodecName.SNAPPY && compressCfg.decompressSnappyCpu)
+ || (c.getCodec == CompressionCodecName.ZSTD && compressCfg.decompressZstdCpu)) {
+ // Page headers need to be rewritten when CPU decompresses, and that may
+ // increase the size of the page header. Guess how many pages there may be
+ // and add a fudge factor per page to try to avoid a late realloc+copy.
+ // NOTE: Avoid using block.getTotalByteSize as that is the
+ // uncompressed size rather than the size in the file.
+ val estimatedPageCount = (c.getTotalUncompressedSize / (1024 * 1024)) + 1
+ c.getTotalUncompressedSize + estimatedPageCount * 8
+ } else {
+ c.getTotalSize
+ }
+ }.sum
+ } else {
+ block.getColumns.asScala.map(_.getTotalSize).sum
}
- block.setTotalByteSize(totalSize)
-
- block
}
}
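
Note: the rewritten `computeOutputSize` above has to over-estimate the output when column chunks will be decompressed on the CPU, because clearing the CRC and replacing the compressed page size can change the encoded size of each page header. A small worked example of that estimate for a single column chunk; the 5 MiB chunk size is an assumption, while the 1 MiB-per-page guess and the 8-byte-per-page fudge factor mirror the code above:

```scala
// Illustrative sketch of the per-column estimate used when CPU decompression applies.
val totalUncompressedSize = 5L * 1024 * 1024                               // assumed 5 MiB chunk
val estimatedPageCount = totalUncompressedSize / (1024 * 1024) + 1         // guesses 6 pages
val estimatedOutputSize = totalUncompressedSize + estimatedPageCount * 8   // 5 MiB + 48 bytes slack
```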
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostMemoryStreams.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostMemoryStreams.scala
index 08fe5be50b2..4be11b13254 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostMemoryStreams.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostMemoryStreams.scala
@@ -54,6 +54,12 @@ class HostMemoryOutputStream(val buffer: HostMemoryBuffer) extends OutputStream
pos += numBytes
}
+ def writeAsByteBuffer(length: Int): ByteBuffer = {
+ val bb = buffer.asByteBuffer(pos, length)
+ pos += length
+ bb
+ }
+
def getPos: Long = pos
def seek(newPos: Long): Unit = {
@@ -132,6 +138,12 @@ trait HostMemoryInputStreamMixIn extends InputStream {
}
}
+ def readByteBuffer(length: Int): ByteBuffer = {
+ val bb = hmb.asByteBuffer(pos, length)
+ pos += length
+ bb
+ }
+
override def skip(count: Long): Long = {
val oldPos = pos
pos = Math.min(pos + count, hmbLength)
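
Note: the new `writeAsByteBuffer` helper is what lets the CPU decompressors in GpuParquetScan.scala hand Snappy and zstd-jni a ByteBuffer view of the output HostMemoryBuffer and fill it in place, with `readByteBuffer` as the mirror-image helper on the input side. A minimal usage sketch; the buffer and page sizes here are assumptions for illustration:

```scala
import ai.rapids.cudf.HostMemoryBuffer
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.HostMemoryOutputStream

withResource(HostMemoryBuffer.allocate(64 * 1024)) { hmb =>
  val out = new HostMemoryOutputStream(hmb)
  val uncompressedSize = 4096 // assumed uncompressed page size
  // Reserves uncompressedSize bytes at the current position and returns a ByteBuffer
  // view of that region, e.g. for Snappy.uncompress(bbIn, bbOut) to fill directly.
  val bbOut = out.writeAsByteBuffer(uncompressedSize)
  assert(out.getPos == uncompressedSize)
}
```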
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index ab7a788d205..406aeb0365b 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -1120,6 +1120,31 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.checkValues(RapidsReaderType.values.map(_.toString))
.createWithDefault(RapidsReaderType.AUTO.toString)
+ val PARQUET_DECOMPRESS_CPU =
+ conf("spark.rapids.sql.format.parquet.decompressCpu")
+ .doc("If true then the CPU is eligible to decompress Parquet data rather than the GPU. " +
+ s"See other spark.rapids.sql.format.parquet.decompressCpu.* configuration settings " +
+ "to control this for specific compression codecs.")
+ .internal()
+ .booleanConf
+ .createWithDefault(false)
+
+ val PARQUET_DECOMPRESS_CPU_SNAPPY =
+ conf("spark.rapids.sql.format.parquet.decompressCpu.snappy")
+ .doc(s"If true and $PARQUET_DECOMPRESS_CPU is true then the CPU decompresses " +
+ "Parquet Snappy data rather than the GPU")
+ .internal()
+ .booleanConf
+ .createWithDefault(true)
+
+ val PARQUET_DECOMPRESS_CPU_ZSTD =
+ conf("spark.rapids.sql.format.parquet.decompressCpu.zstd")
+ .doc(s"If true and $PARQUET_DECOMPRESS_CPU is true then the CPU decompresses " +
+ "Parquet Zstandard data rather than the GPU")
+ .internal()
+ .booleanConf
+ .createWithDefault(true)
+
val READER_MULTITHREADED_COMBINE_THRESHOLD =
conf("spark.rapids.sql.reader.multithreaded.combine.sizeBytes")
.doc("The target size in bytes to combine multiple small files together when using the " +
@@ -2960,6 +2985,12 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
lazy val isParquetMultiThreadReadEnabled: Boolean = isParquetAutoReaderEnabled ||
RapidsReaderType.withName(get(PARQUET_READER_TYPE)) == RapidsReaderType.MULTITHREADED
+ lazy val parquetDecompressCpu: Boolean = get(PARQUET_DECOMPRESS_CPU)
+
+ lazy val parquetDecompressCpuSnappy: Boolean = get(PARQUET_DECOMPRESS_CPU_SNAPPY)
+
+ lazy val parquetDecompressCpuZstd: Boolean = get(PARQUET_DECOMPRESS_CPU_ZSTD)
+
lazy val maxNumParquetFilesParallel: Int = get(PARQUET_MULTITHREAD_READ_MAX_NUM_FILES_PARALLEL)
lazy val isParquetReadEnabled: Boolean = get(ENABLE_PARQUET_READ)
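
Note: since all three settings are internal and the feature is off by default, a hedged sketch of how they would be enabled on a session for experimentation (assumes the RAPIDS plugin is already configured; the app name and path are placeholders):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parquet-cpu-decompress-example") // placeholder
  .config("spark.rapids.sql.format.parquet.decompressCpu", "true")
  // The codec-specific flags default to true; shown explicitly for clarity.
  .config("spark.rapids.sql.format.parquet.decompressCpu.snappy", "true")
  .config("spark.rapids.sql.format.parquet.decompressCpu.zstd", "true")
  .getOrCreate()

val df = spark.read.parquet("/path/to/parquet") // placeholder path
```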