From 5c3ec3d9dce64287bfc2f5c8b7c87e035bcf6a07 Mon Sep 17 00:00:00 2001 From: Daniel Bell Date: Mon, 16 Oct 2023 14:35:23 +0100 Subject: [PATCH] Migrate SSE to cats effect (#4367) * Migrate SSE to cats effect * don't use CIO --- .../nexus/delta/routes/ElemRoutes.scala | 8 +-- .../nexus/delta/routes/EventsRoutes.scala | 13 ++-- .../nexus/delta/wiring/EventsModule.scala | 26 ++++---- .../nexus/delta/routes/ElemRoutesSpec.scala | 11 ++-- .../nexus/delta/routes/EventsRoutesSpec.scala | 15 +++-- .../nexus/delta/kernel/RetryStrategy.scala | 59 ++++++++++++------ .../migration/MigrateEffectSyntax.scala | 9 +++ .../blazegraph/BlazegraphPluginModule.scala | 4 +- .../BlazegraphViewsIndexingRoutes.scala | 12 +++- .../BlazegraphViewsIndexingRoutesSpec.scala | 3 +- .../CompositeViewsPluginModule.scala | 12 +++- .../compositeviews/client/DeltaClient.scala | 14 +++-- .../routes/CompositeViewsIndexingRoutes.scala | 13 ++-- .../client/DeltaClientSpec.scala | 2 + .../routes/CompositeViewsRoutesFixtures.scala | 2 + .../ElasticSearchPluginModule.scala | 4 +- .../routes/ElasticSearchIndexingRoutes.scala | 12 +++- .../ElasticSearchViewsRoutesFixtures.scala | 2 + .../nexus/delta/sdk/acls/AclsImpl.scala | 19 ++++-- .../nexus/delta/sdk/ce/DeltaDirectives.scala | 5 ++ .../delta/sdk/directives/ResponseToSse.scala | 23 ++++--- .../nexus/delta/sdk/sse/SseElemStream.scala | 12 ++-- .../nexus/delta/sdk/sse/SseEventLog.scala | 60 +++++++++---------- .../nexus/delta/sdk/sse/package.scala | 4 +- .../delta/sdk/stream/StreamConverter.scala | 45 +++++++------- .../sdk/syntax/ProjectionErrorsSyntax.scala | 23 +++---- .../delta/sdk/sse/SseEventLogSuite.scala | 17 +++--- .../nexus/delta/sourcing/GlobalEventLog.scala | 7 ++- .../sourcing/state/GlobalStateStore.scala | 11 ++-- .../state/GlobalStateStoreSuite.scala | 16 ++--- .../nexus/testkit/bio/StreamAssertions.scala | 21 +++++++ 31 files changed, 299 insertions(+), 185 deletions(-) diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/ElemRoutes.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/ElemRoutes.scala index 5548d7cfb7..e03b9d5721 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/ElemRoutes.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/ElemRoutes.scala @@ -4,12 +4,13 @@ import akka.http.scaladsl.model.StatusCodes import akka.http.scaladsl.model.StatusCodes.OK import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.Route +import cats.effect.{ContextShift, IO} import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck -import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives.{emit, lastEventId} +import ch.epfl.bluebrain.nexus.delta.sdk.ce.DeltaDirectives.{emit, lastEventId} import ch.epfl.bluebrain.nexus.delta.sdk.directives.UriDirectives.baseUriPrefix import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaSchemeDirectives} import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities @@ -24,7 +25,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.RemainingElems import io.circe.syntax.EncoderOps import io.circe.{Encoder, JsonObject} import kamon.instrumentation.akka.http.TracingDirectives.operationName -import monix.execution.Scheduler import java.time.Instant @@ -40,9 +40,9 @@ class ElemRoutes( schemeDirectives: DeltaSchemeDirectives )(implicit baseUri: BaseUri, - s: Scheduler, cr: RemoteContextResolution, - ordering: JsonKeyOrdering + ordering: JsonKeyOrdering, + contextShift: ContextShift[IO] ) extends AuthDirectives(identities, aclCheck: AclCheck) { import baseUri.prefixSegment import schemeDirectives._ diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/EventsRoutes.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/EventsRoutes.scala index 1cbf33843a..f76e5543c9 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/EventsRoutes.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/EventsRoutes.scala @@ -3,6 +3,8 @@ package ch.epfl.bluebrain.nexus.delta.routes import akka.http.scaladsl.model.StatusCodes.OK import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.{Directive1, Route} +import cats.effect.{ContextShift, IO} +import cats.implicits.catsSyntaxApplicativeError import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck @@ -12,11 +14,12 @@ import ch.epfl.bluebrain.nexus.delta.sdk.directives.UriDirectives._ import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaSchemeDirectives} import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri +import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.events +import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEventLog import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label import kamon.instrumentation.akka.http.TracingDirectives.operationName -import monix.execution.Scheduler /** * The global events route. @@ -37,7 +40,7 @@ class EventsRoutes( schemeDirectives: DeltaSchemeDirectives )(implicit baseUri: BaseUri, - s: Scheduler, + c: ContextShift[IO], cr: RemoteContextResolution, ordering: JsonKeyOrdering ) extends AuthDirectives(identities, aclCheck: AclCheck) { @@ -100,7 +103,7 @@ class EventsRoutes( operationName(s"$prefixSegment/$selector/{org}/events") { concat( authorizeFor(org, events.read).apply { - emit(sseEventLog.streamBy(selector, org, offset)) + emit(sseEventLog.streamBy(selector, org, offset).attemptNarrow[OrganizationRejection]) }, (head & authorizeFor(org, events.read)) { complete(OK) @@ -114,7 +117,7 @@ class EventsRoutes( concat( operationName(s"$prefixSegment/$selector/{org}/{proj}/events") { authorizeFor(projectRef, events.read).apply { - emit(sseEventLog.streamBy(selector, projectRef, offset)) + emit(sseEventLog.streamBy(selector, projectRef, offset).attemptNarrow[ProjectRejection]) } }, (head & authorizeFor(projectRef, events.read)) { @@ -144,7 +147,7 @@ object EventsRoutes { schemeDirectives: DeltaSchemeDirectives )(implicit baseUri: BaseUri, - s: Scheduler, + c: ContextShift[IO], cr: RemoteContextResolution, ordering: JsonKeyOrdering ): Route = new EventsRoutes(identities, aclCheck, sseEventLog, schemeDirectives).routes diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/EventsModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/EventsModule.scala index 0eec5f9fec..c352c1904f 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/EventsModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/EventsModule.scala @@ -1,5 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.wiring +import cats.effect.{ContextShift, IO} import ch.epfl.bluebrain.nexus.delta.Main.pluginsMaxPriority import ch.epfl.bluebrain.nexus.delta.config.AppConfig import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ @@ -18,7 +19,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.sse.{SseElemStream, SseEncoder, SseEven import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import izumi.distage.model.definition.{Id, ModuleDef} -import monix.execution.Scheduler /** * Events wiring @@ -34,15 +34,13 @@ object EventsModule extends ModuleDef { xas: Transactors, jo: JsonKeyOrdering ) => - toCatsIO( - SseEventLog( - sseEncoders, - organizations.fetch(_).void.toBIO[OrganizationRejection], - projects.fetch(_).map { p => (p.value.organizationUuid, p.value.uuid) }, - config.sse, - xas - )(jo) - ) + SseEventLog( + sseEncoders, + organizations.fetch(_).void.toBIO[OrganizationRejection], + projects.fetch(_).map { p => (p.value.organizationUuid, p.value.uuid) }, + config.sse, + xas + )(jo) } make[SseElemStream].from { (qc: QueryConfig, xas: Transactors) => @@ -56,11 +54,11 @@ object EventsModule extends ModuleDef { sseEventLog: SseEventLog, schemeDirectives: DeltaSchemeDirectives, baseUri: BaseUri, - s: Scheduler, + c: ContextShift[IO], cr: RemoteContextResolution @Id("aggregate"), ordering: JsonKeyOrdering ) => - new EventsRoutes(identities, aclCheck, sseEventLog, schemeDirectives)(baseUri, s, cr, ordering) + new EventsRoutes(identities, aclCheck, sseEventLog, schemeDirectives)(baseUri, c, cr, ordering) } many[PriorityRoute].add { (route: EventsRoutes) => @@ -74,11 +72,11 @@ object EventsModule extends ModuleDef { sseElemStream: SseElemStream, schemeDirectives: DeltaSchemeDirectives, baseUri: BaseUri, - s: Scheduler, + contextShift: ContextShift[IO], cr: RemoteContextResolution @Id("aggregate"), ordering: JsonKeyOrdering ) => - new ElemRoutes(identities, aclCheck, sseElemStream, schemeDirectives)(baseUri, s, cr, ordering) + new ElemRoutes(identities, aclCheck, sseElemStream, schemeDirectives)(baseUri, cr, ordering, contextShift) } many[PriorityRoute].add { (route: ElemRoutes) => diff --git a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/ElemRoutesSpec.scala b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/ElemRoutesSpec.scala index e09a902401..0131441cc7 100644 --- a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/ElemRoutesSpec.scala +++ b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/ElemRoutesSpec.scala @@ -4,6 +4,7 @@ import akka.http.scaladsl.model.headers.{`Last-Event-ID`, OAuth2BearerToken} import akka.http.scaladsl.model.sse.ServerSentEvent import akka.http.scaladsl.model.{MediaTypes, StatusCodes} import akka.http.scaladsl.server.Route +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives @@ -20,13 +21,13 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter import ch.epfl.bluebrain.nexus.delta.sourcing.stream.RemainingElems import ch.epfl.bluebrain.nexus.testkit.CirceLiteral import ch.epfl.bluebrain.nexus.testkit.bio.IOFromMap +import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues import fs2.Stream -import monix.bio.{Task, UIO} import java.time.Instant import java.util.UUID -class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral with IOFromMap { +class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral with IOFromMap with CatsIOValues { private val aclCheck = AclSimpleCheck().accepted @@ -45,7 +46,7 @@ class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral with IOFromMap { private val sseElemStream = new SseElemStream { - private val stream = Stream.emits(List(elem1, elem2, elem3)).covary[Task] + private val stream = Stream.emits(List(elem1, elem2, elem3)).covary[IO] override def continuous(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ServerSentEventStream = stream @@ -56,8 +57,8 @@ class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral with IOFromMap { project: ProjectRef, selectFilter: SelectFilter, start: Offset - ): UIO[Option[RemainingElems]] = - UIO.some(RemainingElems(999L, Instant.EPOCH)) + ): IO[Option[RemainingElems]] = + IO.pure(Some(RemainingElems(999L, Instant.EPOCH))) } private val routes = Route.seal( diff --git a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/EventsRoutesSpec.scala b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/EventsRoutesSpec.scala index 3ca0a8e34f..dded4654c3 100644 --- a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/EventsRoutesSpec.scala +++ b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/EventsRoutesSpec.scala @@ -4,16 +4,15 @@ import akka.http.scaladsl.model.headers.{`Last-Event-ID`, OAuth2BearerToken} import akka.http.scaladsl.model.sse.ServerSentEvent import akka.http.scaladsl.model.{MediaTypes, StatusCodes} import akka.http.scaladsl.server.Route +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives import ch.epfl.bluebrain.nexus.delta.sdk.identities.IdentitiesDummy import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller -import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection.OrganizationNotFound import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.events import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContextDummy -import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection.ProjectNotFound import ch.epfl.bluebrain.nexus.delta.sdk.sse.{ServerSentEventStream, SseEventLog} import ch.epfl.bluebrain.nexus.delta.sdk.utils.BaseRouteSpec @@ -22,12 +21,12 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset.{At, Start} import ch.epfl.bluebrain.nexus.testkit.bio.IOFromMap +import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues import fs2.Stream -import monix.bio.IO import java.util.UUID -class EventsRoutesSpec extends BaseRouteSpec with IOFromMap { +class EventsRoutesSpec extends BaseRouteSpec with IOFromMap with CatsIOValues { private val uuid = UUID.randomUUID() @@ -64,24 +63,24 @@ class EventsRoutesSpec extends BaseRouteSpec with IOFromMap { override def streamBy(selector: Label, offset: Offset): ServerSentEventStream = stream(offset).filter(_.eventType.exists(_.toLowerCase == selector.value)) - override def stream(org: Label, offset: Offset): IO[OrganizationRejection, ServerSentEventStream] = + override def stream(org: Label, offset: Offset): IO[ServerSentEventStream] = IO.raiseWhen(org != projectRef.organization)(OrganizationNotFound(org)).as(stream(offset)) override def streamBy( selector: Label, org: Label, offset: Offset - ): IO[OrganizationRejection, ServerSentEventStream] = + ): IO[ServerSentEventStream] = IO.raiseWhen(org != projectRef.organization)(OrganizationNotFound(org)).as(streamBy(selector, offset)) - override def stream(project: ProjectRef, offset: Offset): IO[ProjectRejection, ServerSentEventStream] = + override def stream(project: ProjectRef, offset: Offset): IO[ServerSentEventStream] = IO.raiseWhen(project != projectRef)(ProjectNotFound(project)).as(stream(offset)) override def streamBy( selector: Label, project: ProjectRef, offset: Offset - ): IO[ProjectRejection, ServerSentEventStream] = + ): IO[ServerSentEventStream] = IO.raiseWhen(project != projectRef)(ProjectNotFound(project)).as(streamBy(selector, offset)) override def allSelectors: Set[Label] = Set(acl, project, resources) diff --git a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/RetryStrategy.scala b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/RetryStrategy.scala index 5065cfc122..5fe9b652b2 100644 --- a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/RetryStrategy.scala +++ b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/RetryStrategy.scala @@ -1,7 +1,9 @@ package ch.epfl.bluebrain.nexus.delta.kernel +import cats.effect.IO import com.typesafe.scalalogging.{Logger => ScalaLoggingLogger} -import monix.bio.{IO, UIO} +import monix.bio.{IO => BIO, UIO} +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toMonixBIOOps import pureconfig.ConfigReader import pureconfig.error.{CannotConvert, ConfigReaderFailures, ConvertFailure} import pureconfig.generic.semiauto._ @@ -25,9 +27,9 @@ import scala.util.control.NonFatal final case class RetryStrategy[E]( config: RetryStrategyConfig, retryWhen: E => Boolean, - onError: (E, RetryDetails) => IO[E, Unit] + onError: (E, RetryDetails) => BIO[E, Unit] ) { - val policy: RetryPolicy[IO[E, *]] = config.toPolicy[E] + val policy: RetryPolicy[BIO[E, *]] = config.toPolicy[E] } object RetryStrategy { @@ -35,7 +37,7 @@ object RetryStrategy { /** * Apply the provided strategy on the given io */ - def use[E, A](io: IO[E, A], retryStrategy: RetryStrategy[E]): IO[E, A] = + def use[E, A](io: BIO[E, A], retryStrategy: RetryStrategy[E]): BIO[E, A] = io.retryingOnSomeErrors( retryStrategy.retryWhen, retryStrategy.policy, @@ -45,7 +47,7 @@ object RetryStrategy { /** * Log errors when retrying */ - def logError[E](logger: ScalaLoggingLogger, action: String): (E, RetryDetails) => IO[E, Unit] = { + def logError[E](logger: ScalaLoggingLogger, action: String): (E, RetryDetails) => BIO[E, Unit] = { case (err, WillDelayAndRetry(nextDelay, retriesSoFar, _)) => val message = s"""Error $err while $action: retrying in ${nextDelay.toMillis}ms (retries so far: $retriesSoFar)""" UIO.delay(logger.warn(message)) @@ -57,7 +59,19 @@ object RetryStrategy { /** * Log errors when retrying */ - def logError[E](logger: Logger, action: String): (E, RetryDetails) => IO[E, Unit] = { + def logError[E](logger: org.typelevel.log4cats.Logger[IO], action: String): (E, RetryDetails) => IO[Unit] = { + case (err, WillDelayAndRetry(nextDelay, retriesSoFar, _)) => + val message = s"""Error $err while $action: retrying in ${nextDelay.toMillis}ms (retries so far: $retriesSoFar)""" + logger.warn(message) + case (err, GivingUp(totalRetries, _)) => + val message = s"""Error $err while $action, giving up (total retries: $totalRetries)""" + logger.error(message) + } + + /** + * Log errors when retrying + */ + def logError[E](logger: Logger, action: String): (E, RetryDetails) => BIO[E, Unit] = { case (err, WillDelayAndRetry(nextDelay, retriesSoFar, _)) => val message = s"""Error $err while $action: retrying in ${nextDelay.toMillis}ms (retries so far: $retriesSoFar)""" logger.warn(message) @@ -71,7 +85,7 @@ object RetryStrategy { * @param onError * what action to perform on error */ - def alwaysGiveUp[E](onError: (E, RetryDetails) => IO[E, Unit]): RetryStrategy[E] = + def alwaysGiveUp[E](onError: (E, RetryDetails) => BIO[E, Unit]): RetryStrategy[E] = RetryStrategy(RetryStrategyConfig.AlwaysGiveUp, _ => false, onError) /** @@ -89,7 +103,7 @@ object RetryStrategy { constant: FiniteDuration, maxRetries: Int, retryWhen: E => Boolean, - onError: (E, RetryDetails) => IO[E, Unit] + onError: (E, RetryDetails) => BIO[E, Unit] ): RetryStrategy[E] = RetryStrategy( RetryStrategyConfig.ConstantStrategyConfig(constant, maxRetries), @@ -125,13 +139,24 @@ object RetryStrategy { (t: Throwable, d: RetryDetails) => logError(logger, action)(t, d) ) + def retryOnNonFatal( + config: RetryStrategyConfig, + logger: org.typelevel.log4cats.Logger[IO], + action: String + ): RetryStrategy[Throwable] = + RetryStrategy( + config, + (t: Throwable) => NonFatal(t), + (t: Throwable, d: RetryDetails) => logError(logger, action)(t, d).toBIO[Throwable] + ) + } /** * Configuration for a [[RetryStrategy]] */ sealed trait RetryStrategyConfig extends Product with Serializable { - def toPolicy[E]: RetryPolicy[IO[E, *]] + def toPolicy[E]: RetryPolicy[BIO[E, *]] } @@ -141,7 +166,7 @@ object RetryStrategyConfig { * Fails without retry */ case object AlwaysGiveUp extends RetryStrategyConfig { - override def toPolicy[E]: RetryPolicy[IO[E, *]] = alwaysGiveUp[IO[E, *]] + override def toPolicy[E]: RetryPolicy[BIO[E, *]] = alwaysGiveUp[BIO[E, *]] } /** @@ -152,8 +177,8 @@ object RetryStrategyConfig { * the maximum number of retries */ final case class ConstantStrategyConfig(delay: FiniteDuration, maxRetries: Int) extends RetryStrategyConfig { - override def toPolicy[E]: RetryPolicy[IO[E, *]] = - constantDelay[IO[E, *]](delay) join limitRetries(maxRetries) + override def toPolicy[E]: RetryPolicy[BIO[E, *]] = + constantDelay[BIO[E, *]](delay) join limitRetries(maxRetries) } /** @@ -162,8 +187,8 @@ object RetryStrategyConfig { * the interval before the retry will be attempted */ final case class OnceStrategyConfig(delay: FiniteDuration) extends RetryStrategyConfig { - override def toPolicy[E]: RetryPolicy[IO[E, *]] = - constantDelay[IO[E, *]](delay) join limitRetries(1) + override def toPolicy[E]: RetryPolicy[BIO[E, *]] = + constantDelay[BIO[E, *]](delay) join limitRetries(1) } /** @@ -177,8 +202,8 @@ object RetryStrategyConfig { */ final case class ExponentialStrategyConfig(initialDelay: FiniteDuration, maxDelay: FiniteDuration, maxRetries: Int) extends RetryStrategyConfig { - override def toPolicy[E]: RetryPolicy[IO[E, *]] = - capDelay[IO[E, *]](maxDelay, fullJitter(initialDelay)) join limitRetries(maxRetries) + override def toPolicy[E]: RetryPolicy[BIO[E, *]] = + capDelay[BIO[E, *]](maxDelay, fullJitter(initialDelay)) join limitRetries(maxRetries) } /** @@ -190,7 +215,7 @@ object RetryStrategyConfig { */ final case class MaximumCumulativeDelayConfig(threshold: FiniteDuration, delay: FiniteDuration) extends RetryStrategyConfig { - override def toPolicy[E]: RetryPolicy[IO[E, *]] = + override def toPolicy[E]: RetryPolicy[BIO[E, *]] = RetryPolicies.limitRetriesByCumulativeDelay( threshold, RetryPolicies.constantDelay(delay) diff --git a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/effect/migration/MigrateEffectSyntax.scala b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/effect/migration/MigrateEffectSyntax.scala index 3420488176..c1c4d6d25a 100644 --- a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/effect/migration/MigrateEffectSyntax.scala +++ b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/effect/migration/MigrateEffectSyntax.scala @@ -10,11 +10,16 @@ import scala.reflect.ClassTag trait MigrateEffectSyntax { implicit def toCatsIO[E <: Throwable, A](io: BIO[E, A]): IO[A] = io.to[IO] + implicit def uioToCatsIO[E <: Throwable, A](io: UIO[A]): IO[A] = io.to[IO] implicit def toCatsIOOps[E <: Throwable, A](io: BIO[E, A]): MonixBioToCatsIOOps[E, A] = new MonixBioToCatsIOOps(io) + implicit def toCatsIOEitherOps[E, A](io: BIO[E, A]): MonixBioToCatsIOEitherOps[E, A] = new MonixBioToCatsIOEitherOps( + io + ) implicit def toMonixBIOOps[A](io: IO[A]): CatsIOToBioOps[A] = new CatsIOToBioOps(io) val taskToIoK: Task ~> IO = λ[Task ~> IO](toCatsIO(_)) + val uioToIoK: UIO ~> IO = λ[UIO ~> IO](uioToCatsIO(_)) val ioToUioK: IO ~> UIO = λ[IO ~> UIO](_.toUIO) val ioToTaskK: IO ~> Task = λ[IO ~> Task](Task.from(_)) @@ -24,6 +29,10 @@ final class MonixBioToCatsIOOps[E <: Throwable, A](private val io: BIO[E, A]) ex def toCatsIO: IO[A] = io.to[IO] } +final class MonixBioToCatsIOEitherOps[E, A](private val io: BIO[E, A]) extends AnyVal { + def toCatsIOEither: IO[Either[E, A]] = io.attempt.to[IO] +} + final class CatsIOToBioOps[A](private val io: IO[A]) extends AnyVal { def toBIO[E <: Throwable](implicit E: ClassTag[E]): BIO[E, A] = BIO.from(io).mapErrorPartialWith { diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphPluginModule.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphPluginModule.scala index 8ef5fcea0d..742cdd937c 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphPluginModule.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphPluginModule.scala @@ -1,7 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph import akka.actor.typed.ActorSystem -import cats.effect.Clock +import cats.effect.{Clock, ContextShift, IO} import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.config.BlazegraphViewsConfig @@ -227,6 +227,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef { baseUri: BaseUri, cfg: BlazegraphViewsConfig, s: Scheduler, + c: ContextShift[IO], cr: RemoteContextResolution @Id("aggregate"), ordering: JsonKeyOrdering ) => @@ -240,6 +241,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef { )( baseUri, s, + c, cr, ordering, cfg.pagination diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutes.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutes.scala index e489b1d164..307f3b5e74 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutes.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutes.scala @@ -2,6 +2,9 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.Route +import cats.effect.{ContextShift, IO} +import cats.implicits.catsSyntaxApplicativeError +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef.ActiveViewDef import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewRejection._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model._ @@ -29,7 +32,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Pro import io.circe.Encoder import io.circe.generic.semiauto.deriveEncoder import io.circe.syntax._ -import monix.bio.IO +import monix.bio.{IO => BIO} import monix.execution.Scheduler class BlazegraphViewsIndexingRoutes( fetch: FetchIndexingView, @@ -41,6 +44,7 @@ class BlazegraphViewsIndexingRoutes( )(implicit baseUri: BaseUri, s: Scheduler, + c: ContextShift[IO], cr: RemoteContextResolution, ordering: JsonKeyOrdering, pc: PaginationConfig @@ -80,10 +84,11 @@ class BlazegraphViewsIndexingRoutes( concat( (pathPrefix("sse") & lastEventId) { offset => emit( - fetch(id, ref) + fetch(id, ref).toCatsIO .map { view => projectionErrors.sses(view.ref.project, view.ref.viewId, offset) } + .attemptNarrow[BlazegraphViewRejection] ) }, (fromPaginated & timeRange("instant") & extractUri & pathEndOrSingleSlash) { @@ -132,7 +137,7 @@ class BlazegraphViewsIndexingRoutes( object BlazegraphViewsIndexingRoutes { - type FetchIndexingView = (IdSegment, ProjectRef) => IO[BlazegraphViewRejection, ActiveViewDef] + type FetchIndexingView = (IdSegment, ProjectRef) => BIO[BlazegraphViewRejection, ActiveViewDef] /** * @return @@ -148,6 +153,7 @@ object BlazegraphViewsIndexingRoutes { )(implicit baseUri: BaseUri, s: Scheduler, + c: ContextShift[IO], cr: RemoteContextResolution, ordering: JsonKeyOrdering, pc: PaginationConfig diff --git a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutesSpec.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutesSpec.scala index 0e642f44ba..b4ebbe05bc 100644 --- a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutesSpec.scala +++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutesSpec.scala @@ -23,12 +23,13 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.FailedElem import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionProgress import ch.epfl.bluebrain.nexus.testkit.bio.IOFromMap +import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues import monix.bio.IO import java.time.Instant import scala.concurrent.duration._ -class BlazegraphViewsIndexingRoutesSpec extends BlazegraphViewRoutesFixtures with IOFromMap { +class BlazegraphViewsIndexingRoutesSpec extends BlazegraphViewRoutesFixtures with IOFromMap with CatsIOValues { private lazy val projections = Projections(xas, queryConfig, 1.hour) private lazy val projectionErrors = ProjectionErrors(xas, queryConfig) diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala index e761582993..9d90fc8295 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala @@ -1,7 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews import akka.actor.typed.ActorSystem -import cats.effect.Clock +import cats.effect.{Clock, ContextShift, IO} import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient @@ -57,10 +57,15 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef { cfg: CompositeViewsConfig, as: ActorSystem[Nothing], sc: Scheduler, + c: ContextShift[IO], authTokenProvider: AuthTokenProvider ) => val httpClient = HttpClient()(cfg.remoteSourceClient.http, as.classicSystem, sc) - DeltaClient(httpClient, authTokenProvider, cfg.remoteSourceCredentials, cfg.remoteSourceClient.retryDelay)(as, sc) + DeltaClient(httpClient, authTokenProvider, cfg.remoteSourceCredentials, cfg.remoteSourceClient.retryDelay)( + as, + sc, + c + ) } make[BlazegraphClient].named("blazegraph-composite-indexing-client").from { @@ -315,6 +320,7 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef { baseUri: BaseUri, config: CompositeViewsConfig, s: Scheduler, + c: ContextShift[IO], cr: RemoteContextResolution @Id("aggregate"), ordering: JsonKeyOrdering ) => @@ -327,7 +333,7 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef { projections, projectionErrors, schemeDirectives - )(baseUri, config.pagination, s, cr, ordering) + )(baseUri, config.pagination, s, c, cr, ordering) } make[CompositeView.Shift].from { (views: CompositeViews, base: BaseUri) => diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala index 27fbd59de6..4d660ace9e 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala @@ -7,6 +7,7 @@ import akka.http.scaladsl.model.Uri.Query import akka.http.scaladsl.model.headers.{`Last-Event-ID`, Accept} import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes} import akka.stream.alpakka.sse.scaladsl.EventSource +import cats.effect.{ContextShift, IO} import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.MigrateEffectSyntax import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewSource.RemoteProjectSource import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.stream.CompositeBranch @@ -27,7 +28,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, RemainingElems} import com.typesafe.scalalogging.Logger import io.circe.parser.decode import fs2._ -import monix.bio.{IO, UIO} +import monix.bio.{IO => BIO, UIO} import monix.execution.Scheduler import scala.concurrent.Future @@ -87,7 +88,8 @@ object DeltaClient { retryDelay: FiniteDuration )(implicit as: ActorSystem[Nothing], - scheduler: Scheduler + scheduler: Scheduler, + c: ContextShift[IO] ) extends DeltaClient with MigrateEffectSyntax { @@ -119,7 +121,7 @@ object DeltaClient { for { authToken <- authTokenProvider(credentials).toBIO result <- client(Head(elemAddress(source)).withCredentials(authToken)) { - case resp if resp.status.isSuccess() => UIO.delay(resp.discardEntityBytes()) >> IO.unit + case resp if resp.status.isSuccess() => UIO.delay(resp.discardEntityBytes()) >> BIO.unit } } yield result } @@ -133,7 +135,7 @@ object DeltaClient { def send(request: HttpRequest): Future[HttpResponse] = { (for { authToken <- authTokenProvider(credentials).toBIO - result <- client[HttpResponse](request.withCredentials(authToken))(IO.pure(_)) + result <- client[HttpResponse](request.withCredentials(authToken))(BIO.pure(_)) } yield result).runToFuture } @@ -151,6 +153,7 @@ object DeltaClient { Stream.empty } } + .translate(ioToTaskK) } private def typeQuery(types: Set[Iri]) = @@ -187,7 +190,8 @@ object DeltaClient { retryDelay: FiniteDuration )(implicit as: ActorSystem[Nothing], - sc: Scheduler + sc: Scheduler, + c: ContextShift[IO] ): DeltaClient = new DeltaClientImpl(client, authTokenProvider, credentials, retryDelay) } diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutes.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutes.scala index 6b1488f3b4..ca093ce5a3 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutes.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutes.scala @@ -2,6 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.routes import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.Route +import cats.effect.{ContextShift, IO} import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes.BlazegraphViewsDirectives import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef @@ -29,7 +30,8 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.{FailedElemLogRow, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.projections.ProjectionErrors -import monix.bio.IO +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ +import monix.bio.{IO => BIO} import monix.execution.Scheduler class CompositeViewsIndexingRoutes( identities: Identities, @@ -44,6 +46,7 @@ class CompositeViewsIndexingRoutes( baseUri: BaseUri, paginationConfig: PaginationConfig, s: Scheduler, + c: ContextShift[IO], cr: RemoteContextResolution, ordering: JsonKeyOrdering ) extends AuthDirectives(identities, aclCheck) @@ -98,10 +101,11 @@ class CompositeViewsIndexingRoutes( concat( (pathPrefix("sse") & lastEventId) { offset => emit( - fetchView(id, ref) + fetchView(id, ref).toCatsIO .map { view => projectionErrors.sses(view.project, view.id, offset) } + .attemptNarrow[CompositeViewRejection] ) }, (fromPaginated & timeRange("instant") & extractUri & pathEndOrSingleSlash) { @@ -233,12 +237,12 @@ class CompositeViewsIndexingRoutes( private def fetchProjection(view: ActiveViewDef, projectionId: IdSegment) = expandId(projectionId, view.project).flatMap { id => - IO.fromEither(view.projection(id)) + BIO.fromEither(view.projection(id)) } private def fetchSource(view: ActiveViewDef, sourceId: IdSegment) = expandId(sourceId, view.project).flatMap { id => - IO.fromEither(view.source(id)) + BIO.fromEither(view.source(id)) } } @@ -262,6 +266,7 @@ object CompositeViewsIndexingRoutes { baseUri: BaseUri, paginationConfig: PaginationConfig, s: Scheduler, + c: ContextShift[IO], cr: RemoteContextResolution, ordering: JsonKeyOrdering ): Route = diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClientSpec.scala b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClientSpec.scala index 2403661675..a9e522576f 100644 --- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClientSpec.scala +++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClientSpec.scala @@ -26,6 +26,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, RemainingElems} +import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues import ch.epfl.bluebrain.nexus.testkit.{IOValues, TestHelpers} import io.circe.syntax.EncoderOps import monix.execution.Scheduler @@ -45,6 +46,7 @@ class DeltaClientSpec with ScalaFutures with OptionValues with IOValues + with CatsIOValues with ConfigFixtures with BeforeAndAfterAll with TestHelpers diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsRoutesFixtures.scala b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsRoutesFixtures.scala index ce1e47dd25..21a411c459 100644 --- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsRoutesFixtures.scala +++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsRoutesFixtures.scala @@ -14,6 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.utils.RouteHelpers import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Authenticated, Group, User} import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.DoobieScalaTestFixture +import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues import ch.epfl.bluebrain.nexus.testkit.{CirceEq, CirceLiteral, IOFixedClock, TestMatchers} import org.scalatest.{CancelAfterFailure, Inspectors, OptionValues} import org.scalatest.matchers.should.Matchers @@ -25,6 +26,7 @@ trait CompositeViewsRoutesFixtures with CirceLiteral with CirceEq with IOFixedClock + with CatsIOValues with OptionValues with TestMatchers with Inspectors diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala index 5eec1f42fe..3904adacc6 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala @@ -1,7 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch import akka.actor.typed.ActorSystem -import cats.effect.Clock +import cats.effect.{Clock, ContextShift, IO} import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient @@ -245,6 +245,7 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef { schemeDirectives: DeltaSchemeDirectives, baseUri: BaseUri, s: Scheduler, + c: ContextShift[IO], cr: RemoteContextResolution @Id("aggregate"), esConfig: ElasticSearchViewsConfig, ordering: JsonKeyOrdering, @@ -262,6 +263,7 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef { baseUri, esConfig.pagination, s, + c, cr, ordering ) diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutes.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutes.scala index 35fd383944..159f0354ed 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutes.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutes.scala @@ -2,6 +2,9 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.routes import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server._ +import cats.effect.{ContextShift, IO} +import cats.implicits.catsSyntaxApplicativeError +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViewsQuery import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.IndexingViewDef.ActiveViewDef import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewRejection._ @@ -31,7 +34,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Pro import io.circe.Encoder import io.circe.generic.semiauto.deriveEncoder import io.circe.syntax._ -import monix.bio.IO +import monix.bio.{IO => BIO} import monix.execution.Scheduler /** @@ -62,6 +65,7 @@ final class ElasticSearchIndexingRoutes( baseUri: BaseUri, paginationConfig: PaginationConfig, s: Scheduler, + c: ContextShift[IO], cr: RemoteContextResolution, ordering: JsonKeyOrdering ) extends AuthDirectives(identities, aclCheck) @@ -99,10 +103,11 @@ final class ElasticSearchIndexingRoutes( concat( (pathPrefix("sse") & lastEventId) { offset => emit( - fetch(id, ref) + fetch(id, ref).toCatsIO .map { view => projectionErrors.sses(view.ref.project, view.ref.viewId, offset) } + .attemptNarrow[ElasticSearchViewRejection] ) }, (fromPaginated & timeRange("instant") & extractUri & pathEndOrSingleSlash) { @@ -156,7 +161,7 @@ final class ElasticSearchIndexingRoutes( object ElasticSearchIndexingRoutes { - type FetchIndexingView = (IdSegment, ProjectRef) => IO[ElasticSearchViewRejection, ActiveViewDef] + type FetchIndexingView = (IdSegment, ProjectRef) => BIO[ElasticSearchViewRejection, ActiveViewDef] /** * @return @@ -174,6 +179,7 @@ object ElasticSearchIndexingRoutes { baseUri: BaseUri, paginationConfig: PaginationConfig, s: Scheduler, + c: ContextShift[IO], cr: RemoteContextResolution, ordering: JsonKeyOrdering ): Route = diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchViewsRoutesFixtures.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchViewsRoutesFixtures.scala index bb74d1626d..d19d06f620 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchViewsRoutesFixtures.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchViewsRoutesFixtures.scala @@ -21,6 +21,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Authent import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.DoobieScalaTestFixture import ch.epfl.bluebrain.nexus.testkit._ +import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues import monix.execution.Scheduler import org.scalatest.matchers.should.Matchers import org.scalatest.{CancelAfterFailure, Inspectors, OptionValues} @@ -35,6 +36,7 @@ class ElasticSearchViewsRoutesFixtures with CirceEq with IOFixedClock with IOValues + with CatsIOValues with OptionValues with TestMatchers with Inspectors diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImpl.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImpl.scala index a002f89ece..e511ff8913 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImpl.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImpl.scala @@ -1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.sdk.acls import cats.effect.Clock +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent import ch.epfl.bluebrain.nexus.delta.sdk._ import ch.epfl.bluebrain.nexus.delta.sdk.acls.Acls.entityType @@ -96,13 +97,19 @@ object AclsImpl { type AclsLog = GlobalEventLog[AclAddress, AclState, AclCommand, AclEvent, AclRejection] - def findUnknownRealms(xas: Transactors)(labels: Set[Label]): IO[UnknownRealms, Unit] = - GlobalStateStore.listIds(Realms.entityType, xas.read).compile.toList.hideErrors.flatMap { existing => - val unknown = labels.filterNot { l => - existing.contains(Realms.encodeId(l)) + def findUnknownRealms(xas: Transactors)(labels: Set[Label]): IO[UnknownRealms, Unit] = { + GlobalStateStore + .listIds(Realms.entityType, xas.readCE) + .compile + .toList + .toBIO + .flatMap { existing => + val unknown = labels.filterNot { l => + existing.contains(Realms.encodeId(l)) + } + IO.raiseWhen(unknown.nonEmpty)(UnknownRealms(unknown)) } - IO.raiseWhen(unknown.nonEmpty)(UnknownRealms(unknown)) - } + } /** * Constructs an [[AclsImpl]] instance. diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ce/DeltaDirectives.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ce/DeltaDirectives.scala index 743850583b..893b768340 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ce/DeltaDirectives.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ce/DeltaDirectives.scala @@ -53,6 +53,11 @@ trait DeltaDirectives extends UriDirectives { def emit(status: StatusCode, response: ResponseToMarshaller): Route = response(Some(status)) + /** + * Completes the current Route with the provided conversion to SSEs + */ + def emit(response: ResponseToSse): Route = response() + /** * Completes the current Route with the provided conversion to Json-LD */ diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/ResponseToSse.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/ResponseToSse.scala index a3aa16fc53..64b9b2646a 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/ResponseToSse.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/ResponseToSse.scala @@ -6,16 +6,15 @@ import akka.http.scaladsl.model.sse.ServerSentEvent import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.Route import akka.stream.scaladsl.Source +import cats.effect.{ContextShift, IO} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering -import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives.emit +import ch.epfl.bluebrain.nexus.delta.sdk.ce.DeltaDirectives.emit import ch.epfl.bluebrain.nexus.delta.sdk.directives.Response.{Complete, Reject} import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.HttpResponseFields import ch.epfl.bluebrain.nexus.delta.sdk.sse.ServerSentEventStream import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter -import monix.bio.{IO, UIO} -import monix.execution.Scheduler import scala.concurrent.duration._ @@ -25,15 +24,15 @@ sealed trait ResponseToSse { object ResponseToSse { - private def apply[E: JsonLdEncoder, A](io: IO[Response[E], ServerSentEventStream])(implicit - s: Scheduler, + private def apply[E: JsonLdEncoder, A](io: IO[Either[Response[E], ServerSentEventStream]])(implicit jo: JsonKeyOrdering, - cr: RemoteContextResolution + cr: RemoteContextResolution, + contextShift: ContextShift[IO] ): ResponseToSse = new ResponseToSse { override def apply(): Route = - onSuccess(io.attempt.runToFuture) { + onSuccess(io.unsafeToFuture()) { case Left(complete: Complete[E]) => emit(complete) case Left(reject: Reject[E]) => emit(reject) case Right(stream) => @@ -47,12 +46,12 @@ object ResponseToSse { } implicit def ioStream[E: JsonLdEncoder: HttpResponseFields]( - io: IO[E, ServerSentEventStream] - )(implicit s: Scheduler, jo: JsonKeyOrdering, cr: RemoteContextResolution): ResponseToSse = - ResponseToSse(io.mapError(Complete(_))) + io: IO[Either[E, ServerSentEventStream]] + )(implicit jo: JsonKeyOrdering, cr: RemoteContextResolution, contextShift: ContextShift[IO]): ResponseToSse = + ResponseToSse(io.map(_.left.map(Complete(_)))) implicit def streamValue( value: ServerSentEventStream - )(implicit s: Scheduler, jo: JsonKeyOrdering, cr: RemoteContextResolution): ResponseToSse = - ResponseToSse(UIO.pure(value)) + )(implicit jo: JsonKeyOrdering, cr: RemoteContextResolution, contextShift: ContextShift[IO]): ResponseToSse = + ResponseToSse(IO.pure(Right(value))) } diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseElemStream.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseElemStream.scala index efbae84aff..0e2f7bfdb3 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseElemStream.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseElemStream.scala @@ -1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.sdk.sse import akka.http.scaladsl.model.sse.ServerSentEvent +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling.defaultPrinter import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig @@ -10,7 +11,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.query.{RefreshStrategy, SelectFilt import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem} import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, RemainingElems} import io.circe.syntax.EncoderOps -import monix.bio.UIO +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ trait SseElemStream { @@ -48,7 +49,7 @@ trait SseElemStream { * @param start * the offset to start with */ - def remaining(project: ProjectRef, selectFilter: SelectFilter, start: Offset): UIO[Option[RemainingElems]] + def remaining(project: ProjectRef, selectFilter: SelectFilter, start: Offset): IO[Option[RemainingElems]] } object SseElemStream { @@ -59,7 +60,7 @@ object SseElemStream { def apply(qc: QueryConfig, xas: Transactors): SseElemStream = new SseElemStream { override def continuous(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ServerSentEventStream = - StreamingQuery.elems(project, start, selectFilter, qc, xas).map(toServerSentEvent) + StreamingQuery.elems(project, start, selectFilter, qc, xas).map(toServerSentEvent).translate(taskToIoK) override def currents(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ServerSentEventStream = StreamingQuery @@ -71,13 +72,14 @@ object SseElemStream { xas ) .map(toServerSentEvent) + .translate(taskToIoK) override def remaining( project: ProjectRef, selectFilter: SelectFilter, start: Offset - ): UIO[Option[RemainingElems]] = - StreamingQuery.remaining(project, selectFilter, start, xas) + ): IO[Option[RemainingElems]] = + StreamingQuery.remaining(project, selectFilter, start, xas).toCatsIO } private[sse] def toServerSentEvent(elem: Elem[Unit]): ServerSentEvent = { diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLog.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLog.scala index 8e849ceb12..79f5b2794e 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLog.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLog.scala @@ -1,12 +1,14 @@ package ch.epfl.bluebrain.nexus.delta.sdk.sse import akka.http.scaladsl.model.sse.ServerSentEvent -import ch.epfl.bluebrain.nexus.delta.kernel.cache.KeyValueStore +import cats.effect.IO +import cats.implicits._ +import ch.epfl.bluebrain.nexus.delta.kernel.Logger +import ch.epfl.bluebrain.nexus.delta.kernel.cache.LocalCache +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.taskToIoK import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.UnknownSseLabel import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling.defaultPrinter -import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection -import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEncoder.SseData import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ import ch.epfl.bluebrain.nexus.delta.sourcing.event.EventStreaming @@ -14,10 +16,8 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model._ import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset.{At, Start} import ch.epfl.bluebrain.nexus.delta.sourcing.{MultiDecoder, Scope, Transactors} -import com.typesafe.scalalogging.Logger import fs2.Stream import io.circe.syntax.EncoderOps -import monix.bio.{IO, Task, UIO} import java.util.UUID @@ -57,7 +57,7 @@ trait SseEventLog { def stream( org: Label, offset: Offset - ): IO[OrganizationRejection, ServerSentEventStream] + ): IO[ServerSentEventStream] /** * Get stream of server sent events inside an organization @@ -70,7 +70,7 @@ trait SseEventLog { * @param offset * the offset to start from */ - def streamBy(selector: Label, org: Label, offset: Offset): IO[OrganizationRejection, ServerSentEventStream] + def streamBy(selector: Label, org: Label, offset: Offset): IO[ServerSentEventStream] /** * Get stream of server sent events inside an project @@ -83,7 +83,7 @@ trait SseEventLog { def stream( project: ProjectRef, offset: Offset - ): IO[ProjectRejection, ServerSentEventStream] + ): IO[ServerSentEventStream] /** * Get stream of server sent events inside an project @@ -95,7 +95,7 @@ trait SseEventLog { * @param offset * the offset to start from */ - def streamBy(selector: Label, project: ProjectRef, offset: Offset): IO[ProjectRejection, ServerSentEventStream] + def streamBy(selector: Label, project: ProjectRef, offset: Offset): IO[ServerSentEventStream] /** * Return all SSE selectors @@ -110,15 +110,15 @@ trait SseEventLog { object SseEventLog { - private val logger: Logger = Logger[SseEventLog] + private val logger = Logger.cats[SseEventLog] private[sse] def toServerSentEvent( envelope: Envelope[SseData], - fetchUuids: ProjectRef => UIO[Option[(UUID, UUID)]] - )(implicit jo: JsonKeyOrdering): UIO[ServerSentEvent] = { + fetchUuids: ProjectRef => IO[Option[(UUID, UUID)]] + )(implicit jo: JsonKeyOrdering): IO[ServerSentEvent] = { val data = envelope.value.data envelope.value.project - .fold(UIO.pure(data)) { ref => + .fold(IO.pure(data)) { ref => fetchUuids(ref).map { _.fold(data) { case (orgUuid, projUuid) => data.add("_organizationUuid", orgUuid.asJson).add("_projectUuid", projUuid.asJson) @@ -135,13 +135,13 @@ object SseEventLog { def apply( sseEncoders: Set[SseEncoder[_]], - fetchOrg: Label => IO[OrganizationRejection, Unit], - fetchProject: ProjectRef => IO[ProjectRejection, (UUID, UUID)], + fetchOrg: Label => IO[Unit], + fetchProject: ProjectRef => IO[(UUID, UUID)], config: SseConfig, xas: Transactors - )(implicit jo: JsonKeyOrdering): UIO[SseEventLog] = - KeyValueStore - .localLRU[ProjectRef, (UUID, UUID)](config.cache) + )(implicit jo: JsonKeyOrdering): IO[SseEventLog] = + LocalCache + .lru[ProjectRef, (UUID, UUID)](config.cache) .map { cache => new SseEventLog { implicit private val multiDecoder: MultiDecoder[SseData] = @@ -161,9 +161,9 @@ object SseEventLog { private def fetchUuids(ref: ProjectRef) = cache.getOrElseUpdate(ref, fetchProject(ref)).attempt.map(_.toOption) - private def stream(scope: Scope, selector: Option[Label], offset: Offset): Stream[Task, ServerSentEvent] = { + private def stream(scope: Scope, selector: Option[Label], offset: Offset): Stream[IO, ServerSentEvent] = { Stream - .fromEither[Task]( + .fromEither[IO]( selector .map { l => entityTypesBySelector.get(l).toRight(UnknownSseLabel(l)) @@ -179,33 +179,31 @@ object SseEventLog { config.query, xas ) + .translate(taskToIoK) .evalMap(toServerSentEvent(_, fetchUuids)) } } - override def stream(offset: Offset): Stream[Task, ServerSentEvent] = stream(Scope.root, None, offset) + override def stream(offset: Offset): Stream[IO, ServerSentEvent] = stream(Scope.root, None, offset) - override def streamBy(selector: Label, offset: Offset): Stream[Task, ServerSentEvent] = + override def streamBy(selector: Label, offset: Offset): Stream[IO, ServerSentEvent] = stream(Scope.root, Some(selector), offset) - override def stream(org: Label, offset: Offset): IO[OrganizationRejection, Stream[Task, ServerSentEvent]] = + override def stream(org: Label, offset: Offset): IO[Stream[IO, ServerSentEvent]] = fetchOrg(org).as(stream(Scope.Org(org), None, offset)) - override def streamBy(selector: Label, org: Label, offset: Offset) - : IO[OrganizationRejection, Stream[Task, ServerSentEvent]] = + override def streamBy(selector: Label, org: Label, offset: Offset): IO[Stream[IO, ServerSentEvent]] = fetchOrg(org).as(stream(Scope.Org(org), Some(selector), offset)) - override def stream(project: ProjectRef, offset: Offset) - : IO[ProjectRejection, Stream[Task, ServerSentEvent]] = + override def stream(project: ProjectRef, offset: Offset): IO[Stream[IO, ServerSentEvent]] = fetchProject(project).as(stream(Scope.Project(project), None, offset)) - override def streamBy(selector: Label, project: ProjectRef, offset: Offset) - : IO[ProjectRejection, Stream[Task, ServerSentEvent]] = + override def streamBy(selector: Label, project: ProjectRef, offset: Offset): IO[Stream[IO, ServerSentEvent]] = fetchProject(project).as(stream(Scope.Project(project), Some(selector), offset)) } } - .tapEval { sseLog => - UIO.delay(logger.info(s"SseLog is configured with selectors: ${sseLog.allSelectors.mkString("'", "','", "'")}")) + .flatTap { sseLog => + logger.info(s"SseLog is configured with selectors: ${sseLog.allSelectors.mkString("'", "','", "'")}") } } diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/package.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/package.scala index aa2fe7236b..ce82dadf1d 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/package.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/package.scala @@ -1,13 +1,13 @@ package ch.epfl.bluebrain.nexus.delta.sdk import akka.http.scaladsl.model.sse.ServerSentEvent +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label import fs2.Stream -import monix.bio.Task package object sse { - type ServerSentEventStream = Stream[Task, ServerSentEvent] + type ServerSentEventStream = Stream[IO, ServerSentEvent] val resourcesSelector = Label.unsafe("resources") diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/stream/StreamConverter.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/stream/StreamConverter.scala index 0c8c9575fc..9d0c5dfafd 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/stream/StreamConverter.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/stream/StreamConverter.scala @@ -4,9 +4,8 @@ import akka.NotUsed import akka.stream._ import akka.stream.scaladsl.{Sink => AkkaSink, Source => AkkaSource, _} import cats.effect._ +import cats.implicits._ import fs2._ -import monix.bio.Task -import monix.execution.Scheduler /** * Converts a fs2 stream to an Akka source Original code from the streamz library from Martin Krasser (published under @@ -15,19 +14,21 @@ import monix.execution.Scheduler */ object StreamConverter { - private def publisherStream[A](publisher: SourceQueueWithComplete[A], stream: Stream[Task, A]): Stream[Task, Unit] = { - def publish(a: A): Task[Option[Unit]] = Task - .fromFuture(publisher.offer(a)) + private def publisherStream[A](publisher: SourceQueueWithComplete[A], stream: Stream[IO, A])(implicit + contextShift: ContextShift[IO] + ): Stream[IO, Unit] = { + def publish(a: A): IO[Option[Unit]] = IO + .fromFuture(IO.delay(publisher.offer(a))) .flatMap { - case QueueOfferResult.Enqueued => Task.some(()) - case QueueOfferResult.Failure(cause) => Task.raiseError[Option[Unit]](cause) - case QueueOfferResult.QueueClosed => Task.none + case QueueOfferResult.Enqueued => IO.pure(Some(())) + case QueueOfferResult.Failure(cause) => IO.raiseError[Option[Unit]](cause) + case QueueOfferResult.QueueClosed => IO.none case QueueOfferResult.Dropped => - Task.raiseError[Option[Unit]]( + IO.raiseError[Option[Unit]]( new IllegalStateException("This should never happen because we use OverflowStrategy.backpressure") ) } - .onErrorRecover { + .recover { // This handles a race condition between `interruptWhen` and `publish`. // There's no guarantee that, when the akka sink is terminated, we will observe the // `interruptWhen` termination before calling publish one last time. @@ -35,9 +36,9 @@ object StreamConverter { case _: StreamDetachedException => None } - def watchCompletion: Task[Unit] = Task.fromFuture(publisher.watchCompletion()).void - def fail(e: Throwable): Task[Unit] = Task.delay(publisher.fail(e)) >> watchCompletion - def complete: Task[Unit] = Task.delay(publisher.complete()) >> watchCompletion + def watchCompletion: IO[Unit] = IO.fromFuture(IO.delay(publisher.watchCompletion())).void + def fail(e: Throwable): IO[Unit] = IO.delay(publisher.fail(e)) >> watchCompletion + def complete: IO[Unit] = IO.delay(publisher.complete()) >> watchCompletion stream .interruptWhen(watchCompletion.attempt) @@ -49,12 +50,12 @@ object StreamConverter { } } - def apply[A](stream: Stream[Task, A])(implicit s: Scheduler): Graph[SourceShape[A], NotUsed] = { + def apply[A](stream: Stream[IO, A])(implicit contextShift: ContextShift[IO]): Graph[SourceShape[A], NotUsed] = { val source = AkkaSource.queue[A](0, OverflowStrategy.backpressure) // A sink that runs an FS2 publisherStream when consuming the publisher actor (= materialized value) of source val sink = AkkaSink.foreach[SourceQueueWithComplete[A]] { p => // Fire and forget Future so it runs in the background - publisherStream[A](p, stream).compile.drain.runToFuture + publisherStream[A](p, stream).compile.drain.unsafeToFuture() () } @@ -67,17 +68,21 @@ object StreamConverter { .mapMaterializedValue(_ => NotUsed) } - def apply[A](source: Graph[SourceShape[A], NotUsed])(implicit materializer: Materializer): Stream[Task, A] = + def apply[A]( + source: Graph[SourceShape[A], NotUsed] + )(implicit materializer: Materializer, contextShift: ContextShift[IO]): Stream[IO, A] = Stream.force { - Task.delay { + IO.delay { val subscriber = AkkaSource.fromGraph(source).toMat(AkkaSink.queue[A]())(Keep.right).run() subscriberStream[A](subscriber) } } - private def subscriberStream[A](subscriber: SinkQueueWithCancel[A]): Stream[Task, A] = { - val pull = Task.deferFuture(subscriber.pull()) - val cancel = Task.delay(subscriber.cancel()) + private def subscriberStream[A]( + subscriber: SinkQueueWithCancel[A] + )(implicit contextShift: ContextShift[IO]): Stream[IO, A] = { + val pull = IO.fromFuture(IO.delay(subscriber.pull())) + val cancel = IO.delay(subscriber.cancel()) Stream.repeatEval(pull).unNoneTerminate.onFinalize(cancel) } diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/syntax/ProjectionErrorsSyntax.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/syntax/ProjectionErrorsSyntax.scala index fd4cd7772b..0019bb11b7 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/syntax/ProjectionErrorsSyntax.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/syntax/ProjectionErrorsSyntax.scala @@ -2,12 +2,14 @@ package ch.epfl.bluebrain.nexus.delta.sdk.syntax import akka.http.scaladsl.model.sse.ServerSentEvent import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination import ch.epfl.bluebrain.nexus.delta.kernel.search.TimeRange import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SearchResults +import ch.epfl.bluebrain.nexus.delta.sdk.sse.ServerSentEventStream import ch.epfl.bluebrain.nexus.delta.sdk.syntax.ProjectionErrorsSyntax.ProjectionErrorsOps import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef import ch.epfl.bluebrain.nexus.delta.sourcing.model.FailedElemLogRow.FailedElemData @@ -15,7 +17,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.projections.ProjectionErrors import io.circe.Printer -import monix.bio.{Task, UIO} +import monix.bio.UIO /** * Allows to extend the methods from [[ProjectionErrors]] by adding higher-level methods @@ -47,15 +49,16 @@ object ProjectionErrorsSyntax { */ def sses(projectionProject: ProjectRef, projectionId: Iri, offset: Offset)(implicit rcr: RemoteContextResolution - ): fs2.Stream[Task, ServerSentEvent] = - projectionErrors.failedElemEntries(projectionProject, projectionId, offset).evalMap { felem => - felem.failedElemData.toCompactedJsonLd.map { compactJson => - ServerSentEvent( - defaultPrinter.print(compactJson.json), - "IndexingFailure", - felem.ordering.value.toString - ) - } + ): ServerSentEventStream = + projectionErrors.failedElemEntries(projectionProject, projectionId, offset).translate(taskToIoK).evalMap { + felem => + felem.failedElemData.toCompactedJsonLd.toCatsIO.map { compactJson => + ServerSentEvent( + defaultPrinter.print(compactJson.json), + "IndexingFailure", + felem.ordering.value.toString + ) + } } /** diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLogSuite.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLogSuite.scala index 13984711a5..90805c4a42 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLogSuite.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLogSuite.scala @@ -1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.sdk.sse import akka.http.scaladsl.model.sse.ServerSentEvent +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering import ch.epfl.bluebrain.nexus.delta.sdk.ConfigFixtures @@ -9,14 +10,14 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Envelope, Proje import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.testkit.IOFixedClock import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite +import ch.epfl.bluebrain.nexus.testkit.ce.CatsEffectAssertions import io.circe.JsonObject import io.circe.syntax.EncoderOps -import monix.bio.UIO import java.time.Instant import java.util.UUID -class SseEventLogSuite extends BioSuite with ConfigFixtures with IOFixedClock { +class SseEventLogSuite extends BioSuite with ConfigFixtures with IOFixedClock with CatsEffectAssertions { implicit private val jo: JsonKeyOrdering = JsonKeyOrdering.alphabetical @@ -25,9 +26,9 @@ class SseEventLogSuite extends BioSuite with ConfigFixtures with IOFixedClock { private val orgUuid = UUID.randomUUID() private val projectUuid = UUID.randomUUID() - private def fetchUuids: ProjectRef => UIO[Option[(UUID, UUID)]] = { - case `ref` => UIO.some(orgUuid -> projectUuid) - case _ => UIO.none + private def fetchUuids: ProjectRef => IO[Option[(UUID, UUID)]] = { + case `ref` => IO.pure(Some(orgUuid -> projectUuid)) + case _ => IO.none } private def makeEnvelope(sseData: SseData) = Envelope( @@ -45,7 +46,7 @@ class SseEventLogSuite extends BioSuite with ConfigFixtures with IOFixedClock { ) SseEventLog .toServerSentEvent(envelope, fetchUuids) - .assert(ServerSentEvent("""{"name":"John Doe"}""", "Person", "5")) + .assertEquals(ServerSentEvent("""{"name":"John Doe"}""", "Person", "5")) } test("Should not inject project uuids when the ref is unknown") { @@ -59,7 +60,7 @@ class SseEventLogSuite extends BioSuite with ConfigFixtures with IOFixedClock { ) SseEventLog .toServerSentEvent(envelope, fetchUuids) - .assert(ServerSentEvent("""{"name":"John Doe"}""", "Person", "5")) + .assertEquals(ServerSentEvent("""{"name":"John Doe"}""", "Person", "5")) } test("Should inject project uuids when the ref is unknown") { @@ -73,7 +74,7 @@ class SseEventLogSuite extends BioSuite with ConfigFixtures with IOFixedClock { ) SseEventLog .toServerSentEvent(envelope, fetchUuids) - .assert( + .assertEquals( ServerSentEvent( s"""{"_organizationUuid":"$orgUuid","_projectUuid":"$projectUuid","name":"John Doe"}""", "Person", diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLog.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLog.scala index be191dff66..a2f4f48ba9 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLog.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLog.scala @@ -14,6 +14,7 @@ import doobie.postgres.sqlstate import fs2.Stream import monix.bio.Cause.{Error, Termination} import monix.bio.{IO, Task, UIO} +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import scala.concurrent.duration.FiniteDuration @@ -150,7 +151,7 @@ object GlobalEventLog { xas: Transactors ): GlobalEventLog[Id, S, Command, E, Rejection] = new GlobalEventLog[Id, S, Command, E, Rejection] { - override def stateOr[R <: Rejection](id: Id, notFound: => R): IO[R, S] = stateStore.get(id).flatMap { + override def stateOr[R <: Rejection](id: Id, notFound: => R): IO[R, S] = stateStore.get(id).toUIO.flatMap { IO.fromOption(_, notFound) } @@ -162,7 +163,7 @@ object GlobalEventLog { } override def evaluate(id: Id, command: Command): IO[Rejection, (E, S)] = - stateStore.get(id).flatMap { current => + stateStore.get(id).toUIO.flatMap { current => stateMachine .evaluate(current, command, maxDuration) .tapEval { case (event, state) => @@ -185,7 +186,7 @@ object GlobalEventLog { } override def dryRun(id: Id, command: Command): IO[Rejection, (E, S)] = - stateStore.get(id).flatMap { current => + stateStore.get(id).toUIO.flatMap { current => stateMachine.evaluate(current, command, maxDuration) } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStore.scala index 083d3b60df..4d43b772c7 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStore.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStore.scala @@ -1,5 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.state +import cats.effect.IO import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.sourcing.{Serializer, Transactors} @@ -15,7 +16,6 @@ import doobie.postgres.implicits._ import doobie.util.transactor.Transactor import fs2.Stream import io.circe.Decoder -import monix.bio.{Task, UIO} /** * Allow to save and fetch [[GlobalState]] s from the database @@ -35,7 +35,7 @@ trait GlobalStateStore[Id, S <: GlobalState] { /** * Returns the state */ - def get(id: Id): UIO[Option[S]] + def get(id: Id): IO[Option[S]] /** * Fetches states from the given type from the provided offset. @@ -62,7 +62,7 @@ trait GlobalStateStore[Id, S <: GlobalState] { object GlobalStateStore { - def listIds(tpe: EntityType, xa: Transactor[Task]): Stream[Task, Iri] = + def listIds(tpe: EntityType, xa: Transactor[IO]): Stream[IO, Iri] = sql"SELECT id FROM global_states WHERE type = $tpe".query[Iri].stream.transact(xa) def apply[Id, S <: GlobalState]( @@ -116,12 +116,11 @@ object GlobalStateStore { override def delete(id: Id): ConnectionIO[Unit] = sql"""DELETE FROM global_states WHERE type = $tpe AND id = $id""".stripMargin.update.run.void - override def get(id: Id): UIO[Option[S]] = + override def get(id: Id): IO[Option[S]] = sql"""SELECT value FROM global_states WHERE type = $tpe AND id = $id""" .query[S] .option - .transact(xas.read) - .hideErrors + .transact(xas.readCE) private def states(offset: Offset, strategy: RefreshStrategy): EnvelopeStream[S] = StreamingQuery[Envelope[S]]( diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStoreSuite.scala index d35f9ef516..16e2eb9d3a 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStoreSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStoreSuite.scala @@ -9,16 +9,16 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, User} import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Envelope, Label} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset -import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy -import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie +import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy +import ch.epfl.bluebrain.nexus.testkit.ce.CatsEffectSuite import doobie.implicits._ import munit.AnyFixture import java.time.Instant import scala.concurrent.duration._ -class GlobalStateStoreSuite extends BioSuite with Doobie.Fixture with Doobie.Assertions { +class GlobalStateStoreSuite extends CatsEffectSuite with Doobie.Fixture with Doobie.Assertions { override def munitFixtures: Seq[AnyFixture[_]] = List(doobie) @@ -45,17 +45,17 @@ class GlobalStateStoreSuite extends BioSuite with Doobie.Fixture with Doobie.Ass private val envelope3 = Envelope(Arithmetic.entityType, id1, 2, updatedState1, Instant.EPOCH, Offset.at(3L)) private def assertCount(expected: Int) = - sql"select count(*) from global_states".query[Int].unique.transact(xas.read).assert(expected) + sql"select count(*) from global_states".query[Int].unique.transact(xas.readCE).assertEquals(expected) test("Save state 1 and state 2 successfully") { for { - _ <- List(state1, state2).traverse(store.save).transact(xas.write) + _ <- List(state1, state2).traverse(store.save).transact(xas.writeCE) _ <- assertCount(2) } yield () } test("List ids") { - GlobalStateStore.listIds(Arithmetic.entityType, xas.read).assert(id1, id2) + GlobalStateStore.listIds(Arithmetic.entityType, xas.readCE).assert(List(id1, id2)) } test("get state 1") { @@ -80,7 +80,7 @@ class GlobalStateStoreSuite extends BioSuite with Doobie.Fixture with Doobie.Ass test("Update state 1 successfully") { for { - _ <- store.save(updatedState1).transact(xas.write) + _ <- store.save(updatedState1).transact(xas.writeCE) _ <- assertCount(2) _ <- store.get(id1).assertSome(updatedState1) } yield () @@ -92,7 +92,7 @@ class GlobalStateStoreSuite extends BioSuite with Doobie.Fixture with Doobie.Ass test("Delete state 2 successfully") { for { - _ <- store.delete(id2).transact(xas.write) + _ <- store.delete(id2).transact(xas.writeCE) _ <- assertCount(1) _ <- store.get(id2).assertNone } yield () diff --git a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala index 238bdcad73..eba9949a19 100644 --- a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala +++ b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala @@ -1,5 +1,6 @@ package ch.epfl.bluebrain.nexus.testkit.bio +import cats.effect.{ContextShift, IO, Timer} import fs2.Stream import monix.bio.Task import munit.{Assertions, Location} @@ -23,4 +24,24 @@ trait StreamAssertions { self: Assertions => def assertEmpty: Task[Unit] = assert(List.empty) } + implicit class CEStreamAssertionsOps[A](stream: Stream[IO, A])(implicit + loc: Location, + contextShift: ContextShift[IO], + timer: Timer[IO] + ) { + def assert(expected: List[A]): IO[Unit] = + stream.take(expected.size.toLong).timeout(3.seconds).mask.compile.toList.map { obtained => + assertEquals(obtained, expected, s"Got ${obtained.size} elements, ${expected.size} elements were expected.") + } + + def assertSize(expected: Int): IO[Unit] = + stream.take(expected.toLong).timeout(3.seconds).mask.compile.toList.map { obtained => + assertEquals(obtained.size, expected, s"Got ${obtained.size} elements, $expected elements were expected.") + } + + def assert(expected: A*): IO[Unit] = assert(expected.toList) + + def assertEmpty: IO[Unit] = assert(List.empty) + } + }