From 82238c6adf83717d95f3ef5eedc3cb937e8a2df0 Mon Sep 17 00:00:00 2001
From: Daniel Bell
Date: Fri, 19 Apr 2024 12:03:17 +0100
Subject: [PATCH 1/3] Use S3 to generate checksum (#4873)

* Use MD5 hash generated by S3 in S3 storage
* Calculate content size from the stream without re-processing the entire stream
* Tidy up
* Use S3 digests
* Ensure S3 storage can't be configured with an invalid digest algorithm
---
 .../storage/storages/StoragesConfig.scala     |  27 ++-
 .../storages/model/DigestAlgorithm.scala      |   3 +
 .../storage/storages/model/StorageValue.scala |   1 -
 .../operations/s3/S3StorageSaveFile.scala     | 206 +++++++++---------
 .../s3/client/S3StorageClient.scala           |   6 +-
 .../operations/s3/S3FileOperationsSuite.scala |  43 +++-
 6 files changed, 181 insertions(+), 105 deletions(-)

diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StoragesConfig.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StoragesConfig.scala
index 653d880391..c895a94a62 100644
--- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StoragesConfig.scala
+++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StoragesConfig.scala
@@ -10,10 +10,11 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
 import ch.epfl.bluebrain.nexus.delta.sdk.model.search.PaginationConfig
 import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission
 import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig
+import pureconfig.ConfigReader.Result
 import pureconfig.ConvertHelpers.{catchReadError, optF}
 import pureconfig.error.{CannotConvert, ConfigReaderFailures, ConvertFailure, FailureReason}
 import pureconfig.generic.auto._
-import pureconfig.{ConfigConvert, ConfigReader}
+import pureconfig.{ConfigConvert, ConfigObjectCursor, ConfigReader}

 import scala.annotation.nowarn
 import scala.concurrent.duration.FiniteDuration
@@ -80,6 +81,29 @@ object StoragesConfig {
     val description: String = s"'allowed-volumes' must contain at least '$defaultVolume' (default-volume)"
   }

+  final case class DigestNotSupportedOnS3(digestAlgorithm: DigestAlgorithm) extends FailureReason {
+    val description: String = s"Digest algorithm '${digestAlgorithm.value}' is not supported on S3"
+  }
+
+  private def assertValidS3Algorithm(
+      digestAlgorithm: DigestAlgorithm,
+      amazonCursor: ConfigObjectCursor
+  ): Result[Unit] = {
+    digestAlgorithm.value match {
+      case "SHA-256" | "SHA-1" | "MD5" => Right(())
+      case _ =>
+        Left(
+          ConfigReaderFailures(
+            ConvertFailure(
+              DigestNotSupportedOnS3(digestAlgorithm),
+              None,
+              amazonCursor.atKeyOrUndefined("digest-algorithm").path
+            )
+          )
+        )
+    }
+  }
+
   implicit val storageTypeConfigReader: ConfigReader[StorageTypeConfig] = ConfigReader.fromCursor { cursor =>
     for {
       obj <- cursor.asObjectCursor
@@ -96,6 +120,7 @@ object StoragesConfig {
       amazonEnabledCursor <- amazonCursor.atKey("enabled")
       amazonEnabled <- amazonEnabledCursor.asBoolean
       amazon <- ConfigReader[S3StorageConfig].from(amazonCursor)
+      _ <- assertValidS3Algorithm(amazon.digestAlgorithm, amazonCursor)
       remoteCursor <- obj.atKeyOrUndefined("remote-disk").asObjectCursor
       remoteEnabledCursor <- remoteCursor.atKey("enabled")
       remoteEnabled <- remoteEnabledCursor.asBoolean
diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/DigestAlgorithm.scala
b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/DigestAlgorithm.scala index 63fd2cd9ce..6fecd47c49 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/DigestAlgorithm.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/DigestAlgorithm.scala @@ -29,6 +29,9 @@ object DigestAlgorithm { final val default: DigestAlgorithm = new DigestAlgorithm("SHA-256") + final val MD5: DigestAlgorithm = + new DigestAlgorithm("MD5") + /** * Safely construct an [[DigestAlgorithm]] */ diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/StorageValue.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/StorageValue.scala index 1e921f8284..7fe2e16ed6 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/StorageValue.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/StorageValue.scala @@ -128,7 +128,6 @@ object StorageValue { ) extends StorageValue { override val tpe: StorageType = StorageType.S3Storage - } object S3StorageValue { diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala index 474da84075..744d007b20 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala @@ -5,7 +5,7 @@ import akka.actor.ActorSystem import akka.http.scaladsl.model.{BodyPartEntity, Uri} import akka.stream.scaladsl.Source import akka.util.ByteString -import cats.effect.IO +import cats.effect.{IO, Ref} import cats.implicits._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF @@ -15,27 +15,24 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgori import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.FileOperations.intermediateFolders import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.SaveFileRejection._ +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3StorageSaveFile.PutObjectRequestOps import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter -import eu.timepit.refined.refineMV -import eu.timepit.refined.types.string.NonEmptyString -import fs2.Stream -import fs2.aws.s3.S3.MultipartETagValidation -import fs2.aws.s3.models.Models.{BucketName, ETag, FileKey, PartSizeMB} +import fs2.{Chunk, Pipe, Stream} +import org.apache.commons.codec.binary.Hex +import software.amazon.awssdk.core.async.AsyncRequestBody import software.amazon.awssdk.services.s3.model._ -import java.util.UUID +import java.util.{Base64, UUID} final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit as: ActorSystem, 
uuidf: UUIDF ) { - private val s3 = s3StorageClient.underlyingClient - private val multipartETagValidation = MultipartETagValidation.create[IO] - private val logger = Logger[S3StorageSaveFile] - private val partSizeMB: PartSizeMB = refineMV(5) + private val s3 = s3StorageClient.underlyingClient + private val logger = Logger[S3StorageSaveFile] def apply( storage: S3Storage, @@ -43,66 +40,53 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit entity: BodyPartEntity ): IO[FileStorageMetadata] = { - val bucket = BucketName(NonEmptyString.unsafeFrom(storage.value.bucket)) - - def storeFile(key: String, uuid: UUID, entity: BodyPartEntity): IO[FileStorageMetadata] = { - val fileData: Stream[IO, Byte] = convertStream(entity.dataBytes) - - (for { - _ <- log(key, s"Checking for object existence") - _ <- validateObjectDoesNotExist(bucket.value.value, key) - _ <- log(key, s"Beginning multipart upload") - maybeEtags <- uploadFileMultipart(fileData, bucket, key) - _ <- log(key, s"Finished multipart upload. Etag by part: $maybeEtags") - attr <- collectFileMetadata(fileData, key, uuid, maybeEtags) - } yield attr) - .onError(e => logger.error(e)("Unexpected error when storing file")) - .adaptError { err => UnexpectedSaveError(key, err.getMessage) } - } - - def collectFileMetadata( - bytes: Stream[IO, Byte], - key: String, - uuid: UUID, - maybeEtags: List[Option[ETag]] - ): IO[FileStorageMetadata] = - maybeEtags.sequence match { - case Some(onlyPartETag :: Nil) => - // TODO our tests expect specific values for digests and the only algorithm currently used is SHA-256. - // If we want to continue to check this, but allow for different algorithms, this needs to be abstracted - // in the tests and verified for specific file contents. - // The result will als depend on whether we use a multipart upload or a standard put object. - for { - _ <- log(key, s"Received ETag for single part upload: $onlyPartETag") - fileSize <- computeSize(bytes) - digest <- computeDigest(bytes, storage.storageValue.algorithm) - metadata <- fileMetadata(key, uuid, fileSize, digest) - } yield metadata - case Some(other) => raiseUnexpectedErr(key, s"S3 multipart upload returned multiple etags unexpectedly: $other") - case None => raiseUnexpectedErr(key, "S3 multipart upload was aborted because no data was received") - } - - def fileMetadata(key: String, uuid: UUID, fileSize: Long, digest: String) = - s3StorageClient.baseEndpoint.map { base => - FileStorageMetadata( - uuid = uuid, - bytes = fileSize, - digest = Digest.ComputedDigest(storage.value.algorithm, digest), - origin = Client, - location = base / bucket.value.value / Uri.Path(key), - path = Uri.Path(key) - ) - } - - def log(key: String, msg: String) = logger.info(s"Bucket: ${bucket.value}. Key: $key. $msg") - for { uuid <- uuidf() path = Uri.Path(intermediateFolders(storage.project, uuid, filename)) - result <- storeFile(path.toString(), uuid, entity) + result <- storeFile(storage.value.bucket, path.toString(), uuid, entity, storage.value.algorithm) } yield result } + private def storeFile( + bucket: String, + key: String, + uuid: UUID, + entity: BodyPartEntity, + algorithm: DigestAlgorithm + ): IO[FileStorageMetadata] = { + val fileData: Stream[IO, Byte] = convertStream(entity.dataBytes) + + (for { + _ <- log(bucket, key, s"Checking for object existence") + _ <- validateObjectDoesNotExist(bucket, key) + _ <- log(bucket, key, s"Beginning upload") + (digest, fileSize) <- uploadFile(fileData, bucket, key, algorithm) + _ <- log(bucket, key, s"Finished upload. 
Digest: $digest") + attr <- fileMetadata(bucket, key, uuid, fileSize, algorithm, digest) + } yield attr) + .onError(e => logger.error(e)("Unexpected error when storing file")) + .adaptError { err => UnexpectedSaveError(key, err.getMessage) } + } + + private def fileMetadata( + bucket: String, + key: String, + uuid: UUID, + fileSize: Long, + algorithm: DigestAlgorithm, + digest: String + ): IO[FileStorageMetadata] = + s3StorageClient.baseEndpoint.map { base => + FileStorageMetadata( + uuid = uuid, + bytes = fileSize, + digest = Digest.ComputedDigest(algorithm, digest), + origin = Client, + location = base / bucket / Uri.Path(key), + path = Uri.Path(key) + ) + } + private def validateObjectDoesNotExist(bucket: String, key: String) = getFileAttributes(bucket, key).redeemWith( { @@ -119,42 +103,70 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit .mapMaterializedValue(_ => NotUsed) ) - private def uploadFileMultipart(fileData: Stream[IO, Byte], bucket: BucketName, key: String): IO[List[Option[ETag]]] = - fileData - .through( - s3.uploadFileMultipart( - bucket, - FileKey(NonEmptyString.unsafeFrom(key)), - partSizeMB, - uploadEmptyFiles = true, - multipartETagValidation = multipartETagValidation.some - ) - ) - .compile - .to(List) + private def uploadFile( + fileData: Stream[IO, Byte], + bucket: String, + key: String, + algorithm: DigestAlgorithm + ): IO[(String, Long)] = { + for { + fileSizeAcc <- Ref.of[IO, Long](0L) + digest <- fileData + .evalTap(_ => fileSizeAcc.update(_ + 1)) + .through( + uploadFilePipe(bucket, key, algorithm) + ) + .compile + .onlyOrError + fileSize <- fileSizeAcc.get + } yield (digest, fileSize) + } + + private def uploadFilePipe(bucket: String, key: String, algorithm: DigestAlgorithm): Pipe[IO, Byte, String] = { in => + fs2.Stream.eval { + in.compile.to(Chunk).flatMap { chunks => + val bs = chunks.toByteBuffer + for { + response <- s3.putObject( + PutObjectRequest + .builder() + .bucket(bucket) + .deltaDigest(algorithm) + .key(key) + .build(), + AsyncRequestBody.fromByteBuffer(bs) + ) + } yield { + checksumFromResponse(response, algorithm) + } + } + } + } + + private def checksumFromResponse(response: PutObjectResponse, algorithm: DigestAlgorithm): String = { + algorithm.value match { + case "MD5" => response.eTag().stripPrefix("\"").stripSuffix("\"") + case "SHA-256" => Hex.encodeHexString(Base64.getDecoder.decode(response.checksumSHA256())) + case "SHA-1" => Hex.encodeHexString(Base64.getDecoder.decode(response.checksumSHA1())) + case _ => throw new IllegalArgumentException(s"Unsupported algorithm for S3: ${algorithm.value}") + } + } private def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] = s3StorageClient.getFileAttributes(bucket, key) - // TODO issue fetching attributes when tested against localstack, only after the object is saved - // Verify if it's the same for real S3. Error msg: 'Could not parse XML response.' - // For now we just compute it manually. 
- // private def getFileSize(key: String) = - // getFileAttributes(key).flatMap { attr => - // log(key, s"File attributes from S3: $attr").as(attr.objectSize()) - // } - private def computeSize(bytes: Stream[IO, Byte]): IO[Long] = bytes.fold(0L)((acc, _) => acc + 1).compile.lastOrError - - private def computeDigest(bytes: Stream[IO, Byte], algorithm: DigestAlgorithm): IO[String] = { - val digest = algorithm.digest - bytes.chunks - .evalMap(chunk => IO(digest.update(chunk.toArray))) - .compile - .last - .map { _ => - digest.digest().map("%02x".format(_)).mkString + private def log(bucket: String, key: String, msg: String): IO[Unit] = + logger.info(s"Bucket: ${bucket}. Key: $key. $msg") +} + +object S3StorageSaveFile { + implicit class PutObjectRequestOps(request: PutObjectRequest.Builder) { + def deltaDigest(algorithm: DigestAlgorithm): PutObjectRequest.Builder = + algorithm.value match { + case "MD5" => request + case "SHA-256" => request.checksumAlgorithm(ChecksumAlgorithm.SHA256) + case "SHA-1" => request.checksumAlgorithm(ChecksumAlgorithm.SHA1) + case _ => throw new IllegalArgumentException(s"Unsupported algorithm for S3: ${algorithm.value}") } } - - private def raiseUnexpectedErr[A](key: String, msg: String): IO[A] = IO.raiseError(UnexpectedSaveError(key, msg)) } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala index b6afad3bf1..75f1424876 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala @@ -28,7 +28,7 @@ trait S3StorageClient { def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] - def underlyingClient: S3[IO] + def underlyingClient: S3AsyncClientOp[IO] def baseEndpoint: IO[Uri] } @@ -80,7 +80,7 @@ object S3StorageClient { .build() ) - override def underlyingClient: S3[IO] = s3 + override def underlyingClient: S3AsyncClientOp[IO] = client override def baseEndpoint: IO[Uri] = IO.pure(baseEndpoint) } @@ -97,7 +97,7 @@ object S3StorageClient { override def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] = raiseDisabledErr - override def underlyingClient: S3[IO] = throw disabledErr + override def underlyingClient: S3AsyncClientOp[IO] = throw disabledErr override def baseEndpoint: IO[Uri] = raiseDisabledErr } diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperationsSuite.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperationsSuite.scala index e5cf0a6e4c..9f76fd8b6a 100644 --- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperationsSuite.scala +++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperationsSuite.scala @@ -3,6 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3 import akka.actor.ActorSystem import akka.http.scaladsl.model.HttpEntity import cats.effect.IO +import 
ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest.ComputedDigest import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.StorageNotAccessible @@ -18,7 +19,9 @@ import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import io.circe.Json import io.laserdisc.pure.s3.tagless.S3AsyncClientOp import munit.AnyFixture +import org.apache.commons.codec.binary.Hex +import java.nio.charset.StandardCharsets import scala.concurrent.duration.{Duration, DurationInt} class S3FileOperationsSuite @@ -40,6 +43,10 @@ class S3FileOperationsSuite private lazy val fileOps = S3FileOperations.mk(s3StorageClient) + private def makeContentHash(algorithm: DigestAlgorithm, content: String) = { + Hex.encodeHexString(algorithm.digest.digest(content.getBytes(StandardCharsets.UTF_8))) + } + test("List objects in an existing bucket") { givenAnS3Bucket { bucket => fileOps.checkBucketExists(bucket) @@ -65,16 +72,46 @@ class S3FileOperationsSuite val project = ProjectRef.unsafe("org", "project") val storage = S3Storage(iri, project, storageValue, Json.obj()) - val filename = "myfile.txt" - val content = genString() - val entity = HttpEntity(content) + val filename = "myfile.txt" + val content = genString() + val hashOfContent = makeContentHash(DigestAlgorithm.default, content) + val entity = HttpEntity(content) val result = for { attr <- fileOps.save(storage, filename, entity) + _ = assertEquals(attr.digest, ComputedDigest(DigestAlgorithm.default, hashOfContent)) + _ = assertEquals(attr.bytes, content.length.toLong) source <- fileOps.fetch(bucket, attr.path) } yield consume(source) assertIO(result, content) } } + + test("Use MD5 to calculate a checksum") { + givenAnS3Bucket { bucket => + val storageValue = S3StorageValue( + default = false, + algorithm = DigestAlgorithm.MD5, + bucket = bucket, + readPermission = read, + writePermission = write, + maxFileSize = 20 + ) + + val iri = iri"http://localhost/s3" + val project = ProjectRef.unsafe("org", "project") + val storage = S3Storage(iri, project, storageValue, Json.obj()) + + val filename = "myfile.txt" + val content = genString() + val hashOfContent = makeContentHash(DigestAlgorithm.MD5, content) + val entity = HttpEntity(content) + + for { + attr <- fileOps.save(storage, filename, entity) + _ = assertEquals(attr.digest, ComputedDigest(DigestAlgorithm.MD5, hashOfContent)) + } yield () + } + } } From 265d07d1589462498bd2375494f2ab6572de12e1 Mon Sep 17 00:00:00 2001 From: dantb Date: Mon, 22 Apr 2024 11:24:17 +0200 Subject: [PATCH 2/3] Register S3 files in-place (#4878) * WIP - register S3 files in-place * Use head object request for S3 file metadata, verify in integration test * Update FileRoutesSpec test * Deal with uuid and checksum in mkS3Metadata * Fix import * Use fs2-aws client in S3 integration tests, enable checksum * Formatting --------- Co-authored-by: Oliver <20188437+olivergrabinski@users.noreply.github.com> --- build.sbt | 4 +- .../delta/plugins/storage/files/Files.scala | 27 ++++++ .../storage/files/model/FileAttributes.scala | 15 +-- .../storage/files/model/FileRejection.scala | 4 +- .../storage/files/routes/FilesRoutes.scala | 37 ++++++- .../storages/operations/FileOperations.scala | 11 ++- .../operations/StorageFileRejection.scala | 19 +++- .../operations/s3/S3FileOperations.scala | 58 ++++++++++- .../operations/s3/S3StorageSaveFile.scala | 42 ++++---- 
.../s3/client/S3StorageClient.scala | 26 ++--- .../files/mocks/FileOperationsMock.scala | 2 + .../files/routes/FilesRoutesSpec.scala | 4 +- .../kg/files/registration-metadata.json | 31 ++++++ .../nexus/tests/kg/files/S3StorageSpec.scala | 97 +++++++++++++++---- 14 files changed, 299 insertions(+), 78 deletions(-) create mode 100644 tests/src/test/resources/kg/files/registration-metadata.json diff --git a/build.sbt b/build.sbt index c62c0df1e5..ab30d7a8de 100755 --- a/build.sbt +++ b/build.sbt @@ -854,7 +854,9 @@ lazy val tests = project awsSdk % Test, scalaTest % Test, akkaSlf4j % Test, - alpakkaSse % Test + alpakkaSse % Test, + fs2Aws % Test, + fs2AwsS3 % Test ), scalacOptions ~= { options: Seq[String] => options.filterNot(Set("-Wunused:imports")) }, Test / parallelExecution := false, diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala index 9e04b9e26f..225b2aa2fb 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala @@ -218,6 +218,33 @@ final class Files( } yield res }.span("updateFileMetadata") + def registerFile( + id: FileId, + storageId: Option[IdSegment], + metadata: Option[FileCustomMetadata], + path: Uri.Path, + tag: Option[UserTag] + )(implicit caller: Caller): IO[FileResource] = { + for { + (iri, pc) <- id.expandIri(fetchContext.onCreate) + (storageRef, storage) <- fetchAndValidateActiveStorage(storageId, id.project, pc) + s3Metadata <- fileOperations.register(storage, path) + filename <- IO.fromOption(path.lastSegment)(InvalidFilePath) + attr = FileAttributes.from(FileDescription(filename, s3Metadata.contentType.some, metadata), s3Metadata.metadata) + res <- eval( + CreateFile( + iri, + id.project, + storageRef, + storage.tpe, + attr, + caller.subject, + tag + ) + ) + } yield res + }.span("registerFile") + /** * Update a new file linking it from an existing file in a storage * diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/model/FileAttributes.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/model/FileAttributes.scala index f25db277c4..399bad2d85 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/model/FileAttributes.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/model/FileAttributes.scala @@ -95,17 +95,20 @@ object FileAttributes { object FileAttributesOrigin { final case object Client extends FileAttributesOrigin - final case object Storage extends FileAttributesOrigin + final case object Storage extends FileAttributesOrigin + final case object External extends FileAttributesOrigin implicit val fileAttributesEncoder: Encoder[FileAttributesOrigin] = Encoder.encodeString.contramap { - case Client => "Client" - case Storage => "Storage" + case Client => "Client" + case Storage => "Storage" + case External => "External" } implicit val fileAttributesDecoder: Decoder[FileAttributesOrigin] = Decoder.decodeString.emap { - case "Client" => Right(Client) - case "Storage" => Right(Storage) - case str => Left(s"'$str' is not a 'FileAttributesOrigin'") + case "Client" => Right(Client) + case "Storage" => Right(Storage) + case "External" => Right(External) + 
case str => Left(s"'$str' is not a 'FileAttributesOrigin'") } } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/model/FileRejection.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/model/FileRejection.scala index 0eedccb8c1..bc12dcf47f 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/model/FileRejection.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/model/FileRejection.scala @@ -226,9 +226,9 @@ object FileRejection { * Rejection returned when attempting to link a file without providing a filename or a path that ends with a * filename. */ - final case object InvalidFileLink + final case object InvalidFilePath extends FileRejection( - s"Linking a file cannot be performed without a 'filename' or a 'path' that does not end with a filename." + s"Linking or registering a file cannot be performed without a 'filename' or a 'path' that does not end with a filename." ) final case class CopyRejection( diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutes.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutes.scala index 29a86ece58..9abf0a8975 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutes.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutes.scala @@ -11,7 +11,7 @@ import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection._ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model._ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.permissions.{read => Read, write => Write} -import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.FilesRoutes.LinkFileRequest.{fileDescriptionFromRequest, linkFileDecoder} +import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.FilesRoutes.LinkFileRequest.{fileDescriptionFromRequest, linkFileDecoder, regFileDecoder} import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.FilesRoutes._ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.{schemas, FileResource, Files} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.ShowFileLocation @@ -264,6 +264,32 @@ final class FilesRoutes( } } ) + }, + pathPrefix("register") { + (idSegment & indexingMode) { (id, mode) => + pathEndOrSingleSlash { + operationName(s"$prefixSegment/files/{org}/{project}/register/{id}") { + parameters("storage".as[IdSegment].?, "tag".as[UserTag].?) 
{ (storage, tag) => + entity(as[RegisterFileRequest]) { registerRequest => + val fileId = FileId(id, project) + emit( + Created, + files + .registerFile( + fileId, + storage, + registerRequest.metadata, + registerRequest.path, + tag + ) + .index(mode) + .attemptNarrow[FileRejection] + ) + } + } + } + } + } } ) } @@ -329,15 +355,18 @@ object FilesRoutes { metadata: Option[FileCustomMetadata] ) + final case class RegisterFileRequest(path: Path, metadata: Option[FileCustomMetadata]) + object LinkFileRequest { @nowarn("cat=unused") - implicit private val config: Configuration = Configuration.default - implicit val linkFileDecoder: Decoder[LinkFileRequest] = deriveConfiguredDecoder[LinkFileRequest] + implicit private val config: Configuration = Configuration.default + implicit val linkFileDecoder: Decoder[LinkFileRequest] = deriveConfiguredDecoder[LinkFileRequest] + implicit val regFileDecoder: Decoder[RegisterFileRequest] = deriveConfiguredDecoder[RegisterFileRequest] def fileDescriptionFromRequest(f: LinkFileRequest): IO[FileDescription] = f.filename.orElse(f.path.lastSegment) match { case Some(value) => IO.pure(FileDescription(value, f.mediaType, f.metadata)) - case None => IO.raiseError(InvalidFileLink) + case None => IO.raiseError(InvalidFilePath) } } } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/FileOperations.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/FileOperations.scala index a18e6688b9..f687e13965 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/FileOperations.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/FileOperations.scala @@ -6,10 +6,11 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{ComputedFileAt import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.{DiskStorage, RemoteDiskStorage, S3Storage} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.{DiskStorageValue, RemoteDiskStorageValue, S3StorageValue} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{Storage, StorageValue} -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.{FetchAttributeRejection, MoveFileRejection} +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.{FetchAttributeRejection, MoveFileRejection, RegisterFileRejection} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.disk.DiskFileOperations import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.RemoteDiskFileOperations import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3FileOperations +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3FileOperations.S3FileMetadata import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef @@ -22,6 +23,8 @@ trait FileOperations extends StorageAccess { def link(storage: Storage, sourcePath: Uri.Path, filename: String): IO[FileStorageMetadata] + def register(storage: Storage, path: Uri.Path): IO[S3FileMetadata] + def fetchAttributes(storage: Storage, attributes: FileAttributes): IO[ComputedFileAttributes] } @@ -62,6 +65,12 @@ object FileOperations { case s: RemoteDiskStorage => remoteDiskFileOps.fetchAttributes(s.value.folder, 
attributes.path) case s => IO.raiseError(FetchAttributeRejection.UnsupportedOperation(s.tpe)) } + + override def register(storage: Storage, path: Uri.Path): IO[S3FileMetadata] = + storage match { + case s: S3Storage => s3FileOps.register(s.value.bucket, path) + case s => IO.raiseError(RegisterFileRejection.UnsupportedOperation(s.tpe)) + } } /** diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/StorageFileRejection.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/StorageFileRejection.scala index 8e7036481a..c4758b9994 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/StorageFileRejection.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/StorageFileRejection.scala @@ -1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations -import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.model.{StatusCodes, Uri} +import cats.data.NonEmptyList import ch.epfl.bluebrain.nexus.delta.kernel.error.Rejection import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageType import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri @@ -192,4 +193,20 @@ object StorageFileRejection { } + sealed abstract class RegisterFileRejection(loggedDetails: String) extends StorageFileRejection(loggedDetails) + + object RegisterFileRejection { + final case class MissingS3Attributes(missingAttributes: NonEmptyList[String]) + extends RegisterFileRejection(s"Missing attributes from S3: ${missingAttributes.toList.mkString(", ")}") + + final case class InvalidContentType(received: String) + extends RegisterFileRejection(s"Invalid content type returned from S3: $received") + + final case class InvalidPath(path: Uri.Path) + extends RegisterFileRejection(s"An S3 path must contain at least the filename. 
Path was $path") + + final case class UnsupportedOperation(tpe: StorageType) + extends MoveFileRejection(s"Registering a file in-place is not supported for storages of type '${tpe.iri}'") + } + } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala index 6881a341ab..bea15744ab 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala @@ -1,22 +1,31 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3 import akka.actor.ActorSystem -import akka.http.scaladsl.model.{BodyPartEntity, Uri} +import akka.http.scaladsl.model.{BodyPartEntity, ContentType, Uri} import akka.stream.scaladsl.Source import akka.util.ByteString import cats.effect.IO +import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF -import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileStorageMetadata +import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin +import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{Digest, FileStorageMetadata} +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.StorageNotAccessible import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.FetchFileRejection.UnexpectedFetchError +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.RegisterFileRejection.InvalidContentType +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3FileOperations.S3FileMetadata import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient +import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter +import org.apache.commons.codec.binary.Hex +import software.amazon.awssdk.services.s3.model.HeadObjectResponse import java.net.URLDecoder import java.nio.charset.StandardCharsets.UTF_8 +import java.util.Base64 import scala.concurrent.duration.DurationInt trait S3FileOperations { @@ -29,9 +38,12 @@ trait S3FileOperations { filename: String, entity: BodyPartEntity ): IO[FileStorageMetadata] + + def register(bucket: String, path: Uri.Path): IO[S3FileMetadata] } object S3FileOperations { + final case class S3FileMetadata(contentType: ContentType, metadata: FileStorageMetadata) def mk(client: S3StorageClient)(implicit as: ActorSystem, uuidf: UUIDF): S3FileOperations = new S3FileOperations { @@ -63,6 +75,48 @@ object S3FileOperations { override def save(storage: S3Storage, filename: String, entity: BodyPartEntity): IO[FileStorageMetadata] = saveFile.apply(storage, filename, entity) + + override def register(bucket: String, path: Uri.Path): IO[S3FileMetadata] = { + for { + _ <- log.info(s"Fetching attributes for S3 file. 
Bucket $bucket at path $path") + resp <- client.headObject(bucket, path.toString()) + contentType <- parseContentType(resp.contentType()) + metadata <- mkS3Metadata(bucket, path, resp, contentType) + } yield metadata + } + .onError { e => + log.error(e)(s"Failed fetching required attributes for S3 file registration. Bucket $bucket and path $path") + } + + private def parseContentType(raw: String): IO[ContentType] = + ContentType.parse(raw).map(_.pure[IO]).getOrElse(IO.raiseError(InvalidContentType(raw))) + + private def mkS3Metadata(bucket: String, path: Uri.Path, resp: HeadObjectResponse, ct: ContentType) = { + for { + uuid <- uuidf() + checksum <- checksumFrom(resp) + } yield S3FileMetadata( + ct, + FileStorageMetadata( + uuid, + resp.contentLength(), + checksum, + FileAttributesOrigin.External, + client.baseEndpoint / bucket / path, + path + ) + ) + } + + private def checksumFrom(response: HeadObjectResponse) = IO.fromOption { + Option(response.checksumSHA256()) + .map { checksum => + Digest.ComputedDigest( + DigestAlgorithm.default, + Hex.encodeHexString(Base64.getDecoder.decode(checksum)) + ) + } + }(new IllegalArgumentException("Missing checksum")) } } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala index 744d007b20..3fe7e41850 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala @@ -62,7 +62,7 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit _ <- log(bucket, key, s"Beginning upload") (digest, fileSize) <- uploadFile(fileData, bucket, key, algorithm) _ <- log(bucket, key, s"Finished upload. 
Digest: $digest") - attr <- fileMetadata(bucket, key, uuid, fileSize, algorithm, digest) + attr = fileMetadata(bucket, key, uuid, fileSize, algorithm, digest) } yield attr) .onError(e => logger.error(e)("Unexpected error when storing file")) .adaptError { err => UnexpectedSaveError(key, err.getMessage) } @@ -75,26 +75,27 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit fileSize: Long, algorithm: DigestAlgorithm, digest: String - ): IO[FileStorageMetadata] = - s3StorageClient.baseEndpoint.map { base => - FileStorageMetadata( - uuid = uuid, - bytes = fileSize, - digest = Digest.ComputedDigest(algorithm, digest), - origin = Client, - location = base / bucket / Uri.Path(key), - path = Uri.Path(key) - ) - } + ): FileStorageMetadata = + FileStorageMetadata( + uuid = uuid, + bytes = fileSize, + digest = Digest.ComputedDigest(algorithm, digest), + origin = Client, + location = s3StorageClient.baseEndpoint / bucket / Uri.Path(key), + path = Uri.Path(key) + ) private def validateObjectDoesNotExist(bucket: String, key: String) = - getFileAttributes(bucket, key).redeemWith( - { - case _: NoSuchKeyException => IO.unit - case e => IO.raiseError(e) - }, - _ => IO.raiseError(ResourceAlreadyExists(key)) - ) + s3StorageClient + .headObject(bucket, key) + .void + .redeemWith( + { + case _: NoSuchKeyException => IO.unit + case e => IO.raiseError(e) + }, + _ => IO.raiseError(ResourceAlreadyExists(key)) + ) private def convertStream(source: Source[ByteString, Any]): Stream[IO, Byte] = StreamConverter( @@ -152,9 +153,6 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit } } - private def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] = - s3StorageClient.getFileAttributes(bucket, key) - private def log(bucket: String, key: String, msg: String): IO[Unit] = logger.info(s"Bucket: ${bucket}. Key: $key. 
$msg") } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala index 75f1424876..da606a3cfe 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala @@ -12,7 +12,7 @@ import io.laserdisc.pure.s3.tagless.{Interpreter, S3AsyncClientOp} import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, AwsCredentialsProvider, StaticCredentialsProvider} import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.s3.S3AsyncClient -import software.amazon.awssdk.services.s3.model.{GetObjectAttributesRequest, GetObjectAttributesResponse, ListObjectsV2Request, ListObjectsV2Response, ObjectAttributes} +import software.amazon.awssdk.services.s3.model.{ChecksumMode, HeadObjectRequest, HeadObjectResponse, ListObjectsV2Request, ListObjectsV2Response} import java.net.URI @@ -26,11 +26,11 @@ trait S3StorageClient { def readFile(bucket: BucketName, fileKey: FileKey): Stream[IO, Byte] - def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] + def headObject(bucket: String, key: String): IO[HeadObjectResponse] def underlyingClient: S3AsyncClientOp[IO] - def baseEndpoint: IO[Uri] + def baseEndpoint: Uri } object S3StorageClient { @@ -57,7 +57,7 @@ object S3StorageClient { ) .map(new S3StorageClientImpl(_, endpoint.toString)) - final class S3StorageClientImpl(client: S3AsyncClientOp[IO], baseEndpoint: Uri) extends S3StorageClient { + final class S3StorageClientImpl(client: S3AsyncClientOp[IO], val baseEndpoint: Uri) extends S3StorageClient { private val s3: S3[IO] = S3.create(client) override def listObjectsV2(bucket: String): IO[ListObjectsV2Response] = @@ -69,20 +69,10 @@ object S3StorageClient { override def readFile(bucket: BucketName, fileKey: FileKey): Stream[IO, Byte] = s3.readFile(bucket, fileKey) - override def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] = - client - .getObjectAttributes( - GetObjectAttributesRequest - .builder() - .bucket(bucket) - .key(key) - .objectAttributes(ObjectAttributes.knownValues()) // TODO get all values - .build() - ) + override def headObject(bucket: String, key: String): IO[HeadObjectResponse] = + client.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).checksumMode(ChecksumMode.ENABLED).build) override def underlyingClient: S3AsyncClientOp[IO] = client - - override def baseEndpoint: IO[Uri] = IO.pure(baseEndpoint) } final case object S3StorageClientDisabled extends S3StorageClient { @@ -95,10 +85,10 @@ object S3StorageClient { override def readFile(bucket: BucketName, fileKey: FileKey): Stream[IO, Byte] = Stream.raiseError[IO](disabledErr) - override def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] = raiseDisabledErr + override def headObject(bucket: String, key: String): IO[HeadObjectResponse] = raiseDisabledErr override def underlyingClient: S3AsyncClientOp[IO] = throw disabledErr - override def baseEndpoint: IO[Uri] = raiseDisabledErr + override def baseEndpoint: Uri = throw disabledErr } } diff --git 
a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/mocks/FileOperationsMock.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/mocks/FileOperationsMock.scala index aaae15727c..860a3b163e 100644 --- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/mocks/FileOperationsMock.scala +++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/mocks/FileOperationsMock.scala @@ -36,6 +36,7 @@ object FileOperationsMock { def link(storage: Storage, sourcePath: Uri.Path, filename: String): IO[FileStorageMetadata] = ??? def fetch(storage: Storage, attributes: FileAttributes): IO[AkkaSource] = ??? def fetchAttributes(storage: Storage, attributes: FileAttributes): IO[ComputedFileAttributes] = ??? + def register(storage: Storage, path: Uri.Path): IO[S3FileOperations.S3FileMetadata] = ??? } def diskUnimplemented: DiskFileOperations = new DiskFileOperations { @@ -48,5 +49,6 @@ object FileOperationsMock { def checkBucketExists(bucket: String): IO[Unit] = ??? def fetch(bucket: String, path: Uri.Path): IO[AkkaSource] = ??? def save(storage: Storage.S3Storage, filename: String, entity: BodyPartEntity): IO[FileStorageMetadata] = ??? + def register(bucket: String, path: Uri.Path): IO[S3FileOperations.S3FileMetadata] = ??? } } diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutesSpec.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutesSpec.scala index af67235e8b..287fff1181 100644 --- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutesSpec.scala +++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutesSpec.scala @@ -270,8 +270,8 @@ class FilesRoutesSpec json""" { "@context" : "https://bluebrain.github.io/nexus/contexts/error.json", - "@type" : "InvalidFileLink", - "reason" : "Linking a file cannot be performed without a 'filename' or a 'path' that does not end with a filename." + "@type" : "InvalidFilePath", + "reason" : "Linking or registering a file cannot be performed without a 'filename' or a 'path' that does not end with a filename." 
} """ } diff --git a/tests/src/test/resources/kg/files/registration-metadata.json b/tests/src/test/resources/kg/files/registration-metadata.json new file mode 100644 index 0000000000..8a5ec60b8f --- /dev/null +++ b/tests/src/test/resources/kg/files/registration-metadata.json @@ -0,0 +1,31 @@ +{ + "@context" : [ + "https://bluebrain.github.io/nexus/contexts/files.json", + "https://bluebrain.github.io/nexus/contexts/metadata.json" + ], + "@id": "{{id}}", + "@type": "File", + "_storage": { + "@id": "https://bluebrain.github.io/nexus/vocabulary/{{storageId}}", + "@type" : "S3Storage", + "_rev": 3 + }, + "_bytes": 29625, + "_digest": { + "_algorithm": "SHA-256", + "_value": "{{digestValue}}" + }, + "_filename": "nexus-logo.png", + "_location": "{{location}}", + "_mediaType": "image/png", + "_origin" : "External", + "_incoming": "{{self}}/incoming", + "_outgoing": "{{self}}/outgoing", + "_self": "{{self}}", + "_constrainedBy": "https://bluebrain.github.io/nexus/schemas/files.json", + "_project": "{{deltaUri}}/projects/{{projId}}", + "_rev": 1, + "_deprecated": false, + "_createdBy": "{{deltaUri}}/realms/{{realm}}/users/{{user}}", + "_updatedBy": "{{deltaUri}}/realms/{{realm}}/users/{{user}}" +} \ No newline at end of file diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/S3StorageSpec.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/S3StorageSpec.scala index 2ec325b63b..d0e5d579f8 100644 --- a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/S3StorageSpec.scala +++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/S3StorageSpec.scala @@ -2,20 +2,24 @@ package ch.epfl.bluebrain.nexus.tests.kg.files import akka.http.scaladsl.model.StatusCodes import cats.effect.IO +import cats.implicits.toTraverseOps import ch.epfl.bluebrain.nexus.tests.Identity.storages.Coyote import ch.epfl.bluebrain.nexus.tests.Optics.{error, filterMetadataKeys} import ch.epfl.bluebrain.nexus.tests.config.S3Config import ch.epfl.bluebrain.nexus.tests.iam.types.Permission import io.circe.Json import io.circe.syntax.EncoderOps +import io.laserdisc.pure.s3.tagless.Interpreter +import org.apache.commons.codec.binary.Hex import org.scalatest.Assertion import software.amazon.awssdk.auth.credentials.{AnonymousCredentialsProvider, AwsBasicCredentials, StaticCredentialsProvider} import software.amazon.awssdk.regions.Region -import software.amazon.awssdk.services.s3.S3Client +import software.amazon.awssdk.services.s3.S3AsyncClient import software.amazon.awssdk.services.s3.model._ import java.net.URI import java.nio.file.Paths +import java.util.Base64 import scala.jdk.CollectionConverters._ class S3StorageSpec extends StorageSpec { @@ -30,8 +34,11 @@ class S3StorageSpec extends StorageSpec { val s3Config: S3Config = storageConfig.s3 - private val bucket = genId() - private val logoKey = "some/path/to/nexus-logo.png" + private val bucket = genId() + private val logoFilename = "nexus-logo.png" + private val logoKey = s"some/path/to/$logoFilename" + private val logoSha256Base64Digest = "Bb9EKBAhO55f7NUkLu/v8fPSB5E4YclmWMdcz1iZfoc=" + private val logoSha256HexDigest = Hex.encodeHexString(Base64.getDecoder.decode(logoSha256Base64Digest)) val s3Endpoint: String = "http://s3.localhost.localstack.cloud:4566" val s3BucketEndpoint: String = s"http://s3.localhost.localstack.cloud:4566/$bucket" @@ -41,29 +48,45 @@ class S3StorageSpec extends StorageSpec { case _ => AnonymousCredentialsProvider.create() } - private val s3Client = S3Client.builder - .endpointOverride(new URI(s3Endpoint)) 
- .credentialsProvider(credentialsProvider) - .region(Region.US_EAST_1) - .build + val s3Client = Interpreter[IO] + .S3AsyncClientOpResource( + S3AsyncClient + .builder() + .credentialsProvider(credentialsProvider) + .endpointOverride(new URI(s3Endpoint)) + .forcePathStyle(true) + .region(Region.US_EAST_1) + ) + .allocated + .map(_._1) + .accepted override def beforeAll(): Unit = { super.beforeAll() - // Configure minio - s3Client.createBucket(CreateBucketRequest.builder.bucket(bucket).build) - s3Client.putObject( - PutObjectRequest.builder.bucket(bucket).key(logoKey).build, - Paths.get(getClass.getResource("/kg/files/nexus-logo.png").toURI) - ) + (s3Client.createBucket(CreateBucketRequest.builder.bucket(bucket).build) >> uploadLogoFile(logoKey)).accepted () } + private def uploadLogoFile(key: String): IO[PutObjectResponse] = s3Client.putObject( + PutObjectRequest.builder + .bucket(bucket) + .key(key) + .checksumAlgorithm(ChecksumAlgorithm.SHA256) + .checksumSHA256(logoSha256Base64Digest) + .build, + Paths.get(getClass.getResource("/kg/files/nexus-logo.png").toURI) + ) + override def afterAll(): Unit = { - val objects = s3Client.listObjects(ListObjectsRequest.builder.bucket(bucket).build) - objects.contents.asScala.foreach { obj => - s3Client.deleteObject(DeleteObjectRequest.builder.bucket(bucket).key(obj.key).build) - } - s3Client.deleteBucket(DeleteBucketRequest.builder.bucket(bucket).build) + val cleanup: IO[Unit] = for { + resp <- s3Client.listObjects(ListObjectsRequest.builder.bucket(bucket).build) + objects = resp.contents.asScala.toList + _ <- objects.traverse(obj => s3Client.deleteObject(DeleteObjectRequest.builder.bucket(bucket).key(obj.key).build)) + _ <- s3Client.deleteBucket(DeleteBucketRequest.builder.bucket(bucket).build) + } yield () + + cleanup.accepted + super.afterAll() } @@ -145,4 +168,40 @@ class S3StorageSpec extends StorageSpec { } } } + + private def registrationResponse(id: String, digestValue: String, location: String): Json = + jsonContentOf( + "kg/files/registration-metadata.json", + replacements( + Coyote, + "id" -> id, + "storageId" -> storageId, + "self" -> fileSelf(projectRef, id), + "projId" -> s"$projectRef", + "digestValue" -> digestValue, + "location" -> location + ): _* + ) + + s"Registering an S3 file in-place" should { + "succeed" in { + val id = genId() + val path = s"$id/nexus-logo.png" + val payload = Json.obj("path" -> Json.fromString(path)) + + for { + _ <- uploadLogoFile(path) + _ <- deltaClient.put[Json](s"/files/$projectRef/register/$id?storage=nxv:$storageId", payload, Coyote) { + (_, response) => response.status shouldEqual StatusCodes.Created + } + fullId = s"$attachmentPrefix$id" + location = s"$s3BucketEndpoint/$path" + expected = registrationResponse(fullId, logoSha256HexDigest, location) + assertion <- deltaClient.get[Json](s"/files/$projectRef/$id", Coyote) { (json, response) => + response.status shouldEqual StatusCodes.OK + filterMetadataKeys(json) shouldEqual expected + } + } yield assertion + } + } } From a404d4211793aa896110bfbe5b960306df50a27c Mon Sep 17 00:00:00 2001 From: Daniel Bell Date: Mon, 22 Apr 2024 12:51:16 +0100 Subject: [PATCH 3/3] Remove s3 md5 (#4882) * Remove MD5 support from S3 storages * Refactor S3 object existence check * move upload file into S3StorageClient * upload file returns location * Group upload metadata into case class --- .../storage/storages/StoragesConfig.scala | 4 +- .../storages/model/DigestAlgorithm.scala | 4 +- .../operations/s3/S3StorageSaveFile.scala | 110 +++--------------- 
.../s3/client/S3StorageClient.scala | 105 +++++++++++++++-- .../operations/s3/S3FileOperationsSuite.scala | 11 +- 5 files changed, 127 insertions(+), 107 deletions(-) diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StoragesConfig.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StoragesConfig.scala index c895a94a62..637863a49b 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StoragesConfig.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StoragesConfig.scala @@ -90,8 +90,8 @@ object StoragesConfig { amazonCursor: ConfigObjectCursor ): Result[Unit] = { digestAlgorithm.value match { - case "SHA-256" | "SHA-1" | "MD5" => Right(()) - case _ => + case "SHA-256" | "SHA-1" => Right(()) + case _ => Left( ConfigReaderFailures( ConvertFailure( diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/DigestAlgorithm.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/DigestAlgorithm.scala index 6fecd47c49..e1c884349c 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/DigestAlgorithm.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/DigestAlgorithm.scala @@ -29,8 +29,8 @@ object DigestAlgorithm { final val default: DigestAlgorithm = new DigestAlgorithm("SHA-256") - final val MD5: DigestAlgorithm = - new DigestAlgorithm("MD5") + final val SHA1: DigestAlgorithm = + new DigestAlgorithm("SHA-1") /** * Safely construct an [[DigestAlgorithm]] diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala index 3fe7e41850..ebb90cc69e 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala @@ -5,7 +5,7 @@ import akka.actor.ActorSystem import akka.http.scaladsl.model.{BodyPartEntity, Uri} import akka.stream.scaladsl.Source import akka.util.ByteString -import cats.effect.{IO, Ref} +import cats.effect.IO import cats.implicits._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF @@ -15,23 +15,18 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgori import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.FileOperations.intermediateFolders import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.SaveFileRejection._ -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3StorageSaveFile.PutObjectRequestOps import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient -import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.UploadMetadata import 
ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter -import fs2.{Chunk, Pipe, Stream} -import org.apache.commons.codec.binary.Hex -import software.amazon.awssdk.core.async.AsyncRequestBody -import software.amazon.awssdk.services.s3.model._ +import fs2.Stream -import java.util.{Base64, UUID} +import java.util.UUID final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit as: ActorSystem, uuidf: UUIDF ) { - private val s3 = s3StorageClient.underlyingClient private val logger = Logger[S3StorageSaveFile] def apply( @@ -57,45 +52,39 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit val fileData: Stream[IO, Byte] = convertStream(entity.dataBytes) (for { - _ <- log(bucket, key, s"Checking for object existence") - _ <- validateObjectDoesNotExist(bucket, key) - _ <- log(bucket, key, s"Beginning upload") - (digest, fileSize) <- uploadFile(fileData, bucket, key, algorithm) - _ <- log(bucket, key, s"Finished upload. Digest: $digest") - attr = fileMetadata(bucket, key, uuid, fileSize, algorithm, digest) + _ <- log(bucket, key, s"Checking for object existence") + _ <- validateObjectDoesNotExist(bucket, key) + _ <- log(bucket, key, s"Beginning upload") + uploadMetadata <- s3StorageClient.uploadFile(fileData, bucket, key, algorithm) + _ <- log(bucket, key, s"Finished upload. Digest: ${uploadMetadata.checksum}") + attr = fileMetadata(key, uuid, algorithm, uploadMetadata) } yield attr) .onError(e => logger.error(e)("Unexpected error when storing file")) .adaptError { err => UnexpectedSaveError(key, err.getMessage) } } private def fileMetadata( - bucket: String, key: String, uuid: UUID, - fileSize: Long, algorithm: DigestAlgorithm, - digest: String + uploadMetadata: UploadMetadata ): FileStorageMetadata = FileStorageMetadata( uuid = uuid, - bytes = fileSize, - digest = Digest.ComputedDigest(algorithm, digest), + bytes = uploadMetadata.fileSize, + digest = Digest.ComputedDigest(algorithm, uploadMetadata.checksum), origin = Client, - location = s3StorageClient.baseEndpoint / bucket / Uri.Path(key), + location = uploadMetadata.location, path = Uri.Path(key) ) private def validateObjectDoesNotExist(bucket: String, key: String) = s3StorageClient - .headObject(bucket, key) - .void - .redeemWith( - { - case _: NoSuchKeyException => IO.unit - case e => IO.raiseError(e) - }, - _ => IO.raiseError(ResourceAlreadyExists(key)) - ) + .objectExists(bucket, key) + .flatMap { + case true => IO.raiseError(ResourceAlreadyExists(key)) + case false => IO.unit + } private def convertStream(source: Source[ByteString, Any]): Stream[IO, Byte] = StreamConverter( @@ -104,67 +93,6 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit .mapMaterializedValue(_ => NotUsed) ) - private def uploadFile( - fileData: Stream[IO, Byte], - bucket: String, - key: String, - algorithm: DigestAlgorithm - ): IO[(String, Long)] = { - for { - fileSizeAcc <- Ref.of[IO, Long](0L) - digest <- fileData - .evalTap(_ => fileSizeAcc.update(_ + 1)) - .through( - uploadFilePipe(bucket, key, algorithm) - ) - .compile - .onlyOrError - fileSize <- fileSizeAcc.get - } yield (digest, fileSize) - } - - private def uploadFilePipe(bucket: String, key: String, algorithm: DigestAlgorithm): Pipe[IO, Byte, String] = { in => - fs2.Stream.eval { - in.compile.to(Chunk).flatMap { chunks => - val bs = chunks.toByteBuffer - for { - response <- s3.putObject( - PutObjectRequest - .builder() - .bucket(bucket) - .deltaDigest(algorithm) - .key(key) - .build(), - AsyncRequestBody.fromByteBuffer(bs) - ) - } yield { - 
diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala
index da606a3cfe..35d68afd4a 100644
--- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala
+++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala
@@ -1,20 +1,26 @@
 package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client

 import akka.http.scaladsl.model.Uri
-import cats.effect.{IO, Resource}
+import cats.effect.{IO, Ref, Resource}
 import cats.syntax.all._
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.S3StorageConfig
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.UploadMetadata
+import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax
 import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.FeatureDisabled
-import fs2.Stream
+import fs2.{Chunk, Pipe, Stream}
 import fs2.aws.s3.S3
 import fs2.aws.s3.models.Models.{BucketName, FileKey}
 import io.laserdisc.pure.s3.tagless.{Interpreter, S3AsyncClientOp}
+import org.apache.commons.codec.binary.Hex
 import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, AwsCredentialsProvider, StaticCredentialsProvider}
+import software.amazon.awssdk.core.async.AsyncRequestBody
 import software.amazon.awssdk.regions.Region
 import software.amazon.awssdk.services.s3.S3AsyncClient
-import software.amazon.awssdk.services.s3.model.{ChecksumMode, HeadObjectRequest, HeadObjectResponse, ListObjectsV2Request, ListObjectsV2Response}
+import software.amazon.awssdk.services.s3.model.{ChecksumAlgorithm, ChecksumMode, HeadObjectRequest, HeadObjectResponse, ListObjectsV2Request, ListObjectsV2Response, NoSuchKeyException, PutObjectRequest, PutObjectResponse}

 import java.net.URI
+import java.util.Base64

 trait S3StorageClient {
   def listObjectsV2(bucket: String): IO[ListObjectsV2Response]
@@ -28,12 +34,22 @@ trait S3StorageClient {
   def headObject(bucket: String, key: String): IO[HeadObjectResponse]

-  def underlyingClient: S3AsyncClientOp[IO]
+  def uploadFile(
+      fileData: Stream[IO, Byte],
+      bucket: String,
+      key: String,
+      algorithm: DigestAlgorithm
+  ): IO[UploadMetadata]
+
+  def objectExists(bucket: String, key: String): IO[Boolean]

   def baseEndpoint: Uri
 }

 object S3StorageClient {
+
+  case class UploadMetadata(checksum: String, fileSize: Long, location: Uri)
+
   def resource(s3Config: Option[S3StorageConfig]): Resource[IO, S3StorageClient] = s3Config match {
     case Some(cfg) =>
       val creds =
@@ -72,7 +88,75 @@
     override def headObject(bucket: String, key: String): IO[HeadObjectResponse] =
       client.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).checksumMode(ChecksumMode.ENABLED).build)

-    override def underlyingClient: S3AsyncClientOp[IO] = client
+    override def objectExists(bucket: String, key: String): IO[Boolean] = {
+      headObject(bucket, key)
+        .redeemWith(
+          {
+            case _: NoSuchKeyException => IO.pure(false)
+            case e                     => IO.raiseError(e)
+          },
+          _ => IO.pure(true)
+        )
+    }
+
+    override def uploadFile(
+        fileData: Stream[IO, Byte],
+        bucket: String,
+        key: String,
+        algorithm: DigestAlgorithm
+    ): IO[UploadMetadata] = {
+      for {
+        fileSizeAcc <- Ref.of[IO, Long](0L)
+        digest      <- fileData
+                         .evalTap(_ => fileSizeAcc.update(_ + 1))
+                         .through(
+                           uploadFilePipe(bucket, key, algorithm)
+                         )
+                         .compile
+                         .onlyOrError
+        fileSize    <- fileSizeAcc.get
+        location     = baseEndpoint / bucket / Uri.Path(key)
+      } yield UploadMetadata(digest, fileSize, location)
+    }
+
+    private def uploadFilePipe(bucket: String, key: String, algorithm: DigestAlgorithm): Pipe[IO, Byte, String] = {
+      in =>
+        fs2.Stream.eval {
+          in.compile.to(Chunk).flatMap { chunks =>
+            val bs = chunks.toByteBuffer
+            for {
+              response <- client.putObject(
+                            PutObjectRequest
+                              .builder()
+                              .bucket(bucket)
+                              .deltaDigest(algorithm)
+                              .key(key)
+                              .build(),
+                            AsyncRequestBody.fromByteBuffer(bs)
+                          )
+            } yield {
+              checksumFromResponse(response, algorithm)
+            }
+          }
+        }
+    }
+
+    private def checksumFromResponse(response: PutObjectResponse, algorithm: DigestAlgorithm): String = {
+      algorithm.value match {
+        case "SHA-256" => Hex.encodeHexString(Base64.getDecoder.decode(response.checksumSHA256()))
+        case "SHA-1"   => Hex.encodeHexString(Base64.getDecoder.decode(response.checksumSHA1()))
+        case _         => throw new IllegalArgumentException(s"Unsupported algorithm for S3: ${algorithm.value}")
+      }
+    }
+
+    implicit class PutObjectRequestOps(request: PutObjectRequest.Builder) {
+      def deltaDigest(algorithm: DigestAlgorithm): PutObjectRequest.Builder =
+        algorithm.value match {
+          case "SHA-256" => request.checksumAlgorithm(ChecksumAlgorithm.SHA256)
+          case "SHA-1"   => request.checksumAlgorithm(ChecksumAlgorithm.SHA1)
+          case _         => throw new IllegalArgumentException(s"Unsupported algorithm for S3: ${algorithm.value}")
        }
+    }
   }

   final case object S3StorageClientDisabled extends S3StorageClient {
@@ -87,8 +171,15 @@
     override def headObject(bucket: String, key: String): IO[HeadObjectResponse] = raiseDisabledErr

-    override def underlyingClient: S3AsyncClientOp[IO] = throw disabledErr
-    override def baseEndpoint: Uri                     = throw disabledErr
+    override def baseEndpoint: Uri = throw disabledErr
+
+    override def objectExists(bucket: String, key: String): IO[Boolean] = raiseDisabledErr
+
+    override def uploadFile(
+        fileData: Stream[IO, Byte],
+        bucket: String,
+        key: String,
+        algorithm: DigestAlgorithm
+    ): IO[UploadMetadata] = raiseDisabledErr
   }
 }
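
The client now owns the whole PUT: it counts the bytes it streams, asks S3 to compute the checksum through the checksum-algorithm request attribute, decodes the Base64 value S3 returns into hex, and reports everything as UploadMetadata. A rough usage sketch (client, bucket and key are illustrative; a real client would come from S3StorageClient.resource):

val payload: Stream[IO, Byte] =
  Stream.emits("hello".getBytes("UTF-8").toList).covary[IO]

val program: IO[Unit] =
  for {
    exists <- client.objectExists("some-bucket", "some/key.txt")
    _      <- IO.raiseWhen(exists)(new IllegalStateException("object already exists"))
    meta   <- client.uploadFile(payload, "some-bucket", "some/key.txt", DigestAlgorithm.SHA1)
    _      <- IO.println(s"${meta.fileSize} bytes at ${meta.location}, SHA-1 = ${meta.checksum}")
  } yield ()
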
diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperationsSuite.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperationsSuite.scala
index 9f76fd8b6a..ce435b608c 100644
--- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperationsSuite.scala
+++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperationsSuite.scala
@@ -22,6 +22,7 @@ import munit.AnyFixture
 import org.apache.commons.codec.binary.Hex

 import java.nio.charset.StandardCharsets
+import java.security.MessageDigest
 import scala.concurrent.duration.{Duration, DurationInt}

 class S3FileOperationsSuite
@@ -44,7 +45,7 @@ class S3FileOperationsSuite
   private lazy val fileOps = S3FileOperations.mk(s3StorageClient)

   private def makeContentHash(algorithm: DigestAlgorithm, content: String) = {
-    Hex.encodeHexString(algorithm.digest.digest(content.getBytes(StandardCharsets.UTF_8)))
+    Hex.encodeHexString(MessageDigest.getInstance(algorithm.value).digest(content.getBytes(StandardCharsets.UTF_8)))
   }

   test("List objects in an existing bucket") {
@@ -88,11 +89,11 @@ class S3FileOperationsSuite
     }
   }

-  test("Use MD5 to calculate a checksum") {
+  test("Use SHA-1 to calculate a checksum") {
     givenAnS3Bucket { bucket =>
       val storageValue = S3StorageValue(
         default = false,
-        algorithm = DigestAlgorithm.MD5,
+        algorithm = DigestAlgorithm.SHA1,
         bucket = bucket,
         readPermission = read,
         writePermission = write,
@@ -105,12 +106,12 @@ class S3FileOperationsSuite
       val filename = "myfile.txt"
       val content  = genString()

-      val hashOfContent = makeContentHash(DigestAlgorithm.MD5, content)
+      val hashOfContent = makeContentHash(DigestAlgorithm.SHA1, content)
       val entity        = HttpEntity(content)

       for {
         attr <- fileOps.save(storage, filename, entity)
-        _ = assertEquals(attr.digest, ComputedDigest(DigestAlgorithm.MD5, hashOfContent))
+        _ = assertEquals(attr.digest, ComputedDigest(DigestAlgorithm.SHA1, hashOfContent))
       } yield ()
     }
   }
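
The test helper now derives the expected digest from the algorithm's name rather than from a MessageDigest held by DigestAlgorithm, and the checksum test exercises SHA-1 instead of MD5. For reference, a stand-alone equivalent of the expected value (hex-encoded SHA-1 of the UTF-8 bytes; the function name is made up), using only the JDK and commons-codec:

import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import org.apache.commons.codec.binary.Hex

def expectedSha1Hex(content: String): String =
  Hex.encodeHexString(MessageDigest.getInstance("SHA-1").digest(content.getBytes(StandardCharsets.UTF_8)))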