Use the StreamingQuery approach to handle long exports (#4820)
* Use the StreamingQuery approach to handle long exports

* Scalafmt

---------

Co-authored-by: Simon Dumas <[email protected]>
imsdu and Simon Dumas authored Apr 3, 2024
1 parent 5abedd2 commit 725effa
Showing 9 changed files with 77 additions and 61 deletions.
1 change: 1 addition & 0 deletions delta/app/src/main/resources/app.conf
@@ -45,6 +45,7 @@ app {

   # Database export configuration
   export {
+    batch-size = 30
     # Max number of concurrent exports
     permits = 1
     # Target directory for exports
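The new batch-size setting bounds how many rows the export fetches per SQL round-trip; it feeds the QueryConfig built in Exporter.scala below, so memory stays flat no matter how long the export runs, while permits still caps the number of concurrent exports. A configuration-loading sketch follows the ExportConfig.scala hunk below.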
ExportConfig.scala
@@ -2,12 +2,12 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.exporter

 import fs2.io.file.Path
 import pureconfig.ConfigConvert.catchReadError
-import pureconfig.{ConfigConvert, ConfigReader}
 import pureconfig.generic.semiauto.deriveReader
+import pureconfig.{ConfigConvert, ConfigReader}

 import scala.annotation.nowarn

-final case class ExportConfig(permits: Int, target: Path)
+final case class ExportConfig(batchSize: Int, permits: Int, target: Path)

 object ExportConfig {

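How the app.export block above is read into this class — a minimal sketch, assuming the ConfigReader derived in the companion (via deriveReader) is in implicit scope; pureconfig's default naming maps the batchSize field to the kebab-case key batch-size:

import pureconfig.ConfigSource

// Illustrative wiring only; Delta's actual bootstrap differs.
val exportConfig: ExportConfig =
  ConfigSource.default.at("app.export").loadOrThrow[ExportConfig]
// With the defaults above: exportConfig.batchSize == 30, exportConfig.permits == 1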
Exporter.scala
@@ -5,13 +5,16 @@ import cats.effect.kernel.Clock
 import cats.effect.std.Semaphore
 import ch.epfl.bluebrain.nexus.delta.kernel.Logger
 import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
+import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
 import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.Exporter.ExportResult
 import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
+import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
+import ch.epfl.bluebrain.nexus.delta.sourcing.query.{RefreshStrategy, StreamingQuery}
 import doobie.Fragments
 import doobie.implicits._
-import doobie.util.fragment.Fragment
-import fs2.io.file.{Files, Path}
+import doobie.util.query.Query0
 import fs2.Stream
+import fs2.io.file.{Files, Path}
 import io.circe.syntax.EncoderOps

 import java.time.Instant
@@ -29,27 +32,30 @@ object Exporter {
   final case class ExportResult(json: Path, success: Path, start: Instant, end: Instant)

   def apply(config: ExportConfig, clock: Clock[IO], xas: Transactors): IO[Exporter] =
-    Semaphore[IO](config.permits.toLong).map(new ExporterImpl(config.target, _, clock, xas))
+    Semaphore[IO](config.permits.toLong).map(new ExporterImpl(config, _, clock, xas))

-  private class ExporterImpl(rootDirectory: Path, semaphore: Semaphore[IO], clock: Clock[IO], xas: Transactors)
+  private class ExporterImpl(config: ExportConfig, semaphore: Semaphore[IO], clock: Clock[IO], xas: Transactors)
       extends Exporter {

+    val queryConfig = QueryConfig(config.batchSize, RefreshStrategy.Stop)
     override def events(query: ExportEventQuery): IO[ExportResult] = {
       val projectFilter = Fragments.orOpt(
         query.projects.map { project => sql"(org = ${project.organization} and project = ${project.project})" }
       )
-      val q = asJson(sql"""SELECT *
+      def q(offset: Offset) =
+        sql"""SELECT ordering, type, org, project, id, rev, value, instant
           |FROM public.scoped_events
-          |${Fragments.whereAndOpt(projectFilter, query.offset.asFragment)}
+          |${Fragments.whereAndOpt(projectFilter, offset.asFragment)}
           |ORDER BY ordering
-          |""".stripMargin)
+          |""".stripMargin.query[RowEvent]

       val exportIO = for {
         start <- clock.realTimeInstant
         _ <- logger.info(s"Starting export for projects ${query.projects} from offset ${query.offset}")
-        targetDirectory = rootDirectory / query.output.value
+        targetDirectory = config.target / query.output.value
         _ <- Files[IO].createDirectory(targetDirectory)
         exportFile = targetDirectory / s"$start.json"
-        _ <- exportToFile(q, exportFile)
+        _ <- exportToFile(q, query.offset, exportFile)
         end <- clock.realTimeInstant
         exportSuccess = targetDirectory / s"$start.success"
         _ <- writeSuccessFile(query, exportSuccess)
@@ -62,20 +68,17 @@ object Exporter {
       semaphore.permit.use { _ => exportIO }
     }

-    private def exportToFile(q: Fragment, targetFile: Path) =
-      q.query[String]
-        .stream
+    private def exportToFile(query: Offset => Query0[RowEvent], start: Offset, targetFile: Path) = {
+      StreamingQuery[RowEvent](start, query, _.ordering, queryConfig, xas)
+        .map(_.asJson.noSpaces)
         .intersperse("\n")
-        .transact(xas.streaming)
         .through(Files[IO].writeUtf8(targetFile))
         .compile
         .drain
+    }

     private def writeSuccessFile(query: ExportEventQuery, targetFile: Path) =
       Stream(query.asJson.toString()).through(Files[IO].writeUtf8(targetFile)).compile.drain
-
-    private def asJson(query: Fragment) =
-      sql"""(select row_to_json(t) from ($query) t)"""
   }

 }
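The point of the change: the old exportToFile wrapped the whole export in one row_to_json query and streamed it through a single long-lived transaction (transact(xas.streaming)), so a long export pinned a connection and its transaction for the entire run. StreamingQuery instead replays the bounded query from the last seen offset, page by page, each page in its own short transaction. A minimal sketch of that paging idea with illustrative names (the real StreamingQuery lives in ch.epfl.bluebrain.nexus.delta.sourcing.query and is not reproduced here):

import cats.effect.IO
import fs2.{Chunk, Stream}

// Each step runs one bounded query from the current offset; with
// RefreshStrategy.Stop the stream terminates once a page comes back empty.
// `fetchPage` stands in for something like
// `offset => q(offset).to[List].transact(xas.read)`, limited to batchSize rows.
def pagedStream[A, Off](
    start: Off,
    fetchPage: Off => IO[List[A]],
    offsetOf: A => Off // e.g. `_.ordering`, as passed to StreamingQuery above
): Stream[IO, A] =
  Stream.unfoldChunkEval(start) { offset =>
    fetchPage(offset).map { page =>
      page.lastOption.map(last => (Chunk.seq(page), offsetOf(last)))
    }
  }

Each page is written to the target file as it is produced, so memory is bounded by batch-size and no database transaction outlives a single page.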
RowEvent.scala (new file)
@@ -0,0 +1,44 @@
package ch.epfl.bluebrain.nexus.delta.sourcing.exporter

import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import doobie.Read
import io.circe.{Codec, Decoder, Encoder, Json}

import java.time.Instant
import scala.annotation.nowarn

final case class RowEvent(
ordering: Offset.At,
`type`: EntityType,
org: Label,
project: Label,
id: Iri,
rev: Int,
value: Json,
instant: Instant
)

object RowEvent {

@nowarn("cat=unused")
implicit final val inputEventEncoder: Codec[RowEvent] = {
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto.deriveConfiguredCodec
implicit val offsetEncoder: Encoder[Offset.At] = Encoder.encodeLong.contramap(_.value)
implicit val offsetDecoder: Decoder[Offset.At] = Decoder.decodeLong.map(Offset.At)
implicit val config: Configuration = Configuration.default
deriveConfiguredCodec[RowEvent]
}

implicit val inputEventRead: Read[RowEvent] = {
import doobie._
import doobie.postgres.implicits._
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
Read[(Long, EntityType, Label, Label, Iri, Int, Json, Instant)].map {
case (offset, entityType, org, project, id, rev, value, instant) =>
RowEvent(Offset.At(offset), entityType, org, project, id, rev, value, instant)
}
}
}
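Combined with the exporter's intersperse("\n") above, each RowEvent becomes one line of newline-delimited JSON, with ordering flattened to a bare number by the Offset.At codec. Roughly (field values invented for illustration):

import io.circe.parser.decode

// A line in the export file looks like:
// {"ordering":42,"type":"resource","org":"myorg","project":"myproject","id":"https://example.com/my-id","rev":1,"value":{},"instant":"2024-04-03T10:15:30Z"}
def parseLine(line: String): Either[io.circe.Error, RowEvent] =
  decode[RowEvent](line) // the same codec is reused by RunShip.eventStream below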
ExporterSuite.scala
@@ -1,8 +1,8 @@
 package ch.epfl.bluebrain.nexus.delta.sourcing.exporter

 import cats.data.NonEmptyList
-import cats.syntax.all._
 import cats.effect.IO
+import cats.syntax.all._
 import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
 import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest
 import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestEvent.{PullRequestCreated, PullRequestMerged, PullRequestUpdated}
@@ -27,7 +27,7 @@ class ExporterSuite extends NexusSuite with Doobie.Fixture with TempDirectory.Fixture
   )
   override def munitFixtures: Seq[AnyFixture[_]] = List(tempDirectory, doobieFixture)

-  private lazy val exporterConfig = ExportConfig(3, exportDirectory)
+  private lazy val exporterConfig = ExportConfig(5, 3, exportDirectory)
   private lazy val (_, _, exporter) = doobieFixture()
   private lazy val exportDirectory = tempDirectory()

EventProcessor.scala
@@ -4,9 +4,9 @@ import cats.effect.IO
 import cats.syntax.all._
 import ch.epfl.bluebrain.nexus.delta.kernel.Logger
 import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.ScopedEvent
+import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
 import ch.epfl.bluebrain.nexus.ship.EventProcessor.logger
-import ch.epfl.bluebrain.nexus.ship.model.InputEvent
 import fs2.Stream
 import io.circe.Decoder

@@ -21,7 +21,7 @@ trait EventProcessor[Event <: ScopedEvent] {

   def evaluate(event: Event): IO[ImportStatus]

-  def evaluate(event: InputEvent): IO[ImportStatus] =
+  def evaluate(event: RowEvent): IO[ImportStatus] =
     IO.fromEither(decoder.decodeJson(event.value))
       .onError(err => logger.error(err)(s"Error while attempting to decode $resourceType at offset ${event.ordering}"))
       .flatMap(evaluate)
@@ -31,7 +31,7 @@ object EventProcessor {

   private val logger = Logger[EventProcessor.type]

-  def run(eventStream: Stream[IO, InputEvent], processors: EventProcessor[_]*): IO[ImportReport] = {
+  def run(eventStream: Stream[IO, RowEvent], processors: EventProcessor[_]*): IO[ImportReport] = {
     val processorsMap = processors.foldLeft(Map.empty[EntityType, EventProcessor[_]]) { (acc, processor) =>
       acc + (processor.resourceType -> processor)
     }
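The rest of run is truncated above; a plausible shape for the dispatch it performs, with the fold spelled out here as an assumption rather than copied from the hidden part of the hunk:

import cats.effect.IO
import fs2.Stream

// Look up the processor registered for each event's entity type, evaluate it,
// and fold the statuses into a report; events with no processor are skipped.
def dispatch(
    events: Stream[IO, RowEvent],
    processorsMap: Map[EntityType, EventProcessor[_]],
    initial: ImportReport
): IO[ImportReport] =
  events
    .evalScan(initial) { (report, event) =>
      processorsMap.get(event.`type`) match {
        case Some(processor) => processor.evaluate(event).map(status => report + (event, status))
        case None            => IO.pure(report)
      }
    }
    .compile
    .lastOrError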
ImportReport.scala
@@ -3,15 +3,15 @@ package ch.epfl.bluebrain.nexus.ship
 import cats.Show
 import cats.kernel.Monoid
 import cats.syntax.all._
+import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
 import ch.epfl.bluebrain.nexus.ship.ImportReport.Count
-import ch.epfl.bluebrain.nexus.ship.model.InputEvent

 import java.time.Instant

 final case class ImportReport(offset: Offset, instant: Instant, progress: Map[EntityType, Count]) {
-  def +(event: InputEvent, status: ImportStatus): ImportReport = {
+  def +(event: RowEvent, status: ImportStatus): ImportReport = {
     val entityType = event.`type`
     val newProgress = progress.updatedWith(entityType) {
       case Some(count) => Some(count |+| status.asCount)
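Worked through once: the first event of a type inserts status.asCount into the progress map, the second combines via the Monoid. Assuming Count tallies successes and drops field-wise (the Count shape and status names below are assumptions, not taken from the hunk):

// Illustrative only: event1/event2 are two RowEvents of the same type.
val empty = ImportReport(Offset.start, Instant.EPOCH, Map.empty)
val once  = empty + (event1, ImportStatus.Success) // progress: Count(1, 0)
val twice = once + (event2, ImportStatus.Success)  // progress: Count(2, 0)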
RunShip.scala
@@ -9,9 +9,9 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
 import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings
 import ch.epfl.bluebrain.nexus.delta.sdk.quotas.Quotas
 import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
+import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
 import ch.epfl.bluebrain.nexus.ship.config.ShipConfig
-import ch.epfl.bluebrain.nexus.ship.model.InputEvent
 import ch.epfl.bluebrain.nexus.ship.organizations.OrganizationProvider
 import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor
 import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor
@@ -79,12 +79,12 @@ class RunShip {
     } yield report
   }

-  private def eventStream(file: Path, fromOffset: Offset): Stream[IO, InputEvent] =
+  private def eventStream(file: Path, fromOffset: Offset): Stream[IO, RowEvent] =
     Files[IO]
       .readUtf8Lines(file)
       .zipWithIndex
       .evalMap { case (line, index) =>
-        IO.fromEither(decode[InputEvent](line)).onError { err =>
+        IO.fromEither(decode[RowEvent](line)).onError { err =>
           logger.error(err)(s"Error parsing to event at line $index")
         }
       }
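End to end, export and import now share RowEvent: Exporter.scala writes newline-delimited RowEvent JSON, and eventStream reads it back with the same codec. Illustrative wiring under assumed processor instances (none of this is in the hunk):

// projectProcessor and resolverProcessor stand for concrete EventProcessor
// instances built elsewhere in RunShip.
val report: IO[ImportReport] =
  EventProcessor.run(
    eventStream(exportFile, Offset.start),
    projectProcessor,
    resolverProcessor
  )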

InputEvent.scala
This file was deleted: InputEvent is superseded by RowEvent above.
