diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala index d1559a15f7..d4cb791543 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala @@ -4,11 +4,10 @@ import cats.effect.IO import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri -import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.ScopedEvent import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType -import ch.epfl.bluebrain.nexus.ship.EventProcessor.{defaultViewIds, logger} +import ch.epfl.bluebrain.nexus.ship.EventProcessor.logger import fs2.Stream import io.circe.Decoder import io.circe.optics.JsonPath.root @@ -24,34 +23,22 @@ trait EventProcessor[Event <: ScopedEvent] { def evaluate(event: Event): IO[ImportStatus] - def evaluate(event: RowEvent)(implicit iriPatcher: IriPatcher): IO[ImportStatus] = - if (defaultViewIds.contains(event.id)) { - logger - .info(s"Default views re created on project creation and search updates are handled separately") - .as(ImportStatus.Success) - } else { - val value = event.value - def patchEventId(idAsString: String) = Iri(idAsString).map(iriPatcher(_).toString).getOrElse(idAsString) - def updateIdToPatchedVersion = root.id.string.modify { patchEventId } - def removeSourceMetadata = root.source.obj.modify(_.filterKeys(!_.startsWith("_"))) - val patchedValue = updateIdToPatchedVersion.andThen(removeSourceMetadata)(value) - IO.fromEither(decoder.decodeJson(patchedValue)) - .onError(err => - logger.error(err)(s"Error while attempting to decode $resourceType at offset ${event.ordering}") - ) - .flatMap(evaluate) - } + def evaluate(event: RowEvent)(implicit iriPatcher: IriPatcher): IO[ImportStatus] = { + val value = event.value + def patchEventId(idAsString: String) = Iri(idAsString).map(iriPatcher(_).toString).getOrElse(idAsString) + def updateIdToPatchedVersion = root.id.string.modify { patchEventId } + def removeSourceMetadata = root.source.obj.modify(_.filterKeys(!_.startsWith("_"))) + val patchedValue = updateIdToPatchedVersion.andThen(removeSourceMetadata)(value) + IO.fromEither(decoder.decodeJson(patchedValue)) + .onError(err => logger.error(err)(s"Error while attempting to decode $resourceType at offset ${event.ordering}")) + .flatMap(evaluate) + } } object EventProcessor { private val logger = Logger[EventProcessor.type] - private val defaultElasticsearchViewId = ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.defaultViewId - private val defaultBlazegraphViewId = ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.defaultViewId - private val searchViewId: Iri = nxv + "searchView" - private val defaultViewIds = Set(defaultElasticsearchViewId, defaultBlazegraphViewId, searchViewId) - def run( eventStream: Stream[IO, RowEvent], droppedEventStore: DroppedEventStore, diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/views/BlazegraphViewProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/views/BlazegraphViewProcessor.scala index 4801687ee5..e949481800 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/views/BlazegraphViewProcessor.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/views/BlazegraphViewProcessor.scala @@ -4,9 +4,9 @@ import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphViews -import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewEvent import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewEvent._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewRejection.{IncorrectRev, ResourceAlreadyExists} +import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{defaultViewId, BlazegraphViewEvent} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext @@ -45,15 +45,29 @@ class BlazegraphViewProcessor private ( val project = projectMapper.map(event.project) event match { case e: BlazegraphViewCreated => - val patchedSource = viewPatcher.patchBlazegraphViewSource(e.source) - views(event.uuid).flatMap(_.create(e.id, project, patchedSource)) + e.id match { + case id if id == defaultViewId => IO.unit // the default view is created on project creation + case _ => + val patchedSource = viewPatcher.patchBlazegraphViewSource(e.source) + views(event.uuid).flatMap(_.create(e.id, project, patchedSource)) + } case e: BlazegraphViewUpdated => - val patchedSource = viewPatcher.patchBlazegraphViewSource(e.source) - views(event.uuid).flatMap(_.update(e.id, project, cRev, patchedSource)) + e.id match { + case id if id == defaultViewId => IO.unit + case _ => + val patchedSource = viewPatcher.patchBlazegraphViewSource(e.source) + views(event.uuid).flatMap(_.update(e.id, project, cRev, patchedSource)) + } case e: BlazegraphViewDeprecated => - views(event.uuid).flatMap(_.deprecate(e.id, project, cRev)) + e.id match { + case id if id == defaultViewId => IO.unit + case _ => views(event.uuid).flatMap(_.deprecate(e.id, project, cRev)) + } case e: BlazegraphViewUndeprecated => - views(event.uuid).flatMap(_.undeprecate(e.id, project, cRev)) + e.id match { + case id if id == defaultViewId => IO.unit + case _ => views(event.uuid).flatMap(_.undeprecate(e.id, project, cRev)) + } case _: BlazegraphViewTagAdded => IO.unit // TODO: Can we tag? } }.redeemWith( diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/views/ElasticSearchViewProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/views/ElasticSearchViewProcessor.scala index 7f99613666..c2397fd682 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/views/ElasticSearchViewProcessor.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/views/ElasticSearchViewProcessor.scala @@ -4,9 +4,9 @@ import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViews -import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewEvent import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewEvent._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewRejection.{IncorrectRev, ResourceAlreadyExists} +import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{defaultViewId, ElasticSearchViewEvent} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext @@ -45,15 +45,29 @@ class ElasticSearchViewProcessor private ( val project = projectMapper.map(event.project) event match { case e: ElasticSearchViewCreated => - val patchedSource = viewPatcher.patchElasticSearchViewSource(e.source) - views(event.uuid).flatMap(_.create(e.id, project, patchedSource)) + e.id match { + case id if id == defaultViewId => IO.unit // the default view is created on project creation + case _ => + val patchedSource = viewPatcher.patchElasticSearchViewSource(e.source) + views(event.uuid).flatMap(_.create(e.id, project, patchedSource)) + } case e: ElasticSearchViewUpdated => - val patchedSource = viewPatcher.patchElasticSearchViewSource(e.source) - views(event.uuid).flatMap(_.update(e.id, project, cRev, patchedSource)) + e.id match { + case id if id == defaultViewId => IO.unit + case _ => + val patchedSource = viewPatcher.patchElasticSearchViewSource(e.source) + views(event.uuid).flatMap(_.update(e.id, project, cRev, patchedSource)) + } case e: ElasticSearchViewDeprecated => - views(event.uuid).flatMap(_.deprecate(e.id, project, cRev)) + e.id match { + case id if id == defaultViewId => IO.unit + case _ => views(event.uuid).flatMap(_.deprecate(e.id, project, cRev)) + } case e: ElasticSearchViewUndeprecated => - views(event.uuid).flatMap(_.undeprecate(e.id, project, cRev)) + e.id match { + case id if id == defaultViewId => IO.unit + case _ => views(event.uuid).flatMap(_.undeprecate(e.id, project, cRev)) + } case _: ElasticSearchViewTagAdded => IO.unit // TODO: Check if this is correct } }.redeemWith( diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/ShipIntegrationSpec.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/ShipIntegrationSpec.scala index 0116b5cb27..f2e7df71b1 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/ShipIntegrationSpec.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/ShipIntegrationSpec.scala @@ -185,9 +185,9 @@ class ShipIntegrationSpec extends BaseIntegrationSpec { thereShouldBeAView(project, bgView, patchedSource) } - "NOT transfer a search view" in { - val (project, _, _) = thereIsAProject() - val searchView = nxv + "searchView" + "transfer a search view" in { + val (project, _, _) = thereIsAProject() + val (searchView, searchViewJson) = thereIsASearchView(project) whenTheExportIsRunOnProject(project) theOldProjectIsDeleted(project) @@ -195,7 +195,17 @@ class ShipIntegrationSpec extends BaseIntegrationSpec { weRunTheImporter(project) weFixThePermissions(project) - thereShouldBeNoView(project, searchView) + thereShouldBeAViewIgnoringUUID(project, searchView, searchViewJson) + } + + def thereIsASearchView(project: ProjectRef): (Iri, Json) = { + val searchView = nxv + "searchView" + val encodedView = UrlUtils.encode(searchView.toString) + val (viewJson, status) = deltaClient + .getJsonAndStatus(s"/views/${project.organization}/${project.project}/$encodedView", writer) + .accepted + status shouldEqual StatusCodes.OK + searchView -> viewJson } def thereShouldBeAView(project: ProjectRef, view: Iri, expectedJson: Json): Assertion = { @@ -210,12 +220,24 @@ class ShipIntegrationSpec extends BaseIntegrationSpec { .accepted } - def thereShouldBeNoView(project: ProjectRef, view: Iri): Assertion = { + def thereShouldBeAViewIgnoringUUID(project: ProjectRef, view: Iri, originalJson: Json): Assertion = { val encodedIri = UrlUtils.encode(view.toString) + import io.circe.optics.JsonPath.root + val ignoreSourceUUID = root.sources.each.at("_uuid").replace(None) + val ignoreProjectionUUID = root.projections.each.at("_uuid").replace(None) + val ignoreUUID = root.at("_uuid").replace(None) + + val filter = ignoreUUID andThen ignoreSourceUUID andThen ignoreProjectionUUID + + root.sources.`null` + deltaClient - .get[Json](s"/views/${project.organization}/${project.project}/$encodedIri", writer) { (_, response) => - { response.status shouldEqual StatusCodes.NotFound } + .get[Json](s"/views/${project.organization}/${project.project}/$encodedIri", writer) { (json, response) => + { + response.status shouldEqual StatusCodes.OK + filter(json) shouldEqual filter(originalJson) + } } .accepted }