Skip to content

Commit

Permalink
Merge branch 'master' into migrate-graph-analytics-to-ce
Browse files Browse the repository at this point in the history
  • Loading branch information
dantb committed Oct 17, 2023
2 parents 5d05de9 + 5c3ec3d commit c06a720
Show file tree
Hide file tree
Showing 31 changed files with 299 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.StatusCodes.OK
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import cats.effect.{ContextShift, IO}
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives.{emit, lastEventId}
import ch.epfl.bluebrain.nexus.delta.sdk.ce.DeltaDirectives.{emit, lastEventId}
import ch.epfl.bluebrain.nexus.delta.sdk.directives.UriDirectives.baseUriPrefix
import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaSchemeDirectives}
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
Expand All @@ -24,7 +25,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.RemainingElems
import io.circe.syntax.EncoderOps
import io.circe.{Encoder, JsonObject}
import kamon.instrumentation.akka.http.TracingDirectives.operationName
import monix.execution.Scheduler

import java.time.Instant

Expand All @@ -40,9 +40,9 @@ class ElemRoutes(
schemeDirectives: DeltaSchemeDirectives
)(implicit
baseUri: BaseUri,
s: Scheduler,
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
ordering: JsonKeyOrdering,
contextShift: ContextShift[IO]
) extends AuthDirectives(identities, aclCheck: AclCheck) {
import baseUri.prefixSegment
import schemeDirectives._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package ch.epfl.bluebrain.nexus.delta.routes
import akka.http.scaladsl.model.StatusCodes.OK
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{Directive1, Route}
import cats.effect.{ContextShift, IO}
import cats.implicits.catsSyntaxApplicativeError
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
Expand All @@ -12,11 +14,12 @@ import ch.epfl.bluebrain.nexus.delta.sdk.directives.UriDirectives._
import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaSchemeDirectives}
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.events
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEventLog
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label
import kamon.instrumentation.akka.http.TracingDirectives.operationName
import monix.execution.Scheduler

/**
* The global events route.
Expand All @@ -37,7 +40,7 @@ class EventsRoutes(
schemeDirectives: DeltaSchemeDirectives
)(implicit
baseUri: BaseUri,
s: Scheduler,
c: ContextShift[IO],
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
) extends AuthDirectives(identities, aclCheck: AclCheck) {
Expand Down Expand Up @@ -100,7 +103,7 @@ class EventsRoutes(
operationName(s"$prefixSegment/$selector/{org}/events") {
concat(
authorizeFor(org, events.read).apply {
emit(sseEventLog.streamBy(selector, org, offset))
emit(sseEventLog.streamBy(selector, org, offset).attemptNarrow[OrganizationRejection])
},
(head & authorizeFor(org, events.read)) {
complete(OK)
Expand All @@ -114,7 +117,7 @@ class EventsRoutes(
concat(
operationName(s"$prefixSegment/$selector/{org}/{proj}/events") {
authorizeFor(projectRef, events.read).apply {
emit(sseEventLog.streamBy(selector, projectRef, offset))
emit(sseEventLog.streamBy(selector, projectRef, offset).attemptNarrow[ProjectRejection])
}
},
(head & authorizeFor(projectRef, events.read)) {
Expand Down Expand Up @@ -144,7 +147,7 @@ object EventsRoutes {
schemeDirectives: DeltaSchemeDirectives
)(implicit
baseUri: BaseUri,
s: Scheduler,
c: ContextShift[IO],
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
): Route = new EventsRoutes(identities, aclCheck, sseEventLog, schemeDirectives).routes
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.wiring

import cats.effect.{ContextShift, IO}
import ch.epfl.bluebrain.nexus.delta.Main.pluginsMaxPriority
import ch.epfl.bluebrain.nexus.delta.config.AppConfig
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
Expand All @@ -18,7 +19,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.sse.{SseElemStream, SseEncoder, SseEven
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
import izumi.distage.model.definition.{Id, ModuleDef}
import monix.execution.Scheduler

/**
* Events wiring
Expand All @@ -34,15 +34,13 @@ object EventsModule extends ModuleDef {
xas: Transactors,
jo: JsonKeyOrdering
) =>
toCatsIO(
SseEventLog(
sseEncoders,
organizations.fetch(_).void.toBIO[OrganizationRejection],
projects.fetch(_).map { p => (p.value.organizationUuid, p.value.uuid) },
config.sse,
xas
)(jo)
)
SseEventLog(
sseEncoders,
organizations.fetch(_).void.toBIO[OrganizationRejection],
projects.fetch(_).map { p => (p.value.organizationUuid, p.value.uuid) },
config.sse,
xas
)(jo)
}

make[SseElemStream].from { (qc: QueryConfig, xas: Transactors) =>
Expand All @@ -56,11 +54,11 @@ object EventsModule extends ModuleDef {
sseEventLog: SseEventLog,
schemeDirectives: DeltaSchemeDirectives,
baseUri: BaseUri,
s: Scheduler,
c: ContextShift[IO],
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) =>
new EventsRoutes(identities, aclCheck, sseEventLog, schemeDirectives)(baseUri, s, cr, ordering)
new EventsRoutes(identities, aclCheck, sseEventLog, schemeDirectives)(baseUri, c, cr, ordering)
}

many[PriorityRoute].add { (route: EventsRoutes) =>
Expand All @@ -74,11 +72,11 @@ object EventsModule extends ModuleDef {
sseElemStream: SseElemStream,
schemeDirectives: DeltaSchemeDirectives,
baseUri: BaseUri,
s: Scheduler,
contextShift: ContextShift[IO],
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) =>
new ElemRoutes(identities, aclCheck, sseElemStream, schemeDirectives)(baseUri, s, cr, ordering)
new ElemRoutes(identities, aclCheck, sseElemStream, schemeDirectives)(baseUri, cr, ordering, contextShift)
}

many[PriorityRoute].add { (route: ElemRoutes) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import akka.http.scaladsl.model.headers.{`Last-Event-ID`, OAuth2BearerToken}
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.model.{MediaTypes, StatusCodes}
import akka.http.scaladsl.server.Route
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives
Expand All @@ -20,13 +21,13 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.RemainingElems
import ch.epfl.bluebrain.nexus.testkit.CirceLiteral
import ch.epfl.bluebrain.nexus.testkit.bio.IOFromMap
import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues
import fs2.Stream
import monix.bio.{Task, UIO}

import java.time.Instant
import java.util.UUID

class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral with IOFromMap {
class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral with IOFromMap with CatsIOValues {

private val aclCheck = AclSimpleCheck().accepted

Expand All @@ -45,7 +46,7 @@ class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral with IOFromMap {

private val sseElemStream = new SseElemStream {

private val stream = Stream.emits(List(elem1, elem2, elem3)).covary[Task]
private val stream = Stream.emits(List(elem1, elem2, elem3)).covary[IO]

override def continuous(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ServerSentEventStream =
stream
Expand All @@ -56,8 +57,8 @@ class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral with IOFromMap {
project: ProjectRef,
selectFilter: SelectFilter,
start: Offset
): UIO[Option[RemainingElems]] =
UIO.some(RemainingElems(999L, Instant.EPOCH))
): IO[Option[RemainingElems]] =
IO.pure(Some(RemainingElems(999L, Instant.EPOCH)))
}

private val routes = Route.seal(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ import akka.http.scaladsl.model.headers.{`Last-Event-ID`, OAuth2BearerToken}
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.model.{MediaTypes, StatusCodes}
import akka.http.scaladsl.server.Route
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives
import ch.epfl.bluebrain.nexus.delta.sdk.identities.IdentitiesDummy
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection.OrganizationNotFound
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.events
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContextDummy
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection.ProjectNotFound
import ch.epfl.bluebrain.nexus.delta.sdk.sse.{ServerSentEventStream, SseEventLog}
import ch.epfl.bluebrain.nexus.delta.sdk.utils.BaseRouteSpec
Expand All @@ -22,12 +21,12 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset.{At, Start}
import ch.epfl.bluebrain.nexus.testkit.bio.IOFromMap
import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues
import fs2.Stream
import monix.bio.IO

import java.util.UUID

class EventsRoutesSpec extends BaseRouteSpec with IOFromMap {
class EventsRoutesSpec extends BaseRouteSpec with IOFromMap with CatsIOValues {

private val uuid = UUID.randomUUID()

Expand Down Expand Up @@ -64,24 +63,24 @@ class EventsRoutesSpec extends BaseRouteSpec with IOFromMap {
override def streamBy(selector: Label, offset: Offset): ServerSentEventStream =
stream(offset).filter(_.eventType.exists(_.toLowerCase == selector.value))

override def stream(org: Label, offset: Offset): IO[OrganizationRejection, ServerSentEventStream] =
override def stream(org: Label, offset: Offset): IO[ServerSentEventStream] =
IO.raiseWhen(org != projectRef.organization)(OrganizationNotFound(org)).as(stream(offset))

override def streamBy(
selector: Label,
org: Label,
offset: Offset
): IO[OrganizationRejection, ServerSentEventStream] =
): IO[ServerSentEventStream] =
IO.raiseWhen(org != projectRef.organization)(OrganizationNotFound(org)).as(streamBy(selector, offset))

override def stream(project: ProjectRef, offset: Offset): IO[ProjectRejection, ServerSentEventStream] =
override def stream(project: ProjectRef, offset: Offset): IO[ServerSentEventStream] =
IO.raiseWhen(project != projectRef)(ProjectNotFound(project)).as(stream(offset))

override def streamBy(
selector: Label,
project: ProjectRef,
offset: Offset
): IO[ProjectRejection, ServerSentEventStream] =
): IO[ServerSentEventStream] =
IO.raiseWhen(project != projectRef)(ProjectNotFound(project)).as(streamBy(selector, offset))

override def allSelectors: Set[Label] = Set(acl, project, resources)
Expand Down
Loading

0 comments on commit c06a720

Please sign in to comment.