Skip to content

Commit

Permalink
Fix streaming support
Browse files Browse the repository at this point in the history
#155 broke the support of streaming for DynamoDB. This commit fixes the regressions by:

- making sure the data carried by RDDs is serializable when it needs to
- getting back to the old manual loop to wait for the DynamoDB stream to be enabled because the `Waiter` was not working as expected
  • Loading branch information
julienrf committed Jun 25, 2024
1 parent a30d2af commit 8207897
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 44 deletions.
42 changes: 18 additions & 24 deletions migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@ import software.amazon.awssdk.auth.credentials.{
AwsCredentialsProvider,
ProfileCredentialsProvider
}
import software.amazon.awssdk.core.waiters.{ Waiter, WaiterAcceptor }
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model.{
BillingMode,
CreateTableRequest,
DescribeStreamRequest,
DescribeStreamResponse,
DescribeTableRequest,
ProvisionedThroughput,
ProvisionedThroughputDescription,
Expand Down Expand Up @@ -113,28 +111,24 @@ object DynamoUtils {
.build()
)

val waiterResponse =
Waiter
.builder(classOf[DescribeStreamResponse])
.addAcceptor(
WaiterAcceptor.successOnResponseAcceptor { (response: DescribeStreamResponse) =>
response.streamDescription.streamStatus == StreamStatus.ENABLED
}
)
.build()
.run(() => {
val tableDesc =
sourceClient.describeTable(
DescribeTableRequest.builder().tableName(source.table).build())
val latestStreamArn = tableDesc.table.latestStreamArn
sourceStreamsClient.describeStream(
DescribeStreamRequest.builder().streamArn(latestStreamArn).build()
)
})
if (waiterResponse.matched.response.isPresent) {
log.info("Stream enabled successfully")
} else {
throw new RuntimeException("Unable to enable streams", waiterResponse.matched.exception.get)
var done = false
while (!done) {
val tableDesc =
sourceClient.describeTable(DescribeTableRequest.builder().tableName(source.table).build())
val latestStreamArn = tableDesc.table.latestStreamArn
val describeStream =
sourceStreamsClient.describeStream(
DescribeStreamRequest.builder().streamArn(latestStreamArn).build())

val streamStatus = describeStream.streamDescription.streamStatus
if (streamStatus == StreamStatus.ENABLED) {
log.info("Stream enabled successfully")
done = true
} else {
log.info(
s"Stream not yet enabled (status ${streamStatus}); waiting for 5 seconds and retrying")
Thread.sleep(5000)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.scylladb.migrator.writers

import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter
import com.amazonaws.services.dynamodbv2.model.{ AttributeValue => AttributeValueV1 }
import com.scylladb.migrator.AttributeValueUtils
import com.scylladb.migrator.config.{ AWSCredentials, SourceSettings, TargetSettings }
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
Expand All @@ -13,16 +14,21 @@ import org.apache.spark.streaming.kinesis.{
KinesisInitialPositions,
SparkAWSCredentials
}
import software.amazon.awssdk.services.dynamodb.model.{ AttributeValue, TableDescription }
import software.amazon.awssdk.services.dynamodb.model.TableDescription

import java.util
import java.util.stream.Collectors

object DynamoStreamReplication {
val log = LogManager.getLogger("com.scylladb.migrator.writers.DynamoStreamReplication")

// We enrich the table items with a column `operationTypeColumn` describing the type of change
// applied to the item.
// We have to deal with multiple representation of the data because `spark-kinesis-dynamodb`
// uses the AWS SDK V1, whereas `emr-dynamodb-hadoop` uses the AWS SDK V2
private val operationTypeColumn = "_dynamo_op_type"
private val putOperation = AttributeValue.fromBool(true)
private val deleteOperation = AttributeValue.fromBool(false)
private val putOperation = new AttributeValueV1().withBOOL(true)
private val deleteOperation = new AttributeValueV1().withBOOL(false)

def createDStream(spark: SparkSession,
streamingContext: StreamingContext,
Expand All @@ -39,17 +45,13 @@ object DynamoStreamReplication {
messageHandler = {
case recAdapter: RecordAdapter =>
val rec = recAdapter.getInternalObject
val newMap = new util.HashMap[String, AttributeValue]()
val newMap = new util.HashMap[String, AttributeValueV1]()

if (rec.getDynamodb.getNewImage ne null) {
rec.getDynamodb.getNewImage.forEach { (key, value) =>
newMap.put(key, AttributeValueUtils.fromV1(value))
}
newMap.putAll(rec.getDynamodb.getNewImage)
}

rec.getDynamodb.getKeys.forEach { (key, value) =>
newMap.put(key, AttributeValueUtils.fromV1(value))
}
newMap.putAll(rec.getDynamodb.getKeys)

val operationType =
rec.getEventName match {
Expand All @@ -73,15 +75,13 @@ object DynamoStreamReplication {
}.orNull
).foreachRDD { msgs =>
val rdd = msgs
.collect { case Some(item) => new DynamoDBItemWritable(item) }
.collect { case Some(item) => item: util.Map[String, AttributeValueV1] }
.repartition(Runtime.getRuntime.availableProcessors() * 2)
.map(item => (new Text, item)) // Create the key after repartitioning to avoid Serialization issues

val changes =
rdd
.map(_._2) // Remove keys because they are not serializable
.groupBy { itemWritable =>
itemWritable.getItem.get(operationTypeColumn) match {
.groupBy { item =>
item.get(operationTypeColumn) match {
case `putOperation` => "UPSERT"
case `deleteOperation` => "DELETE"
case _ => "UNKNOWN"
Expand All @@ -98,7 +98,26 @@ object DynamoStreamReplication {
log.info("No changes to apply")
}

DynamoDB.writeRDD(target, renamesMap, rdd, targetTableDesc)(spark)
val writableRdd =
rdd.map { item =>
(
new Text,
new DynamoDBItemWritable(
item
.entrySet()
.stream()
.collect(
Collectors.toMap(
(e: util.Map.Entry[String, AttributeValueV1]) => e.getKey,
(e: util.Map.Entry[String, AttributeValueV1]) =>
AttributeValueUtils.fromV1(e.getValue)
)
)
)
)
}

DynamoDB.writeRDD(target, renamesMap, writableRdd, targetTableDesc)(spark)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,21 @@ trait MigratorSuite extends munit.FunSuite {
database.deleteTable(DeleteTableRequest.builder().tableName(name).build()).ensuring { result =>
result.sdkHttpResponse().isSuccessful
}
val waiterResponse =
val maybeFailure =
database
.waiter()
.waitUntilTableNotExists(describeTableRequest(name))
assert(waiterResponse.matched().response().isPresent, s"Failed to delete table ${name}: ${waiterResponse.matched().exception().get()}")
.matched()
.exception()
if (maybeFailure.isPresent) {
throw maybeFailure.get()
}
} catch {
case _: ResourceNotFoundException =>
// OK, the table was not existing
// OK, the table was not existing or the waiter completed with the ResourceNotFoundException
()
case any: Throwable =>
fail(s"Something did not work as expected: ${any}")
fail(s"Failed to delete table ${name}", any)
}

/** Check that the table schema in the target database is the same as in the source database */
Expand Down

0 comments on commit 8207897

Please sign in to comment.