From 4a920795f12e88798bd4fee23e2962e2e0c8a41f Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Thu, 7 Mar 2024 16:15:33 +0100 Subject: [PATCH 1/4] Support custom executor log urls --- docs/configuration.md | 2 - .../k8s/KubernetesExecutorBackend.scala | 44 +++++++++- .../k8s/KubernetesExecutorBackendSuite.scala | 83 +++++++++++++++++++ 3 files changed, 125 insertions(+), 4 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackendSuite.scala diff --git a/docs/configuration.md b/docs/configuration.md index 6833d4e54fd03..a83ce28c02b39 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1665,8 +1665,6 @@ Apart from these, the following properties are also available, and may be useful Please note that this configuration also replaces original log urls in event log, which will be also effective when accessing the application on history server. The new log urls must be permanent, otherwise you might have dead link for executor log urls. -
<p/>
- For now, only YARN mode supports this configuration 3.0.0 diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala index 61570f5cf4bb2..d4d252a25d700 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala @@ -28,6 +28,46 @@ import org.apache.spark.rpc._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.util.Utils +/** + * Custom implementation of CoarseGrainedExecutorBackend for Kubernetes resource manager. + * This class provides kubernetes executor attributes. + */ +private[spark] class KubernetesExecutorBackend( + rpcEnv: RpcEnv, + appId: String, + driverUrl: String, + executorId: String, + bindAddress: String, + hostname: String, + podName: String, + cores: Int, + env: SparkEnv, + resourcesFile: Option[String], + resourceProfile: ResourceProfile) + extends CoarseGrainedExecutorBackend( + rpcEnv, + driverUrl, + executorId, + bindAddress, + hostname, + cores, + env, + resourcesFile, + resourceProfile) with Logging { + + override def extractAttributes: Map[String, String] = { + super.extractAttributes ++ + Map( + "LOG_FILES" -> "log", + "APP_ID" -> appId, + "EXECUTOR_ID" -> executorId, + "HOSTNAME" -> hostname, + "POD_NAME" -> podName + ) + } + +} + private[spark] object KubernetesExecutorBackend extends Logging { // Message used internally to start the executor when the driver successfully accepted the @@ -49,8 +89,8 @@ private[spark] object KubernetesExecutorBackend extends Logging { def main(args: Array[String]): Unit = { val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile, String) => CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile, execId) => - new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, execId, - arguments.bindAddress, arguments.hostname, arguments.cores, + new KubernetesExecutorBackend(rpcEnv, arguments.appId, arguments.driverUrl, execId, + arguments.bindAddress, arguments.hostname, arguments.podName, arguments.cores, env, arguments.resourcesFileOpt, resourceProfile) } run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackendSuite.scala new file mode 100644 index 0000000000000..0fce06ea177ec --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackendSuite.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import org.mockito.Mockito.mock + +import org.apache.spark.{SparkEnv, SparkFunSuite} +import org.apache.spark.resource.ResourceProfile +import org.apache.spark.rpc.RpcEnv + +class KubernetesExecutorBackendSuite extends SparkFunSuite { + test("extract log urls and attributes") { + val mockRpcEnv = mock(classOf[RpcEnv]) + val mockSparkEnv = mock(classOf[SparkEnv]) + val mockResourceProfile = mock(classOf[ResourceProfile]) + + val backend = new KubernetesExecutorBackend( + mockRpcEnv, "app-id", "driver-url", "executor-id", "bind-address", "hostname", "pod-name", + 1, mockSparkEnv, None, mockResourceProfile) + + val expectedKubernetesAttributes = Map( + "LOG_FILES" -> "log", + "APP_ID" -> "app-id", + "EXECUTOR_ID" -> "executor-id", + "HOSTNAME" -> "hostname", + "POD_NAME" -> "pod-name" + ) + + assert(backend.extractLogUrls === Map.empty) + assert(backend.extractAttributes === expectedKubernetesAttributes) + + withEnvs( + "SPARK_LOG_URL_STDOUT" -> "https://my.custom.url/logs/stdout", + "SPARK_LOG_URL_STDERR" -> "https://my.custom.url/logs/stderr") { + assert(backend.extractLogUrls === Map( + "stdout" -> "https://my.custom.url/logs/stdout", + "stderr" -> "https://my.custom.url/logs/stderr" + )) + assert(backend.extractAttributes === expectedKubernetesAttributes) + } + + withEnvs( + "SPARK_EXECUTOR_ATTRIBUTE_ENV_1" -> "val1", + "SPARK_EXECUTOR_ATTRIBUTE_ENV_2" -> "val2") { + assert(backend.extractLogUrls === Map.empty) + assert(backend.extractAttributes === Map( + "ENV_1" -> "val1", "ENV_2" -> "val2" + ) ++ expectedKubernetesAttributes) + } + } + + private def withEnvs(pairs: (String, String)*)(f: => Unit): Unit = { + val readonlyEnv = System.getenv() + val field = readonlyEnv.getClass.getDeclaredField("m") + field.setAccessible(true) + val modifiableEnv = field.get(readonlyEnv).asInstanceOf[java.util.Map[String, String]] + try { + for ((k, v) <- pairs) { + assert(!modifiableEnv.containsKey(k)) + modifiableEnv.put(k, v) + } + f + } finally { + for ((k, _) <- pairs) { + modifiableEnv.remove(k) + } + } + } +} From e5d73ca8cbeeaec0b52ba1ff4fb063fbc69fd2d1 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Mon, 11 Mar 2024 09:47:09 +0100 Subject: [PATCH 2/4] Add option to kubernetes docs --- .../org/apache/spark/internal/config/UI.scala | 2 +- docs/configuration.md | 2 +- docs/running-on-kubernetes.md | 23 +++++++++++++++++++ 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/UI.scala b/core/src/main/scala/org/apache/spark/internal/config/UI.scala index e02cb7a968314..333498e708c2f 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/UI.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/UI.scala @@ -221,7 +221,7 @@ private[spark] object UI { .createOptional val CUSTOM_EXECUTOR_LOG_URL = ConfigBuilder("spark.ui.custom.executor.log.url") - .doc("Specifies custom spark executor log url for supporting external log service instead of " + + .doc("Specifies custom Spark executor log url for supporting external log service instead of " + "using 
cluster managers' application log urls in the Spark UI. Spark will support " + "some path variables via patterns which can vary on cluster manager. Please check the " + "documentation for your cluster manager to see which patterns are supported, if any. " + diff --git a/docs/configuration.md b/docs/configuration.md index a83ce28c02b39..77e20cf985930 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1658,7 +1658,7 @@ Apart from these, the following properties are also available, and may be useful spark.ui.custom.executor.log.url (none) - Specifies custom spark executor log URL for supporting external log service instead of using cluster + Specifies custom Spark executor log URL for supporting external log service instead of using cluster managers' application log URLs in Spark UI. Spark will support some path variables via patterns which can vary on cluster manager. Please check the documentation for your cluster manager to see which patterns are supported, if any.

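As context for the `spark.ui.custom.executor.log.url` option documented above, a minimal sketch of how an application could set it — the log-service URL and its query parameters are invented placeholders for this example, not part of the patch:

    import org.apache.spark.sql.SparkSession

    // Illustrative only: route executor log links in the Spark UI to an
    // external log service via the pattern variables documented above.
    object CustomLogUrlExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("custom-executor-log-url-example")
          .config("spark.ui.custom.executor.log.url",
            "https://my.custom.url/logs?app={{APP_ID}}&executor={{EXECUTOR_ID}}")
          .getOrCreate()
        // Run the application; executor log links in the UI (and, via the
        // event log, on the history server) now point at the external service.
        spark.stop()
      }
    }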
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 655b30756a298..55d6839975042 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -1662,6 +1662,29 @@ See the [configuration page](configuration.html) for information on Spark config 3.3.0 + + spark.ui.custom.executor.log.url + (none) + + Specifies custom Spark executor log URL for supporting external log service instead of using cluster + managers' application log URLs in Spark UI. Spark will support some path variables via patterns + which can vary on cluster manager. Spark Kubernetes cluster manager supports the following path variables: +
+    <ul>
+      <li><code>{{APP_ID}}</code>: The unique application id</li>
+      <li><code>{{EXECUTOR_ID}}</code>: The executor id (a positive integer larger than zero)</li>
+      <li><code>{{HOSTNAME}}</code>: The name of the host where the executor runs</li>
+      <li><code>{{POD_NAME}}</code>: The name of the pod that contains the executor</li>
+      <li><code>{{FILE_NAME}}</code>: The name of the log, which is always "log"</li>
+    </ul>
+ Please note that this configuration also replaces original log urls in event log, + which will also be effective when accessing the application on history server. The new log urls must be + permanent, otherwise you might have dead links for executor log urls.

+ Example: Config value "https://my.custom.url/logs?app={{APP_ID}}&executor={{EXECUTOR_ID}}" + adds for application "app-example-123" and executor 1 this link to the Spark UI: + https://my.custom.url/logs?app=app-example-123&executor=1 + + 4.0.0 + #### Pod template properties From 45c3837445d97ee6e4a3cbf2cbdde59953187d2a Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Mon, 11 Mar 2024 17:39:48 +0100 Subject: [PATCH 3/4] Add KUBERNETES_NAMESPACE, rename pod name to KUBERNETES_POD_NAME --- docs/running-on-kubernetes.md | 3 ++- .../cluster/k8s/KubernetesExecutorBackend.scala | 4 +++- .../k8s/KubernetesExecutorBackendSuite.scala | 13 +++++++++++-- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 55d6839975042..a2172732ad481 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -1673,7 +1673,8 @@ See the [configuration page](configuration.html) for information on Spark config

       <li><code>{{APP_ID}}</code>: The unique application id</li>
       <li><code>{{EXECUTOR_ID}}</code>: The executor id (a positive integer larger than zero)</li>
       <li><code>{{HOSTNAME}}</code>: The name of the host where the executor runs</li>
-      <li><code>{{POD_NAME}}</code>: The name of the pod that contains the executor</li>
+      <li><code>{{KUBERNETES_NAMESPACE}}</code>: The namespace where the executor pods run</li>
+      <li><code>{{KUBERNETES_POD_NAME}}</code>: The name of the pod that contains the executor</li>
       <li><code>{{FILE_NAME}}</code>: The name of the log, which is always "log"</li>
     </ul>
  • Please note that this configuration also replaces original log urls in event log, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala index d4d252a25d700..b12c3bf25bbf6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.k8s.Config.KUBERNETES_NAMESPACE import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.executor.CoarseGrainedExecutorBackend import org.apache.spark.internal.Logging @@ -62,7 +63,8 @@ private[spark] class KubernetesExecutorBackend( "APP_ID" -> appId, "EXECUTOR_ID" -> executorId, "HOSTNAME" -> hostname, - "POD_NAME" -> podName + "KUBERNETES_NAMESPACE" -> env.conf.get(KUBERNETES_NAMESPACE), + "KUBERNETES_POD_NAME" -> podName ) } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackendSuite.scala index 0fce06ea177ec..6fc185aff1fee 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackendSuite.scala @@ -17,8 +17,9 @@ package org.apache.spark.scheduler.cluster.k8s import org.mockito.Mockito.mock +import org.mockito.Mockito.when -import org.apache.spark.{SparkEnv, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite} import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc.RpcEnv @@ -26,6 +27,8 @@ class KubernetesExecutorBackendSuite extends SparkFunSuite { test("extract log urls and attributes") { val mockRpcEnv = mock(classOf[RpcEnv]) val mockSparkEnv = mock(classOf[SparkEnv]) + val conf = new SparkConf() + when(mockSparkEnv.conf).thenReturn(conf) val mockResourceProfile = mock(classOf[ResourceProfile]) val backend = new KubernetesExecutorBackend( @@ -37,7 +40,8 @@ class KubernetesExecutorBackendSuite extends SparkFunSuite { "APP_ID" -> "app-id", "EXECUTOR_ID" -> "executor-id", "HOSTNAME" -> "hostname", - "POD_NAME" -> "pod-name" + "KUBERNETES_NAMESPACE" -> "default", + "KUBERNETES_POD_NAME" -> "pod-name" ) assert(backend.extractLogUrls === Map.empty) @@ -61,6 +65,11 @@ class KubernetesExecutorBackendSuite extends SparkFunSuite { "ENV_1" -> "val1", "ENV_2" -> "val2" ) ++ expectedKubernetesAttributes) } + + conf.set("spark.kubernetes.namespace", "my-namespace") + assert(backend.extractLogUrls === Map.empty) + assert(backend.extractAttributes === expectedKubernetesAttributes ++ + Map("KUBERNETES_NAMESPACE" -> "my-namespace")) } private def withEnvs(pairs: (String, String)*)(f: => Unit): Unit = { From 315a0bba4112de1c08641a165136e15f68539554 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Mon, 11 Mar 2024 21:29:54 +0100 Subject: [PATCH 4/4] Prefer env vars over kubernetes backend attributes --- .../k8s/KubernetesExecutorBackend.scala | 18 ++++++------ 
.../k8s/KubernetesExecutorBackendSuite.scala | 29 ++++++++++++++----- 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala index b12c3bf25bbf6..481f2b275f35a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala @@ -57,15 +57,15 @@ private[spark] class KubernetesExecutorBackend( resourceProfile) with Logging { override def extractAttributes: Map[String, String] = { - super.extractAttributes ++ - Map( - "LOG_FILES" -> "log", - "APP_ID" -> appId, - "EXECUTOR_ID" -> executorId, - "HOSTNAME" -> hostname, - "KUBERNETES_NAMESPACE" -> env.conf.get(KUBERNETES_NAMESPACE), - "KUBERNETES_POD_NAME" -> podName - ) + // give precedence to attributes extracted by overridden method + Map( + "LOG_FILES" -> "log", + "APP_ID" -> appId, + "EXECUTOR_ID" -> executorId, + "HOSTNAME" -> hostname, + "KUBERNETES_NAMESPACE" -> env.conf.get(KUBERNETES_NAMESPACE), + "KUBERNETES_POD_NAME" -> podName + ) ++ super.extractAttributes } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackendSuite.scala index 6fc185aff1fee..73228371aaa5b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackendSuite.scala @@ -49,25 +49,38 @@ class KubernetesExecutorBackendSuite extends SparkFunSuite { withEnvs( "SPARK_LOG_URL_STDOUT" -> "https://my.custom.url/logs/stdout", - "SPARK_LOG_URL_STDERR" -> "https://my.custom.url/logs/stderr") { + "SPARK_LOG_URL_STDERR" -> "https://my.custom.url/logs/stderr", + "SPARK_EXECUTOR_ATTRIBUTE_ENV_1" -> "val1", + "SPARK_EXECUTOR_ATTRIBUTE_ENV_2" -> "val2") { assert(backend.extractLogUrls === Map( "stdout" -> "https://my.custom.url/logs/stdout", "stderr" -> "https://my.custom.url/logs/stderr" )) - assert(backend.extractAttributes === expectedKubernetesAttributes) + assert(backend.extractAttributes === Map( + "ENV_1" -> "val1", "ENV_2" -> "val2" + ) ++ expectedKubernetesAttributes) } + // env vars have precedence withEnvs( - "SPARK_EXECUTOR_ATTRIBUTE_ENV_1" -> "val1", - "SPARK_EXECUTOR_ATTRIBUTE_ENV_2" -> "val2") { - assert(backend.extractLogUrls === Map.empty) + "SPARK_EXECUTOR_ATTRIBUTE_LOG_FILES" -> "env-log", + "SPARK_EXECUTOR_ATTRIBUTE_APP_ID" -> "env-app-id", + "SPARK_EXECUTOR_ATTRIBUTE_EXECUTOR_ID" -> "env-exec-id", + "SPARK_EXECUTOR_ATTRIBUTE_HOSTNAME" -> "env-hostname", + "SPARK_EXECUTOR_ATTRIBUTE_KUBERNETES_NAMESPACE" -> "env-namespace", + "SPARK_EXECUTOR_ATTRIBUTE_KUBERNETES_POD_NAME" -> "env-pod-name") { assert(backend.extractAttributes === Map( - "ENV_1" -> "val1", "ENV_2" -> "val2" - ) ++ expectedKubernetesAttributes) + "LOG_FILES" -> "env-log", + "APP_ID" -> "env-app-id", + "EXECUTOR_ID" -> "env-exec-id", + "HOSTNAME" -> "env-hostname", + "KUBERNETES_NAMESPACE" -> "env-namespace", + "KUBERNETES_POD_NAME" -> "env-pod-name" + )) } + // namespace 
'default' above is the config default; here we set an explicit value conf.set("spark.kubernetes.namespace", "my-namespace") assert(backend.extractAttributes === expectedKubernetesAttributes ++ Map("KUBERNETES_NAMESPACE" -> "my-namespace")) }
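Across the four patches, the executor exposes a fixed attribute set (APP_ID, EXECUTOR_ID, HOSTNAME, KUBERNETES_NAMESPACE, KUBERNETES_POD_NAME, and LOG_FILES) that the UI substitutes into the configured URL pattern. Below is a self-contained Scala sketch of that substitution for illustration only; the object, the resolveLogUrl helper, its return-None-on-missing-variable policy, and the example URL are all invented here and are not Spark's actual implementation:

    import scala.util.matching.Regex

    // Illustrative stand-in, not Spark's implementation: substitute {{VAR}}
    // patterns in a custom executor log URL with the executor's attributes.
    object LogUrlPatternSketch {
      private val VarPattern: Regex = """\{\{([A-Z_]+)\}\}""".r

      // Policy chosen for this sketch: if any referenced variable is missing,
      // return None instead of producing a dead link.
      def resolveLogUrl(template: String, attributes: Map[String, String]): Option[String] = {
        val referenced = VarPattern.findAllMatchIn(template).map(_.group(1)).toSet
        if (referenced.subsetOf(attributes.keySet)) {
          Some(VarPattern.replaceAllIn(
            template, m => Regex.quoteReplacement(attributes(m.group(1)))))
        } else {
          None
        }
      }

      def main(args: Array[String]): Unit = {
        val attributes = Map(
          "APP_ID" -> "app-example-123",
          "EXECUTOR_ID" -> "1",
          "HOSTNAME" -> "hostname",
          "KUBERNETES_NAMESPACE" -> "default",
          "KUBERNETES_POD_NAME" -> "pod-name",
          "FILE_NAME" -> "log")
        println(resolveLogUrl(
          "https://my.custom.url/logs?app={{APP_ID}}&executor={{EXECUTOR_ID}}", attributes))
        // prints: Some(https://my.custom.url/logs?app=app-example-123&executor=1)
      }
    }

Note that, with the precedence rule introduced in patch 4, any SPARK_EXECUTOR_ATTRIBUTE_* environment variable would override the corresponding entry in the attribute map before such a substitution takes place.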