Spark SQL
SQL is a common language used for doing analytics on databases. But it's a pain to connect big data processing pipelines like Spark or Hadoop to an SQL database.
Everything about SQL is structured (fixed data-types, fixed set of operations). This rigidity has been used to get all kinds of performance speedups.
We want to:
- seamlessly intermix SQL queries with Scala
- get all the optimizations used in databases on Spark Jobs
Spark SQL delivers both these features! Its three main goals are to:
- Support relational processing (SQL-like syntax) both within Spark programs (on RDDs) and on external data sources, with a friendly API. Sometimes it's more desirable to express a computation in SQL syntax than with functional APIs, and vice versa.
- Achieve high performance, by using the optimization techniques used in databases.
- Easily support new data sources, such as semi-structured data (like JSON) and structured data (like external databases), to get them easily into Spark.
It is a component of the Spark Stack.
- It is a Spark module for structured data processing.
- It is implemented as a library on top of Spark.
Three main APIs that it provides:
- SQL literal syntax
- DataFrames
- Datasets
Two specialized backend components:
- Catalyst: query optimizer
- Tungsten: off-heap serializer
Data is organized into one or more tables. Tables typically represent objects of a certain type, and they contain columns and rows.
Our terminology: a relation is just a table, columns are attributes, and rows are records or tuples.
DataFrame is Spark SQL's core abstraction, conceptually equivalent to a table in a relational database. Thus DataFrames are, conceptually, RDDs full of records with a known schema. (RDDs, on the other hand, do not carry any schema information.)
DataFrames are untyped, unlike RDDs, which have a generic type parameter: RDD[T]. The elements within DataFrames are Rows, which are not parameterized by a type. Hence transformations on DataFrames are known as untyped transformations, and the Scala compiler cannot type-check Spark SQL schemas in DataFrames.
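A minimal sketch of what this means in practice (the values here are purely illustrative):
import org.apache.spark.sql.Row
// Hypothetical record of a DataFrame with columns (name, age)
val row: Row = Row("Alice", 30)
// Elements come back as Any, so they must be cast or fetched with a typed getter at runtime:
val name = row(0).asInstanceOf[String] // compiles even if the cast is wrong
val age = row.getInt(1)                // fails only at runtime if column 1 is not an Int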
SparkSession is the new SparkContext. We use it when we use Spark SQL.
import org.apache.spark.sql.SparkSession
val sparkSession = SparkSession.builder()
.appName("My App")
//.config("spark.some.config.option", "some-value")
.getOrCreate()
DataFrames can be created in 2 ways:
- From an existing RDD: Either with schema inference, or with an explicit schema
- Reading in a specific data-source from a file: common structured or semi-structured formats such as JSON
Given a pair RDD, RDD[(T1, T2, ..., TN)], a DataFrame can be created with its schema automatically inferred using the toDF method:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf: SparkConf = new SparkConf
val sc: SparkContext = new SparkContext("local[*]", "foo", conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val tupleRdd = ... // Assume RDD[(Int, String, String, String)]
val tupleDF = tupleRdd.toDF("id", "name", "city", "country") // column names in the DataFrame
Note: If toDF is used without arguments in the above case, then Spark assigns numbers as the attribute names, i.e. _1, _2, _3, etc., to the DataFrame.
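For instance, reusing the hypothetical tupleRdd from the snippet above:
// Without explicit column names, Spark falls back to positional names
val tupleDF2 = tupleRdd.toDF() // columns: _1, _2, _3, _4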
If the RDD uses a type which is already a case class, then Spark can infer the attributes directly from the case class's fields:
case class Person(id: Int, name: String, city: String)
val peopleRDD = ... // Assume RDD[Person]
val peopleDF = peopleRDD.toDF
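A more complete sketch of this, assuming a SparkSession named spark and a small in-memory data set purely for illustration:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("People example").master("local[*]").getOrCreate()
import spark.implicits._ // provides toDF for RDDs of case classes

val peopleRDD = spark.sparkContext.parallelize(Seq(
  Person(1, "Alice", "Paris"),
  Person(2, "Bob", "Berlin")
))
val peopleDF = peopleRDD.toDF // columns "id", "name", "city" inferred from Person's fields
peopleDF.printSchema()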
Creating a DataFrame with an explicit schema needs 3 steps:
- Create an RDD of Rows from the original RDD.
- Create the schema, represented by a StructType, matching the structure of the Rows in the RDD created in step 1.
- Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

case class Person(name: String, age: Int)
val peopleRDD = sc.textFile(...) // Assume RDD[String], one "name,age" line per person
// The schema is encoded in a string
val schemaString = "name age"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim))
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
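To sanity-check the result, a quick usage sketch:
peopleDF.printSchema() // both fields come out as string, since the schema above uses StringType
peopleDF.show()        // prints the first rows as a table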
Using the SparkSession object, you can read semi-structured/structured data by using the read method.
JSON, CSV, Parquet, and JDBC data sources can be read directly. Info on all methods available to do this: http://spark.apache.org/docs/latest/api/scala/#org.apache.spark.sql.DataFrameReader
E.g. for a JSON file:
val df = sparkSession.read.json("examples/src/main/resources/people.json")
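The same DataFrameReader handles the other formats as well; a sketch (the file paths here are hypothetical):
// CSV with a header row, letting Spark infer the column types
val csvDF = sparkSession.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("examples/src/main/resources/people.csv")

// Parquet files carry their own schema, so no extra options are needed
val parquetDF = sparkSession.read.parquet("examples/src/main/resources/users.parquet")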
Once you have a DataFrame, you can freely write familiar SQL syntax to operate on it!
Given a DataFrame, we just have to register it as a temporary SQL view first. This essentially gives a name to our DataFrame in SQL so we can refer to it in an SQL FROM statement.
// Register the DataFrame as a SQL temporary view
peopleDF.createOrReplaceTempView("people")
// SQL literals can be passed to Spark SQL's sql method
val adultsDF = spark.sql("SELECT * FROM people WHERE age > 17")
The SQL statements available are largely what's available in HiveQL. This includes standard SQL statements such as:
- SELECT
- FROM
- WHERE
- COUNT
- HAVING
- GROUP BY
- ORDER BY
- SORT BY
- DISTINCT
- JOIN
- (LEFT|RIGHT|FULL) OUTER JOIN
- Subqueries: SELECT col FROM (SELECT a + b AS col FROM t1) t2
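For example, a GROUP BY/HAVING query against the people view registered above (a sketch; the aggregation is only illustrative):
// Count people per age and keep only ages that occur more than once
val ageCountsDF = spark.sql("""SELECT age, COUNT(*) AS cnt
                               FROM people
                               GROUP BY age
                               HAVING COUNT(*) > 1
                               ORDER BY cnt DESC""")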
The supported Spark SQL syntax, a HiveQL cheat sheet, and an updated list of supported Hive features are enumerated in the official Spark SQL docs.
Let's assume we have a DataFrame representing a data set of employees:
case class Employee(id: Int, fname: String, lname: String, age: Int, city: String)
// DataFrame with schema defined in Employee case class
val employeeDF = sc.parallelize(...).toDF
// registered as a table called "employees"
// employeeDF:
// +---+-----+-------+---+--------+
// | id|fname| lname |age| city |
// +---+-----+-------+---+--------+
// | 12| Joe| Smith| 38|New York|
// |563|Sally| Owens| 48|New York|
// |645|Slate|Markham| 28| Sydney|
// |221|David| Walker| 21| Sydney|
// +---+-----+-------+---+--------+
The goal is to obtain just the IDs and last names of employees working in a specific city, say Sydney, Australia, and to sort the result in order of increasing employee ID.
What would this SQL query look like?
val sydneyEmployeesDF = sparkSession.sql("""SELECT id, lname
                                            FROM employees
                                            WHERE city = 'Sydney'
                                            ORDER BY id""")
// sydneyEmployeesDF:
// +---+-------+
// | id| lname|
// +---+-------+
// |221| Walker|
// |645|Markham|
// +---+-------+
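The same result can also be expressed with the untyped DataFrame transformations mentioned earlier; a sketch using the select/where/orderBy API:
import org.apache.spark.sql.functions.col

val sydneyEmployeesDF2 = employeeDF
  .select(col("id"), col("lname"))
  .where(col("city") === "Sydney")
  .orderBy(col("id"))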
Note: It's best to use Spark 2.1+ with Scala 2.11+ for doing SQL queries with Spark SQL.