[SPARK-50756][CORE] Use error class for exceptions in SparkConf.validateSettings

### What changes were proposed in this pull request?
This change uses error classes for the exceptions thrown in SparkConf.validateSettings.

### Why are the changes needed?
To adapt to the error class framework.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.
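For illustration, the pattern applied throughout this change replaces hand-assembled message strings with a stable error class plus named parameters. A minimal sketch based on the diff below (the `SparkException` constructor signature is taken from its usage in this patch; the `fail` wrapper is hypothetical):

```scala
import org.apache.spark.SparkException

// Before: the message text is built inline and has no stable identifier.
//   throw new Exception(s"$key should be between 0 and 1 (was '$value').")

// After: the exception carries an error class; the message template
// itself lives in error-conditions.json and is filled from the map.
def fail(key: String, value: String): Nothing =
  throw new SparkException(
    errorClass = "INVALID_SPARK_CONFIG.INVALID_MEMORY_FRACTION",
    messageParameters = Map(
      "memoryFractionKey" -> key,
      "memoryFractionValue" -> value),
    cause = null)
```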

Closes apache#49491 from bozhang2820/spark-50756.

Authored-by: Bo Zhang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
bozhang2820 authored and HyukjinKwon committed Jan 16, 2025
1 parent bd56285 commit 255d923
Showing 2 changed files with 68 additions and 15 deletions.
38 changes: 38 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -3155,6 +3155,44 @@
     ],
     "sqlState" : "42613"
   },
+  "INVALID_SPARK_CONFIG" : {
+    "message" : [
+      "Invalid Spark config:"
+    ],
+    "subClass" : {
+      "INVALID_EXECUTOR_HEARTBEAT_INTERVAL" : {
+        "message" : [
+          "The value of <networkTimeoutKey>=<networkTimeoutValue>ms must be greater than the value of <executorHeartbeatIntervalKey>=<executorHeartbeatIntervalValue>ms."
+        ]
+      },
+      "INVALID_EXECUTOR_MEMORY_OPTIONS" : {
+        "message" : [
+          "<executorOptsKey> is not allowed to specify max heap memory settings (was '<javaOpts>'). Use spark.executor.memory instead."
+        ]
+      },
+      "INVALID_EXECUTOR_SPARK_OPTIONS" : {
+        "message" : [
+          "<executorOptsKey> is not allowed to set Spark options (was '<javaOpts>'). Set them directly on a SparkConf or in a properties file when using ./bin/spark-submit."
+        ]
+      },
+      "INVALID_MEMORY_FRACTION" : {
+        "message" : [
+          "<memoryFractionKey> should be between 0 and 1 (was '<memoryFractionValue>')."
+        ]
+      },
+      "INVALID_SPARK_SUBMIT_DEPLOY_MODE_KEY" : {
+        "message" : [
+          "<sparkSubmitDeployModeKey> can only be \"cluster\" or \"client\"."
+        ]
+      },
+      "NETWORK_AUTH_MUST_BE_ENABLED" : {
+        "message" : [
+          "<networkAuthEnabledConf> must be enabled when enabling encryption."
+        ]
+      }
+    },
+    "sqlState" : "42616"
+  },
   "INVALID_SQL_ARG" : {
     "message" : [
       "The argument <name> of `sql()` is invalid. Consider to replace it either by a SQL literal or by collection constructor functions such as `map()`, `array()`, `struct()`."
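Each `<placeholder>` in these message templates is filled in from the `messageParameters` map at throw time. A rough sketch of the substitution (a hypothetical helper for illustration only; Spark's actual ErrorClassesJsonReader does more, such as validating that every parameter is supplied):

```scala
// Hypothetical illustration of template instantiation;
// not Spark's actual implementation.
def render(template: String, params: Map[String, String]): String =
  params.foldLeft(template) { case (msg, (name, value)) =>
    msg.replace(s"<$name>", value)
  }

val template =
  "<memoryFractionKey> should be between 0 and 1 (was '<memoryFractionValue>')."
println(render(template, Map(
  "memoryFractionKey" -> "spark.memory.fraction",
  "memoryFractionValue" -> "1.5")))
// prints: spark.memory.fraction should be between 0 and 1 (was '1.5').
```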
45 changes: 30 additions & 15 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -562,15 +562,16 @@ class SparkConf(loadDefaults: Boolean)
     Seq(EXECUTOR_JAVA_OPTIONS.key, "spark.executor.defaultJavaOptions").foreach { executorOptsKey =>
       getOption(executorOptsKey).foreach { javaOpts =>
         if (javaOpts.contains("-Dspark")) {
-          val msg = s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts'). " +
-            "Set them directly on a SparkConf or in a properties file " +
-            "when using ./bin/spark-submit."
-          throw new Exception(msg)
+          throw new SparkException(
+            errorClass = "INVALID_SPARK_CONFIG.INVALID_EXECUTOR_SPARK_OPTIONS",
+            messageParameters = Map("executorOptsKey" -> executorOptsKey, "javaOpts" -> javaOpts),
+            cause = null)
         }
         if (javaOpts.contains("-Xmx")) {
-          val msg = s"$executorOptsKey is not allowed to specify max heap memory settings " +
-            s"(was '$javaOpts'). Use spark.executor.memory instead."
-          throw new Exception(msg)
+          throw new SparkException(
+            errorClass = "INVALID_SPARK_CONFIG.INVALID_EXECUTOR_MEMORY_OPTIONS",
+            messageParameters = Map("executorOptsKey" -> executorOptsKey, "javaOpts" -> javaOpts),
+            cause = null)
         }
       }
     }
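As a usage illustration, configurations like the following are what the two checks above reject (a sketch; `spark.executor.extraJavaOptions` is the key behind `EXECUTOR_JAVA_OPTIONS`, and validateSettings runs when a SparkContext is constructed from the conf):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Fails with INVALID_SPARK_CONFIG.INVALID_EXECUTOR_SPARK_OPTIONS:
  // Spark options must be set on the conf itself, not via executor JVM flags.
  .set("spark.executor.extraJavaOptions", "-Dspark.shuffle.compress=true")

// Fails with INVALID_SPARK_CONFIG.INVALID_EXECUTOR_MEMORY_OPTIONS:
// heap size must come from spark.executor.memory, not -Xmx.
// conf.set("spark.executor.extraJavaOptions", "-Xmx4g")
```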
@@ -581,13 +582,21 @@ class SparkConf(loadDefaults: Boolean)
       if (value > 1 || value < 0) {
         throw new IllegalArgumentException(s"$key should be between 0 and 1 (was '$value').")
       }
+      SparkException.require(
+        value >= 0 && value <= 1,
+        errorClass = "INVALID_SPARK_CONFIG.INVALID_MEMORY_FRACTION",
+        messageParameters = Map(
+          "memoryFractionKey" -> key,
+          "memoryFractionValue" -> value.toString))
     }
 
     if (contains(SUBMIT_DEPLOY_MODE)) {
       get(SUBMIT_DEPLOY_MODE) match {
         case "cluster" | "client" =>
-        case e => throw new SparkException(s"${SUBMIT_DEPLOY_MODE.key} can only be " +
-          "\"cluster\" or \"client\".")
+        case _ => throw new SparkException(
+          errorClass = "INVALID_SPARK_CONFIG.INVALID_SPARK_SUBMIT_DEPLOY_MODE_KEY",
+          messageParameters = Map("sparkSubmitDeployModeKey" -> SUBMIT_DEPLOY_MODE.key),
+          cause = null)
       }
     }
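The diff also introduces calls to `SparkException.require`, an error-class-aware stand-in for Scala's built-in `require`. Its rough shape, inferred only from the call sites in this patch (the real helper lives on the `SparkException` companion object), is:

```scala
import org.apache.spark.SparkException

// Sketch inferred from the call sites above; not the verbatim Spark source.
object SparkExceptionSketch {
  def require(
      requirement: Boolean,
      errorClass: String,
      messageParameters: Map[String, String]): Unit = {
    if (!requirement) {
      throw new SparkException(errorClass, messageParameters, cause = null)
    }
  }
}
```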

@@ -607,17 +616,23 @@
     }
 
     val encryptionEnabled = get(NETWORK_CRYPTO_ENABLED) || get(SASL_ENCRYPTION_ENABLED)
-    require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
-      s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
+    SparkException.require(
+      !encryptionEnabled || get(NETWORK_AUTH_ENABLED),
+      errorClass = "INVALID_SPARK_CONFIG.NETWORK_AUTH_MUST_BE_ENABLED",
+      messageParameters = Map("networkAuthEnabledConf" -> NETWORK_AUTH_ENABLED.key))
 
     val executorTimeoutThresholdMs = get(NETWORK_TIMEOUT) * 1000
     val executorHeartbeatIntervalMs = get(EXECUTOR_HEARTBEAT_INTERVAL)
-    val networkTimeout = NETWORK_TIMEOUT.key
     // If spark.executor.heartbeatInterval bigger than spark.network.timeout,
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
-    require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The value of " +
-      s"${networkTimeout}=${executorTimeoutThresholdMs}ms must be greater than the value of " +
-      s"${EXECUTOR_HEARTBEAT_INTERVAL.key}=${executorHeartbeatIntervalMs}ms.")
+    SparkException.require(
+      executorTimeoutThresholdMs > executorHeartbeatIntervalMs,
+      errorClass = "INVALID_SPARK_CONFIG.INVALID_EXECUTOR_HEARTBEAT_INTERVAL",
+      messageParameters = Map(
+        "networkTimeoutKey" -> NETWORK_TIMEOUT.key,
+        "networkTimeoutValue" -> executorTimeoutThresholdMs.toString,
+        "executorHeartbeatIntervalKey" -> EXECUTOR_HEARTBEAT_INTERVAL.key,
+        "executorHeartbeatIntervalValue" -> executorHeartbeatIntervalMs.toString))
   }
 
   /**
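The last hunk encodes the SPARK-22754 invariant: `spark.network.timeout` must be strictly greater than `spark.executor.heartbeatInterval`, otherwise a heartbeat cannot arrive before the timeout fires and executors are spuriously marked lost. A configuration the new check rejects (sketch):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.network.timeout", "60s")              // threshold: 60000 ms
  .set("spark.executor.heartbeatInterval", "120s")  // interval: 120000 ms

// 60000 > 120000 is false, so validation now throws a SparkException with
// error class INVALID_SPARK_CONFIG.INVALID_EXECUTOR_HEARTBEAT_INTERVAL
// instead of the IllegalArgumentException raised by the old require(...).
```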
