diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 042179d86c31a..16e416a6d8861 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -240,6 +240,7 @@ class SparkContext(config: SparkConf) extends Logging { private var _shutdownHookRef: AnyRef = _ private var _statusStore: AppStatusStore = _ private var _heartbeater: Heartbeater = _ + private var _driverThreadDumpCollector: ThreadDumpCollector = _ private var _resources: immutable.Map[String, ResourceInformation] = _ private var _shuffleDriverComponents: ShuffleDriverComponents = _ private var _plugins: Option[PluginContainer] = None @@ -613,6 +614,15 @@ class SparkContext(config: SparkConf) extends Logging { conf.get(EXECUTOR_HEARTBEAT_INTERVAL)) _heartbeater.start() + // Create and start the thread dump collector for the Spark driver + if (_conf.get(DRIVER_THREAD_DUMP_COLLECTOR_ENABLED)) { + _driverThreadDumpCollector = new ThreadDumpCollector( + () => Utils.writeThreadDumpToFile(env), + "driver-threadDumpCollector", + conf.get(THREAD_DUMP_COLLECTOR_INTERVAL)) + _driverThreadDumpCollector.start() + } + // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's // constructor _taskScheduler.start() @@ -2375,6 +2385,12 @@ class SparkContext(config: SparkConf) extends Logging { } _heartbeater = null } + if (_conf.get(DRIVER_THREAD_DUMP_COLLECTOR_ENABLED) && _driverThreadDumpCollector != null) { + Utils.tryLogNonFatalError { + _driverThreadDumpCollector.stop() + } + _driverThreadDumpCollector = null + } if (env != null && _heartbeatReceiver != null) { Utils.tryLogNonFatalError { env.rpcEnv.stop(_heartbeatReceiver) diff --git a/core/src/main/scala/org/apache/spark/ThreadDumpCollector.scala b/core/src/main/scala/org/apache/spark/ThreadDumpCollector.scala new file mode 100644 index 0000000000000..49d8363bbb12e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ThreadDumpCollector.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.TimeUnit + +import org.apache.spark.internal.Logging +import org.apache.spark.util.{ThreadUtils, Utils} + + +/** + * Creates a thread dump collector thread which will call the specified collectThreadDumps + * function at intervals of intervalMs. + * + * @param collectThreadDumps the thread dump collector function to call. + * @param name the thread name for the thread dump collector. + * @param intervalMs the interval between stack trace collections. + */ +private[spark] class ThreadDumpCollector( + collectThreadDumps: () => Unit, + name: String, + intervalMs: Long) extends Logging { + // Executor for the thread collector task + private val threadDumpCollector = ThreadUtils.newDaemonSingleThreadScheduledExecutor(name) + + /** Schedules a task to collect the thread dumps */ + def start(): Unit = { + val threadDumpCollectorTask = new Runnable() { + override def run(): Unit = Utils.logUncaughtExceptions(collectThreadDumps()) + } + threadDumpCollector.scheduleAtFixedRate(threadDumpCollectorTask, intervalMs, intervalMs, + TimeUnit.MILLISECONDS) + } + + def stop(): Unit = { + threadDumpCollector.shutdown() + threadDumpCollector.awaitTermination(10, TimeUnit.SECONDS) + } +} diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 0f0e7cb3d8974..f78aba72fd13b 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -89,6 +89,8 @@ private[spark] class Executor( log"${LogMDC(OS_ARCH, System.getProperty("os.arch"))}") logInfo(log"Java version ${LogMDC(JAVA_VERSION, System.getProperty("java.version"))}") + private var executorThreadDumpCollector: ThreadDumpCollector = _ + private val executorShutdown = new AtomicBoolean(false) val stopHookReference = ShutdownHookManager.addShutdownHook( () => stop() @@ -325,6 +327,15 @@ private[spark] class Executor( heartbeater.start() + // Create and start the thread dump collector for the Spark executor + if (conf.get(EXECUTOR_THREAD_DUMP_COLLECTOR_ENABLED)) { + executorThreadDumpCollector = new ThreadDumpCollector( + () => Utils.writeThreadDumpToFile(env), + "executor-ThreadDumpCollector", + conf.get(THREAD_DUMP_COLLECTOR_INTERVAL)) + executorThreadDumpCollector.start() + } + private val appStartTime = conf.getLong("spark.app.startTime", 0) // To allow users to distribute plugins and their required files @@ -445,6 +456,15 @@ private[spark] class Executor( case NonFatal(e) => logWarning("Unable to stop heartbeater", e) } + try { + if (conf.get(EXECUTOR_THREAD_DUMP_COLLECTOR_ENABLED) && executorThreadDumpCollector != + null) { + executorThreadDumpCollector.stop() + } + } catch { + case NonFatal(e) => + logWarning("Unable to stop the executor thread dump collector", e) + } ShuffleBlockPusher.stop() if (threadPool != null) { threadPool.shutdown() diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index c5646d2956aeb..5ac4ae1182b84 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2801,4 +2801,27 @@ package object config { .version("4.0.0") .timeConf(TimeUnit.MILLISECONDS) .createOptional + + private[spark] val DRIVER_THREAD_DUMP_COLLECTOR_ENABLED = ConfigBuilder("spark.driver" + + ".threadDumpCollector.enabled") + .version("4.0.0") + .booleanConf + .createWithDefault(false) + + private[spark] val EXECUTOR_THREAD_DUMP_COLLECTOR_ENABLED = ConfigBuilder("spark.executor" + + ".threadDumpCollector.enabled") + .version("4.0.0") + .booleanConf + .createWithDefault(false) + + private[spark] val THREAD_DUMP_COLLECTOR_INTERVAL = + ConfigBuilder("spark.threadDumpCollectorInterval") + .version("4.0.0") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("10s") + + private[spark] val THREAD_DUMP_COLLECTOR_DIR = ConfigBuilder("spark.threadDumpCollector.dir") + .version("4.0.0") + .stringConf + .createWithDefault("/tmp/spark-thread-dumps") } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 5703128aacbb9..0effb079ecdae 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -28,6 +28,7 @@ import java.nio.channels.Channels import java.nio.charset.StandardCharsets import java.nio.file.Files import java.security.SecureRandom +import java.time.Instant import java.util.{Locale, Properties, Random, UUID} import java.util.concurrent._ import java.util.concurrent.TimeUnit.NANOSECONDS @@ -53,7 +54,8 @@ import org.apache.commons.codec.binary.Hex import org.apache.commons.io.IOUtils import org.apache.commons.lang3.{JavaVersion, SystemUtils} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} +import org.apache.hadoop.fs.{FileSystem, FileUtil, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec} import org.apache.hadoop.ipc.{CallerContext => HadoopCallerContext} import org.apache.hadoop.ipc.CallerContext.{Builder => HadoopCallerContextBuilder} @@ -2112,6 +2114,31 @@ private[spark] object Utils }.map(threadInfoToThreadStackTrace) } + def writeThreadDumpToFile(env: SparkEnv): Unit = { + val collectedThreadDump = getThreadDump().map(_.toString).mkString + val hadoopConf = SparkHadoopUtil.get.newConfiguration(env.conf) + val rootDir = env.conf.get(THREAD_DUMP_COLLECTOR_DIR) + var outputStream: FSDataOutputStream = null + val fileSystem: FileSystem = new Path(rootDir).getFileSystem(hadoopConf) + val threadDumpFilePermissions = new FsPermission(Integer.parseInt("770", 8).toShort) + val timestamp = Instant.now().getEpochSecond().toString() + val threadDumpFileName = env.conf.getAppId + "-" + env.executorId + "-" + timestamp + ".txt" + val dfsLogFile: Path = fileSystem.makeQualified(new Path(rootDir, threadDumpFileName)) + try { + outputStream = SparkHadoopUtil.createFile(fileSystem, dfsLogFile, true) + fileSystem.setPermission(dfsLogFile, threadDumpFilePermissions) + outputStream.write(collectedThreadDump.getBytes(StandardCharsets.UTF_8)) + outputStream.close() + } catch { + case e: Exception => + logError( + log"Could not collect thread dumps from executor ${ + MDC(LogKeys.EXECUTOR_ID, + env.executorId) + }", e) + } + } + /** Return a heap dump. Used to capture dumps for the web UI */ def getHeapHistogram(): Array[String] = { val pid = String.valueOf(ProcessHandle.current().pid())