Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Mar 26, 2024
1 parent 17f2fd1 commit 5850c38
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
"LOG_FILES" -> "log",
"APP_ID" -> applicationId(),
"KUBERNETES_NAMESPACE" -> namespace,
"KUBERNETES_POD_NAME" -> System.getenv(ENV_DRIVER_POD_NAME)
"KUBERNETES_POD_NAME" -> Option(System.getenv(ENV_DRIVER_POD_NAME)).getOrElse("[null]")
))

override def start(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.internal.config.UI
import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{ExecutorKilled, LiveListenerBus, TaskSchedulerImpl}
Expand All @@ -47,6 +48,11 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
.set("spark.app.id", TEST_SPARK_APP_ID)
.set(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL.key, "soLong")
.set(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE.key, "cruelWorld")
.set(
UI.CUSTOM_DRIVER_LOG_URL.key,
"https://my-custom.url/api/logs?applicationId={{APP_ID}}&namespace={{KUBERNETES_NAMESPACE}}" +
"&pod_name={{KUBERNETES_POD_NAME}}"
)

@Mock
private var sc: SparkContext = _
Expand Down Expand Up @@ -259,4 +265,52 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
endpoint.receiveAndReply(context).apply(GenerateExecID("cheeseBurger"))
verify(context).reply("1")
}

test("Driver attributes") {
  // Expected attribute map differs only in the pod name, so parameterize it.
  def expectedAttributes(podName: String): Option[Map[String, String]] =
    Some(Map(
      "LOG_FILES" -> "log",
      "APP_ID" -> "spark-app-id",
      "KUBERNETES_NAMESPACE" -> "default",
      "KUBERNETES_POD_NAME" -> podName
    ))

  // Without ENV_DRIVER_POD_NAME set, the backend substitutes the "[null]" placeholder.
  assert(schedulerBackendUnderTest.getDriverAttributes === expectedAttributes("[null]"))

  // With the env var present, the actual pod name is reported.
  withEnvs(ENV_DRIVER_POD_NAME -> "pod.name") {
    assert(schedulerBackendUnderTest.getDriverAttributes === expectedAttributes("pod.name"))
  }
}

test("Driver log urls") {
  // The custom log URL template resolves to this single "log" entry; only the
  // pod_name query parameter varies between the two scenarios below.
  def expectedLogUrls(podName: String): Option[Map[String, String]] =
    Some(Map(
      "log" -> ("https://my-custom.url/api/logs?applicationId=spark-app-id&namespace=default" +
        s"&pod_name=$podName")
    ))

  // ENV_DRIVER_POD_NAME absent: the "[null]" placeholder appears in the URL.
  assert(schedulerBackendUnderTest.getDriverLogUrls === expectedLogUrls("[null]"))

  // ENV_DRIVER_POD_NAME present: the real pod name is substituted.
  withEnvs(ENV_DRIVER_POD_NAME -> "pod.name") {
    assert(schedulerBackendUnderTest.getDriverLogUrls === expectedLogUrls("pod.name"))
  }
}

/**
 * Runs `f` with the given environment variables temporarily set, removing them
 * again afterwards (even if `f` throws).
 *
 * `System.getenv()` returns an unmodifiable view; this reaches into its private
 * backing map (field "m" of `Collections$UnmodifiableMap`) via reflection to
 * mutate the process environment for the duration of the block.
 * NOTE(review): relies on JDK internals — may require --add-opens on newer JVMs.
 */
private def withEnvs(pairs: (String, String)*)(f: => Unit): Unit = {
  val envView = System.getenv()
  val backingField = envView.getClass.getDeclaredField("m")
  backingField.setAccessible(true)
  val env = backingField.get(envView).asInstanceOf[java.util.Map[String, String]]
  try {
    pairs.foreach { case (key, value) =>
      // Refuse to clobber a pre-existing variable: cleanup below is a plain
      // removal and would otherwise lose the original value.
      assert(!env.containsKey(key))
      env.put(key, value)
    }
    f
  } finally {
    pairs.foreach { case (key, _) => env.remove(key) }
  }
}
}

0 comments on commit 5850c38

Please sign in to comment.