Skip to content

Commit

Permalink
Resetting default views now recreates the missing ones (#4679)
Browse files Browse the repository at this point in the history
Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored Jan 19, 2024
1 parent 84be090 commit 9fcb974
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.ServiceAccount
import ch.epfl.bluebrain.nexus.delta.sdk.model._
import ch.epfl.bluebrain.nexus.delta.sdk.model.metrics.ScopedEventMetricEncoder
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.projects.{FetchContext, Projects}
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext.ContextRejection
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution
Expand Down Expand Up @@ -118,12 +118,13 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
make[ElasticSearchDefaultViewsResetter].from {
(
client: ElasticSearchClient,
projects: Projects,
views: ElasticSearchViews,
scope: ElasticSearchScopeInitialization,
xas: Transactors,
serviceAccount: ServiceAccount
) =>
ElasticSearchDefaultViewsResetter(client, views, scope.defaultValue, xas)(serviceAccount.subject)
ElasticSearchDefaultViewsResetter(client, projects, views, scope.defaultValue, xas)(serviceAccount.subject)
}

make[ElasticSearchCoordinator].fromEffect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,49 @@ import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViews
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.ElasticSearchDefaultViewsResetter.ViewElement
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.IndexingViewDef.{ActiveViewDef, DeprecatedViewDef}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{defaultViewId, ElasticSearchViewValue}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax
import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, SuccessElemStream}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import doobie.implicits._
import fs2.Stream

/** A way to reset default Elasticsearch views */
/**
* A way to reset default Elasticsearch views
*/
trait ElasticSearchDefaultViewsResetter {
def resetDefaultViews: IO[Unit]

private[elasticsearch] def resetView(view: IndexingViewDef): IO[Unit]
private[indexing] def resetView(view: ViewElement): IO[Unit]
}

object ElasticSearchDefaultViewsResetter {

private val logger = Logger[ElasticSearchDefaultViewsResetter]

sealed private[indexing] trait ViewElement {
def project: ProjectRef

def value: Option[IndexingViewDef]
}

private[indexing] object ViewElement {

final case class MissingView(project: ProjectRef) extends ViewElement {
override def value: Option[IndexingViewDef] = None
}
final case class ExistingView(indexing: IndexingViewDef) extends ViewElement {
override def project: ProjectRef = indexing.ref.project

override def value: Option[IndexingViewDef] = Some(indexing)
}

}

def resetTrigger: IO[Boolean] =
Env[IO].get("RESET_DEFAULT_ES_VIEWS").map(_.getOrElse("false").toBoolean)

Expand All @@ -39,12 +62,17 @@ object ElasticSearchDefaultViewsResetter {
*/
def apply(
client: ElasticSearchClient,
projects: Projects,
views: ElasticSearchViews,
newViewValue: ElasticSearchViewValue,
xas: Transactors
)(implicit subject: Subject): ElasticSearchDefaultViewsResetter =
apply(
views.currentIndexingViews,
projects.currentRefs.evalMap { project =>
views
.fetchIndexingView(defaultViewId, project)
.redeem(_ => ViewElement.MissingView(project), ViewElement.ExistingView(_))
},
client.deleteIndex,
views.internalCreate(_, _, _)(subject).void,
newViewValue,
Expand All @@ -53,7 +81,7 @@ object ElasticSearchDefaultViewsResetter {
)

def apply(
views: SuccessElemStream[IndexingViewDef],
views: Stream[IO, ViewElement],
deleteIndex: IndexLabel => IO[Boolean],
createView: (Iri, ProjectRef, ElasticSearchViewValue) => IO[Unit],
newViewValue: ElasticSearchViewValue,
Expand All @@ -64,41 +92,35 @@ object ElasticSearchDefaultViewsResetter {
override def resetDefaultViews: IO[Unit] =
resetTrigger.flatMap { triggered =>
IO.whenA(triggered) {
views
.filter(_.id == defaultEsViewId)
.compile
.toList
.flatMap { _.traverse { view => view.evalMap(resetView) } }
views.compile.toList
.flatMap { _.traverse { resetView } }
.flatMap { _ => logger.info("Completed resetting default elasticsearch views.") }
}
}

override def resetView(view: IndexingViewDef): IO[Unit] =
override def resetView(view: ViewElement): IO[Unit] =
deleteEsIndex(view) >>
deleteEventsStatesOffsets(view.ref.project).transact(xas.write) >>
createDefaultView(view.ref.project)

private val defaultEsViewId = iri"https://bluebrain.github.io/nexus/vocabulary/defaultElasticSearchIndex"
deleteEventsStatesOffsets(view.project).transact(xas.write) >>
createDefaultView(view.project)

private def deleteEsIndex(viewDef: IndexingViewDef) =
viewDef match {
private def deleteEsIndex(view: ViewElement) =
view.value.traverse {
case activeView: ActiveViewDef => deleteIndex(activeView.index)
case _: DeprecatedViewDef => IO.pure(true)
}
}.void

private def deleteEventsStatesOffsets(project: ProjectRef): doobie.ConnectionIO[Unit] =
sql"""
DELETE FROM scoped_events WHERE type = 'elasticsearch' AND id = ${defaultEsViewId.toString} AND org = ${project.organization} AND project = ${project.project};
DELETE FROM scoped_states WHERE type = 'elasticsearch' AND id = ${defaultEsViewId.toString} AND org = ${project.organization} AND project = ${project.project};
DELETE FROM projection_offsets WHERE module = 'elasticsearch' AND resource_id = ${defaultEsViewId.toString} AND project = $project;
DELETE FROM scoped_events WHERE type = 'elasticsearch' AND id = ${defaultViewId.toString} AND org = ${project.organization} AND project = ${project.project};
DELETE FROM scoped_states WHERE type = 'elasticsearch' AND id = ${defaultViewId.toString} AND org = ${project.organization} AND project = ${project.project};
DELETE FROM projection_offsets WHERE module = 'elasticsearch' AND resource_id = ${defaultViewId.toString} AND project = $project;
""".stripMargin.update.run.void

private def createDefaultView(project: ProjectRef): IO[Unit] =
createView(defaultViewId, project, newViewValue)
.flatMap(_ => logger.info(s"Created a new defaultElasticSearchView in project '$project'."))
.handleErrorWith(e => logger.error(s"Could not create view. Message: '${e.getMessage}'"))
.void

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,6 @@ class ElasticSearchCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixt
failed = 1
)

private val reset = new ElasticSearchDefaultViewsResetter {
override def resetDefaultViews: IO[Unit] = IO.unit
override private[elasticsearch] def resetView(view: IndexingViewDef): IO[Unit] = IO.unit
}.resetDefaultViews

test("Start the coordinator") {
for {
_ <- ElasticSearchCoordinator(
Expand All @@ -185,7 +180,7 @@ class ElasticSearchCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixt
(_: ActiveViewDef) => new NoopSink[Json],
(v: ActiveViewDef) => IO.delay(createdIndices.add(v.index)).void,
(v: ActiveViewDef) => IO.delay(deletedIndices.add(v.index)).void,
reset
IO.unit
)
_ <- sv.describe(ElasticSearchCoordinator.metadata.name)
.map(_.map(_.progress))
Expand Down
Loading

0 comments on commit 9fcb974

Please sign in to comment.