Skip to content

Commit

Permalink
[Data Synchronization/Matching] Delegate to Spark for checking existe…
Browse files Browse the repository at this point in the history
…nce of columns in the given dataframes (#515)

- Prior to this change, we were doing case-sensitive equality checks of non-key columns.
- This makes the utility more restrictive, as Spark does not care about the casing of column names.
- With this change, we rely on Spark to check if a column exists in the given dataframe. If Spark can find the column, we can proceed with the rest of the check.
  • Loading branch information
rdsharma26 committed Apr 16, 2024
1 parent 4f1e4fa commit 9b32b09
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ object DataSynchronization extends ComparisonBase {
assertion: Double => Boolean): ComparisonResult = {
val columnErrors = areKeyColumnsValid(ds1, ds2, colKeyMap)
if (columnErrors.isEmpty) {
// Get all the non-key columns from DS1 and verify that they are present in DS2
val colsDS1 = ds1.columns.filterNot(x => colKeyMap.keys.toSeq.contains(x)).sorted
val colsDS2 = ds2.columns.filterNot(x => colKeyMap.values.toSeq.contains(x)).sorted
val nonKeyColsMatch = colsDS1.forall { col => Try { ds2(col) }.isSuccess }

if (!(colsDS1 sameElements colsDS2)) {
if (!nonKeyColsMatch) {
ComparisonFailed("Non key columns in the given data frames do not match.")
} else {
val mergedMaps = colKeyMap ++ colsDS1.map(x => x -> x).toMap
Expand Down Expand Up @@ -152,9 +153,11 @@ object DataSynchronization extends ComparisonBase {
case compCols => Right(compCols)
}
} else {
// Get all the non-key columns from DS1 and verify that they are present in DS2
val ds1NonKeyCols = ds1.columns.filterNot(x => colKeyMap.keys.toSeq.contains(x)).sorted
val ds2NonKeyCols = ds2.columns.filterNot(x => colKeyMap.values.toSeq.contains(x)).sorted
if (!(ds1NonKeyCols sameElements ds2NonKeyCols)) {
val nonKeyColsMatch = ds1NonKeyCols.forall { col => Try { ds2(col) }.isSuccess }

if (!nonKeyColsMatch) {
Left(ComparisonFailed("Non key columns in the given data frames do not match."))
} else {
Right(ds1NonKeyCols.map { c => c -> c}.toMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,4 +596,95 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec {
assert(expected == rowLevelResults)
}
}

"Data Synchronization Schema Test for non key columns" should {
// Builds the primary six-row (id, name, state) fixture.
// The id column name is supplied by the caller; "name" and "state" are all lower case.
def primaryDataset(spark: SparkSession, idColumnName: String): DataFrame = {
  import spark.implicits._

  val people = Seq(
    (1, "John", "NY"),
    (2, "Javier", "WI"),
    (3, "Helena", "TX"),
    (4, "Helena", "TX"),
    (5, "Nick", "FL"),
    (6, "Molly", "TX")
  )
  val columnNames = Seq(idColumnName, "name", "state")

  spark.sparkContext.parallelize(people).toDF(columnNames: _*)
}

// Builds the reference six-row (id, Name, State) fixture.
// The id column name is supplied by the caller; the non-key columns are capitalised.
// Rows 5 and 6 carry different name values than the primary fixture.
def referenceDataset(spark: SparkSession, idColumnName: String): DataFrame = {
  import spark.implicits._

  val people = Seq(
    (1, "John", "NY"),
    (2, "Javier", "WI"),
    (3, "Helena", "TX"),
    (4, "Helena", "TX"),
    (5, "Nicholas", "FL"),
    (6, "Ms Molly", "TX")
  )
  val columnNames = Seq(idColumnName, "Name", "State")

  spark.sparkContext.parallelize(people).toDF(columnNames: _*)
}

"works when key column names have different casings" in withSparkSession { spark =>
val id1ColumnName = "id"
val id2ColumnName = "ID"
val ds1 = primaryDataset(spark, id1ColumnName)
val ds2 = referenceDataset(spark, id2ColumnName)

// Not using id1ColumnName -> id2ColumnName intentionally.
// In Glue DQ, we accept the column names in two formats: mapping and non-mapping
// Mapping format is "col1 -> col2", when customer wants to compare columns with different names.
// Non-mapping format is just "col1", when customer has same column, regardless of case, in both datasets.
// A non-mapping format would translate it into the map below.
// We want to test that the functionality works as expected in that case.
val colKeyMap = Map(id1ColumnName -> id1ColumnName)

// Overall
val assertion: Double => Boolean = _ >= 0.6 // 4 out of 6 rows match
val overallResult = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion)
assert(overallResult.isInstanceOf[ComparisonSucceeded])

// Row Level
val outcomeColName = "outcome"
val rowLevelResult = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap, None, Some(outcomeColName))

assert(rowLevelResult.isRight)

val dfResult = rowLevelResult.right.get
val rowLevelResults = dfResult.collect().map { row =>
row.getAs[Int]("id") -> row.getAs[Boolean](outcomeColName)
}.toMap

val expected = Map(1 -> true, 2 -> true, 3 -> true, 4 -> true, 5 -> false, 6 -> false)
assert(expected == rowLevelResults)
}

"works when non-key column names have different casings" in withSparkSession { spark =>
val idColumnName = "id"
val ds1 = primaryDataset(spark, idColumnName)
val ds2 = referenceDataset(spark, idColumnName)

val colKeyMap = Map(idColumnName -> idColumnName)

// Overall
val assertion: Double => Boolean = _ >= 0.6 // 4 out of 6 rows match
val overallResult = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion)
assert(overallResult.isInstanceOf[ComparisonSucceeded])

// Row Level
val outcomeColName = "outcome"
val rowLevelResult = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap, None, Some(outcomeColName))

assert(rowLevelResult.isRight)

val dfResult = rowLevelResult.right.get
val rowLevelResults = dfResult.collect().map { row =>
row.getAs[Int]("id") -> row.getAs[Boolean](outcomeColName)
}.toMap

val expected = Map(1 -> true, 2 -> true, 3 -> true, 4 -> true, 5 -> false, 6 -> false)
assert(expected == rowLevelResults)
}
}
}

0 comments on commit 9b32b09

Please sign in to comment.