diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index bd516bc64bb15..ed4bee9b89f14 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -3155,6 +3155,44 @@
     ],
     "sqlState" : "42613"
   },
+  "INVALID_SPARK_CONFIG" : {
+    "message" : [
+      "Invalid Spark config:"
+    ],
+    "subClass" : {
+      "INVALID_EXECUTOR_HEARTBEAT_INTERVAL" : {
+        "message" : [
+          "The value of <networkTimeoutKey>=<networkTimeoutValue>ms must be greater than the value of <executorHeartbeatIntervalKey>=<executorHeartbeatIntervalValue>ms."
+        ]
+      },
+      "INVALID_EXECUTOR_MEMORY_OPTIONS" : {
+        "message" : [
+          "<executorOptsKey> is not allowed to specify max heap memory settings (was '<javaOpts>'). Use spark.executor.memory instead."
+        ]
+      },
+      "INVALID_EXECUTOR_SPARK_OPTIONS" : {
+        "message" : [
+          "<executorOptsKey> is not allowed to set Spark options (was '<javaOpts>'). Set them directly on a SparkConf or in a properties file when using ./bin/spark-submit."
+        ]
+      },
+      "INVALID_MEMORY_FRACTION" : {
+        "message" : [
+          "<memoryFractionKey> should be between 0 and 1 (was '<memoryFractionValue>')."
+        ]
+      },
+      "INVALID_SPARK_SUBMIT_DEPLOY_MODE_KEY" : {
+        "message" : [
+          "<sparkSubmitDeployModeKey> can only be \"cluster\" or \"client\"."
+        ]
+      },
+      "NETWORK_AUTH_MUST_BE_ENABLED" : {
+        "message" : [
+          "<networkAuthEnabledConf> must be enabled when enabling encryption."
+        ]
+      }
+    },
+    "sqlState" : "42616"
+  },
   "INVALID_SQL_ARG" : {
     "message" : [
       "The argument <name> of `sql()` is invalid. Consider to replace it either by a SQL literal or by collection constructor functions such as `map()`, `array()`, `struct()`."
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index ae6ef1ee55608..bbf2809b7db23 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -562,15 +562,16 @@ class SparkConf(loadDefaults: Boolean)
     Seq(EXECUTOR_JAVA_OPTIONS.key, "spark.executor.defaultJavaOptions").foreach { executorOptsKey =>
       getOption(executorOptsKey).foreach { javaOpts =>
         if (javaOpts.contains("-Dspark")) {
-          val msg = s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts'). " +
-            "Set them directly on a SparkConf or in a properties file " +
-            "when using ./bin/spark-submit."
-          throw new Exception(msg)
+          throw new SparkException(
+            errorClass = "INVALID_SPARK_CONFIG.INVALID_EXECUTOR_SPARK_OPTIONS",
+            messageParameters = Map("executorOptsKey" -> executorOptsKey, "javaOpts" -> javaOpts),
+            cause = null)
         }
         if (javaOpts.contains("-Xmx")) {
-          val msg = s"$executorOptsKey is not allowed to specify max heap memory settings " +
-            s"(was '$javaOpts'). Use spark.executor.memory instead."
-          throw new Exception(msg)
+          throw new SparkException(
+            errorClass = "INVALID_SPARK_CONFIG.INVALID_EXECUTOR_MEMORY_OPTIONS",
+            messageParameters = Map("executorOptsKey" -> executorOptsKey, "javaOpts" -> javaOpts),
+            cause = null)
         }
       }
     }
@@ -581,13 +582,18 @@ class SparkConf(loadDefaults: Boolean)
-      if (value > 1 || value < 0) {
-        throw new IllegalArgumentException(s"$key should be between 0 and 1 (was '$value').")
-      }
+      SparkException.require(
+        value >= 0 && value <= 1,
+        errorClass = "INVALID_SPARK_CONFIG.INVALID_MEMORY_FRACTION",
+        messageParameters = Map(
+          "memoryFractionKey" -> key,
+          "memoryFractionValue" -> value.toString))
     }

     if (contains(SUBMIT_DEPLOY_MODE)) {
       get(SUBMIT_DEPLOY_MODE) match {
         case "cluster" | "client" =>
-        case e => throw new SparkException(s"${SUBMIT_DEPLOY_MODE.key} can only be " +
-          "\"cluster\" or \"client\".")
+        case _ => throw new SparkException(
+          errorClass = "INVALID_SPARK_CONFIG.INVALID_SPARK_SUBMIT_DEPLOY_MODE_KEY",
+          messageParameters = Map("sparkSubmitDeployModeKey" -> SUBMIT_DEPLOY_MODE.key),
+          cause = null)
       }
     }

@@ -607,17 +613,23 @@ class SparkConf(loadDefaults: Boolean)
     }

     val encryptionEnabled = get(NETWORK_CRYPTO_ENABLED) || get(SASL_ENCRYPTION_ENABLED)
-    require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
-      s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
+    SparkException.require(
+      !encryptionEnabled || get(NETWORK_AUTH_ENABLED),
+      errorClass = "INVALID_SPARK_CONFIG.NETWORK_AUTH_MUST_BE_ENABLED",
+      messageParameters = Map("networkAuthEnabledConf" -> NETWORK_AUTH_ENABLED.key))

     val executorTimeoutThresholdMs = get(NETWORK_TIMEOUT) * 1000
     val executorHeartbeatIntervalMs = get(EXECUTOR_HEARTBEAT_INTERVAL)
-    val networkTimeout = NETWORK_TIMEOUT.key
     // If spark.executor.heartbeatInterval bigger than spark.network.timeout,
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
-    require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The value of " +
-      s"${networkTimeout}=${executorTimeoutThresholdMs}ms must be greater than the value of " +
-      s"${EXECUTOR_HEARTBEAT_INTERVAL.key}=${executorHeartbeatIntervalMs}ms.")
+    SparkException.require(
+      executorTimeoutThresholdMs > executorHeartbeatIntervalMs,
+      errorClass = "INVALID_SPARK_CONFIG.INVALID_EXECUTOR_HEARTBEAT_INTERVAL",
+      messageParameters = Map(
+        "networkTimeoutKey" -> NETWORK_TIMEOUT.key,
+        "networkTimeoutValue" -> executorTimeoutThresholdMs.toString,
+        "executorHeartbeatIntervalKey" -> EXECUTOR_HEARTBEAT_INTERVAL.key,
+        "executorHeartbeatIntervalValue" -> executorHeartbeatIntervalMs.toString))
   }

   /**
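
A note for reviewers on the new helper: every validation above now goes through `SparkException.require`, which replaces the bare `require(...)` / `throw new Exception(msg)` pattern. A minimal sketch of what it does, inferred only from the call sites in this patch (the real helper lives in the `org.apache.spark.SparkException` companion object and may differ in detail):

// Sketch only: behavior inferred from the call sites in this patch,
// not copied from the actual SparkException companion object.
def require(
    requirement: Boolean,
    errorClass: String,
    messageParameters: Map[String, String]): Unit = {
  if (!requirement) {
    // Same three-argument constructor used directly elsewhere in this
    // patch; the error class and parameters let the error framework
    // render the matching template from error-conditions.json.
    throw new SparkException(
      errorClass = errorClass,
      messageParameters = messageParameters,
      cause = null)
  }
}

Routing every check through one helper keeps each config error attached to a registered error condition, so the message text lives in error-conditions.json instead of being scattered through SparkConf.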
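
To see how the `messageParameters` maps line up with the `<...>` placeholders in the new JSON templates, note that each map key must match a placeholder name exactly. A self-contained illustration of the substitution (plain string replacement for demonstration only; the actual framework loads and validates these templates from error-conditions.json):

// Example values only, depicting a misconfigured pair of timeouts.
val template =
  "The value of <networkTimeoutKey>=<networkTimeoutValue>ms must be greater " +
    "than the value of <executorHeartbeatIntervalKey>=<executorHeartbeatIntervalValue>ms."
val params = Map(
  "networkTimeoutKey" -> "spark.network.timeout",
  "networkTimeoutValue" -> "120000",
  "executorHeartbeatIntervalKey" -> "spark.executor.heartbeatInterval",
  "executorHeartbeatIntervalValue" -> "150000")
// Substitute each <key> placeholder with its value.
val rendered = params.foldLeft(template) { case (msg, (k, v)) =>
  msg.replace(s"<$k>", v)
}
// rendered == "The value of spark.network.timeout=120000ms must be greater
// than the value of spark.executor.heartbeatInterval=150000ms."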