support stage isolated profile
wangjunbo committed Mar 29, 2024
1 parent ad8ac17 commit b03a481
Showing 3 changed files with 56 additions and 8 deletions.
File 1 of 3: ExecutorJVMProfiler (package org.apache.spark.executor.profiler)
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.executor.profiler
 
-import java.io.{BufferedInputStream, FileInputStream, InputStream, IOException}
+import java.io.{BufferedInputStream, File, FileInputStream, InputStream, IOException}
 import java.net.URI
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
@@ -34,12 +34,13 @@ import org.apache.spark.util.ThreadUtils
  */
 private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) extends Logging {
 
-  private var running = false
-  private val enableProfiler = conf.get(EXECUTOR_PROFILING_ENABLED)
+  @volatile private var running = false
+  @volatile private var stageId: Int = _
   private val profilerOptions = conf.get(EXECUTOR_PROFILING_OPTIONS)
   private val profilerDfsDir = conf.get(EXECUTOR_PROFILING_DFS_DIR)
   private val profilerLocalDir = conf.get(EXECUTOR_PROFILING_LOCAL_DIR)
   private val writeInterval = conf.get(EXECUTOR_PROFILING_WRITE_INTERVAL)
+  private val codeProfilingStageIsolated = conf.get(EXECUTOR_PROFILING_STAGE_ISOLATED)
 
   private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/profile.jfr"
   private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/profile.jfr"
@@ -55,7 +56,7 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
 
   val profiler: Option[AsyncProfiler] = {
     Option(
-      if (enableProfiler && AsyncProfilerLoader.isSupported) AsyncProfilerLoader.load() else null
+      if (AsyncProfilerLoader.isSupported) AsyncProfilerLoader.load() else null
     )
   }

@@ -76,6 +77,17 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
     }
   }
 
+  def reStart(id: Int): Unit = {
+    if (!running && id != this.stageId) {
+      val tempFile = new File(s"$profilerLocalDir/profile.jfr")
+      if (tempFile.exists()) {
+        tempFile.delete()
+      }
+      this.stageId = id
+      start()
+    }
+  }
+
   /** Stops the profiling and saves output to dfs location. */
   def stop(): Unit = {
     if (running) {
@@ -99,8 +111,11 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
       val appName = conf.get("spark.app.name").replace(" ", "-")
       val profilerOutputDirname = profilerDfsDir.get
 
-      val profileOutputFile =
+      val profileOutputFile = if (codeProfilingStageIsolated) {
+        s"$profilerOutputDirname/$applicationId/profile-$appName-exec-$executorId-$stageId.jfr"
+      } else {
         s"$profilerOutputDirname/$applicationId/profile-$appName-exec-$executorId.jfr"
+      }
       val fs = FileSystem.get(new URI(profileOutputFile), config)
       val filenamePath = new Path(profileOutputFile)
       outputStream = fs.create(filenamePath)
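The net effect in this file: with stage isolation on, the profiler rolls its local profile.jfr at stage boundaries and uploads one recording per stage instead of one per executor. A runnable sketch of the resulting naming scheme follows; the helper object and its parameter names are hypothetical, only the path format strings come from the diff above.

object ProfileOutputPath {
  // Mirrors the profileOutputFile logic above: one .jfr per stage when
  // stage isolation is on, a single per-executor .jfr otherwise.
  def apply(
      dfsDir: String,
      appId: String,
      appName: String,
      executorId: String,
      stageId: Int,
      stageIsolated: Boolean): String = {
    val cleanName = appName.replace(" ", "-")
    val base = s"$dfsDir/$appId/profile-$cleanName-exec-$executorId"
    if (stageIsolated) s"$base-$stageId.jfr" else s"$base.jfr"
  }
}

// ProfileOutputPath("hdfs:///profiles", "app-123", "my app", "7", stageId = 4, stageIsolated = true)
//   returns "hdfs:///profiles/app-123/profile-my-app-exec-7-4.jfr"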
File 2 of 3: JVMProfilerExecutorPlugin (the ExecutorPlugin implementation)
@@ -21,7 +21,7 @@ import java.util.{Map => JMap}
 import scala.jdk.CollectionConverters._
 import scala.util.Random
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, TaskContext, TaskFailedReason}
 import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
 import org.apache.spark.internal.Logging
@@ -42,24 +42,50 @@ class JVMProfilerExecutorPlugin extends ExecutorPlugin with Logging {
   private var pluginCtx: PluginContext = _
   private var profiler: ExecutorJVMProfiler = _
   private var codeProfilingEnabled: Boolean = _
+  private var codeProfilingStageIsolated: Boolean = _
   private var codeProfilingFraction: Double = _
   private val rand: Random = new Random(System.currentTimeMillis())
 
   override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
     pluginCtx = ctx
     sparkConf = ctx.conf()
     codeProfilingEnabled = sparkConf.get(EXECUTOR_PROFILING_ENABLED)
+    codeProfilingStageIsolated = sparkConf.get(EXECUTOR_PROFILING_STAGE_ISOLATED)
     if (codeProfilingEnabled) {
       codeProfilingFraction = sparkConf.get(EXECUTOR_PROFILING_FRACTION)
       if (rand.nextInt(100) * 0.01 < codeProfilingFraction) {
         logInfo(s"Executor id ${pluginCtx.executorID()} selected for JVM code profiling")
         profiler = new ExecutorJVMProfiler(sparkConf, pluginCtx.executorID())
-        profiler.start()
+        if (!codeProfilingStageIsolated) {
+          profiler.start()
+        }
       }
     }
     Map.empty[String, String].asJava
   }
 
+  override def onTaskStart(): Unit = {
+    if (codeProfilingStageIsolated && profiler != null) {
+      val task = TaskContext.get()
+      if (task != null) {
+        profiler.reStart(task.stageId())
+      }
+    }
+  }
+
+  override def onTaskFailed(failureReason: TaskFailedReason): Unit = {
+    if (codeProfilingStageIsolated) {
+      onTaskSucceeded()
+    }
+  }
+
+  override def onTaskSucceeded(): Unit = {
+    if (codeProfilingStageIsolated && profiler != null) {
+      profiler.stop()
+    }
+  }
+
   override def shutdown(): Unit = {
     logInfo("Executor JVM profiler shutting down")
     if (profiler != null) {
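The callbacks above drive a simple state machine in ExecutorJVMProfiler: profiling starts lazily on the first task of a stage and stops when a task completes. One subtlety follows from the `id != this.stageId` guard in reStart: once any task of a stage finishes and stops the profiler, later tasks of the same stage do not restart it, so each stage's recording covers the window from its first task start to the first task completion on that executor. A standalone sketch of that sequencing, with assumed names and println standing in for the JFR commands (this is not Spark API):

object StageIsolationDemo {
  @volatile private var running = false
  @volatile private var stageId: Int = -1

  // What ExecutorJVMProfiler.reStart does, minus the JFR and file handling.
  def reStart(id: Int): Unit =
    if (!running && id != stageId) {
      stageId = id
      running = true
      println(s"recording started for stage $id")
    }

  // What stop() does in stage-isolated mode: flush and upload the stage's file.
  def stop(): Unit =
    if (running) {
      running = false
      println(s"recording for stage $stageId stopped and uploaded")
    }

  def main(args: Array[String]): Unit = {
    reStart(0) // first task of stage 0: recording starts
    stop()     // that task finishes: recording stops
    reStart(0) // another stage-0 task: no restart, same stage id
    reStart(1) // first task of stage 1: recording rolls over
    stop()
  }
}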
File 3 of 3: profiler package object (configuration definitions)
@@ -29,6 +29,13 @@ package object profiler {
       .booleanConf
       .createWithDefault(false)
 
+  private[profiler] val EXECUTOR_PROFILING_STAGE_ISOLATED =
+    ConfigBuilder("spark.executor.profiling.stage.isolated")
+      .doc("Whether to profile each stage in isolation, writing one jfr output file per stage.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[profiler] val EXECUTOR_PROFILING_DFS_DIR =
     ConfigBuilder("spark.executor.profiling.dfsDir")
      .doc("HDFS compatible file-system path to where the profiler will write output jfr files.")
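Putting the three files together, the feature would be enabled next to the existing profiler settings. A hedged sketch follows: the spark.plugins class name and the enabled/dfsDir key strings are assumptions inferred from the sibling configs and are not shown in this diff.

import org.apache.spark.SparkConf

object EnableStageIsolatedProfiling {
  val conf: SparkConf = new SparkConf()
    .set("spark.plugins", "org.apache.spark.executor.profiler.ExecutorProfilerPlugin") // assumed plugin class
    .set("spark.executor.profiling.enabled", "true")            // assumed key for EXECUTOR_PROFILING_ENABLED
    .set("spark.executor.profiling.stage.isolated", "true")     // the config added in this commit
    .set("spark.executor.profiling.dfsDir", "hdfs:///profiles") // example output location
}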
