
Improve observation metrics in Spark UI
EnricoMi committed Sep 26, 2023
1 parent f1cdcfa commit 0b7857d
Showing 6 changed files with 53 additions and 17 deletions.
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -36,6 +36,7 @@
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
import scala.collection.immutable.Seq;

import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;

@@ -557,7 +558,7 @@ public byte[] getBytes() {
// This is for debugging
@Override
public String toString() {
StringBuilder build = new StringBuilder("[");
StringBuilder build = new StringBuilder("UnsafeRow[");
for (int i = 0; i < sizeInBytes; i += 8) {
if (i != 0) build.append(',');
build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i)));
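The toString change only adds a type prefix to the hex dump of the row's 8-byte words, which makes UnsafeRow values recognizable when they appear in logs or accumulator displays. A minimal sketch of what that looks like, assuming spark-catalyst on the classpath (the exact hex words depend on the values):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{DataType, IntegerType, LongType}

// Convert a generic row into an UnsafeRow and print it.
val toUnsafe = UnsafeProjection.create(Array[DataType](IntegerType, LongType))
val row = toUnsafe(InternalRow(1, 2L))
// Before this commit the output is bare hex words like "[0,1,2]";
// after it, the same dump is prefixed, e.g. "UnsafeRow[0,1,2]".
println(row)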
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
@@ -220,17 +220,7 @@ final class SpecificInternalRow(val values: Array[MutableValue]) extends BaseGen

def this() = this(Seq.empty)

def this(schema: StructType) = {
// SPARK-32550: use while loop instead of map
this(new Array[MutableValue](schema.fields.length))
val length = values.length
val fields = schema.fields
var i = 0
while (i < length) {
values(i) = dataTypeToMutableValue(fields(i).dataType)
i += 1
}
}
def this(schema: StructType) = this(schema.fields.map(_.dataType))

override def numFields: Int = values.length

@@ -319,4 +309,8 @@ final class SpecificInternalRow(val values: Array[MutableValue]) extends BaseGen
override def getByte(i: Int): Byte = {
values(i).asInstanceOf[MutableByte].value
}

override def toString: String = {
s"specrow=${0.to(numFields).map(i => values(i).boxed.toString).mkString("[", ", ", "]")}"
}
}
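For orientation, a small sketch of how a SpecificInternalRow is used as a mutable row with per-field typed storage; the simplified constructor above just derives the field data types from the schema (values and the printed output are illustrative):

import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}

val schema = StructType(Seq(StructField("a", IntegerType), StructField("b", LongType)))
// Now equivalent to new SpecificInternalRow(schema.fields.map(_.dataType))
val row = new SpecificInternalRow(schema)
row.setInt(0, 7)
row.setLong(1, 42L)
println(row)  // with the new toString, something like: specrow=[7, 42]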
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -178,3 +178,14 @@ class GenericInternalRow(val values: Array[Any]) extends BaseGenericInternalRow

override def update(i: Int, value: Any): Unit = { values(i) = value }
}

/**
* An internal row implementation like [[GenericInternalRow]], but carrying the data types
* of its values so they can be rendered readably.
*/
class GenericInternalRowWithDataType(override val values: Array[Any], val dataTypes: Seq[DataType])
extends GenericInternalRow {
/** No-arg constructor for serialization. */
protected def this() = this(null, null)

override def toString: String = "GenericInternalRowWithDataType: " + toSeq(dataTypes).mkString("[", ", ", "]")
}
sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
@@ -17,8 +17,8 @@
package org.apache.spark.sql.execution

import scala.collection.mutable

import org.apache.spark.TaskContext
import org.apache.spark.{InternalAccumulator, TaskContext}
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, BindReferences, Expression, InterpretedMutableProjection, InterpretedUnsafeProjection, JoinedRow, MutableProjection, NamedExpression, Projection, SpecificInternalRow}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, DeclarativeAggregate, ImperativeAggregate, NoOp, TypedImperativeAggregate}
@@ -230,6 +230,26 @@ class AggregatingAccumulator private(
buffer = other.buffer
joinedRow = other.joinedRow
}

/**
* Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] with the provided
* values.
*/
override private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
if (update.exists(!_.isInstanceOf[InternalRow])) {
throw new IllegalArgumentException(s"AggregatingAccumulator has InternalRow values, " +
s"not ${update.get.getClass}")
}
if (value.exists(!_.isInstanceOf[InternalRow])) {
throw new IllegalArgumentException(s"AggregatingAccumulator has InternalRow values, " +
s"not ${value.get.getClass}")
}
val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
val updateStrings = update.map(_.asInstanceOf[InternalRow].toSeq(schema))
val valueStrings = value.map(_.asInstanceOf[InternalRow].toSeq(schema))
AccumulableInfo(id, name, updateStrings.map(_.mkString("[", ", ", "]")),
valueStrings.map(_.mkString("[", ", ", "]")), isInternal, countFailedValues)
}
}

object AggregatingAccumulator {
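The new toInfo override is what turns the accumulator's InternalRow state into the readable "[v1, v2, ...]" strings shown in the Spark UI's accumulator table, instead of an opaque object reference. A hypothetical helper sketching the same formatting step in isolation (renderForUi and the schema below are illustrative, not part of the patch):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical helper mirroring the formatting done in toInfo above.
def renderForUi(row: InternalRow, schema: StructType): String =
  row.toSeq(schema).mkString("[", ", ", "]")

val schema = StructType(Seq(StructField("rows", LongType), StructField("total", LongType)))
println(renderForUi(InternalRow(3L, 42L), schema))  // [3, 42]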
sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala
@@ -38,7 +38,7 @@ case class CollectMetricsExec(

private lazy val accumulator: AggregatingAccumulator = {
val acc = AggregatingAccumulator(metricExpressions, child.output)
acc.register(sparkContext, Option("Collected metrics"))
acc.register(sparkContext, Option(name))
acc
}

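Registering the accumulator under name, rather than the fixed "Collected metrics", means each observation shows up in the Spark UI under the name the caller gave it. A minimal, self-contained sketch of that caller side, assuming a local SparkSession (dataset, metric names, and the printed values are illustrative):

import org.apache.spark.sql.{Observation, SparkSession}
import org.apache.spark.sql.functions.{count, sum}

val spark = SparkSession.builder().master("local[*]").appName("observe-demo").getOrCreate()
import spark.implicits._

// The observation's name ("order stats") is what CollectMetricsExec now
// registers the accumulator under, so it is the label the UI displays.
val obs = Observation("order stats")
val df = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("key", "value")
  .observe(obs, count("*").as("rows"), sum($"value").as("total"))
df.collect()        // run an action so the metrics are produced
println(obs.get)    // e.g. Map(rows -> 3, total -> 6)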
14 changes: 12 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -19,10 +19,8 @@ package org.apache.spark.sql

import java.io.{Externalizable, ObjectInput, ObjectOutput}
import java.sql.{Date, Timestamp}

import scala.reflect.ClassTag
import scala.util.Random

import org.apache.hadoop.fs.{Path, PathFilter}
import org.scalatest.Assertions._
import org.scalatest.exceptions.TestFailedException
@@ -448,6 +446,18 @@ class DatasetSuite extends QueryTest
assert(ds.schema.equals(ds2.schema))
}

test("accumulators") {
val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
val d = spark.sparkContext.doubleAccumulator("double")
val c = spark.sparkContext.collectionAccumulator("coll")
val l = spark.sparkContext.longAccumulator("long")
ds.observe(Observation("Rows"), count("*"))
.observe(Observation(), count("*"))
.foreach(v => d.add(v._2))
Thread.sleep(60000)
}

test("foreach") {
val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
val acc = sparkContext.longAccumulator
