Commit

Make the maximum string length limit configurable for the Spark History Server
roczei committed Nov 8, 2024
1 parent 9858ab6 commit 1c23cb7
Showing 5 changed files with 51 additions and 4 deletions.
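Background: Jackson 2.15 introduced StreamReadConstraints, which caps the length of any single JSON string a parser will accept (20,000,000 characters by default in recent releases). An event log containing a longer string therefore fails to parse during History Server replay; this commit makes the cap configurable. A minimal standalone sketch of the Jackson API involved (values illustrative, not part of the commit):

import com.fasterxml.jackson.core.{JsonFactory, StreamReadConstraints}
import com.fasterxml.jackson.databind.ObjectMapper

// Build a factory whose per-string limit is raised from Jackson's default.
val constraints = StreamReadConstraints.builder()
  .maxStringLength(Int.MaxValue) // illustrative value
  .build()
val factory = new JsonFactory().setStreamReadConstraints(constraints)
val mapper = new ObjectMapper(factory)
// mapper.readTree(json) now accepts strings longer than the default cap.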
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -31,6 +31,7 @@ import org.apache.spark.deploy.Utils.addRenderLogHandler
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config.History
import org.apache.spark.internal.config.SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH
import org.apache.spark.internal.config.UI._
import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, UIRoot}
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
@@ -296,6 +297,8 @@ object HistoryServer extends Logging {

val UI_PATH_PREFIX = "/history"

val maxJsonStringLength = conf.get(SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH)

def main(argStrings: Array[String]): Unit = {
Utils.resetStructuredLogging()
Utils.initDaemon(log)
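Because the HistoryServer object reads the entry from its own SparkConf when the object is first initialized, the limit has to be supplied before the daemon starts, for example as a JVM system property via SPARK_HISTORY_OPTS. A sketch of what the daemon-side read resolves to (illustrative, not part of the commit):

// e.g. started with SPARK_HISTORY_OPTS="-Dspark.jsonProtocol.maxStringLen=1000000"
val conf = new SparkConf() // loads spark.* JVM system properties by default
val limit: Int = conf.get(SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH)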
core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2809,4 +2809,13 @@ package object config {
.version("4.0.0")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

val SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH =
ConfigBuilder("spark.jsonProtocol.maxStringLen")
.doc("")
.version("4.0.0")
.intConf
.checkValue(v => v > 0, "The value should be a positive integer.")
.createWithDefault(Int.MaxValue)

}
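For reference, a sketch of how the new entry behaves when read from a SparkConf; ConfigBuilder applies checkValue when the value is read. This block is illustrative rather than part of the commit:

val conf = new SparkConf(false)
conf.get(SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH)      // default: Int.MaxValue
conf.set(SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH, 100) // any positive Int is accepted
// Reading a non-positive value fails with "The value should be a positive integer."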
core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -86,7 +86,9 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
currentLine = entry._1
lineNumber = entry._2 + 1

logInfo(s"sparkEventFromJson - json: ------------START-------------")
postToAll(JsonProtocol.sparkEventFromJson(currentLine))
logInfo(s"sparkEventFromJson - json: ------------END-------------")
} catch {
case e: ClassNotFoundException =>
// Ignore unknown events, parse through the event log file.
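ReplayListenerBus hands every event log line to JsonProtocol.sparkEventFromJson, so one oversized string aborts replay of that application's log. A hypothetical reproduction of the failure this commit addresses (sizes illustrative):

// Under Jackson's default constraints, a ~30M-character string in one event
// makes mapper.readTree throw
// com.fasterxml.jackson.core.exc.StreamConstraintsException.
val hugeValue = "x" * 30000000
val line = s"""{"Event":"SparkListenerLogStart","Spark Version":"$hugeValue"}"""
// JsonProtocol.sparkEventFromJson(line) // fails unless the limit is raised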
29 changes: 25 additions & 4 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -22,12 +22,15 @@ import java.util.{Properties, UUID}
import scala.collection.Map
import scala.jdk.CollectionConverters._

-import com.fasterxml.jackson.core.JsonGenerator
-import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.core.{JsonFactory, JsonGenerator, StreamReadConstraints}
+import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.json4s.jackson.JsonMethods.compact

import org.apache.spark._
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.executor._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.rdd.{DeterministicLevel, RDDOperationScope}
@@ -43,9 +46,10 @@ import org.apache.spark.util.Utils.weakIntern
* We use this instead of passing SparkConf directly because it lets us avoid
* repeated re-parsing of configuration values on each read.
*/
-private[spark] class JsonProtocolOptions(conf: SparkConf) {
+private[spark] class JsonProtocolOptions(conf: SparkConf) extends Logging {
  val includeTaskMetricsAccumulators: Boolean =
    conf.get(EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS)
}

/**
Expand All @@ -63,12 +67,26 @@ private[spark] class JsonProtocolOptions(conf: SparkConf) {
* - Any new JSON fields should be optional; use `jsonOption` when reading these fields
* in `*FromJson` methods.
*/
-private[spark] object JsonProtocol extends JsonUtils {
+private[spark] object JsonProtocol extends JsonUtils with Logging {
// TODO: Remove this file and put JSON serialization into each individual class.

private[util]
val defaultOptions: JsonProtocolOptions = new JsonProtocolOptions(new SparkConf(false))

private val streamReadConstraints: StreamReadConstraints = StreamReadConstraints
  .builder()
  .maxStringLength(HistoryServer.maxJsonStringLength)
  .build()

private val jsonFactory = new JsonFactory().setStreamReadConstraints(streamReadConstraints)

logInfo(s"Maximum JSON string length: ${HistoryServer.maxJsonStringLength}")

protected override val mapper: ObjectMapper = new ObjectMapper(jsonFactory)
  .registerModule(DefaultScalaModule)
  .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

/** ------------------------------------------------- *
* JSON serialization methods for SparkListenerEvents |
* -------------------------------------------------- */
@@ -931,7 +949,10 @@ private[spark] object JsonProtocol extends JsonUtils {
}

def sparkEventFromJson(json: String): SparkListenerEvent = {
  sparkEventFromJson(mapper.readTree(json))
}

def sparkEventFromJson(json: JsonNode): SparkListenerEvent = {
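The mapper swap works because the shared mapper in JsonUtils is an overridable protected val; a condensed, self-contained sketch of the pattern (the trait shape here is assumed for illustration):

import com.fasterxml.jackson.core.{JsonFactory, StreamReadConstraints}
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

trait JsonUtilsLike {
  protected val mapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
}

object ConstrainedJsonProtocol extends JsonUtilsLike {
  private val factory = new JsonFactory().setStreamReadConstraints(
    StreamReadConstraints.builder().maxStringLength(100).build())
  // Replace the inherited mapper with one whose factory carries the constraints.
  override protected val mapper: ObjectMapper = new ObjectMapper(factory)
    .registerModule(DefaultScalaModule)
    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
}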
core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -45,6 +45,7 @@ import org.scalatestplus.selenium.WebBrowser
import org.apache.spark._
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.History._
import org.apache.spark.internal.config.SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.internal.config.UI._
import org.apache.spark.status.api.v1.ApplicationInfo
@@ -79,6 +80,7 @@ abstract class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with
private var port: Int = -1

protected def diskBackend: HybridStoreDiskBackend.Value
protected val maxJsonStringLength: Int = Int.MaxValue

def getExpRoot: File = expRoot

@@ -93,6 +95,7 @@ abstract class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with
.set(EVENT_LOG_STAGE_EXECUTOR_METRICS, true)
.set(EXECUTOR_PROCESS_TREE_METRICS_ENABLED, true)
.set(HYBRID_STORE_DISK_BACKEND, diskBackend.toString)
.set(SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH, maxJsonStringLength)
conf.setAll(extraConf)
provider = new FsHistoryProvider(conf)
provider.checkForLogs()
@@ -825,3 +828,12 @@ class RocksDBBackendHistoryServerSuite extends HistoryServerSuite {
override protected def diskBackend: History.HybridStoreDiskBackend.Value =
HybridStoreDiskBackend.ROCKSDB
}

@WebBrowserTest
class LevelDBBackendHistoryLimitTheMaxJsonStringServerSuite extends HistoryServerSuite {
override protected val maxJsonStringLength: Int = 100
override protected def diskBackend: History.HybridStoreDiskBackend.Value =
HybridStoreDiskBackend.LEVELDB
}
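A hypothetical assertion the new 100-character suite could make (not part of this commit), assuming the lowered limit is in effect for JsonProtocol's mapper:

test("JSON strings longer than spark.jsonProtocol.maxStringLen are rejected") {
  val longVersion = "v" * 200 // exceeds maxJsonStringLength = 100
  val line = s"""{"Event":"SparkListenerLogStart","Spark Version":"$longVersion"}"""
  intercept[com.fasterxml.jackson.core.exc.StreamConstraintsException] {
    JsonProtocol.sparkEventFromJson(line)
  }
}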
