Skip to content

Commit

Permalink
Add ResourceProcessor, SchemaProcessor; add E2E tests for ship (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski authored Mar 22, 2024
1 parent f6986f9 commit 834de7f
Show file tree
Hide file tree
Showing 28 changed files with 799 additions and 81 deletions.
14 changes: 13 additions & 1 deletion .github/workflows/ci-delta-ship.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,27 @@ jobs:
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Create /tmp/ship folder
run: mkdir -p /tmp/ship
- name: Setup JDK
uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: '21'
cache: 'sbt'
check-latest: true
- name: Clean, build Delta & Storage images
run: |
sbt -Dsbt.color=always -Dsbt.supershell=false \
app/Docker/publishLocal
- name: Start services
run: docker-compose -f tests/docker/docker-compose.yml up -d
- name: Waiting for Delta to start
run: |
URL="http://localhost:8080/v1/version"
curl --connect-timeout 3 --max-time 5 --retry 30 --retry-all-errors --retry-delay 3 --retry-max-time 90 $URL
- name: Unit tests
run: |
sbt -Dsbt.color=always -Dsbt.supershell=false \
clean \
ship-unit-tests-with-coverage
ship-unit-tests
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ lazy val ship = project
)
.enablePlugins(UniversalPlugin, JavaAppPackaging, JavaAgent, DockerPlugin, BuildInfoPlugin)
.settings(shared, compilation, servicePackaging, assertJavaVersion, kamonSettings, coverage, release)
.dependsOn(sdk % "compile->compile;test->test", testkit % "test->compile")
.dependsOn(sdk % "compile->compile;test->test", tests % "test->test")
.settings(
libraryDependencies ++= Seq(declineEffect),
addCompilerPlugin(betterMonadicFor),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ object AclsModule extends ModuleDef {
permissions.fetchPermissionSet,
AclsImpl.findUnknownRealms(xas),
permissions.minimum,
config.acls,
config.acls.eventLog,
xas,
clock
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress.{Organization, Project, Root}
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.{Acl, AclAddress}
import ch.epfl.bluebrain.nexus.delta.sdk.acls.{AclCheck, Acls, AclsConfig, AclsImpl}
import ch.epfl.bluebrain.nexus.delta.sdk.acls.{AclCheck, Acls, AclsImpl}
import ch.epfl.bluebrain.nexus.delta.sdk.identities.IdentitiesDummy
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
Expand Down Expand Up @@ -90,7 +90,7 @@ class AclsRoutesSpec extends BaseRouteSpec {
IO.pure(Set(aclsPermissions.read, aclsPermissions.write, managePermission, events.read)),
Acls.findUnknownRealms(_, Set(realm1, realm2)),
Set(aclsPermissions.read, aclsPermissions.write, managePermission, events.read),
AclsConfig(EventLogConfig(QueryConfig(5, RefreshStrategy.Stop), 100.millis)),
EventLogConfig(QueryConfig(5, RefreshStrategy.Stop), 100.millis),
xas,
clock
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission
import ch.epfl.bluebrain.nexus.delta.sdk.realms.Realms
import ch.epfl.bluebrain.nexus.delta.sdk.syntax._
import ch.epfl.bluebrain.nexus.delta.sourcing._
import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GlobalStateStore
Expand Down Expand Up @@ -108,12 +109,12 @@ object AclsImpl {
fetchPermissionSet: IO[Set[Permission]],
findUnknownRealms: Set[Label] => IO[Unit],
minimum: Set[Permission],
config: AclsConfig,
config: EventLogConfig,
xas: Transactors,
clock: Clock[IO]
): Acls =
new AclsImpl(
GlobalEventLog(Acls.definition(fetchPermissionSet, findUnknownRealms, clock), config.eventLog, xas),
GlobalEventLog(Acls.definition(fetchPermissionSet, findUnknownRealms, clock), config, xas),
minimum
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class AclsImplSpec extends CatsEffectSpec with DoobieScalaTestFixture with Cance
IO.pure(minimumPermissions),
Acls.findUnknownRealms(_, Set(realm, realm2)),
minimumPermissions,
AclsConfig(eventLogConfig),
eventLogConfig,
xas,
clock
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class OrganizationDeleterSuite extends NexusSuite with ConfigFixtures with Proje
private val fields = ProjectFields(None, ApiMappings.empty, None, None)
private lazy val orgs = OrganizationsImpl(ScopeInitializer.noop, eventLogConfig, xas, clock)
private val permission = Permissions.resources.read
private lazy val acls = AclsImpl(IO.pure(Set(permission)), _ => IO.unit, Set(), aclsConfig, xas, clock)
private lazy val acls = AclsImpl(IO.pure(Set(permission)), _ => IO.unit, Set(), aclsConfig.eventLog, xas, clock)

implicit val subject: Subject = Identity.User("Bob", Label.unsafe("realm"))
implicit val uuidF: UUIDF = UUIDF.fixed(UUID.randomUUID())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.sdk.projects
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sdk.ConfigFixtures
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.{Acl, AclAddress}
import ch.epfl.bluebrain.nexus.delta.sdk.acls.{Acls, AclsConfig, AclsImpl}
import ch.epfl.bluebrain.nexus.delta.sdk.acls.{Acls, AclsImpl}
import ch.epfl.bluebrain.nexus.delta.sdk.generators.{OrganizationGen, PermissionsGen, ProjectGen}
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.{Caller, ServiceAccount}
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions
Expand All @@ -22,7 +22,7 @@ class OwnerPermissionsScopeInitializationSpec extends CatsEffectSpec with Doobie
IO.pure(PermissionsGen.minimum),
Acls.findUnknownRealms(_, Set(saRealm, usersRealm)),
PermissionsGen.minimum,
AclsConfig(eventLogConfig),
eventLogConfig,
xas,
clock
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ trait CatsIOValues {

implicit final class CatsIOValuesOps[A](private val io: IO[A]) {
def accepted(implicit pos: source.Position): A = {
io.attempt.unsafeRunTimed(45.seconds).getOrElse(fail("IO timed out during .accepted call")) match {
case Left(e) => fail(s"IO failed when it was expected to succeed $e.", e)
case Right(value) => value
}
io.unsafeRunTimed(45.seconds).getOrElse(fail("IO timed out during .accepted call"))
}

def rejected(implicit pos: source.Position): Throwable = rejectedWith[Throwable]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceLoader
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.resources.FetchResource
import ch.epfl.bluebrain.nexus.delta.sdk.resources.Resources.ResourceLog
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig
import ch.epfl.bluebrain.nexus.ship.acls.AclWiring
import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverWiring

object ContextWiring {

implicit private val loader: ClasspathResourceLoader = ClasspathResourceLoader.withContext(getClass)

def remoteContextResolution: IO[RemoteContextResolution] =
for {
shaclCtx <- ContextValue.fromFile("contexts/shacl.json")
schemasMetaCtx <- ContextValue.fromFile("contexts/schemas-metadata.json")
} yield RemoteContextResolution.fixed(
contexts.shacl -> shaclCtx,
contexts.schemasMetadata -> schemasMetaCtx
)

def resolverContextResolution(
resourceLog: Clock[IO] => IO[ResourceLog],
fetchContext: FetchContext,
config: EventLogConfig,
xas: Transactors
)(implicit jsonLdApi: JsonLdApi): Clock[IO] => IO[ResolverContextResolution] = { clock =>
val aclCheck = AclCheck(AclWiring.acls(config, clock, xas))
val resolvers = ResolverWiring.resolvers(fetchContext, config, clock, xas)

for {
fetchResource <- resourceLog(clock).map(FetchResource(_))
rcr <- remoteContextResolution
} yield ResolverContextResolution(aclCheck, resolvers, rcr, fetchResource)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ class EventClock(instant: Ref[IO, Instant]) extends Clock[IO] {
override def realTime: IO[FiniteDuration] = toDuration

private def toDuration: IO[FiniteDuration] = instant.get.map { i =>
FiniteDuration(i.toEpochMilli, TimeUnit.MILLISECONDS)
val seconds = FiniteDuration(i.getEpochSecond, TimeUnit.SECONDS)
val nanos = FiniteDuration(i.getNano, TimeUnit.NANOSECONDS)
seconds + nanos
}
}

Expand Down
53 changes: 3 additions & 50 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala
Original file line number Diff line number Diff line change
@@ -1,27 +1,14 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.{Clock, ExitCode, IO}
import cats.effect.{ExitCode, IO}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi}
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.FetchActiveOrganization
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings
import ch.epfl.bluebrain.nexus.delta.sdk.quotas.Quotas
import ch.epfl.bluebrain.nexus.delta.ship.BuildInfo
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.ship.config.ShipConfig
import ch.epfl.bluebrain.nexus.ship.model.InputEvent
import ch.epfl.bluebrain.nexus.ship.organizations.OrganizationProvider
import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor
import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor
import com.monovore.decline.Opts
import com.monovore.decline.effect.CommandIOApp
import fs2.Stream
import fs2.io.file.{Files, Path}
import io.circe.parser._
import fs2.io.file.Path

object Main
extends CommandIOApp(
Expand Down Expand Up @@ -54,52 +41,18 @@ object Main
override def main: Opts[IO[ExitCode]] =
(run orElse showConfig)
.map {
case Run(file, config, _) => run(file, config)
case Run(file, config, _) => new RunShip().run(file, config)
case ShowConfig(config) => showConfig(config)
}
.map(_.as(ExitCode.Success))

private[ship] def run(file: Path, config: Option[Path]): IO[ImportReport] = {
val clock = Clock[IO]
val uuidF = UUIDF.random
// Resources may have been created with different configurations so we adopt the lenient one for the import
implicit val jsonLdApi: JsonLdApi = JsonLdJavaApi.lenient
for {
_ <- logger.info(s"Running the import with file $file, config $config and from offset $offset")
config <- ShipConfig.load(config)
report <- Transactors.init(config.database).use { xas =>
val orgProvider =
OrganizationProvider(config.eventLog, config.serviceAccount.value, xas, clock)(uuidF)
val fetchContext = FetchContext(ApiMappings.empty, xas, Quotas.disabled)
val eventLogConfig = config.eventLog
val baseUri = config.baseUri
for {
// Provision organizations
_ <- orgProvider.create(config.organizations.values)
events = eventStream(file)
fetchActiveOrg = FetchActiveOrganization(xas)
projectProcessor <- ProjectProcessor(fetchActiveOrg, eventLogConfig, xas)(baseUri)
resolverProcessor <- ResolverProcessor(fetchContext, eventLogConfig, xas)
report <- EventProcessor.run(events, projectProcessor, resolverProcessor)
} yield report
}
} yield report
}

private[ship] def showConfig(config: Option[Path]) =
for {
_ <- logger.info(s"Showing reconciled config")
config <- ShipConfig.merge(config).map(_._2)
_ <- logger.info(config.root().render())
} yield ()

private def eventStream(file: Path): Stream[IO, InputEvent] =
Files[IO].readUtf8Lines(file).zipWithIndex.evalMap { case (line, index) =>
IO.fromEither(decode[InputEvent](line)).onError { err =>
logger.error(err)(s"Error parsing to event at line $index")
}
}

sealed private trait Command

final private case class Run(file: Path, config: Option[Path], offset: Offset) extends Command
Expand Down
76 changes: 76 additions & 0 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi}
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.FetchActiveOrganization
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings
import ch.epfl.bluebrain.nexus.delta.sdk.quotas.Quotas
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.ship.config.ShipConfig
import ch.epfl.bluebrain.nexus.ship.model.InputEvent
import ch.epfl.bluebrain.nexus.ship.organizations.OrganizationProvider
import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor
import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor
import ch.epfl.bluebrain.nexus.ship.resources.{ResourceProcessor, ResourceWiring}
import ch.epfl.bluebrain.nexus.ship.schemas.{SchemaProcessor, SchemaWiring}
import fs2.Stream
import fs2.io.file.{Files, Path}
import io.circe.parser.decode

class RunShip {

private val logger = Logger[RunShip]

def run(file: Path, config: Option[Path]): IO[ImportReport] = {
val clock = Clock[IO]
val uuidF = UUIDF.random
// Resources may have been created with different configurations so we adopt the lenient one for the import
implicit val jsonLdApi: JsonLdApi = JsonLdJavaApi.lenient
for {
_ <- logger.info(s"Running the import with file $file, config $config")
config <- ShipConfig.load(config)
report <- Transactors.init(config.database).use { xas =>
val orgProvider =
OrganizationProvider(config.eventLog, config.serviceAccount.value, xas, clock)(uuidF)
val fetchContext = FetchContext(ApiMappings.empty, xas, Quotas.disabled)
val eventLogConfig = config.eventLog
val baseUri = config.baseUri
for {
// Provision organizations
_ <- orgProvider.create(config.organizations.values)
events = eventStream(file)
fetchActiveOrg = FetchActiveOrganization(xas)
// Wiring
schemaLog = SchemaWiring.schemaLog(config.eventLog, xas, jsonLdApi)
resourceLog = ResourceWiring.resourceLog(fetchContext, schemaLog, eventLogConfig, xas)
schemaImports = SchemaWiring.schemaImports(
resourceLog,
schemaLog,
fetchContext,
eventLogConfig,
xas
)
rcr = ContextWiring.resolverContextResolution(resourceLog, fetchContext, eventLogConfig, xas)
// Processors
projectProcessor <- ProjectProcessor(fetchActiveOrg, eventLogConfig, xas)(baseUri)
resolverProcessor <- ResolverProcessor(fetchContext, eventLogConfig, xas)
schemaProcessor <- SchemaProcessor(schemaLog, fetchContext, schemaImports, rcr)
resourceProcessor <- ResourceProcessor(resourceLog, fetchContext)
report <- EventProcessor
.run(events, projectProcessor, resolverProcessor, schemaProcessor, resourceProcessor)
} yield report
}
} yield report
}

private def eventStream(file: Path): Stream[IO, InputEvent] =
Files[IO].readUtf8Lines(file).zipWithIndex.evalMap { case (line, index) =>
IO.fromEither(decode[InputEvent](line)).onError { err =>
logger.error(err)(s"Error parsing to event at line $index")
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package ch.epfl.bluebrain.nexus.ship.acls

import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.sdk.acls.{Acls, AclsImpl}
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig

object AclWiring {

def acls(config: EventLogConfig, clock: Clock[IO], xas: Transactors): Acls = {
val permissionSet = Set(Permission.unsafe("resources/read"))
AclsImpl(
IO.pure(permissionSet),
AclsImpl.findUnknownRealms(xas),
permissionSet,
config,
xas,
clock
)
}

}
Loading

0 comments on commit 834de7f

Please sign in to comment.