Skip to content

Commit

Permalink
Merge branch 'master' into resolver-priority
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski authored Mar 26, 2024
2 parents 8ba0983 + f296a28 commit 2b0d888
Show file tree
Hide file tree
Showing 17 changed files with 510 additions and 137 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-delta-ship.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ jobs:
- name: Clean, build Delta & Storage images
run: |
sbt -Dsbt.color=always -Dsbt.supershell=false \
clean \
app/Docker/publishLocal
- name: Start services
run: docker-compose -f tests/docker/docker-compose.yml up -d
Expand All @@ -45,5 +46,4 @@ jobs:
- name: Unit tests
run: |
sbt -Dsbt.color=always -Dsbt.supershell=false \
clean \
ship-unit-tests
7 changes: 6 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,12 @@ lazy val ship = project
)
.enablePlugins(UniversalPlugin, JavaAppPackaging, JavaAgent, DockerPlugin, BuildInfoPlugin)
.settings(shared, compilation, servicePackaging, assertJavaVersion, kamonSettings, coverage, release)
.dependsOn(sdk % "compile->compile;test->test", tests % "test->test")
.dependsOn(
sdk % "compile->compile;test->test",
blazegraphPlugin % "compile->compile",
elasticsearchPlugin % "compile->compile",
tests % "test->compile;test->test"
)
.settings(
libraryDependencies ++= Seq(declineEffect),
addCompilerPlugin(betterMonadicFor),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ object BlazegraphViews {
apply(fetchContext, contextResolution, validate, createNameSpace, eventLogConfig, prefix, xas, clock)
}

private[blazegraph] def apply(
def apply(
fetchContext: FetchContext,
contextResolution: ResolverContextResolution,
validate: ValidateBlazegraphView,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,19 +105,19 @@ object Transactors {
): Resource[IO, Transactors] = {
def transactor(access: DatabaseAccess, readOnly: Boolean, poolName: String): Resource[IO, HikariTransactor[IO]] = {
for {
ec <- ExecutionContexts.fixedThreadPool[IO](access.poolSize)
dataSource = {
val ds = new HikariDataSource
ds.setJdbcUrl(s"jdbc:postgresql://${access.host}:${access.port}/")
ds.setUsername(config.username)
ds.setPassword(config.password.value)
ds.setDriverClassName("org.postgresql.Driver")
ds.setMaximumPoolSize(access.poolSize)
ds.setPoolName(poolName)
ds.setAutoCommit(false)
ds.setReadOnly(readOnly)
ds
}
ec <- ExecutionContexts.fixedThreadPool[IO](access.poolSize)
dataSource <- Resource.make[IO, HikariDataSource](IO.delay {
val ds = new HikariDataSource
ds.setJdbcUrl(s"jdbc:postgresql://${access.host}:${access.port}/")
ds.setUsername(config.username)
ds.setPassword(config.password.value)
ds.setDriverClassName("org.postgresql.Driver")
ds.setMaximumPoolSize(access.poolSize)
ds.setPoolName(poolName)
ds.setAutoCommit(false)
ds.setReadOnly(readOnly)
ds
})(ds => IO.delay(ds.close()))
} yield HikariTransactor[IO](dataSource, ec, None)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.{Clock, IO}
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceLoader
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
Expand All @@ -9,37 +9,49 @@ import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.resources.FetchResource
import ch.epfl.bluebrain.nexus.delta.sdk.resources.Resources.ResourceLog
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig
import ch.epfl.bluebrain.nexus.ship.acls.AclWiring
import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverWiring

import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{contexts => esContexts}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{contexts => bgContexts}

object ContextWiring {

implicit private val loader: ClasspathResourceLoader = ClasspathResourceLoader.withContext(getClass)

def remoteContextResolution: IO[RemoteContextResolution] =
for {
shaclCtx <- ContextValue.fromFile("contexts/shacl.json")
schemasMetaCtx <- ContextValue.fromFile("contexts/schemas-metadata.json")
pipelineCtx <- ContextValue.fromFile("contexts/pipeline.json")
shaclCtx <- ContextValue.fromFile("contexts/shacl.json")
schemasMetaCtx <- ContextValue.fromFile("contexts/schemas-metadata.json")
elasticsearchCtx <- ContextValue.fromFile("contexts/elasticsearch.json")
blazegraphCtx <- ContextValue.fromFile("contexts/sparql.json")
} yield RemoteContextResolution.fixed(
// Delta
contexts.pipeline -> pipelineCtx,
// Schema
contexts.shacl -> shaclCtx,
contexts.schemasMetadata -> schemasMetaCtx
contexts.schemasMetadata -> schemasMetaCtx,
// ElasticSearch
esContexts.elasticsearch -> elasticsearchCtx,
// Blazegraph
bgContexts.blazegraph -> blazegraphCtx
)

def resolverContextResolution(
resourceLog: Clock[IO] => IO[ResourceLog],
fetchResource: FetchResource,
fetchContext: FetchContext,
config: EventLogConfig,
clock: EventClock,
xas: Transactors
)(implicit jsonLdApi: JsonLdApi): Clock[IO] => IO[ResolverContextResolution] = { clock =>
)(implicit jsonLdApi: JsonLdApi): IO[ResolverContextResolution] = {
val aclCheck = AclCheck(AclWiring.acls(config, clock, xas))
val resolvers = ResolverWiring.resolvers(fetchContext, config, clock, xas)

for {
fetchResource <- resourceLog(clock).map(FetchResource(_))
rcr <- remoteContextResolution
rcr <- remoteContextResolution
} yield ResolverContextResolution(aclCheck, resolvers, rcr, fetchResource)
}

Expand Down
53 changes: 34 additions & 19 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor
import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor
import ch.epfl.bluebrain.nexus.ship.resources.{ResourceProcessor, ResourceWiring}
import ch.epfl.bluebrain.nexus.ship.schemas.{SchemaProcessor, SchemaWiring}
import ch.epfl.bluebrain.nexus.ship.views.{BlazegraphViewProcessor, ElasticSearchViewProcessor}
import fs2.Stream
import fs2.io.file.{Files, Path}
import io.circe.parser.decode
Expand All @@ -40,27 +41,41 @@ class RunShip {
val baseUri = config.baseUri
for {
// Provision organizations
_ <- orgProvider.create(config.organizations.values)
events = eventStream(file)
fetchActiveOrg = FetchActiveOrganization(xas)
_ <- orgProvider.create(config.organizations.values)
events = eventStream(file)
fetchActiveOrg = FetchActiveOrganization(xas)
// Wiring
schemaLog = SchemaWiring.schemaLog(config.eventLog, xas, jsonLdApi)
resourceLog = ResourceWiring.resourceLog(fetchContext, schemaLog, eventLogConfig, xas)
schemaImports = SchemaWiring.schemaImports(
resourceLog,
schemaLog,
fetchContext,
eventLogConfig,
xas
)
rcr = ContextWiring.resolverContextResolution(resourceLog, fetchContext, eventLogConfig, xas)
eventClock <- EventClock.init()
(schemaLog, fetchSchema) <- SchemaWiring(config.eventLog, eventClock, xas, jsonLdApi)
(resourceLog, fetchResource) =
ResourceWiring(fetchContext, fetchSchema, eventLogConfig, eventClock, xas)
rcr <- ContextWiring
.resolverContextResolution(fetchResource, fetchContext, eventLogConfig, eventClock, xas)
schemaImports = SchemaWiring.schemaImports(
fetchResource,
fetchSchema,
fetchContext,
eventLogConfig,
eventClock,
xas
)
// Processors
projectProcessor <- ProjectProcessor(fetchActiveOrg, eventLogConfig, xas)(baseUri)
resolverProcessor <- ResolverProcessor(fetchContext, eventLogConfig, xas)
schemaProcessor <- SchemaProcessor(schemaLog, fetchContext, schemaImports, rcr)
resourceProcessor <- ResourceProcessor(resourceLog, fetchContext)
report <- EventProcessor
.run(events, projectProcessor, resolverProcessor, schemaProcessor, resourceProcessor)
projectProcessor <- ProjectProcessor(fetchActiveOrg, eventLogConfig, eventClock, xas)(baseUri)
resolverProcessor = ResolverProcessor(fetchContext, eventLogConfig, eventClock, xas)
schemaProcessor = SchemaProcessor(schemaLog, fetchContext, schemaImports, rcr, eventClock)
resourceProcessor = ResourceProcessor(resourceLog, fetchContext, eventClock)
esViewsProcessor <- ElasticSearchViewProcessor(fetchContext, rcr, eventLogConfig, eventClock, xas)
bgViewsProcessor = BlazegraphViewProcessor(fetchContext, rcr, eventLogConfig, eventClock, xas)
report <- EventProcessor
.run(
events,
projectProcessor,
resolverProcessor,
schemaProcessor,
resourceProcessor,
esViewsProcessor,
bgViewsProcessor
)
} yield report
}
} yield report
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,10 @@ final class ProjectProcessor private (projects: Projects, clock: EventClock, uui
object ProjectProcessor {

private val logger = Logger[ProjectProcessor]
def apply(fetchActiveOrg: FetchActiveOrganization, config: EventLogConfig, xas: Transactors)(implicit
base: BaseUri
def apply(fetchActiveOrg: FetchActiveOrganization, config: EventLogConfig, clock: EventClock, xas: Transactors)(
implicit base: BaseUri
): IO[ProjectProcessor] =
for {
clock <- EventClock.init()
uuidF <- EventUUIDF.init()
} yield {
val disableDeletion: ValidateProjectDeletion = (p: ProjectRef) => IO.raiseError(ProjectDeletionIsNotAllowed(p))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class ResolverProcessor private (resolvers: Resolvers, clock: EventClock) extend
}.redeemWith(
{
case a: ResourceAlreadyExists => logger.warn(a)("The resolver already exists").as(ImportStatus.Dropped)
case i: IncorrectRev => logger.warn(i)("An incorrect revision as been provided").as(ImportStatus.Dropped)
case i: IncorrectRev => logger.warn(i)("An incorrect revision has been provided").as(ImportStatus.Dropped)
case other => IO.raiseError(other)
},
_ => IO.pure(ImportStatus.Success)
Expand All @@ -76,10 +76,10 @@ object ResolverProcessor {
def apply(
fetchContext: FetchContext,
config: EventLogConfig,
clock: EventClock,
xas: Transactors
)(implicit api: JsonLdApi): IO[ResolverProcessor] =
EventClock.init().map { clock =>
val resolvers = ResolverWiring.resolvers(fetchContext, config, clock, xas)
new ResolverProcessor(resolvers, clock)
}
)(implicit api: JsonLdApi): ResolverProcessor = {
val resolvers = ResolverWiring.resolvers(fetchContext, config, clock, xas)
new ResolverProcessor(resolvers, clock)
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ch.epfl.bluebrain.nexus.ship.resources

import cats.effect.{Clock, IO}
import cats.effect.IO
import cats.implicits.catsSyntaxOptionId
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
Expand Down Expand Up @@ -74,14 +74,13 @@ object ResourceProcessor {
private val logger = Logger[ResourceProcessor]

def apply(
log: Clock[IO] => IO[ResourceLog],
fetchContext: FetchContext
)(implicit jsonLdApi: JsonLdApi): IO[ResourceProcessor] =
EventClock.init().flatMap { clock =>
for {
resourceLog <- log(clock)
resources = ResourcesImpl(resourceLog, fetchContext, ResolverContextResolution.never)
} yield new ResourceProcessor(resources, clock)
}
log: ResourceLog,
fetchContext: FetchContext,
clock: EventClock
)(implicit jsonLdApi: JsonLdApi): ResourceProcessor = {
val rcr = ResolverContextResolution.never // TODO: Pass correct ResolverContextResolution
val resources = ResourcesImpl(log, fetchContext, rcr)
new ResourceProcessor(resources, clock)
}

}
Original file line number Diff line number Diff line change
@@ -1,41 +1,41 @@
package ch.epfl.bluebrain.nexus.ship.resources

import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResourceResolution
import ch.epfl.bluebrain.nexus.delta.sdk.resources.Resources.ResourceLog
import ch.epfl.bluebrain.nexus.delta.sdk.resources.{DetectChange, Resources, ValidateResource}
import ch.epfl.bluebrain.nexus.delta.sdk.resources.{DetectChange, FetchResource, Resources, ValidateResource}
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.FetchSchema
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.Schemas.SchemaLog
import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors}
import ch.epfl.bluebrain.nexus.ship.EventClock
import ch.epfl.bluebrain.nexus.ship.acls.AclWiring
import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverWiring

object ResourceWiring {

def resourceLog(
def apply(
fetchContext: FetchContext,
schemaLog: Clock[IO] => IO[SchemaLog],
fetchSchema: FetchSchema,
config: EventLogConfig,
clock: EventClock,
xas: Transactors
)(implicit
jsonLdApi: JsonLdApi
): Clock[IO] => IO[ResourceLog] = { clock =>
val detectChange = DetectChange(false)
val aclCheck = AclCheck(AclWiring.acls(config, clock, xas))
val resolvers = ResolverWiring.resolvers(fetchContext, config, clock, xas)
): (ResourceLog, FetchResource) = {
val rcr = RemoteContextResolution.never // TODO: Use correct RemoteContextResolution
val detectChange = DetectChange(false)
val aclCheck = AclCheck(AclWiring.acls(config, clock, xas))
val resolvers = ResolverWiring.resolvers(fetchContext, config, clock, xas)
val resourceResolution =
ResourceResolution.schemaResource(aclCheck, resolvers, fetchSchema, excludeDeprecated = false)
val validate = ValidateResource(resourceResolution)(rcr)
val resourceDef = Resources.definition(validate, detectChange, clock)

for {
fetchSchema <- schemaLog(clock).map(FetchSchema(_))
resourceResolution =
ResourceResolution.schemaResource(aclCheck, resolvers, fetchSchema, excludeDeprecated = false)
validate = ValidateResource(resourceResolution)(RemoteContextResolution.never)
resourceDef = Resources.definition(validate, detectChange, clock)
} yield ScopedEventLog(resourceDef, config, xas)
val log = ScopedEventLog(resourceDef, config, xas)
(log, FetchResource(log))
}

}
Loading

0 comments on commit 2b0d888

Please sign in to comment.