From 95697fc5d85fdef17d8e35c31b28521e2c374e6f Mon Sep 17 00:00:00 2001 From: Julien Richard-Foy Date: Tue, 12 Mar 2024 09:34:59 +0100 Subject: [PATCH 1/7] Add failing test case for issue #103 (probably covers #104 as well) --- .../dynamodb-to-alternator-issue-103.yaml | 38 +++++++++++++++ .../com/scylladb/migrator/Issue103Test.scala | 47 +++++++++++++++++++ 2 files changed, 85 insertions(+) create mode 100644 tests/src/test/configurations/dynamodb-to-alternator-issue-103.yaml create mode 100644 tests/src/test/scala/com/scylladb/migrator/Issue103Test.scala diff --git a/tests/src/test/configurations/dynamodb-to-alternator-issue-103.yaml b/tests/src/test/configurations/dynamodb-to-alternator-issue-103.yaml new file mode 100644 index 00000000..e1e25f87 --- /dev/null +++ b/tests/src/test/configurations/dynamodb-to-alternator-issue-103.yaml @@ -0,0 +1,38 @@ +source: + type: dynamodb + table: Issue103Items + endpoint: + host: http://dynamodb + port: 8000 + credentials: + accessKey: dummy + secretKey: dummy + maxMapTasks: 2 + scanSegments: 10 + +target: + type: dynamodb + table: Issue103Items + endpoint: + host: http://scylla + port: 8000 + credentials: + accessKey: dummy + secretKey: dummy + maxMapTasks: 1 + streamChanges: false + +renames: [] + +# Below are unused but mandatory settings +savepoints: + path: /app/savepoints + intervalSeconds: 300 +skipTokenRanges: [] +validation: + compareTimestamps: true + ttlToleranceMillis: 60000 + writetimeToleranceMillis: 1000 + failuresToFetch: 100 + floatingPointTolerance: 0.001 + timestampMsTolerance: 0 diff --git a/tests/src/test/scala/com/scylladb/migrator/Issue103Test.scala b/tests/src/test/scala/com/scylladb/migrator/Issue103Test.scala new file mode 100644 index 00000000..e4304686 --- /dev/null +++ b/tests/src/test/scala/com/scylladb/migrator/Issue103Test.scala @@ -0,0 +1,47 @@ +package com.scylladb.migrator + +import com.amazonaws.services.dynamodbv2.model.{AttributeValue, GetItemRequest} + +import 
scala.collection.JavaConverters._ +import scala.util.chaining._ + +// Reproduction of https://github.com/scylladb/scylla-migrator/issues/103 +class Issue103Test extends MigratorSuite { + + withTable("Issue103Items").test("Issue #103 is fixed") { tableName => + // Insert two items + val keys1 = Map("id" -> new AttributeValue().withS("4")) + val attrs1 = Map( + "AlbumTitle" -> new AttributeValue().withS("aaaaa"), + "Awards" -> new AttributeValue().withN("4") + ) + val item1Data = keys1 ++ attrs1 + + val keys2 = Map("id" -> new AttributeValue().withS("999")) + val attrs2 = Map( + "asdfg" -> new AttributeValue().withM( + Map("fffff" -> new AttributeValue().withS("asdfasdfs")).asJava + ) + ) + val item2Data = keys2 ++ attrs2 + + sourceDDb.putItem(tableName, item1Data.asJava) + sourceDDb.putItem(tableName, item2Data.asJava) + + // Perform the migration + submitSparkJob("dynamodb-to-alternator-issue-103.yaml") + + // Check that both items have been correctly migrated to the target table + targetAlternator + .getItem(new GetItemRequest(tableName, keys1.asJava)) + .tap { itemResult => + assertEquals(itemResult.getItem.asScala.toMap, item1Data) + } + targetAlternator + .getItem(new GetItemRequest(tableName, keys2.asJava)) + .tap { itemResult => + assertEquals(itemResult.getItem.asScala.toMap, item2Data) + } + } + +} From 610b95f1f8b74641c8ad68f3576a801f4977c9da Mon Sep 17 00:00:00 2001 From: Julien Richard-Foy Date: Fri, 1 Mar 2024 12:31:24 +0100 Subject: [PATCH 2/7] Do not try to infer a schema when migrating from DynamoDB to Alternator This is a work in progress to address #103. Instead of using `com.audienceproject:spark-dynamodb` to migrate the data, we use `com.amazon.emr:emr-dynamodb-hadoop` as described in https://aws.amazon.com/fr/blogs/big-data/analyze-your-data-on-amazon-dynamodb-with-apache-spark/. The former approach used to load the data as a `DataFrame`, which required us to infer the data schema, but which was not doable in a reliable way. 
The new approach still benefits from the Spark infrastructure to handle the data transfer efficiently, but the data is loaded as an `RDD`. To achieve this, I had to de-unify the migrators for Scylla and Alternator (so, in a sense, I am undoing what was done in #23). The benefit is that the Alternator migrator is not anymore constrained by the requirements of the Scylla migrator. --- build.sbt | 1 + docker-compose-tests.yml | 2 + .../com/scylladb/migrator/Migrator.scala | 216 ++---------------- .../alternator/AlternatorMigrator.scala | 70 ++++++ .../scylladb/migrator/readers/Cassandra.scala | 1 + .../scylladb/migrator/readers/DynamoDB.scala | 57 +++-- .../scylladb/migrator/readers/Parquet.scala | 3 +- .../migrator/readers/SourceDataFrame.scala | 8 - .../migrator/readers/TimestampColumns.scala | 3 + .../migrator/scylla/ScyllaMigrator.scala | 152 ++++++++++++ .../scylladb/migrator/writers/DynamoDB.scala | 71 +++--- .../writers/DynamoStreamReplication.scala | 90 +++----- .../scylladb/migrator/writers/Scylla.scala | 6 +- tests/docker/job-flow.json | 27 +++ 14 files changed, 384 insertions(+), 323 deletions(-) create mode 100644 migrator/src/main/scala/com/scylladb/migrator/alternator/AlternatorMigrator.scala delete mode 100644 migrator/src/main/scala/com/scylladb/migrator/readers/SourceDataFrame.scala create mode 100644 migrator/src/main/scala/com/scylladb/migrator/readers/TimestampColumns.scala create mode 100644 migrator/src/main/scala/com/scylladb/migrator/scylla/ScyllaMigrator.scala create mode 100644 tests/docker/job-flow.json diff --git a/build.sbt b/build.sbt index da38ba78..700f7fef 100644 --- a/build.sbt +++ b/build.sbt @@ -30,6 +30,7 @@ lazy val migrator = (project in file("migrator")).settings( "com.amazonaws" % "aws-java-sdk-dynamodb" % awsSdkVersion, ("com.amazonaws" % "dynamodb-streams-kinesis-adapter" % "1.5.2") .excludeAll(InclExclRule("com.fasterxml.jackson.core")), + "com.amazon.emr" % "emr-dynamodb-hadoop" % "4.8.0", "org.yaml" % "snakeyaml" % "1.23", 
"io.circe" %% "circe-yaml" % "0.9.0", "io.circe" %% "circe-generic" % "0.9.0", diff --git a/docker-compose-tests.yml b/docker-compose-tests.yml index 66e452cd..5a1808c4 100644 --- a/docker-compose-tests.yml +++ b/docker-compose-tests.yml @@ -54,6 +54,8 @@ services: volumes: - ./migrator/target/scala-2.11:/jars - ./tests/src/test/configurations:/app/configurations + # Workaround for https://github.com/awslabs/emr-dynamodb-connector/issues/50 + - ${PWD}/tests/docker/job-flow.json:/mnt/var/lib/info/job-flow.json spark-worker: image: bde2020/spark-worker:2.4.4-hadoop2.7 diff --git a/migrator/src/main/scala/com/scylladb/migrator/Migrator.scala b/migrator/src/main/scala/com/scylladb/migrator/Migrator.scala index dba0092f..839d32db 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/Migrator.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/Migrator.scala @@ -1,21 +1,10 @@ package com.scylladb.migrator -import java.nio.charset.StandardCharsets -import java.nio.file.{ Files, Paths } -import java.util.concurrent.{ ScheduledThreadPoolExecutor, TimeUnit } -import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter -import com.datastax.spark.connector.rdd.partitioner.{ CassandraPartition, CqlTokenRange } -import com.datastax.spark.connector.rdd.partitioner.dht.Token -import com.datastax.spark.connector.writer._ +import com.scylladb.migrator.alternator.AlternatorMigrator import com.scylladb.migrator.config._ -import com.scylladb.migrator.writers.DynamoStreamReplication +import com.scylladb.migrator.scylla.ScyllaMigrator import org.apache.log4j.{ Level, LogManager, Logger } import org.apache.spark.sql._ -import org.apache.spark.streaming.{ Seconds, StreamingContext } -import org.apache.spark.streaming.kinesis.{ KinesisInputDStream, SparkAWSCredentials } -import sun.misc.{ Signal, SignalHandler } - -import scala.util.control.NonFatal object Migrator { val log = LogManager.getLogger("com.scylladb.migrator") @@ -27,7 +16,6 @@ object Migrator { 
.config("spark.task.maxFailures", "1024") .config("spark.stage.maxConsecutiveAttempts", "60") .getOrCreate - val streamingContext = new StreamingContext(spark.sparkContext, Seconds(5)) Logger.getRootLogger.setLevel(Level.WARN) log.setLevel(Level.INFO) @@ -39,202 +27,26 @@ object Migrator { log.info(s"Loaded config: ${migratorConfig}") - val scheduler = new ScheduledThreadPoolExecutor(1) - - val sourceDF = - migratorConfig.source match { - case cassandraSource: SourceSettings.Cassandra => - readers.Cassandra.readDataframe( + try { + (migratorConfig.source, migratorConfig.target) match { + case (cassandraSource: SourceSettings.Cassandra, scyllaTarget: TargetSettings.Scylla) => + val sourceDF = readers.Cassandra.readDataframe( spark, cassandraSource, cassandraSource.preserveTimestamps, migratorConfig.skipTokenRanges) - case parquetSource: SourceSettings.Parquet => - readers.Parquet.readDataFrame(spark, parquetSource) - case dynamoSource: SourceSettings.DynamoDB => - val tableDesc = DynamoUtils - .buildDynamoClient(dynamoSource.endpoint, dynamoSource.credentials, dynamoSource.region) - .describeTable(dynamoSource.table) - .getTable - - readers.DynamoDB.readDataFrame(spark, dynamoSource, tableDesc) - } - - log.info("Created source dataframe; resulting schema:") - sourceDF.dataFrame.printSchema() - - val tokenRangeAccumulator = - if (!sourceDF.savepointsSupported) None - else { - val tokenRangeAccumulator = TokenRangeAccumulator.empty - spark.sparkContext.register(tokenRangeAccumulator, "Token ranges copied") - - addUSR2Handler(migratorConfig, tokenRangeAccumulator) - startSavepointSchedule(scheduler, migratorConfig, tokenRangeAccumulator) - - Some(tokenRangeAccumulator) - } - - log.info( - "We need to transfer: " + sourceDF.dataFrame.rdd.getNumPartitions + " partitions in total") - - if (migratorConfig.source.isInstanceOf[SourceSettings.Cassandra]) { - val partitions = sourceDF.dataFrame.rdd.partitions - val cassandraPartitions = partitions.map(p => { - 
p.asInstanceOf[CassandraPartition[_, _]] - }) - var allTokenRanges = Set[(Token[_], Token[_])]() - cassandraPartitions.foreach(p => { - p.tokenRanges - .asInstanceOf[Vector[CqlTokenRange[_, _]]] - .foreach(tr => { - val range = - Set((tr.range.start.asInstanceOf[Token[_]], tr.range.end.asInstanceOf[Token[_]])) - allTokenRanges = allTokenRanges ++ range - }) - - }) - - log.info("All token ranges extracted from partitions size:" + allTokenRanges.size) - - if (migratorConfig.skipTokenRanges != None) { - log.info( - "Savepoints array defined, size of the array: " + migratorConfig.skipTokenRanges.size) - - val diff = allTokenRanges.diff(migratorConfig.skipTokenRanges) - log.info("Diff ... total diff of full ranges to savepoints is: " + diff.size) - log.debug("Dump of the missing tokens: ") - log.debug(diff) - } - } - - log.info("Starting write...") - - try { - migratorConfig.target match { - case target: TargetSettings.Scylla => - writers.Scylla.writeDataframe( - target, - migratorConfig.renames, - sourceDF.dataFrame, - sourceDF.timestampColumns, - tokenRangeAccumulator) - case target: TargetSettings.DynamoDB => - val sourceAndDescriptions = migratorConfig.source match { - case source: SourceSettings.DynamoDB => - if (target.streamChanges) { - log.info( - "Source is a Dynamo table and change streaming requested; enabling Dynamo Stream") - DynamoUtils.enableDynamoStream(source) - } - val sourceDesc = - DynamoUtils - .buildDynamoClient(source.endpoint, source.credentials, source.region) - .describeTable(source.table) - .getTable - - Some( - ( - source, - sourceDesc, - DynamoUtils.replicateTableDefinition( - sourceDesc, - target - ) - )) - - case _ => - None - } - - writers.DynamoDB.writeDataframe( - target, - migratorConfig.renames, - sourceDF.dataFrame, - sourceAndDescriptions.map(_._3)) - - sourceAndDescriptions.foreach { - case (source, sourceDesc, targetDesc) => - if (target.streamChanges) { - log.info("Done transferring table snapshot. 
Starting to transfer changes") - - DynamoStreamReplication.createDStream( - spark, - streamingContext, - source, - target, - sourceDF.dataFrame.schema, - sourceDesc, - targetDesc, - migratorConfig.renames) - - streamingContext.start() - streamingContext.awaitTermination() - } - } + ScyllaMigrator.migrate(migratorConfig, scyllaTarget, sourceDF) + case (parquetSource: SourceSettings.Parquet, scyllaTarget: TargetSettings.Scylla) => + val sourceDF = readers.Parquet.readDataFrame(spark, parquetSource) + ScyllaMigrator.migrate(migratorConfig, scyllaTarget, sourceDF) + case (dynamoSource: SourceSettings.DynamoDB, alternatorTarget: TargetSettings.DynamoDB) => + AlternatorMigrator.migrate(dynamoSource, alternatorTarget, migratorConfig.renames) + case _ => + sys.error("Unsupported combination of source and target.") } - } catch { - case NonFatal(e) => // Catching everything on purpose to try and dump the accumulator state - log.error( - "Caught error while writing the DataFrame. Will create a savepoint before exiting", - e) } finally { - tokenRangeAccumulator.foreach(dumpAccumulatorState(migratorConfig, _, "final")) - scheduler.shutdown() spark.stop() } } - def savepointFilename(path: String): String = - s"${path}/savepoint_${System.currentTimeMillis / 1000}.yaml" - - def dumpAccumulatorState(config: MigratorConfig, - accumulator: TokenRangeAccumulator, - reason: String): Unit = { - val filename = - Paths.get(savepointFilename(config.savepoints.path)).normalize - val rangesToSkip = accumulator.value.get.map(range => - (range.range.start.asInstanceOf[Token[_]], range.range.end.asInstanceOf[Token[_]])) - - val modifiedConfig = config.copy( - skipTokenRanges = config.skipTokenRanges ++ rangesToSkip - ) - - Files.write(filename, modifiedConfig.render.getBytes(StandardCharsets.UTF_8)) - - log.info( - s"Created a savepoint config at ${filename} due to ${reason}. 
Ranges added: ${rangesToSkip}") - } - - def startSavepointSchedule(svc: ScheduledThreadPoolExecutor, - config: MigratorConfig, - acc: TokenRangeAccumulator): Unit = { - val runnable = new Runnable { - override def run(): Unit = - try dumpAccumulatorState(config, acc, "schedule") - catch { - case e: Throwable => - log.error("Could not create the savepoint. This will be retried.", e) - } - } - - log.info( - s"Starting savepoint schedule; will write a savepoint every ${config.savepoints.intervalSeconds} seconds") - - svc.scheduleAtFixedRate(runnable, 0, config.savepoints.intervalSeconds, TimeUnit.SECONDS) - } - - def addUSR2Handler(config: MigratorConfig, acc: TokenRangeAccumulator) = { - log.info( - "Installing SIGINT/TERM/USR2 handler. Send this to dump the current progress to a savepoint.") - - val handler = new SignalHandler { - override def handle(signal: Signal): Unit = - dumpAccumulatorState(config, acc, signal.toString) - } - - Signal.handle(new Signal("USR2"), handler) - Signal.handle(new Signal("TERM"), handler) - Signal.handle(new Signal("INT"), handler) - } } diff --git a/migrator/src/main/scala/com/scylladb/migrator/alternator/AlternatorMigrator.scala b/migrator/src/main/scala/com/scylladb/migrator/alternator/AlternatorMigrator.scala new file mode 100644 index 00000000..93927544 --- /dev/null +++ b/migrator/src/main/scala/com/scylladb/migrator/alternator/AlternatorMigrator.scala @@ -0,0 +1,70 @@ +package com.scylladb.migrator.alternator + +import com.scylladb.migrator.DynamoUtils +import com.scylladb.migrator.config.{ Rename, SourceSettings, TargetSettings } +import com.scylladb.migrator.{ readers, writers } +import com.scylladb.migrator.writers.DynamoStreamReplication +import org.apache.log4j.LogManager +import org.apache.spark.sql.SparkSession +import org.apache.spark.streaming.{ Seconds, StreamingContext } + +import scala.util.control.NonFatal + +object AlternatorMigrator { + val log = LogManager.getLogger("com.scylladb.migrator.alternator") + + def 
migrate(source: SourceSettings.DynamoDB, + target: TargetSettings.DynamoDB, + renames: List[Rename])(implicit spark: SparkSession): Unit = { + + val sourceTableDesc = DynamoUtils + .buildDynamoClient(source.endpoint, source.credentials, source.region) + .describeTable(source.table) + .getTable + + val sourceRDD = + readers.DynamoDB.readRDD(spark, source, sourceTableDesc) + + log.info("We need to transfer: " + sourceRDD.getNumPartitions + " partitions in total") + + log.info("Starting write...") + + try { + val targetTableDesc = { + if (target.streamChanges) { + log.info( + "Source is a Dynamo table and change streaming requested; enabling Dynamo Stream") + DynamoUtils.enableDynamoStream(source) + } + + DynamoUtils.replicateTableDefinition( + sourceTableDesc, + target + ) + } + + writers.DynamoDB.writeRDD(target, renames, sourceRDD, Some(targetTableDesc)) + + if (target.streamChanges) { + log.info("Done transferring table snapshot. Starting to transfer changes") + val streamingContext = new StreamingContext(spark.sparkContext, Seconds(5)) + + DynamoStreamReplication.createDStream( + spark, + streamingContext, + source, + target, + targetTableDesc, + renames) + + streamingContext.start() + streamingContext.awaitTermination() + } + } catch { + case NonFatal(e) => + log.error("Caught error while writing the RDD.", e) + } + + } + +} diff --git a/migrator/src/main/scala/com/scylladb/migrator/readers/Cassandra.scala b/migrator/src/main/scala/com/scylladb/migrator/readers/Cassandra.scala index e8542d00..2a6ac6e4 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/readers/Cassandra.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/readers/Cassandra.scala @@ -15,6 +15,7 @@ import org.apache.spark.sql.types.{ IntegerType, LongType, StructField, StructTy import org.apache.spark.sql.{ DataFrame, Row, SparkSession } import org.apache.spark.unsafe.types.UTF8String import com.datastax.oss.driver.api.core.ConsistencyLevel +import 
com.scylladb.migrator.scylla.SourceDataFrame import scala.collection.mutable.ArrayBuffer diff --git a/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala b/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala index bffac36f..12c3a122 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala @@ -1,19 +1,21 @@ package com.scylladb.migrator.readers -import cats.implicits._ -import com.amazonaws.services.dynamodbv2.document.Table import com.amazonaws.services.dynamodbv2.model.TableDescription -import com.audienceproject.spark.dynamodb.implicits._ -import com.scylladb.migrator.config.{ AWSCredentials, SourceSettings } +import com.scylladb.migrator.config.SourceSettings +import org.apache.hadoop.dynamodb.{ DynamoDBConstants, DynamoDBItemWritable } +import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat +import org.apache.hadoop.io.Text +import org.apache.hadoop.mapred.JobConf import org.apache.log4j.LogManager +import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession object DynamoDB { val log = LogManager.getLogger("com.scylladb.migrator.readers.DynamoDB") - def readDataFrame(spark: SparkSession, - source: SourceSettings.DynamoDB, - description: TableDescription): SourceDataFrame = { + def readRDD(spark: SparkSession, + source: SourceSettings.DynamoDB, + description: TableDescription): RDD[(Text, DynamoDBItemWritable)] = { val provisionedThroughput = Option(description.getProvisionedThroughput) val readThroughput = provisionedThroughput.flatMap(p => Option(p.getReadCapacityUnits)) val writeThroughput = provisionedThroughput.flatMap(p => Option(p.getWriteCapacityUnits)) @@ -22,23 +24,32 @@ object DynamoDB { if (readThroughput.isEmpty || writeThroughput.isEmpty) Some("25") else None - val df = spark.read - .options( - (List( - source.region.map("region" -> _), - source.endpoint.map(ep => "endpoint" -> ep.renderEndpoint), - 
source.scanSegments.map(segs => "readPartitions" -> segs.toString), - source.throughputReadPercent.map(pct => "targetCapacity" -> pct.toString), - source.maxMapTasks.map(tasks => "defaultParallelism" -> tasks.toString), - throughput.map("throughput" -> _) - ).flatten ++ source.credentials.toList.flatMap { - case AWSCredentials(accessKey, secretKey) => - List("accesskey" -> accessKey, "secretkey" -> secretKey) - }).toMap - ) - .dynamodb(source.table) + val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration) + def setOptionalConf(name: String, maybeValue: Option[String]): Unit = + for (value <- maybeValue) { + jobConf.set(name, value) + } + jobConf.set(DynamoDBConstants.INPUT_TABLE_NAME, source.table) + setOptionalConf(DynamoDBConstants.REGION, source.region) + setOptionalConf(DynamoDBConstants.ENDPOINT, source.endpoint.map(_.renderEndpoint)) + setOptionalConf(DynamoDBConstants.READ_THROUGHPUT, throughput) + setOptionalConf( + DynamoDBConstants.THROUGHPUT_READ_PERCENT, + source.throughputReadPercent.map(_.toString)) + setOptionalConf(DynamoDBConstants.SCAN_SEGMENTS, source.scanSegments.map(_.toString)) + setOptionalConf(DynamoDBConstants.MAX_MAP_TASKS, source.maxMapTasks.map(_.toString)) + setOptionalConf(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF, source.credentials.map(_.accessKey)) + setOptionalConf(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF, source.credentials.map(_.secretKey)) + jobConf.set( + "mapred.output.format.class", + "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat") + jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat") - SourceDataFrame(df, None, false) + spark.sparkContext.hadoopRDD( + jobConf, + classOf[DynamoDBInputFormat], + classOf[Text], + classOf[DynamoDBItemWritable]) } } diff --git a/migrator/src/main/scala/com/scylladb/migrator/readers/Parquet.scala b/migrator/src/main/scala/com/scylladb/migrator/readers/Parquet.scala index a68f501e..4826841e 100644 --- 
a/migrator/src/main/scala/com/scylladb/migrator/readers/Parquet.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/readers/Parquet.scala @@ -1,8 +1,9 @@ package com.scylladb.migrator.readers import com.scylladb.migrator.config.SourceSettings +import com.scylladb.migrator.scylla.SourceDataFrame import org.apache.log4j.LogManager -import org.apache.spark.sql.{ DataFrame, SparkSession } +import org.apache.spark.sql.SparkSession object Parquet { val log = LogManager.getLogger("com.scylladb.migrator.readers.Parquet") diff --git a/migrator/src/main/scala/com/scylladb/migrator/readers/SourceDataFrame.scala b/migrator/src/main/scala/com/scylladb/migrator/readers/SourceDataFrame.scala deleted file mode 100644 index 7133c7aa..00000000 --- a/migrator/src/main/scala/com/scylladb/migrator/readers/SourceDataFrame.scala +++ /dev/null @@ -1,8 +0,0 @@ -package com.scylladb.migrator.readers - -import org.apache.spark.sql.DataFrame - -case class TimestampColumns(ttl: String, writeTime: String) -case class SourceDataFrame(dataFrame: DataFrame, - timestampColumns: Option[TimestampColumns], - savepointsSupported: Boolean) diff --git a/migrator/src/main/scala/com/scylladb/migrator/readers/TimestampColumns.scala b/migrator/src/main/scala/com/scylladb/migrator/readers/TimestampColumns.scala new file mode 100644 index 00000000..6d67b890 --- /dev/null +++ b/migrator/src/main/scala/com/scylladb/migrator/readers/TimestampColumns.scala @@ -0,0 +1,3 @@ +package com.scylladb.migrator.readers + +case class TimestampColumns(ttl: String, writeTime: String) diff --git a/migrator/src/main/scala/com/scylladb/migrator/scylla/ScyllaMigrator.scala b/migrator/src/main/scala/com/scylladb/migrator/scylla/ScyllaMigrator.scala new file mode 100644 index 00000000..6a7a95a6 --- /dev/null +++ b/migrator/src/main/scala/com/scylladb/migrator/scylla/ScyllaMigrator.scala @@ -0,0 +1,152 @@ +package com.scylladb.migrator.scylla + +import com.datastax.spark.connector.rdd.partitioner.{ CassandraPartition, 
CqlTokenRange } +import com.datastax.spark.connector.rdd.partitioner.dht.Token +import com.datastax.spark.connector.writer.TokenRangeAccumulator +import com.scylladb.migrator.config.{ MigratorConfig, SourceSettings, TargetSettings } +import com.scylladb.migrator.readers.TimestampColumns +import com.scylladb.migrator.writers +import org.apache.log4j.LogManager +import org.apache.spark.sql.{ DataFrame, SparkSession } +import sun.misc.{ Signal, SignalHandler } + +import java.nio.charset.StandardCharsets +import java.nio.file.{ Files, Paths } +import java.util.concurrent.{ ScheduledThreadPoolExecutor, TimeUnit } +import scala.util.control.NonFatal + +case class SourceDataFrame(dataFrame: DataFrame, + timestampColumns: Option[TimestampColumns], + savepointsSupported: Boolean) + +object ScyllaMigrator { + val log = LogManager.getLogger("com.scylladb.migrator.scylla") + + def migrate(migratorConfig: MigratorConfig, + target: TargetSettings.Scylla, + sourceDF: SourceDataFrame)(implicit spark: SparkSession): Unit = { + + val scheduler = new ScheduledThreadPoolExecutor(1) + + log.info("Created source dataframe; resulting schema:") + sourceDF.dataFrame.printSchema() + + val tokenRangeAccumulator = + if (!sourceDF.savepointsSupported) None + else { + val tokenRangeAccumulator = TokenRangeAccumulator.empty + spark.sparkContext.register(tokenRangeAccumulator, "Token ranges copied") + + addUSR2Handler(migratorConfig, tokenRangeAccumulator) + startSavepointSchedule(scheduler, migratorConfig, tokenRangeAccumulator) + + Some(tokenRangeAccumulator) + } + + log.info( + "We need to transfer: " + sourceDF.dataFrame.rdd.getNumPartitions + " partitions in total") + + if (migratorConfig.source.isInstanceOf[SourceSettings.Cassandra]) { + val partitions = sourceDF.dataFrame.rdd.partitions + val cassandraPartitions = partitions.map(p => { + p.asInstanceOf[CassandraPartition[_, _]] + }) + var allTokenRanges = Set[(Token[_], Token[_])]() + cassandraPartitions.foreach(p => { + p.tokenRanges + 
.asInstanceOf[Vector[CqlTokenRange[_, _]]] + .foreach(tr => { + val range = + Set((tr.range.start.asInstanceOf[Token[_]], tr.range.end.asInstanceOf[Token[_]])) + allTokenRanges = allTokenRanges ++ range + }) + + }) + + log.info("All token ranges extracted from partitions size:" + allTokenRanges.size) + + if (migratorConfig.skipTokenRanges != None) { + log.info( + "Savepoints array defined, size of the array: " + migratorConfig.skipTokenRanges.size) + + val diff = allTokenRanges.diff(migratorConfig.skipTokenRanges) + log.info("Diff ... total diff of full ranges to savepoints is: " + diff.size) + log.debug("Dump of the missing tokens: ") + log.debug(diff) + } + } + + log.info("Starting write...") + + try { + writers.Scylla.writeDataframe( + target, + migratorConfig.renames, + sourceDF.dataFrame, + sourceDF.timestampColumns, + tokenRangeAccumulator) + } catch { + case NonFatal(e) => // Catching everything on purpose to try and dump the accumulator state + log.error( + "Caught error while writing the DataFrame. Will create a savepoint before exiting", + e) + } finally { + tokenRangeAccumulator.foreach(dumpAccumulatorState(migratorConfig, _, "final")) + scheduler.shutdown() + } + } + + def startSavepointSchedule(svc: ScheduledThreadPoolExecutor, + config: MigratorConfig, + acc: TokenRangeAccumulator): Unit = { + val runnable = new Runnable { + override def run(): Unit = + try dumpAccumulatorState(config, acc, "schedule") + catch { + case e: Throwable => + log.error("Could not create the savepoint. This will be retried.", e) + } + } + + log.info( + s"Starting savepoint schedule; will write a savepoint every ${config.savepoints.intervalSeconds} seconds") + + svc.scheduleAtFixedRate(runnable, 0, config.savepoints.intervalSeconds, TimeUnit.SECONDS) + } + + def addUSR2Handler(config: MigratorConfig, acc: TokenRangeAccumulator) = { + log.info( + "Installing SIGINT/TERM/USR2 handler. 
Send this to dump the current progress to a savepoint.") + + val handler = new SignalHandler { + override def handle(signal: Signal): Unit = + dumpAccumulatorState(config, acc, signal.toString) + } + + Signal.handle(new Signal("USR2"), handler) + Signal.handle(new Signal("TERM"), handler) + Signal.handle(new Signal("INT"), handler) + } + + def savepointFilename(path: String): String = + s"${path}/savepoint_${System.currentTimeMillis / 1000}.yaml" + + def dumpAccumulatorState(config: MigratorConfig, + accumulator: TokenRangeAccumulator, + reason: String): Unit = { + val filename = + Paths.get(savepointFilename(config.savepoints.path)).normalize + val rangesToSkip = accumulator.value.get.map(range => + (range.range.start.asInstanceOf[Token[_]], range.range.end.asInstanceOf[Token[_]])) + + val modifiedConfig = config.copy( + skipTokenRanges = config.skipTokenRanges ++ rangesToSkip + ) + + Files.write(filename, modifiedConfig.render.getBytes(StandardCharsets.UTF_8)) + + log.info( + s"Created a savepoint config at ${filename} due to ${reason}. 
Ranges added: ${rangesToSkip}") + } + +} diff --git a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoDB.scala b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoDB.scala index 4d956828..dab4dbc7 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoDB.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoDB.scala @@ -1,20 +1,21 @@ package com.scylladb.migrator.writers -import cats.implicits._ import com.amazonaws.services.dynamodbv2.model.TableDescription -import com.audienceproject.spark.dynamodb.implicits._ -import com.scylladb.migrator.config.{ AWSCredentials, Rename, TargetSettings } +import com.scylladb.migrator.config.{ Rename, TargetSettings } +import org.apache.hadoop.dynamodb.{ DynamoDBConstants, DynamoDBItemWritable } +import org.apache.hadoop.io.Text +import org.apache.hadoop.mapred.JobConf import org.apache.log4j.LogManager -import org.apache.spark.sql.{ DataFrame, SparkSession } +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession object DynamoDB { val log = LogManager.getLogger("com.scylladb.migrator.writers.DynamoDB") - def writeDataframe( - target: TargetSettings.DynamoDB, - renames: List[Rename], - df: DataFrame, - targetTableDesc: Option[TableDescription])(implicit spark: SparkSession): Unit = { + def writeRDD(target: TargetSettings.DynamoDB, + renames: List[Rename], + rdd: RDD[(Text, DynamoDBItemWritable)], + targetTableDesc: Option[TableDescription])(implicit spark: SparkSession): Unit = { val provisionedThroughput = targetTableDesc.flatMap(p => Option(p.getProvisionedThroughput)) val readThroughput = provisionedThroughput.flatMap(p => Option(p.getReadCapacityUnits)) val writeThroughput = provisionedThroughput.flatMap(p => Option(p.getWriteCapacityUnits)) @@ -23,27 +24,39 @@ object DynamoDB { if (readThroughput.isEmpty || writeThroughput.isEmpty) Some("25") else None - val renamedDf = renames - .foldLeft(df) { - case (acc, Rename(from, to)) => 
acc.withColumnRenamed(from, to) + val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration) + + def setOptionalConf(name: String, maybeValue: Option[String]): Unit = + for (value <- maybeValue) { + jobConf.set(name, value) + } + + jobConf.set(DynamoDBConstants.OUTPUT_TABLE_NAME, target.table) + setOptionalConf(DynamoDBConstants.REGION, target.region) + setOptionalConf(DynamoDBConstants.ENDPOINT, target.endpoint.map(_.renderEndpoint)) + setOptionalConf(DynamoDBConstants.READ_THROUGHPUT, throughput) + setOptionalConf( + DynamoDBConstants.THROUGHPUT_WRITE_PERCENT, + target.throughputWritePercent.map(_.toString)) + setOptionalConf(DynamoDBConstants.SCAN_SEGMENTS, target.scanSegments.map(_.toString)) + setOptionalConf(DynamoDBConstants.MAX_MAP_TASKS, target.maxMapTasks.map(_.toString)) + setOptionalConf(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF, target.credentials.map(_.accessKey)) + setOptionalConf(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF, target.credentials.map(_.secretKey)) + jobConf.set( + "mapred.output.format.class", + "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat") + jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat") + + rdd + .mapValues { itemWritable => + val item = itemWritable.getItem + for (rename <- renames) { + item.put(rename.to, item.get(rename.from)) + item.remove(rename.from) + } + itemWritable } + .saveAsHadoopDataset(jobConf) - log.info(s"Schema after renames:\n${renamedDf.schema.treeString}") - - renamedDf.write - .options( - (List( - target.region.map("region" -> _), - target.endpoint.map(ep => "endpoint" -> ep.renderEndpoint), - target.scanSegments.map(segs => "readPartitions" -> segs.toString), - target.throughputWritePercent.map(pct => "targetCapacity" -> pct.toString), - target.maxMapTasks.map(tasks => "defaultParallelism" -> tasks.toString), - throughput.map("throughput" -> _) - ).flatten ++ target.credentials.toList.flatMap { - case AWSCredentials(accessKey, secretKey) => - 
List("accesskey" -> accessKey, "secretkey" -> secretKey) - }).toMap - ) - .dynamodb(target.table) } } diff --git a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala index 77d71331..105c9f3e 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala @@ -1,26 +1,21 @@ package com.scylladb.migrator.writers -import java.util - -import com.amazonaws.services.dynamodbv2.document.ItemUtils import com.amazonaws.services.dynamodbv2.model.{ AttributeValue, TableDescription } import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter import com.audienceproject.spark.dynamodb.connector.ColumnSchema -import com.audienceproject.spark.dynamodb.datasource.TypeConverter import com.scylladb.migrator.config.{ AWSCredentials, Rename, SourceSettings, TargetSettings } +import org.apache.hadoop.dynamodb.DynamoDBItemWritable +import org.apache.hadoop.io.Text import org.apache.log4j.LogManager -import org.apache.spark.sql.types.{ BooleanType, StringType, StructType } -import org.apache.spark.sql.{ Row, SparkSession } +import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.kinesis.{ KinesisInitialPositions, KinesisInputDStream, SparkAWSCredentials } -import org.apache.spark.unsafe.types.UTF8String -import org.apache.spark.sql.{ functions => f } -import scala.collection.JavaConverters._ +import java.util object DynamoStreamReplication { val log = LogManager.getLogger("com.scylladb.migrator.writers.DynamoStreamReplication") @@ -29,14 +24,8 @@ object DynamoStreamReplication { streamingContext: StreamingContext, src: SourceSettings.DynamoDB, target: TargetSettings.DynamoDB, - inferredSchema: StructType, - sourceTableDesc: TableDescription, targetTableDesc: 
TableDescription, - renames: List[Rename]) = { - val typeConverter = TypeConverter.fromStructType(inferredSchema) - val partitionColumns = - sourceTableDesc.getKeySchema.asScala.map(keyElement => f.col(keyElement.getAttributeName)) - + renames: List[Rename]): Unit = KinesisInputDStream.builder .streamingContext(streamingContext) .streamName(src.table) @@ -62,54 +51,41 @@ object DynamoStreamReplication { newMap.putAll(rec.getDynamodb.getKeys) - val row = - Row.fromSeq { - typeConverter - .readInternalRow(ItemUtils.toItem(newMap)) - .toSeq(inferredSchema) - .map { - case s: UTF8String => s.toString - case x => x - } ++ - Seq( - rec.getEventName match { - case "INSERT" | "MODIFY" => ColumnSchema.PutOperation - case "REMOVE" => ColumnSchema.DeleteOperation - } - ) + val operationType = + rec.getEventName match { + case "INSERT" | "MODIFY" => putOperation + case "REMOVE" => deleteOperation } - - Some(row) + newMap.put(ColumnSchema.OperationTypeColumn, operationType) + Some(newMap) case _ => None } .foreachRDD { msgs => - val df = spark - .createDataFrame( - msgs - .collect { - case Some(row) => row - }, - inferredSchema.add(ColumnSchema.OperationTypeColumn, BooleanType) - ) - .repartition(partitionColumns: _*) + val rdd = msgs.collect { + case Some(item) => (new Text(), new DynamoDBItemWritable(item)) + } + // FIXME re-partition by columns? msgs.repartition(???) 
log.info("Changes to be applied:") - df.select( - f.when( - f.col(ColumnSchema.OperationTypeColumn) === f.lit(ColumnSchema.PutOperation), - f.lit("UPSERT")) - .when( - f.col(ColumnSchema.OperationTypeColumn) === f.lit(ColumnSchema.DeleteOperation), - f.lit("DELETE")) - .otherwise(f.lit("UNKNOWN")) - .as(ColumnSchema.OperationTypeColumn) - ) - .groupBy(ColumnSchema.OperationTypeColumn) - .count() - .show() + rdd + .map(_._2) // Remove keys because they are not serializable + .groupBy { itemWritable => + itemWritable.getItem.get(ColumnSchema.OperationTypeColumn) match { + case `putOperation` => "UPSERT" + case `deleteOperation` => "DELETE" + case _ => "UNKNOWN" + } + } + .mapValues(_.size) + .foreach { + case (operation, count) => + log.info(s"${operation}: ${count}") + } - DynamoDB.writeDataframe(target, renames, df, Some(targetTableDesc))(spark) + DynamoDB.writeRDD(target, renames, rdd, Some(targetTableDesc))(spark) } - } + + private val putOperation = new AttributeValue().withBOOL(ColumnSchema.PutOperation) + private val deleteOperation = new AttributeValue().withBOOL(ColumnSchema.DeleteOperation) } diff --git a/migrator/src/main/scala/com/scylladb/migrator/writers/Scylla.scala b/migrator/src/main/scala/com/scylladb/migrator/writers/Scylla.scala index d2a00836..3ec98fd5 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/writers/Scylla.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/writers/Scylla.scala @@ -3,10 +3,10 @@ package com.scylladb.migrator.writers import com.datastax.spark.connector.writer._ import com.datastax.spark.connector._ import com.scylladb.migrator.Connectors -import com.scylladb.migrator.config.{Rename, TargetSettings} +import com.scylladb.migrator.config.{ Rename, TargetSettings } import com.scylladb.migrator.readers.TimestampColumns -import org.apache.log4j.{LogManager, Logger} -import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.log4j.{ LogManager, Logger } +import org.apache.spark.sql.{ DataFrame, 
Row, SparkSession } import com.datastax.oss.driver.api.core.ConsistencyLevel object Scylla { diff --git a/tests/docker/job-flow.json b/tests/docker/job-flow.json new file mode 100644 index 00000000..8575813b --- /dev/null +++ b/tests/docker/job-flow.json @@ -0,0 +1,27 @@ +{ + "jobFlowId": "j-2AO77MNLG17NW", + "jobFlowCreationInstant": 1429046932628, + "instanceCount": 2, + "masterInstanceId": "i-08dea4f4", + "masterPrivateDnsName": "localhost", + "masterInstanceType": "m1.medium", + "slaveInstanceType": "m1.xlarge", + "hadoopVersion": "2.4.0", + "instanceGroups": [ + { + "instanceGroupId": "ig-16NXM94TY33LB", + "instanceGroupName": "CORE", + "instanceRole": "Core", + "marketType": "OnDemand", + "instanceType": "m3.xlarge", + "requestedInstanceCount": 1 + }, + { + "instanceGroupId": "ig-2XQ29JGCTKLBL", + "instanceGroupName": "MASTER", + "instanceRole": "Master", + "marketType": "OnDemand", + "instanceType": "m1.medium", + "requestedInstanceCount": 1 + }] +} From 5bba4275e163e906bed96a58e4f7c09394c55392 Mon Sep 17 00:00:00 2001 From: Julien Richard-Foy Date: Fri, 8 Mar 2024 14:59:00 +0100 Subject: [PATCH 3/7] Remove dependency to spark-dynamodb --- .gitmodules | 3 --- build.sh | 4 ---- .../migrator/writers/DynamoStreamReplication.scala | 11 ++++++----- spark-dynamodb | 1 - 4 files changed, 6 insertions(+), 13 deletions(-) delete mode 160000 spark-dynamodb diff --git a/.gitmodules b/.gitmodules index 47276c5f..8d0f11a6 100644 --- a/.gitmodules +++ b/.gitmodules @@ -2,9 +2,6 @@ path = spark-cassandra-connector url = https://github.com/scylladb/spark-cassandra-connector branch = feature/track-token-ranges -[submodule "spark-dynamodb"] - path = spark-dynamodb - url = https://github.com/scylladb/spark-dynamodb [submodule "spark-kinesis"] path = spark-kinesis url = https://github.com/scylladb/spark-kinesis diff --git a/build.sh b/build.sh index 1e3f10bf..a1b503f2 100755 --- a/build.sh +++ b/build.sh @@ -14,9 +14,6 @@ trap "rm -rf $TMPDIR" EXIT pushd 
spark-cassandra-connector sbt -Djava.io.tmpdir="$TMPDIR" ++2.11.12 assembly popd -pushd spark-dynamodb -sbt assembly -popd pushd spark-kinesis sbt assembly popd @@ -26,7 +23,6 @@ if [ ! -d "./migrator/lib" ]; then fi cp ./spark-cassandra-connector/connector/target/scala-2.11/spark-cassandra-connector-assembly-*.jar ./migrator/lib -cp ./spark-dynamodb/target/scala-2.11/spark-dynamodb-assembly-*.jar ./migrator/lib cp ./spark-kinesis/target/scala-2.11/spark-streaming-kinesis-asl-assembly-*.jar ./migrator/lib sbt -Djava.io.tmpdir="$TMPDIR" migrator/assembly diff --git a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala index 105c9f3e..89ceedae 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala @@ -2,7 +2,6 @@ package com.scylladb.migrator.writers import com.amazonaws.services.dynamodbv2.model.{ AttributeValue, TableDescription } import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter -import com.audienceproject.spark.dynamodb.connector.ColumnSchema import com.scylladb.migrator.config.{ AWSCredentials, Rename, SourceSettings, TargetSettings } import org.apache.hadoop.dynamodb.DynamoDBItemWritable import org.apache.hadoop.io.Text @@ -20,6 +19,10 @@ import java.util object DynamoStreamReplication { val log = LogManager.getLogger("com.scylladb.migrator.writers.DynamoStreamReplication") + private val operationTypeColumn = "_dynamo_op_type" + private val putOperation = new AttributeValue().withBOOL(true) + private val deleteOperation = new AttributeValue().withBOOL(false) + def createDStream(spark: SparkSession, streamingContext: StreamingContext, src: SourceSettings.DynamoDB, @@ -56,7 +59,7 @@ object DynamoStreamReplication { case "INSERT" | "MODIFY" => putOperation case "REMOVE" => deleteOperation } - 
newMap.put(ColumnSchema.OperationTypeColumn, operationType) + newMap.put(operationTypeColumn, operationType) Some(newMap) case _ => None @@ -71,7 +74,7 @@ object DynamoStreamReplication { rdd .map(_._2) // Remove keys because they are not serializable .groupBy { itemWritable => - itemWritable.getItem.get(ColumnSchema.OperationTypeColumn) match { + itemWritable.getItem.get(operationTypeColumn) match { case `putOperation` => "UPSERT" case `deleteOperation` => "DELETE" case _ => "UNKNOWN" @@ -86,6 +89,4 @@ object DynamoStreamReplication { DynamoDB.writeRDD(target, renames, rdd, Some(targetTableDesc))(spark) } - private val putOperation = new AttributeValue().withBOOL(ColumnSchema.PutOperation) - private val deleteOperation = new AttributeValue().withBOOL(ColumnSchema.DeleteOperation) } diff --git a/spark-dynamodb b/spark-dynamodb deleted file mode 160000 index 48faabc8..00000000 --- a/spark-dynamodb +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 48faabc86604c612744f2dc9d65d678a4ba95374 From 2f507844e672e077e59cde0c3a06a7e0a4763213 Mon Sep 17 00:00:00 2001 From: Julien Richard-Foy Date: Fri, 8 Mar 2024 17:20:22 +0100 Subject: [PATCH 4/7] Repartition RDDs before writing to Alternator --- .../migrator/writers/DynamoStreamReplication.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala index 89ceedae..3dd4309c 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala @@ -65,10 +65,11 @@ object DynamoStreamReplication { case _ => None } .foreachRDD { msgs => - val rdd = msgs.collect { - case Some(item) => (new Text(), new DynamoDBItemWritable(item)) - } - // FIXME re-partition by columns? msgs.repartition(???) 
+ val rdd = msgs + .collect { + case Some(item) => (new Text(), new DynamoDBItemWritable(item)) + } + .repartition(Runtime.getRuntime.availableProcessors() * 2) log.info("Changes to be applied:") rdd From 1db4c8585ef96aff89bd3231d04692885bf0c9a7 Mon Sep 17 00:00:00 2001 From: Julien Richard-Foy Date: Tue, 12 Mar 2024 14:42:32 +0100 Subject: [PATCH 5/7] Update emr-dynamodb-hadoop to 4.16.0 --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 700f7fef..d978aa3c 100644 --- a/build.sbt +++ b/build.sbt @@ -30,7 +30,7 @@ lazy val migrator = (project in file("migrator")).settings( "com.amazonaws" % "aws-java-sdk-dynamodb" % awsSdkVersion, ("com.amazonaws" % "dynamodb-streams-kinesis-adapter" % "1.5.2") .excludeAll(InclExclRule("com.fasterxml.jackson.core")), - "com.amazon.emr" % "emr-dynamodb-hadoop" % "4.8.0", + "com.amazon.emr" % "emr-dynamodb-hadoop" % "4.16.0", "org.yaml" % "snakeyaml" % "1.23", "io.circe" %% "circe-yaml" % "0.9.0", "io.circe" %% "circe-generic" % "0.9.0", From afcbfd85919ffd9d6895bfeb6ae18a93667a9983 Mon Sep 17 00:00:00 2001 From: Julien Richard-Foy Date: Wed, 13 Mar 2024 12:08:44 +0100 Subject: [PATCH 6/7] Fix streaming issue The RDD keys are not serializable, which can fail some RDD operations. We create the RDD element keys _after_ repartitioning to avoid them being serialized across partitions. This change allowed me to successfully run a data migration with stream changes enabled. 
Such a scenario can not be added to our test suite though because KCL only works with the real AWS servers (see #113) --- .../com/scylladb/migrator/DynamoUtils.scala | 14 ++++++-- .../writers/DynamoStreamReplication.scala | 36 ++++++++++--------- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala b/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala index 3128791e..839b10c3 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala @@ -64,7 +64,8 @@ object DynamoUtils { def enableDynamoStream(source: SourceSettings.DynamoDB): Unit = { val sourceClient = buildDynamoClient(source.endpoint, source.credentials, source.region) - val sourceStreamsClient = buildDynamoStreamsClient(source.credentials, source.region) + val sourceStreamsClient = + buildDynamoStreamsClient(source.endpoint, source.credentials, source.region) sourceClient .updateTable( @@ -114,9 +115,18 @@ object DynamoUtils { builder.build() } - def buildDynamoStreamsClient(creds: Option[AWSCredentialsProvider], region: Option[String]) = { + def buildDynamoStreamsClient(endpoint: Option[DynamoDBEndpoint], + creds: Option[AWSCredentialsProvider], + region: Option[String]) = { val builder = AmazonDynamoDBStreamsClientBuilder.standard() + endpoint.foreach { endpoint => + builder + .withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration( + endpoint.renderEndpoint, + region.getOrElse("empty"))) + } creds.foreach(builder.withCredentials) region.foreach(builder.withRegion) diff --git a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala index 3dd4309c..ca17bf50 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala +++ 
b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala @@ -66,26 +66,30 @@ object DynamoStreamReplication { } .foreachRDD { msgs => val rdd = msgs - .collect { - case Some(item) => (new Text(), new DynamoDBItemWritable(item)) - } + .collect { case Some(item) => new DynamoDBItemWritable(item) } .repartition(Runtime.getRuntime.availableProcessors() * 2) + .map(item => (new Text, item)) // Create the key after repartitioning to avoid Serialization issues - log.info("Changes to be applied:") - rdd - .map(_._2) // Remove keys because they are not serializable - .groupBy { itemWritable => - itemWritable.getItem.get(operationTypeColumn) match { - case `putOperation` => "UPSERT" - case `deleteOperation` => "DELETE" - case _ => "UNKNOWN" + val changes = + rdd + .map(_._2) // Remove keys because they are not serializable + .groupBy { itemWritable => + itemWritable.getItem.get(operationTypeColumn) match { + case `putOperation` => "UPSERT" + case `deleteOperation` => "DELETE" + case _ => "UNKNOWN" + } } + .mapValues(_.size) + .collect() + if (changes.nonEmpty) { + log.info("Changes to be applied:") + for ((operation, count) <- changes) { + log.info(s"${operation}: ${count}") } - .mapValues(_.size) - .foreach { - case (operation, count) => - log.info(s"${operation}: ${count}") - } + } else { + log.info("No changes to apply") + } DynamoDB.writeRDD(target, renames, rdd, Some(targetTableDesc))(spark) } From 8e9728d63ff2cdf79f5bb5195b607ba80e684cbe Mon Sep 17 00:00:00 2001 From: Julien Richard-Foy Date: Mon, 18 Mar 2024 09:59:46 +0100 Subject: [PATCH 7/7] Factor out Hadoop DynamoDB jobs configuration --- .../com/scylladb/migrator/DynamoUtils.scala | 53 ++++++++++++++++++- .../scylladb/migrator/readers/DynamoDB.scala | 25 ++++----- .../scylladb/migrator/writers/DynamoDB.scala | 26 ++++----- 3 files changed, 71 insertions(+), 33 deletions(-) diff --git a/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala 
b/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala index 839b10c3..1fd259a4 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala @@ -4,7 +4,6 @@ import com.amazonaws.auth.AWSCredentialsProvider import com.amazonaws.client.builder.AwsClientBuilder import com.amazonaws.services.dynamodbv2.{ AmazonDynamoDBClientBuilder, - AmazonDynamoDBStreamsClient, AmazonDynamoDBStreamsClientBuilder } import com.amazonaws.services.dynamodbv2.model.{ @@ -17,7 +16,14 @@ import com.amazonaws.services.dynamodbv2.model.{ TableDescription, UpdateTableRequest } -import com.scylladb.migrator.config.{ DynamoDBEndpoint, SourceSettings, TargetSettings } +import com.scylladb.migrator.config.{ + AWSCredentials, + DynamoDBEndpoint, + SourceSettings, + TargetSettings +} +import org.apache.hadoop.dynamodb.DynamoDBConstants +import org.apache.hadoop.mapred.JobConf import org.apache.log4j.LogManager import scala.util.{ Failure, Success, Try } @@ -132,4 +138,47 @@ object DynamoUtils { builder.build() } + + /** + * Optionally set a configuration. If `maybeValue` is empty, nothing is done. Otherwise, + * its value is set to the `name` property on the `jobConf`. + * + * @param jobConf Target JobConf to configure + * @param name Name of the Hadoop configuration key + * @param maybeValue Optional value to set. + */ + def setOptionalConf(jobConf: JobConf, name: String, maybeValue: Option[String]): Unit = + for (value <- maybeValue) { + jobConf.set(name, value) + } + + /** + * Set the common configuration of both read and write DynamoDB jobs. 
+ * @param jobConf Target JobConf to configure + * @param maybeRegion AWS region + * @param maybeEndpoint AWS endpoint + * @param maybeScanSegments Scan segments + * @param maybeMaxMapTasks Max map tasks + * @param maybeAwsCredentials AWS credentials + */ + def setDynamoDBJobConf(jobConf: JobConf, + maybeRegion: Option[String], + maybeEndpoint: Option[DynamoDBEndpoint], + maybeScanSegments: Option[Int], + maybeMaxMapTasks: Option[Int], + maybeAwsCredentials: Option[AWSCredentials]): Unit = { + setOptionalConf(jobConf, DynamoDBConstants.REGION, maybeRegion) + setOptionalConf(jobConf, DynamoDBConstants.ENDPOINT, maybeEndpoint.map(_.renderEndpoint)) + setOptionalConf(jobConf, DynamoDBConstants.SCAN_SEGMENTS, maybeScanSegments.map(_.toString)) + setOptionalConf(jobConf, DynamoDBConstants.MAX_MAP_TASKS, maybeMaxMapTasks.map(_.toString)) + for (credentials <- maybeAwsCredentials) { + jobConf.set(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF, credentials.accessKey) + jobConf.set(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF, credentials.secretKey) + } + jobConf.set( + "mapred.output.format.class", + "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat") + jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat") + } + } diff --git a/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala b/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala index 12c3a122..c29d4240 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala @@ -1,6 +1,7 @@ package com.scylladb.migrator.readers import com.amazonaws.services.dynamodbv2.model.TableDescription +import com.scylladb.migrator.DynamoUtils.{ setDynamoDBJobConf, setOptionalConf } import com.scylladb.migrator.config.SourceSettings import org.apache.hadoop.dynamodb.{ DynamoDBConstants, DynamoDBItemWritable } import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat @@ -25,25 
+26,19 @@ object DynamoDB { else None val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration) - def setOptionalConf(name: String, maybeValue: Option[String]): Unit = - for (value <- maybeValue) { - jobConf.set(name, value) - } + setDynamoDBJobConf( + jobConf, + source.region, + source.endpoint, + source.scanSegments, + source.maxMapTasks, + source.credentials) jobConf.set(DynamoDBConstants.INPUT_TABLE_NAME, source.table) - setOptionalConf(DynamoDBConstants.REGION, source.region) - setOptionalConf(DynamoDBConstants.ENDPOINT, source.endpoint.map(_.renderEndpoint)) - setOptionalConf(DynamoDBConstants.READ_THROUGHPUT, throughput) + setOptionalConf(jobConf, DynamoDBConstants.READ_THROUGHPUT, throughput) setOptionalConf( + jobConf, DynamoDBConstants.THROUGHPUT_READ_PERCENT, source.throughputReadPercent.map(_.toString)) - setOptionalConf(DynamoDBConstants.SCAN_SEGMENTS, source.scanSegments.map(_.toString)) - setOptionalConf(DynamoDBConstants.MAX_MAP_TASKS, source.maxMapTasks.map(_.toString)) - setOptionalConf(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF, source.credentials.map(_.accessKey)) - setOptionalConf(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF, source.credentials.map(_.secretKey)) - jobConf.set( - "mapred.output.format.class", - "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat") - jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat") spark.sparkContext.hadoopRDD( jobConf, diff --git a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoDB.scala b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoDB.scala index dab4dbc7..c5542cf8 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoDB.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoDB.scala @@ -1,6 +1,7 @@ package com.scylladb.migrator.writers import com.amazonaws.services.dynamodbv2.model.TableDescription +import com.scylladb.migrator.DynamoUtils.{ setDynamoDBJobConf, setOptionalConf } import 
com.scylladb.migrator.config.{ Rename, TargetSettings } import org.apache.hadoop.dynamodb.{ DynamoDBConstants, DynamoDBItemWritable } import org.apache.hadoop.io.Text @@ -26,26 +27,19 @@ object DynamoDB { val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration) - def setOptionalConf(name: String, maybeValue: Option[String]): Unit = - for (value <- maybeValue) { - jobConf.set(name, value) - } - + setDynamoDBJobConf( + jobConf, + target.region, + target.endpoint, + target.scanSegments, + target.maxMapTasks, + target.credentials) jobConf.set(DynamoDBConstants.OUTPUT_TABLE_NAME, target.table) - setOptionalConf(DynamoDBConstants.REGION, target.region) - setOptionalConf(DynamoDBConstants.ENDPOINT, target.endpoint.map(_.renderEndpoint)) - setOptionalConf(DynamoDBConstants.READ_THROUGHPUT, throughput) + setOptionalConf(jobConf, DynamoDBConstants.WRITE_THROUGHPUT, throughput) setOptionalConf( + jobConf, DynamoDBConstants.THROUGHPUT_WRITE_PERCENT, target.throughputWritePercent.map(_.toString)) - setOptionalConf(DynamoDBConstants.SCAN_SEGMENTS, target.scanSegments.map(_.toString)) - setOptionalConf(DynamoDBConstants.MAX_MAP_TASKS, target.maxMapTasks.map(_.toString)) - setOptionalConf(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF, target.credentials.map(_.accessKey)) - setOptionalConf(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF, target.credentials.map(_.secretKey)) - jobConf.set( - "mapred.output.format.class", - "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat") - jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat") rdd .mapValues { itemWritable =>