Skip to content

Commit

Permalink
Add incremental import option (#4813)
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski authored Apr 2, 2024
1 parent 32f30d6 commit d2b7b41
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ object Transactors {
/**
* For testing purposes, provides the script dropping the current tables followed by the different available scripts
*/
private[sourcing] def dropAndCreateDDLs: IO[List[String]] = ddls.map(dropScript :: _)
def dropAndCreateDDLs: IO[List[String]] = ddls.map(dropScript :: _)

/** Type of a cache that contains the hashed names of the projectRefs for which a partition was already created. */
type PartitionsCache = LocalCache[String, Unit]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object Doobie {
val PostgresUser = "postgres"
val PostgresPassword = "postgres"

private def transactors(
def transactors(
postgres: Resource[IO, PostgresContainer],
user: String,
pass: String
Expand Down
4 changes: 2 additions & 2 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ object Main
override def main: Opts[IO[ExitCode]] =
(run orElse showConfig)
.map {
case Run(file, config, _) => new RunShip().run(file, config)
case ShowConfig(config) => showConfig(config)
case Run(file, config, offset) => new RunShip().run(file, config, offset)
case ShowConfig(config) => showConfig(config)
}
.map(_.as(ExitCode.Success))

Expand Down
21 changes: 14 additions & 7 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings
import ch.epfl.bluebrain.nexus.delta.sdk.quotas.Quotas
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.ship.config.ShipConfig
import ch.epfl.bluebrain.nexus.ship.model.InputEvent
import ch.epfl.bluebrain.nexus.ship.organizations.OrganizationProvider
Expand All @@ -25,7 +26,7 @@ class RunShip {

private val logger = Logger[RunShip]

def run(file: Path, config: Option[Path]): IO[ImportReport] = {
def run(file: Path, config: Option[Path], fromOffset: Offset = Offset.start): IO[ImportReport] = {
val clock = Clock[IO]
val uuidF = UUIDF.random
// Resources may have been created with different configurations so we adopt the lenient one for the import
Expand All @@ -42,7 +43,7 @@ class RunShip {
for {
// Provision organizations
_ <- orgProvider.create(config.organizations.values)
events = eventStream(file)
events = eventStream(file, fromOffset)
fetchActiveOrg = FetchActiveOrganization(xas)
// Wiring
eventClock <- EventClock.init()
Expand Down Expand Up @@ -83,11 +84,17 @@ class RunShip {
} yield report
}

private def eventStream(file: Path): Stream[IO, InputEvent] =
Files[IO].readUtf8Lines(file).zipWithIndex.evalMap { case (line, index) =>
IO.fromEither(decode[InputEvent](line)).onError { err =>
logger.error(err)(s"Error parsing to event at line $index")
/**
  * Streams the events contained in the given file, expecting one JSON document per line.
  *
  * Blank lines (typically the empty trailing line of a file ending with a line break) are skipped. Every other line
  * must decode to an [[InputEvent]]; otherwise the error is logged and the stream fails with it. Only events whose
  * ordering is greater than or equal to `fromOffset` are emitted.
  *
  * @param file
  *   the file to read the events from
  * @param fromOffset
  *   the offset (inclusive) from which events are emitted
  */
private def eventStream(file: Path, fromOffset: Offset): Stream[IO, InputEvent] =
  Files[IO]
    .readUtf8Lines(file)
    .zipWithIndex
    // Blank lines are dropped after indexing so that the logged index still matches the position in the file
    .filter { case (line, _) => line.trim.nonEmpty }
    .evalMap { case (line, index) =>
      IO.fromEither(decode[InputEvent](line)).onError { err =>
        logger.error(err)(s"Error parsing to event at line $index")
      }
    }
    // Decoding happens first as the ordering is only known once the event is parsed
    .filter(_.ordering.value >= fromOffset.value)
}

}
2 changes: 2 additions & 0 deletions ship/src/test/resources/import/two-projects.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"ordering":1,"type":"project","org":"public","project":"sscx","id":"projects/public/sscx","rev":1,"value":{"rev": 1, "base": "https://bbp.epfl.ch/neurosciencegraph/data/", "uuid": "c7d70522-4305-480a-b190-75d757ed9a49", "@type": "ProjectCreated", "label": "sscx", "vocab": "https://bbp.epfl.ch/nexus/v1/resources/public/sscx/_/", "instant": "2020-01-22T16:03:33.105Z", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}, "apiMappings": {"prov": "http://www.w3.org/ns/prov#", "context": "https://incf.github.io/neuroshapes/contexts/", "schemaorg": "http://schema.org/", "datashapes": "https://neuroshapes.org/dash/", "ontologies": "https://neuroshapes.org/dash/ontology", "taxonomies": "https://neuroshapes.org/dash/taxonomy", "commonshapes": "https://neuroshapes.org/commons/", "provdatashapes": "https://provshapes.org/datashapes/", "provcommonshapes": "https://provshapes.org/commons/"}, "description": "This project contains somatosensorycortex dissemination publication data.", "organizationUuid": "d098a020-508b-4131-a28d-75f73b7f5f0e", "organizationLabel": "public"},"instant":"2020-01-22T17:03:33.105+01:00"}
{"ordering":2,"type":"project","org":"public","project":"other","id":"projects/public/other","rev":1,"value":{"rev": 1, "base": "https://bbp.epfl.ch/neurosciencegraph/data/", "uuid": "c7d70522-4305-480a-b190-75d757ed9a49", "@type": "ProjectCreated", "label": "other", "vocab": "https://bbp.epfl.ch/nexus/v1/resources/public/other/_/", "instant": "2020-01-22T16:03:33.105Z", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}, "apiMappings": {"prov": "http://www.w3.org/ns/prov#", "context": "https://incf.github.io/neuroshapes/contexts/", "schemaorg": "http://schema.org/", "datashapes": "https://neuroshapes.org/dash/", "ontologies": "https://neuroshapes.org/dash/ontology", "taxonomies": "https://neuroshapes.org/dash/taxonomy", "commonshapes": "https://neuroshapes.org/commons/", "provdatashapes": "https://provshapes.org/datashapes/", "provcommonshapes": "https://provshapes.org/commons/"}, "description": "This is a second project.", "organizationUuid": "d098a020-508b-4131-a28d-75f73b7f5f0e", "organizationLabel": "public"},"instant":"2020-01-22T17:03:33.105+01:00"}
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,32 @@ import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceLoader
import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.Resolvers
import ch.epfl.bluebrain.nexus.delta.sdk.resources.Resources
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie.{PostgresPassword, PostgresUser}
import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie.{PostgresPassword, PostgresUser, transactors}
import ch.epfl.bluebrain.nexus.ship.ImportReport.Count
import ch.epfl.bluebrain.nexus.ship.RunShipSuite.clearDB
import ch.epfl.bluebrain.nexus.testkit.config.SystemPropertyOverride
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import ch.epfl.bluebrain.nexus.testkit.postgres.PostgresContainer
import doobie.implicits._
import fs2.io.file.Path
import munit.{AnyFixture, CatsEffectSuite}
import munit.catseffect.IOFixture
import munit.{AnyFixture, CatsEffectSuite}

import java.time.Instant

class MainSuite extends NexusSuite with MainSuite.Fixture {
class RunShipSuite extends NexusSuite with RunShipSuite.Fixture {

override def munitFixtures: Seq[AnyFixture[_]] = List(mainFixture)
private lazy val xas = mainFixture()

// Runs before each test: clears the `scoped_events` and `scoped_states` tables via `clearDB`
override def beforeEach(context: BeforeEach): Unit = {
super.beforeEach(context)
// NOTE(review): `accepted` presumably runs the IO and fails on error - confirm against the testkit
clearDB(xas).accepted
// Explicit unit so that the value of the previous expression is not used as the result of the method
()
}

test("Run import") {
val expected = ImportReport(
Expand All @@ -35,18 +45,41 @@ class MainSuite extends NexusSuite with MainSuite.Fixture {
)
)
for {
importFile <- ClasspathResourceLoader().absolutePath("import/import.json").map(Path(_))
importFile <- asPath("import/import.json")
_ <- new RunShip().run(importFile, None).assertEquals(expected)
} yield ()
}

test("Test the increment") {
  // The file holds two project events with orderings 1 and 2, the import starts from the second one
  val startFrom = Offset.at(2)
  asPath("import/two-projects.json")
    .flatMap { importFileWithTwoProjects =>
      new RunShip().run(importFileWithTwoProjects, None, startFrom)
    }
    .map { report =>
      assert(report.offset == Offset.at(2L))
      assert(thereIsOneProjectEventIn(report))
    }
}

// NOTE(review): no assertion here - presumably a smoke test checking that showing the configuration
// without a provided config file does not fail; confirm the intent
test("Show config") {
Main.showConfig(None)
}

/** Resolves the given classpath resource to its absolute location and wraps it as a [[Path]]. */
private def asPath(path: String): IO[Path] =
  ClasspathResourceLoader().absolutePath(path).map(absolute => Path(absolute))

/**
  * Returns true when the progress of the report holds a single entry, for the project entity type, equal to
  * `Count(1L, 0L)`.
  */
private def thereIsOneProjectEventIn(report: ImportReport): Boolean = {
  val expectedProgress = Map(Projects.entityType -> Count(1L, 0L))
  report.progress == expectedProgress
}

}

object MainSuite {
object RunShipSuite {

/** Deletes all rows from the `scoped_events` and `scoped_states` tables using the write transactor. */
def clearDB(xas: Transactors): IO[Unit] = {
  val deleteAll =
    sql"""
         | DELETE FROM scoped_events; DELETE FROM scoped_states;
         |""".stripMargin.update.run
  deleteAll.void.transact(xas.write)
}

trait Fixture { self: CatsEffectSuite =>

Expand All @@ -58,15 +91,15 @@ object MainSuite {
"ship.organizations.values.public" -> "The public organization"
)

private def resource(): Resource[IO, Unit] =
PostgresContainer
.resource(PostgresUser, PostgresPassword)
.flatMap { postgres =>
SystemPropertyOverride(initConfig(postgres))
}
.void
// Postgres container resource on which the system property overrides from `initConfig` are applied,
// handed over to `transactors` together with the credentials
private val resource: Resource[IO, Transactors] = {
  val postgres = PostgresContainer
    .resource(PostgresUser, PostgresPassword)
    .flatTap(container => SystemPropertyOverride(initConfig(container)).void)
  transactors(postgres, PostgresUser, PostgresPassword)
}

val mainFixture: IOFixture[Unit] = ResourceSuiteLocalFixture("main", resource())
val mainFixture: IOFixture[Transactors] = ResourceSuiteLocalFixture("main", resource)
}

}

0 comments on commit d2b7b41

Please sign in to comment.