Add analyzerOption to add filteredRowOutcome for isPrimaryKey Check (#537)

* Add analyzerOption to add filteredRowOutcome for isPrimaryKey Check
* Add analyzerOption to add filteredRowOutcome for hasUniqueValueRatio Check
eycho-am committed Feb 23, 2024
1 parent 6234d6b commit b3000e2
Showing 3 changed files with 65 additions and 9 deletions.
27 changes: 23 additions & 4 deletions src/main/scala/com/amazon/deequ/checks/Check.scala
@@ -256,10 +256,27 @@ case class Check(
* @param hint A hint to provide additional context why a constraint could have failed
* @return
*/
def isPrimaryKey(column: String, hint: Option[String], columns: String*)
def isPrimaryKey(column: String, hint: Option[String],
analyzerOptions: Option[AnalyzerOptions], columns: String*)
: CheckWithLastConstraintFilterable = {
addFilterableConstraint { filter =>
uniquenessConstraint(column :: columns.toList, Check.IsOne, filter, hint) }
uniquenessConstraint(column :: columns.toList, Check.IsOne, filter, hint, analyzerOptions) }
}

/**
* Creates a constraint that asserts on a column(s) primary key characteristics.
* Currently only checks uniqueness, but reserved for primary key checks if there is another
* assertion to run on primary key columns.
*
* @param column Columns to run the assertion on
* @param hint A hint to provide additional context why a constraint could have failed
* @return
*/
def isPrimaryKey(column: String, hint: Option[String], columns: String*)
: CheckWithLastConstraintFilterable = {
addFilterableConstraint { filter =>
uniquenessConstraint(column :: columns.toList, Check.IsOne, filter, hint)
}
}

/**
@@ -380,16 +397,18 @@ case class Check(
* @param assertion Function that receives a double input parameter and returns a boolean.
* Refers to the fraction of distinct values.
* @param hint A hint to provide additional context why a constraint could have failed
* @param analyzerOptions Options to configure analyzer behavior (NullTreatment, FilteredRow)
* @return
*/
def hasUniqueValueRatio(
columns: Seq[String],
assertion: Double => Boolean,
hint: Option[String] = None)
hint: Option[String] = None,
analyzerOptions: Option[AnalyzerOptions] = None)
: CheckWithLastConstraintFilterable = {

addFilterableConstraint { filter =>
uniqueValueRatioConstraint(columns, assertion, filter, hint) }
uniqueValueRatioConstraint(columns, assertion, filter, hint, analyzerOptions) }
}

/**
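For orientation (not part of the commit), a minimal usage sketch of the two extended Check methods follows. It assumes deequ's AnalyzerOptions has a filteredRow field and a FilteredRowOutcome.NULL value, mirroring what the tests below appear to construct; the column names and check labels are illustrative.

// Sketch only, not part of this commit. The filteredRow field name and
// FilteredRowOutcome.NULL value are assumed from deequ's AnalyzerOptions.
import com.amazon.deequ.analyzers.{AnalyzerOptions, FilteredRowOutcome}
import com.amazon.deequ.checks.{Check, CheckLevel}

// Report rows excluded by the `where` filter as null in row-level results.
val options = Some(AnalyzerOptions(filteredRow = FilteredRowOutcome.NULL))

val primaryKeyCheck = new Check(CheckLevel.Error, "primary key check")
  .isPrimaryKey("item", None, analyzerOptions = options)
  .where("item < 4")

val uniqueValueRatioCheck = new Check(CheckLevel.Error, "unique value ratio check")
  .hasUniqueValueRatio(Seq("att1"), _ >= 0.5, analyzerOptions = options)
  .where("item < 4")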
6 changes: 4 additions & 2 deletions src/main/scala/com/amazon/deequ/constraints/Constraint.scala
@@ -304,15 +304,17 @@ object Constraint {
* (since the metric is double metric) and returns a boolean
* @param where Additional filter to apply before the analyzer is run.
* @param hint A hint to provide additional context why a constraint could have failed
* @param analyzerOptions Options to configure analyzer behavior (NullTreatment, FilteredRow)
*/
def uniqueValueRatioConstraint(
columns: Seq[String],
assertion: Double => Boolean,
where: Option[String] = None,
hint: Option[String] = None)
hint: Option[String] = None,
analyzerOptions: Option[AnalyzerOptions] = None)
: Constraint = {

val uniqueValueRatio = UniqueValueRatio(columns, where)
val uniqueValueRatio = UniqueValueRatio(columns, where, analyzerOptions)
fromAnalyzer(uniqueValueRatio, assertion, hint)
}

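At the analyzer level the new argument simply rides along to UniqueValueRatio. Below is a hedged sketch of the same constructor call as the changed line above, with illustrative column and filter values; the filteredRow field and FilteredRowOutcome.NULL value are assumptions based on the tests in this commit.

// Sketch only: same constructor shape as the changed line above.
import com.amazon.deequ.analyzers.{AnalyzerOptions, FilteredRowOutcome, UniqueValueRatio}

// Rows excluded by the filter ("item < 4") surface as null row-level outcomes.
val uniqueValueRatio = UniqueValueRatio(
  Seq("att1"),                                                  // columns
  Some("item < 4"),                                             // where
  Some(AnalyzerOptions(filteredRow = FilteredRowOutcome.NULL))  // analyzerOptions
)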
41 changes: 38 additions & 3 deletions src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
@@ -324,14 +324,21 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
val patternMatch = new Check(CheckLevel.Error, "rule6")
.hasPattern("att2", """(^f)""".r)
.where("item < 4")
val isPrimaryKey = new Check(CheckLevel.Error, "rule7")
.isPrimaryKey("item")
.where("item < 3")
val uniqueValueRatio = new Check(CheckLevel.Error, "rule8")
.hasUniqueValueRatio(Seq("att1"), _ >= 0.5)
.where("item < 4")

val expectedColumn1 = completeness.description
val expectedColumn2 = uniqueness.description
val expectedColumn3 = uniquenessWhere.description
val expectedColumn4 = min.description
val expectedColumn5 = max.description
val expectedColumn6 = patternMatch.description

val expectedColumn7 = isPrimaryKey.description
val expectedColumn8 = uniqueValueRatio.description

val suite = new VerificationSuite().onData(data)
.addCheck(completeness)
@@ -340,6 +347,8 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
.addCheck(min)
.addCheck(max)
.addCheck(patternMatch)
.addCheck(isPrimaryKey)
.addCheck(uniqueValueRatio)

val result: VerificationResult = suite.run()

@@ -349,7 +358,7 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
resultData.show(false)
val expectedColumns: Set[String] =
data.columns.toSet + expectedColumn1 + expectedColumn2 + expectedColumn3 +
expectedColumn4 + expectedColumn5 + expectedColumn6
expectedColumn4 + expectedColumn5 + expectedColumn6 + expectedColumn7 + expectedColumn8
assert(resultData.columns.toSet == expectedColumns)

// filtered rows 2,5 (where att1 = "a")
@@ -374,6 +383,14 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
// filtered rows 4, 5, 6 (where item < 4)
val rowLevel6 = resultData.select(expectedColumn6).collect().map(r => r.getAs[Any](0))
assert(Seq(true, false, false, true, true, true).sameElements(rowLevel6))

// filtered rows 4, 5, 6 (where item < 4)
val rowLevel7 = resultData.select(expectedColumn7).collect().map(r => r.getAs[Any](0))
assert(Seq(true, true, true, true, true, true).sameElements(rowLevel7))

// filtered rows 4, 5, 6 (where item < 4) row 1 and 3 are the same -> not unique
val rowLevel8 = resultData.select(expectedColumn8).collect().map(r => r.getAs[Any](0))
assert(Seq(false, true, false, true, true, true).sameElements(rowLevel8))
}

"generate a result that contains row-level results with null for filtered rows" in withSparkSession { session =>
@@ -398,13 +415,21 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
val patternMatch = new Check(CheckLevel.Error, "rule6")
.hasPattern("att2", """(^f)""".r, analyzerOptions = analyzerOptions)
.where("item < 4")
val isPrimaryKey = new Check(CheckLevel.Error, "rule7")
.isPrimaryKey("item", None, analyzerOptions = analyzerOptions)
.where("item < 4")
val uniqueValueRatio = new Check(CheckLevel.Error, "rule8")
.hasUniqueValueRatio(Seq("att1"), _ >= 0.5, analyzerOptions = analyzerOptions)
.where("item < 4")

val expectedColumn1 = completeness.description
val expectedColumn2 = uniqueness.description
val expectedColumn3 = uniquenessWhere.description
val expectedColumn4 = min.description
val expectedColumn5 = max.description
val expectedColumn6 = patternMatch.description
val expectedColumn7 = isPrimaryKey.description
val expectedColumn8 = uniqueValueRatio.description

val suite = new VerificationSuite().onData(data)
.addCheck(completeness)
@@ -413,6 +438,8 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
.addCheck(min)
.addCheck(max)
.addCheck(patternMatch)
.addCheck(isPrimaryKey)
.addCheck(uniqueValueRatio)

val result: VerificationResult = suite.run()

@@ -422,7 +449,7 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
resultData.show(false)
val expectedColumns: Set[String] =
data.columns.toSet + expectedColumn1 + expectedColumn2 + expectedColumn3 +
expectedColumn4 + expectedColumn5 + expectedColumn6
expectedColumn4 + expectedColumn5 + expectedColumn6 + expectedColumn7 + expectedColumn8
assert(resultData.columns.toSet == expectedColumns)

val rowLevel1 = resultData.select(expectedColumn1).collect().map(r => r.getAs[Any](0))
@@ -446,6 +473,14 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
// filtered rows 4, 5, 6 (where item < 4)
val rowLevel6 = resultData.select(expectedColumn6).collect().map(r => r.getAs[Any](0))
assert(Seq(true, false, false, null, null, null).sameElements(rowLevel6))

// filtered rows 4, 5, 6 (where item < 4)
val rowLevel7 = resultData.select(expectedColumn7).collect().map(r => r.getAs[Any](0))
assert(Seq(true, true, true, null, null, null).sameElements(rowLevel7))

// filtered rows 4, 5, 6 (where item < 4) row 1 and 3 are the same -> not unique
val rowLevel8 = resultData.select(expectedColumn8).collect().map(r => r.getAs[Any](0))
assert(Seq(false, true, false, null, null, null).sameElements(rowLevel8))
}

"generate a result that contains compliance row-level results " in withSparkSession { session =>
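The assertions above read per-row outcomes from a resultData DataFrame that is built outside the shown hunks. As a hedged sketch of how such a frame is typically obtained, assuming deequ's VerificationResult.rowLevelResultsAsDataFrame helper (name and signature assumed, not shown in this diff):

// Sketch only: helper name/signature assumed from deequ's VerificationResult API.
import com.amazon.deequ.VerificationResult

val resultData = VerificationResult.rowLevelResultsAsDataFrame(session, result, data)
resultData.show(false) // one boolean/null outcome column per check description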
