diff --git a/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala b/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala index 3128791e..839b10c3 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala @@ -64,7 +64,8 @@ object DynamoUtils { def enableDynamoStream(source: SourceSettings.DynamoDB): Unit = { val sourceClient = buildDynamoClient(source.endpoint, source.credentials, source.region) - val sourceStreamsClient = buildDynamoStreamsClient(source.credentials, source.region) + val sourceStreamsClient = + buildDynamoStreamsClient(source.endpoint, source.credentials, source.region) sourceClient .updateTable( @@ -114,9 +115,18 @@ object DynamoUtils { builder.build() } - def buildDynamoStreamsClient(creds: Option[AWSCredentialsProvider], region: Option[String]) = { + def buildDynamoStreamsClient(endpoint: Option[DynamoDBEndpoint], + creds: Option[AWSCredentialsProvider], + region: Option[String]) = { val builder = AmazonDynamoDBStreamsClientBuilder.standard() + endpoint.foreach { endpoint => + builder + .withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration( + endpoint.renderEndpoint, + region.getOrElse("empty"))) + } creds.foreach(builder.withCredentials) region.foreach(builder.withRegion) diff --git a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala index 3dd4309c..ca17bf50 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala @@ -66,26 +66,30 @@ object DynamoStreamReplication { } .foreachRDD { msgs => val rdd = msgs - .collect { - case Some(item) => (new Text(), new DynamoDBItemWritable(item)) - } + .collect { case Some(item) => new DynamoDBItemWritable(item) } .repartition(Runtime.getRuntime.availableProcessors() * 2) + .map(item => (new Text, item)) // Create the key after repartitioning to avoid Serialization issues - log.info("Changes to be applied:") - rdd - .map(_._2) // Remove keys because they are not serializable - .groupBy { itemWritable => - itemWritable.getItem.get(operationTypeColumn) match { - case `putOperation` => "UPSERT" - case `deleteOperation` => "DELETE" - case _ => "UNKNOWN" + val changes = + rdd + .map(_._2) // Remove keys because they are not serializable + .groupBy { itemWritable => + itemWritable.getItem.get(operationTypeColumn) match { + case `putOperation` => "UPSERT" + case `deleteOperation` => "DELETE" + case _ => "UNKNOWN" + } } + .mapValues(_.size) + .collect() + if (changes.nonEmpty) { + log.info("Changes to be applied:") + for ((operation, count) <- changes) { + log.info(s"${operation}: ${count}") } - .mapValues(_.size) - .foreach { - case (operation, count) => - log.info(s"${operation}: ${count}") - } + } else { + log.info("No changes to apply") + } DynamoDB.writeRDD(target, renames, rdd, Some(targetTableDesc))(spark) }