DataFrames (1)
So far we saw what `DataFrame`s are, how to create them, and how to use SQL queries on them. `DataFrame`s have their own APIs as well!

To enable optimization, Spark SQL's `DataFrame`s operate on a restricted set of data types. Since we have to provide some kind of schema to Spark when we create a `DataFrame`, the types Spark uses are the Spark SQL data types below, each corresponding to a type we provide in the schema. If we have a data type that does not have a corresponding Spark SQL type, then we cannot use Spark SQL with that type.
Basic Spark SQL Data Types:
| Scala Type           | SQL Type      | Details                                                 |
|----------------------|---------------|---------------------------------------------------------|
| Byte                 | ByteType      | 1 byte signed integers (-128 to 127)                    |
| Short                | ShortType     | 2 byte signed integers (-32768 to 32767)                |
| Int                  | IntegerType   | 4 byte signed integers (-2147483648 to 2147483647)      |
| Long                 | LongType      | 8 byte signed integers                                  |
| java.math.BigDecimal | DecimalType   | Arbitrary precision signed decimals                     |
| Float                | FloatType     | 4 byte floating point numbers                           |
| Double               | DoubleType    | 8 byte floating point numbers                           |
| Array[Byte]          | BinaryType    | Byte sequence values                                    |
| Boolean              | BooleanType   | true/false                                              |
| java.sql.Timestamp   | TimestampType | Date containing year, month, day, hour, minute, second  |
| java.sql.Date        | DateType      | Date containing year, month, day                        |
| String               | StringType    | Character string values (stored as UTF8)                |
Complex Spark SQL Data Types:
| Scala Type | SQL Type                                       |
|------------|------------------------------------------------|
| Array[T]   | ArrayType(elementType, containsNull)           |
| Map[K, V]  | MapType(keyType, valueType, valueContainsNull) |
| case class | StructType(List[StructField])                  |
Array of only one type of element (`elementType`). `containsNull` is set to `true` if the elements in the `ArrayType` value can have null values. E.g.:

```scala
// scala type       // sql type
Array[String]       ArrayType(StringType, true)
```
Map of key/value pairs with two types of elements. `valueContainsNull` is set to `true` if the values in the `MapType` can be null. E.g.:

```scala
// scala type       // sql type
Map[Int, String]    MapType(IntegerType, StringType, true)
```
Struct type with a list of possible fields of different types. `containsNull` is set to `true` if the elements of the `StructType` can have null values. A class in Scala is realized using a `StructType`, and its member variables using `StructField`s. E.g.:

```scala
// scala type                                // sql type
case class Person(name: String, age: Int)   StructType(List(StructField("name", StringType, true),
                                                            StructField("age", IntegerType, true)))
```
It's possible to arbitrarily nest complex data types! For example, a `Project` type can be defined in Scala together with its corresponding nested Spark SQL type, as sketched below.
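The original `Project` definition isn't reproduced in these notes, so here is a minimal sketch with assumed fields, just to show how `StructType` and `ArrayType` nest:

```scala
// Fields are assumptions for illustration only; any nested case class/collection
// maps to nested StructType/ArrayType in the same way.
case class Member(name: String, age: Int)
case class Project(title: String, team: Array[Member])

// Corresponding Spark SQL type:
// StructType(List(
//   StructField("title", StringType, true),
//   StructField("team", ArrayType(
//     StructType(List(
//       StructField("name", StringType, true),
//       StructField("age", IntegerType, true))),
//     true), true)))
```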
Important: In order to access any of these data types, basic or complex, you must first import Spark SQL types!
```scala
import org.apache.spark.sql.types._
```
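As a quick sketch of these types in use (not from the original notes; the `SparkSession` name `spark` and the column names are assumptions), an explicit schema can be built from `StructType`/`StructField` and passed to `createDataFrame`:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Assumes a SparkSession named `spark` is already in scope.
// Build an explicit schema and create a DataFrame from it.
val schema = StructType(List(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

val rowRDD = spark.sparkContext.parallelize(Seq(Row("Joe", 38), Row("Sally", 48)))
val peopleDF = spark.createDataFrame(rowRDD, schema)
```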
When introduced, the DataFrames API brought with it a number of relational operations.

The main difference between the RDD API and the DataFrames API is that the DataFrame methods accept Spark SQL expressions instead of the arbitrary user-defined function literals we were used to on RDDs. This allows the optimizer to understand what the computation represents; with `filter`, for example, it can often skip reading unnecessary records.

Example methods, similar looking to SQL, include:

- `select`
- `where`
- `limit`
- `orderBy`
- `groupBy`
- `join`
Before we get into transformations and actions on `DataFrame`s, let's first look at the ways we can inspect our dataset.
- `dataframe.show()`: pretty-prints the `DataFrame` in tabular form. Shows the first 20 elements.

  ```scala
  case class Employee(id: Int, fname: String, lname: String, age: Int, city: String)

  // DataFrame with schema defined in Employee case class
  val employeeDF = sc.parallelize(...).toDF

  employeeDF.show()
  // employeeDF:
  // +---+-----+-------+---+--------+
  // | id|fname|  lname|age|    city|
  // +---+-----+-------+---+--------+
  // | 12|  Joe|  Smith| 38|New York|
  // |563|Sally|  Owens| 48|New York|
  // |645|Slate|Markham| 28|  Sydney|
  // |221|David| Walker| 21|  Sydney|
  // +---+-----+-------+---+--------+
  ```
- `dataframe.printSchema()`: prints the schema of the `DataFrame` in tree format.

  ```scala
  case class Employee(id: Int, fname: String, lname: String, age: Int, city: String)

  // DataFrame with schema defined in Employee case class
  val employeeDF = sc.parallelize(...).toDF

  employeeDF.printSchema()
  // root
  //  |-- id: integer (nullable = true)
  //  |-- fname: string (nullable = true)
  //  |-- lname: string (nullable = true)
  //  |-- age: integer (nullable = true)
  //  |-- city: string (nullable = true)
  ```
Like on RDDs, transformations on DataFrames are:

- operations which return a `DataFrame` as a result
- lazily evaluated
Some common transformations include:
```scala
def select(col: String, cols: String*): DataFrame
// selects a set of named columns and returns a new DataFrame with these columns as a result

def agg(expr: Column, exprs: Column*): DataFrame
// performs aggregations on a series of columns and returns a new DataFrame with the calculated output

def groupBy(col1: String, cols: String*): DataFrame // simplified
// groups the DataFrame using the specified columns. Intended to be used before an aggregation.

def join(right: DataFrame): DataFrame // simplified
// inner join with another DataFrame
```
Other transformations include: `filter`, `limit`, `orderBy`, `where`, `as`, `sort`, `union`, `drop`, amongst others.
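As a quick illustration of `join` (a hedged sketch: `salaryDF` and its columns are made up here, and `employeeDF` is the example DataFrame from above):

```scala
import spark.implicits._ // assumes a SparkSession named `spark`

// Hypothetical second DataFrame sharing the "id" column with employeeDF.
val salaryDF = Seq((12, 85000), (563, 90000)).toDF("id", "salary")

// Inner join on the shared "id" column.
val joinedDF = employeeDF.join(salaryDF, "id")
```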
As seen above, most methods take a parameter of type `Column` or `String`, thus always referring to a column/attribute in the dataset. Most methods on `DataFrame`s tend to work with some well-defined operation on a column of the dataset. There are three ways to refer to a column:

- Using the `$` notation:

  ```scala
  // requires import spark.implicits._
  df.filter($"age" > 18)
  ```

- Referring to the `DataFrame`:

  ```scala
  df.filter(df("age") > 18)
  ```

- Using a SQL query string:

  ```scala
  df.filter("age > 18") // can sometimes be error prone, so prefer the two options above
  ```
Recall the previous example we saw. How do we solve it using the DataFrame API?
```scala
case class Employee(id: Int, fname: String, lname: String, age: Int, city: String)

// DataFrame with schema defined in Employee case class
val employeeDF = sc.parallelize(...).toDF
// No need to register here like we did previously

// employeeDF:
// +---+-----+-------+---+--------+
// | id|fname|  lname|age|    city|
// +---+-----+-------+---+--------+
// | 12|  Joe|  Smith| 38|New York|
// |563|Sally|  Owens| 48|New York|
// |645|Slate|Markham| 28|  Sydney|
// |221|David| Walker| 21|  Sydney|
// +---+-----+-------+---+--------+

val sydneyEmployeesDF = employeeDF.select("id", "lname")
                                  .where("city = 'Sydney'")
                                  .orderBy("id")

// sydneyEmployeesDF:
// +---+-------+
// | id|  lname|
// +---+-------+
// |221| Walker|
// |645|Markham|
// +---+-------+
```
The DataFrame API gives us two methods for filtering: `filter` and `where`. They are equivalent!

```scala
employeeDF.filter("age > 30").show()
// same as
employeeDF.where("age > 30").show()
```
Filters can be complex as well, using logical operators, groups, etc.:

```scala
employeeDF.filter(($"age" > 30) && ($"city" === "Sydney")).show()
```
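One more variation on the same (assumed) `employeeDF`, showing the other logical operators and grouping with parentheses:

```scala
// "or" (||), negation (!), and parentheses to group sub-expressions.
employeeDF.filter(($"city" === "Sydney") || !($"age" > 30)).show()
```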
One of the most common tasks on tables is to (1) group the data by a certain attribute and then (2) do some kind of aggregation on it, like a count.

For grouping and aggregating, Spark SQL provides:

- a `groupBy` function which returns a `RelationalGroupedDataset`
- several standard aggregation functions defined on `RelationalGroupedDataset`, like `count`, `sum`, `max`, `min`, `avg`, and `agg`.
So, how to group and aggregate:

- Call `groupBy` on a column/attribute of a DataFrame.
- On the resulting `RelationalGroupedDataset`, call one of `count`, `max`, ..., or `agg`. For `agg`, also specify which column/attribute the subsequent aggregation functions should be called upon.

```scala
df.groupBy($"attribute1")
  .agg(sum($"attribute2"))

df.groupBy($"attribute1")
  .count() // counts the rows in each group
```
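For instance, a small sketch (reusing the `employeeDF` assumed above) that computes the average and maximum age per city with `agg`:

```scala
import org.apache.spark.sql.functions._ // sum, avg, max, count, ...

val ageStatsDF = employeeDF.groupBy($"city")
                           .agg(avg($"age"), max($"age")) // one row per city with avg(age) and max(age)
```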
We have a dataset of homes available for sale. Let's calculate the most and least expensive home per zip code.

```scala
case class Listing(street: String, zip: Int, price: Int)
val listingsDF = ... // DataFrame of Listings

import org.apache.spark.sql.functions._

val mostExpensiveDF = listingsDF.groupBy($"zip")
                                .max("price")
val leastExpensiveDF = listingsDF.groupBy($"zip")
                                 .min("price")
```
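Both extremes can also be computed in a single pass with `agg` (a sketch assuming the same `listingsDF` and imports as above):

```scala
val priceExtremesDF = listingsDF.groupBy($"zip")
                                .agg(min($"price"), max($"price")) // one row per zip with min(price) and max(price)
```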
We have a dataset of all the posts in an online forum. We want to tally up each author's posts per subforum, and then rank the authors with the most posts per subforum.

```scala
case class Post(authorId: Int, subForum: String, likes: Int, date: String)
val postsDF = ... // DataFrame of Posts

import org.apache.spark.sql.functions._

val rankedDF = postsDF.groupBy($"authorId", $"subForum")
                      .agg(count($"authorId"))                       // new DF with columns: authorId, subForum, count(authorId)
                      .orderBy($"subForum", $"count(authorId)".desc)

// postsDF:
// +--------+--------+-----+------+
// |authorId|subForum|likes|  date|
// +--------+--------+-----+------+
// |       1|  design|    2|"2012"|
// |       1|  debate|    0|"2012"|
// |       2|  debate|    0|"2012"|
// |       3|  debate|   23|"2012"|
// |       1|  design|    1|"2012"|
// |       1|  design|    0|"2012"|
// |       2|  design|    0|"2012"|
// |       2|  debate|    0|"2012"|
// +--------+--------+-----+------+

// rankedDF:
// +--------+--------+---------------+
// |authorId|subForum|count(authorId)|
// +--------+--------+---------------+
// |       2|  debate|              2|
// |       1|  debate|              1|
// |       3|  debate|              1|
// |       1|  design|              3|
// |       2|  design|              1|
// +--------+--------+---------------+
```
Finally:

- `RelationalGroupedDataset` API: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.RelationalGroupedDataset
- Methods within `agg`: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$