From cb2a450ab34f2fafa7cbe03422114feee393edb7 Mon Sep 17 00:00:00 2001
From: Simon
Date: Thu, 15 Feb 2024 11:45:46 +0100
Subject: [PATCH] Introduce Nexus ship to load events (#4739)

* Introduce Nexus ship to load events

---------

Co-authored-by: Simon Dumas
---
 .github/workflows/ci-delta-ship.yml           | 37 +++++++++++++
 build.sbt                                     | 23 ++++++++
 ship/src/main/resources/default.conf          | 33 +++++++++++
 .../bluebrain/nexus/ship/EventProcessor.scala | 63 +++++++++++++++++++
 .../ch/epfl/bluebrain/nexus/ship/Main.scala   | 54 +++++++++++++++++++
 .../nexus/ship/config/ShipConfig.scala        | 32 +++++++++++
 .../nexus/ship/model/InputEvent.scala         | 28 ++++++++++
 .../nexus/ship/config/ShipConfigSuite.scala   | 10 ++++
 8 files changed, 280 insertions(+)
 create mode 100644 .github/workflows/ci-delta-ship.yml
 create mode 100644 ship/src/main/resources/default.conf
 create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala
 create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala
 create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfig.scala
 create mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/model/InputEvent.scala
 create mode 100644 ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigSuite.scala

diff --git a/.github/workflows/ci-delta-ship.yml b/.github/workflows/ci-delta-ship.yml
new file mode 100644
index 0000000000..d364d6e060
--- /dev/null
+++ b/.github/workflows/ci-delta-ship.yml
@@ -0,0 +1,37 @@
+name: Delta Ship unit tests
+on:
+  pull_request:
+    paths:
+      - 'ship/**'
+      - 'delta/kernel/**'
+      - 'delta/plugins/**'
+      - 'delta/rdf/**'
+      - 'delta/sdk/**'
+      - 'delta/sourcing-psql/**'
+      - 'delta/testkit/**'
+      - 'build.sbt'
+      - 'project/**'
+      - '.github/workflows/ci-delta-ship.yml'
+jobs:
+  run:
+    if: github.event_name == 'pull_request'
+    runs-on: it
+    timeout-minutes: 20
+    env:
+      GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+    steps:
+      - name: Setup JDK
+        uses: actions/setup-java@v4
+        with:
+          distribution: 'temurin'
+          java-version: '21'
+          check-latest: true
+      - name: Checkout
+        uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
+      - name: Unit tests
+        run: |
+          sbt -Dsbt.color=always -Dsbt.supershell=false \
+            clean \
+            ship-unit-tests-with-coverage
diff --git a/build.sbt b/build.sbt
index 4c04d2c3ac..e3d7cc3f44 100755
--- a/build.sbt
+++ b/build.sbt
@@ -33,6 +33,7 @@ val circeVersion = "0.14.6"
 val circeOpticsVersion = "0.15.0"
 val circeExtrasVersions = "0.14.3"
 val classgraphVersion = "4.8.165"
+val declineVersion = "2.4.1"
 val distageVersion = "1.2.5"
 val doobieVersion = "1.0.0-RC5"
 val fs2Version = "3.9.4"
@@ -92,6 +93,7 @@ lazy val circeOptics = "io.circe" %% "circe-optics"
 lazy val circeParser = "io.circe" %% "circe-parser" % circeVersion
 lazy val classgraph = "io.github.classgraph" % "classgraph" % classgraphVersion
 lazy val distageCore = "io.7mind.izumi" %% "distage-core" % distageVersion
+lazy val declineEffect = "com.monovore" %% "decline-effect" % declineVersion
 lazy val doobiePostgres = "org.tpolecat" %% "doobie-postgres" % doobieVersion
 lazy val doobie = Seq(
   doobiePostgres,
@@ -719,6 +721,25 @@ lazy val delta = project
   .settings(shared, compilation, noPublish)
   .aggregate(kernel, testkit, sourcingPsql, rdf, sdk, app, plugins)
 
+lazy val ship = project
+  .in(file("ship"))
+  .settings(
+    name := "nexus-ship",
+    moduleName := "nexus-ship"
+  )
+  .enablePlugins(UniversalPlugin, JavaAppPackaging, JavaAgent, DockerPlugin, BuildInfoPlugin)
+  .settings(shared, compilation, servicePackaging, assertJavaVersion, kamonSettings, coverage, release)
+  .dependsOn(sdk % "compile->compile;test->test", testkit % "test->compile")
+  .settings(
+    libraryDependencies ++= Seq(declineEffect),
+    addCompilerPlugin(betterMonadicFor),
+    run / fork := true,
+    buildInfoKeys := Seq[BuildInfoKey](version),
+    buildInfoPackage := "ch.epfl.bluebrain.nexus.delta.ship",
+    Docker / packageName := "nexus-ship",
+    coverageFailOnMinimum := false
+  )
+
 lazy val cargo = taskKey[(File, String)]("Run Cargo to build 'nexus-fixer'")
 
 lazy val storage = project
@@ -1061,4 +1082,6 @@ addCommandAlias("app-unit-tests", runTestsCommandsForModules(List("app")))
 addCommandAlias("app-unit-tests-with-coverage", runTestsWithCoverageCommandsForModules(List("app")))
 addCommandAlias("plugins-unit-tests", runTestsCommandsForModules(List("plugins")))
 addCommandAlias("plugins-unit-tests-with-coverage", runTestsWithCoverageCommandsForModules(List("plugins")))
+addCommandAlias("ship-unit-tests", runTestsCommandsForModules(List("ship")))
+addCommandAlias("ship-unit-tests-with-coverage", runTestsWithCoverageCommandsForModules(List("ship")))
 addCommandAlias("integration-tests", runTestsCommandsForModules(List("tests")))
diff --git a/ship/src/main/resources/default.conf b/ship/src/main/resources/default.conf
new file mode 100644
index 0000000000..28e2cfcdbf
--- /dev/null
+++ b/ship/src/main/resources/default.conf
@@ -0,0 +1,33 @@
+ship {
+  database {
+    # Access to database for read access
+    read = ${ship.database.access}
+    # Access to database for write access
+    write = ${ship.database.access}
+    # Access to database for streaming access (indexing / SSEs)
+    streaming = ${ship.database.access}
+
+    # when true it creates the tables on service boot
+    tables-autocreate = false
+
+    cache {
+      # The max number of tokens in the partition cache
+      max-size = 1000
+      # The duration after an entry in the cache expires
+      expire-after = 10 minutes
+    }
+
+    access {
+      # the database host
+      host = 127.0.0.1
+      # the database port
+      port = 5432
+      # the pool size
+      pool-size = 10
+    }
+
+    name = "postgres"
+    username = "postgres"
+    password = "postgres"
+  }
+}
\ No newline at end of file
diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala
new file mode 100644
index 0000000000..fe62471221
--- /dev/null
+++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala
@@ -0,0 +1,63 @@
+package ch.epfl.bluebrain.nexus.ship
+
+import cats.effect.IO
+import ch.epfl.bluebrain.nexus.delta.kernel.Logger
+import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.ScopedEvent
+import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label
+import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
+import ch.epfl.bluebrain.nexus.ship.model.InputEvent
+import fs2.Stream
+import io.circe.Decoder
+
+/**
+  * Process events for the defined resource type
+  */
+trait EventProcessor[Event <: ScopedEvent] {
+
+  /** The resource type handled by this processor; matched against the `type` of each input event */
+  def resourceType: Label
+
+  /** Decoder turning the json payload of an input event into a typed event */
+  def decoder: Decoder[Event]
+
+  /** Evaluates a decoded event */
+  def evaluate(event: Event): IO[Unit]
+
+  /** Decodes the payload of the input event and evaluates it; a decoding failure fails the returned IO */
+  def evaluate(event: InputEvent): IO[Unit] =
+    IO.fromEither(decoder.decodeJson(event.value)).flatMap(evaluate)
+}
+
+object EventProcessor {
+
+  private val logger = Logger[EventProcessor.type]
+
+  /**
+    * Runs every event of the stream through the processor registered for its type and returns the offset of the
+    * last event read. Events without a matching processor are logged and skipped, yet they still count towards
+    * the reported total and the returned offset. An empty stream yields `Offset.start`.
+    *
+    * @param processors
+    *   the available processors; if several share a resource type, the last one in the list is used
+    * @param eventStream
+    *   the events to import
+    */
+  def run(processors: List[EventProcessor[_]], eventStream: Stream[IO, InputEvent]): IO[Offset] = {
+    val processorsMap: Map[Label, EventProcessor[_]] =
+      processors.map(processor => processor.resourceType -> processor).toMap
+    eventStream
+      .evalTap { event =>
+        processorsMap.get(event.`type`) match {
+          case Some(processor) => processor.evaluate(event)
+          case None            => logger.warn(s"No processor is provided for '${event.`type`}', skipping...")
+        }
+      }
+      .scan((0, Offset.start)) { case ((count, _), event) => (count + 1, event.ordering) }
+      .compile
+      .lastOrError
+      .flatMap { case (count, offset) =>
+        logger.info(s"$count events were imported up to offset $offset").as(offset)
+      }
+  }
+
+}
diff --git
a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala
new file mode 100644
index 0000000000..5bdef5af0f
--- /dev/null
+++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala
@@ -0,0 +1,64 @@
+package ch.epfl.bluebrain.nexus.ship
+
+import cats.effect.{ExitCode, IO}
+import cats.syntax.all._
+import ch.epfl.bluebrain.nexus.delta.kernel.Logger
+import ch.epfl.bluebrain.nexus.delta.ship.BuildInfo
+import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
+import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
+import ch.epfl.bluebrain.nexus.ship.config.ShipConfig
+import ch.epfl.bluebrain.nexus.ship.model.InputEvent
+import com.monovore.decline.Opts
+import com.monovore.decline.effect.CommandIOApp
+import fs2.Stream
+import fs2.io.file.{Files, Path}
+import io.circe.parser._
+
+/**
+  * Command line entry point of Nexus Ship exposing the `run` subcommand which imports events from a file.
+  */
+object Main
+    extends CommandIOApp(
+      name = "ship",
+      header = "Nexus Ship",
+      version = BuildInfo.version
+    ) {
+
+  private val logger = Logger[Main.type]
+
+  private val inputFile: Opts[Path] =
+    Opts.option[String]("file", help = "The data file containing the imports.").map(Path(_))
+
+  private val configFile: Opts[Option[Path]] =
+    Opts.option[String]("config", help = "The configuration file.").map(Path(_)).orNone
+
+  private val offset: Opts[Offset] = Opts
+    .option[Long]("offset", help = "To perform an incremental import from the provided offset.")
+    .map(Offset.at)
+    .withDefault(Offset.start)
+
+  private val runProcess = Opts.subcommand("run", "Run an import") {
+    (inputFile, configFile, offset).mapN(Run)
+  }
+
+  override def main: Opts[IO[ExitCode]] = runProcess.map { case Run(file, configPath, offset) =>
+    for {
+      _      <- logger.info(s"Running the import with file $file, config $configPath and from offset $offset")
+      config <- ShipConfig.load(configPath)
+      events  = eventStream(file)
+      // NOTE(review): `offset` is only logged for now, events are not filtered by it yet
+      _      <- Transactors.init(config.database).use { _ => EventProcessor.run(List.empty, events) }
+    } yield ExitCode.Success
+  }
+
+  /**
+    * Reads the file line by line and decodes every line as an [[InputEvent]]. Blank lines are skipped: the line
+    * splitter emits a trailing empty string when the file ends with a newline, which is not valid json and would
+    * otherwise fail the whole stream.
+    */
+  private def eventStream(file: Path): Stream[IO, InputEvent] =
+    Files[IO].readUtf8Lines(file).filter(_.trim.nonEmpty).map(decode[InputEvent]).rethrow
+
+  private final case class Run(file: Path, config: Option[Path], offset: Offset)
+
+}
diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfig.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfig.scala
new file mode 100644
index 0000000000..03dd3a12bc
--- /dev/null
+++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfig.scala
@@ -0,0 +1,51 @@
+package ch.epfl.bluebrain.nexus.ship.config
+
+import cats.effect.IO
+import cats.syntax.all._
+import ch.epfl.bluebrain.nexus.delta.sourcing.config.DatabaseConfig
+import com.typesafe.config.{ConfigFactory, ConfigParseOptions, ConfigResolveOptions}
+import fs2.io.file.Path
+import pureconfig.error.ConfigReaderException
+import pureconfig.generic.semiauto.deriveReader
+import pureconfig.{ConfigReader, ConfigSource}
+
+/**
+  * Configuration of Nexus Ship
+  *
+  * @param database
+  *   the database configuration, read from the `ship.database` section
+  */
+final case class ShipConfig(database: DatabaseConfig)
+
+object ShipConfig {
+
+  private val parseOptions    = ConfigParseOptions.defaults().setAllowMissing(false)
+  private val resolverOptions = ConfigResolveOptions.defaults()
+
+  implicit final val shipConfigReader: ConfigReader[ShipConfig] =
+    deriveReader[ShipConfig]
+
+  /**
+    * Loads the configuration, giving precedence in this order to: system property overrides, the optional
+    * external file, the bundled `default.conf` and finally the standard Typesafe Config stack.
+    *
+    * @param externalConfigPath
+    *   an optional file overriding the defaults; the load fails if the file is missing
+    */
+  def load(externalConfigPath: Option[Path]): IO[ShipConfig] =
+    for {
+      externalConfig <- IO.blocking(externalConfigPath.fold(ConfigFactory.empty()) { path =>
+                          ConfigFactory.parseFile(path.toNioPath.toFile, parseOptions)
+                        })
+      defaultConfig  <- IO.blocking(ConfigFactory.parseResources("default.conf", parseOptions))
+      // Fold a List and not a tuple: with cats syntax in scope, folding a Tuple2 only visits its second
+      // element, which silently dropped the external configuration
+      merged          = List(externalConfig, defaultConfig)
+                          .foldLeft(ConfigFactory.defaultOverrides())(_ withFallback _)
+                          .withFallback(ConfigFactory.load())
+                          .resolve(resolverOptions)
+      config         <- IO.fromEither(
+                          ConfigSource.fromConfig(merged).at("ship").load[ShipConfig].leftMap(ConfigReaderException(_))
+                        )
+    } yield config
+}
diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/model/InputEvent.scala
b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/model/InputEvent.scala
new file mode 100644
index 0000000000..70728b3227
--- /dev/null
+++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/model/InputEvent.scala
@@ -0,0 +1,41 @@
+package ch.epfl.bluebrain.nexus.ship.model
+
+import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
+import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label
+import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
+import io.circe.{Decoder, Json}
+
+import java.time.Instant
+import scala.annotation.nowarn
+
+/**
+  * An event as decoded from a line of the input file
+  *
+  * @param ordering
+  *   the offset of the event, reported by the import once the event has been read
+  * @param type
+  *   the resource type, used to select the processor evaluating the event
+  * @param value
+  *   the json payload, decoded by the selected processor
+  */
+final case class InputEvent(
+    ordering: Offset,
+    `type`: Label,
+    org: Label,
+    project: Label,
+    id: Iri,
+    rev: Int,
+    value: Json,
+    instant: Instant
+)
+
+object InputEvent {
+
+  @nowarn("cat=unused")
+  implicit final val inputEventDecoder: Decoder[InputEvent] = {
+    import io.circe.generic.extras.Configuration
+    import io.circe.generic.extras.semiauto._
+    implicit val config: Configuration = Configuration.default
+    deriveConfiguredDecoder[InputEvent]
+  }
+}
diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigSuite.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigSuite.scala
new file mode 100644
index 0000000000..cdec394250
--- /dev/null
+++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigSuite.scala
@@ -0,0 +1,11 @@
+package ch.epfl.bluebrain.nexus.ship.config
+
+import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
+
+class ShipConfigSuite extends NexusSuite {
+
+  test("Default configuration should be parsed and loaded") {
+    // Only checks that loading succeeds; the loaded values themselves are not inspected
+    ShipConfig.load(None).assert(_ => true)
+  }
+}