Skip to content

Commit

Permalink
Replace alpakka S3 with fs2-aws and minio with localstack (#4852)
Browse files Browse the repository at this point in the history
* Read file using fs2-aws

* Use S3 storage client to fetch files

* Use same group size as the previous implementation

* WIP - multipart S3 upload

* WIP - check object existence, fetch metadata for file size, refactor

* Use S3 base endpoint for absolute path

* Remove alpakka s3 dependency

* Tidy

* Tidy integration test debugging

* Don't error when uploading an empty file (calling .lastOrError)

* Fix bug in file size calculation

---------

Co-authored-by: Daniel Bell <[email protected]>
Co-authored-by: Daniel Bell <[email protected]>
  • Loading branch information
3 people authored Apr 9, 2024
1 parent ea14433 commit 96a1c94
Show file tree
Hide file tree
Showing 19 changed files with 370 additions and 312 deletions.
7 changes: 0 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ lazy val akkaTestKit = "com.typesafe.akka" %% "akka
lazy val akkaTestKitTyped = "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion
lazy val alpakkaFile = "com.lightbend.akka" %% "akka-stream-alpakka-file" % alpakkaVersion
lazy val alpakkaSse = "com.lightbend.akka" %% "akka-stream-alpakka-sse" % alpakkaVersion
lazy val alpakkaS3 = "com.lightbend.akka" %% "akka-stream-alpakka-s3" % alpakkaVersion
lazy val apacheCompress = "org.apache.commons" % "commons-compress" % apacheCompressVersion
lazy val apacheIO = "commons-io" % "commons-io" % apacheIOVersion
lazy val awsSdk = "software.amazon.awssdk" % "s3" % awsSdkVersion
Expand Down Expand Up @@ -576,12 +575,6 @@ lazy val storagePlugin = project
name := "delta-storage-plugin",
moduleName := "delta-storage-plugin",
libraryDependencies ++= Seq(
alpakkaS3 excludeAll (
ExclusionRule(organization = "com.typesafe.akka", name = "akka-stream_2.13"),
ExclusionRule(organization = "com.typesafe.akka", name = "akka-http_2.13"),
ExclusionRule(organization = "com.typesafe.akka", name = "akka-http-xml_2.13"),
ExclusionRule(organization = "org.slf4j", name = "slf4j-api")
),
kamonAkkaHttp % Provided,
akkaSlf4j % Test,
akkaTestKitTyped % Test,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
make[Files].from {
(
cfg: StoragePluginConfig,
storageTypeConfig: StorageTypeConfig,
aclCheck: AclCheck,
fetchContext: FetchContext,
storages: Storages,
Expand All @@ -170,17 +169,18 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
clock: Clock[IO],
uuidF: UUIDF,
as: ActorSystem[Nothing],
remoteDiskStorageClient: RemoteDiskStorageClient
remoteDiskStorageClient: RemoteDiskStorageClient,
s3Client: S3StorageClient
) =>
Files(
fetchContext,
aclCheck,
storages,
storagesStatistics,
xas,
storageTypeConfig,
cfg.files,
remoteDiskStorageClient,
s3Client,
clock
)(
uuidF,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileEvent._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.schemas.{files => fileSchema}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.{StorageFetchRejection, StorageIsDeprecated}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{DigestAlgorithm, Storage, StorageRejection, StorageType}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.{FetchAttributeRejection, FetchFileRejection, SaveFileRejection}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{FetchStorage, Storages, StoragesStatistics}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.ContextValue
Expand Down Expand Up @@ -56,7 +56,7 @@ final class Files(
storages: FetchStorage,
storagesStatistics: StoragesStatistics,
remoteDiskStorageClient: RemoteDiskStorageClient,
config: StorageTypeConfig
s3Client: S3StorageClient
)(implicit
uuidF: UUIDF,
system: ClassicActorSystem
Expand Down Expand Up @@ -392,7 +392,7 @@ final class Files(
}.span("fetchFileContent")

private def fetchFile(storage: Storage, attr: FileAttributes, fileId: Iri): IO[AkkaSource] =
FetchFile(storage, remoteDiskStorageClient, config)
FetchFile(storage, remoteDiskStorageClient, s3Client)
.apply(attr)
.adaptError { case e: FetchFileRejection =>
FetchRejection(fileId, storage.id, e)
Expand Down Expand Up @@ -504,7 +504,7 @@ final class Files(
metadata: FileDescription,
source: BodyPartEntity
): IO[FileStorageMetadata] =
SaveFile(storage, remoteDiskStorageClient, config)
SaveFile(storage, remoteDiskStorageClient, s3Client)
.apply(metadata.filename, source)
.adaptError { case e: SaveFileRejection => SaveRejection(iri, storage.id, e) }

Expand Down Expand Up @@ -762,9 +762,9 @@ object Files {
storages: FetchStorage,
storagesStatistics: StoragesStatistics,
xas: Transactors,
storageTypeConfig: StorageTypeConfig,
config: FilesConfig,
remoteDiskStorageClient: RemoteDiskStorageClient,
s3Client: S3StorageClient,
clock: Clock[IO]
)(implicit
uuidF: UUIDF,
Expand All @@ -779,7 +779,7 @@ object Files {
storages,
storagesStatistics,
remoteDiskStorageClient,
storageTypeConfig
s3Client
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model

import akka.actor.ActorSystem
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.Metadata
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.{DiskStorageValue, RemoteDiskStorageValue, S3StorageValue}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.disk.{DiskStorageFetchFile, DiskStorageSaveFile}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{S3StorageFetchFile, S3StorageSaveFile}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{contexts, Storages}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
Expand Down Expand Up @@ -97,11 +97,11 @@ object Storage {
override val default: Boolean = value.default
override val storageValue: StorageValue = value

def fetchFile(config: StorageTypeConfig)(implicit as: ActorSystem): FetchFile =
new S3StorageFetchFile(value, config)
def fetchFile(client: S3StorageClient): FetchFile =
new S3StorageFetchFile(client, value.bucket)

def saveFile(config: StorageTypeConfig)(implicit as: ActorSystem, uuidf: UUIDF): SaveFile =
new S3StorageSaveFile(this, config)
def saveFile(s3StorageClient: S3StorageClient)(implicit as: ActorSystem, uuidf: UUIDF): SaveFile =
new S3StorageSaveFile(s3StorageClient, this)
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model

import akka.http.scaladsl.model.Uri
import akka.stream.alpakka.s3
import akka.stream.alpakka.s3.{ApiVersion, MemoryBufferType}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords
import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission
Expand All @@ -13,9 +10,7 @@ import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto.{deriveConfiguredCodec, deriveConfiguredEncoder}
import io.circe.syntax._
import io.circe.{Codec, Decoder, Encoder}
import software.amazon.awssdk.auth.credentials.{AnonymousCredentialsProvider, AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.regions.providers.AwsRegionProvider

import java.io.File
import java.nio.file.Path
Expand Down Expand Up @@ -146,38 +141,6 @@ object StorageValue {

override val tpe: StorageType = StorageType.S3Storage
override val capacity: Option[Long] = None

def address(bucket: String): Uri =
endpoint match {
case Some(host) if host.scheme.trim.isEmpty => Uri(s"https://$bucket.$host")
case Some(e) => e.withHost(s"$bucket.${e.authority.host}")
case None => region.fold(s"https://$bucket.s3.amazonaws.com")(r => s"https://$bucket.s3.$r.amazonaws.com")
}

/**
* @return
* these settings converted to an instance of [[akka.stream.alpakka.s3.S3Settings]]
*/
def alpakkaSettings(config: StorageTypeConfig): s3.S3Settings = {

val keys = for {
cfg <- config.amazon
} yield cfg.defaultAccessKey.value -> cfg.defaultSecretKey.value

val credsProvider = keys match {
case Some((accessKey, secretKey)) =>
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey))
case _ =>
StaticCredentialsProvider.create(AnonymousCredentialsProvider.create().resolveCredentials())
}

val regionProvider: AwsRegionProvider = new AwsRegionProvider {
val getRegion: Region = region.getOrElse(Region.US_EAST_1)
}

s3.S3Settings(MemoryBufferType, credsProvider, regionProvider, ApiVersion.ListBucketVersion2)
.withEndpointUrl(address(bucket).toString())
}
}

object S3StorageValue {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations

import akka.actor.ActorSystem
import akka.http.scaladsl.model.Uri
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource

trait FetchFile {
Expand All @@ -33,13 +32,11 @@ object FetchFile {
/**
* Construct a [[FetchFile]] from the given ''storage''.
*/
def apply(storage: Storage, client: RemoteDiskStorageClient, config: StorageTypeConfig)(implicit
as: ActorSystem
): FetchFile =
def apply(storage: Storage, remoteClient: RemoteDiskStorageClient, s3Client: S3StorageClient): FetchFile =
storage match {
case storage: Storage.DiskStorage => storage.fetchFile
case storage: Storage.S3Storage => storage.fetchFile(config)
case storage: Storage.RemoteDiskStorage => storage.fetchFile(client)
case storage: Storage.S3Storage => storage.fetchFile(s3Client)
case storage: Storage.RemoteDiskStorage => storage.fetchFile(remoteClient)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest.ComputedDigest
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileStorageMetadata
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{DigestAlgorithm, Storage}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef

import java.util.UUID
Expand All @@ -34,13 +34,13 @@ object SaveFile {
/**
* Construct a [[SaveFile]] from the given ''storage''.
*/
def apply(storage: Storage, client: RemoteDiskStorageClient, config: StorageTypeConfig)(implicit
def apply(storage: Storage, client: RemoteDiskStorageClient, s3Client: S3StorageClient)(implicit
as: ActorSystem,
uuidf: UUIDF
): SaveFile =
storage match {
case storage: Storage.DiskStorage => storage.saveFile
case storage: Storage.S3Storage => storage.saveFile(config)
case storage: Storage.S3Storage => storage.saveFile(s3Client)
case storage: Storage.RemoteDiskStorage => storage.saveFile(client)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,45 +1,37 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3

import akka.actor.ActorSystem
import akka.http.scaladsl.model.Uri
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.alpakka.s3.{S3Attributes, S3Exception}
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.ByteString
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.S3StorageValue
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.FetchFile
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.FetchFileRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource
import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter

import java.net.URLDecoder
import java.nio.charset.StandardCharsets.UTF_8
import scala.concurrent.duration.DurationInt

final class S3StorageFetchFile(value: S3StorageValue, config: StorageTypeConfig)(implicit
as: ActorSystem
) extends FetchFile {

private val s3Attributes = S3Attributes.settings(value.alpakkaSettings(config))
final class S3StorageFetchFile(client: S3StorageClient, bucket: String) extends FetchFile {

override def apply(attributes: FileAttributes): IO[AkkaSource] =
apply(attributes.path)

override def apply(path: Uri.Path): IO[AkkaSource] =
IO.fromFuture(
IO.delay(
S3.download(value.bucket, URLDecoder.decode(path.toString, UTF_8.toString))
.withAttributes(s3Attributes)
.runWith(Sink.head)
override def apply(path: Uri.Path): IO[AkkaSource] = {
IO.delay(
Source.fromGraph(
StreamConverter(
client
.readFile(bucket, URLDecoder.decode(path.toString, UTF_8.toString))
.groupWithin(8192, 1.second)
.map(bytes => ByteString(bytes.toArray))
)
)
).redeemWith(
{
case err: S3Exception => IO.raiseError(UnexpectedFetchError(path.toString, err.toString()))
case err => IO.raiseError(UnexpectedFetchError(path.toString, err.getMessage))
},
{
case Some((source, _)) => IO.pure(source: AkkaSource)
case None => IO.raiseError(FileNotFound(path.toString()))
}
)
).recoverWith { err =>
IO.raiseError(UnexpectedFetchError(path.toString, err.getMessage))
}
}
}
Loading

0 comments on commit 96a1c94

Please sign in to comment.