From 8207897f9ee0c3b548cec494ce85983df5699bd8 Mon Sep 17 00:00:00 2001 From: Julien Richard-Foy Date: Tue, 25 Jun 2024 12:23:21 +0200 Subject: [PATCH] Fix streaming support scylladb/scylla-migrator#155 broke the support of streaming for DynamoDB. This commit fixes the regressions by: - making sure the data carried by RDDs is serializable when it needs to - getting back to the old manual loop to wait for the DynamoDB stream to be enabled because the `Waiter` was not working as expected --- .../com/scylladb/migrator/DynamoUtils.scala | 42 +++++++-------- .../writers/DynamoStreamReplication.scala | 51 +++++++++++++------ .../migrator/alternator/MigratorSuite.scala | 12 +++-- 3 files changed, 61 insertions(+), 44 deletions(-) diff --git a/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala b/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala index 9a0be7f5..77df2e67 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala @@ -9,13 +9,11 @@ import software.amazon.awssdk.auth.credentials.{ AwsCredentialsProvider, ProfileCredentialsProvider } -import software.amazon.awssdk.core.waiters.{ Waiter, WaiterAcceptor } import software.amazon.awssdk.services.dynamodb.DynamoDbClient import software.amazon.awssdk.services.dynamodb.model.{ BillingMode, CreateTableRequest, DescribeStreamRequest, - DescribeStreamResponse, DescribeTableRequest, ProvisionedThroughput, ProvisionedThroughputDescription, @@ -113,28 +111,24 @@ object DynamoUtils { .build() ) - val waiterResponse = - Waiter - .builder(classOf[DescribeStreamResponse]) - .addAcceptor( - WaiterAcceptor.successOnResponseAcceptor { (response: DescribeStreamResponse) => - response.streamDescription.streamStatus == StreamStatus.ENABLED - } - ) - .build() - .run(() => { - val tableDesc = - sourceClient.describeTable( - DescribeTableRequest.builder().tableName(source.table).build()) - val latestStreamArn = tableDesc.table.latestStreamArn - sourceStreamsClient.describeStream( - DescribeStreamRequest.builder().streamArn(latestStreamArn).build() - ) - }) - if (waiterResponse.matched.response.isPresent) { - log.info("Stream enabled successfully") - } else { - throw new RuntimeException("Unable to enable streams", waiterResponse.matched.exception.get) + var done = false + while (!done) { + val tableDesc = + sourceClient.describeTable(DescribeTableRequest.builder().tableName(source.table).build()) + val latestStreamArn = tableDesc.table.latestStreamArn + val describeStream = + sourceStreamsClient.describeStream( + DescribeStreamRequest.builder().streamArn(latestStreamArn).build()) + + val streamStatus = describeStream.streamDescription.streamStatus + if (streamStatus == StreamStatus.ENABLED) { + log.info("Stream enabled successfully") + done = true + } else { + log.info( + s"Stream not yet enabled (status ${streamStatus}); waiting for 5 seconds and retrying") + Thread.sleep(5000) + } } } diff --git a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala index e070df3e..8b8edcd1 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala @@ -1,6 +1,7 @@ package com.scylladb.migrator.writers import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter +import com.amazonaws.services.dynamodbv2.model.{ AttributeValue => AttributeValueV1 } import com.scylladb.migrator.AttributeValueUtils import com.scylladb.migrator.config.{ AWSCredentials, SourceSettings, TargetSettings } import org.apache.hadoop.dynamodb.DynamoDBItemWritable @@ -13,16 +14,21 @@ import org.apache.spark.streaming.kinesis.{ KinesisInitialPositions, SparkAWSCredentials } -import software.amazon.awssdk.services.dynamodb.model.{ AttributeValue, TableDescription } +import software.amazon.awssdk.services.dynamodb.model.TableDescription import java.util +import java.util.stream.Collectors object DynamoStreamReplication { val log = LogManager.getLogger("com.scylladb.migrator.writers.DynamoStreamReplication") + // We enrich the table items with a column `operationTypeColumn` describing the type of change + // applied to the item. + // We have to deal with multiple representation of the data because `spark-kinesis-dynamodb` + // uses the AWS SDK V1, whereas `emr-dynamodb-hadoop` uses the AWS SDK V2 private val operationTypeColumn = "_dynamo_op_type" - private val putOperation = AttributeValue.fromBool(true) - private val deleteOperation = AttributeValue.fromBool(false) + private val putOperation = new AttributeValueV1().withBOOL(true) + private val deleteOperation = new AttributeValueV1().withBOOL(false) def createDStream(spark: SparkSession, streamingContext: StreamingContext, @@ -39,17 +45,13 @@ object DynamoStreamReplication { messageHandler = { case recAdapter: RecordAdapter => val rec = recAdapter.getInternalObject - val newMap = new util.HashMap[String, AttributeValue]() + val newMap = new util.HashMap[String, AttributeValueV1]() if (rec.getDynamodb.getNewImage ne null) { - rec.getDynamodb.getNewImage.forEach { (key, value) => - newMap.put(key, AttributeValueUtils.fromV1(value)) - } + newMap.putAll(rec.getDynamodb.getNewImage) } - rec.getDynamodb.getKeys.forEach { (key, value) => - newMap.put(key, AttributeValueUtils.fromV1(value)) - } + newMap.putAll(rec.getDynamodb.getKeys) val operationType = rec.getEventName match { @@ -73,15 +75,13 @@ object DynamoStreamReplication { }.orNull ).foreachRDD { msgs => val rdd = msgs - .collect { case Some(item) => new DynamoDBItemWritable(item) } + .collect { case Some(item) => item: util.Map[String, AttributeValueV1] } .repartition(Runtime.getRuntime.availableProcessors() * 2) - .map(item => (new Text, item)) // Create the key after repartitioning to avoid Serialization issues val changes = rdd - .map(_._2) // Remove keys because they are not serializable - .groupBy { itemWritable => - itemWritable.getItem.get(operationTypeColumn) match { + .groupBy { item => + item.get(operationTypeColumn) match { case `putOperation` => "UPSERT" case `deleteOperation` => "DELETE" case _ => "UNKNOWN" @@ -98,7 +98,26 @@ object DynamoStreamReplication { log.info("No changes to apply") } - DynamoDB.writeRDD(target, renamesMap, rdd, targetTableDesc)(spark) + val writableRdd = + rdd.map { item => + ( + new Text, + new DynamoDBItemWritable( + item + .entrySet() + .stream() + .collect( + Collectors.toMap( + (e: util.Map.Entry[String, AttributeValueV1]) => e.getKey, + (e: util.Map.Entry[String, AttributeValueV1]) => + AttributeValueUtils.fromV1(e.getValue) + ) + ) + ) + ) + } + + DynamoDB.writeRDD(target, renamesMap, writableRdd, targetTableDesc)(spark) } } diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/MigratorSuite.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/MigratorSuite.scala index 82f1c89e..d9fe6318 100644 --- a/tests/src/test/scala/com/scylladb/migrator/alternator/MigratorSuite.scala +++ b/tests/src/test/scala/com/scylladb/migrator/alternator/MigratorSuite.scala @@ -89,17 +89,21 @@ trait MigratorSuite extends munit.FunSuite { database.deleteTable(DeleteTableRequest.builder().tableName(name).build()).ensuring { result => result.sdkHttpResponse().isSuccessful } - val waiterResponse = + val maybeFailure = database .waiter() .waitUntilTableNotExists(describeTableRequest(name)) - assert(waiterResponse.matched().response().isPresent, s"Failed to delete table ${name}: ${waiterResponse.matched().exception().get()}") + .matched() + .exception() + if (maybeFailure.isPresent) { + throw maybeFailure.get() + } } catch { case _: ResourceNotFoundException => - // OK, the table was not existing + // OK, the table was not existing or the waiter completed with the ResourceNotFoundException () case any: Throwable => - fail(s"Something did not work as expected: ${any}") + fail(s"Failed to delete table ${name}", any) } /** Check that the table schema in the target database is the same as in the source database */