Use sparkMeasure in flight recording mode to instrument Spark applications without touching their code.
Flight recorder mode attaches a Spark Listener to the Spark Context, which takes care of
collecting execution metrics while the application runs and of saving them for later processing.
There are two different levels of granularity for metrics collection:
stage aggregation, with FlightRecorderStageMetrics, and task-level metrics with FlightRecorderTaskMetrics.
The collected metrics data can be saved to a filesystem for later processing. The available file sinks are
locally mounted filesystems and Hadoop-compatible filesystems (HDFS, S3, etc.).
Metrics can also be printed to stdout.
To record metrics at stage-level granularity, add these configurations to spark-submit:
--packages ch.cern.sparkmeasure:spark-measure_2.12:0.23
--conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderStageMetrics
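The same configuration can also be set from application code, as long as it is in place before the SparkContext starts. A minimal sketch in Scala (an illustration, assuming the spark-measure jar is already on the application classpath; the app name and master are placeholders):
// Register the flight recorder stage-metrics listener programmatically.
// Assumes spark-measure is on the classpath; app name and master are placeholders.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("FlightRecorderExample")
  .master("local[*]")
  .config("spark.extraListeners", "ch.cern.sparkmeasure.FlightRecorderStageMetrics")
  .getOrCreate()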
Use this to record metrics at task-level granularity.
This can potentially generate large amounts of data in the driver;
consider using stage-level granularity first.
The usage is almost the same as for the stage metrics mode described above: just replace
FlightRecorderStageMetrics with FlightRecorderTaskMetrics.
The configuration parameters for flight recorder mode at task granularity are:
--packages ch.cern.sparkmeasure:spark-measure_2.12:0.23
--conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderTaskMetrics
Configuration parameters for the flight recorder mode:
--conf spark.sparkmeasure.outputFormat=<format>
Note: valid values are json, json_to_hadoop, and java; the default is "json".
--conf spark.sparkmeasure.outputFilename=<output file>
Note: default = "/tmp/stageMetrics_flightRecorder"
--conf spark.sparkmeasure.printToStdout=<true|false>
Note: default is false. Set this to true to print the JSON-serialized metrics to stdout, for debugging purposes.
Notes:
- The json and java serialization formats write to the driver's local filesystem.
- json_to_hadoop writes the JSON-serialized metrics to HDFS or to a Hadoop-compliant filesystem, such as S3A.
- The amount of data generated by FlightRecorderStageMetrics is relatively small in most applications: O(number_of_stages).
- FlightRecorderTaskMetrics can generate a large amount of data, O(number_of_tasks); use with care.
A Python example:
- runs the pi.py example script
- collects the metrics and saves them to /tmp/stageMetrics_flightRecorder in JSON format:
bin/spark-submit --master local[*] --packages ch.cern.sparkmeasure:spark-measure_2.12:0.23 \
--conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderStageMetrics \
examples/src/main/python/pi.py
# Read the flight recorder output file:
more /tmp/stageMetrics_flightRecorder
A Scala example:
- same as the example above, but with a custom output filename
- also prints the metrics to stdout:
bin/spark-submit --master local[*] --packages ch.cern.sparkmeasure:spark-measure_2.12:0.23 \
--class org.apache.spark.examples.SparkPi \
--conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderStageMetrics \
--conf spark.sparkmeasure.printToStdout=true \
--conf spark.sparkmeasure.outputFilename="/tmp/myoutput_$(date +%s).json" \
examples/jars/spark-examples_2.12-3.3.1.jar 10
# The metrics are printed on stdout and also saved to a file
# Find and read the flight recorder output file:
ls -ltr /tmp/myoutput*.json
An example of how to write the metrics output file to HDFS when running in cluster mode on YARN.
This example collects metrics at task granularity.
(Note: source the Hadoop environment before running this.)
bin/spark-submit --master yarn --deploy-mode cluster \
--packages ch.cern.sparkmeasure:spark-measure_2.12:0.24 \
--conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderTaskMetrics \
--conf spark.sparkmeasure.outputFormat=json_to_hadoop \
--conf spark.sparkmeasure.outputFilename="hdfs://myclustername/user/luca/test/myoutput_$(date +%s).json" \
examples/src/main/python/pi.py
# Find the output file in HDFS
hdfs dfs -ls <path>/myoutput_*.json
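To inspect the json_to_hadoop output directly with Spark (for example from spark-shell), one option is to read it back as a DataFrame. This is a sketch, assuming the metrics file is a single JSON document (hence the multiLine option); the path is a placeholder for the outputFilename used above:
// Read the flight recorder task metrics back from HDFS into a DataFrame.
// The path is a placeholder: use the value passed to spark.sparkmeasure.outputFilename.
val taskMetricsDF = spark.read
  .option("multiLine", "true")   // assumes the file is one JSON document, not JSON Lines
  .json("hdfs://myclustername/user/luca/test/myoutput_1234567890.json")
taskMetricsDF.printSchema()
taskMetricsDF.show(10, truncate = false)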
An example using Spark 3.3.0 on Kubernetes with Scala 2.12, writing the output to S3:
(Note: export KUBECONFIG=..., set up the Hadoop environment, and configure the s3a keys in the script.)
bin/spark-submit --master k8s://https://XXX.XXX.XXX.XXX --deploy-mode client --conf spark.executor.instances=3 \
--conf spark.executor.cores=2 --executor-memory 6g --driver-memory 8g \
--conf spark.kubernetes.container.image=<registry-URL>/spark:v3.0.0_20190529_hadoop32 \
--packages org.apache.hadoop:hadoop-aws:3.3.2,ch.cern.sparkmeasure:spark-measure_2.12:0.24 \
--conf spark.hadoop.fs.s3a.secret.key="YYY..." \
--conf spark.hadoop.fs.s3a.access.key="ZZZ..." \
--conf spark.hadoop.fs.s3a.endpoint="https://s3.cern.ch" \
--conf spark.hadoop.fs.s3a.impl="org.apache.hadoop.fs.s3a.S3AFileSystem" \
--conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderStageMetrics \
--conf spark.sparkmeasure.outputFormat=json_to_hadoop \
--conf spark.sparkmeasure.outputFilename="s3a://test/myoutput_$(date +%s).json" \
--class org.apache.spark.examples.SparkPi \
examples/jars/spark-examples_2.12-3.3.1.jar 10
To post-process the saved metrics you will need to deserialize the objects saved by the flight recorder mode. This is an example of how to do that using the supplied helper object ch.cern.sparkmeasure.IOUtils:
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24
val myMetrics = ch.cern.sparkmeasure.IOUtils.readSerializedStageMetricsJSON("/tmp/stageMetrics_flightRecorder")
// use ch.cern.sparkmeasure.IOUtils.readSerializedStageMetrics("/tmp/stageMetrics.serialized") for java serialization
myMetrics.toDF.show()
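Once deserialized, the metrics can be analyzed with standard DataFrame operations. As a short sketch, continuing the same spark-shell session, this sums two of the collected columns (the column names stageDuration and executorRunTime are assumed to match the StageVals schema; adjust them if your version differs):
// Aggregate a couple of the collected stage metrics (assumed column names).
import org.apache.spark.sql.functions.sum
val stageDF = myMetrics.toDF
stageDF.select(sum("stageDuration"), sum("executorRunTime")).show()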
The following methods of IOUtils are used to read and write serialized task and stage metrics data:
def writeSerializedJSON(fullPath: String, metricsData: AnyRef): Unit
def writeSerializedJSONToHadoop(fullPath: String, metricsData: AnyRef, conf: SparkConf): Unit
def writeToStringSerializedJSON(metricsData: AnyRef): String
def writeSerialized(fullPath: String, metricsData: Any): Unit
def readSerializedStageMetricsJSON(stageMetricsFileName: String): List[StageVals]
def readSerializedStageMetrics(stageMetricsFileName: String): ListBuffer[StageVals]
def readSerializedTaskMetricsJSON(taskMetricsFileName: String): List[TaskVals]
def readSerializedTaskMetrics(stageMetricsFileName: String): ListBuffer[TaskVals]
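For task-level output the usage is analogous. A short sketch, where the input path is a placeholder for the file produced by FlightRecorderTaskMetrics:
// Read task-level metrics saved in JSON format; the path below is a placeholder.
val taskVals = ch.cern.sparkmeasure.IOUtils.readSerializedTaskMetricsJSON("/tmp/taskMetrics_flightRecorder")
taskVals.toDF.show()
// writeToStringSerializedJSON returns the metrics as a JSON string, for example to log or ship elsewhere
val taskValsJson = ch.cern.sparkmeasure.IOUtils.writeToStringSerializedJSON(taskVals)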
- If you are deploying applications in cluster mode, note that metrics serialized with json or java are written by the driver to its local filesystem. You could use a network filesystem mounted on the driver/cluster machines for convenience. You can also use json_to_hadoop serialization to write the metrics to HDFS or to another Hadoop-compliant filesystem such as S3.
- The flight recorder mode is similar to what Spark already does with the event log, where metrics are also stored: see the Spark documentation for spark.eventLog.enabled and spark.eventLog.dir and for details on the Spark History Server. See also this note with a few tips on how to read event log files.
- For metrics analysis, see also the notes at Notes_on_metrics_analysis.md for a few examples.