From d2e67a1a8dfc15b4b5825ae8b606ddbfc58d4ccb Mon Sep 17 00:00:00 2001 From: Daniel Bell Date: Mon, 16 Oct 2023 11:58:10 +0200 Subject: [PATCH] Migrate SSE to cats effect --- .../nexus/delta/routes/ElemRoutes.scala | 8 +-- .../nexus/delta/routes/EventsRoutes.scala | 13 +++-- .../nexus/delta/wiring/EventsModule.scala | 26 ++++----- .../nexus/delta/routes/ElemRoutesSpec.scala | 13 +++-- .../nexus/delta/routes/EventsRoutesSpec.scala | 17 +++--- .../nexus/delta/kernel/RetryStrategy.scala | 21 +++++++ .../migration/MigrateEffectSyntax.scala | 8 +++ .../blazegraph/BlazegraphPluginModule.scala | 6 +- .../BlazegraphViewsIndexingRoutes.scala | 9 ++- .../BlazegraphViewsIndexingRoutesSpec.scala | 3 +- .../CompositeViewsPluginModule.scala | 8 ++- .../compositeviews/client/DeltaClient.scala | 12 ++-- .../routes/CompositeViewsIndexingRoutes.scala | 9 ++- .../client/DeltaClientSpec.scala | 4 +- .../routes/CompositeViewsRoutesFixtures.scala | 2 + .../ElasticSearchPluginModule.scala | 6 +- .../routes/ElasticSearchIndexingRoutes.scala | 8 ++- .../ElasticSearchViewsRoutesFixtures.scala | 2 + .../nexus/delta/sdk/acls/AclsImpl.scala | 15 +++-- .../nexus/delta/sdk/ce/DeltaDirectives.scala | 5 ++ .../delta/sdk/directives/ResponseToSse.scala | 23 ++++---- .../nexus/delta/sdk/sse/SseElemStream.scala | 13 +++-- .../nexus/delta/sdk/sse/SseEventLog.scala | 58 +++++++++---------- .../nexus/delta/sdk/sse/package.scala | 4 +- .../delta/sdk/stream/StreamConverter.scala | 40 ++++++------- .../sdk/syntax/ProjectionErrorsSyntax.scala | 10 ++-- .../delta/sdk/sse/SseEventLogSuite.scala | 17 +++--- .../nexus/delta/sourcing/GlobalEventLog.scala | 7 ++- .../sourcing/state/GlobalStateStore.scala | 11 ++-- .../state/GlobalStateStoreSuite.scala | 16 ++--- .../nexus/testkit/bio/StreamAssertions.scala | 17 ++++++ 31 files changed, 249 insertions(+), 162 deletions(-) diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/ElemRoutes.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/ElemRoutes.scala index 5548d7cfb7..245dd41a0b 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/ElemRoutes.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/ElemRoutes.scala @@ -4,12 +4,13 @@ import akka.http.scaladsl.model.StatusCodes import akka.http.scaladsl.model.StatusCodes.OK import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.Route +import cats.effect.{ContextShift, IO} import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck -import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives.{emit, lastEventId} +import ch.epfl.bluebrain.nexus.delta.sdk.ce.DeltaDirectives.{emit, lastEventId} import ch.epfl.bluebrain.nexus.delta.sdk.directives.UriDirectives.baseUriPrefix import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaSchemeDirectives} import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities @@ -24,7 +25,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.RemainingElems import io.circe.syntax.EncoderOps import io.circe.{Encoder, JsonObject} import kamon.instrumentation.akka.http.TracingDirectives.operationName -import monix.execution.Scheduler import java.time.Instant @@ -40,9 +40,9 @@ class ElemRoutes( schemeDirectives: DeltaSchemeDirectives )(implicit baseUri: BaseUri, - s: Scheduler, cr: RemoteContextResolution, - ordering: JsonKeyOrdering + ordering: JsonKeyOrdering, + contextShift: ContextShift[IO] ) extends AuthDirectives(identities, aclCheck: AclCheck) { import baseUri.prefixSegment import schemeDirectives._ diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/EventsRoutes.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/EventsRoutes.scala index 1cbf33843a..f76e5543c9 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/EventsRoutes.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/EventsRoutes.scala @@ -3,6 +3,8 @@ package ch.epfl.bluebrain.nexus.delta.routes import akka.http.scaladsl.model.StatusCodes.OK import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.{Directive1, Route} +import cats.effect.{ContextShift, IO} +import cats.implicits.catsSyntaxApplicativeError import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck @@ -12,11 +14,12 @@ import ch.epfl.bluebrain.nexus.delta.sdk.directives.UriDirectives._ import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaSchemeDirectives} import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri +import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.events +import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEventLog import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label import kamon.instrumentation.akka.http.TracingDirectives.operationName -import monix.execution.Scheduler /** * The global events route. @@ -37,7 +40,7 @@ class EventsRoutes( schemeDirectives: DeltaSchemeDirectives )(implicit baseUri: BaseUri, - s: Scheduler, + c: ContextShift[IO], cr: RemoteContextResolution, ordering: JsonKeyOrdering ) extends AuthDirectives(identities, aclCheck: AclCheck) { @@ -100,7 +103,7 @@ class EventsRoutes( operationName(s"$prefixSegment/$selector/{org}/events") { concat( authorizeFor(org, events.read).apply { - emit(sseEventLog.streamBy(selector, org, offset)) + emit(sseEventLog.streamBy(selector, org, offset).attemptNarrow[OrganizationRejection]) }, (head & authorizeFor(org, events.read)) { complete(OK) @@ -114,7 +117,7 @@ class EventsRoutes( concat( operationName(s"$prefixSegment/$selector/{org}/{proj}/events") { authorizeFor(projectRef, events.read).apply { - emit(sseEventLog.streamBy(selector, projectRef, offset)) + emit(sseEventLog.streamBy(selector, projectRef, offset).attemptNarrow[ProjectRejection]) } }, (head & authorizeFor(projectRef, events.read)) { @@ -144,7 +147,7 @@ object EventsRoutes { schemeDirectives: DeltaSchemeDirectives )(implicit baseUri: BaseUri, - s: Scheduler, + c: ContextShift[IO], cr: RemoteContextResolution, ordering: JsonKeyOrdering ): Route = new EventsRoutes(identities, aclCheck, sseEventLog, schemeDirectives).routes diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/EventsModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/EventsModule.scala index 0eec5f9fec..c352c1904f 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/EventsModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/EventsModule.scala @@ -1,5 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.wiring +import cats.effect.{ContextShift, IO} import ch.epfl.bluebrain.nexus.delta.Main.pluginsMaxPriority import ch.epfl.bluebrain.nexus.delta.config.AppConfig import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ @@ -18,7 +19,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.sse.{SseElemStream, SseEncoder, SseEven import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import izumi.distage.model.definition.{Id, ModuleDef} -import monix.execution.Scheduler /** * Events wiring @@ -34,15 +34,13 @@ object EventsModule extends ModuleDef { xas: Transactors, jo: JsonKeyOrdering ) => - toCatsIO( - SseEventLog( - sseEncoders, - organizations.fetch(_).void.toBIO[OrganizationRejection], - projects.fetch(_).map { p => (p.value.organizationUuid, p.value.uuid) }, - config.sse, - xas - )(jo) - ) + SseEventLog( + sseEncoders, + organizations.fetch(_).void.toBIO[OrganizationRejection], + projects.fetch(_).map { p => (p.value.organizationUuid, p.value.uuid) }, + config.sse, + xas + )(jo) } make[SseElemStream].from { (qc: QueryConfig, xas: Transactors) => @@ -56,11 +54,11 @@ object EventsModule extends ModuleDef { sseEventLog: SseEventLog, schemeDirectives: DeltaSchemeDirectives, baseUri: BaseUri, - s: Scheduler, + c: ContextShift[IO], cr: RemoteContextResolution @Id("aggregate"), ordering: JsonKeyOrdering ) => - new EventsRoutes(identities, aclCheck, sseEventLog, schemeDirectives)(baseUri, s, cr, ordering) + new EventsRoutes(identities, aclCheck, sseEventLog, schemeDirectives)(baseUri, c, cr, ordering) } many[PriorityRoute].add { (route: EventsRoutes) => @@ -74,11 +72,11 @@ object EventsModule extends ModuleDef { sseElemStream: SseElemStream, schemeDirectives: DeltaSchemeDirectives, baseUri: BaseUri, - s: Scheduler, + contextShift: ContextShift[IO], cr: RemoteContextResolution @Id("aggregate"), ordering: JsonKeyOrdering ) => - new ElemRoutes(identities, aclCheck, sseElemStream, schemeDirectives)(baseUri, s, cr, ordering) + new ElemRoutes(identities, aclCheck, sseElemStream, schemeDirectives)(baseUri, cr, ordering, contextShift) } many[PriorityRoute].add { (route: ElemRoutes) => diff --git a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/ElemRoutesSpec.scala b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/ElemRoutesSpec.scala index e09a902401..c0300fb676 100644 --- a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/ElemRoutesSpec.scala +++ b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/ElemRoutesSpec.scala @@ -1,9 +1,10 @@ package ch.epfl.bluebrain.nexus.delta.routes -import akka.http.scaladsl.model.headers.{`Last-Event-ID`, OAuth2BearerToken} +import akka.http.scaladsl.model.headers.{OAuth2BearerToken, `Last-Event-ID`} import akka.http.scaladsl.model.sse.ServerSentEvent import akka.http.scaladsl.model.{MediaTypes, StatusCodes} import akka.http.scaladsl.server.Route +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives @@ -20,13 +21,13 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter import ch.epfl.bluebrain.nexus.delta.sourcing.stream.RemainingElems import ch.epfl.bluebrain.nexus.testkit.CirceLiteral import ch.epfl.bluebrain.nexus.testkit.bio.IOFromMap +import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues import fs2.Stream -import monix.bio.{Task, UIO} import java.time.Instant import java.util.UUID -class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral with IOFromMap { +class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral with IOFromMap with CatsIOValues { private val aclCheck = AclSimpleCheck().accepted @@ -45,7 +46,7 @@ class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral with IOFromMap { private val sseElemStream = new SseElemStream { - private val stream = Stream.emits(List(elem1, elem2, elem3)).covary[Task] + private val stream = Stream.emits(List(elem1, elem2, elem3)).covary[IO] override def continuous(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ServerSentEventStream = stream @@ -56,8 +57,8 @@ class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral with IOFromMap { project: ProjectRef, selectFilter: SelectFilter, start: Offset - ): UIO[Option[RemainingElems]] = - UIO.some(RemainingElems(999L, Instant.EPOCH)) + ): IO[Option[RemainingElems]] = + IO.pure(Some(RemainingElems(999L, Instant.EPOCH))) } private val routes = Route.seal( diff --git a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/EventsRoutesSpec.scala b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/EventsRoutesSpec.scala index 3ca0a8e34f..021c878e46 100644 --- a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/EventsRoutesSpec.scala +++ b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/EventsRoutesSpec.scala @@ -1,19 +1,18 @@ package ch.epfl.bluebrain.nexus.delta.routes -import akka.http.scaladsl.model.headers.{`Last-Event-ID`, OAuth2BearerToken} +import akka.http.scaladsl.model.headers.{OAuth2BearerToken, `Last-Event-ID`} import akka.http.scaladsl.model.sse.ServerSentEvent import akka.http.scaladsl.model.{MediaTypes, StatusCodes} import akka.http.scaladsl.server.Route +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives import ch.epfl.bluebrain.nexus.delta.sdk.identities.IdentitiesDummy import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller -import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection.OrganizationNotFound import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.events import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContextDummy -import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection.ProjectNotFound import ch.epfl.bluebrain.nexus.delta.sdk.sse.{ServerSentEventStream, SseEventLog} import ch.epfl.bluebrain.nexus.delta.sdk.utils.BaseRouteSpec @@ -22,12 +21,12 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset.{At, Start} import ch.epfl.bluebrain.nexus.testkit.bio.IOFromMap +import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues import fs2.Stream -import monix.bio.IO import java.util.UUID -class EventsRoutesSpec extends BaseRouteSpec with IOFromMap { +class EventsRoutesSpec extends BaseRouteSpec with IOFromMap with CatsIOValues { private val uuid = UUID.randomUUID() @@ -64,24 +63,24 @@ class EventsRoutesSpec extends BaseRouteSpec with IOFromMap { override def streamBy(selector: Label, offset: Offset): ServerSentEventStream = stream(offset).filter(_.eventType.exists(_.toLowerCase == selector.value)) - override def stream(org: Label, offset: Offset): IO[OrganizationRejection, ServerSentEventStream] = + override def stream(org: Label, offset: Offset): IO[ServerSentEventStream] = IO.raiseWhen(org != projectRef.organization)(OrganizationNotFound(org)).as(stream(offset)) override def streamBy( selector: Label, org: Label, offset: Offset - ): IO[OrganizationRejection, ServerSentEventStream] = + ): IO[ServerSentEventStream] = IO.raiseWhen(org != projectRef.organization)(OrganizationNotFound(org)).as(streamBy(selector, offset)) - override def stream(project: ProjectRef, offset: Offset): IO[ProjectRejection, ServerSentEventStream] = + override def stream(project: ProjectRef, offset: Offset): IO[ServerSentEventStream] = IO.raiseWhen(project != projectRef)(ProjectNotFound(project)).as(stream(offset)) override def streamBy( selector: Label, project: ProjectRef, offset: Offset - ): IO[ProjectRejection, ServerSentEventStream] = + ): IO[ServerSentEventStream] = IO.raiseWhen(project != projectRef)(ProjectNotFound(project)).as(streamBy(selector, offset)) override def allSelectors: Set[Label] = Set(acl, project, resources) diff --git a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/RetryStrategy.scala b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/RetryStrategy.scala index 5065cfc122..bd3d0662b2 100644 --- a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/RetryStrategy.scala +++ b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/RetryStrategy.scala @@ -2,6 +2,8 @@ package ch.epfl.bluebrain.nexus.delta.kernel import com.typesafe.scalalogging.{Logger => ScalaLoggingLogger} import monix.bio.{IO, UIO} +import cats.effect.{IO => CIO} +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toMonixBIOOps import pureconfig.ConfigReader import pureconfig.error.{CannotConvert, ConfigReaderFailures, ConvertFailure} import pureconfig.generic.semiauto._ @@ -54,6 +56,18 @@ object RetryStrategy { UIO.delay(logger.error(message)) } + /** + * Log errors when retrying + */ + def logError[E](logger: org.typelevel.log4cats.Logger[CIO], action: String): (E, RetryDetails) => CIO[Unit] = { + case (err, WillDelayAndRetry(nextDelay, retriesSoFar, _)) => + val message = s"""Error $err while $action: retrying in ${nextDelay.toMillis}ms (retries so far: $retriesSoFar)""" + logger.warn(message) + case (err, GivingUp(totalRetries, _)) => + val message = s"""Error $err while $action, giving up (total retries: $totalRetries)""" + logger.error(message) + } + /** * Log errors when retrying */ @@ -125,6 +139,13 @@ object RetryStrategy { (t: Throwable, d: RetryDetails) => logError(logger, action)(t, d) ) + def retryOnNonFatal(config: RetryStrategyConfig, logger: org.typelevel.log4cats.Logger[CIO], action: String): RetryStrategy[Throwable] = + RetryStrategy( + config, + (t: Throwable) => NonFatal(t), + (t: Throwable, d: RetryDetails) => logError(logger, action)(t, d).toBIO[Throwable] + ) + } /** diff --git a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/effect/migration/MigrateEffectSyntax.scala b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/effect/migration/MigrateEffectSyntax.scala index 3420488176..c08d85b36f 100644 --- a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/effect/migration/MigrateEffectSyntax.scala +++ b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/effect/migration/MigrateEffectSyntax.scala @@ -10,11 +10,14 @@ import scala.reflect.ClassTag trait MigrateEffectSyntax { implicit def toCatsIO[E <: Throwable, A](io: BIO[E, A]): IO[A] = io.to[IO] + implicit def uioToCatsIO[E <: Throwable, A](io: UIO[A]): IO[A] = io.to[IO] implicit def toCatsIOOps[E <: Throwable, A](io: BIO[E, A]): MonixBioToCatsIOOps[E, A] = new MonixBioToCatsIOOps(io) + implicit def toCatsIOEitherOps[E, A](io: BIO[E, A]): MonixBioToCatsIOEitherOps[E, A] = new MonixBioToCatsIOEitherOps(io) implicit def toMonixBIOOps[A](io: IO[A]): CatsIOToBioOps[A] = new CatsIOToBioOps(io) val taskToIoK: Task ~> IO = λ[Task ~> IO](toCatsIO(_)) + val uioToIoK: UIO ~> IO = λ[UIO ~> IO](uioToCatsIO(_)) val ioToUioK: IO ~> UIO = λ[IO ~> UIO](_.toUIO) val ioToTaskK: IO ~> Task = λ[IO ~> Task](Task.from(_)) @@ -24,6 +27,11 @@ final class MonixBioToCatsIOOps[E <: Throwable, A](private val io: BIO[E, A]) ex def toCatsIO: IO[A] = io.to[IO] } +final class MonixBioToCatsIOEitherOps[E, A](private val io: BIO[E, A]) extends AnyVal { + def toCatsIOEither: IO[Either[E, A]] = io.attempt.to[IO] +} + + final class CatsIOToBioOps[A](private val io: IO[A]) extends AnyVal { def toBIO[E <: Throwable](implicit E: ClassTag[E]): BIO[E, A] = BIO.from(io).mapErrorPartialWith { diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphPluginModule.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphPluginModule.scala index 8ef5fcea0d..fe8693df12 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphPluginModule.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphPluginModule.scala @@ -1,13 +1,13 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph import akka.actor.typed.ActorSystem -import cats.effect.Clock +import cats.effect.{Clock, ContextShift, IO} import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.config.BlazegraphViewsConfig import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.BlazegraphCoordinator import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewRejection.ProjectContextRejection -import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{contexts, schema => viewsSchemaId, BlazegraphView, BlazegraphViewEvent} +import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{BlazegraphView, BlazegraphViewEvent, contexts, schema => viewsSchemaId} import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes.{BlazegraphViewsIndexingRoutes, BlazegraphViewsRoutes, BlazegraphViewsRoutesHandler} import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.{BlazegraphSlowQueryDeleter, BlazegraphSlowQueryLogger, BlazegraphSlowQueryStore} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi @@ -227,6 +227,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef { baseUri: BaseUri, cfg: BlazegraphViewsConfig, s: Scheduler, + c: ContextShift[IO], cr: RemoteContextResolution @Id("aggregate"), ordering: JsonKeyOrdering ) => @@ -240,6 +241,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef { )( baseUri, s, + c, cr, ordering, cfg.pagination diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutes.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutes.scala index e489b1d164..3362506209 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutes.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutes.scala @@ -2,6 +2,9 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.Route +import cats.effect.{ContextShift, IO => CIO} +import cats.implicits.catsSyntaxApplicativeError +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef.ActiveViewDef import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewRejection._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model._ @@ -41,6 +44,7 @@ class BlazegraphViewsIndexingRoutes( )(implicit baseUri: BaseUri, s: Scheduler, + c: ContextShift[CIO], cr: RemoteContextResolution, ordering: JsonKeyOrdering, pc: PaginationConfig @@ -80,10 +84,10 @@ class BlazegraphViewsIndexingRoutes( concat( (pathPrefix("sse") & lastEventId) { offset => emit( - fetch(id, ref) + fetch(id, ref).toCatsIO .map { view => projectionErrors.sses(view.ref.project, view.ref.viewId, offset) - } + }.attemptNarrow[BlazegraphViewRejection] ) }, (fromPaginated & timeRange("instant") & extractUri & pathEndOrSingleSlash) { @@ -148,6 +152,7 @@ object BlazegraphViewsIndexingRoutes { )(implicit baseUri: BaseUri, s: Scheduler, + c: ContextShift[CIO], cr: RemoteContextResolution, ordering: JsonKeyOrdering, pc: PaginationConfig diff --git a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutesSpec.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutesSpec.scala index 0e642f44ba..b4ebbe05bc 100644 --- a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutesSpec.scala +++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutesSpec.scala @@ -23,12 +23,13 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.FailedElem import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionProgress import ch.epfl.bluebrain.nexus.testkit.bio.IOFromMap +import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues import monix.bio.IO import java.time.Instant import scala.concurrent.duration._ -class BlazegraphViewsIndexingRoutesSpec extends BlazegraphViewRoutesFixtures with IOFromMap { +class BlazegraphViewsIndexingRoutesSpec extends BlazegraphViewRoutesFixtures with IOFromMap with CatsIOValues { private lazy val projections = Projections(xas, queryConfig, 1.hour) private lazy val projectionErrors = ProjectionErrors(xas, queryConfig) diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala index e761582993..1867ab6484 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala @@ -1,7 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews import akka.actor.typed.ActorSystem -import cats.effect.Clock +import cats.effect.{Clock, ContextShift, IO} import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient @@ -57,10 +57,11 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef { cfg: CompositeViewsConfig, as: ActorSystem[Nothing], sc: Scheduler, + c: ContextShift[IO], authTokenProvider: AuthTokenProvider ) => val httpClient = HttpClient()(cfg.remoteSourceClient.http, as.classicSystem, sc) - DeltaClient(httpClient, authTokenProvider, cfg.remoteSourceCredentials, cfg.remoteSourceClient.retryDelay)(as, sc) + DeltaClient(httpClient, authTokenProvider, cfg.remoteSourceCredentials, cfg.remoteSourceClient.retryDelay)(as, sc, c) } make[BlazegraphClient].named("blazegraph-composite-indexing-client").from { @@ -315,6 +316,7 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef { baseUri: BaseUri, config: CompositeViewsConfig, s: Scheduler, + c: ContextShift[IO], cr: RemoteContextResolution @Id("aggregate"), ordering: JsonKeyOrdering ) => @@ -327,7 +329,7 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef { projections, projectionErrors, schemeDirectives - )(baseUri, config.pagination, s, cr, ordering) + )(baseUri, config.pagination, s, c, cr, ordering) } make[CompositeView.Shift].from { (views: CompositeViews, base: BaseUri) => diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala index 27fbd59de6..fa75b179f9 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala @@ -4,9 +4,10 @@ import akka.actor.typed.ActorSystem import akka.http.scaladsl.client.RequestBuilding.{Get, Head} import akka.http.scaladsl.model.ContentTypes.`application/json` import akka.http.scaladsl.model.Uri.Query -import akka.http.scaladsl.model.headers.{`Last-Event-ID`, Accept} +import akka.http.scaladsl.model.headers.{Accept, `Last-Event-ID`} import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes} import akka.stream.alpakka.sse.scaladsl.EventSource +import cats.effect.ContextShift import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.MigrateEffectSyntax import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewSource.RemoteProjectSource import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.stream.CompositeBranch @@ -27,6 +28,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, RemainingElems} import com.typesafe.scalalogging.Logger import io.circe.parser.decode import fs2._ +import cats.effect.{IO => CIO} import monix.bio.{IO, UIO} import monix.execution.Scheduler @@ -87,7 +89,8 @@ object DeltaClient { retryDelay: FiniteDuration )(implicit as: ActorSystem[Nothing], - scheduler: Scheduler + scheduler: Scheduler, + c: ContextShift[CIO] ) extends DeltaClient with MigrateEffectSyntax { @@ -150,7 +153,7 @@ object DeltaClient { logger.error(s"Failed to decode sse event '$sse'", err) Stream.empty } - } + }.translate(ioToTaskK) } private def typeQuery(types: Set[Iri]) = @@ -187,7 +190,8 @@ object DeltaClient { retryDelay: FiniteDuration )(implicit as: ActorSystem[Nothing], - sc: Scheduler + sc: Scheduler, + c: ContextShift[CIO] ): DeltaClient = new DeltaClientImpl(client, authTokenProvider, credentials, retryDelay) } diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutes.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutes.scala index 6b1488f3b4..02a0e10347 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutes.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutes.scala @@ -2,7 +2,9 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.routes import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.Route +import cats.effect.ContextShift import cats.syntax.all._ +import cats.effect.{IO => CIO} import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes.BlazegraphViewsDirectives import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewRejection._ @@ -29,6 +31,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.{FailedElemLogRow, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.projections.ProjectionErrors +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import monix.bio.IO import monix.execution.Scheduler class CompositeViewsIndexingRoutes( @@ -44,6 +47,7 @@ class CompositeViewsIndexingRoutes( baseUri: BaseUri, paginationConfig: PaginationConfig, s: Scheduler, + c: ContextShift[CIO], cr: RemoteContextResolution, ordering: JsonKeyOrdering ) extends AuthDirectives(identities, aclCheck) @@ -98,10 +102,10 @@ class CompositeViewsIndexingRoutes( concat( (pathPrefix("sse") & lastEventId) { offset => emit( - fetchView(id, ref) + fetchView(id, ref).toCatsIO .map { view => projectionErrors.sses(view.project, view.id, offset) - } + }.attemptNarrow[CompositeViewRejection] ) }, (fromPaginated & timeRange("instant") & extractUri & pathEndOrSingleSlash) { @@ -262,6 +266,7 @@ object CompositeViewsIndexingRoutes { baseUri: BaseUri, paginationConfig: PaginationConfig, s: Scheduler, + c: ContextShift[CIO], cr: RemoteContextResolution, ordering: JsonKeyOrdering ): Route = diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClientSpec.scala b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClientSpec.scala index 2403661675..612060ae7d 100644 --- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClientSpec.scala +++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClientSpec.scala @@ -1,7 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.client import akka.actor.typed.scaladsl.adapter._ -import akka.actor.{typed, ActorSystem} +import akka.actor.{ActorSystem, typed} import akka.http.scaladsl.Http import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._ import akka.http.scaladsl.model._ @@ -26,6 +26,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, RemainingElems} +import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues import ch.epfl.bluebrain.nexus.testkit.{IOValues, TestHelpers} import io.circe.syntax.EncoderOps import monix.execution.Scheduler @@ -45,6 +46,7 @@ class DeltaClientSpec with ScalaFutures with OptionValues with IOValues + with CatsIOValues with ConfigFixtures with BeforeAndAfterAll with TestHelpers diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsRoutesFixtures.scala b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsRoutesFixtures.scala index ce1e47dd25..21a411c459 100644 --- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsRoutesFixtures.scala +++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsRoutesFixtures.scala @@ -14,6 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.utils.RouteHelpers import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Authenticated, Group, User} import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.DoobieScalaTestFixture +import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues import ch.epfl.bluebrain.nexus.testkit.{CirceEq, CirceLiteral, IOFixedClock, TestMatchers} import org.scalatest.{CancelAfterFailure, Inspectors, OptionValues} import org.scalatest.matchers.should.Matchers @@ -25,6 +26,7 @@ trait CompositeViewsRoutesFixtures with CirceLiteral with CirceEq with IOFixedClock + with CatsIOValues with OptionValues with TestMatchers with Inspectors diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala index 5eec1f42fe..d5498d0599 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala @@ -1,7 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch import akka.actor.typed.ActorSystem -import cats.effect.Clock +import cats.effect.{Clock, ContextShift, IO} import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient @@ -9,7 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.config.ElasticSearchV import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.deletion.{ElasticSearchDeletionTask, EventMetricsDeletionTask} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.ElasticSearchCoordinator import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewRejection.ProjectContextRejection -import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{contexts, defaultElasticsearchMapping, defaultElasticsearchSettings, schema => viewsSchemaId, ElasticSearchView, ElasticSearchViewEvent} +import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{ElasticSearchView, ElasticSearchViewEvent, contexts, defaultElasticsearchMapping, defaultElasticsearchSettings, schema => viewsSchemaId} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.query.{DefaultViewsQuery, ElasticSearchQueryError} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.routes._ import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary @@ -245,6 +245,7 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef { schemeDirectives: DeltaSchemeDirectives, baseUri: BaseUri, s: Scheduler, + c: ContextShift[IO], cr: RemoteContextResolution @Id("aggregate"), esConfig: ElasticSearchViewsConfig, ordering: JsonKeyOrdering, @@ -262,6 +263,7 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef { baseUri, esConfig.pagination, s, + c, cr, ordering ) diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutes.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutes.scala index 35fd383944..5c8e40889d 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutes.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutes.scala @@ -2,6 +2,9 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.routes import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server._ +import cats.effect.{ContextShift, IO => CIO} +import cats.implicits.catsSyntaxApplicativeError +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViewsQuery import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.IndexingViewDef.ActiveViewDef import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewRejection._ @@ -62,6 +65,7 @@ final class ElasticSearchIndexingRoutes( baseUri: BaseUri, paginationConfig: PaginationConfig, s: Scheduler, + c: ContextShift[CIO], cr: RemoteContextResolution, ordering: JsonKeyOrdering ) extends AuthDirectives(identities, aclCheck) @@ -99,10 +103,11 @@ final class ElasticSearchIndexingRoutes( concat( (pathPrefix("sse") & lastEventId) { offset => emit( - fetch(id, ref) + fetch(id, ref).toCatsIO .map { view => projectionErrors.sses(view.ref.project, view.ref.viewId, offset) } + .attemptNarrow[ElasticSearchViewRejection] ) }, (fromPaginated & timeRange("instant") & extractUri & pathEndOrSingleSlash) { @@ -174,6 +179,7 @@ object ElasticSearchIndexingRoutes { baseUri: BaseUri, paginationConfig: PaginationConfig, s: Scheduler, + c: ContextShift[CIO], cr: RemoteContextResolution, ordering: JsonKeyOrdering ): Route = diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchViewsRoutesFixtures.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchViewsRoutesFixtures.scala index bb74d1626d..d19d06f620 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchViewsRoutesFixtures.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchViewsRoutesFixtures.scala @@ -21,6 +21,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Authent import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.DoobieScalaTestFixture import ch.epfl.bluebrain.nexus.testkit._ +import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues import monix.execution.Scheduler import org.scalatest.matchers.should.Matchers import org.scalatest.{CancelAfterFailure, Inspectors, OptionValues} @@ -35,6 +36,7 @@ class ElasticSearchViewsRoutesFixtures with CirceEq with IOFixedClock with IOValues + with CatsIOValues with OptionValues with TestMatchers with Inspectors diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImpl.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImpl.scala index a002f89ece..bccb6f2b49 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImpl.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImpl.scala @@ -1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.sdk.acls import cats.effect.Clock +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent import ch.epfl.bluebrain.nexus.delta.sdk._ import ch.epfl.bluebrain.nexus.delta.sdk.acls.Acls.entityType @@ -96,13 +97,15 @@ object AclsImpl { type AclsLog = GlobalEventLog[AclAddress, AclState, AclCommand, AclEvent, AclRejection] - def findUnknownRealms(xas: Transactors)(labels: Set[Label]): IO[UnknownRealms, Unit] = - GlobalStateStore.listIds(Realms.entityType, xas.read).compile.toList.hideErrors.flatMap { existing => - val unknown = labels.filterNot { l => - existing.contains(Realms.encodeId(l)) + def findUnknownRealms(xas: Transactors)(labels: Set[Label]): IO[UnknownRealms, Unit] = { + GlobalStateStore.listIds(Realms.entityType, xas.readCE).compile.toList.toBIO + .flatMap { existing => + val unknown = labels.filterNot { l => + existing.contains(Realms.encodeId(l)) + } + IO.raiseWhen(unknown.nonEmpty)(UnknownRealms(unknown)) } - IO.raiseWhen(unknown.nonEmpty)(UnknownRealms(unknown)) - } + } /** * Constructs an [[AclsImpl]] instance. diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ce/DeltaDirectives.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ce/DeltaDirectives.scala index 743850583b..3e3a64a71d 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ce/DeltaDirectives.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ce/DeltaDirectives.scala @@ -53,6 +53,11 @@ trait DeltaDirectives extends UriDirectives { def emit(status: StatusCode, response: ResponseToMarshaller): Route = response(Some(status)) + /** + * Completes the current Route with the provided conversion to SSEs + */ + def emit(response: ResponseToSse): Route = response() + /** * Completes the current Route with the provided conversion to Json-LD */ diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/ResponseToSse.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/ResponseToSse.scala index a3aa16fc53..64b9b2646a 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/ResponseToSse.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/ResponseToSse.scala @@ -6,16 +6,15 @@ import akka.http.scaladsl.model.sse.ServerSentEvent import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.Route import akka.stream.scaladsl.Source +import cats.effect.{ContextShift, IO} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering -import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives.emit +import ch.epfl.bluebrain.nexus.delta.sdk.ce.DeltaDirectives.emit import ch.epfl.bluebrain.nexus.delta.sdk.directives.Response.{Complete, Reject} import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.HttpResponseFields import ch.epfl.bluebrain.nexus.delta.sdk.sse.ServerSentEventStream import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter -import monix.bio.{IO, UIO} -import monix.execution.Scheduler import scala.concurrent.duration._ @@ -25,15 +24,15 @@ sealed trait ResponseToSse { object ResponseToSse { - private def apply[E: JsonLdEncoder, A](io: IO[Response[E], ServerSentEventStream])(implicit - s: Scheduler, + private def apply[E: JsonLdEncoder, A](io: IO[Either[Response[E], ServerSentEventStream]])(implicit jo: JsonKeyOrdering, - cr: RemoteContextResolution + cr: RemoteContextResolution, + contextShift: ContextShift[IO] ): ResponseToSse = new ResponseToSse { override def apply(): Route = - onSuccess(io.attempt.runToFuture) { + onSuccess(io.unsafeToFuture()) { case Left(complete: Complete[E]) => emit(complete) case Left(reject: Reject[E]) => emit(reject) case Right(stream) => @@ -47,12 +46,12 @@ object ResponseToSse { } implicit def ioStream[E: JsonLdEncoder: HttpResponseFields]( - io: IO[E, ServerSentEventStream] - )(implicit s: Scheduler, jo: JsonKeyOrdering, cr: RemoteContextResolution): ResponseToSse = - ResponseToSse(io.mapError(Complete(_))) + io: IO[Either[E, ServerSentEventStream]] + )(implicit jo: JsonKeyOrdering, cr: RemoteContextResolution, contextShift: ContextShift[IO]): ResponseToSse = + ResponseToSse(io.map(_.left.map(Complete(_)))) implicit def streamValue( value: ServerSentEventStream - )(implicit s: Scheduler, jo: JsonKeyOrdering, cr: RemoteContextResolution): ResponseToSse = - ResponseToSse(UIO.pure(value)) + )(implicit jo: JsonKeyOrdering, cr: RemoteContextResolution, contextShift: ContextShift[IO]): ResponseToSse = + ResponseToSse(IO.pure(Right(value))) } diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseElemStream.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseElemStream.scala index efbae84aff..176b2ad423 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseElemStream.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseElemStream.scala @@ -1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.sdk.sse import akka.http.scaladsl.model.sse.ServerSentEvent +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling.defaultPrinter import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig @@ -10,7 +11,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.query.{RefreshStrategy, SelectFilt import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem} import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, RemainingElems} import io.circe.syntax.EncoderOps -import monix.bio.UIO +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ trait SseElemStream { @@ -48,7 +49,7 @@ trait SseElemStream { * @param start * the offset to start with */ - def remaining(project: ProjectRef, selectFilter: SelectFilter, start: Offset): UIO[Option[RemainingElems]] + def remaining(project: ProjectRef, selectFilter: SelectFilter, start: Offset): IO[Option[RemainingElems]] } object SseElemStream { @@ -59,7 +60,7 @@ object SseElemStream { def apply(qc: QueryConfig, xas: Transactors): SseElemStream = new SseElemStream { override def continuous(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ServerSentEventStream = - StreamingQuery.elems(project, start, selectFilter, qc, xas).map(toServerSentEvent) + StreamingQuery.elems(project, start, selectFilter, qc, xas).map(toServerSentEvent).translate(taskToIoK) override def currents(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ServerSentEventStream = StreamingQuery @@ -70,14 +71,14 @@ object SseElemStream { qc.copy(refreshStrategy = RefreshStrategy.Stop), xas ) - .map(toServerSentEvent) + .map(toServerSentEvent).translate(taskToIoK) override def remaining( project: ProjectRef, selectFilter: SelectFilter, start: Offset - ): UIO[Option[RemainingElems]] = - StreamingQuery.remaining(project, selectFilter, start, xas) + ): IO[Option[RemainingElems]] = + StreamingQuery.remaining(project, selectFilter, start, xas).toCatsIO } private[sse] def toServerSentEvent(elem: Elem[Unit]): ServerSentEvent = { diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLog.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLog.scala index 8e849ceb12..5fb9421acb 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLog.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLog.scala @@ -1,12 +1,14 @@ package ch.epfl.bluebrain.nexus.delta.sdk.sse import akka.http.scaladsl.model.sse.ServerSentEvent -import ch.epfl.bluebrain.nexus.delta.kernel.cache.KeyValueStore +import cats.effect.IO +import cats.implicits._ +import ch.epfl.bluebrain.nexus.delta.kernel.Logger +import ch.epfl.bluebrain.nexus.delta.kernel.cache.LocalCache +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.taskToIoK import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.UnknownSseLabel import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling.defaultPrinter -import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection -import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEncoder.SseData import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ import ch.epfl.bluebrain.nexus.delta.sourcing.event.EventStreaming @@ -14,10 +16,8 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model._ import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset.{At, Start} import ch.epfl.bluebrain.nexus.delta.sourcing.{MultiDecoder, Scope, Transactors} -import com.typesafe.scalalogging.Logger import fs2.Stream import io.circe.syntax.EncoderOps -import monix.bio.{IO, Task, UIO} import java.util.UUID @@ -57,7 +57,7 @@ trait SseEventLog { def stream( org: Label, offset: Offset - ): IO[OrganizationRejection, ServerSentEventStream] + ): IO[ServerSentEventStream] /** * Get stream of server sent events inside an organization @@ -70,7 +70,7 @@ trait SseEventLog { * @param offset * the offset to start from */ - def streamBy(selector: Label, org: Label, offset: Offset): IO[OrganizationRejection, ServerSentEventStream] + def streamBy(selector: Label, org: Label, offset: Offset): IO[ServerSentEventStream] /** * Get stream of server sent events inside an project @@ -83,7 +83,7 @@ trait SseEventLog { def stream( project: ProjectRef, offset: Offset - ): IO[ProjectRejection, ServerSentEventStream] + ): IO[ServerSentEventStream] /** * Get stream of server sent events inside an project @@ -95,7 +95,7 @@ trait SseEventLog { * @param offset * the offset to start from */ - def streamBy(selector: Label, project: ProjectRef, offset: Offset): IO[ProjectRejection, ServerSentEventStream] + def streamBy(selector: Label, project: ProjectRef, offset: Offset): IO[ServerSentEventStream] /** * Return all SSE selectors @@ -110,15 +110,15 @@ trait SseEventLog { object SseEventLog { - private val logger: Logger = Logger[SseEventLog] + private val logger = Logger.cats[SseEventLog] private[sse] def toServerSentEvent( envelope: Envelope[SseData], - fetchUuids: ProjectRef => UIO[Option[(UUID, UUID)]] - )(implicit jo: JsonKeyOrdering): UIO[ServerSentEvent] = { + fetchUuids: ProjectRef => IO[Option[(UUID, UUID)]] + )(implicit jo: JsonKeyOrdering): IO[ServerSentEvent] = { val data = envelope.value.data envelope.value.project - .fold(UIO.pure(data)) { ref => + .fold(IO.pure(data)) { ref => fetchUuids(ref).map { _.fold(data) { case (orgUuid, projUuid) => data.add("_organizationUuid", orgUuid.asJson).add("_projectUuid", projUuid.asJson) @@ -135,13 +135,13 @@ object SseEventLog { def apply( sseEncoders: Set[SseEncoder[_]], - fetchOrg: Label => IO[OrganizationRejection, Unit], - fetchProject: ProjectRef => IO[ProjectRejection, (UUID, UUID)], + fetchOrg: Label => IO[Unit], + fetchProject: ProjectRef => IO[(UUID, UUID)], config: SseConfig, xas: Transactors - )(implicit jo: JsonKeyOrdering): UIO[SseEventLog] = - KeyValueStore - .localLRU[ProjectRef, (UUID, UUID)](config.cache) + )(implicit jo: JsonKeyOrdering): IO[SseEventLog] = + LocalCache + .lru[ProjectRef, (UUID, UUID)](config.cache) .map { cache => new SseEventLog { implicit private val multiDecoder: MultiDecoder[SseData] = @@ -161,9 +161,9 @@ object SseEventLog { private def fetchUuids(ref: ProjectRef) = cache.getOrElseUpdate(ref, fetchProject(ref)).attempt.map(_.toOption) - private def stream(scope: Scope, selector: Option[Label], offset: Offset): Stream[Task, ServerSentEvent] = { + private def stream(scope: Scope, selector: Option[Label], offset: Offset): Stream[IO, ServerSentEvent] = { Stream - .fromEither[Task]( + .fromEither[IO]( selector .map { l => entityTypesBySelector.get(l).toRight(UnknownSseLabel(l)) @@ -178,34 +178,32 @@ object SseEventLog { offset, config.query, xas - ) + ).translate(taskToIoK) .evalMap(toServerSentEvent(_, fetchUuids)) } } - override def stream(offset: Offset): Stream[Task, ServerSentEvent] = stream(Scope.root, None, offset) + override def stream(offset: Offset): Stream[IO, ServerSentEvent] = stream(Scope.root, None, offset) - override def streamBy(selector: Label, offset: Offset): Stream[Task, ServerSentEvent] = + override def streamBy(selector: Label, offset: Offset): Stream[IO, ServerSentEvent] = stream(Scope.root, Some(selector), offset) - override def stream(org: Label, offset: Offset): IO[OrganizationRejection, Stream[Task, ServerSentEvent]] = + override def stream(org: Label, offset: Offset): IO[Stream[IO, ServerSentEvent]] = fetchOrg(org).as(stream(Scope.Org(org), None, offset)) override def streamBy(selector: Label, org: Label, offset: Offset) - : IO[OrganizationRejection, Stream[Task, ServerSentEvent]] = + : IO[Stream[IO, ServerSentEvent]] = fetchOrg(org).as(stream(Scope.Org(org), Some(selector), offset)) override def stream(project: ProjectRef, offset: Offset) - : IO[ProjectRejection, Stream[Task, ServerSentEvent]] = + : IO[Stream[IO, ServerSentEvent]] = fetchProject(project).as(stream(Scope.Project(project), None, offset)) override def streamBy(selector: Label, project: ProjectRef, offset: Offset) - : IO[ProjectRejection, Stream[Task, ServerSentEvent]] = + : IO[Stream[IO, ServerSentEvent]] = fetchProject(project).as(stream(Scope.Project(project), Some(selector), offset)) } } - .tapEval { sseLog => - UIO.delay(logger.info(s"SseLog is configured with selectors: ${sseLog.allSelectors.mkString("'", "','", "'")}")) - } + .flatTap { sseLog => logger.info(s"SseLog is configured with selectors: ${sseLog.allSelectors.mkString("'", "','", "'")}")} } diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/package.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/package.scala index aa2fe7236b..ce82dadf1d 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/package.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/package.scala @@ -1,13 +1,13 @@ package ch.epfl.bluebrain.nexus.delta.sdk import akka.http.scaladsl.model.sse.ServerSentEvent +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label import fs2.Stream -import monix.bio.Task package object sse { - type ServerSentEventStream = Stream[Task, ServerSentEvent] + type ServerSentEventStream = Stream[IO, ServerSentEvent] val resourcesSelector = Label.unsafe("resources") diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/stream/StreamConverter.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/stream/StreamConverter.scala index 0c8c9575fc..971389cc45 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/stream/StreamConverter.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/stream/StreamConverter.scala @@ -4,9 +4,8 @@ import akka.NotUsed import akka.stream._ import akka.stream.scaladsl.{Sink => AkkaSink, Source => AkkaSource, _} import cats.effect._ +import cats.implicits._ import fs2._ -import monix.bio.Task -import monix.execution.Scheduler /** * Converts a fs2 stream to an Akka source Original code from the streamz library from Martin Krasser (published under @@ -15,19 +14,18 @@ import monix.execution.Scheduler */ object StreamConverter { - private def publisherStream[A](publisher: SourceQueueWithComplete[A], stream: Stream[Task, A]): Stream[Task, Unit] = { - def publish(a: A): Task[Option[Unit]] = Task - .fromFuture(publisher.offer(a)) + private def publisherStream[A](publisher: SourceQueueWithComplete[A], stream: Stream[IO, A])(implicit contextShift: ContextShift[IO]): Stream[IO, Unit] = { + def publish(a: A): IO[Option[Unit]] = IO + .fromFuture(IO.delay(publisher.offer(a))) .flatMap { - case QueueOfferResult.Enqueued => Task.some(()) - case QueueOfferResult.Failure(cause) => Task.raiseError[Option[Unit]](cause) - case QueueOfferResult.QueueClosed => Task.none + case QueueOfferResult.Enqueued => IO.pure(Some(())) + case QueueOfferResult.Failure(cause) => IO.raiseError[Option[Unit]](cause) + case QueueOfferResult.QueueClosed => IO.none case QueueOfferResult.Dropped => - Task.raiseError[Option[Unit]]( + IO.raiseError[Option[Unit]]( new IllegalStateException("This should never happen because we use OverflowStrategy.backpressure") ) - } - .onErrorRecover { + }.recover { // This handles a race condition between `interruptWhen` and `publish`. // There's no guarantee that, when the akka sink is terminated, we will observe the // `interruptWhen` termination before calling publish one last time. @@ -35,9 +33,9 @@ object StreamConverter { case _: StreamDetachedException => None } - def watchCompletion: Task[Unit] = Task.fromFuture(publisher.watchCompletion()).void - def fail(e: Throwable): Task[Unit] = Task.delay(publisher.fail(e)) >> watchCompletion - def complete: Task[Unit] = Task.delay(publisher.complete()) >> watchCompletion + def watchCompletion: IO[Unit] = IO.fromFuture(IO.delay(publisher.watchCompletion())).void + def fail(e: Throwable): IO[Unit] = IO.delay(publisher.fail(e)) >> watchCompletion + def complete: IO[Unit] = IO.delay(publisher.complete()) >> watchCompletion stream .interruptWhen(watchCompletion.attempt) @@ -49,12 +47,12 @@ object StreamConverter { } } - def apply[A](stream: Stream[Task, A])(implicit s: Scheduler): Graph[SourceShape[A], NotUsed] = { + def apply[A](stream: Stream[IO, A])(implicit contextShift: ContextShift[IO]): Graph[SourceShape[A], NotUsed] = { val source = AkkaSource.queue[A](0, OverflowStrategy.backpressure) // A sink that runs an FS2 publisherStream when consuming the publisher actor (= materialized value) of source val sink = AkkaSink.foreach[SourceQueueWithComplete[A]] { p => // Fire and forget Future so it runs in the background - publisherStream[A](p, stream).compile.drain.runToFuture + publisherStream[A](p, stream).compile.drain.unsafeToFuture() () } @@ -67,17 +65,17 @@ object StreamConverter { .mapMaterializedValue(_ => NotUsed) } - def apply[A](source: Graph[SourceShape[A], NotUsed])(implicit materializer: Materializer): Stream[Task, A] = + def apply[A](source: Graph[SourceShape[A], NotUsed])(implicit materializer: Materializer, contextShift: ContextShift[IO]): Stream[IO, A] = Stream.force { - Task.delay { + IO.delay { val subscriber = AkkaSource.fromGraph(source).toMat(AkkaSink.queue[A]())(Keep.right).run() subscriberStream[A](subscriber) } } - private def subscriberStream[A](subscriber: SinkQueueWithCancel[A]): Stream[Task, A] = { - val pull = Task.deferFuture(subscriber.pull()) - val cancel = Task.delay(subscriber.cancel()) + private def subscriberStream[A](subscriber: SinkQueueWithCancel[A])(implicit contextShift: ContextShift[IO]): Stream[IO, A] = { + val pull = IO.fromFuture(IO.delay(subscriber.pull())) + val cancel = IO.delay(subscriber.cancel()) Stream.repeatEval(pull).unNoneTerminate.onFinalize(cancel) } diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/syntax/ProjectionErrorsSyntax.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/syntax/ProjectionErrorsSyntax.scala index fd4cd7772b..0ec8e5c754 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/syntax/ProjectionErrorsSyntax.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/syntax/ProjectionErrorsSyntax.scala @@ -2,12 +2,14 @@ package ch.epfl.bluebrain.nexus.delta.sdk.syntax import akka.http.scaladsl.model.sse.ServerSentEvent import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination import ch.epfl.bluebrain.nexus.delta.kernel.search.TimeRange import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SearchResults +import ch.epfl.bluebrain.nexus.delta.sdk.sse.ServerSentEventStream import ch.epfl.bluebrain.nexus.delta.sdk.syntax.ProjectionErrorsSyntax.ProjectionErrorsOps import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef import ch.epfl.bluebrain.nexus.delta.sourcing.model.FailedElemLogRow.FailedElemData @@ -15,7 +17,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.projections.ProjectionErrors import io.circe.Printer -import monix.bio.{Task, UIO} +import monix.bio.UIO /** * Allows to extend the methods from [[ProjectionErrors]] by adding higher-level methods @@ -47,9 +49,9 @@ object ProjectionErrorsSyntax { */ def sses(projectionProject: ProjectRef, projectionId: Iri, offset: Offset)(implicit rcr: RemoteContextResolution - ): fs2.Stream[Task, ServerSentEvent] = - projectionErrors.failedElemEntries(projectionProject, projectionId, offset).evalMap { felem => - felem.failedElemData.toCompactedJsonLd.map { compactJson => + ): ServerSentEventStream = + projectionErrors.failedElemEntries(projectionProject, projectionId, offset).translate(taskToIoK).evalMap { felem => + felem.failedElemData.toCompactedJsonLd.toCatsIO.map { compactJson => ServerSentEvent( defaultPrinter.print(compactJson.json), "IndexingFailure", diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLogSuite.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLogSuite.scala index 13984711a5..90805c4a42 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLogSuite.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLogSuite.scala @@ -1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.sdk.sse import akka.http.scaladsl.model.sse.ServerSentEvent +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering import ch.epfl.bluebrain.nexus.delta.sdk.ConfigFixtures @@ -9,14 +10,14 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Envelope, Proje import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.testkit.IOFixedClock import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite +import ch.epfl.bluebrain.nexus.testkit.ce.CatsEffectAssertions import io.circe.JsonObject import io.circe.syntax.EncoderOps -import monix.bio.UIO import java.time.Instant import java.util.UUID -class SseEventLogSuite extends BioSuite with ConfigFixtures with IOFixedClock { +class SseEventLogSuite extends BioSuite with ConfigFixtures with IOFixedClock with CatsEffectAssertions { implicit private val jo: JsonKeyOrdering = JsonKeyOrdering.alphabetical @@ -25,9 +26,9 @@ class SseEventLogSuite extends BioSuite with ConfigFixtures with IOFixedClock { private val orgUuid = UUID.randomUUID() private val projectUuid = UUID.randomUUID() - private def fetchUuids: ProjectRef => UIO[Option[(UUID, UUID)]] = { - case `ref` => UIO.some(orgUuid -> projectUuid) - case _ => UIO.none + private def fetchUuids: ProjectRef => IO[Option[(UUID, UUID)]] = { + case `ref` => IO.pure(Some(orgUuid -> projectUuid)) + case _ => IO.none } private def makeEnvelope(sseData: SseData) = Envelope( @@ -45,7 +46,7 @@ class SseEventLogSuite extends BioSuite with ConfigFixtures with IOFixedClock { ) SseEventLog .toServerSentEvent(envelope, fetchUuids) - .assert(ServerSentEvent("""{"name":"John Doe"}""", "Person", "5")) + .assertEquals(ServerSentEvent("""{"name":"John Doe"}""", "Person", "5")) } test("Should not inject project uuids when the ref is unknown") { @@ -59,7 +60,7 @@ class SseEventLogSuite extends BioSuite with ConfigFixtures with IOFixedClock { ) SseEventLog .toServerSentEvent(envelope, fetchUuids) - .assert(ServerSentEvent("""{"name":"John Doe"}""", "Person", "5")) + .assertEquals(ServerSentEvent("""{"name":"John Doe"}""", "Person", "5")) } test("Should inject project uuids when the ref is unknown") { @@ -73,7 +74,7 @@ class SseEventLogSuite extends BioSuite with ConfigFixtures with IOFixedClock { ) SseEventLog .toServerSentEvent(envelope, fetchUuids) - .assert( + .assertEquals( ServerSentEvent( s"""{"_organizationUuid":"$orgUuid","_projectUuid":"$projectUuid","name":"John Doe"}""", "Person", diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLog.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLog.scala index be191dff66..a2f4f48ba9 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLog.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLog.scala @@ -14,6 +14,7 @@ import doobie.postgres.sqlstate import fs2.Stream import monix.bio.Cause.{Error, Termination} import monix.bio.{IO, Task, UIO} +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import scala.concurrent.duration.FiniteDuration @@ -150,7 +151,7 @@ object GlobalEventLog { xas: Transactors ): GlobalEventLog[Id, S, Command, E, Rejection] = new GlobalEventLog[Id, S, Command, E, Rejection] { - override def stateOr[R <: Rejection](id: Id, notFound: => R): IO[R, S] = stateStore.get(id).flatMap { + override def stateOr[R <: Rejection](id: Id, notFound: => R): IO[R, S] = stateStore.get(id).toUIO.flatMap { IO.fromOption(_, notFound) } @@ -162,7 +163,7 @@ object GlobalEventLog { } override def evaluate(id: Id, command: Command): IO[Rejection, (E, S)] = - stateStore.get(id).flatMap { current => + stateStore.get(id).toUIO.flatMap { current => stateMachine .evaluate(current, command, maxDuration) .tapEval { case (event, state) => @@ -185,7 +186,7 @@ object GlobalEventLog { } override def dryRun(id: Id, command: Command): IO[Rejection, (E, S)] = - stateStore.get(id).flatMap { current => + stateStore.get(id).toUIO.flatMap { current => stateMachine.evaluate(current, command, maxDuration) } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStore.scala index 083d3b60df..4d43b772c7 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStore.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStore.scala @@ -1,5 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.state +import cats.effect.IO import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.sourcing.{Serializer, Transactors} @@ -15,7 +16,6 @@ import doobie.postgres.implicits._ import doobie.util.transactor.Transactor import fs2.Stream import io.circe.Decoder -import monix.bio.{Task, UIO} /** * Allow to save and fetch [[GlobalState]] s from the database @@ -35,7 +35,7 @@ trait GlobalStateStore[Id, S <: GlobalState] { /** * Returns the state */ - def get(id: Id): UIO[Option[S]] + def get(id: Id): IO[Option[S]] /** * Fetches states from the given type from the provided offset. @@ -62,7 +62,7 @@ trait GlobalStateStore[Id, S <: GlobalState] { object GlobalStateStore { - def listIds(tpe: EntityType, xa: Transactor[Task]): Stream[Task, Iri] = + def listIds(tpe: EntityType, xa: Transactor[IO]): Stream[IO, Iri] = sql"SELECT id FROM global_states WHERE type = $tpe".query[Iri].stream.transact(xa) def apply[Id, S <: GlobalState]( @@ -116,12 +116,11 @@ object GlobalStateStore { override def delete(id: Id): ConnectionIO[Unit] = sql"""DELETE FROM global_states WHERE type = $tpe AND id = $id""".stripMargin.update.run.void - override def get(id: Id): UIO[Option[S]] = + override def get(id: Id): IO[Option[S]] = sql"""SELECT value FROM global_states WHERE type = $tpe AND id = $id""" .query[S] .option - .transact(xas.read) - .hideErrors + .transact(xas.readCE) private def states(offset: Offset, strategy: RefreshStrategy): EnvelopeStream[S] = StreamingQuery[Envelope[S]]( diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStoreSuite.scala index d35f9ef516..16e2eb9d3a 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStoreSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStoreSuite.scala @@ -9,16 +9,16 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, User} import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Envelope, Label} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset -import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy -import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie +import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy +import ch.epfl.bluebrain.nexus.testkit.ce.CatsEffectSuite import doobie.implicits._ import munit.AnyFixture import java.time.Instant import scala.concurrent.duration._ -class GlobalStateStoreSuite extends BioSuite with Doobie.Fixture with Doobie.Assertions { +class GlobalStateStoreSuite extends CatsEffectSuite with Doobie.Fixture with Doobie.Assertions { override def munitFixtures: Seq[AnyFixture[_]] = List(doobie) @@ -45,17 +45,17 @@ class GlobalStateStoreSuite extends BioSuite with Doobie.Fixture with Doobie.Ass private val envelope3 = Envelope(Arithmetic.entityType, id1, 2, updatedState1, Instant.EPOCH, Offset.at(3L)) private def assertCount(expected: Int) = - sql"select count(*) from global_states".query[Int].unique.transact(xas.read).assert(expected) + sql"select count(*) from global_states".query[Int].unique.transact(xas.readCE).assertEquals(expected) test("Save state 1 and state 2 successfully") { for { - _ <- List(state1, state2).traverse(store.save).transact(xas.write) + _ <- List(state1, state2).traverse(store.save).transact(xas.writeCE) _ <- assertCount(2) } yield () } test("List ids") { - GlobalStateStore.listIds(Arithmetic.entityType, xas.read).assert(id1, id2) + GlobalStateStore.listIds(Arithmetic.entityType, xas.readCE).assert(List(id1, id2)) } test("get state 1") { @@ -80,7 +80,7 @@ class GlobalStateStoreSuite extends BioSuite with Doobie.Fixture with Doobie.Ass test("Update state 1 successfully") { for { - _ <- store.save(updatedState1).transact(xas.write) + _ <- store.save(updatedState1).transact(xas.writeCE) _ <- assertCount(2) _ <- store.get(id1).assertSome(updatedState1) } yield () @@ -92,7 +92,7 @@ class GlobalStateStoreSuite extends BioSuite with Doobie.Fixture with Doobie.Ass test("Delete state 2 successfully") { for { - _ <- store.delete(id2).transact(xas.write) + _ <- store.delete(id2).transact(xas.writeCE) _ <- assertCount(1) _ <- store.get(id2).assertNone } yield () diff --git a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala index 238bdcad73..788f3f6b6d 100644 --- a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala +++ b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala @@ -1,5 +1,6 @@ package ch.epfl.bluebrain.nexus.testkit.bio +import cats.effect.{ContextShift, IO, Timer} import fs2.Stream import monix.bio.Task import munit.{Assertions, Location} @@ -23,4 +24,20 @@ trait StreamAssertions { self: Assertions => def assertEmpty: Task[Unit] = assert(List.empty) } + implicit class CEStreamAssertionsOps[A](stream: Stream[IO, A])(implicit loc: Location, contextShift: ContextShift[IO], timer: Timer[IO]) { + def assert(expected: List[A]): IO[Unit] = + stream.take(expected.size.toLong).timeout(3.seconds).mask.compile.toList.map { obtained => + assertEquals(obtained, expected, s"Got ${obtained.size} elements, ${expected.size} elements were expected.") + } + + def assertSize(expected: Int): IO[Unit] = + stream.take(expected.toLong).timeout(3.seconds).mask.compile.toList.map { obtained => + assertEquals(obtained.size, expected, s"Got ${obtained.size} elements, $expected elements were expected.") + } + + def assert(expected: A*): IO[Unit] = assert(expected.toList) + + def assertEmpty: IO[Unit] = assert(List.empty) + } + }