From 66b31e9b2011ee81093f315cf8a8462d3a9c6cf6 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Thu, 11 Apr 2024 12:20:34 +0200 Subject: [PATCH] #787 - attempt to solve all but covar_pop and kurtosis --- .../functions/DoubleBehaviourUtils.scala | 42 +++++--- .../NonAggregateFunctionsTests.scala | 50 ++++++++-- .../test/scala/frameless/ops/CubeTests.scala | 90 ++++++++++++----- .../scala/frameless/ops/RollupTests.scala | 96 ++++++++++++++----- 4 files changed, 208 insertions(+), 70 deletions(-) diff --git a/dataset/src/test/scala/frameless/functions/DoubleBehaviourUtils.scala b/dataset/src/test/scala/frameless/functions/DoubleBehaviourUtils.scala index e43a7cd8..f25e1815 100644 --- a/dataset/src/test/scala/frameless/functions/DoubleBehaviourUtils.scala +++ b/dataset/src/test/scala/frameless/functions/DoubleBehaviourUtils.scala @@ -20,19 +20,35 @@ object DoubleBehaviourUtils { val nanNullHandler: Any => Option[BigDecimal] = { case null => None case d: Double => - nanHandler(d).map { d => - if (d == Double.NegativeInfinity || d == Double.PositiveInfinity) - BigDecimal("1000000.000000") * (if (d == Double.PositiveInfinity) 1 - else -1) - else - BigDecimal(d).setScale( - 6, - if (d > 0) - BigDecimal.RoundingMode.FLOOR - else - BigDecimal.RoundingMode.CEILING - ) - } + nanHandler(d).map(truncate) case _ => ??? } + + /** ensure different serializations are 'comparable' */ + def truncate(d: Double): BigDecimal = + if (d == Double.NegativeInfinity || d == Double.PositiveInfinity) + BigDecimal("1000000.000000") * (if (d == Double.PositiveInfinity) 1 + else -1) + else + BigDecimal(d).setScale( + 6, + if (d > 0) + BigDecimal.RoundingMode.FLOOR + else + BigDecimal.RoundingMode.CEILING + ) +} + +/** drop in conversion for doubles to handle serialization on cluster */ +trait ToDecimal[A] { + def truncate(a: A): Option[BigDecimal] +} + +object ToDecimal { + + implicit val doubleToDecimal: ToDecimal[Double] = new ToDecimal[Double] { + + override def truncate(a: Double): Option[BigDecimal] = + DoubleBehaviourUtils.nanNullHandler(a) + } } diff --git a/dataset/src/test/scala/frameless/functions/NonAggregateFunctionsTests.scala b/dataset/src/test/scala/frameless/functions/NonAggregateFunctionsTests.scala index 2a155b57..8ec1276f 100644 --- a/dataset/src/test/scala/frameless/functions/NonAggregateFunctionsTests.scala +++ b/dataset/src/test/scala/frameless/functions/NonAggregateFunctionsTests.scala @@ -798,7 +798,7 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite { val spark = session import spark.implicits._ - def prop[A: CatalystNumeric: TypedEncoder: Encoder]( + def prop[A: CatalystNumeric: TypedEncoder: Encoder: CatalystOrdered]( na: A, values: List[X1[A]] )(implicit @@ -811,6 +811,7 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite { .map(DoubleBehaviourUtils.nanNullHandler) .collect() .toList + .sorted val typedDS = TypedDataset.create(cDS) val res = typedDS @@ -820,20 +821,26 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite { .collect() .run() .toList + .sorted val aggrTyped = typedDS + .orderBy(typedDS('a).asc) .agg(atan(frameless.functions.aggregate.first(typedDS('a)))) .firstOption() .run() .get val aggrSpark = cDS + .orderBy("a") .select( sparkFunctions.atan(sparkFunctions.first("a")).as[Double] ) .first() - (res ?= resCompare).&&(aggrTyped ?= aggrSpark) + (res ?= resCompare).&&( + DoubleBehaviourUtils.nanNullHandler(aggrTyped) ?= DoubleBehaviourUtils + .nanNullHandler(aggrSpark) + ) } check(forAll(prop[Int] _)) @@ -849,8 +856,8 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite { import spark.implicits._ def prop[ - A: CatalystNumeric: TypedEncoder: Encoder, - B: CatalystNumeric: TypedEncoder: Encoder + A: CatalystNumeric: TypedEncoder: Encoder: CatalystOrdered, + B: CatalystNumeric: TypedEncoder: Encoder: CatalystOrdered ](na: X2[A, B], values: List[X2[A, B]] )(implicit @@ -863,6 +870,7 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite { .map(DoubleBehaviourUtils.nanNullHandler) .collect() .toList + .sorted val typedDS = TypedDataset.create(cDS) val res = typedDS @@ -872,8 +880,10 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite { .collect() .run() .toList + .sorted val aggrTyped = typedDS + .orderBy(typedDS('a).asc, typedDS('b).asc) .agg( atan2( frameless.functions.aggregate.first(typedDS('a)), @@ -885,6 +895,7 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite { .get val aggrSpark = cDS + .orderBy("a", "b") .select( sparkFunctions .atan2(sparkFunctions.first("a"), sparkFunctions.first("b")) @@ -892,7 +903,10 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite { ) .first() - (res ?= resCompare).&&(aggrTyped ?= aggrSpark) + (res ?= resCompare).&&( + DoubleBehaviourUtils.nanNullHandler(aggrTyped) ?= DoubleBehaviourUtils + .nanNullHandler(aggrSpark) + ) } check(forAll(prop[Int, Long] _)) @@ -907,7 +921,7 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite { val spark = session import spark.implicits._ - def prop[A: CatalystNumeric: TypedEncoder: Encoder]( + def prop[A: CatalystNumeric: TypedEncoder: Encoder: CatalystOrdered]( na: X1[A], value: List[X1[A]], lit: Double @@ -921,6 +935,7 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite { .map(DoubleBehaviourUtils.nanNullHandler) .collect() .toList + .sorted val typedDS = TypedDataset.create(cDS) val res = typedDS @@ -930,20 +945,26 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite { .collect() .run() .toList + .sorted val aggrTyped = typedDS + .orderBy(typedDS('a).asc) .agg(atan2(lit, frameless.functions.aggregate.first(typedDS('a)))) .firstOption() .run() .get val aggrSpark = cDS + .orderBy("a") .select( sparkFunctions.atan2(lit, sparkFunctions.first("a")).as[Double] ) .first() - (res ?= resCompare).&&(aggrTyped ?= aggrSpark) + (res ?= resCompare).&&( + DoubleBehaviourUtils.nanNullHandler(aggrTyped) ?= DoubleBehaviourUtils + .nanNullHandler(aggrSpark) + ) } check(forAll(prop[Int] _)) @@ -958,7 +979,7 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite { val spark = session import spark.implicits._ - def prop[A: CatalystNumeric: TypedEncoder: Encoder]( + def prop[A: CatalystNumeric: TypedEncoder: Encoder: CatalystOrdered]( na: X1[A], value: List[X1[A]], lit: Double @@ -972,6 +993,7 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite { .map(DoubleBehaviourUtils.nanNullHandler) .collect() .toList + .sorted val typedDS = TypedDataset.create(cDS) val res = typedDS @@ -981,20 +1003,26 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite { .collect() .run() .toList + .sorted val aggrTyped = typedDS + .orderBy(typedDS('a).asc) .agg(atan2(frameless.functions.aggregate.first(typedDS('a)), lit)) .firstOption() .run() .get val aggrSpark = cDS + .orderBy("a") .select( sparkFunctions.atan2(sparkFunctions.first("a"), lit).as[Double] ) .first() - (res ?= resCompare).&&(aggrTyped ?= aggrSpark) + (res ?= resCompare).&&( + DoubleBehaviourUtils.nanNullHandler(aggrTyped) ?= DoubleBehaviourUtils + .nanNullHandler(aggrSpark) + ) } check(forAll(prop[Int] _)) @@ -2139,15 +2167,18 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite { .map(_.getAs[Int](0)) .collect() .toVector + .sorted val typed = ds .select(levenshtein(ds('a), concat(ds('a), lit("Hello")))) .collect() .run() .toVector + .sorted val cDS = ds.dataset val aggrTyped = ds + .orderBy(ds('a).asc) .agg( levenshtein( frameless.functions.aggregate.first(ds('a)), @@ -2159,6 +2190,7 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite { .get val aggrSpark = cDS + .orderBy("a") .select( sparkFunctions .levenshtein(sparkFunctions.first("a"), sparkFunctions.lit("Hello")) diff --git a/dataset/src/test/scala/frameless/ops/CubeTests.scala b/dataset/src/test/scala/frameless/ops/CubeTests.scala index d0ee52aa..a4960f6a 100644 --- a/dataset/src/test/scala/frameless/ops/CubeTests.scala +++ b/dataset/src/test/scala/frameless/ops/CubeTests.scala @@ -1,6 +1,7 @@ package frameless package ops +import frameless.functions.ToDecimal import frameless.functions.aggregate._ import org.scalacheck.Prop import org.scalacheck.Prop._ @@ -137,7 +138,7 @@ class CubeTests extends TypedDatasetSuite { B: TypedEncoder, C: TypedEncoder, OutB: TypedEncoder: Numeric, - OutC: TypedEncoder: Numeric + OutC: TypedEncoder: Numeric: ToDecimal ](data: List[X3[A, B, C]] )(implicit summableB: CatalystSummable[B, OutB], @@ -148,12 +149,15 @@ class CubeTests extends TypedDatasetSuite { val B = dataset.col[B]('b) val C = dataset.col[C]('c) + val toDecOpt = implicitly[ToDecimal[OutC]].truncate _ + val framelessSumBC = dataset .cube(A) .agg(sum(B), sum(C)) .collect() .run() .toVector + .map(row => row.copy(_3 = toDecOpt(row._3))) .sortBy(t => (t._1, t._2, t._3)) val sparkSumBC = dataset.dataset @@ -162,7 +166,11 @@ class CubeTests extends TypedDatasetSuite { .collect() .toVector .map(row => - (Option(row.getAs[A](0)), row.getAs[OutB](1), row.getAs[OutC](2)) + ( + Option(row.getAs[A](0)), + row.getAs[OutB](1), + toDecOpt(row.getAs[OutC](2)) + ) ) .sortBy(t => (t._1, t._2, t._3)) @@ -172,6 +180,7 @@ class CubeTests extends TypedDatasetSuite { .collect() .run() .toVector + .map(row => row.copy(_3 = toDecOpt(row._3))) .sortBy(t => (t._1, t._2, t._3)) val sparkSumBCB = dataset.dataset @@ -183,7 +192,7 @@ class CubeTests extends TypedDatasetSuite { ( Option(row.getAs[A](0)), row.getAs[OutB](1), - row.getAs[OutC](2), + toDecOpt(row.getAs[OutC](2)), row.getAs[OutB](3) ) ) @@ -195,6 +204,7 @@ class CubeTests extends TypedDatasetSuite { .collect() .run() .toVector + .map(row => row.copy(_3 = toDecOpt(row._3), _5 = toDecOpt(row._5))) .sortBy(t => (t._1, t._2, t._3)) val sparkSumBCBC = dataset.dataset @@ -206,9 +216,9 @@ class CubeTests extends TypedDatasetSuite { ( Option(row.getAs[A](0)), row.getAs[OutB](1), - row.getAs[OutC](2), + toDecOpt(row.getAs[OutC](2)), row.getAs[OutB](3), - row.getAs[OutC](4) + toDecOpt(row.getAs[OutC](4)) ) ) .sortBy(t => (t._1, t._2, t._3)) @@ -219,6 +229,7 @@ class CubeTests extends TypedDatasetSuite { .collect() .run() .toVector + .map(row => row.copy(_3 = toDecOpt(row._3), _5 = toDecOpt(row._5))) .sortBy(t => (t._1, t._2, t._3)) val sparkSumBCBCB = dataset.dataset @@ -230,9 +241,9 @@ class CubeTests extends TypedDatasetSuite { ( Option(row.getAs[A](0)), row.getAs[OutB](1), - row.getAs[OutC](2), + toDecOpt(row.getAs[OutC](2)), row.getAs[OutB](3), - row.getAs[OutC](4), + toDecOpt(row.getAs[OutC](4)), row.getAs[OutB](5) ) ) @@ -300,11 +311,14 @@ class CubeTests extends TypedDatasetSuite { A: TypedEncoder: Ordering, B: TypedEncoder: Ordering, C: TypedEncoder, - OutC: TypedEncoder: Numeric + OutC: TypedEncoder: Numeric: ToDecimal ](data: List[X3[A, B, C]] )(implicit summableC: CatalystSummable[C, OutC] ): Prop = { + + val toDecOpt = implicitly[ToDecimal[OutC]].truncate _ + val dataset = TypedDataset.create(data) val A = dataset.col[A]('a) val B = dataset.col[B]('b) @@ -316,6 +330,7 @@ class CubeTests extends TypedDatasetSuite { .collect() .run() .toVector + .map(row => row.copy(_3 = toDecOpt(row._3))) .sortBy(t => (t._2, t._1, t._3)) val sparkSumC = dataset.dataset @@ -324,7 +339,11 @@ class CubeTests extends TypedDatasetSuite { .collect() .toVector .map(row => - (Option(row.getAs[A](0)), Option(row.getAs[B](1)), row.getAs[OutC](2)) + ( + Option(row.getAs[A](0)), + Option(row.getAs[B](1)), + toDecOpt(row.getAs[OutC](2)) + ) ) .sortBy(t => (t._2, t._1, t._3)) @@ -334,6 +353,7 @@ class CubeTests extends TypedDatasetSuite { .collect() .run() .toVector + .map(row => row.copy(_3 = toDecOpt(row._3), _4 = toDecOpt(row._4))) .sortBy(t => (t._2, t._1, t._3)) val sparkSumCC = dataset.dataset @@ -345,8 +365,8 @@ class CubeTests extends TypedDatasetSuite { ( Option(row.getAs[A](0)), Option(row.getAs[B](1)), - row.getAs[OutC](2), - row.getAs[OutC](3) + toDecOpt(row.getAs[OutC](2)), + toDecOpt(row.getAs[OutC](3)) ) ) .sortBy(t => (t._2, t._1, t._3)) @@ -357,6 +377,13 @@ class CubeTests extends TypedDatasetSuite { .collect() .run() .toVector + .map(row => + row.copy( + _3 = toDecOpt(row._3), + _4 = toDecOpt(row._4), + _5 = toDecOpt(row._5) + ) + ) .sortBy(t => (t._2, t._1, t._3)) val sparkSumCCC = dataset.dataset @@ -368,9 +395,9 @@ class CubeTests extends TypedDatasetSuite { ( Option(row.getAs[A](0)), Option(row.getAs[B](1)), - row.getAs[OutC](2), - row.getAs[OutC](3), - row.getAs[OutC](4) + toDecOpt(row.getAs[OutC](2)), + toDecOpt(row.getAs[OutC](3)), + toDecOpt(row.getAs[OutC](4)) ) ) .sortBy(t => (t._2, t._1, t._3)) @@ -381,6 +408,14 @@ class CubeTests extends TypedDatasetSuite { .collect() .run() .toVector + .map(row => + row.copy( + _3 = toDecOpt(row._3), + _4 = toDecOpt(row._4), + _5 = toDecOpt(row._5), + _6 = toDecOpt(row._6) + ) + ) .sortBy(t => (t._2, t._1, t._3)) val sparkSumCCCC = dataset.dataset @@ -392,10 +427,10 @@ class CubeTests extends TypedDatasetSuite { ( Option(row.getAs[A](0)), Option(row.getAs[B](1)), - row.getAs[OutC](2), - row.getAs[OutC](3), - row.getAs[OutC](4), - row.getAs[OutC](5) + toDecOpt(row.getAs[OutC](2)), + toDecOpt(row.getAs[OutC](3)), + toDecOpt(row.getAs[OutC](4)), + toDecOpt(row.getAs[OutC](5)) ) ) .sortBy(t => (t._2, t._1, t._3)) @@ -406,6 +441,15 @@ class CubeTests extends TypedDatasetSuite { .collect() .run() .toVector + .map(row => + row.copy( + _3 = toDecOpt(row._3), + _4 = toDecOpt(row._4), + _5 = toDecOpt(row._5), + _6 = toDecOpt(row._6), + _7 = toDecOpt(row._7) + ) + ) .sortBy(t => (t._2, t._1, t._3)) val sparkSumCCCCC = dataset.dataset @@ -417,11 +461,11 @@ class CubeTests extends TypedDatasetSuite { ( Option(row.getAs[A](0)), Option(row.getAs[B](1)), - row.getAs[OutC](2), - row.getAs[OutC](3), - row.getAs[OutC](4), - row.getAs[OutC](5), - row.getAs[OutC](6) + toDecOpt(row.getAs[OutC](2)), + toDecOpt(row.getAs[OutC](3)), + toDecOpt(row.getAs[OutC](4)), + toDecOpt(row.getAs[OutC](5)), + toDecOpt(row.getAs[OutC](6)) ) ) .sortBy(t => (t._2, t._1, t._3)) diff --git a/dataset/src/test/scala/frameless/ops/RollupTests.scala b/dataset/src/test/scala/frameless/ops/RollupTests.scala index 34cd4062..f847e5b8 100644 --- a/dataset/src/test/scala/frameless/ops/RollupTests.scala +++ b/dataset/src/test/scala/frameless/ops/RollupTests.scala @@ -1,6 +1,7 @@ package frameless package ops +import frameless.functions.ToDecimal import frameless.functions.aggregate._ import org.scalacheck.Prop import org.scalacheck.Prop._ @@ -127,7 +128,7 @@ class RollupTests extends TypedDatasetSuite { B: TypedEncoder, C: TypedEncoder, OutB: TypedEncoder: Numeric, - OutC: TypedEncoder: Numeric + OutC: TypedEncoder: Numeric: ToDecimal ](data: List[X3[A, B, C]] )(implicit summableB: CatalystSummable[B, OutB], @@ -138,12 +139,15 @@ class RollupTests extends TypedDatasetSuite { val B = dataset.col[B]('b) val C = dataset.col[C]('c) + val toDecOpt = implicitly[ToDecimal[OutC]].truncate _ + val framelessSumBC = dataset .rollup(A) .agg(sum(B), sum(C)) .collect() .run() .toVector + .map(row => row.copy(_3 = toDecOpt(row._3))) .sortBy(identity) val sparkSumBC = dataset.dataset @@ -152,7 +156,11 @@ class RollupTests extends TypedDatasetSuite { .collect() .toVector .map(row => - (Option(row.getAs[A](0)), row.getAs[OutB](1), row.getAs[OutC](2)) + ( + Option(row.getAs[A](0)), + row.getAs[OutB](1), + toDecOpt(row.getAs[OutC](2)) + ) ) .sortBy(identity) @@ -162,6 +170,7 @@ class RollupTests extends TypedDatasetSuite { .collect() .run() .toVector + .map(row => row.copy(_3 = toDecOpt(row._3))) .sortBy(identity) val sparkSumBCB = dataset.dataset @@ -173,7 +182,7 @@ class RollupTests extends TypedDatasetSuite { ( Option(row.getAs[A](0)), row.getAs[OutB](1), - row.getAs[OutC](2), + toDecOpt(row.getAs[OutC](2)), row.getAs[OutB](3) ) ) @@ -185,6 +194,7 @@ class RollupTests extends TypedDatasetSuite { .collect() .run() .toVector + .map(row => row.copy(_3 = toDecOpt(row._3), _5 = toDecOpt(row._5))) .sortBy(identity) val sparkSumBCBC = dataset.dataset @@ -196,9 +206,9 @@ class RollupTests extends TypedDatasetSuite { ( Option(row.getAs[A](0)), row.getAs[OutB](1), - row.getAs[OutC](2), + toDecOpt(row.getAs[OutC](2)), row.getAs[OutB](3), - row.getAs[OutC](4) + toDecOpt(row.getAs[OutC](4)) ) ) .sortBy(identity) @@ -209,6 +219,7 @@ class RollupTests extends TypedDatasetSuite { .collect() .run() .toVector + .map(row => row.copy(_3 = toDecOpt(row._3), _5 = toDecOpt(row._5))) .sortBy(identity) val sparkSumBCBCB = dataset.dataset @@ -220,9 +231,9 @@ class RollupTests extends TypedDatasetSuite { ( Option(row.getAs[A](0)), row.getAs[OutB](1), - row.getAs[OutC](2), + toDecOpt(row.getAs[OutC](2)), row.getAs[OutB](3), - row.getAs[OutC](4), + toDecOpt(row.getAs[OutC](4)), row.getAs[OutB](5) ) ) @@ -244,7 +255,7 @@ class RollupTests extends TypedDatasetSuite { C: TypedEncoder, D: TypedEncoder, OutC: TypedEncoder: Numeric, - OutD: TypedEncoder: Numeric + OutD: TypedEncoder: Numeric: ToDecimal ](data: List[X4[A, B, C, D]] )(implicit summableC: CatalystSummable[C, OutC], @@ -256,12 +267,15 @@ class RollupTests extends TypedDatasetSuite { val C = dataset.col[C]('c) val D = dataset.col[D]('d) + val toDecOpt = implicitly[ToDecimal[OutD]].truncate _ + val framelessSumByAB = dataset .rollup(A, B) .agg(sum(C), sum(D)) .collect() .run() .toVector + .map(row => row.copy(_4 = toDecOpt(row._4))) .sortBy(t => (t._2, t._1, t._3, t._4)) val sparkSumByAB = dataset.dataset @@ -274,7 +288,7 @@ class RollupTests extends TypedDatasetSuite { Option(row.getAs[A](0)), Option(row.getAs[B](1)), row.getAs[OutC](2), - row.getAs[OutD](3) + toDecOpt(row.getAs[OutD](3)) ) ) .sortBy(t => (t._2, t._1, t._3, t._4)) @@ -290,7 +304,7 @@ class RollupTests extends TypedDatasetSuite { A: TypedEncoder: Ordering, B: TypedEncoder: Ordering, C: TypedEncoder, - OutC: TypedEncoder: Numeric + OutC: TypedEncoder: Numeric: ToDecimal ](data: List[X3[A, B, C]] )(implicit summableC: CatalystSummable[C, OutC] @@ -300,12 +314,15 @@ class RollupTests extends TypedDatasetSuite { val B = dataset.col[B]('b) val C = dataset.col[C]('c) + val toDecOpt = implicitly[ToDecimal[OutC]].truncate _ + val framelessSumC = dataset .rollup(A, B) .agg(sum(C)) .collect() .run() .toVector + .map(row => row.copy(_3 = toDecOpt(row._3))) .sortBy(t => (t._2, t._1, t._3)) val sparkSumC = dataset.dataset @@ -314,7 +331,11 @@ class RollupTests extends TypedDatasetSuite { .collect() .toVector .map(row => - (Option(row.getAs[A](0)), Option(row.getAs[B](1)), row.getAs[OutC](2)) + ( + Option(row.getAs[A](0)), + Option(row.getAs[B](1)), + toDecOpt(row.getAs[OutC](2)) + ) ) .sortBy(t => (t._2, t._1, t._3)) @@ -324,6 +345,7 @@ class RollupTests extends TypedDatasetSuite { .collect() .run() .toVector + .map(row => row.copy(_3 = toDecOpt(row._3), _4 = toDecOpt(row._4))) .sortBy(t => (t._2, t._1, t._3)) val sparkSumCC = dataset.dataset @@ -335,8 +357,8 @@ class RollupTests extends TypedDatasetSuite { ( Option(row.getAs[A](0)), Option(row.getAs[B](1)), - row.getAs[OutC](2), - row.getAs[OutC](3) + toDecOpt(row.getAs[OutC](2)), + toDecOpt(row.getAs[OutC](3)) ) ) .sortBy(t => (t._2, t._1, t._3)) @@ -347,6 +369,13 @@ class RollupTests extends TypedDatasetSuite { .collect() .run() .toVector + .map(row => + row.copy( + _3 = toDecOpt(row._3), + _4 = toDecOpt(row._4), + _5 = toDecOpt(row._5) + ) + ) .sortBy(t => (t._2, t._1, t._3)) val sparkSumCCC = dataset.dataset @@ -358,9 +387,9 @@ class RollupTests extends TypedDatasetSuite { ( Option(row.getAs[A](0)), Option(row.getAs[B](1)), - row.getAs[OutC](2), - row.getAs[OutC](3), - row.getAs[OutC](4) + toDecOpt(row.getAs[OutC](2)), + toDecOpt(row.getAs[OutC](3)), + toDecOpt(row.getAs[OutC](4)) ) ) .sortBy(t => (t._2, t._1, t._3)) @@ -371,6 +400,14 @@ class RollupTests extends TypedDatasetSuite { .collect() .run() .toVector + .map(row => + row.copy( + _3 = toDecOpt(row._3), + _4 = toDecOpt(row._4), + _5 = toDecOpt(row._5), + _6 = toDecOpt(row._6) + ) + ) .sortBy(t => (t._2, t._1, t._3)) val sparkSumCCCC = dataset.dataset @@ -382,10 +419,10 @@ class RollupTests extends TypedDatasetSuite { ( Option(row.getAs[A](0)), Option(row.getAs[B](1)), - row.getAs[OutC](2), - row.getAs[OutC](3), - row.getAs[OutC](4), - row.getAs[OutC](5) + toDecOpt(row.getAs[OutC](2)), + toDecOpt(row.getAs[OutC](3)), + toDecOpt(row.getAs[OutC](4)), + toDecOpt(row.getAs[OutC](5)) ) ) .sortBy(t => (t._2, t._1, t._3)) @@ -396,6 +433,15 @@ class RollupTests extends TypedDatasetSuite { .collect() .run() .toVector + .map(row => + row.copy( + _3 = toDecOpt(row._3), + _4 = toDecOpt(row._4), + _5 = toDecOpt(row._5), + _6 = toDecOpt(row._6), + _7 = toDecOpt(row._7) + ) + ) .sortBy(t => (t._2, t._1, t._3)) val sparkSumCCCCC = dataset.dataset @@ -407,11 +453,11 @@ class RollupTests extends TypedDatasetSuite { ( Option(row.getAs[A](0)), Option(row.getAs[B](1)), - row.getAs[OutC](2), - row.getAs[OutC](3), - row.getAs[OutC](4), - row.getAs[OutC](5), - row.getAs[OutC](6) + toDecOpt(row.getAs[OutC](2)), + toDecOpt(row.getAs[OutC](3)), + toDecOpt(row.getAs[OutC](4)), + toDecOpt(row.getAs[OutC](5)), + toDecOpt(row.getAs[OutC](6)) ) ) .sortBy(t => (t._2, t._1, t._3))