Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate archives to Cats Effect #4352

Merged
merged 5 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.plugin.PluginDef
import ch.epfl.bluebrain.nexus.delta.sdk.projects.{OwnerPermissionsScopeInitialization, ProjectsConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ProjectionConfig, QueryConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.execution.EvaluationExecution
import ch.megard.akka.http.cors.scaladsl.settings.CorsSettings
import com.typesafe.config.Config
import izumi.distage.model.definition.{Id, ModuleDef}
Expand Down Expand Up @@ -106,6 +107,7 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class
make[Clock[IO]].from(Clock.create[IO])
make[Timer[IO]].from(IO.timer(ExecutionContext.global))
make[ContextShift[IO]].from(IO.contextShift(ExecutionContext.global))
make[EvaluationExecution].from(EvaluationExecution(_, _))
make[UUIDF].from(UUIDF.random)
make[Scheduler].from(scheduler)
make[JsonKeyOrdering].from(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.wiring

import cats.effect.{Clock, Sync}
import cats.effect.{Clock, IO, Sync, Timer}
import ch.epfl.bluebrain.nexus.delta.sdk.ResourceShifts
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{ProjectionConfig, QueryConfig}
Expand All @@ -9,6 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes._
import ch.epfl.bluebrain.nexus.delta.sourcing.{DeleteExpired, PurgeElemFailures, Transactors}
import izumi.distage.model.definition.ModuleDef
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import monix.bio.{Task, UIO}

/**
Expand Down Expand Up @@ -55,8 +56,8 @@ object StreamModule extends ModuleDef {
}

make[DeleteExpired].fromEffect {
(supervisor: Supervisor, config: ProjectionConfig, xas: Transactors, clock: Clock[UIO]) =>
DeleteExpired(supervisor, config, xas)(clock)
(supervisor: Supervisor, config: ProjectionConfig, xas: Transactors, clock: Clock[IO], timer: Timer[IO]) =>
DeleteExpired(supervisor, config, xas)(clock, timer).toUIO
}

make[PurgeElemFailures].fromEffect {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package ch.epfl.bluebrain.nexus.delta.plugins.archive

import akka.stream.alpakka.file.ArchiveMetadata
import akka.stream.scaladsl.Source
import akka.util.ByteString
import cats.implicits._
import cats.effect.{ContextShift, IO}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf.ParsingError
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveReference.{FileReference, FileSelfReference, ResourceReference}
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveRejection._
import ch.epfl.bluebrain.nexus.delta.sdk.model.ResourceRepresentation._
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.Files
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection
import ch.epfl.bluebrain.nexus.delta.rdf.RdfError
import ch.epfl.bluebrain.nexus.delta.rdf.implicits._
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
Expand All @@ -22,20 +26,18 @@ import ch.epfl.bluebrain.nexus.delta.sdk.error.SDKError
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdContent
import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.AnnotatedSource
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, ResourceRepresentation}
import ch.epfl.bluebrain.nexus.delta.sdk.model.ResourceRepresentation._
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegmentRef, ResourceRepresentation}
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.resources
import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
import ch.epfl.bluebrain.nexus.delta.sdk.{AkkaSource, JsonLdValue}
import ch.epfl.bluebrain.nexus.delta.sdk.stream.CatsStreamConverter
import ch.epfl.bluebrain.nexus.delta.sdk.{AkkaSource, JsonLdValue, ResourceShifts}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, ResourceRef}
import com.typesafe.scalalogging.Logger
import fs2.Stream
import io.circe.{Json, Printer}
import monix.bio.{IO, Task, UIO}
import monix.execution.Scheduler
import monix.bio.{Task, UIO}

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import akka.stream.alpakka.file.ArchiveMetadata

/**
* Archive download functionality.
Expand All @@ -60,13 +62,13 @@ trait ArchiveDownload {
value: ArchiveValue,
project: ProjectRef,
ignoreNotFound: Boolean
)(implicit caller: Caller, scheduler: Scheduler): IO[ArchiveRejection, AkkaSource]
)(implicit caller: Caller): IO[AkkaSource]

}

object ArchiveDownload {

implicit private val logger: Logger = Logger[ArchiveDownload]
private val logger = Logger.cats[ArchiveDownload]

case class ArchiveDownloadError(filename: String, response: Complete[JsonLdValue]) extends SDKError {
override def getMessage: String = {
Expand All @@ -86,10 +88,15 @@ object ArchiveDownload {
*/
def apply(
aclCheck: AclCheck,
fetchResource: (ResourceRef, ProjectRef) => UIO[Option[JsonLdContent[_, _]]],
fetchFileContent: (ResourceRef, ProjectRef, Caller) => IO[FileRejection, FileResponse],
fetchResource: (ResourceRef, ProjectRef) => IO[Option[JsonLdContent[_, _]]],
fetchFileContent: (ResourceRef, ProjectRef, Caller) => IO[FileResponse],
fileSelf: FileSelf
)(implicit sort: JsonKeyOrdering, baseUri: BaseUri, rcr: RemoteContextResolution): ArchiveDownload =
)(implicit
sort: JsonKeyOrdering,
baseUri: BaseUri,
rcr: RemoteContextResolution,
contextShift: ContextShift[IO]
): ArchiveDownload =
new ArchiveDownload {

implicit private val api: JsonLdApi = JsonLdJavaApi.lenient
Expand All @@ -100,17 +107,17 @@ object ArchiveDownload {
value: ArchiveValue,
project: ProjectRef,
ignoreNotFound: Boolean
)(implicit caller: Caller, scheduler: Scheduler): IO[ArchiveRejection, AkkaSource] = {
)(implicit caller: Caller): IO[AkkaSource] = {
for {
references <- value.resources.toList.traverse(toFullReference)
_ <- checkResourcePermissions(references, project)
contentStream <- resolveReferencesAsStream(references, project, ignoreNotFound)
} yield {
Source.fromGraph(StreamConverter(contentStream)).via(Zip.writeFlow)
Source.fromGraph(CatsStreamConverter(contentStream)).via(Zip.writeFlow)
}
}

private def toFullReference(archiveReference: ArchiveReference): IO[ArchiveRejection, FullArchiveReference] = {
private def toFullReference(archiveReference: ArchiveReference): IO[FullArchiveReference] = {
archiveReference match {
case reference: FullArchiveReference => IO.pure(reference)
case reference: FileSelfReference =>
Expand All @@ -119,15 +126,17 @@ object ArchiveDownload {
.map { case (projectRef, resourceRef) =>
FileReference(resourceRef, Some(projectRef), reference.path)
}
.mapError(InvalidFileSelf)
.adaptError { case e: ParsingError =>
InvalidFileSelf(e)
}
}
}

private def resolveReferencesAsStream(
references: List[FullArchiveReference],
project: ProjectRef,
ignoreNotFound: Boolean
)(implicit caller: Caller): IO[ArchiveRejection, Stream[Task, (ArchiveMetadata, AkkaSource)]] = {
)(implicit caller: Caller): IO[Stream[IO, (ArchiveMetadata, AkkaSource)]] = {
references
.traverseFilter {
case ref: FileReference => fileEntry(ref, project, ignoreNotFound)
Expand All @@ -137,61 +146,60 @@ object ArchiveDownload {
.map(asStream)
}

private def sortWith(list: List[(ArchiveMetadata, Task[AkkaSource])]): List[(ArchiveMetadata, Task[AkkaSource])] =
private def sortWith(list: List[(ArchiveMetadata, IO[AkkaSource])]): List[(ArchiveMetadata, IO[AkkaSource])] =
list.sortBy { case (entry, _) => entry }(Zip.ordering)

private def asStream(
list: List[(ArchiveMetadata, Task[AkkaSource])]
): Stream[Task, (ArchiveMetadata, AkkaSource)] =
list: List[(ArchiveMetadata, IO[AkkaSource])]
): Stream[IO, (ArchiveMetadata, AkkaSource)] =
Stream.iterable(list).evalMap { case (metadata, source) =>
source.map(metadata -> _)
}

private def checkResourcePermissions(
refs: List[FullArchiveReference],
project: ProjectRef
)(implicit caller: Caller): IO[AuthorizationFailed, Unit] =
)(implicit caller: Caller): IO[Unit] = toCatsIO {
aclCheck
.mapFilterOrRaise(
refs,
(a: FullArchiveReference) => AclAddress.Project(a.project.getOrElse(project)) -> resources.read,
identity[ArchiveReference],
address => IO.raiseError(AuthorizationFailed(address, resources.read))
address => Task.raiseError(AuthorizationFailed(address, resources.read))
)
.void
}

private def fileEntry(
ref: FileReference,
project: ProjectRef,
ignoreNotFound: Boolean
)(implicit
caller: Caller
): IO[ArchiveRejection, Option[(ArchiveMetadata, Task[AkkaSource])]] = {
): IO[Option[(ArchiveMetadata, IO[AkkaSource])]] = {
val refProject = ref.project.getOrElse(project)
// the required permissions are checked for each file content fetch
val entry = fetchFileContent(ref.ref, refProject, caller)
.mapError {
.adaptError {
case _: FileRejection.FileNotFound => ResourceNotFound(ref.ref, project)
case _: FileRejection.TagNotFound => ResourceNotFound(ref.ref, project)
case _: FileRejection.RevisionNotFound => ResourceNotFound(ref.ref, project)
case FileRejection.AuthorizationFailed(addr, perm) => AuthorizationFailed(addr, perm)
case other => WrappedFileRejection(other)
case other: FileRejection => WrappedFileRejection(other)
}
.map { case FileResponse(fileMetadata, content) =>
val path = pathOf(ref, project, fileMetadata.filename)
val archiveMetadata = Zip.metadata(path)
val contentTask: Task[AkkaSource] = content
val path = pathOf(ref, project, fileMetadata.filename)
val archiveMetadata = Zip.metadata(path)
val contentTask: IO[AkkaSource] = content
.tapError(response =>
UIO.delay(
logger
.error(s"Error streaming file '${fileMetadata.filename}' for archive: ${response.value.value}")
)
logger
.error(s"Error streaming file '${fileMetadata.filename}' for archive: ${response.value.value}")
.toUIO
)
.mapError(response => ArchiveDownloadError(fileMetadata.filename, response))
Some((archiveMetadata, contentTask))

Option((archiveMetadata, contentTask))
}
if (ignoreNotFound) entry.onErrorRecover { case _: ResourceNotFound => None }
if (ignoreNotFound) entry.recover { case _: ResourceNotFound => None }
else entry
}

Expand All @@ -209,34 +217,34 @@ object ArchiveDownload {
ref: ResourceReference,
project: ProjectRef,
ignoreNotFound: Boolean
): IO[ArchiveRejection, Option[(ArchiveMetadata, Task[AkkaSource])]] = {
): IO[Option[(ArchiveMetadata, IO[AkkaSource])]] = {
val archiveEntry = resourceRefToByteString(ref, project).map { content =>
val path = pathOf(ref, project)
val metadata = Zip.metadata(path)
Some((metadata, Task.pure(Source.single(content))))
Option((metadata, IO.pure(Source.single(content))))
}
if (ignoreNotFound) archiveEntry.onErrorHandle { _: ResourceNotFound => None }
if (ignoreNotFound) archiveEntry.recover { _: ResourceNotFound => None }
else archiveEntry
}

private def resourceRefToByteString(
ref: ResourceReference,
project: ProjectRef
): IO[ResourceNotFound, ByteString] = {
): IO[ByteString] = {
val p = ref.project.getOrElse(project)
for {
valueOpt <- fetchResource(ref.ref, p)
value <- IO.fromOption(valueOpt, ResourceNotFound(ref.ref, project))
bytes <- valueToByteString(value, ref.representationOrDefault).logAndDiscardErrors(
"serialize resource to ByteString"
)
value <- IO.fromOption(valueOpt)(ResourceNotFound(ref.ref, project))
bytes <- valueToByteString(value, ref.representationOrDefault).onError { error =>
logger.error(error)(s"Serializing resource '$ref' to ByteString failed.")
}
} yield bytes
}

private def valueToByteString[A](
value: JsonLdContent[A, _],
repr: ResourceRepresentation
): IO[RdfError, ByteString] = {
): IO[ByteString] = toCatsIO {
implicit val encoder: JsonLdEncoder[A] = value.encoder
repr match {
case SourceJson => UIO.pure(ByteString(prettyPrintSource(value.source)))
Expand Down Expand Up @@ -265,4 +273,17 @@ object ArchiveDownload {
}
}

def apply(aclCheck: AclCheck, shifts: ResourceShifts, files: Files, fileSelf: FileSelf)(implicit
sort: JsonKeyOrdering,
baseUri: BaseUri,
rcr: RemoteContextResolution,
contextShift: ContextShift[IO]
): ArchiveDownload =
ArchiveDownload(
aclCheck,
shifts.fetch,
(id: ResourceRef, project: ProjectRef, caller: Caller) => files.fetchContent(IdSegmentRef(id), project)(caller),
fileSelf
)

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package ch.epfl.bluebrain.nexus.delta.plugins.archive

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sourcing.config.EphemeralLogConfig
import com.typesafe.config.Config
import monix.bio.UIO
import pureconfig.generic.semiauto.deriveReader
import pureconfig.{ConfigReader, ConfigSource}

Expand All @@ -23,8 +23,8 @@ object ArchivePluginConfig {
/**
* Converts a [[Config]] into an [[ArchivePluginConfig]]
*/
def load(config: Config): UIO[ArchivePluginConfig] =
UIO.delay {
def load(config: Config): IO[ArchivePluginConfig] =
IO.delay {
ConfigSource
.fromConfig(config)
.at("plugins.archive")
Expand Down
Loading