From 23c51c1f8d43ae5ee80a6b7308a6c15203f79cec Mon Sep 17 00:00:00 2001 From: Simon Dumas Date: Tue, 10 Oct 2023 19:29:09 +0200 Subject: [PATCH 1/3] Migrate archives to Cats Effect --- .../nexus/delta/wiring/DeltaModule.scala | 2 + .../plugins/archive/ArchiveDownload.scala | 115 +++++++++++------- .../plugins/archive/ArchivePluginModule.scala | 30 ++--- .../delta/plugins/archive/Archives.scala | 93 ++++++-------- .../delta/plugins/archive/FileSelf.scala | 34 +++--- .../archive/model/ArchiveRejection.scala | 3 +- .../delta/plugins/archive/model/Zip.scala | 12 +- .../archive/routes/ArchiveRoutes.scala | 98 +++++++-------- .../plugins/archive/ArchiveDownloadSpec.scala | 24 ++-- .../plugins/archive/ArchiveRoutesSpec.scala | 29 +++-- .../plugins/archive/ArchivesSTMSpec.scala | 5 +- .../delta/plugins/archive/ArchivesSpec.scala | 50 +++----- .../delta/plugins/archive/FileSelfSuite.scala | 23 ++-- .../sdk/directives/ResponseToRedirect.scala | 9 +- .../sdk/stream/CatsStreamConverter.scala | 89 ++++++++++++++ .../nexus/delta/sourcing/DeleteExpired.scala | 39 +++--- .../delta/sourcing/EphemeralDefinition.scala | 18 +-- .../nexus/delta/sourcing/EphemeralLog.scala | 27 ++-- .../execution/EvaluationExecution.scala | 5 + .../sourcing/state/EphemeralStateStore.scala | 9 +- .../delta/sourcing/EphemeralLogSuite.scala | 30 ++--- .../nexus/delta/sourcing/Message.scala | 9 +- .../state/EphemeralStateStoreSuite.scala | 11 +- .../nexus/testkit/ce/CatsEffectSuite.scala | 4 +- .../nexus/testkit/ce/CatsIOValues.scala | 5 +- 25 files changed, 433 insertions(+), 340 deletions(-) create mode 100644 delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/stream/CatsStreamConverter.scala create mode 100644 delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/execution/EvaluationExecution.scala diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/DeltaModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/DeltaModule.scala index a22575c37e..e5d12a4401 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/DeltaModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/DeltaModule.scala @@ -30,6 +30,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.plugin.PluginDef import ch.epfl.bluebrain.nexus.delta.sdk.projects.{OwnerPermissionsScopeInitialization, ProjectsConfig} import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ProjectionConfig, QueryConfig} +import ch.epfl.bluebrain.nexus.delta.sourcing.execution.EvaluationExecution import ch.megard.akka.http.cors.scaladsl.settings.CorsSettings import com.typesafe.config.Config import izumi.distage.model.definition.{Id, ModuleDef} @@ -106,6 +107,7 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class make[Clock[IO]].from(Clock.create[IO]) make[Timer[IO]].from(IO.timer(ExecutionContext.global)) make[ContextShift[IO]].from(IO.contextShift(ExecutionContext.global)) + make[EvaluationExecution].from(EvaluationExecution(_, _)) make[UUIDF].from(UUIDF.random) make[Scheduler].from(scheduler) make[JsonKeyOrdering].from( diff --git a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveDownload.scala b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveDownload.scala index 43a7115c8c..876ae894c6 100644 --- a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveDownload.scala +++ b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveDownload.scala @@ -1,14 +1,18 @@ package ch.epfl.bluebrain.nexus.delta.plugins.archive +import akka.stream.alpakka.file.ArchiveMetadata import akka.stream.scaladsl.Source import akka.util.ByteString -import cats.implicits._ +import cats.effect.{ContextShift, IO} +import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.Logger +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ +import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf.ParsingError import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveReference.{FileReference, FileSelfReference, ResourceReference} import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveRejection._ -import ch.epfl.bluebrain.nexus.delta.sdk.model.ResourceRepresentation._ import ch.epfl.bluebrain.nexus.delta.plugins.archive.model._ +import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.Files import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection -import ch.epfl.bluebrain.nexus.delta.rdf.RdfError import ch.epfl.bluebrain.nexus.delta.rdf.implicits._ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution @@ -22,20 +26,18 @@ import ch.epfl.bluebrain.nexus.delta.sdk.error.SDKError import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdContent import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.AnnotatedSource -import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, ResourceRepresentation} +import ch.epfl.bluebrain.nexus.delta.sdk.model.ResourceRepresentation._ +import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegmentRef, ResourceRepresentation} import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.resources -import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter -import ch.epfl.bluebrain.nexus.delta.sdk.{AkkaSource, JsonLdValue} +import ch.epfl.bluebrain.nexus.delta.sdk.stream.CatsStreamConverter +import ch.epfl.bluebrain.nexus.delta.sdk.{AkkaSource, JsonLdValue, ResourceShifts} import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, ResourceRef} -import com.typesafe.scalalogging.Logger import fs2.Stream import io.circe.{Json, Printer} -import monix.bio.{IO, Task, UIO} -import monix.execution.Scheduler +import monix.bio.{Task, UIO} import java.nio.ByteBuffer import java.nio.charset.StandardCharsets -import akka.stream.alpakka.file.ArchiveMetadata /** * Archive download functionality. @@ -60,13 +62,13 @@ trait ArchiveDownload { value: ArchiveValue, project: ProjectRef, ignoreNotFound: Boolean - )(implicit caller: Caller, scheduler: Scheduler): IO[ArchiveRejection, AkkaSource] + )(implicit caller: Caller): IO[AkkaSource] } object ArchiveDownload { - implicit private val logger: Logger = Logger[ArchiveDownload] + private val logger = Logger.cats[ArchiveDownload] case class ArchiveDownloadError(filename: String, response: Complete[JsonLdValue]) extends SDKError { override def getMessage: String = { @@ -86,10 +88,15 @@ object ArchiveDownload { */ def apply( aclCheck: AclCheck, - fetchResource: (ResourceRef, ProjectRef) => UIO[Option[JsonLdContent[_, _]]], - fetchFileContent: (ResourceRef, ProjectRef, Caller) => IO[FileRejection, FileResponse], + fetchResource: (ResourceRef, ProjectRef) => IO[Option[JsonLdContent[_, _]]], + fetchFileContent: (ResourceRef, ProjectRef, Caller) => IO[FileResponse], fileSelf: FileSelf - )(implicit sort: JsonKeyOrdering, baseUri: BaseUri, rcr: RemoteContextResolution): ArchiveDownload = + )(implicit + sort: JsonKeyOrdering, + baseUri: BaseUri, + rcr: RemoteContextResolution, + contextShift: ContextShift[IO] + ): ArchiveDownload = new ArchiveDownload { implicit private val api: JsonLdApi = JsonLdJavaApi.lenient @@ -100,17 +107,17 @@ object ArchiveDownload { value: ArchiveValue, project: ProjectRef, ignoreNotFound: Boolean - )(implicit caller: Caller, scheduler: Scheduler): IO[ArchiveRejection, AkkaSource] = { + )(implicit caller: Caller): IO[AkkaSource] = { for { references <- value.resources.toList.traverse(toFullReference) _ <- checkResourcePermissions(references, project) contentStream <- resolveReferencesAsStream(references, project, ignoreNotFound) } yield { - Source.fromGraph(StreamConverter(contentStream)).via(Zip.writeFlow) + Source.fromGraph(CatsStreamConverter(contentStream)).via(Zip.writeFlow) } } - private def toFullReference(archiveReference: ArchiveReference): IO[ArchiveRejection, FullArchiveReference] = { + private def toFullReference(archiveReference: ArchiveReference): IO[FullArchiveReference] = { archiveReference match { case reference: FullArchiveReference => IO.pure(reference) case reference: FileSelfReference => @@ -119,7 +126,9 @@ object ArchiveDownload { .map { case (projectRef, resourceRef) => FileReference(resourceRef, Some(projectRef), reference.path) } - .mapError(InvalidFileSelf) + .adaptError { case e: ParsingError => + InvalidFileSelf(e) + } } } @@ -127,7 +136,7 @@ object ArchiveDownload { references: List[FullArchiveReference], project: ProjectRef, ignoreNotFound: Boolean - )(implicit caller: Caller): IO[ArchiveRejection, Stream[Task, (ArchiveMetadata, AkkaSource)]] = { + )(implicit caller: Caller): IO[Stream[IO, (ArchiveMetadata, AkkaSource)]] = { references .traverseFilter { case ref: FileReference => fileEntry(ref, project, ignoreNotFound) @@ -137,12 +146,12 @@ object ArchiveDownload { .map(asStream) } - private def sortWith(list: List[(ArchiveMetadata, Task[AkkaSource])]): List[(ArchiveMetadata, Task[AkkaSource])] = + private def sortWith(list: List[(ArchiveMetadata, IO[AkkaSource])]): List[(ArchiveMetadata, IO[AkkaSource])] = list.sortBy { case (entry, _) => entry }(Zip.ordering) private def asStream( - list: List[(ArchiveMetadata, Task[AkkaSource])] - ): Stream[Task, (ArchiveMetadata, AkkaSource)] = + list: List[(ArchiveMetadata, IO[AkkaSource])] + ): Stream[IO, (ArchiveMetadata, AkkaSource)] = Stream.iterable(list).evalMap { case (metadata, source) => source.map(metadata -> _) } @@ -150,15 +159,16 @@ object ArchiveDownload { private def checkResourcePermissions( refs: List[FullArchiveReference], project: ProjectRef - )(implicit caller: Caller): IO[AuthorizationFailed, Unit] = + )(implicit caller: Caller): IO[Unit] = toCatsIO { aclCheck .mapFilterOrRaise( refs, (a: FullArchiveReference) => AclAddress.Project(a.project.getOrElse(project)) -> resources.read, identity[ArchiveReference], - address => IO.raiseError(AuthorizationFailed(address, resources.read)) + address => Task.raiseError(AuthorizationFailed(address, resources.read)) ) .void + } private def fileEntry( ref: FileReference, @@ -166,32 +176,30 @@ object ArchiveDownload { ignoreNotFound: Boolean )(implicit caller: Caller - ): IO[ArchiveRejection, Option[(ArchiveMetadata, Task[AkkaSource])]] = { + ): IO[Option[(ArchiveMetadata, IO[AkkaSource])]] = { val refProject = ref.project.getOrElse(project) // the required permissions are checked for each file content fetch val entry = fetchFileContent(ref.ref, refProject, caller) - .mapError { + .adaptError { case _: FileRejection.FileNotFound => ResourceNotFound(ref.ref, project) case _: FileRejection.TagNotFound => ResourceNotFound(ref.ref, project) case _: FileRejection.RevisionNotFound => ResourceNotFound(ref.ref, project) case FileRejection.AuthorizationFailed(addr, perm) => AuthorizationFailed(addr, perm) - case other => WrappedFileRejection(other) + case other: FileRejection => WrappedFileRejection(other) } .map { case FileResponse(fileMetadata, content) => - val path = pathOf(ref, project, fileMetadata.filename) - val archiveMetadata = Zip.metadata(path) - val contentTask: Task[AkkaSource] = content + val path = pathOf(ref, project, fileMetadata.filename) + val archiveMetadata = Zip.metadata(path) + val contentTask: IO[AkkaSource] = content .tapError(response => - UIO.delay( - logger - .error(s"Error streaming file '${fileMetadata.filename}' for archive: ${response.value.value}") - ) + logger + .error(s"Error streaming file '${fileMetadata.filename}' for archive: ${response.value.value}") + .toUIO ) .mapError(response => ArchiveDownloadError(fileMetadata.filename, response)) - Some((archiveMetadata, contentTask)) - + Option((archiveMetadata, contentTask)) } - if (ignoreNotFound) entry.onErrorRecover { case _: ResourceNotFound => None } + if (ignoreNotFound) entry.recover { case _: ResourceNotFound => None } else entry } @@ -209,34 +217,34 @@ object ArchiveDownload { ref: ResourceReference, project: ProjectRef, ignoreNotFound: Boolean - ): IO[ArchiveRejection, Option[(ArchiveMetadata, Task[AkkaSource])]] = { + ): IO[Option[(ArchiveMetadata, IO[AkkaSource])]] = { val archiveEntry = resourceRefToByteString(ref, project).map { content => val path = pathOf(ref, project) val metadata = Zip.metadata(path) - Some((metadata, Task.pure(Source.single(content)))) + Option((metadata, IO.pure(Source.single(content)))) } - if (ignoreNotFound) archiveEntry.onErrorHandle { _: ResourceNotFound => None } + if (ignoreNotFound) archiveEntry.recover { _: ResourceNotFound => None } else archiveEntry } private def resourceRefToByteString( ref: ResourceReference, project: ProjectRef - ): IO[ResourceNotFound, ByteString] = { + ): IO[ByteString] = { val p = ref.project.getOrElse(project) for { valueOpt <- fetchResource(ref.ref, p) - value <- IO.fromOption(valueOpt, ResourceNotFound(ref.ref, project)) - bytes <- valueToByteString(value, ref.representationOrDefault).logAndDiscardErrors( - "serialize resource to ByteString" - ) + value <- IO.fromOption(valueOpt)(ResourceNotFound(ref.ref, project)) + bytes <- valueToByteString(value, ref.representationOrDefault).onError { error => + logger.error(error)(s"Serializing resource '$ref' to ByteString failed.") + } } yield bytes } private def valueToByteString[A]( value: JsonLdContent[A, _], repr: ResourceRepresentation - ): IO[RdfError, ByteString] = { + ): IO[ByteString] = toCatsIO { implicit val encoder: JsonLdEncoder[A] = value.encoder repr match { case SourceJson => UIO.pure(ByteString(prettyPrintSource(value.source))) @@ -265,4 +273,17 @@ object ArchiveDownload { } } + def apply(aclCheck: AclCheck, shifts: ResourceShifts, files: Files, fileSelf: FileSelf)(implicit + sort: JsonKeyOrdering, + baseUri: BaseUri, + rcr: RemoteContextResolution, + contextShift: ContextShift[IO] + ): ArchiveDownload = + ArchiveDownload( + aclCheck, + shifts.fetch, + (id: ResourceRef, project: ProjectRef, caller: Caller) => files.fetchContent(IdSegmentRef(id), project)(caller), + fileSelf + ) + } diff --git a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivePluginModule.scala b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivePluginModule.scala index 91a06c2214..1af0b8fc63 100644 --- a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivePluginModule.scala +++ b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivePluginModule.scala @@ -1,6 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.archive -import cats.effect.Clock +import cats.effect.{Clock, ContextShift, IO} import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveRejection.ProjectContextRejection import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.contexts @@ -13,17 +13,14 @@ import ch.epfl.bluebrain.nexus.delta.sdk._ import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities -import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller -import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegmentRef, MetadataContextValue} +import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, MetadataContextValue} import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext.ContextRejection import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, ResourceRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.execution.EvaluationExecution import com.typesafe.config.Config import izumi.distage.model.definition.{Id, ModuleDef} -import monix.bio.UIO -import monix.execution.Scheduler /** * Archive plugin wiring. @@ -38,17 +35,13 @@ object ArchivePluginModule extends ModuleDef { aclCheck: AclCheck, shifts: ResourceShifts, files: Files, + fileSelf: FileSelf, sort: JsonKeyOrdering, baseUri: BaseUri, rcr: RemoteContextResolution @Id("aggregate"), - fileSelf: FileSelf + contextShift: ContextShift[IO] ) => - ArchiveDownload( - aclCheck, - shifts.fetch, - (id: ResourceRef, project: ProjectRef, caller: Caller) => files.fetchContent(IdSegmentRef(id), project)(caller), - fileSelf - )(sort, baseUri, rcr) + ArchiveDownload(aclCheck, shifts, files, fileSelf)(sort, baseUri, rcr, contextShift) } make[FileSelf].from { (fetchContext: FetchContext[ContextRejection], baseUri: BaseUri) => @@ -64,13 +57,15 @@ object ArchivePluginModule extends ModuleDef { api: JsonLdApi, uuidF: UUIDF, rcr: RemoteContextResolution @Id("aggregate"), - clock: Clock[UIO] + clock: Clock[IO], + ec: EvaluationExecution ) => Archives(fetchContext.mapRejection(ProjectContextRejection), archiveDownload, cfg, xas)( api, uuidF, rcr, - clock + clock, + ec ) } @@ -82,10 +77,9 @@ object ArchivePluginModule extends ModuleDef { schemeDirectives: DeltaSchemeDirectives, baseUri: BaseUri, rcr: RemoteContextResolution @Id("aggregate"), - jko: JsonKeyOrdering, - sc: Scheduler + jko: JsonKeyOrdering ) => - new ArchiveRoutes(archives, identities, aclCheck, schemeDirectives)(baseUri, rcr, jko, sc) + new ArchiveRoutes(archives, identities, aclCheck, schemeDirectives)(baseUri, rcr, jko) } many[PriorityRoute].add { (cfg: ArchivePluginConfig, routes: ArchiveRoutes) => diff --git a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/Archives.scala b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/Archives.scala index c99b3d5a04..12822dc8b7 100644 --- a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/Archives.scala +++ b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/Archives.scala @@ -1,15 +1,16 @@ package ch.epfl.bluebrain.nexus.delta.plugins.archive -import cats.effect.Clock +import cats.effect.{Clock, IO} import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent import ch.epfl.bluebrain.nexus.delta.kernel.syntax._ -import ch.epfl.bluebrain.nexus.delta.kernel.utils.{IOUtils, UUIDF} +import ch.epfl.bluebrain.nexus.delta.kernel.utils.{IOInstant, UUIDF} import ch.epfl.bluebrain.nexus.delta.plugins.archive.Archives.{entityType, expandIri, ArchiveLog} import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveRejection._ import ch.epfl.bluebrain.nexus.delta.plugins.archive.model._ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource import ch.epfl.bluebrain.nexus.delta.sdk.instances._ import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller @@ -19,12 +20,11 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegment import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings import ch.epfl.bluebrain.nexus.delta.sourcing.config.EphemeralLogConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.execution.EvaluationExecution import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.{EphemeralDefinition, EphemeralLog, Transactors} import io.circe.Json -import monix.bio.{IO, UIO} -import monix.execution.Scheduler /** * Archives module. @@ -50,30 +50,14 @@ class Archives( archiveDownload: ArchiveDownload, sourceDecoder: JsonLdSourceDecoder[ArchiveRejection, ArchiveValue], config: EphemeralLogConfig -)(implicit uuidF: UUIDF, rcr: RemoteContextResolution) { +)(implicit rcr: RemoteContextResolution) { implicit private val kamonComponent: KamonMetricComponent = KamonMetricComponent(entityType.value) - /** - * Creates an archive with a system generated id. - * - * @param project - * the archive parent project - * @param value - * the archive value - * @param subject - * the subject that initiated the action - */ - def create( - project: ProjectRef, - value: ArchiveValue - )(implicit subject: Subject): IO[ArchiveRejection, ArchiveResource] = - uuidF().flatMap(uuid => create(uuid.toString, project, value)) - /** * Creates an archive with a specific id. * - * @param id + * @param iri * the archive identifier * @param project * the archive parent project @@ -82,16 +66,8 @@ class Archives( * @param subject * the subject that initiated the action */ - def create( - id: IdSegment, - project: ProjectRef, - value: ArchiveValue - )(implicit subject: Subject): IO[ArchiveRejection, ArchiveResource] = - (for { - p <- fetchContext.onRead(project) - iri <- expandIri(id, p) - res <- eval(CreateArchive(iri, project, value, subject)) - } yield res).span("createArchive") + def create(iri: Iri, project: ProjectRef, value: ArchiveValue)(implicit subject: Subject): IO[ArchiveResource] = + eval(CreateArchive(iri, project, value, subject)).span("createArchive") /** * Creates an archive from a json-ld representation. If an id is detected in the source document it will be used. @@ -104,11 +80,11 @@ class Archives( * @param subject * the subject that initiated the action */ - def create(project: ProjectRef, source: Json)(implicit subject: Subject): IO[ArchiveRejection, ArchiveResource] = + def create(project: ProjectRef, source: Json)(implicit subject: Subject): IO[ArchiveResource] = (for { - p <- fetchContext.onRead(project) - (iri, value) <- sourceDecoder(p, source) - res <- eval(CreateArchive(iri, project, value, subject)) + p <- toCatsIO(fetchContext.onRead(project)) + (iri, value) <- toCatsIO(sourceDecoder(p, source)) + res <- create(iri, project, value) } yield res).span("createArchive") /** @@ -129,12 +105,11 @@ class Archives( id: IdSegment, project: ProjectRef, source: Json - )(implicit subject: Subject): IO[ArchiveRejection, ArchiveResource] = + )(implicit subject: Subject): IO[ArchiveResource] = (for { - p <- fetchContext.onRead(project) - iri <- expandIri(id, p) - value <- sourceDecoder(p, iri, source) - res <- eval(CreateArchive(iri, project, value, subject)) + (iri, p) <- expandWithContext(id, project) + value <- toCatsIO(sourceDecoder(p, iri, source)) + res <- create(iri, project, value) } yield res).span("createArchive") /** @@ -145,13 +120,13 @@ class Archives( * @param project * the archive parent project */ - def fetch(id: IdSegment, project: ProjectRef): IO[ArchiveRejection, ArchiveResource] = - (for { - p <- fetchContext.onRead(project) - iri <- expandIri(id, p) - state <- log.stateOr(project, iri, ArchiveNotFound(iri, project)) - res = state.toResource(config.ttl) - } yield res).span("fetchArchive") + def fetch(id: IdSegment, project: ProjectRef): IO[ArchiveResource] = { + for { + (iri, _) <- expandWithContext(id, project) + state <- log.stateOr(project, iri, ArchiveNotFound(iri, project)) + res = state.toResource(config.ttl) + } yield res + }.span("fetchArchive") /** * Provides an [[AkkaSource]] for streaming an archive content. @@ -167,14 +142,21 @@ class Archives( id: IdSegment, project: ProjectRef, ignoreNotFound: Boolean - )(implicit caller: Caller, scheduler: Scheduler): IO[ArchiveRejection, AkkaSource] = + )(implicit caller: Caller): IO[AkkaSource] = (for { resource <- fetch(id, project) value = resource.value source <- archiveDownload(value.value, project, ignoreNotFound) } yield source).span("downloadArchive") - private def eval(cmd: CreateArchive): IO[ArchiveRejection, ArchiveResource] = + private def expandWithContext(id: IdSegment, project: ProjectRef) = toCatsIO { + for { + p <- fetchContext.onRead(project) + iri <- expandIri(id, p) + } yield (iri, p) + } + + private def eval(cmd: CreateArchive): IO[ArchiveResource] = log.evaluate(cmd.project, cmd.id, cmd).map { _.toResource(config.ttl) } } @@ -211,7 +193,8 @@ object Archives { api: JsonLdApi, uuidF: UUIDF, rcr: RemoteContextResolution, - clock: Clock[UIO] + clock: Clock[IO], + execution: EvaluationExecution ): Archives = new Archives( EphemeralLog( definition, @@ -224,7 +207,7 @@ object Archives { cfg.ephemeral ) - private def definition(implicit clock: Clock[UIO]) = + private def definition(implicit clock: Clock[IO]) = EphemeralDefinition( entityType, evaluate, @@ -240,9 +223,9 @@ object Archives { private[archive] def evaluate( command: CreateArchive - )(implicit clock: Clock[UIO]): IO[ArchiveRejection, ArchiveState] = - IOUtils.instant.map { instant => - ArchiveState(command.id, command.project, command.value.resources, instant, command.subject) + )(implicit clock: Clock[IO]): IO[ArchiveState] = + IOInstant.now.map { now => + ArchiveState(command.id, command.project, command.value.resources, now, command.subject) } } diff --git a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/FileSelf.scala b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/FileSelf.scala index 24f57b8896..a1ac940d44 100644 --- a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/FileSelf.scala +++ b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/FileSelf.scala @@ -1,15 +1,17 @@ package ch.epfl.bluebrain.nexus.delta.plugins.archive import akka.http.scaladsl.model.Uri +import cats.effect.IO +import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.kernel.utils.UrlUtils -import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf.ParsingError -import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf.ParsingError.{ExternalLink, InvalidFileId, InvalidPath, InvalidProject, InvalidProjectContext} +import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf.ParsingError._ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.implicits._ +import ch.epfl.bluebrain.nexus.delta.sdk.error.SDKError import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegment} import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, ResourceRef} -import monix.bio.IO /** * Attempts to parse an incoming iri/uri as in order to extract file identifiers if it is a valid file "_self". @@ -18,10 +20,10 @@ import monix.bio.IO */ trait FileSelf { - def parse(input: Uri): IO[ParsingError, (ProjectRef, ResourceRef)] = + def parse(input: Uri): IO[(ProjectRef, ResourceRef)] = parse(input.toIri) - def parse(input: Iri): IO[ParsingError, (ProjectRef, ResourceRef)] + def parse(input: Iri): IO[(ProjectRef, ResourceRef)] } object FileSelf { @@ -29,8 +31,10 @@ object FileSelf { /** * Enumeration of errors that can be raised while attempting to resolve a self */ - sealed trait ParsingError extends Product with Serializable { + sealed trait ParsingError extends SDKError { def message: String + + override def getMessage: String = message } object ParsingError { @@ -83,7 +87,7 @@ object FileSelf { val filePrefixIri = baseUri.iriEndpoint / "files" / "" new FileSelf { - override def parse(input: Iri): IO[ParsingError, (ProjectRef, ResourceRef)] = + override def parse(input: Iri): IO[(ProjectRef, ResourceRef)] = validateSelfPrefix(input) >> parseSelf(input) private def validateSelfPrefix(self: Iri) = @@ -92,18 +96,18 @@ object FileSelf { else IO.raiseError(ParsingError.NonAbsoluteLink(self)) - private def parseSelf(self: Iri): IO[ParsingError, (ProjectRef, ResourceRef)] = + private def parseSelf(self: Iri): IO[(ProjectRef, ResourceRef)] = self.stripPrefix(filePrefixIri).split('/') match { case Array(org, project, id) => for { - project <- IO.fromEither(ProjectRef.parse(org, project)).mapError(_ => InvalidProject(self)) - projectContext <- fetchContext.onRead(project).mapError { _ => InvalidProjectContext(self, project) } + project <- IO.fromEither(ProjectRef.parse(org, project).leftMap(_ => InvalidProject(self))) + projectContext <- toCatsIO( + fetchContext.onRead(project).mapError { _ => InvalidProjectContext(self, project) } + ) decodedId = UrlUtils.decode(id) - resourceRef <- - IO.fromOption( - IdSegment(decodedId).toIri(projectContext.apiMappings, projectContext.base).map(ResourceRef(_)), - InvalidFileId(self) - ) + iriOption = + IdSegment(decodedId).toIri(projectContext.apiMappings, projectContext.base).map(ResourceRef(_)) + resourceRef <- IO.fromOption(iriOption)(InvalidFileId(self)) } yield { (project, resourceRef) } diff --git a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/model/ArchiveRejection.scala b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/model/ArchiveRejection.scala index 49637d95b6..daaf6a9411 100644 --- a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/model/ArchiveRejection.scala +++ b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/model/ArchiveRejection.scala @@ -19,6 +19,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext.ContextRejection import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, ResourceRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.rejection.Rejection import io.circe.syntax.EncoderOps import io.circe.{Encoder, JsonObject} @@ -28,7 +29,7 @@ import io.circe.{Encoder, JsonObject} * @param reason * a descriptive message as to why the rejection occurred */ -sealed abstract class ArchiveRejection(val reason: String) extends Product with Serializable +sealed abstract class ArchiveRejection(val reason: String) extends Rejection object ArchiveRejection { diff --git a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/model/Zip.scala b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/model/Zip.scala index 7477eb6a19..d8ea2022ee 100644 --- a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/model/Zip.scala +++ b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/model/Zip.scala @@ -1,9 +1,11 @@ package ch.epfl.bluebrain.nexus.delta.plugins.archive.model import akka.NotUsed -import akka.http.scaladsl.model.{ContentType, HttpRequest, MediaTypes} -import akka.stream.alpakka.file.scaladsl.Archive +import akka.http.scaladsl.model.{ContentType, MediaTypes} +import akka.http.scaladsl.server.Directive +import akka.http.scaladsl.server.Directives.extractRequest import akka.stream.alpakka.file.ArchiveMetadata +import akka.stream.alpakka.file.scaladsl.Archive import akka.stream.scaladsl.{Flow, Source} import akka.util.ByteString import ch.epfl.bluebrain.nexus.delta.sdk.utils.HeadersUtils @@ -25,5 +27,9 @@ object Zip { def metadata(filename: String): ArchiveMetadata = ArchiveMetadata.create(filename) - def checkHeader(req: HttpRequest): Boolean = HeadersUtils.matches(req.headers, Zip.contentType.mediaType) + def checkZipHeader: Directive[Tuple1[Boolean]] = + extractRequest.map { req => + HeadersUtils.matches(req.headers, Zip.contentType.mediaType) + } + } diff --git a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/routes/ArchiveRoutes.scala b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/routes/ArchiveRoutes.scala index f4cba81633..c5a09424a8 100644 --- a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/routes/ArchiveRoutes.scala +++ b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/routes/ArchiveRoutes.scala @@ -1,23 +1,26 @@ package ch.epfl.bluebrain.nexus.delta.plugins.archive.routes +import akka.http.scaladsl.model.StatusCode import akka.http.scaladsl.model.StatusCodes.{Created, SeeOther} import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.Route +import cats.effect.IO +import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.plugins.archive.Archives -import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.permissions -import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.Zip +import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.{permissions, ArchiveRejection, ArchiveResource, Zip} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck +import ch.epfl.bluebrain.nexus.delta.sdk.ce.DeltaDirectives._ import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling -import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaDirectives, DeltaSchemeDirectives, FileResponse} +import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaSchemeDirectives, FileResponse} import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities +import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller import ch.epfl.bluebrain.nexus.delta.sdk.implicits._ -import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri +import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegment} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import io.circe.Json -import kamon.instrumentation.akka.http.TracingDirectives.operationName -import monix.execution.Scheduler /** * The Archive routes. @@ -36,59 +39,39 @@ class ArchiveRoutes( identities: Identities, aclCheck: AclCheck, schemeDirectives: DeltaSchemeDirectives -)(implicit baseUri: BaseUri, rcr: RemoteContextResolution, jko: JsonKeyOrdering, sc: Scheduler) +)(implicit baseUri: BaseUri, rcr: RemoteContextResolution, jko: JsonKeyOrdering) extends AuthDirectives(identities, aclCheck) - with CirceUnmarshalling - with DeltaDirectives { + with CirceUnmarshalling { - private val prefix = baseUri.prefixSegment import schemeDirectives._ def routes: Route = baseUriPrefix(baseUri.prefix) { pathPrefix("archives") { extractCaller { implicit caller => - resolveProjectRef.apply { implicit ref => + resolveProjectRef.apply { implicit project => concat( // create an archive without an id (post & entity(as[Json]) & pathEndOrSingleSlash) { json => - operationName(s"$prefix/archives/{org}/{project}") { - authorizeFor(ref, permissions.write).apply { - archiveResponse( - emitRedirect(SeeOther, archives.create(ref, json).map(_.uris.accessUri)), - emit(Created, archives.create(ref, json).mapValue(_.metadata)) - ) - } + authorizeFor(project, permissions.write).apply { + emitCreatedArchive(archives.create(project, json)) } }, (idSegment & pathEndOrSingleSlash) { id => - operationName(s"$prefix/archives/{org}/{project}/{id}") { - concat( - // create an archive with an id - (put & entity(as[Json]) & pathEndOrSingleSlash) { json => - authorizeFor(ref, permissions.write).apply { - archiveResponse( - emitRedirect(SeeOther, archives.create(id, ref, json).map(_.uris.accessUri)), - emit(Created, archives.create(id, ref, json).mapValue(_.metadata)) - ) - } - }, - // fetch or download an archive - (get & pathEndOrSingleSlash) { - authorizeFor(ref, permissions.read).apply { - archiveResponse( - parameter("ignoreNotFound".as[Boolean] ? false) { ignoreNotFound => - val response = archives.download(id, ref, ignoreNotFound).map { source => - sourceToFileResponse(source) - } - emit(response) - }, - emit(archives.fetch(id, ref)) - ) - } + concat( + // create an archive with an id + (put & entity(as[Json]) & pathEndOrSingleSlash) { json => + authorizeFor(project, permissions.write).apply { + emitCreatedArchive(archives.create(id, project, json)) } - ) - } + }, + // fetch or download an archive + (get & pathEndOrSingleSlash) { + authorizeFor(project, permissions.read).apply { + emitArchiveDownload(id, project) + } + } + ) } ) } @@ -96,9 +79,28 @@ class ArchiveRoutes( } } - private def archiveResponse(validResp: Route, invalidResp: Route): Route = - extractRequest.map(Zip.checkHeader(_)).apply(valid => if (valid) validResp else invalidResp) + private def emitMetadata(statusCode: StatusCode, io: IO[ArchiveResource]): Route = + emit(statusCode, io.mapValue(_.metadata).attemptNarrow[ArchiveRejection]) + + private def emitArchiveFile(source: IO[AkkaSource]) = { + val response = source.map { s => + FileResponse(s"archive.zip", Zip.contentType, 0L, s) + } + emit(response.attemptNarrow[ArchiveRejection]) + } + + private def emitCreatedArchive(io: IO[ArchiveResource]): Route = + Zip.checkZipHeader { + case true => emitRedirect(SeeOther, io.map(_.uris.accessUri).attemptNarrow[ArchiveRejection]) + case false => emitMetadata(Created, io) + } - private def sourceToFileResponse(source: AkkaSource): FileResponse = - FileResponse(s"archive.zip", Zip.contentType, 0L, source) + private def emitArchiveDownload(id: IdSegment, project: ProjectRef)(implicit caller: Caller): Route = + Zip.checkZipHeader { + case true => + parameter("ignoreNotFound".as[Boolean] ? false) { ignoreNotFound => + emitArchiveFile(archives.download(id, project, ignoreNotFound)) + } + case false => emit(archives.fetch(id, project).attemptNarrow[ArchiveRejection]) + } } diff --git a/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveDownloadSpec.scala b/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveDownloadSpec.scala index 3d75c62a63..93d03ee0a4 100644 --- a/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveDownloadSpec.scala +++ b/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveDownloadSpec.scala @@ -7,16 +7,16 @@ import akka.stream.scaladsl.Source import akka.testkit.TestKit import akka.util.ByteString import cats.data.NonEmptySet +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.kernel.utils.UrlUtils.encode import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf.ParsingError import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveReference.{FileReference, FileSelfReference, ResourceReference} import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveRejection.{AuthorizationFailed, InvalidFileSelf, ResourceNotFound} -import ch.epfl.bluebrain.nexus.delta.sdk.model.ResourceRepresentation.{CompactedJsonLd, Dot, ExpandedJsonLd, NQuads, NTriples, SourceJson} import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.{ArchiveRejection, ArchiveValue} import ch.epfl.bluebrain.nexus.delta.plugins.storage.RemoteContextResolutionFixture import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin.Client import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection.FileNotFound -import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{Digest, FileAttributes, FileRejection} +import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{Digest, FileAttributes} import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.{schemas, FileGen} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StorageFixtures import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.AbsolutePath @@ -32,16 +32,16 @@ import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller import ch.epfl.bluebrain.nexus.delta.sdk.implicits._ import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdContent import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri +import ch.epfl.bluebrain.nexus.delta.sdk.model.ResourceRepresentation.{CompactedJsonLd, Dot, ExpandedJsonLd, NQuads, NTriples, SourceJson} import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.ResourceRef.Latest import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Identity, Label, ProjectRef, ResourceRef} import ch.epfl.bluebrain.nexus.testkit.archive.ArchiveHelpers -import ch.epfl.bluebrain.nexus.testkit.{EitherValuable, IOValues, TestHelpers} +import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues +import ch.epfl.bluebrain.nexus.testkit.{EitherValuable, TestHelpers} import io.circe.syntax.EncoderOps -import monix.bio.{IO, UIO} -import monix.execution.Scheduler.Implicits.global import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike import org.scalatest.{Inspectors, OptionValues} @@ -55,7 +55,7 @@ class ArchiveDownloadSpec with AnyWordSpecLike with Inspectors with EitherValuable - with IOValues + with CatsIOValues with OptionValues with TestHelpers with StorageFixtures @@ -109,13 +109,13 @@ class ArchiveDownloadSpec val file2 = FileGen.resourceFor(id2, projectRef, storageRef, fileAttributes(file2Name, file2Size)) val file2Content: String = "file content 2" - val fetchResource: (Iri, ProjectRef) => UIO[Option[JsonLdContent[_, _]]] = { + val fetchResource: (Iri, ProjectRef) => IO[Option[JsonLdContent[_, _]]] = { case (`id1`, `projectRef`) => - UIO.some(JsonLdContent(file1, file1.value.asJson, None)) + IO.pure(Some(JsonLdContent(file1, file1.value.asJson, None))) case (`id2`, `projectRef`) => - UIO.some(JsonLdContent(file2, file2.value.asJson, None)) + IO.pure(Some(JsonLdContent(file2, file2.value.asJson, None))) case _ => - UIO.none + IO.none } val file1SelfIri: Iri = file1Self.toIri @@ -124,7 +124,7 @@ class ArchiveDownloadSpec case other => IO.raiseError(ParsingError.InvalidPath(other)) } - val fetchFileContent: (Iri, ProjectRef) => IO[FileRejection, FileResponse] = { + val fetchFileContent: (Iri, ProjectRef) => IO[FileResponse] = { case (`id1`, `projectRef`) => IO.pure( FileResponse(file1Name, ContentTypes.`text/plain(UTF-8)`, file1Size, Source.single(ByteString(file1Content))) @@ -154,7 +154,7 @@ class ArchiveDownloadSpec def rejectedAccess(value: ArchiveValue) = { archiveDownload - .apply(value, project.ref, ignoreNotFound = true)(Caller.Anonymous, global) + .apply(value, project.ref, ignoreNotFound = true)(Caller.Anonymous) .rejectedWith[AuthorizationFailed] } diff --git a/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveRoutesSpec.scala b/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveRoutesSpec.scala index 0e061ee440..590f4fc502 100644 --- a/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveRoutesSpec.scala +++ b/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveRoutesSpec.scala @@ -8,6 +8,7 @@ import akka.http.scaladsl.model.{ContentTypes, StatusCodes, Uri} import akka.http.scaladsl.server.Route import akka.stream.scaladsl.Source import akka.util.ByteString +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.kernel.utils.UrlUtils.encode import ch.epfl.bluebrain.nexus.delta.kernel.utils.{StatefulUUIDF, UUIDF} import ch.epfl.bluebrain.nexus.delta.plugins.archive.FileSelf.ParsingError.InvalidPath @@ -41,26 +42,32 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContextDummy import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings import ch.epfl.bluebrain.nexus.delta.sdk.utils.BaseRouteSpec import ch.epfl.bluebrain.nexus.delta.sourcing.config.EphemeralLogConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.execution.EvaluationExecution import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.ResourceRef.Latest import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Identity, Label, ProjectRef, ResourceRef} import ch.epfl.bluebrain.nexus.testkit.archive.ArchiveHelpers +import ch.epfl.bluebrain.nexus.testkit.ce.{CatsIOValues, IOFixedClock} import io.circe.Json import io.circe.syntax.EncoderOps -import monix.bio.{IO, UIO} -import monix.execution.Scheduler import org.scalatest.TryValues import java.util.UUID import scala.concurrent.duration._ -class ArchiveRoutesSpec extends BaseRouteSpec with StorageFixtures with TryValues with ArchiveHelpers { - - implicit private val scheduler: Scheduler = Scheduler.global +class ArchiveRoutesSpec + extends BaseRouteSpec + with StorageFixtures + with IOFixedClock + with TryValues + with ArchiveHelpers + with CatsIOValues { private val uuid = UUID.fromString("8249ba90-7cc6-4de5-93a1-802c04200dcc") implicit private val uuidF: StatefulUUIDF = UUIDF.stateful(uuid).accepted + implicit private val ee: EvaluationExecution = EvaluationExecution(timer, contextShift) + implicit override def rcr: RemoteContextResolution = RemoteContextResolutionFixture.rcr private val subject: Subject = Identity.User("user", Label.unsafe("realm")) @@ -90,7 +97,7 @@ class ArchiveRoutesSpec extends BaseRouteSpec with StorageFixtures with TryValue private val acceptAll = Accept(`*/*`) private val fetchContext = FetchContextDummy(List(project)) - private val groupDirectives = DeltaSchemeDirectives(fetchContext, _ => UIO.none, _ => UIO.none) + private val groupDirectives = DeltaSchemeDirectives(fetchContext) private val storageRef = ResourceRef.Revision(iri"http://localhost/${genString()}", 5) @@ -116,14 +123,14 @@ class ArchiveRoutesSpec extends BaseRouteSpec with StorageFixtures with TryValue private val generatedId = project.base.iri / uuid.toString - val fetchResource: (Iri, ProjectRef) => UIO[Option[JsonLdContent[_, _]]] = { + val fetchResource: (Iri, ProjectRef) => IO[Option[JsonLdContent[_, _]]] = { case (`fileId`, `projectRef`) => - UIO.some(JsonLdContent(file, file.value.asJson, None)) + IO.pure(Some(JsonLdContent(file, file.value.asJson, None))) case _ => - UIO.none + IO.none } - val fetchFileContent: (Iri, ProjectRef, Caller) => IO[FileRejection, FileResponse] = (id, p, c) => { + val fetchFileContent: (Iri, ProjectRef, Caller) => IO[FileResponse] = (id, p, c) => { val s = c.subject (id, p, s) match { case (_, _, `subjectNoFilePerms`) => @@ -290,7 +297,7 @@ class ArchiveRoutesSpec extends BaseRouteSpec with StorageFixtures with TryValue ) ~> routes ~> check { status shouldEqual StatusCodes.OK header[`Content-Type`].value.value() shouldEqual `application/zip`.value - val result = fromZip(responseEntity.dataBytes) + val result = fromZip(responseEntity.dataBytes)(materializer, executor) result.keySet shouldEqual Set( s"${project.ref}/file/file.txt", diff --git a/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivesSTMSpec.scala b/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivesSTMSpec.scala index bed5a9c895..7e124f683f 100644 --- a/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivesSTMSpec.scala +++ b/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivesSTMSpec.scala @@ -8,7 +8,8 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.AbsolutePath import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.User import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef, ResourceRef} -import ch.epfl.bluebrain.nexus.testkit.{EitherValuable, IOFixedClock, IOValues, TestHelpers} +import ch.epfl.bluebrain.nexus.testkit.ce.{CatsIOValues, IOFixedClock} +import ch.epfl.bluebrain.nexus.testkit.{EitherValuable, TestHelpers} import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike @@ -18,7 +19,7 @@ import java.time.Instant class ArchivesSTMSpec extends AnyWordSpecLike with Matchers - with IOValues + with CatsIOValues with IOFixedClock with EitherValuable with TestHelpers { diff --git a/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivesSpec.scala b/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivesSpec.scala index 273d3ee9dd..48e63df7a8 100644 --- a/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivesSpec.scala +++ b/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivesSpec.scala @@ -2,6 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.archive import akka.stream.scaladsl.Source import cats.data.NonEmptySet +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveReference.{FileReference, ResourceReference} import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveRejection.{ArchiveNotFound, ProjectContextRejection} @@ -16,14 +17,14 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.ResourceUris.EphemeralResourceInP import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContextDummy import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings import ch.epfl.bluebrain.nexus.delta.sourcing.config.EphemeralLogConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.execution.EvaluationExecution import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Subject, User} import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.model.ResourceRef.Latest import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.DoobieScalaTestFixture -import ch.epfl.bluebrain.nexus.testkit._ +import ch.epfl.bluebrain.nexus.testkit.{EitherValuable, TestHelpers} +import ch.epfl.bluebrain.nexus.testkit.ce.{CatsIOValues, IOFixedClock} import io.circe.literal._ -import monix.bio.IO -import monix.execution.Scheduler import org.scalatest.matchers.should.Matchers import java.net.URLEncoder @@ -35,15 +36,14 @@ import scala.concurrent.duration._ class ArchivesSpec extends DoobieScalaTestFixture with Matchers - with IOValues with IOFixedClock + with CatsIOValues with EitherValuable with TestHelpers with RemoteContextResolutionFixture { - private val uuid = UUID.randomUUID() - implicit private val uuidF: UUIDF = UUIDF.random - implicit private val sc: Scheduler = Scheduler.global + private val uuid = UUID.randomUUID() + implicit private val uuidF: UUIDF = UUIDF.random implicit private val api: JsonLdApi = JsonLdJavaApi.strict @@ -61,15 +61,16 @@ class ArchivesSpec ProjectContextRejection ) - private val cfg = ArchivePluginConfig(1, EphemeralLogConfig(5.seconds, 5.hours)) - private val download = new ArchiveDownload { + private val cfg = ArchivePluginConfig(1, EphemeralLogConfig(5.seconds, 5.hours)) + private val download = new ArchiveDownload { override def apply(value: ArchiveValue, project: ProjectRef, ignoreNotFound: Boolean)(implicit - caller: Caller, - scheduler: Scheduler - ): IO[ArchiveRejection, AkkaSource] = + caller: Caller + ): IO[AkkaSource] = IO.pure(Source.empty) } - private lazy val archives = Archives(fetchContext, download, cfg, xas) + + implicit val ee: EvaluationExecution = EvaluationExecution(timer, contextShift) + private lazy val archives = Archives(fetchContext, download, cfg, xas) "An Archives module" should { "create an archive from source" in { @@ -180,29 +181,6 @@ class ArchivesSpec ) } - "create an archive from value" in { - val resourceId = iri"http://localhost/${genString()}" - val fileId = iri"http://localhost/${genString()}" - val value = ArchiveValue.unsafe( - NonEmptySet.of( - ResourceReference(Latest(resourceId), None, None, None), - FileReference(Latest(fileId), None, None) - ) - ) - - val resource = archives.create(project.ref, value).accepted - - val id = resource.id - val encodedId = URLEncoder.encode(id.toString, StandardCharsets.UTF_8) - resource.uris shouldEqual EphemeralResourceInProjectUris( - project.ref, - s"archives/${project.ref}/$encodedId" - ) - - resource.id shouldEqual id - resource.value shouldEqual Archive(id, project.ref, value.resources, 5.hours.toSeconds) - } - "create an archive from value with a fixed id" in { val id = iri"http://localhost/${genString()}" val resourceId = iri"http://localhost/${genString()}" diff --git a/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/FileSelfSuite.scala b/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/FileSelfSuite.scala index e4c00c644c..8ec4181e6a 100644 --- a/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/FileSelfSuite.scala +++ b/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/FileSelfSuite.scala @@ -11,10 +11,9 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContextDummy import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ResourceRef} -import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite -import monix.bio.UIO +import ch.epfl.bluebrain.nexus.testkit.ce.CatsEffectSuite -class FileSelfSuite extends BioSuite { +class FileSelfSuite extends CatsEffectSuite { implicit private val baseUri: BaseUri = BaseUri("http://bbp.epfl.ch", Label.unsafe("v1")) @@ -34,46 +33,46 @@ class FileSelfSuite extends BioSuite { test("An expanded self should be parsed") { val input = iri"http://bbp.epfl.ch/v1/files/$org/$project/${encode(expandedResourceId)}" - fileSelf.parse(input).tapError { p => UIO.delay(println(p)) }.assert((projectRef, latestRef)) + fileSelf.parse(input).assertEquals((projectRef, latestRef)) } test("An expanded self with a revision should be parsed") { val input = iri"http://bbp.epfl.ch/v1/files/$org/$project/${encode(expandedResourceId)}?rev=$rev" - fileSelf.parse(input).tapError { p => UIO.delay(println(p)) }.assert((projectRef, revisionRef)) + fileSelf.parse(input).assertEquals((projectRef, revisionRef)) } test("An expanded self with a tag should be parsed") { val input = iri"http://bbp.epfl.ch/v1/files/$org/$project/${encode(expandedResourceId)}?tag=${tag.value}" - fileSelf.parse(input).tapError { p => UIO.delay(println(p)) }.assert((projectRef, tagRef)) + fileSelf.parse(input).assertEquals((projectRef, tagRef)) } test("A curie self should be parsed") { val input = iri"http://bbp.epfl.ch/v1/files/$org/$project/nxv:$compactResourceId" - fileSelf.parse(input).assert((projectRef, latestRef)) + fileSelf.parse(input).assertEquals((projectRef, latestRef)) } test("A relative self should not be parsed") { val input = iri"/$org/$project/$compactResourceId" - fileSelf.parse(input).error(NonAbsoluteLink(input)) + fileSelf.parse(input).intercept(NonAbsoluteLink(input)) } test("A self from an external website should not be parsed") { val input = iri"http://localhost/v1/files/$org/$project/$compactResourceId" - fileSelf.parse(input).error(ExternalLink(input)) + fileSelf.parse(input).intercept(ExternalLink(input)) } test("A self with an incorrect path should not be parsed") { val input = iri"http://bbp.epfl.ch/v1/files/$org/$project/$compactResourceId/extra" - fileSelf.parse(input).error(InvalidPath(input)) + fileSelf.parse(input).intercept(InvalidPath(input)) } test("A self with an incorrect project label should not be parsed") { val input = iri"http://bbp.epfl.ch/v1/files/%illegal/$project/$compactResourceId" - fileSelf.parse(input).error(InvalidProject(input)) + fileSelf.parse(input).intercept(InvalidProject(input)) } test("A self with an incorrect id should not resolve") { val input = iri"""http://bbp.epfl.ch/v1/files/$org/$project/badcurie:$compactResourceId")}""" - fileSelf.parse(input).error(InvalidFileId(input)) + fileSelf.parse(input).intercept(InvalidFileId(input)) } } diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/ResponseToRedirect.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/ResponseToRedirect.scala index 6c648b9267..c2823877c1 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/ResponseToRedirect.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/ResponseToRedirect.scala @@ -5,7 +5,6 @@ import akka.http.scaladsl.model.Uri import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.Route import cats.effect.IO -import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering @@ -14,8 +13,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.HttpResponseFields import monix.bio.{IO => BIO, UIO} import monix.execution.Scheduler -import scala.reflect.ClassTag - /** * Redirection response magnet. */ @@ -52,12 +49,12 @@ object ResponseToRedirect { } } - implicit def ioRedirectWithError[E <: Throwable: ClassTag: JsonLdEncoder: HttpResponseFields]( - io: IO[Uri] + implicit def ioRedirectWithError[E <: Throwable: JsonLdEncoder: HttpResponseFields]( + io: IO[Either[E, Uri]] )(implicit cr: RemoteContextResolution, jo: JsonKeyOrdering): ResponseToRedirect = new ResponseToRedirect { override def apply(redirection: Redirection): Route = - onSuccess(io.attemptNarrow[E].unsafeToFuture()) { + onSuccess(io.unsafeToFuture()) { case Left(value) => CatsResponseToJsonLd.valueWithHttpResponseFields[E](value).apply(None) case Right(location) => redirect(location, redirection) } diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/stream/CatsStreamConverter.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/stream/CatsStreamConverter.scala new file mode 100644 index 0000000000..33d3c241ef --- /dev/null +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/stream/CatsStreamConverter.scala @@ -0,0 +1,89 @@ +package ch.epfl.bluebrain.nexus.delta.sdk.stream + +import akka.NotUsed +import akka.stream._ +import akka.stream.scaladsl.{Sink => AkkaSink, Source => AkkaSource, _} +import cats.effect._ +import cats.syntax.all._ +import fs2._ + +/** + * Converts a fs2 stream to an Akka source Original code from the streamz library from Martin Krasser (published under + * Apache License 2.0): + * https://github.com/krasserm/streamz/blob/master/streamz-converter/src/main/scala/streamz/converter/Converter.scala + */ +object CatsStreamConverter { + + private def publisherStream[A](publisher: SourceQueueWithComplete[A], stream: Stream[IO, A])(implicit + contextShift: ContextShift[IO] + ): Stream[IO, Unit] = { + def publish(a: A): IO[Option[Unit]] = IO + .fromFuture(IO(publisher.offer(a))) + .flatMap { + case QueueOfferResult.Enqueued => IO.pure(Some(())) + case QueueOfferResult.Failure(cause) => IO.raiseError[Option[Unit]](cause) + case QueueOfferResult.QueueClosed => IO.none + case QueueOfferResult.Dropped => + IO.raiseError[Option[Unit]]( + new IllegalStateException("This should never happen because we use OverflowStrategy.backpressure") + ) + } + .recover { + // This handles a race condition between `interruptWhen` and `publish`. + // There's no guarantee that, when the akka sink is terminated, we will observe the + // `interruptWhen` termination before calling publish one last time. + // Such a call fails with StreamDetachedException + case _: StreamDetachedException => None + } + + def watchCompletion: IO[Unit] = IO.fromFuture(IO(publisher.watchCompletion())).void + def fail(e: Throwable): IO[Unit] = IO.delay(publisher.fail(e)) >> watchCompletion + def complete: IO[Unit] = IO.delay(publisher.complete()) >> watchCompletion + + stream + .interruptWhen(watchCompletion.attempt) + .evalMap(publish) + .unNoneTerminate + .onFinalizeCase { + case ExitCase.Completed | ExitCase.Canceled => complete + case ExitCase.Error(e) => fail(e) + } + } + + def apply[A](stream: Stream[IO, A])(implicit contextShift: ContextShift[IO]): Graph[SourceShape[A], NotUsed] = { + val source = AkkaSource.queue[A](0, OverflowStrategy.backpressure) + // A sink that runs an FS2 publisherStream when consuming the publisher actor (= materialized value) of source + val sink = AkkaSink.foreach[SourceQueueWithComplete[A]] { p => + // Fire and forget Future so it runs in the background + publisherStream[A](p, stream).compile.drain.unsafeToFuture() + () + } + + AkkaSource + .fromGraph(GraphDSL.createGraph(source) { implicit builder => source => + import GraphDSL.Implicits._ + builder.materializedValue ~> sink + SourceShape(source.out) + }) + .mapMaterializedValue(_ => NotUsed) + } + + def apply[A]( + source: Graph[SourceShape[A], NotUsed] + )(implicit materializer: Materializer, contextShift: ContextShift[IO]): Stream[IO, A] = + Stream.force { + IO.delay { + val subscriber = AkkaSource.fromGraph(source).toMat(AkkaSink.queue[A]())(Keep.right).run() + subscriberStream[A](subscriber) + } + } + + private def subscriberStream[A]( + subscriber: SinkQueueWithCancel[A] + )(implicit contextShift: ContextShift[IO]): Stream[IO, A] = { + val pull = IO.fromFuture(IO(subscriber.pull())) + val cancel = IO.delay(subscriber.cancel()) + Stream.repeatEval(pull).unNoneTerminate.onFinalize(cancel) + } + +} diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/DeleteExpired.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/DeleteExpired.scala index 18b5f0eb6c..fe2333ed9d 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/DeleteExpired.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/DeleteExpired.scala @@ -1,35 +1,35 @@ package ch.epfl.bluebrain.nexus.delta.sourcing -import cats.effect.Clock -import ch.epfl.bluebrain.nexus.delta.kernel.utils.IOUtils +import cats.effect.{Clock, IO, Timer} +import ch.epfl.bluebrain.nexus.delta.kernel.Logger +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ +import ch.epfl.bluebrain.nexus.delta.kernel.utils.IOInstant import ch.epfl.bluebrain.nexus.delta.sourcing.DeleteExpired.logger import ch.epfl.bluebrain.nexus.delta.sourcing.config.ProjectionConfig import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CompiledProjection, ExecutionStrategy, ProjectionMetadata, Supervisor} -import com.typesafe.scalalogging.Logger import doobie.implicits._ import doobie.postgres.implicits._ import fs2.Stream -import monix.bio.{Task, UIO} /** * Allow to delete expired ephemeral states */ -final class DeleteExpired private[sourcing] (xas: Transactors)(implicit clock: Clock[UIO]) { +final class DeleteExpired private[sourcing] (xas: Transactors)(implicit clock: Clock[IO]) { - def apply(): UIO[Unit] = { + def apply(): IO[Unit] = { for { - instant <- IOUtils.instant + instant <- IOInstant.now deleted <- sql""" | DELETE FROM public.ephemeral_states | WHERE expires < $instant - """.stripMargin.update.run.transact(xas.write).hideErrors - _ <- UIO.when(deleted > 0)(UIO.delay(logger.info(s"Deleted $deleted expired ephemeral states"))) + """.stripMargin.update.run.transact(xas.writeCE) + _ <- IO.whenA(deleted > 0)(logger.info(s"Deleted $deleted expired ephemeral states")) } yield () } } object DeleteExpired { - private val logger: Logger = Logger[DeleteExpired] + private val logger = Logger.cats[DeleteExpired] private val metadata: ProjectionMetadata = ProjectionMetadata("system", "delete-expired", None, None) @@ -37,23 +37,18 @@ object DeleteExpired { * Creates a [[DeleteExpired]] instance and schedules in the supervisor the deletion of expired ephemeral states */ def apply(supervisor: Supervisor, config: ProjectionConfig, xas: Transactors)(implicit - clock: Clock[UIO] - ): Task[DeleteExpired] = { + clock: Clock[IO], + timer: Timer[IO] + ): IO[DeleteExpired] = { val deleteExpired = new DeleteExpired(xas) val stream = Stream - .awakeEvery[Task](config.deleteExpiredEvery) + .awakeEvery[IO](config.deleteExpiredEvery) .evalTap(_ => deleteExpired()) .drain - supervisor - .run( - CompiledProjection.fromStream( - metadata, - ExecutionStrategy.TransientSingleNode, - _ => stream - ) - ) - .as(deleteExpired) + val deleteExpiredProjection = + CompiledProjection.fromStream(metadata, ExecutionStrategy.TransientSingleNode, _ => stream.translate(ioToUioK)) + supervisor.run(deleteExpiredProjection).as(deleteExpired) } } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/EphemeralDefinition.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/EphemeralDefinition.scala index 945c5b5ed9..48b80b0289 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/EphemeralDefinition.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/EphemeralDefinition.scala @@ -1,26 +1,30 @@ package ch.epfl.bluebrain.nexus.delta.sourcing +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.sourcing.EvaluationError.EvaluationTimeout +import ch.epfl.bluebrain.nexus.delta.sourcing.execution.EvaluationExecution import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType +import ch.epfl.bluebrain.nexus.delta.sourcing.rejection.Rejection import ch.epfl.bluebrain.nexus.delta.sourcing.state.State.EphemeralState -import monix.bio.IO import scala.concurrent.duration.FiniteDuration -final case class EphemeralDefinition[Id, S <: EphemeralState, Command, Rejection]( +final case class EphemeralDefinition[Id, S <: EphemeralState, Command, +R <: Rejection]( tpe: EntityType, - evaluate: Command => IO[Rejection, S], + evaluate: Command => IO[S], stateSerializer: Serializer[Id, S], - onUniqueViolation: (Id, Command) => Rejection + onUniqueViolation: (Id, Command) => R ) { /** * Fetches the current state and attempt to apply an incoming command on it */ - def evaluate(command: Command, maxDuration: FiniteDuration): IO[Rejection, S] = + def evaluate(command: Command, maxDuration: FiniteDuration)(implicit execution: EvaluationExecution): IO[S] = evaluate(command).attempt - .timeoutWith(maxDuration, EvaluationTimeout(command, maxDuration)) - .hideErrors + .timeoutTo(maxDuration, IO.raiseError(EvaluationTimeout(command, maxDuration)))( + execution.timer, + execution.contextShift + ) .flatMap(IO.fromEither) } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/EphemeralLog.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/EphemeralLog.scala index d6ffb8cd3d..cc9ca19c64 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/EphemeralLog.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/EphemeralLog.scala @@ -1,12 +1,14 @@ package ch.epfl.bluebrain.nexus.delta.sourcing +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.sourcing.config.EphemeralLogConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.execution.EvaluationExecution import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef +import ch.epfl.bluebrain.nexus.delta.sourcing.rejection.Rejection import ch.epfl.bluebrain.nexus.delta.sourcing.state.EphemeralStateStore import ch.epfl.bluebrain.nexus.delta.sourcing.state.State.EphemeralState import doobie.implicits._ import doobie.postgres.sqlstate -import monix.bio.IO /** * Event log for ephemeral entities that can be controlled through commands; @@ -17,7 +19,7 @@ import monix.bio.IO * Unsuccessful commands result in rejections returned to the caller context without any events being generated or * state transitions applied. */ -trait EphemeralLog[Id, S <: EphemeralState, Command, Rejection] { +trait EphemeralLog[Id, S <: EphemeralState, Command, R <: Rejection] { /** * Get the current state for the entity with the given __id__ @@ -28,7 +30,7 @@ trait EphemeralLog[Id, S <: EphemeralState, Command, Rejection] { * @param notFound * if no state is found, fails with this rejection */ - def stateOr[R <: Rejection](ref: ProjectRef, id: Id, notFound: => R): IO[R, S] + def stateOr[R2 <: R](ref: ProjectRef, id: Id, notFound: => R2): IO[S] /** * Evaluates the argument __command__ in the context of entity identified by __id__. @@ -43,7 +45,7 @@ trait EphemeralLog[Id, S <: EphemeralState, Command, Rejection] { * the newly generated state if the command was evaluated successfully, or the rejection of the __command__ * otherwise */ - def evaluate(ref: ProjectRef, id: Id, command: Command): IO[Rejection, S] + def evaluate(ref: ProjectRef, id: Id, command: Command): IO[S] } @@ -52,20 +54,20 @@ object EphemeralLog { /** * Creates on a ephemeral log for the given definition and config */ - def apply[Id, S <: EphemeralState, Command, Rejection]( - definition: EphemeralDefinition[Id, S, Command, Rejection], + def apply[Id, S <: EphemeralState, Command, R <: Rejection]( + definition: EphemeralDefinition[Id, S, Command, R], config: EphemeralLogConfig, xas: Transactors - ): EphemeralLog[Id, S, Command, Rejection] = { + )(implicit execution: EvaluationExecution): EphemeralLog[Id, S, Command, R] = { val stateStore = EphemeralStateStore(definition.tpe, definition.stateSerializer, config.ttl, xas) - new EphemeralLog[Id, S, Command, Rejection] { + new EphemeralLog[Id, S, Command, R] { - override def stateOr[R <: Rejection](ref: ProjectRef, id: Id, notFound: => R): IO[R, S] = + override def stateOr[R2 <: R](ref: ProjectRef, id: Id, notFound: => R2): IO[S] = stateStore.get(ref, id).flatMap { - IO.fromOption(_, notFound) + IO.fromOption(_)(notFound) } - override def evaluate(ref: ProjectRef, id: Id, command: Command): IO[Rejection, S] = { + override def evaluate(ref: ProjectRef, id: Id, command: Command): IO[S] = { for { newState <- definition.evaluate(command, config.maxDuration) res <- stateStore @@ -73,8 +75,7 @@ object EphemeralLog { .attemptSomeSqlState { case sqlstate.class23.UNIQUE_VIOLATION => definition.onUniqueViolation(id, command) } - .transact(xas.write) - .hideErrors + .transact(xas.writeCE) _ <- IO.fromEither(res) } yield newState } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/execution/EvaluationExecution.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/execution/EvaluationExecution.scala new file mode 100644 index 0000000000..727a6b763d --- /dev/null +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/execution/EvaluationExecution.scala @@ -0,0 +1,5 @@ +package ch.epfl.bluebrain.nexus.delta.sourcing.execution + +import cats.effect.{ContextShift, IO, Timer} + +final case class EvaluationExecution(timer: Timer[IO], contextShift: ContextShift[IO]) diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/EphemeralStateStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/EphemeralStateStore.scala index d055f0d3b1..06736445ae 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/EphemeralStateStore.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/EphemeralStateStore.scala @@ -1,5 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.state +import cats.effect.IO import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.sourcing.{Serializer, Transactors} import ch.epfl.bluebrain.nexus.delta.sourcing.model._ @@ -7,7 +8,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.state.State.EphemeralState import doobie._ import doobie.implicits._ import doobie.postgres.implicits._ -import monix.bio.UIO import ch.epfl.bluebrain.nexus.delta.sourcing.implicits.IriInstances import scala.concurrent.duration.FiniteDuration @@ -25,7 +25,7 @@ trait EphemeralStateStore[Id, S <: EphemeralState] { /** * Returns the state */ - def get(ref: ProjectRef, id: Id): UIO[Option[S]] + def get(ref: ProjectRef, id: Id): IO[Option[S]] } object EphemeralStateStore { @@ -66,12 +66,11 @@ object EphemeralStateStore { """.stripMargin }.update.run.void - override def get(ref: ProjectRef, id: Id): UIO[Option[S]] = + override def get(ref: ProjectRef, id: Id): IO[Option[S]] = sql"""SELECT value FROM public.ephemeral_states WHERE type = $tpe AND org = ${ref.organization} AND project = ${ref.project} AND id = $id""" .query[S] .option - .transact(xas.read) - .hideErrors + .transact(xas.readCE) } } diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/EphemeralLogSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/EphemeralLogSuite.scala index f965355d51..af893e2f00 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/EphemeralLogSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/EphemeralLogSuite.scala @@ -5,16 +5,17 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.Message.MessageRejection.{AlreadyE import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.sourcing.Message.{CreateMessage, MessageRejection, MessageState} import ch.epfl.bluebrain.nexus.delta.sourcing.config.EphemeralLogConfig +import ch.epfl.bluebrain.nexus.delta.sourcing.execution.EvaluationExecution import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, User} import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef} -import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie +import ch.epfl.bluebrain.nexus.testkit.ce.CatsEffectSuite import munit.AnyFixture import java.time.Instant import scala.concurrent.duration._ -class EphemeralLogSuite extends BioSuite with Doobie.Fixture with Doobie.Assertions { +class EphemeralLogSuite extends CatsEffectSuite with Doobie.Fixture with Doobie.Assertions { override def munitFixtures: Seq[AnyFixture[_]] = List(doobie) private lazy val xas = doobie() @@ -27,6 +28,8 @@ class EphemeralLogSuite extends BioSuite with Doobie.Fixture with Doobie.Asserti (_, command) => AlreadyExists(command.id, command.project) ) + implicit val ee: EvaluationExecution = EvaluationExecution(timer, contextShift) + private lazy val log = EphemeralLog( definition, EphemeralLogConfig(100.millis, 5.hours), @@ -39,36 +42,35 @@ class EphemeralLogSuite extends BioSuite with Doobie.Fixture with Doobie.Asserti private val alice = User("Alice", Label.unsafe("Wonderland")) private val message = MessageState(id, proj, text, alice, Instant.EPOCH, Anonymous) + private def createMessage(text: String) = + log.evaluate(proj, id, CreateMessage(id, proj, text, alice)) + test("Raise an error with a non-existent project") { - log.stateOr(ProjectRef.unsafe("xxx", "xxx"), id, NotFound).error(NotFound) + log.stateOr(ProjectRef.unsafe("xxx", "xxx"), id, NotFound).intercept(NotFound) } test("Raise an error with a non-existent id") { - log.stateOr(proj, nxv + "xxx", NotFound).error(NotFound) + log.stateOr(proj, nxv + "xxx", NotFound).intercept(NotFound) } test("Raise an error if the text message is too long and save nothing") { for { - _ <- log - .evaluate(proj, id, CreateMessage(id, proj, "Hello, World !", alice)) - .error(MessageTooLong(id, proj)) - _ <- log.stateOr(proj, id, NotFound).error(NotFound) + _ <- createMessage("Hello, World !").intercept(MessageTooLong(id, proj)) + _ <- log.stateOr(proj, id, NotFound).intercept(NotFound) } yield () } test("Evaluate successfully the command and save the message") { for { - _ <- log.evaluate(proj, id, CreateMessage(id, proj, text, alice)).assert(message) - _ <- log.stateOr(proj, id, NotFound).assert(message) + _ <- createMessage(text).assertEquals(message) + _ <- log.stateOr(proj, id, NotFound).assertEquals(message) } yield () } test("Raise an error if id already exists and save nothing") { for { - _ <- log - .evaluate(proj, id, CreateMessage(id, proj, "Bye", alice)) - .error(AlreadyExists(id, proj)) - _ <- log.stateOr(proj, id, NotFound).assert(message) + _ <- createMessage("Bye").intercept(AlreadyExists(id, proj)) + _ <- log.stateOr(proj, id, NotFound).assertEquals(message) } yield () } } diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Message.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Message.scala index ff55241f32..186e5c9faa 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Message.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Message.scala @@ -1,16 +1,17 @@ package ch.epfl.bluebrain.nexus.delta.sourcing +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.{nxv, schemas} import ch.epfl.bluebrain.nexus.delta.sourcing.Message.MessageRejection.MessageTooLong import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Subject} import ch.epfl.bluebrain.nexus.delta.sourcing.model.ResourceRef.Latest import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef, ResourceRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.rejection.Rejection import ch.epfl.bluebrain.nexus.delta.sourcing.state.State.EphemeralState import io.circe.Codec import io.circe.generic.extras.Configuration import io.circe.generic.extras.semiauto.deriveConfiguredCodec -import monix.bio.IO import java.time.Instant import scala.annotation.nowarn @@ -18,7 +19,7 @@ import scala.annotation.nowarn object Message { val entityType: EntityType = EntityType("message") - def evaluate(c: CreateMessage): IO[MessageRejection, MessageState] = + def evaluate(c: CreateMessage): IO[MessageState] = IO.raiseWhen(c.text.length > 10)(MessageTooLong(c.id, c.project)) .as(MessageState(c.id, c.project, c.text, c.from, Instant.EPOCH, Anonymous)) @@ -37,7 +38,9 @@ object Message { override def types: Set[Iri] = Set(nxv + "Message") } - sealed trait MessageRejection extends Product with Serializable + sealed trait MessageRejection extends Rejection { + override def reason: String = "Something bad happened." + } object MessageRejection { final case object NotFound extends MessageRejection diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/EphemeralStateStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/EphemeralStateStoreSuite.scala index 8db1579544..de9af45b71 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/EphemeralStateStoreSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/EphemeralStateStoreSuite.scala @@ -6,16 +6,15 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.{DeleteExpired, Message} import ch.epfl.bluebrain.nexus.delta.sourcing.Message.MessageState import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, User} import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef} -import ch.epfl.bluebrain.nexus.testkit.IOFixedClock -import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie +import ch.epfl.bluebrain.nexus.testkit.ce.{CatsEffectSuite, IOFixedClock} import doobie.implicits._ import munit.AnyFixture import java.time.Instant import scala.concurrent.duration._ -class EphemeralStateStoreSuite extends BioSuite with Doobie.Fixture with Doobie.Assertions { +class EphemeralStateStoreSuite extends CatsEffectSuite with Doobie.Fixture with Doobie.Assertions { override def munitFixtures: Seq[AnyFixture[_]] = List(doobie) private lazy val xas = doobie() @@ -37,12 +36,12 @@ class EphemeralStateStoreSuite extends BioSuite with Doobie.Fixture with Doobie. private val m2 = nxv + "m2" private val message2 = MessageState(m2, project1, "Bye !", alice, Instant.EPOCH.plusSeconds(60L), Anonymous) - private lazy val deleteExpired = new DeleteExpired(xas)(IOFixedClock.ioClock(Instant.EPOCH.plusSeconds(6L))) + private lazy val deleteExpired = new DeleteExpired(xas)(IOFixedClock.ceClock(Instant.EPOCH.plusSeconds(6L))) test("save the states") { for { - _ <- store.save(message1).transact(xas.write).assert(()) - _ <- store.save(message2).transact(xas.write).assert(()) + _ <- store.save(message1).transact(xas.writeCE).assert + _ <- store.save(message2).transact(xas.writeCE).assert } yield () } diff --git a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsEffectSuite.scala b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsEffectSuite.scala index 5a618b9925..39a87deca8 100644 --- a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsEffectSuite.scala +++ b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsEffectSuite.scala @@ -22,14 +22,12 @@ abstract class CatsEffectSuite protected val ioTimeout: FiniteDuration = 45.seconds implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) + implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) override def munitValueTransforms: List[ValueTransform] = super.munitValueTransforms ++ List(munitIOTransform, munitBIOTransform) private val munitIOTransform: ValueTransform = { - implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) - implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) - new ValueTransform( "IO", { case io: IO[_] => diff --git a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsIOValues.scala b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsIOValues.scala index f1c5cdcc63..6daa66fdf7 100644 --- a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsIOValues.scala +++ b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsIOValues.scala @@ -1,6 +1,6 @@ package ch.epfl.bluebrain.nexus.testkit.ce -import cats.effect.IO +import cats.effect.{ContextShift, IO, Timer} import org.scalactic.source import org.scalatest.Assertion import org.scalatest.Assertions._ @@ -10,6 +10,9 @@ import scala.reflect.ClassTag trait CatsIOValues extends CatsIOValuesLowPrio { + implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) + implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) + implicit def ioToFutureAssertion(io: IO[Assertion]): Future[Assertion] = io.unsafeToFuture() implicit def futureListToFutureAssertion(future: Future[List[Assertion]])(implicit From 19728214e8f7b9359114bd6d152ecac12bf03f2a Mon Sep 17 00:00:00 2001 From: Simon Dumas Date: Tue, 10 Oct 2023 19:38:43 +0200 Subject: [PATCH 2/3] Fix wiring --- .../epfl/bluebrain/nexus/delta/wiring/StreamModule.scala | 7 ++++--- .../nexus/delta/plugins/archive/ArchivePluginConfig.scala | 6 +++--- .../nexus/delta/plugins/archive/ArchivePluginModule.scala | 3 ++- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/StreamModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/StreamModule.scala index 22a1b2e6a9..41d1c22b09 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/StreamModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/StreamModule.scala @@ -1,6 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.wiring -import cats.effect.{Clock, Sync} +import cats.effect.{Clock, IO, Sync, Timer} import ch.epfl.bluebrain.nexus.delta.sdk.ResourceShifts import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream import ch.epfl.bluebrain.nexus.delta.sourcing.config.{ProjectionConfig, QueryConfig} @@ -9,6 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream._ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes._ import ch.epfl.bluebrain.nexus.delta.sourcing.{DeleteExpired, PurgeElemFailures, Transactors} import izumi.distage.model.definition.ModuleDef +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import monix.bio.{Task, UIO} /** @@ -55,8 +56,8 @@ object StreamModule extends ModuleDef { } make[DeleteExpired].fromEffect { - (supervisor: Supervisor, config: ProjectionConfig, xas: Transactors, clock: Clock[UIO]) => - DeleteExpired(supervisor, config, xas)(clock) + (supervisor: Supervisor, config: ProjectionConfig, xas: Transactors, clock: Clock[IO], timer: Timer[IO]) => + DeleteExpired(supervisor, config, xas)(clock, timer).toUIO } make[PurgeElemFailures].fromEffect { diff --git a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivePluginConfig.scala b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivePluginConfig.scala index f14a7bb82b..f74d0baf5f 100644 --- a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivePluginConfig.scala +++ b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivePluginConfig.scala @@ -1,8 +1,8 @@ package ch.epfl.bluebrain.nexus.delta.plugins.archive +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.sourcing.config.EphemeralLogConfig import com.typesafe.config.Config -import monix.bio.UIO import pureconfig.generic.semiauto.deriveReader import pureconfig.{ConfigReader, ConfigSource} @@ -23,8 +23,8 @@ object ArchivePluginConfig { /** * Converts a [[Config]] into an [[ArchivePluginConfig]] */ - def load(config: Config): UIO[ArchivePluginConfig] = - UIO.delay { + def load(config: Config): IO[ArchivePluginConfig] = + IO.delay { ConfigSource .fromConfig(config) .at("plugins.archive") diff --git a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivePluginModule.scala b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivePluginModule.scala index 1af0b8fc63..70360672ee 100644 --- a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivePluginModule.scala +++ b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivePluginModule.scala @@ -21,6 +21,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.execution.EvaluationExecution import com.typesafe.config.Config import izumi.distage.model.definition.{Id, ModuleDef} +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ /** * Archive plugin wiring. @@ -28,7 +29,7 @@ import izumi.distage.model.definition.{Id, ModuleDef} object ArchivePluginModule extends ModuleDef { implicit private val classLoader: ClassLoader = getClass.getClassLoader - make[ArchivePluginConfig].fromEffect { cfg: Config => ArchivePluginConfig.load(cfg) } + make[ArchivePluginConfig].fromEffect { cfg: Config => ArchivePluginConfig.load(cfg).toUIO } make[ArchiveDownload].from { ( From c34b2c29563a052a2a4b948a379202d10c08bd65 Mon Sep 17 00:00:00 2001 From: Simon Dumas Date: Wed, 11 Oct 2023 09:17:23 +0200 Subject: [PATCH 3/3] Fix integration tests --- .../bluebrain/nexus/delta/plugins/archive/model/Zip.scala | 2 +- .../nexus/delta/plugins/archive/routes/ArchiveRoutes.scala | 4 ++-- .../test/scala/ch/epfl/bluebrain/nexus/tests/BaseSpec.scala | 5 +---- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/model/Zip.scala b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/model/Zip.scala index d8ea2022ee..ce6f87e8da 100644 --- a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/model/Zip.scala +++ b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/model/Zip.scala @@ -27,7 +27,7 @@ object Zip { def metadata(filename: String): ArchiveMetadata = ArchiveMetadata.create(filename) - def checkZipHeader: Directive[Tuple1[Boolean]] = + def checkHeader: Directive[Tuple1[Boolean]] = extractRequest.map { req => HeadersUtils.matches(req.headers, Zip.contentType.mediaType) } diff --git a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/routes/ArchiveRoutes.scala b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/routes/ArchiveRoutes.scala index c5a09424a8..1a06913a5f 100644 --- a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/routes/ArchiveRoutes.scala +++ b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/routes/ArchiveRoutes.scala @@ -90,13 +90,13 @@ class ArchiveRoutes( } private def emitCreatedArchive(io: IO[ArchiveResource]): Route = - Zip.checkZipHeader { + Zip.checkHeader { case true => emitRedirect(SeeOther, io.map(_.uris.accessUri).attemptNarrow[ArchiveRejection]) case false => emitMetadata(Created, io) } private def emitArchiveDownload(id: IdSegment, project: ProjectRef)(implicit caller: Caller): Route = - Zip.checkZipHeader { + Zip.checkHeader { case true => parameter("ignoreNotFound".as[Boolean] ? false) { ignoreNotFound => emitArchiveFile(archives.download(id, project, ignoreNotFound)) diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/BaseSpec.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/BaseSpec.scala index be13ddc138..13eb936f20 100644 --- a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/BaseSpec.scala +++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/BaseSpec.scala @@ -6,8 +6,8 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.testkit.ScalatestRouteTest import akka.util.ByteString +import cats.effect.IO import cats.effect.concurrent.Ref -import cats.effect.{ContextShift, IO} import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.testkit._ @@ -30,7 +30,6 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AsyncWordSpecLike import org.scalatest.{Assertion, BeforeAndAfterAll, OptionValues} -import scala.concurrent.ExecutionContext import scala.concurrent.duration._ trait BaseSpec @@ -49,8 +48,6 @@ trait BaseSpec with ScalaFutures with Matchers { - implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) - private val logger = Logger.cats[this.type] implicit val config: TestsConfig = load[TestsConfig](ConfigFactory.load(), "tests")