Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for kudo write metrics #11784

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.{CoalescedPartitionSpec, ShufflePartitionS
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
import org.apache.spark.sql.rapids.execution.{GpuShuffleExchangeExecBase, ShuffledBatchRDD}
import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.createAdditionalExchangeMetris
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.ThreadUtils

Expand Down Expand Up @@ -71,22 +72,11 @@ case class GpuOptimizeWriteExchangeExec(
private[sql] lazy val readMetrics =
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)

override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
"dataSize" -> createSizeMetric(ESSENTIAL_LEVEL, "data size"),
"dataReadSize" -> createSizeMetric(MODERATE_LEVEL, "data read size"),
"rapidsShuffleSerializationTime" ->
createNanoTimingMetric(DEBUG_LEVEL, "rs. serialization time"),
"rapidsShuffleDeserializationTime" ->
createNanoTimingMetric(DEBUG_LEVEL, "rs. deserialization time"),
"rapidsShuffleWriteTime" ->
createNanoTimingMetric(ESSENTIAL_LEVEL, "rs. shuffle write time"),
"rapidsShuffleCombineTime" ->
createNanoTimingMetric(DEBUG_LEVEL, "rs. shuffle combine time"),
"rapidsShuffleWriteIoTime" ->
createNanoTimingMetric(DEBUG_LEVEL, "rs. shuffle write io time"),
"rapidsShuffleReadTime" ->
createNanoTimingMetric(ESSENTIAL_LEVEL, "rs. shuffle read time")
) ++ GpuMetric.wrap(readMetrics) ++ GpuMetric.wrap(writeMetrics)
override lazy val additionalMetrics : Map[String, GpuMetric] = {
createAdditionalExchangeMetris(this) ++
GpuMetric.wrap(readMetrics) ++
GpuMetric.wrap(writeMetrics)
}

override lazy val allMetrics: Map[String, GpuMetric] = {
Map(
Expand All @@ -98,7 +88,7 @@ case class GpuOptimizeWriteExchangeExec(
}

private lazy val serializer: Serializer =
new GpuColumnarBatchSerializer(gpuLongMetric("dataSize"),
new GpuColumnarBatchSerializer(allMetrics,
child.output.map(_.dataType).toArray,
RapidsConf.SHUFFLE_KUDO_SERIALIZER_ENABLED.get(child.conf))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
import com.nvidia.spark.rapids.jni.kudo.{KudoSerializer, KudoTable, KudoTableHeader}

import org.apache.spark.TaskContext

import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance}
import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_SIZE, METRIC_SHUFFLE_DESERIALIZATION_TIME, METRIC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_SHUFFLE_SERIALIZATION_TIME}
import org.apache.spark.sql.types.{DataType, NullType}
import org.apache.spark.sql.vectorized.ColumnarBatch

Expand All @@ -45,7 +46,7 @@ trait BaseSerializedTableIterator extends Iterator[(Int, ColumnarBatch)] {
def peekNextBatchSize(): Option[Long]
}

class SerializedBatchIterator(dIn: DataInputStream)
class SerializedBatchIterator(dIn: DataInputStream, deserTime: GpuMetric)
extends BaseSerializedTableIterator {
private[this] var nextHeader: Option[SerializedTableHeader] = None
private[this] var toBeReturned: Option[ColumnarBatch] = None
Expand All @@ -60,7 +61,7 @@ class SerializedBatchIterator(dIn: DataInputStream)
}
}

override def peekNextBatchSize(): Option[Long] = {
override def peekNextBatchSize(): Option[Long] = deserTime.ns {
if (streamClosed) {
None
} else {
Expand All @@ -80,7 +81,7 @@ class SerializedBatchIterator(dIn: DataInputStream)
}
}

private def tryReadNext(): Option[ColumnarBatch] = {
private def tryReadNext(): Option[ColumnarBatch] = deserTime.ns {
if (nextHeader.isEmpty) {
None
} else {
Expand Down Expand Up @@ -137,26 +138,32 @@ class SerializedBatchIterator(dIn: DataInputStream)
*
* @note The RAPIDS shuffle does not use this code.
*/
class GpuColumnarBatchSerializer(dataSize: GpuMetric, dataTypes: Array[DataType], useKudo: Boolean)
class GpuColumnarBatchSerializer(metrics: Map[String, GpuMetric], dataTypes: Array[DataType],
useKudo: Boolean)
extends Serializer with Serializable {
override def newInstance(): SerializerInstance = {
if (useKudo) {
new KudoSerializerInstance(dataSize, dataTypes)
new KudoSerializerInstance(metrics, dataTypes)
} else {
new GpuColumnarBatchSerializerInstance(dataSize)
new GpuColumnarBatchSerializerInstance(metrics)
}
}

override def supportsRelocationOfSerializedObjects: Boolean = true
}

private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric) extends SerializerInstance {
private class GpuColumnarBatchSerializerInstance(metrics: Map[String, GpuMetric]) extends
SerializerInstance {
private val dataSize = metrics(METRIC_DATA_SIZE)
private val serTime = metrics(METRIC_SHUFFLE_SERIALIZATION_TIME)
private val deserTime = metrics(METRIC_SHUFFLE_DESERIALIZATION_TIME)


override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream {
private[this] val dOut: DataOutputStream =
new DataOutputStream(new BufferedOutputStream(out))

override def writeValue[T: ClassTag](value: T): SerializationStream = {
override def writeValue[T: ClassTag](value: T): SerializationStream = serTime.ns {
val batch = value.asInstanceOf[ColumnarBatch]
val numColumns = batch.numCols()
val columns: Array[HostColumnVector] = new Array(numColumns)
Expand Down Expand Up @@ -239,7 +246,7 @@ private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric) extends Se
private[this] val dIn: DataInputStream = new DataInputStream(new BufferedInputStream(in))

override def asKeyValueIterator: Iterator[(Int, ColumnarBatch)] = {
new SerializedBatchIterator(dIn)
new SerializedBatchIterator(dIn, deserTime)
}

override def asIterator: Iterator[Any] = {
Expand Down Expand Up @@ -339,16 +346,22 @@ object SerializedTableColumn {
* @param dataTypes data types of the columns in the batch
*/
private class KudoSerializerInstance(
val dataSize: GpuMetric,
val metrics: Map[String, GpuMetric],
val dataTypes: Array[DataType]) extends SerializerInstance {
private val dataSize = metrics(METRIC_DATA_SIZE)
private val serTime = metrics(METRIC_SHUFFLE_SERIALIZATION_TIME)
private val serCalcHeaderTime = metrics(METRIC_SHUFFLE_SER_CALC_HEADER_TIME)
private val serCopyHeaderTime = metrics(METRIC_SHUFFLE_SER_COPY_HEADER_TIME)
private val serCopyBufferTime = metrics(METRIC_SHUFFLE_SER_COPY_BUFFER_TIME)
private val deserTime = metrics(METRIC_SHUFFLE_DESERIALIZATION_TIME)

private lazy val kudo = new KudoSerializer(GpuColumnVector.from(dataTypes))

override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream {
private[this] val dOut: DataOutputStream =
new DataOutputStream(new BufferedOutputStream(out))

override def writeValue[T: ClassTag](value: T): SerializationStream = {
override def writeValue[T: ClassTag](value: T): SerializationStream = serTime.ns {
val batch = value.asInstanceOf[ColumnarBatch]
val numColumns = batch.numCols()
val columns: Array[HostColumnVector] = new Array(numColumns)
Expand Down Expand Up @@ -380,7 +393,12 @@ private class KudoSerializerInstance(
}

withResource(new NvtxRange("Serialize Batch", NvtxColor.YELLOW)) { _ =>
dataSize += kudo.writeToStream(columns, dOut, startRow, numRows)
val writeMetric = kudo.writeToStream(columns, dOut, startRow, numRows)

dataSize += writeMetric.getWrittenBytes
serCalcHeaderTime += writeMetric.getCalcHeaderTime
serCopyHeaderTime += writeMetric.getCopyHeaderTime
serCopyBufferTime += writeMetric.getCopyBufferTime
}
} else {
withResource(new NvtxRange("Serialize Row Only Batch", NvtxColor.YELLOW)) { _ =>
Expand Down Expand Up @@ -422,7 +440,7 @@ private class KudoSerializerInstance(
private[this] val dIn: DataInputStream = new DataInputStream(new BufferedInputStream(in))

override def asKeyValueIterator: Iterator[(Int, ColumnarBatch)] = {
new KudoSerializedBatchIterator(dIn)
new KudoSerializedBatchIterator(dIn, deserTime)
}

override def asIterator: Iterator[Any] = {
Expand Down Expand Up @@ -495,7 +513,7 @@ object KudoSerializedTableColumn {
}
}

class KudoSerializedBatchIterator(dIn: DataInputStream)
class KudoSerializedBatchIterator(dIn: DataInputStream, deserTime: GpuMetric)
extends BaseSerializedTableIterator {
private[this] var nextHeader: Option[KudoTableHeader] = None
private[this] var toBeReturned: Option[ColumnarBatch] = None
Expand All @@ -510,7 +528,7 @@ class KudoSerializedBatchIterator(dIn: DataInputStream)
}
}

override def peekNextBatchSize(): Option[Long] = {
override def peekNextBatchSize(): Option[Long] = deserTime.ns {
if (streamClosed) {
None
} else {
Expand All @@ -530,7 +548,7 @@ class KudoSerializedBatchIterator(dIn: DataInputStream)
}
}

private def tryReadNext(): Option[ColumnarBatch] = {
private def tryReadNext(): Option[ColumnarBatch] = deserTime.ns {
if (nextHeader.isEmpty) {
None
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
import com.nvidia.spark.rapids.format.TableMeta
import com.nvidia.spark.rapids.shuffle.{RapidsShuffleRequestHandler, RapidsShuffleServer, RapidsShuffleTransport}

import org.apache.spark.{InterruptibleIterator, MapOutputTracker, ShuffleDependency, SparkConf, SparkEnv, TaskContext}

import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.io.CompressionCodec
Expand All @@ -42,6 +42,7 @@ import org.apache.spark.shuffle.{ShuffleWriter, _}
import org.apache.spark.shuffle.api._
import org.apache.spark.shuffle.sort.{BypassMergeSortShuffleHandle, SortShuffleManager}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_READ_SIZE, METRIC_DATA_SIZE, METRIC_SHUFFLE_COMBINE_TIME, METRIC_SHUFFLE_DESERIALIZATION_TIME, METRIC_SHUFFLE_READ_TIME, METRIC_SHUFFLE_SERIALIZATION_TIME, METRIC_SHUFFLE_WRITE_IO_TIME, METRIC_SHUFFLE_WRITE_TIME}
import org.apache.spark.sql.rapids.shims.{GpuShuffleBlockResolver, RapidsShuffleThreadedReader, RapidsShuffleThreadedWriter}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.storage.{RapidsShuffleBlockFetcherIterator, _}
Expand Down Expand Up @@ -245,14 +246,12 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
extends RapidsShuffleWriter[K, V]
with RapidsShuffleWriterShimHelper {
private val metrics = handle.metrics
private val serializationTimeMetric =
metrics.get("rapidsShuffleSerializationTime")
private val shuffleWriteTimeMetric =
metrics.get("rapidsShuffleWriteTime")
metrics.get(METRIC_SHUFFLE_WRITE_TIME)
private val shuffleCombineTimeMetric =
metrics.get("rapidsShuffleCombineTime")
metrics.get(METRIC_SHUFFLE_COMBINE_TIME)
private val ioTimeMetric =
metrics.get("rapidsShuffleWriteIoTime")
metrics.get(METRIC_SHUFFLE_WRITE_IO_TIME)
private val dep: ShuffleDependency[K, V, V] = handle.dependency
private val shuffleId = dep.shuffleId
private val partitioner = dep.partitioner
Expand Down Expand Up @@ -429,11 +428,9 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
// counted in the ioTime
val totalPerRecordWriteTime = recordWriteTime.get() + ioTimeNs
val ioRatio = (ioTimeNs.toDouble/totalPerRecordWriteTime)
val serializationRatio = 1.0 - ioRatio

// update metrics, note that we expect them to be relative to the task
ioTimeMetric.foreach(_ += (ioRatio * writeTimeNs).toLong)
serializationTimeMetric.foreach(_ += (serializationRatio * writeTimeNs).toLong)
// we add all three here because this metric is meant to show the time
// we are blocked on writes
shuffleWriteTimeMetric.foreach(_ += (writeTimeNs + combineTimeNs))
Expand Down Expand Up @@ -596,9 +593,8 @@ abstract class RapidsShuffleThreadedReaderBase[K, C](

private val sqlMetrics = handle.metrics
private val dep = handle.dependency
private val deserializationTimeNs = sqlMetrics.get("rapidsShuffleDeserializationTime")
private val shuffleReadTimeNs = sqlMetrics.get("rapidsShuffleReadTime")
private val dataReadSize = sqlMetrics.get("dataReadSize")
private val shuffleReadTimeNs = sqlMetrics.get(METRIC_SHUFFLE_READ_TIME)
private val dataReadSize = sqlMetrics.get(METRIC_DATA_READ_SIZE)

private var shuffleReadRange: NvtxRange =
new NvtxRange("ThreadedReader.read", NvtxColor.PURPLE)
Expand Down Expand Up @@ -679,7 +675,6 @@ abstract class RapidsShuffleThreadedReaderBase[K, C](
}
val res = currentIter.next()
val fetchTime = System.nanoTime() - fetchTimeStart
deserializationTimeNs.foreach(_ += (fetchTime - readBlockedTime))
jlowe marked this conversation as resolved.
Show resolved Hide resolved
shuffleReadTimeNs.foreach(_ += fetchTime)
res
}
Expand Down Expand Up @@ -852,7 +847,6 @@ abstract class RapidsShuffleThreadedReaderBase[K, C](
case _ => 0 // TODO: do we need to handle other types here?
}
waitTime += System.nanoTime() - waitTimeStart
deserializationTimeNs.foreach(_ += waitTime)
shuffleReadTimeNs.foreach(_ += waitTime)
res
}
Expand Down Expand Up @@ -958,7 +952,6 @@ abstract class RapidsShuffleThreadedReaderBase[K, C](
}
// keep track of the overall metric which includes blocked time
val fetchTime = System.nanoTime() - fetchTimeStart
deserializationTimeNs.foreach(_ += (fetchTime - readBlockedTime))
shuffleReadTimeNs.foreach(_ += fetchTime)
}
}
Expand Down Expand Up @@ -1048,7 +1041,7 @@ class RapidsCachingWriter[K, V](
metrics: Map[String, SQLMetric])
extends RapidsCachingWriterBase[K, V](blockManager, handle, mapId, rapidsShuffleServer, catalog) {

private val uncompressedMetric: SQLMetric = metrics("dataSize")
private val uncompressedMetric: SQLMetric = metrics(METRIC_DATA_SIZE)

// This is here for the special case where we have no columns like with the .count
// case or when we have 0-byte columns. We pick 100 as an arbitrary number so that
Expand Down
Loading
Loading