From eac58cfbd1dff837c37528eaa6b1c43d1be5376b Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Wed, 27 Nov 2024 11:26:27 +0800 Subject: [PATCH 1/9] Add support for kudo write metrics --- .../delta/GpuOptimizeWriteExchangeExec.scala | 35 ++++----- .../rapids/GpuColumnarBatchSerializer.scala | 52 +++++++++----- .../RapidsShuffleInternalManagerBase.scala | 23 +++--- .../GpuShuffleExchangeExecBase.scala | 72 ++++++++++++++----- 4 files changed, 117 insertions(+), 65 deletions(-) diff --git a/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala b/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala index 0c212d6842a..a3dd5a3e884 100644 --- a/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala +++ b/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala @@ -28,8 +28,8 @@ import scala.concurrent.duration.Duration import com.databricks.sql.transaction.tahoe.sources.DeltaSQLConf import com.nvidia.spark.rapids.{GpuColumnarBatchSerializer, GpuExec, GpuMetric, GpuPartitioning, GpuRoundRobinPartitioning, RapidsConf} import com.nvidia.spark.rapids.delta.RapidsDeltaSQLConf - import org.apache.spark.{MapOutputStatistics, ShuffleDependency} + import org.apache.spark.network.util.ByteUnit import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer @@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.{CoalescedPartitionSpec, ShufflePartitionS import org.apache.spark.sql.execution.exchange.Exchange import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} import org.apache.spark.sql.rapids.execution.{GpuShuffleExchangeExecBase, ShuffledBatchRDD} +import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_READ_SIZE, METRIC_DATA_SIZE, METRIC_DESC_DATA_READ_SIZE, METRIC_DESC_DATA_SIZE, METRIC_DESC_SHUFFLE_COMBINE_TIME, METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME, METRIC_DESC_SHUFFLE_PARTITION_TIME, METRIC_DESC_SHUFFLE_READ_TIME, METRIC_DESC_SHUFFLE_SERIALIZATION_TIME, METRIC_DESC_SHUFFLE_WRITE_IO_TIME, METRIC_DESC_SHUFFLE_WRITE_TIME, METRIC_SHUFFLE_COMBINE_TIME, METRIC_SHUFFLE_DESERIALIZATION_TIME, METRIC_SHUFFLE_PARTITION_TIME, METRIC_SHUFFLE_READ_TIME, METRIC_SHUFFLE_SERIALIZATION_TIME, METRIC_SHUFFLE_WRITE_IO_TIME, METRIC_SHUFFLE_WRITE_TIME} import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.ThreadUtils @@ -72,20 +73,22 @@ case class GpuOptimizeWriteExchangeExec( SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val additionalMetrics: Map[String, GpuMetric] = Map( - "dataSize" -> createSizeMetric(ESSENTIAL_LEVEL, "data size"), - "dataReadSize" -> createSizeMetric(MODERATE_LEVEL, "data read size"), - "rapidsShuffleSerializationTime" -> - createNanoTimingMetric(DEBUG_LEVEL, "rs. serialization time"), - "rapidsShuffleDeserializationTime" -> - createNanoTimingMetric(DEBUG_LEVEL, "rs. deserialization time"), - "rapidsShuffleWriteTime" -> - createNanoTimingMetric(ESSENTIAL_LEVEL, "rs. shuffle write time"), - "rapidsShuffleCombineTime" -> - createNanoTimingMetric(DEBUG_LEVEL, "rs. shuffle combine time"), - "rapidsShuffleWriteIoTime" -> - createNanoTimingMetric(DEBUG_LEVEL, "rs. shuffle write io time"), - "rapidsShuffleReadTime" -> - createNanoTimingMetric(ESSENTIAL_LEVEL, "rs. 
shuffle read time") + METRIC_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, METRIC_DESC_DATA_SIZE), + METRIC_DATA_READ_SIZE -> createSizeMetric(MODERATE_LEVEL, METRIC_DESC_DATA_READ_SIZE), + METRIC_SHUFFLE_SERIALIZATION_TIME -> + createNanoTimingMetric(DEBUG_LEVEL,METRIC_DESC_SHUFFLE_SERIALIZATION_TIME), + METRIC_SHUFFLE_DESERIALIZATION_TIME -> + createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME), + METRIC_SHUFFLE_PARTITION_TIME -> + createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_PARTITION_TIME), + METRIC_SHUFFLE_WRITE_TIME -> + createNanoTimingMetric(ESSENTIAL_LEVEL, METRIC_DESC_SHUFFLE_WRITE_TIME), + METRIC_SHUFFLE_COMBINE_TIME -> + createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_COMBINE_TIME), + METRIC_SHUFFLE_WRITE_IO_TIME -> + createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_WRITE_IO_TIME), + METRIC_SHUFFLE_READ_TIME -> + createNanoTimingMetric(ESSENTIAL_LEVEL, METRIC_DESC_SHUFFLE_READ_TIME) ) ++ GpuMetric.wrap(readMetrics) ++ GpuMetric.wrap(writeMetrics) override lazy val allMetrics: Map[String, GpuMetric] = { @@ -98,7 +101,7 @@ case class GpuOptimizeWriteExchangeExec( } private lazy val serializer: Serializer = - new GpuColumnarBatchSerializer(gpuLongMetric("dataSize"), + new GpuColumnarBatchSerializer(allMetrics, child.output.map(_.dataType).toArray, RapidsConf.SHUFFLE_KUDO_SERIALIZER_ENABLED.get(child.conf)) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala index 54252253d38..fc520cffa2b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala @@ -28,9 +28,10 @@ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import com.nvidia.spark.rapids.jni.kudo.{KudoSerializer, KudoTable, KudoTableHeader} - import org.apache.spark.TaskContext + import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance} +import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_SIZE, METRIC_SHUFFLE_DESERIALIZATION_TIME, METRIC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_SHUFFLE_SERIALIZATION_TIME} import org.apache.spark.sql.types.{DataType, NullType} import org.apache.spark.sql.vectorized.ColumnarBatch @@ -45,7 +46,7 @@ trait BaseSerializedTableIterator extends Iterator[(Int, ColumnarBatch)] { def peekNextBatchSize(): Option[Long] } -class SerializedBatchIterator(dIn: DataInputStream) +class SerializedBatchIterator(dIn: DataInputStream, deserTime: GpuMetric) extends BaseSerializedTableIterator { private[this] var nextHeader: Option[SerializedTableHeader] = None private[this] var toBeReturned: Option[ColumnarBatch] = None @@ -60,7 +61,7 @@ class SerializedBatchIterator(dIn: DataInputStream) } } - override def peekNextBatchSize(): Option[Long] = { + override def peekNextBatchSize(): Option[Long] = deserTime.ns { if (streamClosed) { None } else { @@ -80,7 +81,7 @@ class SerializedBatchIterator(dIn: DataInputStream) } } - private def tryReadNext(): Option[ColumnarBatch] = { + private def tryReadNext(): Option[ColumnarBatch] = deserTime.ns { if (nextHeader.isEmpty) { None } else { @@ -137,26 +138,32 @@ class 
SerializedBatchIterator(dIn: DataInputStream) * * @note The RAPIDS shuffle does not use this code. */ -class GpuColumnarBatchSerializer(dataSize: GpuMetric, dataTypes: Array[DataType], useKudo: Boolean) +class GpuColumnarBatchSerializer(metrics: Map[String, GpuMetric], dataTypes: Array[DataType], + useKudo: Boolean) extends Serializer with Serializable { override def newInstance(): SerializerInstance = { if (useKudo) { - new KudoSerializerInstance(dataSize, dataTypes) + new KudoSerializerInstance(metrics, dataTypes) } else { - new GpuColumnarBatchSerializerInstance(dataSize) + new GpuColumnarBatchSerializerInstance(metrics) } } override def supportsRelocationOfSerializedObjects: Boolean = true } -private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric) extends SerializerInstance { +private class GpuColumnarBatchSerializerInstance(metrics: Map[String, GpuMetric]) extends + SerializerInstance { + private val dataSize = metrics(METRIC_DATA_SIZE) + private val serTime = metrics(METRIC_SHUFFLE_SERIALIZATION_TIME) + private val deserTime = metrics(METRIC_SHUFFLE_DESERIALIZATION_TIME) + override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream { private[this] val dOut: DataOutputStream = new DataOutputStream(new BufferedOutputStream(out)) - override def writeValue[T: ClassTag](value: T): SerializationStream = { + override def writeValue[T: ClassTag](value: T): SerializationStream = serTime.ns { val batch = value.asInstanceOf[ColumnarBatch] val numColumns = batch.numCols() val columns: Array[HostColumnVector] = new Array(numColumns) @@ -239,7 +246,7 @@ private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric) extends Se private[this] val dIn: DataInputStream = new DataInputStream(new BufferedInputStream(in)) override def asKeyValueIterator: Iterator[(Int, ColumnarBatch)] = { - new SerializedBatchIterator(dIn) + new SerializedBatchIterator(dIn, deserTime) } override def asIterator: Iterator[Any] = { @@ -339,8 +346,14 @@ object SerializedTableColumn { * @param dataTypes data types of the columns in the batch */ private class KudoSerializerInstance( - val dataSize: GpuMetric, + val metrics: Map[String, GpuMetric], val dataTypes: Array[DataType]) extends SerializerInstance { + private val dataSize = metrics(METRIC_DATA_SIZE) + private val serTime = metrics(METRIC_SHUFFLE_SERIALIZATION_TIME) + private val serCalcHeaderTime = metrics(METRIC_SHUFFLE_SER_CALC_HEADER_TIME) + private val serCopyHeaderTime = metrics(METRIC_SHUFFLE_SER_COPY_HEADER_TIME) + private val serCopyBufferTime = metrics(METRIC_SHUFFLE_SER_COPY_BUFFER_TIME) + private val deserTime = metrics(METRIC_SHUFFLE_DESERIALIZATION_TIME) private lazy val kudo = new KudoSerializer(GpuColumnVector.from(dataTypes)) @@ -348,7 +361,7 @@ private class KudoSerializerInstance( private[this] val dOut: DataOutputStream = new DataOutputStream(new BufferedOutputStream(out)) - override def writeValue[T: ClassTag](value: T): SerializationStream = { + override def writeValue[T: ClassTag](value: T): SerializationStream = serTime.ns { val batch = value.asInstanceOf[ColumnarBatch] val numColumns = batch.numCols() val columns: Array[HostColumnVector] = new Array(numColumns) @@ -380,7 +393,12 @@ private class KudoSerializerInstance( } withResource(new NvtxRange("Serialize Batch", NvtxColor.YELLOW)) { _ => - dataSize += kudo.writeToStream(columns, dOut, startRow, numRows) + val writeMetric = kudo.writeToStream(columns, dOut, startRow, numRows) + + dataSize += writeMetric.getWrittenBytes + serCalcHeaderTime 
+= writeMetric.getCalcHeaderTime + serCopyHeaderTime += writeMetric.getCopyHeaderTime + serCopyBufferTime += writeMetric.getCopyBufferTime } } else { withResource(new NvtxRange("Serialize Row Only Batch", NvtxColor.YELLOW)) { _ => @@ -422,7 +440,7 @@ private class KudoSerializerInstance( private[this] val dIn: DataInputStream = new DataInputStream(new BufferedInputStream(in)) override def asKeyValueIterator: Iterator[(Int, ColumnarBatch)] = { - new KudoSerializedBatchIterator(dIn) + new KudoSerializedBatchIterator(dIn, deserTime) } override def asIterator: Iterator[Any] = { @@ -495,7 +513,7 @@ object KudoSerializedTableColumn { } } -class KudoSerializedBatchIterator(dIn: DataInputStream) +class KudoSerializedBatchIterator(dIn: DataInputStream, deserTime: GpuMetric) extends BaseSerializedTableIterator { private[this] var nextHeader: Option[KudoTableHeader] = None private[this] var toBeReturned: Option[ColumnarBatch] = None @@ -510,7 +528,7 @@ class KudoSerializedBatchIterator(dIn: DataInputStream) } } - override def peekNextBatchSize(): Option[Long] = { + override def peekNextBatchSize(): Option[Long] = deserTime.ns { if (streamClosed) { None } else { @@ -530,7 +548,7 @@ class KudoSerializedBatchIterator(dIn: DataInputStream) } } - private def tryReadNext(): Option[ColumnarBatch] = { + private def tryReadNext(): Option[ColumnarBatch] = deserTime.ns { if (nextHeader.isEmpty) { None } else { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala index 05bc76c3fab..018cb5ab2bd 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala @@ -31,8 +31,8 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import com.nvidia.spark.rapids.format.TableMeta import com.nvidia.spark.rapids.shuffle.{RapidsShuffleRequestHandler, RapidsShuffleServer, RapidsShuffleTransport} - import org.apache.spark.{InterruptibleIterator, MapOutputTracker, ShuffleDependency, SparkConf, SparkEnv, TaskContext} + import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.internal.{config, Logging} import org.apache.spark.io.CompressionCodec @@ -42,6 +42,7 @@ import org.apache.spark.shuffle.{ShuffleWriter, _} import org.apache.spark.shuffle.api._ import org.apache.spark.shuffle.sort.{BypassMergeSortShuffleHandle, SortShuffleManager} import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_READ_SIZE, METRIC_DATA_SIZE, METRIC_SHUFFLE_COMBINE_TIME, METRIC_SHUFFLE_DESERIALIZATION_TIME, METRIC_SHUFFLE_READ_TIME, METRIC_SHUFFLE_SERIALIZATION_TIME, METRIC_SHUFFLE_WRITE_IO_TIME, METRIC_SHUFFLE_WRITE_TIME} import org.apache.spark.sql.rapids.shims.{GpuShuffleBlockResolver, RapidsShuffleThreadedReader, RapidsShuffleThreadedWriter} import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.storage.{RapidsShuffleBlockFetcherIterator, _} @@ -245,14 +246,12 @@ abstract class RapidsShuffleThreadedWriterBase[K, V]( extends RapidsShuffleWriter[K, V] with RapidsShuffleWriterShimHelper { private val metrics = handle.metrics - private val serializationTimeMetric = - metrics.get("rapidsShuffleSerializationTime") private val shuffleWriteTimeMetric = - 
metrics.get("rapidsShuffleWriteTime") + metrics.get(METRIC_SHUFFLE_WRITE_TIME) private val shuffleCombineTimeMetric = - metrics.get("rapidsShuffleCombineTime") + metrics.get(METRIC_SHUFFLE_COMBINE_TIME) private val ioTimeMetric = - metrics.get("rapidsShuffleWriteIoTime") + metrics.get(METRIC_SHUFFLE_WRITE_IO_TIME) private val dep: ShuffleDependency[K, V, V] = handle.dependency private val shuffleId = dep.shuffleId private val partitioner = dep.partitioner @@ -429,11 +428,9 @@ abstract class RapidsShuffleThreadedWriterBase[K, V]( // counted in the ioTime val totalPerRecordWriteTime = recordWriteTime.get() + ioTimeNs val ioRatio = (ioTimeNs.toDouble/totalPerRecordWriteTime) - val serializationRatio = 1.0 - ioRatio // update metrics, note that we expect them to be relative to the task ioTimeMetric.foreach(_ += (ioRatio * writeTimeNs).toLong) - serializationTimeMetric.foreach(_ += (serializationRatio * writeTimeNs).toLong) // we add all three here because this metric is meant to show the time // we are blocked on writes shuffleWriteTimeMetric.foreach(_ += (writeTimeNs + combineTimeNs)) @@ -596,9 +593,8 @@ abstract class RapidsShuffleThreadedReaderBase[K, C]( private val sqlMetrics = handle.metrics private val dep = handle.dependency - private val deserializationTimeNs = sqlMetrics.get("rapidsShuffleDeserializationTime") - private val shuffleReadTimeNs = sqlMetrics.get("rapidsShuffleReadTime") - private val dataReadSize = sqlMetrics.get("dataReadSize") + private val shuffleReadTimeNs = sqlMetrics.get(METRIC_SHUFFLE_READ_TIME) + private val dataReadSize = sqlMetrics.get(METRIC_DATA_READ_SIZE) private var shuffleReadRange: NvtxRange = new NvtxRange("ThreadedReader.read", NvtxColor.PURPLE) @@ -679,7 +675,6 @@ abstract class RapidsShuffleThreadedReaderBase[K, C]( } val res = currentIter.next() val fetchTime = System.nanoTime() - fetchTimeStart - deserializationTimeNs.foreach(_ += (fetchTime - readBlockedTime)) shuffleReadTimeNs.foreach(_ += fetchTime) res } @@ -852,7 +847,6 @@ abstract class RapidsShuffleThreadedReaderBase[K, C]( case _ => 0 // TODO: do we need to handle other types here? } waitTime += System.nanoTime() - waitTimeStart - deserializationTimeNs.foreach(_ += waitTime) shuffleReadTimeNs.foreach(_ += waitTime) res } @@ -958,7 +952,6 @@ abstract class RapidsShuffleThreadedReaderBase[K, C]( } // keep track of the overall metric which includes blocked time val fetchTime = System.nanoTime() - fetchTimeStart - deserializationTimeNs.foreach(_ += (fetchTime - readBlockedTime)) shuffleReadTimeNs.foreach(_ += fetchTime) } } @@ -1048,7 +1041,7 @@ class RapidsCachingWriter[K, V]( metrics: Map[String, SQLMetric]) extends RapidsCachingWriterBase[K, V](blockManager, handle, mapId, rapidsShuffleServer, catalog) { - private val uncompressedMetric: SQLMetric = metrics("dataSize") + private val uncompressedMetric: SQLMetric = metrics(METRIC_DATA_SIZE) // This is here for the special case where we have no columns like with the .count // case or when we have 0-byte columns. 
We pick 100 as an arbitrary number so that diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala index f17cfbac13f..be49ef6736f 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala @@ -22,8 +22,8 @@ import scala.concurrent.Future import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, GpuRangePartitioning, ShimUnaryExecNode, ShuffleOriginUtil, SparkShimImpl} - import org.apache.spark.{MapOutputStatistics, ShuffleDependency} + import org.apache.spark.rapids.shims.GpuShuffleExchangeExec import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer @@ -37,6 +37,7 @@ import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec} import org.apache.spark.sql.execution.metric._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.GpuShuffleDependency +import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_READ_SIZE, METRIC_DATA_SIZE, METRIC_DESC_DATA_READ_SIZE, METRIC_DESC_DATA_SIZE, METRIC_DESC_SHUFFLE_COMBINE_TIME, METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME, METRIC_DESC_SHUFFLE_PARTITION_TIME, METRIC_DESC_SHUFFLE_READ_TIME, METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_DESC_SHUFFLE_SERIALIZATION_TIME, METRIC_DESC_SHUFFLE_WRITE_IO_TIME, METRIC_DESC_SHUFFLE_WRITE_TIME, METRIC_SHUFFLE_COMBINE_TIME, METRIC_SHUFFLE_DESERIALIZATION_TIME, METRIC_SHUFFLE_PARTITION_TIME, METRIC_SHUFFLE_READ_TIME, METRIC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_SHUFFLE_SERIALIZATION_TIME, METRIC_SHUFFLE_WRITE_IO_TIME, METRIC_SHUFFLE_WRITE_TIME} import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.MutablePair @@ -198,20 +199,28 @@ abstract class GpuShuffleExchangeExecBase( override lazy val additionalMetrics : Map[String, GpuMetric] = Map( // dataSize and dataReadSize are uncompressed, one is on write and the // other on read - "dataSize" -> createSizeMetric(ESSENTIAL_LEVEL,"data size"), - "dataReadSize" -> createSizeMetric(MODERATE_LEVEL, "data read size"), - "rapidsShuffleSerializationTime" -> - createNanoTimingMetric(DEBUG_LEVEL,"rs. serialization time"), - "rapidsShuffleDeserializationTime" -> - createNanoTimingMetric(DEBUG_LEVEL,"rs. deserialization time"), - "rapidsShuffleWriteTime" -> - createNanoTimingMetric(ESSENTIAL_LEVEL,"rs. shuffle write time"), - "rapidsShuffleCombineTime" -> - createNanoTimingMetric(DEBUG_LEVEL,"rs. shuffle combine time"), - "rapidsShuffleWriteIoTime" -> - createNanoTimingMetric(DEBUG_LEVEL,"rs. shuffle write io time"), - "rapidsShuffleReadTime" -> - createNanoTimingMetric(ESSENTIAL_LEVEL,"rs. 
shuffle read time") + METRIC_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, METRIC_DESC_DATA_SIZE), + METRIC_DATA_READ_SIZE -> createSizeMetric(MODERATE_LEVEL, METRIC_DESC_DATA_READ_SIZE), + METRIC_SHUFFLE_SERIALIZATION_TIME -> + createNanoTimingMetric(DEBUG_LEVEL,METRIC_DESC_SHUFFLE_SERIALIZATION_TIME), + METRIC_SHUFFLE_DESERIALIZATION_TIME -> + createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME), + METRIC_SHUFFLE_PARTITION_TIME -> + createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_PARTITION_TIME), + METRIC_SHUFFLE_WRITE_TIME -> + createNanoTimingMetric(ESSENTIAL_LEVEL, METRIC_DESC_SHUFFLE_WRITE_TIME), + METRIC_SHUFFLE_COMBINE_TIME -> + createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_COMBINE_TIME), + METRIC_SHUFFLE_WRITE_IO_TIME -> + createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_WRITE_IO_TIME), + METRIC_SHUFFLE_READ_TIME -> + createNanoTimingMetric(ESSENTIAL_LEVEL, METRIC_DESC_SHUFFLE_READ_TIME), + METRIC_SHUFFLE_SER_CALC_HEADER_TIME -> + createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME), + METRIC_SHUFFLE_SER_COPY_HEADER_TIME -> + createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME), + METRIC_SHUFFLE_SER_COPY_BUFFER_TIME -> + createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME) ) ++ GpuMetric.wrap(readMetrics) ++ GpuMetric.wrap(writeMetrics) // Spark doesn't report totalTime for this operator so we override metrics @@ -233,7 +242,7 @@ abstract class GpuShuffleExchangeExecBase( // This value must be lazy because the child's output may not have been resolved // yet in all cases. private lazy val serializer: Serializer = new GpuColumnarBatchSerializer( - gpuLongMetric("dataSize"), sparkTypes, useKudo) + allMetrics, sparkTypes, useKudo) @transient lazy val inputBatchRDD: RDD[ColumnarBatch] = child.executeColumnar() @@ -276,6 +285,32 @@ abstract class GpuShuffleExchangeExecBase( } object GpuShuffleExchangeExecBase { + val METRIC_DATA_SIZE = "dataSize" + val METRIC_DESC_DATA_SIZE = "data size" + val METRIC_DATA_READ_SIZE = "dataReadSize" + val METRIC_DESC_DATA_READ_SIZE = "data read size" + val METRIC_SHUFFLE_SERIALIZATION_TIME = "rapidsShuffleSerializationTime" + val METRIC_DESC_SHUFFLE_SERIALIZATION_TIME = "rs. serialization time" + val METRIC_SHUFFLE_DESERIALIZATION_TIME = "rapidsShuffleDeserializationTime" + val METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME = "rs. deserialization time" + val METRIC_SHUFFLE_PARTITION_TIME = "rapidsShufflePartitionTime" + val METRIC_DESC_SHUFFLE_PARTITION_TIME = "rs. partition time" + val METRIC_SHUFFLE_WRITE_TIME = "rapidsShuffleWriteTime" + val METRIC_DESC_SHUFFLE_WRITE_TIME = "rs. shuffle write time" + val METRIC_SHUFFLE_COMBINE_TIME = "rapidsShuffleCombineTime" + val METRIC_DESC_SHUFFLE_COMBINE_TIME = "rs. shuffle combine time" + val METRIC_SHUFFLE_WRITE_IO_TIME = "rapidsShuffleWriteIoTime" + val METRIC_DESC_SHUFFLE_WRITE_IO_TIME = "rs. shuffle write io time" + val METRIC_SHUFFLE_READ_TIME = "rapidsShuffleReadTime" + val METRIC_DESC_SHUFFLE_READ_TIME = "rs. shuffle read time" + val METRIC_SHUFFLE_SER_CALC_HEADER_TIME = "rapidsShuffleSerializationCalcHeaderTime" + val METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME = "rs. serialization calc header time" + val METRIC_SHUFFLE_SER_COPY_HEADER_TIME = "rapidsShuffleSerializationCopyHeaderTime" + val METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME = "rs. 
serialization copy header time" + val METRIC_SHUFFLE_SER_COPY_BUFFER_TIME = "rapidsShuffleSerializationCopyBufferTime" + val METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME = "rs. serialization copy buffer time" + + def prepareBatchShuffleDependency( rdd: RDD[ColumnarBatch], outputAttributes: Seq[Attribute], @@ -315,8 +350,11 @@ object GpuShuffleExchangeExecBase { rdd } val partitioner: GpuExpression = getPartitioner(newRdd, outputAttributes, newPartitioning) + val partitonTime: GpuMetric = metrics(METRIC_SHUFFLE_PARTITION_TIME) def getPartitioned: ColumnarBatch => Any = { - batch => partitioner.columnarEvalAny(batch) + batch => partitonTime.ns { + partitioner.columnarEvalAny(batch) + } } val rddWithPartitionIds: RDD[Product2[Int, ColumnarBatch]] = { newRdd.mapPartitions { iter => From 7fd3ed97bc7828d07967c80e779333bd4daa81a1 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Wed, 27 Nov 2024 11:34:02 +0800 Subject: [PATCH 2/9] Refactor Signed-off-by: liurenjie1024 --- .../delta/GpuOptimizeWriteExchangeExec.scala | 27 +++----- .../GpuShuffleExchangeExecBase.scala | 62 ++++++++++--------- 2 files changed, 41 insertions(+), 48 deletions(-) diff --git a/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala b/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala index a3dd5a3e884..b3be25d14e9 100644 --- a/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala +++ b/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala @@ -28,8 +28,8 @@ import scala.concurrent.duration.Duration import com.databricks.sql.transaction.tahoe.sources.DeltaSQLConf import com.nvidia.spark.rapids.{GpuColumnarBatchSerializer, GpuExec, GpuMetric, GpuPartitioning, GpuRoundRobinPartitioning, RapidsConf} import com.nvidia.spark.rapids.delta.RapidsDeltaSQLConf -import org.apache.spark.{MapOutputStatistics, ShuffleDependency} +import org.apache.spark.{MapOutputStatistics, ShuffleDependency} import org.apache.spark.network.util.ByteUnit import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer @@ -39,7 +39,7 @@ import org.apache.spark.sql.execution.{CoalescedPartitionSpec, ShufflePartitionS import org.apache.spark.sql.execution.exchange.Exchange import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} import org.apache.spark.sql.rapids.execution.{GpuShuffleExchangeExecBase, ShuffledBatchRDD} -import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_READ_SIZE, METRIC_DATA_SIZE, METRIC_DESC_DATA_READ_SIZE, METRIC_DESC_DATA_SIZE, METRIC_DESC_SHUFFLE_COMBINE_TIME, METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME, METRIC_DESC_SHUFFLE_PARTITION_TIME, METRIC_DESC_SHUFFLE_READ_TIME, METRIC_DESC_SHUFFLE_SERIALIZATION_TIME, METRIC_DESC_SHUFFLE_WRITE_IO_TIME, METRIC_DESC_SHUFFLE_WRITE_TIME, METRIC_SHUFFLE_COMBINE_TIME, METRIC_SHUFFLE_DESERIALIZATION_TIME, METRIC_SHUFFLE_PARTITION_TIME, METRIC_SHUFFLE_READ_TIME, METRIC_SHUFFLE_SERIALIZATION_TIME, METRIC_SHUFFLE_WRITE_IO_TIME, METRIC_SHUFFLE_WRITE_TIME} +import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.createAdditionalExchangeMetris import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.ThreadUtils @@ -72,24 +72,11 @@ case class GpuOptimizeWriteExchangeExec( private[sql] lazy val readMetrics = 
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) - override lazy val additionalMetrics: Map[String, GpuMetric] = Map( - METRIC_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, METRIC_DESC_DATA_SIZE), - METRIC_DATA_READ_SIZE -> createSizeMetric(MODERATE_LEVEL, METRIC_DESC_DATA_READ_SIZE), - METRIC_SHUFFLE_SERIALIZATION_TIME -> - createNanoTimingMetric(DEBUG_LEVEL,METRIC_DESC_SHUFFLE_SERIALIZATION_TIME), - METRIC_SHUFFLE_DESERIALIZATION_TIME -> - createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME), - METRIC_SHUFFLE_PARTITION_TIME -> - createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_PARTITION_TIME), - METRIC_SHUFFLE_WRITE_TIME -> - createNanoTimingMetric(ESSENTIAL_LEVEL, METRIC_DESC_SHUFFLE_WRITE_TIME), - METRIC_SHUFFLE_COMBINE_TIME -> - createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_COMBINE_TIME), - METRIC_SHUFFLE_WRITE_IO_TIME -> - createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_WRITE_IO_TIME), - METRIC_SHUFFLE_READ_TIME -> - createNanoTimingMetric(ESSENTIAL_LEVEL, METRIC_DESC_SHUFFLE_READ_TIME) - ) ++ GpuMetric.wrap(readMetrics) ++ GpuMetric.wrap(writeMetrics) + override lazy val additionalMetrics : Map[String, GpuMetric] = { + createAdditionalExchangeMetris(this) ++ + GpuMetric.wrap(readMetrics) ++ + GpuMetric.wrap(writeMetrics) + } override lazy val allMetrics: Map[String, GpuMetric] = { Map( diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala index be49ef6736f..dc1bf6671c4 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala @@ -20,10 +20,11 @@ import scala.collection.AbstractIterator import scala.concurrent.Future import com.nvidia.spark.rapids._ +import com.nvidia.spark.rapids.GpuMetric.{DEBUG_LEVEL, ESSENTIAL_LEVEL, MODERATE_LEVEL} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, GpuRangePartitioning, ShimUnaryExecNode, ShuffleOriginUtil, SparkShimImpl} -import org.apache.spark.{MapOutputStatistics, ShuffleDependency} +import org.apache.spark.{MapOutputStatistics, ShuffleDependency} import org.apache.spark.rapids.shims.GpuShuffleExchangeExec import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer @@ -37,7 +38,7 @@ import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec} import org.apache.spark.sql.execution.metric._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.GpuShuffleDependency -import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_READ_SIZE, METRIC_DATA_SIZE, METRIC_DESC_DATA_READ_SIZE, METRIC_DESC_DATA_SIZE, METRIC_DESC_SHUFFLE_COMBINE_TIME, METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME, METRIC_DESC_SHUFFLE_PARTITION_TIME, METRIC_DESC_SHUFFLE_READ_TIME, METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_DESC_SHUFFLE_SERIALIZATION_TIME, METRIC_DESC_SHUFFLE_WRITE_IO_TIME, METRIC_DESC_SHUFFLE_WRITE_TIME, METRIC_SHUFFLE_COMBINE_TIME, METRIC_SHUFFLE_DESERIALIZATION_TIME, METRIC_SHUFFLE_PARTITION_TIME, METRIC_SHUFFLE_READ_TIME, METRIC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_SHUFFLE_SER_COPY_HEADER_TIME, 
METRIC_SHUFFLE_SERIALIZATION_TIME, METRIC_SHUFFLE_WRITE_IO_TIME, METRIC_SHUFFLE_WRITE_TIME} +import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{createAdditionalExchangeMetris, METRIC_DATA_READ_SIZE, METRIC_DATA_SIZE, METRIC_DESC_DATA_READ_SIZE, METRIC_DESC_DATA_SIZE, METRIC_DESC_SHUFFLE_COMBINE_TIME, METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME, METRIC_DESC_SHUFFLE_PARTITION_TIME, METRIC_DESC_SHUFFLE_READ_TIME, METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_DESC_SHUFFLE_SERIALIZATION_TIME, METRIC_DESC_SHUFFLE_WRITE_IO_TIME, METRIC_DESC_SHUFFLE_WRITE_TIME, METRIC_SHUFFLE_COMBINE_TIME, METRIC_SHUFFLE_DESERIALIZATION_TIME, METRIC_SHUFFLE_PARTITION_TIME, METRIC_SHUFFLE_READ_TIME, METRIC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_SHUFFLE_SERIALIZATION_TIME, METRIC_SHUFFLE_WRITE_IO_TIME, METRIC_SHUFFLE_WRITE_TIME} import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.MutablePair @@ -196,32 +197,11 @@ abstract class GpuShuffleExchangeExecBase( SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) lazy val readMetrics = SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) - override lazy val additionalMetrics : Map[String, GpuMetric] = Map( - // dataSize and dataReadSize are uncompressed, one is on write and the - // other on read - METRIC_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, METRIC_DESC_DATA_SIZE), - METRIC_DATA_READ_SIZE -> createSizeMetric(MODERATE_LEVEL, METRIC_DESC_DATA_READ_SIZE), - METRIC_SHUFFLE_SERIALIZATION_TIME -> - createNanoTimingMetric(DEBUG_LEVEL,METRIC_DESC_SHUFFLE_SERIALIZATION_TIME), - METRIC_SHUFFLE_DESERIALIZATION_TIME -> - createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME), - METRIC_SHUFFLE_PARTITION_TIME -> - createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_PARTITION_TIME), - METRIC_SHUFFLE_WRITE_TIME -> - createNanoTimingMetric(ESSENTIAL_LEVEL, METRIC_DESC_SHUFFLE_WRITE_TIME), - METRIC_SHUFFLE_COMBINE_TIME -> - createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_COMBINE_TIME), - METRIC_SHUFFLE_WRITE_IO_TIME -> - createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_WRITE_IO_TIME), - METRIC_SHUFFLE_READ_TIME -> - createNanoTimingMetric(ESSENTIAL_LEVEL, METRIC_DESC_SHUFFLE_READ_TIME), - METRIC_SHUFFLE_SER_CALC_HEADER_TIME -> - createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME), - METRIC_SHUFFLE_SER_COPY_HEADER_TIME -> - createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME), - METRIC_SHUFFLE_SER_COPY_BUFFER_TIME -> - createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME) - ) ++ GpuMetric.wrap(readMetrics) ++ GpuMetric.wrap(writeMetrics) + override lazy val additionalMetrics : Map[String, GpuMetric] = { + createAdditionalExchangeMetris(this) ++ + GpuMetric.wrap(readMetrics) ++ + GpuMetric.wrap(writeMetrics) + } // Spark doesn't report totalTime for this operator so we override metrics override lazy val allMetrics: Map[String, GpuMetric] = Map( @@ -310,6 +290,32 @@ object GpuShuffleExchangeExecBase { val METRIC_SHUFFLE_SER_COPY_BUFFER_TIME = "rapidsShuffleSerializationCopyBufferTime" val METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME = "rs. 
serialization copy buffer time" + def createAdditionalExchangeMetris(gpu: GpuExec): Map[String, GpuMetric] = Map( + // dataSize and dataReadSize are uncompressed, one is on write and the + // other on read + METRIC_DATA_SIZE -> gpu.createSizeMetric(ESSENTIAL_LEVEL, METRIC_DESC_DATA_SIZE), + METRIC_DATA_READ_SIZE -> gpu.createSizeMetric(MODERATE_LEVEL, METRIC_DESC_DATA_READ_SIZE), + METRIC_SHUFFLE_SERIALIZATION_TIME -> + gpu.createNanoTimingMetric(DEBUG_LEVEL,METRIC_DESC_SHUFFLE_SERIALIZATION_TIME), + METRIC_SHUFFLE_DESERIALIZATION_TIME -> + gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME), + METRIC_SHUFFLE_PARTITION_TIME -> + gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_PARTITION_TIME), + METRIC_SHUFFLE_WRITE_TIME -> + gpu.createNanoTimingMetric(ESSENTIAL_LEVEL, METRIC_DESC_SHUFFLE_WRITE_TIME), + METRIC_SHUFFLE_COMBINE_TIME -> + gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_COMBINE_TIME), + METRIC_SHUFFLE_WRITE_IO_TIME -> + gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_WRITE_IO_TIME), + METRIC_SHUFFLE_READ_TIME -> + gpu.createNanoTimingMetric(ESSENTIAL_LEVEL, METRIC_DESC_SHUFFLE_READ_TIME), + METRIC_SHUFFLE_SER_CALC_HEADER_TIME -> + gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME), + METRIC_SHUFFLE_SER_COPY_HEADER_TIME -> + gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME), + METRIC_SHUFFLE_SER_COPY_BUFFER_TIME -> + gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME) + ) def prepareBatchShuffleDependency( rdd: RDD[ColumnarBatch], From 9805c84cc4271f6b46fec0e1f81d317c133994d3 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Thu, 28 Nov 2024 12:08:03 +0800 Subject: [PATCH 3/9] Address comments --- .../rapids/GpuColumnarBatchSerializer.scala | 14 ++++---- .../RapidsShuffleInternalManagerBase.scala | 8 +++++ .../GpuShuffleExchangeExecBase.scala | 35 +++++++++++-------- 3 files changed, 35 insertions(+), 22 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala index 797bd64d258..b2db675b1c5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala @@ -31,7 +31,7 @@ import com.nvidia.spark.rapids.jni.kudo.{KudoSerializer, KudoTable, KudoTableHea import org.apache.spark.TaskContext import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance} -import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_SIZE, METRIC_SHUFFLE_DESERIALIZATION_TIME, METRIC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_SHUFFLE_SERIALIZATION_TIME} +import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_SIZE, METRIC_SHUFFLE_DESER_STREAM_TIME, METRIC_SHUFFLE_DESERIALIZATION_TIME, METRIC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_SHUFFLE_SER_STREAM_TIME, METRIC_SHUFFLE_SERIALIZATION_TIME} import org.apache.spark.sql.types.{DataType, NullType} import org.apache.spark.sql.vectorized.ColumnarBatch @@ -79,7 +79,7 @@ class SerializedBatchIterator(dIn: DataInputStream, deserTime: GpuMetric) } } - private def readNextBatch(): ColumnarBatch = { + private def 
readNextBatch(): ColumnarBatch = deserTime.ns { withResource(new NvtxRange("Read Batch", NvtxColor.YELLOW)) { _ => val header = nextHeader.get nextHeader = None @@ -143,8 +143,8 @@ class GpuColumnarBatchSerializer(metrics: Map[String, GpuMetric], dataTypes: Arr private class GpuColumnarBatchSerializerInstance(metrics: Map[String, GpuMetric]) extends SerializerInstance { private val dataSize = metrics(METRIC_DATA_SIZE) - private val serTime = metrics(METRIC_SHUFFLE_SERIALIZATION_TIME) - private val deserTime = metrics(METRIC_SHUFFLE_DESERIALIZATION_TIME) + private val serTime = metrics(METRIC_SHUFFLE_SER_STREAM_TIME) + private val deserTime = metrics(METRIC_SHUFFLE_DESER_STREAM_TIME) override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream { @@ -337,11 +337,11 @@ private class KudoSerializerInstance( val metrics: Map[String, GpuMetric], val dataTypes: Array[DataType]) extends SerializerInstance { private val dataSize = metrics(METRIC_DATA_SIZE) - private val serTime = metrics(METRIC_SHUFFLE_SERIALIZATION_TIME) + private val serTime = metrics(METRIC_SHUFFLE_SER_STREAM_TIME) private val serCalcHeaderTime = metrics(METRIC_SHUFFLE_SER_CALC_HEADER_TIME) private val serCopyHeaderTime = metrics(METRIC_SHUFFLE_SER_COPY_HEADER_TIME) private val serCopyBufferTime = metrics(METRIC_SHUFFLE_SER_COPY_BUFFER_TIME) - private val deserTime = metrics(METRIC_SHUFFLE_DESERIALIZATION_TIME) + private val deserTime = metrics(METRIC_SHUFFLE_DESER_STREAM_TIME) private lazy val kudo = new KudoSerializer(GpuColumnVector.from(dataTypes)) @@ -534,7 +534,7 @@ class KudoSerializedBatchIterator(dIn: DataInputStream, deserTime: GpuMetric) } } - private def readNextBatch(): ColumnarBatch = { + private def readNextBatch(): ColumnarBatch = deserTime.ns { withResource(new NvtxRange("Read Batch", NvtxColor.YELLOW)) { _ => val header = nextHeader.get nextHeader = None diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala index 018cb5ab2bd..d2c423ee3be 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala @@ -246,6 +246,8 @@ abstract class RapidsShuffleThreadedWriterBase[K, V]( extends RapidsShuffleWriter[K, V] with RapidsShuffleWriterShimHelper { private val metrics = handle.metrics + private val serializationTimeMetric = + metrics.get(METRIC_SHUFFLE_SERIALIZATION_TIME) private val shuffleWriteTimeMetric = metrics.get(METRIC_SHUFFLE_WRITE_TIME) private val shuffleCombineTimeMetric = @@ -428,9 +430,11 @@ abstract class RapidsShuffleThreadedWriterBase[K, V]( // counted in the ioTime val totalPerRecordWriteTime = recordWriteTime.get() + ioTimeNs val ioRatio = (ioTimeNs.toDouble/totalPerRecordWriteTime) + val serializationRatio = 1.0 - ioRatio // update metrics, note that we expect them to be relative to the task ioTimeMetric.foreach(_ += (ioRatio * writeTimeNs).toLong) + serializationTimeMetric.foreach(_ += (serializationRatio * writeTimeNs).toLong) // we add all three here because this metric is meant to show the time // we are blocked on writes shuffleWriteTimeMetric.foreach(_ += (writeTimeNs + combineTimeNs)) @@ -593,6 +597,7 @@ abstract class RapidsShuffleThreadedReaderBase[K, C]( private val sqlMetrics = handle.metrics private val dep = handle.dependency + private val deserializationTimeNs = 
sqlMetrics.get(METRIC_SHUFFLE_DESERIALIZATION_TIME) private val shuffleReadTimeNs = sqlMetrics.get(METRIC_SHUFFLE_READ_TIME) private val dataReadSize = sqlMetrics.get(METRIC_DATA_READ_SIZE) @@ -675,6 +680,7 @@ abstract class RapidsShuffleThreadedReaderBase[K, C]( } val res = currentIter.next() val fetchTime = System.nanoTime() - fetchTimeStart + deserializationTimeNs.foreach(_ += (fetchTime - readBlockedTime)) shuffleReadTimeNs.foreach(_ += fetchTime) res } @@ -847,6 +853,7 @@ abstract class RapidsShuffleThreadedReaderBase[K, C]( case _ => 0 // TODO: do we need to handle other types here? } waitTime += System.nanoTime() - waitTimeStart + deserializationTimeNs.foreach(_ += waitTime) shuffleReadTimeNs.foreach(_ += waitTime) res } @@ -952,6 +959,7 @@ abstract class RapidsShuffleThreadedReaderBase[K, C]( } // keep track of the overall metric which includes blocked time val fetchTime = System.nanoTime() - fetchTimeStart + deserializationTimeNs.foreach(_ += (fetchTime - readBlockedTime)) shuffleReadTimeNs.foreach(_ += fetchTime) } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala index dc1bf6671c4..153072f3dc5 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec} import org.apache.spark.sql.execution.metric._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.GpuShuffleDependency -import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{createAdditionalExchangeMetris, METRIC_DATA_READ_SIZE, METRIC_DATA_SIZE, METRIC_DESC_DATA_READ_SIZE, METRIC_DESC_DATA_SIZE, METRIC_DESC_SHUFFLE_COMBINE_TIME, METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME, METRIC_DESC_SHUFFLE_PARTITION_TIME, METRIC_DESC_SHUFFLE_READ_TIME, METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_DESC_SHUFFLE_SERIALIZATION_TIME, METRIC_DESC_SHUFFLE_WRITE_IO_TIME, METRIC_DESC_SHUFFLE_WRITE_TIME, METRIC_SHUFFLE_COMBINE_TIME, METRIC_SHUFFLE_DESERIALIZATION_TIME, METRIC_SHUFFLE_PARTITION_TIME, METRIC_SHUFFLE_READ_TIME, METRIC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_SHUFFLE_SERIALIZATION_TIME, METRIC_SHUFFLE_WRITE_IO_TIME, METRIC_SHUFFLE_WRITE_TIME} +import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.createAdditionalExchangeMetrics import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.MutablePair @@ -198,7 +198,7 @@ abstract class GpuShuffleExchangeExecBase( lazy val readMetrics = SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val additionalMetrics : Map[String, GpuMetric] = { - createAdditionalExchangeMetris(this) ++ + createAdditionalExchangeMetrics(this) ++ GpuMetric.wrap(readMetrics) ++ GpuMetric.wrap(writeMetrics) } @@ -270,27 +270,32 @@ object GpuShuffleExchangeExecBase { val METRIC_DATA_READ_SIZE = "dataReadSize" val METRIC_DESC_DATA_READ_SIZE = "data read size" val METRIC_SHUFFLE_SERIALIZATION_TIME = "rapidsShuffleSerializationTime" - val METRIC_DESC_SHUFFLE_SERIALIZATION_TIME = "rs. 
serialization time" + val METRIC_DESC_SHUFFLE_SERIALIZATION_TIME = "rapids shuffle serialization time" + val METRIC_SHUFFLE_SER_STREAM_TIME = "rapidsShuffleSerializationStreamTime" + val METRIC_DESC_SHUFFLE_SER_STREAM_TIME = "rapids shuffle serialization to output stream time" val METRIC_SHUFFLE_DESERIALIZATION_TIME = "rapidsShuffleDeserializationTime" - val METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME = "rs. deserialization time" + val METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME = "rapids shuffle deserialization time" + val METRIC_SHUFFLE_DESER_STREAM_TIME = "rapidsShuffleDeserializationStreamTime" + val METRIC_DESC_SHUFFLE_DESER_STREAM_TIME = + "rapids shuffle deserialization from input stream time" val METRIC_SHUFFLE_PARTITION_TIME = "rapidsShufflePartitionTime" - val METRIC_DESC_SHUFFLE_PARTITION_TIME = "rs. partition time" + val METRIC_DESC_SHUFFLE_PARTITION_TIME = "rapids shuffle partition time" val METRIC_SHUFFLE_WRITE_TIME = "rapidsShuffleWriteTime" - val METRIC_DESC_SHUFFLE_WRITE_TIME = "rs. shuffle write time" + val METRIC_DESC_SHUFFLE_WRITE_TIME = "rapids shuffle shuffle write time" val METRIC_SHUFFLE_COMBINE_TIME = "rapidsShuffleCombineTime" - val METRIC_DESC_SHUFFLE_COMBINE_TIME = "rs. shuffle combine time" + val METRIC_DESC_SHUFFLE_COMBINE_TIME = "rapids shuffle shuffle combine time" val METRIC_SHUFFLE_WRITE_IO_TIME = "rapidsShuffleWriteIoTime" - val METRIC_DESC_SHUFFLE_WRITE_IO_TIME = "rs. shuffle write io time" + val METRIC_DESC_SHUFFLE_WRITE_IO_TIME = "rapids shuffle shuffle write io time" val METRIC_SHUFFLE_READ_TIME = "rapidsShuffleReadTime" - val METRIC_DESC_SHUFFLE_READ_TIME = "rs. shuffle read time" + val METRIC_DESC_SHUFFLE_READ_TIME = "rapids shuffle shuffle read time" val METRIC_SHUFFLE_SER_CALC_HEADER_TIME = "rapidsShuffleSerializationCalcHeaderTime" - val METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME = "rs. serialization calc header time" + val METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME = "rapids shuffle serialization calc header time" val METRIC_SHUFFLE_SER_COPY_HEADER_TIME = "rapidsShuffleSerializationCopyHeaderTime" - val METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME = "rs. serialization copy header time" + val METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME = "rapids shuffle serialization copy header time" val METRIC_SHUFFLE_SER_COPY_BUFFER_TIME = "rapidsShuffleSerializationCopyBufferTime" - val METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME = "rs. 
serialization copy buffer time" + val METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME = "rapids shuffle serialization copy buffer time" - def createAdditionalExchangeMetris(gpu: GpuExec): Map[String, GpuMetric] = Map( + def createAdditionalExchangeMetrics(gpu: GpuExec): Map[String, GpuMetric] = Map( // dataSize and dataReadSize are uncompressed, one is on write and the // other on read METRIC_DATA_SIZE -> gpu.createSizeMetric(ESSENTIAL_LEVEL, METRIC_DESC_DATA_SIZE), @@ -356,9 +361,9 @@ object GpuShuffleExchangeExecBase { rdd } val partitioner: GpuExpression = getPartitioner(newRdd, outputAttributes, newPartitioning) - val partitonTime: GpuMetric = metrics(METRIC_SHUFFLE_PARTITION_TIME) + val partitionTime: GpuMetric = metrics(METRIC_SHUFFLE_PARTITION_TIME) def getPartitioned: ColumnarBatch => Any = { - batch => partitonTime.ns { + batch => partitionTime.ns { partitioner.columnarEvalAny(batch) } } From 4d5f56865340d6061c89e67f95314c37a8aed1da Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Wed, 4 Dec 2024 12:52:29 +0800 Subject: [PATCH 4/9] Resolve comments --- .../GpuShuffleExchangeExecBase.scala | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala index 153072f3dc5..332545a99e1 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala @@ -270,40 +270,43 @@ object GpuShuffleExchangeExecBase { val METRIC_DATA_READ_SIZE = "dataReadSize" val METRIC_DESC_DATA_READ_SIZE = "data read size" val METRIC_SHUFFLE_SERIALIZATION_TIME = "rapidsShuffleSerializationTime" - val METRIC_DESC_SHUFFLE_SERIALIZATION_TIME = "rapids shuffle serialization time" + val METRIC_DESC_SHUFFLE_SERIALIZATION_TIME = "RAPIDS shuffle serialization time" val METRIC_SHUFFLE_SER_STREAM_TIME = "rapidsShuffleSerializationStreamTime" - val METRIC_DESC_SHUFFLE_SER_STREAM_TIME = "rapids shuffle serialization to output stream time" + val METRIC_DESC_SHUFFLE_SER_STREAM_TIME = "RAPIDS shuffle serialization to output stream time" val METRIC_SHUFFLE_DESERIALIZATION_TIME = "rapidsShuffleDeserializationTime" - val METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME = "rapids shuffle deserialization time" + val METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME = "RAPIDS shuffle deserialization time" val METRIC_SHUFFLE_DESER_STREAM_TIME = "rapidsShuffleDeserializationStreamTime" val METRIC_DESC_SHUFFLE_DESER_STREAM_TIME = - "rapids shuffle deserialization from input stream time" + "RAPIDS shuffle deserialization from input stream time" val METRIC_SHUFFLE_PARTITION_TIME = "rapidsShufflePartitionTime" - val METRIC_DESC_SHUFFLE_PARTITION_TIME = "rapids shuffle partition time" + val METRIC_DESC_SHUFFLE_PARTITION_TIME = "RAPIDS shuffle partition time" val METRIC_SHUFFLE_WRITE_TIME = "rapidsShuffleWriteTime" - val METRIC_DESC_SHUFFLE_WRITE_TIME = "rapids shuffle shuffle write time" + val METRIC_DESC_SHUFFLE_WRITE_TIME = "RAPIDS shuffle shuffle write time" val METRIC_SHUFFLE_COMBINE_TIME = "rapidsShuffleCombineTime" - val METRIC_DESC_SHUFFLE_COMBINE_TIME = "rapids shuffle shuffle combine time" + val METRIC_DESC_SHUFFLE_COMBINE_TIME = "RAPIDS shuffle shuffle combine time" val METRIC_SHUFFLE_WRITE_IO_TIME = "rapidsShuffleWriteIoTime" - val METRIC_DESC_SHUFFLE_WRITE_IO_TIME = "rapids shuffle 
shuffle write io time" + val METRIC_DESC_SHUFFLE_WRITE_IO_TIME = "RAPIDS shuffle shuffle write io time" val METRIC_SHUFFLE_READ_TIME = "rapidsShuffleReadTime" - val METRIC_DESC_SHUFFLE_READ_TIME = "rapids shuffle shuffle read time" + val METRIC_DESC_SHUFFLE_READ_TIME = "RAPIDS shuffle shuffle read time" val METRIC_SHUFFLE_SER_CALC_HEADER_TIME = "rapidsShuffleSerializationCalcHeaderTime" - val METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME = "rapids shuffle serialization calc header time" + val METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME = "RAPIDS shuffle serialization calc header time" val METRIC_SHUFFLE_SER_COPY_HEADER_TIME = "rapidsShuffleSerializationCopyHeaderTime" - val METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME = "rapids shuffle serialization copy header time" + val METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME = "RAPIDS shuffle serialization copy header time" val METRIC_SHUFFLE_SER_COPY_BUFFER_TIME = "rapidsShuffleSerializationCopyBufferTime" - val METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME = "rapids shuffle serialization copy buffer time" + val METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME = "RAPIDS shuffle serialization copy buffer time" def createAdditionalExchangeMetrics(gpu: GpuExec): Map[String, GpuMetric] = Map( - // dataSize and dataReadSize are uncompressed, one is on write and the - // other on read + // dataSize and dataReadSize are uncompressed, one is on write and the other on read METRIC_DATA_SIZE -> gpu.createSizeMetric(ESSENTIAL_LEVEL, METRIC_DESC_DATA_SIZE), METRIC_DATA_READ_SIZE -> gpu.createSizeMetric(MODERATE_LEVEL, METRIC_DESC_DATA_READ_SIZE), METRIC_SHUFFLE_SERIALIZATION_TIME -> gpu.createNanoTimingMetric(DEBUG_LEVEL,METRIC_DESC_SHUFFLE_SERIALIZATION_TIME), + METRIC_SHUFFLE_SER_STREAM_TIME -> + gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_STREAM_TIME), METRIC_SHUFFLE_DESERIALIZATION_TIME -> gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_DESERIALIZATION_TIME), + METRIC_SHUFFLE_DESER_STREAM_TIME -> + gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_DESER_STREAM_TIME), METRIC_SHUFFLE_PARTITION_TIME -> gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_PARTITION_TIME), METRIC_SHUFFLE_WRITE_TIME -> From f35a903187a2aae0639785284d1a3cb7ad327450 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Wed, 4 Dec 2024 12:55:08 +0800 Subject: [PATCH 5/9] Fix compiler --- .../com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala index b2db675b1c5..38110a86970 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala @@ -381,7 +381,7 @@ private class KudoSerializerInstance( } withResource(new NvtxRange("Serialize Batch", NvtxColor.YELLOW)) { _ => - val writeMetric = kudo.writeToStream(columns, dOut, startRow, numRows) + val writeMetric = kudo.writeToStreamWithMetrics(columns, dOut, startRow, numRows) dataSize += writeMetric.getWrittenBytes serCalcHeaderTime += writeMetric.getCalcHeaderTime From deeaecbca4c72e5cea53268265ec403075b077d6 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Thu, 5 Dec 2024 10:34:01 +0800 Subject: [PATCH 6/9] Fix build break --- .../spark/rapids/GpuColumnarBatchSerializer.scala | 7 +++++-- .../main/scala/com/nvidia/spark/rapids/GpuExec.scala | 10 +++++----- 
.../sql/rapids/RapidsShuffleThreadedReaderSuite.scala | 3 ++- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala index 38110a86970..82f368936a7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala @@ -28,10 +28,13 @@ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import com.nvidia.spark.rapids.jni.kudo.{KudoSerializer, KudoTable, KudoTableHeader} -import org.apache.spark.TaskContext +import org.apache.spark.TaskContext import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance} -import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_SIZE, METRIC_SHUFFLE_DESER_STREAM_TIME, METRIC_SHUFFLE_DESERIALIZATION_TIME, METRIC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_SHUFFLE_SER_STREAM_TIME, METRIC_SHUFFLE_SERIALIZATION_TIME} +import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_SIZE, + METRIC_SHUFFLE_DESER_STREAM_TIME, METRIC_SHUFFLE_SER_CALC_HEADER_TIME, + METRIC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_SHUFFLE_SER_COPY_HEADER_TIME, + METRIC_SHUFFLE_SER_STREAM_TIME} import org.apache.spark.sql.types.{DataType, NullType} import org.apache.spark.sql.vectorized.ColumnarBatch diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala index 3d9b6285a91..850a04f390f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala @@ -327,19 +327,19 @@ trait GpuExec extends SparkPlan { } } - protected def createMetric(level: MetricsLevel, name: String): GpuMetric = + def createMetric(level: MetricsLevel, name: String): GpuMetric = createMetricInternal(level, SQLMetrics.createMetric(sparkContext, name)) - protected def createNanoTimingMetric(level: MetricsLevel, name: String): GpuMetric = + def createNanoTimingMetric(level: MetricsLevel, name: String): GpuMetric = createMetricInternal(level, SQLMetrics.createNanoTimingMetric(sparkContext, name)) - protected def createSizeMetric(level: MetricsLevel, name: String): GpuMetric = + def createSizeMetric(level: MetricsLevel, name: String): GpuMetric = createMetricInternal(level, SQLMetrics.createSizeMetric(sparkContext, name)) - protected def createAverageMetric(level: MetricsLevel, name: String): GpuMetric = + def createAverageMetric(level: MetricsLevel, name: String): GpuMetric = createMetricInternal(level, SQLMetrics.createAverageMetric(sparkContext, name)) - protected def createTimingMetric(level: MetricsLevel, name: String): GpuMetric = + def createTimingMetric(level: MetricsLevel, name: String): GpuMetric = createMetricInternal(level, SQLMetrics.createTimingMetric(sparkContext, name)) protected def createFileCacheMetrics(): Map[String, GpuMetric] = { diff --git a/tests/src/test/spark321/scala/org/apache/spark/sql/rapids/RapidsShuffleThreadedReaderSuite.scala b/tests/src/test/spark321/scala/org/apache/spark/sql/rapids/RapidsShuffleThreadedReaderSuite.scala index 
3958dce6fdb..c006031da7d 100644 --- a/tests/src/test/spark321/scala/org/apache/spark/sql/rapids/RapidsShuffleThreadedReaderSuite.scala +++ b/tests/src/test/spark321/scala/org/apache/spark/sql/rapids/RapidsShuffleThreadedReaderSuite.scala @@ -113,7 +113,8 @@ class RapidsShuffleThreadedReaderSuite val shuffleId = 22 val numMaps = 6 val keyValuePairsPerMap = 10 - val serializer = new GpuColumnarBatchSerializer(NoopMetric, Array.empty, false) + val serializer = new GpuColumnarBatchSerializer(Map.empty.withDefaultValue(NoopMetric), + Array.empty, false) // Make a mock BlockManager that will return RecordingManagedByteBuffers of data, so that we // can ensure retain() and release() are properly called. From 742e9abeefbcc495120db98fd0658a9a2ddc205c Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Thu, 5 Dec 2024 11:06:49 +0800 Subject: [PATCH 7/9] Fix build break --- .../spark/sql/rapids/RapidsShuffleInternalManagerBase.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala index d2c423ee3be..fc255a6bfd0 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala @@ -31,8 +31,8 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import com.nvidia.spark.rapids.format.TableMeta import com.nvidia.spark.rapids.shuffle.{RapidsShuffleRequestHandler, RapidsShuffleServer, RapidsShuffleTransport} -import org.apache.spark.{InterruptibleIterator, MapOutputTracker, ShuffleDependency, SparkConf, SparkEnv, TaskContext} +import org.apache.spark.{InterruptibleIterator, MapOutputTracker, ShuffleDependency, SparkConf, SparkEnv, TaskContext} import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.internal.{config, Logging} import org.apache.spark.io.CompressionCodec From 954bd2556f0c7ed90845c345f6ea8b9400ccc58f Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Thu, 5 Dec 2024 11:57:47 +0800 Subject: [PATCH 8/9] Fix build break --- .../spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala b/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala index b3be25d14e9..62a720002e4 100644 --- a/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala +++ b/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.execution.{CoalescedPartitionSpec, ShufflePartitionS import org.apache.spark.sql.execution.exchange.Exchange import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} import org.apache.spark.sql.rapids.execution.{GpuShuffleExchangeExecBase, ShuffledBatchRDD} -import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.createAdditionalExchangeMetris +import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.createAdditionalExchangeMetrics import org.apache.spark.sql.vectorized.ColumnarBatch 
import org.apache.spark.util.ThreadUtils From 7722f23c227df5d0b168933f6b1fe0d67c56a3db Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Thu, 5 Dec 2024 13:13:48 +0800 Subject: [PATCH 9/9] Fix build break --- .../spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala b/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala index 62a720002e4..5b26943a541 100644 --- a/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala +++ b/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala @@ -73,7 +73,7 @@ case class GpuOptimizeWriteExchangeExec( SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val additionalMetrics : Map[String, GpuMetric] = { - createAdditionalExchangeMetris(this) ++ + createAdditionalExchangeMetrics(this) ++ GpuMetric.wrap(readMetrics) ++ GpuMetric.wrap(writeMetrics) }
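
---

Editor's note on the timing pattern used throughout this series: the hunks above repeatedly wrap hot paths (e.g. `peekNextBatchSize()`, `tryReadNext()`, `writeValue`) in `metric.ns { ... }` so that elapsed nanoseconds accumulate into the corresponding shuffle metric. For readers unfamiliar with the plugin, below is a minimal, self-contained sketch of how such a nanosecond-timing wrapper can be written and used. `NanoMetric` is a hypothetical stand-in, assuming a plain mutable counter; it is not the plugin's actual `GpuMetric` API — only the `ns`-wraps-a-block usage mirrors the diffs.

// Hypothetical stand-in for GpuMetric: a mutable nanosecond counter with a
// block-timing combinator, mirroring the `deserTime.ns { ... }` usage above.
class NanoMetric(val name: String) {
  private var total = 0L

  def +=(v: Long): Unit = total += v

  // Time the by-name block, accumulate the elapsed nanoseconds into this
  // metric, and return the block's result unchanged.
  def ns[T](f: => T): T = {
    val start = System.nanoTime()
    try f finally total += System.nanoTime() - start
  }

  def value: Long = total
}

object NanoMetricExample extends App {
  val deserTime = new NanoMetric("rapidsShuffleDeserializationStreamTime")
  val header = deserTime.ns {
    Thread.sleep(2) // stand-in for reading a serialized table header
    Some(42L)
  }
  println(s"peeked $header, ${deserTime.name} = ${deserTime.value} ns")
}

Because `ns` returns the block's result, a method body can be converted to a timed one without restructuring — which is exactly how the patches retrofit timing onto the existing iterator and stream methods.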
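A second note, on patch 3's restored write-time split in `RapidsShuffleThreadedWriterBase`: record write time and I/O time are measured separately, and the overall blocked write time is then apportioned between the I/O and serialization metrics by the measured ratio (`ioRatio` / `serializationRatio = 1.0 - ioRatio`). The following standalone sketch reproduces that arithmetic with a worked example; the formula mirrors the diff, while the wrapper object and method names are hypothetical.

// Standalone sketch of the ratio-based split: writeTimeNs is the total time
// the task was blocked on writes, apportioned between I/O and serialization.
object WriteTimeSplit {
  def split(writeTimeNs: Long, recordWriteTimeNs: Long, ioTimeNs: Long): (Long, Long) = {
    // ioTimeNs is measured separately and is not counted in recordWriteTimeNs
    val totalPerRecordWriteTime = recordWriteTimeNs + ioTimeNs
    val ioRatio = ioTimeNs.toDouble / totalPerRecordWriteTime
    val serializationRatio = 1.0 - ioRatio
    ((ioRatio * writeTimeNs).toLong, (serializationRatio * writeTimeNs).toLong)
  }

  def main(args: Array[String]): Unit = {
    // 40% of per-record time went to I/O, so 40% of the blocked write time is
    // attributed to the io metric and the remaining 60% to serialization.
    val (io, ser) = split(writeTimeNs = 1000000L, recordWriteTimeNs = 600000L, ioTimeNs = 400000L)
    println(s"ioTime += $io ns, serializationTime += $ser ns") // 400000 / 600000
  }
}

This is why the series first dropped and then reinstated `serializationTimeMetric`: once the Kudo path reports its own fine-grained serialization times, the ratio-derived estimate remains useful only as the task-relative aggregate, which patch 3 adds back.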