diff --git a/core/src/main/scala/org/apache/spark/internal/config/UI.scala b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
index e02cb7a968314..333498e708c2f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/UI.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
@@ -221,7 +221,7 @@ private[spark] object UI {
.createOptional
val CUSTOM_EXECUTOR_LOG_URL = ConfigBuilder("spark.ui.custom.executor.log.url")
- .doc("Specifies custom spark executor log url for supporting external log service instead of " +
+ .doc("Specifies custom Spark executor log url for supporting external log service instead of " +
"using cluster managers' application log urls in the Spark UI. Spark will support " +
"some path variables via patterns which can vary on cluster manager. Please check the " +
"documentation for your cluster manager to see which patterns are supported, if any. " +
diff --git a/docs/configuration.md b/docs/configuration.md
index 6833d4e54fd03..77e20cf985930 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1658,15 +1658,13 @@ Apart from these, the following properties are also available, and may be useful
spark.ui.custom.executor.log.url |
(none) |
- Specifies custom spark executor log URL for supporting external log service instead of using cluster
+ Specifies a custom Spark executor log URL for supporting an external log service instead of using cluster
managers' application log URLs in Spark UI. Spark will support some path variables via patterns
which can vary on cluster manager. Please check the documentation for your cluster manager to
see which patterns are supported, if any.
Please note that this configuration also replaces the original log URLs in the event log,
so it also takes effect when the application is accessed on the history server. The new log URLs must be
permanent, otherwise you might end up with dead links for executor log URLs.
-
- For now, only YARN mode supports this configuration
|
3.0.0 |
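
For reference, a minimal sketch of setting this property programmatically. The log-service endpoint is hypothetical; the `{{...}}` placeholders are Spark pattern variables, substituted per executor at runtime:

```scala
import org.apache.spark.SparkConf

// Hypothetical external log-service endpoint; {{APP_ID}}, {{EXECUTOR_ID}} and
// {{FILE_NAME}} are Spark pattern variables, expanded per executor at runtime.
val conf = new SparkConf()
  .set("spark.ui.custom.executor.log.url",
    "https://logs.example.com/{{APP_ID}}/{{EXECUTOR_ID}}/{{FILE_NAME}}")
```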
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 655b30756a298..a2172732ad481 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1662,6 +1662,30 @@ See the [configuration page](configuration.html) for information on Spark config
3.3.0 |
+
+ spark.ui.custom.executor.log.url |
+ (none) |
+
+ Specifies a custom Spark executor log URL for supporting an external log service instead of using cluster
+ managers' application log URLs in the Spark UI. Spark supports some path variables via patterns,
+ which can vary by cluster manager. The Spark Kubernetes cluster manager supports the following path variables:
+
+ APP_ID: The unique application ID
+ EXECUTOR_ID: The executor ID (a positive integer)
+ HOSTNAME: The name of the host where the executor runs
+ KUBERNETES_NAMESPACE: The namespace where the executor pods run
+ KUBERNETES_POD_NAME: The name of the pod that contains the executor
+ FILE_NAME: The name of the log file, which is always "log"
+
+ Please note that this configuration also replaces the original log URLs in the event log,
+ so it also takes effect when the application is accessed on the history server. The new log URLs must be
+ permanent, otherwise you might end up with dead links for executor log URLs.
+ Example: the config value "https://my.custom.url/logs?app={{APP_ID}}&executor={{EXECUTOR_ID}}"
+ yields, for application "app-example-123" and executor 1, this link in the Spark UI:
+ https://my.custom.url/logs?app=app-example-123&executor=1
+ |
+ 4.0.0 |
+
#### Pod template properties
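
To make the substitution concrete, here is a minimal sketch of the `{{VAR}}` expansion the docs describe, assuming plain string replacement; Spark's actual `ExecutorLogUrlHandler` additionally falls back to the original URLs when a referenced pattern has no matching attribute:

```scala
// Minimal sketch of {{VAR}} expansion over an executor's attribute map;
// not Spark's actual implementation.
def expandLogUrl(pattern: String, attributes: Map[String, String]): String =
  attributes.foldLeft(pattern) { case (url, (key, value)) =>
    url.replace(s"{{$key}}", value)
  }

val url = expandLogUrl(
  "https://my.custom.url/logs?app={{APP_ID}}&executor={{EXECUTOR_ID}}",
  Map("APP_ID" -> "app-example-123", "EXECUTOR_ID" -> "1"))
// url == "https://my.custom.url/logs?app=app-example-123&executor=1"
```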
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala
index 61570f5cf4bb2..481f2b275f35a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala
@@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s.Config.KUBERNETES_NAMESPACE
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.executor.CoarseGrainedExecutorBackend
import org.apache.spark.internal.Logging
@@ -28,6 +29,47 @@ import org.apache.spark.rpc._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.Utils
+/**
+ * Kubernetes-specific implementation of CoarseGrainedExecutorBackend.
+ * It exposes Kubernetes executor attributes, e.g. for custom executor log URLs.
+ */
+private[spark] class KubernetesExecutorBackend(
+ rpcEnv: RpcEnv,
+ appId: String,
+ driverUrl: String,
+ executorId: String,
+ bindAddress: String,
+ hostname: String,
+ podName: String,
+ cores: Int,
+ env: SparkEnv,
+ resourcesFile: Option[String],
+ resourceProfile: ResourceProfile)
+ extends CoarseGrainedExecutorBackend(
+ rpcEnv,
+ driverUrl,
+ executorId,
+ bindAddress,
+ hostname,
+ cores,
+ env,
+ resourcesFile,
+ resourceProfile) with Logging {
+
+ override def extractAttributes: Map[String, String] = {
+    // Map ++ is right-biased, so attributes from super.extractAttributes
+    // (the SPARK_EXECUTOR_ATTRIBUTE_* environment variables) take precedence
+ Map(
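+      // "LOG_FILES" feeds the {{FILE_NAME}} pattern variable; Spark's
+      // ExecutorLogUrlHandler expands one URL per comma-separated file name.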
+ "LOG_FILES" -> "log",
+ "APP_ID" -> appId,
+ "EXECUTOR_ID" -> executorId,
+ "HOSTNAME" -> hostname,
+ "KUBERNETES_NAMESPACE" -> env.conf.get(KUBERNETES_NAMESPACE),
+ "KUBERNETES_POD_NAME" -> podName
+ ) ++ super.extractAttributes
+ }
+
+}
+
private[spark] object KubernetesExecutorBackend extends Logging {
// Message used internally to start the executor when the driver successfully accepted the
@@ -49,8 +91,8 @@ private[spark] object KubernetesExecutorBackend extends Logging {
def main(args: Array[String]): Unit = {
val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile, String) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile, execId) =>
- new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, execId,
- arguments.bindAddress, arguments.hostname, arguments.cores,
+ new KubernetesExecutorBackend(rpcEnv, arguments.appId, arguments.driverUrl, execId,
+ arguments.bindAddress, arguments.hostname, arguments.podName, arguments.cores,
env, arguments.resourcesFileOpt, resourceProfile)
}
run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
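
A quick illustration of the attribute precedence above: Scala's `Map ++` is right-biased, so appending `super.extractAttributes` lets `SPARK_EXECUTOR_ATTRIBUTE_*` environment variables override the Kubernetes-provided defaults:

```scala
// On key collisions, Map ++ keeps the value from the right-hand operand.
val defaults = Map("APP_ID" -> "app-from-backend", "HOSTNAME" -> "hostname")
val fromEnv  = Map("APP_ID" -> "env-app-id")
assert((defaults ++ fromEnv) ==
  Map("APP_ID" -> "env-app-id", "HOSTNAME" -> "hostname"))
```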
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackendSuite.scala
new file mode 100644
index 0000000000000..73228371aaa5b
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackendSuite.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import org.mockito.Mockito.mock
+import org.mockito.Mockito.when
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.rpc.RpcEnv
+
+class KubernetesExecutorBackendSuite extends SparkFunSuite {
+ test("extract log urls and attributes") {
+ val mockRpcEnv = mock(classOf[RpcEnv])
+ val mockSparkEnv = mock(classOf[SparkEnv])
+ val conf = new SparkConf()
+ when(mockSparkEnv.conf).thenReturn(conf)
+ val mockResourceProfile = mock(classOf[ResourceProfile])
+
+ val backend = new KubernetesExecutorBackend(
+ mockRpcEnv, "app-id", "driver-url", "executor-id", "bind-address", "hostname", "pod-name",
+ 1, mockSparkEnv, None, mockResourceProfile)
+
+ val expectedKubernetesAttributes = Map(
+ "LOG_FILES" -> "log",
+ "APP_ID" -> "app-id",
+ "EXECUTOR_ID" -> "executor-id",
+ "HOSTNAME" -> "hostname",
+ "KUBERNETES_NAMESPACE" -> "default",
+ "KUBERNETES_POD_NAME" -> "pod-name"
+ )
+
+ assert(backend.extractLogUrls === Map.empty)
+ assert(backend.extractAttributes === expectedKubernetesAttributes)
+
+ withEnvs(
+ "SPARK_LOG_URL_STDOUT" -> "https://my.custom.url/logs/stdout",
+ "SPARK_LOG_URL_STDERR" -> "https://my.custom.url/logs/stderr",
+ "SPARK_EXECUTOR_ATTRIBUTE_ENV_1" -> "val1",
+ "SPARK_EXECUTOR_ATTRIBUTE_ENV_2" -> "val2") {
+ assert(backend.extractLogUrls === Map(
+ "stdout" -> "https://my.custom.url/logs/stdout",
+ "stderr" -> "https://my.custom.url/logs/stderr"
+ ))
+ assert(backend.extractAttributes === Map(
+ "ENV_1" -> "val1", "ENV_2" -> "val2"
+ ) ++ expectedKubernetesAttributes)
+ }
+
+ // env vars have precedence
+ withEnvs(
+ "SPARK_EXECUTOR_ATTRIBUTE_LOG_FILES" -> "env-log",
+ "SPARK_EXECUTOR_ATTRIBUTE_APP_ID" -> "env-app-id",
+ "SPARK_EXECUTOR_ATTRIBUTE_EXECUTOR_ID" -> "env-exec-id",
+ "SPARK_EXECUTOR_ATTRIBUTE_HOSTNAME" -> "env-hostname",
+ "SPARK_EXECUTOR_ATTRIBUTE_KUBERNETES_NAMESPACE" -> "env-namespace",
+ "SPARK_EXECUTOR_ATTRIBUTE_KUBERNETES_POD_NAME" -> "env-pod-name") {
+ assert(backend.extractAttributes === Map(
+ "LOG_FILES" -> "env-log",
+ "APP_ID" -> "env-app-id",
+ "EXECUTOR_ID" -> "env-exec-id",
+ "HOSTNAME" -> "env-hostname",
+ "KUBERNETES_NAMESPACE" -> "env-namespace",
+ "KUBERNETES_POD_NAME" -> "env-pod-name"
+ ))
+ }
+
+    // the namespace 'default' above comes from the config default; here we set an explicit value
+ conf.set("spark.kubernetes.namespace", "my-namespace")
+ assert(backend.extractAttributes === expectedKubernetesAttributes ++
+ Map("KUBERNETES_NAMESPACE" -> "my-namespace"))
+ }
+
+ private def withEnvs(pairs: (String, String)*)(f: => Unit): Unit = {
+ val readonlyEnv = System.getenv()
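+    // System.getenv() returns an unmodifiable view; reach into its private
+    // backing map ("m") via reflection so the test can inject variables.
+    // On newer JDKs this requires java.util to be opened to the test JVM,
+    // e.g. --add-opens=java.base/java.util=ALL-UNNAMED.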
+ val field = readonlyEnv.getClass.getDeclaredField("m")
+ field.setAccessible(true)
+ val modifiableEnv = field.get(readonlyEnv).asInstanceOf[java.util.Map[String, String]]
+ try {
+ for ((k, v) <- pairs) {
+ assert(!modifiableEnv.containsKey(k))
+ modifiableEnv.put(k, v)
+ }
+ f
+ } finally {
+ for ((k, _) <- pairs) {
+ modifiableEnv.remove(k)
+ }
+ }
+ }
+}