From 494505531c2bfba8d633d794bdd30f3edb28f947 Mon Sep 17 00:00:00 2001 From: Julien Richard-Foy Date: Fri, 23 Aug 2024 14:48:57 +0200 Subject: [PATCH 1/6] Add tests that access AWS - Test authentication via AssumeRole - Test `streamChanges: true` both with and without `skipInitialSnapshotTransfer` --- .github/workflows/tests-aws.yml | 42 ++++ .github/workflows/tests.yml | 10 +- .github/workflows/tutorial-dynamodb.yaml | 8 +- CONTRIBUTING.md | 13 +- build.sbt | 2 +- .../com/scylladb/migrator/Migrator.scala | 3 +- .../scylladb/migrator/readers/DynamoDB.scala | 3 +- .../dynamodb-to-alternator-assume-role.yaml | 25 +++ ...to-alternator-streaming-skip-snapshot.yaml | 26 +++ .../dynamodb-to-alternator-streaming.yaml | 25 +++ .../scala/com/scylladb/migrator/AWS.scala | 4 + .../com/scylladb/migrator/SparkUtils.scala | 17 +- .../migrator/alternator/AssumeRoleTest.scala | 30 +++ .../alternator/BasicMigrationTest.scala | 4 +- .../DynamoDBS3ExportMigrationTest.scala | 6 +- .../migrator/alternator/Issue103Test.scala | 6 +- .../migrator/alternator/MigratorSuite.scala | 132 +++++++++--- .../alternator/RenamedItemsTest.scala | 6 +- .../alternator/SecondaryIndexesTest.scala | 16 +- .../alternator/SkippedItemsTest.scala | 12 +- .../alternator/SkippedSegmentsTest.scala | 8 +- .../alternator/StreamedItemsTest.scala | 192 ++++++++++++++++++ .../migrator/alternator/ValidatorTest.scala | 6 +- .../migrator/scylla/BasicMigrationTest.scala | 4 +- .../migrator/scylla/MigratorSuite.scala | 77 ++++--- .../migrator/scylla/RenamedItemsTest.scala | 4 +- .../ScyllaToScyllaBasicMigrationTest.scala | 4 +- .../migrator/scylla/ValidatorTest.scala | 4 +- 28 files changed, 577 insertions(+), 112 deletions(-) create mode 100644 .github/workflows/tests-aws.yml create mode 100644 tests/src/test/configurations/dynamodb-to-alternator-assume-role.yaml create mode 100644 tests/src/test/configurations/dynamodb-to-alternator-streaming-skip-snapshot.yaml create mode 100644 tests/src/test/configurations/dynamodb-to-alternator-streaming.yaml create mode 100644 tests/src/test/scala/com/scylladb/migrator/AWS.scala create mode 100644 tests/src/test/scala/com/scylladb/migrator/alternator/AssumeRoleTest.scala create mode 100644 tests/src/test/scala/com/scylladb/migrator/alternator/StreamedItemsTest.scala diff --git a/.github/workflows/tests-aws.yml b/.github/workflows/tests-aws.yml new file mode 100644 index 00000000..fc0e807b --- /dev/null +++ b/.github/workflows/tests-aws.yml @@ -0,0 +1,42 @@ +name: Tests with AWS +on: + push: + branches: + - master + paths: + - '**.scala' + - '**.sbt' + workflow_dispatch: + +jobs: + test: + name: Test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Cache Docker images + uses: ScribeMD/docker-cache@0.5.0 + with: + key: docker-${{ runner.os }}-${{ hashFiles('docker-compose-tests.yml') }} + - uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: 8 + cache: sbt + - name: Build migrator + run: ./build.sh + - name: Set up services + run: | + docker compose -f docker-compose-tests.yml up -d scylla spark-master spark-worker + .github/wait-for-port.sh 8000 # ScyllaDB Alternator + .github/wait-for-cql.sh scylla + .github/wait-for-port.sh 8080 # Spark master + .github/wait-for-port.sh 8081 # Spark worker + - name: Run tests accessing AWS + run: sbt "testOnly -- --include-categories=com.scylladb.migrator.AWS" + env: + AWS_ACCESS_KEY: ${{ secrets.AWS_ACCESS_KEY }} + AWS_SECRET_KEY: ${{ secrets.AWS_SECRET_KEY }} + AWS_ROLE_ARN: ${{ secrets.AWS_ROLE_ARN }} + - name: Stop 
services + run: docker compose -f docker-compose-tests.yml down diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 5d75b5df..188f49ef 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -3,7 +3,13 @@ on: push: branches: - master + paths: + - '**.scala' + - '**.sbt' pull_request: + paths: + - '**.scala' + - '**.sbt' jobs: test: @@ -33,7 +39,7 @@ jobs: .github/wait-for-cql.sh scylla-source .github/wait-for-port.sh 8080 # Spark master .github/wait-for-port.sh 8081 # Spark worker - - name: Run tests - run: sbt test + - name: Run tests locally + run: sbt "testOnly -- --exclude-categories=com.scylladb.migrator.AWS" - name: Stop services run: docker compose -f docker-compose-tests.yml down diff --git a/.github/workflows/tutorial-dynamodb.yaml b/.github/workflows/tutorial-dynamodb.yaml index 98f044ad..f18e3b7a 100644 --- a/.github/workflows/tutorial-dynamodb.yaml +++ b/.github/workflows/tutorial-dynamodb.yaml @@ -3,7 +3,13 @@ on: push: branches: - master + paths: + - '**.scala' + - '**.sbt' pull_request: + paths: + - '**.scala' + - '**.sbt' env: TUTORIAL_DIR: docs/source/tutorials/dynamodb-to-scylladb-alternator @@ -17,7 +23,7 @@ jobs: - name: Cache Docker images uses: ScribeMD/docker-cache@0.5.0 with: - key: docker-${{ runner.os }}-${{ hashFiles('docker-compose-tests.yml') }} + key: docker-${{ runner.os }}-${{ hashFiles('docs/source/tutorials/dynamodb-to-scylladb-alternator/docker-compose.yaml') }} - uses: actions/setup-java@v4 with: distribution: temurin diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 76288281..3ff11d91 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -16,10 +16,10 @@ Tests are implemented in the `tests` sbt submodule. They simulate the submission docker compose -f docker-compose-tests.yml up ~~~ -3. Run the tests +3. Run the tests locally ~~~ sh - sbt test + sbt "testOnly -- --exclude-categories=com.scylladb.migrator.AWS" ~~~ Or, to run a single test: @@ -28,6 +28,15 @@ Tests are implemented in the `tests` sbt submodule. They simulate the submission sbt testOnly com.scylladb.migrator.BasicMigrationTest ~~~ + Or, to run the tests that access AWS: + + ~~~ sh + AWS_ACCESS_KEY=... \ + AWS_SECRET_KEY=... \ + AWS_ROLE_NAME=... \ + sbt "testOnly -- --include-categories=com.scylladb.migrator.AWS" + ~~~ + 4. 
Ultimately, stop the Docker containers ~~~ sh diff --git a/build.sbt b/build.sbt index fc918c8c..2926387b 100644 --- a/build.sbt +++ b/build.sbt @@ -86,7 +86,7 @@ lazy val tests = project.in(file("tests")).settings( "org.apache.cassandra" % "java-driver-query-builder" % "4.18.0", "com.github.mjakubowski84" %% "parquet4s-core" % "1.9.4", "org.apache.hadoop" % "hadoop-client" % hadoopVersion, - "org.scalameta" %% "munit" % "0.7.29" + "org.scalameta" %% "munit" % "1.0.1" ), Test / parallelExecution := false, // Needed to build a Spark session on Java 17+, see https://stackoverflow.com/questions/73465937/apache-spark-3-3-0-breaks-on-java-17-with-cannot-access-class-sun-nio-ch-direct diff --git a/migrator/src/main/scala/com/scylladb/migrator/Migrator.scala b/migrator/src/main/scala/com/scylladb/migrator/Migrator.scala index bb01fbd5..fbbca5e9 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/Migrator.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/Migrator.scala @@ -10,11 +10,12 @@ object Migrator { val log = LogManager.getLogger("com.scylladb.migrator") def main(args: Array[String]): Unit = { - implicit val spark = SparkSession + implicit val spark: SparkSession = SparkSession .builder() .appName("scylla-migrator") .config("spark.task.maxFailures", "1024") .config("spark.stage.maxConsecutiveAttempts", "60") + .config("spark.streaming.stopGracefullyOnShutdown", "true") .getOrCreate() Logger.getRootLogger.setLevel(Level.WARN) diff --git a/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala b/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala index 51f46a69..4ef61d57 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala @@ -82,7 +82,8 @@ object DynamoDB { throughputReadPercent, tableDescription, maybeTtlDescription, - skipSegments) + skipSegments + ) val rdd = spark.sparkContext.hadoopRDD( diff --git a/tests/src/test/configurations/dynamodb-to-alternator-assume-role.yaml b/tests/src/test/configurations/dynamodb-to-alternator-assume-role.yaml new file mode 100644 index 00000000..57323aa6 --- /dev/null +++ b/tests/src/test/configurations/dynamodb-to-alternator-assume-role.yaml @@ -0,0 +1,25 @@ +source: + type: dynamodb + table: AssumeRoleTest + region: us-west-1 + credentials: + accessKey: "{AWS_ACCESS_KEY}" + secretKey: "{AWS_SECRET_KEY}" + assumeRole: + arn: "{AWS_ROLE_ARN}" + +target: + type: dynamodb + table: AssumeRoleTest + region: dummy + endpoint: + host: http://scylla + port: 8000 + credentials: + accessKey: dummy + secretKey: dummy + streamChanges: false + +savepoints: + path: /app/savepoints + intervalSeconds: 300 diff --git a/tests/src/test/configurations/dynamodb-to-alternator-streaming-skip-snapshot.yaml b/tests/src/test/configurations/dynamodb-to-alternator-streaming-skip-snapshot.yaml new file mode 100644 index 00000000..73e3d7a2 --- /dev/null +++ b/tests/src/test/configurations/dynamodb-to-alternator-streaming-skip-snapshot.yaml @@ -0,0 +1,26 @@ +source: + type: dynamodb + table: StreamedItemsSkipSnapshotTest + region: us-west-1 + credentials: + accessKey: "{AWS_ACCESS_KEY}" + secretKey: "{AWS_SECRET_KEY}" + assumeRole: + arn: "{AWS_ROLE_ARN}" + +target: + type: dynamodb + table: StreamedItemsSkipSnapshotTest + region: dummy + endpoint: + host: http://scylla + port: 8000 + credentials: + accessKey: dummy + secretKey: dummy + streamChanges: true + skipInitialSnapshotTransfer: true + +savepoints: + path: /app/savepoints + 
intervalSeconds: 300 diff --git a/tests/src/test/configurations/dynamodb-to-alternator-streaming.yaml b/tests/src/test/configurations/dynamodb-to-alternator-streaming.yaml new file mode 100644 index 00000000..45271c2a --- /dev/null +++ b/tests/src/test/configurations/dynamodb-to-alternator-streaming.yaml @@ -0,0 +1,25 @@ +source: + type: dynamodb + table: StreamedItemsTest + region: us-west-1 + credentials: + accessKey: "{AWS_ACCESS_KEY}" + secretKey: "{AWS_SECRET_KEY}" + assumeRole: + arn: "{AWS_ROLE_ARN}" + +target: + type: dynamodb + table: StreamedItemsTest + region: dummy + endpoint: + host: http://scylla + port: 8000 + credentials: + accessKey: dummy + secretKey: dummy + streamChanges: true + +savepoints: + path: /app/savepoints + intervalSeconds: 300 diff --git a/tests/src/test/scala/com/scylladb/migrator/AWS.scala b/tests/src/test/scala/com/scylladb/migrator/AWS.scala new file mode 100644 index 00000000..2b4dc954 --- /dev/null +++ b/tests/src/test/scala/com/scylladb/migrator/AWS.scala @@ -0,0 +1,4 @@ +package com.scylladb.migrator + +/** Used for tagging test suites that require AWS access */ +class AWS extends munit.Tag("AWS") diff --git a/tests/src/test/scala/com/scylladb/migrator/SparkUtils.scala b/tests/src/test/scala/com/scylladb/migrator/SparkUtils.scala index adb76c2e..f12192ee 100644 --- a/tests/src/test/scala/com/scylladb/migrator/SparkUtils.scala +++ b/tests/src/test/scala/com/scylladb/migrator/SparkUtils.scala @@ -1,6 +1,6 @@ package com.scylladb.migrator -import scala.sys.process.Process +import scala.sys.process.{ Process, ProcessBuilder } object SparkUtils { @@ -21,12 +21,15 @@ object SparkUtils { () } - /** - * @param migratorConfigFile Configuration file to use - * @param entryPoint Java entry point of the job - * @return The running process - */ def submitSparkJob(migratorConfigFile: String, entryPoint: String): Process = + submitSparkJobProcess(migratorConfigFile, entryPoint).run() + + /** + * @param migratorConfigFile Configuration file to use + * @param entryPoint Java entry point of the job + * @return The running process + */ + def submitSparkJobProcess(migratorConfigFile: String, entryPoint: String): ProcessBuilder = Process( Seq( "docker", @@ -51,6 +54,6 @@ object SparkUtils { // "--conf", "spark.executor.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5006", "/jars/scylla-migrator-assembly.jar" ) - ).run() + ) } diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/AssumeRoleTest.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/AssumeRoleTest.scala new file mode 100644 index 00000000..c0d01402 --- /dev/null +++ b/tests/src/test/scala/com/scylladb/migrator/alternator/AssumeRoleTest.scala @@ -0,0 +1,30 @@ +package com.scylladb.migrator.alternator + +import com.scylladb.migrator.SparkUtils.successfullyPerformMigration +import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, PutItemRequest} + +import scala.jdk.CollectionConverters._ + +/** Basic migration that uses the real AWS DynamoDB as a source and AssumeRole for authentication */ +class AssumeRoleTest extends MigratorSuiteWithAWS { + + withTable("AssumeRoleTest").test("Read from source and write to target") { tableName => + val configFileName = "dynamodb-to-alternator-assume-role.yaml" + + setupConfigurationFile(configFileName) + + // Insert some items + val keys = Map("id" -> AttributeValue.fromS("12345")) + val attrs = Map("foo" -> AttributeValue.fromS("bar")) + val itemData = keys ++ attrs + 
sourceDDb().putItem(PutItemRequest.builder().tableName(tableName).item(itemData.asJava).build()) + + // Perform the migration + successfullyPerformMigration(configFileName) + + checkSchemaWasMigrated(tableName) + + checkItemWasMigrated(tableName, keys, itemData) + } + +} diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/BasicMigrationTest.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/BasicMigrationTest.scala index 773e0794..489cd69e 100644 --- a/tests/src/test/scala/com/scylladb/migrator/alternator/BasicMigrationTest.scala +++ b/tests/src/test/scala/com/scylladb/migrator/alternator/BasicMigrationTest.scala @@ -5,7 +5,7 @@ import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, PutItemRe import scala.jdk.CollectionConverters._ -class BasicMigrationTest extends MigratorSuite { +class BasicMigrationTest extends MigratorSuiteWithDynamoDBLocal { withTable("BasicTest").test("Read from source and write to target") { tableName => val keys = Map("id" -> AttributeValue.fromS("12345")) @@ -13,7 +13,7 @@ class BasicMigrationTest extends MigratorSuite { val itemData = keys ++ attrs // Insert some items - sourceDDb.putItem(PutItemRequest.builder().tableName(tableName).item(itemData.asJava).build()) + sourceDDb().putItem(PutItemRequest.builder().tableName(tableName).item(itemData.asJava).build()) // Perform the migration successfullyPerformMigration("dynamodb-to-alternator-basic.yaml") diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/DynamoDBS3ExportMigrationTest.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/DynamoDBS3ExportMigrationTest.scala index ea29c170..825d4bda 100644 --- a/tests/src/test/scala/com/scylladb/migrator/alternator/DynamoDBS3ExportMigrationTest.scala +++ b/tests/src/test/scala/com/scylladb/migrator/alternator/DynamoDBS3ExportMigrationTest.scala @@ -13,7 +13,7 @@ import java.nio.file.{Files, Path, Paths} import java.util.function.Consumer import scala.jdk.CollectionConverters._ -class DynamoDBS3ExportMigrationTest extends MigratorSuite { +class DynamoDBS3ExportMigrationTest extends MigratorSuiteWithDynamoDBLocal { val s3Client: S3Client = S3Client @@ -90,13 +90,13 @@ class DynamoDBS3ExportMigrationTest extends MigratorSuite { // Make sure to properly set up and clean up the target database and the S3 instance def withResources(bucketName: String, tableName: String): FunFixture[(String, String)] = FunFixture( setup = _ => { - deleteTableIfExists(targetAlternator, tableName) + deleteTableIfExists(targetAlternator(), tableName) deleteBucket(bucketName) s3Client.createBucket(CreateBucketRequest.builder().bucket(bucketName).build()) (bucketName, tableName) }, teardown = _ => { - targetAlternator.deleteTable(DeleteTableRequest.builder().tableName(tableName).build()) + targetAlternator().deleteTable(DeleteTableRequest.builder().tableName(tableName).build()) deleteBucket(bucketName) } ) diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/Issue103Test.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/Issue103Test.scala index 5ac81a7c..0941dc75 100644 --- a/tests/src/test/scala/com/scylladb/migrator/alternator/Issue103Test.scala +++ b/tests/src/test/scala/com/scylladb/migrator/alternator/Issue103Test.scala @@ -6,7 +6,7 @@ import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, PutItemRe import scala.jdk.CollectionConverters._ // Reproduction of https://github.com/scylladb/scylla-migrator/issues/103 -class Issue103Test extends MigratorSuite { +class Issue103Test extends 
MigratorSuiteWithDynamoDBLocal { withTable("Issue103Items").test("Issue #103 is fixed") { tableName => // Insert two items @@ -27,8 +27,8 @@ class Issue103Test extends MigratorSuite { ) val item2Data = keys2 ++ attrs2 - sourceDDb.putItem(PutItemRequest.builder().tableName(tableName).item(item1Data.asJava).build()) - sourceDDb.putItem(PutItemRequest.builder().tableName(tableName).item(item2Data.asJava).build()) + sourceDDb().putItem(PutItemRequest.builder().tableName(tableName).item(item1Data.asJava).build()) + sourceDDb().putItem(PutItemRequest.builder().tableName(tableName).item(item2Data.asJava).build()) // Perform the migration successfullyPerformMigration("dynamodb-to-alternator-issue-103.yaml") diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/MigratorSuite.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/MigratorSuite.scala index b00859fe..e62f8a64 100644 --- a/tests/src/test/scala/com/scylladb/migrator/alternator/MigratorSuite.scala +++ b/tests/src/test/scala/com/scylladb/migrator/alternator/MigratorSuite.scala @@ -1,11 +1,16 @@ package com.scylladb.migrator.alternator -import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider} +import com.scylladb.migrator.AWS +import org.junit.experimental.categories.Category +import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, AwsSessionCredentials, StaticCredentialsProvider} import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.dynamodb.DynamoDbClient -import software.amazon.awssdk.services.dynamodb.model.{AttributeDefinition, AttributeValue, CreateTableRequest, DeleteTableRequest, DescribeTableRequest, GetItemRequest, GlobalSecondaryIndexDescription, KeySchemaElement, KeyType, LocalSecondaryIndex, LocalSecondaryIndexDescription, ProvisionedThroughput, ResourceNotFoundException, ScalarAttributeType} +import software.amazon.awssdk.services.dynamodb.model.{AttributeDefinition, AttributeValue, CreateTableRequest, DeleteTableRequest, DescribeTableRequest, GetItemRequest, GlobalSecondaryIndexDescription, KeySchemaElement, KeyType, LocalSecondaryIndexDescription, ProvisionedThroughput, ResourceNotFoundException, ScalarAttributeType} +import software.amazon.awssdk.services.sts.StsClient +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest import java.net.URI +import java.nio.file.{Files, Paths} import scala.util.chaining._ import scala.jdk.CollectionConverters._ @@ -14,24 +19,28 @@ import scala.jdk.CollectionConverters._ * * It expects external services (DynamoDB, Scylla, Spark, etc.) to be running. * See the files `CONTRIBUTING.md` and `docker-compose-tests.yml` for more information. 
+ * */ trait MigratorSuite extends munit.FunSuite { /** Client of a source DynamoDB instance */ - val sourceDDb: DynamoDbClient = DynamoDbClient - .builder() - .region(Region.of("dummy")) - .endpointOverride(new URI("http://localhost:8001")) - .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("dummy", "dummy"))) - .build() + def sourceDDb: Fixture[DynamoDbClient] /** Client of a target Alternator instance */ - val targetAlternator: DynamoDbClient = DynamoDbClient - .builder() - .region(Region.of("dummy")) - .endpointOverride(new URI("http://localhost:8000")) - .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("dummy", "dummy"))) - .build() + val targetAlternator: Fixture[DynamoDbClient] = new Fixture[DynamoDbClient]("targetAlternator") { + private var client: DynamoDbClient = null + def apply(): DynamoDbClient = client + override def beforeAll(): Unit = { + client = + DynamoDbClient + .builder() + .region(Region.of("dummy")) + .endpointOverride(new URI("http://localhost:8000")) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("dummy", "dummy"))) + .build() + } + override def afterAll(): Unit = client.close() + } /** * Fixture automating the house-keeping work when migrating a table. @@ -47,8 +56,8 @@ trait MigratorSuite extends munit.FunSuite { def withTable(name: String): FunFixture[String] = FunFixture( setup = { _ => // Make sure the target database does not contain the table already - deleteTableIfExists(sourceDDb, name) - deleteTableIfExists(targetAlternator, name) + deleteTableIfExists(sourceDDb(), name) + deleteTableIfExists(targetAlternator(), name) try { // Create the table in the source database val createTableRequest = @@ -59,23 +68,23 @@ trait MigratorSuite extends munit.FunSuite { .attributeDefinitions(AttributeDefinition.builder().attributeName("id").attributeType(ScalarAttributeType.S).build()) .provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(25L).writeCapacityUnits(25L).build()) .build() - sourceDDb.createTable(createTableRequest) + sourceDDb().createTable(createTableRequest) val waiterResponse = - sourceDDb + sourceDDb() .waiter() .waitUntilTableExists(describeTableRequest(name)) assert(waiterResponse.matched().response().isPresent, s"Failed to create table ${name}: ${waiterResponse.matched().exception().get()}") } catch { case any: Throwable => - fail(s"Failed to create table ${name} in database ${sourceDDb}", any) + fail(s"Failed to create table ${name} in database ${sourceDDb()}", any) } name }, teardown = { _ => // Clean-up both the source and target databases because we assume the test did replicate the table // to the target database - targetAlternator.deleteTable(DeleteTableRequest.builder().tableName(name).build()) - sourceDDb.deleteTable(DeleteTableRequest.builder().tableName(name).build()) + targetAlternator().deleteTable(DeleteTableRequest.builder().tableName(name).build()) + sourceDDb().deleteTable(DeleteTableRequest.builder().tableName(name).build()) () } ) @@ -108,7 +117,7 @@ trait MigratorSuite extends munit.FunSuite { /** Check that the table schema in the target database is the same as in the source database */ def checkSchemaWasMigrated(tableName: String): Unit = { - val sourceTableDesc = sourceDDb.describeTable(describeTableRequest(tableName)).table + val sourceTableDesc = sourceDDb().describeTable(describeTableRequest(tableName)).table checkSchemaWasMigrated( tableName, sourceTableDesc.keySchema, @@ -125,7 +134,7 @@ trait MigratorSuite 
extends munit.FunSuite { attributeDefinitions: java.util.List[AttributeDefinition], localSecondaryIndexes: java.util.List[LocalSecondaryIndexDescription], globalSecondaryIndexes: java.util.List[GlobalSecondaryIndexDescription]): Unit = { - targetAlternator + targetAlternator() .describeTable(describeTableRequest(tableName)) .table .tap { targetTableDesc => @@ -155,11 +164,86 @@ trait MigratorSuite extends munit.FunSuite { /** Check that the target database contains the provided item description */ def checkItemWasMigrated(tableName: String, itemKey: Map[String, AttributeValue], itemData: Map[String, AttributeValue]): Unit = { - targetAlternator + targetAlternator() .getItem(GetItemRequest.builder.tableName(tableName).key(itemKey.asJava).build()) .tap { itemResult => assertEquals(itemResult.item.asScala.toMap, itemData) } } + override def munitFixtures: Seq[Fixture[_]] = Seq(sourceDDb, targetAlternator) +} + +trait MigratorSuiteWithDynamoDBLocal extends MigratorSuite { + + lazy val sourceDDb: Fixture[DynamoDbClient] = new Fixture[DynamoDbClient]("sourceDDb") { + private var client: DynamoDbClient = null + def apply(): DynamoDbClient = client + override def beforeAll(): Unit = { + client = + DynamoDbClient + .builder() + .region(Region.of("dummy")) + .endpointOverride(new URI("http://localhost:8001")) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("dummy", "dummy"))) + .build() + } + override def afterAll(): Unit = client.close() + } + +} + +@Category(Array(classOf[AWS])) +abstract class MigratorSuiteWithAWS extends MigratorSuite { + + lazy val sourceDDb: Fixture[DynamoDbClient] = new Fixture[DynamoDbClient]("sourceDDb") { + private var client: DynamoDbClient = null + def apply(): DynamoDbClient = client + override def beforeAll(): Unit = { + val region = Region.US_WEST_1 // FIXME + val accessKey = sys.env("AWS_ACCESS_KEY") + val secretKey = sys.env("AWS_SECRET_KEY") + val roleArn = sys.env("AWS_ROLE_ARN") + val stsClient = + StsClient + .builder() + .credentialsProvider( + StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)) + ) + .build() + val credentials = + stsClient + .assumeRole( + AssumeRoleRequest + .builder() + .roleArn(roleArn) + .roleSessionName("MigratorTest") + .build() + ) + .credentials() + client = + DynamoDbClient + .builder() + .region(region) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsSessionCredentials.create(credentials.accessKeyId, credentials.secretAccessKey, credentials.sessionToken) + ) + ) + .build() + } + override def afterAll(): Unit = client.close() + } + + def setupConfigurationFile(configFileName: String): Unit = { + val configFilePath = Paths.get("src", "test", "configurations", configFileName) + val configFileContent = new String(Files.readAllBytes(configFilePath)) + val updatedConfigFileContent = + configFileContent + .replace("{AWS_ACCESS_KEY}", sys.env("AWS_ACCESS_KEY")) + .replace("{AWS_SECRET_KEY}", sys.env("AWS_SECRET_KEY")) + .replace("{AWS_ROLE_ARN}", sys.env("AWS_ROLE_ARN")) + Files.write(configFilePath, updatedConfigFileContent.getBytes) + } + } diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/RenamedItemsTest.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/RenamedItemsTest.scala index 4721a984..22080bcd 100644 --- a/tests/src/test/scala/com/scylladb/migrator/alternator/RenamedItemsTest.scala +++ b/tests/src/test/scala/com/scylladb/migrator/alternator/RenamedItemsTest.scala @@ -5,7 +5,7 @@ import 
software.amazon.awssdk.services.dynamodb.model.{AttributeValue, PutItemRe import scala.jdk.CollectionConverters._ -class RenamedItemsTest extends MigratorSuite { +class RenamedItemsTest extends MigratorSuiteWithDynamoDBLocal { withTable("RenamedItems").test("Rename items along the migration") { tableName => // Insert several items @@ -20,8 +20,8 @@ class RenamedItemsTest extends MigratorSuite { ) val item2Data = keys2 ++ attrs2 - sourceDDb.putItem(PutItemRequest.builder().tableName(tableName).item(item1Data.asJava).build()) - sourceDDb.putItem(PutItemRequest.builder().tableName(tableName).item(item2Data.asJava).build()) + sourceDDb().putItem(PutItemRequest.builder().tableName(tableName).item(item1Data.asJava).build()) + sourceDDb().putItem(PutItemRequest.builder().tableName(tableName).item(item2Data.asJava).build()) // Perform the migration successfullyPerformMigration("dynamodb-to-alternator-renames.yaml") diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/SecondaryIndexesTest.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/SecondaryIndexesTest.scala index 9ea2ccf9..fee2d627 100644 --- a/tests/src/test/scala/com/scylladb/migrator/alternator/SecondaryIndexesTest.scala +++ b/tests/src/test/scala/com/scylladb/migrator/alternator/SecondaryIndexesTest.scala @@ -3,13 +3,13 @@ package com.scylladb.migrator.alternator import com.scylladb.migrator.SparkUtils.successfullyPerformMigration import software.amazon.awssdk.services.dynamodb.model.{AttributeDefinition, CreateTableRequest, DeleteTableRequest, GlobalSecondaryIndex, KeySchemaElement, KeyType, LocalSecondaryIndex, Projection, ProjectionType, ProvisionedThroughput, ScalarAttributeType} -class SecondaryIndexesTest extends MigratorSuite { +class SecondaryIndexesTest extends MigratorSuiteWithDynamoDBLocal { val tableName = "TableWithSecondaryIndexes" val withResources: FunFixture[Unit] = FunFixture( setup = _ => { - deleteTableIfExists(sourceDDb, tableName) - deleteTableIfExists(targetAlternator, tableName) + deleteTableIfExists(sourceDDb(), tableName) + deleteTableIfExists(targetAlternator(), tableName) try { val createTableRequest = CreateTableRequest @@ -50,21 +50,21 @@ class SecondaryIndexesTest extends MigratorSuite { .build() ) .build() - sourceDDb.createTable(createTableRequest) + sourceDDb().createTable(createTableRequest) val waiterResponse = - sourceDDb + sourceDDb() .waiter() .waitUntilTableExists(describeTableRequest(tableName)) assert(waiterResponse.matched().response().isPresent, s"Failed to create table ${tableName}: ${waiterResponse.matched().exception().get()}") } catch { case any: Throwable => - fail(s"Failed to create table ${tableName} in database ${sourceDDb}", any) + fail(s"Failed to create table ${tableName} in database ${sourceDDb()}", any) } () }, teardown = _ => { - targetAlternator.deleteTable(DeleteTableRequest.builder().tableName(tableName).build()) - sourceDDb.deleteTable(DeleteTableRequest.builder().tableName(tableName).build()) + targetAlternator().deleteTable(DeleteTableRequest.builder().tableName(tableName).build()) + sourceDDb().deleteTable(DeleteTableRequest.builder().tableName(tableName).build()) () } ) diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedItemsTest.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedItemsTest.scala index 26973c4a..d2cf5a94 100644 --- a/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedItemsTest.scala +++ b/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedItemsTest.scala @@ -6,7 +6,7 @@ 
import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, GetItemRe import scala.jdk.CollectionConverters._ import scala.util.chaining._ -class SkippedItemsTest extends MigratorSuite { +class SkippedItemsTest extends MigratorSuiteWithDynamoDBLocal { withTable("TtlTable").test("Expired items should be filtered out from the source table") { tableName => // Insert two items, one of them is expired @@ -15,18 +15,18 @@ class SkippedItemsTest extends MigratorSuite { val keys1 = Map("id" -> AttributeValue.fromS("12345")) val attrs1 = Map("foo" -> AttributeValue.fromN((now + oneDay).toString)) val item1Data = keys1 ++ attrs1 - sourceDDb.putItem( + sourceDDb().putItem( PutItemRequest.builder().tableName(tableName).item(item1Data.asJava).build() ) val keys2 = Map("id" -> AttributeValue.fromS("67890")) val attrs2 = Map("foo" -> AttributeValue.fromN((now - oneDay).toString)) val item2Data = keys2 ++ attrs2 - sourceDDb.putItem( + sourceDDb().putItem( PutItemRequest.builder().tableName(tableName).item(item2Data.asJava).build() ) // Enable TTL - sourceDDb.updateTimeToLive( + sourceDDb().updateTimeToLive( UpdateTimeToLiveRequest .builder() .tableName(tableName) @@ -48,7 +48,7 @@ class SkippedItemsTest extends MigratorSuite { // Check that expired item is still present in the source before the migration val getItem2Request = GetItemRequest.builder().tableName(tableName).key(keys2.asJava).build() - sourceDDb + sourceDDb() .getItem(getItem2Request) .tap { itemResult => assert(itemResult.hasItem) @@ -60,7 +60,7 @@ class SkippedItemsTest extends MigratorSuite { checkItemWasMigrated(tableName, keys1, item1Data) // Expired item has been skipped - targetAlternator + targetAlternator() .getItem(getItem2Request) .tap { itemResult => assert(!itemResult.hasItem) diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedSegmentsTest.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedSegmentsTest.scala index d2c0b505..4d0aefcc 100644 --- a/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedSegmentsTest.scala +++ b/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedSegmentsTest.scala @@ -7,7 +7,7 @@ import java.util.UUID import scala.jdk.CollectionConverters._ import scala.util.chaining._ -class SkippedSegmentsTest extends MigratorSuite { +class SkippedSegmentsTest extends MigratorSuiteWithDynamoDBLocal { withTable("SkippedSegments").test("Run partial migrations") { tableName => // We rely on the fact that both config files have `scanSegments: 3` and @@ -19,7 +19,7 @@ class SkippedSegmentsTest extends MigratorSuite { // Initially, the target table does not exist try { - targetAlternator.describeTable(describeTableRequest(tableName)) + targetAlternator().describeTable(describeTableRequest(tableName)) fail(s"The table ${tableName} should not exist yet") } catch { case _: ResourceNotFoundException => @@ -54,12 +54,12 @@ class SkippedSegmentsTest extends MigratorSuite { "bar" -> AttributeValue.fromS(UUID.randomUUID().toString), "baz" -> AttributeValue.fromS(UUID.randomUUID().toString) ) - sourceDDb.putItem(PutItemRequest.builder().tableName(tableName).item(itemData.asJava).build()) + sourceDDb().putItem(PutItemRequest.builder().tableName(tableName).item(itemData.asJava).build()) } } def targetAlternatorItemCount(tableName: String): Long = - targetAlternator + targetAlternator() .scanPaginator(ScanRequest.builder().tableName(tableName).build()) .items() .stream() diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/StreamedItemsTest.scala 
b/tests/src/test/scala/com/scylladb/migrator/alternator/StreamedItemsTest.scala new file mode 100644 index 00000000..b6fb63ed --- /dev/null +++ b/tests/src/test/scala/com/scylladb/migrator/alternator/StreamedItemsTest.scala @@ -0,0 +1,192 @@ +package com.scylladb.migrator.alternator + +import com.scylladb.migrator.SparkUtils.submitSparkJobProcess +import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, GetItemRequest, PutItemRequest} + +import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration} +import scala.jdk.CollectionConverters._ +import scala.sys.process.{Process, ProcessLogger} +import scala.util.chaining.scalaUtilChainingOps + +class StreamedItemsTest extends MigratorSuiteWithAWS { + + override val munitTimeout: Duration = 120.seconds + + withTable("StreamedItemsTest").test("Stream changes") { tableName => + val configFileName = "dynamodb-to-alternator-streaming.yaml" + setupConfigurationFile(configFileName) + + // Populate the source table + val keys1 = Map("id" -> AttributeValue.fromS("12345")) + val attrs1 = Map("foo" -> AttributeValue.fromS("bar")) + val item1Data = keys1 ++ attrs1 + sourceDDb().putItem(PutItemRequest.builder().tableName(tableName).item(item1Data.asJava).build()) + + // Perform the migration + val sparkLogs = new StringBuilder() + val sparkJob = + submitSparkJobProcess(configFileName, "com.scylladb.migrator.Migrator") + .run(ProcessLogger { log => + sparkLogs ++= log +// println(log) // Uncomment to see the logs + }) + + awaitAtMost(60.seconds) { + assert(sparkLogs.toString().contains(s"Table ${tableName} created.")) + } + // Check that the table initial snapshot has been successfully migrated + awaitAtMost(60.seconds) { + targetAlternator() + .getItem(GetItemRequest.builder().tableName(tableName).key(keys1.asJava).build()) + .tap { itemResult => + assert(itemResult.hasItem, "First item not found in the target database") + assertEquals(itemResult.item.asScala.toMap, item1Data) + } + } + + // Insert one more item + val keys2 = Map("id" -> AttributeValue.fromS("67890")) + val attrs2 = Map( + "foo" -> AttributeValue.fromS("baz"), + "baz" -> AttributeValue.fromBool(false) + ) + val item2Data = keys2 ++ attrs2 + sourceDDb().putItem(PutItemRequest.builder().tableName(tableName).item(item2Data.asJava).build()) + + // Check that the added item has also been migrated + awaitAtMost(60.seconds) { + targetAlternator() + .getItem(GetItemRequest.builder().tableName(tableName).key(keys2.asJava).build()) + .tap { itemResult => + assert(itemResult.hasItem, "Second item not found in the target database") + assertEquals( + itemResult.item.asScala.toMap, + item2Data + ("_dynamo_op_type" -> AttributeValue.fromBool(true))) + } + } + + // Stop the migration job + stopSparkJob(configFileName) + assertEquals(sparkJob.exitValue(), 143) // 143 = SIGTERM + + deleteStreamTable(tableName) + } + + withTable("StreamedItemsSkipSnapshotTest").test("Stream changes but skip initial snapshot") { tableName => + val configFileName = "dynamodb-to-alternator-streaming-skip-snapshot.yaml" + setupConfigurationFile(configFileName) + + // Populate the source table + val keys1 = Map("id" -> AttributeValue.fromS("12345")) + val attrs1 = Map("foo" -> AttributeValue.fromS("bar")) + val item1Data = keys1 ++ attrs1 + sourceDDb().putItem(PutItemRequest.builder().tableName(tableName).item(item1Data.asJava).build()) + + // Perform the migration + val sparkLogs = new StringBuilder() + val sparkJob = + submitSparkJobProcess(configFileName, "com.scylladb.migrator.Migrator") + 
.run(ProcessLogger { (log: String) => + sparkLogs ++= log +// println(log) // Uncomment to see the logs + }) + + // Wait for the changes to start being streamed + awaitAtMost(60.seconds) { + assert(sparkLogs.toString().contains("alternator: Starting to transfer changes")) + } + + // Insert one more item + val keys2 = Map("id" -> AttributeValue.fromS("67890")) + val attrs2 = Map( + "foo" -> AttributeValue.fromS("baz"), + "baz" -> AttributeValue.fromBool(false) + ) + val item2Data = keys2 ++ attrs2 + sourceDDb().putItem(PutItemRequest.builder().tableName(tableName).item(item2Data.asJava).build()) + + // Check that only the second item has been migrated + awaitAtMost(60.seconds) { + targetAlternator() + .getItem(GetItemRequest.builder().tableName(tableName).key(keys2.asJava).build()) + .tap { itemResult => + assert(itemResult.hasItem, "Second item not found in the target database") + assertEquals( + itemResult.item.asScala.toMap, + item2Data + ("_dynamo_op_type" -> AttributeValue.fromBool(true))) + } + } + targetAlternator() + .getItem(GetItemRequest.builder().tableName(tableName).key(keys1.asJava).build()) + .tap { itemResult => + assert(!itemResult.hasItem, "First item found in the target database") + } + + // Stop the migration job + stopSparkJob(configFileName) + assertEquals(sparkJob.exitValue(), 143) + + deleteStreamTable(tableName) + } + + /** + * Continuously evaluate the provided `assertion` until it terminates + * (MUnit models failures with exceptions of type `AssertionError`), + * or until the provided `delay` passed. + */ + def awaitAtMost(delay: FiniteDuration)(assertion: => Unit): Unit = { + val deadline = delay.fromNow + var maybeFailure: Option[AssertionError] = Some(new AssertionError("Assertion has not been tested yet")) + while (maybeFailure.isDefined && deadline.hasTimeLeft()) { + try { + assertion + maybeFailure = None + } catch { + case failure: AssertionError => + maybeFailure = Some(failure) + Thread.sleep(1000) + } + } + for (failure <- maybeFailure) { + fail(s"Assertion was not true after ${delay}", failure) + } + } + + /** + * Delete the DynamoDB table automatically created by the Kinesis library + * @param sourceTableName Source table that had streams enabled + */ + private def deleteStreamTable(sourceTableName: String): Unit = { + sourceDDb() + .listTablesPaginator() + .tableNames() + .stream() + .filter(name => name.startsWith(s"migrator_${sourceTableName}_")) + .forEach { streamTableName => + deleteTableIfExists(sourceDDb(), streamTableName) + } + } + + // This looks more complicated than it should, but this is what it takes to stop a + // process started via “docker compose exec ...”. + // Indeed, stopping the host process does not stop the container process, see: + // https://github.com/moby/moby/issues/9098 + // So, we look into the container for the PID of the migration process and kill it. 
+ private def stopSparkJob(migrationConfigFile: String): Unit = { + val commandPrefix = Seq("docker", "compose", "-f", "../docker-compose-tests.yml", "exec", "spark-master") + val pid = + Process(commandPrefix ++ Seq("ps", "-ef")) + .lazyLines + // Find the process that contains the arguments we passed when we submitted the Spark job + .filter(_.contains(s"--conf spark.scylla.config=/app/configurations/${migrationConfigFile}")) + .head + .split("\\s+") + .apply(1) // The 2nd column contains the process ID + println(s"Stopping Spark job whose PID is ${pid}") + Process(commandPrefix ++ Seq("kill", pid)) + .run() + .exitValue() + .ensuring(_ == 0) + } + +} diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/ValidatorTest.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/ValidatorTest.scala index 6a6d2a34..1f3619e7 100644 --- a/tests/src/test/scala/com/scylladb/migrator/alternator/ValidatorTest.scala +++ b/tests/src/test/scala/com/scylladb/migrator/alternator/ValidatorTest.scala @@ -6,7 +6,7 @@ import software.amazon.awssdk.services.dynamodb.model.{AttributeAction, Attribut import scala.jdk.CollectionConverters._ import scala.util.chaining._ -class ValidatorTest extends MigratorSuite { +class ValidatorTest extends MigratorSuiteWithDynamoDBLocal { withTable("BasicTest").test("Validate migration") { tableName => val configFile = "dynamodb-to-alternator-basic.yaml" @@ -16,7 +16,7 @@ class ValidatorTest extends MigratorSuite { val itemData = keys ++ attrs // Insert some items - sourceDDb.putItem(PutItemRequest.builder().tableName(tableName).item(itemData.asJava).build()) + sourceDDb().putItem(PutItemRequest.builder().tableName(tableName).item(itemData.asJava).build()) // Perform the migration successfullyPerformMigration(configFile) @@ -27,7 +27,7 @@ class ValidatorTest extends MigratorSuite { } // Change the value of an item - targetAlternator.updateItem( + targetAlternator().updateItem( UpdateItemRequest .builder() .tableName(tableName) diff --git a/tests/src/test/scala/com/scylladb/migrator/scylla/BasicMigrationTest.scala b/tests/src/test/scala/com/scylladb/migrator/scylla/BasicMigrationTest.scala index 4342ffef..b1259fe9 100644 --- a/tests/src/test/scala/com/scylladb/migrator/scylla/BasicMigrationTest.scala +++ b/tests/src/test/scala/com/scylladb/migrator/scylla/BasicMigrationTest.scala @@ -22,7 +22,7 @@ class BasicMigrationTest extends MigratorSuite(sourcePort = 9043) { .build() // Insert some items - sourceCassandra.execute(insertStatement) + sourceCassandra().execute(insertStatement) // Perform the migration successfullyPerformMigration("cassandra-to-scylla-basic.yaml") @@ -32,7 +32,7 @@ class BasicMigrationTest extends MigratorSuite(sourcePort = 9043) { .selectFrom(keyspace, tableName) .all() .build() - targetScylla.execute(selectAllStatement).tap { resultSet => + targetScylla().execute(selectAllStatement).tap { resultSet => val rows = resultSet.all().asScala assertEquals(rows.size, 1) val row = rows.head diff --git a/tests/src/test/scala/com/scylladb/migrator/scylla/MigratorSuite.scala b/tests/src/test/scala/com/scylladb/migrator/scylla/MigratorSuite.scala index 2dab2664..1188b2ff 100644 --- a/tests/src/test/scala/com/scylladb/migrator/scylla/MigratorSuite.scala +++ b/tests/src/test/scala/com/scylladb/migrator/scylla/MigratorSuite.scala @@ -19,21 +19,48 @@ abstract class MigratorSuite(sourcePort: Int) extends munit.FunSuite { val keyspace = "test" + private val createKeyspaceStatement = + SchemaBuilder + .createKeyspace(keyspace) + .ifNotExists() + 
.withReplicationOptions(Map[String, AnyRef]( + "class" -> "SimpleStrategy", + "replication_factor" -> Integer.valueOf(1)).asJava) + .build() + /** Client of a source Cassandra instance */ - val sourceCassandra: CqlSession = CqlSession - .builder() - .addContactPoint(new InetSocketAddress("localhost", sourcePort)) - .withLocalDatacenter("datacenter1") - .withAuthCredentials("dummy", "dummy") - .build() + val sourceCassandra: Fixture[CqlSession] = new Fixture[CqlSession]("sourceCassandra") { + private var session: CqlSession = null + def apply(): CqlSession = session + override def beforeAll(): Unit = { + session = + CqlSession + .builder() + .addContactPoint(new InetSocketAddress("localhost", sourcePort)) + .withLocalDatacenter("datacenter1") + .withAuthCredentials("dummy", "dummy") + .build() + session.execute(createKeyspaceStatement) + } + override def afterAll(): Unit = session.close() + } /** Client of a target ScyllaDB instance */ - val targetScylla: CqlSession = CqlSession - .builder() - .addContactPoint(new InetSocketAddress("localhost", 9042)) - .withLocalDatacenter("datacenter1") - .withAuthCredentials("dummy", "dummy") - .build() + val targetScylla: Fixture[CqlSession] = new Fixture[CqlSession]("targetScylla") { + var session: CqlSession = null + def apply(): CqlSession = session + override def beforeAll(): Unit = { + session = + CqlSession + .builder() + .addContactPoint(new InetSocketAddress("localhost", 9042)) + .withLocalDatacenter("datacenter1") + .withAuthCredentials("dummy", "dummy") + .build() + session.execute(createKeyspaceStatement) + } + override def afterAll(): Unit = session.close() + } /** * Fixture automating the house-keeping work when migrating a table. @@ -51,9 +78,9 @@ abstract class MigratorSuite(sourcePort: Int) extends munit.FunSuite { setup = { _ => // Make sure the source and target databases do not contain the table already try { - dropAndRecreateTable(sourceCassandra, keyspace, name, columnName = identity) + dropAndRecreateTable(sourceCassandra(), keyspace, name, columnName = identity) dropAndRecreateTable( - targetScylla, + targetScylla(), keyspace, name, columnName = originalName => renames.getOrElse(originalName, originalName)) @@ -66,28 +93,12 @@ abstract class MigratorSuite(sourcePort: Int) extends munit.FunSuite { teardown = { _ => // Clean-up both the source and target databases val dropTableQuery = SchemaBuilder.dropTable(keyspace, name).build() - targetScylla.execute(dropTableQuery) - sourceCassandra.execute(dropTableQuery) + targetScylla().execute(dropTableQuery) + sourceCassandra().execute(dropTableQuery) () } ) - override def beforeAll(): Unit = { - val keyspaceStatement = - SchemaBuilder - .createKeyspace(keyspace) - .ifNotExists() - .withReplicationOptions(Map[String, AnyRef]( - "class" -> "SimpleStrategy", - "replication_factor" -> new Integer(1)).asJava) - .build() - sourceCassandra.execute(keyspaceStatement) - targetScylla.execute(keyspaceStatement) - } - - override def afterAll(): Unit = { - sourceCassandra.close() - targetScylla.close() - } + override def munitFixtures: Seq[Fixture[_]] = Seq(sourceCassandra, targetScylla) } diff --git a/tests/src/test/scala/com/scylladb/migrator/scylla/RenamedItemsTest.scala b/tests/src/test/scala/com/scylladb/migrator/scylla/RenamedItemsTest.scala index f6cdc442..9363790c 100644 --- a/tests/src/test/scala/com/scylladb/migrator/scylla/RenamedItemsTest.scala +++ b/tests/src/test/scala/com/scylladb/migrator/scylla/RenamedItemsTest.scala @@ -24,7 +24,7 @@ class RenamedItemsTest extends 
MigratorSuite(sourcePort = 9043) { .build() // Insert some items - sourceCassandra.execute(insertStatement) + sourceCassandra().execute(insertStatement) // Perform the migration successfullyPerformMigration("cassandra-to-scylla-renames.yaml") @@ -34,7 +34,7 @@ class RenamedItemsTest extends MigratorSuite(sourcePort = 9043) { .selectFrom(keyspace, tableName) .all() .build() - targetScylla.execute(selectAllStatement).tap { resultSet => + targetScylla().execute(selectAllStatement).tap { resultSet => val rows = resultSet.all().asScala assertEquals(rows.size, 1) val row = rows.head diff --git a/tests/src/test/scala/com/scylladb/migrator/scylla/ScyllaToScyllaBasicMigrationTest.scala b/tests/src/test/scala/com/scylladb/migrator/scylla/ScyllaToScyllaBasicMigrationTest.scala index 0e0a82f4..01072533 100644 --- a/tests/src/test/scala/com/scylladb/migrator/scylla/ScyllaToScyllaBasicMigrationTest.scala +++ b/tests/src/test/scala/com/scylladb/migrator/scylla/ScyllaToScyllaBasicMigrationTest.scala @@ -22,7 +22,7 @@ class ScyllaToScyllaBasicMigrationTest extends MigratorSuite(sourcePort = 9044) .build() // Insert some items - sourceCassandra.execute(insertStatement) + sourceCassandra().execute(insertStatement) // Perform the migration successfullyPerformMigration("scylla-to-scylla-basic.yaml") @@ -32,7 +32,7 @@ class ScyllaToScyllaBasicMigrationTest extends MigratorSuite(sourcePort = 9044) .selectFrom(keyspace, tableName) .all() .build() - targetScylla.execute(selectAllStatement).tap { resultSet => + targetScylla().execute(selectAllStatement).tap { resultSet => val rows = resultSet.all().asScala assertEquals(rows.size, 1) val row = rows.head diff --git a/tests/src/test/scala/com/scylladb/migrator/scylla/ValidatorTest.scala b/tests/src/test/scala/com/scylladb/migrator/scylla/ValidatorTest.scala index b3484b08..adf26452 100644 --- a/tests/src/test/scala/com/scylladb/migrator/scylla/ValidatorTest.scala +++ b/tests/src/test/scala/com/scylladb/migrator/scylla/ValidatorTest.scala @@ -23,7 +23,7 @@ class ValidatorTest extends MigratorSuite(sourcePort = 9043) { "foo" -> literal("bar") ).asJava) .build() - sourceCassandra.execute(insertStatement) + sourceCassandra().execute(insertStatement) // Perform the migration successfullyPerformMigration(configFile) @@ -40,7 +40,7 @@ class ValidatorTest extends MigratorSuite(sourcePort = 9043) { .setColumn("foo", literal("baz")) .whereColumn("id").isEqualTo(literal("12345")) .build() - targetScylla.execute(updateStatement) + targetScylla().execute(updateStatement) // Check that the validation failed because of the introduced change submitSparkJob(configFile, "com.scylladb.migrator.Validator").exitValue().tap { statusCode => From 127429daf13d0a6af371775275daaa5e3f20e646 Mon Sep 17 00:00:00 2001 From: Julien Richard-Foy Date: Fri, 30 Aug 2024 10:13:02 +0200 Subject: [PATCH 2/6] Use AssumeRoleWithWebIdentity instead of AssumeRole with static IAM credentials - Get temporary credentials from the GitHub workflow, and forward those credentials to the Spark nodes - Remove AssumeRoleTest, which cannot be executed anymore --- .github/workflows/tests-aws.yml | 17 +++++-- CONTRIBUTING.md | 6 +-- docker-compose-tests.yml | 2 + tests/docker/.gitignore | 2 + tests/docker/aws-profile/.gitignore | 4 ++ .../dynamodb-to-alternator-assume-role.yaml | 25 --------- ...to-alternator-streaming-skip-snapshot.yaml | 7 +-- .../dynamodb-to-alternator-streaming.yaml | 7 +-- .../migrator/alternator/AssumeRoleTest.scala | 30 ----------- .../migrator/alternator/MigratorSuite.scala | 51 ++++--------------- 
.../alternator/StreamedItemsTest.scala | 2 - 11 files changed, 35 insertions(+), 118 deletions(-) create mode 100644 tests/docker/aws-profile/.gitignore delete mode 100644 tests/src/test/configurations/dynamodb-to-alternator-assume-role.yaml delete mode 100644 tests/src/test/scala/com/scylladb/migrator/alternator/AssumeRoleTest.scala diff --git a/.github/workflows/tests-aws.yml b/.github/workflows/tests-aws.yml index fc0e807b..6c6b53c0 100644 --- a/.github/workflows/tests-aws.yml +++ b/.github/workflows/tests-aws.yml @@ -8,6 +8,13 @@ on: - '**.sbt' workflow_dispatch: +env: + AWS_REGION: us-east-1 + +permissions: + id-token: write + contents: read + jobs: test: name: Test @@ -23,6 +30,12 @@ jobs: distribution: temurin java-version: 8 cache: sbt + - name: configure aws credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + aws-region: ${{ env.AWS_REGION }} + role-to-assume: ${{ secrets.AWS_ROLE_ARN }} + role-session-name: GitHub_to_AWS_via_FederatedOIDC - name: Build migrator run: ./build.sh - name: Set up services @@ -34,9 +47,5 @@ jobs: .github/wait-for-port.sh 8081 # Spark worker - name: Run tests accessing AWS run: sbt "testOnly -- --include-categories=com.scylladb.migrator.AWS" - env: - AWS_ACCESS_KEY: ${{ secrets.AWS_ACCESS_KEY }} - AWS_SECRET_KEY: ${{ secrets.AWS_SECRET_KEY }} - AWS_ROLE_ARN: ${{ secrets.AWS_ROLE_ARN }} - name: Stop services run: docker compose -f docker-compose-tests.yml down diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 3ff11d91..2bba06b7 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -28,12 +28,10 @@ Tests are implemented in the `tests` sbt submodule. They simulate the submission sbt testOnly com.scylladb.migrator.BasicMigrationTest ~~~ - Or, to run the tests that access AWS: + Or, to run the tests that access AWS, first configure your AWS credentials with `aws configure`, and then: ~~~ sh - AWS_ACCESS_KEY=... \ - AWS_SECRET_KEY=... \ - AWS_ROLE_NAME=... 
\
+   AWS_REGION=us-east-1 \
    sbt "testOnly -- --include-categories=com.scylladb.migrator.AWS"
    ~~~
diff --git a/docker-compose-tests.yml b/docker-compose-tests.yml
index 8428a4c5..948854ec 100644
--- a/docker-compose-tests.yml
+++ b/docker-compose-tests.yml
@@ -63,6 +63,7 @@ services:
       - ./migrator/target/scala-2.13:/jars
       - ./tests/src/test/configurations:/app/configurations
       - ./tests/docker/spark-master:/app/savepoints
+      - ./tests/docker/aws-profile:/root/.aws
       - ./tests/docker/parquet:/app/parquet

   spark-worker:
@@ -80,5 +81,6 @@ services:
       - 8081:8081
     volumes:
       - ./tests/docker/parquet:/app/parquet
+      - ./tests/docker/aws-profile:/root/.aws
     depends_on:
       - spark-master
diff --git a/tests/docker/.gitignore b/tests/docker/.gitignore
index 7deefc53..d26b6ae4 100644
--- a/tests/docker/.gitignore
+++ b/tests/docker/.gitignore
@@ -1,3 +1,5 @@
 cassandra/
+s3/
 scylla/
+scylla-source/
 spark-master/
diff --git a/tests/docker/aws-profile/.gitignore b/tests/docker/aws-profile/.gitignore
new file mode 100644
index 00000000..5e7d2734
--- /dev/null
+++ b/tests/docker/aws-profile/.gitignore
@@ -0,0 +1,4 @@
+# Ignore everything in this directory
+*
+# Except this file
+!.gitignore
diff --git a/tests/src/test/configurations/dynamodb-to-alternator-assume-role.yaml b/tests/src/test/configurations/dynamodb-to-alternator-assume-role.yaml
deleted file mode 100644
index 57323aa6..00000000
--- a/tests/src/test/configurations/dynamodb-to-alternator-assume-role.yaml
+++ /dev/null
@@ -1,25 +0,0 @@
-source:
-  type: dynamodb
-  table: AssumeRoleTest
-  region: us-west-1
-  credentials:
-    accessKey: "{AWS_ACCESS_KEY}"
-    secretKey: "{AWS_SECRET_KEY}"
-    assumeRole:
-      arn: "{AWS_ROLE_ARN}"
-
-target:
-  type: dynamodb
-  table: AssumeRoleTest
-  region: dummy
-  endpoint:
-    host: http://scylla
-    port: 8000
-  credentials:
-    accessKey: dummy
-    secretKey: dummy
-  streamChanges: false
-
-savepoints:
-  path: /app/savepoints
-  intervalSeconds: 300
diff --git a/tests/src/test/configurations/dynamodb-to-alternator-streaming-skip-snapshot.yaml b/tests/src/test/configurations/dynamodb-to-alternator-streaming-skip-snapshot.yaml
index 73e3d7a2..6411d83e 100644
--- a/tests/src/test/configurations/dynamodb-to-alternator-streaming-skip-snapshot.yaml
+++ b/tests/src/test/configurations/dynamodb-to-alternator-streaming-skip-snapshot.yaml
@@ -1,12 +1,7 @@
 source:
   type: dynamodb
   table: StreamedItemsSkipSnapshotTest
-  region: us-west-1
-  credentials:
-    accessKey: "{AWS_ACCESS_KEY}"
-    secretKey: "{AWS_SECRET_KEY}"
-    assumeRole:
-      arn: "{AWS_ROLE_ARN}"
+  region: us-east-1

 target:
   type: dynamodb
diff --git a/tests/src/test/configurations/dynamodb-to-alternator-streaming.yaml b/tests/src/test/configurations/dynamodb-to-alternator-streaming.yaml
index 45271c2a..75073817 100644
--- a/tests/src/test/configurations/dynamodb-to-alternator-streaming.yaml
+++ b/tests/src/test/configurations/dynamodb-to-alternator-streaming.yaml
@@ -1,12 +1,7 @@
 source:
   type: dynamodb
   table: StreamedItemsTest
-  region: us-west-1
-  credentials:
-    accessKey: "{AWS_ACCESS_KEY}"
-    secretKey: "{AWS_SECRET_KEY}"
-    assumeRole:
-      arn: "{AWS_ROLE_ARN}"
+  region: us-east-1

 target:
   type: dynamodb
diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/AssumeRoleTest.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/AssumeRoleTest.scala
deleted file mode 100644
index c0d01402..00000000
--- a/tests/src/test/scala/com/scylladb/migrator/alternator/AssumeRoleTest.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-package com.scylladb.migrator.alternator
-
-import com.scylladb.migrator.SparkUtils.successfullyPerformMigration
-import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, PutItemRequest}
-
-import scala.jdk.CollectionConverters._
-
-/** Basic migration that uses the real AWS DynamoDB as a source and AssumeRole for authentication */
-class AssumeRoleTest extends MigratorSuiteWithAWS {
-
-  withTable("AssumeRoleTest").test("Read from source and write to target") { tableName =>
-    val configFileName = "dynamodb-to-alternator-assume-role.yaml"
-
-    setupConfigurationFile(configFileName)
-
-    // Insert some items
-    val keys = Map("id" -> AttributeValue.fromS("12345"))
-    val attrs = Map("foo" -> AttributeValue.fromS("bar"))
-    val itemData = keys ++ attrs
-    sourceDDb().putItem(PutItemRequest.builder().tableName(tableName).item(itemData.asJava).build())
-
-    // Perform the migration
-    successfullyPerformMigration(configFileName)
-
-    checkSchemaWasMigrated(tableName)
-
-    checkItemWasMigrated(tableName, keys, itemData)
-  }
-
-}
diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/MigratorSuite.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/MigratorSuite.scala
index e62f8a64..613522c6 100644
--- a/tests/src/test/scala/com/scylladb/migrator/alternator/MigratorSuite.scala
+++ b/tests/src/test/scala/com/scylladb/migrator/alternator/MigratorSuite.scala
@@ -2,17 +2,16 @@ package com.scylladb.migrator.alternator

 import com.scylladb.migrator.AWS
 import org.junit.experimental.categories.Category
-import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, AwsSessionCredentials, StaticCredentialsProvider}
+import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
 import software.amazon.awssdk.regions.Region
 import software.amazon.awssdk.services.dynamodb.DynamoDbClient
 import software.amazon.awssdk.services.dynamodb.model.{AttributeDefinition, AttributeValue, CreateTableRequest, DeleteTableRequest, DescribeTableRequest, GetItemRequest, GlobalSecondaryIndexDescription, KeySchemaElement, KeyType, LocalSecondaryIndexDescription, ProvisionedThroughput, ResourceNotFoundException, ScalarAttributeType}
-import software.amazon.awssdk.services.sts.StsClient
-import software.amazon.awssdk.services.sts.model.AssumeRoleRequest

 import java.net.URI
-import java.nio.file.{Files, Paths}
+import java.nio.file.Paths
 import scala.util.chaining._
 import scala.jdk.CollectionConverters._
+import scala.sys.process.stringToProcess

 /**
  * Base class for implementing end-to-end tests.
@@ -200,50 +199,20 @@ abstract class MigratorSuiteWithAWS extends MigratorSuite {
     private var client: DynamoDbClient = null
     def apply(): DynamoDbClient = client
     override def beforeAll(): Unit = {
-      val region = Region.US_WEST_1 // FIXME
-      val accessKey = sys.env("AWS_ACCESS_KEY")
-      val secretKey = sys.env("AWS_SECRET_KEY")
-      val roleArn = sys.env("AWS_ROLE_ARN")
-      val stsClient =
-        StsClient
-          .builder()
-          .credentialsProvider(
-            StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey))
-          )
-          .build()
-      val credentials =
-        stsClient
-          .assumeRole(
-            AssumeRoleRequest
-              .builder()
-              .roleArn(roleArn)
-              .roleSessionName("MigratorTest")
-              .build()
-          )
-          .credentials()
+      // Provision the AWS credentials on the Spark nodes via a Docker volume
+      val localAwsCredentials =
+        Paths.get(sys.props("user.home"), ".aws", "credentials")
+          .toAbsolutePath
+      (s"cp ${localAwsCredentials} docker/aws-profile/credentials").!!
+
+      val region = Region.of(sys.env("AWS_REGION"))
       client =
         DynamoDbClient
          .builder()
          .region(region)
-         .credentialsProvider(
-           StaticCredentialsProvider.create(
-             AwsSessionCredentials.create(credentials.accessKeyId, credentials.secretAccessKey, credentials.sessionToken)
-           )
-         )
          .build()
     }
     override def afterAll(): Unit = client.close()
   }

-  def setupConfigurationFile(configFileName: String): Unit = {
-    val configFilePath = Paths.get("src", "test", "configurations", configFileName)
-    val configFileContent = new String(Files.readAllBytes(configFilePath))
-    val updatedConfigFileContent =
-      configFileContent
-        .replace("{AWS_ACCESS_KEY}", sys.env("AWS_ACCESS_KEY"))
-        .replace("{AWS_SECRET_KEY}", sys.env("AWS_SECRET_KEY"))
-        .replace("{AWS_ROLE_ARN}", sys.env("AWS_ROLE_ARN"))
-    Files.write(configFilePath, updatedConfigFileContent.getBytes)
-  }
-
 }
diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/StreamedItemsTest.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/StreamedItemsTest.scala
index b6fb63ed..be2a6145 100644
--- a/tests/src/test/scala/com/scylladb/migrator/alternator/StreamedItemsTest.scala
+++ b/tests/src/test/scala/com/scylladb/migrator/alternator/StreamedItemsTest.scala
@@ -14,7 +14,6 @@ class StreamedItemsTest extends MigratorSuiteWithAWS {

   withTable("StreamedItemsTest").test("Stream changes") { tableName =>
     val configFileName = "dynamodb-to-alternator-streaming.yaml"
-    setupConfigurationFile(configFileName)

     // Populate the source table
     val keys1 = Map("id" -> AttributeValue.fromS("12345"))
@@ -74,7 +73,6 @@ class StreamedItemsTest extends MigratorSuiteWithAWS {
   withTable("StreamedItemsSkipSnapshotTest").test("Stream changes but skip initial snapshot") {
     tableName =>
       val configFileName = "dynamodb-to-alternator-streaming-skip-snapshot.yaml"
-      setupConfigurationFile(configFileName)

       // Populate the source table
       val keys1 = Map("id" -> AttributeValue.fromS("12345"))

From babe65736007176e1b72109cd65ef3fbd8fd7597 Mon Sep 17 00:00:00 2001
From: Julien Richard-Foy
Date: Fri, 30 Aug 2024 11:11:33 +0200
Subject: [PATCH 3/6] Fall back to the default credentials if no static
 credentials are provided on the source table

---
 .../writers/DynamoStreamReplication.scala | 22 ++++++++++---------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala
index 8b8edcd1..1a972851 100644
--- a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala
+++ b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala
@@ -63,16 +63,18 @@ object DynamoStreamReplication {
       case _ => None
     },
-    kinesisCreds = src.credentials.map {
-      case AWSCredentials(accessKey, secretKey, maybeAssumeRole) =>
-        val builder =
-          SparkAWSCredentials.builder
-            .basicCredentials(accessKey, secretKey)
-        for (assumeRole <- maybeAssumeRole) {
-          builder.stsCredentials(assumeRole.arn, assumeRole.getSessionName)
-        }
-        builder.build()
-    }.orNull
+    kinesisCreds = src.credentials
+      .map {
+        case AWSCredentials(accessKey, secretKey, maybeAssumeRole) =>
+          val builder =
+            SparkAWSCredentials.builder
+              .basicCredentials(accessKey, secretKey)
+          for (assumeRole <- maybeAssumeRole) {
+            builder.stsCredentials(assumeRole.arn, assumeRole.getSessionName)
+          }
+          builder.build()
+      }
+      .getOrElse(SparkAWSCredentials.builder.build())
   ).foreachRDD { msgs =>
     val rdd = msgs
       .collect { case Some(item) => item: util.Map[String, AttributeValueV1] }

From 1ab3a51d4e9896cb63a8cc16ba999f19a41636a6 Mon Sep 17 00:00:00 2001
From: Julien Richard-Foy
Date: Fri, 30 Aug 2024 13:29:43 +0200
Subject: [PATCH 4/6] Make sure to terminate the application after we dumped
 the migration state, otherwise it never terminates, and properly clean up
 the handlers.

---
 .../scylladb/migrator/SavepointsManager.scala | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/migrator/src/main/scala/com/scylladb/migrator/SavepointsManager.scala b/migrator/src/main/scala/com/scylladb/migrator/SavepointsManager.scala
index 0a324c1b..0324be2f 100644
--- a/migrator/src/main/scala/com/scylladb/migrator/SavepointsManager.scala
+++ b/migrator/src/main/scala/com/scylladb/migrator/SavepointsManager.scala
@@ -28,6 +28,9 @@ abstract class SavepointsManager(migratorConfig: MigratorConfig) extends AutoClo

   val log = LogManager.getLogger(this.getClass.getName)
   private val scheduler = new ScheduledThreadPoolExecutor(1)
+  private var oldUsr2Handler: SignalHandler = _
+  private var oldTermHandler: SignalHandler = _
+  private var oldIntHandler: SignalHandler = _

   createSavepointsDirectory()
   addUSR2Handler()
@@ -50,13 +53,15 @@ abstract class SavepointsManager(migratorConfig: MigratorConfig) extends AutoClo
       "Installing SIGINT/TERM/USR2 handler. Send this to dump the current progress to a savepoint.")

     val handler = new SignalHandler {
-      override def handle(signal: Signal): Unit =
+      override def handle(signal: Signal): Unit = {
         dumpMigrationState(signal.toString)
+        sys.exit(0)
+      }
     }

-    Signal.handle(new Signal("USR2"), handler)
-    Signal.handle(new Signal("TERM"), handler)
-    Signal.handle(new Signal("INT"), handler)
+    oldUsr2Handler = Signal.handle(new Signal("USR2"), handler)
+    oldTermHandler = Signal.handle(new Signal("TERM"), handler)
+    oldIntHandler = Signal.handle(new Signal("INT"), handler)
   }

   private def startSavepointSchedule(): Unit = {
@@ -99,8 +104,12 @@ abstract class SavepointsManager(migratorConfig: MigratorConfig) extends AutoClo
   /**
    * Stop the periodic creation of savepoints and release the associated resources.
    */
-  final def close(): Unit =
+  final def close(): Unit = {
     scheduler.shutdown()
+    Signal.handle(new Signal("USR2"), oldUsr2Handler)
+    Signal.handle(new Signal("TERM"), oldTermHandler)
+    Signal.handle(new Signal("INT"), oldIntHandler)
+  }

   /**
    * Provide readable logs by describing which parts of the migration have been completed already.

From b31e52f6946a166cfe13e3ad81a093656b118780 Mon Sep 17 00:00:00 2001
From: Julien Richard-Foy
Date: Fri, 30 Aug 2024 13:35:16 +0200
Subject: [PATCH 5/6] Set up savepoints manager only during the writing process

---
 .../com/scylladb/migrator/Migrator.scala      |  5 +---
 .../scylladb/migrator/SavepointsManager.scala |  4 +--
 .../alternator/AlternatorMigrator.scala       | 26 +++++++++----------
 .../DynamoDbSavepointsManager.scala           | 22 +++++++++-------
 4 files changed, 28 insertions(+), 29 deletions(-)

diff --git a/migrator/src/main/scala/com/scylladb/migrator/Migrator.scala b/migrator/src/main/scala/com/scylladb/migrator/Migrator.scala
index fbbca5e9..8829761f 100644
--- a/migrator/src/main/scala/com/scylladb/migrator/Migrator.scala
+++ b/migrator/src/main/scala/com/scylladb/migrator/Migrator.scala
@@ -47,10 +47,7 @@ object Migrator {
           case (
               s3Source: SourceSettings.DynamoDBS3Export,
               alternatorTarget: TargetSettings.DynamoDB) =>
-            AlternatorMigrator.migrateFromS3Export(
-              s3Source,
-              alternatorTarget,
-              migratorConfig.renamesMap)
+            AlternatorMigrator.migrateFromS3Export(s3Source, alternatorTarget, migratorConfig)
           case _ =>
             sys.error("Unsupported combination of source and target.")
         }
diff --git a/migrator/src/main/scala/com/scylladb/migrator/SavepointsManager.scala b/migrator/src/main/scala/com/scylladb/migrator/SavepointsManager.scala
index 0324be2f..266ce572 100644
--- a/migrator/src/main/scala/com/scylladb/migrator/SavepointsManager.scala
+++ b/migrator/src/main/scala/com/scylladb/migrator/SavepointsManager.scala
@@ -61,7 +61,7 @@ abstract class SavepointsManager(migratorConfig: MigratorConfig) extends AutoClo

     oldUsr2Handler = Signal.handle(new Signal("USR2"), handler)
     oldTermHandler = Signal.handle(new Signal("TERM"), handler)
-    oldIntHandler = Signal.handle(new Signal("INT"), handler)
+    oldIntHandler = Signal.handle(new Signal("INT"), handler)
   }

   private def startSavepointSchedule(): Unit = {
@@ -104,7 +104,7 @@ abstract class SavepointsManager(migratorConfig: MigratorConfig) extends AutoClo
   /**
    * Stop the periodic creation of savepoints and release the associated resources.
    */
-  final def close(): Unit = {
+  def close(): Unit = {
     scheduler.shutdown()
     Signal.handle(new Signal("USR2"), oldUsr2Handler)
     Signal.handle(new Signal("TERM"), oldTermHandler)
     Signal.handle(new Signal("INT"), oldIntHandler)
   }
diff --git a/migrator/src/main/scala/com/scylladb/migrator/alternator/AlternatorMigrator.scala b/migrator/src/main/scala/com/scylladb/migrator/alternator/AlternatorMigrator.scala
index 2526966b..800e846f 100644
--- a/migrator/src/main/scala/com/scylladb/migrator/alternator/AlternatorMigrator.scala
+++ b/migrator/src/main/scala/com/scylladb/migrator/alternator/AlternatorMigrator.scala
@@ -23,17 +23,13 @@ object AlternatorMigrator {
                           migratorConfig: MigratorConfig)(implicit spark: SparkSession): Unit = {
     val (sourceRDD, sourceTableDesc) =
       readers.DynamoDB.readRDD(spark, source, migratorConfig.skipSegments)
-    val savepointsManager =
-      DynamoDbSavepointsManager.setup(migratorConfig, sourceRDD, spark.sparkContext)
-    Using.resource(savepointsManager) { _ =>
-      val maybeStreamedSource = if (target.streamChanges) Some(source) else None
-      migrate(sourceRDD, sourceTableDesc, maybeStreamedSource, target, migratorConfig.renamesMap)
-    }
+    val maybeStreamedSource = if (target.streamChanges) Some(source) else None
+    migrate(sourceRDD, sourceTableDesc, maybeStreamedSource, target, migratorConfig)
   }

   def migrateFromS3Export(source: SourceSettings.DynamoDBS3Export,
                           target: TargetSettings.DynamoDB,
-                          renamesMap: Map[String, String])(implicit spark: SparkSession): Unit = {
+                          migratorConfig: MigratorConfig)(implicit spark: SparkSession): Unit = {
     val (sourceRDD, sourceTableDesc) = readers.DynamoDBS3Export.readRDD(source)(spark.sparkContext)
     // Adapt the decoded items to the format expected by the EMR Hadoop connector
     val normalizedRDD =
@@ -43,7 +39,7 @@ object AlternatorMigrator {
     if (target.streamChanges) {
       log.warn("'streamChanges: true' is not supported when the source is a DynamoDB S3 export.")
     }
-    migrate(normalizedRDD, sourceTableDesc, None, target, renamesMap)
+    migrate(normalizedRDD, sourceTableDesc, None, target, migratorConfig)
   }

   /**
@@ -51,19 +47,17 @@ object AlternatorMigrator {
    * @param sourceTableDesc     Description of the table to replicate on the target database
    * @param maybeStreamedSource Settings of the source table in case `streamChanges` was `true`
    * @param target              Target table settings
-   * @param renamesMap          Renames
+   * @param migratorConfig      The complete original configuration
    * @param spark               Spark session
    */
   def migrate(sourceRDD: RDD[(Text, DynamoDBItemWritable)],
               sourceTableDesc: TableDescription,
               maybeStreamedSource: Option[SourceSettings.DynamoDB],
               target: TargetSettings.DynamoDB,
-              renamesMap: Map[String, String])(implicit spark: SparkSession): Unit = {
+              migratorConfig: MigratorConfig)(implicit spark: SparkSession): Unit = {

     log.info("We need to transfer: " + sourceRDD.getNumPartitions + " partitions in total")

-    log.info("Starting write...")
-
     try {
       val targetTableDesc = {
         for (streamedSource <- maybeStreamedSource) {
@@ -81,7 +75,11 @@ object AlternatorMigrator {
       if (target.streamChanges && target.skipInitialSnapshotTransfer.contains(true)) {
         log.info("Skip transferring table snapshot")
       } else {
-        writers.DynamoDB.writeRDD(target, renamesMap, sourceRDD, targetTableDesc)
+        Using.resource(DynamoDbSavepointsManager(migratorConfig, sourceRDD, spark.sparkContext)) {
+          _ =>
+            log.info("Starting write...")
+            writers.DynamoDB.writeRDD(target, migratorConfig.renamesMap, sourceRDD, targetTableDesc)
+        }
         log.info("Done transferring table snapshot")
       }

@@ -95,7 +93,7 @@ object AlternatorMigrator {
             streamedSource,
             target,
             targetTableDesc,
-            renamesMap)
+            migratorConfig.renamesMap)

           streamingContext.start()
           streamingContext.awaitTermination()
diff --git a/migrator/src/main/scala/com/scylladb/migrator/alternator/DynamoDbSavepointsManager.scala b/migrator/src/main/scala/com/scylladb/migrator/alternator/DynamoDbSavepointsManager.scala
index 78bd89a9..5a592699 100644
--- a/migrator/src/main/scala/com/scylladb/migrator/alternator/DynamoDbSavepointsManager.scala
+++ b/migrator/src/main/scala/com/scylladb/migrator/alternator/DynamoDbSavepointsManager.scala
@@ -17,7 +17,9 @@ import scala.util.{ Failure, Success, Try }
  * Manage DynamoDB-based migrations by tracking the migrated scan segments.
  */
 class DynamoDbSavepointsManager(migratorConfig: MigratorConfig,
-                                segmentsAccumulator: IntSetAccumulator)
+                                segmentsAccumulator: IntSetAccumulator,
+                                sparkTaskEndListener: SparkListener,
+                                spark: SparkContext)
     extends SavepointsManager(migratorConfig) {

   def describeMigrationState(): String =
@@ -26,25 +28,26 @@ class DynamoDbSavepointsManager(migratorConfig: MigratorConfig,
   def updateConfigWithMigrationState(): MigratorConfig =
     migratorConfig.copy(skipSegments = Some(segmentsAccumulator.value))

+  override def close(): Unit = {
+    spark.removeSparkListener(sparkTaskEndListener)
+    super.close()
+  }
+
 }

 object DynamoDbSavepointsManager {

   private val log = LogManager.getLogger(classOf[DynamoDbSavepointsManager])

-  def apply(migratorConfig: MigratorConfig,
-            segmentsAccumulator: IntSetAccumulator): DynamoDbSavepointsManager =
-    new DynamoDbSavepointsManager(migratorConfig, segmentsAccumulator)
-
   /**
    * Set up a savepoints manager that tracks the scan segments migrated from the source RDD.
    */
-  def setup(migratorConfig: MigratorConfig,
+  def apply(migratorConfig: MigratorConfig,
             sourceRDD: RDD[(Text, DynamoDBItemWritable)],
             spark: SparkContext): DynamoDbSavepointsManager = {
     val segmentsAccumulator =
       IntSetAccumulator(migratorConfig.skipSegments.getOrElse(Set.empty))
-    spark.addSparkListener(new SparkListener {
+    val sparkTaskEndListener = new SparkListener {
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
         val partitionId = taskEnd.taskInfo.partitionId
         log.debug(s"Migration of partition ${partitionId} ended: ${taskEnd.reason}.")
@@ -60,8 +63,9 @@ object DynamoDbSavepointsManager {
           }
         }
       }
-    })
-    DynamoDbSavepointsManager(migratorConfig, segmentsAccumulator)
+    }
+    spark.addSparkListener(sparkTaskEndListener)
+    new DynamoDbSavepointsManager(migratorConfig, segmentsAccumulator, sparkTaskEndListener, spark)
   }

   /**

From 8bfe80d3d9f6904e7884a4d5f7a2254729112f85 Mon Sep 17 00:00:00 2001
From: Julien Richard-Foy
Date: Fri, 30 Aug 2024 14:41:10 +0200
Subject: [PATCH 6/6] Increase timeout of long tests to avoid CI flakiness

---
 .../com/scylladb/migrator/alternator/SkippedSegmentsTest.scala | 3 +++
 .../scala/com/scylladb/migrator/alternator/ValidatorTest.scala | 3 +++
 .../scala/com/scylladb/migrator/scylla/ValidatorTest.scala     | 3 +++
 3 files changed, 9 insertions(+)

diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedSegmentsTest.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedSegmentsTest.scala
index 4d0aefcc..3fc193a2 100644
--- a/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedSegmentsTest.scala
+++ b/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedSegmentsTest.scala
@@ -4,11 +4,14 @@ import com.scylladb.migrator.SparkUtils.{submitSparkJob, successfullyPerformMigr
 import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, PutItemRequest, ResourceNotFoundException, ScanRequest}

 import java.util.UUID
+import scala.concurrent.duration.{Duration, DurationInt}
 import scala.jdk.CollectionConverters._
 import scala.util.chaining._

 class SkippedSegmentsTest extends MigratorSuiteWithDynamoDBLocal {

+  override val munitTimeout: Duration = 120.seconds
+
   withTable("SkippedSegments").test("Run partial migrations") { tableName =>
     // We rely on the fact that both config files have `scanSegments: 3` and
     // complementary `skipSegments` properties
diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/ValidatorTest.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/ValidatorTest.scala
index 1f3619e7..5b72dc96 100644
--- a/tests/src/test/scala/com/scylladb/migrator/alternator/ValidatorTest.scala
+++ b/tests/src/test/scala/com/scylladb/migrator/alternator/ValidatorTest.scala
@@ -3,11 +3,14 @@ package com.scylladb.migrator.alternator
 import com.scylladb.migrator.SparkUtils.{submitSparkJob, successfullyPerformMigration}
 import software.amazon.awssdk.services.dynamodb.model.{AttributeAction, AttributeValue, AttributeValueUpdate, PutItemRequest, UpdateItemRequest}

+import scala.concurrent.duration.{Duration, DurationInt}
 import scala.jdk.CollectionConverters._
 import scala.util.chaining._

 class ValidatorTest extends MigratorSuiteWithDynamoDBLocal {

+  override val munitTimeout: Duration = 120.seconds
+
   withTable("BasicTest").test("Validate migration") { tableName =>
     val configFile = "dynamodb-to-alternator-basic.yaml"
diff --git a/tests/src/test/scala/com/scylladb/migrator/scylla/ValidatorTest.scala b/tests/src/test/scala/com/scylladb/migrator/scylla/ValidatorTest.scala
index adf26452..ea1e8c17 100644
--- a/tests/src/test/scala/com/scylladb/migrator/scylla/ValidatorTest.scala
+++ b/tests/src/test/scala/com/scylladb/migrator/scylla/ValidatorTest.scala
@@ -5,11 +5,14 @@ import com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal
 import com.datastax.oss.driver.api.querybuilder.term.Term
 import com.scylladb.migrator.SparkUtils.{submitSparkJob, successfullyPerformMigration}

+import scala.concurrent.duration.{Duration, DurationInt}
 import scala.jdk.CollectionConverters._
 import scala.util.chaining.scalaUtilChainingOps

 class ValidatorTest extends MigratorSuite(sourcePort = 9043) {

+  override val munitTimeout: Duration = 120.seconds
+
   withTable("BasicTest").test("Validate migration") { tableName =>
     val configFile = "cassandra-to-scylla-basic.yaml"