Make external shuffle service host configurable
EnricoMi committed Oct 21, 2024
1 parent 32232e9 commit 6365ed7
Showing 7 changed files with 89 additions and 3 deletions.
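
In short: the commit adds an optional spark.shuffle.service.host setting, so executors can register with an external shuffle service that is not running on their own host, e.g. a node-local service deployed as a Kubernetes DaemonSet. A minimal usage sketch (hedged: the address below is a placeholder):

```scala
import org.apache.spark.SparkConf

// Hedged sketch: "10.0.0.12" stands in for wherever the external shuffle
// service actually listens (e.g. the Kubernetes node's host IP).
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.shuffle.service.host", "10.0.0.12") // new in this commit, optional
  .set("spark.shuffle.service.port", "7337")      // pre-existing, defaults to 7337
```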
ExecutorShuffleInfo.java
@@ -37,9 +37,10 @@ public class ExecutorShuffleInfo implements Encodable {
   public final int subDirsPerLocalDir;
   /**
    * Shuffle manager (SortShuffleManager) that the executor is using.
-   * If this string contains semicolon, it will also include the meta information
-   * for push based shuffle in JSON format. Example of the string with semicolon would be:
+   * If this string contains a colon (":"), it will also include the meta information
+   * for push-based shuffle in JSON format. An example of a string with the colon would be:
    * SortShuffleManager:{"mergeDir": "mergeDirectory_1", "attemptId": 1}
+   * The colon delimiter is configured in RemoteBlockPushResolver.SHUFFLE_META_DELIMITER.
    */
   public final String shuffleManager;

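The javadoc above describes a composite string. A small illustrative sketch, not code from this commit, of splitting such a value at the first colon (the delimiter the doc attributes to RemoteBlockPushResolver.SHUFFLE_META_DELIMITER):

```scala
// Illustrative only: split the shuffleManager string at the first colon.
val value = """SortShuffleManager:{"mergeDir": "mergeDirectory_1", "attemptId": 1}"""
val (shuffleManager, pushShuffleMeta) = value.indexOf(':') match {
  case -1 => (value, None)
  case i  => (value.take(i), Some(value.drop(i + 1)))
}
// shuffleManager == "SortShuffleManager"
// pushShuffleMeta == Some("""{"mergeDir": "mergeDirectory_1", "attemptId": 1}""")
```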
config/package.scala
@@ -726,6 +726,8 @@ package object config {
       .checkValues(DBBackend.values.map(_.toString).toSet)
       .createWithDefault(DBBackend.LEVELDB.name)

+  private[spark] val SHUFFLE_SERVICE_HOST =
+    ConfigBuilder("spark.shuffle.service.host").version("4.0.0").stringConf.createOptional
   private[spark] val SHUFFLE_SERVICE_PORT =
     ConfigBuilder("spark.shuffle.service.port").version("1.2.0").intConf.createWithDefault(7337)

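Since the new entry is built with createOptional, reading it yields an Option[String] rather than a default. A hedged sketch of that behavior via the public key (the hostname is a placeholder):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
conf.getOption("spark.shuffle.service.host")        // None while unset
conf.set("spark.shuffle.service.host", "ess.local") // placeholder hostname
conf.getOption("spark.shuffle.service.host")        // Some("ess.local")
```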
BlockManager.scala
@@ -221,6 +221,7 @@ private[spark] class BlockManager(
   private val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory
   private val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory

+  private[spark] val externalShuffleServiceHost = StorageUtils.externalShuffleServiceHost(conf)
   private[spark] val externalShuffleServicePort = StorageUtils.externalShuffleServicePort(conf)

   var blockManagerId: BlockManagerId = _
@@ -524,8 +525,12 @@ private[spark] class BlockManager(
     // the registration with the ESS. Therefore, this registration should be prior to
     // the BlockManager registration. See SPARK-39647.
     if (externalShuffleServiceEnabled) {
+      externalShuffleServiceHost.foreach { host =>
+        logInfo(s"external shuffle service host = $host")
+      }
       logInfo(s"external shuffle service port = $externalShuffleServicePort")
-      shuffleServerId = BlockManagerId(executorId, blockTransferService.hostName,
+      shuffleServerId = BlockManagerId(executorId,
+        externalShuffleServiceHost.getOrElse(blockTransferService.hostName),
         externalShuffleServicePort)
       if (!isDriver) {
         registerWithExternalShuffleServer()
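The getOrElse above is a plain fallback: prefer the configured host, otherwise keep the executor's own transfer-service hostname, which preserves the previous behavior when the conf is unset. A standalone sketch with placeholder values:

```scala
// Placeholder values; in BlockManager these come from the conf and from
// blockTransferService.hostName respectively.
val configuredHost: Option[String] = Some("10.0.0.12")
val executorHost = "executor-host-1"
val shuffleServiceHost = configuredHost.getOrElse(executorHost)
// "10.0.0.12" here; with configuredHost = None it would be "executor-host-1".
```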
StorageUtils.scala
@@ -237,6 +237,13 @@ private[spark] object StorageUtils extends Logging {
     }
   }

+  /**
+   * Get the host used by the external shuffle service.
+   */
+  def externalShuffleServiceHost(conf: SparkConf): Option[String] = {
+    conf.getOption(config.SHUFFLE_SERVICE_HOST.key)
+  }
+
   /**
    * Get the port used by the external shuffle service. In Yarn mode, this may already be
    * set through the Hadoop configuration as the server is launched in the Yarn NM.
KubernetesExecutorBackend.scala
@@ -106,6 +106,10 @@ private[spark] object KubernetesExecutorBackend extends Logging {

     // Create SparkEnv using properties we fetched from the driver.
     val driverConf = new SparkConf()
+    sys.env.get("EXECUTOR_EXTERNAL_SHUFFLE_SERVICE_HOST").foreach { host =>
+      log.info(s"Using external shuffle service on '$host'")
+      driverConf.set(SHUFFLE_SERVICE_HOST, host)
+    }
     for ((key, value) <- props) {
       // this is required for SSL in standalone mode
       if (SparkConf.isExecutorStartupConf(key)) {
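The backend seeds its conf from a pod environment variable before the driver properties are applied. A self-contained sketch of that handoff, using the string key instead of the private[spark] SHUFFLE_SERVICE_HOST entry (the env var is populated by the pod template at the end of this commit):

```scala
import org.apache.spark.SparkConf

val driverConf = new SparkConf(false)
// Mirror of the env-to-conf seeding above.
sys.env.get("EXECUTOR_EXTERNAL_SHUFFLE_SERVICE_HOST").foreach { host =>
  driverConf.set("spark.shuffle.service.host", host)
}
```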
56 changes: 56 additions & 0 deletions spark-ess-daemonset.yaml
@@ -0,0 +1,56 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: spark-external-shuffle-service
  labels:
    k8s-app: spark-ess
spec:
  selector:
    matchLabels:
      name: spark-ess
  template:
    metadata:
      labels:
        name: spark-ess
    spec:
      nodeSelector:
        # run on all nodes with label 'spark'
        #name: spark
        #kubernetes.io/hostname: minikube-m02
        # instead of nodeSelector, use affinity.nodeAffinity

      tolerations:
      # these tolerations are to have the daemonset runnable on control plane nodes
      # remove them if your control plane nodes should not run pods
      - key: node-role.kubernetes.io/control-plane
        operator: Exists
        effect: NoSchedule
      - key: node-role.kubernetes.io/master
        operator: Exists
        effect: NoSchedule
      containers:
      - name: spark-ess
        image: spark:k8s
        resources:
          limits:
            memory: 1024Mi
          requests:
            cpu: 1
            memory: 1024Mi
        args: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.ExternalShuffleService"]
        ports:
        - containerPort: 7337
          hostPort: 7337
          name: ess-rpc-port
          protocol: TCP
        volumeMounts:
        - name: shuffle-data
          mountPath: /var/data/
      # it may be desirable to set a high priority class to ensure that a DaemonSet Pod
      # preempts running Pods
      #priorityClassName: important
      terminationGracePeriodSeconds: 30
      volumes:
      - name: shuffle-data
        hostPath:
          path: /tmp/spark-ess/shuffle-data
11 changes: 11 additions & 0 deletions spark-executor-template.yaml
@@ -0,0 +1,11 @@
apiVersion: v1
kind: Pod
spec:
  containers:
    - name: container
      env:
        - name: EXECUTOR_EXTERNAL_SHUFFLE_SERVICE_HOST
          valueFrom:
            fieldRef:
              fieldPath: status.hostIP
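
Taken together: the DaemonSet publishes the shuffle service on port 7337 of every node via hostPort, and this template injects the node's IP (status.hostIP) into each executor, so every executor registers with the shuffle service on its own node. A hedged submission-side sketch (the template path is a placeholder):

```scala
import org.apache.spark.SparkConf

// Sketch of wiring the two manifests above together from the driver side.
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.kubernetes.executor.podTemplateFile", "spark-executor-template.yaml")
// Executors then see EXECUTOR_EXTERNAL_SHUFFLE_SERVICE_HOST = <node IP> and, via
// the KubernetesExecutorBackend change above, set spark.shuffle.service.host.
```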
