Skip to content

Commit

Permalink
Migrate indexing actions to Cats Effect (#4355)
Browse files Browse the repository at this point in the history
Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored Oct 11, 2023
1 parent cfa7f5e commit 09f3df6
Show file tree
Hide file tree
Showing 20 changed files with 153 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import cats.effect.IO
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.{contexts, schemas}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.routes.ResourcesRoutes.asSourceWithMetadata
import ch.epfl.bluebrain.nexus.delta.sdk._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives._
Expand All @@ -25,6 +26,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.resources.NexusSource.DecodingOption
import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceRejection.{InvalidJsonLdFormat, InvalidSchemaRejection, ResourceNotFound}
import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.{Resource, ResourceRejection}
import ch.epfl.bluebrain.nexus.delta.sdk.resources.{NexusSource, Resources}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import io.circe.{Json, Printer}
import monix.bio.IO
import monix.execution.Scheduler
Expand Down Expand Up @@ -67,6 +69,9 @@ final class ResourcesRoutes(
implicit private def resourceFAJsonLdEncoder[A: JsonLdEncoder]: JsonLdEncoder[ResourceF[A]] =
ResourceF.resourceFAJsonLdEncoder(ContextValue.empty)

private def indexUIO(project: ProjectRef, resource: ResourceF[Resource], mode: IndexingMode) =
index(project, resource, mode).toUIO

def routes: Route =
baseUriPrefix(baseUri.prefix) {
pathPrefix("resources") {
Expand All @@ -79,7 +84,7 @@ final class ResourcesRoutes(
authorizeFor(ref, Write).apply {
emit(
Created,
resources.create(ref, resourceSchema, source.value).tapEval(index(ref, _, mode)).map(_.void)
resources.create(ref, resourceSchema, source.value).tapEval(indexUIO(ref, _, mode)).map(_.void)
)
}
},
Expand All @@ -94,7 +99,7 @@ final class ResourcesRoutes(
Created,
resources
.create(ref, schema, source.value)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.map(_.void)
.rejectWhen(wrongJsonOrNotFound)
)
Expand All @@ -115,7 +120,7 @@ final class ResourcesRoutes(
Created,
resources
.create(id, ref, schema, source.value)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.map(_.void)
.rejectWhen(wrongJsonOrNotFound)
)
Expand All @@ -124,7 +129,7 @@ final class ResourcesRoutes(
emit(
resources
.update(id, ref, schemaOpt, rev, source.value)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.map(_.void)
.rejectWhen(wrongJsonOrNotFound)
)
Expand All @@ -137,7 +142,7 @@ final class ResourcesRoutes(
emit(
resources
.deprecate(id, ref, schemaOpt, rev)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.map(_.void)
.rejectWhen(wrongJsonOrNotFound)
)
Expand Down Expand Up @@ -166,7 +171,7 @@ final class ResourcesRoutes(
OK,
resources
.refresh(id, ref, schemaOpt)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.map(_.void)
.rejectWhen(wrongJsonOrNotFound)
)
Expand Down Expand Up @@ -214,7 +219,7 @@ final class ResourcesRoutes(
Created,
resources
.tag(id, ref, schemaOpt, tag, tagRev, rev)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.map(_.void)
.rejectWhen(wrongJsonOrNotFound)
)
Expand All @@ -229,7 +234,7 @@ final class ResourcesRoutes(
emit(
resources
.deleteTag(id, ref, schemaOpt, tag, rev)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.map(_.void)
.rejectOn[ResourceNotFound]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import cats.effect.IO
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.schemas.shacl
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class

many[MetadataContextValue].addEffect(MetadataContextValue.fromFile("contexts/metadata.json"))

make[IndexingAction].named("aggregate").from { (internal: Set[IndexingAction]) =>
AggregateIndexingAction(NonEmptyList.fromListUnsafe(internal.toList))
make[AggregateIndexingAction].from {
(internal: Set[IndexingAction], contextShift: ContextShift[IO], cr: RemoteContextResolution @Id("aggregate")) =>
AggregateIndexingAction(NonEmptyList.fromListUnsafe(internal.toList))(contextShift, cr)
}

make[RemoteContextResolution].named("aggregate").fromEffect { (otherCtxResolutions: Set[RemoteContextResolution]) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.routes.ResolversRoutes
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction.AggregateIndexingAction
import ch.epfl.bluebrain.nexus.delta.sdk._
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives
Expand Down Expand Up @@ -70,7 +71,7 @@ object ResolversModule extends ModuleDef {
aclCheck: AclCheck,
resolvers: Resolvers,
schemeDirectives: DeltaSchemeDirectives,
indexingAction: IndexingAction @Id("aggregate"),
indexingAction: AggregateIndexingAction,
shift: Resolver.Shift,
multiResolution: MultiResolution,
baseUri: BaseUri,
Expand All @@ -84,7 +85,7 @@ object ResolversModule extends ModuleDef {
resolvers,
multiResolution,
schemeDirectives,
indexingAction(_, _, _)(shift, cr)
indexingAction(_, _, _)(shift)
)(
baseUri,
cr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.routes.ResourcesRoutes
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction.AggregateIndexingAction
import ch.epfl.bluebrain.nexus.delta.sdk._
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives
Expand Down Expand Up @@ -81,7 +82,7 @@ object ResourcesModule extends ModuleDef {
aclCheck: AclCheck,
resources: Resources,
schemeDirectives: DeltaSchemeDirectives,
indexingAction: IndexingAction @Id("aggregate"),
indexingAction: AggregateIndexingAction,
shift: Resource.Shift,
baseUri: BaseUri,
s: Scheduler,
Expand All @@ -95,7 +96,7 @@ object ResourcesModule extends ModuleDef {
aclCheck,
resources,
schemeDirectives,
indexingAction(_, _, _)(shift, cr)
indexingAction(_, _, _)(shift)
)(
baseUri,
s,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.routes.SchemasRoutes
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction.AggregateIndexingAction
import ch.epfl.bluebrain.nexus.delta.sdk._
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives
Expand Down Expand Up @@ -71,14 +72,14 @@ object SchemasModule extends ModuleDef {
aclCheck: AclCheck,
schemas: Schemas,
schemeDirectives: DeltaSchemeDirectives,
indexingAction: IndexingAction @Id("aggregate"),
indexingAction: AggregateIndexingAction,
shift: Schema.Shift,
baseUri: BaseUri,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering,
fusionConfig: FusionConfig
) =>
new SchemasRoutes(identities, aclCheck, schemas, schemeDirectives, indexingAction(_, _, _)(shift, cr))(
new SchemasRoutes(identities, aclCheck, schemas, schemeDirectives, indexingAction(_, _, _)(shift))(
baseUri,
cr,
ordering,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef.{ActiveViewDef, DeprecatedViewDef}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.{BlazegraphSink, IndexingViewDef}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig
Expand Down Expand Up @@ -49,9 +48,7 @@ final class BlazegraphIndexingAction(
case _: DeprecatedViewDef => UIO.none
}

def projections(project: ProjectRef, elem: Elem[GraphResource])(implicit
cr: RemoteContextResolution
): ElemStream[CompiledProjection] =
def projections(project: ProjectRef, elem: Elem[GraphResource]): ElemStream[CompiledProjection] =
fetchCurrentViews(project).evalMap { _.evalMapFilter(compile(_, elem)) }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.{BlazegraphS
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction.AggregateIndexingAction
import ch.epfl.bluebrain.nexus.delta.sdk._
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.deletion.ProjectDeletionTask
Expand Down Expand Up @@ -180,7 +181,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
views: BlazegraphViews,
viewsQuery: BlazegraphViewsQuery,
schemeDirectives: DeltaSchemeDirectives,
indexingAction: IndexingAction @Id("aggregate"),
indexingAction: AggregateIndexingAction,
shift: BlazegraphView.Shift,
baseUri: BaseUri,
cfg: BlazegraphViewsConfig,
Expand All @@ -195,7 +196,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
identities,
aclCheck,
schemeDirectives,
indexingAction(_, _, _)(shift, cr)
indexingAction(_, _, _)(shift)
)(
baseUri,
s,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import akka.http.scaladsl.model.StatusCodes.Created
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{Directive0, Route}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphView._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.permissions.{read => Read, write => Write}
Expand All @@ -13,7 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteCon
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder
import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction
import ch.epfl.bluebrain.nexus.delta.sdk.{IndexingAction, IndexingMode}
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaDirectives, DeltaSchemeDirectives}
Expand All @@ -25,7 +26,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.model.routes.Tag
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SearchResults._
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.{PaginationConfig, SearchResults}
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegment}
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegment, ResourceF}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import io.circe.Json
import monix.execution.Scheduler
Expand Down Expand Up @@ -66,6 +67,9 @@ class BlazegraphViewsRoutes(

import schemeDirectives._

private def indexUIO(project: ProjectRef, resource: ResourceF[BlazegraphView], mode: IndexingMode) =
index(project, resource, mode).toUIO

def routes: Route =
concat(
pathPrefix("views") {
Expand All @@ -79,7 +83,7 @@ class BlazegraphViewsRoutes(
Created,
views
.create(ref, source)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.mapValue(_.metadata)
.rejectWhen(decodingFailedOrViewNotFound)
)
Expand All @@ -98,7 +102,7 @@ class BlazegraphViewsRoutes(
Created,
views
.create(id, ref, source)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.mapValue(_.metadata)
.rejectWhen(decodingFailedOrViewNotFound)
)
Expand All @@ -107,7 +111,7 @@ class BlazegraphViewsRoutes(
emit(
views
.update(id, ref, rev, source)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.mapValue(_.metadata)
.rejectWhen(decodingFailedOrViewNotFound)
)
Expand All @@ -120,7 +124,7 @@ class BlazegraphViewsRoutes(
emit(
views
.deprecate(id, ref, rev)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.mapValue(_.metadata)
.rejectOn[ViewNotFound]
)
Expand Down Expand Up @@ -163,7 +167,7 @@ class BlazegraphViewsRoutes(
Created,
views
.tag(id, ref, tag, tagRev, rev)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.mapValue(_.metadata)
.rejectOn[ViewNotFound]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.CouldNotFindPipeErr
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{NoopSink, PipeChain, PipeRef}
import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, PatienceConfig}
import ch.epfl.bluebrain.nexus.testkit.bio.PatienceConfig
import ch.epfl.bluebrain.nexus.testkit.ce.CatsEffectSuite
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import fs2.Stream

import java.time.Instant
import scala.concurrent.duration._

class BlazegraphIndexingActionSuite extends BioSuite with Fixtures {
class BlazegraphIndexingActionSuite extends CatsEffectSuite with Fixtures {

implicit private val patienceConfig: PatienceConfig = PatienceConfig(5.seconds, 10.millis)

Expand Down Expand Up @@ -162,18 +164,19 @@ class BlazegraphIndexingActionSuite extends BioSuite with Fixtures {

indexingAction
.projections(project, elem)
.translate(taskToIoK)
.fold(emptyAcc) {
case (acc, s: SuccessElem[_]) => acc.success(s.id)
case (acc, d: DroppedElem) => acc.drop(d.id)
case (acc, f: FailedElem) => acc.failed(f.id)
}
.compile
.lastOrError
.assert(expected)
.assertEquals(expected)
}

test("A valid elem should be indexed") {
indexingAction.apply(project, elem).assert(List.empty)
indexingAction.apply(project, elem).assertEquals(List.empty)
}

test("A failed elem should be returned") {
Expand All @@ -187,7 +190,7 @@ class BlazegraphIndexingActionSuite extends BioSuite with Fixtures {
rev = 1
)

indexingAction.apply(project, failed).assert(List(failed))
indexingAction.apply(project, failed).assertEquals(List(failed))
}

}
Expand Down
Loading

0 comments on commit 09f3df6

Please sign in to comment.