
Fix streaming issue
The RDD keys are not serializable, which can cause some RDD operations to fail.

We now create the RDD element keys _after_ repartitioning, so that they are never serialized across partitions.

This change allowed me to successfully run a data migration with stream changes enabled. Such a scenario cannot be added to our test suite, though, because KCL only works with the real AWS servers (see scylladb#113).
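
For illustration, here is a minimal self-contained sketch of the failure mode and the fix. All the data is a hypothetical stand-in; only the use of Hadoop's Text keys and the repartition-then-key ordering reflect the actual change:

import org.apache.hadoop.io.Text
import org.apache.spark.sql.SparkSession

// Sketch: org.apache.hadoop.io.Text implements Writable but not
// java.io.Serializable, so shuffling (Text, value) pairs with Spark's
// default Java serializer throws a NotSerializableException.
object KeyAfterRepartitionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("sketch").getOrCreate()
    val sc    = spark.sparkContext

    val items = sc.parallelize(Seq("a", "b", "c"))

    // Fails at shuffle time: the non-serializable Text keys would have to
    // cross partition boundaries.
    // items.map(item => (new Text(), item)).repartition(4).count()

    // Works: only the serializable values are shuffled; the keys are created
    // locally inside each partition after the shuffle.
    items.repartition(4).map(item => (new Text(), item)).count()

    spark.stop()
  }
}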
julienrf committed Mar 13, 2024
1 parent 1db4c85 commit afcbfd8
Showing 2 changed files with 32 additions and 18 deletions.
14 changes: 12 additions & 2 deletions migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala
@@ -64,7 +64,8 @@ object DynamoUtils {
 
   def enableDynamoStream(source: SourceSettings.DynamoDB): Unit = {
     val sourceClient = buildDynamoClient(source.endpoint, source.credentials, source.region)
-    val sourceStreamsClient = buildDynamoStreamsClient(source.credentials, source.region)
+    val sourceStreamsClient =
+      buildDynamoStreamsClient(source.endpoint, source.credentials, source.region)
 
     sourceClient
       .updateTable(
@@ -114,9 +115,18 @@ object DynamoUtils {
     builder.build()
   }
 
-  def buildDynamoStreamsClient(creds: Option[AWSCredentialsProvider], region: Option[String]) = {
+  def buildDynamoStreamsClient(endpoint: Option[DynamoDBEndpoint],
+                               creds: Option[AWSCredentialsProvider],
+                               region: Option[String]) = {
     val builder = AmazonDynamoDBStreamsClientBuilder.standard()
 
+    endpoint.foreach { endpoint =>
+      builder
+        .withEndpointConfiguration(
+          new AwsClientBuilder.EndpointConfiguration(
+            endpoint.renderEndpoint,
+            region.getOrElse("empty")))
+    }
     creds.foreach(builder.withCredentials)
     region.foreach(builder.withRegion)
 
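For reference, this is the AWS SDK v1 pattern that buildDynamoStreamsClient now relies on, shown as a minimal sketch; the endpoint URL and signing region below are illustrative, not values from the migrator's configuration:

import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder

// Sketch: an explicit endpoint configuration replaces region-based endpoint
// resolution, which is what lets the streams client target a
// DynamoDB-compatible server such as DynamoDB Local. The second argument of
// EndpointConfiguration is used only as the signing region.
object StreamsClientSketch {
  def main(args: Array[String]): Unit = {
    val streamsClient = AmazonDynamoDBStreamsClientBuilder
      .standard()
      .withEndpointConfiguration(
        new AwsClientBuilder.EndpointConfiguration("http://localhost:8000", "us-east-1"))
      .build()
    println(streamsClient)
  }
}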
@@ -66,26 +66,30 @@ object DynamoStreamReplication {
         }
         .foreachRDD { msgs =>
           val rdd = msgs
-            .collect {
-              case Some(item) => (new Text(), new DynamoDBItemWritable(item))
-            }
+            .collect { case Some(item) => new DynamoDBItemWritable(item) }
             .repartition(Runtime.getRuntime.availableProcessors() * 2)
+            .map(item => (new Text, item)) // Create the key after repartitioning to avoid Serialization issues
 
-          log.info("Changes to be applied:")
-          rdd
-            .map(_._2) // Remove keys because they are not serializable
-            .groupBy { itemWritable =>
-              itemWritable.getItem.get(operationTypeColumn) match {
-                case `putOperation`    => "UPSERT"
-                case `deleteOperation` => "DELETE"
-                case _                 => "UNKNOWN"
-              }
-            }
-            .mapValues(_.size)
-            .foreach {
-              case (operation, count) =>
-                log.info(s"${operation}: ${count}")
-            }
+          val changes =
+            rdd
+              .map(_._2) // Remove keys because they are not serializable
+              .groupBy { itemWritable =>
+                itemWritable.getItem.get(operationTypeColumn) match {
+                  case `putOperation`    => "UPSERT"
+                  case `deleteOperation` => "DELETE"
+                  case _                 => "UNKNOWN"
+                }
+              }
+              .mapValues(_.size)
+              .collect()
+          if (changes.nonEmpty) {
+            log.info("Changes to be applied:")
+            for ((operation, count) <- changes) {
+              log.info(s"${operation}: ${count}")
+            }
+          } else {
+            log.info("No changes to apply")
+          }
 
           DynamoDB.writeRDD(target, renames, rdd, Some(targetTableDesc))(spark)
         }
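A side note on the logging change above: aggregating the per-operation counts and collecting them before logging moves the log calls from the executors to the driver (an executor-side rdd.foreach would write to the worker logs instead). Below is a minimal sketch of that pattern, with hypothetical stand-in data:

import org.apache.spark.sql.SparkSession

// Sketch: aggregate on the executors, collect the small per-operation
// summary, and log it on the driver.
object ChangeSummarySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("sketch").getOrCreate()
    val ops   = spark.sparkContext.parallelize(Seq("UPSERT", "UPSERT", "DELETE"))

    val changes = ops.groupBy(identity).mapValues(_.size).collect()

    if (changes.nonEmpty) {
      for ((operation, count) <- changes) println(s"$operation: $count")
    } else {
      println("No changes to apply")
    }

    spark.stop()
  }
}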
