diff --git a/.gitmodules b/.gitmodules
index 8d0f11a6..78f4fa73 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -2,6 +2,3 @@
 	path = spark-cassandra-connector
 	url = https://github.com/scylladb/spark-cassandra-connector
 	branch = feature/track-token-ranges
-[submodule "spark-kinesis"]
-	path = spark-kinesis
-	url = https://github.com/scylladb/spark-kinesis
diff --git a/build.sbt b/build.sbt
index 0d8f065a..5eead8be 100644
--- a/build.sbt
+++ b/build.sbt
@@ -2,6 +2,7 @@ import sbt.librarymanagement.InclExclRule
 
 val awsSdkVersion = "1.11.728"
 val sparkVersion = "2.4.4"
+val dynamodbStreamsKinesisAdapterVersion = "1.5.2"
 
 inThisBuild(
   List(
@@ -11,9 +12,12 @@ inThisBuild(
   )
 )
 
-// Adaptation of spark-streaming-kinesis-asl to work with DynamoDB Streams
+// Augmentation of spark-streaming-kinesis-asl to also work with DynamoDB Streams
 lazy val `spark-kinesis-dynamodb` = project.in(file("spark-kinesis-dynamodb")).settings(
-  libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % sparkVersion,
+  libraryDependencies ++= Seq(
+    "org.apache.spark" %% "spark-streaming-kinesis-asl" % sparkVersion,
+    "com.amazonaws" % "dynamodb-streams-kinesis-adapter" % dynamodbStreamsKinesisAdapterVersion
+  )
 )
 
 lazy val migrator = (project in file("migrator")).settings(
@@ -28,29 +32,39 @@ lazy val migrator = (project in file("migrator")).settings(
     "-XX:+CMSClassUnloadingEnabled"),
   scalacOptions ++= Seq("-deprecation", "-unchecked", "-Ypartial-unification"),
   Test / parallelExecution := false,
-  fork := true,
-  scalafmtOnCompile := true,
+  fork := true,
+  scalafmtOnCompile := true,
   libraryDependencies ++= Seq(
     "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
     "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
     "com.amazonaws" % "aws-java-sdk-sts" % awsSdkVersion,
     "com.amazonaws" % "aws-java-sdk-dynamodb" % awsSdkVersion,
-    ("com.amazonaws" % "dynamodb-streams-kinesis-adapter" % "1.5.2")
+    ("com.amazonaws" % "dynamodb-streams-kinesis-adapter" % dynamodbStreamsKinesisAdapterVersion)
      .excludeAll(InclExclRule("com.fasterxml.jackson.core")),
     "com.amazon.emr" % "emr-dynamodb-hadoop" % "4.16.0",
-    "org.yaml" % "snakeyaml" % "1.23",
-    "io.circe" %% "circe-yaml" % "0.9.0",
-    "io.circe" %% "circe-generic" % "0.9.0",
+    "org.yaml" % "snakeyaml" % "1.23",
+    "io.circe" %% "circe-yaml" % "0.9.0",
+    "io.circe" %% "circe-generic" % "0.9.0",
   ),
   assembly / assemblyShadeRules := Seq(
     ShadeRule.rename("org.yaml.snakeyaml.**" -> "com.scylladb.shaded.@1").inAll
   ),
   assembly / assemblyMergeStrategy := {
-    case PathList("org", "joda", "time", _ @_*) => MergeStrategy.first
+    // Handle conflicts between our own library dependencies and those that are bundled into
+    // the spark-cassandra-connector fat-jar
+    case PathList("com", "codahale", "metrics", _ @_*) => MergeStrategy.first
+    case PathList("digesterRules.xml") => MergeStrategy.first
+    case PathList("org", "aopalliance", _ @_*) => MergeStrategy.first
+    case PathList("org", "apache", "commons", "collections", _ @_*) => MergeStrategy.first
+    case PathList("org", "apache", "commons", "configuration", _ @_*) => MergeStrategy.first
     case PathList("org", "apache", "commons", "logging", _ @_*) => MergeStrategy.first
-    case PathList("com", "fasterxml", "jackson", "annotation", _ @_*) => MergeStrategy.first
-    case PathList("com", "fasterxml", "jackson", "core", _ @_*) => MergeStrategy.first
-    case PathList("com", "fasterxml", "jackson", "databind", _ @_*) => MergeStrategy.first
+    case PathList("org", "apache", "spark", _ @_*) => MergeStrategy.first
+    case PathList("org", "slf4j", _ @_*) => MergeStrategy.first
+    case PathList("properties.dtd") => MergeStrategy.first
+    case PathList("PropertyList-1.0.dtd") => MergeStrategy.first
+    // Other conflicts
+    case PathList("javax", "inject", _ @_*) => MergeStrategy.first
+    case PathList("org", "apache", "hadoop", _ @_*) => MergeStrategy.first
     case x =>
       val oldStrategy = (assembly / assemblyMergeStrategy).value
       oldStrategy(x)
@@ -78,12 +92,12 @@ lazy val migrator = (project in file("migrator")).settings(
 
 lazy val tests = project.in(file("tests")).settings(
   libraryDependencies ++= Seq(
-    "com.amazonaws" % "aws-java-sdk-dynamodb" % awsSdkVersion,
-    "org.apache.cassandra" % "java-driver-query-builder" % "4.18.0",
-    "com.github.mjakubowski84" %% "parquet4s-core" % "1.9.4",
-    "org.apache.hadoop" % "hadoop-client" % "2.9.2",
-    "org.scalameta" %% "munit" % "0.7.29",
-    "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0"
+    "com.amazonaws" % "aws-java-sdk-dynamodb" % awsSdkVersion,
+    "org.apache.cassandra" % "java-driver-query-builder" % "4.18.0",
+    "com.github.mjakubowski84" %% "parquet4s-core" % "1.9.4",
+    "org.apache.hadoop" % "hadoop-client" % "2.9.2",
+    "org.scalameta" %% "munit" % "0.7.29",
+    "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0"
   ),
   Test / parallelExecution := false
 )
diff --git a/build.sh b/build.sh
index a1b503f2..01837eba 100755
--- a/build.sh
+++ b/build.sh
@@ -14,15 +14,11 @@ trap "rm -rf $TMPDIR" EXIT
 pushd spark-cassandra-connector
 sbt -Djava.io.tmpdir="$TMPDIR" ++2.11.12 assembly
 popd
-pushd spark-kinesis
-sbt assembly
-popd
 
 if [ ! -d "./migrator/lib" ]; then
     mkdir migrator/lib
 fi
 
 cp ./spark-cassandra-connector/connector/target/scala-2.11/spark-cassandra-connector-assembly-*.jar ./migrator/lib
-cp ./spark-kinesis/target/scala-2.11/spark-streaming-kinesis-asl-assembly-*.jar ./migrator/lib
 
 sbt -Djava.io.tmpdir="$TMPDIR" migrator/assembly
diff --git a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala
index ca17bf50..70de3fd1 100644
--- a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala
+++ b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala
@@ -9,8 +9,8 @@ import org.apache.log4j.LogManager
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.kinesis.{
+  KinesisDynamoDBInputDStream,
   KinesisInitialPositions,
-  KinesisInputDStream,
   SparkAWSCredentials
 }
 
@@ -29,22 +29,13 @@ object DynamoStreamReplication {
                     target: TargetSettings.DynamoDB,
                     targetTableDesc: TableDescription,
                     renames: List[Rename]): Unit =
-    KinesisInputDStream.builder
-      .streamingContext(streamingContext)
-      .streamName(src.table)
-      .dynamoStream(true)
-      .kinesisCredentials(
-        src.credentials.map {
-          case AWSCredentials(accessKey, secretKey) =>
-            SparkAWSCredentials.builder
-              .basicCredentials(accessKey, secretKey)
-              .build
-        }.orNull
-      )
-      .regionName(src.region.orNull)
-      .checkpointAppName(s"migrator_${src.table}_${System.currentTimeMillis()}")
-      .initialPosition(new KinesisInitialPositions.TrimHorizon)
-      .buildWithMessageHandler {
+    new KinesisDynamoDBInputDStream(
+      streamingContext,
+      streamName = src.table,
+      regionName = src.region.orNull,
+      initialPosition = new KinesisInitialPositions.TrimHorizon,
+      checkpointAppName = s"migrator_${src.table}_${System.currentTimeMillis()}",
+      messageHandler = {
         case recAdapter: RecordAdapter =>
           val rec = recAdapter.getInternalObject
           val newMap = new util.HashMap[String, AttributeValue]()
@@ -63,35 +54,41 @@ object DynamoStreamReplication {
           Some(newMap)
 
         case _ => None
-      }
-      .foreachRDD { msgs =>
-        val rdd = msgs
-          .collect { case Some(item) => new DynamoDBItemWritable(item) }
-          .repartition(Runtime.getRuntime.availableProcessors() * 2)
-          .map(item => (new Text, item)) // Create the key after repartitioning to avoid Serialization issues
+      },
+      kinesisCreds = src.credentials.map {
+        case AWSCredentials(accessKey, secretKey) =>
+          SparkAWSCredentials.builder
+            .basicCredentials(accessKey, secretKey)
+            .build()
+      }.orNull
+    ).foreachRDD { msgs =>
+      val rdd = msgs
+        .collect { case Some(item) => new DynamoDBItemWritable(item) }
+        .repartition(Runtime.getRuntime.availableProcessors() * 2)
+        .map(item => (new Text, item)) // Create the key after repartitioning to avoid Serialization issues
 
-        val changes =
-          rdd
-            .map(_._2) // Remove keys because they are not serializable
-            .groupBy { itemWritable =>
-              itemWritable.getItem.get(operationTypeColumn) match {
-                case `putOperation` => "UPSERT"
-                case `deleteOperation` => "DELETE"
-                case _ => "UNKNOWN"
-              }
+      val changes =
+        rdd
+          .map(_._2) // Remove keys because they are not serializable
+          .groupBy { itemWritable =>
+            itemWritable.getItem.get(operationTypeColumn) match {
+              case `putOperation` => "UPSERT"
+              case `deleteOperation` => "DELETE"
+              case _ => "UNKNOWN"
             }
-            .mapValues(_.size)
-            .collect()
-        if (changes.nonEmpty) {
-          log.info("Changes to be applied:")
-          for ((operation, count) <- changes) {
-            log.info(s"${operation}: ${count}")
           }
-        } else {
-          log.info("No changes to apply")
+          .mapValues(_.size)
+          .collect()
+      if (changes.nonEmpty) {
+        log.info("Changes to be applied:")
+        for ((operation, count) <- changes) {
+          log.info(s"${operation}: ${count}")
         }
-
-        DynamoDB.writeRDD(target, renames, rdd, Some(targetTableDesc))(spark)
+      } else {
+        log.info("No changes to apply")
       }
 
+      DynamoDB.writeRDD(target, renames, rdd, Some(targetTableDesc))(spark)
+    }
+
 }
diff --git a/spark-kinesis b/spark-kinesis
deleted file mode 160000
index d7ba51cc..00000000
--- a/spark-kinesis
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit d7ba51cc670b0e323017694804fab5f9e528b4c9
diff --git a/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala b/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBCheckpointer.scala
similarity index 98%
rename from spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
rename to spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBCheckpointer.scala
index 11e94953..946ba82e 100644
--- a/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
+++ b/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBCheckpointer.scala
@@ -36,8 +36,8 @@ import org.apache.spark.util.{Clock, SystemClock}
  * @param workerId Worker Id of KCL worker for logging purposes
  * @param clock In order to use ManualClocks for the purpose of testing
  */
-private[kinesis] class KinesisCheckpointer(
-    receiver: KinesisReceiver[_],
+private[kinesis] class KinesisDynamoDBCheckpointer(
+    receiver: KinesisDynamoDBReceiver[_],
     checkpointInterval: Duration,
     workerId: String,
     clock: Clock = new SystemClock) extends Logging {
diff --git a/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBInputDStream.scala b/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBInputDStream.scala
new file mode 100644
index 00000000..e30500ff
--- /dev/null
+++ b/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBInputDStream.scala
@@ -0,0 +1,52 @@
+package org.apache.spark.streaming.kinesis
+
+import com.amazonaws.services.kinesis.model.Record
+import org.apache.spark.streaming.kinesis.KinesisInputDStream.{DEFAULT_KINESIS_ENDPOINT_URL, DEFAULT_STORAGE_LEVEL}
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.StreamingContext
+
+import scala.reflect.ClassTag
+
+/**
+ * Override the default behavior of [[KinesisInputDStream]] to create a [[KinesisDynamoDBReceiver]].
+ */
+class KinesisDynamoDBInputDStream[T: ClassTag](
+  ssc: StreamingContext,
+  streamName: String,
+  regionName: String,
+  initialPosition: KinesisInitialPosition,
+  checkpointAppName: String,
+  messageHandler: Record => T,
+  kinesisCreds: SparkAWSCredentials
+) extends KinesisInputDStream[T](
+      ssc,
+      streamName,
+      DEFAULT_KINESIS_ENDPOINT_URL,
+      regionName,
+      initialPosition,
+      checkpointAppName,
+      ssc.graph.batchDuration,
+      DEFAULT_STORAGE_LEVEL,
+      messageHandler,
+      kinesisCreds,
+      None,
+      None
+    ) {
+
+  override def getReceiver(): Receiver[T] = {
+    new KinesisDynamoDBReceiver(
+      streamName,
+      endpointUrl,
+      regionName,
+      initialPosition,
+      checkpointAppName,
+      checkpointInterval,
+      DEFAULT_STORAGE_LEVEL,
+      messageHandler,
+      kinesisCreds,
+      dynamoDBCreds,
+      cloudWatchCreds
+    )
+  }
+
+}
diff --git a/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBReceiver.scala
similarity index 87%
rename from spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
rename to spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBReceiver.scala
index 69c52365..ca37d7db 100644
--- a/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBReceiver.scala
@@ -23,7 +23,12 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.control.NonFatal
 
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer, IRecordProcessorFactory}
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder
+import com.amazonaws.services.dynamodbv2.{AmazonDynamoDBClientBuilder, AmazonDynamoDBStreamsClient}
+import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest
+import com.amazonaws.services.dynamodbv2.streamsadapter.{AmazonDynamoDBStreamsAdapterClient, StreamsWorkerFactory}
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{KinesisClientLibConfiguration, Worker}
 import com.amazonaws.services.kinesis.model.Record
 
@@ -81,7 +86,7 @@ import org.apache.spark.util.Utils
  *   AWSCredentialsProvider passed to the KCL to authorize DynamoDB API calls.
  *   Will use kinesisCreds if value is None.
  */
-private[kinesis] class KinesisReceiver[T](
+private[kinesis] class KinesisDynamoDBReceiver[T](
     val streamName: String,
     endpointUrl: String,
     regionName: String,
@@ -132,7 +137,7 @@ private[kinesis] class KinesisReceiver[T](
   /**
    * The centralized kinesisCheckpointer that checkpoints based on the given checkpointInterval.
    */
-  @volatile private var kinesisCheckpointer: KinesisCheckpointer = null
+  @volatile private var kinesisCheckpointer: KinesisDynamoDBCheckpointer = null
 
   /**
    * Latest sequence number ranges that have been stored successfully.
@@ -148,13 +153,27 @@ private[kinesis] class KinesisReceiver[T](
 
     workerId = Utils.localHostName() + ":" + UUID.randomUUID()
 
-    kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId)
+    kinesisCheckpointer =
+      new KinesisDynamoDBCheckpointer(receiver, checkpointInterval, workerId)
     val kinesisProvider = kinesisCreds.provider
 
+    val dynamoDBClient =
+      AmazonDynamoDBClientBuilder
+        .standard()
+        .withCredentials(dynamoDBCreds.fold(kinesisProvider)(_.provider))
+        .withRegion(regionName)
+        .build()
+
+    val actualStreamName =
+      dynamoDBClient
+        .describeTable(new DescribeTableRequest(streamName))
+        .getTable
+        .getLatestStreamArn
+
     val kinesisClientLibConfiguration = {
       val baseClientLibConfiguration = new KinesisClientLibConfiguration(
         checkpointAppName,
-        streamName,
+        actualStreamName,
         kinesisProvider,
         dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
         cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
@@ -162,6 +181,10 @@ private[kinesis] class KinesisReceiver[T](
         .withKinesisEndpoint(endpointUrl)
         .withTaskBackoffTimeMillis(500)
         .withRegionName(regionName)
+        .withIdleTimeBetweenReadsInMillis(500)
+        .withMaxRecords(1000)
+        .withFailoverTimeMillis(60000)
+        .withParentShardPollIntervalMillis(10000)
 
       // Update the Kinesis client lib config with timestamp
       // if InitialPositionInStream.AT_TIMESTAMP is passed
@@ -179,12 +202,36 @@ private[kinesis] class KinesisReceiver[T](
      * IRecordProcessor.processRecords() method.
      * We're using our custom KinesisRecordProcessor in this case.
      */
-    val recordProcessorFactory = new IRecordProcessorFactory {
-      override def createProcessor: IRecordProcessor =
-        new KinesisRecordProcessor(receiver, workerId)
+    val recordProcessorFactory = new v2.IRecordProcessorFactory {
+      override def createProcessor(): v2.IRecordProcessor =
+        new V1ToV2RecordProcessor(new KinesisDynamoDBRecordProcessor(receiver, workerId))
+    }
+
+    worker = {
+      val streamsAdapter = new AmazonDynamoDBStreamsAdapterClient(
+        AmazonDynamoDBStreamsClient.builder()
+          .withCredentials(kinesisProvider)
+          .withRegion(regionName)
+          .build()
+      )
+
+      val cloudWatchClient = AmazonCloudWatchClientBuilder.standard
+        .withCredentials(cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider))
+        .withRegion(regionName)
+        .withClientConfiguration(
+          kinesisClientLibConfiguration.getCloudWatchClientConfiguration
+        )
+        .build
+
+      StreamsWorkerFactory.createDynamoDbStreamsWorker(
+        recordProcessorFactory,
+        kinesisClientLibConfiguration,
+        streamsAdapter,
+        dynamoDBClient,
+        cloudWatchClient
+      )
     }
 
-    worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
     workerThread = new Thread() {
       override def run(): Unit = {
         try {
diff --git a/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBRecordProcessor.scala
similarity index 98%
rename from spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
rename to spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBRecordProcessor.scala
index 8c6a399d..f3fba11e 100644
--- a/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ b/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBRecordProcessor.scala
@@ -39,7 +39,7 @@ import org.apache.spark.internal.Logging
  * @param receiver Kinesis receiver
  * @param workerId for logging purposes
  */
-private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], workerId: String)
+private[kinesis] class KinesisDynamoDBRecordProcessor[T](receiver: KinesisDynamoDBReceiver[T], workerId: String)
   extends IRecordProcessor with Logging {
 
   // shardId populated during initialize()
diff --git a/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/V1ToV2RecordProcessor.scala b/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/V1ToV2RecordProcessor.scala
new file mode 100644
index 00000000..3832905d
--- /dev/null
+++ b/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/V1ToV2RecordProcessor.scala
@@ -0,0 +1,20 @@
+package org.apache.spark.streaming.kinesis
+
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2
+import com.amazonaws.services.kinesis.clientlibrary.types.{InitializationInput, ProcessRecordsInput, ShutdownInput}
+
+class V1ToV2RecordProcessor(v1RecordProcessor: IRecordProcessor) extends v2.IRecordProcessor {
+
+  def initialize(initializationInput: InitializationInput): Unit =
+    v1RecordProcessor.initialize(initializationInput.getShardId)
+
+  def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
+    println(s"PROCESSING RECORDS ${processRecordsInput}")
+    v1RecordProcessor.processRecords(processRecordsInput.getRecords, processRecordsInput.getCheckpointer)
+  }
+
+  def shutdown(shutdownInput: ShutdownInput): Unit =
+    v1RecordProcessor.shutdown(shutdownInput.getCheckpointer, shutdownInput.getShutdownReason)
+
+}