From 64618569b6c0555bb1bde88512cf87665cf260f0 Mon Sep 17 00:00:00 2001 From: Karthik Penikalapati Date: Mon, 25 Dec 2023 22:51:38 -0800 Subject: [PATCH 1/3] add data synchronization test to verification suite --- .../com/amazon/deequ/analyzers/Analyzer.scala | 64 +++++++-- .../analyzers/DataSynchronizationState.scala | 36 +++++ .../scala/com/amazon/deequ/checks/Check.scala | 43 +++++- .../deequ/comparison/ComparisonResult.scala | 12 +- .../comparison/DataSynchronization.scala | 15 +- .../amazon/deequ/constraints/Constraint.scala | 29 +++- .../amazon/deequ/VerificationSuiteTest.scala | 11 +- .../com/amazon/deequ/checks/CheckTest.scala | 129 +++++++++++++++++- .../comparison/DataSynchronizationTest.scala | 66 ++++----- 9 files changed, 343 insertions(+), 62 deletions(-) create mode 100644 src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala diff --git a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala index b3e44c4c7..1a016dc14 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala @@ -19,21 +19,15 @@ package com.amazon.deequ.analyzers import com.amazon.deequ.analyzers.Analyzers._ import com.amazon.deequ.analyzers.NullBehavior.NullBehavior import com.amazon.deequ.analyzers.runners._ -import com.amazon.deequ.metrics.FullColumn +import com.amazon.deequ.comparison.{DataSynchronization, DataSynchronizationFailed, DataSynchronizationSucceeded} +import com.amazon.deequ.metrics.{DoubleMetric, Entity, FullColumn, Metric} import com.amazon.deequ.utilities.ColumnUtil.removeEscapeColumn -import com.amazon.deequ.metrics.DoubleMetric -import com.amazon.deequ.metrics.Entity -import com.amazon.deequ.metrics.Metric +import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ -import org.apache.spark.sql.Column -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.Row -import org.apache.spark.sql.SparkSession import scala.language.existentials -import scala.util.Failure -import scala.util.Success +import scala.util.{Failure, Success, Try} /** * A state (sufficient statistic) computed from data, from which we can compute a metric. 
@@ -299,6 +293,56 @@ abstract class GroupingAnalyzer[S <: State[_], +M <: Metric[_]] extends Analyzer
   }
 }
 
+/**
+ * Data Synchronization Analyzer
+ *
+ * @param dfToCompare DataFrame to compare
+ * @param columnMappings columns mappings
+ * @param assertion assertion logic
+ */
+case class DataSynchronizationAnalyzer(dfToCompare: DataFrame,
+                                       columnMappings: Map[String, String],
+                                       assertion: Double => Boolean)
+  extends Analyzer[DataSynchronizationState, DoubleMetric] {
+
+  override def computeStateFrom(data: DataFrame): Option[DataSynchronizationState] = {
+
+    val result = DataSynchronization.columnMatch(data, dfToCompare, columnMappings, assertion)
+
+    result match {
+      case succeeded: DataSynchronizationSucceeded =>
+        Some(DataSynchronizationState(succeeded.passedCount.getOrElse(0), succeeded.totalCount.getOrElse(0)))
+      case failed: DataSynchronizationFailed =>
+        Some(DataSynchronizationState(failed.passedCount.getOrElse(0), failed.totalCount.getOrElse(0)))
+      case _ => None
+    }
+  }
+
+  override def computeMetricFrom(state: Option[DataSynchronizationState]): DoubleMetric = {
+
+    state match {
+      case Some(s) => DoubleMetric(
+        Entity.Dataset,
+        "DataSynchronization",
+        "",
+        Try(s.synchronizedDataCount.toDouble / s.dataCount.toDouble),
+        None
+      )
+      case None => DoubleMetric(
+        Entity.Dataset,
+        "DataSynchronization",
+        "",
+        Try(0.0),
+        None
+      )
+    }
+  }
+
+  override private[deequ] def toFailureMetric(failure: Exception) =
+    metricFromFailure(failure, "DataSynchronization", "", Entity.Dataset)
+}
+
+
 /** Helper method to check conditions on the schema of the data */
 object Preconditions {
 
diff --git a/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala b/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala
new file mode 100644
index 000000000..dec49f530
--- /dev/null
+++ b/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala
@@ -0,0 +1,36 @@
+/**
+ * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may not
+ * use this file except in compliance with the License. A copy of the License
+ * is located at
+ *
+ *     http://aws.amazon.com/apache2.0/
+ *
+ * or in the "license" file accompanying this file. This file is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
+package com.amazon.deequ.analyzers
+
+/**
+ * To store state of DataSynchronization
+ *
+ * @param synchronizedDataCount - Count of rows that are in sync
+ * @param dataCount - total count of records to calculate the ratio.
+ */
+case class DataSynchronizationState(synchronizedDataCount: Long, dataCount: Long)
+  extends DoubleValuedState[DataSynchronizationState] {
+  override def sum(other: DataSynchronizationState): DataSynchronizationState = {
+    DataSynchronizationState(synchronizedDataCount + other.synchronizedDataCount, dataCount + other.dataCount)
+  }
+
+  override def metricValue(): Double = {
+    if (dataCount == 0L) Double.NaN else synchronizedDataCount.toDouble / dataCount.toDouble
+  }
+}
+
+object DataSynchronizationState
diff --git a/src/main/scala/com/amazon/deequ/checks/Check.scala b/src/main/scala/com/amazon/deequ/checks/Check.scala
index afd3a1a3b..e533ffc11 100644
--- a/src/main/scala/com/amazon/deequ/checks/Check.scala
+++ b/src/main/scala/com/amazon/deequ/checks/Check.scala
@@ -1,5 +1,5 @@
 /**
- * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"). You may not
  * use this file except in compliance with the License. A copy of the License
@@ -16,8 +16,7 @@
 
 package com.amazon.deequ.checks
 
-import com.amazon.deequ.analyzers.AnalyzerOptions
-import com.amazon.deequ.analyzers.{Analyzer, Histogram, KLLParameters, Patterns, State}
+import com.amazon.deequ.analyzers.{Analyzer, AnalyzerOptions, DataSynchronizationState, DataSynchronizationAnalyzer, Histogram, KLLParameters, Patterns, State}
 import com.amazon.deequ.anomalydetection.{AnomalyDetectionStrategy, AnomalyDetector, DataPoint}
 import com.amazon.deequ.analyzers.runners.AnalyzerContext
 import com.amazon.deequ.constraints.Constraint._
@@ -27,6 +26,7 @@ import com.amazon.deequ.repository.MetricsRepository
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import com.amazon.deequ.anomalydetection.HistoryUtils
 import com.amazon.deequ.checks.ColumnCondition.{isAnyNotNull, isEachNotNull}
+import org.apache.spark.sql.DataFrame
 
 import scala.util.matching.Regex
 
@@ -338,6 +338,38 @@ case class Check(
       uniqueValueRatioConstraint(columns, assertion, filter, hint)
     }
   }
+  /**
+   * Performs a data synchronization check between the base DataFrame supplied to
+   * [[com.amazon.deequ.VerificationSuite.onData]] and another DataFrame supplied to this check, using Deequ's
+   * [[com.amazon.deequ.comparison.DataSynchronization.columnMatch]] framework.
+   * This method compares specified columns of both DataFrames and assesses synchronization based on a custom assertion.
+   *
+   * Utilizes [[com.amazon.deequ.analyzers.DataSynchronizationAnalyzer]] for comparing the data
+   * and Constraint [[com.amazon.deequ.constraints.DataSynchronizationConstraint]].
+   *
+   * @param otherDf The DataFrame to be compared with the current one. Analyzed in conjunction with the
+   *                current DataFrame to assess data synchronization.
+   * @param columnMappings A map defining the column correlations between the current DataFrame and otherDf.
+   *                       Keys represent column names in the current DataFrame,
+   *                       and values are corresponding column names in otherDf.
+   * @param assertion A function that takes a Double (result of the comparison) and returns a Boolean.
+   *                  Defines the condition under which the data in both DataFrames is considered synchronized.
+   *                  For example, `_ > 0.7` requires the match ratio to exceed 0.7, i.e. at least 70% of records match.
+   * @param hint Optional. Additional context or information about the synchronization check.
+   *             Helpful for understanding the intent or specifics of the check. Default is None.
+ * @return A [[com.amazon.deequ.checks.Check]] object representing the outcome + * of the synchronization check. This object can be used in Deequ's verification suite to + * assert data quality constraints. + */ + def isDataSynchronized(otherDf: DataFrame, columnMappings: Map[String, String], assertion: Double => Boolean, + hint: Option[String] = None): Check = { + + val dataSyncAnalyzer = DataSynchronizationAnalyzer(otherDf, columnMappings, assertion) + val constraint = DataSynchronizationConstraint(dataSyncAnalyzer, hint) + addConstraint(constraint) + + } + /** * Creates a constraint that asserts on the number of distinct values a column has. * @@ -1092,7 +1124,10 @@ case class Check( case nc: ConstraintDecorator => nc.inner case c: Constraint => c } - .collect { case constraint: AnalysisBasedConstraint[_, _, _] => constraint.analyzer } + .collect { + case constraint: AnalysisBasedConstraint[_, _, _] => constraint.analyzer + case constraint: DataSynchronizationConstraint => constraint.analyzer + } .map { _.asInstanceOf[Analyzer[_, Metric[_]]] } .toSet } diff --git a/src/main/scala/com/amazon/deequ/comparison/ComparisonResult.scala b/src/main/scala/com/amazon/deequ/comparison/ComparisonResult.scala index ec2c6965c..c7654ba32 100644 --- a/src/main/scala/com/amazon/deequ/comparison/ComparisonResult.scala +++ b/src/main/scala/com/amazon/deequ/comparison/ComparisonResult.scala @@ -1,5 +1,5 @@ /** - * Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License @@ -17,5 +17,11 @@ package com.amazon.deequ.comparison sealed trait ComparisonResult -case class ComparisonFailed(errorMessage: String) extends ComparisonResult -case class ComparisonSucceeded() extends ComparisonResult + +case class ComparisonFailed(errorMessage: String, ratio: Double = 0) extends ComparisonResult +case class ComparisonSucceeded(ratio: Double = 0) extends ComparisonResult + +case class DataSynchronizationFailed(errorMessage: String, passedCount: Option[Long] = None, + totalCount: Option[Long] = None) extends ComparisonResult +case class DataSynchronizationSucceeded(passedCount: Option[Long] = None, totalCount: Option[Long] = None) + extends ComparisonResult diff --git a/src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala b/src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala index c5a82f768..dea665aab 100644 --- a/src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala +++ b/src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala @@ -1,5 +1,5 @@ /** - * Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. 
A copy of the License
@@ -107,7 +107,7 @@ object DataSynchronization extends ComparisonBase {
         finalAssertion(ds1, ds2, mergedMaps, assertion)
       }
     } else {
-      ComparisonFailed(columnErrors.get)
+      DataSynchronizationFailed(columnErrors.get)
     }
   }
 
@@ -147,7 +147,7 @@ object DataSynchronization extends ComparisonBase {
         finalAssertion(ds1, ds2, mergedMaps, assertion)
       }
     } else {
-      ComparisonFailed(keyColumnErrors.get)
+      DataSynchronizationFailed(keyColumnErrors.get)
     }
   }
 
@@ -260,12 +260,15 @@ object DataSynchronization extends ComparisonBase {
         .reduce((e1, e2) => e1 && e2)
 
       val joined = ds1.join(ds2, joinExpression, "inner")
-      val ratio = joined.count().toDouble / ds1Count
+      val passedCount = joined.count()
+      val totalCount = ds1Count
+      val ratio = passedCount.toDouble / totalCount.toDouble
 
       if (assertion(ratio)) {
-        ComparisonSucceeded()
+        DataSynchronizationSucceeded(Some(passedCount), Some(totalCount))
       } else {
-        ComparisonFailed(s"Value: $ratio does not meet the constraint requirement.")
+        DataSynchronizationFailed(s"Data Synchronization Comparison Metric Value: $ratio does not meet the constraint " +
+          s"requirement.", Some(passedCount), Some(totalCount))
       }
     }
   }
diff --git a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
index 39d8f522f..b6e009822 100644
--- a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
+++ b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
@@ -17,9 +17,10 @@
 package com.amazon.deequ.constraints
 
 import com.amazon.deequ.analyzers._
-import com.amazon.deequ.metrics.{BucketDistribution, Distribution, Metric}
+import com.amazon.deequ.metrics.{BucketDistribution, Distribution, DoubleMetric, Metric}
 import org.apache.spark.sql.expressions.UserDefinedFunction
 
+import scala.util.{Success, Try}
 import scala.util.matching.Regex
 
 object ConstraintStatus extends Enumeration {
@@ -897,3 +898,29 @@ object Constraint {
   }
 
 }
+
+/**
+ * Data Synchronization Constraint
+ * @param analyzer Data Synchronization Analyzer
+ * @param hint an optional hint
+ */
+case class DataSynchronizationConstraint(analyzer: DataSynchronizationAnalyzer, hint: Option[String])
+  extends Constraint {
+
+  override def evaluate(metrics: Map[Analyzer[_, Metric[_]], Metric[_]]): ConstraintResult = {
+
+    val anz = Try(metrics.filter(i => i._1.isInstanceOf[DataSynchronizationAnalyzer]).head._2)
+    anz match {
+      case Success(m: DoubleMetric) =>
+        val result = m.value match {
+          case Success(value) => analyzer.assertion(value)
+          case _ => false
+        }
+        val status = if (result) ConstraintStatus.Success else ConstraintStatus.Failure
+        ConstraintResult(this, status, hint, Some(m))
+
+      case _ =>
+        ConstraintResult(this, ConstraintStatus.Failure, hint, anz.toOption)
+    }
+  }
+}
diff --git a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
index a21bfc934..d9d5fa012 100644
--- a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
+++ b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
@@ -19,9 +19,7 @@ package com.amazon.deequ
 import com.amazon.deequ.analyzers._
 import com.amazon.deequ.analyzers.runners.AnalyzerContext
 import com.amazon.deequ.anomalydetection.AbsoluteChangeStrategy
-import com.amazon.deequ.checks.Check
-import com.amazon.deequ.checks.CheckLevel
-import com.amazon.deequ.checks.CheckStatus
+import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
 import com.amazon.deequ.constraints.{Constraint, ConstraintResult}
 import
com.amazon.deequ.io.DfsUtils import com.amazon.deequ.metrics.{DoubleMetric, Entity, Metric} @@ -806,6 +804,9 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec val complianceCheckThatShouldFailCompleteness = Check(CheckLevel.Error, "shouldErrorStringType") .hasCompleteness("fake", x => x > 0) + val checkHasDataInSyncTest = Check(CheckLevel.Error, "shouldSucceedForAge") + .isDataSynchronized(df, Map("age" -> "age"), _ > 0.99, Some("shouldpass")) + val verificationResult = VerificationSuite() .onData(df) .addCheck(checkThatShouldSucceed) @@ -815,6 +816,7 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec .addCheck(checkThatShouldFail) .addCheck(complianceCheckThatShouldFail) .addCheck(complianceCheckThatShouldFailCompleteness) + .addCheck(checkHasDataInSyncTest) .run() val checkSuccessResult = verificationResult.checkResults(checkThatShouldSucceed) @@ -846,6 +848,9 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec checkFailedCompletenessResult.constraintResults.map(_.message) shouldBe List(Some("Input data does not include column fake!")) assert(checkFailedCompletenessResult.status == CheckStatus.Error) + + val checkDataSyncResult = verificationResult.checkResults(checkHasDataInSyncTest) + checkDataSyncResult.status shouldBe CheckStatus.Success } "Well-defined checks should produce correct result even if another check throws an exception" in withSparkSession { diff --git a/src/test/scala/com/amazon/deequ/checks/CheckTest.scala b/src/test/scala/com/amazon/deequ/checks/CheckTest.scala index 0a46b9e25..408154fe0 100644 --- a/src/test/scala/com/amazon/deequ/checks/CheckTest.scala +++ b/src/test/scala/com/amazon/deequ/checks/CheckTest.scala @@ -1,5 +1,5 @@ /** - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License @@ -25,11 +25,12 @@ import com.amazon.deequ.metrics.{DoubleMetric, Entity} import com.amazon.deequ.repository.memory.InMemoryMetricsRepository import com.amazon.deequ.repository.{MetricsRepository, ResultKey} import com.amazon.deequ.utils.FixtureSupport +import org.apache.spark.sql.functions.{col, when} import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.scalamock.scalatest.MockFactory -import org.scalatest.wordspec.AnyWordSpec import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec import scala.util.{Success, Try} @@ -1107,6 +1108,130 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix } } + /** + * Test for DataSync in verification suite. 
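+   * The suites below compare a fixture DataFrame against itself or against a lightly
+   * modified copy of itself, so the expected match ratio can be reasoned about directly
+   * from the modification that was applied.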
+   */
+  "Check hasDataInSync" should {
+
+    val colMapAtt1 = Map("att1" -> "att1")
+    val colMapTwoCols = Map("att1" -> "att1", "att2" -> "att2")
+
+    "yield success for basic data sync test for 1 col" in withSparkSession { sparkSession =>
+      val dfInformative = getDfWithConditionallyInformativeColumns(sparkSession)
+
+      val check = Check(CheckLevel.Error, "must have data in sync")
+        .isDataSynchronized(dfInformative, colMapAtt1, _ > 0.9, Some("should be in sync"))
+      val context = runChecks(dfInformative, check)
+
+      assertSuccess(check, context)
+    }
+
+    "yield failure when column doesn't exist in data sync test for 1 col" in withSparkSession { sparkSession =>
+      val dfInformative = getDfWithConditionallyInformativeColumns(sparkSession)
+      val dfInformativeRenamed = dfInformative.withColumnRenamed("att1", "att1_renamed")
+
+      val check = Check(CheckLevel.Error, "must fail as column does not exist")
+        .isDataSynchronized(dfInformativeRenamed, colMapAtt1, _ > 0.9, Some("must fail as column does not exist"))
+      val context = runChecks(dfInformative, check)
+      assertEvaluatesTo(check, context, CheckStatus.Error)
+    }
+
+    "yield failure when row count varies in data sync test for 1 col" in withSparkSession { sparkSession =>
+      val dfInformative = getDfWithConditionallyInformativeColumns(sparkSession)
+      val dfInformativeFiltered = dfInformative.filter("att1 > 2")
+
+      val check = Check(CheckLevel.Error, "must fail as row counts mismatch")
+        .isDataSynchronized(dfInformativeFiltered, colMapAtt1, _ > 0.9, Some("must fail as row counts mismatch"))
+      val context = runChecks(dfInformative, check)
+      assertEvaluatesTo(check, context, CheckStatus.Error)
+    }
+
+    "yield failed assertion for 0.9 for 1 col" in withSparkSession { sparkSession =>
+      val df = getDfWithConditionallyInformativeColumns(sparkSession)
+      val modifiedDf = df.withColumn("att1", when(col("att1") === 3, 4)
+        .otherwise(col("att1")))
+
+      val check = Check(CheckLevel.Error, "must fail as rows mismatch")
+        .isDataSynchronized(modifiedDf, colMapAtt1, _ > 0.9, Some("must fail as rows mismatch"))
+      val context = runChecks(df, check)
+      assertEvaluatesTo(check, context, CheckStatus.Error)
+
+    }
+
+    "yield success for assertion 0.6 for 1 col" in withSparkSession { sparkSession =>
+      val df = getDfWithConditionallyInformativeColumns(sparkSession)
+      val modifiedDf = df.withColumn("att1", when(col("att1") === 3, 4)
+        .otherwise(col("att1")))
+
+      val check = Check(CheckLevel.Error, "must succeed as metric value exceeds assertion of 0.6")
+        .isDataSynchronized(modifiedDf, colMapAtt1, _ > 0.6,
+          Some("must succeed as metric value exceeds assertion of 0.6"))
+      val context = runChecks(df, check)
+      assertSuccess(check, context)
+    }
+
+
+    "yield success for basic data sync test for multiple columns" in withSparkSession { sparkSession =>
+      val dfInformative = getDfWithConditionallyInformativeColumns(sparkSession)
+
+      val check = Check(CheckLevel.Error, "must have data in sync")
+        .isDataSynchronized(dfInformative, colMapTwoCols, _ > 0.9, Some("should be in sync"))
+      val context = runChecks(dfInformative, check)
+
+      assertSuccess(check, context)
+    }
+
+    "yield failure when column doesn't exist in data sync test for multiple columns" in withSparkSession {
+      sparkSession =>
+        val dfInformative = getDfWithConditionallyInformativeColumns(sparkSession)
+        val dfInformativeRenamed = dfInformative.withColumnRenamed("att1", "att1_renamed")
+
+        val check = Check(CheckLevel.Error, "must fail as column does not exist")
+          .isDataSynchronized(dfInformativeRenamed, colMapTwoCols, _ > 0.9, Some("must fail as column does not exist"))
+        val context = runChecks(dfInformative, check)
+
+        assertEvaluatesTo(check, context, CheckStatus.Error)
+    }
+
+    "yield failure when row count varies in data sync test for multiple columns" in withSparkSession { sparkSession =>
+      val dfInformative = getDfWithConditionallyInformativeColumns(sparkSession)
+      val dfInformativeFiltered = dfInformative.filter("att1 > 2")
+
+      val check = Check(CheckLevel.Error, "must fail as row counts mismatch")
+        .isDataSynchronized(dfInformativeFiltered, colMapTwoCols, _ > 0.9, Some("must fail as row counts mismatch"))
+      val context = runChecks(dfInformative, check)
+
+      assertEvaluatesTo(check, context, CheckStatus.Error)
+    }
+
+    "yield failed assertion for 0.9 for multiple columns" in withSparkSession { sparkSession =>
+      val df = getDfWithConditionallyInformativeColumns(sparkSession)
+      val modifiedDf = df.withColumn("att1", when(col("att1") === 3, 4)
+        .otherwise(col("att1")))
+
+      val check = Check(CheckLevel.Error, "must fail as rows mismatch")
+        .isDataSynchronized(modifiedDf, colMapTwoCols, _ > 0.9, Some("must fail as rows mismatch"))
+      val context = runChecks(df, check)
+
+      assertEvaluatesTo(check, context, CheckStatus.Error)
+
+    }
+
+    "yield success for assertion 0.6 for multiple columns" in withSparkSession { sparkSession =>
+      val df = getDfWithConditionallyInformativeColumns(sparkSession)
+      val modifiedDf = df.withColumn("att1", when(col("att1") === 3, 4)
+        .otherwise(col("att1")))
+
+      val check = Check(CheckLevel.Error, "must succeed as metric value is 0.66")
+        .isDataSynchronized(modifiedDf, colMapTwoCols, _ > 0.6,
+          Some("must succeed as metric value is 0.66"))
+      val context = runChecks(df, check)
+
+      assertSuccess(check, context)
+    }
+
+  }
+
   /** Run anomaly detection using a repository with some previous analysis results for testing */
   private[this] def evaluateWithRepository(test: MetricsRepository => Unit): Unit = {
 
diff --git a/src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala b/src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala
index 932b3cbaa..74ebb70b2 100644
--- a/src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala
+++ b/src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala
@@ -1,5 +1,5 @@
 /**
- * Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"). You may not
  * use this file except in compliance with the License.
A copy of the License @@ -56,8 +56,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val compCols = Map("name" -> "name") val assertion: Double => Boolean = _ >= 0.60 - val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion)) - assert(result.isInstanceOf[ComparisonSucceeded]) + val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion) + assert(result.isInstanceOf[DataSynchronizationSucceeded]) } "match == 0.83 when id is colKey and state is compCols" in withSparkSession { spark => @@ -87,8 +87,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val compCols = Map("state" -> "state") val assertion: Double => Boolean = _ >= 0.80 - val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion)) - assert(result.isInstanceOf[ComparisonSucceeded]) + val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion) + assert(result.isInstanceOf[DataSynchronizationSucceeded]) } "return false because col name isn't unique" in withSparkSession { spark => @@ -118,8 +118,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val compCols = Map("state" -> "state") val assertion: Double => Boolean = _ >= 0.66 - val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion)) - assert(result.isInstanceOf[ComparisonFailed]) + val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion) + assert(result.isInstanceOf[DataSynchronizationFailed]) } "match >= 0.66 when id is unique col, rest compCols" in withSparkSession { spark => @@ -149,8 +149,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val compCols = Map("name" -> "name", "state" -> "state") val assertion: Double => Boolean = _ >= 0.60 - val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion)) - assert(result.isInstanceOf[ComparisonSucceeded]) + val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion) + assert(result.isInstanceOf[DataSynchronizationSucceeded]) } "match >= 0.66 (same test as above only the data sets change)" in withSparkSession{ spark => @@ -180,8 +180,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val compCols = Map("name" -> "name", "state" -> "state") val assertion: Double => Boolean = _ >= 0.60 - val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion)) - assert(result.isInstanceOf[ComparisonSucceeded]) + val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion) + assert(result.isInstanceOf[DataSynchronizationSucceeded]) } "return false because the id col in ds1 isn't unique" in withSparkSession { spark => @@ -212,8 +212,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val compCols = Map("name" -> "name", "state" -> "state") val assertion: Double => Boolean = _ >= 0.40 - val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion)) - assert(result.asInstanceOf[ComparisonFailed].errorMessage == + val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion) + assert(result.asInstanceOf[DataSynchronizationFailed].errorMessage == "The selected columns are not comparable due to duplicates present in the dataset." 
+ "Comparison keys must be unique, but in Dataframe 1, there are 6 unique records and 7 rows, " + "and in Dataframe 2, there are 6 unique records and 6 rows, based on the combination of keys " + @@ -248,8 +248,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val compCols = Map("name" -> "name", "state" -> "state") val assertion: Double => Boolean = _ >= 0.40 - val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion)) - assert(result.isInstanceOf[ComparisonFailed]) + val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion) + assert(result.isInstanceOf[DataSynchronizationFailed]) } "return false because col state isn't unique" in withSparkSession { spark => @@ -280,7 +280,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val assertion: Double => Boolean = _ >= 0.66 val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion)) - assert(result.isInstanceOf[ComparisonFailed]) + assert(result.isInstanceOf[DataSynchronizationFailed]) } "check all columns and return an assertion of .66" in withSparkSession { spark => @@ -309,8 +309,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val colKeyMap = Map("id" -> "id") val assertion: Double => Boolean = _ >= 0.66 - val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion)) - assert(result.isInstanceOf[ComparisonSucceeded]) + val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion) + assert(result.isInstanceOf[DataSynchronizationSucceeded]) } "return false because state column isn't unique" in withSparkSession { spark => @@ -339,8 +339,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val colKeyMap = Map("state" -> "state") val assertion: Double => Boolean = _ >= 0.66 - val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion)) - assert(result.isInstanceOf[ComparisonFailed]) + val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion) + assert(result.isInstanceOf[DataSynchronizationFailed]) } "check all columns" in withSparkSession { spark => @@ -369,8 +369,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val colKeyMap = Map("id" -> "id") val assertion: Double => Boolean = _ >= 0.66 - val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion)) - assert(result.isInstanceOf[ComparisonSucceeded]) + val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion) + assert(result.isInstanceOf[DataSynchronizationSucceeded]) } "cols exist but 0 matches" in withSparkSession { spark => import spark.implicits._ @@ -398,8 +398,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val colKeyMap = Map("id" -> "id") val assertion: Double => Boolean = _ >= 0 - val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion)) - assert(result.isInstanceOf[ComparisonSucceeded]) + val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion) + assert(result.isInstanceOf[DataSynchronizationSucceeded]) } } @@ -643,7 +643,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { // Overall val assertion: Double => Boolean = _ >= 0.6 // 4 out of 6 rows match val overallResult = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion) - assert(overallResult.isInstanceOf[ComparisonSucceeded]) + assert(overallResult.isInstanceOf[DataSynchronizationSucceeded]) // 
Row Level val outcomeColName = "outcome" @@ -670,7 +670,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { // Overall val assertion: Double => Boolean = _ >= 0.6 // 4 out of 6 rows match val overallResult = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion) - assert(overallResult.isInstanceOf[ComparisonSucceeded]) + assert(overallResult.isInstanceOf[DataSynchronizationSucceeded]) // Row Level val outcomeColName = "outcome" @@ -700,8 +700,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val colKeyMap1 = Map(nonExistCol1 -> nonExistCol2) val overallResult1 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap1, assertion) - assert(overallResult1.isInstanceOf[ComparisonFailed]) - val failedOverallResult1 = overallResult1.asInstanceOf[ComparisonFailed] + assert(overallResult1.isInstanceOf[DataSynchronizationFailed]) + val failedOverallResult1 = overallResult1.asInstanceOf[DataSynchronizationFailed] assert(failedOverallResult1.errorMessage.contains("key columns were not found in the first dataset")) assert(failedOverallResult1.errorMessage.contains(nonExistCol1)) @@ -716,8 +716,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val colKeyMap2 = Map(nonExistCol1 -> idColumnName) val overallResult2 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap2, assertion) - assert(overallResult2.isInstanceOf[ComparisonFailed]) - val failedOverallResult2 = overallResult2.asInstanceOf[ComparisonFailed] + assert(overallResult2.isInstanceOf[DataSynchronizationFailed]) + val failedOverallResult2 = overallResult2.asInstanceOf[DataSynchronizationFailed] assert(failedOverallResult2.errorMessage.contains("key columns were not found in the first dataset")) assert(failedOverallResult2.errorMessage.contains(nonExistCol1)) @@ -732,8 +732,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val colKeyMap3 = Map(idColumnName -> nonExistCol2) val overallResult3 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap3, assertion) - assert(overallResult3.isInstanceOf[ComparisonFailed]) - val failedOverallResult3 = overallResult3.asInstanceOf[ComparisonFailed] + assert(overallResult3.isInstanceOf[DataSynchronizationFailed]) + val failedOverallResult3 = overallResult3.asInstanceOf[DataSynchronizationFailed] assert(failedOverallResult3.errorMessage.contains("key columns were not found in the second dataset")) assert(failedOverallResult3.errorMessage.contains(nonExistCol2)) @@ -800,7 +800,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val rowLevelResult3 = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap, Some(compColsMap3)) assert(rowLevelResult3.isLeft) val failedRowLevelResult3 = rowLevelResult3.left.get - assert(failedOverallResult3.errorMessage.contains( + assert(failedRowLevelResult3.errorMessage.contains( s"The following columns were not found in the second dataset: $nonExistCol2")) } } From 08a71abd31cf10ad8a6744c400dcaf60de81bc23 Mon Sep 17 00:00:00 2001 From: Karthik Penikalapati Date: Thu, 4 Jan 2024 02:31:26 -0800 Subject: [PATCH 2/3] review comments --- .../com/amazon/deequ/analyzers/Analyzer.scala | 66 +++---------- .../DataSynchronizationAnalyzer.scala | 94 +++++++++++++++++++ .../scala/com/amazon/deequ/checks/Check.scala | 30 ++++-- .../deequ/comparison/ComparisonResult.scala | 3 +- .../comparison/DataSynchronization.scala | 32 ++++--- .../amazon/deequ/constraints/Constraint.scala | 26 ++--- .../com/amazon/deequ/checks/CheckTest.scala | 1 + 
.../comparison/DataSynchronizationTest.scala | 12 +-- 8 files changed, 165 insertions(+), 99 deletions(-) create mode 100644 src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationAnalyzer.scala diff --git a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala index 1a016dc14..a80405825 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala @@ -1,5 +1,5 @@ /** - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License @@ -19,15 +19,21 @@ package com.amazon.deequ.analyzers import com.amazon.deequ.analyzers.Analyzers._ import com.amazon.deequ.analyzers.NullBehavior.NullBehavior import com.amazon.deequ.analyzers.runners._ -import com.amazon.deequ.comparison.{DataSynchronization, DataSynchronizationFailed, DataSynchronizationSucceeded} -import com.amazon.deequ.metrics.{DoubleMetric, Entity, FullColumn, Metric} +import com.amazon.deequ.metrics.DoubleMetric +import com.amazon.deequ.metrics.Entity +import com.amazon.deequ.metrics.FullColumn +import com.amazon.deequ.metrics.Metric import com.amazon.deequ.utilities.ColumnUtil.removeEscapeColumn -import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession} +import org.apache.spark.sql.Column +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.Row +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import scala.language.existentials -import scala.util.{Failure, Success, Try} +import scala.util.Failure +import scala.util.Success /** * A state (sufficient statistic) computed from data, from which we can compute a metric. 
@@ -293,56 +299,6 @@ abstract class GroupingAnalyzer[S <: State[_], +M <: Metric[_]] extends Analyzer } } -/** - * Data Synchronization Analyzer - * - * @param dfToCompare DataFrame to compare - * @param columnMappings columns mappings - * @param assertion assertion logic - */ -case class DataSynchronizationAnalyzer(dfToCompare: DataFrame, - columnMappings: Map[String, String], - assertion: Double => Boolean) - extends Analyzer[DataSynchronizationState, DoubleMetric] { - - override def computeStateFrom(data: DataFrame): Option[DataSynchronizationState] = { - - val result = DataSynchronization.columnMatch(data, dfToCompare, columnMappings, assertion) - - result match { - case succeeded: DataSynchronizationSucceeded => - Some(DataSynchronizationState(succeeded.passedCount.getOrElse(0), succeeded.totalCount.getOrElse(0))) - case failed: DataSynchronizationFailed => - Some(DataSynchronizationState(failed.passedCount.getOrElse(0), failed.totalCount.getOrElse(0))) - case _ => None - } - } - - override def computeMetricFrom(state: Option[DataSynchronizationState]): DoubleMetric = { - - state match { - case Some(s) => DoubleMetric( - Entity.Dataset, - "DataSynchronization", - "", - Try(s.synchronizedDataCount.toDouble / s.dataCount.toDouble), - None - ) - case None => DoubleMetric( - Entity.Dataset, - "DataSynchronization", - "", - Try(0.0), - None - ) - } - } - - override private[deequ] def toFailureMetric(failure: Exception) = - metricFromFailure(failure, "DataSynchronization", "", Entity.Dataset) -} - - /** Helper method to check conditions on the schema of the data */ object Preconditions { diff --git a/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationAnalyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationAnalyzer.scala new file mode 100644 index 000000000..e72fc86cb --- /dev/null +++ b/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationAnalyzer.scala @@ -0,0 +1,94 @@ +/** + * Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License + * is located at + * + * http://aws.amazon.com/apache2.0/ + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.deequ.analyzers + +import com.amazon.deequ.analyzers.Analyzers.metricFromFailure +import com.amazon.deequ.comparison.DataSynchronization +import com.amazon.deequ.comparison.DataSynchronizationFailed +import com.amazon.deequ.comparison.DataSynchronizationSucceeded +import com.amazon.deequ.metrics.DoubleMetric +import com.amazon.deequ.metrics.Entity +import org.apache.spark.sql.DataFrame + +import scala.util.Failure +import scala.util.Try + + +/** + * An Analyzer for Deequ that performs a data synchronization check between two DataFrames. + * It evaluates the degree of synchronization based on specified column mappings and an assertion function. + * + * The analyzer computes a ratio of synchronized data points to the total data points, represented as a DoubleMetric. 
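+ * For example (hypothetical counts, for illustration): if 8 of 10 rows join successfully on the
+ * mapped columns, the reported metric value is 8.0 / 10.0 = 0.8.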
+ * Refer to [[com.amazon.deequ.comparison.DataSynchronization.columnMatch]] for the DataSynchronization implementation.
+ *
+ * @param dfToCompare The DataFrame to compare with the primary DataFrame that is set up
+ *                    during [[com.amazon.deequ.VerificationSuite.onData]].
+ * @param columnMappings A map where each key-value pair represents a column in the primary DataFrame
+ *                       and its corresponding column in dfToCompare.
+ * @param assertion A function that takes a Double (the match ratio) and returns a Boolean.
+ *                  It defines the condition for successful synchronization.
+ *
+ * Usage:
+ * This analyzer is used by Deequ's VerificationSuite when an `isDataSynchronized` check is defined;
+ * it can also be used manually.
+ *
+ * Example:
+ * val analyzer = DataSynchronizationAnalyzer(dfToCompare, Map("col1" -> "col2"), _ > 0.8)
+ * val verificationResult = VerificationSuite().onData(df).addAnalyzer(analyzer).run()
+ *
+ * // or, equivalently, through the check API:
+ * val check = Check(CheckLevel.Error, "data synchronization")
+ *   .isDataSynchronized(dfToCompare, Map("col1" -> "col2"), _ > 0.8)
+ * val verificationResult = VerificationSuite().onData(df).addCheck(check).run()
+ *
+ *
+ * The computeStateFrom method calculates the synchronization state by comparing the specified columns of the two
+ * DataFrames.
+ * The computeMetricFrom method then converts this state into a DoubleMetric representing the synchronization ratio.
+ *
+ */
+case class DataSynchronizationAnalyzer(dfToCompare: DataFrame,
+                                       columnMappings: Map[String, String],
+                                       assertion: Double => Boolean)
+  extends Analyzer[DataSynchronizationState, DoubleMetric] {
+
+  override def computeStateFrom(data: DataFrame): Option[DataSynchronizationState] = {
+
+    val result = DataSynchronization.columnMatch(data, dfToCompare, columnMappings, assertion)
+
+    result match {
+      case succeeded: DataSynchronizationSucceeded =>
+        Some(DataSynchronizationState(succeeded.passedCount, succeeded.totalCount))
+      case failed: DataSynchronizationFailed =>
+        Some(DataSynchronizationState(failed.passedCount.getOrElse(0), failed.totalCount.getOrElse(0)))
+      case _ => None
+    }
+  }
+
+  override def computeMetricFrom(state: Option[DataSynchronizationState]): DoubleMetric = {
+
+    val metric = state match {
+      case Some(s) => Try(s.synchronizedDataCount.toDouble / s.dataCount.toDouble)
+      case _ => Failure(new IllegalStateException("No state available for DataSynchronizationAnalyzer"))
+    }
+
+    DoubleMetric(Entity.Dataset, "DataSynchronization", "", metric, None)
+  }
+
+  override private[deequ] def toFailureMetric(failure: Exception) =
+    metricFromFailure(failure, "DataSynchronization", "", Entity.Dataset)
+}
+
diff --git a/src/main/scala/com/amazon/deequ/checks/Check.scala b/src/main/scala/com/amazon/deequ/checks/Check.scala
index e533ffc11..a02bc8f71 100644
--- a/src/main/scala/com/amazon/deequ/checks/Check.scala
+++ b/src/main/scala/com/amazon/deequ/checks/Check.scala
@@ -16,17 +16,29 @@
 
 package com.amazon.deequ.checks
 
-import com.amazon.deequ.analyzers.{Analyzer, AnalyzerOptions, DataSynchronizationState, DataSynchronizationAnalyzer, Histogram, KLLParameters, Patterns, State}
-import com.amazon.deequ.anomalydetection.{AnomalyDetectionStrategy, AnomalyDetector, DataPoint}
 import com.amazon.deequ.analyzers.runners.AnalyzerContext
+import com.amazon.deequ.analyzers.Analyzer
+import com.amazon.deequ.analyzers.AnalyzerOptions
+import com.amazon.deequ.analyzers.DataSynchronizationAnalyzer
+import com.amazon.deequ.analyzers.DataSynchronizationState
+import com.amazon.deequ.analyzers.Histogram
+import com.amazon.deequ.analyzers.KLLParameters
+import com.amazon.deequ.analyzers.Patterns +import com.amazon.deequ.analyzers.State +import com.amazon.deequ.anomalydetection.HistoryUtils +import com.amazon.deequ.anomalydetection.AnomalyDetectionStrategy +import com.amazon.deequ.anomalydetection.AnomalyDetector +import com.amazon.deequ.anomalydetection.DataPoint +import com.amazon.deequ.checks.ColumnCondition.isAnyNotNull +import com.amazon.deequ.checks.ColumnCondition.isEachNotNull import com.amazon.deequ.constraints.Constraint._ import com.amazon.deequ.constraints._ -import com.amazon.deequ.metrics.{BucketDistribution, Distribution, Metric} +import com.amazon.deequ.metrics.BucketDistribution +import com.amazon.deequ.metrics.Distribution +import com.amazon.deequ.metrics.Metric import com.amazon.deequ.repository.MetricsRepository -import org.apache.spark.sql.expressions.UserDefinedFunction -import com.amazon.deequ.anomalydetection.HistoryUtils -import com.amazon.deequ.checks.ColumnCondition.{isAnyNotNull, isEachNotNull} import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.expressions.UserDefinedFunction import scala.util.matching.Regex @@ -363,11 +375,10 @@ case class Check( */ def isDataSynchronized(otherDf: DataFrame, columnMappings: Map[String, String], assertion: Double => Boolean, hint: Option[String] = None): Check = { - val dataSyncAnalyzer = DataSynchronizationAnalyzer(otherDf, columnMappings, assertion) - val constraint = DataSynchronizationConstraint(dataSyncAnalyzer, hint) + val constraint = AnalysisBasedConstraint[DataSynchronizationState, Double, Double](dataSyncAnalyzer, assertion, + hint = hint) addConstraint(constraint) - } /** @@ -1126,7 +1137,6 @@ case class Check( } .collect { case constraint: AnalysisBasedConstraint[_, _, _] => constraint.analyzer - case constraint: DataSynchronizationConstraint => constraint.analyzer } .map { _.asInstanceOf[Analyzer[_, Metric[_]]] } .toSet diff --git a/src/main/scala/com/amazon/deequ/comparison/ComparisonResult.scala b/src/main/scala/com/amazon/deequ/comparison/ComparisonResult.scala index c7654ba32..67b4d4b47 100644 --- a/src/main/scala/com/amazon/deequ/comparison/ComparisonResult.scala +++ b/src/main/scala/com/amazon/deequ/comparison/ComparisonResult.scala @@ -23,5 +23,4 @@ case class ComparisonSucceeded(ratio: Double = 0) extends ComparisonResult case class DataSynchronizationFailed(errorMessage: String, passedCount: Option[Long] = None, totalCount: Option[Long] = None) extends ComparisonResult -case class DataSynchronizationSucceeded(passedCount: Option[Long] = None, totalCount: Option[Long] = None) - extends ComparisonResult +case class DataSynchronizationSucceeded(passedCount: Long, totalCount: Long) extends ComparisonResult diff --git a/src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala b/src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala index dea665aab..992dc48d0 100644 --- a/src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala +++ b/src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala @@ -1,5 +1,5 @@ /** - * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. 
A copy of the License @@ -16,7 +16,8 @@ package com.amazon.deequ.comparison -import org.apache.spark.sql.{Column, DataFrame} +import org.apache.spark.sql.Column +import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions.col import org.apache.spark.sql.functions.hash import org.apache.spark.sql.functions.lit @@ -101,7 +102,7 @@ object DataSynchronization extends ComparisonBase { val nonKeyColsMatch = colsDS1.forall(columnExists(ds2, _)) if (!nonKeyColsMatch) { - ComparisonFailed("Non key columns in the given data frames do not match.") + DataSynchronizationFailed("Non key columns in the given data frames do not match.") } else { val mergedMaps = colKeyMap ++ colsDS1.map(x => x -> x).toMap finalAssertion(ds1, ds2, mergedMaps, assertion) @@ -137,10 +138,10 @@ object DataSynchronization extends ComparisonBase { val nonKeyColumns2NotInDataset = compCols.values.filterNot(columnExists(ds2, _)) if (nonKeyColumns1NotInDataset.nonEmpty) { - ComparisonFailed(s"The following columns were not found in the first dataset: " + + DataSynchronizationFailed(s"The following columns were not found in the first dataset: " + s"${nonKeyColumns1NotInDataset.mkString(", ")}") } else if (nonKeyColumns2NotInDataset.nonEmpty) { - ComparisonFailed(s"The following columns were not found in the second dataset: " + + DataSynchronizationFailed(s"The following columns were not found in the second dataset: " + s"${nonKeyColumns2NotInDataset.mkString(", ")}") } else { val mergedMaps = colKeyMap ++ compCols @@ -155,23 +156,24 @@ object DataSynchronization extends ComparisonBase { ds2: DataFrame, colKeyMap: Map[String, String], optionalCompCols: Option[Map[String, String]] = None, - optionalOutcomeColumnName: Option[String] = None): Either[ComparisonFailed, DataFrame] = { + optionalOutcomeColumnName: Option[String] = None): + Either[DataSynchronizationFailed, DataFrame] = { val columnErrors = areKeyColumnsValid(ds1, ds2, colKeyMap) if (columnErrors.isEmpty) { - val compColsEither: Either[ComparisonFailed, Map[String, String]] = if (optionalCompCols.isDefined) { + val compColsEither: Either[DataSynchronizationFailed, Map[String, String]] = if (optionalCompCols.isDefined) { optionalCompCols.get match { - case compCols if compCols.isEmpty => Left(ComparisonFailed("Empty column comparison map provided.")) + case compCols if compCols.isEmpty => Left(DataSynchronizationFailed("Empty column comparison map provided.")) case compCols => val ds1CompColsNotInDataset = compCols.keys.filterNot(columnExists(ds1, _)) val ds2CompColsNotInDataset = compCols.values.filterNot(columnExists(ds2, _)) if (ds1CompColsNotInDataset.nonEmpty) { Left( - ComparisonFailed(s"The following columns were not found in the first dataset: " + + DataSynchronizationFailed(s"The following columns were not found in the first dataset: " + s"${ds1CompColsNotInDataset.mkString(", ")}") ) } else if (ds2CompColsNotInDataset.nonEmpty) { Left( - ComparisonFailed(s"The following columns were not found in the second dataset: " + + DataSynchronizationFailed(s"The following columns were not found in the second dataset: " + s"${ds2CompColsNotInDataset.mkString(", ")}") ) } else { @@ -184,7 +186,7 @@ object DataSynchronization extends ComparisonBase { val nonKeyColsMatch = ds1NonKeyCols.forall(columnExists(ds2, _)) if (!nonKeyColsMatch) { - Left(ComparisonFailed("Non key columns in the given data frames do not match.")) + Left(DataSynchronizationFailed("Non key columns in the given data frames do not match.")) } else { Right(ds1NonKeyCols.map { c => c -> 
c}.toMap)
+          }
+        }
+      }
 
         case Success(df) => Right(df)
         case Failure(ex) =>
           ex.printStackTrace()
-          Left(ComparisonFailed(s"Comparison failed due to ${ex.getCause.getClass}"))
+          Left(DataSynchronizationFailed(s"Comparison failed due to ${ex.getCause.getClass}"))
       }
     }
     } else {
-      Left(ComparisonFailed(columnErrors.get))
+      Left(DataSynchronizationFailed(columnErrors.get))
     }
   }
@@ -253,7 +255,7 @@ object DataSynchronization extends ComparisonBase {
     val ds2Count = ds2.count()
 
     if (ds1Count != ds2Count) {
-      ComparisonFailed(s"The row counts of the two data frames do not match.")
+      DataSynchronizationFailed(s"The row counts of the two data frames do not match.")
     } else {
       val joinExpression: Column = mergedMaps
         .map { case (col1, col2) => ds1(col1) === ds2(col2)}
@@ -265,7 +267,7 @@ object DataSynchronization extends ComparisonBase {
       val ratio = passedCount.toDouble / totalCount.toDouble
 
       if (assertion(ratio)) {
-        DataSynchronizationSucceeded(Some(passedCount), Some(totalCount))
+        DataSynchronizationSucceeded(passedCount, totalCount)
       } else {
         DataSynchronizationFailed(s"Data Synchronization Comparison Metric Value: $ratio does not meet the constraint " +
           s"requirement.", Some(passedCount), Some(totalCount))
       }
     }
diff --git a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
index b6e009822..5bb8d477e 100644
--- a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
+++ b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
@@ -1,5 +1,5 @@
 /**
- * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"). You may not
  * use this file except in compliance with the License.
A copy of the License
@@ -17,10 +17,13 @@
 package com.amazon.deequ.constraints
 
 import com.amazon.deequ.analyzers._
-import com.amazon.deequ.metrics.{BucketDistribution, Distribution, DoubleMetric, Metric}
+import com.amazon.deequ.metrics.BucketDistribution
+import com.amazon.deequ.metrics.Distribution
+import com.amazon.deequ.metrics.Metric
 import org.apache.spark.sql.expressions.UserDefinedFunction
 
-import scala.util.{Success, Try}
+import scala.util.Failure
+import scala.util.Success
 import scala.util.matching.Regex
 
 object ConstraintStatus extends Enumeration {
@@ -909,18 +912,19 @@ case class DataSynchronizationConstraint(analyzer: DataSynchronizationAnalyzer,
 
   override def evaluate(metrics: Map[Analyzer[_, Metric[_]], Metric[_]]): ConstraintResult = {
 
-    val anz = Try(metrics.filter(i => i._1.isInstanceOf[DataSynchronizationAnalyzer]).head._2)
-    anz match {
-      case Success(m: DoubleMetric) =>
-        val result = m.value match {
+    metrics.collectFirst {
+      case (_: DataSynchronizationAnalyzer, metric: Metric[Double]) => metric
+    } match {
+      case Some(metric) =>
+        val result = metric.value match {
           case Success(value) => analyzer.assertion(value)
-          case _ => false
+          case Failure(_) => false
         }
         val status = if (result) ConstraintStatus.Success else ConstraintStatus.Failure
-        ConstraintResult(this, status, hint, Some(m))
+        ConstraintResult(this, status, hint, Some(metric))
 
-      case _ =>
-        ConstraintResult(this, ConstraintStatus.Failure, hint, anz.toOption)
+      case None =>
+        ConstraintResult(this, ConstraintStatus.Failure, hint, None)
     }
   }
 }
diff --git a/src/test/scala/com/amazon/deequ/checks/CheckTest.scala b/src/test/scala/com/amazon/deequ/checks/CheckTest.scala
index 408154fe0..505e6d137 100644
--- a/src/test/scala/com/amazon/deequ/checks/CheckTest.scala
+++ b/src/test/scala/com/amazon/deequ/checks/CheckTest.scala
@@ -1134,6 +1134,7 @@
         .isDataSynchronized(dfInformativeRenamed, colMapAtt1, _ > 0.9, Some("must fail as column does not exist"))
       val context = runChecks(dfInformative, check)
       assertEvaluatesTo(check, context, CheckStatus.Error)
+      println(context)
     }
 
     "yield failure when row count varies in data sync test for 1 col" in withSparkSession { sparkSession =>
diff --git a/src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala b/src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala
index 74ebb70b2..dd3a002da 100644
--- a/src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala
+++ b/src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala
@@ -759,8 +759,8 @@
       val compColsMap1 = Map(nonExistCol1 -> nonExistCol2)
 
       val overallResult1 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compColsMap1, assertion)
-      assert(overallResult1.isInstanceOf[ComparisonFailed])
-      val failedOverallResult1 = overallResult1.asInstanceOf[ComparisonFailed]
+      assert(overallResult1.isInstanceOf[DataSynchronizationFailed])
+      val failedOverallResult1 = overallResult1.asInstanceOf[DataSynchronizationFailed]
       assert(failedOverallResult1.errorMessage.contains(
         s"The following columns were not found in the first dataset: $nonExistCol1"))
 
@@ -775,8 +775,8 @@
       val compColsMap2 = Map(nonExistCol1 -> "State")
 
       val overallResult2 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compColsMap2, assertion)
-      assert(overallResult2.isInstanceOf[ComparisonFailed])
-      val failedOverallResult2 = overallResult2.asInstanceOf[ComparisonFailed]
+      assert(overallResult2.isInstanceOf[DataSynchronizationFailed])
+      val failedOverallResult2 = overallResult2.asInstanceOf[DataSynchronizationFailed]
       assert(failedOverallResult2.errorMessage.contains(
         s"The following columns were not found in the first dataset: $nonExistCol1"))
 
@@ -791,8 +791,8 @@
       val compColsMap3 = Map("state" -> nonExistCol2)
 
       val overallResult3 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compColsMap3, assertion)
-      assert(overallResult3.isInstanceOf[ComparisonFailed])
-      val failedOverallResult3 = overallResult3.asInstanceOf[ComparisonFailed]
+      assert(overallResult3.isInstanceOf[DataSynchronizationFailed])
+      val failedOverallResult3 = overallResult3.asInstanceOf[DataSynchronizationFailed]
       assert(failedOverallResult3.errorMessage.contains(
         s"The following columns were not found in the second dataset: $nonExistCol2"))
 
From 6c1f97e1dab3388bf0960c7ac0d03e5af0eae72c Mon Sep 17 00:00:00 2001
From: Karthik Penikalapati
Date: Thu, 4 Jan 2024 16:00:47 -0800
Subject: [PATCH 3/3] update test and doc strings

---
 .../DataSynchronizationAnalyzer.scala         |   2 +-
 .../analyzers/DataSynchronizationState.scala  |  24 ++-
 .../scala/com/amazon/deequ/checks/Check.scala |  22 +++
 .../amazon/deequ/VerificationSuiteTest.scala  | 178 +++++++++++++++++-
 4 files changed, 215 insertions(+), 11 deletions(-)

diff --git a/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationAnalyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationAnalyzer.scala
index e72fc86cb..1d7e37533 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationAnalyzer.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationAnalyzer.scala
@@ -81,7 +81,7 @@ case class DataSynchronizationAnalyzer(dfToCompare: DataFrame,
   override def computeMetricFrom(state: Option[DataSynchronizationState]): DoubleMetric = {
 
     val metric = state match {
-      case Some(s) => Try(s.synchronizedDataCount.toDouble / s.dataCount.toDouble)
+      case Some(s) => Try(s.synchronizedDataCount.toDouble / s.totalDataCount.toDouble)
       case _ => Failure(new IllegalStateException("No state available for DataSynchronizationAnalyzer"))
     }
 
diff --git a/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala b/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala
index dec49f530..e0321df35 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala
@@ -17,19 +17,31 @@
 package com.amazon.deequ.analyzers
 
 /**
- * To store state of DataSynchronization
+ * Represents the state of data synchronization between two DataFrames in Deequ.
+ * This state keeps track of the synchronized record count and the total record count.
+ * It is used to calculate a ratio of synchronization, which is a measure of how well the data
+ * in the two DataFrames is synchronized.
+ *
+ * @param synchronizedDataCount The count of records that are considered synchronized between the two DataFrames.
+ * @param totalDataCount The total count of records for the check.
+ *
+ * The `sum` method allows for aggregation of this state with another, combining the counts from both states.
+ * This is useful in distributed computations where states from different partitions need to be aggregated.
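+ * For example (hypothetical counts, for illustration): merging DataSynchronizationState(3, 5)
+ * with DataSynchronizationState(5, 5) via `sum` yields DataSynchronizationState(8, 10),
+ * whose `metricValue` is 8.0 / 10.0 = 0.8.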
From 6c1f97e1dab3388bf0960c7ac0d03e5af0eae72c Mon Sep 17 00:00:00 2001
From: Karthik Penikalapati
Date: Thu, 4 Jan 2024 16:00:47 -0800
Subject: [PATCH 3/3] update test and doc strings

---
 .../DataSynchronizationAnalyzer.scala         |   2 +-
 .../analyzers/DataSynchronizationState.scala  |  24 ++-
 .../scala/com/amazon/deequ/checks/Check.scala |  22 +++
 .../amazon/deequ/VerificationSuiteTest.scala  | 178 +++++++++++++++++-
 4 files changed, 215 insertions(+), 11 deletions(-)

diff --git a/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationAnalyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationAnalyzer.scala
index e72fc86cb..1d7e37533 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationAnalyzer.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationAnalyzer.scala
@@ -81,7 +81,7 @@ case class DataSynchronizationAnalyzer(dfToCompare: DataFrame,
   override def computeMetricFrom(state: Option[DataSynchronizationState]): DoubleMetric = {

     val metric = state match {
-      case Some(s) => Try(s.synchronizedDataCount.toDouble / s.dataCount.toDouble)
+      case Some(s) => Try(s.synchronizedDataCount.toDouble / s.totalDataCount.toDouble)
       case _ => Failure(new IllegalStateException("No state available for DataSynchronizationAnalyzer"))
     }

diff --git a/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala b/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala
index dec49f530..e0321df35 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala
@@ -17,19 +17,31 @@ package com.amazon.deequ.analyzers

 /**
- * To store state of DataSynchronization
+ * Represents the state of data synchronization between two DataFrames in Deequ.
+ * This state keeps track of the number of synchronized records and the total number of records.
+ * It is used to calculate a synchronization ratio, a measure of how well the data
+ * in the two DataFrames is synchronized.
+ *
+ * @param synchronizedDataCount The count of records that are considered synchronized between the two DataFrames.
+ * @param totalDataCount The total count of records considered in the check.
+ *
+ * The `sum` method allows for aggregation of this state with another, combining the counts from both states.
+ * This is useful in distributed computations where states from different partitions need to be aggregated.
+ *
+ * The `metricValue` method computes the synchronization ratio. It is the ratio of `synchronizedDataCount`
+ * to `totalDataCount`.
+ * If `totalDataCount` is zero, which means no data points were examined, the method returns `Double.NaN`
+ * to indicate the undefined state.
  *
- * @param synchronizedDataCount - Count Of rows that are in sync
- * @param dataCount - total count of records to caluculate ratio.
  */
-case class DataSynchronizationState(synchronizedDataCount: Long, dataCount: Long)
+case class DataSynchronizationState(synchronizedDataCount: Long, totalDataCount: Long)
   extends DoubleValuedState[DataSynchronizationState] {

   override def sum(other: DataSynchronizationState): DataSynchronizationState = {
-    DataSynchronizationState(synchronizedDataCount + other.synchronizedDataCount, dataCount + other.dataCount)
+    DataSynchronizationState(synchronizedDataCount + other.synchronizedDataCount, totalDataCount + other.totalDataCount)
   }

   override def metricValue(): Double = {
-    if (dataCount == 0L) Double.NaN else synchronizedDataCount.toDouble / dataCount.toDouble
+    if (totalDataCount == 0L) Double.NaN else synchronizedDataCount.toDouble / totalDataCount.toDouble
   }
 }
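
To make the documented semantics concrete, a small sketch (not part of the patch; the counts
are invented for illustration) of how sum and metricValue behave on the class defined above:

    import com.amazon.deequ.analyzers.DataSynchronizationState

    // partial states, e.g. computed on two partitions of the data
    val left = DataSynchronizationState(synchronizedDataCount = 8L, totalDataCount = 10L)
    val right = DataSynchronizationState(synchronizedDataCount = 2L, totalDataCount = 10L)

    val combined = left.sum(right)                 // DataSynchronizationState(10, 20)
    combined.metricValue()                         // 0.5, the synchronization ratio
    DataSynchronizationState(0L, 0L).metricValue() // Double.NaN, nothing was examined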
diff --git a/src/main/scala/com/amazon/deequ/checks/Check.scala b/src/main/scala/com/amazon/deequ/checks/Check.scala
index a02bc8f71..9f6f6ea03 100644
--- a/src/main/scala/com/amazon/deequ/checks/Check.scala
+++ b/src/main/scala/com/amazon/deequ/checks/Check.scala
@@ -359,6 +359,27 @@ case class Check(
    * Utilizes [[com.amazon.deequ.analyzers.DataSynchronizationAnalyzer]] for comparing the data
    * and Constraint [[com.amazon.deequ.constraints.DataSynchronizationConstraint]].
    *
+   * Usage:
+   * To use this method, create a VerificationSuite and invoke this method as part of adding checks:
+   * {{{
+   *   val baseDataFrame: DataFrame = ...
+   *   val otherDataFrame: DataFrame = ...
+   *   val columnMappings: Map[String, String] = Map("baseCol1" -> "otherCol1", "baseCol2" -> "otherCol2")
+   *   val assertionFunction: Double => Boolean = _ > 0.7
+   *
+   *   val check = new Check(CheckLevel.Error, "Data Synchronization Check")
+   *     .isDataSynchronized(otherDataFrame, columnMappings, assertionFunction)
+   *
+   *   val verificationResult = VerificationSuite()
+   *     .onData(baseDataFrame)
+   *     .addCheck(check)
+   *     .run()
+   * }}}
+   *
+   * This will add a data synchronization check to the VerificationSuite, comparing the specified columns of
+   * baseDataFrame and otherDataFrame based on the provided assertion function.
+   *
    * @param otherDf The DataFrame to be compared with the current one. Analyzed in conjunction with the
    *                current DataFrame to assess data synchronization.
    * @param columnMappings A map defining the column correlations between the current DataFrame and otherDf.
@@ -372,6 +393,7 @@ case class Check(
    * @return A [[com.amazon.deequ.checks.Check]] object representing the outcome
    *         of the synchronization check. This object can be used in Deequ's verification suite to
    *         assert data quality constraints.
+   *
    */
   def isDataSynchronized(otherDf: DataFrame, columnMappings: Map[String, String], assertion: Double => Boolean,
                          hint: Option[String] = None): Check = {
diff --git a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
index d9d5fa012..e260d2f18 100644
--- a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
+++ b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
@@ -19,10 +19,13 @@ package com.amazon.deequ
 import com.amazon.deequ.analyzers._
 import com.amazon.deequ.analyzers.runners.AnalyzerContext
 import com.amazon.deequ.anomalydetection.AbsoluteChangeStrategy
-import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
-import com.amazon.deequ.constraints.{Constraint, ConstraintResult}
+import com.amazon.deequ.checks.Check
+import com.amazon.deequ.checks.CheckLevel
+import com.amazon.deequ.checks.CheckStatus
+import com.amazon.deequ.constraints.Constraint
 import com.amazon.deequ.io.DfsUtils
-import com.amazon.deequ.metrics.{DoubleMetric, Entity, Metric}
+import com.amazon.deequ.metrics.DoubleMetric
+import com.amazon.deequ.metrics.Entity
 import com.amazon.deequ.repository.MetricsRepository
 import com.amazon.deequ.repository.ResultKey
 import com.amazon.deequ.repository.memory.InMemoryMetricsRepository
@@ -30,6 +33,9 @@ import com.amazon.deequ.utils.CollectionUtils.SeqExtensions
 import com.amazon.deequ.utils.FixtureSupport
 import com.amazon.deequ.utils.TempFileUtils
 import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.functions.when
 import org.scalamock.scalatest.MockFactory
 import org.scalatest.Matchers
 import org.scalatest.WordSpec
@@ -805,7 +811,7 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
       .hasCompleteness("fake", x => x > 0)

     val checkHasDataInSyncTest = Check(CheckLevel.Error, "shouldSucceedForAge")
-      .isDataSynchronized(df, Map("age" -> "age"), _ > 0.99, Some("shouldpass"))
+      .isDataSynchronized(df, Map("age" -> "age"), _ > 0.99, Some("shouldPass"))

     val verificationResult = VerificationSuite()
       .onData(df)
@@ -978,6 +984,170 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
         List(Some("Value: 0.125 does not meet the constraint requirement!"))
       assert(subsetNameFailResult.status == CheckStatus.Error)
     }
+
+    "work with Data Synchronization checks for a single column" in withSparkSession {
+      sparkSession =>
+        val df = getDateDf(sparkSession).select("id", "product", "product_id", "units")
+        val dfModified = df.withColumn("id", when(col("id") === 100, 99)
+          .otherwise(col("id")))
+        val dfColRenamed = df.withColumnRenamed("id", "id_renamed")
+
+        val dataSyncCheckPass = Check(CheckLevel.Error, "data synchronization check pass")
+          .isDataSynchronized(dfModified, Map("id" -> "id"), _ > 0.7, Some("shouldPass"))
+
+        val dataSyncCheckFail = Check(CheckLevel.Error, "data synchronization check fail")
+          .isDataSynchronized(dfModified, Map("id" -> "id"), _ > 0.9, Some("shouldFail"))
+
+        val emptyDf = sparkSession.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], df.schema)
+        val dataSyncCheckEmpty = Check(CheckLevel.Error, "data synchronization check on empty DataFrame")
+          .isDataSynchronized(emptyDf, Map("id" -> "id"), _ < 0.5)
+
+        val dataSyncCheckColMismatchDestination =
+          Check(CheckLevel.Error, "data synchronization check col mismatch in destination")
+            .isDataSynchronized(dfModified, Map("id" -> "id2"), _ < 0.5)
+
+        val dataSyncCheckColMismatchSource =
+          Check(CheckLevel.Error, "data synchronization check col mismatch in source")
+            .isDataSynchronized(dfModified, Map("id2" -> "id"), _ < 0.5)
+
+        val dataSyncCheckColRenamed =
+          Check(CheckLevel.Error, "data synchronization check col names renamed")
+            .isDataSynchronized(dfColRenamed, Map("id" -> "id_renamed"), _ == 1.0)
+
+        val dataSyncFullMatch =
+          Check(CheckLevel.Error, "data synchronization check full match")
+            .isDataSynchronized(df, Map("id" -> "id"), _ == 1.0)
+
+        val verificationResult = VerificationSuite()
+          .onData(df)
+          .addCheck(dataSyncCheckPass)
+          .addCheck(dataSyncCheckFail)
+          .addCheck(dataSyncCheckEmpty)
+          .addCheck(dataSyncCheckColMismatchDestination)
+          .addCheck(dataSyncCheckColMismatchSource)
+          .addCheck(dataSyncCheckColRenamed)
+          .addCheck(dataSyncFullMatch)
+          .run()
+
+        val passResult = verificationResult.checkResults(dataSyncCheckPass)
+        passResult.constraintResults.map(_.message) shouldBe
+          List(None)
+        assert(passResult.status == CheckStatus.Success)
+
+        val failResult = verificationResult.checkResults(dataSyncCheckFail)
+        failResult.constraintResults.map(_.message) shouldBe
+          List(Some("Value: 0.8 does not meet the constraint requirement! shouldFail"))
+        assert(failResult.status == CheckStatus.Error)
+
+        val emptyResult = verificationResult.checkResults(dataSyncCheckEmpty)
+        emptyResult.constraintResults.map(_.message) shouldBe
+          List(Some("Value: NaN does not meet the constraint requirement!"))
+        assert(emptyResult.status == CheckStatus.Error)
+
+        val colMismatchDestResult = verificationResult.checkResults(dataSyncCheckColMismatchDestination)
+        colMismatchDestResult.constraintResults.map(_.message) shouldBe
+          List(Some("Value: NaN does not meet the constraint requirement!"))
+        assert(colMismatchDestResult.status == CheckStatus.Error)
+
+        val colMismatchSourceResult = verificationResult.checkResults(dataSyncCheckColMismatchSource)
+        colMismatchSourceResult.constraintResults.map(_.message) shouldBe
+          List(Some("Value: NaN does not meet the constraint requirement!"))
+        assert(colMismatchSourceResult.status == CheckStatus.Error)
+
+        val colRenamedResult = verificationResult.checkResults(dataSyncCheckColRenamed)
+        colRenamedResult.constraintResults.map(_.message) shouldBe List(None)
+        assert(colRenamedResult.status == CheckStatus.Success)
+
+        val fullMatchResult = verificationResult.checkResults(dataSyncFullMatch)
+        fullMatchResult.constraintResults.map(_.message) shouldBe List(None)
+        assert(fullMatchResult.status == CheckStatus.Success)
+    }
+
+    "work with Data Synchronization checks for multiple columns" in withSparkSession {
+      sparkSession =>
+        val df = getDateDf(sparkSession).select("id", "product", "product_id", "units")
+        val dfModified = df.withColumn("id", when(col("id") === 100, 99)
+          .otherwise(col("id")))
+        val dfColRenamed = df.withColumnRenamed("id", "id_renamed")
+        val colMap = Map("id" -> "id", "product" -> "product")
+
+        val dataSyncCheckPass = Check(CheckLevel.Error, "data synchronization check pass")
+          .isDataSynchronized(dfModified, colMap, _ > 0.7, Some("shouldPass"))
+
+        val dataSyncCheckFail = Check(CheckLevel.Error, "data synchronization check fail")
+          .isDataSynchronized(dfModified, colMap, _ > 0.9, Some("shouldFail"))
+
+        val emptyDf = sparkSession.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], df.schema)
+        val dataSyncCheckEmpty = Check(CheckLevel.Error, "data synchronization check on empty DataFrame")
+          .isDataSynchronized(emptyDf, colMap, _ < 0.5)
+
+        val dataSyncCheckColMismatchDestination =
+          Check(CheckLevel.Error, "data synchronization check col mismatch in destination")
+            .isDataSynchronized(dfModified, Map("id" -> "id2", "product" -> "product"), _ < 0.5)
+
+        val dataSyncCheckColMismatchSource =
+          Check(CheckLevel.Error, "data synchronization check col mismatch in source")
+            .isDataSynchronized(dfModified, Map("id2" -> "id", "product" -> "product"), _ < 0.5)
+
+        val dataSyncCheckColRenamed =
+          Check(CheckLevel.Error, "data synchronization check col names renamed")
+            .isDataSynchronized(dfColRenamed, Map("id" -> "id_renamed", "product" -> "product"), _ == 1.0,
+              Some("shouldPass"))
+
+        val dataSyncFullMatch =
+          Check(CheckLevel.Error, "data synchronization check col full match")
+            .isDataSynchronized(df, colMap, _ == 1.0, Some("shouldPass"))
+
+        val verificationResult = VerificationSuite()
+          .onData(df)
+          .addCheck(dataSyncCheckPass)
+          .addCheck(dataSyncCheckFail)
+          .addCheck(dataSyncCheckEmpty)
+          .addCheck(dataSyncCheckColMismatchDestination)
+          .addCheck(dataSyncCheckColMismatchSource)
+          .addCheck(dataSyncCheckColRenamed)
+          .addCheck(dataSyncFullMatch)
+          .run()
+
+        val passResult = verificationResult.checkResults(dataSyncCheckPass)
+        passResult.constraintResults.map(_.message) shouldBe
+          List(None)
+        assert(passResult.status == CheckStatus.Success)
+
+        val failResult = verificationResult.checkResults(dataSyncCheckFail)
+        failResult.constraintResults.map(_.message) shouldBe
+          List(Some("Value: 0.8 does not meet the constraint requirement! shouldFail"))
+        assert(failResult.status == CheckStatus.Error)
+
+        val emptyResult = verificationResult.checkResults(dataSyncCheckEmpty)
+        emptyResult.constraintResults.map(_.message) shouldBe
+          List(Some("Value: NaN does not meet the constraint requirement!"))
+        assert(emptyResult.status == CheckStatus.Error)
+
+        val colMismatchDestResult = verificationResult.checkResults(dataSyncCheckColMismatchDestination)
+        colMismatchDestResult.constraintResults.map(_.message) shouldBe
+          List(Some("Value: NaN does not meet the constraint requirement!"))
+        assert(colMismatchDestResult.status == CheckStatus.Error)
+
+        val colMismatchSourceResult = verificationResult.checkResults(dataSyncCheckColMismatchSource)
+        colMismatchSourceResult.constraintResults.map(_.message) shouldBe
+          List(Some("Value: NaN does not meet the constraint requirement!"))
+        assert(colMismatchSourceResult.status == CheckStatus.Error)
+
+        val colRenamedResult = verificationResult.checkResults(dataSyncCheckColRenamed)
+        colRenamedResult.constraintResults.map(_.message) shouldBe
+          List(None)
+        assert(colRenamedResult.status == CheckStatus.Success)
+
+        val fullMatchResult = verificationResult.checkResults(dataSyncFullMatch)
+        fullMatchResult.constraintResults.map(_.message) shouldBe
+          List(None)
+        assert(fullMatchResult.status == CheckStatus.Success)
+    }
   }

   /** Run anomaly detection using a repository with some previous analysis results for testing */
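
To close, a hedged end-to-end sketch (not part of the patch set) of consuming the results these
tests assert on; df, otherDf and the 0.9 threshold are placeholders, everything else uses the
API added in this series:

    val dataSyncCheck = Check(CheckLevel.Error, "data synchronization check")
      .isDataSynchronized(otherDf, Map("id" -> "id"), _ > 0.9)

    val result = VerificationSuite()
      .onData(df)
      .addCheck(dataSyncCheck)
      .run()

    // overall suite status, then per-constraint outcomes as in the assertions above
    println(result.status)
    result.checkResults(dataSyncCheck).constraintResults.foreach { cr =>
      println(s"${cr.status}: ${cr.message.getOrElse("ok")}")
    }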