Skip to content

Commit

Permalink
#61 Add infinity support to Dates and Numerics
Browse files Browse the repository at this point in the history
  • Loading branch information
Zejnilovic authored Oct 31, 2024
1 parent 0ee16c8 commit 8e36e5f
Show file tree
Hide file tree
Showing 15 changed files with 375 additions and 82 deletions.
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.6.2
sbt.version=1.10.2
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ object MetadataKeys {
val DefaultValue = "default"
// date & timestamp
val DefaultTimeZone = "timezone"
val MinusInfinitySymbol = "minus_infinity_symbol"
val MinusInfinityValue = "minus_infinity_value"
val PlusInfinitySymbol = "plus_infinity_symbol"
val PlusInfinityValue = "plus_infinity_value"
// date & timestamp & all numeric
val Pattern = "pattern"
// all numeric
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.standardization.stages

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{lit, when}
import org.apache.spark.sql.types.DataType

/**
  * Mix-in that lets a parser map sentinel "infinity" symbols in the source data
  * onto concrete replacement values before the regular cast logic runs.
  *
  * A replacement is applied only when BOTH the symbol and its value are
  * configured; if either side of a pair is absent, the column passes through
  * unchanged for that direction of infinity.
  */
trait InfinitySupport {
  /** Symbol in the source data representing negative infinity, if configured. */
  protected def infMinusSymbol: Option[String]

  /** Value to substitute for the negative-infinity symbol, if configured. */
  protected def infMinusValue: Option[String]

  /** Symbol in the source data representing positive infinity, if configured. */
  protected def infPlusSymbol: Option[String]

  /** Value to substitute for the positive-infinity symbol, if configured. */
  protected def infPlusValue: Option[String]

  /** Type the source column carries; symbols and values are cast to it before comparison. */
  protected val origType: DataType

  /**
    * Returns `column` with any configured minus- and plus-infinity symbols
    * replaced by their respective configured values. Minus-infinity is
    * substituted first, then plus-infinity on the intermediate result.
    *
    * @param column the input column to transform
    * @return the column with infinity symbols replaced, or the original column
    *         when no complete symbol/value pair is configured
    */
  def replaceInfinitySymbols(column: Column): Column = {
    val afterMinus = substituteSymbol(column, infMinusSymbol, infMinusValue)
    substituteSymbol(afterMinus, infPlusSymbol, infPlusValue)
  }

  // Replaces occurrences of `symbolOpt` with `valueOpt` in `col`; both must be
  // defined for the substitution to take effect, otherwise `col` is returned as-is.
  private def substituteSymbol(col: Column, symbolOpt: Option[String], valueOpt: Option[String]): Column = {
    val substituted = for {
      symbol <- symbolOpt
      value  <- valueOpt
    } yield when(col === lit(symbol).cast(origType), lit(value).cast(origType)).otherwise(col)
    substituted.getOrElse(col)
  }
}
39 changes: 26 additions & 13 deletions src/main/scala/za/co/absa/standardization/stages/TypeParser.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.slf4j.{Logger, LoggerFactory}
import za.co.absa.spark.commons.implicits.ColumnImplicits.ColumnEnhancements
import za.co.absa.spark.commons.implicits.StructFieldImplicits.StructFieldMetadataEnhancements
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
import za.co.absa.spark.commons.utils.SchemaUtils
import za.co.absa.spark.hofs.transform
import za.co.absa.standardization.{ErrorMessage, StandardizationErrorMessage}
import za.co.absa.standardization.config.StandardizationConfig
import za.co.absa.standardization.implicits.StdColumnImplicits.StdColumnEnhancements
import za.co.absa.standardization.schema.StdSchemaUtils.FieldWithSource
import za.co.absa.standardization.schema.{MetadataValues, StdSchemaUtils}
import za.co.absa.standardization.schema.{MetadataKeys, MetadataValues, StdSchemaUtils}
import za.co.absa.standardization.time.DateTimePattern
import za.co.absa.standardization.typeClasses.{DoubleLike, LongLike}
import za.co.absa.standardization.types.TypedStructField._
Expand Down Expand Up @@ -127,6 +128,7 @@ object TypeParser {
private val MicrosecondsPerSecond = 1000000
private val NanosecondsPerSecond = 1000000000
private val InfinityStr = "Infinity"
private val MinusInfinityStr = "-Infinity"
private val nullColumn = lit(null) //scalastyle:ignore null


Expand Down Expand Up @@ -315,7 +317,13 @@ object TypeParser {
}

private abstract class NumericParser[N: TypeTag](override val field: NumericTypeStructField[N])
(implicit defaults: TypeDefaults) extends ScalarParser[N] {
(implicit defaults: TypeDefaults) extends ScalarParser[N] with InfinitySupport {
override protected val infMinusSymbol: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinitySymbol)
override protected val infMinusValue: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinityValue)
override protected val infPlusSymbol: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinitySymbol)
override protected val infPlusValue: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinityValue)
private val columnWithInfinityReplaced = replaceInfinitySymbols(column)

override protected def standardizeAfterCheck(stdConfig: StandardizationConfig)(implicit logger: Logger): ParseOutput = {
if (field.needsUdfParsing) {
standardizeUsingUdf(stdConfig)
Expand All @@ -335,9 +343,9 @@ object TypeParser {
val columnWithProperDecimalSymbols: Column = if (replacements.nonEmpty) {
val from = replacements.keys.mkString
val to = replacements.values.mkString
translate(column, from, to)
translate(columnWithInfinityReplaced, from, to)
} else {
column
columnWithInfinityReplaced
}

val columnToCast = if (field.allowInfinity && (decimalSymbols.infinityValue != InfinityStr)) {
Expand Down Expand Up @@ -494,9 +502,14 @@ object TypeParser {
* Date | O | ->to_utc_timestamp->to_date
* Other | ->String->to_date | ->String->to_timestamp->to_utc_timestamp->to_date
*/
private abstract class DateTimeParser[T](implicit defaults: TypeDefaults) extends PrimitiveParser[T] {
private abstract class DateTimeParser[T](implicit defaults: TypeDefaults) extends PrimitiveParser[T] with InfinitySupport {
override val field: DateTimeTypeStructField[T]
protected val pattern: DateTimePattern = field.pattern.get.get
override protected val infMinusSymbol: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinitySymbol)
override protected val infMinusValue: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinityValue)
override protected val infPlusSymbol: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinitySymbol)
override protected val infPlusValue: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinityValue)
private val columnWithInfinityReplaced: Column = replaceInfinitySymbols(column)

override protected def assemblePrimitiveCastLogic: Column = {
if (pattern.isEpoch) {
Expand All @@ -516,23 +529,23 @@ object TypeParser {

private def castWithPattern(): Column = {
// sadly with parquet support, incoming might not be all `plain`
// underlyingType match {
// underlyingType match {
origType match {
case _: NullType => nullColumn
case _: DateType => castDateColumn(column)
case _: TimestampType => castTimestampColumn(column)
case _: StringType => castStringColumn(column)
case _: DateType => castDateColumn(columnWithInfinityReplaced)
case _: TimestampType => castTimestampColumn(columnWithInfinityReplaced)
case _: StringType => castStringColumn(columnWithInfinityReplaced)
case ot: DoubleType =>
// this case covers some IBM date format where it's represented as a double ddmmyyyy.hhmmss
patternNeeded(ot)
castFractionalColumn(column, ot)
castFractionalColumn(columnWithInfinityReplaced, ot)
case ot: FloatType =>
// this case covers some IBM date format where it's represented as a double ddmmyyyy.hhmmss
patternNeeded(ot)
castFractionalColumn(column, ot)
castFractionalColumn(columnWithInfinityReplaced, ot)
case ot =>
patternNeeded(ot)
castNonStringColumn(column, ot)
castNonStringColumn(columnWithInfinityReplaced, ot)
}
}

Expand All @@ -554,7 +567,7 @@ object TypeParser {
}

protected def castEpoch(): Column = {
(column.cast(decimalType) / pattern.epochFactor).cast(TimestampType)
(columnWithInfinityReplaced.cast(decimalType) / pattern.epochFactor).cast(TimestampType)
}

protected def castStringColumn(stringColumn: Column): Column
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import za.co.absa.standardization.types.{CommonTypeDefaults, TypeDefaults}
import za.co.absa.standardization.udf.UDFLibrary
import za.co.absa.standardization.{LoggerTestBase, Standardization, StandardizationErrorMessage}
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
import za.co.absa.standardization.schema.MetadataKeys

class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBase with LoggerTestBase {
import spark.implicits._
Expand Down Expand Up @@ -53,7 +54,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
)
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder().putString("pattern", "epoch").build)
new MetadataBuilder().putString(MetadataKeys.Pattern, "epoch").build)
))
val exp = Seq(
DateRow(Date.valueOf("1970-01-01")),
Expand All @@ -75,18 +76,28 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
val seq = Seq(
0L,
86400000,
-1,
978307199999L,
1563288103123L
1563288103123L,
-2
)
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder().putString("pattern", "epochmilli").build)
new MetadataBuilder()
.putString(MetadataKeys.Pattern, "epochmilli")
.putString(MetadataKeys.PlusInfinitySymbol, "-1")
.putString(MetadataKeys.PlusInfinityValue, "1563278222094")
.putString(MetadataKeys.MinusInfinitySymbol, "-2")
.putString(MetadataKeys.MinusInfinityValue, "0")
.build)
))
val exp = Seq(
DateRow(Date.valueOf("1970-01-01")),
DateRow(Date.valueOf("1970-01-02")),
DateRow(Date.valueOf("2019-07-16")),
DateRow(Date.valueOf("2000-12-31")),
DateRow(Date.valueOf("2019-07-16"))
DateRow(Date.valueOf("2019-07-16")),
DateRow(Date.valueOf("1970-01-01"))
)

val src = seq.toDF(fieldName)
Expand All @@ -106,7 +117,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
)
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder().putString("pattern", "epochmicro").build)
new MetadataBuilder().putString(MetadataKeys.Pattern, "epochmicro").build)
))
val exp = Seq(
DateRow(Date.valueOf("1970-01-01")),
Expand All @@ -132,7 +143,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
)
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder().putString("pattern", "epochnano").build)
new MetadataBuilder().putString(MetadataKeys.Pattern, "epochnano").build)
))
val exp = Seq(
DateRow(Date.valueOf("1970-01-01")),
Expand All @@ -150,25 +161,31 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
}

test("simple date pattern") {
val seq = Seq(
val seq: Seq[String] = Seq(
"1970/01/01",
"1970/02/01",
"2000/31/12",
"2019/16/07",
"1970-02-02",
"crash"
"crash",
"Alfa"
)
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder().putString("pattern", "yyyy/dd/MM").build)
new MetadataBuilder()
.putString(MetadataKeys.Pattern, "yyyy/dd/MM")
.putString(MetadataKeys.PlusInfinitySymbol, "Alfa")
.putString(MetadataKeys.PlusInfinityValue, "1970/02/01")
.build)
))
val exp = Seq(
val exp: Seq[DateRow] = Seq(
DateRow(Date.valueOf("1970-01-01")),
DateRow(Date.valueOf("1970-01-02")),
DateRow(Date.valueOf("2000-12-31")),
DateRow(Date.valueOf("2019-07-16")),
DateRow(Date.valueOf("1970-01-01"), Seq(StandardizationErrorMessage.stdCastErr(fieldName, "1970-02-02", "string", "date", Some("yyyy/dd/MM")))),
DateRow(Date.valueOf("1970-01-01"), Seq(StandardizationErrorMessage.stdCastErr(fieldName, "crash", "string", "date", Some("yyyy/dd/MM"))))
DateRow(Date.valueOf("1970-01-01"), Seq(StandardizationErrorMessage.stdCastErr(fieldName, "crash", "string", "date", Some("yyyy/dd/MM")))),
DateRow(Date.valueOf("1970-01-02"))
)

val src = seq.toDF(fieldName)
Expand All @@ -190,7 +207,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
)
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder().putString("pattern", "HH-mm-ss dd.MM.yyyy ZZZ").build)
new MetadataBuilder().putString(MetadataKeys.Pattern, "HH-mm-ss dd.MM.yyyy ZZZ").build)
))
val exp = Seq(
DateRow(Date.valueOf("1970-01-01")),
Expand Down Expand Up @@ -221,7 +238,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
)
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder().putString("pattern", "HH:mm:ss(SSSnnnnnn) dd+MM+yyyy XXX").build)
new MetadataBuilder().putString(MetadataKeys.Pattern, "HH:mm:ss(SSSnnnnnn) dd+MM+yyyy XXX").build)
))
val exp = Seq(
DateRow(Date.valueOf("1970-01-01")),
Expand Down Expand Up @@ -252,7 +269,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder()
.putString("pattern", "yyyy/dd/MM")
.putString(MetadataKeys.Pattern, "yyyy/dd/MM")
.putString("timezone", "EST")
.build)
))
Expand Down Expand Up @@ -286,7 +303,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder()
.putString("pattern", "yyyy/dd/MM")
.putString(MetadataKeys.Pattern, "yyyy/dd/MM")
.putString("timezone", "Africa/Johannesburg")
.build)
))
Expand Down Expand Up @@ -319,7 +336,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
)
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder().putString("pattern", "MMMM d 'of' yyyy").build)
new MetadataBuilder().putString(MetadataKeys.Pattern, "MMMM d 'of' yyyy").build)
))
val exp = Seq(
DateRow(Date.valueOf("1970-01-01")),
Expand Down Expand Up @@ -351,7 +368,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
)
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder().putString("pattern", "yyyy/MM/dd 'insignificant' iiiiii").build)
new MetadataBuilder().putString(MetadataKeys.Pattern, "yyyy/MM/dd 'insignificant' iiiiii").build)
))
val exp = Seq(
DateRow(Date.valueOf("1970-01-01")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ package za.co.absa.standardization.interpreter

import org.apache.spark.sql.types._
import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.standardization.ErrorMessage
import za.co.absa.standardization.{ErrorMessage, LoggerTestBase, Standardization, StandardizationErrorMessage, interpreter}
import za.co.absa.spark.commons.test.SparkTestBase
import za.co.absa.standardization.RecordIdGeneration.IdType.NoId
import za.co.absa.standardization.config.{BasicMetadataColumnsConfig, BasicStandardizationConfig, ErrorCodesConfig}
import za.co.absa.standardization.schema.MetadataKeys
import za.co.absa.standardization.types.{CommonTypeDefaults, TypeDefaults}
import za.co.absa.standardization.udf.UDFLibrary
import za.co.absa.standardization.{LoggerTestBase, Standardization, StandardizationErrorMessage}
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements

class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkTestBase with LoggerTestBase {
Expand Down Expand Up @@ -57,9 +56,17 @@ class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkT
private val desiredSchemaWithInfinity = StructType(Seq(
StructField("description", StringType, nullable = false),
StructField("floatField", FloatType, nullable = false,
new MetadataBuilder().putString("allow_infinity", value = "true").build),
new MetadataBuilder()
.putString("allow_infinity", value = "true")
.putString(MetadataKeys.MinusInfinitySymbol, "FRRR")
.putString(MetadataKeys.MinusInfinityValue, "0")
.build),
StructField("doubleField", DoubleType, nullable = true,
new MetadataBuilder().putString("allow_infinity", value = "true").build)
new MetadataBuilder()
.putString("allow_infinity", value = "true")
.putString(MetadataKeys.PlusInfinitySymbol, "MAXVALUE")
.putString(MetadataKeys.PlusInfinityValue, "")
.build)
))

test("From String") {
Expand Down Expand Up @@ -168,14 +175,15 @@ class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkT
("01-Euler", "2.71", "2.71"),
("02-Null", null, null),
("03-Long", Long.MaxValue.toString, Long.MinValue.toString),
("04-infinity", "-∞", ""),
("04-infinity", "-∞", "MAXVALUE"),
("05-Really big", "123456789123456791245678912324789123456789123456789.12",
"-1234567891234567912456789123247891234567891234567891234567891234567891234567891234567891234567891234567891234"
+ "567891234567891234567891234567891234567891234567891234567891234567891234567891234567891234567891234678912345"
+ "678912345678912345678912345679124567891232478912345678912345678912345678912345678912345679124567891232478912"
+ "3456789123456789123456789123456789123456789123456789123456789.1"),
("06-Text", "foo", "bar"),
("07-Exponential notation", "-1.23E4", "+9.8765E-3")
("07-Exponential notation", "-1.23E4", "+9.8765E-3"),
("08-Infinity", "FRRR", "MAXVALUE")
)
val src = seq.toDF("description","floatField", "doubleField")
logDataFrameContent(src)
Expand All @@ -193,7 +201,8 @@ class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkT
FractionalRow("06-Text", Option(0), None, Seq(
StandardizationErrorMessage.stdCastErr("floatField", "foo", "string", "float", None),
StandardizationErrorMessage.stdCastErr("doubleField", "bar", "string", "double", None))),
FractionalRow("07-Exponential notation", Option(-12300.0f), Option(0.0098765))
FractionalRow("07-Exponential notation", Option(-12300.0f), Option(0.0098765)),
FractionalRow("08-Infinity", Option(0f), Option(Double.PositiveInfinity))
)

assertResult(exp)(std.as[FractionalRow].collect().sortBy(_.description).toList)
Expand Down
Loading

0 comments on commit 8e36e5f

Please sign in to comment.