Make streaming work
Saving the initial data to the target database works, but saving additional changes fails with an exception “org.apache.hadoop.io.Text is not Serializable”.

I do not fully understand which difference between the initial data and the streamed changes causes the error. The only difference I see is that the initial data is read by the connector, whereas the streamed changes are records we create ourselves. I changed the way we create them so that the `Text` keys mix in the `Serializable` interface.

This change allowed me to successfully run a data migration with streamed changes enabled. Such a scenario cannot be added to our test suite, though, because KCL only works against real AWS servers (see #113).
julienrf committed Mar 13, 2024
1 parent 1db4c85 commit 359a0e1
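
To make the failure concrete, here is a minimal standalone sketch (not part of the commit; the object and method names are hypothetical) showing that Java serialization, which Spark uses by default, rejects a plain Text but accepts one created with the Serializable mix-in:

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import org.apache.hadoop.io.Text

object SerializableTextCheck {
  // Returns true if Java serialization accepts the given object.
  def javaSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val plain: Text = new Text()
    // The commit's trick: mixing the marker trait into the instantiation
    // creates an anonymous Text subclass that implements Serializable.
    // (Text's own fields are not captured by serialization, which is fine
    // here because the migrator only uses empty Text placeholder keys.)
    val mixedIn: Text = new Text() with Serializable

    println(javaSerializable(plain))   // false: Text is not Serializable
    println(javaSerializable(mixedIn)) // true
  }
}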
Showing 2 changed files with 32 additions and 16 deletions.
migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala (12 additions & 2 deletions)
@@ -64,7 +64,8 @@ object DynamoUtils {
 
   def enableDynamoStream(source: SourceSettings.DynamoDB): Unit = {
     val sourceClient = buildDynamoClient(source.endpoint, source.credentials, source.region)
-    val sourceStreamsClient = buildDynamoStreamsClient(source.credentials, source.region)
+    val sourceStreamsClient =
+      buildDynamoStreamsClient(source.endpoint, source.credentials, source.region)
 
     sourceClient
       .updateTable(
@@ -114,9 +115,18 @@ object DynamoUtils {
     builder.build()
   }
 
-  def buildDynamoStreamsClient(creds: Option[AWSCredentialsProvider], region: Option[String]) = {
+  def buildDynamoStreamsClient(endpoint: Option[DynamoDBEndpoint],
+                               creds: Option[AWSCredentialsProvider],
+                               region: Option[String]) = {
     val builder = AmazonDynamoDBStreamsClientBuilder.standard()
 
+    endpoint.foreach { endpoint =>
+      builder
+        .withEndpointConfiguration(
+          new AwsClientBuilder.EndpointConfiguration(
+            endpoint.renderEndpoint,
+            region.getOrElse("empty")))
+    }
     creds.foreach(builder.withCredentials)
     region.foreach(builder.withRegion)
 
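The endpoint override added above follows the standard AWS SDK v1 builder pattern. For context, a hedged, self-contained sketch (not from the commit; the local URL, region, and object name are placeholder assumptions) of pointing a Streams client at a local DynamoDB endpoint:

import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder
import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest

object LocalStreamsClientSketch {
  def main(args: Array[String]): Unit = {
    val builder = AmazonDynamoDBStreamsClientBuilder.standard()
    // Point the client at a local DynamoDB instead of AWS. When an explicit
    // endpoint is configured, the signing region carries little meaning,
    // which is why the patch can fall back to region.getOrElse("empty").
    builder.withEndpointConfiguration(
      new AwsClientBuilder.EndpointConfiguration("http://localhost:8000", "us-east-1"))
    val streams = builder.build()
    // List the streams exposed by the local endpoint.
    println(streams.listStreams(new ListStreamsRequest()).getStreams)
  }
}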
@@ -67,25 +67,31 @@ object DynamoStreamReplication {
       .foreachRDD { msgs =>
         val rdd = msgs
           .collect {
-            case Some(item) => (new Text(), new DynamoDBItemWritable(item))
+            case Some(item) =>
+              ((new Text() with Serializable): Text, new DynamoDBItemWritable(item))
           }
           .repartition(Runtime.getRuntime.availableProcessors() * 2)
 
-        log.info("Changes to be applied:")
-        rdd
-          .map(_._2) // Remove keys because they are not serializable
-          .groupBy { itemWritable =>
-            itemWritable.getItem.get(operationTypeColumn) match {
-              case `putOperation`    => "UPSERT"
-              case `deleteOperation` => "DELETE"
-              case _                 => "UNKNOWN"
-            }
-          }
-          .mapValues(_.size)
-          .foreach {
-            case (operation, count) =>
-              log.info(s"${operation}: ${count}")
-          }
+        val changes =
+          rdd
+            .map(_._2) // Remove keys because they are not serializable
+            .groupBy { itemWritable =>
+              itemWritable.getItem.get(operationTypeColumn) match {
+                case `putOperation`    => "UPSERT"
+                case `deleteOperation` => "DELETE"
+                case _                 => "UNKNOWN"
+              }
+            }
+            .mapValues(_.size)
+            .collect()
+        if (changes.nonEmpty) {
+          log.info("Changes to be applied:")
+          for ((operation, count) <- changes) {
+            log.info(s"${operation}: ${count}")
+          }
+        } else {
+          log.info("No changes to apply")
+        }
 
         DynamoDB.writeRDD(target, renames, rdd, Some(targetTableDesc))(spark)
       }
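A plausible motivation for collecting before logging: RDD.foreach runs on the executors, so the old log.info calls would land in executor logs rather than the driver's, whereas collect() brings the small per-operation count table to the driver first; the diff also adds an explicit message when there are no changes. A self-contained sketch of that pattern (hypothetical names, not the migrator's code):

import org.apache.spark.sql.SparkSession

// Aggregate counts on the cluster, collect() the tiny result, then log on
// the driver, mirroring the shape of the patched block above.
object DriverSideLoggingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    val operations = spark.sparkContext.parallelize(Seq("UPSERT", "UPSERT", "DELETE"))

    val changes = operations
      .groupBy(identity)  // same shape as the diff's groupBy over item writables
      .mapValues(_.size)
      .collect()          // small array: one entry per operation type

    if (changes.nonEmpty) {
      for ((operation, count) <- changes)
        println(s"$operation: $count") // runs on the driver, unlike RDD.foreach
    } else {
      println("No changes to apply")
    }

    spark.stop()
  }
}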
