Implicit functions for querying interval data with Apache Spark/Scala. Features:
- support for closed intervals and half-open intervals (closed-from, open-to)
- support for discrete (timestamp, integer) and continuous (double, float) interval axis datatypes
Breaking changes in version 2.x:
- temporalRoundDiscreteTime is no longer included in temporalCleanupExtend. Add it separately if needed.
- temporalCombine is no longer included in temporalCleanupExtend. Add it separately if needed. Note that this also affects the temporal*Join methods.
- The superfluous parameter keys:Seq[String] = Seq() has been removed from temporalCombine.
Spark-temporalquery releases are published on Maven Central. To use it, add the following Maven dependency for your Scala version to your project:
<dependency>
<groupId>ch.zzeekk.spark</groupId>
<artifactId>spark-temporalquery_2.12</artifactId>
<version>2.0.1</version>
</dependency>
See also Builds to review compatibility between Spark, Scala and Java.
TemporalQueryUtil provides implicit functions on DataFrame to query temporal data with a timestamp interval axis.
// this imports temporal* implicit functions on DataFrame
import ch.zzeekk.spark.temporalquery.TemporalQueryUtil._
import ch.zzeekk.spark.temporalquery.{ClosedInterval, DiscreteTimeAxis}
import java.sql.Timestamp
import java.time.temporal.ChronoUnit
// configure options for temporal query operations
val intervalDef = ClosedInterval(Timestamp.valueOf("0001-01-01 00:00:00"), Timestamp.valueOf("9999-12-31 00:00:00"), DiscreteTimeAxis(ChronoUnit.MILLIS))
implicit val tqc = TemporalClosedIntervalQueryConfig( fromColName="valid_from", toColName="valid_to", intervalDef = intervalDef)
// make SparkSession implicitly available
implicit val sss = spark
import sss.implicits._
// prepare some DataFrames
val dfLeft = Seq((0, Timestamp.valueOf("2017-12-10 00:00:00"), Timestamp.valueOf("2018-12-08 23:59:59.999"), 4.2))
.toDF("id", "valid_from", "valid_to","value_l")
val dfRight = Seq((0, Timestamp.valueOf("2018-01-01 00:00:00"), Timestamp.valueOf("2018-12-31 23:59:59.999"), 5))
.toDF("id", "valid_from", "valid_to","value_r")
// use temporal* functions
dfLeft.temporalInnerJoin(dfRight, Seq("id"))
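To make the expected outcome of the example above concrete: by the definition of an inner join used in this README, the result for id 0 should only cover the period in which both rows are valid. The following plain-Scala sketch (no Spark involved; the object and method names are hypothetical and not part of the library) computes that overlap for two closed intervals.

```scala
import java.sql.Timestamp

// Hypothetical helper for illustration only; not part of spark-temporalquery.
object InnerJoinPeriodSketch {
  // Overlap of two closed intervals [aFrom, aTo] and [bFrom, bTo]; None if they do not overlap.
  def overlap(aFrom: Timestamp, aTo: Timestamp,
              bFrom: Timestamp, bTo: Timestamp): Option[(Timestamp, Timestamp)] = {
    val from = if (aFrom.after(bFrom)) aFrom else bFrom // later start
    val to = if (aTo.before(bTo)) aTo else bTo          // earlier end
    if (from.after(to)) None else Some((from, to))
  }

  def main(args: Array[String]): Unit = {
    val period = overlap(
      Timestamp.valueOf("2017-12-10 00:00:00"), Timestamp.valueOf("2018-12-08 23:59:59.999"), // dfLeft row
      Timestamp.valueOf("2018-01-01 00:00:00"), Timestamp.valueOf("2018-12-31 23:59:59.999")) // dfRight row
    println(period)
  }
}
```

For the two sample rows the overlap runs from 2018-01-01 00:00:00 to 2018-12-08 23:59:59.999, which is the period you would expect in the joined row.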
LinearGenericQueryUtil provides implicit functions on DataFrame to query linear data with any numeric interval axis datatype. The following shortcuts exist to use it with predefined datatypes:
- LinearFloatQueryUtil
- LinearDoubleQueryUtil
// this imports linear* implicit functions on DataFrame
import ch.zzeekk.spark.temporalquery.LinearDoubleQueryUtil._
import ch.zzeekk.spark.temporalquery.HalfOpenInterval
// configure options for linear query operations
val intervalDef = HalfOpenInterval(0d, Double.MaxValue)
implicit val lqc: LinearQueryConfig = LinearHalfOpenIntervalQueryConfig(fromColName="pos_from", toColName="pos_to", intervalDef = intervalDef)
// make SparkSession implicitly available
implicit val sss = spark
import sss.implicits._
// prepare some DataFrames
val dfLeft = Seq((0, 0.0, 100.0, 4.2))
.toDF("id", "pos_from", "pos_to","value_l")
val dfRight = Seq((0, 50.0, 200.0, 5))
.toDF("id", "pos_from", "pos_to","value_r")
// use linear* functions
dfLeft.linearInnerJoin(dfRight, Seq("id"))
The following sections are written for temporal queries, but the library works in the same way for linear queries if you replace temporal* with linear* in the function names.
Temporal queries need a time axis with datatype timestamp. The axis is configured via TemporalQueryConfig.intervalDef as either
- ClosedInterval with a configurable discrete step size, or
- HalfOpenInterval
The axis starts at TemporalClosedIntervalQueryConfig.intervalDef.lowerHorizon and ends at TemporalClosedIntervalQueryConfig.intervalDef.upperHorizon.
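The difference between the two interval kinds can be illustrated with a small plain-Scala sketch. The types below are hypothetical stand-ins for illustration (they are not the library's ClosedInterval and HalfOpenInterval classes). The sketch assumes a step size of 1 for the discrete axis: a closed interval includes its upper bound, so a directly following interval starts one step later, whereas a half-open interval excludes its upper bound, so a directly following interval starts at exactly that value.

```scala
// Hypothetical stand-ins for illustration; not the library's interval classes.
object IntervalKindsSketch {
  // Closed interval [from, to] on a discrete axis with step 1: both bounds are included.
  final case class ClosedRange(from: Long, to: Long) {
    def contains(x: Long): Boolean = from <= x && x <= to
    // A directly following interval starts one step after this one ends.
    def isFollowedBy(next: ClosedRange): Boolean = to + 1 == next.from
  }

  // Half-open interval [from, to) on a continuous axis: the upper bound is excluded.
  final case class HalfOpenRange(from: Double, to: Double) {
    def contains(x: Double): Boolean = from <= x && x < to
    // A directly following interval starts exactly where this one ends.
    def isFollowedBy(next: HalfOpenRange): Boolean = to == next.from
  }
}
```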
Before using the operations below you must ensure that your data satisfies the requirements of the chosen intervalDef configuration.
Moreover, the DataFrame must not contain temporally overlapping entries or entries where validTo < validFrom, as this leads to confusing results.
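A quick way to reason about these two requirements is to check them on a small sample. The following plain-Scala sketch is an illustration only: Rec, inverted and overlapping are hypothetical names, the axis is assumed to be a closed interval on Long values, and which columns form the key depends on your data model.

```scala
// Hypothetical helper for illustration only; not part of spark-temporalquery.
object IntervalChecksSketch {
  final case class Rec(key: String, from: Long, to: Long) // closed interval [from, to]

  // Records that end before they start.
  def inverted(recs: Seq[Rec]): Seq[Rec] = recs.filter(r => r.to < r.from)

  // Pairs of records with the same key whose closed intervals overlap.
  // Inverted records are ignored here because they are reported separately.
  def overlapping(recs: Seq[Rec]): Seq[(Rec, Rec)] = {
    val valid = recs.filter(r => r.from <= r.to).toIndexedSeq
    for {
      i <- valid.indices
      j <- valid.indices
      if i < j && valid(i).key == valid(j).key &&
        valid(i).from <= valid(j).to && valid(j).from <= valid(i).to
    } yield (valid(i), valid(j))
  }
}
```

Both functions returning an empty result is a necessary condition for the sample; it does not replace checking the remaining requirements of the chosen intervalDef.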
You may use the methods temporalCleanupExtend and temporalCombine to clean up your DataFrame. For example:
Id | val | validFrom | validTo | comment |
---|---|---|---|---|
1 | 2.72 | 2019-01-05 12:34:56.123456789 | 2019-02-01 02:34:56.1235 | nanoseconds |
1 | 2.72 | 2019-02-01 01:00:00.0 | 2019-02-01 02:34:56.1245 | overlaps with previous |
1 | 2.72 | 2019-02-10 00:00:0 | 2019-02-09 00:00:0 | ends before it starts |
1 | 42.0 | 2019-01-01 00:00:0 | 2019-12-31 23:59:59.999 | does not overlap because different value: many-to-many relation |

After cleanup this becomes:

Id | val | validFrom | validTo |
---|---|---|---|
1 | 2.72 | 2019-01-05 12:34:56.124 | 2019-02-01 02:34:56.124 |
1 | 42.0 | 2019-01-01 00:00:0 | 2019-12-31 23:59:59.999 |
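
In the example above the timestamps end up on a millisecond grid: validFrom 12:34:56.123456789 becomes 12:34:56.124, and the latest validTo of the overlapping records, 02:34:56.1245, becomes 02:34:56.124. This is consistent with rounding validFrom up and validTo down to the discrete step. The plain-Scala sketch below reproduces just that arithmetic; it is inferred from the table rather than taken from the library's implementation, and the names are hypothetical.

```scala
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

// Hypothetical helper for illustration only; not part of spark-temporalquery.
object RoundingSketch {
  // Round down to the millisecond grid.
  def floorMillis(t: LocalDateTime): LocalDateTime = t.truncatedTo(ChronoUnit.MILLIS)

  // Round up to the millisecond grid; values already on the grid stay unchanged.
  def ceilMillis(t: LocalDateTime): LocalDateTime = {
    val floored = floorMillis(t)
    if (floored == t) t else floored.plus(1L, ChronoUnit.MILLIS)
  }
}
```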
You can then use the following additional functions on Dataset/DataFrame:
temporalInnerJoin( df2:DataFrame, keys:Seq[String] )
Inner join of two temporal datasets using a list of identically named key columns as join condition (using-join). "Inner join" means that the result for a given key contains only periods which are defined in both DataFrames.
temporalInnerJoin( df2:DataFrame, keyCondition:Column )
Inner join of two temporal datasets using a given expression as join condition.
temporalFullJoin( df2:DataFrame, keys:Seq[String], rnkExpressions:Seq[Column] = Seq(), additionalJoinFilterCondition:Column = lit(true) )
Full outer join of two temporal datasets using a list of identically named key columns as join condition (using-join). "Outer join" means that the result for a given key contains all periods from both DataFrames, with null values for the attributes of DataFrame 2 where the period is missing from DataFrame 2, and null values for the attributes of DataFrame 1 where the period is missing from DataFrame 1.
- rnkExpressions: If df1 or df2 is not a temporal 1-to-1 or many-to-1 mapping, this parameter is used to select a sub-DataFrame that is a temporal 1-to-1 mapping, by ordering the rows for each key according to rnkExpressions and selecting the first row. If df1 or df2 is a to-many relation that you want to keep, skip this cleaning by leaving rnkExpressions unset or setting it to the empty sequence.
- additionalJoinFilterCondition: additional non-equi-join conditions which are combined with the conditions generated from the list of keys.
temporalLeftJoin( df2:DataFrame, keys:Seq[String], rnkExpressions:Seq[Column] = Seq(), additionalJoinFilterCondition:Column = lit(true) )
Left outer join of two temporal datasets using a list of identically named key columns as join condition (using-join). "Left join" means that the result for a given key contains all periods from DataFrame 1, with null values for the attributes of DataFrame 2 where the period is missing from DataFrame 2.
- rnkExpressions: If df2 is not a temporal 1-to-1 or many-to-1 mapping, this parameter is used to select a sub-DataFrame that is a temporal 1-to-1 mapping, by ordering the rows for each key according to rnkExpressions and selecting the first row. If df2 is a to-many relation that you want to keep, skip this cleaning by leaving rnkExpressions unset or setting it to the empty sequence.
- additionalJoinFilterCondition: additional non-equi-join conditions which are combined with the conditions generated from the list of keys.
temporalRightJoin( df2:DataFrame, keys:Seq[String], rnkExpressions:Seq[Column] = Seq(), additionalJoinFilterCondition:Column = lit(true) )
Right outer join of two temporal datasets using a list of identically named key columns as join condition (using-join). "Right join" means that the result for a given key contains all periods from DataFrame 2, with null values for the attributes of DataFrame 1 where the period is missing from DataFrame 1.
- rnkExpressions: If df1 is not a temporal 1-to-1 or many-to-1 mapping, this parameter is used to select a sub-DataFrame that is a temporal 1-to-1 mapping, by ordering the rows for each key according to rnkExpressions and selecting the first row. If df1 is a to-many relation that you want to keep, skip this cleaning by leaving rnkExpressions unset or setting it to the empty sequence.
- additionalJoinFilterCondition: additional non-equi-join conditions which are combined with the conditions generated from the list of keys.
temporalLeftAntiJoin( df2:DataFrame, joinColumns:Seq[String], additionalJoinFilterCondition:Column = lit(true) )
Left anti join of two temporal datasets using a list of identically named key columns as join condition (using-join). "Left anti join" means that the result contains all periods from DataFrame 1 which do not occur in DataFrame 2 for the given joinColumns.
- additionalJoinFilterCondition: additional non-equi-join conditions which are combined with the conditions generated from the list of keys.
- Note: this function is not yet supported for an intervalDef of any type other than ClosedInterval.
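The idea of "periods from DataFrame 1 which do not occur in DataFrame 2" can be sketched in plain Scala as interval subtraction for a single key. This is an illustration under assumptions (closed intervals on a Long axis with step 1, hypothetical names Period and subtract), not the library's implementation.

```scala
// Hypothetical helper for illustration only; not part of spark-temporalquery.
object AntiJoinSketch {
  final case class Period(from: Long, to: Long) // closed interval [from, to], step 1

  // Parts of `left` that are not covered by any period in `rights`.
  def subtract(left: Period, rights: Seq[Period]): Seq[Period] =
    rights.sortBy(_.from).foldLeft(List(left)) { (remaining, r) =>
      remaining.flatMap { p =>
        if (r.to < p.from || r.from > p.to) List(p) // no overlap: keep p unchanged
        else
          // keep what lies before and after r, dropping empty remainders
          List(Period(p.from, r.from - 1), Period(r.to + 1, p.to)).filter(q => q.from <= q.to)
      }
    }
}
```

For example, subtracting [3, 4] and [8, 12] from [1, 10] leaves [1, 2] and [5, 7].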
temporalCleanupExtend( keys:Seq[String], rnkExpressions:Seq[Column], aggExpressions:Seq[(String,Column)], rnkFilter:Boolean = true, extend: Boolean = true, fillGapsWithNull: Boolean = true )
Resolves temporal overlaps by prioritizing records according to rnkExpressions and extends the temporal range of each key so that it is defined over the whole timeline. The resulting DataFrame has an additional column _defined which is false for extended ranges.
- aggExpressions: Aggregates to be calculated on overlapping records (e.g. count)
- rnkFilter: Flag whether overlapping records are filtered (true) or only tagged (false); the default is true
- extend: If true and fillGapsWithNull=true, every key is extended with additional records with null values, so that for every key the whole timeline [minDate, maxDate] is covered (default: true)
- fillGapsWithNull: If true, gaps in the history are filled with records with null values for every key (default: true)
- Note: extend=true needs fillGapsWithNull=true in order to work
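The combined effect of extend=true and fillGapsWithNull=true, as described above, can be pictured with a plain-Scala sketch for the records of one key. It is an illustration under assumptions (closed Long axis with step 1, non-overlapping input, hypothetical names Rec and fill), not the library's implementation; the defined flag mirrors the idea of the _defined column.

```scala
// Hypothetical helper for illustration only; not part of spark-temporalquery.
object GapFillSketch {
  // One record of a single key; `defined` is false for filled-in ranges.
  final case class Rec(from: Long, to: Long, value: Option[Double], defined: Boolean = true)

  // Fills gaps between records and extends the history to [min, max].
  // Assumes non-overlapping records within [min, max], min > Long.MinValue and max < Long.MaxValue.
  def fill(recs: Seq[Rec], min: Long, max: Long): Seq[Rec] = {
    val sorted = recs.sortBy(_.from)
    val prevEnds = (min - 1) +: sorted.map(_.to)     // end just before each possible gap
    val nextStarts = sorted.map(_.from) :+ (max + 1) // start just after each possible gap
    val gaps = prevEnds.zip(nextStarts).collect {
      case (prevTo, nextFrom) if prevTo + 1 < nextFrom =>
        Rec(prevTo + 1, nextFrom - 1, None, defined = false)
    }
    (sorted ++ gaps).sortBy(_.from)
  }
}
```

With records [5, 9] and [15, 20] on a timeline from 0 to 30, the sketch adds undefined records for [0, 4], [10, 14] and [21, 30].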
temporalCombine( ignoreColNames:Seq[String] = Seq() )
Combines successive records if there are no changes in the non-technical attributes.
- ignoreColNames: A list of columns to be ignored in change detection
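What "combining successive records" means can be sketched in plain Scala. The sketch assumes a closed-interval axis on Long values with step 1 and non-overlapping records per key; Rec and combine are hypothetical names, and the code only illustrates the idea, not how the library implements it.

```scala
// Hypothetical helper for illustration only; not part of spark-temporalquery.
object CombineSketch {
  final case class Rec(key: Int, value: Double, from: Long, to: Long) // closed interval [from, to]

  // Merges a record into its predecessor if both have the same key and value
  // and the record starts exactly one step after the predecessor ends.
  def combine(recs: Seq[Rec]): Seq[Rec] =
    recs.sortBy(r => (r.key, r.from)).foldLeft(List.empty[Rec]) {
      case (prev :: done, r)
          if prev.key == r.key && prev.value == r.value && prev.to + 1 == r.from =>
        prev.copy(to = r.to) :: done
      case (acc, r) => r :: acc
    }.reverse
}
```

Records [1, 5] and [6, 9] with the same value are merged into [1, 9], whereas [10, 12] and [14, 20] stay separate because of the gap at 13.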
temporalUnifyRanges( keys:Seq[String] )
Unifies temporal ranges within each group of records defined by keys (needed for temporal aggregations).
temporalExtendRange( keys:Seq[String], extendMin:Boolean=true, extendMax:Boolean=true )
Extends the temporal range to min/maxDate according to TemporalQueryConfig.
temporalContinuous2discrete
Transforms a DataFrame with continuous time into a DataFrame with discrete time. Note: this function only works with an intervalDef of type ClosedInterval.
temporalRoundDiscreteTime
Rounds the time values to the discrete step size chosen in TemporalQueryConfig. Note: this function only works with an intervalDef of type ClosedInterval.
Spark-temporalquery is built and released for Scala 2.11 with Spark 2.4.x and Scala 2.12 with Spark 3.x. Spark 3.x does not support Scala 2.11. Newer versions of Spark 2.4.x would support Scala 2.12, but there is no spark-temporalquery release for this combination.
Note that Spark 2.4 needs Java version 8, whereas Spark 3.x is compatible with Java 8/11/17. See also https://spark.apache.org/docs/latest/#downloading.
If you get the exception org.apache.spark.sql.AnalysisException: Column ... are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. ...
when using temporal*Join methods, try calling df.alias on both DataFrames before joining.
If temporal-query finds aliases, it uses them in the join conditions.
The exception might still occur. In that case you can disable the check by setting the Spark property spark.sql.analyzer.failAmbiguousSelfJoin = false.