Skip to content

Commit

Permalink
Migrate SSE to cats effect
Browse files Browse the repository at this point in the history
  • Loading branch information
shinyhappydan committed Oct 16, 2023
1 parent cb4b6b1 commit 91f6662
Show file tree
Hide file tree
Showing 31 changed files with 274 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.StatusCodes.OK
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import cats.effect.{ContextShift, IO}
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives.{emit, lastEventId}
import ch.epfl.bluebrain.nexus.delta.sdk.ce.DeltaDirectives.{emit, lastEventId}
import ch.epfl.bluebrain.nexus.delta.sdk.directives.UriDirectives.baseUriPrefix
import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaSchemeDirectives}
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
Expand All @@ -24,7 +25,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.RemainingElems
import io.circe.syntax.EncoderOps
import io.circe.{Encoder, JsonObject}
import kamon.instrumentation.akka.http.TracingDirectives.operationName
import monix.execution.Scheduler

import java.time.Instant

Expand All @@ -40,9 +40,9 @@ class ElemRoutes(
schemeDirectives: DeltaSchemeDirectives
)(implicit
baseUri: BaseUri,
s: Scheduler,
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
ordering: JsonKeyOrdering,
contextShift: ContextShift[IO]
) extends AuthDirectives(identities, aclCheck: AclCheck) {
import baseUri.prefixSegment
import schemeDirectives._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package ch.epfl.bluebrain.nexus.delta.routes
import akka.http.scaladsl.model.StatusCodes.OK
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{Directive1, Route}
import cats.effect.{ContextShift, IO}
import cats.implicits.catsSyntaxApplicativeError
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
Expand All @@ -12,11 +14,12 @@ import ch.epfl.bluebrain.nexus.delta.sdk.directives.UriDirectives._
import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaSchemeDirectives}
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.events
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEventLog
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label
import kamon.instrumentation.akka.http.TracingDirectives.operationName
import monix.execution.Scheduler

/**
* The global events route.
Expand All @@ -37,7 +40,7 @@ class EventsRoutes(
schemeDirectives: DeltaSchemeDirectives
)(implicit
baseUri: BaseUri,
s: Scheduler,
c: ContextShift[IO],
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
) extends AuthDirectives(identities, aclCheck: AclCheck) {
Expand Down Expand Up @@ -100,7 +103,7 @@ class EventsRoutes(
operationName(s"$prefixSegment/$selector/{org}/events") {
concat(
authorizeFor(org, events.read).apply {
emit(sseEventLog.streamBy(selector, org, offset))
emit(sseEventLog.streamBy(selector, org, offset).attemptNarrow[OrganizationRejection])
},
(head & authorizeFor(org, events.read)) {
complete(OK)
Expand All @@ -114,7 +117,7 @@ class EventsRoutes(
concat(
operationName(s"$prefixSegment/$selector/{org}/{proj}/events") {
authorizeFor(projectRef, events.read).apply {
emit(sseEventLog.streamBy(selector, projectRef, offset))
emit(sseEventLog.streamBy(selector, projectRef, offset).attemptNarrow[ProjectRejection])
}
},
(head & authorizeFor(projectRef, events.read)) {
Expand Down Expand Up @@ -144,7 +147,7 @@ object EventsRoutes {
schemeDirectives: DeltaSchemeDirectives
)(implicit
baseUri: BaseUri,
s: Scheduler,
c: ContextShift[IO],
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
): Route = new EventsRoutes(identities, aclCheck, sseEventLog, schemeDirectives).routes
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.wiring

import cats.effect.{ContextShift, IO}
import ch.epfl.bluebrain.nexus.delta.Main.pluginsMaxPriority
import ch.epfl.bluebrain.nexus.delta.config.AppConfig
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
Expand All @@ -18,7 +19,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.sse.{SseElemStream, SseEncoder, SseEven
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
import izumi.distage.model.definition.{Id, ModuleDef}
import monix.execution.Scheduler

/**
* Events wiring
Expand All @@ -34,15 +34,13 @@ object EventsModule extends ModuleDef {
xas: Transactors,
jo: JsonKeyOrdering
) =>
toCatsIO(
SseEventLog(
sseEncoders,
organizations.fetch(_).void.toBIO[OrganizationRejection],
projects.fetch(_).map { p => (p.value.organizationUuid, p.value.uuid) },
config.sse,
xas
)(jo)
)
SseEventLog(
sseEncoders,
organizations.fetch(_).void.toBIO[OrganizationRejection],
projects.fetch(_).map { p => (p.value.organizationUuid, p.value.uuid) },
config.sse,
xas
)(jo)
}

make[SseElemStream].from { (qc: QueryConfig, xas: Transactors) =>
Expand All @@ -56,11 +54,11 @@ object EventsModule extends ModuleDef {
sseEventLog: SseEventLog,
schemeDirectives: DeltaSchemeDirectives,
baseUri: BaseUri,
s: Scheduler,
c: ContextShift[IO],
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) =>
new EventsRoutes(identities, aclCheck, sseEventLog, schemeDirectives)(baseUri, s, cr, ordering)
new EventsRoutes(identities, aclCheck, sseEventLog, schemeDirectives)(baseUri, c, cr, ordering)
}

many[PriorityRoute].add { (route: EventsRoutes) =>
Expand All @@ -74,11 +72,11 @@ object EventsModule extends ModuleDef {
sseElemStream: SseElemStream,
schemeDirectives: DeltaSchemeDirectives,
baseUri: BaseUri,
s: Scheduler,
contextShift: ContextShift[IO],
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) =>
new ElemRoutes(identities, aclCheck, sseElemStream, schemeDirectives)(baseUri, s, cr, ordering)
new ElemRoutes(identities, aclCheck, sseElemStream, schemeDirectives)(baseUri, cr, ordering, contextShift)
}

many[PriorityRoute].add { (route: ElemRoutes) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import akka.http.scaladsl.model.headers.{`Last-Event-ID`, OAuth2BearerToken}
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.model.{MediaTypes, StatusCodes}
import akka.http.scaladsl.server.Route
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives
Expand All @@ -20,13 +21,13 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.RemainingElems
import ch.epfl.bluebrain.nexus.testkit.CirceLiteral
import ch.epfl.bluebrain.nexus.testkit.bio.IOFromMap
import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues
import fs2.Stream
import monix.bio.{Task, UIO}

import java.time.Instant
import java.util.UUID

class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral with IOFromMap {
class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral with IOFromMap with CatsIOValues {

private val aclCheck = AclSimpleCheck().accepted

Expand All @@ -45,7 +46,7 @@ class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral with IOFromMap {

private val sseElemStream = new SseElemStream {

private val stream = Stream.emits(List(elem1, elem2, elem3)).covary[Task]
private val stream = Stream.emits(List(elem1, elem2, elem3)).covary[IO]

override def continuous(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ServerSentEventStream =
stream
Expand All @@ -56,8 +57,8 @@ class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral with IOFromMap {
project: ProjectRef,
selectFilter: SelectFilter,
start: Offset
): UIO[Option[RemainingElems]] =
UIO.some(RemainingElems(999L, Instant.EPOCH))
): IO[Option[RemainingElems]] =
IO.pure(Some(RemainingElems(999L, Instant.EPOCH)))
}

private val routes = Route.seal(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ import akka.http.scaladsl.model.headers.{`Last-Event-ID`, OAuth2BearerToken}
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.model.{MediaTypes, StatusCodes}
import akka.http.scaladsl.server.Route
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives
import ch.epfl.bluebrain.nexus.delta.sdk.identities.IdentitiesDummy
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection.OrganizationNotFound
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.events
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContextDummy
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection.ProjectNotFound
import ch.epfl.bluebrain.nexus.delta.sdk.sse.{ServerSentEventStream, SseEventLog}
import ch.epfl.bluebrain.nexus.delta.sdk.utils.BaseRouteSpec
Expand All @@ -22,12 +21,12 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset.{At, Start}
import ch.epfl.bluebrain.nexus.testkit.bio.IOFromMap
import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues
import fs2.Stream
import monix.bio.IO

import java.util.UUID

class EventsRoutesSpec extends BaseRouteSpec with IOFromMap {
class EventsRoutesSpec extends BaseRouteSpec with IOFromMap with CatsIOValues {

private val uuid = UUID.randomUUID()

Expand Down Expand Up @@ -64,24 +63,24 @@ class EventsRoutesSpec extends BaseRouteSpec with IOFromMap {
override def streamBy(selector: Label, offset: Offset): ServerSentEventStream =
stream(offset).filter(_.eventType.exists(_.toLowerCase == selector.value))

override def stream(org: Label, offset: Offset): IO[OrganizationRejection, ServerSentEventStream] =
override def stream(org: Label, offset: Offset): IO[ServerSentEventStream] =
IO.raiseWhen(org != projectRef.organization)(OrganizationNotFound(org)).as(stream(offset))

override def streamBy(
selector: Label,
org: Label,
offset: Offset
): IO[OrganizationRejection, ServerSentEventStream] =
): IO[ServerSentEventStream] =
IO.raiseWhen(org != projectRef.organization)(OrganizationNotFound(org)).as(streamBy(selector, offset))

override def stream(project: ProjectRef, offset: Offset): IO[ProjectRejection, ServerSentEventStream] =
override def stream(project: ProjectRef, offset: Offset): IO[ServerSentEventStream] =
IO.raiseWhen(project != projectRef)(ProjectNotFound(project)).as(stream(offset))

override def streamBy(
selector: Label,
project: ProjectRef,
offset: Offset
): IO[ProjectRejection, ServerSentEventStream] =
): IO[ServerSentEventStream] =
IO.raiseWhen(project != projectRef)(ProjectNotFound(project)).as(streamBy(selector, offset))

override def allSelectors: Set[Label] = Set(acl, project, resources)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package ch.epfl.bluebrain.nexus.delta.kernel

import com.typesafe.scalalogging.{Logger => ScalaLoggingLogger}
import monix.bio.{IO, UIO}
import cats.effect.{IO => CIO}
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toMonixBIOOps
import pureconfig.ConfigReader
import pureconfig.error.{CannotConvert, ConfigReaderFailures, ConvertFailure}
import pureconfig.generic.semiauto._
Expand Down Expand Up @@ -54,6 +56,18 @@ object RetryStrategy {
UIO.delay(logger.error(message))
}

/**
* Log errors when retrying
*/
def logError[E](logger: org.typelevel.log4cats.Logger[CIO], action: String): (E, RetryDetails) => CIO[Unit] = {
case (err, WillDelayAndRetry(nextDelay, retriesSoFar, _)) =>
val message = s"""Error $err while $action: retrying in ${nextDelay.toMillis}ms (retries so far: $retriesSoFar)"""
logger.warn(message)
case (err, GivingUp(totalRetries, _)) =>
val message = s"""Error $err while $action, giving up (total retries: $totalRetries)"""
logger.error(message)
}

/**
* Log errors when retrying
*/
Expand Down Expand Up @@ -125,6 +139,17 @@ object RetryStrategy {
(t: Throwable, d: RetryDetails) => logError(logger, action)(t, d)
)

def retryOnNonFatal(
config: RetryStrategyConfig,
logger: org.typelevel.log4cats.Logger[CIO],
action: String
): RetryStrategy[Throwable] =
RetryStrategy(
config,
(t: Throwable) => NonFatal(t),
(t: Throwable, d: RetryDetails) => logError(logger, action)(t, d).toBIO[Throwable]
)

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@ import scala.reflect.ClassTag
trait MigrateEffectSyntax {

implicit def toCatsIO[E <: Throwable, A](io: BIO[E, A]): IO[A] = io.to[IO]
implicit def uioToCatsIO[E <: Throwable, A](io: UIO[A]): IO[A] = io.to[IO]
implicit def toCatsIOOps[E <: Throwable, A](io: BIO[E, A]): MonixBioToCatsIOOps[E, A] = new MonixBioToCatsIOOps(io)
implicit def toCatsIOEitherOps[E, A](io: BIO[E, A]): MonixBioToCatsIOEitherOps[E, A] = new MonixBioToCatsIOEitherOps(
io
)

implicit def toMonixBIOOps[A](io: IO[A]): CatsIOToBioOps[A] = new CatsIOToBioOps(io)

val taskToIoK: Task ~> IO = λ[Task ~> IO](toCatsIO(_))
val uioToIoK: UIO ~> IO = λ[UIO ~> IO](uioToCatsIO(_))
val ioToUioK: IO ~> UIO = λ[IO ~> UIO](_.toUIO)
val ioToTaskK: IO ~> Task = λ[IO ~> Task](Task.from(_))

Expand All @@ -24,6 +29,10 @@ final class MonixBioToCatsIOOps[E <: Throwable, A](private val io: BIO[E, A]) ex
def toCatsIO: IO[A] = io.to[IO]
}

final class MonixBioToCatsIOEitherOps[E, A](private val io: BIO[E, A]) extends AnyVal {
def toCatsIOEither: IO[Either[E, A]] = io.attempt.to[IO]
}

final class CatsIOToBioOps[A](private val io: IO[A]) extends AnyVal {
def toBIO[E <: Throwable](implicit E: ClassTag[E]): BIO[E, A] =
BIO.from(io).mapErrorPartialWith {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph

import akka.actor.typed.ActorSystem
import cats.effect.Clock
import cats.effect.{Clock, ContextShift, IO}
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.config.BlazegraphViewsConfig
Expand Down Expand Up @@ -227,6 +227,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
baseUri: BaseUri,
cfg: BlazegraphViewsConfig,
s: Scheduler,
c: ContextShift[IO],
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) =>
Expand All @@ -240,6 +241,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
)(
baseUri,
s,
c,
cr,
ordering,
cfg.pagination
Expand Down
Loading

0 comments on commit 91f6662

Please sign in to comment.