Skip to content

Commit

Permalink
add data synchronization test to verification Suite. (#526)
Browse files Browse the repository at this point in the history
* add data synchronization test to verification suite

* review comments

* update test and doc strings
  • Loading branch information
VenkataKarthikP authored and eycho-am committed Feb 20, 2024
1 parent 57c193d commit 74f7a4b
Show file tree
Hide file tree
Showing 10 changed files with 631 additions and 80 deletions.
10 changes: 5 additions & 5 deletions src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
Expand All @@ -19,17 +19,17 @@ package com.amazon.deequ.analyzers
import com.amazon.deequ.analyzers.Analyzers._
import com.amazon.deequ.analyzers.NullBehavior.NullBehavior
import com.amazon.deequ.analyzers.runners._
import com.amazon.deequ.metrics.FullColumn
import com.amazon.deequ.utilities.ColumnUtil.removeEscapeColumn
import com.amazon.deequ.metrics.DoubleMetric
import com.amazon.deequ.metrics.Entity
import com.amazon.deequ.metrics.FullColumn
import com.amazon.deequ.metrics.Metric
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import com.amazon.deequ.utilities.ColumnUtil.removeEscapeColumn
import org.apache.spark.sql.Column
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

import scala.language.existentials
import scala.util.Failure
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
* is located at
*
* http://aws.amazon.com/apache2.0/
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.deequ.analyzers

import com.amazon.deequ.analyzers.Analyzers.metricFromFailure
import com.amazon.deequ.comparison.DataSynchronization
import com.amazon.deequ.comparison.DataSynchronizationFailed
import com.amazon.deequ.comparison.DataSynchronizationSucceeded
import com.amazon.deequ.metrics.DoubleMetric
import com.amazon.deequ.metrics.Entity
import org.apache.spark.sql.DataFrame

import scala.util.Failure
import scala.util.Try


/**
* An Analyzer for Deequ that performs a data synchronization check between two DataFrames.
* It evaluates the degree of synchronization based on specified column mappings and an assertion function.
*
* The analyzer computes a ratio of synchronized data points to the total data points, represented as a DoubleMetric.
* Refer to [[com.amazon.deequ.comparison.DataSynchronization.columnMatch]] for DataSynchronization implementation
*
* @param dfToCompare The DataFrame to compare with the primary DataFrame that is setup
* during [[com.amazon.deequ.VerificationSuite.onData]] setup.
* @param columnMappings A map where each key-value pair represents a column in the primary DataFrame
* and its corresponding column in dfToCompare.
* @param assertion A function that takes a Double (the match ratio) and returns a Boolean.
* It defines the condition for successful synchronization.
*
* Usage:
* This analyzer is used in Deequ's VerificationSuite based if `isDataSynchronized` check is defined or could be used
* manually as well.
*
* Example:
* val analyzer = DataSynchronizationAnalyzer(dfToCompare, Map("col1" -> "col2"), _ > 0.8)
* val verificationResult = VerificationSuite().onData(df).addAnalyzer(analyzer).run()
*
* // or could do something like below
* val verificationResult = VerificationSuite().onData(df).isDataSynchronized(dfToCompare, Map("col1" -> "col2"),
* _ > 0.8).run()
*
*
* The computeStateFrom method calculates the synchronization state by comparing the specified columns of the two
* DataFrames.
* The computeMetricFrom method then converts this state into a DoubleMetric representing the synchronization ratio.
*
*/
case class DataSynchronizationAnalyzer(dfToCompare: DataFrame,
columnMappings: Map[String, String],
assertion: Double => Boolean)
extends Analyzer[DataSynchronizationState, DoubleMetric] {

override def computeStateFrom(data: DataFrame): Option[DataSynchronizationState] = {

val result = DataSynchronization.columnMatch(data, dfToCompare, columnMappings, assertion)

result match {
case succeeded: DataSynchronizationSucceeded =>
Some(DataSynchronizationState(succeeded.passedCount, succeeded.totalCount))
case failed: DataSynchronizationFailed =>
Some(DataSynchronizationState(failed.passedCount.getOrElse(0), failed.totalCount.getOrElse(0)))
case _ => None
}
}

override def computeMetricFrom(state: Option[DataSynchronizationState]): DoubleMetric = {

val metric = state match {
case Some(s) => Try(s.synchronizedDataCount.toDouble / s.totalDataCount.toDouble)
case _ => Failure(new IllegalStateException("No state available for DataSynchronizationAnalyzer"))
}

DoubleMetric(Entity.Dataset, "DataSynchronization", "", metric, None)
}

override private[deequ] def toFailureMetric(failure: Exception) =
metricFromFailure(failure, "DataSynchronization", "", Entity.Dataset)
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
* is located at
*
* http://aws.amazon.com/apache2.0/
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.deequ.analyzers

/**
* Represents the state of data synchronization between two DataFrames in Deequ.
* This state keeps track of the count of synchronized record count and the total record count.
* It is used to calculate a ratio of synchronization, which is a measure of how well the data
* in the two DataFrames are synchronized.
*
* @param synchronizedDataCount The count of records that are considered synchronized between the two DataFrames.
* @param totalDataCount The total count of records for check.
*
* The `sum` method allows for aggregation of this state with another, combining the counts from both states.
* This is useful in distributed computations where states from different partitions need to be aggregated.
*
* The `metricValue` method computes the synchronization ratio. It is the ratio of `synchronizedDataCount`
* to `dataCount`.
* If `dataCount` is zero, which means no data points were examined, the method returns `Double.NaN`
* to indicate the undefined state.
*
*/
case class DataSynchronizationState(synchronizedDataCount: Long, totalDataCount: Long)
extends DoubleValuedState[DataSynchronizationState] {
override def sum(other: DataSynchronizationState): DataSynchronizationState = {
DataSynchronizationState(synchronizedDataCount + other.synchronizedDataCount, totalDataCount + other.totalDataCount)
}

override def metricValue(): Double = {
if (totalDataCount == 0L) Double.NaN else synchronizedDataCount.toDouble / totalDataCount.toDouble
}
}

object DataSynchronizationState
83 changes: 75 additions & 8 deletions src/main/scala/com/amazon/deequ/checks/Check.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
Expand All @@ -16,17 +16,29 @@

package com.amazon.deequ.checks

import com.amazon.deequ.analyzers.AnalyzerOptions
import com.amazon.deequ.analyzers.{Analyzer, Histogram, KLLParameters, Patterns, State}
import com.amazon.deequ.anomalydetection.{AnomalyDetectionStrategy, AnomalyDetector, DataPoint}
import com.amazon.deequ.analyzers.runners.AnalyzerContext
import com.amazon.deequ.analyzers.Analyzer
import com.amazon.deequ.analyzers.AnalyzerOptions
import com.amazon.deequ.analyzers.DataSynchronizationAnalyzer
import com.amazon.deequ.analyzers.DataSynchronizationState
import com.amazon.deequ.analyzers.Histogram
import com.amazon.deequ.analyzers.KLLParameters
import com.amazon.deequ.analyzers.Patterns
import com.amazon.deequ.analyzers.State
import com.amazon.deequ.anomalydetection.HistoryUtils
import com.amazon.deequ.anomalydetection.AnomalyDetectionStrategy
import com.amazon.deequ.anomalydetection.AnomalyDetector
import com.amazon.deequ.anomalydetection.DataPoint
import com.amazon.deequ.checks.ColumnCondition.isAnyNotNull
import com.amazon.deequ.checks.ColumnCondition.isEachNotNull
import com.amazon.deequ.constraints.Constraint._
import com.amazon.deequ.constraints._
import com.amazon.deequ.metrics.{BucketDistribution, Distribution, Metric}
import com.amazon.deequ.metrics.BucketDistribution
import com.amazon.deequ.metrics.Distribution
import com.amazon.deequ.metrics.Metric
import com.amazon.deequ.repository.MetricsRepository
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.UserDefinedFunction
import com.amazon.deequ.anomalydetection.HistoryUtils
import com.amazon.deequ.checks.ColumnCondition.{isAnyNotNull, isEachNotNull}

import scala.util.matching.Regex

Expand Down Expand Up @@ -338,6 +350,59 @@ case class Check(
uniqueValueRatioConstraint(columns, assertion, filter, hint) }
}

/**
* Performs a data synchronization check between the base DataFrame supplied to
* [[com.amazon.deequ.VerificationSuite.onData]] and other DataFrame supplied to this check using Deequ's
* [[com.amazon.deequ.comparison.DataSynchronization.columnMatch]] framework.
* This method compares specified columns of both DataFrames and assesses synchronization based on a custom assertion.
*
* Utilizes [[com.amazon.deequ.analyzers.DataSynchronizationAnalyzer]] for comparing the data
* and Constraint [[com.amazon.deequ.constraints.DataSynchronizationConstraint]].
*
* Usage:
* To use this method, create a VerificationSuite and invoke this method as part of adding checks:
* {{{
* val baseDataFrame: DataFrame = ...
* val otherDataFrame: DataFrame = ...
* val columnMappings: Map[String, String] = Map("baseCol1" -> "otherCol1", "baseCol2" -> "otherCol2")
* val assertionFunction: Double => Boolean = _ > 0.7
*
* val check = new Check(CheckLevel.Error, "Data Synchronization Check")
* .isDataSynchronized(otherDataFrame, columnMappings, assertionFunction)
*
* val verificationResult = VerificationSuite()
* .onData(baseDataFrame)
* .addCheck(check)
* .run()
* }}}
*
* This will add a data synchronization check to the VerificationSuite, comparing the specified columns of
* baseDataFrame and otherDataFrame based on the provided assertion function.
*
*
* @param otherDf The DataFrame to be compared with the current one. Analyzed in conjunction with the
* current DataFrame to assess data synchronization.
* @param columnMappings A map defining the column correlations between the current DataFrame and otherDf.
* Keys represent column names in the current DataFrame,
* and values are corresponding column names in otherDf.
* @param assertion A function that takes a Double (result of the comparison) and returns a Boolean.
* Defines the condition under which the data in both DataFrames is considered synchronized.
* For example (_ > 0.7) denoting metric value > 0.7 or 70% of records.
* @param hint Optional. Additional context or information about the synchronization check.
* Helpful for understanding the intent or specifics of the check. Default is None.
* @return A [[com.amazon.deequ.checks.Check]] object representing the outcome
* of the synchronization check. This object can be used in Deequ's verification suite to
* assert data quality constraints.
*
*/
def isDataSynchronized(otherDf: DataFrame, columnMappings: Map[String, String], assertion: Double => Boolean,
hint: Option[String] = None): Check = {
val dataSyncAnalyzer = DataSynchronizationAnalyzer(otherDf, columnMappings, assertion)
val constraint = AnalysisBasedConstraint[DataSynchronizationState, Double, Double](dataSyncAnalyzer, assertion,
hint = hint)
addConstraint(constraint)
}

/**
* Creates a constraint that asserts on the number of distinct values a column has.
*
Expand Down Expand Up @@ -1092,7 +1157,9 @@ case class Check(
case nc: ConstraintDecorator => nc.inner
case c: Constraint => c
}
.collect { case constraint: AnalysisBasedConstraint[_, _, _] => constraint.analyzer }
.collect {
case constraint: AnalysisBasedConstraint[_, _, _] => constraint.analyzer
}
.map { _.asInstanceOf[Analyzer[_, Metric[_]]] }
.toSet
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
Expand All @@ -17,5 +17,10 @@
package com.amazon.deequ.comparison

sealed trait ComparisonResult
case class ComparisonFailed(errorMessage: String) extends ComparisonResult
case class ComparisonSucceeded() extends ComparisonResult

case class ComparisonFailed(errorMessage: String, ratio: Double = 0) extends ComparisonResult
case class ComparisonSucceeded(ratio: Double = 0) extends ComparisonResult

case class DataSynchronizationFailed(errorMessage: String, passedCount: Option[Long] = None,
totalCount: Option[Long] = None) extends ComparisonResult
case class DataSynchronizationSucceeded(passedCount: Long, totalCount: Long) extends ComparisonResult
Loading

0 comments on commit 74f7a4b

Please sign in to comment.