Skip to content

Commit

Permalink
Prefer env vars over kubernetes backend attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Mar 11, 2024
1 parent 2339975 commit 80070ef
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ private[spark] class KubernetesExecutorBackend(
resourceProfile) with Logging {

override def extractAttributes: Map[String, String] = {
super.extractAttributes ++
Map(
"LOG_FILES" -> "log",
"APP_ID" -> appId,
"EXECUTOR_ID" -> executorId,
"HOSTNAME" -> hostname,
"KUBERNETES_NAMESPACE" -> env.conf.get(KUBERNETES_NAMESPACE),
"KUBERNETES_POD_NAME" -> podName
)
// give precedence to attributes extracted by overridden method
Map(
"LOG_FILES" -> "log",
"APP_ID" -> appId,
"EXECUTOR_ID" -> executorId,
"HOSTNAME" -> hostname,
"KUBERNETES_NAMESPACE" -> env.conf.get(KUBERNETES_NAMESPACE),
"KUBERNETES_POD_NAME" -> podName
) ++ super.extractAttributes
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,38 @@ class KubernetesExecutorBackendSuite extends SparkFunSuite {

withEnvs(
"SPARK_LOG_URL_STDOUT" -> "https://my.custom.url/logs/stdout",
"SPARK_LOG_URL_STDERR" -> "https://my.custom.url/logs/stderr") {
"SPARK_LOG_URL_STDERR" -> "https://my.custom.url/logs/stderr",
"SPARK_EXECUTOR_ATTRIBUTE_ENV_1" -> "val1",
"SPARK_EXECUTOR_ATTRIBUTE_ENV_2" -> "val2") {
assert(backend.extractLogUrls === Map(
"stdout" -> "https://my.custom.url/logs/stdout",
"stderr" -> "https://my.custom.url/logs/stderr"
))
assert(backend.extractAttributes === expectedKubernetesAttributes)
assert(backend.extractAttributes === Map(
"ENV_1" -> "val1", "ENV_2" -> "val2"
) ++ expectedKubernetesAttributes)
}

// env vars have precedence
withEnvs(
"SPARK_EXECUTOR_ATTRIBUTE_ENV_1" -> "val1",
"SPARK_EXECUTOR_ATTRIBUTE_ENV_2" -> "val2") {
assert(backend.extractLogUrls === Map.empty)
"SPARK_EXECUTOR_ATTRIBUTE_LOG_FILES" -> "env-log",
"SPARK_EXECUTOR_ATTRIBUTE_APP_ID" -> "env-app-id",
"SPARK_EXECUTOR_ATTRIBUTE_EXECUTOR_ID" -> "env-exec-id",
"SPARK_EXECUTOR_ATTRIBUTE_HOSTNAME" -> "env-hostname",
"SPARK_EXECUTOR_ATTRIBUTE_KUBERNETES_NAMESPACE" -> "env-namespace",
"SPARK_EXECUTOR_ATTRIBUTE_KUBERNETES_POD_NAME" -> "env-pod-name") {
assert(backend.extractAttributes === Map(
"ENV_1" -> "val1", "ENV_2" -> "val2"
) ++ expectedKubernetesAttributes)
"LOG_FILES" -> "env-log",
"APP_ID" -> "env-app-id",
"EXECUTOR_ID" -> "env-exec-id",
"HOSTNAME" -> "env-hostname",
"KUBERNETES_NAMESPACE" -> "env-namespace",
"KUBERNETES_POD_NAME" -> "env-pod-name"
))
}

// namespace 'default' above is the config default, here we set a value
conf.set("spark.kubernetes.namespace", "my-namespace")
assert(backend.extractLogUrls === Map.empty)
assert(backend.extractAttributes === expectedKubernetesAttributes ++
Map("KUBERNETES_NAMESPACE" -> "my-namespace"))
}
Expand Down

0 comments on commit 80070ef

Please sign in to comment.