Skip to content

Commit

Permalink
Improve tests, reduce duplicated code, test group sort order
Browse files: browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Jul 30, 2024
1 parent 2bb65be commit 80db558
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1545,7 +1545,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {

test("SPARK-42199: resolve expression against scoped attributes") {
val plan = testRelation.select($"a".as("value")).analyze
implicit val intEncoder = ExpressionEncoder[Int]
implicit val intEncoder = ExpressionEncoder[Int]()
val appendCols = AppendColumns[Int, Int]((x: Int) => x, plan)

// AppendColumns adds a duplicate 'value' column, which makes $"value" ambiguous
Expand Down
97 changes: 62 additions & 35 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -567,39 +567,66 @@ class DatasetSuite extends QueryTest
(1, 1))
}

test("SPARK-42199: groupBy function, unresolved reference suggestions") {
test("SPARK-42199: groupBy function, agg, unresolved reference suggestions") {
checkError(
exception = intercept[AnalysisException] {
spark.range(10).groupByKey(id => id).agg(count("unknown"))
},
errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
parameters = Map("objectName" -> "`unknown`", "proposal" -> "`id`"))
parameters = Map("objectName" -> "`unknown`", "proposal" -> "`id`"),
context = ExpectedContext(fragment = "count", getCurrentClassCallSitePattern))
}

test("SPARK-42199: groupBy function with mapValues, unresolved reference suggestions") {
test("SPARK-42199: groupBy function, flatMapSortedGroups, unresolved reference suggestions") {
checkError(
exception = intercept[AnalysisException] {
spark.range(10).groupByKey(id => id).flatMapSortedGroups($"unknown") {
case (g, it) => Iterator((g, it.mkString(", ")))
}
},
errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
parameters = Map("objectName" -> "`unknown`", "proposal" -> "`id`"),
context = ExpectedContext(fragment = "$", getCurrentClassCallSitePattern))
}

test("SPARK-42199: groupBy function, mapValues, agg, unresolved reference suggestions") {
checkError(
exception = intercept[AnalysisException] {
spark.range(10).groupByKey(id => id).mapValues(id => id).agg(count("unknown"))
},
errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
parameters = Map("objectName" -> "`unknown`", "proposal" -> "`value`"))
parameters = Map("objectName" -> "`unknown`", "proposal" -> "`value`"),
context = ExpectedContext(fragment = "count", getCurrentClassCallSitePattern))
}

test("SPARK-42199: group by function, agg expr resolution") {
val actual2 = spark.range(3)
test("SPARK-42199: groupBy function, mapValues, flatMapSortedGroups, " +
"unresolved reference suggestions") {
checkError(
exception = intercept[AnalysisException] {
spark.range(10).groupByKey(id => id).mapValues(id => id).flatMapSortedGroups($"unknown") {
case (g, it) => Iterator((g, it.mkString(", ")))
}
},
errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
parameters = Map("objectName" -> "`unknown`", "proposal" -> "`value`"),
context = ExpectedContext(fragment = "$", getCurrentClassCallSitePattern))
}

test("SPARK-42199: groupBy function, agg expr resolution") {
val actual = spark.range(3)
.withColumnRenamed("id", "value").as[Long] // add column 'value' to dataset
.groupByKey(value => value * 2) // produces key column 'value'
.agg(sum("value").as[Long]) // 'value' does not resolve to key column
.collect()
assert(actual2.sorted === Seq((0, 0), (2, 1), (4, 2)))
assert(actual.sorted === Seq((0, 0), (2, 1), (4, 2)))

val actual3 = spark.range(3)
val actual2 = spark.range(3)
.withColumnRenamed("id", "value").as[Long] // add column 'value' to dataset
.groupByKey(value => value * 2) // produces key column 'value'
.mapValues(value => value * -1) // replaces value column 'value'
.agg(sum("value").as[Long]) // 'value' does not resolve to key column
.collect()
assert(actual3.sorted === Seq((0, 0), (2, -1), (4, -2)))
assert(actual2.sorted === Seq((0, 0), (2, -1), (4, -2)))
}

test("groupBy function, map") {
Expand Down Expand Up @@ -735,36 +762,36 @@ class DatasetSuite extends QueryTest
assert(result.sortBy(_.a) === Seq(K1(0), K1(0), K1(1), K1(1)))
}

test("SPARK-42199: groupBy function, flatMapSorted expr resolution") {
test("SPARK-42199: groupBy function, flatMapSortedGroups expr resolution") {
val ds = Seq(("a", 1, 10), ("a", 2, 20), ("b", 2, 1), ("b", 1, 2), ("c", 1, 1))
.toDF("key", "seq", "value")

// groupByKey produces key column 'value'
val grouped2 = ds.groupByKey(v => v.getString(0))
val grouped = ds.groupByKey(v => v.getString(0))
// 'value' does not resolve to key column
val aggregated2 = grouped2.flatMapSortedGroups($"value") {
val aggregated = grouped.flatMapSortedGroups($"value") {
(g, iter) => Iterator(g, iter.mkString(", "))
}

checkDatasetUnorderly(
aggregated2,
aggregated,
"a", "[a,1,10], [a,2,20]",
"b", "[b,2,1], [b,1,2]",
"c", "[c,1,1]"
)


// groupByKey produces key column 'value'
val grouped3 = ds.groupByKey(v => v.getString(0))
val grouped2 = ds.groupByKey(v => v.getString(0))
// mapValues replaces value column 'value'
.mapValues(v => v.getInt(1) * -1)
// 'value' does not resolve to key column
val aggregated3 = grouped3.flatMapSortedGroups($"value") {
val aggregated2 = grouped2.flatMapSortedGroups($"value") {
(g, iter) => Iterator(g, iter.mkString(", "))
}

checkDatasetUnorderly(
aggregated3,
aggregated2,
"a", "-2, -1",
"b", "-2, -1",
"c", "-1"
Expand Down Expand Up @@ -1044,41 +1071,41 @@ class DatasetSuite extends QueryTest
}

test("SPARK-42199: cogroup sorted, sort expr resolution") {
val left = Seq(1 -> "a", 3 -> "xyz", 5 -> "hello", 3 -> "abc", 3 -> "ijk").toDS()
val right = Seq(2 -> "q", 3 -> "w", 5 -> "x", 5 -> "z", 3 -> "a", 5 -> "y").toDS()
val left = Seq(1 -> "a", 3 -> "xyz", 5 -> "hello", 3 -> "abcx", 3 -> "ijk").toDF("id", "value")
val right = Seq(2 -> "q", 3 -> "w", 5 -> "x", 5 -> "z", 3 -> "a", 5 -> "y").toDF("id", "value")

// dataset column 'value' coexists with key column 'value' produced by groupByKey
val groupedLeft2 = left.toDF("id", "value").groupByKey(_.getInt(0))
val groupedRight2 = right.toDF("id", "value").groupByKey(_.getInt(0))
val groupedLeft = left.groupByKey(_.getInt(0))
val groupedRight = right.groupByKey(_.getInt(0))

// 'value' does not resolve to key column
val actual2 = groupedLeft2.cogroupSorted(groupedRight2)($"value")($"value") {
val actual = groupedLeft.cogroupSorted(groupedRight)($"value")($"value".desc) {
(key, left, right) => Iterator(key -> (left.mkString + "#" + right.mkString))
}
checkDatasetUnorderly(
actual2,
actual,
1 -> "[1,a]#",
2 -> "#[2,q]",
3 -> "[3,abc][3,ijk][3,xyz]#[3,a][3,w]",
5 -> "[5,hello]#[5,x][5,y][5,z]")
3 -> "[3,abcx][3,ijk][3,xyz]#[3,w][3,a]",
5 -> "[5,hello]#[5,z][5,y][5,x]")


val groupedLeft3 = left.toDF("id", "value")
// dataset column 'value' coexists with key column 'value' produced by groupByKey
.groupByKey(_.getInt(0))
val groupedLeft2 = groupedLeft
// mapValues replaces value column 'value'
.mapValues(_.getInt(0) * -1)
val groupedRight3 = right.toDF("id", "value")
// dataset column 'value' coexists with key column 'value' produced by groupByKey
.groupByKey(_.getInt(0))
.mapValues(_.getString(1).reverse)
val groupedRight2 = groupedRight
// mapValues replaces value column 'value'
.mapValues(_.getInt(0) * -1)
.mapValues(_.getString(1).reverse)
// 'value' does not resolve to key column
val actual3 = groupedLeft3.cogroupSorted(groupedRight3)($"value")($"value") {
val actual2 = groupedLeft2.cogroupSorted(groupedRight2)($"value")($"value".desc) {
(key, left, right) => Iterator(key -> (left.mkString + "#" + right.mkString))
}
checkDatasetUnorderly(
actual3,
1 -> "-1#", 2 -> "#-2", 3 -> "-3-3-3#-3-3", 5 -> "-5#-5-5-5")
actual2,
1 -> "a#",
2 -> "#q",
3 -> "kjixcbazyx#wa",
5 -> "olleh#zyx")
}

test("SPARK-34806: observation on datasets") {
Expand Down

0 comments on commit 80db558

Please sign in to comment.