diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala index 80c52a93dfc31..26cdcaa7f67c8 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala @@ -54,6 +54,7 @@ package object constants { // Environment Variables private[spark] val ENV_EXECUTOR_PORT = "SPARK_EXECUTOR_PORT" private[spark] val ENV_DRIVER_URL = "SPARK_DRIVER_URL" + private[spark] val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS" private[spark] val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES" private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY" private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala index 70fd9b9a17707..232aca4f2f55c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala @@ -20,7 +20,7 @@ import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.ConfigurationUtils import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ -import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep} +import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, DriverServiceBootstrapStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep} import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator import org.apache.spark.deploy.k8s.submit.submitsteps.LocalDirectoryMountConfigurationStep import org.apache.spark.launcher.SparkLauncher @@ -103,7 +103,7 @@ private[spark] class DriverConfigurationStepsOrchestrator( mainClass, appArgs, submissionSparkConf) - val driverAddressStep = new DriverAddressConfigurationStep( + val driverAddressStep = new DriverServiceBootstrapStep( kubernetesResourceNamePrefix, allDriverLabels, submissionSparkConf, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/BaseDriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/BaseDriverConfigurationStep.scala index 563662ef80d11..e08e44d608de0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/BaseDriverConfigurationStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/BaseDriverConfigurationStep.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.deploy.k8s.submit.submitsteps -import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, PodBuilder, QuantityBuilder} +import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder} import scala.collection.JavaConverters._ import org.apache.spark.SparkConf @@ -114,6 +114,12 @@ private[spark] class BaseDriverConfigurationStep( .withName(ENV_DRIVER_ARGS) .withValue(appArgs.mkString(" ")) .endEnv() + .addNewEnv() + .withName(ENV_DRIVER_BIND_ADDRESS) + .withValueFrom(new EnvVarSourceBuilder() + .withNewFieldRef("v1", "status.podIP") + .build()) + .endEnv() .withNewResources() .addToRequests("cpu", driverCpuQuantity) .addToRequests("memory", driverMemoryQuantity) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/DriverAddressConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/DriverServiceBootstrapStep.scala similarity index 80% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/DriverAddressConfigurationStep.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/DriverServiceBootstrapStep.scala index 615261e80f302..1ee1851c61e6c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/DriverAddressConfigurationStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/DriverServiceBootstrapStep.scala @@ -29,20 +29,20 @@ import org.apache.spark.util.Clock * Allows the driver to be reachable by executor pods through a headless service. The service's * ports should correspond to the ports that the executor will reach the pod at for RPC. */ -private[spark] class DriverAddressConfigurationStep( +private[spark] class DriverServiceBootstrapStep( kubernetesResourceNamePrefix: String, driverLabels: Map[String, String], submissionSparkConf: SparkConf, clock: Clock) extends DriverConfigurationStep with Logging { - import DriverAddressConfigurationStep._ + import DriverServiceBootstrapStep._ override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty, - s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's hostname" + - s" will be managed via a Kubernetes service.") + s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's bind" + + s" address is managed and set to the driver pod's IP address.") require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty, - s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be" + - s" managed via a Kubernetes service.") + s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be" + + s" managed via a Kubernetes service.") val preferredServiceName = s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX" val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) { @@ -51,8 +51,8 @@ private[spark] class DriverAddressConfigurationStep( val randomServiceId = clock.getTimeMillis() val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX" logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is" + - s" too long (must be <= 63 characters). Falling back to use $shorterServiceName" + - s" as the driver service's name.") + s" too long (must be <= 63 characters). Falling back to use $shorterServiceName" + + s" as the driver service's name.") shorterServiceName } @@ -82,19 +82,18 @@ private[spark] class DriverAddressConfigurationStep( val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE) val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local" val resolvedSparkConf = driverSpec.driverSparkConf.clone() - .set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, driverHostname) .set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, driverHostname) .set("spark.driver.port", driverPort.toString) .set( - org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, driverBlockManagerPort) + org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, driverBlockManagerPort) driverSpec.copy( - driverSparkConf = resolvedSparkConf, - otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(driverService)) + driverSparkConf = resolvedSparkConf, + otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(driverService)) } } -private[spark] object DriverAddressConfigurationStep { +private[spark] object DriverServiceBootstrapStep { val DRIVER_BIND_ADDRESS_KEY = org.apache.spark.internal.config.DRIVER_BIND_ADDRESS.key val DRIVER_HOST_KEY = org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key val DRIVER_SVC_POSTFIX = "-driver-svc" diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala index cd80bc53b2211..7af6613fcc9b3 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.config._ -import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep} +import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, DriverServiceBootstrapStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep} private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite { @@ -50,7 +50,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS validateStepTypes( orchestrator, classOf[BaseDriverConfigurationStep], - classOf[DriverAddressConfigurationStep], + classOf[DriverServiceBootstrapStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], classOf[LocalDirectoryMountConfigurationStep]) @@ -74,7 +74,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS validateStepTypes( orchestrator, classOf[BaseDriverConfigurationStep], - classOf[DriverAddressConfigurationStep], + classOf[DriverServiceBootstrapStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], classOf[LocalDirectoryMountConfigurationStep], @@ -97,7 +97,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS validateStepTypes( orchestrator, classOf[BaseDriverConfigurationStep], - classOf[DriverAddressConfigurationStep], + classOf[DriverServiceBootstrapStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], classOf[LocalDirectoryMountConfigurationStep], @@ -120,7 +120,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS validateStepTypes( orchestrator, classOf[BaseDriverConfigurationStep], - classOf[DriverAddressConfigurationStep], + classOf[DriverServiceBootstrapStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], classOf[LocalDirectoryMountConfigurationStep], @@ -144,7 +144,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS validateStepTypes( orchestrator, classOf[BaseDriverConfigurationStep], - classOf[DriverAddressConfigurationStep], + classOf[DriverServiceBootstrapStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], classOf[LocalDirectoryMountConfigurationStep], @@ -169,7 +169,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS validateStepTypes( orchestrator, classOf[BaseDriverConfigurationStep], - classOf[DriverAddressConfigurationStep], + classOf[DriverServiceBootstrapStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], classOf[LocalDirectoryMountConfigurationStep], diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/BaseDriverConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/BaseDriverConfigurationStepSuite.scala index 330d8e87a9fe9..f67b18789f4c4 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/BaseDriverConfigurationStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/BaseDriverConfigurationStepSuite.scala @@ -65,23 +65,30 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite { driverContainer = new ContainerBuilder().build(), driverSparkConf = new SparkConf(false), otherKubernetesResources = Seq.empty[HasMetadata]) - val preparedDriverSpec = submissionStep.configureDriver(baseDriverSpec) + assert(preparedDriverSpec.driverContainer.getName === DRIVER_CONTAINER_NAME) assert(preparedDriverSpec.driverContainer.getImage === "spark-driver:latest") assert(preparedDriverSpec.driverContainer.getImagePullPolicy === DOCKER_IMAGE_PULL_POLICY) + + assert(preparedDriverSpec.driverContainer.getEnv.size === 7) val envs = preparedDriverSpec.driverContainer .getEnv .asScala .map(env => (env.getName, env.getValue)) .toMap - assert(envs.size === 6) assert(envs(ENV_SUBMIT_EXTRA_CLASSPATH) === "/opt/spark/spark-examples.jar") assert(envs(ENV_DRIVER_MEMORY) === "256M") assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS) assert(envs(ENV_DRIVER_ARGS) === "arg1 arg2") assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1") assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2") + + assert(preparedDriverSpec.driverContainer.getEnv.asScala.exists(envVar => + envVar.getName.equals(ENV_DRIVER_BIND_ADDRESS) && + envVar.getValueFrom.getFieldRef.getApiVersion.equals("v1") && + envVar.getValueFrom.getFieldRef.getFieldPath.equals("status.podIP"))) + val resourceRequirements = preparedDriverSpec.driverContainer.getResources val requests = resourceRequirements.getRequests.asScala assert(requests("cpu").getAmount === "2") diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/DriverAddressConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/DriverServiceBootstrapStepSuite.scala similarity index 64% rename from resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/DriverAddressConfigurationStepSuite.scala rename to resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/DriverServiceBootstrapStepSuite.scala index 2b9570fdd625c..7359017cc7806 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/DriverAddressConfigurationStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/DriverServiceBootstrapStepSuite.scala @@ -27,16 +27,16 @@ import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ import org.apache.spark.util.Clock -private[spark] class DriverAddressConfigurationStepSuite +private[spark] class DriverServiceBootstrapStepSuite extends SparkFunSuite with BeforeAndAfter { private val SHORT_RESOURCE_NAME_PREFIX = - "a" * (DriverAddressConfigurationStep.MAX_SERVICE_NAME_LENGTH - - DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX.length) + "a" * (DriverServiceBootstrapStep.MAX_SERVICE_NAME_LENGTH - + DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX.length) private val LONG_RESOURCE_NAME_PREFIX = - "a" * (DriverAddressConfigurationStep.MAX_SERVICE_NAME_LENGTH - - DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX.length + 1) + "a" * (DriverServiceBootstrapStep.MAX_SERVICE_NAME_LENGTH - + DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX.length + 1) private val DRIVER_LABELS = Map( "label1key" -> "label1value", "label2key" -> "label2value") @@ -52,92 +52,92 @@ private[spark] class DriverAddressConfigurationStepSuite } test("Headless service has a port for the driver RPC and the block manager.") { - val configurationStep = new DriverAddressConfigurationStep( - SHORT_RESOURCE_NAME_PREFIX, - DRIVER_LABELS, - sparkConf - .set("spark.driver.port", "9000") - .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080), - clock) + val configurationStep = new DriverServiceBootstrapStep( + SHORT_RESOURCE_NAME_PREFIX, + DRIVER_LABELS, + sparkConf + .set("spark.driver.port", "9000") + .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080), + clock) val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone()) val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec) assert(resolvedDriverSpec.otherKubernetesResources.size === 1) assert(resolvedDriverSpec.otherKubernetesResources.head.isInstanceOf[Service]) val driverService = resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service] verifyService( - 9000, - 8080, - s"$SHORT_RESOURCE_NAME_PREFIX${DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX}", - driverService) + 9000, + 8080, + s"$SHORT_RESOURCE_NAME_PREFIX${DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX}", + driverService) } test("Hostname and ports are set according to the service name.") { - val configurationStep = new DriverAddressConfigurationStep( - SHORT_RESOURCE_NAME_PREFIX, - DRIVER_LABELS, - sparkConf - .set("spark.driver.port", "9000") - .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080) - .set(KUBERNETES_NAMESPACE, "my-namespace"), - clock) + val configurationStep = new DriverServiceBootstrapStep( + SHORT_RESOURCE_NAME_PREFIX, + DRIVER_LABELS, + sparkConf + .set("spark.driver.port", "9000") + .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080) + .set(KUBERNETES_NAMESPACE, "my-namespace"), + clock) val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone()) val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec) val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX + - DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX + DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX val expectedHostName = s"$expectedServiceName.my-namespace.svc.cluster.local" verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, expectedHostName) } test("Ports should resolve to defaults in SparkConf and in the service.") { - val configurationStep = new DriverAddressConfigurationStep( - SHORT_RESOURCE_NAME_PREFIX, - DRIVER_LABELS, - sparkConf, - clock) + val configurationStep = new DriverServiceBootstrapStep( + SHORT_RESOURCE_NAME_PREFIX, + DRIVER_LABELS, + sparkConf, + clock) val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone()) val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec) verifyService( - DEFAULT_DRIVER_PORT, - DEFAULT_BLOCKMANAGER_PORT, - s"$SHORT_RESOURCE_NAME_PREFIX${DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX}", - resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service]) + DEFAULT_DRIVER_PORT, + DEFAULT_BLOCKMANAGER_PORT, + s"$SHORT_RESOURCE_NAME_PREFIX${DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX}", + resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service]) assert(resolvedDriverSpec.driverSparkConf.get("spark.driver.port") === - DEFAULT_DRIVER_PORT.toString) + DEFAULT_DRIVER_PORT.toString) assert(resolvedDriverSpec.driverSparkConf.get( - org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT) === DEFAULT_BLOCKMANAGER_PORT) + org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT) === DEFAULT_BLOCKMANAGER_PORT) } test("Long prefixes should switch to using a generated name.") { - val configurationStep = new DriverAddressConfigurationStep( - LONG_RESOURCE_NAME_PREFIX, - DRIVER_LABELS, - sparkConf.set(KUBERNETES_NAMESPACE, "my-namespace"), - clock) + val configurationStep = new DriverServiceBootstrapStep( + LONG_RESOURCE_NAME_PREFIX, + DRIVER_LABELS, + sparkConf.set(KUBERNETES_NAMESPACE, "my-namespace"), + clock) when(clock.getTimeMillis()).thenReturn(10000) val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone()) val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec) val driverService = resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service] - val expectedServiceName = s"spark-10000${DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX}" + val expectedServiceName = s"spark-10000${DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX}" assert(driverService.getMetadata.getName === expectedServiceName) val expectedHostName = s"$expectedServiceName.my-namespace.svc.cluster.local" verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, expectedHostName) } test("Disallow bind address and driver host to be set explicitly.") { - val configurationStep = new DriverAddressConfigurationStep( - LONG_RESOURCE_NAME_PREFIX, - DRIVER_LABELS, - sparkConf.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, "host"), - clock) + val configurationStep = new DriverServiceBootstrapStep( + LONG_RESOURCE_NAME_PREFIX, + DRIVER_LABELS, + sparkConf.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, "host"), + clock) try { configurationStep.configureDriver(KubernetesDriverSpec.initialSpec(sparkConf)) fail("The driver bind address should not be allowed.") } catch { case e: Throwable => assert(e.getMessage === - s"requirement failed: ${DriverAddressConfigurationStep.DRIVER_BIND_ADDRESS_KEY} is" + - s" not supported in Kubernetes mode, as the driver's hostname will be managed via" + - s" a Kubernetes service.") + s"requirement failed: ${DriverServiceBootstrapStep.DRIVER_BIND_ADDRESS_KEY} is" + + s" not supported in Kubernetes mode, as the driver's bind address is managed" + + s" and set to the driver pod's IP address.") } sparkConf.remove(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS) sparkConf.set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, "host") @@ -147,9 +147,9 @@ private[spark] class DriverAddressConfigurationStepSuite } catch { case e: Throwable => assert(e.getMessage === - s"requirement failed: ${DriverAddressConfigurationStep.DRIVER_HOST_KEY} is" + - s" not supported in Kubernetes mode, as the driver's hostname will be managed via" + - s" a Kubernetes service.") + s"requirement failed: ${DriverServiceBootstrapStep.DRIVER_HOST_KEY} is" + + s" not supported in Kubernetes mode, as the driver's hostname will be managed via" + + s" a Kubernetes service.") } } @@ -174,8 +174,6 @@ private[spark] class DriverAddressConfigurationStepSuite private def verifySparkConfHostNames( driverSparkConf: SparkConf, expectedHostName: String): Unit = { assert(driverSparkConf.get( - org.apache.spark.internal.config.DRIVER_HOST_ADDRESS) === expectedHostName) - assert(driverSparkConf.get( - org.apache.spark.internal.config.DRIVER_BIND_ADDRESS) === expectedHostName) + org.apache.spark.internal.config.DRIVER_HOST_ADDRESS) === expectedHostName) } } diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile index 7b1effa911f19..e3340bfa05c7f 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile @@ -46,4 +46,4 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \ if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \ - ${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY $SPARK_DRIVER_CLASS $PYSPARK_PRIMARY $PYSPARK_FILES $SPARK_DRIVER_ARGS + ${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS $PYSPARK_PRIMARY $PYSPARK_FILES $SPARK_DRIVER_ARGS diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-r/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-r/Dockerfile index 26b1231ae2ec9..85aa3fc197bfd 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-r/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-r/Dockerfile @@ -36,4 +36,4 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \ if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \ - ${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY $SPARK_DRIVER_CLASS $R_FILE $SPARK_DRIVER_ARGS + ${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS $R_FILE $SPARK_DRIVER_ARGS diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile index 26d1d805fde2b..17d2f60983d63 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile @@ -31,4 +31,4 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \ if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \ - ${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS + ${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS