From bff1d91d5805dd516b3e894dc1d29860b33b70c4 Mon Sep 17 00:00:00 2001 From: Hubert Date: Tue, 5 Dec 2023 22:33:55 -0500 Subject: [PATCH 1/2] exposing anomaly thresholds, refactored anomaly strategy to output all data points with anomaly detection details, passed that through the constraint into the constraint result as anomaly detection metadata field --- .../anomalydetection/DetectionResult.scala | 107 ++++++++++++-- .../constraints/AnomalyBasedConstraint.scala | 132 ++++++++++++++++++ .../AnomalyBasedConstraintTest.scala | 5 + 3 files changed, 230 insertions(+), 14 deletions(-) create mode 100644 src/main/scala/com/amazon/deequ/constraints/AnomalyBasedConstraint.scala create mode 100644 src/test/scala/com/amazon/deequ/constraints/AnomalyBasedConstraintTest.scala diff --git a/src/main/scala/com/amazon/deequ/anomalydetection/DetectionResult.scala b/src/main/scala/com/amazon/deequ/anomalydetection/DetectionResult.scala index 01c5f171c..6f0ab9f5d 100644 --- a/src/main/scala/com/amazon/deequ/anomalydetection/DetectionResult.scala +++ b/src/main/scala/com/amazon/deequ/anomalydetection/DetectionResult.scala @@ -16,24 +16,47 @@ package com.amazon.deequ.anomalydetection -class Anomaly( - val value: Option[Double], - val confidence: Double, - val detail: Option[String] = None) { + +/** + * Anomaly Detection Data Point class (previously Anomaly) + * + * @param dataMetricValue The metric value that is the data point + * @param anomalyMetricValue The metric value that is being used in the anomaly calculation. 
+ * This usually aligns with dataMetricValue but not always, + * like in a rate of change strategy where the rate of change is the anomaly metric + * which may not equal the actual data point value + * @param anomalyThreshold The thresholds used in the anomaly check, the anomalyMetricValue is + * compared to this threshold + * @param isAnomaly If the data point is an anomaly + * @param confidence TODO fill in more info about this + * @param detail Detailed error message + */ +class AnomalyDetectionDataPoint( + val dataMetricValue: Double, + val anomalyMetricValue: Double, + val anomalyThreshold: AnomalyThreshold, + val isAnomaly: Boolean, + val confidence: Double, + val detail: Option[String]) { def canEqual(that: Any): Boolean = { - that.isInstanceOf[Anomaly] + that.isInstanceOf[AnomalyDetectionDataPoint] } /** - * Tests anomalies for equality. Ignores detailed explanation. + * Tests anomalyDetectionDataPoints for equality. Ignores detailed explanation. * * @param obj The object/ anomaly to compare against - * @return true, if and only if the value and confidence are the same + * @return true, if and only if the dataMetricValue, anomalyMetricValue, anomalyThreshold, isAnomaly + * and confidence are the same */ override def equals(obj: Any): Boolean = { obj match { - case anomaly: Anomaly => anomaly.value == value && anomaly.confidence == confidence + case anomaly: AnomalyDetectionDataPoint => anomaly.dataMetricValue == dataMetricValue && + anomaly.anomalyMetricValue == anomalyMetricValue && + anomaly.anomalyThreshold == anomalyThreshold && + anomaly.isAnomaly == isAnomaly && + anomaly.confidence == confidence case _ => false } } @@ -41,16 +64,72 @@ class Anomaly( override def hashCode: Int = { val prime = 31 var result = 1 - result = prime * result + (if (value == null) 0 else value.hashCode) - prime * result + confidence.hashCode + result = prime * result + dataMetricValue.hashCode() + result = prime * result + anomalyMetricValue.hashCode() + result = prime * 
result + anomalyThreshold.hashCode() + result = prime * result + isAnomaly.hashCode() + result = prime * result + confidence.hashCode() + result } } -object Anomaly { - def apply(value: Option[Double], confidence: Double, detail: Option[String] = None): Anomaly = { - new Anomaly(value, confidence, detail) +object AnomalyDetectionDataPoint { + def apply(dataMetricValue: Double, anomalyMetricValue: Double, + anomalyThreshold: AnomalyThreshold = AnomalyThreshold(), isAnomaly: Boolean = false, + confidence: Double, detail: Option[String] = None + ): AnomalyDetectionDataPoint = { + new AnomalyDetectionDataPoint(dataMetricValue, anomalyMetricValue, anomalyThreshold, isAnomaly, confidence, detail) } } -case class DetectionResult(anomalies: Seq[(Long, Anomaly)] = Seq.empty) + +/** + * AnomalyThreshold class + * Defines threshold for the anomaly detection, defaults to inclusive bounds of Double.MinValue and Double.MaxValue + * @param upperBound The upper bound or threshold + * @param lowerBound The lower bound or threshold + */ +case class AnomalyThreshold(lowerBound: Bound = Bound(Double.MinValue), upperBound: Bound = Bound(Double.MaxValue)) + +/** + * Bound Class + * Class representing a threshold/bound, with value and inclusive/exclusive boolean + * @param value The value of the bound as a Double + * @param inclusive Boolean indicating if the Bound is inclusive or not + */ +case class Bound(value: Double, inclusive: Boolean = true) + + + +/** + * AnomalyDetectionResult Class + * This class is returned from the detectAnomaliesInHistory function + * @param anomalyDetectionDataPointSequence The sequence of (timestamp, anomaly detection data point) pairs + */ +case class AnomalyDetectionResult(anomalyDetectionDataPointSequence: Seq[(Long, AnomalyDetectionDataPoint)] = Seq.empty) + +/** + * AnomalyDetectionAssertionResult Class + * This class is returned by the anomaly detection assertion function + * @param hasNoAnomaly Boolean that is true if no anomaly was detected + * @param anomalyDetectionMetadata Anomaly 
Detection metadata class containing anomaly details + * about the data point being checked + */ +case class AnomalyDetectionAssertionResult(hasNoAnomaly: Boolean, anomalyDetectionMetadata: AnomalyDetectionMetadata) + + +/** + * AnomalyDetectionMetadata Class + * This class contains anomaly detection metadata and is currently an optional field + * in the ConstraintResult class that is exposed to users + * + * Currently, anomaly detection only runs on the "newest" data point (referring to the dataframe being + * run on by the verification suite) and not multiple data points, so this metadata class only contains + * one anomalyDetectionDataPoint for now + * In the future, if we allow the anomaly check to detect multiple points, we can return the anomalyDetectionResult + * instead, which contains a sequence of (Long, AnomalyDetectionDataPoints) + * @param anomalyDetectionDataPoint Anomaly detection data point + */ +case class AnomalyDetectionMetadata(anomalyDetectionDataPoint: AnomalyDetectionDataPoint) + diff --git a/src/main/scala/com/amazon/deequ/constraints/AnomalyBasedConstraint.scala b/src/main/scala/com/amazon/deequ/constraints/AnomalyBasedConstraint.scala new file mode 100644 index 000000000..ae23a604e --- /dev/null +++ b/src/main/scala/com/amazon/deequ/constraints/AnomalyBasedConstraint.scala @@ -0,0 +1,132 @@ +/** + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License + * is located at + * + * http://aws.amazon.com/apache2.0/ + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ * + */ + +package com.amazon.deequ.constraints + +import com.amazon.deequ.analyzers.{Analyzer, State} +import com.amazon.deequ.anomalydetection.{AnomalyAssertionResult, AnomalyThreshold, DetectionResult} +import com.amazon.deequ.metrics.Metric +import org.apache.spark.sql.DataFrame + +import scala.util.{Failure, Success, Try} + +/** + * Case class for anomaly based constraints that provides unified way to access + * AnalyzerContext and metrics stored in it. + * TODO this differs from AnalysisBasedConstraint only in that it uses an assertion function that + * TODO returns an AnomalyAssertionResult with a potential anomaly as well as the default boolean + * TODO figure out if it's better to use some inheritance/composition + * + * Runs the analysis and get the value of the metric returned by the analysis, + * picks the numeric value that will be used in the assertion function with metric picker + * runs the assertion. + * + * @param analyzer Analyzer to be run on the data frame + * @param assertion Assertion function that returns an AnomalyAssertionResult with a potential anomaly + * @param valuePicker Optional function to pick the interested part of the metric value that the + * assertion will be running on. Absence of such function means the metric + * value would be used in the assertion as it is. 
+ * @param hint A hint to provide additional context why a constraint could have failed + * @tparam M : Type of the metric value generated by the Analyzer + * @tparam V : Type of the value being used in assertion function + * + */ +private[deequ] case class AnomalyBasedConstraint[S <: State[S], M, V]( + analyzer: Analyzer[S, Metric[M]], + private[deequ] val assertion: V => AnomalyAssertionResult, + private[deequ] val valuePicker: Option[M => V] = None, + private[deequ] val hint: Option[String] = None) + extends Constraint { + + private[deequ] def calculateAndEvaluate(data: DataFrame) = { + val metric = analyzer.calculate(data) + evaluate(Map(analyzer -> metric)) + } + + override def evaluate( + analysisResults: Map[Analyzer[_, Metric[_]], Metric[_]]) + : ConstraintResult = { + + val metric = analysisResults.get(analyzer).map(_.asInstanceOf[Metric[M]]) + + metric.map(pickValueAndAssert).getOrElse( + // Analysis is missing + ConstraintResult(this, ConstraintStatus.Failure, + message = Some(AnomalyBasedConstraint.MissingAnalysis), metric = metric) + ) + } + + private[this] def pickValueAndAssert(metric: Metric[M]): ConstraintResult = { + + metric.value match { + // Analysis done successfully and result metric is there + case Success(metricValue) => + try { + val assertOn = runPickerOnMetric(metricValue) + val anomalyAssertionResult = runAssertion(assertOn) + + if (anomalyAssertionResult.hasNoAnomaly) { + ConstraintResult(this, ConstraintStatus.Success, metric = Some(metric), + anomaly = anomalyAssertionResult.anomaly) + } else { + var errorMessage = s"Value: $assertOn does not meet the constraint requirement, has anomaly, check result!" 
+ hint.foreach(hint => errorMessage += s" $hint") + + ConstraintResult(this, ConstraintStatus.Failure, Some(errorMessage), Some(metric), + anomaly = anomalyAssertionResult.anomaly) + } + + } catch { + case AnomalyBasedConstraint.ConstraintAssertionException(msg) => + ConstraintResult(this, ConstraintStatus.Failure, + message = Some(s"${AnomalyBasedConstraint.AssertionException}: $msg!"), metric = Some(metric)) + case AnomalyBasedConstraint.ValuePickerException(msg) => + ConstraintResult(this, ConstraintStatus.Failure, + message = Some(s"${AnomalyBasedConstraint.ProblematicMetricPicker}: $msg!"), metric = Some(metric)) + } + // An exception occurred during analysis + case Failure(e) => ConstraintResult(this, + ConstraintStatus.Failure, message = Some(e.getMessage), metric = Some(metric)) + } + } + + private def runPickerOnMetric(metricValue: M): V = + try { + valuePicker.map(function => function(metricValue)).getOrElse(metricValue.asInstanceOf[V]) + } catch { + case e: Exception => throw AnomalyBasedConstraint.ValuePickerException(e.getMessage) + } + + private def runAssertion(assertOn: V): AnomalyAssertionResult = + try { + assertion(assertOn) + } catch { + case e: Exception => throw AnomalyBasedConstraint.ConstraintAssertionException(e.getMessage) + } + + // 'assertion' and 'valuePicker' are lambdas we have to represent them like '' + override def toString: String = + s"AnomalyBasedConstraint($analyzer,,${valuePicker.map(_ => "")},$hint)" +} + +private[deequ] object AnomalyBasedConstraint { + val MissingAnalysis = "Missing Analysis, can't run the constraint!" 
+ val ProblematicMetricPicker = "Can't retrieve the value to assert on" + val AssertionException = "Can't execute the assertion" + + private case class ValuePickerException(message: String) extends RuntimeException(message) + private case class ConstraintAssertionException(message: String) extends RuntimeException(message) +} diff --git a/src/test/scala/com/amazon/deequ/constraints/AnomalyBasedConstraintTest.scala b/src/test/scala/com/amazon/deequ/constraints/AnomalyBasedConstraintTest.scala new file mode 100644 index 000000000..8f5281048 --- /dev/null +++ b/src/test/scala/com/amazon/deequ/constraints/AnomalyBasedConstraintTest.scala @@ -0,0 +1,5 @@ +package com.amazon.deequ.constraints + +class AnomalyBasedConstraintTest { + +} From 847d522eaba94e0e276240ef7631c15e85d2a149 Mon Sep 17 00:00:00 2001 From: Hubert Date: Wed, 6 Dec 2023 10:34:44 -0500 Subject: [PATCH 2/2] exposing anomaly thresholds, refactored anomaly strategy to output all data points with anomaly detection details, passed that through the constraint into the constraint result as anomaly detection metadata field --- .../applicability/Applicability.scala | 12 +- .../AnomalyDetectionStrategy.scala | 2 +- .../anomalydetection/AnomalyDetector.scala | 12 +- .../anomalydetection/BaseChangeStrategy.scala | 29 +- .../BatchNormalStrategy.scala | 16 +- .../OnlineNormalStrategy.scala | 9 +- .../SimpleThresholdStrategy.scala | 14 +- .../seasonal/HoltWinters.scala | 30 +- .../scala/com/amazon/deequ/checks/Check.scala | 36 ++- .../constraints/AnomalyBasedConstraint.scala | 28 +- .../amazon/deequ/constraints/Constraint.scala | 27 +- .../amazon/deequ/VerificationSuiteTest.scala | 36 ++- .../AbsoluteChangeStrategyTest.scala | 139 ++++++--- .../AnomalyDetectorTest.scala | 46 ++- .../BatchNormalStrategyTest.scala | 80 +++-- .../OnlineNormalStrategyTest.scala | 118 ++++++-- .../RateOfChangeStrategyTest.scala | 23 +- .../RelativeRateOfChangeStrategyTest.scala | 120 ++++++-- .../SimpleThresholdStrategyTest.scala | 45 ++- 
.../seasonal/HoltWintersTest.scala | 16 +- .../deequ/checks/ApplicabilityTest.scala | 28 +- .../com/amazon/deequ/checks/CheckTest.scala | 100 ++++++- .../AnomalyBasedConstraintTest.scala | 282 +++++++++++++++++- .../deequ/constraints/ConstraintUtils.scala | 5 +- .../deequ/constraints/ConstraintsTest.scala | 26 +- 25 files changed, 1019 insertions(+), 260 deletions(-) diff --git a/src/main/scala/com/amazon/deequ/analyzers/applicability/Applicability.scala b/src/main/scala/com/amazon/deequ/analyzers/applicability/Applicability.scala index e2c282c14..2299c5e21 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/applicability/Applicability.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/applicability/Applicability.scala @@ -21,7 +21,7 @@ import java.util.Calendar import com.amazon.deequ.analyzers.{Analyzer, State} import com.amazon.deequ.checks.Check -import com.amazon.deequ.constraints.{AnalysisBasedConstraint, Constraint, ConstraintDecorator} +import com.amazon.deequ.constraints.{AnalysisBasedConstraint, AnomalyBasedConstraint, Constraint, ConstraintDecorator} import com.amazon.deequ.metrics.Metric import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Row, SparkSession} @@ -187,9 +187,13 @@ private[deequ] class Applicability(session: SparkSession) { case (name, nc: ConstraintDecorator) => name -> nc.inner case (name, c: Constraint) => name -> c } - .collect { case (name, constraint: AnalysisBasedConstraint[_, _, _]) => - val metric = constraint.analyzer.calculate(data).value - name -> metric + .collect { + case (name, constraint: AnalysisBasedConstraint[_, _, _]) => + val metric = constraint.analyzer.calculate(data).value + name -> metric + case (name, constraint: AnomalyBasedConstraint[_, _, _]) => + val metric = constraint.analyzer.calculate(data).value + name -> metric } val constraintApplicabilities = check.constraints.zip(namedMetrics).map { diff --git 
a/src/main/scala/com/amazon/deequ/anomalydetection/AnomalyDetectionStrategy.scala b/src/main/scala/com/amazon/deequ/anomalydetection/AnomalyDetectionStrategy.scala index 0c3f6805e..042983a0f 100644 --- a/src/main/scala/com/amazon/deequ/anomalydetection/AnomalyDetectionStrategy.scala +++ b/src/main/scala/com/amazon/deequ/anomalydetection/AnomalyDetectionStrategy.scala @@ -28,5 +28,5 @@ trait AnomalyDetectionStrategy { */ def detect( dataSeries: Vector[Double], - searchInterval: (Int, Int) = (0, Int.MaxValue)): Seq[(Int, Anomaly)] + searchInterval: (Int, Int) = (0, Int.MaxValue)): Seq[(Int, AnomalyDetectionDataPoint)] } diff --git a/src/main/scala/com/amazon/deequ/anomalydetection/AnomalyDetector.scala b/src/main/scala/com/amazon/deequ/anomalydetection/AnomalyDetector.scala index e7146c0e9..0f8a18f87 100644 --- a/src/main/scala/com/amazon/deequ/anomalydetection/AnomalyDetector.scala +++ b/src/main/scala/com/amazon/deequ/anomalydetection/AnomalyDetector.scala @@ -39,7 +39,7 @@ case class AnomalyDetector(strategy: AnomalyDetectionStrategy) { def isNewPointAnomalous( historicalDataPoints: Seq[DataPoint[Double]], newPoint: DataPoint[Double]) - : DetectionResult = { + : AnomalyDetectionResult = { require(historicalDataPoints.nonEmpty, "historicalDataPoints must not be empty!") @@ -57,11 +57,7 @@ case class AnomalyDetector(strategy: AnomalyDetectionStrategy) { val allDataPoints = sortedDataPoints :+ newPoint // Run anomaly - val anomalies = detectAnomaliesInHistory(allDataPoints, (newPoint.time, Long.MaxValue)) - .anomalies - - // Create a Detection result with all anomalies - DetectionResult(anomalies) + detectAnomaliesInHistory(allDataPoints, (newPoint.time, Long.MaxValue)) } /** @@ -74,7 +70,7 @@ case class AnomalyDetector(strategy: AnomalyDetectionStrategy) { def detectAnomaliesInHistory( dataSeries: Seq[DataPoint[Double]], searchInterval: (Long, Long) = (Long.MinValue, Long.MaxValue)) - : DetectionResult = { + : AnomalyDetectionResult = { def 
findIndexForBound(sortedTimestamps: Seq[Long], boundValue: Long): Int = { sortedTimestamps.search(boundValue).insertionPoint @@ -97,6 +93,6 @@ case class AnomalyDetector(strategy: AnomalyDetectionStrategy) { val anomalies = strategy.detect( sortedSeries.flatMap { _.metricValue }.toVector, (lowerBoundIndex, upperBoundIndex)) - DetectionResult(anomalies.map { case (index, anomaly) => (sortedTimestamps(index), anomaly) }) + AnomalyDetectionResult(anomalies.map { case (index, anomaly) => (sortedTimestamps(index), anomaly) }) } } diff --git a/src/main/scala/com/amazon/deequ/anomalydetection/BaseChangeStrategy.scala b/src/main/scala/com/amazon/deequ/anomalydetection/BaseChangeStrategy.scala index e00c86772..86a183719 100644 --- a/src/main/scala/com/amazon/deequ/anomalydetection/BaseChangeStrategy.scala +++ b/src/main/scala/com/amazon/deequ/anomalydetection/BaseChangeStrategy.scala @@ -80,7 +80,7 @@ trait BaseChangeStrategy override def detect( dataSeries: Vector[Double], searchInterval: (Int, Int)) - : Seq[(Int, Anomaly)] = { + : Seq[(Int, AnomalyDetectionDataPoint)] = { val (start, end) = searchInterval require(start <= end, @@ -89,15 +89,24 @@ trait BaseChangeStrategy val startPoint = Seq(start - order, 0).max val data = diff(DenseVector(dataSeries.slice(startPoint, end): _*), order).data - data.zipWithIndex.filter { case (value, _) => - (value < maxRateDecrease.getOrElse(Double.MinValue) - || value > maxRateIncrease.getOrElse(Double.MaxValue)) - } - .map { case (change, index) => - (index + startPoint + order, Anomaly(Option(dataSeries(index + startPoint + order)), 1.0, - Some(s"[AbsoluteChangeStrategy]: Change of $change is not in bounds [" + - s"${maxRateDecrease.getOrElse(Double.MinValue)}, " + - s"${maxRateIncrease.getOrElse(Double.MaxValue)}]. 
Order=$order"))) + val lowerBound = maxRateDecrease.getOrElse(Double.MinValue) + val upperBound = maxRateIncrease.getOrElse(Double.MaxValue) + + + data.zipWithIndex.map { + case (change, index) => + val outputSequenceIndex = index + startPoint + order + val value = dataSeries(outputSequenceIndex) + val (detail, isAnomaly) = if (change < lowerBound || change > upperBound) { + (Some(s"[AbsoluteChangeStrategy]: Change of $change is not in bounds [" + + s"$lowerBound, " + + s"$upperBound]. Order=$order"), true) + } + else { + (None, false) + } + (outputSequenceIndex, AnomalyDetectionDataPoint(value, change, + AnomalyThreshold(lowerBound = Bound(lowerBound), upperBound = Bound(upperBound)), isAnomaly, 1.0, detail)) } } } diff --git a/src/main/scala/com/amazon/deequ/anomalydetection/BatchNormalStrategy.scala b/src/main/scala/com/amazon/deequ/anomalydetection/BatchNormalStrategy.scala index baff49c03..f4b7cba5b 100644 --- a/src/main/scala/com/amazon/deequ/anomalydetection/BatchNormalStrategy.scala +++ b/src/main/scala/com/amazon/deequ/anomalydetection/BatchNormalStrategy.scala @@ -51,7 +51,7 @@ case class BatchNormalStrategy( */ override def detect( dataSeries: Vector[Double], - searchInterval: (Int, Int)): Seq[(Int, Anomaly)] = { + searchInterval: (Int, Int)): Seq[(Int, AnomalyDetectionDataPoint)] = { val (searchStart, searchEnd) = searchInterval @@ -83,13 +83,15 @@ case class BatchNormalStrategy( dataSeries.zipWithIndex .slice(searchStart, searchEnd) - .filter { case (value, _) => value > upperBound || value < lowerBound } .map { case (value, index) => - - val detail = Some(s"[BatchNormalStrategy]: Value $value is not in " + - s"bounds [$lowerBound, $upperBound].") - - (index, Anomaly(Option(value), 1.0, detail)) + val (detail, isAnomaly) = if (value > upperBound || value < lowerBound) { + (Some(s"[BatchNormalStrategy]: Value $value is not in " + + s"bounds [$lowerBound, $upperBound]."), true) + } else { + (None, false) + } + (index, AnomalyDetectionDataPoint(value, 
value, + AnomalyThreshold(lowerBound = Bound(lowerBound), upperBound = Bound(upperBound)), isAnomaly, 1.0, detail)) } } } diff --git a/src/main/scala/com/amazon/deequ/anomalydetection/OnlineNormalStrategy.scala b/src/main/scala/com/amazon/deequ/anomalydetection/OnlineNormalStrategy.scala index 8bf8b634c..69afac963 100644 --- a/src/main/scala/com/amazon/deequ/anomalydetection/OnlineNormalStrategy.scala +++ b/src/main/scala/com/amazon/deequ/anomalydetection/OnlineNormalStrategy.scala @@ -130,7 +130,7 @@ case class OnlineNormalStrategy( override def detect( dataSeries: Vector[Double], searchInterval: (Int, Int)) - : Seq[(Int, Anomaly)] = { + : Seq[(Int, AnomalyDetectionDataPoint)] = { val (searchStart, searchEnd) = searchInterval @@ -139,7 +139,6 @@ case class OnlineNormalStrategy( computeStatsAndAnomalies(dataSeries, searchInterval) .zipWithIndex .slice(searchStart, searchEnd) - .filter { case (result, _) => result.isAnomaly } .map { case (calcRes, index) => val lowerBound = calcRes.mean - lowerDeviationFactor.getOrElse(Double.MaxValue) * calcRes.stdDev @@ -149,7 +148,11 @@ case class OnlineNormalStrategy( val detail = Some(s"[OnlineNormalStrategy]: Value ${dataSeries(index)} is not in " + s"bounds [$lowerBound, $upperBound].") - (index, Anomaly(Option(dataSeries(index)), 1.0, detail)) + val value = dataSeries(index) + + (index, AnomalyDetectionDataPoint(value, value, + AnomalyThreshold(lowerBound = Bound(lowerBound), upperBound = Bound(upperBound)), + calcRes.isAnomaly, 1.0, detail)) } } } diff --git a/src/main/scala/com/amazon/deequ/anomalydetection/SimpleThresholdStrategy.scala b/src/main/scala/com/amazon/deequ/anomalydetection/SimpleThresholdStrategy.scala index ec7f5df74..5564c2ed5 100644 --- a/src/main/scala/com/amazon/deequ/anomalydetection/SimpleThresholdStrategy.scala +++ b/src/main/scala/com/amazon/deequ/anomalydetection/SimpleThresholdStrategy.scala @@ -38,7 +38,7 @@ case class SimpleThresholdStrategy( */ override def detect( dataSeries: Vector[Double], - 
searchInterval: (Int, Int)): Seq[(Int, Anomaly)] = { + searchInterval: (Int, Int)): Seq[(Int, AnomalyDetectionDataPoint)] = { val (searchStart, searchEnd) = searchInterval @@ -46,13 +46,17 @@ case class SimpleThresholdStrategy( dataSeries.zipWithIndex .slice(searchStart, searchEnd) - .filter { case (value, _) => value < lowerBound || value > upperBound } .map { case (value, index) => - val detail = Some(s"[SimpleThresholdStrategy]: Value $value is not in " + - s"bounds [$lowerBound, $upperBound]") + val (detail, isAnomaly) = if ( value < lowerBound || value > upperBound ) { + (Some(s"[SimpleThresholdStrategy]: Value $value is not in " + + s"bounds [$lowerBound, $upperBound]"), true) + } else { + (None, false) + } - (index, Anomaly(Option(value), 1.0, detail)) + (index, AnomalyDetectionDataPoint(value, value, + AnomalyThreshold(lowerBound = Bound(lowerBound), upperBound = Bound(upperBound)), isAnomaly, 1.0, detail)) } } } diff --git a/src/main/scala/com/amazon/deequ/anomalydetection/seasonal/HoltWinters.scala b/src/main/scala/com/amazon/deequ/anomalydetection/seasonal/HoltWinters.scala index 0ee0ac25f..a129352f4 100644 --- a/src/main/scala/com/amazon/deequ/anomalydetection/seasonal/HoltWinters.scala +++ b/src/main/scala/com/amazon/deequ/anomalydetection/seasonal/HoltWinters.scala @@ -18,7 +18,7 @@ package com.amazon.deequ.anomalydetection.seasonal import breeze.linalg.DenseVector import breeze.optimize.{ApproximateGradientFunction, DiffFunction, LBFGSB} -import com.amazon.deequ.anomalydetection.{Anomaly, AnomalyDetectionStrategy} +import com.amazon.deequ.anomalydetection.{AnomalyDetectionDataPoint, AnomalyDetectionStrategy, AnomalyThreshold, Bound} import collection.mutable.ListBuffer @@ -178,17 +178,27 @@ class HoltWinters( forecasts: Seq[Double], startIndex: Int, residualSD: Double) - : Seq[(Int, Anomaly)] = { + : Seq[(Int, AnomalyDetectionDataPoint)] = { testSeries.zip(forecasts).zipWithIndex - .collect { case ((inputValue, forecastedValue), detectionIndex) - if 
math.abs(inputValue - forecastedValue) > 1.96 * residualSD => - - detectionIndex + startIndex -> Anomaly( - value = Some(inputValue), - confidence = 1.0, - detail = Some(s"Forecasted $forecastedValue for observed value $inputValue") + .collect { case ((inputValue, forecastedValue), detectionIndex) => + val anomalyMetricValue = math.abs(inputValue - forecastedValue) + val upperBound = 1.96 * residualSD + + val (detail, isAnomaly) = if (anomalyMetricValue > upperBound) { + (Some(s"Forecasted $forecastedValue for observed value $inputValue"), true) + } else { + (None, false) + } + detectionIndex + startIndex -> AnomalyDetectionDataPoint( + dataMetricValue = inputValue, + anomalyMetricValue = anomalyMetricValue, + anomalyThreshold = AnomalyThreshold(upperBound = Bound(upperBound)), + isAnomaly = isAnomaly, + confidence = 1.0, + detail = detail ) + } } @@ -202,7 +212,7 @@ class HoltWinters( override def detect( dataSeries: Vector[Double], searchInterval: (Int, Int) = (0, Int.MaxValue)) - : Seq[(Int, Anomaly)] = { + : Seq[(Int, AnomalyDetectionDataPoint)] = { require(dataSeries.nonEmpty, "Provided data series is empty") diff --git a/src/main/scala/com/amazon/deequ/checks/Check.scala b/src/main/scala/com/amazon/deequ/checks/Check.scala index afd3a1a3b..c7044109a 100644 --- a/src/main/scala/com/amazon/deequ/checks/Check.scala +++ b/src/main/scala/com/amazon/deequ/checks/Check.scala @@ -18,14 +18,13 @@ package com.amazon.deequ.checks import com.amazon.deequ.analyzers.AnalyzerOptions import com.amazon.deequ.analyzers.{Analyzer, Histogram, KLLParameters, Patterns, State} -import com.amazon.deequ.anomalydetection.{AnomalyDetectionStrategy, AnomalyDetector, DataPoint} +import com.amazon.deequ.anomalydetection.{AnomalyDetectionAssertionResult, AnomalyDetectionDataPoint, AnomalyDetectionMetadata, AnomalyDetectionResult, AnomalyDetectionStrategy, AnomalyDetector, AnomalyThreshold, Bound, DataPoint, HistoryUtils} import com.amazon.deequ.analyzers.runners.AnalyzerContext import 
com.amazon.deequ.constraints.Constraint._ import com.amazon.deequ.constraints._ import com.amazon.deequ.metrics.{BucketDistribution, Distribution, Metric} import com.amazon.deequ.repository.MetricsRepository import org.apache.spark.sql.expressions.UserDefinedFunction -import com.amazon.deequ.anomalydetection.HistoryUtils import com.amazon.deequ.checks.ColumnCondition.{isAnyNotNull, isEachNotNull} import scala.util.matching.Regex @@ -1092,7 +1091,10 @@ case class Check( case nc: ConstraintDecorator => nc.inner case c: Constraint => c } - .collect { case constraint: AnalysisBasedConstraint[_, _, _] => constraint.analyzer } + .collect { + case constraint: AnalysisBasedConstraint[_, _, _] => constraint.analyzer + case constraint: AnomalyBasedConstraint[_, _, _] => constraint.analyzer + } .map { _.asInstanceOf[Analyzer[_, Metric[_]]] } .toSet } @@ -1134,7 +1136,7 @@ object Check { afterDate: Option[Long], beforeDate: Option[Long])( currentMetricValue: Double) - : Boolean = { + : AnomalyDetectionAssertionResult = { // Get history keys var repositoryLoader = metricsRepository.load() @@ -1178,10 +1180,32 @@ object Check { // Run given anomaly detection strategy and return false if the newest value is an Anomaly val anomalyDetector = AnomalyDetector(anomalyDetectionStrategy) - val detectedAnomalies = anomalyDetector.isNewPointAnomalous( + val anomalyDetectionResult: AnomalyDetectionResult = anomalyDetector.isNewPointAnomalous( HistoryUtils.extractMetricValues[Double](historicalMetrics), DataPoint(testDateTime, Some(currentMetricValue))) - detectedAnomalies.anomalies.isEmpty + + // this function checks if the newest point is anomalous and returns a boolean for assertion, + // along with that newest point with anomaly check details + getNewestPointAnomalyResults(anomalyDetectionResult) + } + + private[deequ] def getNewestPointAnomalyResults(anomalyDetectionResult: AnomalyDetectionResult): + AnomalyDetectionAssertionResult = { + val (hasNoAnomaly, anomalyDetectionMetaData): 
(Boolean, AnomalyDetectionMetadata) = { + + // Based on upstream code, this anomaly detection data point sequence should never be empty + require(anomalyDetectionResult.anomalyDetectionDataPointSequence != Nil, + "AnomalyDetectionDataPointSequence cannot be empty") + + // get the last anomaly detection data point of sequence (there should only be one element for now) + // and check the isAnomaly boolean, also return the last anomaly detection data point + // wrapped in the anomaly detection metadata class + anomalyDetectionResult.anomalyDetectionDataPointSequence match { + case _ :+ lastAnomalyDataPoint => + (!lastAnomalyDataPoint._2.isAnomaly, AnomalyDetectionMetadata(lastAnomalyDataPoint._2)) + } + } + AnomalyDetectionAssertionResult(hasNoAnomaly = hasNoAnomaly, anomalyDetectionMetadata = anomalyDetectionMetaData) } } diff --git a/src/main/scala/com/amazon/deequ/constraints/AnomalyBasedConstraint.scala b/src/main/scala/com/amazon/deequ/constraints/AnomalyBasedConstraint.scala index ae23a604e..4c3392fdb 100644 --- a/src/main/scala/com/amazon/deequ/constraints/AnomalyBasedConstraint.scala +++ b/src/main/scala/com/amazon/deequ/constraints/AnomalyBasedConstraint.scala @@ -17,25 +17,26 @@ package com.amazon.deequ.constraints import com.amazon.deequ.analyzers.{Analyzer, State} -import com.amazon.deequ.anomalydetection.{AnomalyAssertionResult, AnomalyThreshold, DetectionResult} +import com.amazon.deequ.anomalydetection.AnomalyDetectionAssertionResult import com.amazon.deequ.metrics.Metric import org.apache.spark.sql.DataFrame -import scala.util.{Failure, Success, Try} +import scala.util.{Failure, Success} /** * Case class for anomaly based constraints that provides unified way to access * AnalyzerContext and metrics stored in it. 
* TODO this differs from AnalysisBasedConstraint only in that it uses an assertion function that - * TODO returns an AnomalyAssertionResult with a potential anomaly as well as the default boolean - * TODO figure out if it's better to use some inheritance/composition + * returns an AnomalyDetectionAssertionResult with anomaly detection metadata as well as the assertion boolean. + * Figure out if it's better to use some inheritance/composition from a common trait/abstract class. * * Runs the analysis and get the value of the metric returned by the analysis, * picks the numeric value that will be used in the assertion function with metric picker * runs the assertion. * * @param analyzer Analyzer to be run on the data frame - * @param assertion Assertion function that returns an AnomalyAssertionResult with a potential anomaly + * @param assertion Assertion function that returns an AnomalyDetectionAssertionResult with + * anomaly detection metadata as well as the assertion boolean * @param valuePicker Optional function to pick the interested part of the metric value that the * assertion will be running on. Absence of such function means the metric * value would be used in the assertion as it is. 
@@ -45,10 +46,10 @@ import scala.util.{Failure, Success, Try} * */ private[deequ] case class AnomalyBasedConstraint[S <: State[S], M, V]( - analyzer: Analyzer[S, Metric[M]], - private[deequ] val assertion: V => AnomalyAssertionResult, - private[deequ] val valuePicker: Option[M => V] = None, - private[deequ] val hint: Option[String] = None) + analyzer: Analyzer[S, Metric[M]], + private[deequ] val assertion: V => AnomalyDetectionAssertionResult, + private[deequ] val valuePicker: Option[M => V] = None, + private[deequ] val hint: Option[String] = None) extends Constraint { private[deequ] def calculateAndEvaluate(data: DataFrame) = { @@ -80,13 +81,14 @@ private[deequ] case class AnomalyBasedConstraint[S <: State[S], M, V]( if (anomalyAssertionResult.hasNoAnomaly) { ConstraintResult(this, ConstraintStatus.Success, metric = Some(metric), - anomaly = anomalyAssertionResult.anomaly) + anomalyDetectionMetadata = Some(anomalyAssertionResult.anomalyDetectionMetadata)) } else { - var errorMessage = s"Value: $assertOn does not meet the constraint requirement, has anomaly, check result!" + var errorMessage = s"Value: $assertOn does not meet the constraint requirement," + + s" check the anomaly detection metadata!" 
hint.foreach(hint => errorMessage += s" $hint") ConstraintResult(this, ConstraintStatus.Failure, Some(errorMessage), Some(metric), - anomaly = anomalyAssertionResult.anomaly) + anomalyDetectionMetadata = Some(anomalyAssertionResult.anomalyDetectionMetadata)) } } catch { @@ -110,7 +112,7 @@ private[deequ] case class AnomalyBasedConstraint[S <: State[S], M, V]( case e: Exception => throw AnomalyBasedConstraint.ValuePickerException(e.getMessage) } - private def runAssertion(assertOn: V): AnomalyAssertionResult = + private def runAssertion(assertOn: V): AnomalyDetectionAssertionResult = try { assertion(assertOn) } catch { diff --git a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala index 39d8f522f..7b8cbd9e0 100644 --- a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala +++ b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala @@ -17,6 +17,7 @@ package com.amazon.deequ.constraints import com.amazon.deequ.analyzers._ +import com.amazon.deequ.anomalydetection.{AnomalyDetectionAssertionResult, AnomalyDetectionDataPoint, AnomalyDetectionMetadata, AnomalyDetectionResult} import com.amazon.deequ.metrics.{BucketDistribution, Distribution, Metric} import org.apache.spark.sql.expressions.UserDefinedFunction @@ -26,11 +27,24 @@ object ConstraintStatus extends Enumeration { val Success, Failure = Value } +/** + * ConstraintResult Class + * + * @param constraint Constraint associated with result + * @param status Status of constraint (Success, Failure) + * @param message Optional message for errors + * @param metric Optional Metric from calculation + * @param anomalyDetectionMetadata anomaly detection metadata details (including anomaly thresholds) for users + * TODO figure out if the new anomaly metadata field should be here or use inheritance/composition + * in a separate AnomalyConstraintResult class that extends a ConstraintResult trait/abstract class + * since it will only be 
populated for anomaly constraints and not other constraints + */ case class ConstraintResult( constraint: Constraint, status: ConstraintStatus.Value, message: Option[String] = None, - metric: Option[Metric[_]] = None) + metric: Option[Metric[_]] = None, + anomalyDetectionMetadata: Option[AnomalyDetectionMetadata] = None) /** Common trait for all data quality constraints */ trait Constraint extends Serializable { @@ -215,17 +229,16 @@ object Constraint { * * @param analyzer Analyzer for the metric to do Anomaly Detection on * @param anomalyAssertion Function that receives a double input parameter - * (since the metric is double metric) and returns a boolean + * (since the metric is double metric) and returns an Anomaly Detection Assertion Result * @param hint A hint to provide additional context why a constraint could have failed */ def anomalyConstraint[S <: State[S]]( - analyzer: Analyzer[S, Metric[Double]], - anomalyAssertion: Double => Boolean, - hint: Option[String] = None) + analyzer: Analyzer[S, Metric[Double]], + anomalyAssertion: Double => AnomalyDetectionAssertionResult, + hint: Option[String] = None) : Constraint = { - val constraint = AnalysisBasedConstraint[S, Double, Double](analyzer, anomalyAssertion, - hint = hint) + val constraint = AnomalyBasedConstraint[S, Double, Double](analyzer, anomalyAssertion, hint = hint) new NamedConstraint(constraint, s"AnomalyConstraint($analyzer)") } diff --git a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala index a21bfc934..26de8dc86 100644 --- a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala +++ b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala @@ -18,7 +18,7 @@ package com.amazon.deequ import com.amazon.deequ.analyzers._ import com.amazon.deequ.analyzers.runners.AnalyzerContext -import com.amazon.deequ.anomalydetection.AbsoluteChangeStrategy +import com.amazon.deequ.anomalydetection.{AbsoluteChangeStrategy, 
AnomalyDetectionDataPoint, AnomalyThreshold, Bound} import com.amazon.deequ.checks.Check import com.amazon.deequ.checks.CheckLevel import com.amazon.deequ.checks.CheckStatus @@ -625,12 +625,33 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec .run() val checkResultsOne = verificationResultOne.checkResults.head._2.status + val actualResultsOneAnomalyDetectionDataPoint = + verificationResultOne.checkResults.head._2.constraintResults.head + .anomalyDetectionMetadata.get.anomalyDetectionDataPoint + val expectedResultsOneAnomalyDetectionDataPoint = + AnomalyDetectionDataPoint(11.0, 7.0, AnomalyThreshold(Bound(-2.0), Bound(2.0)), isAnomaly = true, 1.0) + + val checkResultsTwo = verificationResultTwo.checkResults.head._2.status + val actualResultsTwoAnomalyDetectionDataPoint = + verificationResultTwo.checkResults.head._2.constraintResults.head + .anomalyDetectionMetadata.get.anomalyDetectionDataPoint + val expectedResultsTwoAnomalyDetectionDataPoint = + AnomalyDetectionDataPoint(11.0, 0.0, AnomalyThreshold(Bound(-7.0), Bound(7.0)), isAnomaly = false, 1.0) + val checkResultsThree = verificationResultThree.checkResults.head._2.status + val actualResultsThreeAnomalyDetectionDataPoint = + verificationResultThree.checkResults.head._2.constraintResults.head + .anomalyDetectionMetadata.get.anomalyDetectionDataPoint + val expectedResultsThreeAnomalyDetectionDataPoint = + AnomalyDetectionDataPoint(11.0, 0.0, AnomalyThreshold(Bound(-7.0), Bound(7.0)), isAnomaly = false, 1.0) assert(checkResultsOne == CheckStatus.Warning) + assert(actualResultsOneAnomalyDetectionDataPoint == expectedResultsOneAnomalyDetectionDataPoint) assert(checkResultsTwo == CheckStatus.Success) + assert(actualResultsTwoAnomalyDetectionDataPoint == expectedResultsTwoAnomalyDetectionDataPoint) assert(checkResultsThree == CheckStatus.Success) + assert(actualResultsThreeAnomalyDetectionDataPoint == expectedResultsThreeAnomalyDetectionDataPoint) } } @@ -670,10 +691,23 @@ class 
VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec .run() val checkResultsOne = verificationResultOne.checkResults.values.toSeq(1).status + val actualResultsOneAnomalyDetectionDataPoint = + verificationResultOne.checkResults.values.toSeq(1).constraintResults.head + .anomalyDetectionMetadata.get.anomalyDetectionDataPoint + val expectedResultsOneAnomalyDetectionDataPoint = + AnomalyDetectionDataPoint(11.0, 7.0, AnomalyThreshold(Bound(-2.0), Bound(2.0)), isAnomaly = true, 1.0) + val checkResultsTwo = verificationResultTwo.checkResults.head._2.status + val actualResultsTwoAnomalyDetectionDataPoint = + verificationResultTwo.checkResults.head._2.constraintResults.head + .anomalyDetectionMetadata.get.anomalyDetectionDataPoint + val expectedResultsTwoAnomalyDetectionDataPoint = + AnomalyDetectionDataPoint(11.0, 0.0, AnomalyThreshold(Bound(-7.0), Bound(7.0)), isAnomaly = false, 1.0) assert(checkResultsOne == CheckStatus.Warning) + assert(actualResultsOneAnomalyDetectionDataPoint == expectedResultsOneAnomalyDetectionDataPoint) assert(checkResultsTwo == CheckStatus.Success) + assert(actualResultsTwoAnomalyDetectionDataPoint == expectedResultsTwoAnomalyDetectionDataPoint) } } diff --git a/src/test/scala/com/amazon/deequ/anomalydetection/AbsoluteChangeStrategyTest.scala b/src/test/scala/com/amazon/deequ/anomalydetection/AbsoluteChangeStrategyTest.scala index 66d3c737a..f92497c4e 100644 --- a/src/test/scala/com/amazon/deequ/anomalydetection/AbsoluteChangeStrategyTest.scala +++ b/src/test/scala/com/amazon/deequ/anomalydetection/AbsoluteChangeStrategyTest.scala @@ -19,6 +19,8 @@ package com.amazon.deequ.anomalydetection import breeze.linalg.DenseVector import org.scalatest.{Matchers, WordSpec} +import scala.collection.mutable + class AbsoluteChangeStrategyTest extends WordSpec with Matchers { "Absolute Change Strategy" should { @@ -32,99 +34,139 @@ class AbsoluteChangeStrategyTest extends WordSpec with Matchers { } }).toVector + "detect all anomalies if no 
interval specified" in { - val anomalyResult = strategy.detect(data) - val expected = for (i <- 20 to 31) yield { - (i, Anomaly(Option(data(i)), 1.0)) - } - assert(anomalyResult == expected) + val anomalyResult = strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) + val expectedAnomalyThreshold = AnomalyThreshold(Bound(-2.0), Bound(2.0)) + val expectedResult: Seq[(Int, AnomalyDetectionDataPoint)] = Seq( + (20, AnomalyDetectionDataPoint(20, 19, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (21, AnomalyDetectionDataPoint(-21, -41, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (22, AnomalyDetectionDataPoint(22, 43, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (23, AnomalyDetectionDataPoint(-23, -45, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (24, AnomalyDetectionDataPoint(24, 47, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (25, AnomalyDetectionDataPoint(-25, -49, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (26, AnomalyDetectionDataPoint(26, 51, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (27, AnomalyDetectionDataPoint(-27, -53, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (28, AnomalyDetectionDataPoint(28, 55, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (29, AnomalyDetectionDataPoint(-29, -57, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (30, AnomalyDetectionDataPoint(30, 59, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (31, AnomalyDetectionDataPoint(1, -29, expectedAnomalyThreshold, isAnomaly = true, 1.0)) + ) + assert(anomalyResult == expectedResult) } "only detect anomalies in interval" in { - val anomalyResult = strategy.detect(data, (25, 50)) - val expected = for (i <- 25 to 31) yield { - (i, Anomaly(Option(data(i)), 1.0)) - } - assert(anomalyResult == expected) + val anomalyResult = strategy.detect(data, (25, 50)).filter({case (_, anom) => anom.isAnomaly}) + val expectedAnomalyThreshold = AnomalyThreshold(Bound(-2.0), Bound(2.0)) + val expectedResult: 
Seq[(Int, AnomalyDetectionDataPoint)] = Seq( + (25, AnomalyDetectionDataPoint(-25, -49, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (26, AnomalyDetectionDataPoint(26, 51, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (27, AnomalyDetectionDataPoint(-27, -53, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (28, AnomalyDetectionDataPoint(28, 55, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (29, AnomalyDetectionDataPoint(-29, -57, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (30, AnomalyDetectionDataPoint(30, 59, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (31, AnomalyDetectionDataPoint(1, -29, expectedAnomalyThreshold, isAnomaly = true, 1.0)) + ) + assert(anomalyResult == expectedResult) } "ignore min rate if none is given" in { val strategy = AbsoluteChangeStrategy(None, Some(1.0)) - val anomalyResult = strategy.detect(data) - + val anomalyResult = strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) + val expectedAnomalyThreshold = AnomalyThreshold(upperBound = Bound(1.0)) // Anomalies with positive values only - val expected = for (i <- 20 to 30 by 2) yield { - (i, Anomaly(Option(data(i)), 1.0)) - } - assert(anomalyResult == expected) + val expectedResult: Seq[(Int, AnomalyDetectionDataPoint)] = Seq( + (20, AnomalyDetectionDataPoint(20, 19, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (22, AnomalyDetectionDataPoint(22, 43, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (24, AnomalyDetectionDataPoint(24, 47, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (26, AnomalyDetectionDataPoint(26, 51, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (28, AnomalyDetectionDataPoint(28, 55, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (30, AnomalyDetectionDataPoint(30, 59, expectedAnomalyThreshold, isAnomaly = true, 1.0)) + ) + + assert(anomalyResult == expectedResult) } "ignore max rate if none is given" in { val strategy = AbsoluteChangeStrategy(Some(-1.0), None) - val anomalyResult = 
strategy.detect(data) + val anomalyResult = strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) + val expectedAnomalyThreshold = AnomalyThreshold(lowerBound = Bound(-1.0)) // Anomalies with negative values only - val expected = for (i <- 21 to 31 by 2) yield { - (i, Anomaly(Option(data(i)), 1.0)) - } - assert(anomalyResult == expected) + val expectedResult: Seq[(Int, AnomalyDetectionDataPoint)] = Seq( + (21, AnomalyDetectionDataPoint(-21, -41, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (23, AnomalyDetectionDataPoint(-23, -45, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (25, AnomalyDetectionDataPoint(-25, -49, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (27, AnomalyDetectionDataPoint(-27, -53, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (29, AnomalyDetectionDataPoint(-29, -57, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (31, AnomalyDetectionDataPoint(1, -29, expectedAnomalyThreshold, isAnomaly = true, 1.0)) + ) + assert(anomalyResult == expectedResult) } "detect no anomalies if rates are set to min/ max value" in { val strategy = AbsoluteChangeStrategy(Some(Double.MinValue), Some(Double.MaxValue)) - val anomalyResult = strategy.detect(data) + val anomalyResult = strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) - val expected: List[(Int, Anomaly)] = List() - assert(anomalyResult == expected) + val expectedResult: List[(Int, AnomalyDetectionDataPoint)] = List() + assert(anomalyResult == expectedResult) } "derive first order correctly" in { val data = DenseVector(1.0, 2.0, 4.0, 1.0, 2.0, 8.0) val result = strategy.diff(data, 1).data - val expected = Array(1.0, 2.0, -3.0, 1.0, 6.0) - assert(result === expected) + val expectedResult = Array(1.0, 2.0, -3.0, 1.0, 6.0) + assert(result === expectedResult) } "derive second order correctly" in { val data = DenseVector(1.0, 2.0, 4.0, 1.0, 2.0, 8.0) val result = strategy.diff(data, 2).data - val expected = Array(1.0, -5.0, 4.0, 5.0) - assert(result === 
expected) + val expectedResult = Array(1.0, -5.0, 4.0, 5.0) + assert(result === expectedResult) } "derive third order correctly" in { val data = DenseVector(1.0, 5.0, -10.0, 3.0, 100.0, 0.01, 0.0065) val result = strategy.diff(data, 3).data - val expected = Array(47, 56, -280.99, 296.9765) - assert(result === expected) + val expectedResult = Array(47, 56, -280.99, 296.9765) + assert(result === expectedResult) } "attribute indices correctly for higher orders without search interval" in { val data = Vector(0.0, 1.0, 3.0, 6.0, 18.0, 72.0) val strategy = AbsoluteChangeStrategy(None, Some(8.0), order = 2) - val result = strategy.detect(data) + val result = strategy.detect(data).filter({ case (_, anom) => anom.isAnomaly }) - val expected = Seq((4, Anomaly(Option(18.0), 1.0)), (5, Anomaly(Option(72.0), 1.0))) - assert(result == expected) + val expectedResult = Seq( + (4, AnomalyDetectionDataPoint(18.0, 9.0, AnomalyThreshold(upperBound = Bound(8.0)), isAnomaly = true, 1.0)), + (5, AnomalyDetectionDataPoint(72.0, 42.0, AnomalyThreshold(upperBound = Bound(8.0)), isAnomaly = true, 1.0)) + ) + assert(result == expectedResult) } "attribute indices correctly for higher orders with search interval" in { val data = Vector(0.0, 1.0, 3.0, 6.0, 18.0, 72.0) val strategy = AbsoluteChangeStrategy(None, Some(8.0), order = 2) - val result = strategy.detect(data, (5, 6)) + val result = strategy.detect(data, (5, 6)).filter({case (_, anom) => anom.isAnomaly}) - val expected = Seq((5, Anomaly(Option(72.0), 1.0))) - assert(result == expected) + val expectedResult = Seq( + (5, AnomalyDetectionDataPoint(72.0, 42.0, AnomalyThreshold(upperBound = Bound(8.0)), isAnomaly = true, 1.0)) + ) + assert(result == expectedResult) } "behave like the threshold strategy when order is 0" in { val data = Vector(1.0, -1.0, 4.0, -7.0) - val result = strategy.detect(data) + val result = strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) - val expected = Seq((2, Anomaly(Option(4.0), 1.0)), (3, 
Anomaly(Option(-7.0), 1.0))) - assert(result == expected) + val expectedResult = Seq( + (2, AnomalyDetectionDataPoint(4.0, 5.0, AnomalyThreshold(Bound(-2.0), Bound(2.0)), isAnomaly = true, 1.0)), + (3, AnomalyDetectionDataPoint(-7.0, -11.0, AnomalyThreshold(Bound(-2.0), Bound(2.0)), isAnomaly = true, 1.0)) + ) + assert(result == expectedResult) } "throw an error when rates aren't ordered" in { @@ -141,18 +183,31 @@ class AbsoluteChangeStrategyTest extends WordSpec with Matchers { "work fine with empty input" in { val emptySeries = Vector[Double]() - val anomalyResult = strategy.detect(emptySeries) + val anomalyResult = strategy.detect(emptySeries).filter({case (_, anom) => anom.isAnomaly}) - assert(anomalyResult == Seq[(Int, Anomaly)]()) + assert(anomalyResult == Seq[(Int, AnomalyDetectionDataPoint)]()) } "produce error message with correct value and bounds" in { - val result = strategy.detect(data) + val result = strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) result.foreach { case (_, anom) => val (value, lowerBound, upperBound) = AnomalyDetectionTestUtils.firstThreeDoublesFromString(anom.detail.get) + assert(value === anom.anomalyMetricValue) + assert(value < lowerBound || value > upperBound) + } + } + + "assert anomalies are outside of anomaly bounds" in { + val result = strategy.detect(data).filter({ case (_, anom) => anom.isAnomaly }) + + result.foreach { case (_, anom) => + val value = anom.anomalyMetricValue + val upperBound = anom.anomalyThreshold.upperBound.value + val lowerBound = anom.anomalyThreshold.lowerBound.value + assert(value < lowerBound || value > upperBound) } } diff --git a/src/test/scala/com/amazon/deequ/anomalydetection/AnomalyDetectorTest.scala b/src/test/scala/com/amazon/deequ/anomalydetection/AnomalyDetectorTest.scala index 08f411bd1..5852bca7e 100644 --- a/src/test/scala/com/amazon/deequ/anomalydetection/AnomalyDetectorTest.scala +++ b/src/test/scala/com/amazon/deequ/anomalydetection/AnomalyDetectorTest.scala @@ -35,20 
+35,20 @@ class AnomalyDetectorTest extends WordSpec with Matchers with MockFactory with P DataPoint[Double](2L, None), DataPoint[Double](3L, Option(1.0))) (fakeAnomalyDetector.detect _ when(Vector(1.0, 2.0, 1.0), (0, 3))) - .returns(Seq((1, Anomaly(Option(2.0), 1.0)))) + .returns(Seq((1, AnomalyDetectionDataPoint(2.0, 2.0, AnomalyThreshold(), confidence = 1.0)))) val anomalyResult = aD.detectAnomaliesInHistory(data, (0L, 4L)) - assert(anomalyResult == DetectionResult(Seq((1L, Anomaly(Option(2.0), 1.0))))) + assert(anomalyResult == AnomalyDetectionResult(Seq((1L, AnomalyDetectionDataPoint(2.0, 2.0, confidence = 1.0))))) } "only detect values in range" in { (fakeAnomalyDetector.detect _ when(Vector(-1.0, 2.0, 3.0, 0.5), (2, 4))) - .returns(Seq((2, Anomaly(Option(3.0), 1.0)))) + .returns(Seq((2, AnomalyDetectionDataPoint(3.0, 3.0, confidence = 1.0)))) val anomalyResult = aD.detectAnomaliesInHistory(data, (2L, 4L)) - assert(anomalyResult == DetectionResult(Seq((2L, Anomaly(Option(3.0), 1.0))))) + assert(anomalyResult == AnomalyDetectionResult(Seq((2L, AnomalyDetectionDataPoint(3.0, 3.0, confidence = 1.0))))) } "throw an error when intervals are not ordered" in { @@ -65,12 +65,18 @@ class AnomalyDetectorTest extends WordSpec with Matchers with MockFactory with P } (fakeAnomalyDetector.detect _ when(data.map(_.metricValue.get).toVector, (0, 2))) - .returns (Seq((0, Anomaly(Option(5.0), 1.0)), (1, Anomaly(Option(5.0), 1.0)))) + .returns ( + Seq( + (0, AnomalyDetectionDataPoint(5.0, 5.0, confidence = 1.0)), + (1, AnomalyDetectionDataPoint(5.0, 5.0, confidence = 1.0)) + ) + ) val anomalyResult = aD.detectAnomaliesInHistory(data, (200L, 401L)) - assert(anomalyResult == DetectionResult(Seq((200L, Anomaly(Option(5.0), 1.0)), - (400L, Anomaly(Option(5.0), 1.0))))) + assert(anomalyResult == AnomalyDetectionResult(Seq( + (200L, AnomalyDetectionDataPoint(5.0, 5.0, confidence = 1.0)), + (400L, AnomalyDetectionDataPoint(5.0, 5.0, confidence = 1.0))))) } "treat unordered values with 
time gaps correctly" in { @@ -80,13 +86,20 @@ class AnomalyDetectorTest extends WordSpec with Matchers with MockFactory with P val tS = AnomalyDetector(SimpleThresholdStrategy(lowerBound = -0.5, upperBound = 1.0)) (fakeAnomalyDetector.detect _ when(Vector(0.5, -1.0, 3.0, 2.0), (0, 4))) - .returns(Seq((1, Anomaly(Option(-1.0), 1.0)), (2, Anomaly(Option(3.0), 1.0)), - (3, Anomaly(Option(2.0), 1.0)))) + .returns( + Seq( + (1, AnomalyDetectionDataPoint(-1.0, -1.0, confidence = 1.0)), + (2, AnomalyDetectionDataPoint(3.0, 3.0, confidence = 1.0)), + (3, AnomalyDetectionDataPoint(2.0, 2.0, confidence = 1.0)) + ) + ) val anomalyResult = aD.detectAnomaliesInHistory(data) - assert(anomalyResult == DetectionResult(Seq((10L, Anomaly(Option(-1.0), 1.0)), - (11L, Anomaly(Option(3.0), 1.0)), (25L, Anomaly(Option(2.0), 1.0))))) + assert(anomalyResult == AnomalyDetectionResult( + Seq((10L, AnomalyDetectionDataPoint(-1.0, -1.0, confidence = 1.0)), + (11L, AnomalyDetectionDataPoint(3.0, 3.0, confidence = 1.0)), + (25L, AnomalyDetectionDataPoint(2.0, 2.0, confidence = 1.0))))) } "treat unordered values without time gaps correctly" in { @@ -95,13 +108,16 @@ class AnomalyDetectorTest extends WordSpec with Matchers with MockFactory with P } (fakeAnomalyDetector.detect _ when(Vector(0.5, -1.0, 3.0, 2.0), (0, 4))) - .returns(Seq((1, Anomaly(Option(-1.0), 1.0)), (2, Anomaly(Option(3.0), 1.0)), - (3, Anomaly(Option(2.0), 1.0)))) + .returns(Seq((1, AnomalyDetectionDataPoint(-1.0, -1.0, confidence = 1.0)), + (2, AnomalyDetectionDataPoint(3.0, 3.0, confidence = 1.0)), + (3, AnomalyDetectionDataPoint(2.0, 2.0, confidence = 1.0)))) val anomalyResult = aD.detectAnomaliesInHistory(data) - assert(anomalyResult == DetectionResult(Seq((1L, Anomaly(Option(-1.0), 1.0)), - (2L, Anomaly(Option(3.0), 1.0)), (3L, Anomaly(Option(2.0), 1.0))))) + assert(anomalyResult == AnomalyDetectionResult(Seq( + (1L, AnomalyDetectionDataPoint(-1.0, -1.0, confidence = 1.0)), + (2L, AnomalyDetectionDataPoint(3.0, 3.0, 
confidence = 1.0)), + (3L, AnomalyDetectionDataPoint(2.0, 2.0, confidence = 1.0))))) } } diff --git a/src/test/scala/com/amazon/deequ/anomalydetection/BatchNormalStrategyTest.scala b/src/test/scala/com/amazon/deequ/anomalydetection/BatchNormalStrategyTest.scala index 05b9a6272..8fdf6e74e 100644 --- a/src/test/scala/com/amazon/deequ/anomalydetection/BatchNormalStrategyTest.scala +++ b/src/test/scala/com/amazon/deequ/anomalydetection/BatchNormalStrategyTest.scala @@ -39,57 +39,77 @@ class BatchNormalStrategyTest extends WordSpec with Matchers { val data = dist.toVector "only detect anomalies in interval" in { - val anomalyResult = strategy.detect(data, (25, 50)) - val expected = for (i <- 25 to 30) yield { - (i, Anomaly(Option(data(i)), 1.0)) - } - assert(anomalyResult == expected) + val anomalyResult = strategy.detect(data, (25, 50)).filter({case (_, anom) => anom.isAnomaly}) + + val expectedAnomalyThreshold = AnomalyThreshold(Bound(-9.280850004177061), Bound(10.639954755150061)) + val expectedResult: Seq[(Int, AnomalyDetectionDataPoint)] = Seq( + (25, AnomalyDetectionDataPoint(data(25), data(25), expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (26, AnomalyDetectionDataPoint(data(26), data(26), expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (27, AnomalyDetectionDataPoint(data(27), data(27), expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (28, AnomalyDetectionDataPoint(data(28), data(28), expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (29, AnomalyDetectionDataPoint(data(29), data(29), expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (30, AnomalyDetectionDataPoint(data(30), data(30), expectedAnomalyThreshold, isAnomaly = true, 1.0)) + ) + assert(anomalyResult == expectedResult) } "ignore lower factor if none is given" in { val strategy = BatchNormalStrategy(None, Some(1.0)) - val anomalyResult = strategy.detect(data, (20, 31)) + val anomalyResult = strategy.detect(data, (20, 31)).filter({case (_, anom) => anom.isAnomaly}) + val 
expectedAnomalyThreshold = AnomalyThreshold(Bound(Double.NegativeInfinity), Bound(0.7781496015857838)) // Anomalies with positive values only - val expected = for (i <- 20 to 30 by 2) yield { - (i, Anomaly(Option(data(i)), 1.0)) - } - assert(anomalyResult == expected) + val expectedResult: Seq[(Int, AnomalyDetectionDataPoint)] = Seq( + (20, AnomalyDetectionDataPoint(data(20), data(20), expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (22, AnomalyDetectionDataPoint(data(22), data(22), expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (24, AnomalyDetectionDataPoint(data(24), data(24), expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (26, AnomalyDetectionDataPoint(data(26), data(26), expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (28, AnomalyDetectionDataPoint(data(28), data(28), expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (30, AnomalyDetectionDataPoint(data(30), data(30), expectedAnomalyThreshold, isAnomaly = true, 1.0)) + ) + assert(anomalyResult == expectedResult) } "ignore upper factor if none is given" in { val strategy = BatchNormalStrategy(Some(1.0), None) - val anomalyResult = strategy.detect(data, (10, 30)) + val anomalyResult = strategy.detect(data, (10, 30)).filter({case (_, anom) => anom.isAnomaly}) + val expectedAnomalyThreshold = AnomalyThreshold(Bound(-5.063730045618394), Bound(Double.PositiveInfinity)) // Anomalies with negative values only - val expected = for (i <- 21 to 29 by 2) yield { - (i, Anomaly(Option(data(i)), 1.0)) - } - assert(anomalyResult == expected) + val expectedResult: Seq[(Int, AnomalyDetectionDataPoint)] = Seq( + (21, AnomalyDetectionDataPoint(data(21), data(21), expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (23, AnomalyDetectionDataPoint(data(23), data(23), expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (25, AnomalyDetectionDataPoint(data(25), data(25), expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (27, AnomalyDetectionDataPoint(data(27), data(27), expectedAnomalyThreshold, isAnomaly = 
true, 1.0)), + (29, AnomalyDetectionDataPoint(data(29), data(29), expectedAnomalyThreshold, isAnomaly = true, 1.0)) + ) + assert(anomalyResult == expectedResult) } "ignore values in interval for mean/ stdDev if specified" in { val data = Vector(1.0, 1.0, 1.0, 1000.0, 500.0, 1.0) val strategy = BatchNormalStrategy(Some(3.0), Some(3.0)) - val anomalyResult = strategy.detect(data, (3, 5)) - val expected = Seq((3, Anomaly(Option(data(3)), 1.0)), (4, Anomaly(Option(data(4)), 1.0))) + val anomalyResult = strategy.detect(data, (3, 5)).filter({case (_, anom) => anom.isAnomaly}) - assert(anomalyResult == expected) + val expectedResult: Seq[(Int, AnomalyDetectionDataPoint)] = Seq( + (3, AnomalyDetectionDataPoint(1000, 1000, AnomalyThreshold(Bound(1.0), Bound(1.0)), isAnomaly = true, 1.0)), + (4, AnomalyDetectionDataPoint(500, 500, AnomalyThreshold(Bound(1.0), Bound(1.0)), isAnomaly = true, 1.0)) + ) + assert(anomalyResult == expectedResult) } "throw an exception when trying to exclude all data points from calculation" in { val strategy = BatchNormalStrategy() intercept[IllegalArgumentException] { - strategy.detect(data) + strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) } } - "detect no anomalies if factors are set to max value" in { val strategy = BatchNormalStrategy(Some(Double.MaxValue), Some(Double.MaxValue)) - val anomalyResult = strategy.detect(data, (30, 51)) + val anomalyResult = strategy.detect(data, (30, 51)).filter({case (_, anom) => anom.isAnomaly}) - val expected: List[(Int, Anomaly)] = List() + val expected: List[(Int, AnomalyDetectionDataPoint)] = List() assert(anomalyResult == expected) } @@ -109,13 +129,25 @@ class BatchNormalStrategyTest extends WordSpec with Matchers { } "produce error message with correct value and bounds" in { - val result = strategy.detect(data, (25, 50)) + val result = strategy.detect(data, (25, 50)).filter({case (_, anom) => anom.isAnomaly}) result.foreach { case (_, anom) => val (value, lowerBound, upperBound) = 
AnomalyDetectionTestUtils.firstThreeDoublesFromString(anom.detail.get) - assert(anom.value.isDefined && value === anom.value.get) + assert(value === anom.anomalyMetricValue) + assert(value < lowerBound || value > upperBound) + } + } + + "assert anomalies are outside of anomaly bounds" in { + val result = strategy.detect(data, (25, 50)).filter({ case (_, anom) => anom.isAnomaly }) + + result.foreach { case (_, anom) => + val value = anom.anomalyMetricValue + val upperBound = anom.anomalyThreshold.upperBound.value + val lowerBound = anom.anomalyThreshold.lowerBound.value + assert(value < lowerBound || value > upperBound) } } diff --git a/src/test/scala/com/amazon/deequ/anomalydetection/OnlineNormalStrategyTest.scala b/src/test/scala/com/amazon/deequ/anomalydetection/OnlineNormalStrategyTest.scala index 781ffb7ad..d35439585 100644 --- a/src/test/scala/com/amazon/deequ/anomalydetection/OnlineNormalStrategyTest.scala +++ b/src/test/scala/com/amazon/deequ/anomalydetection/OnlineNormalStrategyTest.scala @@ -42,57 +42,111 @@ class OnlineNormalStrategyTest extends WordSpec with Matchers { "detect all anomalies if no interval specified" in { val strategy = OnlineNormalStrategy(lowerDeviationFactor = Some(3.5), upperDeviationFactor = Some(3.5), ignoreStartPercentage = 0.2) - val anomalyResult = strategy.detect(data) - val expected = for (i <- 20 to 30) yield { - (i, Anomaly(Option(data(i)), 1.0)) - } - assert(anomalyResult == expected) + val anomalyResult = strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) + + val expectedResult: Seq[(Int, AnomalyDetectionDataPoint)] = Seq( + (20, AnomalyDetectionDataPoint(data(20), data(20), + AnomalyThreshold(Bound(-14.868489924421404), Bound(14.255383455388895)), isAnomaly = true, 1.0)), + (21, AnomalyDetectionDataPoint(data(21), data(21), + AnomalyThreshold(Bound(-13.6338479733374), Bound(13.02074150430489)), isAnomaly = true, 1.0)), + (22, AnomalyDetectionDataPoint(data(22), data(22), + 
AnomalyThreshold(Bound(-16.71733585267535), Bound(16.104229383642842)), isAnomaly = true, 1.0)), + (23, AnomalyDetectionDataPoint(data(23), data(23), + AnomalyThreshold(Bound(-17.346915620547467), Bound(16.733809151514958)), isAnomaly = true, 1.0)), + (24, AnomalyDetectionDataPoint(data(24), data(24), + AnomalyThreshold(Bound(-17.496117397890874), Bound(16.883010928858365)), isAnomaly = true, 1.0)), + (25, AnomalyDetectionDataPoint(data(25), data(25), + AnomalyThreshold(Bound(-17.90391150851199), Bound(17.29080503947948)), isAnomaly = true, 1.0)), + (26, AnomalyDetectionDataPoint(data(26), data(26), + AnomalyThreshold(Bound(-17.028892797350824), Bound(16.415786328318315)), isAnomaly = true, 1.0)), + (27, AnomalyDetectionDataPoint(data(27), data(27), + AnomalyThreshold(Bound(-17.720100310354653), Bound(17.106993841322144)), isAnomaly = true, 1.0)), + (28, AnomalyDetectionDataPoint(data(28), data(28), + AnomalyThreshold(Bound(-18.23663168508628), Bound(17.62352521605377)), isAnomaly = true, 1.0)), + (29, AnomalyDetectionDataPoint(data(29), data(29), + AnomalyThreshold(Bound(-19.32641622778204), Bound(18.71330975874953)), isAnomaly = true, 1.0)), + (30, AnomalyDetectionDataPoint(data(30), data(30), + AnomalyThreshold(Bound(-18.96540323993527), Bound(18.35229677090276)), isAnomaly = true, 1.0)) + ) + assert(anomalyResult == expectedResult) } "only detect anomalies in interval" in { - val anomalyResult = strategy.detect(data, (25, 31)) - val expected = for (i <- 25 to 30) yield { - (i, Anomaly(Option(data(i)), 1.0)) - } - assert(anomalyResult == expected) + val anomalyResult = strategy.detect(data, (25, 31)).filter({case (_, anom) => anom.isAnomaly}) + + val expectedResult: Seq[(Int, AnomalyDetectionDataPoint)] = Seq( + (25, AnomalyDetectionDataPoint(data(25), data(25), + AnomalyThreshold(Bound(-15.630116599125694), Bound(16.989221350098695)), isAnomaly = true, 1.0)), + (26, AnomalyDetectionDataPoint(data(26), data(26), + AnomalyThreshold(Bound(-14.963376676338362), 
Bound(16.322481427311363)), isAnomaly = true, 1.0)), + (27, AnomalyDetectionDataPoint(data(27), data(27), + AnomalyThreshold(Bound(-15.131834814393196), Bound(16.490939565366197)), isAnomaly = true, 1.0)), + (28, AnomalyDetectionDataPoint(data(28), data(28), + AnomalyThreshold(Bound(-14.76810451038132), Bound(16.12720926135432)), isAnomaly = true, 1.0)), + (29, AnomalyDetectionDataPoint(data(29), data(29), + AnomalyThreshold(Bound(-15.078145049879462), Bound(16.437249800852463)), isAnomaly = true, 1.0)), + (30, AnomalyDetectionDataPoint(data(30), data(30), + AnomalyThreshold(Bound(-14.540171084298914), Bound(15.899275835271913)), isAnomaly = true, 1.0)) + ) + assert(anomalyResult == expectedResult) } "ignore lower factor if none is given" in { val strategy = OnlineNormalStrategy(lowerDeviationFactor = None, upperDeviationFactor = Some(1.5)) - val anomalyResult = strategy.detect(data) + val anomalyResult = strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) // Anomalies with positive values only - val expected = for (i <- 20 to 30 by 2) yield { - (i, Anomaly(Option(data(i)), 1.0)) - } - assert(anomalyResult == expected) + val expectedResult: Seq[(Int, AnomalyDetectionDataPoint)] = Seq( + (20, AnomalyDetectionDataPoint(data(20), data(20), + AnomalyThreshold(Bound(Double.NegativeInfinity), Bound(5.934276775443095)), isAnomaly = true, 1.0)), + (22, AnomalyDetectionDataPoint(data(22), data(22), + AnomalyThreshold(Bound(Double.NegativeInfinity), Bound(7.979098353666404)), isAnomaly = true, 1.0)), + (24, AnomalyDetectionDataPoint(data(24), data(24), + AnomalyThreshold(Bound(Double.NegativeInfinity), Bound(9.582136909647211)), isAnomaly = true, 1.0)), + (26, AnomalyDetectionDataPoint(data(26), data(26), + AnomalyThreshold(Bound(Double.NegativeInfinity), Bound(10.320400087389258)), isAnomaly = true, 1.0)), + (28, AnomalyDetectionDataPoint(data(28), data(28), + AnomalyThreshold(Bound(Double.NegativeInfinity), Bound(11.113502213504855)), isAnomaly = true, 1.0)), 
+ (30, AnomalyDetectionDataPoint(data(30), data(30), + AnomalyThreshold(Bound(Double.NegativeInfinity), Bound(11.776810456746686)), isAnomaly = true, 1.0)) + ) + assert(anomalyResult == expectedResult) } "ignore upper factor if none is given" in { val strategy = OnlineNormalStrategy(lowerDeviationFactor = Some(1.5), upperDeviationFactor = None) - val anomalyResult = strategy.detect(data) + val anomalyResult = strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) // Anomalies with negative values only - val expected = for (i <- 21 to 29 by 2) yield { - (i, Anomaly(Option(data(i)), 1.0)) - } - assert(anomalyResult == expected) + val expectedResult: Seq[(Int, AnomalyDetectionDataPoint)] = Seq( + (21, AnomalyDetectionDataPoint(data(21), data(21), + AnomalyThreshold(Bound(-7.855820681098751), Bound(Double.PositiveInfinity)), isAnomaly = true, 1.0)), + (23, AnomalyDetectionDataPoint(data(23), data(23), + AnomalyThreshold(Bound(-10.14631437278386), Bound(Double.PositiveInfinity)), isAnomaly = true, 1.0)), + (25, AnomalyDetectionDataPoint(data(25), data(25), + AnomalyThreshold(Bound(-11.038751996286909), Bound(Double.PositiveInfinity)), isAnomaly = true, 1.0)), + (27, AnomalyDetectionDataPoint(data(27), data(27), + AnomalyThreshold(Bound(-11.359107787232386), Bound(Double.PositiveInfinity)), isAnomaly = true, 1.0)), + (29, AnomalyDetectionDataPoint(data(29), data(29), + AnomalyThreshold(Bound(-12.097995027317015), Bound(Double.PositiveInfinity)), isAnomaly = true, 1.0)) + ) + assert(anomalyResult == expectedResult) } "work fine with empty input" in { val emptySeries = Vector[Double]() - val anomalyResult = strategy.detect(emptySeries) + val anomalyResult = strategy.detect(emptySeries).filter({case (_, anom) => anom.isAnomaly}) - assert(anomalyResult == Seq[(Int, Anomaly)]()) + assert(anomalyResult == Seq[(Int, AnomalyDetectionDataPoint)]()) } "detect no anomalies if factors are set to max value" in { val strategy = OnlineNormalStrategy(Some(Double.MaxValue), 
Some(Double.MaxValue)) - val anomalyResult = strategy.detect(data) + val anomalyResult = strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) - val expected: List[(Int, Anomaly)] = List() + val expected: List[(Int, AnomalyDetectionDataPoint)] = List() assert(anomalyResult == expected) } @@ -157,13 +211,25 @@ class OnlineNormalStrategyTest extends WordSpec with Matchers { } "produce error message with correct value and bounds" in { - val result = strategy.detect(data) + val result = strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) result.foreach { case (_, anom) => val (value, lowerBound, upperBound) = AnomalyDetectionTestUtils.firstThreeDoublesFromString(anom.detail.get) - assert(anom.value.isDefined && value === anom.value.get) + assert(value === anom.anomalyMetricValue) + assert(value < lowerBound || value > upperBound) + } + } + + "assert anomalies are outside of anomaly bounds" in { + val result = strategy.detect(data).filter({ case (_, anom) => anom.isAnomaly }) + + result.foreach { case (_, anom) => + val value = anom.anomalyMetricValue + val upperBound = anom.anomalyThreshold.upperBound.value + val lowerBound = anom.anomalyThreshold.lowerBound.value + assert(value < lowerBound || value > upperBound) } } diff --git a/src/test/scala/com/amazon/deequ/anomalydetection/RateOfChangeStrategyTest.scala b/src/test/scala/com/amazon/deequ/anomalydetection/RateOfChangeStrategyTest.scala index 70f66f033..ddcaf013e 100644 --- a/src/test/scala/com/amazon/deequ/anomalydetection/RateOfChangeStrategyTest.scala +++ b/src/test/scala/com/amazon/deequ/anomalydetection/RateOfChangeStrategyTest.scala @@ -36,11 +36,24 @@ class RateOfChangeStrategyTest extends WordSpec with Matchers { }).toVector "detect all anomalies if no interval specified" in { - val anomalyResult = strategy.detect(data) - val expected = for (i <- 20 to 31) yield { - (i, Anomaly(Option(data(i)), 1.0)) - } - assert(anomalyResult == expected) + val anomalyResult = 
strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) + + val expectedAnomalyThreshold = AnomalyThreshold(Bound(-2.0), Bound(2.0)) + val expectedResult: Seq[(Int, AnomalyDetectionDataPoint)] = Seq( + (20, AnomalyDetectionDataPoint(20, 19, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (21, AnomalyDetectionDataPoint(-21, -41, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (22, AnomalyDetectionDataPoint(22, 43, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (23, AnomalyDetectionDataPoint(-23, -45, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (24, AnomalyDetectionDataPoint(24, 47, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (25, AnomalyDetectionDataPoint(-25, -49, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (26, AnomalyDetectionDataPoint(26, 51, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (27, AnomalyDetectionDataPoint(-27, -53, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (28, AnomalyDetectionDataPoint(28, 55, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (29, AnomalyDetectionDataPoint(-29, -57, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (30, AnomalyDetectionDataPoint(30, 59, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (31, AnomalyDetectionDataPoint(1, -29, expectedAnomalyThreshold, isAnomaly = true, 1.0)) + ) + assert(anomalyResult == expectedResult) } } } diff --git a/src/test/scala/com/amazon/deequ/anomalydetection/RelativeRateOfChangeStrategyTest.scala b/src/test/scala/com/amazon/deequ/anomalydetection/RelativeRateOfChangeStrategyTest.scala index bfde6ba18..10cf917ba 100644 --- a/src/test/scala/com/amazon/deequ/anomalydetection/RelativeRateOfChangeStrategyTest.scala +++ b/src/test/scala/com/amazon/deequ/anomalydetection/RelativeRateOfChangeStrategyTest.scala @@ -33,48 +33,81 @@ class RelativeRateOfChangeStrategyTest extends WordSpec with Matchers { }).toVector "detect all anomalies if no interval specified" in { - val anomalyResult = strategy.detect(data) - val 
expected = for (i <- 20 to 31) yield { - (i, Anomaly(Option(data(i)), 1.0)) - } - assert(anomalyResult == expected) + val anomalyResult = strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) + + val expectedAnomalyThreshold = AnomalyThreshold(Bound(0.5), Bound(2.0)) + val expectedResult: Seq[(Int, AnomalyDetectionDataPoint)] = Seq( + (20, AnomalyDetectionDataPoint(20, 20, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (21, AnomalyDetectionDataPoint(1, 0.05, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (22, AnomalyDetectionDataPoint(22, 22, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (23, AnomalyDetectionDataPoint(1, 0.045454545454545456, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (24, AnomalyDetectionDataPoint(24, 24, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (25, AnomalyDetectionDataPoint(1, 0.041666666666666664, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (26, AnomalyDetectionDataPoint(26, 26, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (27, AnomalyDetectionDataPoint(1, 0.038461538461538464, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (28, AnomalyDetectionDataPoint(28, 28, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (29, AnomalyDetectionDataPoint(1, 0.03571428571428571, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (30, AnomalyDetectionDataPoint(30, 30, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (31, AnomalyDetectionDataPoint(1, 0.03333333333333333, expectedAnomalyThreshold, isAnomaly = true, 1.0)) + ) + assert(anomalyResult == expectedResult) } "only detect anomalies in interval" in { - val anomalyResult = strategy.detect(data, (25, 50)) - val expected = for (i <- 25 to 31) yield { - (i, Anomaly(Option(data(i)), 1.0)) - } - assert(anomalyResult == expected) + val anomalyResult = strategy.detect(data, (25, 50)).filter({case (_, anom) => anom.isAnomaly}) + + val expectedAnomalyThreshold = AnomalyThreshold(Bound(0.5), Bound(2.0)) + val expectedResult: 
Seq[(Int, AnomalyDetectionDataPoint)] = Seq( + (25, AnomalyDetectionDataPoint(1, 0.041666666666666664, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (26, AnomalyDetectionDataPoint(26, 26, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (27, AnomalyDetectionDataPoint(1, 0.038461538461538464, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (28, AnomalyDetectionDataPoint(28, 28, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (29, AnomalyDetectionDataPoint(1, 0.03571428571428571, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (30, AnomalyDetectionDataPoint(30, 30, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (31, AnomalyDetectionDataPoint(1, 0.03333333333333333, expectedAnomalyThreshold, isAnomaly = true, 1.0)) + ) + assert(anomalyResult == expectedResult) } "ignore min rate if none is given" in { val strategy = RelativeRateOfChangeStrategy(None, Some(1.0)) - val anomalyResult = strategy.detect(data) + val anomalyResult = strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) // Anomalies with positive values only - val expected = for (i <- 20 to 30 by 2) yield { - (i, Anomaly(Option(data(i)), 1.0)) - } - assert(anomalyResult == expected) + val expectedAnomalyThreshold = AnomalyThreshold(Bound(-1.7976931348623157E308), Bound(1.0)) + val expectedResult: Seq[(Int, AnomalyDetectionDataPoint)] = Seq( + (20, AnomalyDetectionDataPoint(20, 20, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (22, AnomalyDetectionDataPoint(22, 22, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (24, AnomalyDetectionDataPoint(24, 24, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (26, AnomalyDetectionDataPoint(26, 26, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (28, AnomalyDetectionDataPoint(28, 28, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (30, AnomalyDetectionDataPoint(30, 30, expectedAnomalyThreshold, isAnomaly = true, 1.0)) + ) + assert(anomalyResult == expectedResult) } "ignore max rate if none is given" in { 
val strategy = RelativeRateOfChangeStrategy(Some(0.5), None) - val anomalyResult = strategy.detect(data) + val anomalyResult = strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) // Anomalies with negative values only - val expected = for (i <- 21 to 31 by 2) yield { - (i, Anomaly(Option(data(i)), 1.0)) - } - assert(anomalyResult == expected) + val expectedAnomalyThreshold = AnomalyThreshold(Bound(0.5), Bound(1.7976931348623157E308)) + val expectedResult: Seq[(Int, AnomalyDetectionDataPoint)] = Seq( + (21, AnomalyDetectionDataPoint(1, 0.05, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (23, AnomalyDetectionDataPoint(1, 0.045454545454545456, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (25, AnomalyDetectionDataPoint(1, 0.041666666666666664, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (27, AnomalyDetectionDataPoint(1, 0.038461538461538464, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (29, AnomalyDetectionDataPoint(1, 0.03571428571428571, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (31, AnomalyDetectionDataPoint(1, 0.03333333333333333, expectedAnomalyThreshold, isAnomaly = true, 1.0)) + ) + assert(anomalyResult == expectedResult) } "detect no anomalies if rates are set to min/ max value" in { val strategy = RelativeRateOfChangeStrategy(Some(Double.MinValue), Some(Double.MaxValue)) - val anomalyResult = strategy.detect(data) + val anomalyResult = strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) - val expected: List[(Int, Anomaly)] = List() + val expected: List[(Int, AnomalyDetectionDataPoint)] = List() assert(anomalyResult == expected) } @@ -104,19 +137,27 @@ class RelativeRateOfChangeStrategyTest extends WordSpec with Matchers { "attribute indices correctly for higher orders without search interval" in { val data = Vector(0.0, 1.0, 3.0, 6.0, 18.0, 72.0) val strategy = RelativeRateOfChangeStrategy(None, Some(8.0), order = 2) - val result = strategy.detect(data) - - val expected = Seq((2, 
Anomaly(Option(3.0), 1.0)), (5, Anomaly(Option(72.0), 1.0))) - assert(result == expected) + val anomalyResult = strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) + + val expectedResult: Seq[(Int, AnomalyDetectionDataPoint)] = Seq( + (2, AnomalyDetectionDataPoint(3, Double.PositiveInfinity, + AnomalyThreshold(Bound(-1.7976931348623157E308), Bound(8.0)), isAnomaly = true, 1.0)), + (5, AnomalyDetectionDataPoint(72, 12, + AnomalyThreshold(Bound(-1.7976931348623157E308), Bound(8.0)), isAnomaly = true, 1.0)) + ) + assert(anomalyResult == expectedResult) } "attribute indices correctly for higher orders with search interval" in { val data = Vector(0.0, 1.0, 3.0, 6.0, 18.0, 72.0) val strategy = RelativeRateOfChangeStrategy(None, Some(8.0), order = 2) - val result = strategy.detect(data, (5, 6)) + val anomalyResult = strategy.detect(data, (5, 6)).filter({case (_, anom) => anom.isAnomaly}) - val expected = Seq((5, Anomaly(Option(72.0), 1.0))) - assert(result == expected) + val expectedResult: Seq[(Int, AnomalyDetectionDataPoint)] = Seq( + (5, AnomalyDetectionDataPoint(72, 12, + AnomalyThreshold(Bound(-1.7976931348623157E308), Bound(8.0)), isAnomaly = true, 1.0)) + ) + assert(anomalyResult == expectedResult) } @@ -134,19 +175,32 @@ class RelativeRateOfChangeStrategyTest extends WordSpec with Matchers { "work fine with empty input" in { val emptySeries = Vector[Double]() - val anomalyResult = strategy.detect(emptySeries) + val anomalyResult = strategy.detect(emptySeries).filter({case (_, anom) => anom.isAnomaly}) - assert(anomalyResult == Seq[(Int, Anomaly)]()) + assert(anomalyResult == Seq[(Int, AnomalyDetectionDataPoint)]()) } "produce error message with correct value and bounds" in { - val result = strategy.detect(data) + val result = strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) result.foreach { case (_, anom) => val (value, lowerBound, upperBound) = AnomalyDetectionTestUtils.firstThreeDoublesFromString(anom.detail.get) - assert(value -1 < 
lowerBound || value -1 > upperBound) + assert(value === anom.anomalyMetricValue) + assert(value < lowerBound || value > upperBound) + } + } + + "assert anomalies are outside of anomaly bounds" in { + val result = strategy.detect(data).filter({ case (_, anom) => anom.isAnomaly }) + + result.foreach { case (_, anom) => + val value = anom.anomalyMetricValue + val upperBound = anom.anomalyThreshold.upperBound.value + val lowerBound = anom.anomalyThreshold.lowerBound.value + + assert(value < lowerBound || value > upperBound) } } } diff --git a/src/test/scala/com/amazon/deequ/anomalydetection/SimpleThresholdStrategyTest.scala b/src/test/scala/com/amazon/deequ/anomalydetection/SimpleThresholdStrategyTest.scala index 92ead9e48..d05c0057d 100644 --- a/src/test/scala/com/amazon/deequ/anomalydetection/SimpleThresholdStrategyTest.scala +++ b/src/test/scala/com/amazon/deequ/anomalydetection/SimpleThresholdStrategyTest.scala @@ -24,33 +24,40 @@ class SimpleThresholdStrategyTest extends WordSpec with Matchers { val strategy = SimpleThresholdStrategy(upperBound = 1.0) val data = Vector(-1.0, 2.0, 3.0, 0.5) - val expected = Seq((1, Anomaly(Option(2.0), 1.0)), (2, Anomaly(Option(3.0), 1.0))) + val expectedAnomalyThreshold = AnomalyThreshold(upperBound = Bound(1.0)) + val expectedResult = Seq( + (1, AnomalyDetectionDataPoint(2.0, 2.0, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (2, AnomalyDetectionDataPoint(3.0, 3.0, expectedAnomalyThreshold, isAnomaly = true, 1.0))) "detect values above threshold" in { - val anomalyResult = strategy.detect(data, (0, 4)) + val anomalyResult = strategy.detect(data, (0, 4)).filter({case (_, anom) => anom.isAnomaly}) - assert(anomalyResult == expected) + assert(anomalyResult == expectedResult) } "detect all values without range specified" in { - val anomalyResult = strategy.detect(data) + val anomalyResult = strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) - assert(anomalyResult == expected) + assert(anomalyResult == 
expectedResult) } "work fine with empty input" in { val emptySeries = Vector[Double]() - val anomalyResult = strategy.detect(emptySeries) + val anomalyResult = strategy.detect(emptySeries).filter({case (_, anom) => anom.isAnomaly}) - assert(anomalyResult == Seq[(Int, Anomaly)]()) + assert(anomalyResult == Seq[(Int, AnomalyDetectionDataPoint)]()) } "work with upper and lower threshold" in { val tS = SimpleThresholdStrategy(lowerBound = -0.5, upperBound = 1.0) - val anomalyResult = tS.detect(data) - - assert(anomalyResult == Seq((0, Anomaly(Option(-1.0), 1.0)), - (1, Anomaly(Option(2.0), 1.0)), (2, Anomaly(Option(3.0), 1.0)))) + val anomalyResult = tS.detect(data).filter({case (_, anom) => anom.isAnomaly}) + val expectedAnomalyThreshold = AnomalyThreshold(Bound(-0.5), Bound(1.0)) + val expectedResult = Seq( + (0, AnomalyDetectionDataPoint(-1.0, -1.0, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (1, AnomalyDetectionDataPoint(2.0, 2.0, expectedAnomalyThreshold, isAnomaly = true, 1.0)), + (2, AnomalyDetectionDataPoint(3.0, 3.0, expectedAnomalyThreshold, isAnomaly = true, 1.0))) + + assert(anomalyResult == expectedResult) } "throw an error when thresholds are not ordered " in { @@ -60,13 +67,25 @@ class SimpleThresholdStrategyTest extends WordSpec with Matchers { } "produce error message with correct value and bounds" in { - val result = strategy.detect(data) + val result = strategy.detect(data).filter({case (_, anom) => anom.isAnomaly}) result.foreach { case (_, anom) => val (value, lowerBound, upperBound) = AnomalyDetectionTestUtils.firstThreeDoublesFromString(anom.detail.get) - assert(anom.value.isDefined && value === anom.value.get) + assert(value === anom.anomalyMetricValue) + assert(value < lowerBound || value > upperBound) + } + } + + "assert anomalies are outside of anomaly bounds" in { + val result = strategy.detect(data).filter({ case (_, anom) => anom.isAnomaly }) + + result.foreach { case (_, anom) => + val value = anom.anomalyMetricValue + val 
upperBound = anom.anomalyThreshold.upperBound.value + val lowerBound = anom.anomalyThreshold.lowerBound.value + assert(value < lowerBound || value > upperBound) } } diff --git a/src/test/scala/com/amazon/deequ/anomalydetection/seasonal/HoltWintersTest.scala b/src/test/scala/com/amazon/deequ/anomalydetection/seasonal/HoltWintersTest.scala index decf5a91c..fd6cab62f 100644 --- a/src/test/scala/com/amazon/deequ/anomalydetection/seasonal/HoltWintersTest.scala +++ b/src/test/scala/com/amazon/deequ/anomalydetection/seasonal/HoltWintersTest.scala @@ -16,7 +16,7 @@ package com.amazon.deequ.anomalydetection.seasonal -import com.amazon.deequ.anomalydetection.Anomaly +import com.amazon.deequ.anomalydetection.AnomalyDetectionDataPoint import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec @@ -76,6 +76,7 @@ class HoltWintersTest extends AnyWordSpec with Matchers { "predict no anomaly for normally distributed errors" in { val seriesWithOutlier = twoWeeksOfData ++ Vector(twoWeeksOfData.head) val anomalies = dailyMetricsWithWeeklySeasonalityAnomalies(seriesWithOutlier, 14 -> 15) + .filter({case (_, anom) => anom.isAnomaly}) anomalies shouldBe empty } @@ -92,19 +93,21 @@ class HoltWintersTest extends AnyWordSpec with Matchers { "predict no anomalies on longer series" in { val seriesWithOutlier = twoWeeksOfData ++ twoWeeksOfData val anomalies = dailyMetricsWithWeeklySeasonalityAnomalies( - seriesWithOutlier, 26 -> Int.MaxValue) + seriesWithOutlier, 26 -> Int.MaxValue).filter({case (_, anom) => anom.isAnomaly}) anomalies shouldBe empty } "detect no anomalies on constant series" in { val series = (0 until 21).map(_ => 1.0).toVector val anomalies = dailyMetricsWithWeeklySeasonalityAnomalies(series, 14 -> Int.MaxValue) + .filter({case (_, anom) => anom.isAnomaly}) anomalies shouldBe empty } "detect a single anomaly in constant series with a single error" in { val series = ((0 until 20).map(_ => 1.0) ++ Seq(0.0)).toVector val anomalies = 
dailyMetricsWithWeeklySeasonalityAnomalies(series, 14 -> Int.MaxValue) + .filter({case (_, anom) => anom.isAnomaly}) anomalies should have size 1 val (detectionIndex, _) = anomalies.head @@ -114,6 +117,7 @@ class HoltWintersTest extends AnyWordSpec with Matchers { "detect no anomalies on exact linear trend series" in { val series = (0 until 48).map(_.toDouble).toVector val anomalies = dailyMetricsWithWeeklySeasonalityAnomalies(series, 36 -> Int.MaxValue) + .filter({case (_, anom) => anom.isAnomaly}) anomalies shouldBe empty } @@ -123,6 +127,7 @@ class HoltWintersTest extends AnyWordSpec with Matchers { .zipWithIndex.map { case (s, level) => s + level }.toVector val anomalies = dailyMetricsWithWeeklySeasonalityAnomalies(series, 36 -> Int.MaxValue) + .filter({case (_, anom) => anom.isAnomaly}) anomalies shouldBe empty } @@ -132,6 +137,7 @@ class HoltWintersTest extends AnyWordSpec with Matchers { val series = train ++ test val anomalies = dailyMetricsWithWeeklySeasonalityAnomalies(series, 14 -> 21) + .filter({case (_, anom) => anom.isAnomaly}) anomalies should have size 1 val (detectionIndex, _) = anomalies.head @@ -170,7 +176,7 @@ class HoltWintersTest extends AnyWordSpec with Matchers { val anomalies = strategy.detect( monthlyMilkProduction.take(nTotal), trainSize -> nTotal - ) + ).filter({case (_, anom) => anom.isAnomaly}) anomalies should have size 7 } @@ -202,7 +208,7 @@ class HoltWintersTest extends AnyWordSpec with Matchers { val anomalies = strategy.detect( monthlyCarSalesQuebec.take(nTotal), trainSize -> nTotal - ) + ).filter({case (_, anom) => anom.isAnomaly}) anomalies should have size 3 } @@ -213,7 +219,7 @@ object HoltWintersTest { def dailyMetricsWithWeeklySeasonalityAnomalies( series: Vector[Double], - interval: (Int, Int)): Seq[(Int, Anomaly)] = { + interval: (Int, Int)): Seq[(Int, AnomalyDetectionDataPoint)] = { val strategy = new HoltWinters( HoltWinters.MetricInterval.Daily, diff --git 
a/src/test/scala/com/amazon/deequ/checks/ApplicabilityTest.scala b/src/test/scala/com/amazon/deequ/checks/ApplicabilityTest.scala index 542f40fcf..25ce8bb6b 100644 --- a/src/test/scala/com/amazon/deequ/checks/ApplicabilityTest.scala +++ b/src/test/scala/com/amazon/deequ/checks/ApplicabilityTest.scala @@ -18,11 +18,14 @@ package com.amazon.deequ package checks import com.amazon.deequ.analyzers.applicability.Applicability -import com.amazon.deequ.analyzers.{Completeness, Compliance, Maximum, Minimum} +import com.amazon.deequ.analyzers.{Completeness, Compliance, Maximum, Minimum, Size} +import com.amazon.deequ.anomalydetection.AnomalyDetectionStrategy +import com.amazon.deequ.repository.MetricsRepository import org.apache.spark.sql.types._ +import org.scalamock.scalatest.MockFactory import org.scalatest.wordspec.AnyWordSpec -class ApplicabilityTest extends AnyWordSpec with SparkContextSpec { +class ApplicabilityTest extends AnyWordSpec with SparkContextSpec with MockFactory { private[this] val schema = StructType(Array( StructField("stringCol", StringType, nullable = true), @@ -48,7 +51,7 @@ class ApplicabilityTest extends AnyWordSpec with SparkContextSpec { "Applicability tests for checks" should { - "recognize applicable checks as applicable" in withSparkSession { session => + "recognize applicable analysis based checks as applicable" in withSparkSession { session => val applicability = new Applicability(session) @@ -66,6 +69,25 @@ class ApplicabilityTest extends AnyWordSpec with SparkContextSpec { } } + "recognize applicable anomaly based checks as applicable" in withSparkSession { session => + + val applicability = new Applicability(session) + val fakeAnomalyDetector = mock[AnomalyDetectionStrategy] + val repository = mock[MetricsRepository] + val validCheck = Check(CheckLevel.Error, "anomaly test") + .isNewestPointNonAnomalous(repository, fakeAnomalyDetector, Size(), Map.empty, + None, None) + + val resultForValidCheck = applicability.isApplicable(validCheck, 
schema) + + assert(resultForValidCheck.isApplicable) + assert(resultForValidCheck.failures.isEmpty) + assert(resultForValidCheck.constraintApplicabilities.size == validCheck.constraints.size) + resultForValidCheck.constraintApplicabilities.foreach { case (_, applicable) => + assert(applicable) + } + } + "detect checks with non existing columns" in withSparkSession { session => val applicability = new Applicability(session) diff --git a/src/test/scala/com/amazon/deequ/checks/CheckTest.scala b/src/test/scala/com/amazon/deequ/checks/CheckTest.scala index 0a46b9e25..8e8fefa9d 100644 --- a/src/test/scala/com/amazon/deequ/checks/CheckTest.scala +++ b/src/test/scala/com/amazon/deequ/checks/CheckTest.scala @@ -19,7 +19,8 @@ package checks import com.amazon.deequ.analyzers._ import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext} -import com.amazon.deequ.anomalydetection.{Anomaly, AnomalyDetectionStrategy} +import com.amazon.deequ.anomalydetection.{AnomalyDetectionAssertionResult, AnomalyDetectionDataPoint, AnomalyDetectionResult, AnomalyDetectionStrategy} +import com.amazon.deequ.checks.Check.getNewestPointAnomalyResults import com.amazon.deequ.constraints.{ConstrainableDataTypes, ConstraintStatus} import com.amazon.deequ.metrics.{DoubleMetric, Entity} import com.amazon.deequ.repository.memory.InMemoryMetricsRepository @@ -87,7 +88,7 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix assertEvaluatesTo(check1, context, CheckStatus.Success) assertEvaluatesTo(check2, context, CheckStatus.Error) assertEvaluatesTo(check3, context, CheckStatus.Warning) - } + } "return the correct check status for combined completeness with . 
in column name" in withSparkSession { sparkSession => @@ -962,18 +963,39 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix // Size results (fakeAnomalyDetector.detect _) .expects(Vector(1.0, 2.0, 3.0, 4.0, 11.0), (4, 5)) - .returns(Seq()) + .returns(Seq( + (0, AnomalyDetectionDataPoint(1.0, 1.0, confidence = 1.0)), + (1, AnomalyDetectionDataPoint(2.0, 2.0, confidence = 1.0)), + (2, AnomalyDetectionDataPoint(3.0, 3.0, confidence = 1.0)), + (3, AnomalyDetectionDataPoint(4.0, 4.0, confidence = 1.0)), + (4, AnomalyDetectionDataPoint(11.0, 11.0, confidence = 1.0)))) .once() (fakeAnomalyDetector.detect _).expects(Vector(1.0, 2.0, 3.0, 4.0, 4.0), (4, 5)) - .returns(Seq((4, Anomaly(Option(4.0), 1.0)))) + .returns(Seq( + (0, AnomalyDetectionDataPoint(1.0, 1.0, confidence = 1.0)), + (1, AnomalyDetectionDataPoint(2.0, 2.0, confidence = 1.0)), + (2, AnomalyDetectionDataPoint(3.0, 3.0, confidence = 1.0)), + (3, AnomalyDetectionDataPoint(4.0, 4.0, confidence = 1.0)), + (4, AnomalyDetectionDataPoint(4.0, 4.0, isAnomaly = true, confidence = 1.0)))) .once() // Distinctness results (fakeAnomalyDetector.detect _) .expects(Vector(1.0, 2.0, 3.0, 4.0, 1), (4, 5)) - .returns(Seq()) + .returns(Seq( + (0, AnomalyDetectionDataPoint(1.0, 1.0, confidence = 1.0)), + (1, AnomalyDetectionDataPoint(2.0, 2.0, confidence = 1.0)), + (2, AnomalyDetectionDataPoint(3.0, 3.0, confidence = 1.0)), + (3, AnomalyDetectionDataPoint(4.0, 4.0, confidence = 1.0)), + (4, AnomalyDetectionDataPoint(1.0, 1.0, confidence = 1.0)))) .once() - (fakeAnomalyDetector.detect _).expects(Vector(1.0, 2.0, 3.0, 4.0, 1), (4, 5)) - .returns(Seq((4, Anomaly(Option(4.0), 0)))) + (fakeAnomalyDetector.detect _) + .expects(Vector(1.0, 2.0, 3.0, 4.0, 1), (4, 5)) + .returns(Seq( + (0, AnomalyDetectionDataPoint(1.0, 1.0, confidence = 1.0)), + (1, AnomalyDetectionDataPoint(2.0, 2.0, confidence = 1.0)), + (2, AnomalyDetectionDataPoint(3.0, 3.0, confidence = 1.0)), + (3, AnomalyDetectionDataPoint(4.0, 4.0, 
confidence = 1.0)), + (4, AnomalyDetectionDataPoint(1.0, 1.0, isAnomaly = true, confidence = 1.0)))) .once() } @@ -1013,10 +1035,16 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix // Size results (fakeAnomalyDetector.detect _) .expects(Vector(1.0, 2.0, 11.0), (2, 3)) - .returns(Seq()) + .returns(Seq( + (0, AnomalyDetectionDataPoint(1.0, 1.0, confidence = 1.0)), + (1, AnomalyDetectionDataPoint(2.0, 2.0, confidence = 1.0)), + (2, AnomalyDetectionDataPoint(11.0, 11.0, confidence = 1.0)))) .once() (fakeAnomalyDetector.detect _).expects(Vector(1.0, 2.0, 4.0), (2, 3)) - .returns(Seq((4, Anomaly(Option(4.0), 1.0)))) + .returns(Seq( + (0, AnomalyDetectionDataPoint(1.0, 1.0, confidence = 1.0)), + (1, AnomalyDetectionDataPoint(2.0, 2.0, confidence = 1.0)), + (2, AnomalyDetectionDataPoint(4.0, 4.0, isAnomaly = true, confidence = 1.0)))) .once() } @@ -1047,10 +1075,16 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix // Size results (fakeAnomalyDetector.detect _) .expects(Vector(3.0, 4.0, 11.0), (2, 3)) - .returns(Seq()) + .returns(Seq( + (0, AnomalyDetectionDataPoint(3.0, 3.0, confidence = 1.0)), + (1, AnomalyDetectionDataPoint(4.0, 4.0, confidence = 1.0)), + (2, AnomalyDetectionDataPoint(11.0, 11.0, confidence = 1.0)))) .once() (fakeAnomalyDetector.detect _).expects(Vector(3.0, 4.0, 4.0), (2, 3)) - .returns(Seq((4, Anomaly(Option(4.0), 1.0)))) + .returns(Seq( + (0, AnomalyDetectionDataPoint(3.0, 3.0, confidence = 1.0)), + (1, AnomalyDetectionDataPoint(4.0, 4.0, confidence = 1.0)), + (2, AnomalyDetectionDataPoint(4.0, 4.0, isAnomaly = true, confidence = 1.0)))) .once() } @@ -1081,10 +1115,16 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix // Size results (fakeAnomalyDetector.detect _) .expects(Vector(1.0, 2.0, 11.0), (2, 3)) - .returns(Seq()) + .returns(Seq( + (0, AnomalyDetectionDataPoint(1.0, 1.0, confidence = 1.0)), + (1, AnomalyDetectionDataPoint(2.0, 2.0, confidence 
= 1.0)), + (2, AnomalyDetectionDataPoint(11.0, 11.0, confidence = 1.0)))) .once() (fakeAnomalyDetector.detect _).expects(Vector(1.0, 2.0, 4.0), (2, 3)) - .returns(Seq((4, Anomaly(Option(4.0), 1.0)))) + .returns(Seq( + (0, AnomalyDetectionDataPoint(1.0, 1.0, confidence = 1.0)), + (1, AnomalyDetectionDataPoint(2.0, 2.0, confidence = 1.0)), + (2, AnomalyDetectionDataPoint(4.0, 4.0, isAnomaly = true, confidence = 1.0)))) .once() } @@ -1105,6 +1145,40 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix assert(sizeAnomalyCheck.evaluate(contextNoRows).status == CheckStatus.Error) } } + + "getNewestPointAnomalyResults returns correct assertion result from anomaly detection data point sequence " + + "with multiple data points" in { + val anomalySequence: Seq[(Long, AnomalyDetectionDataPoint)] = + Seq( + (0, AnomalyDetectionDataPoint(1.0, 1.0, confidence = 1.0)), + (1, AnomalyDetectionDataPoint(2.0, 2.0, confidence = 1.0)), + (2, AnomalyDetectionDataPoint(11.0, 11.0, isAnomaly = true, confidence = 1.0))) + val result: AnomalyDetectionAssertionResult = + getNewestPointAnomalyResults(AnomalyDetectionResult(anomalySequence)) + assert(!result.hasNoAnomaly) + assert(result.anomalyDetectionMetadata.anomalyDetectionDataPoint == + AnomalyDetectionDataPoint(11.0, 11.0, isAnomaly = true, confidence = 1.0)) + } + + "getNewestPointAnomalyResults returns correct assertion result from anomaly detection data point sequence " + + "with one data point" in { + val anomalySequence: Seq[(Long, AnomalyDetectionDataPoint)] = + Seq( + (0, AnomalyDetectionDataPoint(11.0, 11.0, confidence = 1.0))) + val result: AnomalyDetectionAssertionResult = + getNewestPointAnomalyResults(AnomalyDetectionResult(anomalySequence)) + assert(result.hasNoAnomaly) + assert(result.anomalyDetectionMetadata.anomalyDetectionDataPoint == + AnomalyDetectionDataPoint(11.0, 11.0, confidence = 1.0)) + } + + "assert getNewestPointAnomalyResults throws exception from empty anomaly detection 
sequence" in { + val anomalySequence: Seq[(Long, AnomalyDetectionDataPoint)] = Seq() + intercept[IllegalArgumentException] { + getNewestPointAnomalyResults(AnomalyDetectionResult(anomalySequence)) + } + } + } /** Run anomaly detection using a repository with some previous analysis results for testing */ diff --git a/src/test/scala/com/amazon/deequ/constraints/AnomalyBasedConstraintTest.scala b/src/test/scala/com/amazon/deequ/constraints/AnomalyBasedConstraintTest.scala index 8f5281048..a21038ccd 100644 --- a/src/test/scala/com/amazon/deequ/constraints/AnomalyBasedConstraintTest.scala +++ b/src/test/scala/com/amazon/deequ/constraints/AnomalyBasedConstraintTest.scala @@ -1,5 +1,285 @@ +/** + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License + * is located at + * + * http://aws.amazon.com/apache2.0/ + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ * + */ package com.amazon.deequ.constraints -class AnomalyBasedConstraintTest { +import com.amazon.deequ.SparkContextSpec +import com.amazon.deequ.analyzers.runners.MetricCalculationException +import com.amazon.deequ.analyzers.{Analyzer, NumMatches, StateLoader, StatePersister} +import com.amazon.deequ.anomalydetection.{AnomalyDetectionAssertionResult, AnomalyDetectionDataPoint, AnomalyDetectionMetadata} +import com.amazon.deequ.constraints.ConstraintUtils.calculate +import com.amazon.deequ.metrics.{DoubleMetric, Entity, Metric} +import com.amazon.deequ.utils.FixtureSupport +import org.apache.spark.sql.DataFrame +import org.scalamock.scalatest.MockFactory +import org.scalatest.wordspec.AnyWordSpec +import org.scalatest.PrivateMethodTester + +import scala.util.{Failure, Try} + +class AnomalyBasedConstraintTest extends AnyWordSpec with SparkContextSpec + with FixtureSupport with MockFactory with PrivateMethodTester { + /** + * Sample function to use as value picker + * + * @return Returns input multiplied by 2 + */ + def valueDoubler(value: Double): Double = { + value * 2 + } + + /** + * Sample analyzer that returns a 1.0 value if the given column exists and fails otherwise. 
+ */ + case class SampleAnalyzer(column: String) extends Analyzer[NumMatches, DoubleMetric] { + override def toFailureMetric(exception: Exception): DoubleMetric = { + DoubleMetric(Entity.Column, "sample", column, Failure(MetricCalculationException + .wrapIfNecessary(exception))) + } + + + override def calculate( + data: DataFrame, + stateLoader: Option[StateLoader], + statePersister: Option[StatePersister]) + : DoubleMetric = { + val value: Try[Double] = Try { + require(data.columns.contains(column), s"Missing column $column") + 1.0 + } + DoubleMetric(Entity.Column, "sample", column, value) + } + + override def computeStateFrom(data: DataFrame): Option[NumMatches] = { + throw new NotImplementedError() + } + + + override def computeMetricFrom(state: Option[NumMatches]): DoubleMetric = { + throw new NotImplementedError() + } + } + + "Analysis based constraint" should { + + + "assert correctly on values if analysis is successful" in + withSparkSession { sparkSession => + val df = getDfMissing(sparkSession) + + // Analysis result should equal to 1.0 for an existing column + val functionA = (metric: Double) => { AnomalyDetectionAssertionResult(metric == 1.0, + AnomalyDetectionMetadata(AnomalyDetectionDataPoint(1.0, 1.0, confidence = 1.0))) } + val resultA = calculate(AnomalyBasedConstraint[NumMatches, Double, Double]( + SampleAnalyzer("att1"), functionA), df) + + assert(resultA.status == ConstraintStatus.Success) + assert(resultA.message.isEmpty) + assert(resultA.metric.isDefined) + + // Analysis result should equal to 1.0 for an existing column + val functionB = (metric: Double) => { + AnomalyDetectionAssertionResult(metric != 1.0, + AnomalyDetectionMetadata(AnomalyDetectionDataPoint(1.0, 1.0, confidence = 1.0))) + } + val resultB = calculate(AnomalyBasedConstraint[NumMatches, Double, Double]( + SampleAnalyzer("att1"), functionB), df) + + assert(resultB.status == ConstraintStatus.Failure) + assert(resultB.message.contains("Value: 1.0 does not meet the constraint 
requirement, " + + "check the anomaly detection metadata!")) + assert(resultB.metric.isDefined) + + // Analysis should fail for a non existing column + val functionC = (metric: Double) => { + AnomalyDetectionAssertionResult(metric == 1.0, + AnomalyDetectionMetadata(AnomalyDetectionDataPoint(1.0, 1.0, confidence = 1.0))) + } + val resultC = calculate(AnomalyBasedConstraint[NumMatches, Double, Double]( + SampleAnalyzer("someMissingColumn"), functionC), df) + + assert(resultC.status == ConstraintStatus.Failure) + assert(resultC.message.contains("requirement failed: Missing column someMissingColumn")) + assert(resultC.metric.isDefined) + } + + "execute value picker on the analysis result value, if provided" in + withSparkSession { sparkSession => + + + val df = getDfMissing(sparkSession) + + // Analysis result should equal 2.0 (1.0 doubled by the value picker) for an existing column + val functionA = (metric: Double) => { + AnomalyDetectionAssertionResult(metric == 2.0, + AnomalyDetectionMetadata(AnomalyDetectionDataPoint(2.0, 2.0, confidence = 1.0))) + } + assert(calculate(AnomalyBasedConstraint[NumMatches, Double, Double]( + SampleAnalyzer("att1"), functionA, Some(valueDoubler)), df).status == + ConstraintStatus.Success) + + val functionB = (metric: Double) => { + AnomalyDetectionAssertionResult(metric != 2.0, + AnomalyDetectionMetadata(AnomalyDetectionDataPoint(2.0, 2.0, confidence = 1.0))) + } + assert(calculate(AnomalyBasedConstraint[NumMatches, Double, Double]( + SampleAnalyzer("att1"), functionB, Some(valueDoubler)), df).status == + ConstraintStatus.Failure) + + // Analysis should fail for a non existing column + assert(calculate(AnomalyBasedConstraint[NumMatches, Double, Double]( + SampleAnalyzer("someMissingColumn"), functionA, Some(valueDoubler)), df).status == + ConstraintStatus.Failure) + } + + "get the analysis from the context, if provided" in withSparkSession { sparkSession => + val df = getDfMissing(sparkSession) + + val emptyResults = Map.empty[Analyzer[_, Metric[_]], Metric[_]] + 
val validResults = Map[Analyzer[_, Metric[_]], Metric[_]]( + SampleAnalyzer("att1") -> SampleAnalyzer("att1").calculate(df), + SampleAnalyzer("someMissingColumn") -> SampleAnalyzer("someMissingColumn").calculate(df) + ) + + // Analysis result should equal to 1.0 for an existing column + val functionA = (metric: Double) => { + AnomalyDetectionAssertionResult(metric == 1.0, + AnomalyDetectionMetadata(AnomalyDetectionDataPoint(1.0, 1.0, confidence = 1.0))) + } + assert(AnomalyBasedConstraint[NumMatches, Double, Double](SampleAnalyzer("att1"), functionA) + .evaluate(validResults).status == ConstraintStatus.Success) + + val functionB = (metric: Double) => { + AnomalyDetectionAssertionResult(metric != 1.0, + AnomalyDetectionMetadata(AnomalyDetectionDataPoint(1.0, 1.0, confidence = 1.0))) + } + assert(AnomalyBasedConstraint[NumMatches, Double, Double](SampleAnalyzer("att1"), functionB) + .evaluate(validResults).status == ConstraintStatus.Failure) + + assert(AnomalyBasedConstraint[NumMatches, Double, Double]( + SampleAnalyzer("someMissingColumn"), functionA) + .evaluate(validResults).status == ConstraintStatus.Failure) + + // Although assertion would pass, since analysis result is missing, + // constraint fails with missing analysis message + AnomalyBasedConstraint[NumMatches, Double, Double](SampleAnalyzer("att1"), functionA) + .evaluate(emptyResults) match { + case result => + assert(result.status == ConstraintStatus.Failure) + assert(result.message.contains("Missing Analysis, can't run the constraint!")) + assert(result.metric.isEmpty) + } + } + + "execute value picker on the analysis result value retrieved from context, if provided" in + withSparkSession { sparkSession => + val df = getDfMissing(sparkSession) + val validResults = Map[Analyzer[_, Metric[_]], Metric[_]]( + SampleAnalyzer("att1") -> SampleAnalyzer("att1").calculate(df)) + + val functionA = (metric: Double) => { + AnomalyDetectionAssertionResult(metric == 2.0, + 
AnomalyDetectionMetadata(AnomalyDetectionDataPoint(2.0, 2.0, confidence = 1.0))) + } + assert(AnomalyBasedConstraint[NumMatches, Double, Double]( + SampleAnalyzer("att1"), functionA, Some(valueDoubler)) + .evaluate(validResults).status == ConstraintStatus.Success) + } + + + "fail on analysis if value picker is provided but fails" in withSparkSession { sparkSession => + def problematicValuePicker(value: Double): Double = { + throw new RuntimeException("Something wrong with this picker") + } + + val df = getDfMissing(sparkSession) + + val emptyResults = Map.empty[Analyzer[_, Metric[_]], Metric[_]] + val validResults = Map[Analyzer[_, Metric[_]], Metric[_]]( + SampleAnalyzer("att1") -> SampleAnalyzer("att1").calculate(df)) + + val functionA = (metric: Double) => { + AnomalyDetectionAssertionResult(metric == 1.0, + AnomalyDetectionMetadata(AnomalyDetectionDataPoint(1.0, 1.0, confidence = 1.0))) + } + val constraint = AnomalyBasedConstraint[NumMatches, Double, Double]( + SampleAnalyzer("att1"), functionA, Some(problematicValuePicker)) + + calculate(constraint, df) match { + case result => + assert(result.status == ConstraintStatus.Failure) + assert(result.message.get.contains("Can't retrieve the value to assert on")) + assert(result.metric.isDefined) + } + + constraint.evaluate(validResults) match { + case result => + assert(result.status == ConstraintStatus.Failure) + assert(result.message.isDefined) + assert(result.message.get.startsWith("Can't retrieve the value to assert on")) + assert(result.metric.isDefined) + } + + constraint.evaluate(emptyResults) match { + case result => + assert(result.status == ConstraintStatus.Failure) + assert(result.message.contains("Missing Analysis, can't run the constraint!")) + assert(result.metric.isEmpty) + } + + } + + "fail on failed assertion function with hint in exception message if provided" in + withSparkSession { sparkSession => + + val df = getDfMissing(sparkSession) + + val assertionFunction = (metric: Double) => { + 
AnomalyDetectionAssertionResult(metric == 0.9, + AnomalyDetectionMetadata(AnomalyDetectionDataPoint(1.0, 1.0, confidence = 1.0))) + } + val failingConstraint = AnomalyBasedConstraint[NumMatches, Double, Double]( + SampleAnalyzer("att1"), assertionFunction, hint = Some("Value should be like ...!")) + + calculate(failingConstraint, df) match { + case result => + assert(result.status == ConstraintStatus.Failure) + assert(result.message.isDefined) + assert(result.message.get == "Value: 1.0 does not meet the constraint requirement, " + + "check the anomaly detection metadata! Value should be like ...!") + assert(result.metric.isDefined) + } + } + + "return failed constraint for a failing assertion" in withSparkSession { session => + val msg = "-test-" + val exception = new RuntimeException(msg) + val df = getDfMissing(session) + + def failingAssertion(value: Double): AnomalyDetectionAssertionResult = throw exception + + val constraintResult = calculate( + AnomalyBasedConstraint[NumMatches, Double, Double]( + SampleAnalyzer("att1"), failingAssertion), df + ) + + assert(constraintResult.status == ConstraintStatus.Failure) + assert(constraintResult.metric.isDefined) + assert(constraintResult.message.contains(s"Can't execute the assertion: $msg!")) + } + + } } diff --git a/src/test/scala/com/amazon/deequ/constraints/ConstraintUtils.scala b/src/test/scala/com/amazon/deequ/constraints/ConstraintUtils.scala index 5782bc18c..228e41cd0 100644 --- a/src/test/scala/com/amazon/deequ/constraints/ConstraintUtils.scala +++ b/src/test/scala/com/amazon/deequ/constraints/ConstraintUtils.scala @@ -27,6 +27,9 @@ object ConstraintUtils { case c: Constraint => c } - analysisBasedConstraint.asInstanceOf[AnalysisBasedConstraint[_, _, _]].calculateAndEvaluate(df) + analysisBasedConstraint match { + case analysis: AnalysisBasedConstraint[_, _, _] => analysis.calculateAndEvaluate(df) + case anomaly: AnomalyBasedConstraint[_, _, _] => anomaly.calculateAndEvaluate(df) + } } } diff --git 
a/src/test/scala/com/amazon/deequ/constraints/ConstraintsTest.scala b/src/test/scala/com/amazon/deequ/constraints/ConstraintsTest.scala index e4a8ba898..19712c196 100644 --- a/src/test/scala/com/amazon/deequ/constraints/ConstraintsTest.scala +++ b/src/test/scala/com/amazon/deequ/constraints/ConstraintsTest.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.types.{DoubleType, StringType} import Constraint._ import com.amazon.deequ.SparkContextSpec +import com.amazon.deequ.anomalydetection.{AnomalyDetectionAssertionResult, AnomalyDetectionDataPoint, AnomalyDetectionMetadata} class ConstraintsTest extends WordSpec with Matchers with SparkContextSpec with FixtureSupport { @@ -163,15 +164,32 @@ class ConstraintsTest extends WordSpec with Matchers with SparkContextSpec with "Anomaly constraint" should { "assert on anomaly analyzer values" in withSparkSession { sparkSession => val df = getDfMissing(sparkSession) + assert(calculate(Constraint.anomalyConstraint[NumMatchesAndCount]( - Completeness("att1"), _ > 0.4), df).status == ConstraintStatus.Success) + Completeness("att1"), (metric: Double) => { + AnomalyDetectionAssertionResult(metric > 0.4, + AnomalyDetectionMetadata(AnomalyDetectionDataPoint(1.0, 1.0, confidence = 1.0))) + } ), df) + .status == ConstraintStatus.Success) assert(calculate(Constraint.anomalyConstraint[NumMatchesAndCount]( - Completeness("att1"), _ < 0.4), df).status == ConstraintStatus.Failure) + Completeness("att1"), (metric: Double) => { + AnomalyDetectionAssertionResult(metric < 0.4, + AnomalyDetectionMetadata(AnomalyDetectionDataPoint(1.0, 1.0, confidence = 1.0))) + }), df) + .status == ConstraintStatus.Failure) assert(calculate(Constraint.anomalyConstraint[NumMatchesAndCount]( - Completeness("att2"), _ > 0.7), df).status == ConstraintStatus.Success) + Completeness("att2"), (metric: Double) => { + AnomalyDetectionAssertionResult(metric > 0.7, + AnomalyDetectionMetadata(AnomalyDetectionDataPoint(1.0, 1.0, confidence = 
1.0))) + }), df) + .status == ConstraintStatus.Success) assert(calculate(Constraint.anomalyConstraint[NumMatchesAndCount]( - Completeness("att2"), _ < 0.7), df).status == ConstraintStatus.Failure) + Completeness("att2"), (metric: Double) => { + AnomalyDetectionAssertionResult(metric < 0.7, + AnomalyDetectionMetadata(AnomalyDetectionDataPoint(1.0, 1.0, confidence = 1.0))) + }), df) + .status == ConstraintStatus.Failure) } } }