Skip to content

Commit

Permalink
Migrate remote context resolution and distage modules to Cats Effect (#…
Browse files Browse the repository at this point in the history
…4361)

* Migrate remote context resolution and distage modules to Cats Effect
---------

Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored Oct 12, 2023
1 parent fbbb192 commit bd60ed2
Show file tree
Hide file tree
Showing 44 changed files with 397 additions and 336 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.adapter._
import akka.actor.{ActorSystem => ActorSystemClassic}
import akka.http.scaladsl.Http
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import akka.http.scaladsl.server.{ExceptionHandler, RejectionHandler, Route, RouteResult}
import cats.effect.{ExitCode, Resource}
import ch.epfl.bluebrain.nexus.delta.config.{AppConfig, BuildInfo}
Expand Down Expand Up @@ -56,7 +57,7 @@ object Main extends BIOApp {
(cfg, config, cl, pluginDefs) <- Resource.eval(loadPluginsAndConfig(loaderConfig))
_ <- Resource.eval(KamonMonitoring.initialize(config))
modules = DeltaModule(cfg, config, cl)
(plugins, locator) <- WiringInitializer(modules, pluginDefs)
(plugins, locator) <- WiringInitializer(modules, pluginDefs).mapK(ioToTaskK)
_ <- bootstrap(locator, plugins)
} yield locator

Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package ch.epfl.bluebrain.nexus.delta.plugin

import cats.effect.Resource
import cats.effect.{IO, Resource}
import cats.syntax.traverse._
import cats.syntax.flatMap._
import cats.syntax.monadError._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.sdk.error.PluginError.PluginInitializationError
import ch.epfl.bluebrain.nexus.delta.sdk.plugin.{Plugin, PluginDef}
import com.typesafe.scalalogging.Logger
import distage.{Injector, Roots}
import izumi.distage.model.Locator
import izumi.distage.model.definition.ModuleDef
import izumi.distage.modules.DefaultModule
import monix.bio.{IO, Task}

object WiringInitializer {

private val logger = Logger[WiringInitializer.type]
private val logger = Logger.cats[WiringInitializer.type]

/**
* Combines the [[ModuleDef]] of the passed ''serviceModule'' with the ones provided by the plugins. Afterwards
Expand All @@ -21,23 +24,25 @@ object WiringInitializer {
def apply(
serviceModule: ModuleDef,
pluginsDef: List[PluginDef]
): Resource[Task, (List[Plugin], Locator)] = {
): Resource[IO, (List[Plugin], Locator)] = {
val pluginsInfoModule = new ModuleDef { make[List[PluginDef]].from(pluginsDef) }
val appModules = (serviceModule :: pluginsInfoModule :: pluginsDef.map(_.module)).merge

// workaround for: java.lang.NoClassDefFoundError: zio/blocking/package$Blocking$Service
implicit val defaultModule: DefaultModule[Task] = DefaultModule.empty
Injector[Task]()
implicit val defaultModule: DefaultModule[IO] = DefaultModule.empty
Injector[IO]()
.produce(appModules, Roots.Everything)
.toCats
.evalMap { locator =>
IO.traverse(pluginsDef) { plugin =>
Task.delay(logger.info(s"Initializing plugin ${plugin.info.name}...")) >>
plugin.initialize(locator).tapEval { _ =>
Task.delay(logger.info(s"Plugin ${plugin.info.name} initialized."))
}
}.map(_ -> locator)
.mapError(e => PluginInitializationError(e.getMessage))
pluginsDef
.traverse { plugin =>
logger.info(s"Initializing plugin ${plugin.info.name}...") >>
toCatsIO(plugin.initialize(locator)).flatTap { _ =>
logger.info(s"Plugin ${plugin.info.name} initialized.")
}
}
.map(_ -> locator)
.adaptError(e => PluginInitializationError(e.getMessage))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import cats.data.NonEmptyList
import cats.effect.{Clock, ContextShift, IO, Resource, Sync, Timer}
import ch.epfl.bluebrain.nexus.delta.Main.pluginsMaxPriority
import ch.epfl.bluebrain.nexus.delta.config.AppConfig
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi}
Expand All @@ -34,7 +35,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.execution.EvaluationExecution
import ch.megard.akka.http.cors.scaladsl.settings.CorsSettings
import com.typesafe.config.Config
import izumi.distage.model.definition.{Id, ModuleDef}
import monix.bio.{Task, UIO}
import monix.bio.UIO
import monix.execution.Scheduler
import org.slf4j.{Logger, LoggerFactory}

Expand All @@ -50,7 +51,7 @@ import scala.concurrent.duration.DurationInt
* the raw merged and resolved configuration
*/
class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: ClassLoader) extends ModuleDef {
addImplicit[Sync[Task]]
addImplicit[Sync[IO]]

make[AppConfig].from(appCfg)
make[Config].from(config)
Expand All @@ -66,7 +67,7 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class
implicit val scheduler: Scheduler = Scheduler.global

make[Transactors].fromResource {
Transactors.init(appCfg.database)
Transactors.init(appCfg.database).mapK(taskToIoK)
}

make[List[PluginDescription]].from { (pluginsDef: List[PluginDef]) => pluginsDef.map(_.info) }
Expand Down Expand Up @@ -102,7 +103,9 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class
.merge(otherCtxResolutions.toSeq: _*)
}

make[JsonLdApi].fromValue(new JsonLdJavaApi(appCfg.jsonLdApi))
make[JsonLdApi].from { contextShift: ContextShift[IO] =>
new JsonLdJavaApi(appCfg.jsonLdApi)(contextShift)
}

make[Clock[UIO]].from(Clock[UIO])
make[Clock[IO]].from(Clock.create[IO])
Expand All @@ -116,17 +119,19 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class
List("@context", "@id", "@type", "reason", "details", "sourceId", "projectionId", "_total", "_results")
)
)
make[ActorSystem[Nothing]].fromResource {
val make = Task.delay(
make[ActorSystem[Nothing]].fromResource { (timer: Timer[IO], contextShift: ContextShift[IO]) =>
implicit val t: Timer[IO] = timer
implicit val cs: ContextShift[IO] = contextShift
val make = IO.delay(
ActorSystem[Nothing](
Behaviors.empty,
appCfg.description.fullName,
BootstrapSetup().withConfig(config).withClassloader(classLoader)
)
)
val release = (as: ActorSystem[Nothing]) => {
val release = (as: ActorSystem[Nothing]) => {
import akka.actor.typed.scaladsl.adapter._
Task.deferFuture(as.toClassic.terminate()).timeout(15.seconds).void
IO.fromFuture(IO(as.toClassic.terminate()).timeout(15.seconds)).void
}
Resource.make(make)(release)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import ch.epfl.bluebrain.nexus.delta.config.AppConfig
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.routes.{ElemRoutes, EventsRoutes}
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.sdk._
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaSchemeDirectives
Expand Down Expand Up @@ -32,13 +33,15 @@ object EventsModule extends ModuleDef {
xas: Transactors,
jo: JsonKeyOrdering
) =>
SseEventLog(
sseEncoders,
organizations.fetch(_).void,
projects.fetch(_).map { p => (p.value.organizationUuid, p.value.uuid) },
config.sse,
xas
)(jo)
toCatsIO(
SseEventLog(
sseEncoders,
organizations.fetch(_).void,
projects.fetch(_).map { p => (p.value.organizationUuid, p.value.uuid) },
config.sse,
xas
)(jo)
)
}

make[SseElemStream].from { (qc: QueryConfig, xas: Transactors) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.identities.{Identities, IdentitiesImpl}
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.realms.Realms
import izumi.distage.model.definition.{Id, ModuleDef}
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._

/**
* Identities module wiring config.
Expand All @@ -28,7 +27,7 @@ object IdentitiesModule extends ModuleDef {
make[CacheConfig].from((cfg: AppConfig) => cfg.identities)

make[Identities].fromEffect { (realms: Realms, hc: HttpClient @Id("realm"), config: CacheConfig) =>
IdentitiesImpl(realms, hc, config).toUIO
IdentitiesImpl(realms, hc, config)
}

make[OpenIdAuthService].from { (httpClient: HttpClient @Id("realm"), realms: Realms) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package ch.epfl.bluebrain.nexus.delta.wiring

import cats.effect.Clock
import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.Main.pluginsMaxPriority
import ch.epfl.bluebrain.nexus.delta.config.AppConfig
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.routes.ProjectsRoutes
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.sdk._
import ch.epfl.bluebrain.nexus.delta.sdk.acls.{AclCheck, Acls}
import ch.epfl.bluebrain.nexus.delta.sdk.deletion.{ProjectDeletionCoordinator, ProjectDeletionTask}
Expand All @@ -28,7 +29,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEncoder
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Supervisor
import izumi.distage.model.definition.{Id, ModuleDef}
import monix.bio.{Task, UIO}
import monix.bio.UIO
import monix.execution.Scheduler

/**
Expand Down Expand Up @@ -58,7 +59,7 @@ object ProjectsModule extends ModuleDef {
clock: Clock[UIO],
uuidF: UUIDF
) =>
Task.pure(
IO.pure(
ProjectsImpl(
organizations.fetchActiveOrganization(_).mapError(WrappedOrganizationRejection),
ValidateProjectDeletion(xas, config.projects.deletion.enabled),
Expand All @@ -71,7 +72,7 @@ object ProjectsModule extends ModuleDef {
}

make[ProjectsStatistics].fromEffect { (xas: Transactors) =>
ProjectsStatistics(xas)
toCatsIO(ProjectsStatistics(xas))
}

make[ProjectProvisioning].from {
Expand All @@ -81,7 +82,7 @@ object ProjectsModule extends ModuleDef {

make[FetchContext[ContextRejection]].fromEffect {
(organizations: Organizations, projects: Projects, quotas: Quotas) =>
Task.pure(FetchContext(organizations, projects, quotas))
IO.pure(FetchContext(organizations, projects, quotas))
}

make[ProjectDeletionCoordinator].fromEffect {
Expand All @@ -94,18 +95,20 @@ object ProjectsModule extends ModuleDef {
xas: Transactors,
clock: Clock[UIO]
) =>
ProjectDeletionCoordinator(
projects,
deletionTasks,
config.projects.deletion,
serviceAccount,
supervisor,
xas
)(clock)
toCatsIO(
ProjectDeletionCoordinator(
projects,
deletionTasks,
config.projects.deletion,
serviceAccount,
supervisor,
xas
)(clock)
)
}

make[UUIDCache].fromEffect { (config: AppConfig, xas: Transactors) =>
UUIDCache(config.projects.cache, config.organizations.cache, xas)
toCatsIO(UUIDCache(config.projects.cache, config.organizations.cache, xas))
}

make[DeltaSchemeDirectives].from {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.wiring

import cats.effect.{Clock, IO, Sync, Timer}
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.sdk.ResourceShifts
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{ProjectionConfig, QueryConfig}
Expand All @@ -9,14 +10,13 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes._
import ch.epfl.bluebrain.nexus.delta.sourcing.{DeleteExpired, PurgeElemFailures, Transactors}
import izumi.distage.model.definition.ModuleDef
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import monix.bio.{Task, UIO}
import monix.bio.UIO

/**
* Indexing specific wiring.
*/
object StreamModule extends ModuleDef {
addImplicit[Sync[Task]]
addImplicit[Sync[IO]]

make[GraphResourceStream].from {
(
Expand Down Expand Up @@ -52,16 +52,16 @@ object StreamModule extends ModuleDef {

make[Supervisor].fromResource {
(projections: Projections, projectionErrors: ProjectionErrors, cfg: ProjectionConfig) =>
Supervisor(projections, projectionErrors, cfg)
Supervisor(projections, projectionErrors, cfg).mapK(taskToIoK)
}

make[DeleteExpired].fromEffect {
(supervisor: Supervisor, config: ProjectionConfig, xas: Transactors, clock: Clock[IO], timer: Timer[IO]) =>
DeleteExpired(supervisor, config, xas)(clock, timer).toUIO
DeleteExpired(supervisor, config, xas)(clock, timer)
}

make[PurgeElemFailures].fromEffect {
(supervisor: Supervisor, config: ProjectionConfig, xas: Transactors, clock: Clock[UIO]) =>
PurgeElemFailures(supervisor, config, xas)(clock)
toCatsIO(PurgeElemFailures(supervisor, config, xas)(clock))
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ch.epfl.bluebrain.nexus.delta

import akka.http.scaladsl.server.Route
import cats.effect.Resource
import cats.effect.{IO, Resource}
import ch.epfl.bluebrain.nexus.delta.plugin.PluginsLoader.PluginLoaderConfig
import ch.epfl.bluebrain.nexus.delta.sdk.plugin.PluginDef
import ch.epfl.bluebrain.nexus.delta.wiring.DeltaModule
Expand Down Expand Up @@ -42,7 +42,7 @@ class MainSuite extends BioSuite with MainSuite.Fixture {
val modules: Module = (DeltaModule(cfg, config, cl) :: pluginsInfoModule :: pDefs.map(_.module)).merge

PlanVerifier()
.verify[Task](
.verify[IO](
bindings = modules,
roots = Roots.Everything,
providedKeys = Set.empty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import ch.epfl.bluebrain.nexus.testkit.IOValues
import com.typesafe.config.impl.ConfigImpl
import izumi.distage.model.definition.ModuleDef
import monix.bio.Task
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import monix.execution.Scheduler
import monix.execution.Scheduler.Implicits.global
import org.scalatest.matchers.should.Matchers
Expand All @@ -25,15 +26,18 @@ class PluginLoaderSpec extends AnyWordSpecLike with ScalatestRouteTest with Matc
val config = PluginLoaderConfig("../plugins/test-plugin/target")
"load plugins from .jar in a directory" in {
val (_, pluginsDef) = PluginsLoader(config).load.accepted
WiringInitializer(serviceModule, pluginsDef).use { case (_, locator) =>
Task.delay {
val route = locator.get[Set[PriorityRoute]].head
pluginsDef.head.priority shouldEqual 10
Get("/test-plugin") ~> route.route ~> check {
responseAs[String] shouldEqual "http://localhost"
WiringInitializer(serviceModule, pluginsDef)
.mapK(ioToTaskK)
.use { case (_, locator) =>
Task.delay {
val route = locator.get[Set[PriorityRoute]].head
pluginsDef.head.priority shouldEqual 10
Get("/test-plugin") ~> route.route ~> check {
responseAs[String] shouldEqual "http://localhost"
}
}
}
}.accepted
.accepted
}

"load overriding priority" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ trait MigrateEffectSyntax {

val taskToIoK: Task ~> IO = λ[Task ~> IO](toCatsIO(_))
val ioToUioK: IO ~> UIO = λ[IO ~> UIO](_.toUIO)
val ioToTaskK: IO ~> Task = λ[IO ~> Task](Task.from(_))

}

Expand Down
Loading

0 comments on commit bd60ed2

Please sign in to comment.