diff --git a/src/main/scala/com/amazon/deequ/VerificationResult.scala b/src/main/scala/com/amazon/deequ/VerificationResult.scala index b9b450f2..a6b4b37c 100644 --- a/src/main/scala/com/amazon/deequ/VerificationResult.scala +++ b/src/main/scala/com/amazon/deequ/VerificationResult.scala @@ -31,6 +31,7 @@ import com.amazon.deequ.repository.SimpleResultSerde import org.apache.spark.sql.Column import org.apache.spark.sql.DataFrame import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions.lit import org.apache.spark.sql.functions.{col, monotonically_increasing_id} import java.util.UUID @@ -144,9 +145,9 @@ object VerificationResult { val constraint = constraintResult.constraint constraint match { case asserted: RowLevelAssertedConstraint => - constraintResult.metric.flatMap(metricToColumn).map(asserted.assertion(_)) + constraintResult.metric.flatMap(metricToColumn).map(asserted.assertion(_)).orElse(Some(lit(false))) case _: RowLevelConstraint => - constraintResult.metric.flatMap(metricToColumn) + constraintResult.metric.flatMap(metricToColumn).orElse(Some(lit(false))) case _: RowLevelGroupedConstraint => constraintResult.metric.flatMap(metricToColumn) case _ => None @@ -160,7 +161,6 @@ object VerificationResult { } } - private[this] def getSimplifiedCheckResultOutput( verificationResult: VerificationResult) : Seq[SimpleCheckResultOutput] = { diff --git a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala index 146579e8..49ab00aa 100644 --- a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala +++ b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala @@ -1996,6 +1996,62 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec } "Verification Suite's Row Level Results" should { + "yield correct results for invalid column type" in withSparkSession { sparkSession => + import sparkSession.implicits._ + val df = Seq( + ("1", 1, "blue"), + ("2", 2, "green"), + ("3", 3, "blue"), + ("4", 4, "red"), + ("5", 5, "purple") + ).toDF("id", "id2", "color") + + val idColumn = "id" + val id2Column = "id2" + + val minCheckOnInvalidColumnDescription = s"min check on $idColumn" + val minCheckOnValidColumnDescription = s"min check on $id2Column" + val patternMatchCheckOnInvalidColumnDescription = s"pattern check on $id2Column" + val patternMatchCheckOnValidColumnDescription = s"pattern check on $idColumn" + + val minCheckOnInvalidColumn = Check(CheckLevel.Error, minCheckOnInvalidColumnDescription) + .hasMin(idColumn, _ >= 3) + .isComplete(idColumn) + val minCheckOnValidColumn = Check(CheckLevel.Error, minCheckOnValidColumnDescription) + .hasMin(id2Column, _ >= 3) + .isComplete(id2Column) + + val patternMatchCheckOnInvalidColumn = Check(CheckLevel.Error, patternMatchCheckOnInvalidColumnDescription) + .hasPattern(id2Column, "[0-3]+".r) + val patternMatchCheckOnValidColumn = Check(CheckLevel.Error, patternMatchCheckOnValidColumnDescription) + .hasPattern(idColumn, "[0-3]+".r) + + val checks = Seq( + minCheckOnInvalidColumn, + minCheckOnValidColumn, + patternMatchCheckOnInvalidColumn, + patternMatchCheckOnValidColumn + ) + + val verificationResult = VerificationSuite().onData(df).addChecks(checks).run() + val rowLevelResultsDF = VerificationResult.rowLevelResultsAsDataFrame(sparkSession, verificationResult, df) + val rowLevelResults = rowLevelResultsDF.collect() + + val minCheckOnInvalidColumnRowLevelResults = + rowLevelResults.map(_.getAs[Boolean](minCheckOnInvalidColumnDescription)) + val minCheckOnValidColumnRowLevelResults = + rowLevelResults.map(_.getAs[Boolean](minCheckOnValidColumnDescription)) + val patternMatchCheckOnInvalidColumnRowLevelResults = + rowLevelResults.map(_.getAs[Boolean](patternMatchCheckOnInvalidColumnDescription)) + val patternMatchCheckOnValidColumnRowLevelResults = + rowLevelResults.map(_.getAs[Boolean](patternMatchCheckOnValidColumnDescription)) + + minCheckOnInvalidColumnRowLevelResults shouldBe Seq(false, false, false, false, false) + minCheckOnValidColumnRowLevelResults shouldBe Seq(false, false, true, true, true) + patternMatchCheckOnInvalidColumnRowLevelResults shouldBe Seq(false, false, false, false, false) + patternMatchCheckOnValidColumnRowLevelResults shouldBe Seq(true, true, true, false, false) + } + "yield correct results for satisfies check" in withSparkSession { sparkSession => import sparkSession.implicits._ val df = Seq(