diff --git a/src/main/scala/com/amazon/deequ/checks/Check.scala b/src/main/scala/com/amazon/deequ/checks/Check.scala
index bd62aafc..2537922b 100644
--- a/src/main/scala/com/amazon/deequ/checks/Check.scala
+++ b/src/main/scala/com/amazon/deequ/checks/Check.scala
@@ -256,10 +256,27 @@ case class Check(
     * @param hint A hint to provide additional context why a constraint could have failed
     * @return
     */
-  def isPrimaryKey(column: String, hint: Option[String], columns: String*)
+  def isPrimaryKey(column: String, hint: Option[String],
+                   analyzerOptions: Option[AnalyzerOptions], columns: String*)
   : CheckWithLastConstraintFilterable = {
     addFilterableConstraint { filter =>
-      uniquenessConstraint(column :: columns.toList, Check.IsOne, filter, hint) }
+      uniquenessConstraint(column :: columns.toList, Check.IsOne, filter, hint, analyzerOptions) }
+  }
+
+  /**
+    * Creates a constraint that asserts on a column(s) primary key characteristics.
+    * Currently only checks uniqueness, but reserved for primary key checks if there is another
+    * assertion to run on primary key columns.
+    *
+    * @param column Column(s) to run the assertion on
+    * @param hint A hint to provide additional context why a constraint could have failed
+    * @return
+    */
+  def isPrimaryKey(column: String, hint: Option[String], columns: String*)
+  : CheckWithLastConstraintFilterable = {
+    addFilterableConstraint { filter =>
+      uniquenessConstraint(column :: columns.toList, Check.IsOne, filter, hint)
+    }
   }
 
   /**
@@ -380,16 +397,18 @@ case class Check(
     * @param assertion Function that receives a double input parameter and returns a boolean.
     *                  Refers to the fraction of distinct values.
     * @param hint A hint to provide additional context why a constraint could have failed
+    * @param analyzerOptions Options to configure analyzer behavior (NullTreatment, FilteredRow)
     * @return
     */
   def hasUniqueValueRatio(
       columns: Seq[String],
       assertion: Double => Boolean,
-      hint: Option[String] = None)
+      hint: Option[String] = None,
+      analyzerOptions: Option[AnalyzerOptions] = None)
   : CheckWithLastConstraintFilterable = {
 
     addFilterableConstraint { filter =>
-      uniqueValueRatioConstraint(columns, assertion, filter, hint) }
+      uniqueValueRatioConstraint(columns, assertion, filter, hint, analyzerOptions) }
   }
 
   /**
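For orientation, a sketch of how the extended Check API reads once this patch is applied. This is not part of the patch: the helper name, check name, and column names are illustrative, and FilteredRowOutcome.NULL (report null instead of the default true for rows excluded by the filter) is assumed to behave as exercised in the test changes further down.

    import com.amazon.deequ.{VerificationResult, VerificationSuite}
    import com.amazon.deequ.analyzers.{AnalyzerOptions, FilteredRowOutcome}
    import com.amazon.deequ.checks.{Check, CheckLevel}
    import org.apache.spark.sql.DataFrame

    def runKeyChecks(data: DataFrame): VerificationResult = {
      // Surface null (rather than the default true) for rows excluded by where()
      val options = Option(AnalyzerOptions(filteredRow = FilteredRowOutcome.NULL))

      val check = new Check(CheckLevel.Error, "key and ratio checks")
        .isPrimaryKey("item", None, options) // new overload taking analyzer options
        .where("item < 4")
        .hasUniqueValueRatio(Seq("att1"), _ >= 0.5, analyzerOptions = options)
        .where("item < 4")

      new VerificationSuite().onData(data).addCheck(check).run()
    }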
diff --git a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
index 832aa951..7e7ea5a3 100644
--- a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
+++ b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
@@ -304,15 +304,17 @@ object Constraint {
     * (since the metric is double metric) and returns a boolean
     * @param where Additional filter to apply before the analyzer is run.
     * @param hint A hint to provide additional context why a constraint could have failed
+    * @param analyzerOptions Options to configure analyzer behavior (NullTreatment, FilteredRow)
     */
   def uniqueValueRatioConstraint(
       columns: Seq[String],
       assertion: Double => Boolean,
       where: Option[String] = None,
-      hint: Option[String] = None)
+      hint: Option[String] = None,
+      analyzerOptions: Option[AnalyzerOptions] = None)
   : Constraint = {
 
-    val uniqueValueRatio = UniqueValueRatio(columns, where)
+    val uniqueValueRatio = UniqueValueRatio(columns, where, analyzerOptions)
     fromAnalyzer(uniqueValueRatio, assertion, hint)
   }
 
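The factory change above only threads the options through to the analyzer. Roughly equivalent constructions, for illustration (the UniqueValueRatio parameter names are inferred from the positional call in the hunk above; the column and filter values are hypothetical):

    import com.amazon.deequ.analyzers.{AnalyzerOptions, FilteredRowOutcome, UniqueValueRatio}
    import com.amazon.deequ.constraints.Constraint.uniqueValueRatioConstraint

    val opts = Option(AnalyzerOptions(filteredRow = FilteredRowOutcome.NULL))

    // What the factory now builds internally:
    val analyzer = UniqueValueRatio(Seq("att1"), Some("item < 4"), opts)

    // The same constraint obtained via the factory:
    val constraint = uniqueValueRatioConstraint(Seq("att1"), _ >= 0.5, Some("item < 4"), None, opts)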
diff --git a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
index 6d27066d..8de81117 100644
--- a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
+++ b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
@@ -324,6 +324,12 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
       val patternMatch = new Check(CheckLevel.Error, "rule6")
         .hasPattern("att2", """(^f)""".r)
         .where("item < 4")
+      val isPrimaryKey = new Check(CheckLevel.Error, "rule7")
+        .isPrimaryKey("item")
+        .where("item < 3")
+      val uniqueValueRatio = new Check(CheckLevel.Error, "rule8")
+        .hasUniqueValueRatio(Seq("att1"), _ >= 0.5)
+        .where("item < 4")
 
       val expectedColumn1 = completeness.description
       val expectedColumn2 = uniqueness.description
@@ -331,7 +337,8 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
       val expectedColumn4 = min.description
       val expectedColumn5 = max.description
       val expectedColumn6 = patternMatch.description
-
+      val expectedColumn7 = isPrimaryKey.description
+      val expectedColumn8 = uniqueValueRatio.description
 
       val suite = new VerificationSuite().onData(data)
         .addCheck(completeness)
@@ -340,6 +347,8 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
         .addCheck(min)
         .addCheck(max)
         .addCheck(patternMatch)
+        .addCheck(isPrimaryKey)
+        .addCheck(uniqueValueRatio)
 
       val result: VerificationResult = suite.run()
 
@@ -349,7 +358,7 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
       resultData.show(false)
       val expectedColumns: Set[String] =
         data.columns.toSet + expectedColumn1 + expectedColumn2 + expectedColumn3 +
-          expectedColumn4 + expectedColumn5 + expectedColumn6
+          expectedColumn4 + expectedColumn5 + expectedColumn6 + expectedColumn7 + expectedColumn8
       assert(resultData.columns.toSet == expectedColumns)
 
       // filtered rows 2,5 (where att1 = "a")
@@ -374,6 +383,14 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
       // filtered rows 4, 5, 6 (where item < 4)
       val rowLevel6 = resultData.select(expectedColumn6).collect().map(r => r.getAs[Any](0))
       assert(Seq(true, false, false, true, true, true).sameElements(rowLevel6))
+
+      // filtered rows 3, 4, 5, 6 (where item < 3)
+      val rowLevel7 = resultData.select(expectedColumn7).collect().map(r => r.getAs[Any](0))
+      assert(Seq(true, true, true, true, true, true).sameElements(rowLevel7))
+
+      // filtered rows 4, 5, 6 (where item < 4); rows 1 and 3 are the same -> not unique
+      val rowLevel8 = resultData.select(expectedColumn8).collect().map(r => r.getAs[Any](0))
+      assert(Seq(false, true, false, true, true, true).sameElements(rowLevel8))
     }
 
     "generate a result that contains row-level results with null for filtered rows" in withSparkSession { session =>
@@ -398,6 +415,12 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
       val patternMatch = new Check(CheckLevel.Error, "rule6")
         .hasPattern("att2", """(^f)""".r, analyzerOptions = analyzerOptions)
         .where("item < 4")
+      val isPrimaryKey = new Check(CheckLevel.Error, "rule7")
+        .isPrimaryKey("item", None, analyzerOptions = analyzerOptions)
+        .where("item < 4")
+      val uniqueValueRatio = new Check(CheckLevel.Error, "rule8")
+        .hasUniqueValueRatio(Seq("att1"), _ >= 0.5, analyzerOptions = analyzerOptions)
+        .where("item < 4")
 
       val expectedColumn1 = completeness.description
       val expectedColumn2 = uniqueness.description
@@ -405,6 +428,8 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
       val expectedColumn4 = min.description
       val expectedColumn5 = max.description
       val expectedColumn6 = patternMatch.description
+      val expectedColumn7 = isPrimaryKey.description
+      val expectedColumn8 = uniqueValueRatio.description
 
       val suite = new VerificationSuite().onData(data)
         .addCheck(completeness)
@@ -413,6 +438,8 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
         .addCheck(min)
         .addCheck(max)
         .addCheck(patternMatch)
+        .addCheck(isPrimaryKey)
+        .addCheck(uniqueValueRatio)
 
       val result: VerificationResult = suite.run()
 
@@ -422,7 +449,7 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
       resultData.show(false)
       val expectedColumns: Set[String] =
         data.columns.toSet + expectedColumn1 + expectedColumn2 + expectedColumn3 +
-          expectedColumn4 + expectedColumn5 + expectedColumn6
+          expectedColumn4 + expectedColumn5 + expectedColumn6 + expectedColumn7 + expectedColumn8
       assert(resultData.columns.toSet == expectedColumns)
 
       val rowLevel1 = resultData.select(expectedColumn1).collect().map(r => r.getAs[Any](0))
@@ -446,6 +473,14 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
       // filtered rows 4, 5, 6 (where item < 4)
       val rowLevel6 = resultData.select(expectedColumn6).collect().map(r => r.getAs[Any](0))
       assert(Seq(true, false, false, null, null, null).sameElements(rowLevel6))
+
+      // filtered rows 4, 5, 6 (where item < 4)
+      val rowLevel7 = resultData.select(expectedColumn7).collect().map(r => r.getAs[Any](0))
+      assert(Seq(true, true, true, null, null, null).sameElements(rowLevel7))
+
+      // filtered rows 4, 5, 6 (where item < 4); rows 1 and 3 are the same -> not unique
+      val rowLevel8 = resultData.select(expectedColumn8).collect().map(r => r.getAs[Any](0))
+      assert(Seq(false, true, false, null, null, null).sameElements(rowLevel8))
     }
 
     "generate a result that contains compliance row-level results " in withSparkSession { session =>
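To make the expected sequences in these tests concrete: each check description becomes a column in the row-level results DataFrame, holding true or false per row, and null for filtered rows once FilteredRowOutcome.NULL is set. A sketch of reading those results back (rowLevelResultsAsDataFrame is the accessor these tests use just outside the visible hunks; its signature is assumed from that usage):

    import com.amazon.deequ.VerificationResult
    import org.apache.spark.sql.{DataFrame, SparkSession}

    def showKeyVerdicts(session: SparkSession, result: VerificationResult, data: DataFrame): Unit = {
      val resultData = VerificationResult.rowLevelResultsAsDataFrame(session, result, data)
      // Columns are named by check description, e.g. "rule7" and "rule8" in the tests above
      resultData.select("rule7", "rule8").show(false)
    }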