From 30c4ddb8d2232407a55855350c7a25d4d285824e Mon Sep 17 00:00:00 2001
From: Renjie Liu
Date: Fri, 6 Dec 2024 09:29:23 +0800
Subject: [PATCH] Add support for kudo write metrics (#11784)

* Add support for kudo write metrics

* Refactor

Signed-off-by: liurenjie1024

* Address comments

* Resolve comments

* Fix compiler

* Fix build break

* Fix build break

* Fix build break

* Fix build break

---------

Signed-off-by: liurenjie1024
---
 .../delta/GpuOptimizeWriteExchangeExec.scala  | 24 ++---
 .../rapids/GpuColumnarBatchSerializer.scala   | 53 +++++++----
 .../com/nvidia/spark/rapids/GpuExec.scala     | 10 +-
 .../RapidsShuffleInternalManagerBase.scala    | 17 ++--
 .../GpuShuffleExchangeExecBase.scala          | 92 +++++++++++++++----
 .../RapidsShuffleThreadedReaderSuite.scala    |  3 +-
 6 files changed, 132 insertions(+), 67 deletions(-)

diff --git a/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala b/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala
index 0c212d6842a..5b26943a541 100644
--- a/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala
+++ b/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.{CoalescedPartitionSpec, ShufflePartitionS
 import org.apache.spark.sql.execution.exchange.Exchange
 import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 import org.apache.spark.sql.rapids.execution.{GpuShuffleExchangeExecBase, ShuffledBatchRDD}
+import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.createAdditionalExchangeMetrics
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.ThreadUtils
 
@@ -71,22 +72,11 @@ case class GpuOptimizeWriteExchangeExec(
   private[sql] lazy val readMetrics =
     SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
 
-  override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
-    "dataSize" -> createSizeMetric(ESSENTIAL_LEVEL, "data size"),
-    "dataReadSize" -> createSizeMetric(MODERATE_LEVEL, "data read size"),
-    "rapidsShuffleSerializationTime" ->
-      createNanoTimingMetric(DEBUG_LEVEL, "rs. serialization time"),
-    "rapidsShuffleDeserializationTime" ->
-      createNanoTimingMetric(DEBUG_LEVEL, "rs. deserialization time"),
-    "rapidsShuffleWriteTime" ->
-      createNanoTimingMetric(ESSENTIAL_LEVEL, "rs. shuffle write time"),
-    "rapidsShuffleCombineTime" ->
-      createNanoTimingMetric(DEBUG_LEVEL, "rs. shuffle combine time"),
-    "rapidsShuffleWriteIoTime" ->
-      createNanoTimingMetric(DEBUG_LEVEL, "rs. shuffle write io time"),
-    "rapidsShuffleReadTime" ->
-      createNanoTimingMetric(ESSENTIAL_LEVEL, "rs. shuffle read time")
-  ) ++ GpuMetric.wrap(readMetrics) ++ GpuMetric.wrap(writeMetrics)
+  override lazy val additionalMetrics : Map[String, GpuMetric] = {
+    createAdditionalExchangeMetrics(this) ++
+      GpuMetric.wrap(readMetrics) ++
+      GpuMetric.wrap(writeMetrics)
+  }
 
   override lazy val allMetrics: Map[String, GpuMetric] = {
     Map(
@@ -98,7 +88,7 @@ case class GpuOptimizeWriteExchangeExec(
   }
 
   private lazy val serializer: Serializer =
-    new GpuColumnarBatchSerializer(gpuLongMetric("dataSize"),
+    new GpuColumnarBatchSerializer(allMetrics,
       child.output.map(_.dataType).toArray,
       RapidsConf.SHUFFLE_KUDO_SERIALIZER_ENABLED.get(child.conf))
 
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala
index 8fde39eecf8..82f368936a7 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala
@@ -31,6 +31,10 @@ import com.nvidia.spark.rapids.jni.kudo.{KudoSerializer, KudoTable, KudoTableHea
 
 import org.apache.spark.TaskContext
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance}
+import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_SIZE,
+  METRIC_SHUFFLE_DESER_STREAM_TIME, METRIC_SHUFFLE_SER_CALC_HEADER_TIME,
+  METRIC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_SHUFFLE_SER_COPY_HEADER_TIME,
+  METRIC_SHUFFLE_SER_STREAM_TIME}
 import org.apache.spark.sql.types.{DataType, NullType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
@@ -45,7 +49,7 @@ trait BaseSerializedTableIterator extends Iterator[(Int, ColumnarBatch)] {
   def peekNextBatchSize(): Option[Long]
 }
 
-class SerializedBatchIterator(dIn: DataInputStream)
+class SerializedBatchIterator(dIn: DataInputStream, deserTime: GpuMetric)
   extends BaseSerializedTableIterator {
   private[this] var nextHeader: Option[SerializedTableHeader] = None
   private[this] var streamClosed: Boolean = false
@@ -58,7 +62,7 @@ class SerializedBatchIterator(dIn: DataInputStream)
     }
   }
 
-  override def peekNextBatchSize(): Option[Long] = {
+  override def peekNextBatchSize(): Option[Long] = deserTime.ns {
     if (streamClosed) {
       None
     } else {
@@ -78,7 +82,7 @@ class SerializedBatchIterator(dIn: DataInputStream)
     }
   }
 
-  private def readNextBatch(): ColumnarBatch = {
+  private def readNextBatch(): ColumnarBatch = deserTime.ns {
     withResource(new NvtxRange("Read Batch", NvtxColor.YELLOW)) { _ =>
       val header = nextHeader.get
       nextHeader = None
@@ -125,26 +129,32 @@ class SerializedBatchIterator(dIn: DataInputStream)
  *
  * @note The RAPIDS shuffle does not use this code.
  */
-class GpuColumnarBatchSerializer(dataSize: GpuMetric, dataTypes: Array[DataType], useKudo: Boolean)
+class GpuColumnarBatchSerializer(metrics: Map[String, GpuMetric], dataTypes: Array[DataType],
+    useKudo: Boolean)
   extends Serializer with Serializable {
   override def newInstance(): SerializerInstance = {
     if (useKudo) {
-      new KudoSerializerInstance(dataSize, dataTypes)
+      new KudoSerializerInstance(metrics, dataTypes)
     } else {
-      new GpuColumnarBatchSerializerInstance(dataSize)
+      new GpuColumnarBatchSerializerInstance(metrics)
     }
   }
 
   override def supportsRelocationOfSerializedObjects: Boolean = true
 }
 
-private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric) extends SerializerInstance {
+private class GpuColumnarBatchSerializerInstance(metrics: Map[String, GpuMetric]) extends
+  SerializerInstance {
+  private val dataSize = metrics(METRIC_DATA_SIZE)
+  private val serTime = metrics(METRIC_SHUFFLE_SER_STREAM_TIME)
+  private val deserTime = metrics(METRIC_SHUFFLE_DESER_STREAM_TIME)
+
   override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream {
     private[this] val dOut: DataOutputStream =
       new DataOutputStream(new BufferedOutputStream(out))
 
-    override def writeValue[T: ClassTag](value: T): SerializationStream = {
+    override def writeValue[T: ClassTag](value: T): SerializationStream = serTime.ns {
       val batch = value.asInstanceOf[ColumnarBatch]
       val numColumns = batch.numCols()
       val columns: Array[HostColumnVector] = new Array(numColumns)
@@ -227,7 +237,7 @@ private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric) extends Se
     private[this] val dIn: DataInputStream = new DataInputStream(new BufferedInputStream(in))
 
     override def asKeyValueIterator: Iterator[(Int, ColumnarBatch)] = {
-      new SerializedBatchIterator(dIn)
+      new SerializedBatchIterator(dIn, deserTime)
     }
 
     override def asIterator: Iterator[Any] = {
@@ -327,8 +337,14 @@ object SerializedTableColumn {
  *
  * @param dataTypes data types of the columns in the batch
  */
 private class KudoSerializerInstance(
-    val dataSize: GpuMetric,
+    val metrics: Map[String, GpuMetric],
     val dataTypes: Array[DataType]) extends SerializerInstance {
+  private val dataSize = metrics(METRIC_DATA_SIZE)
+  private val serTime = metrics(METRIC_SHUFFLE_SER_STREAM_TIME)
+  private val serCalcHeaderTime = metrics(METRIC_SHUFFLE_SER_CALC_HEADER_TIME)
+  private val serCopyHeaderTime = metrics(METRIC_SHUFFLE_SER_COPY_HEADER_TIME)
+  private val serCopyBufferTime = metrics(METRIC_SHUFFLE_SER_COPY_BUFFER_TIME)
+  private val deserTime = metrics(METRIC_SHUFFLE_DESER_STREAM_TIME)
 
   private lazy val kudo = new KudoSerializer(GpuColumnVector.from(dataTypes))
@@ -336,7 +352,7 @@ private class KudoSerializerInstance(
     private[this] val dOut: DataOutputStream =
       new DataOutputStream(new BufferedOutputStream(out))
 
-    override def writeValue[T: ClassTag](value: T): SerializationStream = {
+    override def writeValue[T: ClassTag](value: T): SerializationStream = serTime.ns {
       val batch = value.asInstanceOf[ColumnarBatch]
       val numColumns = batch.numCols()
       val columns: Array[HostColumnVector] = new Array(numColumns)
@@ -368,7 +384,12 @@ private class KudoSerializerInstance(
         }
 
         withResource(new NvtxRange("Serialize Batch", NvtxColor.YELLOW)) { _ =>
-          dataSize += kudo.writeToStream(columns, dOut, startRow, numRows)
+          val writeMetric = kudo.writeToStreamWithMetrics(columns, dOut, startRow, numRows)
+
+          dataSize += writeMetric.getWrittenBytes
+          serCalcHeaderTime += writeMetric.getCalcHeaderTime
+          serCopyHeaderTime += writeMetric.getCopyHeaderTime
+          serCopyBufferTime += writeMetric.getCopyBufferTime
         }
       } else {
         withResource(new NvtxRange("Serialize Row Only Batch", NvtxColor.YELLOW)) { _ =>
@@ -410,7 +431,7 @@ private class KudoSerializerInstance(
     private[this] val dIn: DataInputStream = new DataInputStream(new BufferedInputStream(in))
 
     override def asKeyValueIterator: Iterator[(Int, ColumnarBatch)] = {
-      new KudoSerializedBatchIterator(dIn)
+      new KudoSerializedBatchIterator(dIn, deserTime)
     }
 
     override def asIterator: Iterator[Any] = {
@@ -483,7 +504,7 @@ object KudoSerializedTableColumn {
   }
 }
 
-class KudoSerializedBatchIterator(dIn: DataInputStream)
+class KudoSerializedBatchIterator(dIn: DataInputStream, deserTime: GpuMetric)
   extends BaseSerializedTableIterator {
   private[this] var nextHeader: Option[KudoTableHeader] = None
   private[this] var streamClosed: Boolean = false
@@ -496,7 +517,7 @@ class KudoSerializedBatchIterator(dIn: DataInputStream)
     }
   }
 
-  override def peekNextBatchSize(): Option[Long] = {
+  override def peekNextBatchSize(): Option[Long] = deserTime.ns {
     if (streamClosed) {
       None
     } else {
@@ -516,7 +537,7 @@ class KudoSerializedBatchIterator(dIn: DataInputStream)
     }
   }
 
-  private def readNextBatch(): ColumnarBatch = {
+  private def readNextBatch(): ColumnarBatch = deserTime.ns {
     withResource(new NvtxRange("Read Batch", NvtxColor.YELLOW)) { _ =>
       val header = nextHeader.get
       nextHeader = None
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
index 3d9b6285a91..850a04f390f 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
@@ -327,19 +327,19 @@ trait GpuExec extends SparkPlan {
     }
   }
 
-  protected def createMetric(level: MetricsLevel, name: String): GpuMetric =
+  def createMetric(level: MetricsLevel, name: String): GpuMetric =
     createMetricInternal(level, SQLMetrics.createMetric(sparkContext, name))
 
-  protected def createNanoTimingMetric(level: MetricsLevel, name: String): GpuMetric =
+  def createNanoTimingMetric(level: MetricsLevel, name: String): GpuMetric =
     createMetricInternal(level, SQLMetrics.createNanoTimingMetric(sparkContext, name))
 
-  protected def createSizeMetric(level: MetricsLevel, name: String): GpuMetric =
+  def createSizeMetric(level: MetricsLevel, name: String): GpuMetric =
     createMetricInternal(level, SQLMetrics.createSizeMetric(sparkContext, name))
 
-  protected def createAverageMetric(level: MetricsLevel, name: String): GpuMetric =
+  def createAverageMetric(level: MetricsLevel, name: String): GpuMetric =
    createMetricInternal(level, SQLMetrics.createAverageMetric(sparkContext, name))
 
-  protected def createTimingMetric(level: MetricsLevel, name: String): GpuMetric =
+  def createTimingMetric(level: MetricsLevel, name: String): GpuMetric =
     createMetricInternal(level, SQLMetrics.createTimingMetric(sparkContext, name))
 
   protected def createFileCacheMetrics(): Map[String, GpuMetric] = {
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala
index 05bc76c3fab..fc255a6bfd0 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala
@@ -42,6 +42,7 @@ import org.apache.spark.shuffle.{ShuffleWriter, _}
 import org.apache.spark.shuffle.api._
 import org.apache.spark.shuffle.sort.{BypassMergeSortShuffleHandle, SortShuffleManager}
 import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_READ_SIZE, METRIC_DATA_SIZE, METRIC_SHUFFLE_COMBINE_TIME, METRIC_SHUFFLE_DESERIALIZATION_TIME, METRIC_SHUFFLE_READ_TIME, METRIC_SHUFFLE_SERIALIZATION_TIME, METRIC_SHUFFLE_WRITE_IO_TIME, METRIC_SHUFFLE_WRITE_TIME}
 import org.apache.spark.sql.rapids.shims.{GpuShuffleBlockResolver, RapidsShuffleThreadedReader, RapidsShuffleThreadedWriter}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.storage.{RapidsShuffleBlockFetcherIterator, _}
@@ -246,13 +247,13 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
     with RapidsShuffleWriterShimHelper {
   private val metrics = handle.metrics
   private val serializationTimeMetric =
-    metrics.get("rapidsShuffleSerializationTime")
+    metrics.get(METRIC_SHUFFLE_SERIALIZATION_TIME)
   private val shuffleWriteTimeMetric =
-    metrics.get("rapidsShuffleWriteTime")
+    metrics.get(METRIC_SHUFFLE_WRITE_TIME)
   private val shuffleCombineTimeMetric =
-    metrics.get("rapidsShuffleCombineTime")
+    metrics.get(METRIC_SHUFFLE_COMBINE_TIME)
   private val ioTimeMetric =
-    metrics.get("rapidsShuffleWriteIoTime")
+    metrics.get(METRIC_SHUFFLE_WRITE_IO_TIME)
   private val dep: ShuffleDependency[K, V, V] = handle.dependency
   private val shuffleId = dep.shuffleId
   private val partitioner = dep.partitioner
@@ -596,9 +597,9 @@ abstract class RapidsShuffleThreadedReaderBase[K, C](
   private val sqlMetrics = handle.metrics
   private val dep = handle.dependency
 
-  private val deserializationTimeNs = sqlMetrics.get("rapidsShuffleDeserializationTime")
-  private val shuffleReadTimeNs = sqlMetrics.get("rapidsShuffleReadTime")
-  private val dataReadSize = sqlMetrics.get("dataReadSize")
+  private val deserializationTimeNs = sqlMetrics.get(METRIC_SHUFFLE_DESERIALIZATION_TIME)
+  private val shuffleReadTimeNs = sqlMetrics.get(METRIC_SHUFFLE_READ_TIME)
+  private val dataReadSize = sqlMetrics.get(METRIC_DATA_READ_SIZE)
 
   private var shuffleReadRange: NvtxRange =
     new NvtxRange("ThreadedReader.read", NvtxColor.PURPLE)
@@ -1048,7 +1049,7 @@ class RapidsCachingWriter[K, V](
     metrics: Map[String, SQLMetric])
   extends RapidsCachingWriterBase[K, V](blockManager, handle, mapId, rapidsShuffleServer,
     catalog) {
-  private val uncompressedMetric: SQLMetric = metrics("dataSize")
+  private val uncompressedMetric: SQLMetric = metrics(METRIC_DATA_SIZE)
 
   // This is here for the special case where we have no columns like with the .count
   // case or when we have 0-byte columns. We pick 100 as an arbitrary number so that
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
index f17cfbac13f..332545a99e1 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
@@ -20,6 +20,7 @@ import scala.collection.AbstractIterator
 import scala.concurrent.Future
 
 import com.nvidia.spark.rapids._
+import com.nvidia.spark.rapids.GpuMetric.{DEBUG_LEVEL, ESSENTIAL_LEVEL, MODERATE_LEVEL}
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, GpuRangePartitioning, ShimUnaryExecNode,
   ShuffleOriginUtil, SparkShimImpl}
@@ -37,6 +38,7 @@ import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.metric._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.rapids.GpuShuffleDependency
+import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.createAdditionalExchangeMetrics
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.MutablePair
@@ -195,24 +197,11 @@ abstract class GpuShuffleExchangeExecBase(
     SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
   lazy val readMetrics =
     SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
-  override lazy val additionalMetrics : Map[String, GpuMetric] = Map(
-    // dataSize and dataReadSize are uncompressed, one is on write and the
-    // other on read
-    "dataSize" -> createSizeMetric(ESSENTIAL_LEVEL,"data size"),
-    "dataReadSize" -> createSizeMetric(MODERATE_LEVEL, "data read size"),
-    "rapidsShuffleSerializationTime" ->
-      createNanoTimingMetric(DEBUG_LEVEL,"rs. serialization time"),
-    "rapidsShuffleDeserializationTime" ->
-      createNanoTimingMetric(DEBUG_LEVEL,"rs. deserialization time"),
-    "rapidsShuffleWriteTime" ->
-      createNanoTimingMetric(ESSENTIAL_LEVEL,"rs. shuffle write time"),
-    "rapidsShuffleCombineTime" ->
-      createNanoTimingMetric(DEBUG_LEVEL,"rs. shuffle combine time"),
-    "rapidsShuffleWriteIoTime" ->
-      createNanoTimingMetric(DEBUG_LEVEL,"rs. shuffle write io time"),
-    "rapidsShuffleReadTime" ->
-      createNanoTimingMetric(ESSENTIAL_LEVEL,"rs. shuffle read time")
-  ) ++ GpuMetric.wrap(readMetrics) ++ GpuMetric.wrap(writeMetrics)
+  override lazy val additionalMetrics : Map[String, GpuMetric] = {
+    createAdditionalExchangeMetrics(this) ++
+      GpuMetric.wrap(readMetrics) ++
+      GpuMetric.wrap(writeMetrics)
+  }
 
   // Spark doesn't report totalTime for this operator so we override metrics
   override lazy val allMetrics: Map[String, GpuMetric] = Map(
@@ -233,7 +222,7 @@ abstract class GpuShuffleExchangeExecBase(
   // This value must be lazy because the child's output may not have been resolved
   // yet in all cases.
   private lazy val serializer: Serializer = new GpuColumnarBatchSerializer(
-    gpuLongMetric("dataSize"), sparkTypes, useKudo)
+    allMetrics, sparkTypes, useKudo)
 
   @transient lazy val inputBatchRDD: RDD[ColumnarBatch] = child.executeColumnar()
 
@@ -276,6 +265,66 @@ abstract class GpuShuffleExchangeExecBase(
 }
 
 object GpuShuffleExchangeExecBase {
+  val METRIC_DATA_SIZE = "dataSize"
+  val METRIC_DESC_DATA_SIZE = "data size"
+  val METRIC_DATA_READ_SIZE = "dataReadSize"
+  val METRIC_DESC_DATA_READ_SIZE = "data read size"
+  val METRIC_SHUFFLE_SERIALIZATION_TIME = "rapidsShuffleSerializationTime"
+  val METRIC_DESC_SHUFFLE_SERIALIZATION_TIME = "RAPIDS shuffle serialization time"
+  val METRIC_SHUFFLE_SER_STREAM_TIME = "rapidsShuffleSerializationStreamTime"
+  val METRIC_DESC_SHUFFLE_SER_STREAM_TIME = "RAPIDS shuffle serialization to output stream time"
+  val METRIC_SHUFFLE_DESERIALIZATION_TIME = "rapidsShuffleDeserializationTime"
+  val METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME = "RAPIDS shuffle deserialization time"
+  val METRIC_SHUFFLE_DESER_STREAM_TIME = "rapidsShuffleDeserializationStreamTime"
+  val METRIC_DESC_SHUFFLE_DESER_STREAM_TIME =
+    "RAPIDS shuffle deserialization from input stream time"
+  val METRIC_SHUFFLE_PARTITION_TIME = "rapidsShufflePartitionTime"
+  val METRIC_DESC_SHUFFLE_PARTITION_TIME = "RAPIDS shuffle partition time"
+  val METRIC_SHUFFLE_WRITE_TIME = "rapidsShuffleWriteTime"
+  val METRIC_DESC_SHUFFLE_WRITE_TIME = "RAPIDS shuffle shuffle write time"
+  val METRIC_SHUFFLE_COMBINE_TIME = "rapidsShuffleCombineTime"
+  val METRIC_DESC_SHUFFLE_COMBINE_TIME = "RAPIDS shuffle shuffle combine time"
+  val METRIC_SHUFFLE_WRITE_IO_TIME = "rapidsShuffleWriteIoTime"
+  val METRIC_DESC_SHUFFLE_WRITE_IO_TIME = "RAPIDS shuffle shuffle write io time"
+  val METRIC_SHUFFLE_READ_TIME = "rapidsShuffleReadTime"
+  val METRIC_DESC_SHUFFLE_READ_TIME = "RAPIDS shuffle shuffle read time"
+  val METRIC_SHUFFLE_SER_CALC_HEADER_TIME = "rapidsShuffleSerializationCalcHeaderTime"
+  val METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME = "RAPIDS shuffle serialization calc header time"
+  val METRIC_SHUFFLE_SER_COPY_HEADER_TIME = "rapidsShuffleSerializationCopyHeaderTime"
+  val METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME = "RAPIDS shuffle serialization copy header time"
+  val METRIC_SHUFFLE_SER_COPY_BUFFER_TIME = "rapidsShuffleSerializationCopyBufferTime"
+  val METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME = "RAPIDS shuffle serialization copy buffer time"
+
+  def createAdditionalExchangeMetrics(gpu: GpuExec): Map[String, GpuMetric] = Map(
+    // dataSize and dataReadSize are uncompressed, one is on write and the other on read
+    METRIC_DATA_SIZE -> gpu.createSizeMetric(ESSENTIAL_LEVEL, METRIC_DESC_DATA_SIZE),
+    METRIC_DATA_READ_SIZE -> gpu.createSizeMetric(MODERATE_LEVEL, METRIC_DESC_DATA_READ_SIZE),
+    METRIC_SHUFFLE_SERIALIZATION_TIME ->
+      gpu.createNanoTimingMetric(DEBUG_LEVEL,METRIC_DESC_SHUFFLE_SERIALIZATION_TIME),
+    METRIC_SHUFFLE_SER_STREAM_TIME ->
+      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_STREAM_TIME),
+    METRIC_SHUFFLE_DESERIALIZATION_TIME ->
+      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME),
+    METRIC_SHUFFLE_DESER_STREAM_TIME ->
+      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_DESER_STREAM_TIME),
+    METRIC_SHUFFLE_PARTITION_TIME ->
+      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_PARTITION_TIME),
+    METRIC_SHUFFLE_WRITE_TIME ->
+      gpu.createNanoTimingMetric(ESSENTIAL_LEVEL, METRIC_DESC_SHUFFLE_WRITE_TIME),
+    METRIC_SHUFFLE_COMBINE_TIME ->
+      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_COMBINE_TIME),
+    METRIC_SHUFFLE_WRITE_IO_TIME ->
+      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_WRITE_IO_TIME),
+    METRIC_SHUFFLE_READ_TIME ->
+      gpu.createNanoTimingMetric(ESSENTIAL_LEVEL, METRIC_DESC_SHUFFLE_READ_TIME),
+    METRIC_SHUFFLE_SER_CALC_HEADER_TIME ->
+      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME),
+    METRIC_SHUFFLE_SER_COPY_HEADER_TIME ->
+      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME),
+    METRIC_SHUFFLE_SER_COPY_BUFFER_TIME ->
+      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME)
+  )
+
   def prepareBatchShuffleDependency(
       rdd: RDD[ColumnarBatch],
       outputAttributes: Seq[Attribute],
@@ -315,8 +364,11 @@ object GpuShuffleExchangeExecBase {
       rdd
     }
     val partitioner: GpuExpression = getPartitioner(newRdd, outputAttributes, newPartitioning)
+    val partitionTime: GpuMetric = metrics(METRIC_SHUFFLE_PARTITION_TIME)
     def getPartitioned: ColumnarBatch => Any = {
-      batch => partitioner.columnarEvalAny(batch)
+      batch => partitionTime.ns {
+        partitioner.columnarEvalAny(batch)
+      }
     }
     val rddWithPartitionIds: RDD[Product2[Int, ColumnarBatch]] = {
       newRdd.mapPartitions { iter =>
diff --git a/tests/src/test/spark321/scala/org/apache/spark/sql/rapids/RapidsShuffleThreadedReaderSuite.scala b/tests/src/test/spark321/scala/org/apache/spark/sql/rapids/RapidsShuffleThreadedReaderSuite.scala
index 3958dce6fdb..c006031da7d 100644
--- a/tests/src/test/spark321/scala/org/apache/spark/sql/rapids/RapidsShuffleThreadedReaderSuite.scala
+++ b/tests/src/test/spark321/scala/org/apache/spark/sql/rapids/RapidsShuffleThreadedReaderSuite.scala
@@ -113,7 +113,8 @@ class RapidsShuffleThreadedReaderSuite
     val shuffleId = 22
     val numMaps = 6
     val keyValuePairsPerMap = 10
-    val serializer = new GpuColumnarBatchSerializer(NoopMetric, Array.empty, false)
+    val serializer = new GpuColumnarBatchSerializer(Map.empty.withDefaultValue(NoopMetric),
+      Array.empty, false)
 
     // Make a mock BlockManager that will return RecordingManagedByteBuffers of data, so that we
     // can ensure retain() and release() are properly called.
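
The serializer changes above hinge on two small patterns that are easy to miss when scanning the diff: each serializer instance now pulls the metrics it needs out of a Map[String, GpuMetric] by key (the updated test falls back to Map.empty.withDefaultValue(NoopMetric)), and the hot serialize/deserialize paths are wrapped in metric.ns { ... } so the elapsed nanoseconds are accumulated around the block. The standalone Scala sketch below imitates both patterns under stated assumptions: KudoMetricsSketch, SimpleMetric and NoopSimpleMetric are hypothetical stand-ins rather than the plugin's GpuMetric API, and only the metric key strings are taken from the patch.

object KudoMetricsSketch {
  // Hypothetical stand-in for a size / nanosecond-timing metric; the real
  // GpuMetric in spark-rapids is richer than this.
  class SimpleMetric(val name: String) {
    private var total = 0L
    def +=(v: Long): Unit = total += v
    // Run the block and accumulate the elapsed nanoseconds, mirroring the
    // serTime.ns { ... } / deserTime.ns { ... } calls in the diff above.
    def ns[T](block: => T): T = {
      val start = System.nanoTime()
      try {
        block
      } finally {
        this += (System.nanoTime() - start)
      }
    }
    def value: Long = total
  }

  // No-op default, similar in spirit to Map.empty.withDefaultValue(NoopMetric)
  // in the updated RapidsShuffleThreadedReaderSuite.
  object NoopSimpleMetric extends SimpleMetric("noop") {
    override def +=(v: Long): Unit = ()
  }

  def main(args: Array[String]): Unit = {
    // Metrics are looked up by key, as the serializer instances now do with
    // METRIC_DATA_SIZE, METRIC_SHUFFLE_SER_STREAM_TIME, and so on.
    val metrics: Map[String, SimpleMetric] = Map(
      "dataSize" -> new SimpleMetric("data size"),
      "rapidsShuffleSerializationStreamTime" -> new SimpleMetric("ser stream time")
    ).withDefaultValue(NoopSimpleMetric)

    val dataSize = metrics("dataSize")
    val serTime = metrics("rapidsShuffleSerializationStreamTime")

    // Pretend to serialize one batch: time the work and record the bytes written.
    val bytesWritten = serTime.ns {
      val payload = Array.fill(1 << 10)(0.toByte) // stand-in for kudo's stream output
      payload.length.toLong
    }
    dataSize += bytesWritten

    // Keys that were never registered fall back to the no-op metric instead of throwing.
    metrics("rapidsShuffleSerializationCalcHeaderTime") += 42L

    println(s"data size = ${dataSize.value} bytes, ser stream time = ${serTime.value} ns")
  }
}

Centralizing the key strings as METRIC_* constants on GpuShuffleExchangeExecBase is what keeps the writer, reader and serializer lookups in sync; much of the diff is simply replacing scattered string literals with those constants.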