Skip to content

Commit

Permalink
Support custom executor log urls
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Mar 7, 2024
1 parent 7a429aa commit bfd719d
Showing 1 changed file with 40 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,44 @@ import org.apache.spark.rpc._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.Utils

/**
* Custom implementation of CoarseGrainedExecutorBackend for Kubernetes resource manager.
* This class extracts executor log URLs and executor attributes from kubernetes labels.
*/
private[spark] class KubernetesExecutorBackend(
rpcEnv: RpcEnv,
appId: String,
driverUrl: String,
executorId: String,
bindAddress: String,
hostname: String,
cores: Int,
env: SparkEnv,
resourcesFile: Option[String],
resourceProfile: ResourceProfile)
extends CoarseGrainedExecutorBackend(
rpcEnv,
driverUrl,
executorId,
bindAddress,
hostname,
cores,
env,
resourcesFile,
resourceProfile) with Logging {

override def extractLogUrls: Map[String, String] = Map.empty

override def extractAttributes: Map[String, String] = {
Map(
"LOG_FILES" -> "log",
"APP_ID" -> appId,
"EXECUTOR_ID" -> executorId
)
}

}

private[spark] object KubernetesExecutorBackend extends Logging {

// Message used internally to start the executor when the driver successfully accepted the
Expand All @@ -49,7 +87,7 @@ private[spark] object KubernetesExecutorBackend extends Logging {
def main(args: Array[String]): Unit = {
val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile, String) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile, execId) =>
new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, execId,
new KubernetesExecutorBackend(rpcEnv, arguments.appId, arguments.driverUrl, execId,
arguments.bindAddress, arguments.hostname, arguments.cores,
env, arguments.resourcesFileOpt, resourceProfile)
}
Expand Down Expand Up @@ -129,6 +167,7 @@ private[spark] object KubernetesExecutorBackend extends Logging {
env.rpcEnv.setupEndpoint("WorkerWatcher",
new WorkerWatcher(env.rpcEnv, url, isChildProcessStopping = backend.stopping))
}
sys.env.foreach(e => log.error(e._1 + "=" + e._2))
env.rpcEnv.awaitTermination()
}
}
Expand Down

0 comments on commit bfd719d

Please sign in to comment.