Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix row level bug when composing outcome #594

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/main/scala/com/amazon/deequ/VerificationResult.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import com.amazon.deequ.repository.SimpleResultSerde
import org.apache.spark.sql.Column
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions.{col, monotonically_increasing_id}

import java.util.UUID
Expand Down Expand Up @@ -144,9 +145,9 @@ object VerificationResult {
val constraint = constraintResult.constraint
constraint match {
case asserted: RowLevelAssertedConstraint =>
constraintResult.metric.flatMap(metricToColumn).map(asserted.assertion(_))
constraintResult.metric.flatMap(metricToColumn).map(asserted.assertion(_)).orElse(Some(lit(false)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I probably need to look at the code again, but could you remind me the difference between RowLevelAssertedConstraint and RowLevelConstraint / why we don't need to add it for the latter? Are RowLevelConstraint for checks like Completeness, Uniqueness where row level results are boolean and RowLevelAssertedConstraint are for checks like Minimum, Sum where we need to compare the row-level value to a given assertion?

case _: RowLevelConstraint =>
constraintResult.metric.flatMap(metricToColumn)
constraintResult.metric.flatMap(metricToColumn).orElse(Some(lit(false)))
case _: RowLevelGroupedConstraint =>
constraintResult.metric.flatMap(metricToColumn)
case _ => None
Expand All @@ -160,7 +161,6 @@ object VerificationResult {
}
}


private[this] def getSimplifiedCheckResultOutput(
verificationResult: VerificationResult)
: Seq[SimpleCheckResultOutput] = {
Expand Down
56 changes: 56 additions & 0 deletions src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1996,6 +1996,62 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
}

"Verification Suite's Row Level Results" should {
"yield correct results for invalid column type" in withSparkSession { sparkSession =>
  import sparkSession.implicits._

  // "id" holds numeric text as strings; "id2" holds genuine integers.
  val data = Seq(
    ("1", 1, "blue"),
    ("2", 2, "green"),
    ("3", 3, "blue"),
    ("4", 4, "red"),
    ("5", 5, "purple")
  ).toDF("id", "id2", "color")

  val stringIdColumn = "id"
  val intIdColumn = "id2"

  // Each check is run once against a column of the wrong type ("invalid") and
  // once against a column of the expected type ("valid").
  val minOnInvalidDescription = s"min check on $stringIdColumn"
  val minOnValidDescription = s"min check on $intIdColumn"
  val patternOnInvalidDescription = s"pattern check on $intIdColumn"
  val patternOnValidDescription = s"pattern check on $stringIdColumn"

  val allChecks = Seq(
    Check(CheckLevel.Error, minOnInvalidDescription)
      .hasMin(stringIdColumn, _ >= 3)
      .isComplete(stringIdColumn),
    Check(CheckLevel.Error, minOnValidDescription)
      .hasMin(intIdColumn, _ >= 3)
      .isComplete(intIdColumn),
    Check(CheckLevel.Error, patternOnInvalidDescription)
      .hasPattern(intIdColumn, "[0-3]+".r),
    Check(CheckLevel.Error, patternOnValidDescription)
      .hasPattern(stringIdColumn, "[0-3]+".r)
  )

  val result = VerificationSuite().onData(data).addChecks(allChecks).run()
  val collectedRows =
    VerificationResult.rowLevelResultsAsDataFrame(sparkSession, result, data).collect()

  // Extract the per-row boolean outcome column produced for a given check description.
  def outcomesFor(description: String): Seq[Boolean] =
    collectedRows.map(_.getAs[Boolean](description)).toSeq

  // A check applied to a column of an unsupported type is expected to report
  // false for every row rather than failing to produce a result column.
  outcomesFor(minOnInvalidDescription) shouldBe Seq(false, false, false, false, false)
  outcomesFor(minOnValidDescription) shouldBe Seq(false, false, true, true, true)
  outcomesFor(patternOnInvalidDescription) shouldBe Seq(false, false, false, false, false)
  outcomesFor(patternOnValidDescription) shouldBe Seq(true, true, true, false, false)
}

"yield correct results for satisfies check" in withSparkSession { sparkSession =>
import sparkSession.implicits._
val df = Seq(
Expand Down
Loading