Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate indexing actions to Cats Effect #4355

Merged
merged 1 commit into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import cats.effect.IO
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.{contexts, schemas}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.routes.ResourcesRoutes.asSourceWithMetadata
import ch.epfl.bluebrain.nexus.delta.sdk._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives._
Expand All @@ -25,6 +26,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.resources.NexusSource.DecodingOption
import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceRejection.{InvalidJsonLdFormat, InvalidSchemaRejection, ResourceNotFound}
import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.{Resource, ResourceRejection}
import ch.epfl.bluebrain.nexus.delta.sdk.resources.{NexusSource, Resources}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import io.circe.{Json, Printer}
import monix.bio.IO
import monix.execution.Scheduler
Expand Down Expand Up @@ -67,6 +69,9 @@ final class ResourcesRoutes(
implicit private def resourceFAJsonLdEncoder[A: JsonLdEncoder]: JsonLdEncoder[ResourceF[A]] =
ResourceF.resourceFAJsonLdEncoder(ContextValue.empty)

private def indexUIO(project: ProjectRef, resource: ResourceF[Resource], mode: IndexingMode) =
index(project, resource, mode).toUIO

def routes: Route =
baseUriPrefix(baseUri.prefix) {
pathPrefix("resources") {
Expand All @@ -79,7 +84,7 @@ final class ResourcesRoutes(
authorizeFor(ref, Write).apply {
emit(
Created,
resources.create(ref, resourceSchema, source.value).tapEval(index(ref, _, mode)).map(_.void)
resources.create(ref, resourceSchema, source.value).tapEval(indexUIO(ref, _, mode)).map(_.void)
)
}
},
Expand All @@ -94,7 +99,7 @@ final class ResourcesRoutes(
Created,
resources
.create(ref, schema, source.value)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.map(_.void)
.rejectWhen(wrongJsonOrNotFound)
)
Expand All @@ -115,7 +120,7 @@ final class ResourcesRoutes(
Created,
resources
.create(id, ref, schema, source.value)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.map(_.void)
.rejectWhen(wrongJsonOrNotFound)
)
Expand All @@ -124,7 +129,7 @@ final class ResourcesRoutes(
emit(
resources
.update(id, ref, schemaOpt, rev, source.value)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.map(_.void)
.rejectWhen(wrongJsonOrNotFound)
)
Expand All @@ -137,7 +142,7 @@ final class ResourcesRoutes(
emit(
resources
.deprecate(id, ref, schemaOpt, rev)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.map(_.void)
.rejectWhen(wrongJsonOrNotFound)
)
Expand Down Expand Up @@ -166,7 +171,7 @@ final class ResourcesRoutes(
OK,
resources
.refresh(id, ref, schemaOpt)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.map(_.void)
.rejectWhen(wrongJsonOrNotFound)
)
Expand Down Expand Up @@ -214,7 +219,7 @@ final class ResourcesRoutes(
Created,
resources
.tag(id, ref, schemaOpt, tag, tagRev, rev)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.map(_.void)
.rejectWhen(wrongJsonOrNotFound)
)
Expand All @@ -229,7 +234,7 @@ final class ResourcesRoutes(
emit(
resources
.deleteTag(id, ref, schemaOpt, tag, rev)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.map(_.void)
.rejectOn[ResourceNotFound]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import cats.effect.IO
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.schemas.shacl
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class

many[MetadataContextValue].addEffect(MetadataContextValue.fromFile("contexts/metadata.json"))

make[IndexingAction].named("aggregate").from { (internal: Set[IndexingAction]) =>
AggregateIndexingAction(NonEmptyList.fromListUnsafe(internal.toList))
make[AggregateIndexingAction].from {
(internal: Set[IndexingAction], contextShift: ContextShift[IO], cr: RemoteContextResolution @Id("aggregate")) =>
AggregateIndexingAction(NonEmptyList.fromListUnsafe(internal.toList))(contextShift, cr)
}

make[RemoteContextResolution].named("aggregate").fromEffect { (otherCtxResolutions: Set[RemoteContextResolution]) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.routes.ResolversRoutes
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction.AggregateIndexingAction
import ch.epfl.bluebrain.nexus.delta.sdk._
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives
Expand Down Expand Up @@ -70,7 +71,7 @@ object ResolversModule extends ModuleDef {
aclCheck: AclCheck,
resolvers: Resolvers,
schemeDirectives: DeltaSchemeDirectives,
indexingAction: IndexingAction @Id("aggregate"),
indexingAction: AggregateIndexingAction,
shift: Resolver.Shift,
multiResolution: MultiResolution,
baseUri: BaseUri,
Expand All @@ -84,7 +85,7 @@ object ResolversModule extends ModuleDef {
resolvers,
multiResolution,
schemeDirectives,
indexingAction(_, _, _)(shift, cr)
indexingAction(_, _, _)(shift)
)(
baseUri,
cr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.routes.ResourcesRoutes
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction.AggregateIndexingAction
import ch.epfl.bluebrain.nexus.delta.sdk._
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives
Expand Down Expand Up @@ -81,7 +82,7 @@ object ResourcesModule extends ModuleDef {
aclCheck: AclCheck,
resources: Resources,
schemeDirectives: DeltaSchemeDirectives,
indexingAction: IndexingAction @Id("aggregate"),
indexingAction: AggregateIndexingAction,
shift: Resource.Shift,
baseUri: BaseUri,
s: Scheduler,
Expand All @@ -95,7 +96,7 @@ object ResourcesModule extends ModuleDef {
aclCheck,
resources,
schemeDirectives,
indexingAction(_, _, _)(shift, cr)
indexingAction(_, _, _)(shift)
)(
baseUri,
s,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.routes.SchemasRoutes
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction.AggregateIndexingAction
import ch.epfl.bluebrain.nexus.delta.sdk._
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives
Expand Down Expand Up @@ -71,14 +72,14 @@ object SchemasModule extends ModuleDef {
aclCheck: AclCheck,
schemas: Schemas,
schemeDirectives: DeltaSchemeDirectives,
indexingAction: IndexingAction @Id("aggregate"),
indexingAction: AggregateIndexingAction,
shift: Schema.Shift,
baseUri: BaseUri,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering,
fusionConfig: FusionConfig
) =>
new SchemasRoutes(identities, aclCheck, schemas, schemeDirectives, indexingAction(_, _, _)(shift, cr))(
new SchemasRoutes(identities, aclCheck, schemas, schemeDirectives, indexingAction(_, _, _)(shift))(
baseUri,
cr,
ordering,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef.{ActiveViewDef, DeprecatedViewDef}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.{BlazegraphSink, IndexingViewDef}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig
Expand Down Expand Up @@ -49,9 +48,7 @@ final class BlazegraphIndexingAction(
case _: DeprecatedViewDef => UIO.none
}

def projections(project: ProjectRef, elem: Elem[GraphResource])(implicit
cr: RemoteContextResolution
): ElemStream[CompiledProjection] =
def projections(project: ProjectRef, elem: Elem[GraphResource]): ElemStream[CompiledProjection] =
fetchCurrentViews(project).evalMap { _.evalMapFilter(compile(_, elem)) }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.{BlazegraphS
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction.AggregateIndexingAction
import ch.epfl.bluebrain.nexus.delta.sdk._
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.deletion.ProjectDeletionTask
Expand Down Expand Up @@ -180,7 +181,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
views: BlazegraphViews,
viewsQuery: BlazegraphViewsQuery,
schemeDirectives: DeltaSchemeDirectives,
indexingAction: IndexingAction @Id("aggregate"),
indexingAction: AggregateIndexingAction,
shift: BlazegraphView.Shift,
baseUri: BaseUri,
cfg: BlazegraphViewsConfig,
Expand All @@ -195,7 +196,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
identities,
aclCheck,
schemeDirectives,
indexingAction(_, _, _)(shift, cr)
indexingAction(_, _, _)(shift)
)(
baseUri,
s,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import akka.http.scaladsl.model.StatusCodes.Created
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{Directive0, Route}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphView._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.permissions.{read => Read, write => Write}
Expand All @@ -13,7 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteCon
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder
import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction
import ch.epfl.bluebrain.nexus.delta.sdk.{IndexingAction, IndexingMode}
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaDirectives, DeltaSchemeDirectives}
Expand All @@ -25,7 +26,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.model.routes.Tag
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SearchResults._
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.{PaginationConfig, SearchResults}
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegment}
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegment, ResourceF}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import io.circe.Json
import monix.execution.Scheduler
Expand Down Expand Up @@ -66,6 +67,9 @@ class BlazegraphViewsRoutes(

import schemeDirectives._

private def indexUIO(project: ProjectRef, resource: ResourceF[BlazegraphView], mode: IndexingMode) =
index(project, resource, mode).toUIO

def routes: Route =
concat(
pathPrefix("views") {
Expand All @@ -79,7 +83,7 @@ class BlazegraphViewsRoutes(
Created,
views
.create(ref, source)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.mapValue(_.metadata)
.rejectWhen(decodingFailedOrViewNotFound)
)
Expand All @@ -98,7 +102,7 @@ class BlazegraphViewsRoutes(
Created,
views
.create(id, ref, source)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.mapValue(_.metadata)
.rejectWhen(decodingFailedOrViewNotFound)
)
Expand All @@ -107,7 +111,7 @@ class BlazegraphViewsRoutes(
emit(
views
.update(id, ref, rev, source)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.mapValue(_.metadata)
.rejectWhen(decodingFailedOrViewNotFound)
)
Expand All @@ -120,7 +124,7 @@ class BlazegraphViewsRoutes(
emit(
views
.deprecate(id, ref, rev)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.mapValue(_.metadata)
.rejectOn[ViewNotFound]
)
Expand Down Expand Up @@ -163,7 +167,7 @@ class BlazegraphViewsRoutes(
Created,
views
.tag(id, ref, tag, tagRev, rev)
.tapEval(index(ref, _, mode))
.tapEval(indexUIO(ref, _, mode))
.mapValue(_.metadata)
.rejectOn[ViewNotFound]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.CouldNotFindPipeErr
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{NoopSink, PipeChain, PipeRef}
import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, PatienceConfig}
import ch.epfl.bluebrain.nexus.testkit.bio.PatienceConfig
import ch.epfl.bluebrain.nexus.testkit.ce.CatsEffectSuite
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import fs2.Stream

import java.time.Instant
import scala.concurrent.duration._

class BlazegraphIndexingActionSuite extends BioSuite with Fixtures {
class BlazegraphIndexingActionSuite extends CatsEffectSuite with Fixtures {

implicit private val patienceConfig: PatienceConfig = PatienceConfig(5.seconds, 10.millis)

Expand Down Expand Up @@ -162,18 +164,19 @@ class BlazegraphIndexingActionSuite extends BioSuite with Fixtures {

indexingAction
.projections(project, elem)
.translate(taskToIoK)
.fold(emptyAcc) {
case (acc, s: SuccessElem[_]) => acc.success(s.id)
case (acc, d: DroppedElem) => acc.drop(d.id)
case (acc, f: FailedElem) => acc.failed(f.id)
}
.compile
.lastOrError
.assert(expected)
.assertEquals(expected)
}

test("A valid elem should be indexed") {
indexingAction.apply(project, elem).assert(List.empty)
indexingAction.apply(project, elem).assertEquals(List.empty)
}

test("A failed elem should be returned") {
Expand All @@ -187,7 +190,7 @@ class BlazegraphIndexingActionSuite extends BioSuite with Fixtures {
rev = 1
)

indexingAction.apply(project, failed).assert(List(failed))
indexingAction.apply(project, failed).assertEquals(List(failed))
}

}
Expand Down
Loading