Skip to content
This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Commit

Permalink
Use the driver pod IP address for spark.driver.bindAddress (#533)
Browse files Browse the repository at this point in the history
* Use the driver pod IP address for spark.driver.bindAddress

* Addressed comments

* Addressed more comments

* Fixed broken DriverServiceBootstrapStepSuite
  • Loading branch information
liyinan926 authored Oct 26, 2017
1 parent 0abf0b9 commit 6b1caca
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ package object constants {
// Environment Variables
private[spark] val ENV_EXECUTOR_PORT = "SPARK_EXECUTOR_PORT"
private[spark] val ENV_DRIVER_URL = "SPARK_DRIVER_URL"
private[spark] val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
private[spark] val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES"
private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.ConfigurationUtils
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, DriverServiceBootstrapStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}
import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
import org.apache.spark.deploy.k8s.submit.submitsteps.LocalDirectoryMountConfigurationStep
import org.apache.spark.launcher.SparkLauncher
Expand Down Expand Up @@ -103,7 +103,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
mainClass,
appArgs,
submissionSparkConf)
val driverAddressStep = new DriverAddressConfigurationStep(
val driverAddressStep = new DriverServiceBootstrapStep(
kubernetesResourceNamePrefix,
allDriverLabels,
submissionSparkConf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.spark.deploy.k8s.submit.submitsteps

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, PodBuilder, QuantityBuilder}
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
import scala.collection.JavaConverters._

import org.apache.spark.SparkConf
Expand Down Expand Up @@ -114,6 +114,12 @@ private[spark] class BaseDriverConfigurationStep(
.withName(ENV_DRIVER_ARGS)
.withValue(appArgs.mkString(" "))
.endEnv()
.addNewEnv()
.withName(ENV_DRIVER_BIND_ADDRESS)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef("v1", "status.podIP")
.build())
.endEnv()
.withNewResources()
.addToRequests("cpu", driverCpuQuantity)
.addToRequests("memory", driverMemoryQuantity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,20 @@ import org.apache.spark.util.Clock
* Allows the driver to be reachable by executor pods through a headless service. The service's
* ports should correspond to the ports that the executor will reach the pod at for RPC.
*/
private[spark] class DriverAddressConfigurationStep(
private[spark] class DriverServiceBootstrapStep(
kubernetesResourceNamePrefix: String,
driverLabels: Map[String, String],
submissionSparkConf: SparkConf,
clock: Clock) extends DriverConfigurationStep with Logging {
import DriverAddressConfigurationStep._
import DriverServiceBootstrapStep._

override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's hostname" +
s" will be managed via a Kubernetes service.")
s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's bind" +
s" address is managed and set to the driver pod's IP address.")
require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be" +
s" managed via a Kubernetes service.")
s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be" +
s" managed via a Kubernetes service.")

val preferredServiceName = s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX"
val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
Expand All @@ -51,8 +51,8 @@ private[spark] class DriverAddressConfigurationStep(
val randomServiceId = clock.getTimeMillis()
val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is" +
s" too long (must be <= 63 characters). Falling back to use $shorterServiceName" +
s" as the driver service's name.")
s" too long (must be <= 63 characters). Falling back to use $shorterServiceName" +
s" as the driver service's name.")
shorterServiceName
}

Expand Down Expand Up @@ -82,19 +82,18 @@ private[spark] class DriverAddressConfigurationStep(
val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE)
val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local"
val resolvedSparkConf = driverSpec.driverSparkConf.clone()
.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, driverHostname)
.set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, driverHostname)
.set("spark.driver.port", driverPort.toString)
.set(
org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, driverBlockManagerPort)
org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, driverBlockManagerPort)

driverSpec.copy(
driverSparkConf = resolvedSparkConf,
otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(driverService))
driverSparkConf = resolvedSparkConf,
otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(driverService))
}
}

private[spark] object DriverAddressConfigurationStep {
private[spark] object DriverServiceBootstrapStep {
val DRIVER_BIND_ADDRESS_KEY = org.apache.spark.internal.config.DRIVER_BIND_ADDRESS.key
val DRIVER_HOST_KEY = org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key
val DRIVER_SVC_POSTFIX = "-driver-svc"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, DriverServiceBootstrapStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}

private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite {

Expand Down Expand Up @@ -50,7 +50,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverAddressConfigurationStep],
classOf[DriverServiceBootstrapStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[LocalDirectoryMountConfigurationStep])
Expand All @@ -74,7 +74,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverAddressConfigurationStep],
classOf[DriverServiceBootstrapStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[LocalDirectoryMountConfigurationStep],
Expand All @@ -97,7 +97,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverAddressConfigurationStep],
classOf[DriverServiceBootstrapStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[LocalDirectoryMountConfigurationStep],
Expand All @@ -120,7 +120,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverAddressConfigurationStep],
classOf[DriverServiceBootstrapStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[LocalDirectoryMountConfigurationStep],
Expand All @@ -144,7 +144,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverAddressConfigurationStep],
classOf[DriverServiceBootstrapStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[LocalDirectoryMountConfigurationStep],
Expand All @@ -169,7 +169,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverAddressConfigurationStep],
classOf[DriverServiceBootstrapStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[LocalDirectoryMountConfigurationStep],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,30 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
driverContainer = new ContainerBuilder().build(),
driverSparkConf = new SparkConf(false),
otherKubernetesResources = Seq.empty[HasMetadata])

val preparedDriverSpec = submissionStep.configureDriver(baseDriverSpec)

assert(preparedDriverSpec.driverContainer.getName === DRIVER_CONTAINER_NAME)
assert(preparedDriverSpec.driverContainer.getImage === "spark-driver:latest")
assert(preparedDriverSpec.driverContainer.getImagePullPolicy === DOCKER_IMAGE_PULL_POLICY)

assert(preparedDriverSpec.driverContainer.getEnv.size === 7)
val envs = preparedDriverSpec.driverContainer
.getEnv
.asScala
.map(env => (env.getName, env.getValue))
.toMap
assert(envs.size === 6)
assert(envs(ENV_SUBMIT_EXTRA_CLASSPATH) === "/opt/spark/spark-examples.jar")
assert(envs(ENV_DRIVER_MEMORY) === "256M")
assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS)
assert(envs(ENV_DRIVER_ARGS) === "arg1 arg2")
assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1")
assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2")

assert(preparedDriverSpec.driverContainer.getEnv.asScala.exists(envVar =>
envVar.getName.equals(ENV_DRIVER_BIND_ADDRESS) &&
envVar.getValueFrom.getFieldRef.getApiVersion.equals("v1") &&
envVar.getValueFrom.getFieldRef.getFieldPath.equals("status.podIP")))

val resourceRequirements = preparedDriverSpec.driverContainer.getResources
val requests = resourceRequirements.getRequests.asScala
assert(requests("cpu").getAmount === "2")
Expand Down
Loading

0 comments on commit 6b1caca

Please sign in to comment.