Skip to content

Commit

Permalink
Reset createdAt when restarting a view (#4345)
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski authored Oct 9, 2023
1 parent f480c08 commit cd9396a
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ final class CompositeProgressStore(xas: Transactors)(implicit clock: Clock[UIO])
| processed = ${reset.processed},
| discarded = ${reset.discarded},
| failed = ${reset.failed},
| created_at = $instant,
| updated_at = $instant
|$where
|""".stripMargin.update.run
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ trait Projections {
*/
def save(metadata: ProjectionMetadata, progress: ProjectionProgress): UIO[Unit]

/**
* Resets the progress of a projection to 0, and the instants (createdAt, updatedAt) to the time of the reset
*
* @param name
* the name of the projection to reset
*/
def reset(name: String): UIO[Unit]

/**
* Deletes a projection offset if found.
*
Expand Down Expand Up @@ -104,6 +112,8 @@ object Projections {
override def save(metadata: ProjectionMetadata, progress: ProjectionProgress): UIO[Unit] =
projectionStore.save(metadata, progress)

override def reset(name: String): UIO[Unit] = projectionStore.reset(name)

override def delete(name: String): UIO[Unit] = projectionStore.delete(name)

override def scheduleRestart(projectionName: String)(implicit subject: Subject): UIO[Unit] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ trait ProjectionStore {
*/
def save(metadata: ProjectionMetadata, progress: ProjectionProgress): UIO[Unit]

/**
* Resets the progress of a projection to 0, and the instants (createdAt, updatedAt) to the time of the reset
* @param name
* the name of the projection to reset
*/
def reset(name: String): UIO[Unit]

/**
* Retrieves a projection offset if found.
*
Expand Down Expand Up @@ -83,6 +90,22 @@ object ProjectionStore {
.hideErrors
}

override def reset(name: String): UIO[Unit] =
IOUtils.instant.flatMap { instant =>
sql"""UPDATE projection_offsets
SET ordering = 0,
processed = 0,
discarded = 0,
failed = 0,
created_at = $instant,
updated_at = $instant
WHERE name = $name
""".stripMargin.update.run
.transact(xas.write)
.void
.hideErrors
}

override def offset(name: String): UIO[Option[ProjectionProgress]] =
sql"""SELECT * FROM projection_offsets
|WHERE name = $name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ object Supervisor {
_ <- log.info(s"Restarting '${metadata.module}/${metadata.name}'...")
_ <- stopProjection(s)
_ <- Task.when(s.executionStrategy == PersistentSingleNode)(
projections.save(metadata, ProjectionProgress.NoProgress)
projections.reset(metadata.name)
)
_ <- Supervisor.restartProjection(s, mapRef)
status <- s.control.status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class ProjectionStoreSuite extends BioSuite with IOFixedClock with Doobie.Fixtur
private val metadata = ProjectionMetadata("test", name, Some(project), Some(resource))
private val progress = ProjectionProgress(Offset.At(42L), Instant.EPOCH, 5, 2, 1)
private val newProgress = progress.copy(offset = Offset.At(100L), processed = 100L)
private val noProgress = ProjectionProgress.NoProgress

test("Return an empty offset when not found") {
store.offset("not found").assertNone
Expand Down Expand Up @@ -71,4 +72,30 @@ class ProjectionStoreSuite extends BioSuite with IOFixedClock with Doobie.Fixtur
_ <- store.offset(name).assertNone
} yield ()
}

test("Reset an offset") {
val later = Instant.EPOCH.plusSeconds(1000)
val storeLater = ProjectionStore(xas, QueryConfig(10, RefreshStrategy.Stop))(ioClock(later))

for {
_ <- store.save(metadata, progress)
_ <- assertProgressAndInstants(metadata.name, progress, Instant.EPOCH, Instant.EPOCH)(store)
_ <- storeLater.reset(metadata.name)
_ <- assertProgressAndInstants(metadata.name, noProgress.copy(instant = later), later, later)(store)
} yield ()
}

private def assertProgressAndInstants(
name: String,
progress: ProjectionProgress,
createdAt: Instant,
updatedAt: Instant
)(
store: ProjectionStore
) =
for {
entries <- store.entries.compile.toList
r = entries.assertOneElem
_ = assertEquals((r.name, r.progress, r.createdAt, r.updatedAt), (name, progress, createdAt, updatedAt))
} yield ()
}

0 comments on commit cd9396a

Please sign in to comment.