From d2b7b413f3b9143a0ba25bd17c627ce3bfd7a9c5 Mon Sep 17 00:00:00 2001 From: Oliver <20188437+olivergrabinski@users.noreply.github.com> Date: Tue, 2 Apr 2024 09:48:46 +0200 Subject: [PATCH] Add incremental import option (#4813) --- .../nexus/delta/sourcing/Transactors.scala | 2 +- .../delta/sourcing/postgres/Doobie.scala | 2 +- .../ch/epfl/bluebrain/nexus/ship/Main.scala | 4 +- .../epfl/bluebrain/nexus/ship/RunShip.scala | 21 ++++--- .../test/resources/import/two-projects.json | 2 + .../{MainSuite.scala => RunShipSuite.scala} | 59 +++++++++++++++---- 6 files changed, 66 insertions(+), 24 deletions(-) create mode 100644 ship/src/test/resources/import/two-projects.json rename ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/{MainSuite.scala => RunShipSuite.scala} (51%) diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Transactors.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Transactors.scala index 4efa8a459f..110a08df7f 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Transactors.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Transactors.scala @@ -69,7 +69,7 @@ object Transactors { /** * For testing purposes, drop the current tables and then executes the different available scripts */ - private[sourcing] def dropAndCreateDDLs: IO[List[String]] = ddls.map(dropScript :: _) + def dropAndCreateDDLs: IO[List[String]] = ddls.map(dropScript :: _) /** Type of a cache that contains the hashed names of the projectRefs for which a partition was already created. */ type PartitionsCache = LocalCache[String, Unit] diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/postgres/Doobie.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/postgres/Doobie.scala index d910e7924b..a87e5cb068 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/postgres/Doobie.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/postgres/Doobie.scala @@ -17,7 +17,7 @@ object Doobie { val PostgresUser = "postgres" val PostgresPassword = "postgres" - private def transactors( + def transactors( postgres: Resource[IO, PostgresContainer], user: String, pass: String diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala index 66e1cbdc9d..cb776417f8 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala @@ -41,8 +41,8 @@ object Main override def main: Opts[IO[ExitCode]] = (run orElse showConfig) .map { - case Run(file, config, _) => new RunShip().run(file, config) - case ShowConfig(config) => showConfig(config) + case Run(file, config, offset) => new RunShip().run(file, config, offset) + case ShowConfig(config) => showConfig(config) } .map(_.as(ExitCode.Success)) diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala index c38f78a7b4..9f8faefca6 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala @@ -9,6 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings import ch.epfl.bluebrain.nexus.delta.sdk.quotas.Quotas import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors +import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.ship.config.ShipConfig import ch.epfl.bluebrain.nexus.ship.model.InputEvent import ch.epfl.bluebrain.nexus.ship.organizations.OrganizationProvider @@ -25,7 +26,7 @@ class RunShip { private val logger = Logger[RunShip] - def run(file: Path, config: Option[Path]): IO[ImportReport] = { + def run(file: Path, config: Option[Path], fromOffset: Offset = Offset.start): IO[ImportReport] = { val clock = Clock[IO] val uuidF = UUIDF.random // Resources may have been created with different configurations so we adopt the lenient one for the import @@ -42,7 +43,7 @@ class RunShip { for { // Provision organizations _ <- orgProvider.create(config.organizations.values) - events = eventStream(file) + events = eventStream(file, fromOffset) fetchActiveOrg = FetchActiveOrganization(xas) // Wiring eventClock <- EventClock.init() @@ -83,11 +84,17 @@ class RunShip { } yield report } - private def eventStream(file: Path): Stream[IO, InputEvent] = - Files[IO].readUtf8Lines(file).zipWithIndex.evalMap { case (line, index) => - IO.fromEither(decode[InputEvent](line)).onError { err => - logger.error(err)(s"Error parsing to event at line $index") + private def eventStream(file: Path, fromOffset: Offset): Stream[IO, InputEvent] = + Files[IO] + .readUtf8Lines(file) + .zipWithIndex + .evalMap { case (line, index) => + IO.fromEither(decode[InputEvent](line)).onError { err => + logger.error(err)(s"Error parsing to event at line $index") + } + } + .filter { event => + event.ordering.value >= fromOffset.value } - } } diff --git a/ship/src/test/resources/import/two-projects.json b/ship/src/test/resources/import/two-projects.json new file mode 100644 index 0000000000..87c04ac9d7 --- /dev/null +++ b/ship/src/test/resources/import/two-projects.json @@ -0,0 +1,2 @@ +{"ordering":1,"type":"project","org":"public","project":"sscx","id":"projects/public/sscx","rev":1,"value":{"rev": 1, "base": "https://bbp.epfl.ch/neurosciencegraph/data/", "uuid": "c7d70522-4305-480a-b190-75d757ed9a49", "@type": "ProjectCreated", "label": "sscx", "vocab": "https://bbp.epfl.ch/nexus/v1/resources/public/sscx/_/", "instant": "2020-01-22T16:03:33.105Z", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}, "apiMappings": {"prov": "http://www.w3.org/ns/prov#", "context": "https://incf.github.io/neuroshapes/contexts/", "schemaorg": "http://schema.org/", "datashapes": "https://neuroshapes.org/dash/", "ontologies": "https://neuroshapes.org/dash/ontology", "taxonomies": "https://neuroshapes.org/dash/taxonomy", "commonshapes": "https://neuroshapes.org/commons/", "provdatashapes": "https://provshapes.org/datashapes/", "provcommonshapes": "https://provshapes.org/commons/"}, "description": "This project contains somatosensorycortex dissemination publication data.", "organizationUuid": "d098a020-508b-4131-a28d-75f73b7f5f0e", "organizationLabel": "public"},"instant":"2020-01-22T17:03:33.105+01:00"} +{"ordering":2,"type":"project","org":"public","project":"other","id":"projects/public/other","rev":1,"value":{"rev": 1, "base": "https://bbp.epfl.ch/neurosciencegraph/data/", "uuid": "c7d70522-4305-480a-b190-75d757ed9a49", "@type": "ProjectCreated", "label": "other", "vocab": "https://bbp.epfl.ch/nexus/v1/resources/public/other/_/", "instant": "2020-01-22T16:03:33.105Z", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}, "apiMappings": {"prov": "http://www.w3.org/ns/prov#", "context": "https://incf.github.io/neuroshapes/contexts/", "schemaorg": "http://schema.org/", "datashapes": "https://neuroshapes.org/dash/", "ontologies": "https://neuroshapes.org/dash/ontology", "taxonomies": "https://neuroshapes.org/dash/taxonomy", "commonshapes": "https://neuroshapes.org/commons/", "provdatashapes": "https://provshapes.org/datashapes/", "provcommonshapes": "https://provshapes.org/commons/"}, "description": "This is a second project.", "organizationUuid": "d098a020-508b-4131-a28d-75f73b7f5f0e", "organizationLabel": "public"},"instant":"2020-01-22T17:03:33.105+01:00"} \ No newline at end of file diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/MainSuite.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala similarity index 51% rename from ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/MainSuite.scala rename to ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala index f2dcc8279e..08593cc71f 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/MainSuite.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala @@ -6,22 +6,32 @@ import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceLoader import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.Resolvers import ch.epfl.bluebrain.nexus.delta.sdk.resources.Resources +import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset -import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie.{PostgresPassword, PostgresUser} +import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie.{PostgresPassword, PostgresUser, transactors} import ch.epfl.bluebrain.nexus.ship.ImportReport.Count +import ch.epfl.bluebrain.nexus.ship.RunShipSuite.clearDB import ch.epfl.bluebrain.nexus.testkit.config.SystemPropertyOverride import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import ch.epfl.bluebrain.nexus.testkit.postgres.PostgresContainer +import doobie.implicits._ import fs2.io.file.Path -import munit.{AnyFixture, CatsEffectSuite} import munit.catseffect.IOFixture +import munit.{AnyFixture, CatsEffectSuite} import java.time.Instant -class MainSuite extends NexusSuite with MainSuite.Fixture { +class RunShipSuite extends NexusSuite with RunShipSuite.Fixture { override def munitFixtures: Seq[AnyFixture[_]] = List(mainFixture) + private lazy val xas = mainFixture() + + override def beforeEach(context: BeforeEach): Unit = { + super.beforeEach(context) + clearDB(xas).accepted + () + } test("Run import") { val expected = ImportReport( @@ -35,18 +45,41 @@ class MainSuite extends NexusSuite with MainSuite.Fixture { ) ) for { - importFile <- ClasspathResourceLoader().absolutePath("import/import.json").map(Path(_)) + importFile <- asPath("import/import.json") _ <- new RunShip().run(importFile, None).assertEquals(expected) } yield () } + test("Test the increment") { + for { + importFileWithTwoProjects <- asPath("import/two-projects.json") + startFrom = Offset.at(2) + _ <- new RunShip().run(importFileWithTwoProjects, None, startFrom).map { report => + assert(report.offset == Offset.at(2L)) + assert(thereIsOneProjectEventIn(report)) + } + } yield () + } + test("Show config") { Main.showConfig(None) } + private def asPath(path: String): IO[Path] = { + ClasspathResourceLoader().absolutePath(path).map(Path(_)) + } + + private def thereIsOneProjectEventIn(report: ImportReport) = + report.progress == Map(Projects.entityType -> Count(1L, 0L)) + } -object MainSuite { +object RunShipSuite { + + def clearDB(xas: Transactors) = + sql""" + | DELETE FROM scoped_events; DELETE FROM scoped_states; + |""".stripMargin.update.run.void.transact(xas.write) trait Fixture { self: CatsEffectSuite => @@ -58,15 +91,15 @@ object MainSuite { "ship.organizations.values.public" -> "The public organization" ) - private def resource(): Resource[IO, Unit] = - PostgresContainer - .resource(PostgresUser, PostgresPassword) - .flatMap { postgres => - SystemPropertyOverride(initConfig(postgres)) - } - .void + private val resource: Resource[IO, Transactors] = transactors( + PostgresContainer.resource(PostgresUser, PostgresPassword).flatTap { pg => + SystemPropertyOverride(initConfig(pg)).void + }, + PostgresUser, + PostgresPassword + ) - val mainFixture: IOFixture[Unit] = ResourceSuiteLocalFixture("main", resource()) + val mainFixture: IOFixture[Transactors] = ResourceSuiteLocalFixture("main", resource) } }