Do not try to infer a schema when migrating from DynamoDB to Alternator #105

Merged: 7 commits, Mar 18, 2024
.gitmodules (0 additions & 3 deletions)
@@ -2,9 +2,6 @@
path = spark-cassandra-connector
url = https://github.com/scylladb/spark-cassandra-connector
branch = feature/track-token-ranges
[submodule "spark-dynamodb"]
path = spark-dynamodb
url = https://github.com/scylladb/spark-dynamodb
[submodule "spark-kinesis"]
path = spark-kinesis
url = https://github.com/scylladb/spark-kinesis
build.sbt (1 addition & 0 deletions)
@@ -30,6 +30,7 @@ lazy val migrator = (project in file("migrator")).settings(
"com.amazonaws" % "aws-java-sdk-dynamodb" % awsSdkVersion,
("com.amazonaws" % "dynamodb-streams-kinesis-adapter" % "1.5.2")
.excludeAll(InclExclRule("com.fasterxml.jackson.core")),
"com.amazon.emr" % "emr-dynamodb-hadoop" % "4.16.0",
"org.yaml" % "snakeyaml" % "1.23",
"io.circe" %% "circe-yaml" % "0.9.0",
"io.circe" %% "circe-generic" % "0.9.0",
build.sh (0 additions & 4 deletions)
@@ -14,9 +14,6 @@ trap "rm -rf $TMPDIR" EXIT
pushd spark-cassandra-connector
sbt -Djava.io.tmpdir="$TMPDIR" ++2.11.12 assembly
popd
pushd spark-dynamodb
sbt assembly
popd
pushd spark-kinesis
sbt assembly
popd
@@ -26,7 +23,6 @@ if [ ! -d "./migrator/lib" ]; then
fi

cp ./spark-cassandra-connector/connector/target/scala-2.11/spark-cassandra-connector-assembly-*.jar ./migrator/lib
cp ./spark-dynamodb/target/scala-2.11/spark-dynamodb-assembly-*.jar ./migrator/lib
cp ./spark-kinesis/target/scala-2.11/spark-streaming-kinesis-asl-assembly-*.jar ./migrator/lib

sbt -Djava.io.tmpdir="$TMPDIR" migrator/assembly
docker-compose-tests.yml (2 additions & 0 deletions)
@@ -54,6 +54,8 @@ services:
volumes:
- ./migrator/target/scala-2.11:/jars
- ./tests/src/test/configurations:/app/configurations
# Workaround for https://github.com/awslabs/emr-dynamodb-connector/issues/50
- ${PWD}/tests/docker/job-flow.json:/mnt/var/lib/info/job-flow.json

spark-worker:
image: bde2020/spark-worker:2.4.4-hadoop2.7
migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala (12 additions & 2 deletions)
@@ -64,7 +64,8 @@ object DynamoUtils {

def enableDynamoStream(source: SourceSettings.DynamoDB): Unit = {
val sourceClient = buildDynamoClient(source.endpoint, source.credentials, source.region)
val sourceStreamsClient = buildDynamoStreamsClient(source.credentials, source.region)
val sourceStreamsClient =
buildDynamoStreamsClient(source.endpoint, source.credentials, source.region)

sourceClient
.updateTable(
@@ -114,9 +115,18 @@
builder.build()
}

def buildDynamoStreamsClient(creds: Option[AWSCredentialsProvider], region: Option[String]) = {
def buildDynamoStreamsClient(endpoint: Option[DynamoDBEndpoint],
creds: Option[AWSCredentialsProvider],
region: Option[String]) = {
val builder = AmazonDynamoDBStreamsClientBuilder.standard()

endpoint.foreach { endpoint =>
builder
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(
endpoint.renderEndpoint,
region.getOrElse("empty")))
}
creds.foreach(builder.withCredentials)
region.foreach(builder.withRegion)

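The streams-client change above adds the same endpoint override that the plain DynamoDB client already accepts, so change streaming can also target a non-default endpoint. Stripped of the migrator's own `DynamoDBEndpoint`/`renderEndpoint` config types, this is the standard AWS SDK v1 builder call; a minimal sketch with placeholder arguments:

```scala
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.dynamodbv2.{ AmazonDynamoDBStreams, AmazonDynamoDBStreamsClientBuilder }

object StreamsClientSketch {
  // Build a Streams client against an explicit endpoint (e.g. DynamoDB Local)
  // instead of letting the SDK derive one from a region alone.
  def customEndpointClient(url: String, signingRegion: String): AmazonDynamoDBStreams =
    AmazonDynamoDBStreamsClientBuilder
      .standard()
      .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(url, signingRegion))
      .build()
}
```
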
migrator/src/main/scala/com/scylladb/migrator/Migrator.scala (14 additions & 202 deletions)
@@ -1,21 +1,10 @@
package com.scylladb.migrator

import java.nio.charset.StandardCharsets
import java.nio.file.{ Files, Paths }
import java.util.concurrent.{ ScheduledThreadPoolExecutor, TimeUnit }
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter
import com.datastax.spark.connector.rdd.partitioner.{ CassandraPartition, CqlTokenRange }
import com.datastax.spark.connector.rdd.partitioner.dht.Token
import com.datastax.spark.connector.writer._
import com.scylladb.migrator.alternator.AlternatorMigrator
import com.scylladb.migrator.config._
import com.scylladb.migrator.writers.DynamoStreamReplication
import com.scylladb.migrator.scylla.ScyllaMigrator
import org.apache.log4j.{ Level, LogManager, Logger }
import org.apache.spark.sql._
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.kinesis.{ KinesisInputDStream, SparkAWSCredentials }
import sun.misc.{ Signal, SignalHandler }

import scala.util.control.NonFatal

object Migrator {
val log = LogManager.getLogger("com.scylladb.migrator")
@@ -27,7 +16,6 @@ object Migrator {
.config("spark.task.maxFailures", "1024")
.config("spark.stage.maxConsecutiveAttempts", "60")
.getOrCreate
val streamingContext = new StreamingContext(spark.sparkContext, Seconds(5))

Logger.getRootLogger.setLevel(Level.WARN)
log.setLevel(Level.INFO)
@@ -39,202 +27,26 @@ object Migrator {

log.info(s"Loaded config: ${migratorConfig}")

val scheduler = new ScheduledThreadPoolExecutor(1)

val sourceDF =
migratorConfig.source match {
case cassandraSource: SourceSettings.Cassandra =>
readers.Cassandra.readDataframe(
try {
(migratorConfig.source, migratorConfig.target) match {
case (cassandraSource: SourceSettings.Cassandra, scyllaTarget: TargetSettings.Scylla) =>
val sourceDF = readers.Cassandra.readDataframe(
spark,
cassandraSource,
cassandraSource.preserveTimestamps,
migratorConfig.skipTokenRanges)
case parquetSource: SourceSettings.Parquet =>
readers.Parquet.readDataFrame(spark, parquetSource)
case dynamoSource: SourceSettings.DynamoDB =>
val tableDesc = DynamoUtils
.buildDynamoClient(dynamoSource.endpoint, dynamoSource.credentials, dynamoSource.region)
.describeTable(dynamoSource.table)
.getTable

readers.DynamoDB.readDataFrame(spark, dynamoSource, tableDesc)
}

log.info("Created source dataframe; resulting schema:")
sourceDF.dataFrame.printSchema()

val tokenRangeAccumulator =
if (!sourceDF.savepointsSupported) None
else {
val tokenRangeAccumulator = TokenRangeAccumulator.empty
spark.sparkContext.register(tokenRangeAccumulator, "Token ranges copied")

addUSR2Handler(migratorConfig, tokenRangeAccumulator)
startSavepointSchedule(scheduler, migratorConfig, tokenRangeAccumulator)

Some(tokenRangeAccumulator)
}

log.info(
"We need to transfer: " + sourceDF.dataFrame.rdd.getNumPartitions + " partitions in total")

if (migratorConfig.source.isInstanceOf[SourceSettings.Cassandra]) {
val partitions = sourceDF.dataFrame.rdd.partitions
val cassandraPartitions = partitions.map(p => {
p.asInstanceOf[CassandraPartition[_, _]]
})
var allTokenRanges = Set[(Token[_], Token[_])]()
cassandraPartitions.foreach(p => {
p.tokenRanges
.asInstanceOf[Vector[CqlTokenRange[_, _]]]
.foreach(tr => {
val range =
Set((tr.range.start.asInstanceOf[Token[_]], tr.range.end.asInstanceOf[Token[_]]))
allTokenRanges = allTokenRanges ++ range
})

})

log.info("All token ranges extracted from partitions size:" + allTokenRanges.size)

if (migratorConfig.skipTokenRanges != None) {
log.info(
"Savepoints array defined, size of the array: " + migratorConfig.skipTokenRanges.size)

val diff = allTokenRanges.diff(migratorConfig.skipTokenRanges)
log.info("Diff ... total diff of full ranges to savepoints is: " + diff.size)
log.debug("Dump of the missing tokens: ")
log.debug(diff)
}
}

log.info("Starting write...")

try {
migratorConfig.target match {
case target: TargetSettings.Scylla =>
writers.Scylla.writeDataframe(
target,
migratorConfig.renames,
sourceDF.dataFrame,
sourceDF.timestampColumns,
tokenRangeAccumulator)
case target: TargetSettings.DynamoDB =>
val sourceAndDescriptions = migratorConfig.source match {
case source: SourceSettings.DynamoDB =>
if (target.streamChanges) {
log.info(
"Source is a Dynamo table and change streaming requested; enabling Dynamo Stream")
DynamoUtils.enableDynamoStream(source)
}
val sourceDesc =
DynamoUtils
.buildDynamoClient(source.endpoint, source.credentials, source.region)
.describeTable(source.table)
.getTable

Some(
(
source,
sourceDesc,
DynamoUtils.replicateTableDefinition(
sourceDesc,
target
)
))

case _ =>
None
}

writers.DynamoDB.writeDataframe(
target,
migratorConfig.renames,
sourceDF.dataFrame,
sourceAndDescriptions.map(_._3))

sourceAndDescriptions.foreach {
case (source, sourceDesc, targetDesc) =>
if (target.streamChanges) {
log.info("Done transferring table snapshot. Starting to transfer changes")

DynamoStreamReplication.createDStream(
spark,
streamingContext,
source,
target,
sourceDF.dataFrame.schema,
sourceDesc,
targetDesc,
migratorConfig.renames)

streamingContext.start()
streamingContext.awaitTermination()
}
}
ScyllaMigrator.migrate(migratorConfig, scyllaTarget, sourceDF)
case (parquetSource: SourceSettings.Parquet, scyllaTarget: TargetSettings.Scylla) =>
val sourceDF = readers.Parquet.readDataFrame(spark, parquetSource)
ScyllaMigrator.migrate(migratorConfig, scyllaTarget, sourceDF)
case (dynamoSource: SourceSettings.DynamoDB, alternatorTarget: TargetSettings.DynamoDB) =>
AlternatorMigrator.migrate(dynamoSource, alternatorTarget, migratorConfig.renames)
case _ =>
sys.error("Unsupported combination of source and target.")
}
} catch {
case NonFatal(e) => // Catching everything on purpose to try and dump the accumulator state
log.error(
"Caught error while writing the DataFrame. Will create a savepoint before exiting",
e)
} finally {
tokenRangeAccumulator.foreach(dumpAccumulatorState(migratorConfig, _, "final"))
scheduler.shutdown()
spark.stop()
}
}

def savepointFilename(path: String): String =
s"${path}/savepoint_${System.currentTimeMillis / 1000}.yaml"

def dumpAccumulatorState(config: MigratorConfig,
accumulator: TokenRangeAccumulator,
reason: String): Unit = {
val filename =
Paths.get(savepointFilename(config.savepoints.path)).normalize
val rangesToSkip = accumulator.value.get.map(range =>
(range.range.start.asInstanceOf[Token[_]], range.range.end.asInstanceOf[Token[_]]))

val modifiedConfig = config.copy(
skipTokenRanges = config.skipTokenRanges ++ rangesToSkip
)

Files.write(filename, modifiedConfig.render.getBytes(StandardCharsets.UTF_8))

log.info(
s"Created a savepoint config at ${filename} due to ${reason}. Ranges added: ${rangesToSkip}")
}

def startSavepointSchedule(svc: ScheduledThreadPoolExecutor,
config: MigratorConfig,
acc: TokenRangeAccumulator): Unit = {
val runnable = new Runnable {
override def run(): Unit =
try dumpAccumulatorState(config, acc, "schedule")
catch {
case e: Throwable =>
log.error("Could not create the savepoint. This will be retried.", e)
}
}

log.info(
s"Starting savepoint schedule; will write a savepoint every ${config.savepoints.intervalSeconds} seconds")

svc.scheduleAtFixedRate(runnable, 0, config.savepoints.intervalSeconds, TimeUnit.SECONDS)
}

def addUSR2Handler(config: MigratorConfig, acc: TokenRangeAccumulator) = {
log.info(
"Installing SIGINT/TERM/USR2 handler. Send this to dump the current progress to a savepoint.")

val handler = new SignalHandler {
override def handle(signal: Signal): Unit =
dumpAccumulatorState(config, acc, signal.toString)
}

Signal.handle(new Signal("USR2"), handler)
Signal.handle(new Signal("TERM"), handler)
Signal.handle(new Signal("INT"), handler)
}
}
migrator/src/main/scala/com/scylladb/migrator/alternator/AlternatorMigrator.scala (new file, 70 additions)
@@ -0,0 +1,70 @@
package com.scylladb.migrator.alternator

import com.scylladb.migrator.DynamoUtils
import com.scylladb.migrator.config.{ Rename, SourceSettings, TargetSettings }
import com.scylladb.migrator.{ readers, writers }
import com.scylladb.migrator.writers.DynamoStreamReplication
import org.apache.log4j.LogManager
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{ Seconds, StreamingContext }

import scala.util.control.NonFatal

object AlternatorMigrator {
val log = LogManager.getLogger("com.scylladb.migrator.alternator")

def migrate(source: SourceSettings.DynamoDB,
target: TargetSettings.DynamoDB,
renames: List[Rename])(implicit spark: SparkSession): Unit = {

val sourceTableDesc = DynamoUtils
.buildDynamoClient(source.endpoint, source.credentials, source.region)
.describeTable(source.table)
.getTable

val sourceRDD =
readers.DynamoDB.readRDD(spark, source, sourceTableDesc)

log.info("We need to transfer: " + sourceRDD.getNumPartitions + " partitions in total")

log.info("Starting write...")

try {
val targetTableDesc = {
if (target.streamChanges) {
log.info(
"Source is a Dynamo table and change streaming requested; enabling Dynamo Stream")
DynamoUtils.enableDynamoStream(source)
}

DynamoUtils.replicateTableDefinition(
sourceTableDesc,
target
)
}

writers.DynamoDB.writeRDD(target, renames, sourceRDD, Some(targetTableDesc))

if (target.streamChanges) {
log.info("Done transferring table snapshot. Starting to transfer changes")
val streamingContext = new StreamingContext(spark.sparkContext, Seconds(5))

DynamoStreamReplication.createDStream(
spark,
streamingContext,
source,
target,
targetTableDesc,
renames)

streamingContext.start()
streamingContext.awaitTermination()
}
} catch {
case NonFatal(e) =>
log.error("Caught error while writing the RDD.", e)
}

}

}
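
`writers.DynamoDB.writeRDD`, called above, is not part of this diff. Presumably it mirrors the read path through the same EMR DynamoDB connector; a hedged sketch of that idea, with a placeholder table name and endpoint, taking an already-prepared `RDD[(Text, DynamoDBItemWritable)]` (the real writer also applies renames and the replicated table definition):

```scala
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object RawDynamoWriteSketch {
  def writeItems(spark: SparkSession, items: RDD[(Text, DynamoDBItemWritable)]): Unit = {
    // Output configuration for the EMR DynamoDB connector; table name and
    // endpoint are placeholders for the Alternator target settings.
    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)
    jobConf.set("dynamodb.output.tableName", "target-table")
    jobConf.set("dynamodb.endpoint", "http://alternator:8000")
    jobConf.set("mapred.output.format.class", classOf[DynamoDBOutputFormat].getName)
    jobConf.setOutputKeyClass(classOf[Text])
    jobConf.setOutputValueClass(classOf[DynamoDBItemWritable])

    // Items are written back exactly as read: raw attribute maps, no schema.
    items.saveAsHadoopDataset(jobConf)
  }
}
```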