RDDs: Transformation and Action
Here we will look at the different kinds of operations that we can perform on RDDs: transformations and actions.
Recall transformers and accessors from Scala sequential and parallel collections.
Transformers: Return new collections as results (not single values; they transform one collection into another).
Examples: `map`, `filter`, `flatMap`, `groupBy`

```scala
map(f: A => B): Traversable[B]
```
Accessors: Return single values as results.
Examples: `reduce`, `fold`, `aggregate`.

```scala
reduce(op: (A, A) => A): A
```
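For example, on an ordinary Scala collection, a minimal sketch (the list contents are illustrative):

```scala
val xs = List("apple", "banana", "cherry")

// Transformer: returns a new collection
val lengths: List[Int] = xs.map(_.length)   // List(5, 6, 6)

// Accessor: returns a single value
val totalChars: Int = lengths.reduce(_ + _) // 17
```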
In Spark, we have these counterparts:
Transformations: Return new RDDs as results.
They are lazy; their result RDD is not immediately computed.
Actions: Compute a result based on an RDD, which is either returned or saved to an external storage system such as HDFS.
They are eager; their result is immediately computed. So if an operation does not return an RDD as its result, it is an action.
Laziness/eagerness is how we can limit network communication using the programming model.
These properties are how Spark provides the benefits mentioned earlier and is able to aggressively reduce the required network communication, thus addressing latency. The following example makes this clear:
Consider a transformation:
```scala
val largeList: List[String] = ...
val wordsRdd: RDD[String] = sc.parallelize(largeList) // sc is the SparkContext
val lengthsRdd: RDD[Int] = wordsRdd.map(_.length)
```
What has happened on the cluster at this point? Nothing: execution of `map` (a transformation) is deferred because transformations are lazy; Spark just keeps track of the transformation.
Now we add an action to the above:
```scala
val totalChars = lengthsRdd.reduce(_ + _)
```
Now the transformation is applied to the dataset, and the action, which is eager, is applied to its result.
Thus, as we saw previously, Spark minimizes latency by aggressively reducing network communication, using lazy transformations and eager actions.
People often erroneously think that after applying a transformation the result has been computed; in reality, the result is only computed once an action is used.
Some common transformations on RDDs:

- `map[B](f: A => B): RDD[B]`: Apply function `f` to each element in the RDD and return an RDD of the results.
- `flatMap[B](f: A => TraversableOnce[B]): RDD[B]`: Apply function `f` to each element in the RDD and return an RDD of the contents of the iterators returned.
- `filter(pred: A => Boolean): RDD[A]`: Apply the predicate function `pred` to each element in the RDD and return an RDD of the elements that pass the predicate condition.
- `distinct(): RDD[T]`: Return a new RDD with duplicates removed.
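A minimal usage sketch of these transformations, reusing the `wordsRdd` defined earlier (nothing is computed yet, since these are all lazy):

```scala
val lengths: RDD[Int]        = wordsRdd.map(_.length)        // length of each word
val chars: RDD[Char]         = wordsRdd.flatMap(_.toList)    // all characters, flattened
val longWords: RDD[String]   = wordsRdd.filter(_.length > 5) // only words longer than 5 chars
val uniqueWords: RDD[String] = wordsRdd.distinct()           // duplicates removed
```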
And some common actions on RDDs:

- `collect(): Array[T]`: Return all elements of the RDD.
- `count(): Long`: Return the number of elements in the RDD.
- `take(num: Int): Array[T]`: Return the first `num` elements of the RDD.
- `reduce(op: (A, A) => A): A`: Combine the elements of the RDD using the given `op` function and return the result.
- `foreach(f: T => Unit): Unit`: Apply the function `f` to each element of the RDD.
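A minimal sketch of these actions, again on the hypothetical `wordsRdd`; each call is eager and triggers a computation on the cluster:

```scala
val allWords: Array[String]  = wordsRdd.collect() // materialize the whole RDD on the driver
val numWords: Long           = wordsRdd.count()   // number of elements
val firstFive: Array[String] = wordsRdd.take(5)   // first 5 elements
val longest: String          = wordsRdd.reduce((a, b) => if (a.length >= b.length) a else b) // a longest word
wordsRdd.foreach(println)                         // println runs on the executors, not the driver
```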
Given an `RDD[String]` containing gigabytes of logs, where each element represents one line of logging, and assuming the dates come in the form `YYYY-MM-DD:HH:MM:SS` and errors are logged with the prefix "ERROR", how would you determine the number of errors that were logged in December 2016?
```scala
val lastYearsLogs: RDD[String] = ...
val numErrorsLoggedInDec = lastYearsLogs.filter(arg => arg.contains("2016-12") && arg.contains("ERROR")) // line 1
                                         .count                                                          // line 2

// line 1: A computation that we know we're eventually going to do, but we haven't started yet - lazy.
// line 2: Actually gives Spark the order to send this function over the network to the individual
//         machines, have them do their computations, and then combine the partial counts and send
//         back the result, so that we end up with a single Long holding the number of errors in the logs.
```
Spark computes RDDs the first time they are used in an action. This helps when processing large amounts of data. Consider a variation of the example above:
```scala
val lastYearsLogs: RDD[String] = ...
val firstLogsWithErrors = lastYearsLogs.filter(_.contains("ERROR")).take(10)
```
The execution of the `filter` transformation is deferred until the `take` action is applied.
Spark leverages this by analyzing and optimizing the chain of operations before executing it.
Spark will not compute intermediate RDDs. Instead, as soon as 10 elements of the filtered RDD have been computed, we are done. At this point Spark stops working, saving time and space by not computing the unused results of `filter`. Also, if multiple transformations were chained before calling the action, Spark can apply all of them to the RDD in a single pass when the action is called, rather than having to traverse the data several times.
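A minimal sketch of such a chain, reusing the `lastYearsLogs` RDD from above (the `map` step is illustrative); the `filter` and `map` are fused into a single pass that stops once 10 elements have been produced:

```scala
val errorMessages: Array[String] =
  lastYearsLogs
    .filter(_.contains("ERROR")) // lazy: no work done yet
    .map(_.toLowerCase)          // lazy: still no work done
    .take(10)                    // eager: triggers the fused computation
```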
RDDs also support set-like transformations that combine two RDDs:

- `union(other: RDD[T]): RDD[T]`: Return an RDD containing the elements from both RDDs.
- `intersection(other: RDD[T]): RDD[T]`: Return an RDD containing only the elements common to both RDDs.
- `subtract(other: RDD[T]): RDD[T]`: Return an RDD with the contents of the other RDD removed.
- `cartesian[U](other: RDD[U]): RDD[(T, U)]`: Return an RDD that is the cartesian product with the other RDD.
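A minimal sketch of these two-RDD transformations (the example RDDs are illustrative, and all of these are still lazy):

```scala
val a: RDD[Int] = sc.parallelize(List(1, 2, 3, 4))
val b: RDD[Int] = sc.parallelize(List(3, 4, 5, 6))

val both: RDD[Int]         = a.union(b)        // 1, 2, 3, 4, 3, 4, 5, 6 (duplicates kept)
val common: RDD[Int]       = a.intersection(b) // 3, 4
val onlyInA: RDD[Int]      = a.subtract(b)     // 1, 2
val pairs: RDD[(Int, Int)] = a.cartesian(b)    // (1,3), (1,4), ..., (4,6)
```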
RDDs also have other actions, unrelated to Scala collections, which are useful when dealing with distributed data:

- `takeSample(withRepl: Boolean, num: Int): Array[T]`: Return an array with a random sample of `num` elements of the dataset, with or without replacement.
- `takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]`: Return the first `num` elements of the RDD using either their natural order or a custom comparator.
- `saveAsTextFile(path: String): Unit`: Write the elements of the dataset as a text file in the local filesystem or HDFS.
- `saveAsSequenceFile(path: String): Unit`: Write the elements of the dataset as a Hadoop sequence file in the local filesystem or HDFS.
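A minimal sketch of these actions on the `lastYearsLogs` RDD from above (the output path is a placeholder):

```scala
// Random sample of 5 log lines, without replacement
val sample: Array[String] = lastYearsLogs.takeSample(false, 5)

// The 10 smallest lines according to the natural String ordering
val firstTen: Array[String] = lastYearsLogs.takeOrdered(10)

// Persist only the error lines as a text file (path is a placeholder)
lastYearsLogs.filter(_.contains("ERROR")).saveAsTextFile("hdfs:///logs/errors")
```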