From b474a3703414b3fd2a6d559015ab60d84fa2709e Mon Sep 17 00:00:00 2001
From: zeotuan <48720253+zeotuan@users.noreply.github.com>
Date: Tue, 7 May 2024 04:26:51 +1000
Subject: [PATCH] Configurable RetainCompletenessRule (#564)

* Configurable RetainCompletenessRule

* Add doc string

* Add default completeness const
---
 .../rules/RetainCompletenessRule.scala        | 17 ++++++++++---
 .../rules/ConstraintRulesTest.scala           | 25 +++++++++++++++++++
 2 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/com/amazon/deequ/suggestions/rules/RetainCompletenessRule.scala b/src/main/scala/com/amazon/deequ/suggestions/rules/RetainCompletenessRule.scala
index 67ae61f92..9f995a112 100644
--- a/src/main/scala/com/amazon/deequ/suggestions/rules/RetainCompletenessRule.scala
+++ b/src/main/scala/com/amazon/deequ/suggestions/rules/RetainCompletenessRule.scala
@@ -20,17 +20,23 @@ import com.amazon.deequ.constraints.Constraint.completenessConstraint
 import com.amazon.deequ.profiles.ColumnProfile
 import com.amazon.deequ.suggestions.CommonConstraintSuggestion
 import com.amazon.deequ.suggestions.ConstraintSuggestion
+import com.amazon.deequ.suggestions.rules.RetainCompletenessRule._
 
 import scala.math.BigDecimal.RoundingMode
 
 /**
   * If a column is incomplete in the sample, we model its completeness as a binomial variable,
   * estimate a confidence interval and use this to define a lower bound for the completeness
+  *
+  * @param minCompleteness minimum completeness threshold to determine if rule should be applied
+  * @param maxCompleteness maximum completeness threshold to determine if rule should be applied
   */
-case class RetainCompletenessRule() extends ConstraintRule[ColumnProfile] {
-
+case class RetainCompletenessRule(
+  minCompleteness: Double = defaultMinCompleteness,
+  maxCompleteness: Double = defaultMaxCompleteness
+) extends ConstraintRule[ColumnProfile] {
   override def shouldBeApplied(profile: ColumnProfile, numRecords: Long): Boolean = {
-    profile.completeness > 0.2 && profile.completeness < 1.0
+    profile.completeness > minCompleteness && profile.completeness < maxCompleteness
   }
 
   override def candidate(profile: ColumnProfile, numRecords: Long): ConstraintSuggestion = {
@@ -65,3 +71,8 @@ case class RetainCompletenessRule() extends ConstraintRule[ColumnProfile] {
     "we model its completeness as a binomial variable, estimate a confidence interval " +
     "and use this to define a lower bound for the completeness"
 }
+
+object RetainCompletenessRule {
+  private val defaultMinCompleteness: Double = 0.2
+  private val defaultMaxCompleteness: Double = 1.0
+}
diff --git a/src/test/scala/com/amazon/deequ/suggestions/rules/ConstraintRulesTest.scala b/src/test/scala/com/amazon/deequ/suggestions/rules/ConstraintRulesTest.scala
index 075247932..701a5d983 100644
--- a/src/test/scala/com/amazon/deequ/suggestions/rules/ConstraintRulesTest.scala
+++ b/src/test/scala/com/amazon/deequ/suggestions/rules/ConstraintRulesTest.scala
@@ -130,9 +130,14 @@ class ConstraintRulesTest extends WordSpec with FixtureSupport with SparkContext
     "be applied correctly" in {
 
       val complete = StandardColumnProfile("col1", 1.0, 100, String, false, Map.empty, None)
+      val tenPercent = StandardColumnProfile("col1", 0.1, 100, String, false, Map.empty, None)
       val incomplete = StandardColumnProfile("col1", .25, 100, String, false, Map.empty, None)
 
       assert(!RetainCompletenessRule().shouldBeApplied(complete, 1000))
+      assert(!RetainCompletenessRule(0.05, 0.9).shouldBeApplied(complete, 1000))
+      assert(RetainCompletenessRule(0.05, 0.9).shouldBeApplied(tenPercent, 1000))
+      assert(RetainCompletenessRule(0.0).shouldBeApplied(tenPercent, 1000))
+      assert(RetainCompletenessRule(0.0).shouldBeApplied(incomplete, 1000))
       assert(RetainCompletenessRule().shouldBeApplied(incomplete, 1000))
     }
 
@@ -183,6 +188,26 @@ class ConstraintRulesTest extends WordSpec with FixtureSupport with SparkContext
 
       assert(metricResult.value.isSuccess)
     }
+
+    "return evaluable constraint candidates with custom min/max completeness" in
+      withSparkSession { session =>
+
+        val dfWithColumnCandidate = getDfFull(session)
+
+        val fakeColumnProfile = getFakeColumnProfileWithNameAndCompleteness("att1", 0.5)
+
+        val check = Check(CheckLevel.Warning, "some")
+          .addConstraint(RetainCompletenessRule(0.4, 0.6).candidate(fakeColumnProfile, 100).constraint)
+
+        val verificationResult = VerificationSuite()
+          .onData(dfWithColumnCandidate)
+          .addCheck(check)
+          .run()
+
+        val metricResult = verificationResult.metrics.head._2
+
+        assert(metricResult.value.isSuccess)
+      }
   }
 
   "UniqueIfApproximatelyUniqueRule" should {