Log dropped events in database (#4984)
Co-authored-by: Simon Dumas <[email protected]>
imsdu and Simon Dumas authored May 17, 2024
1 parent 30848be commit 660bfda
Showing 7 changed files with 126 additions and 24 deletions.
SQL migration creating ship_dropped_events (new file)
@@ -0,0 +1,12 @@
CREATE TABLE IF NOT EXISTS public.ship_dropped_events(
    ordering bigint      NOT NULL,
    type     text        NOT NULL,
    org      text        NOT NULL,
    project  text        NOT NULL,
    id       text        NOT NULL,
    rev      integer     NOT NULL,
    value    JSONB       NOT NULL,
    instant  timestamptz NOT NULL,
    PRIMARY KEY(org, project, id, rev)
);
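Because each dropped event keeps its full JSON payload in `value`, the table can be queried directly when triaging an import. A minimal doobie sketch, not part of this commit: it reuses the `Transactors` type seen elsewhere in the diff, and the helper name `droppedByType` is hypothetical.

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import doobie.implicits._

// Illustrative helper (not in the commit): summarize dropped events per entity type.
def droppedByType(xas: Transactors): IO[List[(String, Long)]] =
  sql"SELECT type, COUNT(*) FROM ship_dropped_events GROUP BY type"
    .query[(String, Long)]
    .to[List]
    .transact(xas.read)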

DroppedEventStore.scala (new file)
@@ -0,0 +1,41 @@
package ch.epfl.bluebrain.nexus.ship

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
import doobie.implicits._
import doobie.postgres.implicits._

/**
  * Stores the events dropped during an import run so that they can be inspected afterwards.
  */
final class DroppedEventStore(xas: Transactors) {

  def count: IO[Long] =
    sql"""SELECT COUNT(*) FROM ship_dropped_events""".query[Long].unique.transact(xas.read)

  def truncate: IO[Unit] =
    sql"""TRUNCATE ship_dropped_events""".update.run.transact(xas.write).void

  def save(rowEvent: RowEvent): IO[Unit] =
    sql"""
         | INSERT INTO ship_dropped_events (
         |   ordering,
         |   type,
         |   org,
         |   project,
         |   id,
         |   rev,
         |   value,
         |   instant
         | )
         | VALUES (
         |   ${rowEvent.ordering},
         |   ${rowEvent.`type`},
         |   ${rowEvent.org},
         |   ${rowEvent.project},
         |   ${rowEvent.id},
         |   ${rowEvent.rev},
         |   ${rowEvent.value},
         |   ${rowEvent.instant}
         | )""".stripMargin.update.run.transact(xas.write).void

}
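A usage sketch, assuming an already-configured `Transactors` (the test suite below shows one being obtained from a Doobie fixture); the helper name `recordDropped` is hypothetical.

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent

// Hypothetical wiring, mirroring how EventProcessor and the tests use the store:
// clear leftovers from a previous run, record one dropped event, read back the count.
def recordDropped(xas: Transactors, event: RowEvent): IO[Long] = {
  val store = new DroppedEventStore(xas)
  store.truncate >> store.save(event) >> store.count
}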
EventProcessor.scala
@@ -31,34 +31,39 @@ object EventProcessor {

private val logger = Logger[EventProcessor.type]

-  def run(eventStream: Stream[IO, RowEvent], processors: EventProcessor[_]*): IO[ImportReport] = {
+  def run(
+      eventStream: Stream[IO, RowEvent],
+      droppedEventStore: DroppedEventStore,
+      processors: EventProcessor[_]*
+  ): IO[ImportReport] = {
     val processorsMap = processors.foldLeft(Map.empty[EntityType, EventProcessor[_]]) { (acc, processor) =>
       acc + (processor.resourceType -> processor)
     }
-    eventStream
-      .evalScan(ImportReport.start) { case (report, event) =>
-        val processed = report.progress.foldLeft(0L) { case (acc, (_, stats)) => acc + stats.success + stats.dropped }
-        processorsMap.get(event.`type`) match {
-          case Some(processor) =>
-            IO.whenA(processed % 1000 == 0)(logger.info(s"Current progress is: ${report.progress}")) >>
-              processor
-                .evaluate(event)
-                .map { status =>
-                  report + (event, status)
-                }
-                .onError { err =>
-                  logger.error(err)(
-                    s"Error while processing event with offset '${event.ordering.value}' with processor '${event.`type`}'."
-                  )
-                }
-          case None =>
-            logger.warn(s"No processor is provided for '${event.`type`}', skipping...") >>
-              IO.pure(report + (event, ImportStatus.Dropped))
-        }
-      }
-      .compile
-      .lastOrError
-      .flatTap { report => logger.info(report.show) }
+    // Truncating dropped events from previous run before running the stream
+    droppedEventStore.truncate >>
+      eventStream
+        .evalScan(ImportReport.start) { case (report, event) =>
+          val processed = report.progress.foldLeft(0L) { case (acc, (_, stats)) => acc + stats.success + stats.dropped }
+          processorsMap.get(event.`type`) match {
+            case Some(processor) =>
+              for {
+                _      <- IO.whenA(processed % 1000 == 0)(logger.info(s"Current progress is: ${report.progress}"))
+                status <- processor.evaluate(event).onError { err =>
+                            val message =
+                              s"Error while processing event with offset '${event.ordering.value}' with processor '${event.`type`}'."
+                            logger.error(err)(message)
+                          }
+                _      <- IO.whenA(status == ImportStatus.Dropped)(droppedEventStore.save(event))
+              } yield report + (event, status)
+            case None =>
+              logger.warn(s"No processor is provided for '${event.`type`}', skipping...") >>
+                droppedEventStore.save(event) >>
+                IO.pure(report + (event, ImportStatus.Dropped))
+          }
+        }
+        .compile
+        .lastOrError
+        .flatTap { report => logger.info(report.show) }
   }

}
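The refactored body keeps the same accumulation pattern as before: `evalScan` effectfully folds each event into a running `ImportReport` (allowing logging and the new dropped-event persistence at every step), and `compile.lastOrError` surfaces the final report. A self-contained fs2 sketch of that pattern, not part of the commit, with plain integers standing in for events and a running sum standing in for the report:

import cats.effect.{IO, IOApp}
import fs2.Stream

// Hypothetical demo object illustrating the evalScan pattern used above.
object EvalScanSketch extends IOApp.Simple {
  def run: IO[Unit] =
    Stream
      .emits(List(1, 2, 3, 4))
      .covary[IO]
      // Like `ImportReport.start + (event, status)`: fold each element
      // into the accumulator, performing effects along the way.
      .evalScan(0)((acc, n) => IO.println(s"processing $n").as(acc + n))
      .compile
      .lastOrError // the last accumulated value, i.e. the final "report"
      .flatMap(total => IO.println(s"final total: $total"))
}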
RunShip.scala
@@ -72,9 +72,11 @@ object RunShip {
       fileProcessor = FileProcessor(fetchContext, s3Client, projectMapper, rcr, config, eventClock, xas)
       // format: on
       _ <- logger.info("Starting import")
+      droppedEventStore = new DroppedEventStore(xas)
       report <- EventProcessor
                   .run(
                     eventsStream,
+                    droppedEventStore,
                     projectProcessor,
                     resolverProcessor,
                     schemaProcessor,
DroppedEventStoreSuite.scala (new file)
@@ -0,0 +1,41 @@
package ch.epfl.bluebrain.nexus.ship

import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.RowEvent
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import io.circe.Json
import io.circe.syntax.KeyOps
import munit.AnyFixture

import java.time.Instant

class DroppedEventStoreSuite extends NexusSuite with Doobie.Fixture {

  override def munitFixtures: Seq[AnyFixture[_]] = List(doobie)

  private lazy val xas   = doobie()
  private lazy val store = new DroppedEventStore(xas)

  test("Insert an event and truncate it") {
    val rowEvent = RowEvent(
      Offset.At(5L),
      EntityType("entity"),
      Label.unsafe("org"),
      Label.unsafe("project"),
      nxv + "id",
      3,
      Json.obj("field" := "value"),
      Instant.EPOCH
    )
    for {
      _ <- store.save(rowEvent)
      _ <- store.count.assertEquals(1L)
      _ <- store.truncate
      _ <- store.count.assertEquals(0L)
    } yield ()
  }

}
RunShipSuite.scala
@@ -89,6 +89,7 @@ class RunShipSuite
     for {
       events <- eventsStream("import/multi-part-import")
       _      <- RunShip(events, s3Client, inputConfig, xas).assertEquals(expectedImportReport)
+      _      <- new DroppedEventStore(xas).count.assertEquals(1L)
     } yield ()
   }

