From 7566efd33812cc65e1fbd22ada33d6a05b5a8b83 Mon Sep 17 00:00:00 2001 From: Gabor Roczei Date: Tue, 15 Oct 2024 15:21:24 +0200 Subject: [PATCH] Make the maximum string length limit configurable in case of JsonProtocol --- .../spark/internal/config/package.scala | 9 ++++++ .../org/apache/spark/util/JsonProtocol.scala | 30 +++++++++++++++---- .../deploy/history/HistoryServerSuite.scala | 2 +- 3 files changed, 34 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index c58c371da20cf..813d02ed09ccc 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2809,4 +2809,13 @@ package object config { .version("4.0.0") .timeConf(TimeUnit.MILLISECONDS) .createOptional + + val SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH = + ConfigBuilder("spark.jsonProtocol.maxStringLen") + .doc("") + .version("4.0.0") + .intConf + .checkValue(v => v > 0, "The value should be a positive integer.") + .createWithDefault(Int.MaxValue) + } diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index e30380f41566a..2349036d6b830 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -22,12 +22,14 @@ import java.util.{Properties, UUID} import scala.collection.Map import scala.jdk.CollectionConverters._ -import com.fasterxml.jackson.core.JsonGenerator -import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.core.{JsonFactory, JsonGenerator, StreamReadConstraints} +import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.json4s.jackson.JsonMethods.compact import org.apache.spark._ import org.apache.spark.executor._ +import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.metrics.ExecutorMetricType import org.apache.spark.rdd.{DeterministicLevel, RDDOperationScope} @@ -43,9 +45,11 @@ import org.apache.spark.util.Utils.weakIntern * We use this instead of passing SparkConf directly because it lets us avoid * repeated re-parsing of configuration values on each read. */ -private[spark] class JsonProtocolOptions(conf: SparkConf) { +private[spark] class JsonProtocolOptions(conf: SparkConf) extends Logging { val includeTaskMetricsAccumulators: Boolean = conf.get(EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS) + + val maxJsonStringLength: Int = conf.get(SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH) } /** @@ -63,11 +67,24 @@ private[spark] class JsonProtocolOptions(conf: SparkConf) { * - Any new JSON fields should be optional; use `jsonOption` when reading these fields * in `*FromJson` methods. */ -private[spark] object JsonProtocol extends JsonUtils { +private[spark] object JsonProtocol extends JsonUtils with Logging { // TODO: Remove this file and put JSON serialization into each individual class. private[util] - val defaultOptions: JsonProtocolOptions = new JsonProtocolOptions(new SparkConf(false)) + val defaultOptions: JsonProtocolOptions = new JsonProtocolOptions(new SparkConf(true)) + + private val streamReadConstraints: StreamReadConstraints = StreamReadConstraints + .builder() + .maxStringLength(defaultOptions.maxJsonStringLength) + .build() + + private val jsonFactory = new JsonFactory().setStreamReadConstraints(streamReadConstraints) + + logInfo(s"maxJsonStringLength: ${defaultOptions.maxJsonStringLength}") + + protected override val mapper: ObjectMapper = new ObjectMapper(jsonFactory) + .registerModule(DefaultScalaModule) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) /** ------------------------------------------------- * * JSON serialization methods for SparkListenerEvents | @@ -932,6 +949,7 @@ private[spark] object JsonProtocol extends JsonUtils { def sparkEventFromJson(json: String): SparkListenerEvent = { sparkEventFromJson(mapper.readTree(json)) + } def sparkEventFromJson(json: JsonNode): SparkListenerEvent = { @@ -1692,4 +1710,4 @@ private[spark] object JsonProtocol extends JsonUtils { json.textValue } } -} + } \ No newline at end of file diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 6b2bd90cd4314..f8964ad719ffd 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -824,4 +824,4 @@ class LevelDBBackendHistoryServerSuite extends HistoryServerSuite { class RocksDBBackendHistoryServerSuite extends HistoryServerSuite { override protected def diskBackend: History.HybridStoreDiskBackend.Value = HybridStoreDiskBackend.ROCKSDB -} +} \ No newline at end of file