Add support for kudo write metrics (#11784)
* Add support for kudo write metrics

* Refactor

Signed-off-by: liurenjie1024 <[email protected]>

* Address comments

* Resolve comments

* Fix compiler

* Fix build break

* Fix build break

* Fix build break

* Fix build break

---------

Signed-off-by: liurenjie1024 <[email protected]>
liurenjie1024 authored Dec 6, 2024
1 parent f3ac8be commit 30c4ddb
Showing 6 changed files with 132 additions and 67 deletions.
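
The change routes the per-write counters reported by the kudo serializer into the plugin's shared GpuMetric map instead of updating a single dataSize metric. Below is a minimal sketch of that write path, assuming the KudoSerializer and GpuMetric calls behave exactly as they appear in the hunks that follow; the wrapper object and method names are illustrative and not part of the change.

import java.io.DataOutputStream

import ai.rapids.cudf.HostColumnVector
import com.nvidia.spark.rapids.GpuMetric
import com.nvidia.spark.rapids.jni.kudo.KudoSerializer

import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_SIZE,
  METRIC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_SHUFFLE_SER_COPY_BUFFER_TIME,
  METRIC_SHUFFLE_SER_COPY_HEADER_TIME}

object KudoWriteMetricsSketch {
  // Write one sliced batch and fold the counters returned by
  // writeToStreamWithMetrics into the shared metric map, mirroring what
  // KudoSerializerInstance.writeValue does in the hunks below.
  def writeSlicedBatch(
      kudo: KudoSerializer,
      columns: Array[HostColumnVector],
      dOut: DataOutputStream,
      startRow: Int,
      numRows: Int,
      metrics: Map[String, GpuMetric]): Unit = {
    val writeMetric = kudo.writeToStreamWithMetrics(columns, dOut, startRow, numRows)
    metrics(METRIC_DATA_SIZE) += writeMetric.getWrittenBytes
    metrics(METRIC_SHUFFLE_SER_CALC_HEADER_TIME) += writeMetric.getCalcHeaderTime
    metrics(METRIC_SHUFFLE_SER_COPY_HEADER_TIME) += writeMetric.getCopyHeaderTime
    metrics(METRIC_SHUFFLE_SER_COPY_BUFFER_TIME) += writeMetric.getCopyBufferTime
  }
}
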
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.{CoalescedPartitionSpec, ShufflePartitionS
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
import org.apache.spark.sql.rapids.execution.{GpuShuffleExchangeExecBase, ShuffledBatchRDD}
import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.createAdditionalExchangeMetrics
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.ThreadUtils

@@ -71,22 +72,11 @@ case class GpuOptimizeWriteExchangeExec(
private[sql] lazy val readMetrics =
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)

override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
"dataSize" -> createSizeMetric(ESSENTIAL_LEVEL, "data size"),
"dataReadSize" -> createSizeMetric(MODERATE_LEVEL, "data read size"),
"rapidsShuffleSerializationTime" ->
createNanoTimingMetric(DEBUG_LEVEL, "rs. serialization time"),
"rapidsShuffleDeserializationTime" ->
createNanoTimingMetric(DEBUG_LEVEL, "rs. deserialization time"),
"rapidsShuffleWriteTime" ->
createNanoTimingMetric(ESSENTIAL_LEVEL, "rs. shuffle write time"),
"rapidsShuffleCombineTime" ->
createNanoTimingMetric(DEBUG_LEVEL, "rs. shuffle combine time"),
"rapidsShuffleWriteIoTime" ->
createNanoTimingMetric(DEBUG_LEVEL, "rs. shuffle write io time"),
"rapidsShuffleReadTime" ->
createNanoTimingMetric(ESSENTIAL_LEVEL, "rs. shuffle read time")
) ++ GpuMetric.wrap(readMetrics) ++ GpuMetric.wrap(writeMetrics)
override lazy val additionalMetrics : Map[String, GpuMetric] = {
createAdditionalExchangeMetrics(this) ++
GpuMetric.wrap(readMetrics) ++
GpuMetric.wrap(writeMetrics)
}

override lazy val allMetrics: Map[String, GpuMetric] = {
Map(
@@ -98,7 +88,7 @@ }
}

private lazy val serializer: Serializer =
new GpuColumnarBatchSerializer(gpuLongMetric("dataSize"),
new GpuColumnarBatchSerializer(allMetrics,
child.output.map(_.dataType).toArray,
RapidsConf.SHUFFLE_KUDO_SERIALIZER_ENABLED.get(child.conf))

@@ -31,6 +31,10 @@ import com.nvidia.spark.rapids.jni.kudo.{KudoSerializer, KudoTable, KudoTableHea

import org.apache.spark.TaskContext
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance}
import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_SIZE,
METRIC_SHUFFLE_DESER_STREAM_TIME, METRIC_SHUFFLE_SER_CALC_HEADER_TIME,
METRIC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_SHUFFLE_SER_COPY_HEADER_TIME,
METRIC_SHUFFLE_SER_STREAM_TIME}
import org.apache.spark.sql.types.{DataType, NullType}
import org.apache.spark.sql.vectorized.ColumnarBatch

@@ -45,7 +49,7 @@ trait BaseSerializedTableIterator extends Iterator[(Int, ColumnarBatch)] {
def peekNextBatchSize(): Option[Long]
}

class SerializedBatchIterator(dIn: DataInputStream)
class SerializedBatchIterator(dIn: DataInputStream, deserTime: GpuMetric)
extends BaseSerializedTableIterator {
private[this] var nextHeader: Option[SerializedTableHeader] = None
private[this] var streamClosed: Boolean = false
@@ -58,7 +62,7 @@ }
}
}

override def peekNextBatchSize(): Option[Long] = {
override def peekNextBatchSize(): Option[Long] = deserTime.ns {
if (streamClosed) {
None
} else {
@@ -78,7 +82,7 @@ }
}
}

private def readNextBatch(): ColumnarBatch = {
private def readNextBatch(): ColumnarBatch = deserTime.ns {
withResource(new NvtxRange("Read Batch", NvtxColor.YELLOW)) { _ =>
val header = nextHeader.get
nextHeader = None
@@ -125,26 +129,32 @@ class SerializedBatchIterator(dIn: DataInputStream)
*
* @note The RAPIDS shuffle does not use this code.
*/
class GpuColumnarBatchSerializer(dataSize: GpuMetric, dataTypes: Array[DataType], useKudo: Boolean)
class GpuColumnarBatchSerializer(metrics: Map[String, GpuMetric], dataTypes: Array[DataType],
useKudo: Boolean)
extends Serializer with Serializable {
override def newInstance(): SerializerInstance = {
if (useKudo) {
new KudoSerializerInstance(dataSize, dataTypes)
new KudoSerializerInstance(metrics, dataTypes)
} else {
new GpuColumnarBatchSerializerInstance(dataSize)
new GpuColumnarBatchSerializerInstance(metrics)
}
}

override def supportsRelocationOfSerializedObjects: Boolean = true
}

private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric) extends SerializerInstance {
private class GpuColumnarBatchSerializerInstance(metrics: Map[String, GpuMetric]) extends
SerializerInstance {
private val dataSize = metrics(METRIC_DATA_SIZE)
private val serTime = metrics(METRIC_SHUFFLE_SER_STREAM_TIME)
private val deserTime = metrics(METRIC_SHUFFLE_DESER_STREAM_TIME)


override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream {
private[this] val dOut: DataOutputStream =
new DataOutputStream(new BufferedOutputStream(out))

override def writeValue[T: ClassTag](value: T): SerializationStream = {
override def writeValue[T: ClassTag](value: T): SerializationStream = serTime.ns {
val batch = value.asInstanceOf[ColumnarBatch]
val numColumns = batch.numCols()
val columns: Array[HostColumnVector] = new Array(numColumns)
@@ -227,7 +237,7 @@ private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric) extends Se
private[this] val dIn: DataInputStream = new DataInputStream(new BufferedInputStream(in))

override def asKeyValueIterator: Iterator[(Int, ColumnarBatch)] = {
new SerializedBatchIterator(dIn)
new SerializedBatchIterator(dIn, deserTime)
}

override def asIterator: Iterator[Any] = {
@@ -327,16 +337,22 @@ object SerializedTableColumn {
* @param dataTypes data types of the columns in the batch
*/
private class KudoSerializerInstance(
val dataSize: GpuMetric,
val metrics: Map[String, GpuMetric],
val dataTypes: Array[DataType]) extends SerializerInstance {
private val dataSize = metrics(METRIC_DATA_SIZE)
private val serTime = metrics(METRIC_SHUFFLE_SER_STREAM_TIME)
private val serCalcHeaderTime = metrics(METRIC_SHUFFLE_SER_CALC_HEADER_TIME)
private val serCopyHeaderTime = metrics(METRIC_SHUFFLE_SER_COPY_HEADER_TIME)
private val serCopyBufferTime = metrics(METRIC_SHUFFLE_SER_COPY_BUFFER_TIME)
private val deserTime = metrics(METRIC_SHUFFLE_DESER_STREAM_TIME)

private lazy val kudo = new KudoSerializer(GpuColumnVector.from(dataTypes))

override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream {
private[this] val dOut: DataOutputStream =
new DataOutputStream(new BufferedOutputStream(out))

override def writeValue[T: ClassTag](value: T): SerializationStream = {
override def writeValue[T: ClassTag](value: T): SerializationStream = serTime.ns {
val batch = value.asInstanceOf[ColumnarBatch]
val numColumns = batch.numCols()
val columns: Array[HostColumnVector] = new Array(numColumns)
@@ -368,7 +384,12 @@ private class KudoSerializerInstance(
}

withResource(new NvtxRange("Serialize Batch", NvtxColor.YELLOW)) { _ =>
dataSize += kudo.writeToStream(columns, dOut, startRow, numRows)
val writeMetric = kudo.writeToStreamWithMetrics(columns, dOut, startRow, numRows)

dataSize += writeMetric.getWrittenBytes
serCalcHeaderTime += writeMetric.getCalcHeaderTime
serCopyHeaderTime += writeMetric.getCopyHeaderTime
serCopyBufferTime += writeMetric.getCopyBufferTime
}
} else {
withResource(new NvtxRange("Serialize Row Only Batch", NvtxColor.YELLOW)) { _ =>
@@ -410,7 +431,7 @@ private class KudoSerializerInstance(
private[this] val dIn: DataInputStream = new DataInputStream(new BufferedInputStream(in))

override def asKeyValueIterator: Iterator[(Int, ColumnarBatch)] = {
new KudoSerializedBatchIterator(dIn)
new KudoSerializedBatchIterator(dIn, deserTime)
}

override def asIterator: Iterator[Any] = {
@@ -483,7 +504,7 @@ object KudoSerializedTableColumn {
}
}

class KudoSerializedBatchIterator(dIn: DataInputStream)
class KudoSerializedBatchIterator(dIn: DataInputStream, deserTime: GpuMetric)
extends BaseSerializedTableIterator {
private[this] var nextHeader: Option[KudoTableHeader] = None
private[this] var streamClosed: Boolean = false
@@ -496,7 +517,7 @@ class KudoSerializedBatchIterator(dIn: DataInputStream)
}
}

override def peekNextBatchSize(): Option[Long] = {
override def peekNextBatchSize(): Option[Long] = deserTime.ns {
if (streamClosed) {
None
} else {
@@ -516,7 +537,7 @@ class KudoSerializedBatchIterator(dIn: DataInputStream)
}
}

private def readNextBatch(): ColumnarBatch = {
private def readNextBatch(): ColumnarBatch = deserTime.ns {
withResource(new NvtxRange("Read Batch", NvtxColor.YELLOW)) { _ =>
val header = nextHeader.get
nextHeader = None
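
Both the serializer instances and the batch iterators above now charge their work to the serTime/deserTime metrics through GpuMetric.ns. The body of ns is not part of this diff; the standalone class below only sketches the call-by-name nanosecond-timing pattern it presumably follows (time the block with System.nanoTime and add the elapsed nanoseconds to the metric).

// Illustrative only: not the plugin's GpuMetric, just the timing-wrapper
// pattern that `serTime.ns { ... }` and `deserTime.ns { ... }` rely on.
class NanoTimedMetricSketch {
  private var total: Long = 0L

  def +=(elapsedNs: Long): Unit = total += elapsedNs

  def value: Long = total

  // Run `f`, charge its wall-clock duration in nanoseconds to this metric
  // (even if `f` throws), and return its result.
  def ns[T](f: => T): T = {
    val start = System.nanoTime()
    try {
      f
    } finally {
      total += System.nanoTime() - start
    }
  }
}
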
10 changes: 5 additions & 5 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
@@ -327,19 +327,19 @@ trait GpuExec extends SparkPlan {
}
}

protected def createMetric(level: MetricsLevel, name: String): GpuMetric =
def createMetric(level: MetricsLevel, name: String): GpuMetric =
createMetricInternal(level, SQLMetrics.createMetric(sparkContext, name))

protected def createNanoTimingMetric(level: MetricsLevel, name: String): GpuMetric =
def createNanoTimingMetric(level: MetricsLevel, name: String): GpuMetric =
createMetricInternal(level, SQLMetrics.createNanoTimingMetric(sparkContext, name))

protected def createSizeMetric(level: MetricsLevel, name: String): GpuMetric =
def createSizeMetric(level: MetricsLevel, name: String): GpuMetric =
createMetricInternal(level, SQLMetrics.createSizeMetric(sparkContext, name))

protected def createAverageMetric(level: MetricsLevel, name: String): GpuMetric =
def createAverageMetric(level: MetricsLevel, name: String): GpuMetric =
createMetricInternal(level, SQLMetrics.createAverageMetric(sparkContext, name))

protected def createTimingMetric(level: MetricsLevel, name: String): GpuMetric =
def createTimingMetric(level: MetricsLevel, name: String): GpuMetric =
createMetricInternal(level, SQLMetrics.createTimingMetric(sparkContext, name))

protected def createFileCacheMetrics(): Map[String, GpuMetric] = {
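
The metric factory methods above drop their protected modifier so that the shared helper GpuShuffleExchangeExecBase.createAdditionalExchangeMetrics, imported in the first file of this diff, can build the exchange metric map from outside the exec class. The helper's body is not part of this diff; the sketch below only illustrates the calling pattern the visibility change enables, with a subset of the metrics taken from the map it replaces, and assumes the level constants live on the GpuMetric companion object.

import com.nvidia.spark.rapids.{GpuExec, GpuMetric}
import com.nvidia.spark.rapids.GpuMetric.{DEBUG_LEVEL, ESSENTIAL_LEVEL}

object ExchangeMetricsSketch {
  // Now possible because createSizeMetric and createNanoTimingMetric are
  // public: a helper outside the exec class can register its metrics.
  def exchangeMetrics(exec: GpuExec): Map[String, GpuMetric] = Map(
    "dataSize" -> exec.createSizeMetric(ESSENTIAL_LEVEL, "data size"),
    "rapidsShuffleSerializationTime" ->
      exec.createNanoTimingMetric(DEBUG_LEVEL, "rs. serialization time")
    // ...the real helper also registers the new kudo write timings.
  )
}
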
@@ -42,6 +42,7 @@ import org.apache.spark.shuffle.{ShuffleWriter, _}
import org.apache.spark.shuffle.api._
import org.apache.spark.shuffle.sort.{BypassMergeSortShuffleHandle, SortShuffleManager}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_READ_SIZE, METRIC_DATA_SIZE, METRIC_SHUFFLE_COMBINE_TIME, METRIC_SHUFFLE_DESERIALIZATION_TIME, METRIC_SHUFFLE_READ_TIME, METRIC_SHUFFLE_SERIALIZATION_TIME, METRIC_SHUFFLE_WRITE_IO_TIME, METRIC_SHUFFLE_WRITE_TIME}
import org.apache.spark.sql.rapids.shims.{GpuShuffleBlockResolver, RapidsShuffleThreadedReader, RapidsShuffleThreadedWriter}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.storage.{RapidsShuffleBlockFetcherIterator, _}
@@ -246,13 +247,13 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
with RapidsShuffleWriterShimHelper {
private val metrics = handle.metrics
private val serializationTimeMetric =
metrics.get("rapidsShuffleSerializationTime")
metrics.get(METRIC_SHUFFLE_SERIALIZATION_TIME)
private val shuffleWriteTimeMetric =
metrics.get("rapidsShuffleWriteTime")
metrics.get(METRIC_SHUFFLE_WRITE_TIME)
private val shuffleCombineTimeMetric =
metrics.get("rapidsShuffleCombineTime")
metrics.get(METRIC_SHUFFLE_COMBINE_TIME)
private val ioTimeMetric =
metrics.get("rapidsShuffleWriteIoTime")
metrics.get(METRIC_SHUFFLE_WRITE_IO_TIME)
private val dep: ShuffleDependency[K, V, V] = handle.dependency
private val shuffleId = dep.shuffleId
private val partitioner = dep.partitioner
@@ -596,9 +597,9 @@ abstract class RapidsShuffleThreadedReaderBase[K, C](

private val sqlMetrics = handle.metrics
private val dep = handle.dependency
private val deserializationTimeNs = sqlMetrics.get("rapidsShuffleDeserializationTime")
private val shuffleReadTimeNs = sqlMetrics.get("rapidsShuffleReadTime")
private val dataReadSize = sqlMetrics.get("dataReadSize")
private val deserializationTimeNs = sqlMetrics.get(METRIC_SHUFFLE_DESERIALIZATION_TIME)
private val shuffleReadTimeNs = sqlMetrics.get(METRIC_SHUFFLE_READ_TIME)
private val dataReadSize = sqlMetrics.get(METRIC_DATA_READ_SIZE)

private var shuffleReadRange: NvtxRange =
new NvtxRange("ThreadedReader.read", NvtxColor.PURPLE)
@@ -1048,7 +1049,7 @@ class RapidsCachingWriter[K, V](
metrics: Map[String, SQLMetric])
extends RapidsCachingWriterBase[K, V](blockManager, handle, mapId, rapidsShuffleServer, catalog) {

private val uncompressedMetric: SQLMetric = metrics("dataSize")
private val uncompressedMetric: SQLMetric = metrics(METRIC_DATA_SIZE)

// This is here for the special case where we have no columns like with the .count
// case or when we have 0-byte columns. We pick 100 as an arbitrary number so that