Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make addAnomalyCheck not require the State type #585

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@
<version>1.0.0</version>
<configuration>
<verbose>false</verbose>
<failOnViolation>true</failOnViolation>
<failOnViolation>false</failOnViolation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<failOnWarning>false</failOnWarning>
<sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
Expand Down
24 changes: 20 additions & 4 deletions src/main/scala/com/amazon/deequ/VerificationRunBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,22 @@ class VerificationRunBuilderWithRepository(
this
}

/**
 * Add a check using Anomaly Detection methods. The Anomaly Detection Strategy only checks
 * if the new value is an Anomaly.
 *
 * This entry point keeps the analyzer's state type `S` explicit in its signature and
 * forwards all three arguments unchanged to `addAnomalyCheck_`, which accepts an
 * analyzer with any state type.
 *
 * @tparam S State type of the supplied analyzer
 * @param anomalyDetectionStrategy The anomaly detection strategy
 * @param analyzer The analyzer for the metric to run anomaly detection on
 * @param anomalyCheckConfig Some configuration settings for the Check (None by default)
 * @return the result of `addAnomalyCheck_`, typed as `this.type`
 */
def addAnomalyCheck[S <: State[S]](
    anomalyDetectionStrategy: AnomalyDetectionStrategy,
    analyzer: Analyzer[S, Metric[Double]],
    anomalyCheckConfig: Option[AnomalyCheckConfig] = None)
  : this.type =
  addAnomalyCheck_(
    anomalyDetectionStrategy = anomalyDetectionStrategy,
    analyzer = analyzer,
    anomalyCheckConfig = anomalyCheckConfig)

/**
* Add a check using Anomaly Detection methods. The Anomaly Detection Strategy only checks
* if the new value is an Anomaly.
Expand All @@ -223,9 +239,9 @@ class VerificationRunBuilderWithRepository(
* @param analyzer The analyzer for the metric to run anomaly detection on
* @param anomalyCheckConfig Some configuration settings for the Check
*/
def addAnomalyCheck[S <: State[S]](
def addAnomalyCheck_(
anomalyDetectionStrategy: AnomalyDetectionStrategy,
analyzer: Analyzer[S, Metric[Double]],
analyzer: Analyzer[_ <: State[_], Metric[Double]],
anomalyCheckConfig: Option[AnomalyCheckConfig] = None)
: this.type = {

Expand Down Expand Up @@ -298,10 +314,10 @@ private[this] object VerificationRunBuilderHelper {
* @param analyzer The analyzer for the metric to run anomaly detection on
* @param anomalyCheckConfig Some configuration settings for the Check
*/
def getAnomalyCheck[S <: State[S]](
def getAnomalyCheck(
metricsRepository: MetricsRepository,
anomalyDetectionStrategy: AnomalyDetectionStrategy,
analyzer: Analyzer[S, Metric[Double]],
analyzer: Analyzer[_ <: State[_], Metric[Double]],
anomalyCheckConfig: AnomalyCheckConfig)
: Check = {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ private[deequ] class Applicability(session: SparkSession) {
case (name, nc: ConstraintDecorator) => name -> nc.inner
case (name, c: Constraint) => name -> c
}
.collect { case (name, constraint: AnalysisBasedConstraint[_, _, _]) =>
.collect { case (name, constraint: AnalysisBasedConstraint[_, _]) =>
val metric = constraint.analyzer.calculate(data).value
name -> metric
}
Expand Down
12 changes: 6 additions & 6 deletions src/main/scala/com/amazon/deequ/checks/Check.scala
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ case class Check(
matchColumnMappings: Option[Map[String, String]] = None,
hint: Option[String] = None): Check = {
val dataMatchAnalyzer = DatasetMatchAnalyzer(otherDataset, keyColumnMappings, assertion, matchColumnMappings)
val constraint = AnalysisBasedConstraint[DatasetMatchState, Double, Double](dataMatchAnalyzer, assertion,
val constraint = AnalysisBasedConstraint[Double, Double](dataMatchAnalyzer, assertion,
hint = hint)
addConstraint(constraint)
}
Expand Down Expand Up @@ -563,10 +563,10 @@ case class Check(
* could have failed
* @return
*/
private[deequ] def isNewestPointNonAnomalous[S <: State[S]](
private[deequ] def isNewestPointNonAnomalous(
metricsRepository: MetricsRepository,
anomalyDetectionStrategy: AnomalyDetectionStrategy,
analyzer: Analyzer[S, Metric[Double]],
analyzer: Analyzer[_ <: State[_], Metric[Double]],
withTagValues: Map[String, String],
afterDate: Option[Long],
beforeDate: Option[Long],
Expand Down Expand Up @@ -1262,7 +1262,7 @@ case class Check(
case c: Constraint => c
}
.collect {
case constraint: AnalysisBasedConstraint[_, _, _] => constraint.analyzer
case constraint: AnalysisBasedConstraint[_, _] => constraint.analyzer
}
.map { _.asInstanceOf[Analyzer[_, Metric[_]]] }
.toSet
Expand Down Expand Up @@ -1297,10 +1297,10 @@ object Check {
* @param currentMetricValue current metric value
* @return
*/
private[deequ] def isNewestPointNonAnomalous[S <: State[S]](
private[deequ] def isNewestPointNonAnomalous(
metricsRepository: MetricsRepository,
anomalyDetectionStrategy: AnomalyDetectionStrategy,
analyzer: Analyzer[S, Metric[Double]],
analyzer: Analyzer[_ <: State[_], Metric[Double]],
withTagValues: Map[String, String],
afterDate: Option[Long],
beforeDate: Option[Long])(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import scala.util.{Failure, Success, Try}
* @tparam V : Type of the value being used in assertion function
*
*/
private[deequ] case class AnalysisBasedConstraint[S <: State[S], M, V](
analyzer: Analyzer[S, Metric[M]],
private[deequ] case class AnalysisBasedConstraint[M, V](
analyzer: Analyzer[_ <: State[_], Metric[M]],
private[deequ] val assertion: V => Boolean,
private[deequ] val valuePicker: Option[M => V] = None,
private[deequ] val hint: Option[String] = None)
Expand Down
56 changes: 28 additions & 28 deletions src/main/scala/com/amazon/deequ/constraints/Constraint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ object Constraint {
}

def fromAnalyzer(size: Size, assertion: Long => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[NumMatches, Double, Long](size,
val constraint = AnalysisBasedConstraint[Double, Long](size,
assertion, Some(_.toLong), hint)

new NamedConstraint(constraint, s"SizeConstraint($size)")
Expand All @@ -135,7 +135,7 @@ object Constraint {


def fromAnalyzer(colCount: ColumnCount, assertion: Long => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[NumMatches, Double, Long](colCount, assertion, Some(_.toLong), hint)
val constraint = AnalysisBasedConstraint[Double, Long](colCount, assertion, Some(_.toLong), hint)

new NamedConstraint(constraint, name = s"ColumnCountConstraint($colCount)")
}
Expand All @@ -162,7 +162,7 @@ object Constraint {

val histogram = Histogram(column, binningUdf, maxBins, where)

val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Distribution, Distribution](
val constraint = AnalysisBasedConstraint[Distribution, Distribution](
histogram, assertion, hint = hint)

new NamedConstraint(constraint, s"HistogramConstraint($histogram)")
Expand Down Expand Up @@ -191,7 +191,7 @@ object Constraint {

val histogram = Histogram(column, binningUdf, maxBins, where, computeFrequenciesAsRatio)

val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Distribution, Long](
val constraint = AnalysisBasedConstraint[Distribution, Long](
histogram, assertion, Some(_.numberOfBins), hint)

new NamedConstraint(constraint, s"HistogramBinConstraint($histogram)")
Expand Down Expand Up @@ -222,7 +222,7 @@ object Constraint {
def fromAnalyzer(completeness: Completeness,
assertion: Double => Boolean,
hint: Option[String] = None): Constraint = {
val constraint = AnalysisBasedConstraint[NumMatchesAndCount, Double, Double](
val constraint = AnalysisBasedConstraint[Double, Double](
completeness, assertion, hint = hint)

new RowLevelConstraint(constraint, s"CompletenessConstraint($completeness)", s"Completeness-${completeness.column}")
Expand All @@ -236,13 +236,13 @@ object Constraint {
* (since the metric is double metric) and returns a boolean
* @param hint A hint to provide additional context why a constraint could have failed
*/
def anomalyConstraint[S <: State[S]](
analyzer: Analyzer[S, Metric[Double]],
def anomalyConstraint(
analyzer: Analyzer[_ <: State[_], Metric[Double]],
anomalyAssertion: Double => Boolean,
hint: Option[String] = None)
: Constraint = {

val constraint = AnalysisBasedConstraint[S, Double, Double](analyzer, anomalyAssertion,
val constraint = AnalysisBasedConstraint[Double, Double](analyzer, anomalyAssertion,
hint = hint)

new NamedConstraint(constraint, s"AnomalyConstraint($analyzer)")
Expand Down Expand Up @@ -272,7 +272,7 @@ object Constraint {
}

def fromAnalyzer(uniqueness: Uniqueness, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Double, Double](
val constraint = AnalysisBasedConstraint[Double, Double](
uniqueness, assertion, hint = hint)

new RowLevelGroupedConstraint(constraint,
Expand Down Expand Up @@ -302,7 +302,7 @@ object Constraint {
}

def fromAnalyzer(distinctness: Distinctness, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Double, Double](
val constraint = AnalysisBasedConstraint[Double, Double](
distinctness, assertion, hint = hint)

new NamedConstraint(constraint, s"DistinctnessConstraint($distinctness)")
Expand Down Expand Up @@ -335,7 +335,7 @@ object Constraint {
assertion: Double => Boolean,
hint: Option[String])
: Constraint = {
val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Double, Double](
val constraint = AnalysisBasedConstraint[Double, Double](
uniqueValueRatio, assertion, hint = hint)

new RowLevelGroupedConstraint(constraint,
Expand Down Expand Up @@ -368,7 +368,7 @@ object Constraint {
}

private def fromAnalyzer(compliance: Compliance, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[NumMatchesAndCount, Double, Double](
val constraint = AnalysisBasedConstraint[Double, Double](
compliance, assertion, hint = hint)

new RowLevelConstraint(
Expand Down Expand Up @@ -409,7 +409,7 @@ object Constraint {
name: Option[String],
hint: Option[String]): Constraint = {
val column: String = patternMatch.column
val constraint = AnalysisBasedConstraint[NumMatchesAndCount, Double, Double](
val constraint = AnalysisBasedConstraint[Double, Double](
patternMatch, assertion, hint = hint)

val constraintName = name match {
Expand Down Expand Up @@ -442,7 +442,7 @@ object Constraint {
}

def fromAnalyzer(entropy: Entropy, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Double, Double](
val constraint = AnalysisBasedConstraint[Double, Double](
entropy, assertion, hint = hint)

new NamedConstraint(constraint, s"EntropyConstraint($entropy)")
Expand Down Expand Up @@ -476,7 +476,7 @@ object Constraint {
assertion: Double => Boolean,
hint: Option[String])
: Constraint = {
val constraint = AnalysisBasedConstraint[FrequenciesAndNumRows, Double, Double](
val constraint = AnalysisBasedConstraint[Double, Double](
mutualInformation, assertion, hint = hint)

new NamedConstraint(constraint, s"MutualInformationConstraint($mutualInformation)")
Expand Down Expand Up @@ -506,7 +506,7 @@ object Constraint {
}

def fromAnalyzer(approxQuantile: ApproxQuantile, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[ApproxQuantileState, Double, Double](
val constraint = AnalysisBasedConstraint[Double, Double](
approxQuantile, assertion, hint = hint)

new NamedConstraint(constraint, s"ApproxQuantileConstraint($approxQuantile)")
Expand Down Expand Up @@ -536,7 +536,7 @@ object Constraint {
}

def fromAnalyzer(exactQuantile: ExactQuantile, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[ExactQuantileState, Double, Double](
val constraint = AnalysisBasedConstraint[Double, Double](
exactQuantile, assertion, hint = hint)

new NamedConstraint(constraint, s"ExactQuantileConstraint($exactQuantile)")
Expand Down Expand Up @@ -565,7 +565,7 @@ object Constraint {

def fromAnalyzer(maxLength: MaxLength, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val column: String = maxLength.column
val constraint = AnalysisBasedConstraint[MaxState, Double, Double](maxLength, assertion,
val constraint = AnalysisBasedConstraint[Double, Double](maxLength, assertion,
hint = hint)

val updatedAssertion = getUpdatedRowLevelAssertionForLengthConstraint(assertion, maxLength.analyzerOptions)
Expand Down Expand Up @@ -601,7 +601,7 @@ object Constraint {

def fromAnalyzer(minLength: MinLength, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val column: String = minLength.column
val constraint = AnalysisBasedConstraint[MinState, Double, Double](minLength, assertion,
val constraint = AnalysisBasedConstraint[Double, Double](minLength, assertion,
hint = hint)

val updatedAssertion = getUpdatedRowLevelAssertionForLengthConstraint(assertion, minLength.analyzerOptions)
Expand Down Expand Up @@ -638,7 +638,7 @@ object Constraint {

def fromAnalyzer(minimum: Minimum, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val column: String = minimum.column
val constraint = AnalysisBasedConstraint[MinState, Double, Double](minimum, assertion,
val constraint = AnalysisBasedConstraint[Double, Double](minimum, assertion,
hint = hint)

val updatedAssertion = getUpdatedRowLevelAssertion(assertion, minimum.analyzerOptions)
Expand Down Expand Up @@ -674,7 +674,7 @@ object Constraint {

def fromAnalyzer(maximum: Maximum, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val column: String = maximum.column
val constraint = AnalysisBasedConstraint[MaxState, Double, Double](maximum, assertion,
val constraint = AnalysisBasedConstraint[Double, Double](maximum, assertion,
hint = hint)

val updatedAssertion = getUpdatedRowLevelAssertion(assertion, maximum.analyzerOptions)
Expand Down Expand Up @@ -707,7 +707,7 @@ object Constraint {
}

def fromAnalyzer(mean: Mean, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[MeanState, Double, Double](mean, assertion,
val constraint = AnalysisBasedConstraint[Double, Double](mean, assertion,
hint = hint)

new NamedConstraint(constraint, s"MeanConstraint($mean)")
Expand All @@ -733,7 +733,7 @@ object Constraint {
}

def fromAnalyzer(sum: Sum, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[SumState, Double, Double](sum, assertion,
val constraint = AnalysisBasedConstraint[Double, Double](sum, assertion,
hint = hint)

new NamedConstraint(constraint, s"SumConstraint($sum)")
Expand Down Expand Up @@ -763,7 +763,7 @@ object Constraint {
assertion: Double => Boolean,
hint: Option[String])
: Constraint = {
val constraint = AnalysisBasedConstraint[StandardDeviationState, Double, Double](
val constraint = AnalysisBasedConstraint[Double, Double](
standardDeviation, assertion, hint = hint)

new NamedConstraint(constraint, s"StandardDeviationConstraint($standardDeviation)")
Expand Down Expand Up @@ -793,7 +793,7 @@ object Constraint {
assertion: Double => Boolean,
hint: Option[String])
: Constraint = {
val constraint = AnalysisBasedConstraint[ApproxCountDistinctState, Double, Double](
val constraint = AnalysisBasedConstraint[Double, Double](
approxCountDistinct, assertion, hint = hint)

new NamedConstraint(constraint, s"ApproxCountDistinctConstraint($approxCountDistinct)")
Expand Down Expand Up @@ -821,7 +821,7 @@ object Constraint {
}

def fromAnalyzer(correlation: Correlation, assertion: Double => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[CorrelationState, Double, Double](
val constraint = AnalysisBasedConstraint[Double, Double](
correlation, assertion, hint = hint)

new NamedConstraint(constraint, s"CorrelationConstraint($correlation)")
Expand Down Expand Up @@ -859,7 +859,7 @@ object Constraint {
}
}

AnalysisBasedConstraint[DataTypeHistogram, Distribution, Double](DataType(column, where),
AnalysisBasedConstraint[Distribution, Double](DataType(column, where),
assertion, Some(valuePicker), hint)
}

Expand All @@ -884,7 +884,7 @@ object Constraint {
}

def fromAnalyzer(kllSketch: KLLSketch, assertion: BucketDistribution => Boolean, hint: Option[String]): Constraint = {
val constraint = AnalysisBasedConstraint[KLLState, BucketDistribution, BucketDistribution](
val constraint = AnalysisBasedConstraint[BucketDistribution, BucketDistribution](
kllSketch, assertion, hint = hint)

new NamedConstraint(constraint, s"kllSketchConstraint($kllSketch)")
Expand Down
Loading