From 414f4d2eb66072253ce208e931bb87b9272e3222 Mon Sep 17 00:00:00 2001 From: James Stocker Date: Mon, 24 Jul 2023 13:59:44 +0200 Subject: [PATCH 1/4] Apply recommended patch from Igor --- build.sbt | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/build.sbt b/build.sbt index 2a58a59e..7de36b26 100644 --- a/build.sbt +++ b/build.sbt @@ -37,14 +37,13 @@ lazy val root = (project in file(".")).settings( ShadeRule.rename("org.yaml.snakeyaml.**" -> "com.scylladb.shaded.@1").inAll ), assemblyMergeStrategy in assembly := { + case PathList("META-INF", xs @ _*) => MergeStrategy.discard case PathList("org", "joda", "time", _ @_*) => MergeStrategy.first case PathList("org", "apache", "commons", "logging", _ @_*) => MergeStrategy.first case PathList("com", "fasterxml", "jackson", "annotation", _ @_*) => MergeStrategy.first case PathList("com", "fasterxml", "jackson", "core", _ @_*) => MergeStrategy.first case PathList("com", "fasterxml", "jackson", "databind", _ @_*) => MergeStrategy.first - case x => - val oldStrategy = (assemblyMergeStrategy in assembly).value - oldStrategy(x) + case x => MergeStrategy.first }, // uses compile classpath for the run task, including "provided" jar (cf http://stackoverflow.com/a/21803413/3827) run in Compile := Defaults From f964d8433a3deee69ed92ee154ec252a453f27e3 Mon Sep 17 00:00:00 2001 From: James Stocker Date: Wed, 9 Aug 2023 17:19:45 +0200 Subject: [PATCH 2/4] Add a method to copy missing rows --- .../com/scylladb/migrator/Validator.scala | 57 ++++++++++++++++++- 1 file changed, 55 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/scylladb/migrator/Validator.scala b/src/main/scala/com/scylladb/migrator/Validator.scala index 656255f0..2a578edd 100644 --- a/src/main/scala/com/scylladb/migrator/Validator.scala +++ b/src/main/scala/com/scylladb/migrator/Validator.scala @@ -139,10 +139,63 @@ object Validator { val failures = runValidation(migratorConfig) - if (failures.isEmpty) log.info("No comparison failures found - enjoy your day!") - else { + if (failures.isEmpty) { + log.info("No comparison failures found - enjoy your day!") + } else { log.error("Found the following comparison failures:") log.error(failures.mkString("\n")) + + // Copy missing data here based on the discrepancies found + copyMissingData(migratorConfig, failures) + } + } + + // Additional function to copy missing data + + def copyMissingData(config: MigratorConfig, failures: List[RowComparisonFailure])( + implicit spark: SparkSession): Unit = { + + val log = LogManager.getLogger("com.scylladb.migrator") + val sourceSettings = config.source.asInstanceOf[SourceSettings.Cassandra] + val targetSettings = config.target.asInstanceOf[TargetSettings.Scylla] + val retryMaxAttempts = 5 + val retryBackoff = 10.seconds + + val retryPolicy = RetryPolicy() + .withBackoff(retryBackoff) + .withMaxRetries(retryMaxAttempts) + .onRetry { + case (_, _, e) => + log.warn(s"Retrying due to error: ${e.getMessage}") + } + + for (failure <- failures) { + val keyColumns = failure.primaryKeyColumns // Assuming this holds the key column names + + val sourceRows = spark.sparkContext + .cassandraTable(sourceSettings.keyspace, sourceSettings.table) + .select(keyColumns: _*) + .where(failure.primaryKeyFilter) // Assuming this holds the filter conditions for the missing rows + + Try { + sourceRows.foreachPartition { partition => + val retryableInsert = RetryableInsert.retryableInsert( + partition, + targetSettings.keyspace, + targetSettings.table, + targetSettings.columns.map(_.columnName), + retryPolicy, + log + ) + + retryableInsert.run() + } + } match { + case Success(_) => + log.info(s"Copied missing data for primary key: ${failure.primaryKeyValues.mkString(", ")}") + case Failure(e) => + log.error(s"Failed to copy missing data for primary key: ${failure.primaryKeyValues.mkString(", ")}", e) + } } } } From a7006b45f5728ff1a459d48e20d3d18b39a84ff5 Mon Sep 17 00:00:00 2001 From: James Stocker Date: Wed, 9 Aug 2023 19:03:20 +0200 Subject: [PATCH 3/4] Update CopyMissingData method --- .../com/scylladb/migrator/Validator.scala | 71 +++++++++++-------- 1 file changed, 41 insertions(+), 30 deletions(-) diff --git a/src/main/scala/com/scylladb/migrator/Validator.scala b/src/main/scala/com/scylladb/migrator/Validator.scala index 2a578edd..ee262054 100644 --- a/src/main/scala/com/scylladb/migrator/Validator.scala +++ b/src/main/scala/com/scylladb/migrator/Validator.scala @@ -151,50 +151,61 @@ object Validator { } // Additional function to copy missing data - def copyMissingData(config: MigratorConfig, failures: List[RowComparisonFailure])( implicit spark: SparkSession): Unit = { - val log = LogManager.getLogger("com.scylladb.migrator") val sourceSettings = config.source.asInstanceOf[SourceSettings.Cassandra] val targetSettings = config.target.asInstanceOf[TargetSettings.Scylla] val retryMaxAttempts = 5 val retryBackoff = 10.seconds - val retryPolicy = RetryPolicy() - .withBackoff(retryBackoff) - .withMaxRetries(retryMaxAttempts) - .onRetry { - case (_, _, e) => - log.warn(s"Retrying due to error: ${e.getMessage}") - } - for (failure <- failures) { - val keyColumns = failure.primaryKeyColumns // Assuming this holds the key column names + val primaryKeyColumns = failure.primaryKeyColumns // Assuming this holds the key column names val sourceRows = spark.sparkContext .cassandraTable(sourceSettings.keyspace, sourceSettings.table) - .select(keyColumns: _*) - .where(failure.primaryKeyFilter) // Assuming this holds the filter conditions for the missing rows - - Try { - sourceRows.foreachPartition { partition => - val retryableInsert = RetryableInsert.retryableInsert( - partition, - targetSettings.keyspace, - targetSettings.table, - targetSettings.columns.map(_.columnName), - retryPolicy, - log - ) + .select(primaryKeyColumns: _*) + .where(failure.primaryKeyFilter) // Assuming this holds the filter conditions for the missing rows) + + var success = false + var attempt = 1 + while (!success && attempt <= retryMaxAttempts) { + Try { + sourceRows + .foreachPartition { partition => + // Collect rows to be written + val rowsToWrite = partition.map { row => + targetSettings.columns.map { col => + col.columnName -> row.get(col.columnName) + }.toMap + }.toSeq + + // Use the writing method from the original code to write rows + writers.Scylla.writeDataframe( + targetSettings, + Map.empty, // No renames needed + spark.createDataFrame(rowsToWrite, targetSettings.schema), + None // No token range accumulator for validation + ) + + success = true + } + } match { + case Success(_) => + log.info(s"Copied missing data for primary key: ${failure.primaryKeyValues.mkString(", ")}") + case Failure(e) => + log.error(s"Failed to copy missing data for primary key: ${failure.primaryKeyValues.mkString(", ")}", e) + } - retryableInsert.run() + if (!success) { + log.warn(s"Write attempt $attempt failed. Retrying after sleeping for $retryBackoff...") + attempt += 1 + Thread.sleep(retryBackoff.toMillis) } - } match { - case Success(_) => - log.info(s"Copied missing data for primary key: ${failure.primaryKeyValues.mkString(", ")}") - case Failure(e) => - log.error(s"Failed to copy missing data for primary key: ${failure.primaryKeyValues.mkString(", ")}", e) + } + + if (!success) { + log.error(s"Failed to copy missing data after $retryMaxAttempts attempts for primary key: ${failure.primaryKeyValues.mkString(", ")}") } } } From 40d472172fadb1caf2ea0a11c072380af83d4445 Mon Sep 17 00:00:00 2001 From: James Stocker Date: Wed, 9 Aug 2023 19:23:27 +0200 Subject: [PATCH 4/4] A quick review --- .../scala/com/scylladb/migrator/Validator.scala | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/main/scala/com/scylladb/migrator/Validator.scala b/src/main/scala/com/scylladb/migrator/Validator.scala index ee262054..97ea0bb7 100644 --- a/src/main/scala/com/scylladb/migrator/Validator.scala +++ b/src/main/scala/com/scylladb/migrator/Validator.scala @@ -150,12 +150,16 @@ object Validator { } } - // Additional function to copy missing data def copyMissingData(config: MigratorConfig, failures: List[RowComparisonFailure])( implicit spark: SparkSession): Unit = { val log = LogManager.getLogger("com.scylladb.migrator") val sourceSettings = config.source.asInstanceOf[SourceSettings.Cassandra] - val targetSettings = config.target.asInstanceOf[TargetSettings.Scylla] + + val migratorConfig = + MigratorConfig.loadFrom(spark.conf.get("spark.scylla.config")) + + val target = migratorConfig.target + val targetSettings = target.asInstanceOf[TargetSettings.Scylla] val retryMaxAttempts = 5 val retryBackoff = 10.seconds @@ -170,7 +174,7 @@ object Validator { var success = false var attempt = 1 while (!success && attempt <= retryMaxAttempts) { - Try { + try { sourceRows .foreachPartition { partition => // Collect rows to be written @@ -180,11 +184,12 @@ object Validator { }.toMap }.toSeq - // Use the writing method from the original code to write rows + // Use the writing method from the original code with the correct number of parameters writers.Scylla.writeDataframe( - targetSettings, - Map.empty, // No renames needed + target, + migratorConfig.renames, // Use the renames from the migratorConfig spark.createDataFrame(rowsToWrite, targetSettings.schema), + Seq.empty, // Empty sequence for timestamp columns None // No token range accumulator for validation )