Skip to content

Commit

Permalink
Save summary at the end of the batch
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon Dumas committed Apr 16, 2024
1 parent 9dc4cc1 commit ab2fea5
Show file tree
Hide file tree
Showing 12 changed files with 194 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
-- Cleanup script: drop the ship/sourcing tables so the schema scripts can recreate them.
-- NOTE(review): fixed `ship_reports` -> `ship_runs`: the table created by the schema script
-- and written by ShipSummaryStore is `public.ship_runs`; `ship_reports` appears nowhere else,
-- so the new table was never actually dropped.
DROP TABLE IF EXISTS public.ship_runs;
DROP TABLE IF EXISTS public.global_events;
DROP TABLE IF EXISTS public.global_states;
DROP TABLE IF EXISTS public.scoped_tombstones;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- One row per execution of the ship `run` command, summarising the import
-- (written by ShipSummaryStore.save). Exactly one of `error` / `report` is
-- expected to be set, depending on whether the run failed or succeeded.
CREATE TABLE IF NOT EXISTS public.ship_runs(
    ordering bigserial,              -- insertion order; used to fetch the latest run (ORDER BY ordering DESC)
    started_at timestamptz NOT NULL, -- when the run started
    ended_at timestamptz NOT NULL,   -- when the run ended
    command JSONB NOT NULL,          -- the RunCommand that triggered the run, serialized as JSON
    success boolean NOT NULL,        -- whether the run completed without error
    error text,                      -- stack trace when the run failed (NULL on success)
    report JSONB                     -- import report when the run succeeded (NULL on failure)
)

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.sourcing.model

import doobie.{Get, Put}
import io.circe.{Decoder, Encoder}
import io.circe.{Decoder, Encoder, KeyEncoder}

/**
* Entity type
Expand All @@ -19,4 +19,6 @@ object EntityType {

  // Decodes an entity type from its plain string representation.
  implicit val entityTypeDecoder: Decoder[EntityType] =
    Decoder.decodeString.map(EntityType(_))

  // Allows `EntityType` to be used as a JSON object key, which is needed to encode
  // maps keyed by entity type (e.g. `ImportReport.progress`).
  implicit val entityTypeKeyEncoder: KeyEncoder[EntityType] = KeyEncoder.encodeKeyString.contramap(_.toString)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.ship.ImportReport.Count
import ch.epfl.bluebrain.nexus.ship.ImportReport.Statistics
import io.circe.Encoder
import io.circe.generic.semiauto.deriveEncoder

import java.time.Instant

final case class ImportReport(offset: Offset, instant: Instant, progress: Map[EntityType, Count]) {
final case class ImportReport(offset: Offset, instant: Instant, progress: Map[EntityType, Statistics]) {
def +(event: RowEvent, status: ImportStatus): ImportReport = {
val entityType = event.`type`
val newProgress = progress.updatedWith(entityType) {
Expand All @@ -25,25 +27,30 @@ object ImportReport {

val start: ImportReport = ImportReport(Offset.start, Instant.EPOCH, Map.empty)

final case class Count(success: Long, dropped: Long)
final case class Statistics(success: Long, dropped: Long)

object Count {
object Statistics {

implicit val countMonoid: Monoid[Count] = new Monoid[Count] {
override def empty: Count = Count(0L, 0L)
implicit val statisticsMonoid: Monoid[Statistics] = new Monoid[Statistics] {
override def empty: Statistics = Statistics(0L, 0L)

override def combine(x: Count, y: Count): Count = Count(x.success + y.success, x.dropped + y.dropped)
override def combine(x: Statistics, y: Statistics): Statistics =
Statistics(x.success + y.success, x.dropped + y.dropped)
}

implicit val statisticsEncoder: Encoder[Statistics] = deriveEncoder[Statistics]

}

implicit val reportEncoder: Encoder[ImportReport] = deriveEncoder[ImportReport]

implicit val showReport: Show[ImportReport] = (report: ImportReport) => {
val header = s"Type\tSuccess\tDropped\n"
val details = report.progress.foldLeft(header) { case (acc, (entityType, count)) =>
acc ++ s"$entityType\t${count.success}\t${count.dropped}\n"
}

val aggregatedCount = report.progress.values.reduceOption(_ |+| _).getOrElse(Count(0L, 0L))
val aggregatedCount = report.progress.values.reduceOption(_ |+| _).getOrElse(Statistics(0L, 0L))
val global =
s"${aggregatedCount.success} events were imported up to offset ${report.offset} (${aggregatedCount.dropped} have been dropped)."
s"$global\n$details"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package ch.epfl.bluebrain.nexus.ship

import ch.epfl.bluebrain.nexus.ship.ImportReport.Count
import ch.epfl.bluebrain.nexus.ship.ImportReport.Statistics

sealed trait ImportStatus {

def asCount: Count
def asCount: Statistics

}

object ImportStatus {

case object Success extends ImportStatus {
override def asCount: Count = Count(1L, 0L)
override def asCount: Statistics = Statistics(1L, 0L)
}

case object Dropped extends ImportStatus {

override def asCount: Count = Count(0L, 1L)
override def asCount: Statistics = Statistics(0L, 1L)

}

Expand Down
12 changes: 10 additions & 2 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/InitShip.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.IO
import cats.effect.{IO, Resource}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent
import ch.epfl.bluebrain.nexus.ship.ShipCommand.RunCommand
import ch.epfl.bluebrain.nexus.ship.config.ShipConfig
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider

object InitShip {

def apply(run: RunCommand): IO[(ShipConfig, fs2.Stream[IO, RowEvent])] = run.mode match {
  /**
    * Builds everything needed for a run: the ship configuration, the stream of events to import
    * and the transactors giving access to the database. The database resources are released
    * when the resulting `Resource` is finalized.
    */
  def apply(run: RunCommand): Resource[IO, (ShipConfig, fs2.Stream[IO, RowEvent], Transactors)] =
    Resource.eval(configAndStream(run)).flatMap { case (config, eventStream) =>
      Transactors
        .init(config.database)
        .map { xas => (config, eventStream, xas) }
    }

private def configAndStream(run: RunCommand): IO[(ShipConfig, fs2.Stream[IO, RowEvent])] = run.mode match {
case RunMode.Local =>
val eventsStream = EventStreamer.localStreamer.stream(run.path, run.offset)
ShipConfig.load(run.config).map(_ -> eventsStream)
Expand Down
28 changes: 19 additions & 9 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.{ExitCode, IO}
import cats.effect.{Clock, ExitCode, IO}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.ship.BuildInfo
Expand Down Expand Up @@ -41,7 +41,7 @@ object Main
}

private val run = Opts.subcommand("run", "Run an import") {
(inputPath, configFile, offset, runMode).mapN(RunCommand)
(inputPath, configFile, offset, runMode).mapN(RunCommand.apply)
}

private val showConfig = Opts.subcommand("config", "Show reconciled config") {
Expand All @@ -56,13 +56,23 @@ object Main
}
.map(_.as(ExitCode.Success))

private[ship] def run(r: RunCommand) =
for {
(config, eventsStream) <- InitShip(r)
_ <- Transactors.init(config.database).use { xas =>
RunShip(eventsStream, config.input, xas)
}
} yield ()
private[ship] def run(r: RunCommand): IO[Unit] = {
val clock = Clock[IO]
InitShip(r).use { case (config, eventsStream, xas) =>
for {
start <- clock.realTimeInstant
reportOrError <- Transactors
.init(config.database)
.use { xas =>
RunShip(eventsStream, config.input, xas)
}
.attempt
end <- clock.realTimeInstant
_ <- ShipSummaryStore.save(xas, start, end, r, reportOrError)
_ <- IO.fromEither(reportOrError)
} yield ()
}
}

private[ship] def showConfig(config: Option[Path]) =
for {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package ch.epfl.bluebrain.nexus.ship

import io.circe.Encoder

sealed trait RunMode extends Product with Serializable

object RunMode {
Expand All @@ -8,4 +10,9 @@ object RunMode {

final case object S3 extends RunMode

  // Serializes the run mode as a plain string ("Local" / "S3"), used when encoding a
  // `RunCommand` to JSON for the run summary.
  implicit val runModeEncoder: Encoder[RunMode] = Encoder.encodeString.contramap {
    case Local => "Local"
    case S3    => "S3"
  }

}
12 changes: 12 additions & 0 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ShipCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,25 @@ package ch.epfl.bluebrain.nexus.ship

import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import fs2.io.file.Path
import io.circe.Encoder
import io.circe.generic.semiauto.deriveEncoder

import scala.annotation.nowarn

/** Commands accepted by the ship command line interface. */
sealed trait ShipCommand extends Product with Serializable

object ShipCommand {

  /**
    * Command to run an import.
    *
    * @param path   the location of the events to import
    * @param config the configuration file to use instead of the default one, if any
    * @param offset the offset to start importing from
    * @param mode   where the events are read from (local file system or S3)
    */
  final case class RunCommand(path: Path, config: Option[Path], offset: Offset, mode: RunMode) extends ShipCommand

  object RunCommand {
    // Encoder used to persist the command as JSON alongside the run summary.
    // `pathEncoder` is only consumed by the `deriveEncoder` macro, so the compiler
    // reports it as unused — hence the `@nowarn` suppression.
    @nowarn("cat=unused")
    implicit val runCommandEncoder: Encoder[RunCommand] = {
      implicit val pathEncoder: Encoder[Path] = Encoder.encodeString.contramap(_.toString)
      deriveEncoder[RunCommand]
    }
  }

  /** Command to display the reconciled configuration. */
  final case class ShowConfigCommand(config: Option[Path]) extends ShipCommand

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.utils.ThrowableUtils
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.ship.ShipCommand.RunCommand
import doobie.implicits._
import doobie.postgres.implicits._
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
import io.circe.syntax.EncoderOps

import java.time.Instant

object ShipSummaryStore {

  /**
    * Persists a summary of a ship run in the `ship_runs` table.
    *
    * @param xas           the transactors; the insert runs on the write side
    * @param start         when the run started
    * @param end           when the run ended
    * @param command       the command that triggered the run, stored as JSON
    * @param reportOrError the outcome of the run: the import report, or the error that interrupted it
    */
  def save(
      xas: Transactors,
      start: Instant,
      end: Instant,
      command: RunCommand,
      reportOrError: Either[Throwable, ImportReport]
  ): IO[Unit] = {
    val success = reportOrError.isRight
    // Exactly one of `error` / `report` is defined; the undefined one is written as NULL,
    // equivalent to the previous version which duplicated the INSERT to omit the column.
    val error   = reportOrError.swap.toOption.map(ThrowableUtils.stackTraceAsString)
    val report  = reportOrError.toOption.map(_.asJson)
    val insert  =
      sql"""INSERT INTO ship_runs(started_at, ended_at, command, success, error, report)
            VALUES($start, $end, ${command.asJson}, $success, $error, $report)"""
    insert.update.run.transact(xas.write).void
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie
import ch.epfl.bluebrain.nexus.ship.ImportReport.Count
import ch.epfl.bluebrain.nexus.ship.ImportReport.Statistics
import ch.epfl.bluebrain.nexus.ship.RunShipSuite.{checkFor, expectedImportReport, getDistinctOrgProjects}
import ch.epfl.bluebrain.nexus.ship.config.ShipConfigFixtures
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
Expand Down Expand Up @@ -75,7 +75,7 @@ class RunShipSuite extends NexusSuite with Doobie.Fixture with ShipConfigFixture
}

private def thereIsOneProjectEventIn(report: ImportReport) =
report.progress == Map(Projects.entityType -> Count(1L, 0L))
report.progress == Map(Projects.entityType -> Statistics(1L, 0L))

}

Expand All @@ -100,10 +100,10 @@ object RunShipSuite {
Offset.at(9999999L),
Instant.parse("2099-12-31T22:59:59.999Z"),
Map(
Projects.entityType -> Count(5L, 0L),
Resolvers.entityType -> Count(5L, 0L),
Resources.entityType -> Count(1L, 0L),
EntityType("xxx") -> Count(0L, 1L)
Projects.entityType -> Statistics(5L, 0L),
Resolvers.entityType -> Statistics(5L, 0L),
Resources.entityType -> Statistics(1L, 0L),
EntityType("xxx") -> Statistics(0L, 1L)
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie
import ch.epfl.bluebrain.nexus.ship.ShipCommand.RunCommand
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import doobie.implicits._
import doobie.postgres.implicits._
import munit.AnyFixture
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.ship.ShipSummaryStoreSuite.ReportRow
import fs2.io.file.Path

import java.time.Instant

/**
  * Verifies that [[ShipSummaryStore.save]] persists both failed and successful runs
  * with the expected columns populated.
  */
class ShipSummaryStoreSuite extends NexusSuite with Doobie.Fixture {

  override def munitFixtures: Seq[AnyFixture[_]] = List(doobie)

  // Lazy so the fixture is only dereferenced once munit has initialized it.
  private lazy val xas = doobie()

  // Reads the most recently inserted run summary, reducing the error/report columns
  // to presence flags (`IS NOT NULL`).
  private def readLast: IO[ReportRow] =
    sql"""SELECT started_at, ended_at, success, error is NOT NULL, report is NOT NULL
         |FROM public.ship_runs
         |ORDER by ordering DESC
         |LIMIT 1
         |""".stripMargin
      .query[(Instant, Instant, Boolean, Boolean, Boolean)]
      .unique
      .transact(xas.read)
      .map { case (start, end, success, hasError, hasReport) =>
        ReportRow(start, end, success, hasError, hasReport)
      }

  // Saves the given outcome and asserts that the persisted row reflects it:
  // a report is stored only on success, an error only on failure.
  private def assertSave(
      start: Instant,
      end: Instant,
      command: RunCommand,
      reportOrError: Either[Throwable, ImportReport]
  ) = {
    for {
      _   <- ShipSummaryStore.save(xas, start, end, command, reportOrError)
      row <- readLast
    } yield {
      assertEquals(row.start, start)
      assertEquals(row.end, end)
      assertEquals(row.success, reportOrError.isRight)
      assertEquals(row.hasError, reportOrError.isLeft)
      assertEquals(row.hasReport, reportOrError.isRight)
    }
  }

  private val start      = Instant.now().minusSeconds(3600)
  private val end        = Instant.now()
  private val runCommand = RunCommand(Path("/data"), None, Offset.start, RunMode.Local)

  test("Save a failed run") {
    val error = new IllegalStateException("BOOM !")
    assertSave(start, end, runCommand, Left(error))
  }

  test("Save a successful run") {
    val report = ImportReport(Offset.at(5L), Instant.now(), Map.empty)
    assertSave(start, end, runCommand, Right(report))
  }

}

object ShipSummaryStoreSuite {

  // Projection of a `ship_runs` row where the error/report columns are reduced to presence flags.
  final case class ReportRow(start: Instant, end: Instant, success: Boolean, hasError: Boolean, hasReport: Boolean)

}

0 comments on commit ab2fea5

Please sign in to comment.