diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala
index 3dc442426d..ebb90cc69e 100644
--- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala
+++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala
@@ -16,6 +16,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3St
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.FileOperations.intermediateFolders
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.SaveFileRejection._
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.UploadMetadata
 import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
 import fs2.Stream

@@ -51,12 +52,12 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
     val fileData: Stream[IO, Byte] = convertStream(entity.dataBytes)

     (for {
-      _ <- log(bucket, key, s"Checking for object existence")
-      _ <- validateObjectDoesNotExist(bucket, key)
-      _ <- log(bucket, key, s"Beginning upload")
-      (digest, fileSize, location) <- s3StorageClient.uploadFile(fileData, bucket, key, algorithm)
-      _ <- log(bucket, key, s"Finished upload. Digest: $digest")
-      attr = fileMetadata(key, uuid, fileSize, algorithm, digest, location)
+      _ <- log(bucket, key, s"Checking for object existence")
+      _ <- validateObjectDoesNotExist(bucket, key)
+      _ <- log(bucket, key, s"Beginning upload")
+      uploadMetadata <- s3StorageClient.uploadFile(fileData, bucket, key, algorithm)
+      _ <- log(bucket, key, s"Finished upload. Digest: ${uploadMetadata.checksum}")
+      attr = fileMetadata(key, uuid, algorithm, uploadMetadata)
     } yield attr)
       .onError(e => logger.error(e)("Unexpected error when storing file"))
       .adaptError { err => UnexpectedSaveError(key, err.getMessage) }
@@ -65,17 +66,15 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
   private def fileMetadata(
       key: String,
       uuid: UUID,
-      fileSize: Long,
       algorithm: DigestAlgorithm,
-      digest: String,
-      location: Uri
+      uploadMetadata: UploadMetadata
   ): FileStorageMetadata =
     FileStorageMetadata(
       uuid = uuid,
-      bytes = fileSize,
-      digest = Digest.ComputedDigest(algorithm, digest),
+      bytes = uploadMetadata.fileSize,
+      digest = Digest.ComputedDigest(algorithm, uploadMetadata.checksum),
       origin = Client,
-      location = location,
+      location = uploadMetadata.location,
       path = Uri.Path(key)
     )

diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala
index 9e28635af5..35d68afd4a 100644
--- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala
+++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala
@@ -5,6 +5,7 @@ import cats.effect.{IO, Ref, Resource}
 import cats.syntax.all._
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.S3StorageConfig
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.UploadMetadata
 import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax
 import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.FeatureDisabled
 import fs2.{Chunk, Pipe, Stream}
@@ -38,7 +39,7 @@ trait S3StorageClient {
       bucket: String,
       key: String,
       algorithm: DigestAlgorithm
-  ): IO[(String, Long, Uri)]
+  ): IO[UploadMetadata]

   def objectExists(bucket: String, key: String): IO[Boolean]

@@ -46,6 +47,9 @@ trait S3StorageClient {
 }

 object S3StorageClient {
+
+  case class UploadMetadata(checksum: String, fileSize: Long, location: Uri)
+
   def resource(s3Config: Option[S3StorageConfig]): Resource[IO, S3StorageClient] = s3Config match {
     case Some(cfg) =>
       val creds =
@@ -100,7 +104,7 @@ object S3StorageClient {
         bucket: String,
         key: String,
         algorithm: DigestAlgorithm
-    ): IO[(String, Long, Uri)] = {
+    ): IO[UploadMetadata] = {
       for {
         fileSizeAcc <- Ref.of[IO, Long](0L)
         digest <- fileData
@@ -112,7 +116,7 @@
           .onlyOrError
         fileSize <- fileSizeAcc.get
         location = baseEndpoint / bucket / Uri.Path(key)
-      } yield (digest, fileSize, location)
+      } yield UploadMetadata(digest, fileSize, location)
     }

     private def uploadFilePipe(bucket: String, key: String, algorithm: DigestAlgorithm): Pipe[IO, Byte, String] = {
@@ -176,6 +180,6 @@
       bucket: String,
       key: String,
       algorithm: DigestAlgorithm
-  ): IO[(String, Long, Uri)] = raiseDisabledErr
+  ): IO[UploadMetadata] = raiseDisabledErr
   }
 }