Migrate archives to Cats Effect (#4352)
* Migrate archives to Cats Effect

---------

Co-authored-by: Simon Dumas <[email protected]>
imsdu and Simon Dumas authored Oct 11, 2023
1 parent 7e7c822 commit cfa7f5e
Showing 28 changed files with 443 additions and 351 deletions.
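The thread running through these files is the same: Monix BIO's typed error channel (`IO[Rejection, A]`) becomes Cats Effect's single-channel `IO[A]`, with rejections extending `Throwable` and handled via `adaptError`/`recover` rather than `mapError`/`onErrorRecover`. A minimal sketch of that shape, with hypothetical `NotFound`/`Wrapped` rejections standing in for `ArchiveRejection` (not the actual Delta sources):

```scala
import cats.effect.IO
import cats.syntax.all._

// Hypothetical rejections standing in for ArchiveRejection: under Cats
// Effect they must extend Throwable to travel in IO's error channel.
sealed abstract class Rejection(msg: String) extends Exception(msg)
final case class NotFound(id: String)      extends Rejection(s"'$id' not found")
final case class Wrapped(cause: Throwable) extends Rejection(cause.getMessage)

def fetch(id: String): IO[String] =
  IO.raiseError(new NoSuchElementException(id))

// Monix BIO:   fetch(id).mapError(Wrapped(_))  : IO[Rejection, String]
// Cats Effect: adaptError rewrites the single Throwable channel in place.
def fetchResource(id: String): IO[String] =
  fetch(id).adaptError {
    case _: NoSuchElementException => NotFound(id)
    case other                     => Wrapped(other)
  }

// onErrorRecover becomes recover: a missing resource degrades to None.
def fetchOpt(id: String): IO[Option[String]] =
  fetchResource(id).map(Option(_)).recover { case _: NotFound => None }
```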
DeltaModule.scala
@@ -30,6 +30,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.plugin.PluginDef
import ch.epfl.bluebrain.nexus.delta.sdk.projects.{OwnerPermissionsScopeInitialization, ProjectsConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ProjectionConfig, QueryConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.execution.EvaluationExecution
import ch.megard.akka.http.cors.scaladsl.settings.CorsSettings
import com.typesafe.config.Config
import izumi.distage.model.definition.{Id, ModuleDef}
@@ -106,6 +107,7 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class
make[Clock[IO]].from(Clock.create[IO])
make[Timer[IO]].from(IO.timer(ExecutionContext.global))
make[ContextShift[IO]].from(IO.contextShift(ExecutionContext.global))
make[EvaluationExecution].from(EvaluationExecution(_, _))
make[UUIDF].from(UUIDF.random)
make[Scheduler].from(scheduler)
make[JsonKeyOrdering].from(
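The new `make[EvaluationExecution].from(EvaluationExecution(_, _))` line lets distage satisfy a two-argument constructor from the `Timer[IO]` and `ContextShift[IO]` bindings declared just above it. A plausible shape for the class, assuming it merely bundles those two capabilities (the actual definition lives in `ch.epfl.bluebrain.nexus.delta.sourcing.execution` and may differ):

```scala
import cats.effect.{ContextShift, IO, Timer}

// Sketch (an assumption, not the actual sources): a value distage can
// build by injecting the Timer[IO] and ContextShift[IO] bindings above.
final case class EvaluationExecution(timer: Timer[IO], contextShift: ContextShift[IO])
```

With `make[Timer[IO]]` and `make[ContextShift[IO]]` already in the module, `EvaluationExecution(_, _)` is just a two-parameter function whose arguments distage resolves from those bindings.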
StreamModule.scala
@@ -1,6 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.wiring

import cats.effect.{Clock, Sync}
import cats.effect.{Clock, IO, Sync, Timer}
import ch.epfl.bluebrain.nexus.delta.sdk.ResourceShifts
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{ProjectionConfig, QueryConfig}
@@ -9,6 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes._
import ch.epfl.bluebrain.nexus.delta.sourcing.{DeleteExpired, PurgeElemFailures, Transactors}
import izumi.distage.model.definition.ModuleDef
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import monix.bio.{Task, UIO}

/**
@@ -55,8 +56,8 @@ object StreamModule extends ModuleDef {
}

make[DeleteExpired].fromEffect {
(supervisor: Supervisor, config: ProjectionConfig, xas: Transactors, clock: Clock[UIO]) =>
DeleteExpired(supervisor, config, xas)(clock)
(supervisor: Supervisor, config: ProjectionConfig, xas: Transactors, clock: Clock[IO], timer: Timer[IO]) =>
DeleteExpired(supervisor, config, xas)(clock, timer).toUIO
}

make[PurgeElemFailures].fromEffect {
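`DeleteExpired` now evaluates in Cats Effect `IO`, while the distage `fromEffect` binding still expects a Monix effect, hence the `.toUIO` bridge pulled in from `kernel.effect.migration`. The sketch below is a stand-in for that interop, not its actual implementation, assuming only CE2's `unsafeToFuture` and Monix BIO's `hideErrors`:

```scala
import cats.effect.{IO => CatsIO}
import monix.bio.{Task, UIO}

object MigrationSketch {
  implicit final class CatsIOToMonixOps[A](private val io: CatsIO[A]) {
    // Defer the CE2 IO, run it as a Future inside a Monix Task, and push
    // any failure into the unrecoverable channel to obtain a UIO.
    def toUIO: UIO[A] = Task.deferFuture(io.unsafeToFuture()).hideErrors
  }
}

// Usage: import MigrationSketch._ and call `someCatsIO.toUIO` as above.
```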
ArchiveDownload.scala
@@ -1,14 +1,18 @@
package ch.epfl.bluebrain.nexus.delta.plugins.archive

import akka.stream.alpakka.file.ArchiveMetadata
import akka.stream.scaladsl.Source
import akka.util.ByteString
import cats.implicits._
import cats.effect.{ContextShift, IO}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf.ParsingError
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveReference.{FileReference, FileSelfReference, ResourceReference}
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveRejection._
import ch.epfl.bluebrain.nexus.delta.sdk.model.ResourceRepresentation._
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.Files
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection
import ch.epfl.bluebrain.nexus.delta.rdf.RdfError
import ch.epfl.bluebrain.nexus.delta.rdf.implicits._
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
@@ -22,20 +26,18 @@ import ch.epfl.bluebrain.nexus.delta.sdk.error.SDKError
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdContent
import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.AnnotatedSource
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, ResourceRepresentation}
import ch.epfl.bluebrain.nexus.delta.sdk.model.ResourceRepresentation._
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegmentRef, ResourceRepresentation}
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.resources
import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
import ch.epfl.bluebrain.nexus.delta.sdk.{AkkaSource, JsonLdValue}
import ch.epfl.bluebrain.nexus.delta.sdk.stream.CatsStreamConverter
import ch.epfl.bluebrain.nexus.delta.sdk.{AkkaSource, JsonLdValue, ResourceShifts}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, ResourceRef}
import com.typesafe.scalalogging.Logger
import fs2.Stream
import io.circe.{Json, Printer}
import monix.bio.{IO, Task, UIO}
import monix.execution.Scheduler
import monix.bio.{Task, UIO}

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import akka.stream.alpakka.file.ArchiveMetadata

/**
* Archive download functionality.
@@ -60,13 +62,13 @@ trait ArchiveDownload {
value: ArchiveValue,
project: ProjectRef,
ignoreNotFound: Boolean
)(implicit caller: Caller, scheduler: Scheduler): IO[ArchiveRejection, AkkaSource]
)(implicit caller: Caller): IO[AkkaSource]

}

object ArchiveDownload {

implicit private val logger: Logger = Logger[ArchiveDownload]
private val logger = Logger.cats[ArchiveDownload]

case class ArchiveDownloadError(filename: String, response: Complete[JsonLdValue]) extends SDKError {
override def getMessage: String = {
@@ -86,10 +88,15 @@ object ArchiveDownload {
*/
def apply(
aclCheck: AclCheck,
fetchResource: (ResourceRef, ProjectRef) => UIO[Option[JsonLdContent[_, _]]],
fetchFileContent: (ResourceRef, ProjectRef, Caller) => IO[FileRejection, FileResponse],
fetchResource: (ResourceRef, ProjectRef) => IO[Option[JsonLdContent[_, _]]],
fetchFileContent: (ResourceRef, ProjectRef, Caller) => IO[FileResponse],
fileSelf: FileSelf
)(implicit sort: JsonKeyOrdering, baseUri: BaseUri, rcr: RemoteContextResolution): ArchiveDownload =
)(implicit
sort: JsonKeyOrdering,
baseUri: BaseUri,
rcr: RemoteContextResolution,
contextShift: ContextShift[IO]
): ArchiveDownload =
new ArchiveDownload {

implicit private val api: JsonLdApi = JsonLdJavaApi.lenient
@@ -100,17 +107,17 @@
value: ArchiveValue,
project: ProjectRef,
ignoreNotFound: Boolean
)(implicit caller: Caller, scheduler: Scheduler): IO[ArchiveRejection, AkkaSource] = {
)(implicit caller: Caller): IO[AkkaSource] = {
for {
references <- value.resources.toList.traverse(toFullReference)
_ <- checkResourcePermissions(references, project)
contentStream <- resolveReferencesAsStream(references, project, ignoreNotFound)
} yield {
Source.fromGraph(StreamConverter(contentStream)).via(Zip.writeFlow)
Source.fromGraph(CatsStreamConverter(contentStream)).via(Zip.writeFlow)
}
}

private def toFullReference(archiveReference: ArchiveReference): IO[ArchiveRejection, FullArchiveReference] = {
private def toFullReference(archiveReference: ArchiveReference): IO[FullArchiveReference] = {
archiveReference match {
case reference: FullArchiveReference => IO.pure(reference)
case reference: FileSelfReference =>
@@ -119,15 +126,17 @@
.map { case (projectRef, resourceRef) =>
FileReference(resourceRef, Some(projectRef), reference.path)
}
.mapError(InvalidFileSelf)
.adaptError { case e: ParsingError =>
InvalidFileSelf(e)
}
}
}

private def resolveReferencesAsStream(
references: List[FullArchiveReference],
project: ProjectRef,
ignoreNotFound: Boolean
)(implicit caller: Caller): IO[ArchiveRejection, Stream[Task, (ArchiveMetadata, AkkaSource)]] = {
)(implicit caller: Caller): IO[Stream[IO, (ArchiveMetadata, AkkaSource)]] = {
references
.traverseFilter {
case ref: FileReference => fileEntry(ref, project, ignoreNotFound)
@@ -137,61 +146,60 @@
.map(asStream)
}

private def sortWith(list: List[(ArchiveMetadata, Task[AkkaSource])]): List[(ArchiveMetadata, Task[AkkaSource])] =
private def sortWith(list: List[(ArchiveMetadata, IO[AkkaSource])]): List[(ArchiveMetadata, IO[AkkaSource])] =
list.sortBy { case (entry, _) => entry }(Zip.ordering)

private def asStream(
list: List[(ArchiveMetadata, Task[AkkaSource])]
): Stream[Task, (ArchiveMetadata, AkkaSource)] =
list: List[(ArchiveMetadata, IO[AkkaSource])]
): Stream[IO, (ArchiveMetadata, AkkaSource)] =
Stream.iterable(list).evalMap { case (metadata, source) =>
source.map(metadata -> _)
}

private def checkResourcePermissions(
refs: List[FullArchiveReference],
project: ProjectRef
)(implicit caller: Caller): IO[AuthorizationFailed, Unit] =
)(implicit caller: Caller): IO[Unit] = toCatsIO {
aclCheck
.mapFilterOrRaise(
refs,
(a: FullArchiveReference) => AclAddress.Project(a.project.getOrElse(project)) -> resources.read,
identity[ArchiveReference],
address => IO.raiseError(AuthorizationFailed(address, resources.read))
address => Task.raiseError(AuthorizationFailed(address, resources.read))
)
.void
}

private def fileEntry(
ref: FileReference,
project: ProjectRef,
ignoreNotFound: Boolean
)(implicit
caller: Caller
): IO[ArchiveRejection, Option[(ArchiveMetadata, Task[AkkaSource])]] = {
): IO[Option[(ArchiveMetadata, IO[AkkaSource])]] = {
val refProject = ref.project.getOrElse(project)
// the required permissions are checked for each file content fetch
val entry = fetchFileContent(ref.ref, refProject, caller)
.mapError {
.adaptError {
case _: FileRejection.FileNotFound => ResourceNotFound(ref.ref, project)
case _: FileRejection.TagNotFound => ResourceNotFound(ref.ref, project)
case _: FileRejection.RevisionNotFound => ResourceNotFound(ref.ref, project)
case FileRejection.AuthorizationFailed(addr, perm) => AuthorizationFailed(addr, perm)
case other => WrappedFileRejection(other)
case other: FileRejection => WrappedFileRejection(other)
}
.map { case FileResponse(fileMetadata, content) =>
val path = pathOf(ref, project, fileMetadata.filename)
val archiveMetadata = Zip.metadata(path)
val contentTask: Task[AkkaSource] = content
val path = pathOf(ref, project, fileMetadata.filename)
val archiveMetadata = Zip.metadata(path)
val contentTask: IO[AkkaSource] = content
.tapError(response =>
UIO.delay(
logger
.error(s"Error streaming file '${fileMetadata.filename}' for archive: ${response.value.value}")
)
logger
.error(s"Error streaming file '${fileMetadata.filename}' for archive: ${response.value.value}")
.toUIO
)
.mapError(response => ArchiveDownloadError(fileMetadata.filename, response))
Some((archiveMetadata, contentTask))

Option((archiveMetadata, contentTask))
}
if (ignoreNotFound) entry.onErrorRecover { case _: ResourceNotFound => None }
if (ignoreNotFound) entry.recover { case _: ResourceNotFound => None }
else entry
}
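In `fileEntry` above, the old `UIO.delay(logger.error(...))` around a scalalogging logger becomes an effectful `logger.error(...)` on `Logger.cats[ArchiveDownload]`, composed with the failure channel so the error keeps propagating after the log line runs. Assuming `Logger.cats` yields a log4cats-style `Logger[IO]` (an assumption; the wrapper lives in Delta's kernel), the pattern is:

```scala
import cats.effect.IO
import cats.syntax.all._
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

// Hypothetical stand-in for Logger.cats[ArchiveDownload].
val logger: Logger[IO] = Slf4jLogger.getLogger[IO]

// onError runs the effectful log line and then re-raises the failure,
// mirroring the tapError + logger.error(...).toUIO combination above.
def withLogging[A](entry: IO[A], filename: String): IO[A] =
  entry.onError { case e =>
    logger.error(e)(s"Error streaming file '$filename' for archive")
  }
```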

Expand All @@ -209,34 +217,34 @@ object ArchiveDownload {
ref: ResourceReference,
project: ProjectRef,
ignoreNotFound: Boolean
): IO[ArchiveRejection, Option[(ArchiveMetadata, Task[AkkaSource])]] = {
): IO[Option[(ArchiveMetadata, IO[AkkaSource])]] = {
val archiveEntry = resourceRefToByteString(ref, project).map { content =>
val path = pathOf(ref, project)
val metadata = Zip.metadata(path)
Some((metadata, Task.pure(Source.single(content))))
Option((metadata, IO.pure(Source.single(content))))
}
if (ignoreNotFound) archiveEntry.onErrorHandle { _: ResourceNotFound => None }
if (ignoreNotFound) archiveEntry.recover { _: ResourceNotFound => None }
else archiveEntry
}

private def resourceRefToByteString(
ref: ResourceReference,
project: ProjectRef
): IO[ResourceNotFound, ByteString] = {
): IO[ByteString] = {
val p = ref.project.getOrElse(project)
for {
valueOpt <- fetchResource(ref.ref, p)
value <- IO.fromOption(valueOpt, ResourceNotFound(ref.ref, project))
bytes <- valueToByteString(value, ref.representationOrDefault).logAndDiscardErrors(
"serialize resource to ByteString"
)
value <- IO.fromOption(valueOpt)(ResourceNotFound(ref.ref, project))
bytes <- valueToByteString(value, ref.representationOrDefault).onError { error =>
logger.error(error)(s"Serializing resource '$ref' to ByteString failed.")
}
} yield bytes
}

private def valueToByteString[A](
value: JsonLdContent[A, _],
repr: ResourceRepresentation
): IO[RdfError, ByteString] = {
): IO[ByteString] = toCatsIO {
implicit val encoder: JsonLdEncoder[A] = value.encoder
repr match {
case SourceJson => UIO.pure(ByteString(prettyPrintSource(value.source)))
@@ -265,4 +273,17 @@
}
}

def apply(aclCheck: AclCheck, shifts: ResourceShifts, files: Files, fileSelf: FileSelf)(implicit
sort: JsonKeyOrdering,
baseUri: BaseUri,
rcr: RemoteContextResolution,
contextShift: ContextShift[IO]
): ArchiveDownload =
ArchiveDownload(
aclCheck,
shifts.fetch,
(id: ResourceRef, project: ProjectRef, caller: Caller) => files.fetchContent(IdSegmentRef(id), project)(caller),
fileSelf
)

}
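A small but easy-to-miss API shift in `resourceRefToByteString`: Monix BIO's `IO.fromOption(opt, ifEmpty)` becomes the curried `IO.fromOption(opt)(orElse)` of Cats Effect, where the fallback must be a `Throwable`. A self-contained sketch with a hypothetical `ResourceNotFound`:

```scala
import cats.effect.IO

// Hypothetical rejection; CE2's fromOption requires a Throwable fallback.
final case class ResourceNotFound(id: String) extends Exception(s"'$id' not found")

// Monix BIO:   IO.fromOption(valueOpt, ResourceNotFound(id))
// Cats Effect: the fallback moves to a curried, by-name argument.
def requireFound[A](id: String, valueOpt: Option[A]): IO[A] =
  IO.fromOption(valueOpt)(ResourceNotFound(id))
```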
ArchivePluginConfig.scala
@@ -1,8 +1,8 @@
package ch.epfl.bluebrain.nexus.delta.plugins.archive

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sourcing.config.EphemeralLogConfig
import com.typesafe.config.Config
import monix.bio.UIO
import pureconfig.generic.semiauto.deriveReader
import pureconfig.{ConfigReader, ConfigSource}

@@ -23,8 +23,8 @@ object ArchivePluginConfig {
/**
* Converts a [[Config]] into an [[ArchivePluginConfig]]
*/
def load(config: Config): UIO[ArchivePluginConfig] =
UIO.delay {
def load(config: Config): IO[ArchivePluginConfig] =
IO.delay {
ConfigSource
.fromConfig(config)
.at("plugins.archive")
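`ArchivePluginConfig.load` now suspends the pureconfig lookup in `cats.effect.IO` rather than Monix's `UIO`. A trimmed-down, self-contained equivalent, assuming a single hypothetical `priority` field and a `loadOrThrow` call where the diff cuts off:

```scala
import cats.effect.IO
import com.typesafe.config.Config
import pureconfig.{ConfigReader, ConfigSource}
import pureconfig.generic.semiauto.deriveReader

// Hypothetical single-field config, for illustration only.
final case class ArchivePluginConfig(priority: Int)

object ArchivePluginConfig {
  implicit val reader: ConfigReader[ArchivePluginConfig] =
    deriveReader[ArchivePluginConfig]

  // IO.delay suspends the side-effecting parse; loadOrThrow raises any
  // failure into IO's error channel instead of UIO's unchecked one.
  def load(config: Config): IO[ArchivePluginConfig] =
    IO.delay(
      ConfigSource.fromConfig(config).at("plugins.archive").loadOrThrow[ArchivePluginConfig]
    )
}
```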