From 725effa8bda07c7119edef78eb0932f555bca801 Mon Sep 17 00:00:00 2001
From: Simon
Date: Wed, 3 Apr 2024 13:40:04 +0200
Subject: [PATCH] Use the StreamingQuery approach to handle long exports
 (#4820)

* Use the StreamingQuery approach to handle long exports

* Scalafmt

---------

Co-authored-by: Simon Dumas
---
 delta/app/src/main/resources/app.conf         |  1 +
 .../sourcing/exporter/ExportConfig.scala      |  4 +-
 .../delta/sourcing/exporter/Exporter.scala    | 37 +++++++++-------
 .../delta/sourcing/exporter/RowEvent.scala    | 44 +++++++++++++++++++
 .../sourcing/exporter/ExporterSuite.scala     |  4 +-
 .../bluebrain/nexus/ship/EventProcessor.scala |  6 +--
 .../bluebrain/nexus/ship/ImportReport.scala   |  4 +-
 .../epfl/bluebrain/nexus/ship/RunShip.scala   |  6 +--
 .../nexus/ship/model/InputEvent.scala         | 32 --------------
 9 files changed, 77 insertions(+), 61 deletions(-)
 create mode 100644 delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/RowEvent.scala
 delete mode 100644 ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/model/InputEvent.scala

diff --git a/delta/app/src/main/resources/app.conf b/delta/app/src/main/resources/app.conf
index ea90dd25cf..eda9a41c45 100644
--- a/delta/app/src/main/resources/app.conf
+++ b/delta/app/src/main/resources/app.conf
@@ -45,6 +45,7 @@ app {
 
   # Database export configuration
   export {
+    batch-size = 30
     # Max number of concurrent exports
     permits = 1
     # Target directory for exports
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/ExportConfig.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/ExportConfig.scala
index 9c3dcb8914..a7d490132b 100644
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/ExportConfig.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/ExportConfig.scala
@@ -2,12 +2,12 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.exporter
 
 import fs2.io.file.Path
 import pureconfig.ConfigConvert.catchReadError
-import pureconfig.{ConfigConvert, ConfigReader}
 import pureconfig.generic.semiauto.deriveReader
+import pureconfig.{ConfigConvert, ConfigReader}
 
 import scala.annotation.nowarn
 
-final case class ExportConfig(permits: Int, target: Path)
+final case class ExportConfig(batchSize: Int, permits: Int, target: Path)
 
 object ExportConfig {
 
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/Exporter.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/Exporter.scala
index 0c000695ea..455ca797da 100644
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/Exporter.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/Exporter.scala
@@ -5,13 +5,16 @@ import cats.effect.kernel.Clock
 import cats.effect.std.Semaphore
 import ch.epfl.bluebrain.nexus.delta.kernel.Logger
 import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
+import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
 import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.Exporter.ExportResult
 import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
+import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
+import ch.epfl.bluebrain.nexus.delta.sourcing.query.{RefreshStrategy, StreamingQuery}
 import doobie.Fragments
 import doobie.implicits._
-import doobie.util.fragment.Fragment
-import fs2.io.file.{Files, Path}
+import doobie.util.query.Query0
 import fs2.Stream
+import fs2.io.file.{Files, Path}
 import io.circe.syntax.EncoderOps
 
 import java.time.Instant
@@ -29,27 +32,30 @@ object Exporter {
   final case class ExportResult(json: Path, success: Path, start: Instant, end: Instant)
 
   def apply(config: ExportConfig, clock: Clock[IO], xas: Transactors): IO[Exporter] =
-    Semaphore[IO](config.permits.toLong).map(new ExporterImpl(config.target, _, clock, xas))
+    Semaphore[IO](config.permits.toLong).map(new ExporterImpl(config, _, clock, xas))
 
-  private class ExporterImpl(rootDirectory: Path, semaphore: Semaphore[IO], clock: Clock[IO], xas: Transactors)
+  private class ExporterImpl(config: ExportConfig, semaphore: Semaphore[IO], clock: Clock[IO], xas: Transactors)
       extends Exporter {
+
+    val queryConfig = QueryConfig(config.batchSize, RefreshStrategy.Stop)
     override def events(query: ExportEventQuery): IO[ExportResult] = {
-      val projectFilter = Fragments.orOpt(
+      val projectFilter     = Fragments.orOpt(
         query.projects.map { project => sql"(org = ${project.organization} and project = ${project.project})" }
       )
-      val q = asJson(sql"""SELECT *
+      def q(offset: Offset) =
+        sql"""SELECT ordering, type, org, project, id, rev, value, instant
              |FROM public.scoped_events
-             |${Fragments.whereAndOpt(projectFilter, query.offset.asFragment)}
+             |${Fragments.whereAndOpt(projectFilter, offset.asFragment)}
             |ORDER BY ordering
-             |""".stripMargin)
+             |""".stripMargin.query[RowEvent]
 
       val exportIO = for {
         start            <- clock.realTimeInstant
         _                <- logger.info(s"Starting export for projects ${query.projects} from offset ${query.offset}")
-        targetDirectory   = rootDirectory / query.output.value
+        targetDirectory   = config.target / query.output.value
         _                <- Files[IO].createDirectory(targetDirectory)
         exportFile        = targetDirectory / s"$start.json"
-        _                <- exportToFile(q, exportFile)
+        _                <- exportToFile(q, query.offset, exportFile)
         end              <- clock.realTimeInstant
         exportSuccess     = targetDirectory / s"$start.success"
         _                <- writeSuccessFile(query, exportSuccess)
@@ -62,20 +68,17 @@ object Exporter {
 
       semaphore.permit.use { _ => exportIO }
     }
 
-    private def exportToFile(q: Fragment, targetFile: Path) =
-      q.query[String]
-        .stream
+    private def exportToFile(query: Offset => Query0[RowEvent], start: Offset, targetFile: Path) = {
+      StreamingQuery[RowEvent](start, query, _.ordering, queryConfig, xas)
+        .map(_.asJson.noSpaces)
         .intersperse("\n")
-        .transact(xas.streaming)
         .through(Files[IO].writeUtf8(targetFile))
         .compile
        .drain
+    }
 
     private def writeSuccessFile(query: ExportEventQuery, targetFile: Path) =
       Stream(query.asJson.toString()).through(Files[IO].writeUtf8(targetFile)).compile.drain
-
-    private def asJson(query: Fragment) =
-      sql"""(select row_to_json(t) from ($query) t)"""
   }
 }
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/RowEvent.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/RowEvent.scala
new file mode 100644
index 0000000000..7486dd8d4c
--- /dev/null
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/RowEvent.scala
@@ -0,0 +1,44 @@
+package ch.epfl.bluebrain.nexus.delta.sourcing.exporter
+
+import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
+import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label}
+import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
+import doobie.Read
+import io.circe.{Codec, Decoder, Encoder, Json}
+
+import java.time.Instant
+import scala.annotation.nowarn
+
+final case class RowEvent(
+    ordering: Offset.At,
+    `type`: EntityType,
+    org: Label,
+    project: Label,
+    id: Iri,
+    rev: Int,
+    value: Json,
+    instant: Instant
+)
+
+object RowEvent {
+
+  @nowarn("cat=unused")
+  implicit final val inputEventEncoder: Codec[RowEvent] = {
+    import io.circe.generic.extras.Configuration
+    import io.circe.generic.extras.semiauto.deriveConfiguredCodec
+    implicit val offsetEncoder: Encoder[Offset.At] = Encoder.encodeLong.contramap(_.value)
+    implicit val offsetDecoder: Decoder[Offset.At] = Decoder.decodeLong.map(Offset.At)
+    implicit val config: Configuration             = Configuration.default
+    deriveConfiguredCodec[RowEvent]
+  }
+
+  implicit val inputEventRead: Read[RowEvent] = {
+    import doobie._
+    import doobie.postgres.implicits._
+    import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
+    Read[(Long, EntityType, Label, Label, Iri, Int, Json, Instant)].map {
+      case (offset, entityType, org, project, id, rev, value, instant) =>
+        RowEvent(Offset.At(offset), entityType, org, project, id, rev, value, instant)
+    }
+  }
+}
diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/ExporterSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/ExporterSuite.scala
index 582a3b19f5..c202564f16 100644
--- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/ExporterSuite.scala
+++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/exporter/ExporterSuite.scala
@@ -1,8 +1,8 @@
 package ch.epfl.bluebrain.nexus.delta.sourcing.exporter
 
 import cats.data.NonEmptyList
-import cats.syntax.all._
 import cats.effect.IO
+import cats.syntax.all._
 import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
 import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest
 import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestEvent.{PullRequestCreated, PullRequestMerged, PullRequestUpdated}
@@ -27,7 +27,7 @@ class ExporterSuite extends NexusSuite with Doobie.Fixture with TempDirectory.Fi
   )
 
   override def munitFixtures: Seq[AnyFixture[_]] = List(tempDirectory, doobieFixture)
-  private lazy val exporterConfig                = ExportConfig(3, exportDirectory)
+  private lazy val exporterConfig                = ExportConfig(5, 3, exportDirectory)
   private lazy val (_, _, exporter)              = doobieFixture()
   private lazy val exportDirectory               = tempDirectory()
 
diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala
index 59da1c6787..0615131d50 100644
--- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala
+++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala
@@ -4,9 +4,9 @@ import cats.effect.IO
 import cats.syntax.all._
 import ch.epfl.bluebrain.nexus.delta.kernel.Logger
 import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.ScopedEvent
+import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
 import ch.epfl.bluebrain.nexus.ship.EventProcessor.logger
-import ch.epfl.bluebrain.nexus.ship.model.InputEvent
 import fs2.Stream
 import io.circe.Decoder
 
@@ -21,7 +21,7 @@ trait EventProcessor[Event <: ScopedEvent] {
 
   def evaluate(event: Event): IO[ImportStatus]
 
-  def evaluate(event: InputEvent): IO[ImportStatus] =
+  def evaluate(event: RowEvent): IO[ImportStatus] =
     IO.fromEither(decoder.decodeJson(event.value))
       .onError(err => logger.error(err)(s"Error while attempting to decode $resourceType at offset ${event.ordering}"))
       .flatMap(evaluate)
@@ -31,7 +31,7 @@ object EventProcessor {
 
   private val logger = Logger[EventProcessor.type]
 
-  def run(eventStream: Stream[IO, InputEvent], processors: EventProcessor[_]*): IO[ImportReport] = {
+  def run(eventStream: Stream[IO, RowEvent], processors: EventProcessor[_]*): IO[ImportReport] = {
     val processorsMap = processors.foldLeft(Map.empty[EntityType, EventProcessor[_]]) { (acc, processor) =>
       acc + (processor.resourceType -> processor)
     }
diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ImportReport.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ImportReport.scala
index 4550d5039b..0c35abb7d3 100644
--- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ImportReport.scala
+++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/ImportReport.scala
@@ -3,15 +3,15 @@ package ch.epfl.bluebrain.nexus.ship
 import cats.Show
 import cats.kernel.Monoid
 import cats.syntax.all._
+import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
 import ch.epfl.bluebrain.nexus.ship.ImportReport.Count
-import ch.epfl.bluebrain.nexus.ship.model.InputEvent
 
 import java.time.Instant
 
 final case class ImportReport(offset: Offset, instant: Instant, progress: Map[EntityType, Count]) {
-  def +(event: InputEvent, status: ImportStatus): ImportReport = {
+  def +(event: RowEvent, status: ImportStatus): ImportReport = {
     val entityType  = event.`type`
     val newProgress = progress.updatedWith(entityType) {
       case Some(count) => Some(count |+| status.asCount)
diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala
index eff2318ee2..53d0c52446 100644
--- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala
+++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/RunShip.scala
@@ -9,9 +9,9 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
 import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings
 import ch.epfl.bluebrain.nexus.delta.sdk.quotas.Quotas
 import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
+import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
 import ch.epfl.bluebrain.nexus.ship.config.ShipConfig
-import ch.epfl.bluebrain.nexus.ship.model.InputEvent
 import ch.epfl.bluebrain.nexus.ship.organizations.OrganizationProvider
 import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor
 import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor
@@ -79,12 +79,12 @@ class RunShip {
     } yield report
   }
 
-  private def eventStream(file: Path, fromOffset: Offset): Stream[IO, InputEvent] =
+  private def eventStream(file: Path, fromOffset: Offset): Stream[IO, RowEvent] =
     Files[IO]
       .readUtf8Lines(file)
       .zipWithIndex
       .evalMap { case (line, index) =>
-        IO.fromEither(decode[InputEvent](line)).onError { err =>
+        IO.fromEither(decode[RowEvent](line)).onError { err =>
           logger.error(err)(s"Error parsing to event at line $index")
         }
       }
diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/model/InputEvent.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/model/InputEvent.scala
deleted file mode 100644
index 51efe74a19..0000000000
--- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/model/InputEvent.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-package ch.epfl.bluebrain.nexus.ship.model
-
-import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
-import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label}
-import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
-import io.circe.{Decoder, Json}
-
-import java.time.Instant
-import scala.annotation.nowarn
-
-final case class InputEvent(
-    ordering: Offset.At,
-    `type`: EntityType,
-    org: Label,
-    project: Label,
-    id: Iri,
-    rev: Int,
-    value: Json,
-    instant: Instant
-)
-
-object InputEvent {
-
-  @nowarn("cat=unused")
-  implicit final val elasticSearchViewValueEncoder: Decoder[InputEvent] = {
-    import io.circe.generic.extras.Configuration
-    import io.circe.generic.extras.semiauto.deriveConfiguredDecoder
-    implicit val offsetDecoder: Decoder[Offset.At] = Decoder.decodeLong.map(Offset.At)
-    implicit val config: Configuration             = Configuration.default
-    deriveConfiguredDecoder[InputEvent]
-  }
-}
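
The heart of the patch is the new `exportToFile`: instead of draining one long-lived `row_to_json` query through a single database cursor, the export now runs through `StreamingQuery`, which re-issues the offset-parameterised query in batches of `batch-size` rows (the new `QueryConfig(config.batchSize, RefreshStrategy.Stop)`), feeding each emitted row's `ordering` back in as the next offset and stopping once the query is exhausted. A minimal, self-contained sketch of that paging loop follows; `Row`, `fetchPage` and `paginate` are hypothetical stand-ins for illustration, not the actual `StreamingQuery` internals:

    import cats.effect.IO
    import fs2.Stream

    object PagingSketch {

      // Stand-in for a row of the export; only the paging key matters here.
      final case class Row(ordering: Long, json: String)

      // `fetchPage` stands in for executing the offset-parameterised SQL query
      // with the configured batch size.
      def paginate(fetchPage: Long => IO[List[Row]], start: Long, batchSize: Int): Stream[IO, Row] =
        Stream.eval(fetchPage(start)).flatMap { rows =>
          val page = Stream.emits(rows)
          rows.lastOption match {
            // A full batch may have more rows behind it: continue from the last ordering seen.
            case Some(last) if rows.size == batchSize =>
              page ++ paginate(fetchPage, last.ordering, batchSize)
            // A short or empty batch means the query is exhausted: stop streaming,
            // which is the behaviour RefreshStrategy.Stop selects in the real code.
            case _ =>
              page
          }
        }
    }

Since each `RowEvent` is rendered with `asJson.noSpaces` and interspersed with newlines, the export file stays one JSON object per line, which is exactly the shape `RunShip.eventStream` reads back via `readUtf8Lines` and `decode[RowEvent]`; the custom `Offset.At` codec keeps `ordering` a bare number in that JSON, matching the wire format of the removed `InputEvent`.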