diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index acf18683083d0..9de3d1a358c68 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -106,7 +106,7 @@ private[spark] class KubernetesClusterSchedulerBackend( "LOG_FILES" -> "log", "APP_ID" -> applicationId(), "KUBERNETES_NAMESPACE" -> namespace, - "KUBERNETES_POD_NAME" -> System.getenv(ENV_DRIVER_POD_NAME) + "KUBERNETES_POD_NAME" -> Option(System.getenv(ENV_DRIVER_POD_NAME)).getOrElse("[null]") )) override def start(): Unit = { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala index b2e4a7182a774..f07efc7e6cd70 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala @@ -32,6 +32,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.Fabric8Aliases._ +import org.apache.spark.internal.config.UI import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager} import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler.{ExecutorKilled, LiveListenerBus, TaskSchedulerImpl} @@ -47,6 +48,11 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn .set("spark.app.id", TEST_SPARK_APP_ID) .set(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL.key, "soLong") .set(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE.key, "cruelWorld") + .set( + UI.CUSTOM_DRIVER_LOG_URL.key, + "https://my-custom.url/api/logs?applicationId={{APP_ID}}&namespace={{KUBERNETES_NAMESPACE}}" + + "&pod_name={{KUBERNETES_POD_NAME}}" + ) @Mock private var sc: SparkContext = _ @@ -259,4 +265,52 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn endpoint.receiveAndReply(context).apply(GenerateExecID("cheeseBurger")) verify(context).reply("1") } + + test("Driver attributes") { + assert(schedulerBackendUnderTest.getDriverAttributes === Some(Map( + "LOG_FILES" -> "log", + "APP_ID" -> "spark-app-id", + "KUBERNETES_NAMESPACE" -> "default", + "KUBERNETES_POD_NAME" -> "[null]" + ))) + withEnvs(ENV_DRIVER_POD_NAME -> "pod.name") { + assert(schedulerBackendUnderTest.getDriverAttributes === Some(Map( + "LOG_FILES" -> "log", + "APP_ID" -> "spark-app-id", + "KUBERNETES_NAMESPACE" -> "default", + "KUBERNETES_POD_NAME" -> "pod.name" + ))) + } + } + + test("Driver log urls") { + assert(schedulerBackendUnderTest.getDriverLogUrls === Some(Map( + "log" -> ("https://my-custom.url/api/logs?applicationId=spark-app-id&namespace=default" + + "&pod_name=[null]") + ))) + withEnvs(ENV_DRIVER_POD_NAME -> "pod.name") { + assert(schedulerBackendUnderTest.getDriverLogUrls === Some(Map( + "log" -> ("https://my-custom.url/api/logs?applicationId=spark-app-id&namespace=default" + + "&pod_name=pod.name") + ))) + } + } + + private def withEnvs(pairs: (String, String)*)(f: => Unit): Unit = { + val readonlyEnv = System.getenv() + val field = readonlyEnv.getClass.getDeclaredField("m") + field.setAccessible(true) + val modifiableEnv = field.get(readonlyEnv).asInstanceOf[java.util.Map[String, String]] + try { + for ((k, v) <- pairs) { + assert(!modifiableEnv.containsKey(k)) + modifiableEnv.put(k, v) + } + f + } finally { + for ((k, _) <- pairs) { + modifiableEnv.remove(k) + } + } + } }