Skip to content

Commit

Permalink
Configurable RetainCompletenessRule (awslabs#564)
Browse files Browse the repository at this point in the history
* Configurable RetainCompletenessRule

* Add doc string

* Add default completeness const
  • Loading branch information
zeotuan authored and eycho-am committed Oct 9, 2024
1 parent ed5a164 commit b474a37
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,23 @@ import com.amazon.deequ.constraints.Constraint.completenessConstraint
import com.amazon.deequ.profiles.ColumnProfile
import com.amazon.deequ.suggestions.CommonConstraintSuggestion
import com.amazon.deequ.suggestions.ConstraintSuggestion
import com.amazon.deequ.suggestions.rules.RetainCompletenessRule._

import scala.math.BigDecimal.RoundingMode

/**
* If a column is incomplete in the sample, we model its completeness as a binomial variable,
* estimate a confidence interval and use this to define a lower bound for the completeness
*
* @param minCompleteness : minimum completeness threshold to determine if rule should be applied
* @param maxCompleteness : maximum completeness threshold to determine if rule should be applied
*/
case class RetainCompletenessRule() extends ConstraintRule[ColumnProfile] {

case class RetainCompletenessRule(
minCompleteness: Double = defaultMinCompleteness,
maxCompleteness: Double = defaultMaxCompleteness
) extends ConstraintRule[ColumnProfile] {
override def shouldBeApplied(profile: ColumnProfile, numRecords: Long): Boolean = {
profile.completeness > 0.2 && profile.completeness < 1.0
profile.completeness > minCompleteness && profile.completeness < maxCompleteness
}

override def candidate(profile: ColumnProfile, numRecords: Long): ConstraintSuggestion = {
Expand Down Expand Up @@ -65,3 +71,8 @@ case class RetainCompletenessRule() extends ConstraintRule[ColumnProfile] {
"we model its completeness as a binomial variable, estimate a confidence interval " +
"and use this to define a lower bound for the completeness"
}

object RetainCompletenessRule {
private val defaultMinCompleteness: Double = 0.2
private val defaultMaxCompleteness: Double = 1.0
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,14 @@ class ConstraintRulesTest extends WordSpec with FixtureSupport with SparkContext
"be applied correctly" in {

val complete = StandardColumnProfile("col1", 1.0, 100, String, false, Map.empty, None)
val tenPercent = StandardColumnProfile("col1", 0.1, 100, String, false, Map.empty, None)
val incomplete = StandardColumnProfile("col1", .25, 100, String, false, Map.empty, None)

assert(!RetainCompletenessRule().shouldBeApplied(complete, 1000))
assert(!RetainCompletenessRule(0.05, 0.9).shouldBeApplied(complete, 1000))
assert(RetainCompletenessRule(0.05, 0.9).shouldBeApplied(tenPercent, 1000))
assert(RetainCompletenessRule(0.0).shouldBeApplied(tenPercent, 1000))
assert(RetainCompletenessRule(0.0).shouldBeApplied(incomplete, 1000))
assert(RetainCompletenessRule().shouldBeApplied(incomplete, 1000))
}

Expand Down Expand Up @@ -183,6 +188,26 @@ class ConstraintRulesTest extends WordSpec with FixtureSupport with SparkContext

assert(metricResult.value.isSuccess)
}

"return evaluable constraint candidates with custom min/max completeness" in
withSparkSession { session =>

val dfWithColumnCandidate = getDfFull(session)

val fakeColumnProfile = getFakeColumnProfileWithNameAndCompleteness("att1", 0.5)

val check = Check(CheckLevel.Warning, "some")
.addConstraint(RetainCompletenessRule(0.4, 0.6).candidate(fakeColumnProfile, 100).constraint)

val verificationResult = VerificationSuite()
.onData(dfWithColumnCandidate)
.addCheck(check)
.run()

val metricResult = verificationResult.metrics.head._2

assert(metricResult.value.isSuccess)
}
}

"UniqueIfApproximatelyUniqueRule" should {
Expand Down

0 comments on commit b474a37

Please sign in to comment.