From c7a8eb29cbfdb9654199d94679f245e3e4aabff8 Mon Sep 17 00:00:00 2001 From: Simon Dumas Date: Wed, 11 Oct 2023 15:42:15 +0200 Subject: [PATCH] Migrate indexing actions to Cats Effect --- .../nexus/delta/routes/ResolversRoutes.scala | 1 - .../nexus/delta/routes/ResourcesRoutes.scala | 21 ++-- .../nexus/delta/routes/SchemasRoutes.scala | 1 - .../nexus/delta/wiring/DeltaModule.scala | 5 +- .../nexus/delta/wiring/ResolversModule.scala | 5 +- .../nexus/delta/wiring/ResourcesModule.scala | 5 +- .../nexus/delta/wiring/SchemasModule.scala | 5 +- .../blazegraph/BlazegraphIndexingAction.scala | 5 +- .../blazegraph/BlazegraphPluginModule.scala | 5 +- .../routes/BlazegraphViewsRoutes.scala | 18 ++-- .../BlazegraphIndexingActionSuite.scala | 13 ++- .../ElasticSearchIndexingAction.scala | 13 +-- .../ElasticSearchPluginModule.scala | 10 +- .../routes/ElasticSearchViewsRoutes.scala | 15 ++- .../ElasticSearchIndexingActionSuite.scala | 13 ++- .../plugins/storage/StoragePluginModule.scala | 9 +- .../storage/files/routes/FilesRoutes.scala | 26 +++-- .../storages/routes/StoragesRoutes.scala | 19 ++-- .../nexus/delta/sdk/IndexingAction.scala | 100 ++++++++---------- .../delta/sourcing/config/BatchConfig.scala | 2 +- 20 files changed, 153 insertions(+), 138 deletions(-) diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/ResolversRoutes.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/ResolversRoutes.scala index 95826f8a29..7031e7f0a5 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/ResolversRoutes.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/ResolversRoutes.scala @@ -6,7 +6,6 @@ import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server._ import cats.effect.IO import cats.implicits._ -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.{contexts, schemas} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/ResourcesRoutes.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/ResourcesRoutes.scala index f4c17f9785..db803992b7 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/ResourcesRoutes.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/ResourcesRoutes.scala @@ -10,6 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering import ch.epfl.bluebrain.nexus.delta.routes.ResourcesRoutes.asSourceWithMetadata import ch.epfl.bluebrain.nexus.delta.sdk._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives._ @@ -25,6 +26,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.resources.NexusSource.DecodingOption import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceRejection.{InvalidJsonLdFormat, InvalidSchemaRejection, ResourceNotFound} import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.{Resource, ResourceRejection} import ch.epfl.bluebrain.nexus.delta.sdk.resources.{NexusSource, Resources} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import io.circe.{Json, Printer} import monix.bio.IO import monix.execution.Scheduler @@ -67,6 +69,9 @@ final class ResourcesRoutes( implicit private def resourceFAJsonLdEncoder[A: JsonLdEncoder]: JsonLdEncoder[ResourceF[A]] = ResourceF.resourceFAJsonLdEncoder(ContextValue.empty) + private def indexUIO(project: ProjectRef, resource: ResourceF[Resource], mode: IndexingMode) = + index(project, resource, mode).toUIO + def routes: Route = baseUriPrefix(baseUri.prefix) { pathPrefix("resources") { @@ -79,7 +84,7 @@ final class ResourcesRoutes( authorizeFor(ref, Write).apply { emit( Created, - resources.create(ref, resourceSchema, source.value).tapEval(index(ref, _, mode)).map(_.void) + resources.create(ref, resourceSchema, source.value).tapEval(indexUIO(ref, _, mode)).map(_.void) ) } }, @@ -94,7 +99,7 @@ final class ResourcesRoutes( Created, resources .create(ref, schema, source.value) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .map(_.void) .rejectWhen(wrongJsonOrNotFound) ) @@ -115,7 +120,7 @@ final class ResourcesRoutes( Created, resources .create(id, ref, schema, source.value) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .map(_.void) .rejectWhen(wrongJsonOrNotFound) ) @@ -124,7 +129,7 @@ final class ResourcesRoutes( emit( resources .update(id, ref, schemaOpt, rev, source.value) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .map(_.void) .rejectWhen(wrongJsonOrNotFound) ) @@ -137,7 +142,7 @@ final class ResourcesRoutes( emit( resources .deprecate(id, ref, schemaOpt, rev) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .map(_.void) .rejectWhen(wrongJsonOrNotFound) ) @@ -166,7 +171,7 @@ final class ResourcesRoutes( OK, resources .refresh(id, ref, schemaOpt) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .map(_.void) .rejectWhen(wrongJsonOrNotFound) ) @@ -214,7 +219,7 @@ final class ResourcesRoutes( Created, resources .tag(id, ref, schemaOpt, tag, tagRev, rev) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .map(_.void) .rejectWhen(wrongJsonOrNotFound) ) @@ -229,7 +234,7 @@ final class ResourcesRoutes( emit( resources .deleteTag(id, ref, schemaOpt, tag, rev) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .map(_.void) .rejectOn[ResourceNotFound] ) diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/SchemasRoutes.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/SchemasRoutes.scala index f93d41a7ee..1e908f1f0f 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/SchemasRoutes.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/SchemasRoutes.scala @@ -6,7 +6,6 @@ import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server._ import cats.effect.IO import cats.implicits._ -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.schemas.shacl import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution} diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/DeltaModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/DeltaModule.scala index e5d12a4401..8bfba3c365 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/DeltaModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/DeltaModule.scala @@ -73,8 +73,9 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class many[MetadataContextValue].addEffect(MetadataContextValue.fromFile("contexts/metadata.json")) - make[IndexingAction].named("aggregate").from { (internal: Set[IndexingAction]) => - AggregateIndexingAction(NonEmptyList.fromListUnsafe(internal.toList)) + make[AggregateIndexingAction].from { + (internal: Set[IndexingAction], contextShift: ContextShift[IO], cr: RemoteContextResolution @Id("aggregate")) => + AggregateIndexingAction(NonEmptyList.fromListUnsafe(internal.toList))(contextShift, cr) } make[RemoteContextResolution].named("aggregate").fromEffect { (otherCtxResolutions: Set[RemoteContextResolution]) => diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/ResolversModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/ResolversModule.scala index a9f0cf6a18..c34e912a59 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/ResolversModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/ResolversModule.scala @@ -9,6 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution} import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering import ch.epfl.bluebrain.nexus.delta.routes.ResolversRoutes +import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction.AggregateIndexingAction import ch.epfl.bluebrain.nexus.delta.sdk._ import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives @@ -70,7 +71,7 @@ object ResolversModule extends ModuleDef { aclCheck: AclCheck, resolvers: Resolvers, schemeDirectives: DeltaSchemeDirectives, - indexingAction: IndexingAction @Id("aggregate"), + indexingAction: AggregateIndexingAction, shift: Resolver.Shift, multiResolution: MultiResolution, baseUri: BaseUri, @@ -84,7 +85,7 @@ object ResolversModule extends ModuleDef { resolvers, multiResolution, schemeDirectives, - indexingAction(_, _, _)(shift, cr) + indexingAction(_, _, _)(shift) )( baseUri, cr, diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/ResourcesModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/ResourcesModule.scala index d4ec23b9b8..4f158debde 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/ResourcesModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/ResourcesModule.scala @@ -8,6 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering import ch.epfl.bluebrain.nexus.delta.routes.ResourcesRoutes +import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction.AggregateIndexingAction import ch.epfl.bluebrain.nexus.delta.sdk._ import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives @@ -81,7 +82,7 @@ object ResourcesModule extends ModuleDef { aclCheck: AclCheck, resources: Resources, schemeDirectives: DeltaSchemeDirectives, - indexingAction: IndexingAction @Id("aggregate"), + indexingAction: AggregateIndexingAction, shift: Resource.Shift, baseUri: BaseUri, s: Scheduler, @@ -95,7 +96,7 @@ object ResourcesModule extends ModuleDef { aclCheck, resources, schemeDirectives, - indexingAction(_, _, _)(shift, cr) + indexingAction(_, _, _)(shift) )( baseUri, s, diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/SchemasModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/SchemasModule.scala index 550cc08b2a..40f4ac41b0 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/SchemasModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/SchemasModule.scala @@ -9,6 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution} import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering import ch.epfl.bluebrain.nexus.delta.routes.SchemasRoutes +import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction.AggregateIndexingAction import ch.epfl.bluebrain.nexus.delta.sdk._ import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives @@ -71,14 +72,14 @@ object SchemasModule extends ModuleDef { aclCheck: AclCheck, schemas: Schemas, schemeDirectives: DeltaSchemeDirectives, - indexingAction: IndexingAction @Id("aggregate"), + indexingAction: AggregateIndexingAction, shift: Schema.Shift, baseUri: BaseUri, cr: RemoteContextResolution @Id("aggregate"), ordering: JsonKeyOrdering, fusionConfig: FusionConfig ) => - new SchemasRoutes(identities, aclCheck, schemas, schemeDirectives, indexingAction(_, _, _)(shift, cr))( + new SchemasRoutes(identities, aclCheck, schemas, schemeDirectives, indexingAction(_, _, _)(shift))( baseUri, cr, ordering, diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphIndexingAction.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphIndexingAction.scala index 02606a1585..c5a864023c 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphIndexingAction.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphIndexingAction.scala @@ -3,7 +3,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef.{ActiveViewDef, DeprecatedViewDef} import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.{BlazegraphSink, IndexingViewDef} -import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig @@ -49,9 +48,7 @@ final class BlazegraphIndexingAction( case _: DeprecatedViewDef => UIO.none } - def projections(project: ProjectRef, elem: Elem[GraphResource])(implicit - cr: RemoteContextResolution - ): ElemStream[CompiledProjection] = + def projections(project: ProjectRef, elem: Elem[GraphResource]): ElemStream[CompiledProjection] = fetchCurrentViews(project).evalMap { _.evalMapFilter(compile(_, elem)) } } diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphPluginModule.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphPluginModule.scala index 15b85899d7..f37b107cf3 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphPluginModule.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphPluginModule.scala @@ -13,6 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.{BlazegraphS import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution} import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering +import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction.AggregateIndexingAction import ch.epfl.bluebrain.nexus.delta.sdk._ import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck import ch.epfl.bluebrain.nexus.delta.sdk.deletion.ProjectDeletionTask @@ -180,7 +181,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef { views: BlazegraphViews, viewsQuery: BlazegraphViewsQuery, schemeDirectives: DeltaSchemeDirectives, - indexingAction: IndexingAction @Id("aggregate"), + indexingAction: AggregateIndexingAction, shift: BlazegraphView.Shift, baseUri: BaseUri, cfg: BlazegraphViewsConfig, @@ -195,7 +196,7 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef { identities, aclCheck, schemeDirectives, - indexingAction(_, _, _)(shift, cr) + indexingAction(_, _, _)(shift) )( baseUri, s, diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsRoutes.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsRoutes.scala index dea871edd3..fce82669c3 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsRoutes.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsRoutes.scala @@ -4,6 +4,7 @@ import akka.http.scaladsl.model.StatusCodes.Created import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.{Directive0, Route} import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphView._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewRejection._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.permissions.{read => Read, write => Write} @@ -13,7 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteCon import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering -import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction +import ch.epfl.bluebrain.nexus.delta.sdk.{IndexingAction, IndexingMode} import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaDirectives, DeltaSchemeDirectives} @@ -25,7 +26,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling import ch.epfl.bluebrain.nexus.delta.sdk.model.routes.Tag import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SearchResults._ import ch.epfl.bluebrain.nexus.delta.sdk.model.search.{PaginationConfig, SearchResults} -import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegment} +import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegment, ResourceF} import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import io.circe.Json import monix.execution.Scheduler @@ -66,6 +67,9 @@ class BlazegraphViewsRoutes( import schemeDirectives._ + private def indexUIO(project: ProjectRef, resource: ResourceF[BlazegraphView], mode: IndexingMode) = + index(project, resource, mode).toUIO + def routes: Route = concat( pathPrefix("views") { @@ -79,7 +83,7 @@ class BlazegraphViewsRoutes( Created, views .create(ref, source) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .mapValue(_.metadata) .rejectWhen(decodingFailedOrViewNotFound) ) @@ -98,7 +102,7 @@ class BlazegraphViewsRoutes( Created, views .create(id, ref, source) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .mapValue(_.metadata) .rejectWhen(decodingFailedOrViewNotFound) ) @@ -107,7 +111,7 @@ class BlazegraphViewsRoutes( emit( views .update(id, ref, rev, source) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .mapValue(_.metadata) .rejectWhen(decodingFailedOrViewNotFound) ) @@ -120,7 +124,7 @@ class BlazegraphViewsRoutes( emit( views .deprecate(id, ref, rev) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .mapValue(_.metadata) .rejectOn[ViewNotFound] ) @@ -163,7 +167,7 @@ class BlazegraphViewsRoutes( Created, views .tag(id, ref, tag, tagRev, rev) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .mapValue(_.metadata) .rejectOn[ViewNotFound] ) diff --git a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphIndexingActionSuite.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphIndexingActionSuite.scala index fcb89af841..f5c89610bf 100644 --- a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphIndexingActionSuite.scala +++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphIndexingActionSuite.scala @@ -20,13 +20,15 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem} import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.CouldNotFindPipeErr import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{NoopSink, PipeChain, PipeRef} -import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, PatienceConfig} +import ch.epfl.bluebrain.nexus.testkit.bio.PatienceConfig +import ch.epfl.bluebrain.nexus.testkit.ce.CatsEffectSuite +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import fs2.Stream import java.time.Instant import scala.concurrent.duration._ -class BlazegraphIndexingActionSuite extends BioSuite with Fixtures { +class BlazegraphIndexingActionSuite extends CatsEffectSuite with Fixtures { implicit private val patienceConfig: PatienceConfig = PatienceConfig(5.seconds, 10.millis) @@ -162,6 +164,7 @@ class BlazegraphIndexingActionSuite extends BioSuite with Fixtures { indexingAction .projections(project, elem) + .translate(taskToIoK) .fold(emptyAcc) { case (acc, s: SuccessElem[_]) => acc.success(s.id) case (acc, d: DroppedElem) => acc.drop(d.id) @@ -169,11 +172,11 @@ class BlazegraphIndexingActionSuite extends BioSuite with Fixtures { } .compile .lastOrError - .assert(expected) + .assertEquals(expected) } test("A valid elem should be indexed") { - indexingAction.apply(project, elem).assert(List.empty) + indexingAction.apply(project, elem).assertEquals(List.empty) } test("A failed elem should be returned") { @@ -187,7 +190,7 @@ class BlazegraphIndexingActionSuite extends BioSuite with Fixtures { rev = 1 ) - indexingAction.apply(project, failed).assert(List(failed)) + indexingAction.apply(project, failed).assertEquals(List(failed)) } } diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchIndexingAction.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchIndexingAction.scala index 2d041aa4c3..d9b9649cd4 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchIndexingAction.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchIndexingAction.scala @@ -32,11 +32,10 @@ final class ElasticSearchIndexingAction( compilePipeChain: PipeChain => Either[ProjectionErr, Operation], sink: ActiveViewDef => Sink, override val timeout: FiniteDuration -) extends IndexingAction { +)(implicit cr: RemoteContextResolution) + extends IndexingAction { - private def compile(view: IndexingViewDef, elem: Elem[GraphResource])(implicit - cr: RemoteContextResolution - ): Task[Option[CompiledProjection]] = view match { + private def compile(view: IndexingViewDef, elem: Elem[GraphResource]): Task[Option[CompiledProjection]] = view match { // Synchronous indexing only applies to views that index the latest version case active: ActiveViewDef if active.selectFilter.tag == Tag.latest => IndexingViewDef @@ -51,9 +50,7 @@ final class ElasticSearchIndexingAction( case _: DeprecatedViewDef => UIO.none } - def projections(project: ProjectRef, elem: Elem[GraphResource])(implicit - cr: RemoteContextResolution - ): ElemStream[CompiledProjection] = + def projections(project: ProjectRef, elem: Elem[GraphResource]): ElemStream[CompiledProjection] = fetchCurrentViews(project).evalMap { _.evalMapFilter(compile(_, elem)) } } object ElasticSearchIndexingAction { @@ -64,7 +61,7 @@ object ElasticSearchIndexingAction { client: ElasticSearchClient, timeout: FiniteDuration, syncIndexingRefresh: Refresh - ): ElasticSearchIndexingAction = { + )(implicit cr: RemoteContextResolution): ElasticSearchIndexingAction = { val batchConfig = BatchConfig.individual new ElasticSearchIndexingAction( views.currentIndexingViews, diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala index a7de7a4604..f76e602bc4 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala @@ -16,6 +16,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.ContextValue.ContextObject import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution} import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering +import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction.AggregateIndexingAction import ch.epfl.bluebrain.nexus.delta.sdk._ import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck import ch.epfl.bluebrain.nexus.delta.sdk.deletion.ProjectDeletionTask @@ -171,7 +172,7 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef { aclCheck: AclCheck, views: ElasticSearchViews, schemeDirectives: DeltaSchemeDirectives, - indexingAction: IndexingAction @Id("aggregate"), + indexingAction: AggregateIndexingAction, viewsQuery: ElasticSearchViewsQuery, shift: ElasticSearchView.Shift, baseUri: BaseUri, @@ -186,7 +187,7 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef { views, viewsQuery, schemeDirectives, - indexingAction(_, _, _)(shift, cr) + indexingAction(_, _, _)(shift) )( baseUri, s, @@ -371,9 +372,10 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef { views: ElasticSearchViews, registry: ReferenceRegistry, client: ElasticSearchClient, - config: ElasticSearchViewsConfig + config: ElasticSearchViewsConfig, + cr: RemoteContextResolution @Id("aggregate") ) => - ElasticSearchIndexingAction(views, registry, client, config.syncIndexingTimeout, config.syncIndexingRefresh) + ElasticSearchIndexingAction(views, registry, client, config.syncIndexingTimeout, config.syncIndexingRefresh)(cr) } make[ElasticSearchView.Shift].fromEffect { (views: ElasticSearchViews, base: BaseUri) => diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchViewsRoutes.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchViewsRoutes.scala index ab71f1fc9d..65cae6df62 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchViewsRoutes.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchViewsRoutes.scala @@ -3,6 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.routes import akka.http.scaladsl.model.StatusCodes.Created import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewRejection._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.permissions.{read => Read, write => Write} @@ -20,6 +21,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.implicits._ import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling import ch.epfl.bluebrain.nexus.delta.sdk.model._ import ch.epfl.bluebrain.nexus.delta.sdk.model.routes.Tag +import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import io.circe.{Json, JsonObject} import monix.execution.Scheduler @@ -59,6 +61,9 @@ final class ElasticSearchViewsRoutes( import schemeDirectives._ + private def indexUIO(project: ProjectRef, resource: ResourceF[ElasticSearchView], mode: IndexingMode) = + index(project, resource, mode).toUIO + def routes: Route = pathPrefix("views") { extractCaller { implicit caller => @@ -72,7 +77,7 @@ final class ElasticSearchViewsRoutes( Created, views .create(ref, source) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .mapValue(_.metadata) .rejectWhen(decodingFailedOrViewNotFound) ) @@ -93,7 +98,7 @@ final class ElasticSearchViewsRoutes( Created, views .create(id, ref, source) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .mapValue(_.metadata) .rejectWhen(decodingFailedOrViewNotFound) ) @@ -102,7 +107,7 @@ final class ElasticSearchViewsRoutes( emit( views .update(id, ref, rev, source) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .mapValue(_.metadata) .rejectWhen(decodingFailedOrViewNotFound) ) @@ -115,7 +120,7 @@ final class ElasticSearchViewsRoutes( emit( views .deprecate(id, ref, rev) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .mapValue(_.metadata) .rejectWhen(decodingFailedOrViewNotFound) ) @@ -159,7 +164,7 @@ final class ElasticSearchViewsRoutes( Created, views .tag(id, ref, tag, tagRev, rev) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .mapValue(_.metadata) .rejectWhen(decodingFailedOrViewNotFound) ) diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchIndexingActionSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchIndexingActionSuite.scala index ca58fabf18..a28120c191 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchIndexingActionSuite.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchIndexingActionSuite.scala @@ -14,6 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestState import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestState.PullRequestActive import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter @@ -21,14 +22,15 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedEl import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.CouldNotFindPipeErr import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{NoopSink, PipeChain, PipeRef} import ch.epfl.bluebrain.nexus.testkit.CirceLiteral -import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, PatienceConfig} +import ch.epfl.bluebrain.nexus.testkit.bio.PatienceConfig +import ch.epfl.bluebrain.nexus.testkit.ce.CatsEffectSuite import fs2.Stream import io.circe.Json import java.time.Instant import scala.concurrent.duration._ -class ElasticSearchIndexingActionSuite extends BioSuite with CirceLiteral with Fixtures { +class ElasticSearchIndexingActionSuite extends CatsEffectSuite with CirceLiteral with Fixtures { implicit private val patienceConfig: PatienceConfig = PatienceConfig(5.seconds, 10.millis) @@ -174,6 +176,7 @@ class ElasticSearchIndexingActionSuite extends BioSuite with CirceLiteral with F indexingAction .projections(project, elem) + .translate(taskToIoK) .fold(emptyAcc) { case (acc, s: SuccessElem[_]) => acc.success(s.id) case (acc, d: DroppedElem) => acc.drop(d.id) @@ -181,11 +184,11 @@ class ElasticSearchIndexingActionSuite extends BioSuite with CirceLiteral with F } .compile .lastOrError - .assert(expected) + .assertEquals(expected) } test("A valid elem should be indexed") { - indexingAction.apply(project, elem).assert(List.empty) + indexingAction.apply(project, elem).assertEquals(List.empty) } test("A failed elem should be returned") { @@ -199,7 +202,7 @@ class ElasticSearchIndexingActionSuite extends BioSuite with CirceLiteral with F rev = 1 ) - indexingAction.apply(project, failed).assert(List(failed)) + indexingAction.apply(project, failed).assertEquals(List(failed)) } } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/StoragePluginModule.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/StoragePluginModule.scala index 7e98d6daa5..24374f0941 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/StoragePluginModule.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/StoragePluginModule.scala @@ -22,6 +22,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{StorageDeletionTa import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution} import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering +import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction.AggregateIndexingAction import ch.epfl.bluebrain.nexus.delta.sdk._ import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck import ch.epfl.bluebrain.nexus.delta.sdk.auth.{AuthTokenProvider, Credentials} @@ -118,7 +119,7 @@ class StoragePluginModule(priority: Int) extends ModuleDef { storages: Storages, storagesStatistics: StoragesStatistics, schemeDirectives: DeltaSchemeDirectives, - indexingAction: IndexingAction @Id("aggregate"), + indexingAction: AggregateIndexingAction, shift: Storage.Shift, baseUri: BaseUri, s: Scheduler, @@ -133,7 +134,7 @@ class StoragePluginModule(priority: Int) extends ModuleDef { storages, storagesStatistics, schemeDirectives, - indexingAction(_, _, _)(shift, cr) + indexingAction(_, _, _)(shift) )( baseUri, s, @@ -197,7 +198,7 @@ class StoragePluginModule(priority: Int) extends ModuleDef { aclCheck: AclCheck, files: Files, schemeDirectives: DeltaSchemeDirectives, - indexingAction: IndexingAction @Id("aggregate"), + indexingAction: AggregateIndexingAction, shift: File.Shift, baseUri: BaseUri, s: Scheduler, @@ -206,7 +207,7 @@ class StoragePluginModule(priority: Int) extends ModuleDef { fusionConfig: FusionConfig ) => val storageConfig = cfg.storages.storageTypeConfig - new FilesRoutes(identities, aclCheck, files, schemeDirectives, indexingAction(_, _, _)(shift, cr))( + new FilesRoutes(identities, aclCheck, files, schemeDirectives, indexingAction(_, _, _)(shift))( baseUri, storageConfig, s, diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutes.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutes.scala index d41f74e3cb..8f6ab8b69c 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutes.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/routes/FilesRoutes.scala @@ -6,6 +6,7 @@ import akka.http.scaladsl.model.headers.Accept import akka.http.scaladsl.model.{ContentType, MediaRange} import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{File, FileRejection} import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection._ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.permissions.{read => Read, write => Write} @@ -24,7 +25,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller import ch.epfl.bluebrain.nexus.delta.sdk.implicits._ import ch.epfl.bluebrain.nexus.delta.sdk.model.routes.Tag -import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegment, IdSegmentRef} +import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegment, IdSegmentRef, ResourceF} import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import io.circe.Decoder import io.circe.generic.extras.Configuration @@ -68,6 +69,9 @@ final class FilesRoutes( import baseUri.prefixSegment import schemeDirectives._ + private def indexUIO(project: ProjectRef, resource: ResourceF[File], mode: IndexingMode) = + index(project, resource, mode).toUIO + def routes: Route = (baseUriPrefix(baseUri.prefix) & replaceUri("files", schemas.files)) { pathPrefix("files") { @@ -83,12 +87,12 @@ final class FilesRoutes( entity(as[LinkFile]) { case LinkFile(filename, mediaType, path) => emit( Created, - files.createLink(storage, ref, filename, mediaType, path).tapEval(index(ref, _, mode)) + files.createLink(storage, ref, filename, mediaType, path).tapEval(indexUIO(ref, _, mode)) ) }, // Create a file without id segment extractRequestEntity { entity => - emit(Created, files.create(storage, ref, entity).tapEval(index(ref, _, mode))) + emit(Created, files.create(storage, ref, entity).tapEval(indexUIO(ref, _, mode))) } ) } @@ -108,12 +112,12 @@ final class FilesRoutes( Created, files .createLink(id, storage, ref, filename, mediaType, path) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) ) }, // Create a file with id segment extractRequestEntity { entity => - emit(Created, files.create(id, storage, ref, entity).tapEval(index(ref, _, mode))) + emit(Created, files.create(id, storage, ref, entity).tapEval(indexUIO(ref, _, mode))) } ) case (Some(rev), storage) => @@ -123,12 +127,12 @@ final class FilesRoutes( emit( files .updateLink(id, storage, ref, filename, mediaType, path, rev) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) ) }, // Update a file extractRequestEntity { entity => - emit(files.update(id, storage, ref, rev, entity).tapEval(index(ref, _, mode))) + emit(files.update(id, storage, ref, rev, entity).tapEval(indexUIO(ref, _, mode))) } ) } @@ -136,7 +140,7 @@ final class FilesRoutes( // Deprecate a file (delete & parameter("rev".as[Int])) { rev => authorizeFor(ref, Write).apply { - emit(files.deprecate(id, ref, rev).tapEval(index(ref, _, mode)).rejectOn[FileNotFound]) + emit(files.deprecate(id, ref, rev).tapEval(indexUIO(ref, _, mode)).rejectOn[FileNotFound]) } }, // Fetch a file @@ -161,7 +165,7 @@ final class FilesRoutes( (post & parameter("rev".as[Int]) & pathEndOrSingleSlash) { rev => authorizeFor(ref, Write).apply { entity(as[Tag]) { case Tag(tagRev, tag) => - emit(Created, files.tag(id, ref, tag, tagRev, rev).tapEval(index(ref, _, mode))) + emit(Created, files.tag(id, ref, tag, tagRev, rev).tapEval(indexUIO(ref, _, mode))) } } }, @@ -170,7 +174,9 @@ final class FilesRoutes( ref, Write )) { (tag, rev) => - emit(files.deleteTag(id, ref, tag, rev).tapEval(index(ref, _, mode)).rejectOn[FileNotFound]) + emit( + files.deleteTag(id, ref, tag, rev).tapEval(indexUIO(ref, _, mode)).rejectOn[FileNotFound] + ) } ) } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/routes/StoragesRoutes.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/routes/StoragesRoutes.scala index 15e275efd8..3f96759601 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/routes/StoragesRoutes.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/routes/StoragesRoutes.scala @@ -2,6 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.routes import akka.http.scaladsl.model.StatusCodes.Created import akka.http.scaladsl.server.Directives._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import akka.http.scaladsl.server._ import cats.implicits._ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages._ @@ -10,7 +11,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{Storage, St import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.permissions.{read => Read, write => Write} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering -import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction +import ch.epfl.bluebrain.nexus.delta.sdk.{IndexingAction, IndexingMode} import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives._ @@ -19,8 +20,9 @@ import ch.epfl.bluebrain.nexus.delta.sdk.fusion.FusionConfig import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities import ch.epfl.bluebrain.nexus.delta.sdk.implicits._ import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling -import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri +import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, ResourceF} import ch.epfl.bluebrain.nexus.delta.sdk.model.routes.Tag +import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import io.circe.Json import kamon.instrumentation.akka.http.TracingDirectives.operationName import monix.execution.Scheduler @@ -59,6 +61,9 @@ final class StoragesRoutes( import baseUri.prefixSegment import schemeDirectives._ + private def indexUIO(project: ProjectRef, resource: ResourceF[Storage], mode: IndexingMode) = + index(project, resource, mode).toUIO + def routes: Route = (baseUriPrefix(baseUri.prefix) & replaceUri("storages", schemas.storage)) { pathPrefix("storages") { @@ -71,7 +76,7 @@ final class StoragesRoutes( authorizeFor(ref, Write).apply { emit( Created, - storages.create(ref, source).tapEval(index(ref, _, mode)).mapValue(_.metadata) + storages.create(ref, source).tapEval(indexUIO(ref, _, mode)).mapValue(_.metadata) ) } } @@ -91,7 +96,7 @@ final class StoragesRoutes( Created, storages .create(id, ref, source) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .mapValue(_.metadata) ) case (Some(rev), source) => @@ -99,7 +104,7 @@ final class StoragesRoutes( emit( storages .update(id, ref, rev, source) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .mapValue(_.metadata) ) } @@ -111,7 +116,7 @@ final class StoragesRoutes( emit( storages .deprecate(id, ref, rev) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .mapValue(_.metadata) .rejectOn[StorageNotFound] ) @@ -162,7 +167,7 @@ final class StoragesRoutes( Created, storages .tag(id, ref, tag, tagRev, rev) - .tapEval(index(ref, _, mode)) + .tapEval(indexUIO(ref, _, mode)) .mapValue(_.metadata) ) } diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/IndexingAction.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/IndexingAction.scala index bf74cc9da8..2682ad24b2 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/IndexingAction.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/IndexingAction.scala @@ -2,7 +2,10 @@ package ch.epfl.bluebrain.nexus.delta.sdk import cats.data.NonEmptyList import cats.effect.concurrent.Ref +import cats.effect.{ContextShift, IO} import cats.implicits._ +import ch.epfl.bluebrain.nexus.delta.kernel.Logger +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction.logger import ch.epfl.bluebrain.nexus.delta.sdk.IndexingMode.{Async, Sync} @@ -13,9 +16,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem} import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CompiledProjection, Elem, Projection} -import com.typesafe.scalalogging.Logger -import monix.bio.{IO, Task, UIO} -import fs2.Stream +import monix.bio.UIO import scala.concurrent.duration._ @@ -36,62 +37,33 @@ trait IndexingAction { * @param elem * the element to index */ - def projections(project: ProjectRef, elem: Elem[GraphResource])(implicit - cr: RemoteContextResolution - ): ElemStream[CompiledProjection] - - /** - * Perform an indexing action based on the indexing parameter. - * - * @param project - * the project in which the resource is located - * @param res - * the resource to perform the indexing action for - * @param indexingMode - * the execution type - */ - def apply[A](project: ProjectRef, res: ResourceF[A], indexingMode: IndexingMode)(implicit - shift: ResourceShift[_, A, _], - cr: RemoteContextResolution - ): UIO[Unit] = { - indexingMode match { - case Async => UIO.unit - case Sync => - for { - _ <- UIO.delay(logger.debug("Synchronous indexing of resource '{}/{}' has been requested.", project, res.id)) - // We create the GraphResource wrapped in an `Elem` - elem <- shift.toGraphResourceElem(project, res) - errors <- apply(project, elem) - _ <- IO.raiseWhen(errors.nonEmpty)(IndexingFailed(res.void, errors.map(_.throwable))) - } yield () - } - }.hideErrors + def projections(project: ProjectRef, elem: Elem[GraphResource]): ElemStream[CompiledProjection] def apply(project: ProjectRef, elem: Elem[GraphResource])(implicit - cr: RemoteContextResolution - ): Task[List[FailedElem]] = { + contextShift: ContextShift[IO] + ): IO[List[FailedElem]] = { for { // To collect the errors - errorsRef <- Ref.of[Task, List[FailedElem]](List.empty) + errorsRef <- Ref.of[IO, List[FailedElem]](List.empty) // We build and start the projections where the resource will apply _ <- projections(project, elem) + .translate(taskToIoK) // TODO make this configurable .parEvalMap(5) { case s: SuccessElem[CompiledProjection] => - runProjection(s.value, failed => errorsRef.update(_ ++ failed).hideErrors) - case _: DroppedElem => UIO.unit - case f: FailedElem => UIO.delay(logger.error(s"Fetching '$f' returned an error.", f.throwable)).as(None) + runProjection(s.value, failed => errorsRef.update(_ ++ failed)) + case _: DroppedElem => IO.unit + case f: FailedElem => logger.error(f.throwable)(s"Fetching '$f' returned an error.").as(None) } .compile .toList - .hideErrors - errors <- errorsRef.get.hideErrors + errors <- errorsRef.get } yield errors } - private def runProjection(compiled: CompiledProjection, saveFailedElems: List[FailedElem] => UIO[Unit]) = + private def runProjection(compiled: CompiledProjection, saveFailedElems: List[FailedElem] => IO[Unit]) = for { - projection <- Projection(compiled, UIO.none, _ => UIO.unit, saveFailedElems) + projection <- Projection(compiled, UIO.none, _ => UIO.unit, saveFailedElems(_).toUIO) _ <- projection.waitForCompletion(timeout) // We stop the projection if it has not complete yet _ <- projection.stop() @@ -100,32 +72,44 @@ trait IndexingAction { object IndexingAction { - type Execute[A] = (ProjectRef, ResourceF[A], IndexingMode) => UIO[Unit] + type Execute[A] = (ProjectRef, ResourceF[A], IndexingMode) => IO[Unit] /** * Does not perform any action */ - def noop[A]: Execute[A] = (_, _, _) => UIO.unit - - private val logger: Logger = Logger[IndexingAction] + def noop[A]: Execute[A] = (_, _, _) => IO.unit - private val noProjection: ElemStream[CompiledProjection] = Stream.empty + private val logger = Logger.cats[IndexingAction] /** * An instance of [[IndexingAction]] which executes other [[IndexingAction]] s in parallel. */ - final class AggregateIndexingAction(private val internal: NonEmptyList[IndexingAction]) extends IndexingAction { - - // We pick the maximum timeout of all - override val timeout: FiniteDuration = internal.maximumBy(_.timeout).timeout - - override def projections(project: ProjectRef, elem: Elem[GraphResource])(implicit - cr: RemoteContextResolution - ): ElemStream[CompiledProjection] = - internal.foldLeft(noProjection) { case (acc, action) => acc.merge(action.projections(project, elem)) } + final class AggregateIndexingAction(private val internal: NonEmptyList[IndexingAction])(implicit + contextShift: ContextShift[IO], + cr: RemoteContextResolution + ) { + + def apply[A](project: ProjectRef, res: ResourceF[A], indexingMode: IndexingMode)(implicit + shift: ResourceShift[_, A, _] + ): IO[Unit] = + indexingMode match { + case Async => IO.unit + case Sync => + for { + _ <- logger.debug(s"Synchronous indexing of resource '$project/${res.id}' has been requested.") + // We create the GraphResource wrapped in an `Elem` + elem <- toCatsIO(shift.toGraphResourceElem(project, res)) + errorsPerAction <- internal.traverse(_.apply(project, elem)) + errors = errorsPerAction.toList.flatMap(_.map(_.throwable)) + _ <- IO.raiseWhen(errors.nonEmpty)(IndexingFailed(res.void, errors)) + } yield () + } } object AggregateIndexingAction { - def apply(internal: NonEmptyList[IndexingAction]): AggregateIndexingAction = new AggregateIndexingAction(internal) + def apply( + internal: NonEmptyList[IndexingAction] + )(implicit contextShift: ContextShift[IO], cr: RemoteContextResolution): AggregateIndexingAction = + new AggregateIndexingAction(internal) } } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/BatchConfig.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/BatchConfig.scala index 33a7f73422..63eb1569d1 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/BatchConfig.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/BatchConfig.scala @@ -17,7 +17,7 @@ final case class BatchConfig(maxElements: Int, maxInterval: FiniteDuration) object BatchConfig { - val individual = BatchConfig(1, 5.millis) + val individual = BatchConfig(1, 200.millis) implicit final val batchConfigReader: ConfigReader[BatchConfig] = deriveReader[BatchConfig]