Skip to content

Commit

Permalink
Use cats effect and log4cats for org deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
dantb committed Sep 28, 2023
1 parent b90801e commit 5f07a09
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.directives.AuthDirectives
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives._
import ch.epfl.bluebrain.nexus.delta.sdk.ce.DeltaDirectives.{emit => emitCE}
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
Expand Down Expand Up @@ -124,7 +125,7 @@ final class OrganizationsRoutes(
parameter("rev".as[Int].?, "prune".?(false)) {
case (_, true) =>
authorizeFor(id, orgs.delete).apply {
emit(orgDeleter.delete(id).leftWiden[OrganizationRejection])
emitCE(orgDeleter.delete(id))
}
case (Some(rev), false) =>
authorizeFor(id, orgs.write).apply {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class
make[StrictEntity].from { appCfg.http.strictEntityTimeout }
make[ServiceAccount].from { appCfg.serviceAccount.value }

implicit val scheduler: Scheduler = Scheduler.global

make[Transactors].fromResource {
Transactors.init(appCfg.database)
}
Expand Down Expand Up @@ -102,7 +104,7 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class
make[Clock[UIO]].from(Clock[UIO])
make[Clock[IO]].from(Clock.create[IO])
make[UUIDF].from(UUIDF.random)
make[Scheduler].from(Scheduler.global)
make[Scheduler].from(scheduler)
make[JsonKeyOrdering].from(
JsonKeyOrdering.default(topKeys =
List("@context", "@id", "@type", "reason", "details", "sourceId", "projectionId", "_total", "_results")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label
import ch.epfl.bluebrain.nexus.testkit.bio.IOFromMap
import io.circe.Json
import monix.bio.IO
import cats.effect.IO

import java.util.UUID

Expand Down
Original file line number Diff line number Diff line change
@@ -1,45 +1,42 @@
package ch.epfl.bluebrain.nexus.delta.sdk.organizations

import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode
import ch.epfl.bluebrain.nexus.delta.sdk.acls.Acls
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection.OrganizationNonEmpty
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label}
import ch.epfl.bluebrain.nexus.delta.sourcing.{MD5, Transactors}
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label
import com.typesafe.scalalogging.Logger
import doobie.{ConnectionIO, Update}
import doobie.implicits._
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
import doobie.util.update.Update0
import monix.bio.IO
import monix.bio.UIO
import doobie.{ConnectionIO, Update}
import org.typelevel.log4cats.{Logger => Log4CatsLogger}

trait OrganizationDeleter {
def delete(id: Label): IO[OrganizationNonEmpty, Unit]
def delete(id: Label): IO[Unit]
}

object OrganizationDeleter {

def apply(xas: Transactors): OrganizationDeleter = new OrganizationDeleter {

private val logger: Logger = Logger[OrganizationDeleter]
private val log: Log4CatsLogger[IO] = Logger.cats[OrganizationDeleter]

def delete(id: Label): IO[OrganizationNonEmpty, Unit] =
def delete(id: Label): IO[Unit] =
for {
orgIsEmpty <- orgIsEmpty(id)
_ <- if (orgIsEmpty) log(s"Deleting empty organization $id") *> deleteAll(id)
else IO.raiseError(OrganizationNonEmpty(id))
canDelete <- orgIsEmpty(id)
_ <- if (canDelete) log.info(s"Deleting empty organization $id") *> deleteAll(id)
else log.error(s"Failed to delete empty organization $id") *> IO.raiseError(OrganizationNonEmpty(id))
} yield ()

private def log(msg: String): UIO[Unit] = UIO.delay(logger.info(msg))

private def deleteAll(id: Label): UIO[Unit] =
private def deleteAll(id: Label): IO[Unit] =
(for {
_ <- List("scoped_events", "scoped_states").traverse(dropPartition(id, _)).void
_ <- List("global_events", "global_states").traverse(deleteGlobal(id, _))
} yield ()).transact(xas.write).void.hideErrors
} yield ()).transact(xas.writeCE).void

private def dropPartition(id: Label, table: String): ConnectionIO[Unit] =
Update0(s"DROP TABLE IF EXISTS ${table}_${MD5.hash(id.value)}", None).run.void
Expand All @@ -53,13 +50,12 @@ object OrganizationDeleter {
private def deleteGlobalQuery(id: IriOrBNode.Iri, tpe: EntityType, table: String): ConnectionIO[Unit] =
Update[(EntityType, IriOrBNode.Iri)](s"DELETE FROM $table WHERE" ++ " type = ? AND id = ?").run((tpe, id)).void

private def orgIsEmpty(id: Label): UIO[Boolean] =
private def orgIsEmpty(id: Label): IO[Boolean] =
sql"SELECT type from scoped_events WHERE org = $id LIMIT 1"
.query[Label]
.option
.map(_.isEmpty)
.transact(xas.read)
.hideErrors
.transact(xas.readCE)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import io.circe.{Encoder, JsonObject}
* @param reason
* a descriptive message as to why the rejection occurred
*/
sealed abstract class OrganizationRejection(val reason: String) extends Product with Serializable
sealed abstract class OrganizationRejection(val reason: String) extends Throwable with Product with Serializable

object OrganizationRejection {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import munit.AnyFixture

import java.util.UUID

class OrganizationDeleterSpec extends BioSuite with IOFixedClock with ConfigFixtures {
class OrganizationDeleterSuite extends BioSuite with IOFixedClock with ConfigFixtures {

private val org = Label.unsafe("org")
private val orgUuid = UUID.randomUUID()
Expand Down Expand Up @@ -55,7 +55,7 @@ class OrganizationDeleterSpec extends BioSuite with IOFixedClock with ConfigFixt
} yield ()

val act: UIO[Either[OrganizationNonEmpty, Unit]] =
orgDeleter.delete(org).attempt
IO.from(orgDeleter.delete(org).attemptNarrow[OrganizationNonEmpty]).hideErrors

def assert(result: Either[OrganizationNonEmpty, Unit]): IO[Any, Unit] = for {
eventPartitions <- queryPartitions("scoped_events")
Expand Down Expand Up @@ -83,7 +83,7 @@ class OrganizationDeleterSpec extends BioSuite with IOFixedClock with ConfigFixt
} yield ()

val act: UIO[Either[OrganizationNonEmpty, Unit]] =
orgDeleter.delete(org).attempt
IO.from(orgDeleter.delete(org).attemptNarrow[OrganizationNonEmpty]).hideErrors

def assert(result: Either[OrganizationNonEmpty, Unit]): IO[Any, Unit] = for {
globalEventsDeleted <- orgs.currentEvents(Offset.Start).compile.to(List).map(_.isEmpty)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.sourcing

import cats.effect.{Blocker, Resource}
import cats.effect.{Blocker, IO, Resource}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Secret
import ch.epfl.bluebrain.nexus.delta.kernel.cache.{CacheConfig, KeyValueStore}
Expand All @@ -15,7 +15,8 @@ import doobie.implicits._
import doobie.util.ExecutionContexts
import doobie.util.transactor.Transactor
import io.github.classgraph.ClassGraph
import monix.bio.Task
import monix.bio.{IO => BIO, Task}
import monix.execution.Scheduler

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
Expand All @@ -28,7 +29,10 @@ final case class Transactors(
write: Transactor[Task],
streaming: Transactor[Task],
cache: PartitionsCache
) {
)(implicit s: Scheduler) {

def readCE: Transactor[IO] = read.mapK(BIO.liftTo)
def writeCE: Transactor[IO] = write.mapK(BIO.liftTo)

def execDDL(ddl: String)(implicit cl: ClassLoader): Task[Unit] =
ClasspathResourceUtils.ioContentOf(ddl).flatMap(Fragment.const0(_).update.run.transact(write)).void
Expand Down Expand Up @@ -76,7 +80,9 @@ object Transactors {
* @param password
* the password
*/
def test(host: String, port: Int, username: String, password: String): Resource[Task, Transactors] = {
def test(host: String, port: Int, username: String, password: String)(implicit
s: Scheduler
): Resource[Task, Transactors] = {
val access = DatabaseAccess(host, port, 10)
val databaseConfig = DatabaseConfig(
access,
Expand All @@ -88,10 +94,10 @@ object Transactors {
false,
CacheConfig(500, 10.minutes)
)
init(databaseConfig)(getClass.getClassLoader)
init(databaseConfig)(getClass.getClassLoader, s)
}

def init(config: DatabaseConfig)(implicit classLoader: ClassLoader): Resource[Task, Transactors] = {
def init(config: DatabaseConfig)(implicit classLoader: ClassLoader, s: Scheduler): Resource[Task, Transactors] = {
def transactor(access: DatabaseAccess, readOnly: Boolean, poolName: String) = {
for {
ce <- ExecutionContexts.fixedThreadPool[Task](access.poolSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import doobie.postgres.sqlstate
import monix.bio.{IO, Task, UIO}
import munit.Location
import org.postgresql.util.PSQLException
import monix.execution.Scheduler.Implicits.global

object Doobie {

Expand Down

0 comments on commit 5f07a09

Please sign in to comment.