Add tests that access AWS #208

Merged: 6 commits, Aug 31, 2024
51 changes: 51 additions & 0 deletions .github/workflows/tests-aws.yml
@@ -0,0 +1,51 @@
name: Tests with AWS
on:
push:
branches:
- master
paths:
- '**.scala'
- '**.sbt'
workflow_dispatch:

env:
AWS_REGION: us-east-1

permissions:
id-token: write
contents: read

jobs:
test:
name: Test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Cache Docker images
uses: ScribeMD/[email protected]
with:
key: docker-${{ runner.os }}-${{ hashFiles('docker-compose-tests.yml') }}
- uses: actions/setup-java@v4
with:
distribution: temurin
java-version: 8
(Review thread on the `java-version: 8` line)

Contributor: I thought we compile with JDK 11.

Contributor: Wait, OK, it was blocked due to Spark 2.x. So maybe we can switch to JDK 11 with Spark 3.x.

Collaborator (author): It is not really important to support Java 8, but if we can, why not do it? Using Java 8 in the CI ensures that we at least support Java 8. On the other hand, that could also prevent us from detecting possible incompatibilities with higher versions of the JVM…
But anyway, that should be a separate issue/PR :)
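Following up on that thread, here is a minimal sbt sketch of how the build could keep targeting the Java 8 API level even if the CI job later moves to a newer JDK. The settings below are an assumption for illustration only and are not part of this PR:

~~~ scala
// Hypothetical build.sbt fragment (not part of this PR): when the build runs
// on JDK 11+, compile against the JDK 8 API level so that classes that only
// exist in newer JVMs cannot sneak in unnoticed.
// Note: the -release / --release flags require running the build on JDK 9+.
ThisBuild / scalacOptions += "-release:8"
ThisBuild / javacOptions ++= Seq("--release", "8")
~~~

Something along these lines would address the concern above about missing incompatibilities with newer JVMs while still guaranteeing Java 8 compatibility.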

cache: sbt
- name: configure aws credentials
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: ${{ env.AWS_REGION }}
role-to-assume: ${{ secrets.AWS_ROLE_ARN }}
role-session-name: GitHub_to_AWS_via_FederatedOIDC
- name: Build migrator
run: ./build.sh
- name: Set up services
run: |
docker compose -f docker-compose-tests.yml up -d scylla spark-master spark-worker
.github/wait-for-port.sh 8000 # ScyllaDB Alternator
.github/wait-for-cql.sh scylla
.github/wait-for-port.sh 8080 # Spark master
.github/wait-for-port.sh 8081 # Spark worker
- name: Run tests accessing AWS
run: sbt "testOnly -- --include-categories=com.scylladb.migrator.AWS"
- name: Stop services
run: docker compose -f docker-compose-tests.yml down
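This workflow includes tests by the category name `com.scylladb.migrator.AWS`, while the regular test workflow (below) excludes them. The sketch that follows shows one plausible way a suite could opt into that category, assuming munit's JUnit-compatible runner honours the standard `@Category` annotation; the suite and test names are hypothetical and the actual wiring in the repository may differ:

~~~ scala
package com.scylladb.migrator

import org.junit.experimental.categories.Category

// JUnit 4 categories are plain types referenced from @Category; this marker
// corresponds to the `com.scylladb.migrator.AWS` name used by the workflows.
class AWS

// Hypothetical suite: annotated suites are selected by
// `--include-categories=com.scylladb.migrator.AWS` (tests-aws.yml) and skipped
// by `--exclude-categories=com.scylladb.migrator.AWS` (tests.yml, local runs).
@Category(Array(classOf[AWS]))
class ExampleAwsMigrationTest extends munit.FunSuite {
  test("migrate data using real AWS endpoints") {
    // would exercise the migrator against actual AWS services here
  }
}
~~~

The sbt commands added to CONTRIBUTING.md in this PR apply the same include/exclude filters locally.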
10 changes: 8 additions & 2 deletions .github/workflows/tests.yml
@@ -3,7 +3,13 @@ on:
push:
branches:
- master
paths:
- '**.scala'
- '**.sbt'
pull_request:
paths:
- '**.scala'
- '**.sbt'

jobs:
test:
@@ -33,7 +39,7 @@ jobs:
.github/wait-for-cql.sh scylla-source
.github/wait-for-port.sh 8080 # Spark master
.github/wait-for-port.sh 8081 # Spark worker
- name: Run tests
run: sbt test
- name: Run tests locally
run: sbt "testOnly -- --exclude-categories=com.scylladb.migrator.AWS"
- name: Stop services
run: docker compose -f docker-compose-tests.yml down
8 changes: 7 additions & 1 deletion .github/workflows/tutorial-dynamodb.yaml
@@ -3,7 +3,13 @@ on:
push:
branches:
- master
paths:
- '**.scala'
- '**.sbt'
pull_request:
paths:
- '**.scala'
- '**.sbt'

env:
TUTORIAL_DIR: docs/source/tutorials/dynamodb-to-scylladb-alternator
@@ -17,7 +23,7 @@ jobs:
- name: Cache Docker images
uses: ScribeMD/[email protected]
with:
key: docker-${{ runner.os }}-${{ hashFiles('docker-compose-tests.yml') }}
key: docker-${{ runner.os }}-${{ hashFiles('docs/source/tutorials/dynamodb-to-scylladb-alternator/docker-compose.yaml') }}
- uses: actions/setup-java@v4
with:
distribution: temurin
11 changes: 9 additions & 2 deletions CONTRIBUTING.md
@@ -16,10 +16,10 @@ Tests are implemented in the `tests` sbt submodule. They simulate the submission
docker compose -f docker-compose-tests.yml up
~~~

3. Run the tests
3. Run the tests locally

~~~ sh
sbt test
sbt "testOnly -- --exclude-categories=com.scylladb.migrator.AWS"
~~~

Or, to run a single test:
@@ -28,6 +28,13 @@ Tests are implemented in the `tests` sbt submodule. They simulate the submission
sbt testOnly com.scylladb.migrator.BasicMigrationTest
~~~

Or, to run the tests that access AWS, first configure your AWS credentials with `aws configure`, and then:

~~~ sh
AWS_REGION=us-east-1 \
sbt "testOnly -- --include-categories=com.scylladb.migrator.AWS"
~~~

4. Ultimately, stop the Docker containers

~~~ sh
2 changes: 1 addition & 1 deletion build.sbt
@@ -86,7 +86,7 @@ lazy val tests = project.in(file("tests")).settings(
"org.apache.cassandra" % "java-driver-query-builder" % "4.18.0",
"com.github.mjakubowski84" %% "parquet4s-core" % "1.9.4",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion,
"org.scalameta" %% "munit" % "0.7.29"
"org.scalameta" %% "munit" % "1.0.1"
),
Test / parallelExecution := false,
// Needed to build a Spark session on Java 17+, see https://stackoverflow.com/questions/73465937/apache-spark-3-3-0-breaks-on-java-17-with-cannot-access-class-sun-nio-ch-direct
2 changes: 2 additions & 0 deletions docker-compose-tests.yml
@@ -63,6 +63,7 @@ services:
- ./migrator/target/scala-2.13:/jars
- ./tests/src/test/configurations:/app/configurations
- ./tests/docker/spark-master:/app/savepoints
- ./tests/docker/aws-profile:/root/.aws
- ./tests/docker/parquet:/app/parquet

spark-worker:
@@ -80,5 +81,6 @@ services:
- 8081:8081
volumes:
- ./tests/docker/parquet:/app/parquet
- ./tests/docker/aws-profile:/root/.aws
depends_on:
- spark-master
8 changes: 3 additions & 5 deletions migrator/src/main/scala/com/scylladb/migrator/Migrator.scala
@@ -10,11 +10,12 @@ object Migrator {
val log = LogManager.getLogger("com.scylladb.migrator")

def main(args: Array[String]): Unit = {
implicit val spark = SparkSession
implicit val spark: SparkSession = SparkSession
.builder()
.appName("scylla-migrator")
.config("spark.task.maxFailures", "1024")
.config("spark.stage.maxConsecutiveAttempts", "60")
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.getOrCreate()

Logger.getRootLogger.setLevel(Level.WARN)
@@ -46,10 +47,7 @@
case (
s3Source: SourceSettings.DynamoDBS3Export,
alternatorTarget: TargetSettings.DynamoDB) =>
AlternatorMigrator.migrateFromS3Export(
s3Source,
alternatorTarget,
migratorConfig.renamesMap)
AlternatorMigrator.migrateFromS3Export(s3Source, alternatorTarget, migratorConfig)
case _ =>
sys.error("Unsupported combination of source and target.")
}
@@ -28,6 +28,9 @@ abstract class SavepointsManager(migratorConfig: MigratorConfig) extends AutoClo

val log = LogManager.getLogger(this.getClass.getName)
private val scheduler = new ScheduledThreadPoolExecutor(1)
private var oldUsr2Handler: SignalHandler = _
private var oldTermHandler: SignalHandler = _
private var oldIntHandler: SignalHandler = _

createSavepointsDirectory()
addUSR2Handler()
@@ -50,13 +53,15 @@
"Installing SIGINT/TERM/USR2 handler. Send this to dump the current progress to a savepoint.")

val handler = new SignalHandler {
override def handle(signal: Signal): Unit =
override def handle(signal: Signal): Unit = {
dumpMigrationState(signal.toString)
sys.exit(0)
}
}

Signal.handle(new Signal("USR2"), handler)
Signal.handle(new Signal("TERM"), handler)
Signal.handle(new Signal("INT"), handler)
oldUsr2Handler = Signal.handle(new Signal("USR2"), handler)
oldTermHandler = Signal.handle(new Signal("TERM"), handler)
oldIntHandler = Signal.handle(new Signal("INT"), handler)
}

private def startSavepointSchedule(): Unit = {
@@ -99,8 +104,12 @@
/**
* Stop the periodic creation of savepoints and release the associated resources.
*/
final def close(): Unit =
def close(): Unit = {
scheduler.shutdown()
Signal.handle(new Signal("USR2"), oldUsr2Handler)
Signal.handle(new Signal("TERM"), oldTermHandler)
Signal.handle(new Signal("INT"), oldIntHandler)
}

/**
* Provide readable logs by describing which parts of the migration have been completed already.
@@ -23,17 +23,13 @@ object AlternatorMigrator {
migratorConfig: MigratorConfig)(implicit spark: SparkSession): Unit = {
val (sourceRDD, sourceTableDesc) =
readers.DynamoDB.readRDD(spark, source, migratorConfig.skipSegments)
val savepointsManager =
DynamoDbSavepointsManager.setup(migratorConfig, sourceRDD, spark.sparkContext)
Using.resource(savepointsManager) { _ =>
val maybeStreamedSource = if (target.streamChanges) Some(source) else None
migrate(sourceRDD, sourceTableDesc, maybeStreamedSource, target, migratorConfig.renamesMap)
}
val maybeStreamedSource = if (target.streamChanges) Some(source) else None
migrate(sourceRDD, sourceTableDesc, maybeStreamedSource, target, migratorConfig)
}

def migrateFromS3Export(source: SourceSettings.DynamoDBS3Export,
target: TargetSettings.DynamoDB,
renamesMap: Map[String, String])(implicit spark: SparkSession): Unit = {
migratorConfig: MigratorConfig)(implicit spark: SparkSession): Unit = {
val (sourceRDD, sourceTableDesc) = readers.DynamoDBS3Export.readRDD(source)(spark.sparkContext)
// Adapt the decoded items to the format expected by the EMR Hadoop connector
val normalizedRDD =
@@ -43,27 +39,25 @@
if (target.streamChanges) {
log.warn("'streamChanges: true' is not supported when the source is a DynamoDB S3 export.")
}
migrate(normalizedRDD, sourceTableDesc, None, target, renamesMap)
migrate(normalizedRDD, sourceTableDesc, None, target, migratorConfig)
}

/**
* @param sourceRDD Data to migrate
* @param sourceTableDesc Description of the table to replicate on the target database
* @param maybeStreamedSource Settings of the source table in case `streamChanges` was `true`
* @param target Target table settings
* @param renamesMap Renames
* @param migratorConfig The complete original configuration
* @param spark Spark session
*/
def migrate(sourceRDD: RDD[(Text, DynamoDBItemWritable)],
sourceTableDesc: TableDescription,
maybeStreamedSource: Option[SourceSettings.DynamoDB],
target: TargetSettings.DynamoDB,
renamesMap: Map[String, String])(implicit spark: SparkSession): Unit = {
migratorConfig: MigratorConfig)(implicit spark: SparkSession): Unit = {

log.info("We need to transfer: " + sourceRDD.getNumPartitions + " partitions in total")

log.info("Starting write...")

try {
val targetTableDesc = {
for (streamedSource <- maybeStreamedSource) {
@@ -81,7 +75,11 @@
if (target.streamChanges && target.skipInitialSnapshotTransfer.contains(true)) {
log.info("Skip transferring table snapshot")
} else {
writers.DynamoDB.writeRDD(target, renamesMap, sourceRDD, targetTableDesc)
Using.resource(DynamoDbSavepointsManager(migratorConfig, sourceRDD, spark.sparkContext)) {
_ =>
log.info("Starting write...")
writers.DynamoDB.writeRDD(target, migratorConfig.renamesMap, sourceRDD, targetTableDesc)
}
log.info("Done transferring table snapshot")
}

@@ -95,7 +93,7 @@
streamedSource,
target,
targetTableDesc,
renamesMap)
migratorConfig.renamesMap)

streamingContext.start()
streamingContext.awaitTermination()
@@ -17,7 +17,9 @@ import scala.util.{ Failure, Success, Try }
* Manage DynamoDB-based migrations by tracking the migrated scan segments.
*/
class DynamoDbSavepointsManager(migratorConfig: MigratorConfig,
segmentsAccumulator: IntSetAccumulator)
segmentsAccumulator: IntSetAccumulator,
sparkTaskEndListener: SparkListener,
spark: SparkContext)
extends SavepointsManager(migratorConfig) {

def describeMigrationState(): String =
@@ -26,25 +28,26 @@ class DynamoDbSavepointsManager(migratorConfig: MigratorConfig,
def updateConfigWithMigrationState(): MigratorConfig =
migratorConfig.copy(skipSegments = Some(segmentsAccumulator.value))

override def close(): Unit = {
spark.removeSparkListener(sparkTaskEndListener)
super.close()
}

}

object DynamoDbSavepointsManager {

private val log = LogManager.getLogger(classOf[DynamoDbSavepointsManager])

def apply(migratorConfig: MigratorConfig,
segmentsAccumulator: IntSetAccumulator): DynamoDbSavepointsManager =
new DynamoDbSavepointsManager(migratorConfig, segmentsAccumulator)

/**
* Set up a savepoints manager that tracks the scan segments migrated from the source RDD.
*/
def setup(migratorConfig: MigratorConfig,
def apply(migratorConfig: MigratorConfig,
sourceRDD: RDD[(Text, DynamoDBItemWritable)],
spark: SparkContext): DynamoDbSavepointsManager = {
val segmentsAccumulator =
IntSetAccumulator(migratorConfig.skipSegments.getOrElse(Set.empty))
spark.addSparkListener(new SparkListener {
val sparkTaskEndListener = new SparkListener {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
val partitionId = taskEnd.taskInfo.partitionId
log.debug(s"Migration of partition ${partitionId} ended: ${taskEnd.reason}.")
@@ -60,8 +63,9 @@
}
}
}
})
DynamoDbSavepointsManager(migratorConfig, segmentsAccumulator)
}
spark.addSparkListener(sparkTaskEndListener)
new DynamoDbSavepointsManager(migratorConfig, segmentsAccumulator, sparkTaskEndListener, spark)
}

/**
@@ -82,7 +82,8 @@ object DynamoDB {
throughputReadPercent,
tableDescription,
maybeTtlDescription,
skipSegments)
skipSegments
)

val rdd =
spark.sparkContext.hadoopRDD(
@@ -63,16 +63,18 @@

case _ => None
},
kinesisCreds = src.credentials.map {
case AWSCredentials(accessKey, secretKey, maybeAssumeRole) =>
val builder =
SparkAWSCredentials.builder
.basicCredentials(accessKey, secretKey)
for (assumeRole <- maybeAssumeRole) {
builder.stsCredentials(assumeRole.arn, assumeRole.getSessionName)
}
builder.build()
}.orNull
kinesisCreds = src.credentials
.map {
case AWSCredentials(accessKey, secretKey, maybeAssumeRole) =>
val builder =
SparkAWSCredentials.builder
.basicCredentials(accessKey, secretKey)
for (assumeRole <- maybeAssumeRole) {
builder.stsCredentials(assumeRole.arn, assumeRole.getSessionName)
}
builder.build()
}
.getOrElse(SparkAWSCredentials.builder.build())
).foreachRDD { msgs =>
val rdd = msgs
.collect { case Some(item) => item: util.Map[String, AttributeValueV1] }
2 changes: 2 additions & 0 deletions tests/docker/.gitignore
@@ -1,3 +1,5 @@
cassandra/
s3/
scylla/
scylla-source/
spark-master/
4 changes: 4 additions & 0 deletions tests/docker/aws-profile/.gitignore
@@ -0,0 +1,4 @@
# Ignore everything in this directory
*
# Except this file
!.gitignore