JoshRosen edited this page Nov 21, 2014 · 8 revisions

Welcome to the spark-perf wiki. This page collects useful scripts, helper functions, and analysis tools for running spark-perf tests and analyzing their results.

Running tests

Automatically testing against multiple Spark versions

In config.py, read the Spark commit to test from an environment variable instead of hard-coding it:

import os
SPARK_COMMIT_ID = os.environ["SPARK_COMMIT_ID"]

To run against multiple commits, use a shell script that calls bin/run once per commit, setting SPARK_COMMIT_ID each time:

#!/usr/bin/env bash

# Note: array elements are separated by spaces, and there must be no spaces around "=":
versions=( "origin/tag/v1.1.0" "origin/tag/v1.1.1-rc2" "origin/tag/v1.2.0-snapshot1" "origin/branch-1.2" )
# Quote the expansion so that each element stays a single word:
for version in "${versions[@]}"
do
  export SPARK_COMMIT_ID="$version"
  ./bin/run
done
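If you would rather drive the sweep from Python (config.py is already Python), the loop can be sketched as below. This is a minimal sketch, not part of spark-perf: run_against_versions is a hypothetical helper, and it assumes it is called from the spark-perf root so that ./bin/run resolves. Like the shell loop, it keeps going after a failed run; it also returns each run's exit code.

```python
import os
import subprocess
from typing import List, Sequence


def run_against_versions(versions: Sequence[str],
                         cmd: Sequence[str] = ("./bin/run",)) -> List[int]:
    """Hypothetical helper: run `cmd` once per Spark ref, exporting that ref
    as SPARK_COMMIT_ID (which config.py reads via os.environ)."""
    exit_codes = []
    for version in versions:
        # Copy the current environment and override only SPARK_COMMIT_ID.
        env = dict(os.environ, SPARK_COMMIT_ID=version)
        # check=False: like the shell loop, continue even if one run fails.
        completed = subprocess.run(list(cmd), env=env, check=False)
        exit_codes.append(completed.returncode)
    return exit_codes
```

For example, run_against_versions(["origin/tag/v1.1.0", "origin/branch-1.2"]) runs bin/run twice; a non-zero entry in the returned list marks the commit whose run failed.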

To print the SHA of the newest commit and of every 50th commit after it between two git refs (useful for bisecting):

git log --oneline v1.1.0..origin/branch-1.2 | awk 'NR == 1 || NR % 50 == 0' | cut -d ' ' -f1
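The awk filter keeps the first line (the newest commit, since git log lists newest first) plus every 50th line. If you want to choose bisection points inside a script instead, the same selection can be sketched in Python; sample_commits is a hypothetical helper that takes SHAs in git log order:

```python
from typing import List, Sequence


def sample_commits(shas: Sequence[str], every: int = 50) -> List[str]:
    """Hypothetical helper: keep line 1 and every `every`-th line (1-based),
    the same rows that awk 'NR == 1 || NR % 50 == 0' prints."""
    if every < 1:
        raise ValueError("every must be >= 1")
    return [sha for nr, sha in enumerate(shas, start=1)
            if nr == 1 or nr % every == 0]
```

The SHA list could come from git log --format=%H with the same revision range, which prints one full SHA per line.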

Analyzing results

Uploading logs from spark-ec2 clusters to S3

Upgrade the AWS command-line tool (awscli) to a newer version and configure your AWS credentials:

sudo easy_install --upgrade awscli
aws configure
  # AWS Access Key ID [None]: AKIAIOSFODNN7EXAMPLE
  # AWS Secret Access Key [None]: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
  # Default region name [None]:
  # Default output format [None]:

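Recursively copy the results directory to an S3 bucket (aws s3 sync, which takes the same source and destination arguments, also works and skips files that are already up to date):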

aws s3 cp --recursive /local/path/to/results/directory s3://bucket-name/resultsdir/

Analyzing JSON-formatted test results using Spark SQL

Many test suites report their results as JSON objects that contain information on the test environment and configuration, such as the Spark version / commit SHA, the contents of SparkConf, environment variables, etc. Using Spark SQL, we can create a SchemaRDD from these JSON objects in order to analyze test results.

spark-perf writes its output to a results directory that contains one subdirectory per run of bin/run. Each subdirectory contains several log files; we're interested in the *.out files, which contain the test results. First, list the paths of those files:

  • In Databricks Cloud:

    Note: this step is only relevant to Databricks employees; the rest of this tutorial should work in any environment:

    import dbutils.fs
    val resultsDir = "/mount/path/in/dbfs"
    fs.mount("s3n://AWSKEY:AWSSECRETKEY@bucket-name/bucket-folder/", resultsDir)
    // Collect the *.out files from every "*_logs" subdirectory of the results directory
    val logFiles: Seq[String] = for (
      logDir: String <- fs.ls(resultsDir).filter(_.name.endsWith("_logs/")).map(d => s"$resultsDir/${d.name}");
      outFiles: Seq[String] = fs.ls(logDir).map(f => s"$logDir/${f.name}").filter(_.endsWith(".out"));
      logFile: String <- outFiles
    ) yield logFile
  • On a local filesystem:

    TODO
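For a local filesystem, one way to build the same list is sketched below in Python. It assumes the layout described above: one subdirectory per run whose name ends in _logs (the suffix the Databricks Cloud snippet filters on), each containing *.out files. list_result_logs is a hypothetical helper.

```python
from pathlib import Path
from typing import List


def list_result_logs(results_dir: str) -> List[str]:
    """Hypothetical helper: return the *.out files found in every
    subdirectory of `results_dir` whose name ends in "_logs"."""
    root = Path(results_dir)
    log_dirs = sorted(p for p in root.iterdir()
                      if p.is_dir() and p.name.endswith("_logs"))
    log_files: List[str] = []
    for log_dir in log_dirs:
        # Only regular files with the .out extension hold test results.
        log_files.extend(str(f) for f in sorted(log_dir.glob("*.out"))
                         if f.is_file())
    return log_files
```

To load these files from the Spark shell, the paths can be joined with commas, since sc.textFile accepts a comma-separated list of paths.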

Next, create an RDD that loads these files, keeps only the result lines (which start with "results: "), and extracts the JSON objects (each written on a single line):

import org.apache.spark.rdd.RDD

val allLogLines: RDD[String] = sc.union(logFiles.map(f => sc.textFile(f)))
val allResultJsons: RDD[String] = allLogLines.filter(_.startsWith("results: ")).map(_.stripPrefix("results: "))
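To spot-check a single log file without starting Spark, the same filter can be sketched in plain Python. extract_results is a hypothetical helper; like the Scala code, it assumes each result is one JSON object on a line that starts with "results: ".

```python
import json
from typing import Iterable, Iterator

RESULT_PREFIX = "results: "


def extract_results(lines: Iterable[str]) -> Iterator[dict]:
    """Hypothetical helper: parse the JSON object on every line that starts
    with "results: " and skip all other log output."""
    for line in lines:
        if line.startswith(RESULT_PREFIX):
            # json.loads ignores the trailing newline of lines read from a file.
            yield json.loads(line[len(RESULT_PREFIX):])
```

For example, iterating extract_results over an open *.out file yields one dict per test, with keys such as testName and results.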

Using Spark SQL, create a SchemaRDD from these JSON strings. In the JSON output that spark-perf writes, some properties have names that contain dots (such as Java system properties like spark.shuffle.manager). Spark SQL does not currently support column names containing dots (see SPARK-2775), so we need to post-process the inferred schema to convert dots into underscores:

import sqlContext.applySchema
import org.apache.spark.sql._

/** Recursively replaces . in field names with _ (underscore) */
def cleanDataType(dataType: DataType): DataType = dataType match {
  case StructType(fields) =>
    StructType(
      fields.map(f =>
        f.copy(name = f.name.replaceAllLiterally(".", "_"), dataType = cleanDataType(f.dataType))))
  case ArrayType(elemType, containsNull) => ArrayType(cleanDataType(elemType), containsNull)
  case other => other
}

/** Returns schemaRDD with . replaced by _ in all column names (distinct name avoids REPL overload shadowing) */
def cleanSchema(schemaRDD: SchemaRDD): SchemaRDD = {
  applySchema(schemaRDD, cleanDataType(schemaRDD.schema).asInstanceOf[StructType])
}

val resultsRdd = cleanSchema(sqlContext.jsonRDD(allResultJsons))
resultsRdd.registerTempTable("results")
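The same renaming can be applied to the raw JSON before it reaches Spark, which is handy when inspecting results locally. This sketch uses a hypothetical helper, clean_keys, that rewrites dict keys the way cleanSchema rewrites struct field names, recursing into nested objects and arrays:

```python
from typing import Any


def clean_keys(value: Any) -> Any:
    """Hypothetical helper: replace "." with "_" in every dict key,
    descending into nested dicts and lists; other values are unchanged."""
    if isinstance(value, dict):
        return {k.replace(".", "_"): clean_keys(v) for k, v in value.items()}
    if isinstance(value, list):
        return [clean_keys(v) for v in value]
    return value
```

If a record already contains both a dotted key and its underscored twin, the dict comprehension keeps only whichever value comes last.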

Now, we have a SQL table with a schema that looks something like this:

root
 |-- options: struct (nullable = true)
 |    |-- closure-size: string (nullable = true)
 |    |-- inter-trial-wait: string (nullable = true)
 |    |-- key-length: string (nullable = true)
 |    |-- num-jobs: string (nullable = true)
 |    |-- num-partitions: string (nullable = true)
 |    |-- num-records: string (nullable = true)
      [...]
 |-- results: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- time: double (nullable = true)
 |-- sparkConf: struct (nullable = true)
 |    |-- spark_app_id: string (nullable = true)
 |    |-- spark_app_name: string (nullable = true)
 |    |-- spark_driver_host: string (nullable = true)
 |    |-- spark_driver_memory: string (nullable = true)
 |    |-- spark_driver_port: string (nullable = true)
      [...]
 |-- sparkVersion: string (nullable = true)
 |-- systemProperties: struct (nullable = true)
 |    |-- SPARK_SUBMIT: string (nullable = true)
 |    |-- awt_toolkit: string (nullable = true)
 |    |-- file_encoding: string (nullable = true)
      [...]
 |-- testName: string (nullable = true)

In this schema, results is an array of per-test-run times (or other outcome metrics); the rest of the schema describes the configuration used by those runs. To analyze these results using SQL, we can use Hive's LATERAL VIEW EXPLODE to un-nest this array and produce one row per test run (in Spark 1.x this HiveQL syntax needs a HiveContext, since the basic SQLContext parser does not support it). For example:

SELECT result.time, results.systemProperties.spark_shuffle_blockTransferService as bts, testName, options, sparkVersion, results.systemProperties.sparkperf_commitSHA as commit
FROM results LATERAL VIEW EXPLODE(results) r as result
WHERE testName = "count";
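To make the LATERAL VIEW EXPLODE step concrete, here is a rough Python equivalent of the query above, operating on parsed records whose keys have already had dots replaced by underscores. explode_results is a hypothetical helper; the output field names are taken from the query, and missing fields come back as None.

```python
from typing import Any, Dict, Iterable, Iterator


def explode_results(records: Iterable[Dict[str, Any]],
                    test_name: str) -> Iterator[Dict[str, Any]]:
    """Hypothetical helper: yield one flat row per entry of each record's
    `results` array (like LATERAL VIEW EXPLODE), for one testName only."""
    for rec in records:
        if rec.get("testName") != test_name:
            continue  # WHERE testName = ...
        props = rec.get("systemProperties") or {}
        for result in rec.get("results") or []:
            yield {
                "time": result.get("time"),
                "bts": props.get("spark_shuffle_blockTransferService"),
                "testName": rec["testName"],
                "options": rec.get("options"),
                "sparkVersion": rec.get("sparkVersion"),
                "commit": props.get("sparkperf_commitSHA"),
            }
```

A record with three timed trials therefore becomes three rows that share the same configuration columns, which is what lets you group or compare trials across Spark versions.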