Skip to content

Commit

Permalink
Revert "Ignore search views during import (#5060)" (#5067)
Browse files Browse the repository at this point in the history
This reverts commit 94b1423.

Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored Jul 17, 2024
1 parent 6a9272b commit cdd352e
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.ScopedEvent
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
import ch.epfl.bluebrain.nexus.ship.EventProcessor.{defaultViewIds, logger}
import ch.epfl.bluebrain.nexus.ship.EventProcessor.logger
import fs2.Stream
import io.circe.Decoder
import io.circe.optics.JsonPath.root
Expand All @@ -24,34 +23,22 @@ trait EventProcessor[Event <: ScopedEvent] {

def evaluate(event: Event): IO[ImportStatus]

def evaluate(event: RowEvent)(implicit iriPatcher: IriPatcher): IO[ImportStatus] =
if (defaultViewIds.contains(event.id)) {
logger
.info(s"Default views re created on project creation and search updates are handled separately")
.as(ImportStatus.Success)
} else {
val value = event.value
def patchEventId(idAsString: String) = Iri(idAsString).map(iriPatcher(_).toString).getOrElse(idAsString)
def updateIdToPatchedVersion = root.id.string.modify { patchEventId }
def removeSourceMetadata = root.source.obj.modify(_.filterKeys(!_.startsWith("_")))
val patchedValue = updateIdToPatchedVersion.andThen(removeSourceMetadata)(value)
IO.fromEither(decoder.decodeJson(patchedValue))
.onError(err =>
logger.error(err)(s"Error while attempting to decode $resourceType at offset ${event.ordering}")
)
.flatMap(evaluate)
}
def evaluate(event: RowEvent)(implicit iriPatcher: IriPatcher): IO[ImportStatus] = {
val value = event.value
def patchEventId(idAsString: String) = Iri(idAsString).map(iriPatcher(_).toString).getOrElse(idAsString)
def updateIdToPatchedVersion = root.id.string.modify { patchEventId }
def removeSourceMetadata = root.source.obj.modify(_.filterKeys(!_.startsWith("_")))
val patchedValue = updateIdToPatchedVersion.andThen(removeSourceMetadata)(value)
IO.fromEither(decoder.decodeJson(patchedValue))
.onError(err => logger.error(err)(s"Error while attempting to decode $resourceType at offset ${event.ordering}"))
.flatMap(evaluate)
}
}

object EventProcessor {

private val logger = Logger[EventProcessor.type]

private val defaultElasticsearchViewId = ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.defaultViewId
private val defaultBlazegraphViewId = ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.defaultViewId
private val searchViewId: Iri = nxv + "searchView"
private val defaultViewIds = Set(defaultElasticsearchViewId, defaultBlazegraphViewId, searchViewId)

def run(
eventStream: Stream[IO, RowEvent],
droppedEventStore: DroppedEventStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphViews
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewEvent
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewEvent._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewRejection.{IncorrectRev, ResourceAlreadyExists}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{defaultViewId, BlazegraphViewEvent}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
Expand Down Expand Up @@ -45,15 +45,29 @@ class BlazegraphViewProcessor private (
val project = projectMapper.map(event.project)
event match {
case e: BlazegraphViewCreated =>
val patchedSource = viewPatcher.patchBlazegraphViewSource(e.source)
views(event.uuid).flatMap(_.create(e.id, project, patchedSource))
e.id match {
case id if id == defaultViewId => IO.unit // the default view is created on project creation
case _ =>
val patchedSource = viewPatcher.patchBlazegraphViewSource(e.source)
views(event.uuid).flatMap(_.create(e.id, project, patchedSource))
}
case e: BlazegraphViewUpdated =>
val patchedSource = viewPatcher.patchBlazegraphViewSource(e.source)
views(event.uuid).flatMap(_.update(e.id, project, cRev, patchedSource))
e.id match {
case id if id == defaultViewId => IO.unit
case _ =>
val patchedSource = viewPatcher.patchBlazegraphViewSource(e.source)
views(event.uuid).flatMap(_.update(e.id, project, cRev, patchedSource))
}
case e: BlazegraphViewDeprecated =>
views(event.uuid).flatMap(_.deprecate(e.id, project, cRev))
e.id match {
case id if id == defaultViewId => IO.unit
case _ => views(event.uuid).flatMap(_.deprecate(e.id, project, cRev))
}
case e: BlazegraphViewUndeprecated =>
views(event.uuid).flatMap(_.undeprecate(e.id, project, cRev))
e.id match {
case id if id == defaultViewId => IO.unit
case _ => views(event.uuid).flatMap(_.undeprecate(e.id, project, cRev))
}
case _: BlazegraphViewTagAdded => IO.unit // TODO: Can we tag?
}
}.redeemWith(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViews
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewEvent
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewEvent._
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewRejection.{IncorrectRev, ResourceAlreadyExists}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{defaultViewId, ElasticSearchViewEvent}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
Expand Down Expand Up @@ -45,15 +45,29 @@ class ElasticSearchViewProcessor private (
val project = projectMapper.map(event.project)
event match {
case e: ElasticSearchViewCreated =>
val patchedSource = viewPatcher.patchElasticSearchViewSource(e.source)
views(event.uuid).flatMap(_.create(e.id, project, patchedSource))
e.id match {
case id if id == defaultViewId => IO.unit // the default view is created on project creation
case _ =>
val patchedSource = viewPatcher.patchElasticSearchViewSource(e.source)
views(event.uuid).flatMap(_.create(e.id, project, patchedSource))
}
case e: ElasticSearchViewUpdated =>
val patchedSource = viewPatcher.patchElasticSearchViewSource(e.source)
views(event.uuid).flatMap(_.update(e.id, project, cRev, patchedSource))
e.id match {
case id if id == defaultViewId => IO.unit
case _ =>
val patchedSource = viewPatcher.patchElasticSearchViewSource(e.source)
views(event.uuid).flatMap(_.update(e.id, project, cRev, patchedSource))
}
case e: ElasticSearchViewDeprecated =>
views(event.uuid).flatMap(_.deprecate(e.id, project, cRev))
e.id match {
case id if id == defaultViewId => IO.unit
case _ => views(event.uuid).flatMap(_.deprecate(e.id, project, cRev))
}
case e: ElasticSearchViewUndeprecated =>
views(event.uuid).flatMap(_.undeprecate(e.id, project, cRev))
e.id match {
case id if id == defaultViewId => IO.unit
case _ => views(event.uuid).flatMap(_.undeprecate(e.id, project, cRev))
}
case _: ElasticSearchViewTagAdded => IO.unit // TODO: Check if this is correct
}
}.redeemWith(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,17 +185,27 @@ class ShipIntegrationSpec extends BaseIntegrationSpec {
thereShouldBeAView(project, bgView, patchedSource)
}

"NOT transfer a search view" in {
val (project, _, _) = thereIsAProject()
val searchView = nxv + "searchView"
"transfer a search view" in {
val (project, _, _) = thereIsAProject()
val (searchView, searchViewJson) = thereIsASearchView(project)

whenTheExportIsRunOnProject(project)
theOldProjectIsDeleted(project)

weRunTheImporter(project)
weFixThePermissions(project)

thereShouldBeNoView(project, searchView)
thereShouldBeAViewIgnoringUUID(project, searchView, searchViewJson)
}

def thereIsASearchView(project: ProjectRef): (Iri, Json) = {
val searchView = nxv + "searchView"
val encodedView = UrlUtils.encode(searchView.toString)
val (viewJson, status) = deltaClient
.getJsonAndStatus(s"/views/${project.organization}/${project.project}/$encodedView", writer)
.accepted
status shouldEqual StatusCodes.OK
searchView -> viewJson
}

def thereShouldBeAView(project: ProjectRef, view: Iri, expectedJson: Json): Assertion = {
Expand All @@ -210,12 +220,24 @@ class ShipIntegrationSpec extends BaseIntegrationSpec {
.accepted
}

def thereShouldBeNoView(project: ProjectRef, view: Iri): Assertion = {
def thereShouldBeAViewIgnoringUUID(project: ProjectRef, view: Iri, originalJson: Json): Assertion = {
val encodedIri = UrlUtils.encode(view.toString)

import io.circe.optics.JsonPath.root
val ignoreSourceUUID = root.sources.each.at("_uuid").replace(None)
val ignoreProjectionUUID = root.projections.each.at("_uuid").replace(None)
val ignoreUUID = root.at("_uuid").replace(None)

val filter = ignoreUUID andThen ignoreSourceUUID andThen ignoreProjectionUUID

root.sources.`null`

deltaClient
.get[Json](s"/views/${project.organization}/${project.project}/$encodedIri", writer) { (_, response) =>
{ response.status shouldEqual StatusCodes.NotFound }
.get[Json](s"/views/${project.organization}/${project.project}/$encodedIri", writer) { (json, response) =>
{
response.status shouldEqual StatusCodes.OK
filter(json) shouldEqual filter(originalJson)
}
}
.accepted
}
Expand Down

0 comments on commit cdd352e

Please sign in to comment.