Skip to content

Commit

Permalink
Embrace AWS SDK v2 everywhere
Browse files Browse the repository at this point in the history
- Use v2 clients both in the migrator and in the tests
- Use Waiters instead of `Thread.sleep`
- Add more AWS configuration properties as required by the S3 and DynamoDB clients
  • Loading branch information
julienrf committed Jun 25, 2024
1 parent 0bd6fc2 commit c38f582
Show file tree
Hide file tree
Showing 23 changed files with 404 additions and 343 deletions.
13 changes: 12 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,18 @@ lazy val tests = project.in(file("tests")).settings(
"org.apache.hadoop" % "hadoop-client" % hadoopVersion,
"org.scalameta" %% "munit" % "0.7.29"
),
Test / parallelExecution := false
Test / parallelExecution := false,
// Needed to build a Spark session on Java 17+, see https://stackoverflow.com/questions/73465937/apache-spark-3-3-0-breaks-on-java-17-with-cannot-access-class-sun-nio-ch-direct
Test / javaOptions ++= {
val maybeJavaMajorVersion =
sys.props.get("java.version")
.map(version => version.takeWhile(_ != '.').toInt)
if (maybeJavaMajorVersion.exists(_ > 11))
Seq("--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED")
else
Nil
},
Test / fork := true,
).dependsOn(migrator)

lazy val root = project.in(file("."))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,55 +5,16 @@ import com.amazonaws.services.dynamodbv2.model.{ AttributeValue => AttributeValu
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.dynamodb.model.AttributeValue

import java.nio.ByteBuffer
import java.util.stream.Collectors
import java.util.{ Map => JMap }
import scala.jdk.CollectionConverters._

/** Convenient factories to create `AttributeValue` objects */
object AttributeValueUtils {

def binaryValue(bytes: Array[Byte]): AttributeValue =
binaryValue(ByteBuffer.wrap(bytes))

// Fixme simplify
def binaryValue(byteBuffer: ByteBuffer): AttributeValue =
AttributeValue.fromB(SdkBytes.fromByteBuffer(byteBuffer))

def binaryValues(byteBuffers: ByteBuffer*): AttributeValue =
AttributeValue.fromBs(byteBuffers.map(SdkBytes.fromByteBuffer).asJava)

def stringValue(value: String): AttributeValue =
AttributeValue.fromS(value)

def stringValues(values: String*): AttributeValue =
AttributeValue.fromSs(values.asJava)

def numericalValue(value: String): AttributeValue =
AttributeValue.fromN(value)

def numericalValues(values: String*): AttributeValue =
AttributeValue.fromNs(values.asJava)

def boolValue(value: Boolean): AttributeValue =
AttributeValue.fromBool(value)

def listValue(items: AttributeValue*): AttributeValue =
AttributeValue.fromL(items.asJava)

def mapValue(items: (String, AttributeValue)*): AttributeValue =
AttributeValue.fromM(items.toMap.asJava)

def mapValue(items: Map[String, AttributeValue]): AttributeValue =
AttributeValue.fromM(items.asJava)

val nullValue: AttributeValue =
AttributeValue.fromNul(true)

def fromV1(value: AttributeValueV1): AttributeValue =
if (value.getS ne null) stringValue(value.getS)
else if (value.getN ne null) numericalValue(value.getN)
else if (value.getB ne null) binaryValue(value.getB)
if (value.getS ne null) AttributeValue.fromS(value.getS)
else if (value.getN ne null) AttributeValue.fromN(value.getN)
else if (value.getB ne null) AttributeValue.fromB(SdkBytes.fromByteBuffer(value.getB))
else if (value.getSS ne null) AttributeValue.fromSs(value.getSS)
else if (value.getNS ne null) AttributeValue.fromNs(value.getNS)
else if (value.getBS ne null)
Expand All @@ -75,7 +36,7 @@ object AttributeValueUtils {
AttributeValue.fromL(
value.getL.stream().map(fromV1).collect(Collectors.toList[AttributeValue]))
else if (value.getNULL ne null) AttributeValue.fromNul(value.getNULL)
else if (value.getBOOL ne null) boolValue(value.getBOOL)
else if (value.getBOOL ne null) AttributeValue.fromBool(value.getBOOL)
else sys.error("Unable to convert AttributeValue from AWS SDK V1 to V2")

}
65 changes: 33 additions & 32 deletions migrator/src/main/scala/com/scylladb/migrator/AwsUtils.scala
Original file line number Diff line number Diff line change
@@ -1,32 +1,33 @@
package com.scylladb.migrator

import com.amazonaws.auth.{
AWSCredentialsProvider,
AWSStaticCredentialsProvider,
BasicAWSCredentials,
BasicSessionCredentials
import com.scylladb.migrator.config.DynamoDBEndpoint
import software.amazon.awssdk.auth.credentials.{
AwsBasicCredentials,
AwsCredentialsProvider,
AwsSessionCredentials,
StaticCredentialsProvider
}
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest
import com.scylladb.migrator.config.{ AWSAssumeRole, DynamoDBEndpoint }
import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sts.StsClient
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest

import java.net.URI

object AwsUtils {

/** Configure an AWS SDK builder to use the provided endpoint, region and credentials provider */
def configureClientBuilder[Builder <: AwsClientBuilder[Builder, Client], Client](
builder: AwsClientBuilder[Builder, Client],
maybeEndpoint: Option[DynamoDBEndpoint],
maybeRegion: Option[String],
maybeCredentialsProvider: Option[AWSCredentialsProvider]): builder.type = {
maybeCredentialsProvider: Option[AwsCredentialsProvider]): builder.type = {

for (endpoint <- maybeEndpoint) {
builder.setEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(
endpoint.renderEndpoint,
maybeRegion.getOrElse("empty")))
builder.endpointOverride(new URI(endpoint.renderEndpoint))
}
maybeCredentialsProvider.foreach(builder.setCredentials)
maybeRegion.foreach(builder.setRegion)
maybeCredentialsProvider.foreach(builder.credentialsProvider)
maybeRegion.map(Region.of).foreach(builder.region)

builder
}
Expand All @@ -48,26 +49,26 @@ object AwsUtils {
case Some(role) =>
val stsClient =
configureClientBuilder(
AWSSecurityTokenServiceClientBuilder.standard(),
StsClient.builder(),
endpoint,
region,
Some(
new AWSStaticCredentialsProvider(
new BasicAWSCredentials(
configuredCredentials.accessKey,
configuredCredentials.secretKey)))
StaticCredentialsProvider.create(AwsBasicCredentials
.create(configuredCredentials.accessKey, configuredCredentials.secretKey)))
).build()
val response =
stsClient.assumeRole(
new AssumeRoleRequest()
.withRoleArn(role.arn)
.withRoleSessionName(role.getSessionName)
AssumeRoleRequest
.builder()
.roleArn(role.arn)
.roleSessionName(role.getSessionName)
.build()
)
val assumedCredentials = response.getCredentials
val assumedCredentials = response.credentials
AWSCredentials(
assumedCredentials.getAccessKeyId,
assumedCredentials.getSecretAccessKey,
Some(assumedCredentials.getSessionToken)
assumedCredentials.accessKeyId,
assumedCredentials.secretAccessKey,
Some(assumedCredentials.sessionToken)
)
}
}
Expand All @@ -78,13 +79,13 @@ object AwsUtils {
case class AWSCredentials(accessKey: String, secretKey: String, maybeSessionToken: Option[String]) {

/** Convenient method to use our credentials with the AWS SDK */
def toProvider: AWSCredentialsProvider = {
def toProvider: AwsCredentialsProvider = {
val staticCredentials =
maybeSessionToken match {
case Some(sessionToken) => new BasicSessionCredentials(accessKey, secretKey, sessionToken)
case None => new BasicAWSCredentials(accessKey, secretKey)
case Some(sessionToken) => AwsSessionCredentials.create(accessKey, secretKey, sessionToken)
case None => AwsBasicCredentials.create(accessKey, secretKey)
}
new AWSStaticCredentialsProvider(staticCredentials)
StaticCredentialsProvider.create(staticCredentials)
}

}
Loading

0 comments on commit c38f582

Please sign in to comment.