diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala index 83315c06bb..94b02cdb7b 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala @@ -38,7 +38,8 @@ trait S3StorageClient { sourceBucket: BucketName, sourceKey: FileKey, destinationBucket: BucketName, - destinationKey: FileKey + destinationKey: FileKey, + checksumAlgorithm: ChecksumAlgorithm ): IO[CopyObjectResponse] def uploadFile( @@ -99,7 +100,8 @@ object S3StorageClient { sourceBucket: BucketName, sourceKey: FileKey, destinationBucket: BucketName, - destinationKey: FileKey + destinationKey: FileKey, + checksumAlgorithm: ChecksumAlgorithm ): IO[CopyObjectResponse] = client.copyObject( CopyObjectRequest @@ -108,7 +110,7 @@ object S3StorageClient { .sourceKey(sourceKey.value.value) .destinationBucket(destinationBucket.value.value) .destinationKey(destinationKey.value.value) - .checksumAlgorithm(ChecksumAlgorithm.SHA256) // TODO: See what to do with this + .checksumAlgorithm(checksumAlgorithm) .build() ) @@ -201,7 +203,8 @@ object S3StorageClient { sourceBucket: BucketName, sourceKey: FileKey, destinationBucket: BucketName, - destinationKey: FileKey + destinationKey: FileKey, + checksumAlgorithm: ChecksumAlgorithm ): IO[CopyObjectResponse] = raiseDisabledErr override def objectExists(bucket: String, key: String): IO[Boolean] = raiseDisabledErr diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileCopier.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileCopier.scala index 81e4724c34..b3cb640f58 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileCopier.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileCopier.scala @@ -7,6 +7,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.clie import eu.timepit.refined.collection.NonEmpty import eu.timepit.refined.refineV import fs2.aws.s3.models.Models.{BucketName, FileKey} +import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm trait FileCopier { @@ -28,7 +29,8 @@ object FileCopier { val fileKey = IO.fromEither(refineString(path.toString).map(FileKey)) fileKey.flatMap { key => - s3StorageClient.copyObject(importBucket, key, targetBucket, key) + // TODO: Check if we only use SHA256 or not? If not we need to pass the right algo + s3StorageClient.copyObject(importBucket, key, targetBucket, key, ChecksumAlgorithm.SHA256) }.void } diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala index 3ce5adafb1..4026b63f79 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala @@ -14,14 +14,14 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label, ProjectR import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.ship.ImportReport.Statistics -import ch.epfl.bluebrain.nexus.ship.RunShipSuite.{checkFor, expectedImportReport, getDistinctOrgProjects} +import ch.epfl.bluebrain.nexus.ship.RunShipSuite.{checkFor, expectedImportReport, getDistinctOrgProjects, noopS3Client} import ch.epfl.bluebrain.nexus.ship.config.ShipConfigFixtures import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import doobie.implicits._ import fs2.aws.s3.models.Models import fs2.io.file.Path import munit.AnyFixture -import software.amazon.awssdk.services.s3.model.{CopyObjectResponse, HeadObjectResponse, ListObjectsV2Response} +import software.amazon.awssdk.services.s3.model.{ChecksumAlgorithm, CopyObjectResponse, HeadObjectResponse, ListObjectsV2Response} import java.time.Instant @@ -37,45 +37,10 @@ class RunShipSuite extends NexusSuite with Doobie.Fixture with ShipConfigFixture EventStreamer.localStreamer.stream(path, offset) } - private val s3Client = new S3StorageClient { - override def listObjectsV2(bucket: String): IO[ListObjectsV2Response] = - IO.raiseError(new NotImplementedError("listObjectsV2 is not implemented")) - - override def listObjectsV2(bucket: Models.BucketName, prefix: String): IO[ListObjectsV2Response] = - IO.raiseError(new NotImplementedError("listObjectsV2 is not implemented")) - - override def readFile(bucket: Models.BucketName, fileKey: Models.FileKey): fs2.Stream[IO, Byte] = - fs2.Stream.empty - - override def headObject(bucket: String, key: String): IO[HeadObjectResponse] = - IO.raiseError(new NotImplementedError("headObject is not implemented")) - - override def copyObject( - sourceBucket: Models.BucketName, - sourceKey: Models.FileKey, - destinationBucket: Models.BucketName, - destinationKey: Models.FileKey - ): IO[CopyObjectResponse] = - IO.raiseError(new NotImplementedError("copyObject is not implemented")) - - override def baseEndpoint: Uri = Uri.apply("http://localhost:4566") - - override def uploadFile( - fileData: fs2.Stream[IO, Byte], - bucket: String, - key: String, - algorithm: DigestAlgorithm - ): IO[S3StorageClient.UploadMetadata] = - IO.raiseError(new NotImplementedError("uploadFile is not implemented")) - - override def objectExists(bucket: String, key: String): IO[Boolean] = - IO.raiseError(new NotImplementedError("objectExists is not implemented")) - } - test("Run import by providing the path to a file") { for { events <- eventsStream("import/import.json") - _ <- RunShip(events, s3Client, inputConfig, xas).assertEquals(expectedImportReport) + _ <- RunShip(events, noopS3Client, inputConfig, xas).assertEquals(expectedImportReport) _ <- checkFor("elasticsearch", nxv + "defaultElasticSearchIndex", xas).assertEquals(1) _ <- checkFor("blazegraph", nxv + "defaultSparqlIndex", xas).assertEquals(1) _ <- checkFor("storage", nxv + "defaultS3Storage", xas).assertEquals(1) @@ -85,7 +50,7 @@ class RunShipSuite extends NexusSuite with Doobie.Fixture with ShipConfigFixture test("Run import by providing the path to a directory") { for { events <- eventsStream("import/multi-part-import") - _ <- RunShip(events, s3Client, inputConfig, xas).assertEquals(expectedImportReport) + _ <- RunShip(events, noopS3Client, inputConfig, xas).assertEquals(expectedImportReport) } yield () } @@ -93,7 +58,7 @@ class RunShipSuite extends NexusSuite with Doobie.Fixture with ShipConfigFixture val start = Offset.at(2) for { events <- eventsStream("import/two-projects.json", offset = start) - _ <- RunShip(events, s3Client, inputConfig, xas).map { report => + _ <- RunShip(events, noopS3Client, inputConfig, xas).map { report => assert(report.offset == Offset.at(2L)) assert(thereIsOneProjectEventIn(report)) } @@ -108,7 +73,7 @@ class RunShipSuite extends NexusSuite with Doobie.Fixture with ShipConfigFixture ) for { events <- eventsStream("import/import.json") - _ <- RunShip(events, s3Client, configWithProjectMapping, xas) + _ <- RunShip(events, noopS3Client, configWithProjectMapping, xas) _ <- getDistinctOrgProjects(xas).map { project => assertEquals(project, target) } @@ -136,6 +101,42 @@ object RunShipSuite { | AND id = ${id.toString} """.stripMargin.query[Int].unique.transact(xas.read) + private val noopS3Client = new S3StorageClient { + override def listObjectsV2(bucket: String): IO[ListObjectsV2Response] = + IO.raiseError(new NotImplementedError("listObjectsV2 is not implemented")) + + override def listObjectsV2(bucket: Models.BucketName, prefix: String): IO[ListObjectsV2Response] = + IO.raiseError(new NotImplementedError("listObjectsV2 is not implemented")) + + override def readFile(bucket: Models.BucketName, fileKey: Models.FileKey): fs2.Stream[IO, Byte] = + fs2.Stream.empty + + override def headObject(bucket: String, key: String): IO[HeadObjectResponse] = + IO.raiseError(new NotImplementedError("headObject is not implemented")) + + override def copyObject( + sourceBucket: Models.BucketName, + sourceKey: Models.FileKey, + destinationBucket: Models.BucketName, + destinationKey: Models.FileKey, + checksumAlgorithm: ChecksumAlgorithm + ): IO[CopyObjectResponse] = + IO.raiseError(new NotImplementedError("copyObject is not implemented")) + + override def baseEndpoint: Uri = Uri.apply("http://localhost:4566") + + override def uploadFile( + fileData: fs2.Stream[IO, Byte], + bucket: String, + key: String, + algorithm: DigestAlgorithm + ): IO[S3StorageClient.UploadMetadata] = + IO.raiseError(new NotImplementedError("uploadFile is not implemented")) + + override def objectExists(bucket: String, key: String): IO[Boolean] = + IO.raiseError(new NotImplementedError("objectExists is not implemented")) + } + // The expected import report for the import.json file, as well as for the /import/multi-part-import directory val expectedImportReport: ImportReport = ImportReport( Offset.at(9999999L),