diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/OrganizationsRoutes.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/OrganizationsRoutes.scala index cc6bdee68b..4c656eec30 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/OrganizationsRoutes.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/OrganizationsRoutes.scala @@ -14,6 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling import ch.epfl.bluebrain.nexus.delta.sdk.directives.AuthDirectives import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives._ +import ch.epfl.bluebrain.nexus.delta.sdk.ce.DeltaDirectives.{emit => emitCE} import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller @@ -124,7 +125,7 @@ final class OrganizationsRoutes( parameter("rev".as[Int].?, "prune".?(false)) { case (_, true) => authorizeFor(id, orgs.delete).apply { - emit(orgDeleter.delete(id).leftWiden[OrganizationRejection]) + emitCE(orgDeleter.delete(id)) } case (Some(rev), false) => authorizeFor(id, orgs.write).apply { diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/DeltaModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/DeltaModule.scala index 08c7f0408c..9ac82b076e 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/DeltaModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/DeltaModule.scala @@ -61,6 +61,8 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class make[StrictEntity].from { appCfg.http.strictEntityTimeout } make[ServiceAccount].from { appCfg.serviceAccount.value } + implicit val scheduler: Scheduler = Scheduler.global + make[Transactors].fromResource { Transactors.init(appCfg.database) } @@ -102,7 +104,7 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class make[Clock[UIO]].from(Clock[UIO]) make[Clock[IO]].from(Clock.create[IO]) make[UUIDF].from(UUIDF.random) - make[Scheduler].from(Scheduler.global) + make[Scheduler].from(scheduler) make[JsonKeyOrdering].from( JsonKeyOrdering.default(topKeys = List("@context", "@id", "@type", "reason", "details", "sourceId", "projectionId", "_total", "_results") diff --git a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/OrganizationsRoutesSpec.scala b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/OrganizationsRoutesSpec.scala index 6c738f285e..23c5afb0b6 100644 --- a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/OrganizationsRoutesSpec.scala +++ b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/OrganizationsRoutesSpec.scala @@ -28,7 +28,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label import ch.epfl.bluebrain.nexus.testkit.bio.IOFromMap import io.circe.Json -import monix.bio.IO +import cats.effect.IO import java.util.UUID diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationDeleter.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationDeleter.scala index ad77a942f8..4120454a2e 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationDeleter.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationDeleter.scala @@ -1,45 +1,42 @@ package ch.epfl.bluebrain.nexus.delta.sdk.organizations +import cats.effect.IO import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode import ch.epfl.bluebrain.nexus.delta.sdk.acls.Acls import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.OrganizationRejection.OrganizationNonEmpty +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label} import ch.epfl.bluebrain.nexus.delta.sourcing.{MD5, Transactors} -import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ -import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType -import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label -import com.typesafe.scalalogging.Logger -import doobie.{ConnectionIO, Update} import doobie.implicits._ +import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ import doobie.util.update.Update0 -import monix.bio.IO -import monix.bio.UIO +import doobie.{ConnectionIO, Update} +import org.typelevel.log4cats.{Logger => Log4CatsLogger} trait OrganizationDeleter { - def delete(id: Label): IO[OrganizationNonEmpty, Unit] + def delete(id: Label): IO[Unit] } object OrganizationDeleter { def apply(xas: Transactors): OrganizationDeleter = new OrganizationDeleter { - private val logger: Logger = Logger[OrganizationDeleter] + private val log: Log4CatsLogger[IO] = Logger.cats[OrganizationDeleter] - def delete(id: Label): IO[OrganizationNonEmpty, Unit] = + def delete(id: Label): IO[Unit] = for { - orgIsEmpty <- orgIsEmpty(id) - _ <- if (orgIsEmpty) log(s"Deleting empty organization $id") *> deleteAll(id) - else IO.raiseError(OrganizationNonEmpty(id)) + canDelete <- orgIsEmpty(id) + _ <- if (canDelete) log.info(s"Deleting empty organization $id") *> deleteAll(id) + else log.error(s"Failed to delete empty organization $id") *> IO.raiseError(OrganizationNonEmpty(id)) } yield () - private def log(msg: String): UIO[Unit] = UIO.delay(logger.info(msg)) - - private def deleteAll(id: Label): UIO[Unit] = + private def deleteAll(id: Label): IO[Unit] = (for { _ <- List("scoped_events", "scoped_states").traverse(dropPartition(id, _)).void _ <- List("global_events", "global_states").traverse(deleteGlobal(id, _)) - } yield ()).transact(xas.write).void.hideErrors + } yield ()).transact(xas.writeCE).void private def dropPartition(id: Label, table: String): ConnectionIO[Unit] = Update0(s"DROP TABLE IF EXISTS ${table}_${MD5.hash(id.value)}", None).run.void @@ -53,13 +50,12 @@ object OrganizationDeleter { private def deleteGlobalQuery(id: IriOrBNode.Iri, tpe: EntityType, table: String): ConnectionIO[Unit] = Update[(EntityType, IriOrBNode.Iri)](s"DELETE FROM $table WHERE" ++ " type = ? AND id = ?").run((tpe, id)).void - private def orgIsEmpty(id: Label): UIO[Boolean] = + private def orgIsEmpty(id: Label): IO[Boolean] = sql"SELECT type from scoped_events WHERE org = $id LIMIT 1" .query[Label] .option .map(_.isEmpty) - .transact(xas.read) - .hideErrors + .transact(xas.readCE) } } diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/model/OrganizationRejection.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/model/OrganizationRejection.scala index 76ba67d9d3..2cb2fb3374 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/model/OrganizationRejection.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/model/OrganizationRejection.scala @@ -18,7 +18,7 @@ import io.circe.{Encoder, JsonObject} * @param reason * a descriptive message as to why the rejection occurred */ -sealed abstract class OrganizationRejection(val reason: String) extends Product with Serializable +sealed abstract class OrganizationRejection(val reason: String) extends Throwable with Product with Serializable object OrganizationRejection { diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationDeleterSpec.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationDeleterSuite.scala similarity index 95% rename from delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationDeleterSpec.scala rename to delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationDeleterSuite.scala index d46043eb16..ce113b1128 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationDeleterSpec.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationDeleterSuite.scala @@ -23,7 +23,7 @@ import munit.AnyFixture import java.util.UUID -class OrganizationDeleterSpec extends BioSuite with IOFixedClock with ConfigFixtures { +class OrganizationDeleterSuite extends BioSuite with IOFixedClock with ConfigFixtures { private val org = Label.unsafe("org") private val orgUuid = UUID.randomUUID() @@ -55,7 +55,7 @@ class OrganizationDeleterSpec extends BioSuite with IOFixedClock with ConfigFixt } yield () val act: UIO[Either[OrganizationNonEmpty, Unit]] = - orgDeleter.delete(org).attempt + IO.from(orgDeleter.delete(org).attemptNarrow[OrganizationNonEmpty]).hideErrors def assert(result: Either[OrganizationNonEmpty, Unit]): IO[Any, Unit] = for { eventPartitions <- queryPartitions("scoped_events") @@ -83,7 +83,7 @@ class OrganizationDeleterSpec extends BioSuite with IOFixedClock with ConfigFixt } yield () val act: UIO[Either[OrganizationNonEmpty, Unit]] = - orgDeleter.delete(org).attempt + IO.from(orgDeleter.delete(org).attemptNarrow[OrganizationNonEmpty]).hideErrors def assert(result: Either[OrganizationNonEmpty, Unit]): IO[Any, Unit] = for { globalEventsDeleted <- orgs.currentEvents(Offset.Start).compile.to(List).map(_.isEmpty) diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Transactors.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Transactors.scala index af9d26f9c7..0bda9251a3 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Transactors.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Transactors.scala @@ -1,6 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.sourcing -import cats.effect.{Blocker, Resource} +import cats.effect.{Blocker, IO, Resource} import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.Secret import ch.epfl.bluebrain.nexus.delta.kernel.cache.{CacheConfig, KeyValueStore} @@ -15,7 +15,8 @@ import doobie.implicits._ import doobie.util.ExecutionContexts import doobie.util.transactor.Transactor import io.github.classgraph.ClassGraph -import monix.bio.Task +import monix.bio.{IO => BIO, Task} +import monix.execution.Scheduler import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ @@ -28,7 +29,10 @@ final case class Transactors( write: Transactor[Task], streaming: Transactor[Task], cache: PartitionsCache -) { +)(implicit s: Scheduler) { + + def readCE: Transactor[IO] = read.mapK(BIO.liftTo) + def writeCE: Transactor[IO] = write.mapK(BIO.liftTo) def execDDL(ddl: String)(implicit cl: ClassLoader): Task[Unit] = ClasspathResourceUtils.ioContentOf(ddl).flatMap(Fragment.const0(_).update.run.transact(write)).void @@ -76,7 +80,9 @@ object Transactors { * @param password * the password */ - def test(host: String, port: Int, username: String, password: String): Resource[Task, Transactors] = { + def test(host: String, port: Int, username: String, password: String)(implicit + s: Scheduler + ): Resource[Task, Transactors] = { val access = DatabaseAccess(host, port, 10) val databaseConfig = DatabaseConfig( access, @@ -88,10 +94,10 @@ object Transactors { false, CacheConfig(500, 10.minutes) ) - init(databaseConfig)(getClass.getClassLoader) + init(databaseConfig)(getClass.getClassLoader, s) } - def init(config: DatabaseConfig)(implicit classLoader: ClassLoader): Resource[Task, Transactors] = { + def init(config: DatabaseConfig)(implicit classLoader: ClassLoader, s: Scheduler): Resource[Task, Transactors] = { def transactor(access: DatabaseAccess, readOnly: Boolean, poolName: String) = { for { ce <- ExecutionContexts.fixedThreadPool[Task](access.poolSize) diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/postgres/Doobie.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/postgres/Doobie.scala index cc36b221bc..0a9cb108bc 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/postgres/Doobie.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/postgres/Doobie.scala @@ -13,6 +13,7 @@ import doobie.postgres.sqlstate import monix.bio.{IO, Task, UIO} import munit.Location import org.postgresql.util.PSQLException +import monix.execution.Scheduler.Implicits.global object Doobie {