
Commit

Make the maximum string length limit configurable in case of JsonProtocol
roczei committed Nov 16, 2024
1 parent 9858ab6 commit 7566efd
Showing 3 changed files with 34 additions and 7 deletions.
@@ -2809,4 +2809,13 @@ package object config {
.version("4.0.0")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

val SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH =
ConfigBuilder("spark.jsonProtocol.maxStringLen")
.doc("")
.version("4.0.0")
.intConf
.checkValue(v => v > 0, "The value should be a positive integer.")
.createWithDefault(Int.MaxValue)

}
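
For context (not part of the commit): a minimal sketch of how the new entry could be set from user code. The key string matches the ConfigBuilder entry above; the 256 MB figure is purely illustrative, and the default remains Int.MaxValue.

import org.apache.spark.SparkConf

// Illustrative only: cap individual JSON string values parsed by JsonProtocol at 256 MB.
val conf = new SparkConf()
  .set("spark.jsonProtocol.maxStringLen", (256 * 1024 * 1024).toString)

The same value could equally be supplied via spark-defaults.conf or --conf on spark-submit, like any other Spark configuration entry.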
30 changes: 24 additions & 6 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -22,12 +22,14 @@ import java.util.{Properties, UUID}
import scala.collection.Map
import scala.jdk.CollectionConverters._

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.core.{JsonFactory, JsonGenerator, StreamReadConstraints}
import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.json4s.jackson.JsonMethods.compact

import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.rdd.{DeterministicLevel, RDDOperationScope}
@@ -43,9 +45,11 @@ import org.apache.spark.util.Utils.weakIntern
* We use this instead of passing SparkConf directly because it lets us avoid
* repeated re-parsing of configuration values on each read.
*/
private[spark] class JsonProtocolOptions(conf: SparkConf) {
private[spark] class JsonProtocolOptions(conf: SparkConf) extends Logging {
val includeTaskMetricsAccumulators: Boolean =
conf.get(EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS)

val maxJsonStringLength: Int = conf.get(SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH)
}

/**
@@ -63,11 +67,24 @@ private[spark] class JsonProtocolOptions(conf: SparkConf) {
* - Any new JSON fields should be optional; use `jsonOption` when reading these fields
* in `*FromJson` methods.
*/
private[spark] object JsonProtocol extends JsonUtils {
private[spark] object JsonProtocol extends JsonUtils with Logging {
// TODO: Remove this file and put JSON serialization into each individual class.

private[util]
val defaultOptions: JsonProtocolOptions = new JsonProtocolOptions(new SparkConf(false))
val defaultOptions: JsonProtocolOptions = new JsonProtocolOptions(new SparkConf(true))

private val streamReadConstraints: StreamReadConstraints = StreamReadConstraints
.builder()
.maxStringLength(defaultOptions.maxJsonStringLength)
.build()

private val jsonFactory = new JsonFactory().setStreamReadConstraints(streamReadConstraints)

logInfo(s"maxJsonStringLength: ${defaultOptions.maxJsonStringLength}")

protected override val mapper: ObjectMapper = new ObjectMapper(jsonFactory)
.registerModule(DefaultScalaModule)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

/** ------------------------------------------------- *
* JSON serialization methods for SparkListenerEvents |
@@ -932,6 +949,7 @@ private[spark] object JsonProtocol extends JsonUtils {

def sparkEventFromJson(json: String): SparkListenerEvent = {
sparkEventFromJson(mapper.readTree(json))

}

def sparkEventFromJson(json: JsonNode): SparkListenerEvent = {
@@ -1692,4 +1710,4 @@ private[spark] object JsonProtocol extends JsonUtils {
json.textValue
}
}
}
}
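
The parsing side of the change relies on Jackson's StreamReadConstraints, which JsonProtocol now wires into the ObjectMapper used by readTree. A standalone sketch of that mechanism (not part of the commit, assuming Jackson 2.15+ on the classpath) with a deliberately tiny limit so the failure mode is visible:

import com.fasterxml.jackson.core.{JsonFactory, StreamReadConstraints}
import com.fasterxml.jackson.databind.ObjectMapper

// Deliberately small limit: any JSON string value longer than 10 characters is rejected.
val constraints = StreamReadConstraints.builder().maxStringLength(10).build()
val factory = new JsonFactory().setStreamReadConstraints(constraints)
val mapper = new ObjectMapper(factory)

mapper.readTree("""{"ok":"short"}""") // within the limit, parses fine
// mapper.readTree("""{"big":"well over ten characters long"}""")
// would throw com.fasterxml.jackson.core.exc.StreamConstraintsException

Defaulting spark.jsonProtocol.maxStringLen to Int.MaxValue effectively lifts the cap, so existing event logs keep parsing unchanged unless an operator opts into a tighter limit.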
@@ -824,4 +824,4 @@ class LevelDBBackendHistoryServerSuite extends HistoryServerSuite {
class RocksDBBackendHistoryServerSuite extends HistoryServerSuite {
override protected def diskBackend: History.HybridStoreDiskBackend.Value =
HybridStoreDiskBackend.ROCKSDB
}
}
