diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala index 61570f5cf4bb2..741aff939d439 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala @@ -28,6 +28,44 @@ import org.apache.spark.rpc._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.util.Utils +/** + * Custom implementation of CoarseGrainedExecutorBackend for Kubernetes resource manager. + * This class extracts executor log URLs and executor attributes from kubernetes labels. + */ +private[spark] class KubernetesExecutorBackend( + rpcEnv: RpcEnv, + appId: String, + driverUrl: String, + executorId: String, + bindAddress: String, + hostname: String, + cores: Int, + env: SparkEnv, + resourcesFile: Option[String], + resourceProfile: ResourceProfile) + extends CoarseGrainedExecutorBackend( + rpcEnv, + driverUrl, + executorId, + bindAddress, + hostname, + cores, + env, + resourcesFile, + resourceProfile) with Logging { + + override def extractLogUrls: Map[String, String] = Map.empty + + override def extractAttributes: Map[String, String] = { + Map( + "LOG_FILES" -> "log", + "APP_ID" -> appId, + "EXECUTOR_ID" -> executorId + ) + } + +} + private[spark] object KubernetesExecutorBackend extends Logging { // Message used internally to start the executor when the driver successfully accepted the @@ -49,7 +87,7 @@ private[spark] object KubernetesExecutorBackend extends Logging { def main(args: Array[String]): Unit = { val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile, String) => CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile, execId) => - new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, execId, + new KubernetesExecutorBackend(rpcEnv, arguments.appId, arguments.driverUrl, execId, arguments.bindAddress, arguments.hostname, arguments.cores, env, arguments.resourcesFileOpt, resourceProfile) } @@ -129,6 +167,7 @@ private[spark] object KubernetesExecutorBackend extends Logging { env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url, isChildProcessStopping = backend.stopping)) } + sys.env.foreach(e => log.error(e._1 + "=" + e._2)) env.rpcEnv.awaitTermination() } }