Skip to content

Commit

Permalink
Make the maximum string length limit configurable in case of JsonProt…
Browse files Browse the repository at this point in the history
…ocol
  • Loading branch information
roczei committed Nov 16, 2024
1 parent 9858ab6 commit e99e1a9
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2809,4 +2809,13 @@ package object config {
.version("4.0.0")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

// Upper bound on the length of a single JSON string value that JsonProtocol's Jackson
// factory will read. The value is handed to StreamReadConstraints.maxStringLength.
// NOTE(review): Int.MaxValue effectively switches the Jackson limit off; Jackson's own
// built-in default is much lower (20,000,000 in recent 2.15+ releases — verify against the
// bundled Jackson version) if a safer default is preferred.
val SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH =
  ConfigBuilder("spark.jsonProtocol.maxStringLen")
    .doc("Maximum length of a single string value that JsonProtocol accepts when reading " +
      "JSON. The value is applied as the maximum string length of Jackson's " +
      "StreamReadConstraints; reading a longer string value fails. It must be a positive " +
      "integer. The default, Int.MaxValue, effectively disables the limit.")
    .version("4.0.0")
    .intConf
    .checkValue(v => v > 0, "The value should be a positive integer.")
    .createWithDefault(Int.MaxValue)

}
23 changes: 20 additions & 3 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import java.util.{Properties, UUID}
import scala.collection.Map
import scala.jdk.CollectionConverters._

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.core.{JsonFactory, JsonGenerator, StreamReadConstraints}
import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.json4s.jackson.JsonMethods.compact

import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.rdd.{DeterministicLevel, RDDOperationScope}
Expand All @@ -46,6 +48,8 @@ import org.apache.spark.util.Utils.weakIntern
/**
 * Holds option values read from a SparkConf at construction time. JsonProtocol builds its
 * `defaultOptions` from this class.
 *
 * @param conf configuration the option values are looked up in
 */
private[spark] class JsonProtocolOptions(conf: SparkConf) {
// Value of EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS in `conf`.
val includeTaskMetricsAccumulators: Boolean =
conf.get(EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS)

// Value of SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH in `conf`. JsonProtocol passes it to
// StreamReadConstraints.maxStringLength when building its JSON factory.
val maxJsonStringLength: Int = conf.get(SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH)
}

/**
Expand All @@ -67,7 +71,20 @@ private[spark] object JsonProtocol extends JsonUtils {
// TODO: Remove this file and put JSON serialization into each individual class.

// JsonProtocolOptions instance built from a freshly created SparkConf. The stream read
// constraints and the log line further down in this object are derived from it.
// NOTE(review): the two `val defaultOptions` lines appear to be the removed and added sides
// of the diff hunk (SparkConf(false) -> SparkConf(true)); only one of them can exist in a
// compilable file.
// NOTE(review): the boolean is presumably SparkConf's loadDefaults flag — confirm that
// `true` is what makes spark.jsonProtocol.maxStringLen visible to this object.
private[util]
val defaultOptions: JsonProtocolOptions = new JsonProtocolOptions(new SparkConf(false))
val defaultOptions: JsonProtocolOptions = new JsonProtocolOptions(new SparkConf(true))

// Jackson read limits used by JsonProtocol. Only the maximum string length is customised;
// it is taken from defaultOptions.maxJsonStringLength.
private val streamReadConstraints: StreamReadConstraints = {
  val constraintsBuilder = StreamReadConstraints.builder()
  constraintsBuilder.maxStringLength(defaultOptions.maxJsonStringLength)
  constraintsBuilder.build()
}

// JSON factory configured with the streamReadConstraints value defined in this object.
private val jsonFactory = new JsonFactory().setStreamReadConstraints(streamReadConstraints)

// Logs the string length limit taken from defaultOptions.
// NOTE(review): this statement appears to sit directly in the object body, so it would run
// during object initialisation; logInfo also needs a Logging mix-in that is not visible in
// this excerpt — confirm both.
logInfo(s"maxJsonStringLength: ${defaultOptions.maxJsonStringLength}")

// Overrides the inherited `mapper` with an ObjectMapper built on jsonFactory, with
// DefaultScalaModule registered and FAIL_ON_UNKNOWN_PROPERTIES turned off.
// NOTE(review): presumably this overrides a `mapper` declared in JsonUtils — confirm that
// nothing in the parent's initialiser reads `mapper` before this val is assigned.
protected override val mapper: ObjectMapper = new ObjectMapper(jsonFactory)
.registerModule(DefaultScalaModule)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

/** ------------------------------------------------- *
* JSON serialization methods for SparkListenerEvents |
Expand Down

0 comments on commit e99e1a9

Please sign in to comment.