Skip to content

Commit

Permalink
Merge branch 'master' into ship-project-mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski authored Apr 3, 2024
2 parents e356d2d + 7e5f9a0 commit b5b6866
Show file tree
Hide file tree
Showing 35 changed files with 319 additions and 243 deletions.
19 changes: 18 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ val declineVersion = "2.4.1"
val distageVersion = "1.2.7"
val doobieVersion = "1.0.0-RC5"
val fs2Version = "3.10.1"
val fs2AwsVersion = "6.1.1"
val googleAuthClientVersion = "1.35.0"
val handleBarsVersion = "4.4.0"
val hikariVersion = "5.1.0"
Expand Down Expand Up @@ -104,6 +105,8 @@ lazy val doobie = Seq(
)
lazy val fs2 = "co.fs2" %% "fs2-core" % fs2Version
lazy val fs2io = "co.fs2" %% "fs2-io" % fs2Version
lazy val fs2Aws = "io.laserdisc" %% "fs2-aws-core" % fs2AwsVersion
lazy val fs2AwsS3 = "io.laserdisc" %% "fs2-aws-s3" % fs2AwsVersion
lazy val googleAuthClient = "com.google.oauth-client" % "google-oauth-client" % googleAuthClientVersion
lazy val handleBars = "com.github.jknack" % "handlebars" % handleBarsVersion
lazy val jenaArq = "org.apache.jena" % "jena-arq" % jenaVersion
Expand Down Expand Up @@ -250,6 +253,8 @@ lazy val testkit = project
),
catsRetry,
doobiePostgres,
fs2Aws,
fs2AwsS3,
munit,
munitCatsEffect,
scalaTest,
Expand Down Expand Up @@ -582,7 +587,19 @@ lazy val storagePlugin = project
akkaTestKitTyped % Test,
akkaHttpTestKit % Test,
logback % Test
),
) ++ Seq(
fs2Aws,
fs2AwsS3
).map {
_ excludeAll (
ExclusionRule(organization = "org.typelevel", name = "cats-kernel_2.13"),
ExclusionRule(organization = "org.typelevel", name = "cats-core_2.13"),
ExclusionRule(organization = "org.typelevel", name = "cats-effect_2.13"),
ExclusionRule(organization = "com.chuusai", name = "shapeless_2.13"),
ExclusionRule(organization = "co.fs2", name = "fs2-core_2.13"),
ExclusionRule(organization = "co.fs2", name = "fs2-io_2.13")
)
},
buildInfoKeys := Seq[BuildInfoKey](version),
buildInfoPackage := "ch.epfl.bluebrain.nexus.delta.plugins.storage",
addCompilerPlugin(betterMonadicFor),
Expand Down
6 changes: 3 additions & 3 deletions delta/plugins/storage/src/main/resources/storage.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ plugins.storage {
# the default digest algorithm
digest-algorithm = "SHA-256"
# the default endpoint of the current storage
#default-endpoint = "mybucket.eu-central-1.amazonaws.com"
default-endpoint = "https://s3.us-east-1.amazonaws.com"
# the access key for the default endpoint
#default-access-key = "my-key"
default-access-key = "my-key"
# the secret key for the default endpoint
#default-secret-key = "my-secret-key"
default-secret-key = "my-secret-key"
# the default permission required in order to download a file from an S3 storage
default-read-permission = "resources/read"
# the default permission required in order to upload a file to a S3 storage
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage

import akka.actor
import akka.actor.typed.ActorSystem
import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.kernel.utils.{ClasspathResourceLoader, TransactionalFileCopier, UUIDF}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.config.ElasticSearchViewsConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.{FileAttributesUpdateStream, Files}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.Files.FilesLog
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.batch.{BatchCopy, BatchFiles}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.contexts.{files => fileCtxId}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.routes.{BatchFilesRoutes, FilesRoutes}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.schemas.{files => filesSchemaId}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.{FileAttributesUpdateStream, Files}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.{ShowFileLocation, StorageTypeConfig}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.contexts.{storages => storageCtxId, storagesMetadata => storageMetaCtxId}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageAccess
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.disk.DiskStorageCopyFiles
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.RemoteDiskStorageCopyFiles
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.routes.StoragesRoutes
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.schemas.{storage => storagesSchemaId}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{StorageDeletionTask, StoragePermissionProviderImpl, Storages, StoragesStatistics}
Expand All @@ -43,9 +43,9 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEncoder
import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Supervisor
import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors}
import com.typesafe.config.Config
import izumi.distage.model.definition.{Id, ModuleDef}

Expand All @@ -66,28 +66,30 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
HttpClient.noRetry(compression = false)(as.classicSystem)
}

make[S3StorageClient].fromResource { (cfg: StoragePluginConfig) =>
S3StorageClient.resource(cfg.storages.storageTypeConfig.amazon)
}

make[Storages]
.fromEffect {
(
fetchContext: FetchContext,
contextResolution: ResolverContextResolution,
remoteDiskStorageClient: RemoteDiskStorageClient,
s3StorageClient: S3StorageClient,
permissions: Permissions,
xas: Transactors,
cfg: StoragePluginConfig,
serviceAccount: ServiceAccount,
api: JsonLdApi,
clock: Clock[IO],
uuidF: UUIDF,
as: ActorSystem[Nothing]
uuidF: UUIDF
) =>
implicit val classicAs: actor.ActorSystem = as.classicSystem
implicit val storageTypeConfig: StorageTypeConfig = cfg.storages.storageTypeConfig
Storages(
fetchContext,
contextResolution,
permissions.fetchPermissionSet,
StorageAccess.apply(_, _, remoteDiskStorageClient, storageTypeConfig),
StorageAccess.mk(remoteDiskStorageClient, s3StorageClient),
xas,
cfg.storages,
serviceAccount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageEvent
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.DiskStorageValue
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageAccess
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.schemas.{storage => storageSchema}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
Expand Down Expand Up @@ -294,8 +295,7 @@ final class Storages private (

object Storages {

type StorageLog = ScopedEventLog[Iri, StorageState, StorageCommand, StorageEvent, StorageRejection]
type StorageAccess = (Iri, StorageValue) => IO[Unit]
type StorageLog = ScopedEventLog[Iri, StorageState, StorageCommand, StorageEvent, StorageRejection]

/**
* The storage entity type.
Expand Down Expand Up @@ -369,11 +369,11 @@ object Storages {
def isDescendantOrEqual(target: AbsolutePath, parent: AbsolutePath): Boolean =
target == parent || target.value.descendantOf(parent.value)

def verifyAllowedDiskVolume(id: Iri, value: StorageValue): IO[Unit] =
def verifyAllowedDiskVolume(value: StorageValue): IO[Unit] =
value match {
case d: DiskStorageValue if !config.disk.allowedVolumes.exists(isDescendantOrEqual(d.volume, _)) =>
val err = s"Volume '${d.volume}' not allowed. Allowed volumes: '${config.disk.allowedVolumes.mkString(",")}'"
IO.raiseError(StorageNotAccessible(id, err))
IO.raiseError(StorageNotAccessible(err))
case _ => IO.unit
}

Expand All @@ -386,8 +386,8 @@ object Storages {
for {
value <- IO.fromOption(fields.toValue(config))(InvalidStorageType(id, fields.tpe, allowedStorageTypes))
_ <- validatePermissions(fields)
_ <- access(id, value)
_ <- verifyAllowedDiskVolume(id, value)
_ <- access(value)
_ <- verifyAllowedDiskVolume(value)
_ <- validateFileSize(id, fields.maxFileSize, value.maxFileSize)
} yield value

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ object StoragesConfig {
*/
final case class S3StorageConfig(
digestAlgorithm: DigestAlgorithm,
defaultEndpoint: Option[Uri],
defaultAccessKey: Option[Secret[String]],
defaultSecretKey: Option[Secret[String]],
defaultEndpoint: Uri,
defaultAccessKey: Secret[String],
defaultSecretKey: Secret[String],
defaultReadPermission: Permission,
defaultWritePermission: Permission,
showLocation: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ object StorageFields {
default,
cfg.digestAlgorithm,
bucket,
endpoint.orElse(cfg.defaultEndpoint),
endpoint.orElse(Some(cfg.defaultEndpoint)),
region,
readPermission.getOrElse(cfg.defaultReadPermission),
writePermission.getOrElse(cfg.defaultWritePermission),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ object StorageRejection {
* @param id
* the storage identifier
*/
final case class StorageNotAccessible(id: Iri, details: String)
extends StorageRejection(s"Storage '$id' not accessible.")
final case class StorageNotAccessible(details: String) extends StorageRejection(s"Storage not accessible: $details")

/**
* Signals an error creating/updating a storage with a wrong maxFileSize
Expand Down Expand Up @@ -165,7 +164,7 @@ object StorageRejection {
val tpe = ClassUtils.simpleName(r)
val obj = JsonObject(keywords.tpe -> tpe.asJson, "reason" -> r.reason.asJson)
r match {
case StorageNotAccessible(_, details) => obj.add("details", details.asJson)
case StorageNotAccessible(details) => obj.add("details", details.asJson)
case IncorrectRev(provided, expected) => obj.add("provided", provided.asJson).add("expected", expected.asJson)
case _: StorageNotFound => obj.add(keywords.tpe, "ResourceNotFound".asJson)
case _ => obj
Expand All @@ -183,7 +182,7 @@ object StorageRejection {
case ResourceAlreadyExists(_, _) => StatusCodes.Conflict
case IncorrectRev(_, _) => StatusCodes.Conflict
case FetchByTagNotSupported(_) => StatusCodes.BadRequest
case StorageNotAccessible(_, _) => StatusCodes.BadRequest
case StorageNotAccessible(_) => StatusCodes.BadRequest
case _ => StatusCodes.BadRequest
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import akka.stream.alpakka.s3.{ApiVersion, MemoryBufferType}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords
import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef}
import io.circe.generic.extras.Configuration
Expand All @@ -15,7 +16,6 @@ import io.circe.{Codec, Decoder, Encoder}
import software.amazon.awssdk.auth.credentials.{AnonymousCredentialsProvider, AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.regions.providers.AwsRegionProvider
import ch.epfl.bluebrain.nexus.delta.sdk.implicits._

import java.io.File
import java.nio.file.Path
Expand Down Expand Up @@ -161,14 +161,12 @@ object StorageValue {
def alpakkaSettings(config: StorageTypeConfig): s3.S3Settings = {

val keys = for {
cfg <- config.amazon
accessKey <- cfg.defaultAccessKey
secretKey <- cfg.defaultSecretKey
} yield accessKey -> secretKey
cfg <- config.amazon
} yield cfg.defaultAccessKey.value -> cfg.defaultSecretKey.value

val credsProvider = keys match {
case Some((accessKey, secretKey)) =>
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey.value, secretKey.value))
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey))
case _ =>
StaticCredentialsProvider.create(AnonymousCredentialsProvider.create().resolveCredentials())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations

import akka.actor.ActorSystem
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.{DiskStorageValue, RemoteDiskStorageValue, S3StorageValue}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.disk.DiskStorageAccess
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.RemoteDiskStorageAccess
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.S3StorageAccess
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient

private[operations] trait StorageAccess {

type Storage <: StorageValue
trait StorageAccess {

/**
* Checks whether the system has access to the passed ''storage''
Expand All @@ -22,20 +18,16 @@ private[operations] trait StorageAccess {
* a [[Unit]] if access has been verified successfully or signals an error [[StorageNotAccessible]] with the
* details about why the storage is not accessible
*/
def apply(id: Iri, storage: Storage): IO[Unit]
def apply(storage: StorageValue): IO[Unit]
}

object StorageAccess {

final private[storage] def apply(
id: Iri,
storage: StorageValue,
client: RemoteDiskStorageClient,
config: StorageTypeConfig
)(implicit as: ActorSystem): IO[Unit] =
storage match {
case storage: DiskStorageValue => DiskStorageAccess(id, storage)
case storage: S3StorageValue => new S3StorageAccess(config).apply(id, storage)
case storage: RemoteDiskStorageValue => new RemoteDiskStorageAccess(client).apply(id, storage)
}
final private[storage] def mk(
remoteStorageClient: RemoteDiskStorageClient,
s3Client: S3StorageClient
): StorageAccess = {
case s: DiskStorageValue => DiskStorageAccess(s)
case s: S3StorageValue => new S3StorageAccess(s3Client).apply(s.bucket)
case s: RemoteDiskStorageValue => new RemoteDiskStorageAccess(remoteStorageClient).apply(s)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,15 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.disk
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.StorageNotAccessible
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.DiskStorageValue
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageAccess
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri

import java.nio.file.Files

object DiskStorageAccess extends StorageAccess {
override type Storage = DiskStorageValue
object DiskStorageAccess {

override def apply(id: Iri, storage: DiskStorageValue): IO[Unit] = {
def apply(storage: DiskStorageValue): IO[Unit] = {

def failWhen(condition: Boolean, err: => String) = {
IO.raiseWhen(condition)(StorageNotAccessible(id, err))
IO.raiseWhen(condition)(StorageNotAccessible(err))
}

for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,16 @@ import cats.effect.IO
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.StorageNotAccessible
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.RemoteDiskStorageValue
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageAccess
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError

class RemoteDiskStorageAccess(client: RemoteDiskStorageClient) extends StorageAccess {
override type Storage = RemoteDiskStorageValue
class RemoteDiskStorageAccess(client: RemoteDiskStorageClient) {

override def apply(id: Iri, storage: RemoteDiskStorageValue): IO[Unit] = {
def apply(storage: RemoteDiskStorageValue): IO[Unit] = {
client
.exists(storage.folder)
.adaptError { case err: HttpClientError =>
StorageNotAccessible(
id,
err.details.fold(s"Folder '${storage.folder}' does not exist")(d => s"${err.reason}: $d")
)
}
Expand Down
Loading

0 comments on commit b5b6866

Please sign in to comment.