Restore withRetry block
jlowe committed Jan 3, 2025
1 parent de96074 commit 1150c4f
Showing 1 changed file with 31 additions and 29 deletions.
@@ -2815,38 +2815,40 @@ class MultiFileCloudParquetPartitionReader(
     // about to start using the GPU
     GpuSemaphore.acquireIfNecessary(TaskContext.get())
 
-    // RmmRapidsRetryIterator.withRetryNoSplit(hostBuffer) { _ =>
-      // The MakeParquetTableProducer will close the input buffer, and that would be bad
-      // because we don't want to close it until we know that we are done with it
-      // hostBuffer.incRefCount()
-      val tableReader = MakeParquetTableProducer(useChunkedReader,
-        maxChunkedReaderMemoryUsageSizeBytes,
-        conf, targetBatchSizeBytes,
-        parseOpts,
-        hostBuffers, metrics,
-        dateRebaseMode, timestampRebaseMode, hasInt96Timestamps,
-        isSchemaCaseSensitive, useFieldId, readDataSchema, clippedSchema, files,
-        debugDumpPrefix, debugDumpAlways)
-
-      val batchIter = CachedGpuBatchIterator(tableReader, colTypes)
-
-      if (allPartValues.isDefined) {
-        val allPartInternalRows = allPartValues.get.map(_._2)
-        val rowsPerPartition = allPartValues.get.map(_._1)
-        new GpuColumnarBatchWithPartitionValuesIterator(batchIter, allPartInternalRows,
-          rowsPerPartition, partitionSchema, maxGpuColumnSizeBytes)
-      } else {
-        // this is a bit weird, we don't have number of rows when allPartValues isn't
-        // filled in so can't use GpuColumnarBatchWithPartitionValuesIterator
-        batchIter.flatMap { batch =>
-          // we have to add partition values here for this batch, we already verified that
-          // its not different for all the blocks in this batch
-          BatchWithPartitionDataUtils.addSinglePartitionValueToBatch(batch,
-            partedFile.partitionValues, partitionSchema, maxGpuColumnSizeBytes)
+    withResource(hostBuffers) { _ =>
+      RmmRapidsRetryIterator.withRetryNoSplit {
+        // The MakeParquetTableProducer will close the input buffers, and that would be bad
+        // because we don't want to close them until we know that we are done with them.
+        hostBuffers.foreach(_.incRefCount())
+        val tableReader = MakeParquetTableProducer(useChunkedReader,
+          maxChunkedReaderMemoryUsageSizeBytes,
+          conf, targetBatchSizeBytes,
+          parseOpts,
+          hostBuffers, metrics,
+          dateRebaseMode, timestampRebaseMode, hasInt96Timestamps,
+          isSchemaCaseSensitive, useFieldId, readDataSchema, clippedSchema, files,
+          debugDumpPrefix, debugDumpAlways)
+
+        val batchIter = CachedGpuBatchIterator(tableReader, colTypes)
+
+        if (allPartValues.isDefined) {
+          val allPartInternalRows = allPartValues.get.map(_._2)
+          val rowsPerPartition = allPartValues.get.map(_._1)
+          new GpuColumnarBatchWithPartitionValuesIterator(batchIter, allPartInternalRows,
+            rowsPerPartition, partitionSchema, maxGpuColumnSizeBytes)
+        } else {
+          // this is a bit weird, we don't have number of rows when allPartValues isn't
+          // filled in so can't use GpuColumnarBatchWithPartitionValuesIterator
+          batchIter.flatMap { batch =>
+            // we have to add partition values here for this batch, we already verified that
+            // its not different for all the blocks in this batch
+            BatchWithPartitionDataUtils.addSinglePartitionValueToBatch(batch,
+              partedFile.partitionValues, partitionSchema, maxGpuColumnSizeBytes)
+          }
+        }
+      }
+    }
-    // }
   }
 }
 
 object MakeParquetTableProducer extends Logging {
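
The restored block pairs two ownership rules: withResource(hostBuffers) holds the caller's reference to the buffers for the whole retry loop, while each attempt inside RmmRapidsRetryIterator.withRetryNoSplit bumps their reference counts, because MakeParquetTableProducer closes the buffers it is handed and a failed attempt must not free the data the next attempt still needs. The sketch below illustrates that reference-counting pattern in isolation; it is a minimal stand-in and not spark-rapids code: RefCountedBuffer, withResource, withRetryNoSplit, and decode are simplified placeholders for the real RMM/spark-rapids APIs.

// Minimal, self-contained sketch of the buffer-ownership pattern the commit restores.
// Every type and helper here is a simplified stand-in, NOT the real spark-rapids API.
object RetryRefCountSketch {
  import scala.util.control.NonFatal

  // Stand-in for a host memory buffer with RMM-style reference counting.
  final class RefCountedBuffer(val name: String) extends AutoCloseable {
    private var refs = 1
    def incRefCount(): Unit = { refs += 1 }
    override def close(): Unit = {
      require(refs > 0, s"$name closed more times than it was referenced")
      refs -= 1
      if (refs == 0) println(s"$name released")
    }
  }

  // Close every resource once the body finishes, mirroring withResource(hostBuffers) { _ => ... }.
  def withResource[T <: AutoCloseable, R](resources: Seq[T])(body: Seq[T] => R): R =
    try body(resources) finally resources.foreach(_.close())

  // Tiny stand-in for RmmRapidsRetryIterator.withRetryNoSplit: re-run the block on failure.
  // (The real version reacts to GPU OOM and rolls back allocations between attempts.)
  def withRetryNoSplit[R](attempts: Int)(body: => R): R =
    try body catch {
      case NonFatal(_) if attempts > 1 => withRetryNoSplit(attempts - 1)(body)
    }

  // Stand-in for MakeParquetTableProducer + CachedGpuBatchIterator: it always closes the
  // buffers it was handed, and the first attempt fails to simulate a retryable error.
  def decode(buffers: Seq[RefCountedBuffer], attempt: Int): String =
    try {
      if (attempt == 1) throw new RuntimeException("simulated retryable failure")
      s"decoded ${buffers.size} buffers on attempt $attempt"
    } finally buffers.foreach(_.close())

  def readWithRetry(buffers: Seq[RefCountedBuffer]): String = {
    var attempt = 0
    withResource(buffers) { _ =>      // the caller's reference, released exactly once at the end
      withRetryNoSplit(attempts = 3) {
        attempt += 1
        // The decoder closes what it is given, so every attempt adds one reference first;
        // without this, a failed first attempt would free the data the retry still needs.
        buffers.foreach(_.incRefCount())
        decode(buffers, attempt)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val buffers = Seq(new RefCountedBuffer("buf0"), new RefCountedBuffer("buf1"))
    println(readWithRetry(buffers))
  }
}

With the extra reference per attempt, the failed first attempt only drops each count back to the caller's reference, and the single close performed by withResource releases the buffers exactly once after the loop finishes.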
