From 1c23cb7a148d345dbcc8812cacbb0a593d050f3f Mon Sep 17 00:00:00 2001
From: Gabor Roczei
Date: Tue, 15 Oct 2024 15:21:24 +0200
Subject: [PATCH] Make the maximum string length limit configurable in case of
 Spark History Server

---
 .../spark/deploy/history/HistoryServer.scala  |  3 ++
 .../spark/internal/config/package.scala       | 11 +++++++++++
 .../org/apache/spark/util/JsonProtocol.scala  | 29 ++++++++++++++++---
 .../deploy/history/HistoryServerSuite.scala   | 12 ++++++++
 4 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 86e738d986931..60e8bac154926 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -31,6 +31,7 @@ import org.apache.spark.deploy.Utils.addRenderLogHandler
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.internal.config.History
+import org.apache.spark.internal.config.SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, UIRoot}
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
@@ -296,6 +297,8 @@ object HistoryServer extends Logging {
 
   val UI_PATH_PREFIX = "/history"
 
+  val maxJsonStringLength = conf.get(SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH)
+
   def main(argStrings: Array[String]): Unit = {
     Utils.resetStructuredLogging()
     Utils.initDaemon(log)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index c58c371da20cf..813d02ed09ccc 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2809,4 +2809,15 @@ package object config {
     .version("4.0.0")
     .timeConf(TimeUnit.MILLISECONDS)
     .createOptional
+
+  val SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH =
+    ConfigBuilder("spark.jsonProtocol.maxStringLen")
+      .doc("Maximum length (in characters) of a single JSON string accepted while " +
+        "parsing event log files, e.g. when the Spark History Server replays an " +
+        "application's event log. Enforced through Jackson's StreamReadConstraints.")
+      .version("4.0.0")
+      .intConf
+      .checkValue(v => v > 0, "The value should be a positive integer.")
+      .createWithDefault(Int.MaxValue)
+
 }
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index e30380f41566a..10b3c15362e23 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -22,12 +22,14 @@ import java.util.{Properties, UUID}
 
 import scala.collection.Map
 import scala.jdk.CollectionConverters._
 
-import com.fasterxml.jackson.core.JsonGenerator
-import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.core.{JsonFactory, JsonGenerator, StreamReadConstraints}
+import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.json4s.jackson.JsonMethods.compact
 
 import org.apache.spark._
+import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.executor._
 import org.apache.spark.internal.config._
 import org.apache.spark.metrics.ExecutorMetricType
@@ -63,12 +65,26 @@ private[spark] class JsonProtocolOptions(conf: SparkConf) {
  * - Any new JSON fields should be optional; use `jsonOption` when reading these fields
  *   in `*FromJson` methods.
  */
 private[spark] object JsonProtocol extends JsonUtils {
   // TODO: Remove this file and put JSON serialization into each individual class.
 
   private[util] val defaultOptions: JsonProtocolOptions =
     new JsonProtocolOptions(new SparkConf(false))
 
+  // Cap the length of any single JSON string the parser will accept. This protects the
+  // History Server against pathologically large event log entries and is configurable
+  // via spark.jsonProtocol.maxStringLen (SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH).
+  private val streamReadConstraints: StreamReadConstraints = StreamReadConstraints
+    .builder()
+    .maxStringLength(HistoryServer.maxJsonStringLength)
+    .build()
+
+  private val jsonFactory = new JsonFactory().setStreamReadConstraints(streamReadConstraints)
+
+  protected override val mapper: ObjectMapper = new ObjectMapper(jsonFactory)
+    .registerModule(DefaultScalaModule)
+    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+
   /** ------------------------------------------------- *
    * JSON serialization methods for SparkListenerEvents |
    * -------------------------------------------------- */
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 6b2bd90cd4314..5a6c6d055ee8c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -45,6 +45,7 @@ import org.scalatestplus.selenium.WebBrowser
 import org.apache.spark._
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.History._
+import org.apache.spark.internal.config.SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.status.api.v1.ApplicationInfo
@@ -79,6 +80,7 @@ abstract class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with
 
   private var port: Int = -1
   protected def diskBackend: HybridStoreDiskBackend.Value
+  protected val maxJsonStringLength: Int = Int.MaxValue
 
   def getExpRoot: File = expRoot
@@ -93,6 +95,7 @@ abstract class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with
     .set(EVENT_LOG_STAGE_EXECUTOR_METRICS, true)
     .set(EXECUTOR_PROCESS_TREE_METRICS_ENABLED, true)
     .set(HYBRID_STORE_DISK_BACKEND, diskBackend.toString)
+    .set(SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH, maxJsonStringLength)
     conf.setAll(extraConf)
     provider = new FsHistoryProvider(conf)
     provider.checkForLogs()
@@ -825,3 +828,10 @@ class RocksDBBackendHistoryServerSuite extends HistoryServerSuite {
   override protected def diskBackend: History.HybridStoreDiskBackend.Value =
     HybridStoreDiskBackend.ROCKSDB
 }
+
+@WebBrowserTest
+class LevelDBBackendHistoryLimitTheMaxJsonStringServerSuite extends HistoryServerSuite {
+  override protected val maxJsonStringLength: Int = 100
+  override protected def diskBackend: History.HybridStoreDiskBackend.Value =
+    HybridStoreDiskBackend.LEVELDB
+}