diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/http/HttpClientSpec.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/http/HttpClientSpec.scala index 8800904cb9..295f75ef14 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/http/HttpClientSpec.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/http/HttpClientSpec.scala @@ -126,7 +126,7 @@ class HttpClientSpec object HttpClientSpec { final case class Value(name: String, rev: Int, deprecated: Boolean) - private final case class Count( + final private case class Count( reqGetValue: AtomicInteger, reqStreamValues: AtomicInteger, reqClientError: AtomicInteger, diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala index f374f6e5b4..59da1c6787 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala @@ -1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.ship import cats.effect.IO +import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.ScopedEvent import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType @@ -35,35 +36,27 @@ object EventProcessor { acc + (processor.resourceType -> processor) } eventStream - .evalScan(ImportReport.start) { case(report, event) => + .evalScan(ImportReport.start) { case (report, event) => processorsMap.get(event.`type`) match { case Some(processor) => - processor.evaluate(event) + processor + .evaluate(event) .map { status => report + (event, status) } .onError { err => - logger.error(err)( - s"Error while processing event with offset '${event.ordering.value}' with processor '${event.`type`}'." - ) - } + logger.error(err)( + s"Error while processing event with offset '${event.ordering.value}' with processor '${event.`type`}'." + ) + } case None => logger.warn(s"No processor is provided for '${event.`type`}', skipping...") >> - IO.pure(report + (event, ImportStatus.Dropped)) + IO.pure(report + (event, ImportStatus.Dropped)) } } .compile .lastOrError - .flatTap { report => - val header = logger.info(s"Type\tSuccess\tDropped") - val detailedCount = report.progress.foldLeft(header) { case (acc, (entityType, count)) => - acc >> logger.info(s"$entityType\t${count.success}\t${count.dropped}") - } - - val aggregatedCount = report.aggregatedCount - logger.info(s"${aggregatedCount.success} events were imported up to offset ${report.offset} (${aggregatedCount.success} have been dropped).") >> - detailedCount - } + .flatTap { report => logger.info(report.show) } } } diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ImportReport.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ImportReport.scala index b65fb6220e..563fedf280 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ImportReport.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ImportReport.scala @@ -1,5 +1,6 @@ package ch.epfl.bluebrain.nexus.ship +import cats.Show import cats.kernel.Monoid import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType @@ -10,13 +11,13 @@ import ch.epfl.bluebrain.nexus.ship.model.InputEvent import java.time.Instant final case class ImportReport(offset: Offset, instant: Instant, progress: Map[EntityType, Count]) { - def + (event: InputEvent, status: ImportStatus): ImportReport = { - val entityType = event.`type` + def +(event: InputEvent, status: ImportStatus): ImportReport = { + val entityType = event.`type` val newProgress = progress.updatedWith(entityType) { case Some(count) => Some(count |+| status.asCount) - case None => Some(status.asCount) + case None => Some(status.asCount) } - copy(offset = event.ordering, instant = event.instant, progress= newProgress) + copy(offset = event.ordering, instant = event.instant, progress = newProgress) } def aggregatedCount: Count = { @@ -39,4 +40,16 @@ object ImportReport { } } + + implicit val showReport: Show[ImportReport] = (report: ImportReport) => { + val header = s"Type\tSuccess\tDropped\n" + val details = report.progress.foldLeft(header) { case (acc, (entityType, count)) => + acc ++ s"$entityType\t${count.success}\t${count.dropped}\n" + } + + val aggregatedCount = report.aggregatedCount + val global = + s"${aggregatedCount.success} events were imported up to offset ${report.offset} (${aggregatedCount.dropped} have been dropped)." + s"$global\n$details" + } } diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala index 81baebf241..c353f4d64f 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala @@ -67,8 +67,9 @@ object Main for { _ <- logger.info(s"Running the import with file $file, config $config and from offset $offset") config <- ShipConfig.load(config) - report <- Transactors.init(config.database).use { xas => - val orgProvider = OrganizationProvider(config.eventLog, config.serviceAccount.value, xas, clock)(uuidF) + report <- Transactors.init(config.database).use { xas => + val orgProvider = + OrganizationProvider(config.eventLog, config.serviceAccount.value, xas, clock)(uuidF) val fetchContext = FetchContext(ApiMappings.empty, xas, Quotas.disabled) val eventLogConfig = config.eventLog val baseUri = config.baseUri @@ -79,7 +80,7 @@ object Main fetchActiveOrg = FetchActiveOrganization(xas) projectProcessor <- ProjectProcessor(fetchActiveOrg, eventLogConfig, xas)(baseUri) resolverProcessor <- ResolverProcessor(fetchContext, eventLogConfig, xas) - report <- EventProcessor.run(events, projectProcessor, resolverProcessor) + report <- EventProcessor.run(events, projectProcessor, resolverProcessor) } yield report } } yield report diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/projects/ProjectProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/projects/ProjectProcessor.scala index d0f6f047b3..dcc8d85a43 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/projects/ProjectProcessor.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/projects/ProjectProcessor.scala @@ -26,8 +26,8 @@ final class ProjectProcessor private (projects: Projects, clock: EventClock, uui override def evaluate(event: ProjectEvent): IO[ImportStatus] = { for { - _ <- clock.setInstant(event.instant) - _ <- uuidF.setUUID(event.uuid) + _ <- clock.setInstant(event.instant) + _ <- uuidF.setUUID(event.uuid) result <- evaluateInternal(event) } yield result } @@ -52,9 +52,9 @@ final class ProjectProcessor private (projects: Projects, clock: EventClock, uui } }.redeemWith( { - case notFound: NotFound => IO.raiseError(notFound) + case notFound: NotFound => IO.raiseError(notFound) case error: ProjectRejection => logger.warn(error)(error.reason).as(ImportStatus.Dropped) - case other => IO.raiseError(other) + case other => IO.raiseError(other) }, _ => IO.pure(ImportStatus.Success) ) diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverProcessor.scala index 53b0e9acf9..00282709ff 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverProcessor.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resolvers/ResolverProcessor.scala @@ -27,7 +27,7 @@ class ResolverProcessor private (resolvers: Resolvers, clock: EventClock) extend override def evaluate(event: ResolverEvent): IO[ImportStatus] = { for { - _ <- clock.setInstant(event.instant) + _ <- clock.setInstant(event.instant) result <- evaluateInternal(event) } yield result } @@ -54,7 +54,7 @@ class ResolverProcessor private (resolvers: Resolvers, clock: EventClock) extend { case a: ResourceAlreadyExists => logger.warn(a)("The resolver already exists").as(ImportStatus.Dropped) case i: IncorrectRev => logger.warn(i)("An incorrect revision as been provided").as(ImportStatus.Dropped) - case other => IO.raiseError(other) + case other => IO.raiseError(other) }, _ => IO.pure(ImportStatus.Success) ) diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/MainSuite.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/MainSuite.scala index ac261623c2..2c21e909b8 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/MainSuite.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/MainSuite.scala @@ -26,7 +26,7 @@ class MainSuite extends NexusSuite with MainSuite.Fixture { Offset.at(5418473L), Instant.parse("2023-08-22T15:05:13.654Z"), Map( - Projects.entityType -> Count(4L, 0L), + Projects.entityType -> Count(4L, 0L), Resolvers.entityType -> Count(5L, 0L) ) )