Skip to content

Commit

Permalink
Migrate ProjectDeletion plugin to cats-effect (#4311)
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski authored Sep 29, 2023
1 parent 65e2ba3 commit b6faad4
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import akka.http.scaladsl.model.headers.Location
import akka.http.scaladsl.server.{ExceptionHandler, RejectionHandler, Route}
import akka.stream.{Materializer, SystemMaterializer}
import cats.data.NonEmptyList
import cats.effect.{Clock, IO, Resource, Sync}
import cats.effect.{Clock, IO, Resource, Sync, Timer}
import ch.epfl.bluebrain.nexus.delta.Main.pluginsMaxPriority
import ch.epfl.bluebrain.nexus.delta.config.AppConfig
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
Expand Down Expand Up @@ -37,6 +37,7 @@ import monix.bio.{Task, UIO}
import monix.execution.Scheduler
import org.slf4j.{Logger, LoggerFactory}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt

/**
Expand Down Expand Up @@ -101,6 +102,7 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class

make[Clock[UIO]].from(Clock[UIO])
make[Clock[IO]].from(Clock.create[IO])
make[Timer[IO]].from(IO.timer(ExecutionContext.global))
make[UUIDF].from(UUIDF.random)
make[Scheduler].from(Scheduler.global)
make[JsonKeyOrdering].from(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.kernel.effect.migration

import cats.effect.IO
import cats.~>
import monix.bio.{IO => BIO, UIO}
import monix.execution.Scheduler.Implicits.global

Expand All @@ -12,6 +13,8 @@ trait MigrateEffectSyntax {

implicit def toMonixBIOOps[A](io: IO[A]): CatsIOToBioOps[A] = new CatsIOToBioOps(io)

val ioToUioK: IO ~> UIO = λ[IO ~> UIO](_.toUIO)

}

final class CatsIOToBioOps[A](private val io: IO[A]) extends AnyVal {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.projectdeletion

import cats.effect.{Clock, IO, Timer}
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.plugins.projectdeletion.model.{contexts, ProjectDeletionConfig}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
Expand All @@ -8,7 +10,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.projects.{Projects, ProjectsStatistics}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Supervisor
import izumi.distage.model.definition.{Id, ModuleDef}
import monix.execution.Scheduler

class ProjectDeletionModule(priority: Int) extends ModuleDef {

Expand All @@ -24,10 +25,9 @@ class ProjectDeletionModule(priority: Int) extends ModuleDef {
(
config: ProjectDeletionConfig,
baseUri: BaseUri,
s: Scheduler,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) => new ProjectDeletionRoutes(config)(baseUri, s, cr, ordering)
) => new ProjectDeletionRoutes(config)(baseUri, cr, ordering)
}

many[PriorityRoute].add { (route: ProjectDeletionRoutes) =>
Expand All @@ -39,7 +39,9 @@ class ProjectDeletionModule(priority: Int) extends ModuleDef {
projects: Projects,
config: ProjectDeletionConfig,
projectStatistics: ProjectsStatistics,
supervisor: Supervisor
) => ProjectDeletionRunner.start(projects, config, projectStatistics, supervisor)
supervisor: Supervisor,
clock: Clock[IO],
timer: Timer[IO]
) => ProjectDeletionRunner.start(projects, config, projectStatistics, supervisor)(clock, timer).toUIO
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ package ch.epfl.bluebrain.nexus.delta.plugins.projectdeletion

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.projectdeletion.model.ProjectDeletionConfig
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives.{baseUriPrefix, emit}
import ch.epfl.bluebrain.nexus.delta.sdk.ce.DeltaDirectives._
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives.baseUriPrefix
import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import kamon.instrumentation.akka.http.TracingDirectives.operationName
import monix.bio.UIO
import monix.execution.Scheduler

/**
* The project deletion routes that expose the current configuration of the plugin.
Expand All @@ -22,18 +21,14 @@ import monix.execution.Scheduler
*/
class ProjectDeletionRoutes(config: ProjectDeletionConfig)(implicit
baseUri: BaseUri,
s: Scheduler,
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
) extends RdfMarshalling {
import baseUri.prefixSegment

def routes: Route =
baseUriPrefix(baseUri.prefix) {
pathPrefix("project-deletion" / "config") {
operationName(s"$prefixSegment/v1/project-deletion/config") {
emit(UIO.pure(config))
}
emit(IO.pure(config))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package ch.epfl.bluebrain.nexus.delta.plugins.projectdeletion

import cats.implicits.toTraverseOps
import cats.effect.{Clock, IO, Timer}
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination
import ch.epfl.bluebrain.nexus.delta.plugins.projectdeletion.model.ProjectDeletionConfig
import ch.epfl.bluebrain.nexus.delta.sdk.ProjectResource
Expand All @@ -9,17 +12,15 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.{Projects, ProjectsStatistics}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CompiledProjection, ExecutionStrategy, ProjectionMetadata, Supervisor}
import com.typesafe.scalalogging.Logger
import fs2.Stream
import monix.bio.{Task, UIO}

import java.time.Instant

class ProjectDeletionRunner(projects: Projects, config: ProjectDeletionConfig, projectStatistics: ProjectsStatistics) {

private val logger: Logger = Logger[ProjectDeletionRunner]
private val logger = Logger.cats[ProjectDeletionRunner]

private def lastEventTime(pr: ProjectResource, now: Instant): UIO[Instant] = {
private def lastEventTime(pr: ProjectResource, now: Instant): IO[Instant] = {
projectStatistics
.get(pr.value.ref)
.map(_.map(_.lastEventTime).getOrElse {
Expand All @@ -28,34 +29,32 @@ class ProjectDeletionRunner(projects: Projects, config: ProjectDeletionConfig, p
})
}

private val allProjects: UIO[Seq[ProjectResource]] = {
private val allProjects: IO[Seq[ProjectResource]] = {
projects
.list(
Pagination.OnePage,
ProjectSearchParams(filter = _ => UIO.pure(true)),
ProjectSearchParams(filter = _ => IO.pure(true).toUIO),
Ordering.by(_.updatedAt) // this is not needed, we are forced to specify an ordering
)
.map(_.results)
.map(_.map(_.source))
}

private def deleteProject(pr: ProjectResource): UIO[Unit] = {
private def deleteProject(pr: ProjectResource): IO[Unit] = {
implicit val caller: Subject = Identity.Anonymous
projects
.delete(pr.value.ref, pr.rev)
.void
.onErrorHandleWith(e => UIO.delay(logger.error(s"Error deleting project from plugin: $e")))
toCatsIO(projects.delete(pr.value.ref, pr.rev))
.handleErrorWith(e => logger.error(s"Error deleting project from plugin: $e"))
.void
}

def projectDeletionPass: UIO[Unit] = {
def projectDeletionPass(implicit clock: Clock[IO]): IO[Unit] = {

val shouldDeleteProject = ShouldDeleteProject(config, lastEventTime)

def possiblyDelete(project: ProjectResource): UIO[Unit] = {
def possiblyDelete(project: ProjectResource): IO[Unit] = {
shouldDeleteProject(project).flatMap {
case true => deleteProject(project)
case false => UIO.unit
case false => IO.unit
}
}

Expand All @@ -77,14 +76,15 @@ object ProjectDeletionRunner {
config: ProjectDeletionConfig,
projectStatistics: ProjectsStatistics,
supervisor: Supervisor
): Task[ProjectDeletionRunner] = {
)(implicit clock: Clock[IO], timer: Timer[IO]): IO[ProjectDeletionRunner] = {

val runner = new ProjectDeletionRunner(projects, config, projectStatistics)

val continuousStream = Stream
.fixedRate[Task](config.idleCheckPeriod)
.fixedRate[IO](config.idleCheckPeriod)
.evalMap(_ => runner.projectDeletionPass)
.drain
.translate(ioToUioK)

val compiledProjection =
CompiledProjection.fromStream(projectionMetadata, ExecutionStrategy.TransientSingleNode, _ => continuousStream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package ch.epfl.bluebrain.nexus.delta.plugins.projectdeletion

import cats.Semigroup
import cats.data.NonEmptyList
import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.kernel.syntax.instantSyntax
import ch.epfl.bluebrain.nexus.delta.kernel.utils.IOUtils
import ch.epfl.bluebrain.nexus.delta.kernel.utils.IOInstant
import ch.epfl.bluebrain.nexus.delta.plugins.projectdeletion.model.ProjectDeletionConfig
import ch.epfl.bluebrain.nexus.delta.sdk.ProjectResource
import monix.bio.UIO

import java.time.{Duration, Instant}

Expand All @@ -17,8 +17,8 @@ object ShouldDeleteProject {

def apply(
config: ProjectDeletionConfig,
lastEventTime: (ProjectResource, Instant) => UIO[Instant]
): ProjectResource => UIO[Boolean] = (pr: ProjectResource) => {
lastEventTime: (ProjectResource, Instant) => IO[Instant]
)(implicit clock: Clock[IO]): ProjectResource => IO[Boolean] = (pr: ProjectResource) => {

def isIncluded(pr: ProjectResource): Boolean = {
config.includedProjects.exists(regex => regex.matches(pr.value.ref.toString))
Expand All @@ -32,11 +32,11 @@ object ShouldDeleteProject {
config.deleteDeprecatedProjects && pr.deprecated
}

def deletableDueToBeingIdle(pr: ProjectResource): UIO[Boolean] = {
def deletableDueToBeingIdle(pr: ProjectResource): IO[Boolean] = {
implicit val and = andSemigroup
for {
now <- IOUtils.instant
idle <- NonEmptyList.of(UIO.pure(projectIsIdle(pr, now)), resourcesAreIdle(pr, now)).reduce
now <- IOInstant.now
idle <- NonEmptyList.of(IO.pure(projectIsIdle(pr, now)), resourcesAreIdle(pr, now)).reduce
} yield {
idle
}
Expand All @@ -46,25 +46,25 @@ object ShouldDeleteProject {
(now diff pr.updatedAt).toSeconds > config.idleInterval.toSeconds
}

def resourcesAreIdle(pr: ProjectResource, now: Instant): UIO[Boolean] = {
def resourcesAreIdle(pr: ProjectResource, now: Instant): IO[Boolean] = {
lastEventTime(pr, now).map(_.isBefore(now.minus(Duration.ofMillis(config.idleInterval.toMillis))))
}

def alreadyDeleted(pr: ProjectResource): Boolean = {
pr.value.markedForDeletion
}

def worthyOfDeletion(pr: ProjectResource): UIO[Boolean] = {
def worthyOfDeletion(pr: ProjectResource): IO[Boolean] = {
implicit val or = orSemigroup
NonEmptyList.of(UIO.pure(deletableDueToDeprecation(pr)), deletableDueToBeingIdle(pr)).reduce
NonEmptyList.of(IO.pure(deletableDueToDeprecation(pr)), deletableDueToBeingIdle(pr)).reduce
}

implicit val and = andSemigroup
NonEmptyList
.of(
UIO.pure(isIncluded(pr)),
UIO.pure(notExcluded(pr)),
UIO.pure(!alreadyDeleted(pr)),
IO.pure(isIncluded(pr)),
IO.pure(notExcluded(pr)),
IO.pure(!alreadyDeleted(pr)),
worthyOfDeletion(pr)
)
.reduce
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.utils.RouteHelpers
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label
import ch.epfl.bluebrain.nexus.testkit.{IOValues, TestHelpers}
import monix.execution.Scheduler
import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration.DurationInt
Expand All @@ -22,7 +21,6 @@ class ProjectDeletionRoutesSpec
with IOValues
with TestHelpers {

implicit private val scheduler: Scheduler = Scheduler.global
implicit private val cl: ClassLoader = getClass.getClassLoader
implicit private val ordering: JsonKeyOrdering = JsonKeyOrdering.default()
implicit private val baseUri: BaseUri = BaseUri("http://localhost", Label.unsafe("v1"))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.plugins.projectdeletion

import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.plugins.projectdeletion.ShouldDeleteProjectSuite.{assertDeleted, assertNotDeleted, configWhere, projectWhere, shouldBeDeleted, ThreeHoursAgo, TwoDaysAgo}
import ch.epfl.bluebrain.nexus.delta.plugins.projectdeletion.model.ProjectDeletionConfig
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
Expand All @@ -11,16 +12,15 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.Project
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ResourceRef
import ch.epfl.bluebrain.nexus.testkit.TestHelpers.genString
import ch.epfl.bluebrain.nexus.testkit.bio.{BioAssertions, BioSuite}
import monix.bio.UIO
import ch.epfl.bluebrain.nexus.testkit.ce.{CatsEffectAssertions, CatsEffectSuite}
import munit.{Assertions, Location}

import java.time.{Duration, Instant}
import scala.collection.mutable
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.matching.Regex

class ShouldDeleteProjectSuite extends BioSuite {
class ShouldDeleteProjectSuite extends CatsEffectSuite {

test("delete a deprecated project") {
assertDeleted(
Expand Down Expand Up @@ -104,7 +104,7 @@ class ShouldDeleteProjectSuite extends BioSuite {
}
}

object ShouldDeleteProjectSuite extends Assertions with BioAssertions {
object ShouldDeleteProjectSuite extends Assertions with CatsEffectAssertions {
case class ProjectFixture(
deprecated: Boolean,
updatedAt: Instant,
Expand Down Expand Up @@ -167,31 +167,33 @@ object ShouldDeleteProjectSuite extends Assertions with BioAssertions {
)
}

def addTo(deletedProjects: mutable.Set[ProjectResource]): ProjectResource => UIO[Unit] = { pr =>
UIO.delay {
def addTo(deletedProjects: mutable.Set[ProjectResource]): ProjectResource => IO[Unit] = { pr =>
IO.delay {
deletedProjects.add(pr)
()
}
}

def assertDeleted(result: UIO[Boolean])(implicit loc: Location): UIO[Unit] = {
assertUIO[Boolean](result, _ == true, "project was not deleted")
def assertDeleted(result: IO[Boolean])(implicit loc: Location): IO[Unit] = {
assertIO[Boolean, Boolean](result, true, "project was not deleted")
}

def assertNotDeleted(result: UIO[Boolean])(implicit loc: Location): UIO[Unit] = {
assertUIO[Boolean](result, _ == false, "project was deleted")
def assertNotDeleted(result: IO[Boolean])(implicit loc: Location): IO[Unit] = {
assertIO[Boolean, Boolean](result, false, "project was deleted")
}

val TwoDaysAgo = Instant.now().minus(Duration.ofDays(2))
val ThreeHoursAgo = Instant.now().minus(Duration.ofHours(3))

implicit val clock: Clock[IO] = Clock.create

def shouldBeDeleted(
config: ProjectDeletionConfig,
project: ProjectFixture
): UIO[Boolean] = {
): IO[Boolean] = {
val shouldDeleteProject = ShouldDeleteProject(
config,
lastEventTime = (_, _) => UIO.pure(project.lastEventTime)
lastEventTime = (_, _) => IO.pure(project.lastEventTime)
)

shouldDeleteProject(project.resource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection.Project
import ch.epfl.bluebrain.nexus.delta.sdk.syntax.httpResponseFieldsSyntax
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityDependency.ReferencedBy
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.rejection.Rejection
import io.circe.syntax._
import io.circe.{Encoder, JsonObject}

Expand All @@ -24,7 +25,7 @@ import io.circe.{Encoder, JsonObject}
* @param reason
* a descriptive message as to why the rejection occurred
*/
sealed abstract class ProjectRejection(val reason: String) extends Product with Serializable
sealed abstract class ProjectRejection(val reason: String) extends Rejection

object ProjectRejection {

Expand Down

0 comments on commit b6faad4

Please sign in to comment.