From 1150c4ff9409f6a5471f8157c57a76e89bd4cbc7 Mon Sep 17 00:00:00 2001
From: Jason Lowe
Date: Fri, 3 Jan 2025 15:23:16 -0600
Subject: [PATCH] Restore withRetry block

---
 .../nvidia/spark/rapids/GpuParquetScan.scala | 60 ++++++++++---------
 1 file changed, 31 insertions(+), 29 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
index a7677476fff..4fa0d209190 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -2815,38 +2815,40 @@ class MultiFileCloudParquetPartitionReader(
         // about to start using the GPU
         GpuSemaphore.acquireIfNecessary(TaskContext.get())
 
-//        RmmRapidsRetryIterator.withRetryNoSplit(hostBuffer) { _ =>
-          // The MakeParquetTableProducer will close the input buffer, and that would be bad
-          // because we don't want to close it until we know that we are done with it
-//          hostBuffer.incRefCount()
-          val tableReader = MakeParquetTableProducer(useChunkedReader,
-            maxChunkedReaderMemoryUsageSizeBytes,
-            conf, targetBatchSizeBytes,
-            parseOpts,
-            hostBuffers, metrics,
-            dateRebaseMode, timestampRebaseMode, hasInt96Timestamps,
-            isSchemaCaseSensitive, useFieldId, readDataSchema, clippedSchema, files,
-            debugDumpPrefix, debugDumpAlways)
-
-          val batchIter = CachedGpuBatchIterator(tableReader, colTypes)
-
-          if (allPartValues.isDefined) {
-            val allPartInternalRows = allPartValues.get.map(_._2)
-            val rowsPerPartition = allPartValues.get.map(_._1)
-            new GpuColumnarBatchWithPartitionValuesIterator(batchIter, allPartInternalRows,
-              rowsPerPartition, partitionSchema, maxGpuColumnSizeBytes)
-          } else {
-            // this is a bit weird, we don't have number of rows when allPartValues isn't
-            // filled in so can't use GpuColumnarBatchWithPartitionValuesIterator
-            batchIter.flatMap { batch =>
-              // we have to add partition values here for this batch, we already verified that
-              // its not different for all the blocks in this batch
-              BatchWithPartitionDataUtils.addSinglePartitionValueToBatch(batch,
-                partedFile.partitionValues, partitionSchema, maxGpuColumnSizeBytes)
+        withResource(hostBuffers) { _ =>
+          RmmRapidsRetryIterator.withRetryNoSplit {
+            // The MakeParquetTableProducer will close the input buffers, and that would be bad
+            // because we don't want to close them until we know that we are done with them.
+            hostBuffers.foreach(_.incRefCount())
+            val tableReader = MakeParquetTableProducer(useChunkedReader,
+              maxChunkedReaderMemoryUsageSizeBytes,
+              conf, targetBatchSizeBytes,
+              parseOpts,
+              hostBuffers, metrics,
+              dateRebaseMode, timestampRebaseMode, hasInt96Timestamps,
+              isSchemaCaseSensitive, useFieldId, readDataSchema, clippedSchema, files,
+              debugDumpPrefix, debugDumpAlways)
+
+            val batchIter = CachedGpuBatchIterator(tableReader, colTypes)
+
+            if (allPartValues.isDefined) {
+              val allPartInternalRows = allPartValues.get.map(_._2)
+              val rowsPerPartition = allPartValues.get.map(_._1)
+              new GpuColumnarBatchWithPartitionValuesIterator(batchIter, allPartInternalRows,
+                rowsPerPartition, partitionSchema, maxGpuColumnSizeBytes)
+            } else {
+              // this is a bit weird, we don't have number of rows when allPartValues isn't
+              // filled in so can't use GpuColumnarBatchWithPartitionValuesIterator
+              batchIter.flatMap { batch =>
+                // we have to add partition values here for this batch, we already verified that
+                // its not different for all the blocks in this batch
+                BatchWithPartitionDataUtils.addSinglePartitionValueToBatch(batch,
+                  partedFile.partitionValues, partitionSchema, maxGpuColumnSizeBytes)
+              }
             }
           }
         }
-//        }
+      }
     }
 
 object MakeParquetTableProducer extends Logging {
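
The sketch below is a minimal, self-contained illustration of the ownership pattern the restored block relies on: the caller holds one reference to each host buffer for the lifetime of the retry loop (via the resource wrapper), and each retry attempt hands the consumer an extra reference via incRefCount(), so a failed attempt cannot release buffers the next attempt still needs. RefCountedBuffer, retryNoSplit, the local withResource helper, and consume are hypothetical stand-ins for illustration only, not the spark-rapids API.

// Hypothetical stand-ins only; not the spark-rapids classes used in the patch.
import scala.util.control.NonFatal

final class RefCountedBuffer(val name: String) extends AutoCloseable {
  private var refs = 1
  // Take an extra reference before handing the buffer to a consumer that closes it.
  def incRefCount(): RefCountedBuffer = { refs += 1; this }
  override def close(): Unit = {
    refs -= 1
    if (refs == 0) println(s"$name released")
  }
}

object RetrySketch {
  // Simplified stand-in for a retry-no-split loop: rerun the body on failure,
  // up to maxAttempts times.
  def retryNoSplit[T](maxAttempts: Int)(body: => T): T = {
    var attempt = 0
    while (true) {
      attempt += 1
      try {
        return body
      } catch {
        case NonFatal(e) if attempt < maxAttempts =>
          println(s"attempt $attempt failed, retrying: $e")
      }
    }
    sys.error("unreachable")
  }

  // Stand-in for withResource: release the caller's reference when the block exits.
  def withResource[T](buffers: Seq[RefCountedBuffer])(block: Seq[RefCountedBuffer] => T): T =
    try block(buffers) finally buffers.foreach(_.close())

  // Stand-in for the table producer: it closes whatever buffers it is given,
  // and fails once to exercise the retry path.
  private var failOnce = true
  def consume(buffers: Seq[RefCountedBuffer]): String =
    try {
      if (failOnce) { failOnce = false; throw new RuntimeException("simulated GPU OOM") }
      "table"
    } finally {
      buffers.foreach(_.close())
    }

  def main(args: Array[String]): Unit = {
    val hostBuffers = Seq(new RefCountedBuffer("buf0"), new RefCountedBuffer("buf1"))
    val result = withResource(hostBuffers) { bufs =>
      retryNoSplit(maxAttempts = 3) {
        // Each attempt gives the consumer its own reference; the caller's
        // reference from withResource keeps the buffers alive across retries.
        consume(bufs.map(_.incRefCount()))
      }
    } // caller's reference released here; refcounts drop to zero
    println(result)
  }
}

Running the sketch prints one simulated failure on the first attempt, then "table", and each buffer is released exactly once after the retry loop completes.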