From 4fea61b6f3ebdac6c8e701c9f9738c692a9ff85a Mon Sep 17 00:00:00 2001 From: Sasa Zejnilovic Date: Wed, 9 Oct 2024 10:15:18 +0200 Subject: [PATCH 1/4] #61 Add infinity support to Dates and Numerics --- project/build.properties | 2 +- .../standardization/schema/MetadataKeys.scala | 4 ++ .../standardization/stages/TypeParser.scala | 63 ++++++++++++++---- ...StandardizationInterpreter_DateSuite.scala | 51 +++++++++----- ...rdizationInterpreter_FractionalSuite.scala | 20 ++++-- ...ardizationInterpreter_TimestampSuite.scala | 41 +++++++++--- .../stages/TypeParserSuiteTemplate.scala | 66 ++++++++++++++++--- .../TypeParser_FromBooleanTypeSuite.scala | 22 ++++++- .../stages/TypeParser_FromDateTypeSuite.scala | 22 ++++++- .../TypeParser_FromDecimalTypeSuite.scala | 22 ++++++- .../TypeParser_FromDoubleTypeSuite.scala | 22 ++++++- .../stages/TypeParser_FromLongTypeSuite.scala | 24 +++++-- .../TypeParser_FromStringTypeSuite.scala | 43 ++++++++++-- .../TypeParser_FromTimestampTypeSuite.scala | 28 ++++++-- 14 files changed, 350 insertions(+), 80 deletions(-) diff --git a/project/build.properties b/project/build.properties index c8fcab5..0b699c3 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.6.2 +sbt.version=1.10.2 diff --git a/src/main/scala/za/co/absa/standardization/schema/MetadataKeys.scala b/src/main/scala/za/co/absa/standardization/schema/MetadataKeys.scala index 050a2d2..bd385e1 100644 --- a/src/main/scala/za/co/absa/standardization/schema/MetadataKeys.scala +++ b/src/main/scala/za/co/absa/standardization/schema/MetadataKeys.scala @@ -22,6 +22,10 @@ object MetadataKeys { val DefaultValue = "default" // date & timestamp val DefaultTimeZone = "timezone" + val MinusInfinitySymbol = "minus_infinity_symbol" + val MinusInfinityValue = "minus_infinity_value" + val PlusInfinitySymbol = "plus_infinity_symbol" + val PlusInfinityValue = "plus_infinity_value" // date & timestamp & all numeric val Pattern = "pattern" // all numeric diff --git a/src/main/scala/za/co/absa/standardization/stages/TypeParser.scala b/src/main/scala/za/co/absa/standardization/stages/TypeParser.scala index 66b70e8..7fd626f 100644 --- a/src/main/scala/za/co/absa/standardization/stages/TypeParser.scala +++ b/src/main/scala/za/co/absa/standardization/stages/TypeParser.scala @@ -22,14 +22,16 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.slf4j.{Logger, LoggerFactory} import za.co.absa.spark.commons.implicits.ColumnImplicits.ColumnEnhancements +import za.co.absa.spark.commons.implicits.StructFieldImplicits.StructFieldMetadataEnhancements import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements import za.co.absa.spark.commons.utils.SchemaUtils import za.co.absa.spark.hofs.transform import za.co.absa.standardization.{ErrorMessage, StandardizationErrorMessage} import za.co.absa.standardization.config.StandardizationConfig import za.co.absa.standardization.implicits.StdColumnImplicits.StdColumnEnhancements +import za.co.absa.standardization.numeric.DecimalSymbols import za.co.absa.standardization.schema.StdSchemaUtils.FieldWithSource -import za.co.absa.standardization.schema.{MetadataValues, StdSchemaUtils} +import za.co.absa.standardization.schema.{MetadataKeys, MetadataValues, StdSchemaUtils} import za.co.absa.standardization.time.DateTimePattern import za.co.absa.standardization.typeClasses.{DoubleLike, LongLike} import za.co.absa.standardization.types.TypedStructField._ @@ -127,6 +129,7 @@ object TypeParser { private val 
MicrosecondsPerSecond = 1000000 private val NanosecondsPerSecond = 1000000000 private val InfinityStr = "Infinity" + private val MinusInfinityStr = "-Infinity" private val nullColumn = lit(null) //scalastyle:ignore null @@ -315,7 +318,13 @@ object TypeParser { } private abstract class NumericParser[N: TypeTag](override val field: NumericTypeStructField[N]) - (implicit defaults: TypeDefaults) extends ScalarParser[N] { + (implicit defaults: TypeDefaults) extends ScalarParser[N] with InfinitySupport { + override protected val infMinusSymbol: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinitySymbol) + override protected val infMinusValue: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinityValue) + override protected val infPlusSymbol: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinitySymbol) + override protected val infPlusValue: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinityValue) + private val columnWithInfinityReplaced = replaceInfinitySymbols(column) + override protected def standardizeAfterCheck(stdConfig: StandardizationConfig)(implicit logger: Logger): ParseOutput = { if (field.needsUdfParsing) { standardizeUsingUdf(stdConfig) @@ -335,9 +344,9 @@ object TypeParser { val columnWithProperDecimalSymbols: Column = if (replacements.nonEmpty) { val from = replacements.keys.mkString val to = replacements.values.mkString - translate(column, from, to) + translate(columnWithInfinityReplaced, from, to) } else { - column + columnWithInfinityReplaced } val columnToCast = if (field.allowInfinity && (decimalSymbols.infinityValue != InfinityStr)) { @@ -494,9 +503,14 @@ object TypeParser { * Date | O | ->to_utc_timestamp->to_date * Other | ->String->to_date | ->String->to_timestamp->to_utc_timestamp->to_date */ - private abstract class DateTimeParser[T](implicit defaults: TypeDefaults) extends PrimitiveParser[T] { + private abstract class DateTimeParser[T](implicit defaults: TypeDefaults) extends PrimitiveParser[T] with InfinitySupport { override val field: DateTimeTypeStructField[T] protected val pattern: DateTimePattern = field.pattern.get.get + override protected val infMinusSymbol: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinitySymbol) + override protected val infMinusValue: Option[String] = metadata.getOptString(MetadataKeys.MinusInfinityValue) + override protected val infPlusSymbol: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinitySymbol) + override protected val infPlusValue: Option[String] = metadata.getOptString(MetadataKeys.PlusInfinityValue) + private val columnWithInfinityReplaced: Column = replaceInfinitySymbols(column) override protected def assemblePrimitiveCastLogic: Column = { if (pattern.isEpoch) { @@ -516,23 +530,23 @@ object TypeParser { private def castWithPattern(): Column = { // sadly with parquet support, incoming might not be all `plain` -// underlyingType match { + // underlyingType match { origType match { case _: NullType => nullColumn - case _: DateType => castDateColumn(column) - case _: TimestampType => castTimestampColumn(column) - case _: StringType => castStringColumn(column) + case _: DateType => castDateColumn(columnWithInfinityReplaced) + case _: TimestampType => castTimestampColumn(columnWithInfinityReplaced) + case _: StringType => castStringColumn(columnWithInfinityReplaced) case ot: DoubleType => // this case covers some IBM date format where it's represented as a double ddmmyyyy.hhmmss patternNeeded(ot) - castFractionalColumn(column, ot) + 
castFractionalColumn(columnWithInfinityReplaced, ot) case ot: FloatType => // this case covers some IBM date format where it's represented as a double ddmmyyyy.hhmmss patternNeeded(ot) - castFractionalColumn(column, ot) + castFractionalColumn(columnWithInfinityReplaced, ot) case ot => patternNeeded(ot) - castNonStringColumn(column, ot) + castNonStringColumn(columnWithInfinityReplaced, ot) } } @@ -554,7 +568,7 @@ object TypeParser { } protected def castEpoch(): Column = { - (column.cast(decimalType) / pattern.epochFactor).cast(TimestampType) + (columnWithInfinityReplaced.cast(decimalType) / pattern.epochFactor).cast(TimestampType) } protected def castStringColumn(stringColumn: Column): Column @@ -681,4 +695,27 @@ object TypeParser { } } +sealed trait InfinitySupport { + protected def infMinusSymbol: Option[String] + protected def infMinusValue: Option[String] + protected def infPlusSymbol: Option[String] + protected def infPlusValue: Option[String] + protected val origType: DataType + + def replaceInfinitySymbols(column: Column): Column = { + val columnWithNegativeInf: Column = infMinusSymbol.flatMap { minusSymbol => + infMinusValue.map { minusValue => + when(column === lit(minusSymbol).cast(origType), lit(minusValue).cast(origType)).otherwise(column) + } + }.getOrElse(column) + + infPlusSymbol.flatMap { plusSymbol => + infPlusValue.map { plusValue => + when(columnWithNegativeInf === lit(plusSymbol).cast(origType), lit(plusValue).cast(origType)) + .otherwise(columnWithNegativeInf) + } + }.getOrElse(columnWithNegativeInf) + } +} + class TypeParserException(message: String) extends Exception(message: String) diff --git a/src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_DateSuite.scala b/src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_DateSuite.scala index 1a52f0f..7eda9b7 100644 --- a/src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_DateSuite.scala +++ b/src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_DateSuite.scala @@ -26,6 +26,7 @@ import za.co.absa.standardization.types.{CommonTypeDefaults, TypeDefaults} import za.co.absa.standardization.udf.UDFLibrary import za.co.absa.standardization.{LoggerTestBase, Standardization, StandardizationErrorMessage} import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements +import za.co.absa.standardization.schema.MetadataKeys class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBase with LoggerTestBase { import spark.implicits._ @@ -53,7 +54,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas ) val desiredSchema = StructType(Seq( StructField(fieldName, DateType, nullable = false, - new MetadataBuilder().putString("pattern", "epoch").build) + new MetadataBuilder().putString(MetadataKeys.Pattern, "epoch").build) )) val exp = Seq( DateRow(Date.valueOf("1970-01-01")), @@ -75,18 +76,28 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas val seq = Seq( 0L, 86400000, + -1, 978307199999L, - 1563288103123L + 1563288103123L, + -2 ) val desiredSchema = StructType(Seq( StructField(fieldName, DateType, nullable = false, - new MetadataBuilder().putString("pattern", "epochmilli").build) + new MetadataBuilder() + .putString(MetadataKeys.Pattern, "epochmilli") + .putString(MetadataKeys.PlusInfinitySymbol, "-1") + .putString(MetadataKeys.PlusInfinityValue, "1563278222094") + .putString(MetadataKeys.MinusInfinitySymbol, "-2") + 
.putString(MetadataKeys.MinusInfinityValue, "0") + .build) )) val exp = Seq( DateRow(Date.valueOf("1970-01-01")), DateRow(Date.valueOf("1970-01-02")), + DateRow(Date.valueOf("2019-07-16")), DateRow(Date.valueOf("2000-12-31")), - DateRow(Date.valueOf("2019-07-16")) + DateRow(Date.valueOf("2019-07-16")), + DateRow(Date.valueOf("1970-01-01")) ) val src = seq.toDF(fieldName) @@ -106,7 +117,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas ) val desiredSchema = StructType(Seq( StructField(fieldName, DateType, nullable = false, - new MetadataBuilder().putString("pattern", "epochmicro").build) + new MetadataBuilder().putString(MetadataKeys.Pattern, "epochmicro").build) )) val exp = Seq( DateRow(Date.valueOf("1970-01-01")), @@ -132,7 +143,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas ) val desiredSchema = StructType(Seq( StructField(fieldName, DateType, nullable = false, - new MetadataBuilder().putString("pattern", "epochnano").build) + new MetadataBuilder().putString(MetadataKeys.Pattern, "epochnano").build) )) val exp = Seq( DateRow(Date.valueOf("1970-01-01")), @@ -150,25 +161,31 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas } test("simple date pattern") { - val seq = Seq( + val seq: Seq[String] = Seq( "1970/01/01", "1970/02/01", "2000/31/12", "2019/16/07", "1970-02-02", - "crash" + "crash", + "Alfa" ) val desiredSchema = StructType(Seq( StructField(fieldName, DateType, nullable = false, - new MetadataBuilder().putString("pattern", "yyyy/dd/MM").build) + new MetadataBuilder() + .putString(MetadataKeys.Pattern, "yyyy/dd/MM") + .putString(MetadataKeys.PlusInfinitySymbol, "Alfa") + .putString(MetadataKeys.PlusInfinityValue, "1970/02/01") + .build) )) - val exp = Seq( + val exp: Seq[DateRow] = Seq( DateRow(Date.valueOf("1970-01-01")), DateRow(Date.valueOf("1970-01-02")), DateRow(Date.valueOf("2000-12-31")), DateRow(Date.valueOf("2019-07-16")), DateRow(Date.valueOf("1970-01-01"), Seq(StandardizationErrorMessage.stdCastErr(fieldName, "1970-02-02", "string", "date", Some("yyyy/dd/MM")))), - DateRow(Date.valueOf("1970-01-01"), Seq(StandardizationErrorMessage.stdCastErr(fieldName, "crash", "string", "date", Some("yyyy/dd/MM")))) + DateRow(Date.valueOf("1970-01-01"), Seq(StandardizationErrorMessage.stdCastErr(fieldName, "crash", "string", "date", Some("yyyy/dd/MM")))), + DateRow(Date.valueOf("1970-01-02")) ) val src = seq.toDF(fieldName) @@ -190,7 +207,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas ) val desiredSchema = StructType(Seq( StructField(fieldName, DateType, nullable = false, - new MetadataBuilder().putString("pattern", "HH-mm-ss dd.MM.yyyy ZZZ").build) + new MetadataBuilder().putString(MetadataKeys.Pattern, "HH-mm-ss dd.MM.yyyy ZZZ").build) )) val exp = Seq( DateRow(Date.valueOf("1970-01-01")), @@ -221,7 +238,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas ) val desiredSchema = StructType(Seq( StructField(fieldName, DateType, nullable = false, - new MetadataBuilder().putString("pattern", "HH:mm:ss(SSSnnnnnn) dd+MM+yyyy XXX").build) + new MetadataBuilder().putString(MetadataKeys.Pattern, "HH:mm:ss(SSSnnnnnn) dd+MM+yyyy XXX").build) )) val exp = Seq( DateRow(Date.valueOf("1970-01-01")), @@ -252,7 +269,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas val desiredSchema = StructType(Seq( StructField(fieldName, DateType, nullable = false, new MetadataBuilder() - 
.putString("pattern", "yyyy/dd/MM") + .putString(MetadataKeys.Pattern, "yyyy/dd/MM") .putString("timezone", "EST") .build) )) @@ -286,7 +303,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas val desiredSchema = StructType(Seq( StructField(fieldName, DateType, nullable = false, new MetadataBuilder() - .putString("pattern", "yyyy/dd/MM") + .putString(MetadataKeys.Pattern, "yyyy/dd/MM") .putString("timezone", "Africa/Johannesburg") .build) )) @@ -319,7 +336,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas ) val desiredSchema = StructType(Seq( StructField(fieldName, DateType, nullable = false, - new MetadataBuilder().putString("pattern", "MMMM d 'of' yyyy").build) + new MetadataBuilder().putString(MetadataKeys.Pattern, "MMMM d 'of' yyyy").build) )) val exp = Seq( DateRow(Date.valueOf("1970-01-01")), @@ -351,7 +368,7 @@ class StandardizationInterpreter_DateSuite extends AnyFunSuite with SparkTestBas ) val desiredSchema = StructType(Seq( StructField(fieldName, DateType, nullable = false, - new MetadataBuilder().putString("pattern", "yyyy/MM/dd 'insignificant' iiiiii").build) + new MetadataBuilder().putString(MetadataKeys.Pattern, "yyyy/MM/dd 'insignificant' iiiiii").build) )) val exp = Seq( DateRow(Date.valueOf("1970-01-01")), diff --git a/src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_FractionalSuite.scala b/src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_FractionalSuite.scala index 698857b..74a7f28 100644 --- a/src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_FractionalSuite.scala +++ b/src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_FractionalSuite.scala @@ -57,9 +57,17 @@ class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkT private val desiredSchemaWithInfinity = StructType(Seq( StructField("description", StringType, nullable = false), StructField("floatField", FloatType, nullable = false, - new MetadataBuilder().putString("allow_infinity", value = "true").build), + new MetadataBuilder() + .putString("allow_infinity", value = "true") + .putString(MetadataKeys.MinusInfinitySymbol, "FRRR") + .putString(MetadataKeys.MinusInfinityValue, "0") + .build), StructField("doubleField", DoubleType, nullable = true, - new MetadataBuilder().putString("allow_infinity", value = "true").build) + new MetadataBuilder() + .putString("allow_infinity", value = "true") + .putString(MetadataKeys.PlusInfinitySymbol, "MAXVALUE") + .putString(MetadataKeys.PlusInfinityValue, "∞") + .build) )) test("From String") { @@ -74,7 +82,8 @@ class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkT + "789123456789123456789123456791245678912324789123456789123456789123456789123456789123456791245678912324789123" + "456789123456789123456789123456789123456789123456789123456789.1"), ("06-Text", "foo", "bar"), - ("07-Exponential notation", "-1.23E4", "+9.8765E-3") + ("07-Exponential notation", "-1.23E4", "+9.8765E-3"), + ("08-Infinity", "FRRR", "MAXVALUE") ) val src = seq.toDF("description","floatField", "doubleField") logDataFrameContent(src) @@ -100,7 +109,8 @@ class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkT FractionalRow("06-Text", Option(0), None, Seq( StandardizationErrorMessage.stdCastErr("floatField", "foo", "string", "float", None), StandardizationErrorMessage.stdCastErr("doubleField", "bar", "string", "double", None))), - 
FractionalRow("07-Exponential notation", Option(-12300.0f), Option(0.0098765)) + FractionalRow("07-Exponential notation", Option(-12300.0f), Option(0.0098765)), + FractionalRow("08-Infinity", Option(0f), Option(Double.PositiveInfinity)) ) assertResult(exp)(std.as[FractionalRow].collect().sortBy(_.description).toList) @@ -168,7 +178,7 @@ class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkT ("01-Euler", "2.71", "2.71"), ("02-Null", null, null), ("03-Long", Long.MaxValue.toString, Long.MinValue.toString), - ("04-infinity", "-∞", "∞"), + ("04-infinity", "-∞", "MAXVALUE"), ("05-Really big", "123456789123456791245678912324789123456789123456789.12", "-1234567891234567912456789123247891234567891234567891234567891234567891234567891234567891234567891234567891234" + "567891234567891234567891234567891234567891234567891234567891234567891234567891234567891234567891234678912345" diff --git a/src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_TimestampSuite.scala b/src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_TimestampSuite.scala index 40f7294..0bb3e40 100644 --- a/src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_TimestampSuite.scala +++ b/src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_TimestampSuite.scala @@ -22,6 +22,7 @@ import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancemen import za.co.absa.spark.commons.test.SparkTestBase import za.co.absa.standardization.RecordIdGeneration.IdType.NoId import za.co.absa.standardization.config.{BasicMetadataColumnsConfig, BasicStandardizationConfig, ErrorCodesConfig} +import za.co.absa.standardization.schema.MetadataKeys import za.co.absa.standardization.types.{CommonTypeDefaults, TypeDefaults} import za.co.absa.standardization.udf.UDFLibrary import za.co.absa.standardization.{LoggerTestBase, Standardization, StandardizationErrorMessage} @@ -49,17 +50,27 @@ class StandardizationInterpreter_TimestampSuite extends AnyFunSuite with SparkTe 0, 86400, 978307199, - 1563288103 + 1563288103, + -1, + -2 ) val desiredSchema = StructType(Seq( StructField(fieldName, TimestampType, nullable = false, - new MetadataBuilder().putString("pattern", "epoch").build) + new MetadataBuilder() + .putString(MetadataKeys.Pattern, "epoch") + .putString(MetadataKeys.PlusInfinitySymbol, "-1") + .putString(MetadataKeys.PlusInfinityValue, "1563288103") + .putString(MetadataKeys.MinusInfinitySymbol, "-2") + .putString(MetadataKeys.MinusInfinityValue, "0") + .build) )) val exp = Seq( TimestampRow(Timestamp.valueOf("1970-01-01 00:00:00")), TimestampRow(Timestamp.valueOf("1970-01-02 00:00:00")), TimestampRow(Timestamp.valueOf("2000-12-31 23:59:59")), - TimestampRow(Timestamp.valueOf("2019-07-16 14:41:43")) + TimestampRow(Timestamp.valueOf("2019-07-16 14:41:43")), + TimestampRow(Timestamp.valueOf("2019-07-16 14:41:43")), + TimestampRow(Timestamp.valueOf("1970-01-01 00:00:00")) ) val src = seq.toDF(fieldName) @@ -131,17 +142,23 @@ class StandardizationInterpreter_TimestampSuite extends AnyFunSuite with SparkTe 0, 86400000000000L, 978307199999999999L, - 1563288103123456789L + 1563288103123456789L, + -1 ) val desiredSchema = StructType(Seq( StructField(fieldName, TimestampType, nullable = false, - new MetadataBuilder().putString("pattern", "epochnano").build) + new MetadataBuilder() + .putString("pattern", "epochnano") + .putString(MetadataKeys.PlusInfinitySymbol, "-1") + .putString(MetadataKeys.PlusInfinityValue, 
"325035936000000000") + .build) )) val exp = Seq( TimestampRow(Timestamp.valueOf("1970-01-01 00:00:00")), TimestampRow(Timestamp.valueOf("1970-01-02 00:00:00")), TimestampRow(Timestamp.valueOf("2000-12-31 23:59:59.999999000")), - TimestampRow(Timestamp.valueOf("2019-07-16 14:41:43.123456000")) + TimestampRow(Timestamp.valueOf("2019-07-16 14:41:43.123456000")), + TimestampRow(Timestamp.valueOf("1980-04-19 23:45:36.0")) ) val src = seq.toDF(fieldName) @@ -159,11 +176,16 @@ class StandardizationInterpreter_TimestampSuite extends AnyFunSuite with SparkTe "31.12.2000 23-59-59", "16.07.2019 14-41-43", "02.02.1970_00-00-00", - "nope" + "nope", + "-inf" ) val desiredSchema = StructType(Seq( StructField(fieldName, TimestampType, nullable = false, - new MetadataBuilder().putString("pattern", "dd.MM.yyyy HH-mm-ss").build) + new MetadataBuilder() + .putString("pattern", "dd.MM.yyyy HH-mm-ss") + .putString(MetadataKeys.MinusInfinitySymbol, "-inf") + .putString(MetadataKeys.MinusInfinityValue, "01.01.2000 00-00-00") + .build) )) val exp = Seq( TimestampRow(Timestamp.valueOf("1970-01-01 00:00:00")), @@ -171,7 +193,8 @@ class StandardizationInterpreter_TimestampSuite extends AnyFunSuite with SparkTe TimestampRow(Timestamp.valueOf("2000-12-31 23:59:59")), TimestampRow(Timestamp.valueOf("2019-07-16 14:41:43")), TimestampRow(Timestamp.valueOf("1970-01-01 00:00:00"), Seq(StandardizationErrorMessage.stdCastErr(fieldName, "02.02.1970_00-00-00", "string", "timestamp", Some("dd.MM.yyyy HH-mm-ss")))), - TimestampRow(Timestamp.valueOf("1970-01-01 00:00:00"), Seq(StandardizationErrorMessage.stdCastErr(fieldName, "nope", "string", "timestamp", Some("dd.MM.yyyy HH-mm-ss")))) + TimestampRow(Timestamp.valueOf("1970-01-01 00:00:00"), Seq(StandardizationErrorMessage.stdCastErr(fieldName, "nope", "string", "timestamp", Some("dd.MM.yyyy HH-mm-ss")))), + TimestampRow(Timestamp.valueOf("2000-01-01 00:00:00")) ) val src = seq.toDF(fieldName) diff --git a/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParserSuiteTemplate.scala b/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParserSuiteTemplate.scala index 6238857..f77614f 100644 --- a/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParserSuiteTemplate.scala +++ b/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParserSuiteTemplate.scala @@ -24,6 +24,7 @@ import za.co.absa.spark.commons.test.SparkTestBase import za.co.absa.standardization.RecordIdGeneration.IdType.NoId import za.co.absa.standardization.config.{BasicMetadataColumnsConfig, BasicStandardizationConfig} import za.co.absa.standardization.interpreter.stages.TypeParserSuiteTemplate._ +import za.co.absa.standardization.schema.MetadataKeys import za.co.absa.standardization.stages.TypeParser import za.co.absa.standardization.time.DateTimePattern import za.co.absa.standardization.types.{CommonTypeDefaults, ParseOutput, TypeDefaults, TypedStructField} @@ -46,7 +47,7 @@ trait TypeParserSuiteTemplate extends AnyFunSuite with SparkTestBase { private implicit val udfLib: UDFLibrary = new UDFLibrary(stdConfig) private implicit val defaults: TypeDefaults = CommonTypeDefaults - protected def createCastTemplate(toType: DataType, pattern: String, timezone: Option[String]): String + protected def createCastTemplate(srcType: StructField, target: StructField, pattern: String, timezone: Option[String]): String protected def createErrorCondition(srcField: String, target: StructField, castS: String):String private val sourceFieldName = "sourceField" @@ -85,6 +86,20 @@ 
trait TypeParserSuiteTemplate extends AnyFunSuite with SparkTestBase { testTemplate(floatField, schema, path) } + protected def doTestIntoFloatWithInf(input: Input): Unit = { + import input._ + val floatField = StructField("floatField", FloatType, nullable = false, + new MetadataBuilder() + .putString("sourcecolumn", sourceFieldName) + .putString(MetadataKeys.PlusInfinityValue, Float.PositiveInfinity.toString) + .putString(MetadataKeys.PlusInfinitySymbol, "inf") + .putString(MetadataKeys.MinusInfinityValue, Float.NegativeInfinity.toString) + .putString(MetadataKeys.MinusInfinitySymbol, "-inf") + .build) + val schema = buildSchema(Array(sourceField(baseType), floatField), path) + testTemplate(floatField, schema, path) + } + protected def doTestIntoIntegerField(input: Input): Unit = { import input._ val integerField = StructField("integerField", IntegerType, nullable = true, @@ -183,6 +198,35 @@ trait TypeParserSuiteTemplate extends AnyFunSuite with SparkTestBase { testTemplate(timestampField, schema, path, timestampPattern, Option(fixedTimezone)) } + protected def doTestIntoTimestampWithPlusInfinity(input: Input): Unit = { + import input._ + val timestampField = StructField("timestampField", TimestampType, nullable = false, + new MetadataBuilder() + .putString("sourcecolumn", sourceFieldName) + .putString("pattern", timestampPattern) + .putString(MetadataKeys.PlusInfinityValue, "99991231") + .putString(MetadataKeys.PlusInfinitySymbol, "inf") + .putString(MetadataKeys.MinusInfinityValue, "00010101") + .putString(MetadataKeys.MinusInfinitySymbol, "-inf") + .build) + val schema = buildSchema(Array(sourceField(baseType), timestampField), path) + testTemplate(timestampField, schema, path, timestampPattern) + } + + protected def doTestIntoDateFieldWithInf(input: Input): Unit = { + import input._ + val timestampField = StructField("dateField", DateType, nullable = false, + new MetadataBuilder() + .putString("sourcecolumn", sourceFieldName) + .putString(MetadataKeys.PlusInfinityValue, "99991231") + .putString(MetadataKeys.PlusInfinitySymbol, "inf") + .putString(MetadataKeys.MinusInfinityValue, "00010101") + .putString(MetadataKeys.MinusInfinitySymbol, "-inf") + .build) + val schema = buildSchema(Array(sourceField(baseType), timestampField), path) + testTemplate(timestampField, schema, path, "yyyy-MM-dd") + } + protected def doTestIntoDateFieldWithEpochPattern(input: Input): Unit = { import input._ val dateField = StructField("dateField", DateType, nullable = false, @@ -225,14 +269,15 @@ trait TypeParserSuiteTemplate extends AnyFunSuite with SparkTestBase { private def testTemplate(target: StructField, schema: StructType, path: String, pattern: String = "", timezone: Option[String] = None): Unit = { val srcField = fullName(path, sourceFieldName) - val srcType = getFieldByFullName(schema, srcField).dataType - val castString = createCastTemplate(target.dataType, pattern, timezone).format(srcField, srcField) - val errColumnExpression = assembleErrorExpression(srcField, target, applyRecasting(castString), srcType.typeName, target.dataType.typeName, pattern) + val srcStructField = getFieldByFullName(schema, srcField) + val srcType = srcStructField.dataType + val castString = createCastTemplate(srcStructField, target, pattern, timezone).replace("%s", "%1$s").format(srcField) + val errColumnExpression = assembleErrorExpression(srcField, target, applyRecasting(castString), srcType, target.dataType.typeName, pattern) val stdCastExpression = assembleCastExpression(srcField, target, applyRecasting(castString), 
errColumnExpression) val output: ParseOutput = TypeParser.standardize(target, path, schema, stdConfig) - doAssert(errColumnExpression, output.errors.toString()) - doAssert(stdCastExpression, output.stdCol.toString()) + doAssert(errColumnExpression, output.errors.toString(), "assembleErrorExpression") + doAssert(stdCastExpression, output.stdCol.toString(), "assembleCastExpression") } def applyRecasting(expr: String): String = { @@ -286,22 +331,23 @@ trait TypeParserSuiteTemplate extends AnyFunSuite with SparkTestBase { if (SPARK_VERSION.startsWith("2.")) expresionWithQuotes else expresionWithQuotes.replaceAll("`", "") } - private def assembleErrorExpression(srcField: String, target: StructField, castS: String, fromType: String, toType: String, pattern: String): String = { + private def assembleErrorExpression(srcField: String, target: StructField, castS: String, fromType: DataType, toType: String, pattern: String): String = { val errCond = createErrorCondition(srcField, target, castS) val patternExpr = if (pattern.isEmpty) "NULL" else pattern if (target.nullable) { - s"CASE WHEN (($srcField IS NOT NULL) AND ($errCond)) THEN array(stdCastErr($srcField, CAST($srcField AS STRING), $fromType, $toType, $patternExpr)) ELSE [] END" + s"CASE WHEN (($srcField IS NOT NULL) AND ($errCond)) THEN array(stdCastErr($srcField, CAST($srcField AS STRING), ${fromType.typeName}, $toType, $patternExpr)) ELSE [] END" } else { s"CASE WHEN ($srcField IS NULL) THEN array(stdNullErr($srcField)) ELSE " + - s"CASE WHEN ($errCond) THEN array(stdCastErr($srcField, CAST($srcField AS STRING), $fromType, $toType, $patternExpr)) ELSE [] END END" + s"CASE WHEN ($errCond) THEN array(stdCastErr($srcField, CAST($srcField AS STRING), ${fromType.typeName}, $toType, $patternExpr)) ELSE [] END END" } } - private def doAssert(expectedExpression: String, actualExpression: String): Unit = { + private def doAssert(expectedExpression: String, actualExpression: String, method: String): Unit = { if (actualExpression != expectedExpression) { // the expressions tend to be rather long, the assert most often cuts the beginning and/or end of the string // showing just the vicinity of the difference, so we log the output of the whole strings + log.error(s"Method: $method") log.error(s"Expected: $expectedExpression") log.error(s"Actual : $actualExpression") assert(actualExpression == expectedExpression) diff --git a/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromBooleanTypeSuite.scala b/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromBooleanTypeSuite.scala index 4f2bcb0..17c782e 100644 --- a/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromBooleanTypeSuite.scala +++ b/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromBooleanTypeSuite.scala @@ -17,7 +17,9 @@ package za.co.absa.standardization.interpreter.stages import org.apache.spark.sql.types._ +import za.co.absa.spark.commons.implicits.StructFieldImplicits.StructFieldMetadataEnhancements import za.co.absa.standardization.interpreter.stages.TypeParserSuiteTemplate.Input +import za.co.absa.standardization.schema.MetadataKeys import za.co.absa.standardization.time.DateTimePattern class TypeParser_FromBooleanTypeSuite extends TypeParserSuiteTemplate { @@ -32,16 +34,30 @@ class TypeParser_FromBooleanTypeSuite extends TypeParserSuiteTemplate { path = "Boo" ) - override protected def createCastTemplate(toType: DataType, pattern: String, timezone: Option[String]): String = { + override 
protected def createCastTemplate(srcStructField: StructField, target: StructField, pattern: String, timezone: Option[String]): String = { + val (infMinusValue, infMinusSymbol, infPlusValue, infPlusSymbol): (Option[String], Option[String], Option[String], Option[String]) = { + val infMinusValue = target.metadata.getOptString(MetadataKeys.MinusInfinityValue) + val infMinusSymbol = target.metadata.getOptString(MetadataKeys.MinusInfinitySymbol) + val infPlusValue = target.metadata.getOptString(MetadataKeys.PlusInfinityValue) + val infPlusSymbol = target.metadata.getOptString(MetadataKeys.PlusInfinitySymbol) + (infMinusValue, infMinusSymbol, infPlusValue, infPlusSymbol) + } + val srcType = srcStructField.dataType.sql + val isEpoch = DateTimePattern.isEpoch(pattern) - (toType, isEpoch, timezone) match { + (target.dataType, isEpoch, timezone) match { case (DateType, true, _) => s"to_date(CAST((CAST(`%s` AS DECIMAL(30,9)) / ${DateTimePattern.epochFactor(pattern)}L) AS TIMESTAMP))" case (TimestampType, true, _) => s"CAST((CAST(%s AS DECIMAL(30,9)) / ${DateTimePattern.epochFactor(pattern)}) AS TIMESTAMP)" case (DateType, _, Some(tz)) => s"to_date(to_utc_timestamp(to_timestamp(CAST(`%s` AS STRING), '$pattern'), '$tz'))" case (TimestampType, _, Some(tz)) => s"to_utc_timestamp(to_timestamp(CAST(`%s` AS STRING), '$pattern'), $tz)" case (TimestampType, _, _) => s"to_timestamp(CAST(`%s` AS STRING), '$pattern')" case (DateType, _, _) => s"to_date(CAST(`%s` AS STRING), '$pattern')" - case _ => s"CAST(%s AS ${toType.sql})" + case (ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | _: DecimalType, _, _) if (infMinusValue.isDefined && infMinusSymbol.isDefined && infPlusValue.isDefined && infPlusSymbol.isDefined) => + s"CAST(CASE WHEN (CASE WHEN (%s = ${infMinusSymbol.get}) THEN CAST(${infMinusValue.get} AS $srcType) " + + s"ELSE %s END = ${infPlusSymbol.get}) THEN CAST(${infPlusValue.get} AS $srcType) ELSE CASE WHEN " + + s"(%s = ${infMinusSymbol.get}) THEN CAST(${infMinusValue.get} AS $srcType) ELSE %s END END " + + s"AS ${target.dataType.sql})" + case _ => s"CAST(%s AS ${target.dataType.sql})" } } diff --git a/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromDateTypeSuite.scala b/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromDateTypeSuite.scala index 985ab15..3cc8c3a 100644 --- a/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromDateTypeSuite.scala +++ b/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromDateTypeSuite.scala @@ -17,7 +17,9 @@ package za.co.absa.standardization.interpreter.stages import org.apache.spark.sql.types._ +import za.co.absa.spark.commons.implicits.StructFieldImplicits.StructFieldMetadataEnhancements import za.co.absa.standardization.interpreter.stages.TypeParserSuiteTemplate.Input +import za.co.absa.standardization.schema.MetadataKeys import za.co.absa.standardization.time.DateTimePattern class TypeParser_FromDateTypeSuite extends TypeParserSuiteTemplate { @@ -33,16 +35,30 @@ class TypeParser_FromDateTypeSuite extends TypeParserSuiteTemplate { datetimeNeedsPattern = false ) - override protected def createCastTemplate(toType: DataType, pattern: String, timezone: Option[String]): String = { + override protected def createCastTemplate(srcStructField: StructField, target: StructField, pattern: String, timezone: Option[String]): String = { + val (infMinusValue, infMinusSymbol, infPlusValue, infPlusSymbol): (Option[String], Option[String], Option[String], 
Option[String]) = { + val infMinusValue = target.metadata.getOptString(MetadataKeys.MinusInfinityValue) + val infMinusSymbol = target.metadata.getOptString(MetadataKeys.MinusInfinitySymbol) + val infPlusValue = target.metadata.getOptString(MetadataKeys.PlusInfinityValue) + val infPlusSymbol = target.metadata.getOptString(MetadataKeys.PlusInfinitySymbol) + (infMinusValue, infMinusSymbol, infPlusValue, infPlusSymbol) + } + val srcType = srcStructField.dataType.sql + val isEpoch = DateTimePattern.isEpoch(pattern) - (toType, isEpoch, timezone) match { + (target.dataType, isEpoch, timezone) match { case (DateType, true, _) => s"to_date(CAST((CAST(`%s` AS DECIMAL(30,9)) / ${DateTimePattern.epochFactor(pattern)}L) AS TIMESTAMP))" case (TimestampType, true, _) => s"CAST((CAST(%s AS DECIMAL(30,9)) / ${DateTimePattern.epochFactor(pattern)}) AS TIMESTAMP)" case (DateType, _, Some(tz)) => s"to_date(to_utc_timestamp(`%s`, '$tz'))" case (TimestampType, _, Some(tz)) => s"to_utc_timestamp(%s, $tz)" case (DateType, _, _) => "%s" case (TimestampType, _, _) => "to_timestamp(`%s`)" - case _ => s"CAST(%s AS ${toType.sql})" + case (ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | _: DecimalType, _, _) if (infMinusValue.isDefined && infMinusSymbol.isDefined && infPlusValue.isDefined && infPlusSymbol.isDefined) => + s"CAST(CASE WHEN (CASE WHEN (%s = ${infMinusSymbol.get}) THEN CAST(${infMinusValue.get} AS $srcType) " + + s"ELSE %s END = ${infPlusSymbol.get}) THEN CAST(${infPlusValue.get} AS $srcType) ELSE CASE WHEN " + + s"(%s = ${infMinusSymbol.get}) THEN CAST(${infMinusValue.get} AS $srcType) ELSE %s END END " + + s"AS ${target.dataType.sql})" + case _ => s"CAST(%s AS ${target.dataType.sql})" } } diff --git a/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromDecimalTypeSuite.scala b/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromDecimalTypeSuite.scala index 6498c5b..f0ce6d3 100644 --- a/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromDecimalTypeSuite.scala +++ b/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromDecimalTypeSuite.scala @@ -17,7 +17,9 @@ package za.co.absa.standardization.interpreter.stages import org.apache.spark.sql.types._ +import za.co.absa.spark.commons.implicits.StructFieldImplicits.StructFieldMetadataEnhancements import za.co.absa.standardization.interpreter.stages.TypeParserSuiteTemplate.Input +import za.co.absa.standardization.schema.MetadataKeys import za.co.absa.standardization.time.DateTimePattern class TypeParser_FromDecimalTypeSuite extends TypeParserSuiteTemplate { @@ -32,16 +34,30 @@ class TypeParser_FromDecimalTypeSuite extends TypeParserSuiteTemplate { path = "hello" ) - override protected def createCastTemplate(toType: DataType, pattern: String, timezone: Option[String]): String = { + override protected def createCastTemplate(srcStructField: StructField, target: StructField, pattern: String, timezone: Option[String]): String = { + val (infMinusValue, infMinusSymbol, infPlusValue, infPlusSymbol): (Option[String], Option[String], Option[String], Option[String]) = { + val infMinusValue = target.metadata.getOptString(MetadataKeys.MinusInfinityValue) + val infMinusSymbol = target.metadata.getOptString(MetadataKeys.MinusInfinitySymbol) + val infPlusValue = target.metadata.getOptString(MetadataKeys.PlusInfinityValue) + val infPlusSymbol = target.metadata.getOptString(MetadataKeys.PlusInfinitySymbol) + (infMinusValue, infMinusSymbol, infPlusValue, 
infPlusSymbol) + } + val srcType = srcStructField.dataType.sql + val isEpoch = DateTimePattern.isEpoch(pattern) - (toType, isEpoch, timezone) match { + (target.dataType, isEpoch, timezone) match { case (DateType, true, _) => s"to_date(CAST((CAST(`%s` AS DECIMAL(30,9)) / ${DateTimePattern.epochFactor(pattern)}L) AS TIMESTAMP))" case (TimestampType, true, _) => s"CAST((CAST(%s AS DECIMAL(30,9)) / ${DateTimePattern.epochFactor(pattern)}) AS TIMESTAMP)" case (DateType, _, Some(tz)) => s"to_date(to_utc_timestamp(to_timestamp(CAST(`%s` AS STRING), '$pattern'), '$tz'))" case (TimestampType, _, Some(tz)) => s"to_utc_timestamp(to_timestamp(CAST(`%s` AS STRING), '$pattern'), $tz)" case (TimestampType, _, _) => s"to_timestamp(CAST(`%s` AS STRING), '$pattern')" case (DateType, _, _) => s"to_date(CAST(`%s` AS STRING), '$pattern')" - case _ => s"CAST(%s AS ${toType.sql})" + case (ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | _: DecimalType, _, _) if (infMinusValue.isDefined && infMinusSymbol.isDefined && infPlusValue.isDefined && infPlusSymbol.isDefined) => + s"CAST(CASE WHEN (CASE WHEN (%s = ${infMinusSymbol.get}) THEN CAST(${infMinusValue.get} AS $srcType) " + + s"ELSE %s END = ${infPlusSymbol.get}) THEN CAST(${infPlusValue.get} AS $srcType) ELSE CASE WHEN " + + s"(%s = ${infMinusSymbol.get}) THEN CAST(${infMinusValue.get} AS $srcType) ELSE %s END END " + + s"AS ${target.dataType.sql})" + case _ => s"CAST(%s AS ${target.dataType.sql})" } } diff --git a/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromDoubleTypeSuite.scala b/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromDoubleTypeSuite.scala index c6f5396..3adbe08 100644 --- a/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromDoubleTypeSuite.scala +++ b/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromDoubleTypeSuite.scala @@ -17,7 +17,9 @@ package za.co.absa.standardization.interpreter.stages import org.apache.spark.sql.types._ +import za.co.absa.spark.commons.implicits.StructFieldImplicits.StructFieldMetadataEnhancements import za.co.absa.standardization.interpreter.stages.TypeParserSuiteTemplate.Input +import za.co.absa.standardization.schema.MetadataKeys import za.co.absa.standardization.time.DateTimePattern class TypeParser_FromDoubleTypeSuite extends TypeParserSuiteTemplate { @@ -36,16 +38,30 @@ class TypeParser_FromDoubleTypeSuite extends TypeParserSuiteTemplate { private val datePatternDS = DS(6, 2) private val timestampPatternDS = DS(10, 4) - override protected def createCastTemplate(toType: DataType, pattern: String, timezone: Option[String]): String = { + override protected def createCastTemplate(srcStructField: StructField, target: StructField, pattern: String, timezone: Option[String]): String = { + val (infMinusValue, infMinusSymbol, infPlusValue, infPlusSymbol): (Option[String], Option[String], Option[String], Option[String]) = { + val infMinusValue = target.metadata.getOptString(MetadataKeys.MinusInfinityValue) + val infMinusSymbol = target.metadata.getOptString(MetadataKeys.MinusInfinitySymbol) + val infPlusValue = target.metadata.getOptString(MetadataKeys.PlusInfinityValue) + val infPlusSymbol = target.metadata.getOptString(MetadataKeys.PlusInfinitySymbol) + (infMinusValue, infMinusSymbol, infPlusValue, infPlusSymbol) + } + val srcType = srcStructField.dataType.sql + val isEpoch = DateTimePattern.isEpoch(pattern) - (toType, isEpoch, timezone) match { + (target.dataType, isEpoch, timezone) match { 
case (DateType, true, _) => s"to_date(CAST((CAST(`%s` AS DECIMAL(30,9)) / ${DateTimePattern.epochFactor(pattern)}L) AS TIMESTAMP))" case (TimestampType, true, _) => s"CAST((CAST(%s AS DECIMAL(30,9)) / ${DateTimePattern.epochFactor(pattern)}) AS TIMESTAMP)" case (DateType, _, Some(tz)) => s"to_date(to_utc_timestamp(to_timestamp(CAST(CAST(`%s` AS DECIMAL(${datePatternDS.precision},${datePatternDS.scale})) AS STRING), '$pattern'), '$tz'))" case (TimestampType, _, Some(tz)) => s"to_utc_timestamp(to_timestamp(CAST(CAST(`%s` AS DECIMAL(${timestampPatternDS.precision},${timestampPatternDS.scale})) AS STRING), '$pattern'), $tz)" case (DateType, _, _) => s"to_date(CAST(CAST(`%s` AS DECIMAL(${datePatternDS.precision},${datePatternDS.scale})) AS STRING), '$pattern')" case (TimestampType, _, _) => s"to_timestamp(CAST(CAST(`%s` AS DECIMAL(${timestampPatternDS.precision},${timestampPatternDS.scale})) AS STRING), '$pattern')" - case _ => s"CAST(%s AS ${toType.sql})" + case (ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | _: DecimalType, _, _) if (infMinusValue.isDefined && infMinusSymbol.isDefined && infPlusValue.isDefined && infPlusSymbol.isDefined) => + s"CAST(CASE WHEN (CASE WHEN (%s = ${infMinusSymbol.get}) THEN CAST(${infMinusValue.get} AS $srcType) " + + s"ELSE %s END = ${infPlusSymbol.get}) THEN CAST(${infPlusValue.get} AS $srcType) ELSE CASE WHEN " + + s"(%s = ${infMinusSymbol.get}) THEN CAST(${infMinusValue.get} AS $srcType) ELSE %s END END " + + s"AS ${target.dataType.sql})" + case _ => s"CAST(%s AS ${target.dataType.sql})" } } diff --git a/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromLongTypeSuite.scala b/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromLongTypeSuite.scala index ff6c71b..aa559a2 100644 --- a/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromLongTypeSuite.scala +++ b/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromLongTypeSuite.scala @@ -17,7 +17,9 @@ package za.co.absa.standardization.interpreter.stages import org.apache.spark.sql.types._ +import za.co.absa.spark.commons.implicits.StructFieldImplicits.StructFieldMetadataEnhancements import za.co.absa.standardization.interpreter.stages.TypeParserSuiteTemplate.Input +import za.co.absa.standardization.schema.MetadataKeys import za.co.absa.standardization.time.DateTimePattern class TypeParser_FromLongTypeSuite extends TypeParserSuiteTemplate { @@ -32,16 +34,30 @@ class TypeParser_FromLongTypeSuite extends TypeParserSuiteTemplate { path = "Hey" ) - override protected def createCastTemplate(toType: DataType, pattern: String, timezone: Option[String]): String = { + override protected def createCastTemplate(srcStructField: StructField, target: StructField, pattern: String, timezone: Option[String]): String = { + val (infMinusValue, infMinusSymbol, infPlusValue, infPlusSymbol): (Option[String], Option[String], Option[String], Option[String]) = { + val infMinusValue = target.metadata.getOptString(MetadataKeys.MinusInfinityValue) + val infMinusSymbol = target.metadata.getOptString(MetadataKeys.MinusInfinitySymbol) + val infPlusValue = target.metadata.getOptString(MetadataKeys.PlusInfinityValue) + val infPlusSymbol = target.metadata.getOptString(MetadataKeys.PlusInfinitySymbol) + (infMinusValue, infMinusSymbol, infPlusValue, infPlusSymbol) + } + val srcType = srcStructField.dataType.sql + val isEpoch = DateTimePattern.isEpoch(pattern) - (toType, isEpoch, timezone) match { + (target.dataType, isEpoch, 
timezone) match { case (DateType, true, _) => s"to_date(CAST((CAST(`%s` AS DECIMAL(30,9)) / ${DateTimePattern.epochFactor(pattern)}L) AS TIMESTAMP))" case (TimestampType, true, _) => s"CAST((CAST(%s AS DECIMAL(30,9)) / ${DateTimePattern.epochFactor(pattern)}) AS TIMESTAMP)" case (DateType, _, Some(tz)) => s"to_date(to_utc_timestamp(to_timestamp(CAST(`%s` AS STRING), '$pattern'), '$tz'))" case (TimestampType, _, Some(tz)) => s"to_utc_timestamp(to_timestamp(CAST(`%s` AS STRING), '$pattern'), $tz)" case (TimestampType, _, _) => s"to_timestamp(CAST(`%s` AS STRING), '$pattern')" case (DateType, _, _) => s"to_date(CAST(`%s` AS STRING), '$pattern')" - case _ => s"CAST(%s AS ${toType.sql})" + case (ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | _: DecimalType, _, _) if (infMinusValue.isDefined && infMinusSymbol.isDefined && infPlusValue.isDefined && infPlusSymbol.isDefined) => + s"CAST(CASE WHEN (CASE WHEN (%s = ${infMinusSymbol.get}) THEN CAST(${infMinusValue.get} AS $srcType) " + + s"ELSE %s END = ${infPlusSymbol.get}) THEN CAST(${infPlusValue.get} AS $srcType) ELSE CASE WHEN " + + s"(%s = ${infMinusSymbol.get}) THEN CAST(${infMinusValue.get} AS $srcType) ELSE %s END END " + + s"AS ${target.dataType.sql})" + case _ => s"CAST(%s AS ${target.dataType.sql})" } } @@ -49,7 +65,7 @@ class TypeParser_FromLongTypeSuite extends TypeParserSuiteTemplate { target.dataType match { case FloatType | DoubleType => s"(($castS IS NULL) OR isnan($castS)) OR ($castS IN (Infinity, -Infinity))" case ByteType | ShortType | IntegerType => - s"($castS IS NULL) OR (NOT (CAST(Hey.sourceField AS INT) = CAST(Hey.sourceField AS BIGINT)))" + s"($castS IS NULL) OR (NOT ($castS = CAST(Hey.sourceField AS BIGINT)))" case _ => s"$castS IS NULL" } } diff --git a/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromStringTypeSuite.scala b/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromStringTypeSuite.scala index 1642343..f94d02a 100644 --- a/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromStringTypeSuite.scala +++ b/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromStringTypeSuite.scala @@ -17,7 +17,9 @@ package za.co.absa.standardization.interpreter.stages import org.apache.spark.sql.types._ +import za.co.absa.spark.commons.implicits.StructFieldImplicits.StructFieldMetadataEnhancements import za.co.absa.standardization.interpreter.stages.TypeParserSuiteTemplate.Input +import za.co.absa.standardization.schema.MetadataKeys import za.co.absa.standardization.time.DateTimePattern class TypeParser_FromStringTypeSuite extends TypeParserSuiteTemplate { @@ -33,18 +35,38 @@ class TypeParser_FromStringTypeSuite extends TypeParserSuiteTemplate { datetimeNeedsPattern = false ) - override protected def createCastTemplate(toType: DataType, customPattern: String, timezone: Option[String]): String = { + override protected def createCastTemplate(srcStructField: StructField, target: StructField, customPattern: String, timezone: Option[String]): String = { + val (infMinusValue, infMinusSymbol, infPlusValue, infPlusSymbol): (Option[String], Option[String], Option[String], Option[String]) = { + val infMinusValue = target.metadata.getOptString(MetadataKeys.MinusInfinityValue) + val infMinusSymbol = target.metadata.getOptString(MetadataKeys.MinusInfinitySymbol) + val infPlusValue = target.metadata.getOptString(MetadataKeys.PlusInfinityValue) + val infPlusSymbol = target.metadata.getOptString(MetadataKeys.PlusInfinitySymbol) + 
(infMinusValue, infMinusSymbol, infPlusValue, infPlusSymbol)
+    }
+    val infDefined = infMinusValue.isDefined && infMinusSymbol.isDefined && infPlusValue.isDefined && infPlusSymbol.isDefined
+    val srcType = srcStructField.dataType.sql
+
     val datePattern = if (customPattern.isEmpty) "yyyy-MM-dd" else customPattern
     val dateTimePattern = if (customPattern.isEmpty) "yyyy-MM-dd HH:mm:ss" else customPattern
     val isEpoch = DateTimePattern.isEpoch(customPattern)
-    (toType, isEpoch, timezone) match {
+    val baseInfCasting = s"CASE WHEN (CASE WHEN (`%s` = CAST('${infMinusSymbol.getOrElse("")}' AS $srcType)) THEN CAST('${infMinusValue.getOrElse("")}' AS $srcType) " +
+      s"ELSE `%s` END = CAST('${infPlusSymbol.getOrElse("")}' AS $srcType)) THEN CAST('${infPlusValue.getOrElse("")}' AS $srcType) ELSE CASE WHEN " +
+      s"(`%s` = CAST('${infMinusSymbol.getOrElse("")}' AS $srcType)) THEN CAST('${infMinusValue.getOrElse("")}' AS $srcType) ELSE `%s` END END"
+    (target.dataType, isEpoch, timezone) match {
       case (DateType, true, _) => s"to_date(CAST((CAST(`%s` AS DECIMAL(30,9)) / ${DateTimePattern.epochFactor(customPattern)}L) AS TIMESTAMP))"
       case (TimestampType, true, _) => s"CAST((CAST(%s AS DECIMAL(30,9)) / ${DateTimePattern.epochFactor(customPattern)}) AS TIMESTAMP)"
       case (DateType, _, Some(tz)) => s"to_date(to_utc_timestamp(to_timestamp(`%s`, '$dateTimePattern'), '$tz'))"
       case (TimestampType, _, Some(tz)) => s"to_utc_timestamp(to_timestamp(`%s`, '$dateTimePattern'), $tz)"
-      case (DateType, _, _) => s"to_date(`%s`, '$datePattern')"
-      case (TimestampType, _, _) => s"to_timestamp(`%s`, '$dateTimePattern')"
-      case _ => s"CAST(%s AS ${toType.sql})"
+      case (DateType, _, _) if !infDefined => s"to_date(`%s`, '$datePattern')"
+      case (DateType, _, _) if infDefined => s"to_date($baseInfCasting, '$datePattern')"
+      case (TimestampType, _, _) if !infDefined => s"to_timestamp(`%s`, '$dateTimePattern')"
+      case (TimestampType, _, _) if infDefined => s"to_timestamp($baseInfCasting, '$dateTimePattern')"
+      case (ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | _: DecimalType, _, _) if infDefined =>
+        s"CAST(CASE WHEN (CASE WHEN (%s = CAST(${infMinusSymbol.get} AS $srcType)) THEN CAST(${infMinusValue.get} AS $srcType) " +
+        s"ELSE %s END = CAST(${infPlusSymbol.get} AS $srcType)) THEN CAST(${infPlusValue.get} AS $srcType) ELSE CASE WHEN " +
+        s"(%s = CAST(${infMinusSymbol.get} AS $srcType)) THEN CAST(${infMinusValue.get} AS $srcType) ELSE %s END END " +
+        s"AS ${target.dataType.sql})"
+      case _ => s"CAST(%s AS ${target.dataType.sql})"
     }
   }

@@ -120,4 +142,15 @@ class TypeParser_FromStringTypeSuite extends TypeParserSuiteTemplate {
     doTestIntoTimestampFieldWithEpochPattern(input)
   }

+  test("Into float field with inf") {
+    doTestIntoFloatWithInf(input)
+  }
+
+  test("Into timestamp field with inf") {
+    doTestIntoTimestampWithPlusInfinity(input)
+  }
+
+  test("Into date field with inf") {
+    doTestIntoDateFieldWithInf(input)
+  }
 }
diff --git a/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromTimestampTypeSuite.scala b/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromTimestampTypeSuite.scala
index c26e439..e4cb368 100644
--- a/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromTimestampTypeSuite.scala
+++ b/src/test/scala/za/co/absa/standardization/interpreter/stages/TypeParser_FromTimestampTypeSuite.scala
@@ -17,7 +17,9 @@ package za.co.absa.standardization.interpreter.stages

 import org.apache.spark.sql.types._
+import 
za.co.absa.spark.commons.implicits.StructFieldImplicits.StructFieldMetadataEnhancements import za.co.absa.standardization.interpreter.stages.TypeParserSuiteTemplate.Input +import za.co.absa.standardization.schema.MetadataKeys import za.co.absa.standardization.time.DateTimePattern class TypeParser_FromTimestampTypeSuite extends TypeParserSuiteTemplate { @@ -33,16 +35,31 @@ class TypeParser_FromTimestampTypeSuite extends TypeParserSuiteTemplate { datetimeNeedsPattern = false ) - override protected def createCastTemplate(toType: DataType, pattern: String, timezone: Option[String]): String = { + override protected def createCastTemplate(srcStructField: StructField, target: StructField, pattern: String, timezone: Option[String]): String = { + val (infMinusValue, infMinusSymbol, infPlusValue, infPlusSymbol): (Option[String], Option[String], Option[String], Option[String]) = { + val infMinusValue = target.metadata.getOptString(MetadataKeys.MinusInfinityValue) + val infMinusSymbol = target.metadata.getOptString(MetadataKeys.MinusInfinitySymbol) + val infPlusValue = target.metadata.getOptString(MetadataKeys.PlusInfinityValue) + val infPlusSymbol = target.metadata.getOptString(MetadataKeys.PlusInfinitySymbol) + (infMinusValue, infMinusSymbol, infPlusValue, infPlusSymbol) + } + val infDefined = infMinusValue.isDefined && infMinusSymbol.isDefined && infPlusValue.isDefined && infPlusSymbol.isDefined + val srcType = srcStructField.dataType.sql val isEpoch = DateTimePattern.isEpoch(pattern) - (toType, isEpoch, timezone) match { + val baseInfCasting = s"CASE WHEN (CASE WHEN (%s = CAST(${infMinusSymbol.getOrElse("")} AS $srcType)) THEN CAST(${infMinusValue.getOrElse("")} AS $srcType) " + + s"ELSE %s END = CAST(${infPlusSymbol.getOrElse("")} AS $srcType)) THEN CAST(${infPlusValue.getOrElse("")} AS $srcType) ELSE CASE WHEN " + + s"(%s = CAST(${infMinusSymbol.getOrElse("")} AS $srcType)) THEN CAST(${infMinusValue.getOrElse("")} AS $srcType) ELSE %s END END" + (target.dataType, isEpoch, timezone) match { case (DateType, true, _) => s"to_date(CAST((CAST(`%s` AS DECIMAL(30,9)) / ${DateTimePattern.epochFactor(pattern)}L) AS TIMESTAMP))" case (TimestampType, true, _) => s"CAST((CAST(%s AS DECIMAL(30,9)) / ${DateTimePattern.epochFactor(pattern)}) AS TIMESTAMP)" case (TimestampType, _, Some(tz)) => s"to_utc_timestamp(%s, $tz)" case (DateType, _, Some(tz)) => s"to_date(to_utc_timestamp(`%s`, '$tz'))" - case (TimestampType, _, _) => "%s" + case (TimestampType, _, _) if !infDefined => "%s" + case (TimestampType, _, _) if infDefined => baseInfCasting case (DateType, _, _) => "to_date(`%s`)" - case _ => s"CAST(%s AS ${toType.sql})" + case (ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | DateType | _: DecimalType, _, _) if infDefined => + s"CAST($baseInfCasting AS ${target.dataType.sql})" + case _ => s"CAST(%s AS ${target.dataType.sql})" } } @@ -117,4 +134,7 @@ class TypeParser_FromTimestampTypeSuite extends TypeParserSuiteTemplate { doTestIntoTimestampFieldWithEpochPattern(input) } + test("Into timestamp field with inf") { + doTestIntoTimestampWithPlusInfinity(input) + } } From d96bcc34a1d624c4cc0b04594fb9be508fc5b221 Mon Sep 17 00:00:00 2001 From: Sasa Zejnilovic Date: Wed, 9 Oct 2024 10:51:11 +0200 Subject: [PATCH 2/4] #61 Fix unit test --- ...StandardizationInterpreter_FractionalSuite.scala | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_FractionalSuite.scala 
b/src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_FractionalSuite.scala index 74a7f28..7d608c7 100644 --- a/src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_FractionalSuite.scala +++ b/src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_FractionalSuite.scala @@ -18,14 +18,13 @@ package za.co.absa.standardization.interpreter import org.apache.spark.sql.types._ import org.scalatest.funsuite.AnyFunSuite -import za.co.absa.standardization.ErrorMessage +import za.co.absa.standardization.{ErrorMessage, LoggerTestBase, Standardization, StandardizationErrorMessage, interpreter} import za.co.absa.spark.commons.test.SparkTestBase import za.co.absa.standardization.RecordIdGeneration.IdType.NoId import za.co.absa.standardization.config.{BasicMetadataColumnsConfig, BasicStandardizationConfig, ErrorCodesConfig} import za.co.absa.standardization.schema.MetadataKeys import za.co.absa.standardization.types.{CommonTypeDefaults, TypeDefaults} import za.co.absa.standardization.udf.UDFLibrary -import za.co.absa.standardization.{LoggerTestBase, Standardization, StandardizationErrorMessage} import za.co.absa.spark.commons.implicits.DataFrameImplicits.DataFrameEnhancements class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkTestBase with LoggerTestBase { @@ -82,8 +81,7 @@ class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkT + "789123456789123456789123456791245678912324789123456789123456789123456789123456789123456791245678912324789123" + "456789123456789123456789123456789123456789123456789123456789.1"), ("06-Text", "foo", "bar"), - ("07-Exponential notation", "-1.23E4", "+9.8765E-3"), - ("08-Infinity", "FRRR", "MAXVALUE") + ("07-Exponential notation", "-1.23E4", "+9.8765E-3") ) val src = seq.toDF("description","floatField", "doubleField") logDataFrameContent(src) @@ -110,7 +108,6 @@ class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkT StandardizationErrorMessage.stdCastErr("floatField", "foo", "string", "float", None), StandardizationErrorMessage.stdCastErr("doubleField", "bar", "string", "double", None))), FractionalRow("07-Exponential notation", Option(-12300.0f), Option(0.0098765)), - FractionalRow("08-Infinity", Option(0f), Option(Double.PositiveInfinity)) ) assertResult(exp)(std.as[FractionalRow].collect().sortBy(_.description).toList) @@ -185,7 +182,8 @@ class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkT + "678912345678912345678912345679124567891232478912345678912345678912345678912345678912345679124567891232478912" + "3456789123456789123456789123456789123456789123456789123456789.1"), ("06-Text", "foo", "bar"), - ("07-Exponential notation", "-1.23E4", "+9.8765E-3") + ("07-Exponential notation", "-1.23E4", "+9.8765E-3"), + ("08-Infinity", "FRRR", "MAXVALUE") ) val src = seq.toDF("description","floatField", "doubleField") logDataFrameContent(src) @@ -203,7 +201,8 @@ class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkT FractionalRow("06-Text", Option(0), None, Seq( StandardizationErrorMessage.stdCastErr("floatField", "foo", "string", "float", None), StandardizationErrorMessage.stdCastErr("doubleField", "bar", "string", "double", None))), - FractionalRow("07-Exponential notation", Option(-12300.0f), Option(0.0098765)) + FractionalRow("07-Exponential notation", Option(-12300.0f), Option(0.0098765)), + FractionalRow("08-Infinity", Option(0f), Option(Double.PositiveInfinity)) ) 
assertResult(exp)(std.as[FractionalRow].collect().sortBy(_.description).toList) From f59efe327aff76475d9ebe59cf5f871154424fca Mon Sep 17 00:00:00 2001 From: Sasa Zejnilovic Date: Wed, 9 Oct 2024 10:52:57 +0200 Subject: [PATCH 3/4] #61 Fix unit test --- .../StandardizationInterpreter_FractionalSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_FractionalSuite.scala b/src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_FractionalSuite.scala index 7d608c7..0fec191 100644 --- a/src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_FractionalSuite.scala +++ b/src/test/scala/za/co/absa/standardization/interpreter/StandardizationInterpreter_FractionalSuite.scala @@ -107,7 +107,7 @@ class StandardizationInterpreter_FractionalSuite extends AnyFunSuite with SparkT FractionalRow("06-Text", Option(0), None, Seq( StandardizationErrorMessage.stdCastErr("floatField", "foo", "string", "float", None), StandardizationErrorMessage.stdCastErr("doubleField", "bar", "string", "double", None))), - FractionalRow("07-Exponential notation", Option(-12300.0f), Option(0.0098765)), + FractionalRow("07-Exponential notation", Option(-12300.0f), Option(0.0098765)) ) assertResult(exp)(std.as[FractionalRow].collect().sortBy(_.description).toList) From 2b4d54005ff8b2c8499c8185317265a2b46ca874 Mon Sep 17 00:00:00 2001 From: Sasa Zejnilovic Date: Mon, 14 Oct 2024 10:37:06 +0200 Subject: [PATCH 4/4] #61 Move InfinitySupport to its own file --- .../stages/InfinitySupport.scala | 48 +++++++++++++++++++ .../standardization/stages/TypeParser.scala | 24 ---------- 2 files changed, 48 insertions(+), 24 deletions(-) create mode 100644 src/main/scala/za/co/absa/standardization/stages/InfinitySupport.scala diff --git a/src/main/scala/za/co/absa/standardization/stages/InfinitySupport.scala b/src/main/scala/za/co/absa/standardization/stages/InfinitySupport.scala new file mode 100644 index 0000000..21dfa1f --- /dev/null +++ b/src/main/scala/za/co/absa/standardization/stages/InfinitySupport.scala @@ -0,0 +1,48 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.standardization.stages + +import org.apache.spark.sql.Column +import org.apache.spark.sql.functions.{lit, when} +import org.apache.spark.sql.types.DataType + +trait InfinitySupport { + protected def infMinusSymbol: Option[String] + + protected def infMinusValue: Option[String] + + protected def infPlusSymbol: Option[String] + + protected def infPlusValue: Option[String] + + protected val origType: DataType + + def replaceInfinitySymbols(column: Column): Column = { + val columnWithNegativeInf: Column = infMinusSymbol.flatMap { minusSymbol => + infMinusValue.map { minusValue => + when(column === lit(minusSymbol).cast(origType), lit(minusValue).cast(origType)).otherwise(column) + } + }.getOrElse(column) + + infPlusSymbol.flatMap { plusSymbol => + infPlusValue.map { plusValue => + when(columnWithNegativeInf === lit(plusSymbol).cast(origType), lit(plusValue).cast(origType)) + .otherwise(columnWithNegativeInf) + } + }.getOrElse(columnWithNegativeInf) + } +} diff --git a/src/main/scala/za/co/absa/standardization/stages/TypeParser.scala b/src/main/scala/za/co/absa/standardization/stages/TypeParser.scala index 7fd626f..25a8d46 100644 --- a/src/main/scala/za/co/absa/standardization/stages/TypeParser.scala +++ b/src/main/scala/za/co/absa/standardization/stages/TypeParser.scala @@ -29,7 +29,6 @@ import za.co.absa.spark.hofs.transform import za.co.absa.standardization.{ErrorMessage, StandardizationErrorMessage} import za.co.absa.standardization.config.StandardizationConfig import za.co.absa.standardization.implicits.StdColumnImplicits.StdColumnEnhancements -import za.co.absa.standardization.numeric.DecimalSymbols import za.co.absa.standardization.schema.StdSchemaUtils.FieldWithSource import za.co.absa.standardization.schema.{MetadataKeys, MetadataValues, StdSchemaUtils} import za.co.absa.standardization.time.DateTimePattern @@ -695,27 +694,4 @@ object TypeParser { } } -sealed trait InfinitySupport { - protected def infMinusSymbol: Option[String] - protected def infMinusValue: Option[String] - protected def infPlusSymbol: Option[String] - protected def infPlusValue: Option[String] - protected val origType: DataType - - def replaceInfinitySymbols(column: Column): Column = { - val columnWithNegativeInf: Column = infMinusSymbol.flatMap { minusSymbol => - infMinusValue.map { minusValue => - when(column === lit(minusSymbol).cast(origType), lit(minusValue).cast(origType)).otherwise(column) - } - }.getOrElse(column) - - infPlusSymbol.flatMap { plusSymbol => - infPlusValue.map { plusValue => - when(columnWithNegativeInf === lit(plusSymbol).cast(origType), lit(plusValue).cast(origType)) - .otherwise(columnWithNegativeInf) - } - }.getOrElse(columnWithNegativeInf) - } -} - class TypeParserException(message: String) extends Exception(message: String)
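
Usage sketch (not part of the patch series): the new infinity handling is opted into per field, through metadata on the desired schema, which the cast template above reads via target.metadata.getOptString. The key names are the ones added to MetadataKeys in PATCH 1/4; the sentinel strings "MINVALUE"/"MAXVALUE", the replacement values, and the field name doubleField are illustrative assumptions echoing the 08-Infinity row of the fractional suite, not values taken from the patches.

import org.apache.spark.sql.types.{DoubleType, MetadataBuilder, StructField, StructType}
import za.co.absa.standardization.schema.MetadataKeys

object InfinityMetadataExample {
  // Illustrative sentinels: what the raw data contains, and what it should be
  // rewritten to before the regular cast runs.
  val infinityMetadata = new MetadataBuilder()
    .putString(MetadataKeys.MinusInfinitySymbol, "MINVALUE")  // token in the source data (assumption)
    .putString(MetadataKeys.MinusInfinityValue, "-Infinity")  // value it is replaced with (assumption)
    .putString(MetadataKeys.PlusInfinitySymbol, "MAXVALUE")   // (assumption)
    .putString(MetadataKeys.PlusInfinityValue, "Infinity")    // (assumption)
    .build()

  // A field to be standardized to DOUBLE with infinity sentinels enabled.
  val desiredSchema: StructType = StructType(Seq(
    StructField("doubleField", DoubleType, nullable = true, metadata = infinityMetadata)
  ))
}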
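
The relocated InfinitySupport trait can also be sketched in isolation. Because replaceInfinitySymbols rewrites the column before any cast, mapping a string sentinel to "Infinity" lets the subsequent cast to DOUBLE yield Double.PositiveInfinity, which is exactly what the 08-Infinity expectation asserts; the resulting expression is the same nested CASE WHEN that baseInfCasting spells out in the timestamp suite's expected cast template. The adapter object below and its sentinel values are hypothetical; only the trait and its members come from PATCH 4/4.

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DataType, DoubleType, StringType}
import za.co.absa.standardization.stages.InfinitySupport

// Hypothetical adapter for a string source column; the trait is the one added above.
object StringColumnInfinity extends InfinitySupport {
  override protected def infMinusSymbol: Option[String] = Some("MINVALUE")  // assumption
  override protected def infMinusValue: Option[String]  = Some("-Infinity") // assumption
  override protected def infPlusSymbol: Option[String]  = Some("MAXVALUE")  // assumption
  override protected def infPlusValue: Option[String]   = Some("Infinity")  // assumption
  override protected val origType: DataType = StringType  // type of the raw column
}

object InfinitySupportExample {
  // "MAXVALUE" becomes "Infinity", which the cast turns into Double.PositiveInfinity;
  // all other values reach the cast untouched.
  val standardizedDouble: Column =
    StringColumnInfinity.replaceInfinitySymbols(col("doubleField")).cast(DoubleType)
}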