Skip to content

Commit

Permalink
Add basic
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon Dumas committed Mar 4, 2024
1 parent c7fad7a commit bb38025
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class HttpClientSpec
object HttpClientSpec {
final case class Value(name: String, rev: Int, deprecated: Boolean)

private final case class Count(
final private case class Count(
reqGetValue: AtomicInteger,
reqStreamValues: AtomicInteger,
reqClientError: AtomicInteger,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.ScopedEvent
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
Expand Down Expand Up @@ -35,35 +36,27 @@ object EventProcessor {
acc + (processor.resourceType -> processor)
}
eventStream
.evalScan(ImportReport.start) { case(report, event) =>
.evalScan(ImportReport.start) { case (report, event) =>
processorsMap.get(event.`type`) match {
case Some(processor) =>
processor.evaluate(event)
processor
.evaluate(event)
.map { status =>
report + (event, status)
}
.onError { err =>
logger.error(err)(
s"Error while processing event with offset '${event.ordering.value}' with processor '${event.`type`}'."
)
}
logger.error(err)(
s"Error while processing event with offset '${event.ordering.value}' with processor '${event.`type`}'."
)
}
case None =>
logger.warn(s"No processor is provided for '${event.`type`}', skipping...") >>
IO.pure(report + (event, ImportStatus.Dropped))
IO.pure(report + (event, ImportStatus.Dropped))
}
}
.compile
.lastOrError
.flatTap { report =>
val header = logger.info(s"Type\tSuccess\tDropped")
val detailedCount = report.progress.foldLeft(header) { case (acc, (entityType, count)) =>
acc >> logger.info(s"$entityType\t${count.success}\t${count.dropped}")
}

val aggregatedCount = report.aggregatedCount
logger.info(s"${aggregatedCount.success} events were imported up to offset ${report.offset} (${aggregatedCount.success} have been dropped).") >>
detailedCount
}
.flatTap { report => logger.info(report.show) }
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ch.epfl.bluebrain.nexus.ship

import cats.Show
import cats.kernel.Monoid
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
Expand All @@ -10,13 +11,13 @@ import ch.epfl.bluebrain.nexus.ship.model.InputEvent
import java.time.Instant

final case class ImportReport(offset: Offset, instant: Instant, progress: Map[EntityType, Count]) {
def + (event: InputEvent, status: ImportStatus): ImportReport = {
val entityType = event.`type`
def +(event: InputEvent, status: ImportStatus): ImportReport = {
val entityType = event.`type`
val newProgress = progress.updatedWith(entityType) {
case Some(count) => Some(count |+| status.asCount)
case None => Some(status.asCount)
case None => Some(status.asCount)
}
copy(offset = event.ordering, instant = event.instant, progress= newProgress)
copy(offset = event.ordering, instant = event.instant, progress = newProgress)
}

def aggregatedCount: Count = {
Expand All @@ -39,4 +40,16 @@ object ImportReport {
}

}

implicit val showReport: Show[ImportReport] = (report: ImportReport) => {
val header = s"Type\tSuccess\tDropped\n"
val details = report.progress.foldLeft(header) { case (acc, (entityType, count)) =>
acc ++ s"$entityType\t${count.success}\t${count.dropped}\n"
}

val aggregatedCount = report.aggregatedCount
val global =
s"${aggregatedCount.success} events were imported up to offset ${report.offset} (${aggregatedCount.dropped} have been dropped)."
s"$global\n$details"
}
}
7 changes: 4 additions & 3 deletions ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ object Main
for {
_ <- logger.info(s"Running the import with file $file, config $config and from offset $offset")
config <- ShipConfig.load(config)
report <- Transactors.init(config.database).use { xas =>
val orgProvider = OrganizationProvider(config.eventLog, config.serviceAccount.value, xas, clock)(uuidF)
report <- Transactors.init(config.database).use { xas =>
val orgProvider =
OrganizationProvider(config.eventLog, config.serviceAccount.value, xas, clock)(uuidF)
val fetchContext = FetchContext(ApiMappings.empty, xas, Quotas.disabled)
val eventLogConfig = config.eventLog
val baseUri = config.baseUri
Expand All @@ -79,7 +80,7 @@ object Main
fetchActiveOrg = FetchActiveOrganization(xas)
projectProcessor <- ProjectProcessor(fetchActiveOrg, eventLogConfig, xas)(baseUri)
resolverProcessor <- ResolverProcessor(fetchContext, eventLogConfig, xas)
report <- EventProcessor.run(events, projectProcessor, resolverProcessor)
report <- EventProcessor.run(events, projectProcessor, resolverProcessor)
} yield report
}
} yield report
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ final class ProjectProcessor private (projects: Projects, clock: EventClock, uui

override def evaluate(event: ProjectEvent): IO[ImportStatus] = {
for {
_ <- clock.setInstant(event.instant)
_ <- uuidF.setUUID(event.uuid)
_ <- clock.setInstant(event.instant)
_ <- uuidF.setUUID(event.uuid)
result <- evaluateInternal(event)
} yield result
}
Expand All @@ -52,9 +52,9 @@ final class ProjectProcessor private (projects: Projects, clock: EventClock, uui
}
}.redeemWith(
{
case notFound: NotFound => IO.raiseError(notFound)
case notFound: NotFound => IO.raiseError(notFound)
case error: ProjectRejection => logger.warn(error)(error.reason).as(ImportStatus.Dropped)
case other => IO.raiseError(other)
case other => IO.raiseError(other)
},
_ => IO.pure(ImportStatus.Success)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class ResolverProcessor private (resolvers: Resolvers, clock: EventClock) extend

override def evaluate(event: ResolverEvent): IO[ImportStatus] = {
for {
_ <- clock.setInstant(event.instant)
_ <- clock.setInstant(event.instant)
result <- evaluateInternal(event)
} yield result
}
Expand All @@ -54,7 +54,7 @@ class ResolverProcessor private (resolvers: Resolvers, clock: EventClock) extend
{
case a: ResourceAlreadyExists => logger.warn(a)("The resolver already exists").as(ImportStatus.Dropped)
case i: IncorrectRev => logger.warn(i)("An incorrect revision as been provided").as(ImportStatus.Dropped)
case other => IO.raiseError(other)
case other => IO.raiseError(other)
},
_ => IO.pure(ImportStatus.Success)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class MainSuite extends NexusSuite with MainSuite.Fixture {
Offset.at(5418473L),
Instant.parse("2023-08-22T15:05:13.654Z"),
Map(
Projects.entityType -> Count(4L, 0L),
Projects.entityType -> Count(4L, 0L),
Resolvers.entityType -> Count(5L, 0L)
)
)
Expand Down

0 comments on commit bb38025

Please sign in to comment.