From ddc0942ab903bf6ec2b0bb82ccbf8d22d2d1e419 Mon Sep 17 00:00:00 2001 From: Rahul Sharma Date: Mon, 16 Dec 2024 16:58:18 -0500 Subject: [PATCH 1/2] Fix row level bug when composing outcome - When a check fails due to a precondition failure, the row level results are not evaluated correctly. - For example, let's say a check has a completeness constraint which passes, and a minimum constraint which fails due to a precondition failure. - The row level results will be the results for just the completeness constraint. There will be no results generated for the minimum constraint, and therefore the row level results will be incorrect. - We fix this by adding a default outcome for when the row level result column is not provided by the analyzer. --- .../com/amazon/deequ/VerificationResult.scala | 4 +-- .../amazon/deequ/VerificationSuiteTest.scala | 29 +++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/amazon/deequ/VerificationResult.scala b/src/main/scala/com/amazon/deequ/VerificationResult.scala index b9b450f2..24b68726 100644 --- a/src/main/scala/com/amazon/deequ/VerificationResult.scala +++ b/src/main/scala/com/amazon/deequ/VerificationResult.scala @@ -31,6 +31,7 @@ import com.amazon.deequ.repository.SimpleResultSerde import org.apache.spark.sql.Column import org.apache.spark.sql.DataFrame import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions.lit import org.apache.spark.sql.functions.{col, monotonically_increasing_id} import java.util.UUID @@ -144,7 +145,7 @@ object VerificationResult { val constraint = constraintResult.constraint constraint match { case asserted: RowLevelAssertedConstraint => - constraintResult.metric.flatMap(metricToColumn).map(asserted.assertion(_)) + constraintResult.metric.flatMap(metricToColumn).map(asserted.assertion(_)).orElse(Some(lit(false))) case _: RowLevelConstraint => constraintResult.metric.flatMap(metricToColumn) case _: RowLevelGroupedConstraint => @@ -160,7 +161,6 @@ object VerificationResult { } } - private[this] def getSimplifiedCheckResultOutput( verificationResult: VerificationResult) : Seq[SimpleCheckResultOutput] = { diff --git a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala index 146579e8..fecc8c27 100644 --- a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala +++ b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala @@ -1996,6 +1996,35 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec } "Verification Suite's Row Level Results" should { + "yield correct results for invalid column type" in withSparkSession { sparkSession => + import sparkSession.implicits._ + val df = Seq( + ("1", 1, "blue"), + ("2", 2, "green"), + ("3", 3, "blue"), + ("4", 4, "red"), + ("5", 5, "purple") + ).toDF("id", "id2", "color") + val invalidColumn = "id" + val validColumn = "id2" + val checkOnInvalidColumnDescription = s"check on $invalidColumn" + val checkOnValidColumnDescription = s"check on $validColumn" + val checkOnInvalidColumn = Check(CheckLevel.Error, checkOnInvalidColumnDescription) + .hasMin(invalidColumn, _ >= 3) + .isComplete(invalidColumn) + val checkOnValidColumn = Check(CheckLevel.Error, checkOnValidColumnDescription) + .hasMin(validColumn, _ >= 3) + .isComplete(validColumn) + val verificationResult = + VerificationSuite().onData(df).addChecks(Seq(checkOnInvalidColumn, checkOnValidColumn)).run() + val rowLevelResults = + VerificationResult.rowLevelResultsAsDataFrame(sparkSession, verificationResult, df).collect() + val invalidColumnCheckRowLevelResults = rowLevelResults.map(_.getAs[Boolean](checkOnInvalidColumnDescription)) + val validColumnCheckRowLevelResults = rowLevelResults.map(_.getAs[Boolean](checkOnValidColumnDescription)) + invalidColumnCheckRowLevelResults shouldBe Seq(false, false, false, false, false) + validColumnCheckRowLevelResults shouldBe Seq(false, false, true, true, true) + } + "yield correct results for satisfies check" in withSparkSession { sparkSession => import sparkSession.implicits._ val df = Seq( From 2e7b2d8644ba14fc8106a6212d1f98763114f38a Mon Sep 17 00:00:00 2001 From: Rahul Sharma Date: Tue, 17 Dec 2024 11:55:27 -0500 Subject: [PATCH 2/2] Added similar logic to RowLevelConstraint as well Skipped RowLevelGroupedConstraint because only UniqueValueRatio/Uniqueness use it, and they don't use preconditions. --- .../com/amazon/deequ/VerificationResult.scala | 2 +- .../amazon/deequ/VerificationSuiteTest.scala | 63 +++++++++++++------ 2 files changed, 46 insertions(+), 19 deletions(-) diff --git a/src/main/scala/com/amazon/deequ/VerificationResult.scala b/src/main/scala/com/amazon/deequ/VerificationResult.scala index 24b68726..a6b4b37c 100644 --- a/src/main/scala/com/amazon/deequ/VerificationResult.scala +++ b/src/main/scala/com/amazon/deequ/VerificationResult.scala @@ -147,7 +147,7 @@ object VerificationResult { case asserted: RowLevelAssertedConstraint => constraintResult.metric.flatMap(metricToColumn).map(asserted.assertion(_)).orElse(Some(lit(false))) case _: RowLevelConstraint => - constraintResult.metric.flatMap(metricToColumn) + constraintResult.metric.flatMap(metricToColumn).orElse(Some(lit(false))) case _: RowLevelGroupedConstraint => constraintResult.metric.flatMap(metricToColumn) case _ => None diff --git a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala index fecc8c27..49ab00aa 100644 --- a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala +++ b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala @@ -2005,24 +2005,51 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec ("4", 4, "red"), ("5", 5, "purple") ).toDF("id", "id2", "color") - val invalidColumn = "id" - val validColumn = "id2" - val checkOnInvalidColumnDescription = s"check on $invalidColumn" - val checkOnValidColumnDescription = s"check on $validColumn" - val checkOnInvalidColumn = Check(CheckLevel.Error, checkOnInvalidColumnDescription) - .hasMin(invalidColumn, _ >= 3) - .isComplete(invalidColumn) - val checkOnValidColumn = Check(CheckLevel.Error, checkOnValidColumnDescription) - .hasMin(validColumn, _ >= 3) - .isComplete(validColumn) - val verificationResult = - VerificationSuite().onData(df).addChecks(Seq(checkOnInvalidColumn, checkOnValidColumn)).run() - val rowLevelResults = - VerificationResult.rowLevelResultsAsDataFrame(sparkSession, verificationResult, df).collect() - val invalidColumnCheckRowLevelResults = rowLevelResults.map(_.getAs[Boolean](checkOnInvalidColumnDescription)) - val validColumnCheckRowLevelResults = rowLevelResults.map(_.getAs[Boolean](checkOnValidColumnDescription)) - invalidColumnCheckRowLevelResults shouldBe Seq(false, false, false, false, false) - validColumnCheckRowLevelResults shouldBe Seq(false, false, true, true, true) + + val idColumn = "id" + val id2Column = "id2" + + val minCheckOnInvalidColumnDescription = s"min check on $idColumn" + val minCheckOnValidColumnDescription = s"min check on $id2Column" + val patternMatchCheckOnInvalidColumnDescription = s"pattern check on $id2Column" + val patternMatchCheckOnValidColumnDescription = s"pattern check on $idColumn" + + val minCheckOnInvalidColumn = Check(CheckLevel.Error, minCheckOnInvalidColumnDescription) + .hasMin(idColumn, _ >= 3) + .isComplete(idColumn) + val minCheckOnValidColumn = Check(CheckLevel.Error, minCheckOnValidColumnDescription) + .hasMin(id2Column, _ >= 3) + .isComplete(id2Column) + + val patternMatchCheckOnInvalidColumn = Check(CheckLevel.Error, patternMatchCheckOnInvalidColumnDescription) + .hasPattern(id2Column, "[0-3]+".r) + val patternMatchCheckOnValidColumn = Check(CheckLevel.Error, patternMatchCheckOnValidColumnDescription) + .hasPattern(idColumn, "[0-3]+".r) + + val checks = Seq( + minCheckOnInvalidColumn, + minCheckOnValidColumn, + patternMatchCheckOnInvalidColumn, + patternMatchCheckOnValidColumn + ) + + val verificationResult = VerificationSuite().onData(df).addChecks(checks).run() + val rowLevelResultsDF = VerificationResult.rowLevelResultsAsDataFrame(sparkSession, verificationResult, df) + val rowLevelResults = rowLevelResultsDF.collect() + + val minCheckOnInvalidColumnRowLevelResults = + rowLevelResults.map(_.getAs[Boolean](minCheckOnInvalidColumnDescription)) + val minCheckOnValidColumnRowLevelResults = + rowLevelResults.map(_.getAs[Boolean](minCheckOnValidColumnDescription)) + val patternMatchCheckOnInvalidColumnRowLevelResults = + rowLevelResults.map(_.getAs[Boolean](patternMatchCheckOnInvalidColumnDescription)) + val patternMatchCheckOnValidColumnRowLevelResults = + rowLevelResults.map(_.getAs[Boolean](patternMatchCheckOnValidColumnDescription)) + + minCheckOnInvalidColumnRowLevelResults shouldBe Seq(false, false, false, false, false) + minCheckOnValidColumnRowLevelResults shouldBe Seq(false, false, true, true, true) + patternMatchCheckOnInvalidColumnRowLevelResults shouldBe Seq(false, false, false, false, false) + patternMatchCheckOnValidColumnRowLevelResults shouldBe Seq(true, true, true, false, false) } "yield correct results for satisfies check" in withSparkSession { sparkSession =>