diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/store/CompositeProgressStore.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/store/CompositeProgressStore.scala index c6116c5aab..5431c49566 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/store/CompositeProgressStore.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/store/CompositeProgressStore.scala @@ -90,6 +90,7 @@ final class CompositeProgressStore(xas: Transactors)(implicit clock: Clock[UIO]) | processed = ${reset.processed}, | discarded = ${reset.discarded}, | failed = ${reset.failed}, + | created_at = $instant, | updated_at = $instant |$where |""".stripMargin.update.run diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/Projections.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/Projections.scala index d7209366c2..d41ac9f9c5 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/Projections.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/Projections.scala @@ -42,6 +42,14 @@ trait Projections { */ def save(metadata: ProjectionMetadata, progress: ProjectionProgress): UIO[Unit] + /** + * Resets the progress of a projection to 0, and the instants (createdAt, updatedAt) to the time of the reset + * + * @param name + * the name of the projection to reset + */ + def reset(name: String): UIO[Unit] + /** * Deletes a projection offset if found. * @@ -104,6 +112,8 @@ object Projections { override def save(metadata: ProjectionMetadata, progress: ProjectionProgress): UIO[Unit] = projectionStore.save(metadata, progress) + override def reset(name: String): UIO[Unit] = projectionStore.reset(name) + override def delete(name: String): UIO[Unit] = projectionStore.delete(name) override def scheduleRestart(projectionName: String)(implicit subject: Subject): UIO[Unit] = { diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionStore.scala index 68d5075dbb..635fe82071 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionStore.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionStore.scala @@ -32,6 +32,13 @@ trait ProjectionStore { */ def save(metadata: ProjectionMetadata, progress: ProjectionProgress): UIO[Unit] + /** + * Resets the progress of a projection to 0, and the instants (createdAt, updatedAt) to the time of the reset + * @param name + * the name of the projection to reset + */ + def reset(name: String): UIO[Unit] + /** * Retrieves a projection offset if found. * @@ -83,6 +90,22 @@ object ProjectionStore { .hideErrors } + override def reset(name: String): UIO[Unit] = + IOUtils.instant.flatMap { instant => + sql"""UPDATE projection_offsets + SET ordering = 0, + processed = 0, + discarded = 0, + failed = 0, + created_at = $instant, + updated_at = $instant + WHERE name = $name + """.stripMargin.update.run + .transact(xas.write) + .void + .hideErrors + } + override def offset(name: String): UIO[Option[ProjectionProgress]] = sql"""SELECT * FROM projection_offsets |WHERE name = $name; diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Supervisor.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Supervisor.scala index f79c364408..89cab9786b 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Supervisor.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Supervisor.scala @@ -341,7 +341,7 @@ object Supervisor { _ <- log.info(s"Restarting '${metadata.module}/${metadata.name}'...") _ <- stopProjection(s) _ <- Task.when(s.executionStrategy == PersistentSingleNode)( - projections.save(metadata, ProjectionProgress.NoProgress) + projections.reset(metadata.name) ) _ <- Supervisor.restartProjection(s, mapRef) status <- s.control.status diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionStoreSuite.scala index d6c99c4024..c78c0073a9 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionStoreSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionStoreSuite.scala @@ -27,6 +27,7 @@ class ProjectionStoreSuite extends BioSuite with IOFixedClock with Doobie.Fixtur private val metadata = ProjectionMetadata("test", name, Some(project), Some(resource)) private val progress = ProjectionProgress(Offset.At(42L), Instant.EPOCH, 5, 2, 1) private val newProgress = progress.copy(offset = Offset.At(100L), processed = 100L) + private val noProgress = ProjectionProgress.NoProgress test("Return an empty offset when not found") { store.offset("not found").assertNone @@ -71,4 +72,30 @@ class ProjectionStoreSuite extends BioSuite with IOFixedClock with Doobie.Fixtur _ <- store.offset(name).assertNone } yield () } + + test("Reset an offset") { + val later = Instant.EPOCH.plusSeconds(1000) + val storeLater = ProjectionStore(xas, QueryConfig(10, RefreshStrategy.Stop))(ioClock(later)) + + for { + _ <- store.save(metadata, progress) + _ <- assertProgressAndInstants(metadata.name, progress, Instant.EPOCH, Instant.EPOCH)(store) + _ <- storeLater.reset(metadata.name) + _ <- assertProgressAndInstants(metadata.name, noProgress.copy(instant = later), later, later)(store) + } yield () + } + + private def assertProgressAndInstants( + name: String, + progress: ProjectionProgress, + createdAt: Instant, + updatedAt: Instant + )( + store: ProjectionStore + ) = + for { + entries <- store.entries.compile.toList + r = entries.assertOneElem + _ = assertEquals((r.name, r.progress, r.createdAt, r.updatedAt), (name, progress, createdAt, updatedAt)) + } yield () }