Skip to content

Commit

Permalink
Remove hardcoded checksum from copyObject
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski committed Apr 22, 2024
1 parent 1e163ba commit e12b69d
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ trait S3StorageClient {
sourceBucket: BucketName,
sourceKey: FileKey,
destinationBucket: BucketName,
destinationKey: FileKey
destinationKey: FileKey,
checksumAlgorithm: ChecksumAlgorithm
): IO[CopyObjectResponse]

def uploadFile(
Expand Down Expand Up @@ -99,7 +100,8 @@ object S3StorageClient {
sourceBucket: BucketName,
sourceKey: FileKey,
destinationBucket: BucketName,
destinationKey: FileKey
destinationKey: FileKey,
checksumAlgorithm: ChecksumAlgorithm
): IO[CopyObjectResponse] =
client.copyObject(
CopyObjectRequest
Expand All @@ -108,7 +110,7 @@ object S3StorageClient {
.sourceKey(sourceKey.value.value)
.destinationBucket(destinationBucket.value.value)
.destinationKey(destinationKey.value.value)
.checksumAlgorithm(ChecksumAlgorithm.SHA256) // TODO: See what to do with this
.checksumAlgorithm(checksumAlgorithm)
.build()
)

Expand Down Expand Up @@ -201,7 +203,8 @@ object S3StorageClient {
sourceBucket: BucketName,
sourceKey: FileKey,
destinationBucket: BucketName,
destinationKey: FileKey
destinationKey: FileKey,
checksumAlgorithm: ChecksumAlgorithm
): IO[CopyObjectResponse] = raiseDisabledErr

// Delegates to raiseDisabledErr, exactly like the copyObject override directly above.
// NOTE(review): presumably this fails because S3 is disabled — confirm against raiseDisabledErr.
override def objectExists(bucket: String, key: String): IO[Boolean] = raiseDisabledErr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.clie
import eu.timepit.refined.collection.NonEmpty
import eu.timepit.refined.refineV
import fs2.aws.s3.models.Models.{BucketName, FileKey}
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm

trait FileCopier {

Expand All @@ -28,7 +29,8 @@ object FileCopier {
val fileKey = IO.fromEither(refineString(path.toString).map(FileKey))

fileKey.flatMap { key =>
s3StorageClient.copyObject(importBucket, key, targetBucket, key)
// TODO: Confirm whether SHA-256 is the only checksum algorithm in use; if it is not, the matching algorithm must be passed here instead of this hardcoded value.
s3StorageClient.copyObject(importBucket, key, targetBucket, key, ChecksumAlgorithm.SHA256)
}.void
}

Expand Down
83 changes: 42 additions & 41 deletions ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label, ProjectR
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie
import ch.epfl.bluebrain.nexus.ship.ImportReport.Statistics
import ch.epfl.bluebrain.nexus.ship.RunShipSuite.{checkFor, expectedImportReport, getDistinctOrgProjects}
import ch.epfl.bluebrain.nexus.ship.RunShipSuite.{checkFor, expectedImportReport, getDistinctOrgProjects, noopS3Client}
import ch.epfl.bluebrain.nexus.ship.config.ShipConfigFixtures
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import doobie.implicits._
import fs2.aws.s3.models.Models
import fs2.io.file.Path
import munit.AnyFixture
import software.amazon.awssdk.services.s3.model.{CopyObjectResponse, HeadObjectResponse, ListObjectsV2Response}
import software.amazon.awssdk.services.s3.model.{ChecksumAlgorithm, CopyObjectResponse, HeadObjectResponse, ListObjectsV2Response}

import java.time.Instant

Expand All @@ -37,45 +37,10 @@ class RunShipSuite extends NexusSuite with Doobie.Fixture with ShipConfigFixture
EventStreamer.localStreamer.stream(path, offset)
}

// Stub S3StorageClient passed to RunShip by the tests of this suite.
// Every operation fails with a NotImplementedError naming the method, except readFile (empty
// stream) and baseEndpoint (fixed URI).
private val s3Client = new S3StorageClient {
override def listObjectsV2(bucket: String): IO[ListObjectsV2Response] =
IO.raiseError(new NotImplementedError("listObjectsV2 is not implemented"))

override def listObjectsV2(bucket: Models.BucketName, prefix: String): IO[ListObjectsV2Response] =
IO.raiseError(new NotImplementedError("listObjectsV2 is not implemented"))

// The only data operation that succeeds: it always yields an empty byte stream.
override def readFile(bucket: Models.BucketName, fileKey: Models.FileKey): fs2.Stream[IO, Byte] =
fs2.Stream.empty

override def headObject(bucket: String, key: String): IO[HeadObjectResponse] =
IO.raiseError(new NotImplementedError("headObject is not implemented"))

// Four-argument form, i.e. without a checksumAlgorithm parameter.
override def copyObject(
sourceBucket: Models.BucketName,
sourceKey: Models.FileKey,
destinationBucket: Models.BucketName,
destinationKey: Models.FileKey
): IO[CopyObjectResponse] =
IO.raiseError(new NotImplementedError("copyObject is not implemented"))

// Fixed endpoint value; none of the failing operations above ever contact it.
override def baseEndpoint: Uri = Uri.apply("http://localhost:4566")

override def uploadFile(
fileData: fs2.Stream[IO, Byte],
bucket: String,
key: String,
algorithm: DigestAlgorithm
): IO[S3StorageClient.UploadMetadata] =
IO.raiseError(new NotImplementedError("uploadFile is not implemented"))

override def objectExists(bucket: String, key: String): IO[Boolean] =
IO.raiseError(new NotImplementedError("objectExists is not implemented"))
}

test("Run import by providing the path to a file") {
for {
events <- eventsStream("import/import.json")
_ <- RunShip(events, s3Client, inputConfig, xas).assertEquals(expectedImportReport)
_ <- RunShip(events, noopS3Client, inputConfig, xas).assertEquals(expectedImportReport)
_ <- checkFor("elasticsearch", nxv + "defaultElasticSearchIndex", xas).assertEquals(1)
_ <- checkFor("blazegraph", nxv + "defaultSparqlIndex", xas).assertEquals(1)
_ <- checkFor("storage", nxv + "defaultS3Storage", xas).assertEquals(1)
Expand All @@ -85,15 +50,15 @@ class RunShipSuite extends NexusSuite with Doobie.Fixture with ShipConfigFixture
// Streams the events found under "import/multi-part-import", runs the import and checks that the
// resulting report equals expectedImportReport.
// NOTE(review): the two RunShip lines differ only in the client name (s3Client vs noopS3Client);
// they look like the before/after sides of the same diff line — confirm only one is live.
test("Run import by providing the path to a directory") {
for {
events <- eventsStream("import/multi-part-import")
_ <- RunShip(events, s3Client, inputConfig, xas).assertEquals(expectedImportReport)
_ <- RunShip(events, noopS3Client, inputConfig, xas).assertEquals(expectedImportReport)
} yield ()
}

test("Test the increment") {
val start = Offset.at(2)
for {
events <- eventsStream("import/two-projects.json", offset = start)
_ <- RunShip(events, s3Client, inputConfig, xas).map { report =>
_ <- RunShip(events, noopS3Client, inputConfig, xas).map { report =>
assert(report.offset == Offset.at(2L))
assert(thereIsOneProjectEventIn(report))
}
Expand All @@ -108,7 +73,7 @@ class RunShipSuite extends NexusSuite with Doobie.Fixture with ShipConfigFixture
)
for {
events <- eventsStream("import/import.json")
_ <- RunShip(events, s3Client, configWithProjectMapping, xas)
_ <- RunShip(events, noopS3Client, configWithProjectMapping, xas)
_ <- getDistinctOrgProjects(xas).map { project =>
assertEquals(project, target)
}
Expand Down Expand Up @@ -136,6 +101,42 @@ object RunShipSuite {
| AND id = ${id.toString}
""".stripMargin.query[Int].unique.transact(xas.read)

// Stand-in S3StorageClient handed to RunShip by the tests of this suite.
// `readFile` yields an empty stream and `baseEndpoint` is a fixed URI; every other operation
// fails with a NotImplementedError whose message names the method that was called.
private val noopS3Client = new S3StorageClient {

  // Single place where the failing IO is built; each caller supplies the complete error message.
  // Kept private so the inferred type of the enclosing val is unaffected.
  private def unsupported[A](message: String): IO[A] =
    IO.raiseError(new NotImplementedError(message))

  // --- members that return a value ---

  override def baseEndpoint: Uri = Uri("http://localhost:4566")

  override def readFile(bucket: Models.BucketName, fileKey: Models.FileKey): fs2.Stream[IO, Byte] =
    fs2.Stream.empty

  // --- members that always fail ---

  override def listObjectsV2(bucket: String): IO[ListObjectsV2Response] =
    unsupported("listObjectsV2 is not implemented")

  override def listObjectsV2(bucket: Models.BucketName, prefix: String): IO[ListObjectsV2Response] =
    unsupported("listObjectsV2 is not implemented")

  override def headObject(bucket: String, key: String): IO[HeadObjectResponse] =
    unsupported("headObject is not implemented")

  override def objectExists(bucket: String, key: String): IO[Boolean] =
    unsupported("objectExists is not implemented")

  override def copyObject(
      sourceBucket: Models.BucketName,
      sourceKey: Models.FileKey,
      destinationBucket: Models.BucketName,
      destinationKey: Models.FileKey,
      checksumAlgorithm: ChecksumAlgorithm
  ): IO[CopyObjectResponse] =
    unsupported("copyObject is not implemented")

  override def uploadFile(
      fileData: fs2.Stream[IO, Byte],
      bucket: String,
      key: String,
      algorithm: DigestAlgorithm
  ): IO[S3StorageClient.UploadMetadata] =
    unsupported("uploadFile is not implemented")
}

// The expected import report for the import.json file, as well as for the /import/multi-part-import directory
val expectedImportReport: ImportReport = ImportReport(
Offset.at(9999999L),
Expand Down

0 comments on commit e12b69d

Please sign in to comment.