Copyright 2019 ABSA Group Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Apache Spark 2.4.0 introduced higher-order functions as part of its SQL expressions. Out of the box, these functions are accessible only through the textual representation of Spark SQL.
This library makes the higher-order functions accessible from the DataFrame/Dataset Scala API as well, so you get compile-time type safety when using them.
Reference the library (e.g. via Maven)
<dependency>
<groupId>za.co.absa</groupId>
<artifactId>spark-hofs</artifactId>
<version>0.1.0</version>
</dependency>
Import the Scala API of the higher-order functions into your scope.
import za.co.absa.spark.hofs._
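The snippets below operate on a DataFrame df with an array column (named input, or input1/input2 for zip_with) and, for aggregate, an additional numeric zero column. As a minimal sketch (assuming a spark-shell session with its implicits in scope), the DataFrame for the first example could be built like this:
scala> import spark.implicits._
scala> val df = Seq(Seq(1, 4, 5, 7)).toDF("input")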
The transform function is the equivalent of the map function from functional programming. It takes a column of arrays as the first argument and projects every element of each array using the function passed as the second argument.
scala> df.withColumn("output", transform('input, x => x + 1)).show
+------------+------------+
| input| output|
+------------+------------+
|[1, 4, 5, 7]|[2, 5, 6, 8]|
+------------+------------+
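For comparison, the same projection expressed through the textual Spark SQL form that the functions are otherwise limited to might look like the sketch below; note that the lambda lives inside a string, so mistakes surface only at analysis time rather than at compile time:
scala> import org.apache.spark.sql.functions.expr
scala> df.withColumn("output", expr("transform(input, x -> x + 1)")).show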
If the logic of the projection function requires information about the position of an element within the array, the transform function can also pass an index (starting from 0) to the projection function as its second argument.
scala> df.withColumn("output", transform('input, (x, i) => x + i)).show
+------------+-------------+
| input| output|
+------------+-------------+
|[1, 4, 5, 7]|[1, 5, 7, 10]|
+------------+-------------+
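The index form also makes it easy to keep track of where each element came from. The following sketch (using only standard Spark column functions such as struct; the field names idx and value are arbitrary choices) pairs every element with its position:
scala> import org.apache.spark.sql.functions.struct
scala> df.withColumn("output", transform('input, (x, i) => struct(i.as("idx"), x.as("value")))).show(false)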
By default, the lambda variable representing the element will be seen as elm and the lambda variable representing the index as idx in Spark execution plans.
scala> df.withColumn("output", transform('input, (x, i) => x + i)).explain(true)
== Parsed Logical Plan ==
'Project [input#8, transform('input, lambdafunction(('elm + 'idx), 'elm, 'idx, false)) AS output#45]
+- Project [value#6 AS input#8]
+- LocalRelation [value#6]
== Analyzed Logical Plan ==
input: array<int>, output: array<int>
Project [input#8, transform(input#8, lambdafunction((lambda elm#51 + lambda idx#52), lambda elm#51, lambda idx#52, false)) AS output#45]
+- Project [value#6 AS input#8]
+- LocalRelation [value#6]
...
The names of the lambda variables can be changed by passing extra arguments to the transform function.
scala> df.withColumn("output", transform('input, (x, i) => x + i, "myelm", "myidx")).explain(true)
== Parsed Logical Plan ==
'Project [input#8, transform('input, lambdafunction(('myelm + 'myidx), 'myelm, 'myidx, false)) AS output#53]
+- Project [value#6 AS input#8]
+- LocalRelation [value#6]
== Analyzed Logical Plan ==
input: array<int>, output: array<int>
Project [input#8, transform(input#8, lambdafunction((lambda myelm#59 + lambda myidx#60), lambda myelm#59, lambda myidx#60, false)) AS output#53]
+- Project [value#6 AS input#8]
+- LocalRelation [value#6]
...
The filter function takes a column of arrays as the first argument and eliminates all elements that do not satisfy the predicate passed as the second argument.
scala> df.withColumn("output", filter('input, x => x % 2 === 1)).show
+------------------+---------+
| input| output|
+------------------+---------+
|[1, 2, 4, 5, 7, 8]|[1, 5, 7]|
+------------------+---------+
The lambda variable within the predicate will be seen as elm in Spark execution plans. This name can be changed by passing the third argument to the filter function.
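For example, a call along the following lines (assuming the custom name is supplied as a trailing string argument, analogous to the transform example above; "num" is an arbitrary choice) would make the lambda variable appear as num in the plan:
scala> df.withColumn("output", filter('input, x => x % 2 === 1, "num")).explain(true)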
The aggregate function is the equivalent of the foldLeft function from functional programming. It takes a column of arrays and a column of zero (initial) elements as its first two arguments. The next argument is a binary function that merges the zero element and all elements of the input array into a single value. The first argument of the merging function is the accumulated value and the second one is the element of the current iteration.
scala> df.withColumn("output", aggregate('input, 'zero, (acc, x) => acc + x)).show
+------------------+----+------+
| input|zero|output|
+------------------+----+------+
|[1, 2, 4, 5, 7, 8]| 100| 127|
+------------------+----+------+
If a user wants to transform the reduced value before returning the result, a function performing the transformation logic can be passed as the fourth argument.
scala> df.withColumn("output", aggregate('input, 'zero, (acc, x) => acc + x, y => concat(y, y))).show
+------------------+----+------+
| input|zero|output|
+------------------+----+------+
|[1, 2, 4, 5, 7, 8]| 100|127127|
+------------------+----+------+
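A practical use of the fourth argument is to derive a value from the reduced result, for instance the average of the array elements. The sketch below relies only on the four-argument form shown above plus the standard lit and size functions, and assumes the finish lambda may reference the outer input column:
scala> import org.apache.spark.sql.functions.{lit, size}
scala> df.withColumn("avg", aggregate('input, lit(0), (acc, x) => acc + x, total => total.cast("double") / size('input))).show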
The lambda variable representing the accumulator will be seen as acc and the lambda variable representing the element as elm in Spark execution plans. The names can be changed by passing extra arguments to the aggregate function.
The zip_with function takes two columns of arrays as its first two arguments and merges them element-wise into a single column of arrays. The third argument is a function that takes one element from each array at the same position and specifies the merge logic. If one array is shorter, null elements are appended to it so that it matches the length of the longer array.
scala> df.withColumn("output", zip_with('input1, 'input2, (x, y) => x + y)).show
+---------------+-------------+---------------+
| input1| input2| output|
+---------------+-------------+---------------+
|[1, 2, 4, 5, 7]|[2, 4, 8, 12]|[3, 6, 12, 17,]|
+---------------+-------------+---------------+
The lambda variables indicating input elements to the merging function will be seen as left and right in Spark execution plans. The names can be changed by passing extra arguments to the zip_with function.
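Because the shorter array is padded with nulls, the merge function sees null for the missing positions (hence the trailing empty value in the output above). If that is not desired, the nulls can be neutralized inside the lambda with standard Spark functions, for example treating a missing element as 0:
scala> import org.apache.spark.sql.functions.{coalesce, lit}
scala> df.withColumn("output", zip_with('input1, 'input2, (x, y) => coalesce(x, lit(0)) + coalesce(y, lit(0)))).show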