Skip to content

Commit

Permalink
Add a way to reset all default Elasticsearch views (#4531)
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski authored Dec 1, 2023
1 parent 55cb64d commit 316b650
Show file tree
Hide file tree
Showing 8 changed files with 384 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import ch.epfl.bluebrain.nexus.delta.kernel.utils.{ClasspathResourceLoader, UUID
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.config.ElasticSearchViewsConfig
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.deletion.{ElasticSearchDeletionTask, EventMetricsDeletionTask}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.ElasticSearchCoordinator
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.{ElasticSearchCoordinator, ElasticSearchDefaultViewsResetter}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewRejection.ProjectContextRejection
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{contexts, schema => viewsSchemaId, ElasticSearchFiles, ElasticSearchView, ElasticSearchViewEvent}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.query.{DefaultViewsQuery, ElasticSearchQueryError}
Expand Down Expand Up @@ -115,6 +115,17 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
)(api, uuidF)
}

make[ElasticSearchDefaultViewsResetter].from {
(
client: ElasticSearchClient,
views: ElasticSearchViews,
scope: ElasticSearchScopeInitialization,
xas: Transactors,
serviceAccount: ServiceAccount
) =>
ElasticSearchDefaultViewsResetter(client, views, scope.defaultValue, xas)(serviceAccount.subject)
}

make[ElasticSearchCoordinator].fromEffect {
(
views: ElasticSearchViews,
Expand All @@ -123,15 +134,17 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
supervisor: Supervisor,
client: ElasticSearchClient,
config: ElasticSearchViewsConfig,
cr: RemoteContextResolution @Id("aggregate")
cr: RemoteContextResolution @Id("aggregate"),
resetter: ElasticSearchDefaultViewsResetter
) =>
ElasticSearchCoordinator(
views,
graphStream,
registry,
supervisor,
client,
config
config,
resetter
)(cr)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ElasticSearchScopeInitialization(
implicit private val serviceAccountSubject: Subject = serviceAccount.subject
implicit private val kamonComponent: KamonMetricComponent = KamonMetricComponent(entityType.value)

private lazy val defaultValue: IndexingElasticSearchViewValue =
lazy val defaultValue: IndexingElasticSearchViewValue =
IndexingElasticSearchViewValue(
name = Some(defaults.name),
description = Some(defaults.description),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,23 @@ final class ElasticSearchViews private (
} yield res
}.span("createElasticSearchView")

/**
* Creates a new ElasticSearchView without checking whether the provided project is deprecated or not.
*
* @param iri
* id of the view to be created
* @param project
* project in which the view has to be created
* @param value
* configuration of the view to be created
*/
private[elasticsearch] def internalCreate(
iri: Iri,
project: ProjectRef,
value: ElasticSearchViewValue
)(implicit subject: Subject): IO[ViewResource] =
eval(CreateElasticSearchView(iri, project, value, value.toJson(iri), subject))

/**
* Updates an existing ElasticSearchView.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ object ElasticSearchCoordinator {
registry: ReferenceRegistry,
supervisor: Supervisor,
client: ElasticSearchClient,
config: ElasticSearchViewsConfig
config: ElasticSearchViewsConfig,
resetter: ElasticSearchDefaultViewsResetter
)(implicit cr: RemoteContextResolution): IO[ElasticSearchCoordinator] = {
if (config.indexingEnabled) {
apply(
Expand All @@ -156,7 +157,8 @@ object ElasticSearchCoordinator {
logger.error(e)(s"Index for view '${v.ref.project}/${v.ref.viewId}' could not be created.")
}
.void,
(v: ActiveViewDef) => client.deleteIndex(v.index).void
(v: ActiveViewDef) => client.deleteIndex(v.index).void,
resetter.resetDefaultViews
)
} else {
Noop.log.as(Noop)
Expand All @@ -170,7 +172,8 @@ object ElasticSearchCoordinator {
supervisor: Supervisor,
sink: ActiveViewDef => Sink,
createIndex: ActiveViewDef => IO[Unit],
deleteIndex: ActiveViewDef => IO[Unit]
deleteIndex: ActiveViewDef => IO[Unit],
resetDefaultViews: IO[Unit]
)(implicit cr: RemoteContextResolution): IO[ElasticSearchCoordinator] =
for {
cache <- LocalCache[ViewRef, ActiveViewDef]()
Expand All @@ -188,7 +191,7 @@ object ElasticSearchCoordinator {
CompiledProjection.fromStream(
metadata,
ExecutionStrategy.EveryNode,
coordinator.run
offset => Stream.eval(resetDefaultViews) >> coordinator.run(offset)
)
)
} yield coordinator
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing

import cats.effect.IO
import cats.effect.std.Env
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViews
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.IndexingViewDef.{ActiveViewDef, DeprecatedViewDef}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{defaultViewId, ElasticSearchViewValue}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef}
import doobie.implicits._
import fs2.Stream

/** A way to reset default Elasticsearch views */
trait ElasticSearchDefaultViewsResetter {
def resetDefaultViews: IO[Unit]

private[elasticsearch] def resetView(view: IndexingViewDef): IO[Unit]
}

object ElasticSearchDefaultViewsResetter {

private val logger = Logger[ElasticSearchDefaultViewsResetter]

def resetTrigger: IO[Boolean] =
Env[IO].get("RESET_DEFAULT_ES_VIEWS").map(_.getOrElse("false").toBoolean)

/**
* Provides a resetter that for each default elasticsearch view:
*
* 1. Deletes the associated index in elasticsearch 2. Deletes all associated events, states, and projection
* offsets 3. Creates a new default elasticsearch view using the provided new view value
*
* Note that it will reset the default view even its project is deprecated.
*/
def apply(
client: ElasticSearchClient,
views: ElasticSearchViews,
newViewValue: ElasticSearchViewValue,
xas: Transactors
)(implicit subject: Subject): ElasticSearchDefaultViewsResetter =
apply(
views.currentIndexingViews,
client.deleteIndex,
views.internalCreate(_, _, _)(subject).void,
newViewValue,
resetTrigger,
xas
)

def apply(
views: ElemStream[IndexingViewDef],
deleteIndex: IndexLabel => IO[Boolean],
createView: (Iri, ProjectRef, ElasticSearchViewValue) => IO[Unit],
newViewValue: ElasticSearchViewValue,
resetTrigger: IO[Boolean],
xas: Transactors
): ElasticSearchDefaultViewsResetter =
new ElasticSearchDefaultViewsResetter {
override def resetDefaultViews: IO[Unit] =
resetTrigger.flatMap { triggered =>
IO.whenA(triggered) {
views
.filter(_.id == defaultEsViewId)
.flatTap { view => Stream.eval(view.evalMap(resetView)) }
.compile
.drain
}
}

override def resetView(view: IndexingViewDef): IO[Unit] =
deleteEsIndex(view) >>
deleteEventsStatesOffsets(view.ref.project).transact(xas.write) >>
createDefaultView(view.ref.project)

private val defaultEsViewId = iri"https://bluebrain.github.io/nexus/vocabulary/defaultElasticSearchIndex"

private def deleteEsIndex(viewDef: IndexingViewDef) =
viewDef match {
case activeView: ActiveViewDef => deleteIndex(activeView.index)
case _: DeprecatedViewDef => IO.pure(true)
}

private def deleteEventsStatesOffsets(project: ProjectRef): doobie.ConnectionIO[Unit] =
sql"""
DELETE FROM scoped_events WHERE type = 'elasticsearch' AND id = ${defaultEsViewId.toString} AND org = ${project.organization} AND project = ${project.project};
DELETE FROM scoped_states WHERE type = 'elasticsearch' AND id = ${defaultEsViewId.toString} AND org = ${project.organization} AND project = ${project.project};
DELETE FROM projection_offsets WHERE module = 'elasticsearch' AND resource_id = ${defaultEsViewId.toString} AND project = $project;
""".stripMargin.update.run.void

private def createDefaultView(project: ProjectRef): IO[Unit] =
createView(defaultViewId, project, newViewValue)
.flatMap(_ => logger.info(s"Created a new defaultElasticSearchView in project '$project'."))
.handleErrorWith(e => logger.error(s"Could not create view. Message: '${e.getMessage}'"))
.void

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ class ElasticSearchCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixt
failed = 1
)

private val reset = new ElasticSearchDefaultViewsResetter {
override def resetDefaultViews: IO[Unit] = IO.unit
override private[elasticsearch] def resetView(view: IndexingViewDef): IO[Unit] = IO.unit
}.resetDefaultViews

test("Start the coordinator") {
for {
_ <- ElasticSearchCoordinator(
Expand All @@ -196,7 +201,8 @@ class ElasticSearchCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixt
sv,
(_: ActiveViewDef) => new NoopSink[Json],
(v: ActiveViewDef) => IO.delay(createdIndices.add(v.index)).void,
(v: ActiveViewDef) => IO.delay(deletedIndices.add(v.index)).void
(v: ActiveViewDef) => IO.delay(deletedIndices.add(v.index)).void,
reset
)
_ <- sv.describe(ElasticSearchCoordinator.metadata.name)
.map(_.map(_.progress))
Expand Down
Loading

0 comments on commit 316b650

Please sign in to comment.