#61 Add infinity support to Dates and Numerics #63

Merged (5 commits, Oct 31, 2024)
Changes from 3 commits
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
sbt.version=1.6.2
sbt.version=1.10.2
@@ -22,6 +22,10 @@ object MetadataKeys {
val DefaultValue = "default"
// date & timestamp
val DefaultTimeZone = "timezone"
val MinusInfinitySymbol = "minus_infinity_symbol"
val MinusInfinityValue = "minus_infinity_value"
val PlusInfinitySymbol = "plus_infinity_symbol"
val PlusInfinityValue = "plus_infinity_value"
// date & timestamp & all numeric
val Pattern = "pattern"
// all numeric
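For context, a minimal sketch of how these new keys are meant to be attached to a field's metadata (it mirrors the epochmilli test further down; the field name here is illustrative):

import org.apache.spark.sql.types.{DateType, MetadataBuilder, StructField, StructType}
import za.co.absa.standardization.schema.MetadataKeys

// "-2" in the source data stands for minus infinity and is replaced by epoch 0;
// "-1" stands for plus infinity and is replaced by the given epoch-milli value.
val schema = StructType(Seq(
  StructField("dateField", DateType, nullable = false,
    new MetadataBuilder()
      .putString(MetadataKeys.Pattern, "epochmilli")
      .putString(MetadataKeys.MinusInfinitySymbol, "-2")
      .putString(MetadataKeys.MinusInfinityValue, "0")
      .putString(MetadataKeys.PlusInfinitySymbol, "-1")
      .putString(MetadataKeys.PlusInfinityValue, "1563278222094")
      .build)
))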
63 changes: 50 additions & 13 deletions src/main/scala/za/co/absa/standardization/stages/TypeParser.scala
@@ -22,14 +22,16 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.slf4j.{Logger, LoggerFactory}
import za.co.absa.spark.commons.implicits.ColumnImplicits.ColumnEnhancements
import za.co.absa.spark.commons.implicits.StructFieldImplicits.StructFieldMetadataEnhancements
import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements
import za.co.absa.spark.commons.utils.SchemaUtils
import za.co.absa.spark.hofs.transform
import za.co.absa.standardization.{ErrorMessage, StandardizationErrorMessage}
import za.co.absa.standardization.config.StandardizationConfig
import za.co.absa.standardization.implicits.StdColumnImplicits.StdColumnEnhancements
import za.co.absa.standardization.numeric.DecimalSymbols
import za.co.absa.standardization.schema.StdSchemaUtils.FieldWithSource
import za.co.absa.standardization.schema.{MetadataValues, StdSchemaUtils}
import za.co.absa.standardization.schema.{MetadataKeys, MetadataValues, StdSchemaUtils}
import za.co.absa.standardization.time.DateTimePattern
import za.co.absa.standardization.typeClasses.{DoubleLike, LongLike}
import za.co.absa.standardization.types.TypedStructField._
@@ -127,6 +129,7 @@ object TypeParser {
private val MicrosecondsPerSecond = 1000000
private val NanosecondsPerSecond = 1000000000
private val InfinityStr = "Infinity"
private val MinusInfinityStr = "-Infinity"
private val nullColumn = lit(null) //scalastyle:ignore null


@@ -315,7 +318,13 @@
}

private abstract class NumericParser[N: TypeTag](override val field: NumericTypeStructField[N])
(implicit defaults: TypeDefaults) extends ScalarParser[N] {
(implicit defaults: TypeDefaults) extends ScalarParser[N] with InfinitySupport {
override protected val infMinusSymbol: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinitySymbol)
override protected val infMinusValue: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinityValue)
override protected val infPlusSymbol: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinitySymbol)
override protected val infPlusValue: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinityValue)
private val columnWithInfinityReplaced = replaceInfinitySymbols(column)

override protected def standardizeAfterCheck(stdConfig: StandardizationConfig)(implicit logger: Logger): ParseOutput = {
if (field.needsUdfParsing) {
standardizeUsingUdf(stdConfig)
@@ -335,9 +344,9 @@
val columnWithProperDecimalSymbols: Column = if (replacements.nonEmpty) {
val from = replacements.keys.mkString
val to = replacements.values.mkString
translate(column, from, to)
translate(columnWithInfinityReplaced, from, to)
} else {
column
columnWithInfinityReplaced
}

val columnToCast = if (field.allowInfinity && (decimalSymbols.infinityValue != InfinityStr)) {
@@ -494,9 +503,14 @@
* Date | O | ->to_utc_timestamp->to_date
* Other | ->String->to_date | ->String->to_timestamp->to_utc_timestamp->to_date
*/
private abstract class DateTimeParser[T](implicit defaults: TypeDefaults) extends PrimitiveParser[T] {
private abstract class DateTimeParser[T](implicit defaults: TypeDefaults) extends PrimitiveParser[T] with InfinitySupport {
override val field: DateTimeTypeStructField[T]
protected val pattern: DateTimePattern = field.pattern.get.get
override protected val infMinusSymbol: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinitySymbol)
override protected val infMinusValue: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinityValue)
override protected val infPlusSymbol: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinitySymbol)
override protected val infPlusValue: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinityValue)
private val columnWithInfinityReplaced: Column = replaceInfinitySymbols(column)

override protected def assemblePrimitiveCastLogic: Column = {
if (pattern.isEpoch) {
@@ -516,23 +530,23 @@

private def castWithPattern(): Column = {
// sadly with parquet support, incoming might not be all `plain`
// underlyingType match {
origType match {
case _: NullType => nullColumn
case _: DateType => castDateColumn(column)
case _: TimestampType => castTimestampColumn(column)
case _: StringType => castStringColumn(column)
case _: DateType => castDateColumn(columnWithInfinityReplaced)
case _: TimestampType => castTimestampColumn(columnWithInfinityReplaced)
case _: StringType => castStringColumn(columnWithInfinityReplaced)
case ot: DoubleType =>
// this case covers some IBM date format where it's represented as a double ddmmyyyy.hhmmss
patternNeeded(ot)
castFractionalColumn(column, ot)
castFractionalColumn(columnWithInfinityReplaced, ot)
case ot: FloatType =>
// this case covers some IBM date format where it's represented as a double ddmmyyyy.hhmmss
patternNeeded(ot)
castFractionalColumn(column, ot)
castFractionalColumn(columnWithInfinityReplaced, ot)
case ot =>
patternNeeded(ot)
castNonStringColumn(column, ot)
castNonStringColumn(columnWithInfinityReplaced, ot)
}
}

@@ -554,7 +568,7 @@
}

protected def castEpoch(): Column = {
(column.cast(decimalType) / pattern.epochFactor).cast(TimestampType)
(columnWithInfinityReplaced.cast(decimalType) / pattern.epochFactor).cast(TimestampType)
}

protected def castStringColumn(stringColumn: Column): Column
@@ -681,4 +695,27 @@
}
}

sealed trait InfinitySupport {

[Inline review thread on this line]

Reviewer: why sealed trait?

Contributor: It could and perhaps should be in its own file, even.

Collaborator (Author): The only reason is that this is a library and it is not supposed to be used outside. It is only supposed to be used with our defined parsers.

Contributor: I think the code of this trait is written in such a way that outside usage doesn't really cause issues, if somebody finds it useful - unlikely 😉

Contributor: IMHO the "hiding" is important when the class/trait has special handling needs or has access to more sensitive parts of the code.

Collaborator (Author): Moved out.

protected def infMinusSymbol: Option[String]
protected def infMinusValue: Option[String]
protected def infPlusSymbol: Option[String]
protected def infPlusValue: Option[String]
protected val origType: DataType

def replaceInfinitySymbols(column: Column): Column = {
val columnWithNegativeInf: Column = infMinusSymbol.flatMap { minusSymbol =>
infMinusValue.map { minusValue =>
when(column === lit(minusSymbol).cast(origType), lit(minusValue).cast(origType)).otherwise(column)
}
}.getOrElse(column)

infPlusSymbol.flatMap { plusSymbol =>
infPlusValue.map { plusValue =>
when(columnWithNegativeInf === lit(plusSymbol).cast(origType), lit(plusValue).cast(origType))
.otherwise(columnWithNegativeInf)
}
}.getOrElse(columnWithNegativeInf)
}
}
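To make the trait's behavior concrete, here is a minimal, self-contained sketch of the same when/otherwise chain that replaceInfinitySymbols builds (the local SparkSession, the symbols "-∞"/"+∞", and the replacement values "0"/"999" are assumptions for the demo, not part of the PR):

import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{lit, when}
import org.apache.spark.sql.types.{DataType, StringType}

val spark = SparkSession.builder().master("local[1]").appName("infinity-demo").getOrCreate()
import spark.implicits._

val origType: DataType = StringType

// One replacement step: if the column equals the symbol, substitute the value.
def replaceSymbol(column: Column, symbol: String, value: String): Column =
  when(column === lit(symbol).cast(origType), lit(value).cast(origType)).otherwise(column)

val df = Seq("-∞", "42", "+∞").toDF("v")
// Chained exactly like the trait: minus infinity first, then plus infinity
// applied on top of the already-replaced column.
df.select(replaceSymbol(replaceSymbol($"v", "-∞", "0"), "+∞", "999").as("v")).show()
// yields rows: 0, 42, 999

Note that the two replacements are chained, so the plus-infinity comparison runs against the column that already had its minus-infinity symbol replaced.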

class TypeParserException(message: String) extends Exception(message: String)
@@ -26,6 +26,7 @@ import za.co.absa.standardization.types.{CommonTypeDefaults, TypeDefaults}
import za.co.absa.standardization.udf.UDFLibrary
import za.co.absa.standardization.{LoggerTestBase, Standardization, StandardizationErrorMessage}
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements
import za.co.absa.standardization.schema.MetadataKeys

class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBase with LoggerTestBase {
import spark.implicits._
@@ -53,7 +54,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
)
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder().putString("pattern", "epoch").build)
new MetadataBuilder().putString(MetadataKeys.Pattern, "epoch").build)
))
val exp = Seq(
DateRow(Date.valueOf("1970-01-01")),
@@ -75,18 +76,28 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
val seq = Seq(
0L,
86400000,
-1,
978307199999L,
1563288103123L
1563288103123L,
-2
)
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder().putString("pattern", "epochmilli").build)
new MetadataBuilder()
.putString(MetadataKeys.Pattern, "epochmilli")
.putString(MetadataKeys.PlusInfinitySymbol, "-1")
.putString(MetadataKeys.PlusInfinityValue, "1563278222094")
.putString(MetadataKeys.MinusInfinitySymbol, "-2")
.putString(MetadataKeys.MinusInfinityValue, "0")
.build)
))
val exp = Seq(
DateRow(Date.valueOf("1970-01-01")),
DateRow(Date.valueOf("1970-01-02")),
DateRow(Date.valueOf("2019-07-16")),
DateRow(Date.valueOf("2000-12-31")),
DateRow(Date.valueOf("2019-07-16"))
DateRow(Date.valueOf("2019-07-16")),
DateRow(Date.valueOf("1970-01-01"))
)

val src = seq.toDF(fieldName)
@@ -106,7 +117,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
)
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder().putString("pattern", "epochmicro").build)
new MetadataBuilder().putString(MetadataKeys.Pattern, "epochmicro").build)
))
val exp = Seq(
DateRow(Date.valueOf("1970-01-01")),
@@ -132,7 +143,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
)
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder().putString("pattern", "epochnano").build)
new MetadataBuilder().putString(MetadataKeys.Pattern, "epochnano").build)
))
val exp = Seq(
DateRow(Date.valueOf("1970-01-01")),
@@ -150,25 +161,31 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
}

test("simple date pattern") {
val seq = Seq(
val seq: Seq[String] = Seq(
"1970/01/01",
"1970/02/01",
"2000/31/12",
"2019/16/07",
"1970-02-02",
"crash"
"crash",
"Alfa"
)
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder().putString("pattern", "yyyy/dd/MM").build)
new MetadataBuilder()
.putString(MetadataKeys.Pattern, "yyyy/dd/MM")
.putString(MetadataKeys.PlusInfinitySymbol, "Alfa")
.putString(MetadataKeys.PlusInfinityValue, "1970/02/01")
.build)
))
val exp = Seq(
val exp: Seq[DateRow] = Seq(
DateRow(Date.valueOf("1970-01-01")),
DateRow(Date.valueOf("1970-01-02")),
DateRow(Date.valueOf("2000-12-31")),
DateRow(Date.valueOf("2019-07-16")),
DateRow(Date.valueOf("1970-01-01"), Seq(StandardizationErrorMessage.stdCastErr(fieldName, "1970-02-02", "string", "date", Some("yyyy/dd/MM")))),
DateRow(Date.valueOf("1970-01-01"), Seq(StandardizationErrorMessage.stdCastErr(fieldName, "crash", "string", "date", Some("yyyy/dd/MM"))))
DateRow(Date.valueOf("1970-01-01"), Seq(StandardizationErrorMessage.stdCastErr(fieldName, "crash", "string", "date", Some("yyyy/dd/MM")))),
DateRow(Date.valueOf("1970-01-02"))
)

val src = seq.toDF(fieldName)
@@ -190,7 +207,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
)
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder().putString("pattern", "HH-mm-ss dd.MM.yyyy ZZZ").build)
new MetadataBuilder().putString(MetadataKeys.Pattern, "HH-mm-ss dd.MM.yyyy ZZZ").build)
))
val exp = Seq(
DateRow(Date.valueOf("1970-01-01")),
@@ -221,7 +238,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
)
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder().putString("pattern", "HH:mm:ss(SSSnnnnnn) dd+MM+yyyy XXX").build)
new MetadataBuilder().putString(MetadataKeys.Pattern, "HH:mm:ss(SSSnnnnnn) dd+MM+yyyy XXX").build)
))
val exp = Seq(
DateRow(Date.valueOf("1970-01-01")),
@@ -252,7 +269,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder()
.putString("pattern", "yyyy/dd/MM")
.putString(MetadataKeys.Pattern, "yyyy/dd/MM")
.putString("timezone", "EST")
.build)
))
@@ -286,7 +303,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder()
.putString("pattern", "yyyy/dd/MM")
.putString(MetadataKeys.Pattern, "yyyy/dd/MM")
.putString("timezone", "Africa/Johannesburg")
.build)
))
@@ -319,7 +336,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
)
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder().putString("pattern", "MMMM d 'of' yyyy").build)
new MetadataBuilder().putString(MetadataKeys.Pattern, "MMMM d 'of' yyyy").build)
))
val exp = Seq(
DateRow(Date.valueOf("1970-01-01")),
@@ -351,7 +368,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas
)
val desiredSchema = StructType(Seq(
StructField(fieldName, DateType, nullable = false,
new MetadataBuilder().putString("pattern", "yyyy/MM/dd 'insignificant' iiiiii").build)
new MetadataBuilder().putString(MetadataKeys.Pattern, "yyyy/MM/dd 'insignificant' iiiiii").build)
))
val exp = Seq(
DateRow(Date.valueOf("1970-01-01")),
@@ -18,14 +18,13 @@ package za.co.absa.standardization.interpreter

import org.apache.spark.sql.types._
import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.standardization.ErrorMessage
import za.co.absa.standardization.{ErrorMessage, LoggerTestBase, Standardization, StandardizationErrorMessage, interpreter}
import za.co.absa.spark.commons.test.SparkTestBase
import za.co.absa.standardization.RecordIdGeneration.IdType.NoId
import za.co.absa.standardization.config.{BasicMetadataColumnsConfig, BasicStandardizationConfig, ErrorCodesConfig}
import za.co.absa.standardization.schema.MetadataKeys
import za.co.absa.standardization.types.{CommonTypeDefaults, TypeDefaults}
import za.co.absa.standardization.udf.UDFLibrary
import za.co.absa.standardization.{LoggerTestBase, Standardization, StandardizationErrorMessage}
import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements

class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkTestBase with LoggerTestBase {
@@ -57,9 +56,17 @@ class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkT
private val desiredSchemaWithInfinity = StructType(Seq(
StructField("description", StringType, nullable = false),
StructField("floatField", FloatType, nullable = false,
new MetadataBuilder().putString("allow_infinity", value = "true").build),
new MetadataBuilder()
.putString("allow_infinity", value = "true")
.putString(MetadataKeys.MinusInfinitySymbol, "FRRR")
.putString(MetadataKeys.MinusInfinityValue, "0")
.build),
StructField("doubleField", DoubleType, nullable = true,
new MetadataBuilder().putString("allow_infinity", value = "true").build)
new MetadataBuilder()
.putString("allow_infinity", value = "true")
.putString(MetadataKeys.PlusInfinitySymbol, "MAXVALUE")
.putString(MetadataKeys.PlusInfinityValue, "∞")
.build)
))

test("From String") {
Expand Down Expand Up @@ -168,14 +175,15 @@ class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkT
("01-Euler", "2.71", "2.71"),
("02-Null", null, null),
("03-Long", Long.MaxValue.toString, Long.MinValue.toString),
("04-infinity", "-∞", ""),
("04-infinity", "-∞", "MAXVALUE"),
("05-Really big", "123456789123456791245678912324789123456789123456789.12",
"-1234567891234567912456789123247891234567891234567891234567891234567891234567891234567891234567891234567891234"
+ "567891234567891234567891234567891234567891234567891234567891234567891234567891234567891234567891234678912345"
+ "678912345678912345678912345679124567891232478912345678912345678912345678912345678912345679124567891232478912"
+ "3456789123456789123456789123456789123456789123456789123456789.1"),
("06-Text", "foo", "bar"),
("07-Exponential notation", "-1.23E4", "+9.8765E-3")
("07-Exponential notation", "-1.23E4", "+9.8765E-3"),
("08-Infinity", "FRRR", "MAXVALUE")
)
val src = seq.toDF("description","floatField", "doubleField")
logDataFrameContent(src)
@@ -193,7 +201,8 @@ class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkT
FractionalRow("06-Text", Option(0), None, Seq(
StandardizationErrorMessage.stdCastErr("floatField", "foo", "string", "float", None),
StandardizationErrorMessage.stdCastErr("doubleField", "bar", "string", "double", None))),
FractionalRow("07-Exponential notation", Option(-12300.0f), Option(0.0098765))
FractionalRow("07-Exponential notation", Option(-12300.0f), Option(0.0098765)),
FractionalRow("08-Infinity", Option(0f), Option(Double.PositiveInfinity))
)

assertResult(exp)(std.as[FractionalRow].collect().sortBy(_.description).toList)