Skip to content

Commit

Permalink
Add ElasticSearchViewProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski committed Mar 25, 2024
1 parent 278de2c commit 15503b9
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 68 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ lazy val ship = project
)
.enablePlugins(UniversalPlugin, JavaAppPackaging, JavaAgent, DockerPlugin, BuildInfoPlugin)
.settings(shared, compilation, servicePackaging, assertJavaVersion, kamonSettings, coverage, release)
.dependsOn(sdk % "compile->compile;test->test", tests % "test->test")
.dependsOn(sdk % "compile->compile;test->test", elasticsearchPlugin % "compile->compile", tests % "test->compile;test->test")
.settings(
libraryDependencies ++= Seq(declineEffect),
addCompilerPlugin(betterMonadicFor),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,26 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig
import ch.epfl.bluebrain.nexus.ship.acls.AclWiring
import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverWiring

import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{contexts => esContexts}

object ContextWiring {

implicit private val loader: ClasspathResourceLoader = ClasspathResourceLoader.withContext(getClass)

def remoteContextResolution: IO[RemoteContextResolution] =
for {
shaclCtx <- ContextValue.fromFile("contexts/shacl.json")
schemasMetaCtx <- ContextValue.fromFile("contexts/schemas-metadata.json")
pipelineCtx <- ContextValue.fromFile("contexts/pipeline.json")
shaclCtx <- ContextValue.fromFile("contexts/shacl.json")
schemasMetaCtx <- ContextValue.fromFile("contexts/schemas-metadata.json")
elasticsearchCtx <- ContextValue.fromFile("contexts/elasticsearch.json")
} yield RemoteContextResolution.fixed(
// Delta
contexts.pipeline -> pipelineCtx,
// Schema
contexts.shacl -> shaclCtx,
contexts.schemasMetadata -> schemasMetaCtx
contexts.schemasMetadata -> schemasMetaCtx,
// ElasticSearch
esContexts.elasticsearch -> elasticsearchCtx
)

def resolverContextResolution(
Expand Down
15 changes: 12 additions & 3 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor
import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor
import ch.epfl.bluebrain.nexus.ship.resources.{ResourceProcessor, ResourceWiring}
import ch.epfl.bluebrain.nexus.ship.schemas.{SchemaProcessor, SchemaWiring}
import ch.epfl.bluebrain.nexus.ship.views.ElasticSearchViewProcessor
import fs2.Stream
import fs2.io.file.{Files, Path}
import io.circe.parser.decode
Expand Down Expand Up @@ -48,7 +49,7 @@ class RunShip {
(schemaLog, fetchSchema) <- SchemaWiring(config.eventLog, eventClock, xas, jsonLdApi)
(resourceLog, fetchResource) =
ResourceWiring(fetchContext, fetchSchema, eventLogConfig, eventClock, xas)
rcr = ContextWiring
rcr <- ContextWiring
.resolverContextResolution(fetchResource, fetchContext, eventLogConfig, eventClock, xas)
schemaImports = SchemaWiring.schemaImports(
fetchResource,
Expand All @@ -61,10 +62,18 @@ class RunShip {
// Processors
projectProcessor <- ProjectProcessor(fetchActiveOrg, eventLogConfig, eventClock, xas)(baseUri)
resolverProcessor = ResolverProcessor(fetchContext, eventLogConfig, eventClock, xas)
schemaProcessor <- SchemaProcessor(schemaLog, fetchContext, schemaImports, rcr, eventClock)
schemaProcessor = SchemaProcessor(schemaLog, fetchContext, schemaImports, rcr, eventClock)
resourceProcessor = ResourceProcessor(resourceLog, fetchContext, eventClock)
esViewsProcessor <- ElasticSearchViewProcessor(fetchContext, rcr, eventLogConfig, eventClock, xas)
report <- EventProcessor
.run(events, projectProcessor, resolverProcessor, schemaProcessor, resourceProcessor)
.run(
events,
projectProcessor,
resolverProcessor,
schemaProcessor,
resourceProcessor,
esViewsProcessor
)
} yield report
}
} yield report
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,11 @@ object SchemaProcessor {
log: SchemaLog,
fetchContext: FetchContext,
schemaImports: SchemaImports,
resolverContextResolution: IO[ResolverContextResolution],
rcr: ResolverContextResolution,
clock: EventClock
)(implicit jsonLdApi: JsonLdApi): IO[SchemaProcessor] =
for {
rcr <- resolverContextResolution
schemas = SchemasImpl(log, fetchContext, schemaImports, rcr)(jsonLdApi, FailingUUID)
} yield new SchemaProcessor(schemas, clock)
)(implicit jsonLdApi: JsonLdApi): SchemaProcessor = {
val schemas = SchemasImpl(log, fetchContext, schemaImports, rcr)(jsonLdApi, FailingUUID)
new SchemaProcessor(schemas, clock)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package ch.epfl.bluebrain.nexus.ship.views

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.utils.{ClasspathResourceLoader, UUIDF}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewEvent._
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewRejection.{IncorrectRev, ResourceAlreadyExists}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{ElasticSearchFiles, ElasticSearchViewEvent, ElasticSearchViewValue}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.{ElasticSearchViews, ValidateElasticSearchView}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.views.IndexingRev
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.ship.views.ElasticSearchViewProcessor.logger
import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, ImportStatus}
import io.circe.Decoder

import java.util.UUID

class ElasticSearchViewProcessor private (views: ElasticSearchViews, clock: EventClock)
extends EventProcessor[ElasticSearchViewEvent] {

override def resourceType: EntityType = ElasticSearchViews.entityType

override def decoder: Decoder[ElasticSearchViewEvent] = ElasticSearchViewEvent.serializer.codec

override def evaluate(event: ElasticSearchViewEvent): IO[ImportStatus] =
for {
_ <- clock.setInstant(event.instant)
result <- evaluateInternal(event)
} yield result

private def evaluateInternal(event: ElasticSearchViewEvent): IO[ImportStatus] = {
implicit val s: Subject = event.subject
val cRev = event.rev - 1
event match {
case e: ElasticSearchViewCreated => views.create(e.id, e.project, e.value)
case e: ElasticSearchViewUpdated => views.update(e.id, e.project, cRev, e.value)
case e: ElasticSearchViewDeprecated => views.deprecate(e.id, e.project, cRev)
case e: ElasticSearchViewUndeprecated => views.undeprecate(e.id, e.project, cRev)
case _: ElasticSearchViewTagAdded => IO.unit // TODO: Check if this is correct
}
}.redeemWith(
{
case a: ResourceAlreadyExists => logger.warn(a)("The resource already exists").as(ImportStatus.Dropped)
case i: IncorrectRev => logger.warn(i)("An incorrect revision has been provided").as(ImportStatus.Dropped)
case other => IO.raiseError(other)
},
_ => IO.pure(ImportStatus.Success)
)

}

object ElasticSearchViewProcessor {

private val logger = Logger[ElasticSearchViewProcessor]

def apply(
fetchContext: FetchContext,
rcr: ResolverContextResolution,
config: EventLogConfig,
clock: EventClock,
xas: Transactors
)(implicit
jsonLdApi: JsonLdApi
): IO[ElasticSearchViewProcessor] = {
implicit val uuidF: UUIDF = UUIDF.random // TODO: Use correct UUID?
implicit val loader: ClasspathResourceLoader = ClasspathResourceLoader.withContext(getClass)

val noValidation = new ValidateElasticSearchView {
override def apply(uuid: UUID, indexingRev: IndexingRev, v: ElasticSearchViewValue): IO[Unit] = IO.unit
}
val prefix = "wrong_prefix" // TODO: fix prefix
val files = ElasticSearchFiles.mk(loader)

for {
f <- files
views <- ElasticSearchViews(
fetchContext,
rcr,
noValidation,
config,
prefix,
xas,
f.defaultMapping,
f.defaultSettings,
clock
)
} yield new ElasticSearchViewProcessor(views, clock)
}

}
55 changes: 0 additions & 55 deletions ship/src/test/resources/config/default.conf

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,61 @@ class EndToEndTest extends BaseIntegrationSpec {
thereShouldBeASchema(project, schema, schemaJson)
}

"transfer an elasticsearch view" in {
val (project, _, _) = thereIsAProject()
val (schema, schemaJson) = thereIsAnElasticSearchView(project)

whenTheExportIsRunOnProject(project)
theOldProjectIsDeleted(project)

weRunTheImporter(project)
weFixThePermissions(project)

thereShouldBeAnElasticSearchView(project, schema, schemaJson)
}

def thereShouldBeAnElasticSearchView(project: ProjectRef, schema: Iri, originalJson: Json): Assertion = {
val encodedIri = UrlUtils.encode(schema.toString)
deltaClient
.get[Json](s"/views/${project.organization}/${project.project}/$encodedIri", writer) { (json, response) =>
{
response.status shouldEqual StatusCodes.OK
json shouldEqual originalJson
}
}
.accepted
}

def thereIsAnElasticSearchView(project: ProjectRef): (Iri, Json) = {
val view = nxv + genString()
val encodedView = UrlUtils.encode(view.toString)
val simpleView =
json"""
{
"@type": "ElasticSearchView",
"resourceSchemas": [],
"resourceTypes": [],
"sourceAsText": false,
"includeMetadata": true,
"includeDeprecated": false,
"mapping": {}
}
"""

deltaClient
.put[Json](s"/views/${project.organization}/${project.project}/$encodedView", simpleView, writer) {
(_, response) =>
response.status shouldEqual StatusCodes.Created
}
.accepted

val (viewJson, status) = deltaClient
.getJsonAndStatus(s"/views/${project.organization}/${project.project}/$encodedView", writer)
.accepted
status shouldEqual StatusCodes.OK
view -> viewJson
}

def thereIsAProject(): (ProjectRef, ProjectPayload, Json) = {
val orgName = genString()
val projName = genString()
Expand Down

0 comments on commit 15503b9

Please sign in to comment.