diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StoragesConfig.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StoragesConfig.scala index 653d880391..c895a94a62 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StoragesConfig.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StoragesConfig.scala @@ -10,10 +10,11 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri import ch.epfl.bluebrain.nexus.delta.sdk.model.search.PaginationConfig import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig +import pureconfig.ConfigReader.Result import pureconfig.ConvertHelpers.{catchReadError, optF} import pureconfig.error.{CannotConvert, ConfigReaderFailures, ConvertFailure, FailureReason} import pureconfig.generic.auto._ -import pureconfig.{ConfigConvert, ConfigReader} +import pureconfig.{ConfigConvert, ConfigObjectCursor, ConfigReader} import scala.annotation.nowarn import scala.concurrent.duration.FiniteDuration @@ -80,6 +81,29 @@ object StoragesConfig { val description: String = s"'allowed-volumes' must contain at least '$defaultVolume' (default-volume)" } + final case class DigestNotSupportedOnS3(digestAlgorithm: DigestAlgorithm) extends FailureReason { + val description: String = s"Digest algorithm '${digestAlgorithm.value}' is not supported on S3" + } + + private def assertValidS3Algorithm( + digestAlgorithm: DigestAlgorithm, + amazonCursor: ConfigObjectCursor + ): Result[Unit] = { + digestAlgorithm.value match { + case "SHA-256" | "SHA-1" | "MD5" => Right(()) + case _ => + Left( + ConfigReaderFailures( + ConvertFailure( + DigestNotSupportedOnS3(digestAlgorithm), + None, + amazonCursor.atKeyOrUndefined("digest-algorithm").path + ) + ) + ) + } + } + implicit val storageTypeConfigReader: ConfigReader[StorageTypeConfig] = ConfigReader.fromCursor { cursor => for { obj <- cursor.asObjectCursor @@ -96,6 +120,7 @@ object StoragesConfig { amazonEnabledCursor <- amazonCursor.atKey("enabled") amazonEnabled <- amazonEnabledCursor.asBoolean amazon <- ConfigReader[S3StorageConfig].from(amazonCursor) + _ <- assertValidS3Algorithm(amazon.digestAlgorithm, amazonCursor) remoteCursor <- obj.atKeyOrUndefined("remote-disk").asObjectCursor remoteEnabledCursor <- remoteCursor.atKey("enabled") remoteEnabled <- remoteEnabledCursor.asBoolean diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala index a6135b73a4..744d007b20 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala @@ -15,6 +15,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgori import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.FileOperations.intermediateFolders import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.SaveFileRejection._ +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3StorageSaveFile.PutObjectRequestOps import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter @@ -124,27 +125,25 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit private def uploadFilePipe(bucket: String, key: String, algorithm: DigestAlgorithm): Pipe[IO, Byte, String] = { in => fs2.Stream.eval { in.compile.to(Chunk).flatMap { chunks => - val bs = chunks.toByteBuffer - val request = PutObjectRequest - .builder() - .bucket(bucket) - .key(key) - + val bs = chunks.toByteBuffer for { - fullRequest <- setAlgorithm(request, algorithm) - response <- s3.putObject( - fullRequest - .build(), - AsyncRequestBody.fromByteBuffer(bs) - ) + response <- s3.putObject( + PutObjectRequest + .builder() + .bucket(bucket) + .deltaDigest(algorithm) + .key(key) + .build(), + AsyncRequestBody.fromByteBuffer(bs) + ) } yield { - parseResponse(response, algorithm) + checksumFromResponse(response, algorithm) } } } } - private def parseResponse(response: PutObjectResponse, algorithm: DigestAlgorithm): String = { + private def checksumFromResponse(response: PutObjectResponse, algorithm: DigestAlgorithm): String = { algorithm.value match { case "MD5" => response.eTag().stripPrefix("\"").stripSuffix("\"") case "SHA-256" => Hex.encodeHexString(Base64.getDecoder.decode(response.checksumSHA256())) @@ -153,20 +152,21 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit } } - private def setAlgorithm( - request: PutObjectRequest.Builder, - algorithm: DigestAlgorithm - ): IO[PutObjectRequest.Builder] = - algorithm.value match { - case "MD5" => IO.pure(request) - case "SHA-256" => IO.delay(request.checksumAlgorithm(ChecksumAlgorithm.SHA256)) - case "SHA-1" => IO.delay(request.checksumAlgorithm(ChecksumAlgorithm.SHA1)) - case _ => IO.raiseError(new IllegalArgumentException(s"Unsupported algorithm for S3: ${algorithm.value}")) - } - private def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] = s3StorageClient.getFileAttributes(bucket, key) private def log(bucket: String, key: String, msg: String): IO[Unit] = logger.info(s"Bucket: ${bucket}. Key: $key. $msg") } + +object S3StorageSaveFile { + implicit class PutObjectRequestOps(request: PutObjectRequest.Builder) { + def deltaDigest(algorithm: DigestAlgorithm): PutObjectRequest.Builder = + algorithm.value match { + case "MD5" => request + case "SHA-256" => request.checksumAlgorithm(ChecksumAlgorithm.SHA256) + case "SHA-1" => request.checksumAlgorithm(ChecksumAlgorithm.SHA1) + case _ => throw new IllegalArgumentException(s"Unsupported algorithm for S3: ${algorithm.value}") + } + } +}