Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace alpakka S3 with fs2-aws and minio with localstack #4852

Merged
merged 12 commits into from
Apr 9, 2024
7 changes: 0 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ lazy val akkaTestKit = "com.typesafe.akka" %% "akka
lazy val akkaTestKitTyped = "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion
lazy val alpakkaFile = "com.lightbend.akka" %% "akka-stream-alpakka-file" % alpakkaVersion
lazy val alpakkaSse = "com.lightbend.akka" %% "akka-stream-alpakka-sse" % alpakkaVersion
lazy val alpakkaS3 = "com.lightbend.akka" %% "akka-stream-alpakka-s3" % alpakkaVersion
lazy val apacheCompress = "org.apache.commons" % "commons-compress" % apacheCompressVersion
lazy val apacheIO = "commons-io" % "commons-io" % apacheIOVersion
lazy val awsSdk = "software.amazon.awssdk" % "s3" % awsSdkVersion
Expand Down Expand Up @@ -576,12 +575,6 @@ lazy val storagePlugin = project
name := "delta-storage-plugin",
moduleName := "delta-storage-plugin",
libraryDependencies ++= Seq(
alpakkaS3 excludeAll (
ExclusionRule(organization = "com.typesafe.akka", name = "akka-stream_2.13"),
ExclusionRule(organization = "com.typesafe.akka", name = "akka-http_2.13"),
ExclusionRule(organization = "com.typesafe.akka", name = "akka-http-xml_2.13"),
ExclusionRule(organization = "org.slf4j", name = "slf4j-api")
),
kamonAkkaHttp % Provided,
akkaSlf4j % Test,
akkaTestKitTyped % Test,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
make[Files].from {
(
cfg: StoragePluginConfig,
storageTypeConfig: StorageTypeConfig,
aclCheck: AclCheck,
fetchContext: FetchContext,
storages: Storages,
Expand All @@ -170,17 +169,18 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
clock: Clock[IO],
uuidF: UUIDF,
as: ActorSystem[Nothing],
remoteDiskStorageClient: RemoteDiskStorageClient
remoteDiskStorageClient: RemoteDiskStorageClient,
s3Client: S3StorageClient
) =>
Files(
fetchContext,
aclCheck,
storages,
storagesStatistics,
xas,
storageTypeConfig,
cfg.files,
remoteDiskStorageClient,
s3Client,
clock
)(
uuidF,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileEvent._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.schemas.{files => fileSchema}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.{StorageFetchRejection, StorageIsDeprecated}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{DigestAlgorithm, Storage, StorageRejection, StorageType}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.{FetchAttributeRejection, FetchFileRejection, SaveFileRejection}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{FetchStorage, Storages, StoragesStatistics}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.ContextValue
Expand Down Expand Up @@ -56,7 +56,7 @@ final class Files(
storages: FetchStorage,
storagesStatistics: StoragesStatistics,
remoteDiskStorageClient: RemoteDiskStorageClient,
config: StorageTypeConfig
s3Client: S3StorageClient
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't got round to it yet, but I was planning to pull out the file operations such as SaveFile; that will need some refactoring of the interface. The clients should really be abstracted away from here.

)(implicit
uuidF: UUIDF,
system: ClassicActorSystem
Expand Down Expand Up @@ -392,7 +392,7 @@ final class Files(
}.span("fetchFileContent")

private def fetchFile(storage: Storage, attr: FileAttributes, fileId: Iri): IO[AkkaSource] =
FetchFile(storage, remoteDiskStorageClient, config)
FetchFile(storage, remoteDiskStorageClient, s3Client)
.apply(attr)
.adaptError { case e: FetchFileRejection =>
FetchRejection(fileId, storage.id, e)
Expand Down Expand Up @@ -504,7 +504,7 @@ final class Files(
metadata: FileDescription,
source: BodyPartEntity
): IO[FileStorageMetadata] =
SaveFile(storage, remoteDiskStorageClient, config)
SaveFile(storage, remoteDiskStorageClient, s3Client)
.apply(metadata.filename, source)
.adaptError { case e: SaveFileRejection => SaveRejection(iri, storage.id, e) }

Expand Down Expand Up @@ -762,9 +762,9 @@ object Files {
storages: FetchStorage,
storagesStatistics: StoragesStatistics,
xas: Transactors,
storageTypeConfig: StorageTypeConfig,
config: FilesConfig,
remoteDiskStorageClient: RemoteDiskStorageClient,
s3Client: S3StorageClient,
clock: Clock[IO]
)(implicit
uuidF: UUIDF,
Expand All @@ -779,7 +779,7 @@ object Files {
storages,
storagesStatistics,
remoteDiskStorageClient,
storageTypeConfig
s3Client
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model

import akka.actor.ActorSystem
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.Metadata
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.{DiskStorageValue, RemoteDiskStorageValue, S3StorageValue}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.disk.{DiskStorageFetchFile, DiskStorageSaveFile}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{S3StorageFetchFile, S3StorageSaveFile}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{contexts, Storages}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
Expand Down Expand Up @@ -97,11 +97,11 @@ object Storage {
override val default: Boolean = value.default
override val storageValue: StorageValue = value

def fetchFile(config: StorageTypeConfig)(implicit as: ActorSystem): FetchFile =
new S3StorageFetchFile(value, config)
def fetchFile(client: S3StorageClient): FetchFile =
new S3StorageFetchFile(client, value.bucket)

def saveFile(config: StorageTypeConfig)(implicit as: ActorSystem, uuidf: UUIDF): SaveFile =
new S3StorageSaveFile(this, config)
def saveFile(s3StorageClient: S3StorageClient)(implicit as: ActorSystem, uuidf: UUIDF): SaveFile =
new S3StorageSaveFile(s3StorageClient, this)
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model

import akka.http.scaladsl.model.Uri
import akka.stream.alpakka.s3
import akka.stream.alpakka.s3.{ApiVersion, MemoryBufferType}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords
import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission
Expand All @@ -13,9 +10,7 @@ import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto.{deriveConfiguredCodec, deriveConfiguredEncoder}
import io.circe.syntax._
import io.circe.{Codec, Decoder, Encoder}
import software.amazon.awssdk.auth.credentials.{AnonymousCredentialsProvider, AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.regions.providers.AwsRegionProvider

import java.io.File
import java.nio.file.Path
Expand Down Expand Up @@ -146,38 +141,6 @@ object StorageValue {

override val tpe: StorageType = StorageType.S3Storage
override val capacity: Option[Long] = None

def address(bucket: String): Uri =
endpoint match {
case Some(host) if host.scheme.trim.isEmpty => Uri(s"https://$bucket.$host")
case Some(e) => e.withHost(s"$bucket.${e.authority.host}")
case None => region.fold(s"https://$bucket.s3.amazonaws.com")(r => s"https://$bucket.s3.$r.amazonaws.com")
}

/**
* @return
* these settings converted to an instance of [[akka.stream.alpakka.s3.S3Settings]]
*/
def alpakkaSettings(config: StorageTypeConfig): s3.S3Settings = {

val keys = for {
cfg <- config.amazon
} yield cfg.defaultAccessKey.value -> cfg.defaultSecretKey.value

val credsProvider = keys match {
case Some((accessKey, secretKey)) =>
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey))
case _ =>
StaticCredentialsProvider.create(AnonymousCredentialsProvider.create().resolveCredentials())
}

val regionProvider: AwsRegionProvider = new AwsRegionProvider {
val getRegion: Region = region.getOrElse(Region.US_EAST_1)
}

s3.S3Settings(MemoryBufferType, credsProvider, regionProvider, ApiVersion.ListBucketVersion2)
.withEndpointUrl(address(bucket).toString())
}
}

object S3StorageValue {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations

import akka.actor.ActorSystem
import akka.http.scaladsl.model.Uri
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource

trait FetchFile {
Expand All @@ -33,13 +32,11 @@ object FetchFile {
/**
* Construct a [[FetchFile]] from the given ''storage''.
*/
def apply(storage: Storage, client: RemoteDiskStorageClient, config: StorageTypeConfig)(implicit
as: ActorSystem
): FetchFile =
def apply(storage: Storage, remoteClient: RemoteDiskStorageClient, s3Client: S3StorageClient): FetchFile =
storage match {
case storage: Storage.DiskStorage => storage.fetchFile
case storage: Storage.S3Storage => storage.fetchFile(config)
case storage: Storage.RemoteDiskStorage => storage.fetchFile(client)
case storage: Storage.S3Storage => storage.fetchFile(s3Client)
case storage: Storage.RemoteDiskStorage => storage.fetchFile(remoteClient)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest.ComputedDigest
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileStorageMetadata
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{DigestAlgorithm, Storage}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef

import java.util.UUID
Expand All @@ -34,13 +34,13 @@ object SaveFile {
/**
* Construct a [[SaveFile]] from the given ''storage''.
*/
def apply(storage: Storage, client: RemoteDiskStorageClient, config: StorageTypeConfig)(implicit
def apply(storage: Storage, client: RemoteDiskStorageClient, s3Client: S3StorageClient)(implicit
as: ActorSystem,
uuidf: UUIDF
): SaveFile =
storage match {
case storage: Storage.DiskStorage => storage.saveFile
case storage: Storage.S3Storage => storage.saveFile(config)
case storage: Storage.S3Storage => storage.saveFile(s3Client)
case storage: Storage.RemoteDiskStorage => storage.saveFile(client)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,45 +1,37 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3

import akka.actor.ActorSystem
import akka.http.scaladsl.model.Uri
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.alpakka.s3.{S3Attributes, S3Exception}
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.ByteString
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.S3StorageValue
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.FetchFile
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.FetchFileRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource
import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter

import java.net.URLDecoder
import java.nio.charset.StandardCharsets.UTF_8
import scala.concurrent.duration.DurationInt

final class S3StorageFetchFile(value: S3StorageValue, config: StorageTypeConfig)(implicit
as: ActorSystem
) extends FetchFile {

private val s3Attributes = S3Attributes.settings(value.alpakkaSettings(config))
final class S3StorageFetchFile(client: S3StorageClient, bucket: String) extends FetchFile {

override def apply(attributes: FileAttributes): IO[AkkaSource] =
apply(attributes.path)

override def apply(path: Uri.Path): IO[AkkaSource] =
IO.fromFuture(
IO.delay(
S3.download(value.bucket, URLDecoder.decode(path.toString, UTF_8.toString))
.withAttributes(s3Attributes)
.runWith(Sink.head)
override def apply(path: Uri.Path): IO[AkkaSource] = {
IO.delay(
Source.fromGraph(
StreamConverter(
client
.readFile(bucket, URLDecoder.decode(path.toString, UTF_8.toString))
.groupWithin(8192, 1.second)
.map(bytes => ByteString(bytes.toArray))
)
)
).redeemWith(
{
case err: S3Exception => IO.raiseError(UnexpectedFetchError(path.toString, err.toString()))
case err => IO.raiseError(UnexpectedFetchError(path.toString, err.getMessage))
},
{
case Some((source, _)) => IO.pure(source: AkkaSource)
case None => IO.raiseError(FileNotFound(path.toString()))
}
)
).recoverWith { err =>
IO.raiseError(UnexpectedFetchError(path.toString, err.getMessage))
}
}
}
Loading