Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Embrace the AWS SDK v2 everywhere #157

Merged
merged 3 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ inThisBuild(
lazy val `spark-kinesis-dynamodb` = project.in(file("spark-kinesis-dynamodb")).settings(
libraryDependencies ++= Seq(
("org.apache.spark" %% "spark-streaming-kinesis-asl" % sparkVersion)
.excludeAll(InclExclRule("org.apache.spark", "spark-streaming_2.13")), // For some reason, the Spark dependency is not marked as provided in spark-streaming-kinesis-asl. We exclude it and then add it as provided.
.excludeAll(InclExclRule("org.apache.spark", s"spark-streaming_${scalaBinaryVersion.value}")), // For some reason, the Spark dependency is not marked as provided in spark-streaming-kinesis-asl. We exclude it and then add it as provided.
"org.apache.spark" %% "spark-streaming" % sparkVersion % Provided,
"com.amazonaws" % "dynamodb-streams-kinesis-adapter" % dynamodbStreamsKinesisAdapterVersion
)
Expand Down Expand Up @@ -99,7 +99,18 @@ lazy val tests = project.in(file("tests")).settings(
"org.apache.hadoop" % "hadoop-client" % hadoopVersion,
"org.scalameta" %% "munit" % "0.7.29"
),
Test / parallelExecution := false
Test / parallelExecution := false,
// Needed to build a Spark session on Java 17+, see https://stackoverflow.com/questions/73465937/apache-spark-3-3-0-breaks-on-java-17-with-cannot-access-class-sun-nio-ch-direct
Test / javaOptions ++= {
val maybeJavaMajorVersion =
sys.props.get("java.version")
.map(version => version.takeWhile(_ != '.').toInt)
if (maybeJavaMajorVersion.exists(_ > 11))
Seq("--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED")
else
Nil
},
Test / fork := true,
).dependsOn(migrator)

lazy val root = project.in(file("."))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,55 +5,16 @@ import com.amazonaws.services.dynamodbv2.model.{ AttributeValue => AttributeValu
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.dynamodb.model.AttributeValue

import java.nio.ByteBuffer
import java.util.stream.Collectors
import java.util.{ Map => JMap }
import scala.jdk.CollectionConverters._

/** Convenient factories to create `AttributeValue` objects */
object AttributeValueUtils {

def binaryValue(bytes: Array[Byte]): AttributeValue =
binaryValue(ByteBuffer.wrap(bytes))

// Fixme simplify
def binaryValue(byteBuffer: ByteBuffer): AttributeValue =
AttributeValue.fromB(SdkBytes.fromByteBuffer(byteBuffer))

def binaryValues(byteBuffers: ByteBuffer*): AttributeValue =
AttributeValue.fromBs(byteBuffers.map(SdkBytes.fromByteBuffer).asJava)

def stringValue(value: String): AttributeValue =
AttributeValue.fromS(value)

def stringValues(values: String*): AttributeValue =
AttributeValue.fromSs(values.asJava)

def numericalValue(value: String): AttributeValue =
AttributeValue.fromN(value)

def numericalValues(values: String*): AttributeValue =
AttributeValue.fromNs(values.asJava)

def boolValue(value: Boolean): AttributeValue =
AttributeValue.fromBool(value)

def listValue(items: AttributeValue*): AttributeValue =
AttributeValue.fromL(items.asJava)

def mapValue(items: (String, AttributeValue)*): AttributeValue =
AttributeValue.fromM(items.toMap.asJava)

def mapValue(items: Map[String, AttributeValue]): AttributeValue =
AttributeValue.fromM(items.asJava)

val nullValue: AttributeValue =
AttributeValue.fromNul(true)

def fromV1(value: AttributeValueV1): AttributeValue =
if (value.getS ne null) stringValue(value.getS)
else if (value.getN ne null) numericalValue(value.getN)
else if (value.getB ne null) binaryValue(value.getB)
if (value.getS ne null) AttributeValue.fromS(value.getS)
else if (value.getN ne null) AttributeValue.fromN(value.getN)
else if (value.getB ne null) AttributeValue.fromB(SdkBytes.fromByteBuffer(value.getB))
else if (value.getSS ne null) AttributeValue.fromSs(value.getSS)
else if (value.getNS ne null) AttributeValue.fromNs(value.getNS)
else if (value.getBS ne null)
Expand All @@ -75,7 +36,7 @@ object AttributeValueUtils {
AttributeValue.fromL(
value.getL.stream().map(fromV1).collect(Collectors.toList[AttributeValue]))
else if (value.getNULL ne null) AttributeValue.fromNul(value.getNULL)
else if (value.getBOOL ne null) boolValue(value.getBOOL)
else if (value.getBOOL ne null) AttributeValue.fromBool(value.getBOOL)
else sys.error("Unable to convert AttributeValue from AWS SDK V1 to V2")

}
65 changes: 33 additions & 32 deletions migrator/src/main/scala/com/scylladb/migrator/AwsUtils.scala
Original file line number Diff line number Diff line change
@@ -1,32 +1,33 @@
package com.scylladb.migrator

import com.amazonaws.auth.{
AWSCredentialsProvider,
AWSStaticCredentialsProvider,
BasicAWSCredentials,
BasicSessionCredentials
import com.scylladb.migrator.config.DynamoDBEndpoint
import software.amazon.awssdk.auth.credentials.{
AwsBasicCredentials,
AwsCredentialsProvider,
AwsSessionCredentials,
StaticCredentialsProvider
}
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest
import com.scylladb.migrator.config.{ AWSAssumeRole, DynamoDBEndpoint }
import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sts.StsClient
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest

import java.net.URI

object AwsUtils {

/** Configure an AWS SDK builder to use the provided endpoint, region and credentials provider */
def configureClientBuilder[Builder <: AwsClientBuilder[Builder, Client], Client](
builder: AwsClientBuilder[Builder, Client],
maybeEndpoint: Option[DynamoDBEndpoint],
maybeRegion: Option[String],
maybeCredentialsProvider: Option[AWSCredentialsProvider]): builder.type = {
maybeCredentialsProvider: Option[AwsCredentialsProvider]): builder.type = {

for (endpoint <- maybeEndpoint) {
builder.setEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(
endpoint.renderEndpoint,
maybeRegion.getOrElse("empty")))
builder.endpointOverride(new URI(endpoint.renderEndpoint))
}
maybeCredentialsProvider.foreach(builder.setCredentials)
maybeRegion.foreach(builder.setRegion)
maybeCredentialsProvider.foreach(builder.credentialsProvider)
maybeRegion.map(Region.of).foreach(builder.region)

builder
}
Expand All @@ -48,26 +49,26 @@ object AwsUtils {
case Some(role) =>
val stsClient =
configureClientBuilder(
AWSSecurityTokenServiceClientBuilder.standard(),
StsClient.builder(),
endpoint,
region,
Some(
new AWSStaticCredentialsProvider(
new BasicAWSCredentials(
configuredCredentials.accessKey,
configuredCredentials.secretKey)))
StaticCredentialsProvider.create(AwsBasicCredentials
.create(configuredCredentials.accessKey, configuredCredentials.secretKey)))
).build()
val response =
stsClient.assumeRole(
new AssumeRoleRequest()
.withRoleArn(role.arn)
.withRoleSessionName(role.getSessionName)
AssumeRoleRequest
.builder()
.roleArn(role.arn)
.roleSessionName(role.getSessionName)
.build()
)
val assumedCredentials = response.getCredentials
val assumedCredentials = response.credentials
AWSCredentials(
assumedCredentials.getAccessKeyId,
assumedCredentials.getSecretAccessKey,
Some(assumedCredentials.getSessionToken)
assumedCredentials.accessKeyId,
assumedCredentials.secretAccessKey,
Some(assumedCredentials.sessionToken)
)
}
}
Expand All @@ -78,13 +79,13 @@ object AwsUtils {
case class AWSCredentials(accessKey: String, secretKey: String, maybeSessionToken: Option[String]) {

/** Convenient method to use our credentials with the AWS SDK */
def toProvider: AWSCredentialsProvider = {
def toProvider: AwsCredentialsProvider = {
val staticCredentials =
maybeSessionToken match {
case Some(sessionToken) => new BasicSessionCredentials(accessKey, secretKey, sessionToken)
case None => new BasicAWSCredentials(accessKey, secretKey)
case Some(sessionToken) => AwsSessionCredentials.create(accessKey, secretKey, sessionToken)
case None => AwsBasicCredentials.create(accessKey, secretKey)
}
new AWSStaticCredentialsProvider(staticCredentials)
StaticCredentialsProvider.create(staticCredentials)
}

}
Loading
Loading