Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate SSE to cats effect #4367

Merged
merged 2 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.StatusCodes.OK
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import cats.effect.{ContextShift, IO}
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives.{emit, lastEventId}
import ch.epfl.bluebrain.nexus.delta.sdk.ce.DeltaDirectives.{emit, lastEventId}
import ch.epfl.bluebrain.nexus.delta.sdk.directives.UriDirectives.baseUriPrefix
import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaSchemeDirectives}
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
Expand All @@ -24,7 +25,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.RemainingElems
import io.circe.syntax.EncoderOps
import io.circe.{Encoder, JsonObject}
import kamon.instrumentation.akka.http.TracingDirectives.operationName
import monix.execution.Scheduler

import java.time.Instant

Expand All @@ -40,9 +40,9 @@ class ElemRoutes(
schemeDirectives: DeltaSchemeDirectives
)(implicit
baseUri: BaseUri,
s: Scheduler,
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
ordering: JsonKeyOrdering,
contextShift: ContextShift[IO]
) extends AuthDirectives(identities, aclCheck: AclCheck) {
import baseUri.prefixSegment
import schemeDirectives._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package ch.epfl.bluebrain.nexus.delta.routes
import akka.http.scaladsl.model.StatusCodes.OK
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{Directive1, Route}
import cats.effect.{ContextShift, IO}
import cats.implicits.catsSyntaxApplicativeError
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
Expand All @@ -12,11 +14,12 @@ import ch.epfl.bluebrain.nexus.delta.sdk.directives.UriDirectives._
import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaSchemeDirectives}
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.events
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEventLog
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label
import kamon.instrumentation.akka.http.TracingDirectives.operationName
import monix.execution.Scheduler

/**
* The global events route.
Expand All @@ -37,7 +40,7 @@ class EventsRoutes(
schemeDirectives: DeltaSchemeDirectives
)(implicit
baseUri: BaseUri,
s: Scheduler,
c: ContextShift[IO],
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
) extends AuthDirectives(identities, aclCheck: AclCheck) {
Expand Down Expand Up @@ -100,7 +103,7 @@ class EventsRoutes(
operationName(s"$prefixSegment/$selector/{org}/events") {
concat(
authorizeFor(org, events.read).apply {
emit(sseEventLog.streamBy(selector, org, offset))
emit(sseEventLog.streamBy(selector, org, offset).attemptNarrow[OrganizationRejection])
},
(head & authorizeFor(org, events.read)) {
complete(OK)
Expand All @@ -114,7 +117,7 @@ class EventsRoutes(
concat(
operationName(s"$prefixSegment/$selector/{org}/{proj}/events") {
authorizeFor(projectRef, events.read).apply {
emit(sseEventLog.streamBy(selector, projectRef, offset))
emit(sseEventLog.streamBy(selector, projectRef, offset).attemptNarrow[ProjectRejection])
}
},
(head & authorizeFor(projectRef, events.read)) {
Expand Down Expand Up @@ -144,7 +147,7 @@ object EventsRoutes {
schemeDirectives: DeltaSchemeDirectives
)(implicit
baseUri: BaseUri,
s: Scheduler,
c: ContextShift[IO],
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
): Route = new EventsRoutes(identities, aclCheck, sseEventLog, schemeDirectives).routes
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.wiring

import cats.effect.{ContextShift, IO}
import ch.epfl.bluebrain.nexus.delta.Main.pluginsMaxPriority
import ch.epfl.bluebrain.nexus.delta.config.AppConfig
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
Expand All @@ -18,7 +19,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.sse.{SseElemStream, SseEncoder, SseEven
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
import izumi.distage.model.definition.{Id, ModuleDef}
import monix.execution.Scheduler

/**
* Events wiring
Expand All @@ -34,15 +34,13 @@ object EventsModule extends ModuleDef {
xas: Transactors,
jo: JsonKeyOrdering
) =>
toCatsIO(
SseEventLog(
sseEncoders,
organizations.fetch(_).void.toBIO[OrganizationRejection],
projects.fetch(_).map { p => (p.value.organizationUuid, p.value.uuid) },
config.sse,
xas
)(jo)
)
SseEventLog(
sseEncoders,
organizations.fetch(_).void.toBIO[OrganizationRejection],
projects.fetch(_).map { p => (p.value.organizationUuid, p.value.uuid) },
config.sse,
xas
)(jo)
}

make[SseElemStream].from { (qc: QueryConfig, xas: Transactors) =>
Expand All @@ -56,11 +54,11 @@ object EventsModule extends ModuleDef {
sseEventLog: SseEventLog,
schemeDirectives: DeltaSchemeDirectives,
baseUri: BaseUri,
s: Scheduler,
c: ContextShift[IO],
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) =>
new EventsRoutes(identities, aclCheck, sseEventLog, schemeDirectives)(baseUri, s, cr, ordering)
new EventsRoutes(identities, aclCheck, sseEventLog, schemeDirectives)(baseUri, c, cr, ordering)
}

many[PriorityRoute].add { (route: EventsRoutes) =>
Expand All @@ -74,11 +72,11 @@ object EventsModule extends ModuleDef {
sseElemStream: SseElemStream,
schemeDirectives: DeltaSchemeDirectives,
baseUri: BaseUri,
s: Scheduler,
contextShift: ContextShift[IO],
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) =>
new ElemRoutes(identities, aclCheck, sseElemStream, schemeDirectives)(baseUri, s, cr, ordering)
new ElemRoutes(identities, aclCheck, sseElemStream, schemeDirectives)(baseUri, cr, ordering, contextShift)
}

many[PriorityRoute].add { (route: ElemRoutes) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import akka.http.scaladsl.model.headers.{`Last-Event-ID`, OAuth2BearerToken}
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.model.{MediaTypes, StatusCodes}
import akka.http.scaladsl.server.Route
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives
Expand All @@ -20,13 +21,13 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.RemainingElems
import ch.epfl.bluebrain.nexus.testkit.CirceLiteral
import ch.epfl.bluebrain.nexus.testkit.bio.IOFromMap
import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues
import fs2.Stream
import monix.bio.{Task, UIO}

import java.time.Instant
import java.util.UUID

class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral with IOFromMap {
class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral with IOFromMap with CatsIOValues {

private val aclCheck = AclSimpleCheck().accepted

Expand All @@ -45,7 +46,7 @@ class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral with IOFromMap {

private val sseElemStream = new SseElemStream {

private val stream = Stream.emits(List(elem1, elem2, elem3)).covary[Task]
private val stream = Stream.emits(List(elem1, elem2, elem3)).covary[IO]

override def continuous(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ServerSentEventStream =
stream
Expand All @@ -56,8 +57,8 @@ class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral with IOFromMap {
project: ProjectRef,
selectFilter: SelectFilter,
start: Offset
): UIO[Option[RemainingElems]] =
UIO.some(RemainingElems(999L, Instant.EPOCH))
): IO[Option[RemainingElems]] =
IO.pure(Some(RemainingElems(999L, Instant.EPOCH)))
}

private val routes = Route.seal(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ import akka.http.scaladsl.model.headers.{`Last-Event-ID`, OAuth2BearerToken}
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.model.{MediaTypes, StatusCodes}
import akka.http.scaladsl.server.Route
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives
import ch.epfl.bluebrain.nexus.delta.sdk.identities.IdentitiesDummy
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection.OrganizationNotFound
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.events
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContextDummy
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection.ProjectNotFound
import ch.epfl.bluebrain.nexus.delta.sdk.sse.{ServerSentEventStream, SseEventLog}
import ch.epfl.bluebrain.nexus.delta.sdk.utils.BaseRouteSpec
Expand All @@ -22,12 +21,12 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset.{At, Start}
import ch.epfl.bluebrain.nexus.testkit.bio.IOFromMap
import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues
import fs2.Stream
import monix.bio.IO

import java.util.UUID

class EventsRoutesSpec extends BaseRouteSpec with IOFromMap {
class EventsRoutesSpec extends BaseRouteSpec with IOFromMap with CatsIOValues {

private val uuid = UUID.randomUUID()

Expand Down Expand Up @@ -64,24 +63,24 @@ class EventsRoutesSpec extends BaseRouteSpec with IOFromMap {
override def streamBy(selector: Label, offset: Offset): ServerSentEventStream =
stream(offset).filter(_.eventType.exists(_.toLowerCase == selector.value))

override def stream(org: Label, offset: Offset): IO[OrganizationRejection, ServerSentEventStream] =
override def stream(org: Label, offset: Offset): IO[ServerSentEventStream] =
IO.raiseWhen(org != projectRef.organization)(OrganizationNotFound(org)).as(stream(offset))

override def streamBy(
selector: Label,
org: Label,
offset: Offset
): IO[OrganizationRejection, ServerSentEventStream] =
): IO[ServerSentEventStream] =
IO.raiseWhen(org != projectRef.organization)(OrganizationNotFound(org)).as(streamBy(selector, offset))

override def stream(project: ProjectRef, offset: Offset): IO[ProjectRejection, ServerSentEventStream] =
override def stream(project: ProjectRef, offset: Offset): IO[ServerSentEventStream] =
IO.raiseWhen(project != projectRef)(ProjectNotFound(project)).as(stream(offset))

override def streamBy(
selector: Label,
project: ProjectRef,
offset: Offset
): IO[ProjectRejection, ServerSentEventStream] =
): IO[ServerSentEventStream] =
IO.raiseWhen(project != projectRef)(ProjectNotFound(project)).as(streamBy(selector, offset))

override def allSelectors: Set[Label] = Set(acl, project, resources)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package ch.epfl.bluebrain.nexus.delta.kernel

import com.typesafe.scalalogging.{Logger => ScalaLoggingLogger}
import monix.bio.{IO, UIO}
import cats.effect.{IO => CIO}
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toMonixBIOOps
import pureconfig.ConfigReader
import pureconfig.error.{CannotConvert, ConfigReaderFailures, ConvertFailure}
import pureconfig.generic.semiauto._
Expand Down Expand Up @@ -54,6 +56,18 @@ object RetryStrategy {
UIO.delay(logger.error(message))
}

/**
* Log errors when retrying
*/
def logError[E](logger: org.typelevel.log4cats.Logger[CIO], action: String): (E, RetryDetails) => CIO[Unit] = {
case (err, WillDelayAndRetry(nextDelay, retriesSoFar, _)) =>
val message = s"""Error $err while $action: retrying in ${nextDelay.toMillis}ms (retries so far: $retriesSoFar)"""
logger.warn(message)
case (err, GivingUp(totalRetries, _)) =>
val message = s"""Error $err while $action, giving up (total retries: $totalRetries)"""
logger.error(message)
}

/**
* Log errors when retrying
*/
Expand Down Expand Up @@ -125,6 +139,17 @@ object RetryStrategy {
(t: Throwable, d: RetryDetails) => logError(logger, action)(t, d)
)

def retryOnNonFatal(
config: RetryStrategyConfig,
logger: org.typelevel.log4cats.Logger[CIO],
action: String
): RetryStrategy[Throwable] =
RetryStrategy(
config,
(t: Throwable) => NonFatal(t),
(t: Throwable, d: RetryDetails) => logError(logger, action)(t, d).toBIO[Throwable]
)

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@ import scala.reflect.ClassTag
trait MigrateEffectSyntax {

implicit def toCatsIO[E <: Throwable, A](io: BIO[E, A]): IO[A] = io.to[IO]
implicit def uioToCatsIO[E <: Throwable, A](io: UIO[A]): IO[A] = io.to[IO]
implicit def toCatsIOOps[E <: Throwable, A](io: BIO[E, A]): MonixBioToCatsIOOps[E, A] = new MonixBioToCatsIOOps(io)
implicit def toCatsIOEitherOps[E, A](io: BIO[E, A]): MonixBioToCatsIOEitherOps[E, A] = new MonixBioToCatsIOEitherOps(
io
)

implicit def toMonixBIOOps[A](io: IO[A]): CatsIOToBioOps[A] = new CatsIOToBioOps(io)

val taskToIoK: Task ~> IO = λ[Task ~> IO](toCatsIO(_))
val uioToIoK: UIO ~> IO = λ[UIO ~> IO](uioToCatsIO(_))
val ioToUioK: IO ~> UIO = λ[IO ~> UIO](_.toUIO)
val ioToTaskK: IO ~> Task = λ[IO ~> Task](Task.from(_))

Expand All @@ -24,6 +29,10 @@ final class MonixBioToCatsIOOps[E <: Throwable, A](private val io: BIO[E, A]) ex
def toCatsIO: IO[A] = io.to[IO]
}

final class MonixBioToCatsIOEitherOps[E, A](private val io: BIO[E, A]) extends AnyVal {
def toCatsIOEither: IO[Either[E, A]] = io.attempt.to[IO]
}

final class CatsIOToBioOps[A](private val io: IO[A]) extends AnyVal {
def toBIO[E <: Throwable](implicit E: ClassTag[E]): BIO[E, A] =
BIO.from(io).mapErrorPartialWith {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph

import akka.actor.typed.ActorSystem
import cats.effect.Clock
import cats.effect.{Clock, ContextShift, IO}
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.config.BlazegraphViewsConfig
Expand Down Expand Up @@ -227,6 +227,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
baseUri: BaseUri,
cfg: BlazegraphViewsConfig,
s: Scheduler,
c: ContextShift[IO],
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) =>
Expand All @@ -240,6 +241,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
)(
baseUri,
s,
c,
cr,
ordering,
cfg.pagination
Expand Down
Loading