diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala index f6852d12e2..263d603111 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala @@ -28,7 +28,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.ServiceAccount import ch.epfl.bluebrain.nexus.delta.sdk.model._ import ch.epfl.bluebrain.nexus.delta.sdk.model.metrics.ScopedEventMetricEncoder import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions -import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext +import ch.epfl.bluebrain.nexus.delta.sdk.projects.{FetchContext, Projects} import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext.ContextRejection import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution @@ -118,12 +118,13 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef { make[ElasticSearchDefaultViewsResetter].from { ( client: ElasticSearchClient, + projects: Projects, views: ElasticSearchViews, scope: ElasticSearchScopeInitialization, xas: Transactors, serviceAccount: ServiceAccount ) => - ElasticSearchDefaultViewsResetter(client, views, scope.defaultValue, xas)(serviceAccount.subject) + ElasticSearchDefaultViewsResetter(client, projects, views, scope.defaultValue, xas)(serviceAccount.subject) } make[ElasticSearchCoordinator].fromEffect { diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchDefaultViewsResetter.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchDefaultViewsResetter.scala index fc78676d5f..7d7e970f91 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchDefaultViewsResetter.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchDefaultViewsResetter.scala @@ -6,26 +6,49 @@ import cats.implicits._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViews import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel} +import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.ElasticSearchDefaultViewsResetter.ViewElement import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.IndexingViewDef.{ActiveViewDef, DeprecatedViewDef} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{defaultViewId, ElasticSearchViewValue} import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri -import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax +import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, SuccessElemStream} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import doobie.implicits._ +import fs2.Stream -/** A way to reset default Elasticsearch views */ +/** + * A way to reset default Elasticsearch views + */ trait ElasticSearchDefaultViewsResetter { def resetDefaultViews: IO[Unit] - private[elasticsearch] def resetView(view: IndexingViewDef): IO[Unit] + private[indexing] def resetView(view: ViewElement): IO[Unit] } object ElasticSearchDefaultViewsResetter { private val logger = Logger[ElasticSearchDefaultViewsResetter] + sealed private[indexing] trait ViewElement { + def project: ProjectRef + + def value: Option[IndexingViewDef] + } + + private[indexing] object ViewElement { + + final case class MissingView(project: ProjectRef) extends ViewElement { + override def value: Option[IndexingViewDef] = None + } + final case class ExistingView(indexing: IndexingViewDef) extends ViewElement { + override def project: ProjectRef = indexing.ref.project + + override def value: Option[IndexingViewDef] = Some(indexing) + } + + } + def resetTrigger: IO[Boolean] = Env[IO].get("RESET_DEFAULT_ES_VIEWS").map(_.getOrElse("false").toBoolean) @@ -39,12 +62,17 @@ object ElasticSearchDefaultViewsResetter { */ def apply( client: ElasticSearchClient, + projects: Projects, views: ElasticSearchViews, newViewValue: ElasticSearchViewValue, xas: Transactors )(implicit subject: Subject): ElasticSearchDefaultViewsResetter = apply( - views.currentIndexingViews, + projects.currentRefs.evalMap { project => + views + .fetchIndexingView(defaultViewId, project) + .redeem(_ => ViewElement.MissingView(project), ViewElement.ExistingView(_)) + }, client.deleteIndex, views.internalCreate(_, _, _)(subject).void, newViewValue, @@ -53,7 +81,7 @@ object ElasticSearchDefaultViewsResetter { ) def apply( - views: SuccessElemStream[IndexingViewDef], + views: Stream[IO, ViewElement], deleteIndex: IndexLabel => IO[Boolean], createView: (Iri, ProjectRef, ElasticSearchViewValue) => IO[Unit], newViewValue: ElasticSearchViewValue, @@ -64,33 +92,28 @@ object ElasticSearchDefaultViewsResetter { override def resetDefaultViews: IO[Unit] = resetTrigger.flatMap { triggered => IO.whenA(triggered) { - views - .filter(_.id == defaultEsViewId) - .compile - .toList - .flatMap { _.traverse { view => view.evalMap(resetView) } } + views.compile.toList + .flatMap { _.traverse { resetView } } .flatMap { _ => logger.info("Completed resetting default elasticsearch views.") } } } - override def resetView(view: IndexingViewDef): IO[Unit] = + override def resetView(view: ViewElement): IO[Unit] = deleteEsIndex(view) >> - deleteEventsStatesOffsets(view.ref.project).transact(xas.write) >> - createDefaultView(view.ref.project) - - private val defaultEsViewId = iri"https://bluebrain.github.io/nexus/vocabulary/defaultElasticSearchIndex" + deleteEventsStatesOffsets(view.project).transact(xas.write) >> + createDefaultView(view.project) - private def deleteEsIndex(viewDef: IndexingViewDef) = - viewDef match { + private def deleteEsIndex(view: ViewElement) = + view.value.traverse { case activeView: ActiveViewDef => deleteIndex(activeView.index) case _: DeprecatedViewDef => IO.pure(true) - } + }.void private def deleteEventsStatesOffsets(project: ProjectRef): doobie.ConnectionIO[Unit] = sql""" - DELETE FROM scoped_events WHERE type = 'elasticsearch' AND id = ${defaultEsViewId.toString} AND org = ${project.organization} AND project = ${project.project}; - DELETE FROM scoped_states WHERE type = 'elasticsearch' AND id = ${defaultEsViewId.toString} AND org = ${project.organization} AND project = ${project.project}; - DELETE FROM projection_offsets WHERE module = 'elasticsearch' AND resource_id = ${defaultEsViewId.toString} AND project = $project; + DELETE FROM scoped_events WHERE type = 'elasticsearch' AND id = ${defaultViewId.toString} AND org = ${project.organization} AND project = ${project.project}; + DELETE FROM scoped_states WHERE type = 'elasticsearch' AND id = ${defaultViewId.toString} AND org = ${project.organization} AND project = ${project.project}; + DELETE FROM projection_offsets WHERE module = 'elasticsearch' AND resource_id = ${defaultViewId.toString} AND project = $project; """.stripMargin.update.run.void private def createDefaultView(project: ProjectRef): IO[Unit] = @@ -98,7 +121,6 @@ object ElasticSearchDefaultViewsResetter { .flatMap(_ => logger.info(s"Created a new defaultElasticSearchView in project '$project'.")) .handleErrorWith(e => logger.error(s"Could not create view. Message: '${e.getMessage}'")) .void - } } diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinatorSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinatorSuite.scala index b0abfb0359..3c889d5fb3 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinatorSuite.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinatorSuite.scala @@ -170,11 +170,6 @@ class ElasticSearchCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixt failed = 1 ) - private val reset = new ElasticSearchDefaultViewsResetter { - override def resetDefaultViews: IO[Unit] = IO.unit - override private[elasticsearch] def resetView(view: IndexingViewDef): IO[Unit] = IO.unit - }.resetDefaultViews - test("Start the coordinator") { for { _ <- ElasticSearchCoordinator( @@ -185,7 +180,7 @@ class ElasticSearchCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixt (_: ActiveViewDef) => new NoopSink[Json], (v: ActiveViewDef) => IO.delay(createdIndices.add(v.index)).void, (v: ActiveViewDef) => IO.delay(deletedIndices.add(v.index)).void, - reset + IO.unit ) _ <- sv.describe(ElasticSearchCoordinator.metadata.name) .map(_.map(_.progress)) diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchDefaultViewsResetterSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchDefaultViewsResetterSuite.scala index e2ee4624ae..84fb1a878f 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchDefaultViewsResetterSuite.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchDefaultViewsResetterSuite.scala @@ -3,23 +3,22 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing import cats.effect.{IO, Ref} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchClientSetup import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.IndexLabel +import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.ElasticSearchDefaultViewsResetter.ViewElement._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.IndexingViewDef.ActiveViewDef import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewValue.IndexingElasticSearchViewValue import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{defaultViewId, permissions} +import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri +import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax import ch.epfl.bluebrain.nexus.delta.sdk.views.{IndexingRev, ViewRef} -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef, SuccessElemStream} -import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import doobie.implicits._ import fs2.Stream import munit.AnyFixture -import java.time.Instant - class ElasticSearchDefaultViewsResetterSuite extends NexusSuite with Doobie.Fixture @@ -28,13 +27,13 @@ class ElasticSearchDefaultViewsResetterSuite override def munitFixtures: Seq[AnyFixture[_]] = List(doobie) private lazy val xas = doobie() - private val defaultEsViewId = "https://bluebrain.github.io/nexus/vocabulary/defaultElasticSearchIndex" private val defaultIndexLabel = IndexLabel.unsafe("default") private val project = ProjectRef.unsafe("org", "proj") private val project2 = ProjectRef.unsafe("org", "proj2") + private val project3 = ProjectRef.unsafe("org", "proj3") - private val defaultView = ActiveViewDef( - ViewRef(project, iri"$defaultEsViewId"), + private val defaultView = ActiveViewDef( + ViewRef(project, defaultViewId), projection = "projection", None, SelectFilter.latest, @@ -45,79 +44,89 @@ class ElasticSearchDefaultViewsResetterSuite IndexingRev.init, 1 ) - private val customViewId = "https://other.id" + + private val customViewId = iri"https://other.id" private val customView = defaultView.copy(ref = ViewRef(project, iri"$customViewId")) + private val existingDefaultView = ExistingView(defaultView) + private val missingDefaultView = MissingView(project3) + private val existingCustomView = ExistingView(customView) + // TODO: Find how to move this to beforeAll test("Setup: partitions should be created") { initPartitions(xas, project, project2) } - test("The resetter should delete scoped events for the default view") { - insertViewEvent(defaultEsViewId, project).transact(xas.write) >> + test("The resetter should delete scoped events for an existing default view") { + insertViewEvent(defaultViewId, project) >> assertIO(eventsCount, 1) >> - resetWithNoViewCreation.resetView(defaultView) >> + resetWithNoViewCreation.resetView(existingDefaultView) >> assertIO(eventsCount, 0) } - test("The resetter should delete scoped states for the default view") { - insertViewState(defaultEsViewId, project).transact(xas.write) >> + test("The resetter should delete scoped states for an existing default view") { + insertViewState(defaultViewId, project) >> assertIO(statesCount, 1) >> - resetWithNoViewCreation.resetView(defaultView) >> + resetWithNoViewCreation.resetView(existingDefaultView) >> assertIO(statesCount, 0) } - test("The resetter should delete projection offsets for the default view") { - insertViewProjectionOffset(defaultEsViewId, project).transact(xas.write) >> + test("The resetter should delete projection offsets for an existing default view") { + insertViewProjectionOffset(defaultViewId, project) >> assertIO(projectionOffsetCount, 1) >> - resetWithNoViewCreation.resetView(defaultView) >> + resetWithNoViewCreation.resetView(existingDefaultView) >> assertIO(projectionOffsetCount, 0) } - test("The resetter should not delete a scoped event for a custom view") { - insertViewEvent(customViewId, project).transact(xas.write) >> + test("The resetter should not delete a scoped event for an existing custom view") { + insertViewEvent(customViewId, project) >> assertIO(eventsCount, 1) >> - resetWithNoViewCreation.resetView(customView) >> + resetWithNoViewCreation.resetView(existingCustomView) >> assertIO(eventsCount, 1) } - test("The resetter should not delete a scoped state for a custom view") { - insertViewState(customViewId, project).transact(xas.write) >> + test("The resetter should not delete a scoped state for an existing custom view") { + insertViewState(customViewId, project) >> assertIO(statesCount, 1) >> - resetWithNoViewCreation.resetView(customView) >> + resetWithNoViewCreation.resetView(existingCustomView) >> assertIO(statesCount, 1) } test("The resetter should not delete projection offsets for a custom view") { - insertViewProjectionOffset(customViewId, project).transact(xas.write) >> + insertViewProjectionOffset(customViewId, project) >> assertIO(projectionOffsetCount, 1) >> - resetWithNoViewCreation.resetView(customView) >> + resetWithNoViewCreation.resetView(existingCustomView) >> assertIO(projectionOffsetCount, 1) } test("The resetter should create a new view") { for { - createdViewRef <- Ref.of[IO, String]("start") - _ <- resetWithViewCreation(createdViewRef).resetView(defaultView) - _ <- assertIO(createdViewRef.get, defaultEsViewId) + createdViewRef <- Ref.of[IO, Set[ViewRef]](Set.empty[ViewRef]) + _ <- resetWithViewCreation(createdViewRef).resetView(existingDefaultView) + _ <- assertIO(createdViewRef.get, Set(defaultView.ref)) } yield () } test("The resetter should delete the default index") { for { indexDeletedRef <- Ref.of[IO, IndexLabel](IndexLabel.unsafe("some")) - _ <- resetWithIndexDeletion(indexDeletedRef).resetView(defaultView) + _ <- resetWithIndexDeletion(indexDeletedRef).resetView(existingDefaultView) _ <- assertIO(indexDeletedRef.get, defaultIndexLabel) } yield () } - test("The resetter should delete should handle all default views") { - clearDB.transact(xas.write) >> - insertViewEvent(defaultEsViewId, project).transact(xas.write) >> - insertViewEvent(defaultEsViewId, project2).transact(xas.write) >> - assertIO(eventsCount, 2) >> - resetWithNoViewCreation.resetDefaultViews >> - assertIO(eventsCount, 0) + test("The resetter should delete existing default views and recreate them including the missing one") { + for { + createdViewRef <- Ref.of[IO, Set[ViewRef]](Set.empty[ViewRef]) + _ <- clearDB + _ <- insertViewEvent(defaultViewId, project) + _ <- insertViewEvent(defaultViewId, project2) + _ <- assertIO(eventsCount, 2) + _ <- resetWithViewCreation(createdViewRef).resetDefaultViews + _ <- assertIO(eventsCount, 0) + expectedCreatedViews = Set(project, project2, project3).map(ViewRef(_, defaultViewId)) + _ <- assertIO(createdViewRef.get, expectedCreatedViews) + } yield () } private val defaultViewValue: IndexingElasticSearchViewValue = @@ -132,21 +141,12 @@ class ElasticSearchDefaultViewsResetterSuite permission = permissions.query ) - private val viewElem1 = Elem.SuccessElem( - tpe = EntityType("elasticsearch"), - id = defaultViewId, - project = Some(defaultView.ref.project), - instant = Instant.EPOCH, - offset = Offset.start, - value = defaultView, - rev = 1 - ) - private val viewElem2 = - viewElem1.copy(project = Some(project2), value = defaultView.copy(ref = ViewRef(project2, iri"$defaultEsViewId"))) - - val viewStream: SuccessElemStream[IndexingViewDef] = Stream(viewElem1, viewElem2) + private val viewElem1 = existingDefaultView + private val viewElem2 = ExistingView(defaultView.copy(ref = ViewRef(project2, defaultViewId))) + private val viewElem3 = missingDefaultView + private val viewStream = Stream(viewElem1, viewElem2, viewElem3) - private lazy val resetWithNoViewCreation = ElasticSearchDefaultViewsResetter( + private def resetWithNoViewCreation = ElasticSearchDefaultViewsResetter( viewStream, _ => IO(true), (_, _, _) => IO.unit, @@ -155,11 +155,11 @@ class ElasticSearchDefaultViewsResetterSuite xas ) - private def resetWithViewCreation(created: Ref[IO, String]) = + private def resetWithViewCreation(created: Ref[IO, Set[ViewRef]]) = ElasticSearchDefaultViewsResetter( viewStream, _ => IO(true), - (id, _, _) => created.set(id.toString) >> IO.unit, + (id, project, _) => created.update(_ + ViewRef(project, id)) >> IO.unit, defaultViewValue, IO(true), xas @@ -175,28 +175,28 @@ class ElasticSearchDefaultViewsResetterSuite xas ) - private def insertViewEvent(id: String, projectRef: ProjectRef) = + private def insertViewEvent(id: Iri, projectRef: ProjectRef) = sql""" INSERT INTO scoped_events (type, org, project, id, rev, value, instant) VALUES ('elasticsearch', ${projectRef.organization}, ${projectRef.project}, $id, 5, '{"nb": 1}', CURRENT_TIMESTAMP); - """.stripMargin.update.run + """.stripMargin.update.run.transact(xas.write) - private def insertViewState(id: String, projectRef: ProjectRef) = + private def insertViewState(id: Iri, projectRef: ProjectRef) = sql""" INSERT INTO scoped_states (type, org, project, id, tag, rev, value, deprecated, instant) VALUES ('elasticsearch', ${projectRef.organization}, ${projectRef.project}, $id, 'tag', 5, '{"nb": 1}', false, CURRENT_TIMESTAMP); - """.stripMargin.update.run + """.stripMargin.update.run.transact(xas.write) - private def insertViewProjectionOffset(id: String, projectRef: ProjectRef) = + private def insertViewProjectionOffset(id: Iri, projectRef: ProjectRef) = sql""" INSERT INTO projection_offsets (name, module, project, resource_id, ordering, processed, discarded, failed, created_at, updated_at) VALUES ('default', 'elasticsearch', $projectRef, $id, 123, 2, 1, 0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP); - """.stripMargin.update.run + """.stripMargin.update.run.transact(xas.write) private def clearDB = sql""" DELETE FROM scoped_events; DELETE FROM scoped_states; DELETE FROM projection_offsets; - """.stripMargin.update.run + """.stripMargin.update.run.transact(xas.write) private def eventsCount = sql"""SELECT count(*) FROM scoped_events;""".stripMargin