Allow mapping of org/project during ship import (#4816)
olivergrabinski authored Apr 3, 2024
1 parent 7e5f9a0 commit e99d912
Showing 20 changed files with 262 additions and 90 deletions.
@@ -34,17 +34,6 @@ final case class SearchConfig(

object SearchConfig {

implicit val projectRefReader: ConfigReader[ProjectRef] = ConfigReader.fromString { value =>
value.split("/").toList match {
case orgStr :: projectStr :: Nil =>
(Label(orgStr), Label(projectStr))
.mapN(ProjectRef(_, _))
.leftMap(err => CannotConvert(value, classOf[ProjectRef].getSimpleName, err.getMessage))
case _ =>
Left(CannotConvert(value, classOf[ProjectRef].getSimpleName, "Wrong format"))
}
}

type Suite = Set[ProjectRef]
type Suites = Map[Label, Suite]

@@ -9,6 +9,8 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.JsonLdDecoderError.Parsi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder
import doobie.{Get, Put}
import io.circe.{Decoder, Encoder, KeyDecoder, KeyEncoder}
import pureconfig.ConfigReader
import pureconfig.error.CannotConvert

/**
* A project label along with its parent organization label.
@@ -70,4 +72,15 @@ object ProjectRef {
(projectRef.organization.value, projectRef.project.value)
}

implicit val projectRefConfigReader: ConfigReader[ProjectRef] = ConfigReader.fromString { value =>
value.split("/").toList match {
case orgStr :: projectStr :: Nil =>
(Label(orgStr), Label(projectStr))
.mapN(ProjectRef(_, _))
.leftMap(err => CannotConvert(value, classOf[ProjectRef].getSimpleName, err.getMessage))
case _ =>
Left(CannotConvert(value, classOf[ProjectRef].getSimpleName, "Wrong format"))
}
}

}
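
Relocating the reader from SearchConfig onto the ProjectRef companion places it in implicit scope wherever a ConfigReader[ProjectRef] is summoned, which the ship configuration below relies on. A minimal sketch of the behaviour, assuming pureconfig's ConfigSource; the org/project values are invented and this object is not part of the commit:

// Sketch only: exercises the relocated ConfigReader[ProjectRef] with illustrative values.
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import pureconfig.ConfigSource

object ProjectRefReaderSketch extends App {
  // "org/project" is split on '/'; both parts must be valid Labels
  val ok = ConfigSource.string("ref = myorg/myproject").at("ref").load[ProjectRef]
  println(ok) // Right(ProjectRef) when both labels are valid

  // Anything without exactly one '/' fails with CannotConvert("Wrong format")
  val ko = ConfigSource.string("ref = not-a-project-ref").at("ref").load[ProjectRef]
  println(ko) // Left(ConfigReaderFailures(...))
}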
@@ -0,0 +1,19 @@
package ch.epfl.bluebrain.nexus.ship

import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.ship.config.ShipConfig.ProjectMapping

trait ProjectMapper {
def map(project: ProjectRef): ProjectRef
}

object ProjectMapper {

def apply(projectMapping: ProjectMapping): ProjectMapper =
(project: ProjectRef) =>
projectMapping match {
case m if m.isEmpty => project
case mapping => mapping.getOrElse(project, project)
}

}
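
ProjectMapper is a thin lookup over the configured mapping: an empty map (the default) and any project without an entry both fall back to the original reference. A hypothetical usage sketch; the org/project labels are invented for illustration and Label.unsafe is assumed from the existing sourcing model:

// Sketch only: shows the pass-through semantics of ProjectMapper.
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef}
import ch.epfl.bluebrain.nexus.ship.ProjectMapper

object ProjectMapperSketch extends App {
  val source = ProjectRef(Label.unsafe("public"), Label.unsafe("sscx"))
  val target = ProjectRef(Label.unsafe("bbp"), Label.unsafe("sscx"))
  val other  = ProjectRef(Label.unsafe("public"), Label.unsafe("thalamus"))

  val mapper = ProjectMapper(Map(source -> target))

  println(mapper.map(source))                   // remapped to bbp/sscx
  println(mapper.map(other))                    // no entry, passes through as public/thalamus
  println(ProjectMapper(Map.empty).map(source)) // empty mapping is the identity
}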
29 changes: 12 additions & 17 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala
@@ -40,6 +40,7 @@ class RunShip {
val fetchContext = FetchContext(ApiMappings.empty, xas, Quotas.disabled)
val eventLogConfig = config.eventLog
val baseUri = config.baseUri
val projectMapper = ProjectMapper(config.projectMapping)
for {
// Provision organizations
_ <- orgProvider.create(config.organizations.values)
@@ -50,24 +51,18 @@
(schemaLog, fetchSchema) <- SchemaWiring(config.eventLog, eventClock, xas, jsonLdApi)
(resourceLog, fetchResource) =
ResourceWiring(fetchContext, fetchSchema, eventLogConfig, eventClock, xas)
rcr <- ContextWiring
.resolverContextResolution(fetchResource, fetchContext, eventLogConfig, eventClock, xas)
schemaImports = SchemaWiring.schemaImports(
fetchResource,
fetchSchema,
fetchContext,
eventLogConfig,
eventClock,
xas
)
// format: off
rcr <- ContextWiring.resolverContextResolution(fetchResource, fetchContext, eventLogConfig, eventClock, xas)
schemaImports = SchemaWiring.schemaImports(fetchResource, fetchSchema, fetchContext, eventLogConfig, eventClock, xas)
// Processors
projectProcessor <- ProjectProcessor(fetchActiveOrg, eventLogConfig, eventClock, xas)(baseUri)
resolverProcessor = ResolverProcessor(fetchContext, eventLogConfig, eventClock, xas)
schemaProcessor = SchemaProcessor(schemaLog, fetchContext, schemaImports, rcr, eventClock)
resourceProcessor = ResourceProcessor(resourceLog, fetchContext, eventClock)
esViewsProcessor <- ElasticSearchViewProcessor(fetchContext, rcr, eventLogConfig, eventClock, xas)
bgViewsProcessor = BlazegraphViewProcessor(fetchContext, rcr, eventLogConfig, eventClock, xas)
compositeViewsProcessor = CompositeViewProcessor(fetchContext, rcr, eventLogConfig, eventClock, xas)
projectProcessor <- ProjectProcessor(fetchActiveOrg, projectMapper, eventLogConfig, eventClock, xas)(baseUri)
resolverProcessor = ResolverProcessor(fetchContext, projectMapper, eventLogConfig, eventClock, xas)
schemaProcessor = SchemaProcessor(schemaLog, fetchContext, schemaImports, rcr, projectMapper, eventClock)
resourceProcessor = ResourceProcessor(resourceLog, projectMapper, fetchContext, eventClock)
esViewsProcessor <- ElasticSearchViewProcessor(fetchContext, rcr, projectMapper, eventLogConfig, eventClock, xas)
bgViewsProcessor = BlazegraphViewProcessor(fetchContext, rcr, projectMapper, eventLogConfig, eventClock, xas)
compositeViewsProcessor = CompositeViewProcessor(fetchContext, rcr, projectMapper, eventLogConfig, eventClock, xas)
// format: on
report <- EventProcessor
.run(
events,
@@ -1,26 +1,40 @@
package ch.epfl.bluebrain.nexus.ship.config

import cats.effect.IO
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.config.Configs
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, ServiceAccountConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, EventLogConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.ship.config.ShipConfig.ProjectMapping
import com.typesafe.config.Config
import fs2.io.file.Path
import pureconfig.ConfigReader
import pureconfig.configurable.genericMapReader
import pureconfig.error.CannotConvert
import pureconfig.generic.semiauto.deriveReader

final case class ShipConfig(
baseUri: BaseUri,
database: DatabaseConfig,
eventLog: EventLogConfig,
organizations: OrganizationCreationConfig,
projectMapping: ProjectMapping = Map.empty,
serviceAccount: ServiceAccountConfig
)

object ShipConfig {

implicit final val shipConfigReader: ConfigReader[ShipConfig] =
type ProjectMapping = Map[ProjectRef, ProjectRef]

implicit val mapReader: ConfigReader[ProjectMapping] =
genericMapReader(str =>
ProjectRef.parse(str).leftMap(e => CannotConvert(str, classOf[ProjectRef].getSimpleName, e))
)

implicit final val shipConfigReader: ConfigReader[ShipConfig] = {
deriveReader[ShipConfig]
}

def merge(externalConfigPath: Option[Path]): IO[(ShipConfig, Config)] =
for {
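
The new projectMapping field defaults to an empty map, so existing ship configurations keep loading unchanged, and genericMapReader turns each "org/project" key into a ProjectRef via ProjectRef.parse while the values reuse the reader on the ProjectRef companion. A hedged sketch of what such a block could look like and how it would be read; the key name project-mapping assumes pureconfig's default kebab-case naming of the field, and the org/project values are invented:

// Sketch only: loads a hypothetical project-mapping block with the reader defined above.
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.ship.config.ShipConfig.mapReader
import pureconfig.ConfigSource

object ProjectMappingSketch extends App {
  val hocon =
    """
      |project-mapping {
      |  "sourceorg/sourceproject" = "targetorg/targetproject"
      |}
      |""".stripMargin

  // Both the key and the value are parsed as "org/project" ProjectRefs
  val mapping = ConfigSource.string(hocon).at("project-mapping").load[Map[ProjectRef, ProjectRef]]
  println(mapping) // Right(Map(sourceorg/sourceproject -> targetorg/targetproject))
}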
@@ -15,11 +15,15 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef}
import ch.epfl.bluebrain.nexus.ship.error.ShipError.ProjectDeletionIsNotAllowed
import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor.logger
import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, EventUUIDF, ImportStatus}
import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, EventUUIDF, ImportStatus, ProjectMapper}
import io.circe.Decoder

final class ProjectProcessor private (projects: Projects, clock: EventClock, uuidF: EventUUIDF)
extends EventProcessor[ProjectEvent] {
final class ProjectProcessor private (
projects: Projects,
projectMapper: ProjectMapper,
clock: EventClock,
uuidF: EventUUIDF
) extends EventProcessor[ProjectEvent] {
override def resourceType: EntityType = Projects.entityType

override def decoder: Decoder[ProjectEvent] = ProjectEvent.serializer.codec
@@ -34,8 +38,9 @@ final class ProjectProcessor private (projects: Projects, clock: EventClock, uui

private def evaluateInternal(event: ProjectEvent): IO[ImportStatus] = {
implicit val s: Subject = event.subject
val projectRef = event.project
val projectRef = projectMapper.map(event.project)
val cRev = event.rev - 1

event match {
case ProjectCreated(_, _, _, _, _, description, apiMappings, base, vocab, enforceSchema, _, _) =>
val fields = ProjectFields(description, apiMappings, Some(base), Some(vocab), enforceSchema)
@@ -63,8 +68,14 @@ final class ProjectProcessor private (projects: Projects, clock: EventClock, uui
object ProjectProcessor {

private val logger = Logger[ProjectProcessor]
def apply(fetchActiveOrg: FetchActiveOrganization, config: EventLogConfig, clock: EventClock, xas: Transactors)(
implicit base: BaseUri
def apply(
fetchActiveOrg: FetchActiveOrganization,
projectMapper: ProjectMapper,
config: EventLogConfig,
clock: EventClock,
xas: Transactors
)(implicit
base: BaseUri
): IO[ProjectProcessor] =
for {
uuidF <- EventUUIDF.init()
Expand All @@ -79,6 +90,6 @@ object ProjectProcessor {
xas,
clock
)(base, uuidF)
new ProjectProcessor(projects, clock, uuidF)
new ProjectProcessor(projects, projectMapper, clock, uuidF)
}
}
@@ -16,10 +16,11 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Identity}
import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor.logger
import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, ImportStatus}
import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, ImportStatus, ProjectMapper}
import io.circe.Decoder

class ResolverProcessor private (resolvers: Resolvers, clock: EventClock) extends EventProcessor[ResolverEvent] {
class ResolverProcessor private (resolvers: Resolvers, projectMapper: ProjectMapper, clock: EventClock)
extends EventProcessor[ResolverEvent] {
override def resourceType: EntityType = Resolvers.entityType

override def decoder: Decoder[ResolverEvent] = ResolverEvent.serializer.codec
@@ -34,7 +35,7 @@ class ResolverProcessor private (resolvers: Resolvers, clock: EventClock) extend
private def evaluateInternal(event: ResolverEvent): IO[ImportStatus] = {
val id = event.id
implicit val s: Subject = event.subject
val projectRef = event.project
val projectRef = projectMapper.map(event.project)
val cRev = event.rev - 1
event match {
case ResolverCreated(_, _, value, _, _, _, _) =>
@@ -75,11 +76,12 @@ object ResolverProcessor {

def apply(
fetchContext: FetchContext,
projectMapper: ProjectMapper,
config: EventLogConfig,
clock: EventClock,
xas: Transactors
)(implicit api: JsonLdApi): ResolverProcessor = {
val resolvers = ResolverWiring.resolvers(fetchContext, config, clock, xas)
new ResolverProcessor(resolvers, clock)
new ResolverProcessor(resolvers, projectMapper, clock)
}
}
@@ -16,10 +16,11 @@ import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceRejection.{Inco
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ResourceRef}
import ch.epfl.bluebrain.nexus.ship.resources.ResourceProcessor.logger
import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, ImportStatus}
import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, ImportStatus, ProjectMapper}
import io.circe.Decoder

class ResourceProcessor private (resources: Resources, clock: EventClock) extends EventProcessor[ResourceEvent] {
class ResourceProcessor private (resources: Resources, projectMapper: ProjectMapper, clock: EventClock)
extends EventProcessor[ResourceEvent] {

override def resourceType: EntityType = Resources.entityType

@@ -35,28 +36,29 @@ class ResourceProcessor private (resources: Resources, clock: EventClock) extend
implicit val s: Subject = event.subject
implicit val c: Caller = Caller(s, Set.empty)
val cRev = event.rev - 1
val project = projectMapper.map(event.project)

implicit class ResourceRefOps(ref: ResourceRef) {
def toIdSegment: IdSegment = IdSegmentRef(ref).value
}

event match {
case e: ResourceCreated =>
resources.create(e.id, e.project, e.schema.toIdSegment, e.source, e.tag)
resources.create(e.id, project, e.schema.toIdSegment, e.source, e.tag)
case e: ResourceUpdated =>
resources.update(e.id, event.project, e.schema.toIdSegment.some, cRev, e.source, e.tag)
resources.update(e.id, project, e.schema.toIdSegment.some, cRev, e.source, e.tag)
case e: ResourceSchemaUpdated =>
resources.updateAttachedSchema(e.id, e.project, e.schema.toIdSegment)
resources.updateAttachedSchema(e.id, project, e.schema.toIdSegment)
case e: ResourceRefreshed =>
resources.refresh(e.id, e.project, e.schema.toIdSegment.some)
resources.refresh(e.id, project, e.schema.toIdSegment.some)
case e: ResourceTagAdded =>
resources.tag(e.id, e.project, None, e.tag, e.targetRev, cRev)
resources.tag(e.id, project, None, e.tag, e.targetRev, cRev)
case e: ResourceTagDeleted =>
resources.deleteTag(e.id, e.project, None, e.tag, cRev)
resources.deleteTag(e.id, project, None, e.tag, cRev)
case e: ResourceDeprecated =>
resources.deprecate(e.id, e.project, None, cRev)
resources.deprecate(e.id, project, None, cRev)
case e: ResourceUndeprecated =>
resources.undeprecate(e.id, e.project, None, cRev)
resources.undeprecate(e.id, project, None, cRev)
}
}.redeemWith(
{
@@ -75,12 +77,13 @@ object ResourceProcessor {

def apply(
log: ResourceLog,
projectMapper: ProjectMapper,
fetchContext: FetchContext,
clock: EventClock
)(implicit jsonLdApi: JsonLdApi): ResourceProcessor = {
val rcr = ResolverContextResolution.never // TODO: Pass correct ResolverContextResolution
val resources = ResourcesImpl(log, fetchContext, rcr)
new ResourceProcessor(resources, clock)
new ResourceProcessor(resources, projectMapper, clock)
}

}
@@ -14,10 +14,11 @@ import ch.epfl.bluebrain.nexus.delta.sdk.schemas.{SchemaImports, Schemas, Schema
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.ship.schemas.SchemaProcessor.logger
import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, FailingUUID, ImportStatus}
import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, FailingUUID, ImportStatus, ProjectMapper}
import io.circe.Decoder

class SchemaProcessor private (schemas: Schemas, clock: EventClock) extends EventProcessor[SchemaEvent] {
class SchemaProcessor private (schemas: Schemas, projectMapper: ProjectMapper, clock: EventClock)
extends EventProcessor[SchemaEvent] {

override def resourceType: EntityType = Schemas.entityType

@@ -34,15 +35,16 @@ class SchemaProcessor private (schemas: Schemas, clock: EventClock) extends Even
implicit val s: Subject = event.subject
implicit val c: Caller = Caller(s, Set.empty)
val cRev = event.rev - 1
val project = projectMapper.map(event.project)

event match {
case e: SchemaCreated => schemas.create(e.id, e.project, e.source)
case e: SchemaUpdated => schemas.update(e.id, e.project, cRev, e.source)
case e: SchemaRefreshed => schemas.refresh(e.id, e.project)
case e: SchemaTagAdded => schemas.tag(e.id, e.project, e.tag, e.targetRev, cRev)
case e: SchemaTagDeleted => schemas.deleteTag(e.id, e.project, e.tag, cRev)
case e: SchemaDeprecated => schemas.deprecate(e.id, e.project, cRev)
case e: SchemaUndeprecated => schemas.undeprecate(e.id, e.project, cRev)
case e: SchemaCreated => schemas.create(e.id, project, e.source)
case e: SchemaUpdated => schemas.update(e.id, project, cRev, e.source)
case e: SchemaRefreshed => schemas.refresh(e.id, project)
case e: SchemaTagAdded => schemas.tag(e.id, project, e.tag, e.targetRev, cRev)
case e: SchemaTagDeleted => schemas.deleteTag(e.id, project, e.tag, cRev)
case e: SchemaDeprecated => schemas.deprecate(e.id, project, cRev)
case e: SchemaUndeprecated => schemas.undeprecate(e.id, project, cRev)
}
}.redeemWith(
{
@@ -64,10 +66,11 @@ object SchemaProcessor {
fetchContext: FetchContext,
schemaImports: SchemaImports,
rcr: ResolverContextResolution,
projectMapper: ProjectMapper,
clock: EventClock
)(implicit jsonLdApi: JsonLdApi): SchemaProcessor = {
val schemas = SchemasImpl(log, fetchContext, schemaImports, rcr)(jsonLdApi, FailingUUID)
new SchemaProcessor(schemas, clock)
new SchemaProcessor(schemas, projectMapper, clock)
}

}