From 96a1c94e8851fdb1b4cf60ad1309d6a73a3eb799 Mon Sep 17 00:00:00 2001 From: dantb Date: Tue, 9 Apr 2024 15:56:42 +0200 Subject: [PATCH] Replace alpakka S3 with fs2-aws and minio with localstack (#4852) * Read file using fs2-aws * Use S3 storage client to fetch files * Use same group size as the previous implementation * WIP - multipart S3 upload * WIP - check object existence, fetch metadata for file size, refactor * Use S3 base endpoint for absolute path * Remove alpakka s3 dependency * Tidy * Tidy integration test debugging * Don't error when uploading an empty file (calling .lastOrError) * Fix bug in file size calculation --------- Co-authored-by: Daniel Bell Co-authored-by: Daniel Bell --- build.sbt | 7 - .../plugins/storage/StoragePluginModule.scala | 6 +- .../delta/plugins/storage/files/Files.scala | 12 +- .../storage/storages/model/Storage.scala | 10 +- .../storage/storages/model/StorageValue.scala | 37 ---- .../storages/operations/FetchFile.scala | 11 +- .../storages/operations/SaveFile.scala | 6 +- .../operations/s3/S3StorageFetchFile.scala | 46 ++--- .../operations/s3/S3StorageSaveFile.scala | 181 +++++++++++++----- .../s3/client/S3StorageClient.scala | 43 ++++- .../plugins/storage/files/FilesSpec.scala | 3 +- .../files/routes/FilesRoutesSpec.scala | 3 +- .../s3/LocalStackS3StorageClient.scala | 41 ++++ .../storages/operations/s3/MinioSpec.scala | 50 ----- ...lStack.scala => S3StorageAccessSpec.scala} | 16 +- .../s3/S3StorageFetchSaveSpec.scala | 94 +++++++++ .../s3/S3StorageSaveAndFetchFileSpec.scala | 100 ---------- .../nexus/testkit/minio/LocalStackS3.scala | 14 +- .../nexus/tests/kg/files/S3StorageSpec.scala | 2 +- 19 files changed, 370 insertions(+), 312 deletions(-) create mode 100644 delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/LocalStackS3StorageClient.scala delete mode 100644 delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/MinioSpec.scala rename delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/{S3StorageAccessSpecLocalStack.scala => S3StorageAccessSpec.scala} (63%) create mode 100644 delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageFetchSaveSpec.scala delete mode 100644 delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveAndFetchFileSpec.scala diff --git a/build.sbt b/build.sbt index 75ed3f7bc8..f2a86ca307 100755 --- a/build.sbt +++ b/build.sbt @@ -76,7 +76,6 @@ lazy val akkaTestKit = "com.typesafe.akka" %% "akka lazy val akkaTestKitTyped = "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion lazy val alpakkaFile = "com.lightbend.akka" %% "akka-stream-alpakka-file" % alpakkaVersion lazy val alpakkaSse = "com.lightbend.akka" %% "akka-stream-alpakka-sse" % alpakkaVersion -lazy val alpakkaS3 = "com.lightbend.akka" %% "akka-stream-alpakka-s3" % alpakkaVersion lazy val apacheCompress = "org.apache.commons" % "commons-compress" % apacheCompressVersion lazy val apacheIO = "commons-io" % "commons-io" % apacheIOVersion lazy val awsSdk = "software.amazon.awssdk" % "s3" % awsSdkVersion @@ -576,12 +575,6 @@ lazy val storagePlugin = project name := "delta-storage-plugin", moduleName := "delta-storage-plugin", libraryDependencies ++= Seq( - alpakkaS3 excludeAll ( - ExclusionRule(organization = "com.typesafe.akka", name = "akka-stream_2.13"), - ExclusionRule(organization 
= "com.typesafe.akka", name = "akka-http_2.13"), - ExclusionRule(organization = "com.typesafe.akka", name = "akka-http-xml_2.13"), - ExclusionRule(organization = "org.slf4j", name = "slf4j-api") - ), kamonAkkaHttp % Provided, akkaSlf4j % Test, akkaTestKitTyped % Test, diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/StoragePluginModule.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/StoragePluginModule.scala index 1ffd2d1a7b..d567fc5867 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/StoragePluginModule.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/StoragePluginModule.scala @@ -161,7 +161,6 @@ class StoragePluginModule(priority: Int) extends ModuleDef { make[Files].from { ( cfg: StoragePluginConfig, - storageTypeConfig: StorageTypeConfig, aclCheck: AclCheck, fetchContext: FetchContext, storages: Storages, @@ -170,7 +169,8 @@ class StoragePluginModule(priority: Int) extends ModuleDef { clock: Clock[IO], uuidF: UUIDF, as: ActorSystem[Nothing], - remoteDiskStorageClient: RemoteDiskStorageClient + remoteDiskStorageClient: RemoteDiskStorageClient, + s3Client: S3StorageClient ) => Files( fetchContext, @@ -178,9 +178,9 @@ class StoragePluginModule(priority: Int) extends ModuleDef { storages, storagesStatistics, xas, - storageTypeConfig, cfg.files, remoteDiskStorageClient, + s3Client, clock )( uuidF, diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala index 4c346a954f..75b9e69631 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala @@ -16,12 +16,12 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileEvent._ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection._ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model._ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.schemas.{files => fileSchema} -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.{StorageFetchRejection, StorageIsDeprecated} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{DigestAlgorithm, Storage, StorageRejection, StorageType} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.{FetchAttributeRejection, FetchFileRejection, SaveFileRejection} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations._ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{FetchStorage, Storages, StoragesStatistics} import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.ContextValue @@ -56,7 +56,7 @@ final class Files( storages: FetchStorage, storagesStatistics: StoragesStatistics, remoteDiskStorageClient: RemoteDiskStorageClient, - config: StorageTypeConfig + s3Client: S3StorageClient )(implicit 
uuidF: UUIDF, system: ClassicActorSystem @@ -392,7 +392,7 @@ final class Files( }.span("fetchFileContent") private def fetchFile(storage: Storage, attr: FileAttributes, fileId: Iri): IO[AkkaSource] = - FetchFile(storage, remoteDiskStorageClient, config) + FetchFile(storage, remoteDiskStorageClient, s3Client) .apply(attr) .adaptError { case e: FetchFileRejection => FetchRejection(fileId, storage.id, e) @@ -504,7 +504,7 @@ final class Files( metadata: FileDescription, source: BodyPartEntity ): IO[FileStorageMetadata] = - SaveFile(storage, remoteDiskStorageClient, config) + SaveFile(storage, remoteDiskStorageClient, s3Client) .apply(metadata.filename, source) .adaptError { case e: SaveFileRejection => SaveRejection(iri, storage.id, e) } @@ -762,9 +762,9 @@ object Files { storages: FetchStorage, storagesStatistics: StoragesStatistics, xas: Transactors, - storageTypeConfig: StorageTypeConfig, config: FilesConfig, remoteDiskStorageClient: RemoteDiskStorageClient, + s3Client: S3StorageClient, clock: Clock[IO] )(implicit uuidF: UUIDF, @@ -779,7 +779,7 @@ object Files { storages, storagesStatistics, remoteDiskStorageClient, - storageTypeConfig + s3Client ) } } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/Storage.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/Storage.scala index f0c4992914..d0ad7aec2c 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/Storage.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/Storage.scala @@ -2,13 +2,13 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model import akka.actor.ActorSystem import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.Metadata import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.{DiskStorageValue, RemoteDiskStorageValue, S3StorageValue} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations._ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.disk.{DiskStorageFetchFile, DiskStorageSaveFile} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote._ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{S3StorageFetchFile, S3StorageSaveFile} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{contexts, Storages} import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri @@ -97,11 +97,11 @@ object Storage { override val default: Boolean = value.default override val storageValue: StorageValue = value - def fetchFile(config: StorageTypeConfig)(implicit as: ActorSystem): FetchFile = - new S3StorageFetchFile(value, config) + def fetchFile(client: S3StorageClient): FetchFile = + new S3StorageFetchFile(client, value.bucket) - def saveFile(config: StorageTypeConfig)(implicit as: ActorSystem, uuidf: UUIDF): SaveFile = - new S3StorageSaveFile(this, config) + def saveFile(s3StorageClient: S3StorageClient)(implicit as: ActorSystem, uuidf: UUIDF): SaveFile = + new S3StorageSaveFile(s3StorageClient, this) 
} /** diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/StorageValue.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/StorageValue.scala index e33007da4e..71b7b98dcd 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/StorageValue.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/StorageValue.scala @@ -1,10 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model import akka.http.scaladsl.model.Uri -import akka.stream.alpakka.s3 -import akka.stream.alpakka.s3.{ApiVersion, MemoryBufferType} import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords import ch.epfl.bluebrain.nexus.delta.sdk.implicits._ import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission @@ -13,9 +10,7 @@ import io.circe.generic.extras.Configuration import io.circe.generic.extras.semiauto.{deriveConfiguredCodec, deriveConfiguredEncoder} import io.circe.syntax._ import io.circe.{Codec, Decoder, Encoder} -import software.amazon.awssdk.auth.credentials.{AnonymousCredentialsProvider, AwsBasicCredentials, StaticCredentialsProvider} import software.amazon.awssdk.regions.Region -import software.amazon.awssdk.regions.providers.AwsRegionProvider import java.io.File import java.nio.file.Path @@ -146,38 +141,6 @@ object StorageValue { override val tpe: StorageType = StorageType.S3Storage override val capacity: Option[Long] = None - - def address(bucket: String): Uri = - endpoint match { - case Some(host) if host.scheme.trim.isEmpty => Uri(s"https://$bucket.$host") - case Some(e) => e.withHost(s"$bucket.${e.authority.host}") - case None => region.fold(s"https://$bucket.s3.amazonaws.com")(r => s"https://$bucket.s3.$r.amazonaws.com") - } - - /** - * @return - * these settings converted to an instance of [[akka.stream.alpakka.s3.S3Settings]] - */ - def alpakkaSettings(config: StorageTypeConfig): s3.S3Settings = { - - val keys = for { - cfg <- config.amazon - } yield cfg.defaultAccessKey.value -> cfg.defaultSecretKey.value - - val credsProvider = keys match { - case Some((accessKey, secretKey)) => - StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)) - case _ => - StaticCredentialsProvider.create(AnonymousCredentialsProvider.create().resolveCredentials()) - } - - val regionProvider: AwsRegionProvider = new AwsRegionProvider { - val getRegion: Region = region.getOrElse(Region.US_EAST_1) - } - - s3.S3Settings(MemoryBufferType, credsProvider, regionProvider, ApiVersion.ListBucketVersion2) - .withEndpointUrl(address(bucket).toString()) - } } object S3StorageValue { diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/FetchFile.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/FetchFile.scala index 3dd679a25a..c07a50a0e0 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/FetchFile.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/FetchFile.scala @@ -1,12 +1,11 @@ package 
ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations -import akka.actor.ActorSystem import akka.http.scaladsl.model.Uri import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource trait FetchFile { @@ -33,13 +32,11 @@ object FetchFile { /** * Construct a [[FetchFile]] from the given ''storage''. */ - def apply(storage: Storage, client: RemoteDiskStorageClient, config: StorageTypeConfig)(implicit - as: ActorSystem - ): FetchFile = + def apply(storage: Storage, remoteClient: RemoteDiskStorageClient, s3Client: S3StorageClient): FetchFile = storage match { case storage: Storage.DiskStorage => storage.fetchFile - case storage: Storage.S3Storage => storage.fetchFile(config) - case storage: Storage.RemoteDiskStorage => storage.fetchFile(client) + case storage: Storage.S3Storage => storage.fetchFile(s3Client) + case storage: Storage.RemoteDiskStorage => storage.fetchFile(remoteClient) } } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/SaveFile.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/SaveFile.scala index 8b6358b5b6..9e95286430 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/SaveFile.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/SaveFile.scala @@ -8,9 +8,9 @@ import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest.ComputedDigest import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileStorageMetadata -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{DigestAlgorithm, Storage} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import java.util.UUID @@ -34,13 +34,13 @@ object SaveFile { /** * Construct a [[SaveFile]] from the given ''storage''. 
*/ - def apply(storage: Storage, client: RemoteDiskStorageClient, config: StorageTypeConfig)(implicit + def apply(storage: Storage, client: RemoteDiskStorageClient, s3Client: S3StorageClient)(implicit as: ActorSystem, uuidf: UUIDF ): SaveFile = storage match { case storage: Storage.DiskStorage => storage.saveFile - case storage: Storage.S3Storage => storage.saveFile(config) + case storage: Storage.S3Storage => storage.saveFile(s3Client) case storage: Storage.RemoteDiskStorage => storage.saveFile(client) } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageFetchFile.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageFetchFile.scala index bb8fbe1145..cc45c87a5b 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageFetchFile.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageFetchFile.scala @@ -1,45 +1,37 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3 -import akka.actor.ActorSystem import akka.http.scaladsl.model.Uri -import akka.stream.alpakka.s3.scaladsl.S3 -import akka.stream.alpakka.s3.{S3Attributes, S3Exception} -import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.util.ByteString import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.S3StorageValue import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.FetchFile import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.FetchFileRejection._ +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource +import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter import java.net.URLDecoder import java.nio.charset.StandardCharsets.UTF_8 +import scala.concurrent.duration.DurationInt -final class S3StorageFetchFile(value: S3StorageValue, config: StorageTypeConfig)(implicit - as: ActorSystem -) extends FetchFile { - - private val s3Attributes = S3Attributes.settings(value.alpakkaSettings(config)) +final class S3StorageFetchFile(client: S3StorageClient, bucket: String) extends FetchFile { override def apply(attributes: FileAttributes): IO[AkkaSource] = apply(attributes.path) - override def apply(path: Uri.Path): IO[AkkaSource] = - IO.fromFuture( - IO.delay( - S3.download(value.bucket, URLDecoder.decode(path.toString, UTF_8.toString)) - .withAttributes(s3Attributes) - .runWith(Sink.head) + override def apply(path: Uri.Path): IO[AkkaSource] = { + IO.delay( + Source.fromGraph( + StreamConverter( + client + .readFile(bucket, URLDecoder.decode(path.toString, UTF_8.toString)) + .groupWithin(8192, 1.second) + .map(bytes => ByteString(bytes.toArray)) + ) ) - ).redeemWith( - { - case err: S3Exception => IO.raiseError(UnexpectedFetchError(path.toString, err.toString())) - case err => IO.raiseError(UnexpectedFetchError(path.toString, err.getMessage)) - }, - { - case Some((source, _)) => IO.pure(source: AkkaSource) - case None => IO.raiseError(FileNotFound(path.toString())) - } - ) + ).recoverWith { err => IO.raiseError(UnexpectedFetchError(path.toString, err.getMessage)) + } + } }
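On the fetch side, the fs2 byte stream returned by `readFile` is bridged into an Akka `Source`, batching bytes into `ByteString` frames with `groupWithin(8192, 1.second)` so a frame is emitted as soon as 8192 bytes accumulate or one second elapses. A minimal sketch of the same bridge, assuming the `StreamConverter` helper imported above and a `Stream[IO, Byte]` input; `toAkkaSource` is a hypothetical name, and the purely count-based `chunkN` stands in for `groupWithin` when time-based flushing is not needed:

    import akka.stream.scaladsl.Source
    import akka.util.ByteString
    import cats.effect.IO
    import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
    import fs2.Stream

    // Batch raw bytes into ByteString frames for Akka consumers. chunkN(8192)
    // groups purely by count (the final chunk may be smaller); groupWithin, as
    // used above, additionally flushes a partial batch after one second, which
    // suits slow or bursty S3 reads.
    def toAkkaSource(bytes: Stream[IO, Byte]): Source[ByteString, Any] =
      Source.fromGraph(
        StreamConverter(
          bytes.chunkN(8192).map(chunk => ByteString(chunk.toArray))
        )
      )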
diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala index dbb513a5f4..fc5fe5fbab 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala @@ -1,76 +1,169 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3 +import akka.NotUsed import akka.actor.ActorSystem import akka.http.scaladsl.model.Uri.Path -import akka.http.scaladsl.model.Uri.Path.Slash import akka.http.scaladsl.model.{BodyPartEntity, Uri} -import akka.stream.alpakka.s3.scaladsl.S3 -import akka.stream.alpakka.s3.{S3Attributes, S3Exception} -import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.util.ByteString import cats.effect.IO import cats.implicits._ +import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin.Client -import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileStorageMetadata -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig +import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{Digest, FileStorageMetadata} +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.SaveFile -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.SaveFile.{digestSink, intermediateFolders, sizeSink} +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.SaveFile.intermediateFolders import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.SaveFileRejection._ -import ch.epfl.bluebrain.nexus.delta.plugins.storage.utils.SinkUtils +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient +import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax +import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter +import eu.timepit.refined.refineMV +import eu.timepit.refined.types.string.NonEmptyString +import fs2.Stream +import fs2.aws.s3.S3 +import fs2.aws.s3.S3.MultipartETagValidation +import fs2.aws.s3.models.Models.{BucketName, ETag, FileKey, PartSizeMB} +import software.amazon.awssdk.services.s3.model._ import java.util.UUID -import scala.concurrent.Future -final class S3StorageSaveFile(storage: S3Storage, config: StorageTypeConfig)(implicit +final class S3StorageSaveFile(s3StorageClient: S3StorageClient, storage: S3Storage)(implicit as: ActorSystem, uuidf: UUIDF ) extends SaveFile { - import as.dispatcher - private val fileAlreadyExistException = new IllegalArgumentException("Collision, file already exist") - override def apply( + private val client = s3StorageClient.underlyingClient + private val s3 = S3.create(client) + private val multipartETagValidation = MultipartETagValidation.create[IO] + private val logger = Logger[S3StorageSaveFile] + private val 
partSizeMB: PartSizeMB = refineMV(5) + private val bucket = BucketName(NonEmptyString.unsafeFrom(storage.value.bucket)) + + def apply( filename: String, entity: BodyPartEntity - ): IO[FileStorageMetadata] = { + ): IO[FileStorageMetadata] = for { uuid <- uuidf() path = Uri.Path(intermediateFolders(storage.project, uuid, filename)) result <- storeFile(path, uuid, entity) } yield result - } private def storeFile(path: Path, uuid: UUID, entity: BodyPartEntity): IO[FileStorageMetadata] = { - val key = path.toString() - val attributes = S3Attributes.settings(storage.value.alpakkaSettings(config)) - def s3Sink = S3.multipartUpload(storage.value.bucket, key).withAttributes(attributes) - IO.fromFuture( - IO.delay( - S3.getObjectMetadata(storage.value.bucket, key) - .withAttributes(attributes) - .runWith(Sink.last) - .flatMap { - case None => - entity.dataBytes.runWith(SinkUtils.combineMat(digestSink(storage.value.algorithm), sizeSink, s3Sink) { - case (digest, bytes, s3Result) => - Future.successful( - FileStorageMetadata( - uuid = uuid, - bytes = bytes, - digest = digest, - origin = Client, - location = s3Result.location.withPath(Slash(path)), - path = Uri.Path(key) - ) - ) - }) - case Some(_) => Future.failed(fileAlreadyExistException) - } + val key = path.toString() + val fileData: Stream[IO, Byte] = convertStream(entity.dataBytes) + + (for { + _ <- log(key, s"Checking for object existence") + _ <- validateObjectDoesNotExist(key) + _ <- log(key, s"Beginning multipart upload") + maybeEtags <- uploadFileMultipart(fileData, key) + _ <- log(key, s"Finished multipart upload. Etag by part: $maybeEtags") + attr <- collectFileMetadata(fileData, key, uuid, maybeEtags) + } yield attr) + .onError(e => logger.error(e)("Unexpected error when storing file")) + .adaptError { err => UnexpectedSaveError(key, err.getMessage) } + } + + private def validateObjectDoesNotExist(key: String) = + getFileAttributes(key).redeemWith( + { + case _: NoSuchKeyException => IO.unit + case e => IO.raiseError(e) + }, + _ => IO.raiseError(ResourceAlreadyExists(key)) + ) + + private def convertStream(source: Source[ByteString, Any]): Stream[IO, Byte] = + StreamConverter( + source + .flatMapConcat(x => Source.fromIterator(() => x.iterator)) + .mapMaterializedValue(_ => NotUsed) + ) + + private def uploadFileMultipart(fileData: Stream[IO, Byte], key: String): IO[List[Option[ETag]]] = + fileData + .through( + s3.uploadFileMultipart( + bucket, + FileKey(NonEmptyString.unsafeFrom(key)), + partSizeMB, + uploadEmptyFiles = true, + multipartETagValidation = multipartETagValidation.some + ) + ) + .compile + .to(List) + + private def getFileAttributes(key: String): IO[GetObjectAttributesResponse] = + client + .getObjectAttributes( + GetObjectAttributesRequest + .builder() + .bucket(bucket.value.value) + .key(key) + .objectAttributes(ObjectAttributes.OBJECT_SIZE) // TODO get all values + .build() ) - ).adaptError { - case `fileAlreadyExistException` => ResourceAlreadyExists(key) - case err: S3Exception => UnexpectedSaveError(key, err.toString()) - case err => UnexpectedSaveError(key, err.getMessage) + + private def collectFileMetadata( + bytes: Stream[IO, Byte], + key: String, + uuid: UUID, + maybeEtags: List[Option[ETag]] + ): IO[FileStorageMetadata] = + maybeEtags.sequence match { + case Some(onlyPartETag :: Nil) => + // TODO our tests expect specific values for digests and the only algorithm currently used is SHA-256. 
+ // If we want to continue to check this, but allow for different algorithms, this needs to be abstracted + // in the tests and verified for specific file contents. + // The result will also depend on whether we use a multipart upload or a standard put object. + for { + _ <- log(key, s"Received ETag for single part upload: $onlyPartETag") + fileSize <- computeSize(bytes) + digest <- computeDigest(bytes, storage.storageValue.algorithm) + metadata <- fileMetadata(key, uuid, fileSize, digest) + } yield metadata + case Some(other) => raiseUnexpectedErr(key, s"S3 multipart upload returned multiple ETags unexpectedly: $other") + case None => raiseUnexpectedErr(key, "S3 multipart upload was aborted because no data was received") + } + + // TODO: fetching object attributes fails when tested against LocalStack, but only after the object is saved. + // Verify whether the same happens against real S3. Error msg: 'Could not parse XML response.' + // For now we just compute the size manually. + // private def getFileSize(key: String) = + // getFileAttributes(key).flatMap { attr => + // log(key, s"File attributes from S3: $attr").as(attr.objectSize()) + // } + private def computeSize(bytes: Stream[IO, Byte]): IO[Long] = bytes.fold(0L)((acc, _) => acc + 1).compile.lastOrError + + private def computeDigest(bytes: Stream[IO, Byte], algorithm: DigestAlgorithm): IO[String] = { + val digest = algorithm.digest + bytes.chunks + .evalMap(chunk => IO(digest.update(chunk.toArray))) + .compile + .last + .map { _ => + digest.digest().map("%02x".format(_)).mkString + } } + + private def fileMetadata(key: String, uuid: UUID, fileSize: Long, digest: String) = + s3StorageClient.baseEndpoint.map { base => + FileStorageMetadata( + uuid = uuid, + bytes = fileSize, + digest = Digest.ComputedDigest(storage.value.algorithm, digest), + origin = Client, + location = base / bucket.value.value / Uri.Path(key), + path = Uri.Path(key) + ) + } + + private def raiseUnexpectedErr[A](key: String, msg: String): IO[A] = IO.raiseError(UnexpectedSaveError(key, msg)) + + private def log(key: String, msg: String) = logger.info(s"Bucket: ${bucket.value}. Key: $key. $msg") }
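Both metadata passes above re-run the byte stream that the multipart upload has already consumed: `computeSize` folds over it once more, one byte at a time, and `computeDigest` traverses it a third time. That is fine for strict entities but costs extra traversals; a single pass can produce both values together. A minimal sketch of that idea, reusing `DigestAlgorithm` from this module (whose `digest` yields a fresh `java.security.MessageDigest`, as in `computeDigest` above); `SizeAndDigest` and `sizeAndDigest` are hypothetical names:

    import cats.effect.IO
    import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
    import fs2.Stream

    final case class SizeAndDigest(bytes: Long, hex: String)

    // One traversal both updates the MessageDigest and counts bytes chunk by
    // chunk, instead of folding over the stream once per metric.
    def sizeAndDigest(data: Stream[IO, Byte], algorithm: DigestAlgorithm): IO[SizeAndDigest] = {
      val md = algorithm.digest
      data.chunks
        .evalMap { chunk =>
          val arr = chunk.toArray
          IO(md.update(arr)).as(arr.length.toLong)
        }
        .compile
        .fold(0L)(_ + _)
        .map(size => SizeAndDigest(size, md.digest().map("%02x".format(_)).mkString))
    }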
$msg") } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala index 8b302dc3b1..4b095ae3cd 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala @@ -1,18 +1,31 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client +import akka.http.scaladsl.model.Uri import cats.effect.{IO, Resource} +import cats.implicits.toBifunctorOps import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.S3StorageConfig import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.FeatureDisabled +import eu.timepit.refined.collection.NonEmpty +import eu.timepit.refined.refineV +import fs2.aws.s3.S3 +import fs2.aws.s3.models.Models.{BucketName, FileKey} import io.laserdisc.pure.s3.tagless.{Interpreter, S3AsyncClientOp} import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider} import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.s3.S3AsyncClient import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, ListObjectsV2Response} +import fs2.Stream import java.net.URI trait S3StorageClient { def listObjectsV2(bucket: String): IO[ListObjectsV2Response] + + def readFile(bucket: String, fileKey: String): fs2.Stream[IO, Byte] + + def underlyingClient: S3AsyncClientOp[IO] + + def baseEndpoint: IO[Uri] } object S3StorageClient { @@ -32,18 +45,40 @@ object S3StorageClient { .forcePathStyle(true) .region(Region.US_EAST_1) ) - .map(new S3StorageClientImpl(_)) + .map(new S3StorageClientImpl(_, cfg.defaultEndpoint)) case None => Resource.pure(S3StorageClientDisabled) } - final class S3StorageClientImpl(client: S3AsyncClientOp[IO]) extends S3StorageClient { + final class S3StorageClientImpl(client: S3AsyncClientOp[IO], baseEndpoint: Uri) extends S3StorageClient { + private val s3: S3[IO] = S3.create(client) + override def listObjectsV2(bucket: String): IO[ListObjectsV2Response] = client.listObjectsV2(ListObjectsV2Request.builder().bucket(bucket).build()) + + def readFile(bucket: String, fileKey: String): fs2.Stream[IO, Byte] = + for { + bk <- Stream.fromEither[IO](refineV[NonEmpty](bucket).leftMap(e => new IllegalArgumentException(e))) + fk <- Stream.fromEither[IO](refineV[NonEmpty](fileKey).leftMap(e => new IllegalArgumentException(e))) + bytes <- s3.readFile(BucketName(bk), FileKey(fk)) + } yield bytes + + override def underlyingClient: S3AsyncClientOp[IO] = client + + override def baseEndpoint: IO[Uri] = IO.pure(baseEndpoint) } final case object S3StorageClientDisabled extends S3StorageClient { - override def listObjectsV2(bucket: String): IO[ListObjectsV2Response] = - IO.raiseError(FeatureDisabled("S3 storage is disabled")) + private val disabledErr = FeatureDisabled("S3 storage is disabled") + private val raiseDisabledErr = IO.raiseError(disabledErr) + + override def listObjectsV2(bucket: String): IO[ListObjectsV2Response] = raiseDisabledErr + + override def readFile(bucket: String, fileKey: String): fs2.Stream[IO, Byte] = + fs2.Stream.raiseError[IO](disabledErr) + + override def underlyingClient: S3AsyncClientOp[IO] = throw disabledErr + + 
override def baseEndpoint: IO[Uri] = raiseDisabledErr } } diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/FilesSpec.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/FilesSpec.scala index e270b5ba58..56b38887f7 100644 --- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/FilesSpec.scala +++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/FilesSpec.scala @@ -18,6 +18,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejec import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageType.{RemoteDiskStorage => RemoteStorageType} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{StorageStatEntry, StorageType} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.AkkaSourceHelpers +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.S3StorageClientDisabled import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{StorageFixtures, Storages, StoragesConfig, StoragesStatistics} import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv @@ -143,9 +144,9 @@ class FilesSpec(fixture: RemoteStorageClientFixtures) storages, storageStatistics, xas, - cfg, FilesConfig(eventLogConfig, MediaTypeDetectorConfig.Empty), remoteDiskStorageClient, + S3StorageClientDisabled, clock ) diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutesSpec.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutesSpec.scala index ae794c004b..fcfd94c5db 100644 --- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutesSpec.scala +++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutesSpec.scala @@ -14,6 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{FileAttributes import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.{contexts => fileContexts, permissions, FileFixtures, Files, FilesConfig} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{StorageStatEntry, StorageType} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient.RemoteDiskStorageClientDisabled +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.S3StorageClientDisabled import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{contexts => storageContexts, permissions => storagesPermissions, StorageFixtures, Storages, StoragesConfig, StoragesStatistics} import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.RdfMediaTypes.`application/ld+json` @@ -128,9 +129,9 @@ class FilesRoutesSpec storages, storagesStatistics, xas, - config, FilesConfig(eventLogConfig, MediaTypeDetectorConfig.Empty), remoteDiskStorageClient, + S3StorageClientDisabled, clock )(uuidF, typedSystem) private val groupDirectives = DeltaSchemeDirectives(fetchContext) diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/LocalStackS3StorageClient.scala 
b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/LocalStackS3StorageClient.scala new file mode 100644 index 0000000000..1bda05edb8 --- /dev/null +++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/LocalStackS3StorageClient.scala @@ -0,0 +1,41 @@ +package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3 + +import akka.http.scaladsl.model.Uri +import cats.effect.{IO, Resource} +import ch.epfl.bluebrain.nexus.delta.kernel.Secret +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.S3StorageConfig +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.permissions +import ch.epfl.bluebrain.nexus.testkit.minio.LocalStackS3 +import munit.CatsEffectSuite +import munit.catseffect.IOFixture +import org.testcontainers.containers.localstack.LocalStackContainer.Service + +object LocalStackS3StorageClient { + val ServiceType = Service.S3 + + def s3StorageClientResource(): Resource[IO, (S3StorageClient, S3StorageConfig)] = + LocalStackS3.localstackS3().flatMap { localstack => + LocalStackS3.fs2ClientFromLocalstack(localstack).map { client => + val creds = localstack.staticCredentialsProvider.resolveCredentials() + val (accessKey, secretKey) = (creds.accessKeyId(), creds.secretAccessKey()) + val conf: S3StorageConfig = S3StorageConfig( + digestAlgorithm = DigestAlgorithm.default, + defaultEndpoint = Uri(localstack.endpointOverride(LocalStackS3.ServiceType).toString), + defaultAccessKey = Secret(accessKey), + defaultSecretKey = Secret(secretKey), + defaultReadPermission = permissions.read, + defaultWritePermission = permissions.write, + showLocation = false, + defaultMaxFileSize = 1 + ) + (new S3StorageClient.S3StorageClientImpl(client, conf.defaultEndpoint), conf) + } + } + + trait Fixture { self: CatsEffectSuite => + val localStackS3Client: IOFixture[(S3StorageClient, S3StorageConfig)] = + ResourceSuiteLocalFixture("s3storageclient", s3StorageClientResource()) + } +}
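A short usage sketch for this fixture, mirroring how the specs further down consume it; the suite name and bucket are hypothetical:

    import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
    import munit.{AnyFixture, CatsEffectSuite}
    import software.amazon.awssdk.services.s3.model.CreateBucketRequest

    // Hypothetical suite: the fixture hands back the LocalStack-backed client
    // together with the matching S3StorageConfig for the started container.
    class ExampleLocalStackSuite extends CatsEffectSuite with LocalStackS3StorageClient.Fixture {

      override def munitFixtures: Seq[AnyFixture[_]] = List(localStackS3Client)

      private lazy val (s3Client: S3StorageClient, _) = localStackS3Client()

      test("create a bucket and list it as empty") {
        val bucket = "example-bucket"
        for {
          _    <- s3Client.underlyingClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build)
          resp <- s3Client.listObjectsV2(bucket)
        } yield assertEquals(resp.keyCount.intValue(), 0)
      }
    }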
diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/MinioSpec.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/MinioSpec.scala deleted file mode 100644 index 951370d26f..0000000000 --- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/MinioSpec.scala +++ /dev/null @@ -1,50 +0,0 @@ -package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3 - -import akka.actor.ActorSystem -import akka.stream.alpakka.s3.scaladsl.S3 -import akka.stream.alpakka.s3.{BucketAccess, S3Attributes} -import cats.effect.IO -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.S3StorageValue -import ch.epfl.bluebrain.nexus.testkit.minio.MinioDocker -import org.scalatest.{Suite, Suites} - -import java.net.URLDecoder -import java.nio.charset.StandardCharsets.UTF_8 - -class MinioSpec extends Suites with MinioDocker { - override val nestedSuites: IndexedSeq[Suite] = Vector( - new S3StorageSaveAndFetchFileSpec(this) - ) -} - -object MinioSpec { - def createBucket( - value: S3StorageValue - )(implicit config: StorageTypeConfig, system: ActorSystem): IO[Unit] = { - implicit val attributes = S3Attributes.settings(value.alpakkaSettings(config)) - - IO.fromFuture(IO.delay(S3.checkIfBucketExists(value.bucket))).flatMap { - case BucketAccess.NotExists => IO.delay(S3.makeBucket(value.bucket)).void - case _ => IO.unit - } - } - - def deleteBucket( - value: S3StorageValue - )(implicit config: StorageTypeConfig, system: ActorSystem): IO[Unit] = { - implicit val attributes = S3Attributes.settings(value.alpakkaSettings(config)) - - IO.fromFuture( - IO.delay( - S3.listBucket(value.bucket, None) - .withAttributes(attributes) - .flatMapConcat { content => - S3.deleteObject(value.bucket, URLDecoder.decode(content.getKey, UTF_8.toString)) - .withAttributes(attributes) - } - .run() - ) - ) >> IO.fromFuture(IO.delay(S3.deleteBucket(value.bucket))).void - } -} diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageAccessSpecLocalStack.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageAccessSpec.scala similarity index 63% rename from delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageAccessSpecLocalStack.scala rename to delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageAccessSpec.scala index 2a1188172d..a52da0f6c9 100644 --- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageAccessSpecLocalStack.scala +++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageAccessSpec.scala @@ -4,29 +4,25 @@ import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StorageFixtures import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.StorageNotAccessible import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.S3StorageClientImpl import ch.epfl.bluebrain.nexus.delta.sdk.actor.ActorSystemSetup -import ch.epfl.bluebrain.nexus.testkit.minio.LocalStackS3 import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite -import io.laserdisc.pure.s3.tagless.S3AsyncClientOp import munit.AnyFixture import software.amazon.awssdk.services.s3.model.{CreateBucketRequest, DeleteBucketRequest} import scala.concurrent.duration.{Duration, DurationInt} -class S3StorageAccessSpecLocalStack +class S3StorageAccessSpec extends NexusSuite with StorageFixtures - with LocalStackS3.Fixture + with LocalStackS3StorageClient.Fixture with ActorSystemSetup.Fixture { override def munitIOTimeout: Duration = 60.seconds override def munitFixtures: Seq[AnyFixture[_]] = List(localStackS3Client) - private lazy val s3Client: S3AsyncClientOp[IO] = localStackS3Client() - private lazy val s3StorageClient: S3StorageClient = new S3StorageClientImpl(s3Client) - private lazy val s3Access = new S3StorageAccess(s3StorageClient) + private lazy val (s3Client: S3StorageClient, _) = localStackS3Client() + private lazy val s3Access = new S3StorageAccess(s3Client) test("List objects in an existing bucket") { givenAnS3Bucket { bucket => @@ -40,8 +36,8 @@ def givenAnS3Bucket(test: String => IO[Unit]): IO[Unit] = { val bucket = genString() - 
s3Client.createBucket(CreateBucketRequest.builder().bucket(bucket).build) >> + s3Client.underlyingClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build) >> test(bucket) >> - s3Client.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build).void + s3Client.underlyingClient.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build).void } } diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageFetchSaveSpec.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageFetchSaveSpec.scala new file mode 100644 index 0000000000..0ebf790eb8 --- /dev/null +++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageFetchSaveSpec.scala @@ -0,0 +1,94 @@ +package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3 + +import akka.actor.ActorSystem +import akka.http.scaladsl.model.HttpEntity +import cats.effect.IO +import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StorageFixtures +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.S3StorageValue +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.AkkaSourceHelpers +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.permissions.{read, write} +import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax +import ch.epfl.bluebrain.nexus.delta.sdk.actor.ActorSystemSetup +import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef +import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite +import io.circe.Json +import munit.AnyFixture +import software.amazon.awssdk.services.s3.model.{CreateBucketRequest, DeleteBucketRequest, DeleteObjectRequest} + +import java.util.UUID +import scala.concurrent.duration.{Duration, DurationInt} +import scala.jdk.CollectionConverters.ListHasAsScala + +class S3StorageFetchSaveSpec + extends NexusSuite + with StorageFixtures + with ActorSystemSetup.Fixture + with LocalStackS3StorageClient.Fixture + with AkkaSourceHelpers { + + override def munitIOTimeout: Duration = 120.seconds + + override def munitFixtures: Seq[AnyFixture[_]] = List(localStackS3Client, actorSystem) + + private val uuid = UUID.fromString("8049ba90-7cc6-4de5-93a1-802c04200dcc") + implicit private val uuidf: UUIDF = UUIDF.fixed(uuid) + + private lazy val (s3StorageClient: S3StorageClient, _) = localStackS3Client() + implicit private lazy val as: ActorSystem = actorSystem() + + test("Save and fetch an object in a bucket") { + givenAnS3Bucket { bucket => + val s3Fetch = new S3StorageFetchFile(s3StorageClient, bucket) + val storageValue = S3StorageValue( + default = false, + algorithm = DigestAlgorithm.default, + bucket = bucket, + endpoint = None, + region = None, + readPermission = read, + writePermission = write, + maxFileSize = 20 + ) + val iri = iri"http://localhost/s3" + val project = ProjectRef.unsafe("org", "project") + val storage = S3Storage(iri, project, storageValue, Json.obj()) + val s3Save = new S3StorageSaveFile(s3StorageClient, storage) + + val filename = "myfile.txt" + val content 
= genString() + val entity = HttpEntity(content) + + val result = for { + attr <- s3Save.apply(filename, entity) + source <- s3Fetch.apply(attr.path) + } yield consume(source) + + assertIO(result, content) + } + } + + def givenAnS3Bucket(test: String => IO[Unit]): IO[Unit] = { + val bucket = genString() + s3StorageClient.underlyingClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build) >> + test(bucket) >> + emptyBucket(bucket) >> + s3StorageClient.underlyingClient.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build).void + } + + def emptyBucket(bucket: String): IO[Unit] = + s3StorageClient + .listObjectsV2(bucket) + .flatMap { resp => + val keys: List[String] = resp.contents.asScala.map(_.key()).toList + keys.traverse(deleteObject(bucket, _)) + } + .void + + def deleteObject(bucket: String, key: String): IO[Unit] = + s3StorageClient.underlyingClient.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(key).build()).void +} diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveAndFetchFileSpec.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveAndFetchFileSpec.scala deleted file mode 100644 index a63f002a6d..0000000000 --- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveAndFetchFileSpec.scala +++ /dev/null @@ -1,100 +0,0 @@ -package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3 - -import akka.actor.ActorSystem -import akka.http.scaladsl.model.{HttpEntity, Uri} -import akka.testkit.TestKit -import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF -import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest.ComputedDigest -import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin.Client -import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileStorageMetadata -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StorageFixtures -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.S3StorageValue -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.AkkaSourceHelpers -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.FetchFileRejection.FileNotFound -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.SaveFileRejection.ResourceAlreadyExists -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.MinioSpec._ -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.permissions.{read, write} -import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ -import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef -import ch.epfl.bluebrain.nexus.testkit.minio.MinioDocker -import ch.epfl.bluebrain.nexus.testkit.minio.MinioDocker._ -import ch.epfl.bluebrain.nexus.testkit.scalatest.ce.CatsEffectSpec -import io.circe.Json -import org.scalatest.{BeforeAndAfterAll, DoNotDiscover} -import software.amazon.awssdk.regions.Region - -import java.util.UUID - -@DoNotDiscover -class S3StorageSaveAndFetchFileSpec(docker: MinioDocker) - extends TestKit(ActorSystem("S3StorageSaveAndFetchFileSpec")) - with CatsEffectSpec - with 
AkkaSourceHelpers - with StorageFixtures - with BeforeAndAfterAll { - - private val iri = iri"http://localhost/s3" - private val uuid = UUID.fromString("8049ba90-7cc6-4de5-93a1-802c04200dcc") - implicit private val uuidf: UUIDF = UUIDF.fixed(uuid) - private val project = ProjectRef.unsafe("org", "project") - private val filename = "myfile.txt" - private val digest = - ComputedDigest(DigestAlgorithm.default, "e0ac3601005dfa1864f5392aabaf7d898b1b5bab854f1acb4491bcd806b76b0c") - - private var storageValue: S3StorageValue = _ - private var storage: S3Storage = _ - private var metadata: FileStorageMetadata = _ - - override protected def beforeAll(): Unit = { - super.beforeAll() - storageValue = S3StorageValue( - default = false, - algorithm = DigestAlgorithm.default, - bucket = "bucket2", - endpoint = Some(docker.hostConfig.endpoint), - region = Some(Region.EU_CENTRAL_1), - readPermission = read, - writePermission = write, - maxFileSize = 20 - ) - createBucket(storageValue).accepted - storage = S3Storage(iri, project, storageValue, Json.obj()) - metadata = FileStorageMetadata( - uuid, - 12, - digest, - Client, - s"http://bucket2.$VirtualHost:${docker.hostConfig.port}/org/project/8/0/4/9/b/a/9/0/myfile.txt", - Uri.Path("org/project/8/0/4/9/b/a/9/0/myfile.txt") - ) - } - - override protected def afterAll(): Unit = { - deleteBucket(storageValue).accepted - super.afterAll() - } - - "S3Storage operations" should { - val content = "file content" - val entity = HttpEntity(content) - - "save a file to a bucket" in { - storage.saveFile(config).apply(filename, entity).accepted shouldEqual metadata - } - - "fetch a file from a bucket" in { - val sourceFetched = storage.fetchFile(config).apply(metadata.path).accepted - consume(sourceFetched) shouldEqual content - } - - "fail fetching a file that does not exist" in { - storage.fetchFile(config).apply(Uri.Path("other.txt")).rejectedWith[FileNotFound] - } - - "fail attempting to save the same file again" in { - storage.saveFile(config).apply(filename, entity).rejectedWith[ResourceAlreadyExists] - } - } -} diff --git a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/minio/LocalStackS3.scala b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/minio/LocalStackS3.scala index 072d10d6ec..5c34f5bfa8 100644 --- a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/minio/LocalStackS3.scala +++ b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/minio/LocalStackS3.scala @@ -11,7 +11,7 @@ import software.amazon.awssdk.services.s3.S3AsyncClient object LocalStackS3 { val ServiceType = Service.S3 - private def resource(): Resource[IO, LocalStackV2Container] = { + def localstackS3(): Resource[IO, LocalStackV2Container] = { def acquire: IO[LocalStackV2Container] = IO.delay { val containerDef = LocalStackV2Container.Def(services = Seq(ServiceType)) @@ -23,20 +23,22 @@ object LocalStackS3 { Resource.make(acquire)(release) } - private def fs2ClientResource(): Resource[IO, S3AsyncClientOp[IO]] = resource().flatMap { container => - val endpoint = container.endpointOverride(LocalStackS3.ServiceType) + def fs2ClientFromLocalstack(localstack: LocalStackV2Container): Resource[IO, S3AsyncClientOp[IO]] = { + val endpoint = localstack.endpointOverride(LocalStackS3.ServiceType) Interpreter[IO].S3AsyncClientOpResource( S3AsyncClient .builder() - .credentialsProvider(container.staticCredentialsProvider) + .credentialsProvider(localstack.staticCredentialsProvider) .endpointOverride(endpoint) .forcePathStyle(true) - .region(container.region) + 
.region(localstack.region) ) } + def fs2Client(): Resource[IO, S3AsyncClientOp[IO]] = localstackS3().flatMap(fs2ClientFromLocalstack) + trait Fixture { self: CatsEffectSuite => - val localStackS3Client: IOFixture[S3AsyncClientOp[IO]] = ResourceSuiteLocalFixture("s3client", fs2ClientResource()) + val localStackS3Client: IOFixture[S3AsyncClientOp[IO]] = ResourceSuiteLocalFixture("s3client", fs2Client()) } } diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/S3StorageSpec.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/S3StorageSpec.scala index 4bb5709ccb..a42c8f5a68 100644 --- a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/S3StorageSpec.scala +++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/S3StorageSpec.scala @@ -34,7 +34,7 @@ class S3StorageSpec extends StorageSpec { private val logoKey = "some/path/to/nexus-logo.png" val s3Endpoint: String = "http://s3.localhost.localstack.cloud:4566" - val s3BucketEndpoint: String = s"http://$bucket.s3.localhost.localstack.cloud:4566" + val s3BucketEndpoint: String = s"http://s3.localhost.localstack.cloud:4566/$bucket" private val credentialsProvider = (s3Config.accessKey, s3Config.secretKey) match { case (Some(ak), Some(sk)) => StaticCredentialsProvider.create(AwsBasicCredentials.create(ak, sk))
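The expected bucket URL above switches from virtual-host style (bucket in the hostname) to path style (bucket as the first path segment), consistent with the `forcePathStyle(true)` set on the `S3AsyncClient` earlier in this patch; LocalStack serves path-style requests without per-bucket DNS. A small sketch of the difference, with `pathStyle` as a hypothetical helper:

    import akka.http.scaladsl.model.Uri

    // Path style keeps the bucket in the path:
    //   http://s3.localhost.localstack.cloud:4566/<bucket>/<key>
    // whereas virtual-host style addresses http://<bucket>.s3...:4566/<key>.
    def pathStyle(endpoint: Uri, bucket: String, key: String): Uri =
      endpoint.withPath((endpoint.path / bucket) ++ Uri.Path("/" + key))

    // pathStyle(Uri("http://s3.localhost.localstack.cloud:4566"), "mybucket", "a/b.png")
    // ==> http://s3.localhost.localstack.cloud:4566/mybucket/a/b.png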