Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Save summary at the end of the batch #4872

Merged
merged 5 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
DROP TABLE IF EXISTS public.ship_reports;
DROP TABLE IF EXISTS public.global_events;
DROP TABLE IF EXISTS public.global_states;
DROP TABLE IF EXISTS public.scoped_tombstones;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
CREATE TABLE IF NOT EXISTS public.ship_runs(
ordering bigserial,
started_at timestamptz NOT NULL,
ended_at timestamptz NOT NULL,
command JSONB NOT NULL,
success boolean NOT NULL,
error text,
report JSONB
)

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.sourcing.model

import doobie.{Get, Put}
import io.circe.{Decoder, Encoder}
import io.circe.{Decoder, Encoder, KeyEncoder}

/**
* Entity type
Expand All @@ -19,4 +19,6 @@ object EntityType {

implicit val entityTypeDecoder: Decoder[EntityType] =
Decoder.decodeString.map(EntityType(_))

implicit val entityTypeKeyEncoder: KeyEncoder[EntityType] = KeyEncoder.encodeKeyString.contramap(_.toString)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.ship.ImportReport.Count
import ch.epfl.bluebrain.nexus.ship.ImportReport.Statistics
import io.circe.Encoder
import io.circe.generic.semiauto.deriveEncoder

import java.time.Instant

final case class ImportReport(offset: Offset, instant: Instant, progress: Map[EntityType, Count]) {
final case class ImportReport(offset: Offset, instant: Instant, progress: Map[EntityType, Statistics]) {
def +(event: RowEvent, status: ImportStatus): ImportReport = {
val entityType = event.`type`
val newProgress = progress.updatedWith(entityType) {
Expand All @@ -25,25 +27,30 @@ object ImportReport {

val start: ImportReport = ImportReport(Offset.start, Instant.EPOCH, Map.empty)

final case class Count(success: Long, dropped: Long)
final case class Statistics(success: Long, dropped: Long)

object Count {
object Statistics {

implicit val countMonoid: Monoid[Count] = new Monoid[Count] {
override def empty: Count = Count(0L, 0L)
implicit val statisticsMonoid: Monoid[Statistics] = new Monoid[Statistics] {
override def empty: Statistics = Statistics(0L, 0L)

override def combine(x: Count, y: Count): Count = Count(x.success + y.success, x.dropped + y.dropped)
override def combine(x: Statistics, y: Statistics): Statistics =
Statistics(x.success + y.success, x.dropped + y.dropped)
}

implicit val statisticsEncoder: Encoder[Statistics] = deriveEncoder[Statistics]

}

implicit val reportEncoder: Encoder[ImportReport] = deriveEncoder[ImportReport]

implicit val showReport: Show[ImportReport] = (report: ImportReport) => {
val header = s"Type\tSuccess\tDropped\n"
val details = report.progress.foldLeft(header) { case (acc, (entityType, count)) =>
acc ++ s"$entityType\t${count.success}\t${count.dropped}\n"
}

val aggregatedCount = report.progress.values.reduceOption(_ |+| _).getOrElse(Count(0L, 0L))
val aggregatedCount = report.progress.values.reduceOption(_ |+| _).getOrElse(Statistics(0L, 0L))
val global =
s"${aggregatedCount.success} events were imported up to offset ${report.offset} (${aggregatedCount.dropped} have been dropped)."
s"$global\n$details"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package ch.epfl.bluebrain.nexus.ship

import ch.epfl.bluebrain.nexus.ship.ImportReport.Count
import ch.epfl.bluebrain.nexus.ship.ImportReport.Statistics

sealed trait ImportStatus {

def asCount: Count
def asCount: Statistics

}

object ImportStatus {

case object Success extends ImportStatus {
override def asCount: Count = Count(1L, 0L)
override def asCount: Statistics = Statistics(1L, 0L)
}

case object Dropped extends ImportStatus {

override def asCount: Count = Count(0L, 1L)
override def asCount: Statistics = Statistics(0L, 1L)

}

Expand Down
12 changes: 10 additions & 2 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/InitShip.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.IO
import cats.effect.{IO, Resource}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent
import ch.epfl.bluebrain.nexus.ship.ShipCommand.RunCommand
import ch.epfl.bluebrain.nexus.ship.config.ShipConfig
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider

object InitShip {

def apply(run: RunCommand): IO[(ShipConfig, fs2.Stream[IO, RowEvent])] = run.mode match {
def apply(run: RunCommand): Resource[IO, (ShipConfig, fs2.Stream[IO, RowEvent], Transactors)] =
Resource.eval(configAndStream(run)).flatMap { case (config, eventStream) =>
Transactors
.init(config.database)
.map { xas => (config, eventStream, xas) }
}

private def configAndStream(run: RunCommand): IO[(ShipConfig, fs2.Stream[IO, RowEvent])] = run.mode match {
case RunMode.Local =>
val eventsStream = EventStreamer.localStreamer.stream(run.path, run.offset)
ShipConfig.load(run.config).map(_ -> eventsStream)
Expand Down
24 changes: 14 additions & 10 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.{ExitCode, IO}
import cats.effect.{Clock, ExitCode, IO}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.ship.BuildInfo
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.ship.ShipCommand._
import ch.epfl.bluebrain.nexus.ship.config.ShipConfig
Expand Down Expand Up @@ -41,7 +40,7 @@ object Main
}

private val run = Opts.subcommand("run", "Run an import") {
(inputPath, configFile, offset, runMode).mapN(RunCommand)
(inputPath, configFile, offset, runMode).mapN(RunCommand.apply)
}

private val showConfig = Opts.subcommand("config", "Show reconciled config") {
Expand All @@ -56,13 +55,18 @@ object Main
}
.map(_.as(ExitCode.Success))

private[ship] def run(r: RunCommand) =
for {
(config, eventsStream) <- InitShip(r)
_ <- Transactors.init(config.database).use { xas =>
RunShip(eventsStream, config.input, xas)
}
} yield ()
private[ship] def run(r: RunCommand): IO[Unit] = {
val clock = Clock[IO]
InitShip(r).use { case (config, eventsStream, xas) =>
for {
start <- clock.realTimeInstant
reportOrError <- RunShip(eventsStream, config.input, xas).attempt
end <- clock.realTimeInstant
_ <- ShipSummaryStore.save(xas, start, end, r, reportOrError)
_ <- IO.fromEither(reportOrError)
} yield ()
}
}

private[ship] def showConfig(config: Option[Path]) =
for {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package ch.epfl.bluebrain.nexus.ship

import io.circe.Encoder

sealed trait RunMode extends Product with Serializable

object RunMode {
Expand All @@ -8,4 +10,9 @@ object RunMode {

final case object S3 extends RunMode

implicit val runModeEncoder: Encoder[RunMode] = Encoder.encodeString.contramap {
case Local => "Local"
case S3 => "S3"
}

}
12 changes: 12 additions & 0 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ShipCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,25 @@ package ch.epfl.bluebrain.nexus.ship

import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import fs2.io.file.Path
import io.circe.Encoder
import io.circe.generic.semiauto.deriveEncoder

import scala.annotation.nowarn

sealed trait ShipCommand extends Product with Serializable

object ShipCommand {

final case class RunCommand(path: Path, config: Option[Path], offset: Offset, mode: RunMode) extends ShipCommand

object RunCommand {
@nowarn("cat=unused")
implicit val runCommandEncoder: Encoder[RunCommand] = {
implicit val pathEncoder: Encoder[Path] = Encoder.encodeString.contramap(_.toString)
deriveEncoder[RunCommand]
}
}

final case class ShowConfigCommand(config: Option[Path]) extends ShipCommand

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.utils.ThrowableUtils
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.ship.ShipCommand.RunCommand
import doobie.implicits._
import doobie.postgres.implicits._
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
import io.circe.syntax.EncoderOps

import java.time.Instant

object ShipSummaryStore {

def save(
xas: Transactors,
start: Instant,
end: Instant,
command: RunCommand,
reportOrError: Either[Throwable, ImportReport]
): IO[Unit] = {
val success = reportOrError.isRight
val insert = reportOrError match {
case Left(error) => {
val errorMessage = ThrowableUtils.stackTraceAsString(error)
sql""" INSERT INTO ship_runs(started_at, ended_at, command, success, error) VALUES($start, $end, ${command.asJson}, $success, $errorMessage)"""
}
case Right(report) =>
sql""" INSERT INTO ship_runs(started_at, ended_at, command , success, report) VALUES($start, $end, ${command.asJson}, $success, ${report.asJson})"""
}
insert.update.run.transact(xas.write).void
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie
import ch.epfl.bluebrain.nexus.ship.ImportReport.Count
import ch.epfl.bluebrain.nexus.ship.ImportReport.Statistics
import ch.epfl.bluebrain.nexus.ship.RunShipSuite.{checkFor, expectedImportReport, getDistinctOrgProjects}
import ch.epfl.bluebrain.nexus.ship.config.ShipConfigFixtures
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
Expand Down Expand Up @@ -76,7 +76,7 @@ class RunShipSuite extends NexusSuite with Doobie.Fixture with ShipConfigFixture
}

private def thereIsOneProjectEventIn(report: ImportReport) =
report.progress == Map(Projects.entityType -> Count(1L, 0L))
report.progress == Map(Projects.entityType -> Statistics(1L, 0L))

}

Expand All @@ -101,10 +101,10 @@ object RunShipSuite {
Offset.at(9999999L),
Instant.parse("2099-12-31T22:59:59.999Z"),
Map(
Projects.entityType -> Count(5L, 0L),
Resolvers.entityType -> Count(5L, 0L),
Resources.entityType -> Count(1L, 0L),
EntityType("xxx") -> Count(0L, 1L)
Projects.entityType -> Statistics(5L, 0L),
Resolvers.entityType -> Statistics(5L, 0L),
Resources.entityType -> Statistics(1L, 0L),
EntityType("xxx") -> Statistics(0L, 1L)
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie
import ch.epfl.bluebrain.nexus.ship.ShipCommand.RunCommand
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import doobie.implicits._
import doobie.postgres.implicits._
import munit.AnyFixture
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.ship.ShipSummaryStoreSuite.ReportRow
import fs2.io.file.Path

import java.time.Instant

class ShipSummaryStoreSuite extends NexusSuite with Doobie.Fixture {

override def munitFixtures: Seq[AnyFixture[_]] = List(doobie)

private lazy val xas = doobie()

private def readLast: IO[ReportRow] =
sql"""SELECT started_at, ended_at, success, error is NOT NULL, report is NOT NULL
|FROM public.ship_runs
|ORDER by ordering DESC
|LIMIT 1
|""".stripMargin
.query[(Instant, Instant, Boolean, Boolean, Boolean)]
.unique
.transact(xas.read)
.map { case (start, end, success, hasError, hasReport) =>
ReportRow(start, end, success, hasError, hasReport)
}

private def assertSaveRun(
start: Instant,
end: Instant,
command: RunCommand,
reportOrError: Either[Throwable, ImportReport]
) = {
for {
_ <- ShipSummaryStore.save(xas, start, end, command, reportOrError)
row <- readLast
} yield {
assertEquals(row.start, start)
assertEquals(row.end, end)
assertEquals(row.success, reportOrError.isRight)
assertEquals(row.hasError, reportOrError.isLeft)
assertEquals(row.hasReport, reportOrError.isRight)
}
}

private val start = Instant.parse("2024-04-17T10:00:00.000Z")
private val end = Instant.parse("2024-04-17T11:00:00.000Z")
private val runCommand = RunCommand(Path("/data"), None, Offset.start, RunMode.Local)

test("Save a failed run") {
val error = new IllegalStateException("BOOM !")
assertSaveRun(start, end, runCommand, Left(error))
}

test("Save a successful run") {
val report = ImportReport(Offset.at(5L), Instant.now(), Map.empty)
assertSaveRun(start, end, runCommand, Right(report))
}

}

object ShipSummaryStoreSuite {

final case class ReportRow(start: Instant, end: Instant, success: Boolean, hasError: Boolean, hasReport: Boolean)

}