diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StoragesConfig.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StoragesConfig.scala
index 8f7b7cc6ba..653d880391 100644
--- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StoragesConfig.scala
+++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StoragesConfig.scala
@@ -168,6 +168,7 @@ object StoragesConfig {
    *   the default maximum allowed file size (in bytes) for uploaded files
    */
   final case class S3StorageConfig(
+      digestAlgorithm: DigestAlgorithm,
       defaultEndpoint: Uri,
       defaultAccessKey: Secret[String],
       defaultSecretKey: Secret[String],
diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/StorageFields.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/StorageFields.scala
index bb92c47fc8..3c94ce2d6a 100644
--- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/StorageFields.scala
+++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/StorageFields.scala
@@ -151,6 +151,7 @@ object StorageFields {
         name,
         description,
         default,
+        cfg.digestAlgorithm,
         bucket,
         readPermission.getOrElse(cfg.defaultReadPermission),
         writePermission.getOrElse(cfg.defaultWritePermission),
diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/StorageValue.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/StorageValue.scala
index 72c00c09de..7fe2e16ed6 100644
--- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/StorageValue.scala
+++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/StorageValue.scala
@@ -120,14 +120,14 @@ object StorageValue {
       name: Option[String],
       description: Option[String],
       default: Boolean,
+      algorithm: DigestAlgorithm,
       bucket: String,
       readPermission: Permission,
       writePermission: Permission,
       maxFileSize: Long
   ) extends StorageValue {

-    override val tpe: StorageType            = StorageType.S3Storage
-    override val algorithm: DigestAlgorithm = DigestAlgorithm.MD5
+    override val tpe: StorageType = StorageType.S3Storage
   }

   object S3StorageValue {
@@ -138,6 +138,7 @@ object StorageValue {
      */
     def apply(
         default: Boolean,
+        algorithm: DigestAlgorithm,
         bucket: String,
         readPermission: Permission,
         writePermission: Permission,
@@ -147,6 +148,7 @@ object StorageValue {
         None,
         None,
         default,
+        algorithm,
         bucket,
         readPermission,
         writePermission,
diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala
index 254742a0e2..a6135b73a4 100644
--- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala
+++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala
@@ -19,10 +19,11 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
 import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax
 import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
 import fs2.{Chunk, Pipe, Stream}
+import org.apache.commons.codec.binary.Hex
 import software.amazon.awssdk.core.async.AsyncRequestBody
 import software.amazon.awssdk.services.s3.model._

-import java.util.UUID
+import java.util.{Base64, UUID}

 final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
     as: ActorSystem,
@@ -41,20 +42,26 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
     for {
       uuid   <- uuidf()
       path    = Uri.Path(intermediateFolders(storage.project, uuid, filename))
-      result <- storeFile(storage.value.bucket, path.toString(), uuid, entity)
+      result <- storeFile(storage.value.bucket, path.toString(), uuid, entity, storage.value.algorithm)
     } yield result
   }

-  private def storeFile(bucket: String, key: String, uuid: UUID, entity: BodyPartEntity): IO[FileStorageMetadata] = {
+  private def storeFile(
+      bucket: String,
+      key: String,
+      uuid: UUID,
+      entity: BodyPartEntity,
+      algorithm: DigestAlgorithm
+  ): IO[FileStorageMetadata] = {
     val fileData: Stream[IO, Byte] = convertStream(entity.dataBytes)

     (for {
-      _               <- log(bucket, key, s"Checking for object existence")
-      _               <- validateObjectDoesNotExist(bucket, key)
-      _               <- log(bucket, key, s"Beginning upload")
-      (md5, fileSize) <- uploadFile(fileData, bucket, key)
-      _               <- log(bucket, key, s"Finished upload. MD5: $md5")
-      attr            <- fileMetadata(bucket, key, uuid, fileSize, md5)
+      _                  <- log(bucket, key, s"Checking for object existence")
+      _                  <- validateObjectDoesNotExist(bucket, key)
+      _                  <- log(bucket, key, s"Beginning upload")
+      (digest, fileSize) <- uploadFile(fileData, bucket, key, algorithm)
+      _                  <- log(bucket, key, s"Finished upload. Digest: $digest")
+      attr               <- fileMetadata(bucket, key, uuid, fileSize, algorithm, digest)
     } yield attr)
       .onError(e => logger.error(e)("Unexpected error when storing file"))
       .adaptError { err => UnexpectedSaveError(key, err.getMessage) }
@@ -65,13 +72,14 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
       key: String,
       uuid: UUID,
       fileSize: Long,
-      md5: String
+      algorithm: DigestAlgorithm,
+      digest: String
   ): IO[FileStorageMetadata] =
     s3StorageClient.baseEndpoint.map { base =>
       FileStorageMetadata(
         uuid = uuid,
         bytes = fileSize,
-        digest = Digest.ComputedDigest(DigestAlgorithm.MD5, md5),
+        digest = Digest.ComputedDigest(algorithm, digest),
         origin = Client,
         location = base / bucket / Uri.Path(key),
         path = Uri.Path(key)
@@ -94,38 +102,68 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
           .mapMaterializedValue(_ => NotUsed)
       )

-  private def uploadFile(fileData: Stream[IO, Byte], bucket: String, key: String): IO[(String, Long)] = {
+  private def uploadFile(
+      fileData: Stream[IO, Byte],
+      bucket: String,
+      key: String,
+      algorithm: DigestAlgorithm
+  ): IO[(String, Long)] = {
     for {
       fileSizeAcc <- Ref.of[IO, Long](0L)
-      md5         <- fileData
+      digest      <- fileData
                        .evalTap(_ => fileSizeAcc.update(_ + 1))
                        .through(
-                         uploadFilePipe(bucket, key)
+                         uploadFilePipe(bucket, key, algorithm)
                        )
                        .compile
                        .onlyOrError
       fileSize    <- fileSizeAcc.get
-    } yield (md5, fileSize)
+    } yield (digest, fileSize)
   }

-  private def uploadFilePipe(bucket: String, key: String): Pipe[IO, Byte, String] = { in =>
+  private def uploadFilePipe(bucket: String, key: String, algorithm: DigestAlgorithm): Pipe[IO, Byte, String] = { in =>
     fs2.Stream.eval {
       in.compile.to(Chunk).flatMap { chunks =>
-        val bs = chunks.toByteBuffer
-        s3.putObject(
-          PutObjectRequest
-            .builder()
-            .bucket(bucket)
-            .key(key)
-            .build(),
-          AsyncRequestBody.fromByteBuffer(bs)
-        ).map { response =>
-          response.eTag().filter(_ != '"')
+        val bs      = chunks.toByteBuffer
+        val request = PutObjectRequest
+          .builder()
+          .bucket(bucket)
+          .key(key)
+
+        for {
+          fullRequest <- setAlgorithm(request, algorithm)
+          response    <- s3.putObject(
+                           fullRequest
+                             .build(),
+                           AsyncRequestBody.fromByteBuffer(bs)
+                         )
+        } yield {
+          parseResponse(response, algorithm)
         }
       }
     }
   }

+  private def parseResponse(response: PutObjectResponse, algorithm: DigestAlgorithm): String = {
+    algorithm.value match {
+      case "MD5"     => response.eTag().stripPrefix("\"").stripSuffix("\"")
+      case "SHA-256" => Hex.encodeHexString(Base64.getDecoder.decode(response.checksumSHA256()))
+      case "SHA-1"   => Hex.encodeHexString(Base64.getDecoder.decode(response.checksumSHA1()))
+      case _         => throw new IllegalArgumentException(s"Unsupported algorithm for S3: ${algorithm.value}")
+    }
+  }
+
+  private def setAlgorithm(
+      request: PutObjectRequest.Builder,
+      algorithm: DigestAlgorithm
+  ): IO[PutObjectRequest.Builder] =
+    algorithm.value match {
+      case "MD5"     => IO.pure(request)
+      case "SHA-256" => IO.delay(request.checksumAlgorithm(ChecksumAlgorithm.SHA256))
+      case "SHA-1"   => IO.delay(request.checksumAlgorithm(ChecksumAlgorithm.SHA1))
+      case _         => IO.raiseError(new IllegalArgumentException(s"Unsupported algorithm for S3: ${algorithm.value}"))
+    }
+
   private def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] =
     s3StorageClient.getFileAttributes(bucket, key)
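Note on the checksum handling above: the AWS SDK returns `checksumSHA256()`/`checksumSHA1()` base64-encoded, while Nexus stores digests as hex strings, which is why `parseResponse` decodes before hex-encoding; the ETag needs no decoding because, for a single-part `putObject` like this one, it is already the (quoted) hex MD5 of the payload. A minimal self-contained sketch of that round trip, not part of the patch (object name and payload are illustrative only):

```scala
import java.security.MessageDigest
import java.util.Base64
import org.apache.commons.codec.binary.Hex

object ChecksumEncodings extends App {
  val payload = "hello".getBytes("UTF-8")

  // Simulate what the SDK hands back for this payload: a base64-encoded SHA-256.
  val sdkChecksum = Base64.getEncoder.encodeToString(
    MessageDigest.getInstance("SHA-256").digest(payload)
  )

  // parseResponse's conversion: base64 -> raw bytes -> hex.
  val hexDigest = Hex.encodeHexString(Base64.getDecoder.decode(sdkChecksum))

  // Identical to hashing and hex-encoding the payload directly.
  assert(hexDigest == Hex.encodeHexString(MessageDigest.getInstance("SHA-256").digest(payload)))
  println(hexDigest)
}
```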
diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StorageFixtures.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StorageFixtures.scala
index 9531a0c9f1..d6eb41e1aa 100644
--- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StorageFixtures.scala
+++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StorageFixtures.scala
@@ -32,7 +32,7 @@ trait StorageFixtures extends CirceLiteral {
   // format: off
   implicit val config: StorageTypeConfig = StorageTypeConfig(
     disk = DiskStorageConfig(diskVolume, Set(diskVolume,tmpVolume), DigestAlgorithm.default, permissions.read, permissions.write, showLocation = false, 50),
-    amazon = Some(S3StorageConfig("localhost", Secret(MinioDocker.RootUser), Secret(MinioDocker.RootPassword),
+    amazon = Some(S3StorageConfig(DigestAlgorithm.default, "localhost", Secret(MinioDocker.RootUser), Secret(MinioDocker.RootPassword),
       permissions.read, permissions.write, showLocation = false, 60)),
     remoteDisk = Some(RemoteDiskStorageConfig(DigestAlgorithm.default, BaseUri("http://localhost", Label.unsafe("v1")), Anonymous, permissions.read, permissions.write, showLocation = false, 70, 50.millis)),
   )
diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/LocalStackS3StorageClient.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/LocalStackS3StorageClient.scala
index 9ce8e9ef5a..41778c02c3 100644
--- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/LocalStackS3StorageClient.scala
+++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/LocalStackS3StorageClient.scala
@@ -4,6 +4,7 @@ import akka.http.scaladsl.model.Uri
 import cats.effect.{IO, Resource}
 import ch.epfl.bluebrain.nexus.delta.kernel.Secret
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.S3StorageConfig
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.permissions
 import ch.epfl.bluebrain.nexus.testkit.minio.LocalStackS3
@@ -34,6 +35,7 @@ object LocalStackS3StorageClient {
     val creds                  = localstack.staticCredentialsProvider.resolveCredentials()
     val (accessKey, secretKey) = (creds.accessKeyId(), creds.secretAccessKey())
     val conf: S3StorageConfig  = S3StorageConfig(
+      digestAlgorithm = DigestAlgorithm.default,
       defaultEndpoint = Uri(localstack.endpointOverride(LocalStackS3.ServiceType).toString),
       defaultAccessKey = Secret(accessKey),
       defaultSecretKey = Secret(secretKey),
diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperationsSuite.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperationsSuite.scala
index e94509133c..9f76fd8b6a 100644
--- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperationsSuite.scala
+++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperationsSuite.scala
@@ -43,8 +43,8 @@ class S3FileOperationsSuite

   private lazy val fileOps = S3FileOperations.mk(s3StorageClient)

-  private def md5Hash(content: String) = {
-    Hex.encodeHexString(DigestAlgorithm.MD5.digest.digest(content.getBytes(StandardCharsets.UTF_8)))
+  private def makeContentHash(algorithm: DigestAlgorithm, content: String) = {
+    Hex.encodeHexString(algorithm.digest.digest(content.getBytes(StandardCharsets.UTF_8)))
   }

   test("List objects in an existing bucket") {
@@ -61,6 +61,7 @@ class S3FileOperationsSuite
     givenAnS3Bucket { bucket =>
       val storageValue = S3StorageValue(
         default = false,
+        algorithm = DigestAlgorithm.default,
         bucket = bucket,
         readPermission = read,
         writePermission = write,
@@ -73,12 +74,12 @@ class S3FileOperationsSuite

       val filename = "myfile.txt"
       val content  = genString()
-      val hashOfContent = md5Hash(content)
+      val hashOfContent = makeContentHash(DigestAlgorithm.default, content)
       val entity = HttpEntity(content)

       val result = for {
         attr   <- fileOps.save(storage, filename, entity)
-        _       = assertEquals(attr.digest, ComputedDigest(DigestAlgorithm.MD5, hashOfContent))
+        _       = assertEquals(attr.digest, ComputedDigest(DigestAlgorithm.default, hashOfContent))
         _       = assertEquals(attr.bytes, content.length.toLong)
         source <- fileOps.fetch(bucket, attr.path)
       } yield consume(source)
@@ -86,4 +87,31 @@ class S3FileOperationsSuite
       assertIO(result, content)
     }
   }
+
+  test("Use MD5 to calculate a checksum") {
+    givenAnS3Bucket { bucket =>
+      val storageValue = S3StorageValue(
+        default = false,
+        algorithm = DigestAlgorithm.MD5,
+        bucket = bucket,
+        readPermission = read,
+        writePermission = write,
+        maxFileSize = 20
+      )
+
+      val iri     = iri"http://localhost/s3"
+      val project = ProjectRef.unsafe("org", "project")
+      val storage = S3Storage(iri, project, storageValue, Json.obj())
+
+      val filename      = "myfile.txt"
+      val content       = genString()
+      val hashOfContent = makeContentHash(DigestAlgorithm.MD5, content)
+      val entity        = HttpEntity(content)
+
+      for {
+        attr <- fileOps.save(storage, filename, entity)
+        _     = assertEquals(attr.digest, ComputedDigest(DigestAlgorithm.MD5, hashOfContent))
+      } yield ()
+    }
+  }
 }
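For reference, the expectation these tests build via `makeContentHash` reduces to hashing the raw bytes and hex-encoding the result. A standalone sketch, not part of the patch, using `MessageDigest` directly instead of the `DigestAlgorithm` wrapper and a well-known MD5 test vector:

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import org.apache.commons.codec.binary.Hex

object Md5ExpectationDemo extends App {
  // Equivalent of the suite's makeContentHash, parameterised by a JCA algorithm name.
  def contentHash(jcaName: String, content: String): String =
    Hex.encodeHexString(
      MessageDigest.getInstance(jcaName).digest(content.getBytes(StandardCharsets.UTF_8))
    )

  // Known MD5 test vector for "hello".
  assert(contentHash("MD5", "hello") == "5d41402abc4b2a76b9719d911017c592")

  // For a single-part upload S3 would report this same value as the ETag,
  // wrapped in quotes, which parseResponse strips before storing it.
  println(contentHash("MD5", "hello"))
}
```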
diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigFixtures.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigFixtures.scala
index ff2c76c549..1589ab58e3 100644
--- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigFixtures.scala
+++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigFixtures.scala
@@ -3,6 +3,7 @@ package ch.epfl.bluebrain.nexus.ship.config
 import ch.epfl.bluebrain.nexus.delta.kernel.Secret
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.files
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.S3StorageConfig
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{permissions, StorageFixtures, StoragesConfig}
 import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.ServiceAccount
 import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, ServiceAccountConfig}
@@ -32,6 +33,7 @@ trait ShipConfigFixtures extends ConfigFixtures with StorageFixtures with Classp

   private val amazonConfig: S3StorageConfig =
     S3StorageConfig(
+      DigestAlgorithm.default,
       "https://s3.us-east-1.amazonaws.com",
       Secret("my-key"),
       Secret("my-secret-key"),