diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala index b12c3bf25bbf6..481f2b275f35a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala @@ -57,15 +57,15 @@ private[spark] class KubernetesExecutorBackend( resourceProfile) with Logging { override def extractAttributes: Map[String, String] = { - super.extractAttributes ++ - Map( - "LOG_FILES" -> "log", - "APP_ID" -> appId, - "EXECUTOR_ID" -> executorId, - "HOSTNAME" -> hostname, - "KUBERNETES_NAMESPACE" -> env.conf.get(KUBERNETES_NAMESPACE), - "KUBERNETES_POD_NAME" -> podName - ) + // on key collisions, attributes from super.extractAttributes override the ones built here + Map( + "LOG_FILES" -> "log", + "APP_ID" -> appId, + "EXECUTOR_ID" -> executorId, + "HOSTNAME" -> hostname, + "KUBERNETES_NAMESPACE" -> env.conf.get(KUBERNETES_NAMESPACE), + "KUBERNETES_POD_NAME" -> podName + ) ++ super.extractAttributes } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackendSuite.scala index 6fc185aff1fee..73228371aaa5b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackendSuite.scala @@ -49,25 +49,38 @@ class KubernetesExecutorBackendSuite extends SparkFunSuite { withEnvs( "SPARK_LOG_URL_STDOUT" -> "https://my.custom.url/logs/stdout", - 
"SPARK_LOG_URL_STDERR" -> "https://my.custom.url/logs/stderr") { + "SPARK_LOG_URL_STDERR" -> "https://my.custom.url/logs/stderr", + "SPARK_EXECUTOR_ATTRIBUTE_ENV_1" -> "val1", + "SPARK_EXECUTOR_ATTRIBUTE_ENV_2" -> "val2") { assert(backend.extractLogUrls === Map( "stdout" -> "https://my.custom.url/logs/stdout", "stderr" -> "https://my.custom.url/logs/stderr" )) - assert(backend.extractAttributes === expectedKubernetesAttributes) + assert(backend.extractAttributes === Map( + "ENV_1" -> "val1", "ENV_2" -> "val2" + ) ++ expectedKubernetesAttributes) } + // SPARK_EXECUTOR_ATTRIBUTE_* env vars take precedence over the backend's own attribute values withEnvs( - "SPARK_EXECUTOR_ATTRIBUTE_ENV_1" -> "val1", - "SPARK_EXECUTOR_ATTRIBUTE_ENV_2" -> "val2") { - assert(backend.extractLogUrls === Map.empty) + "SPARK_EXECUTOR_ATTRIBUTE_LOG_FILES" -> "env-log", + "SPARK_EXECUTOR_ATTRIBUTE_APP_ID" -> "env-app-id", + "SPARK_EXECUTOR_ATTRIBUTE_EXECUTOR_ID" -> "env-exec-id", + "SPARK_EXECUTOR_ATTRIBUTE_HOSTNAME" -> "env-hostname", + "SPARK_EXECUTOR_ATTRIBUTE_KUBERNETES_NAMESPACE" -> "env-namespace", + "SPARK_EXECUTOR_ATTRIBUTE_KUBERNETES_POD_NAME" -> "env-pod-name") { assert(backend.extractAttributes === Map( - "ENV_1" -> "val1", "ENV_2" -> "val2" - ) ++ expectedKubernetesAttributes) + "LOG_FILES" -> "env-log", + "APP_ID" -> "env-app-id", + "EXECUTOR_ID" -> "env-exec-id", + "HOSTNAME" -> "env-hostname", + "KUBERNETES_NAMESPACE" -> "env-namespace", + "KUBERNETES_POD_NAME" -> "env-pod-name" + )) } + // namespace 'default' above is the config default; here we set an explicit value conf.set("spark.kubernetes.namespace", "my-namespace") - assert(backend.extractLogUrls === Map.empty) assert(backend.extractAttributes === expectedKubernetesAttributes ++ Map("KUBERNETES_NAMESPACE" -> "my-namespace")) }