From 0b7857d9ee88faf4d6ecdb86e64d2f46db0f08f7 Mon Sep 17 00:00:00 2001
From: Enrico Minack
Date: Tue, 26 Sep 2023 20:27:40 +0200
Subject: [PATCH] Improve observation metrics in Spark UI

---
 .../sql/catalyst/expressions/UnsafeRow.java   |  2 +-
 .../expressions/SpecificInternalRow.scala     | 16 ++++---------
 .../spark/sql/catalyst/expressions/rows.scala | 11 +++++++++
 .../execution/AggregatingAccumulator.scala    | 24 +++++++++++++++++--
 .../sql/execution/CollectMetricsExec.scala    |  2 +-
 .../org/apache/spark/sql/DatasetSuite.scala   | 14 +++++++++--
 6 files changed, 52 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index d2433292fc7bd..d2a26fd5549d7 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -557,7 +557,7 @@ public byte[] getBytes() {
   // This is for debugging
   @Override
   public String toString() {
-    StringBuilder build = new StringBuilder("[");
+    StringBuilder build = new StringBuilder("UnsafeRow[");
     for (int i = 0; i < sizeInBytes; i += 8) {
       if (i != 0) build.append(',');
       build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i)));
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
index fb6ebc899d8fa..1e5f30438738c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
@@ -220,17 +220,7 @@ final class SpecificInternalRow(val values: Array[MutableValue]) extends BaseGen
 
   def this() = this(Seq.empty)
 
-  def this(schema: StructType) = {
-    // SPARK-32550: use while loop instead of map
-    this(new Array[MutableValue](schema.fields.length))
-    val length = values.length
-    val fields = schema.fields
-    var i = 0
-    while (i < length) {
-      values(i) = dataTypeToMutableValue(fields(i).dataType)
-      i += 1
-    }
-  }
+  def this(schema: StructType) = this(schema.fields.map(_.dataType))
 
   override def numFields: Int = values.length
 
@@ -319,4 +309,8 @@ final class SpecificInternalRow(val values: Array[MutableValue]) extends BaseGen
   override def getByte(i: Int): Byte = {
     values(i).asInstanceOf[MutableByte].value
   }
+
+  override def toString: String = {
+    s"specrow=${0.until(numFields).map(i => values(i).boxed).mkString("[", ", ", "]")}"
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index 09d78f79edd17..7862fd3b562ab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -178,3 +178,14 @@ class GenericInternalRow(val values: Array[Any]) extends BaseGenericInternalRow
 
   override def update(i: Int, value: Any): Unit = { values(i) = value }
 }
+
+/**
+ * An internal row implementation as GenericInternalRow plus data type information.
+ */
+class GenericInternalRowWithDataType(override val values: Array[Any], val dataTypes: Seq[DataType])
+  extends GenericInternalRow {
+  /** No-arg constructor for serialization. */
+  protected def this() = this(null, null)
+
+  override def toString: String = "GenericInternalRowWithDataType: " + toSeq(dataTypes).mkString("[", ", ", "]")
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
index 667d1a67b3932..4b3f1b200e066 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.sql.execution
 
 import scala.collection.mutable
-
-import org.apache.spark.TaskContext
+import org.apache.spark.{InternalAccumulator, TaskContext}
+import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, BindReferences, Expression, InterpretedMutableProjection, InterpretedUnsafeProjection, JoinedRow, MutableProjection, NamedExpression, Projection, SpecificInternalRow}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, DeclarativeAggregate, ImperativeAggregate, NoOp, TypedImperativeAggregate}
@@ -230,6 +230,26 @@
     buffer = other.buffer
     joinedRow = other.joinedRow
   }
+
+  /**
+   * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] with the provided
+   * values.
+   */
+  override private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
+    if (update.exists(!_.isInstanceOf[InternalRow])) {
+      throw new IllegalArgumentException(s"AggregatingAccumulator has InternalRow values, " +
+        s"not ${update.get.getClass}")
+    }
+    if (value.exists(!_.isInstanceOf[InternalRow])) {
+      throw new IllegalArgumentException(s"AggregatingAccumulator has InternalRow values, " +
+        s"not ${value.get.getClass}")
+    }
+    val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
+    val updateStrings = update.map(_.asInstanceOf[InternalRow].toSeq(schema))
+    val valueStrings = value.map(_.asInstanceOf[InternalRow].toSeq(schema))
+    AccumulableInfo(id, name, updateStrings.map(_.mkString("[", ", ", "]")),
+      valueStrings.map(_.mkString("[", ", ", "]")), isInternal, countFailedValues)
+  }
 }
 
 object AggregatingAccumulator {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala
index dc918e51d0550..851fb5569b1d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala
@@ -38,7 +38,7 @@ case class CollectMetricsExec(
 
   private lazy val accumulator: AggregatingAccumulator = {
     val acc = AggregatingAccumulator(metricExpressions, child.output)
-    acc.register(sparkContext, Option("Collected metrics"))
+    acc.register(sparkContext, Option(name))
     acc
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index d22397f86b21d..5f8c831047392 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -19,10 +19,8 @@ package org.apache.spark.sql
 
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import java.sql.{Date, Timestamp}
-
 import scala.reflect.ClassTag
 import scala.util.Random
-
 import org.apache.hadoop.fs.{Path, PathFilter}
 import org.scalatest.Assertions._
 import org.scalatest.exceptions.TestFailedException
@@ -448,6 +446,18 @@ class DatasetSuite extends QueryTest
     assert(ds.schema.equals(ds2.schema))
   }
 
+  test("accumulators") {
+    val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
+    val d = spark.sparkContext.doubleAccumulator("double")
+    val c = spark.sparkContext.collectionAccumulator("coll")
+    val l = spark.sparkContext.longAccumulator("long")
+    ds.observe(Observation("Rows"), count("*"))
+      .observe(Observation(), count("*"))
+      .foreach(v => d.add(v._2))
+    // keep the application alive so the observation metrics can be inspected in the Spark UI
+    Thread.sleep(60000)
+  }
+
   test("foreach") {
     val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
     val acc = sparkContext.longAccumulator
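
Note (not part of the patch): a minimal sketch of how the renamed accumulator is expected to surface once CollectMetricsExec registers it under the observation's name and AggregatingAccumulator.toInfo renders its value against the metric schema. The object name, app name, metric names and the listener wiring below are illustrative assumptions, not code from this change.

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
import org.apache.spark.sql.{Observation, SparkSession}
import org.apache.spark.sql.functions.{count, max}

object ObserveMetricsInUiExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("observe-metrics").getOrCreate()
    import spark.implicits._

    // Print every accumulator reported for a completed stage. With this patch the
    // observation below should be listed under "my_metrics" (instead of the generic
    // "Collected metrics"), with its value rendered as a bracketed field list by toInfo.
    spark.sparkContext.addSparkListener(new SparkListener {
      override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
        event.stageInfo.accumulables.values.foreach { info =>
          println(s"accumulator '${info.name.getOrElse("")}' = ${info.value.getOrElse("")}")
        }
      }
    })

    val observation = Observation("my_metrics")
    val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
    ds.observe(observation, count("*").as("rows"), max($"_2").as("max_value")).collect()

    // Observation.get blocks until the observed action has finished.
    println(observation.get)

    spark.stop()
  }
}

The same names and values are what the Stages page of the Spark UI shows in its Accumulators table, which is where the Option(name) registration in CollectMetricsExec becomes visible.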
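
Similarly, a rough spark-shell illustration (again not from the patch; the field types and values are chosen arbitrarily) of the debug toString added to SpecificInternalRow:

import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types.{IntegerType, LongType}

// Build a mutable catalyst row with two numeric fields and print it.
// With the toString added above this is expected to render along the lines of
// specrow=[1, 42] rather than the JVM default Object representation.
val row = new SpecificInternalRow(Seq(IntegerType, LongType))
row.setInt(0, 1)
row.setLong(1, 42L)
println(row)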