diff --git a/delta/sourcing-psql/src/main/resources/scripts/postgres/drop/drop-tables.ddl b/delta/sourcing-psql/src/main/resources/scripts/postgres/drop/drop-tables.ddl index 680006570e..db79608e49 100644 --- a/delta/sourcing-psql/src/main/resources/scripts/postgres/drop/drop-tables.ddl +++ b/delta/sourcing-psql/src/main/resources/scripts/postgres/drop/drop-tables.ddl @@ -1,3 +1,4 @@ +DROP TABLE IF EXISTS public.ship_reports; DROP TABLE IF EXISTS public.global_events; DROP TABLE IF EXISTS public.global_states; DROP TABLE IF EXISTS public.scoped_tombstones; diff --git a/delta/sourcing-psql/src/main/resources/scripts/postgres/init/V1_10_M09_001__ship_reports.ddl b/delta/sourcing-psql/src/main/resources/scripts/postgres/init/V1_10_M09_001__ship_reports.ddl new file mode 100644 index 0000000000..25cf1754bb --- /dev/null +++ b/delta/sourcing-psql/src/main/resources/scripts/postgres/init/V1_10_M09_001__ship_reports.ddl @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS public.ship_runs( + ordering bigserial, + started_at timestamptz NOT NULL, + ended_at timestamptz NOT NULL, + command JSONB NOT NULL, + success boolean NOT NULL, + error text, + report JSONB +) + diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/EntityType.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/EntityType.scala index 3a5a244f12..44f61fec44 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/EntityType.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/EntityType.scala @@ -1,7 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.model import doobie.{Get, Put} -import io.circe.{Decoder, Encoder} +import io.circe.{Decoder, Encoder, KeyEncoder} /** * Entity type @@ -19,4 +19,6 @@ object EntityType { implicit val entityTypeDecoder: Decoder[EntityType] = Decoder.decodeString.map(EntityType(_)) + + implicit val entityTypeKeyEncoder: KeyEncoder[EntityType] = KeyEncoder.encodeKeyString.contramap(_.toString) } diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ImportReport.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ImportReport.scala index 0c35abb7d3..f7ba5892d1 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ImportReport.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ImportReport.scala @@ -6,11 +6,13 @@ import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset -import ch.epfl.bluebrain.nexus.ship.ImportReport.Count +import ch.epfl.bluebrain.nexus.ship.ImportReport.Statistics +import io.circe.Encoder +import io.circe.generic.semiauto.deriveEncoder import java.time.Instant -final case class ImportReport(offset: Offset, instant: Instant, progress: Map[EntityType, Count]) { +final case class ImportReport(offset: Offset, instant: Instant, progress: Map[EntityType, Statistics]) { def +(event: RowEvent, status: ImportStatus): ImportReport = { val entityType = event.`type` val newProgress = progress.updatedWith(entityType) { @@ -25,25 +27,30 @@ object ImportReport { val start: ImportReport = ImportReport(Offset.start, Instant.EPOCH, Map.empty) - final case class Count(success: Long, dropped: Long) + final case class Statistics(success: Long, dropped: Long) - object Count { + object Statistics { - implicit val countMonoid: Monoid[Count] = new Monoid[Count] { - override def empty: Count = Count(0L, 0L) + implicit val statisticsMonoid: Monoid[Statistics] = new Monoid[Statistics] { + override def empty: Statistics = Statistics(0L, 0L) - override def combine(x: Count, y: Count): Count = Count(x.success + y.success, x.dropped + y.dropped) + override def combine(x: Statistics, y: Statistics): Statistics = + Statistics(x.success + y.success, x.dropped + y.dropped) } + implicit val statisticsEncoder: Encoder[Statistics] = deriveEncoder[Statistics] + } + implicit val reportEncoder: Encoder[ImportReport] = deriveEncoder[ImportReport] + implicit val showReport: Show[ImportReport] = (report: ImportReport) => { val header = s"Type\tSuccess\tDropped\n" val details = report.progress.foldLeft(header) { case (acc, (entityType, count)) => acc ++ s"$entityType\t${count.success}\t${count.dropped}\n" } - val aggregatedCount = report.progress.values.reduceOption(_ |+| _).getOrElse(Count(0L, 0L)) + val aggregatedCount = report.progress.values.reduceOption(_ |+| _).getOrElse(Statistics(0L, 0L)) val global = s"${aggregatedCount.success} events were imported up to offset ${report.offset} (${aggregatedCount.dropped} have been dropped)." s"$global\n$details" diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ImportStatus.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ImportStatus.scala index 79525dae79..6795a0afcd 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ImportStatus.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ImportStatus.scala @@ -1,22 +1,22 @@ package ch.epfl.bluebrain.nexus.ship -import ch.epfl.bluebrain.nexus.ship.ImportReport.Count +import ch.epfl.bluebrain.nexus.ship.ImportReport.Statistics sealed trait ImportStatus { - def asCount: Count + def asCount: Statistics } object ImportStatus { case object Success extends ImportStatus { - override def asCount: Count = Count(1L, 0L) + override def asCount: Statistics = Statistics(1L, 0L) } case object Dropped extends ImportStatus { - override def asCount: Count = Count(0L, 1L) + override def asCount: Statistics = Statistics(0L, 1L) } diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/InitShip.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/InitShip.scala index fc418742e1..c40b1c1fb4 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/InitShip.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/InitShip.scala @@ -1,7 +1,8 @@ package ch.epfl.bluebrain.nexus.ship -import cats.effect.IO +import cats.effect.{IO, Resource} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient +import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent import ch.epfl.bluebrain.nexus.ship.ShipCommand.RunCommand import ch.epfl.bluebrain.nexus.ship.config.ShipConfig @@ -9,7 +10,14 @@ import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider object InitShip { - def apply(run: RunCommand): IO[(ShipConfig, fs2.Stream[IO, RowEvent])] = run.mode match { + def apply(run: RunCommand): Resource[IO, (ShipConfig, fs2.Stream[IO, RowEvent], Transactors)] = + Resource.eval(configAndStream(run)).flatMap { case (config, eventStream) => + Transactors + .init(config.database) + .map { xas => (config, eventStream, xas) } + } + + private def configAndStream(run: RunCommand): IO[(ShipConfig, fs2.Stream[IO, RowEvent])] = run.mode match { case RunMode.Local => val eventsStream = EventStreamer.localStreamer.stream(run.path, run.offset) ShipConfig.load(run.config).map(_ -> eventsStream) diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala index 745d341610..4a68896da7 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala @@ -1,6 +1,6 @@ package ch.epfl.bluebrain.nexus.ship -import cats.effect.{ExitCode, IO} +import cats.effect.{Clock, ExitCode, IO} import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.ship.BuildInfo @@ -41,7 +41,7 @@ object Main } private val run = Opts.subcommand("run", "Run an import") { - (inputPath, configFile, offset, runMode).mapN(RunCommand) + (inputPath, configFile, offset, runMode).mapN(RunCommand.apply) } private val showConfig = Opts.subcommand("config", "Show reconciled config") { @@ -56,13 +56,23 @@ object Main } .map(_.as(ExitCode.Success)) - private[ship] def run(r: RunCommand) = - for { - (config, eventsStream) <- InitShip(r) - _ <- Transactors.init(config.database).use { xas => - RunShip(eventsStream, config.input, xas) - } - } yield () + private[ship] def run(r: RunCommand): IO[Unit] = { + val clock = Clock[IO] + InitShip(r).use { case (config, eventsStream, xas) => + for { + start <- clock.realTimeInstant + reportOrError <- Transactors + .init(config.database) + .use { xas => + RunShip(eventsStream, config.input, xas) + } + .attempt + end <- clock.realTimeInstant + _ <- ShipSummaryStore.save(xas, start, end, r, reportOrError) + _ <- IO.fromEither(reportOrError) + } yield () + } + } private[ship] def showConfig(config: Option[Path]) = for { diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunMode.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunMode.scala index 31d97d1ae9..59028310a0 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunMode.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunMode.scala @@ -1,5 +1,7 @@ package ch.epfl.bluebrain.nexus.ship +import io.circe.Encoder + sealed trait RunMode extends Product with Serializable object RunMode { @@ -8,4 +10,9 @@ object RunMode { final case object S3 extends RunMode + implicit val runModeEncoder: Encoder[RunMode] = Encoder.encodeString.contramap { + case Local => "Local" + case S3 => "S3" + } + } diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ShipCommand.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ShipCommand.scala index 03274734b5..11bc1e8fe0 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ShipCommand.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ShipCommand.scala @@ -2,6 +2,10 @@ package ch.epfl.bluebrain.nexus.ship import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import fs2.io.file.Path +import io.circe.Encoder +import io.circe.generic.semiauto.deriveEncoder + +import scala.annotation.nowarn sealed trait ShipCommand extends Product with Serializable @@ -9,6 +13,14 @@ object ShipCommand { final case class RunCommand(path: Path, config: Option[Path], offset: Offset, mode: RunMode) extends ShipCommand + object RunCommand { + @nowarn("cat=unused") + implicit val runCommandEncoder: Encoder[RunCommand] = { + implicit val pathEncoder: Encoder[Path] = Encoder.encodeString.contramap(_.toString) + deriveEncoder[RunCommand] + } + } + final case class ShowConfigCommand(config: Option[Path]) extends ShipCommand } diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ShipSummaryStore.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ShipSummaryStore.scala new file mode 100644 index 0000000000..97cbb02a1f --- /dev/null +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ShipSummaryStore.scala @@ -0,0 +1,34 @@ +package ch.epfl.bluebrain.nexus.ship + +import cats.effect.IO +import ch.epfl.bluebrain.nexus.delta.kernel.utils.ThrowableUtils +import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors +import ch.epfl.bluebrain.nexus.ship.ShipCommand.RunCommand +import doobie.implicits._ +import doobie.postgres.implicits._ +import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ +import io.circe.syntax.EncoderOps + +import java.time.Instant + +object ShipSummaryStore { + + def save( + xas: Transactors, + start: Instant, + end: Instant, + command: RunCommand, + reportOrError: Either[Throwable, ImportReport] + ): IO[Unit] = { + val success = reportOrError.isRight + val insert = reportOrError match { + case Left(error) => { + val errorMessage = ThrowableUtils.stackTraceAsString(error) + sql""" INSERT INTO ship_runs(started_at, ended_at, command, success, error) VALUES($start, $end, ${command.asJson}, $success, $errorMessage)""" + } + case Right(report) => + sql""" INSERT INTO ship_runs(started_at, ended_at, command , success, report) VALUES($start, $end, ${command.asJson}, $success, ${report.asJson})""" + } + insert.update.run.transact(xas.write).void + } +} diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala index 10b1bba931..15ba6c88e6 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/RunShipSuite.scala @@ -10,7 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie -import ch.epfl.bluebrain.nexus.ship.ImportReport.Count +import ch.epfl.bluebrain.nexus.ship.ImportReport.Statistics import ch.epfl.bluebrain.nexus.ship.RunShipSuite.{checkFor, expectedImportReport, getDistinctOrgProjects} import ch.epfl.bluebrain.nexus.ship.config.ShipConfigFixtures import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite @@ -75,7 +75,7 @@ class RunShipSuite extends NexusSuite with Doobie.Fixture with ShipConfigFixture } private def thereIsOneProjectEventIn(report: ImportReport) = - report.progress == Map(Projects.entityType -> Count(1L, 0L)) + report.progress == Map(Projects.entityType -> Statistics(1L, 0L)) } @@ -100,10 +100,10 @@ object RunShipSuite { Offset.at(9999999L), Instant.parse("2099-12-31T22:59:59.999Z"), Map( - Projects.entityType -> Count(5L, 0L), - Resolvers.entityType -> Count(5L, 0L), - Resources.entityType -> Count(1L, 0L), - EntityType("xxx") -> Count(0L, 1L) + Projects.entityType -> Statistics(5L, 0L), + Resolvers.entityType -> Statistics(5L, 0L), + Resources.entityType -> Statistics(1L, 0L), + EntityType("xxx") -> Statistics(0L, 1L) ) ) diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/ShipSummaryStoreSuite.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/ShipSummaryStoreSuite.scala new file mode 100644 index 0000000000..1c2d397456 --- /dev/null +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/ShipSummaryStoreSuite.scala @@ -0,0 +1,73 @@ +package ch.epfl.bluebrain.nexus.ship + +import cats.effect.IO +import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie +import ch.epfl.bluebrain.nexus.ship.ShipCommand.RunCommand +import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite +import doobie.implicits._ +import doobie.postgres.implicits._ +import munit.AnyFixture +import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.ship.ShipSummaryStoreSuite.ReportRow +import fs2.io.file.Path + +import java.time.Instant + +class ShipSummaryStoreSuite extends NexusSuite with Doobie.Fixture { + + override def munitFixtures: Seq[AnyFixture[_]] = List(doobie) + + private lazy val xas = doobie() + + private def readLast: IO[ReportRow] = + sql"""SELECT started_at, ended_at, success, error is NOT NULL, report is NOT NULL + |FROM public.ship_runs + |ORDER by ordering DESC + |LIMIT 1 + |""".stripMargin + .query[(Instant, Instant, Boolean, Boolean, Boolean)] + .unique + .transact(xas.read) + .map { case (start, end, success, hasError, hasReport) => + ReportRow(start, end, success, hasError, hasReport) + } + + private def assertSave( + start: Instant, + end: Instant, + command: RunCommand, + reportOrError: Either[Throwable, ImportReport] + ) = { + for { + _ <- ShipSummaryStore.save(xas, start, end, command, reportOrError) + row <- readLast + } yield { + assertEquals(row.start, start) + assertEquals(row.end, end) + assertEquals(row.success, reportOrError.isRight) + assertEquals(row.hasError, reportOrError.isLeft) + assertEquals(row.hasReport, reportOrError.isRight) + } + } + + private val start = Instant.now().minusSeconds(3600) + private val end = Instant.now() + private val runCommand = RunCommand(Path("/data"), None, Offset.start, RunMode.Local) + + test("Save a failed run") { + val error = new IllegalStateException("BOOM !") + assertSave(start, end, runCommand, Left(error)) + } + + test("Save a successful run") { + val report = ImportReport(Offset.at(5L), Instant.now(), Map.empty) + assertSave(start, end, runCommand, Right(report)) + } + +} + +object ShipSummaryStoreSuite { + + final case class ReportRow(start: Instant, end: Instant, success: Boolean, hasError: Boolean, hasReport: Boolean) + +}