Skip to content

Commit

Permalink
Introduce Nexus ship to load events (#4739)
Browse files Browse the repository at this point in the history
* Introduce Nexus ship to load events

---------

Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored Feb 15, 2024
1 parent 6eb7506 commit cb2a450
Show file tree
Hide file tree
Showing 9 changed files with 262 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/ci-delta-plugins.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ name: Delta Plugins unit tests
on:
pull_request:
paths:
- 'ship/**'
- 'delta/kernel/**'
- 'delta/plugins/**'
- 'delta/rdf/**'
- 'delta/sdk/**'
- 'delta/sourcing-psql/**'
Expand Down
36 changes: 36 additions & 0 deletions .github/workflows/ci-delta-ship.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: Delta Ship unit tests
on:
pull_request:
paths:
- 'delta/kernel/**'
- 'delta/plugins/**'
- 'delta/rdf/**'
- 'delta/sdk/**'
- 'delta/sourcing-psql/**'
- 'delta/testkit/**'
- 'build.sbt'
- 'project/**'
- '.github/workflows/ci-delta-ship.yml'
jobs:
run:
if: github.event_name == 'pull_request'
runs-on: it
timeout-minutes: 20
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
steps:
- name: Setup JDK
uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: '21'
check-latest: true
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Unit tests
run: |
sbt -Dsbt.color=always -Dsbt.supershell=false \
clean \
ship-unit-tests-with-coverage
23 changes: 23 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ val circeVersion = "0.14.6"
val circeOpticsVersion = "0.15.0"
val circeExtrasVersions = "0.14.3"
val classgraphVersion = "4.8.165"
val declineVersion = "2.4.1"
val distageVersion = "1.2.5"
val doobieVersion = "1.0.0-RC5"
val fs2Version = "3.9.4"
Expand Down Expand Up @@ -92,6 +93,7 @@ lazy val circeOptics = "io.circe" %% "circe-optics"
lazy val circeParser = "io.circe" %% "circe-parser" % circeVersion
lazy val classgraph = "io.github.classgraph" % "classgraph" % classgraphVersion
lazy val distageCore = "io.7mind.izumi" %% "distage-core" % distageVersion
lazy val declineEffect = "com.monovore" %% "decline-effect" % declineVersion
lazy val doobiePostgres = "org.tpolecat" %% "doobie-postgres" % doobieVersion
lazy val doobie = Seq(
doobiePostgres,
Expand Down Expand Up @@ -719,6 +721,25 @@ lazy val delta = project
.settings(shared, compilation, noPublish)
.aggregate(kernel, testkit, sourcingPsql, rdf, sdk, app, plugins)

lazy val ship = project
.in(file("ship"))
.settings(
name := "nexus-ship",
moduleName := "nexus-ship"
)
.enablePlugins(UniversalPlugin, JavaAppPackaging, JavaAgent, DockerPlugin, BuildInfoPlugin)
.settings(shared, compilation, servicePackaging, assertJavaVersion, kamonSettings, coverage, release)
.dependsOn(sdk % "compile->compile;test->test", testkit % "test->compile")
.settings(
libraryDependencies ++= Seq(declineEffect),
addCompilerPlugin(betterMonadicFor),
run / fork := true,
buildInfoKeys := Seq[BuildInfoKey](version),
buildInfoPackage := "ch.epfl.bluebrain.nexus.delta.ship",
Docker / packageName := "nexus-ship",
coverageFailOnMinimum := false
)

lazy val cargo = taskKey[(File, String)]("Run Cargo to build 'nexus-fixer'")

lazy val storage = project
Expand Down Expand Up @@ -1061,4 +1082,6 @@ addCommandAlias("app-unit-tests", runTestsCommandsForModules(List("app")))
addCommandAlias("app-unit-tests-with-coverage", runTestsWithCoverageCommandsForModules(List("app")))
addCommandAlias("plugins-unit-tests", runTestsCommandsForModules(List("plugins")))
addCommandAlias("plugins-unit-tests-with-coverage", runTestsWithCoverageCommandsForModules(List("plugins")))
addCommandAlias("ship-unit-tests", runTestsCommandsForModules(List("ship")))
addCommandAlias("ship-unit-tests-with-coverage", runTestsWithCoverageCommandsForModules(List("ship")))
addCommandAlias("integration-tests", runTestsCommandsForModules(List("tests")))
32 changes: 32 additions & 0 deletions ship/src/main/resources/default.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
ship {
database {
read = ${ship.database.access}
# Access to database for write access
write = ${ship.database.access}
# Access to database for streaming access (indexing / SSEs)
streaming = ${ship.database.access}

# when true it creates the tables on service boot
tables-autocreate = false

cache {
# The max number of tokens in the partition cache
max-size = 1000
# The duration after an entry in the cache expires
expire-after = 10 minutes
}

access {
# the database host
host = 127.0.0.1
# the database port
port = 5432
# the pool size
pool-size = 10
}

name = "postgres"
username = "postgres"
password = "postgres"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.ScopedEvent
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.ship.model.InputEvent
import fs2.Stream
import io.circe.Decoder

/**
* Process events for the defined resource type
*/
trait EventProcessor[Event <: ScopedEvent] {

def resourceType: Label

def decoder: Decoder[Event]

def evaluate(event: Event): IO[Unit]

def evaluate(event: InputEvent): IO[Unit] =
IO.fromEither(decoder.decodeJson(event.value)).flatMap(evaluate)
}

object EventProcessor {

private val logger = Logger[EventProcessor.type ]

def run(processors: List[EventProcessor[_]], eventStream: Stream[IO, InputEvent]): IO[Offset] = {
val processorsMap = processors.foldLeft(Map.empty[Label, EventProcessor[_]]) {
(acc, processor) => acc + (processor.resourceType -> processor)
}
eventStream.evalTap { event =>
processorsMap.get(event.`type`) match {
case Some(processor) => processor.evaluate(event)
case None => logger.warn(s"No processor is provided for '${event.`type`}', skipping...")
}
}.scan((0, Offset.start)){ case ((count, _), event) => (count + 1, event.ordering)
}.compile.lastOrError.flatMap { case (count, offset) =>
logger.info(s"$count events were imported up to offset $offset").as(offset)
}
}

}
54 changes: 54 additions & 0 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.{ExitCode, IO}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.ship.BuildInfo
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.ship.config.ShipConfig
import ch.epfl.bluebrain.nexus.ship.model.InputEvent
import com.monovore.decline.Opts
import com.monovore.decline.effect.CommandIOApp
import fs2.Stream
import fs2.io.file.{Files, Path}
import io.circe.parser._

object Main
extends CommandIOApp(
name = "ship",
header = "Nexus Ship",
version = BuildInfo.version
) {

private val logger = Logger[Main.type]

private val inputFile: Opts[Path] = Opts.option[String]("file", help = "The data file containing the imports.").map(Path(_))

private val configFile: Opts[Option[Path]] = Opts.option[String]("config", help = "The configuration file.").map(Path(_)).orNone

private val offset: Opts[Offset] = Opts
.option[Long]("offset", help = "To perform an incremental import from the provided offset.")
.map(Offset.at).withDefault(Offset.start)

private val runProcess = Opts.subcommand("run", "Run an import") {
(inputFile, configFile, offset).mapN(Run)
}

override def main: Opts[IO[ExitCode]] = runProcess.map {
case Run(file, config, offset) =>
for {
_ <- logger.info(s"Running the import with file $file, config $config and from offset $offset")
config <- ShipConfig.load(config)
events = eventStream(file)
_ <- Transactors.init(config.database).use {
_ => EventProcessor.run(List.empty, events)
}
} yield ExitCode.Success
}

private def eventStream(file: Path): Stream[IO, InputEvent] = Files[IO].readUtf8Lines(file).map(decode[InputEvent]).rethrow

private final case class Run(file: Path, config: Option[Path], offset: Offset)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package ch.epfl.bluebrain.nexus.ship.config

import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.sourcing.config.DatabaseConfig
import com.typesafe.config.{ConfigFactory, ConfigParseOptions, ConfigResolveOptions}
import fs2.io.file.Path
import pureconfig.error.ConfigReaderException
import pureconfig.generic.semiauto.deriveReader
import pureconfig.{ConfigReader, ConfigSource}

final case class ShipConfig(database: DatabaseConfig)

object ShipConfig {

private val parseOptions = ConfigParseOptions.defaults().setAllowMissing(false)
private val resolverOptions = ConfigResolveOptions.defaults()

implicit final val shipConfigReader: ConfigReader[ShipConfig] =
deriveReader[ShipConfig]

def load(externalConfigPath: Option[Path]) =
for {
externalConfig <- IO.blocking(externalConfigPath.fold(ConfigFactory.empty()) { path =>
ConfigFactory.parseFile(path.toNioPath.toFile, parseOptions)
})
defaultConfig <- IO.blocking(ConfigFactory.parseResources("default.conf", parseOptions))
merged = (externalConfig, defaultConfig).foldLeft(ConfigFactory.defaultOverrides())(_ withFallback _).withFallback(ConfigFactory.load())
.resolve(resolverOptions)
config <- IO.fromEither(ConfigSource.fromConfig(merged).at("ship").load[ShipConfig].leftMap(ConfigReaderException(_)))
} yield config
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package ch.epfl.bluebrain.nexus.ship.model

import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import io.circe.{Decoder, Json}

import java.time.Instant
import scala.annotation.nowarn

final case class InputEvent(ordering: Offset,
`type`: Label,
org: Label,
project: Label,
id: Iri, rev: Int,
value: Json,
instant: Instant)

object InputEvent {

@nowarn("cat=unused")
implicit final val elasticSearchViewValueEncoder: Decoder[InputEvent] = {
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto._
implicit val config: Configuration = Configuration.default
deriveConfiguredDecoder[InputEvent]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package ch.epfl.bluebrain.nexus.ship.config

import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite

class ShipConfigSuite extends NexusSuite {

test("Default configuration should be parsed and loaded") {
ShipConfig.load(None).assert(_ => true)
}
}

0 comments on commit cb2a450

Please sign in to comment.