Skip to content

Commit

Permalink
typelevel#787 - tests have ordering and precision issues when run on …
Browse files Browse the repository at this point in the history
…clusters
  • Loading branch information
chris-twiner committed Apr 10, 2024
1 parent f793fc7 commit e582962
Show file tree
Hide file tree
Showing 5 changed files with 1,121 additions and 460 deletions.
285 changes: 215 additions & 70 deletions dataset/src/test/scala/frameless/OrderByTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,25 @@ import org.apache.spark.sql.Column
import org.scalatest.matchers.should.Matchers

class OrderByTests extends TypedDatasetSuite with Matchers {
def sortings[A : CatalystOrdered, T]: Seq[(TypedColumn[T, A] => SortedTypedColumn[T, A], Column => Column)] = Seq(
(_.desc, _.desc),
(_.asc, _.asc),
(t => t, t => t) //default ascending
)

def sortings[A: CatalystOrdered, T]: Seq[(TypedColumn[T, A] => SortedTypedColumn[T, A], Column => Column)] =
Seq(
(_.desc, _.desc),
(_.asc, _.asc),
(t => t, t => t) // default ascending
)

test("single column non nullable orderBy") {
def prop[A: TypedEncoder : CatalystOrdered](data: Vector[X1[A]]): Prop = {
def prop[A: TypedEncoder: CatalystOrdered](data: Vector[X1[A]]): Prop = {
val ds = TypedDataset.create(data)

sortings[A, X1[A]].map { case (typ, untyp) =>
ds.dataset.orderBy(untyp(ds.dataset.col("a"))).collect().toVector.?=(
ds.orderBy(typ(ds('a))).collect().run().toVector)
sortings[A, X1[A]].map {
case (typ, untyp) =>
ds.dataset
.orderBy(untyp(ds.dataset.col("a")))
.collect()
.toVector
.?=(ds.orderBy(typ(ds('a))).collect().run().toVector)
}.reduce(_ && _)
}

Expand All @@ -36,12 +42,16 @@ class OrderByTests extends TypedDatasetSuite with Matchers {
}

test("single column non nullable partition sorting") {
def prop[A: TypedEncoder : CatalystOrdered](data: Vector[X1[A]]): Prop = {
def prop[A: TypedEncoder: CatalystOrdered](data: Vector[X1[A]]): Prop = {
val ds = TypedDataset.create(data)

sortings[A, X1[A]].map { case (typ, untyp) =>
ds.dataset.sortWithinPartitions(untyp(ds.dataset.col("a"))).collect().toVector.?=(
ds.sortWithinPartitions(typ(ds('a))).collect().run().toVector)
sortings[A, X1[A]].map {
case (typ, untyp) =>
ds.dataset
.sortWithinPartitions(untyp(ds.dataset.col("a")))
.collect()
.toVector
.?=(ds.sortWithinPartitions(typ(ds('a))).collect().run().toVector)
}.reduce(_ && _)
}

Expand All @@ -58,15 +68,34 @@ class OrderByTests extends TypedDatasetSuite with Matchers {
}

test("two columns non nullable orderBy") {
def prop[A: TypedEncoder : CatalystOrdered, B: TypedEncoder : CatalystOrdered](data: Vector[X2[A,B]]): Prop = {
def prop[
A: TypedEncoder: CatalystOrdered,
B: TypedEncoder: CatalystOrdered
](data: Vector[X2[A, B]]
): Prop = {
val ds = TypedDataset.create(data)

sortings[A, X2[A, B]].reverse.zip(sortings[B, X2[A, B]]).map { case ((typA, untypA), (typB, untypB)) =>
val vanillaSpark = ds.dataset.orderBy(untypA(ds.dataset.col("a")), untypB(ds.dataset.col("b"))).collect().toVector
vanillaSpark.?=(ds.orderBy(typA(ds('a)), typB(ds('b))).collect().run().toVector).&&(
vanillaSpark ?= ds.orderByMany(typA(ds('a)), typB(ds('b))).collect().run().toVector
)
}.reduce(_ && _)
sortings[A, X2[A, B]].reverse
.zip(sortings[B, X2[A, B]])
.map {
case ((typA, untypA), (typB, untypB)) =>
val vanillaSpark = ds.dataset
.orderBy(untypA(ds.dataset.col("a")), untypB(ds.dataset.col("b")))
.collect()
.toVector
vanillaSpark
.?=(
ds.orderBy(typA(ds('a)), typB(ds('b))).collect().run().toVector
)
.&&(
vanillaSpark ?= ds
.orderByMany(typA(ds('a)), typB(ds('b)))
.collect()
.run()
.toVector
)
}
.reduce(_ && _)
}

check(forAll(prop[SQLDate, Long] _))
Expand All @@ -75,15 +104,40 @@ class OrderByTests extends TypedDatasetSuite with Matchers {
}

test("two columns non nullable partition sorting") {
def prop[A: TypedEncoder : CatalystOrdered, B: TypedEncoder : CatalystOrdered](data: Vector[X2[A,B]]): Prop = {
def prop[
A: TypedEncoder: CatalystOrdered,
B: TypedEncoder: CatalystOrdered
](data: Vector[X2[A, B]]
): Prop = {
val ds = TypedDataset.create(data)

sortings[A, X2[A, B]].reverse.zip(sortings[B, X2[A, B]]).map { case ((typA, untypA), (typB, untypB)) =>
val vanillaSpark = ds.dataset.sortWithinPartitions(untypA(ds.dataset.col("a")), untypB(ds.dataset.col("b"))).collect().toVector
vanillaSpark.?=(ds.sortWithinPartitions(typA(ds('a)), typB(ds('b))).collect().run().toVector).&&(
vanillaSpark ?= ds.sortWithinPartitionsMany(typA(ds('a)), typB(ds('b))).collect().run().toVector
)
}.reduce(_ && _)
sortings[A, X2[A, B]].reverse
.zip(sortings[B, X2[A, B]])
.map {
case ((typA, untypA), (typB, untypB)) =>
val vanillaSpark = ds.dataset
.sortWithinPartitions(
untypA(ds.dataset.col("a")),
untypB(ds.dataset.col("b"))
)
.collect()
.toVector
vanillaSpark
.?=(
ds.sortWithinPartitions(typA(ds('a)), typB(ds('b)))
.collect()
.run()
.toVector
)
.&&(
vanillaSpark ?= ds
.sortWithinPartitionsMany(typA(ds('a)), typB(ds('b)))
.collect()
.run()
.toVector
)
}
.reduce(_ && _)
}

check(forAll(prop[SQLDate, Long] _))
Expand All @@ -92,21 +146,43 @@ class OrderByTests extends TypedDatasetSuite with Matchers {
}

test("three columns non nullable orderBy") {
def prop[A: TypedEncoder : CatalystOrdered, B: TypedEncoder : CatalystOrdered](data: Vector[X3[A,B,A]]): Prop = {
def prop[
A: TypedEncoder: CatalystOrdered,
B: TypedEncoder: CatalystOrdered
](data: Vector[X3[A, B, A]]
): Prop = {
val ds = TypedDataset.create(data)

sortings[A, X3[A, B, A]].reverse
.zip(sortings[B, X3[A, B, A]])
.zip(sortings[A, X3[A, B, A]])
.map { case (((typA, untypA), (typB, untypB)), (typA2, untypA2)) =>
val vanillaSpark = ds.dataset
.orderBy(untypA(ds.dataset.col("a")), untypB(ds.dataset.col("b")), untypA2(ds.dataset.col("c")))
.collect().toVector

vanillaSpark.?=(ds.orderBy(typA(ds('a)), typB(ds('b)), typA2(ds('c))).collect().run().toVector).&&(
vanillaSpark ?= ds.orderByMany(typA(ds('a)), typB(ds('b)), typA2(ds('c))).collect().run().toVector
)
}.reduce(_ && _)
.map {
case (((typA, untypA), (typB, untypB)), (typA2, untypA2)) =>
val vanillaSpark = ds.dataset
.orderBy(
untypA(ds.dataset.col("a")),
untypB(ds.dataset.col("b")),
untypA2(ds.dataset.col("c"))
)
.collect()
.toVector

vanillaSpark
.?=(
ds.orderBy(typA(ds('a)), typB(ds('b)), typA2(ds('c)))
.collect()
.run()
.toVector
)
.&&(
vanillaSpark ?= ds
.orderByMany(typA(ds('a)), typB(ds('b)), typA2(ds('c)))
.collect()
.run()
.toVector
)
}
.reduce(_ && _)
}

check(forAll(prop[SQLDate, Long] _))
Expand All @@ -115,21 +191,50 @@ class OrderByTests extends TypedDatasetSuite with Matchers {
}

test("three columns non nullable partition sorting") {
def prop[A: TypedEncoder : CatalystOrdered, B: TypedEncoder : CatalystOrdered](data: Vector[X3[A,B,A]]): Prop = {
def prop[
A: TypedEncoder: CatalystOrdered,
B: TypedEncoder: CatalystOrdered
](data: Vector[X3[A, B, A]]
): Prop = {
val ds = TypedDataset.create(data)

sortings[A, X3[A, B, A]].reverse
.zip(sortings[B, X3[A, B, A]])
.zip(sortings[A, X3[A, B, A]])
.map { case (((typA, untypA), (typB, untypB)), (typA2, untypA2)) =>
val vanillaSpark = ds.dataset
.sortWithinPartitions(untypA(ds.dataset.col("a")), untypB(ds.dataset.col("b")), untypA2(ds.dataset.col("c")))
.collect().toVector

vanillaSpark.?=(ds.sortWithinPartitions(typA(ds('a)), typB(ds('b)), typA2(ds('c))).collect().run().toVector).&&(
vanillaSpark ?= ds.sortWithinPartitionsMany(typA(ds('a)), typB(ds('b)), typA2(ds('c))).collect().run().toVector
)
}.reduce(_ && _)
.map {
case (((typA, untypA), (typB, untypB)), (typA2, untypA2)) =>
val vanillaSpark = ds.dataset
.sortWithinPartitions(
untypA(ds.dataset.col("a")),
untypB(ds.dataset.col("b")),
untypA2(ds.dataset.col("c"))
)
.collect()
.toVector

vanillaSpark
.?=(
ds.sortWithinPartitions(
typA(ds('a)),
typB(ds('b)),
typA2(ds('c))
).collect()
.run()
.toVector
)
.&&(
vanillaSpark ?= ds
.sortWithinPartitionsMany(
typA(ds('a)),
typB(ds('b)),
typA2(ds('c))
)
.collect()
.run()
.toVector
)
}
.reduce(_ && _)
}

check(forAll(prop[SQLDate, Long] _))
Expand All @@ -138,13 +243,28 @@ class OrderByTests extends TypedDatasetSuite with Matchers {
}

test("sort support for mixed default and explicit ordering") {
def prop[A: TypedEncoder : CatalystOrdered, B: TypedEncoder : CatalystOrdered](data: Vector[X2[A, B]]): Prop = {
def prop[
A: TypedEncoder: CatalystOrdered,
B: TypedEncoder: CatalystOrdered
](data: Vector[X2[A, B]]
): Prop = {
val ds = TypedDataset.create(data)

ds.dataset.orderBy(ds.dataset.col("a"), ds.dataset.col("b").desc).collect().toVector.?=(
ds.orderByMany(ds('a), ds('b).desc).collect().run().toVector) &&
ds.dataset.sortWithinPartitions(ds.dataset.col("a"), ds.dataset.col("b").desc).collect().toVector.?=(
ds.sortWithinPartitionsMany(ds('a), ds('b).desc).collect().run().toVector)
ds.dataset
.orderBy(ds.dataset.col("a"), ds.dataset.col("b").desc)
.collect()
.toVector
.?=(ds.orderByMany(ds('a), ds('b).desc).collect().run().toVector) &&
ds.dataset
.sortWithinPartitions(ds.dataset.col("a"), ds.dataset.col("b").desc)
.collect()
.toVector
.?=(
ds.sortWithinPartitionsMany(ds('a), ds('b).desc)
.collect()
.run()
.toVector
)
}

check(forAll(prop[SQLDate, Long] _))
Expand All @@ -159,50 +279,75 @@ class OrderByTests extends TypedDatasetSuite with Matchers {
illTyped("""d.sortWithinPartitions(d('b).desc)""")
}

test("derives a CatalystOrdered for case classes when all fields are comparable") {
test(
"derives a CatalystOrdered for case classes when all fields are comparable"
) {
type T[A, B] = X3[Int, Boolean, X2[A, B]]
def prop[
A: TypedEncoder : CatalystOrdered,
B: TypedEncoder : CatalystOrdered
](data: Vector[T[A, B]]): Prop = {
A: TypedEncoder: CatalystOrdered,
B: TypedEncoder: CatalystOrdered
](data: Vector[T[A, B]]
): Prop = {
val ds = TypedDataset.create(data)

sortings[X2[A, B], T[A, B]].map { case (typX2, untypX2) =>
val vanilla = ds.dataset.orderBy(untypX2(ds.dataset.col("c"))).collect().toVector.map(_.c)
val frameless = ds.orderBy(typX2(ds('c))).collect().run.toVector.map(_.c)
vanilla ?= frameless
sortings[X2[A, B], T[A, B]].map {
case (typX2, untypX2) =>
val vanilla = ds.dataset
.orderBy(untypX2(ds.dataset.col("c")))
.collect()
.toVector
.map(_.c)
val frameless =
ds.orderBy(typX2(ds('c))).collect().run.toVector.map(_.c)
vanilla ?= frameless
}.reduce(_ && _)
}

check(forAll(prop[Int, Long] _))
check(forAll(prop[(String, SQLDate), Float] _))
// Check that nested case classes are properly derived too
check(forAll(prop[X2[Boolean, Float], X4[SQLTimestamp, Double, Short, Byte]] _))
check(
forAll(prop[X2[Boolean, Float], X4[SQLTimestamp, Double, Short, Byte]] _)
)
}

test("derives a CatalystOrdered for tuples when all fields are comparable") {
type T[A, B] = X2[Int, (A, B)]
def prop[
A: TypedEncoder : CatalystOrdered,
B: TypedEncoder : CatalystOrdered
](data: Vector[T[A, B]]): Prop = {
A: TypedEncoder: CatalystOrdered,
B: TypedEncoder: CatalystOrdered
](data: Vector[T[A, B]]
): Prop = {
val ds = TypedDataset.create(data)

sortings[(A, B), T[A, B]].map { case (typX2, untypX2) =>
val vanilla = ds.dataset.orderBy(untypX2(ds.dataset.col("b"))).collect().toVector.map(_.b)
val frameless = ds.orderBy(typX2(ds('b))).collect().run.toVector.map(_.b)
vanilla ?= frameless
sortings[(A, B), T[A, B]].map {
case (typX2, untypX2) =>
val vanilla = ds.dataset
.orderBy(untypX2(ds.dataset.col("b")))
.collect()
.toVector
.map(_.b)
val frameless =
ds.orderBy(typX2(ds('b))).collect().run.toVector.map(_.b)
vanilla ?= frameless
}.reduce(_ && _)
}

check(forAll(prop[Int, Long] _))
check(forAll(prop[(String, SQLDate), Float] _))
check(forAll(prop[X2[Boolean, Float], X1[(SQLTimestamp, Double, Short, Byte)]] _))
check(
forAll(
prop[X2[Boolean, Float], X1[(SQLTimestamp, Double, Short, Byte)]] _
)
)
}

test("fails to compile when one of the field isn't comparable") {
type T = X2[Int, X2[Int, Map[String, String]]]
val d = TypedDataset.create(X2(1, X2(2, Map("not" -> "comparable"))) :: Nil)
illTyped("d.orderBy(d('b).desc)", """Cannot compare columns of type frameless.X2\[Int,scala.collection.immutable.Map\[String,String]].""")
illTyped(
"d.orderBy(d('b).desc)",
"""Cannot compare columns of type frameless.X2\[Int,scala.collection.immutable.Map\[String,String]]."""
)
}
}
Loading

0 comments on commit e582962

Please sign in to comment.