Skip to content

Commit

Permalink
Migrate Delta app to Cats effect (#4376)
Browse files Browse the repository at this point in the history
* Migrate Delta app to Cats effect

---------

Co-authored-by: Simon Dumas <[email protected]>
Co-authored-by: Daniel Bell <[email protected]>
  • Loading branch information
3 people authored Oct 18, 2023
1 parent 05d18a2 commit 9136b71
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 78 deletions.
113 changes: 52 additions & 61 deletions delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/Main.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package ch.epfl.bluebrain.nexus.delta

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.adapter._
import akka.actor.{ActorSystem => ActorSystemClassic}
import akka.http.scaladsl.Http
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import akka.http.scaladsl.server.{ExceptionHandler, RejectionHandler, Route, RouteResult}
import cats.effect.{ExitCode, Resource}
import cats.effect.{ExitCode, IO, IOApp, Resource}
import ch.epfl.bluebrain.nexus.delta.config.{AppConfig, BuildInfo}
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMonitoring
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.{KamonMonitoring, KamonMonitoringCats}
import ch.epfl.bluebrain.nexus.delta.plugin.PluginsLoader.PluginLoaderConfig
import ch.epfl.bluebrain.nexus.delta.plugin.{PluginsLoader, WiringInitializer}
import ch.epfl.bluebrain.nexus.delta.sdk.PriorityRoute
Expand All @@ -21,51 +22,47 @@ import ch.epfl.bluebrain.nexus.delta.wiring.DeltaModule
import ch.megard.akka.http.cors.scaladsl.CorsDirectives.cors
import ch.megard.akka.http.cors.scaladsl.settings.CorsSettings
import com.typesafe.config.Config
import com.typesafe.scalalogging.{Logger => Logging}
import izumi.distage.model.Locator
import monix.bio.{BIOApp, Cause, Task, UIO}
import org.slf4j.{Logger, LoggerFactory}

import scala.concurrent.duration.DurationInt

object Main extends BIOApp {
object Main extends IOApp {

private val externalConfigEnvVariable = "DELTA_EXTERNAL_CONF"
private val pluginEnvVariable = "DELTA_PLUGINS"
private val log: Logging = Logging[Main.type]
private val logger = Logger.cats[Main.type]
val pluginsMaxPriority: Int = 100
val pluginsMinPriority: Int = 1

override def run(args: List[String]): UIO[ExitCode] = {
LoggerFactory.getLogger("Main") // initialize logging to suppress SLF4J error
override def run(args: List[String]): IO[ExitCode] = {
// TODO: disable this for now, but investigate why it happens
System.setProperty("cats.effect.logNonDaemonThreadsOnExit", "false")
val config = sys.env.get(pluginEnvVariable).fold(PluginLoaderConfig())(PluginLoaderConfig(_))
start(config)
.use(_ => UIO.never)
.use(_ => IO.never)
.as(ExitCode.Success)
.redeemCauseWith(logTerminalError, UIO.pure)
.redeemWith(logTerminalError, IO.pure)
}

private def logTerminalError: Cause[Throwable] => UIO[ExitCode] = c =>
UIO.delay(log.error("Delta failed to start", c.toThrowable)).as(ExitCode.Error)
private def logTerminalError: Throwable => IO[ExitCode] = e =>
logger.error(e)("Delta failed to start").as(ExitCode.Error)

private[delta] def start(loaderConfig: PluginLoaderConfig): Resource[Task, Locator] =
private[delta] def start(loaderConfig: PluginLoaderConfig): Resource[IO, Locator] =
for {
_ <- Resource.eval(UIO.delay(log.info(s"Starting Nexus Delta version '${BuildInfo.version}'.")))
_ <- Resource.eval(UIO.delay(log.info(s"Loading plugins and config...")))
_ <- Resource.eval(logger.info(s"Starting Nexus Delta version '${BuildInfo.version}'."))
_ <- Resource.eval(logger.info(s"Loading plugins and config..."))
(cfg, config, cl, pluginDefs) <- Resource.eval(loadPluginsAndConfig(loaderConfig))
_ <- Resource.eval(KamonMonitoring.initialize(config))
_ <- Resource.eval(KamonMonitoringCats.initialize(config))
modules = DeltaModule(cfg, config, cl)
(plugins, locator) <- WiringInitializer(modules, pluginDefs).mapK(ioToTaskK)
(plugins, locator) <- WiringInitializer(modules, pluginDefs)
_ <- bootstrap(locator, plugins)
} yield locator

private[delta] def loadPluginsAndConfig(
config: PluginLoaderConfig
): Task[(AppConfig, Config, ClassLoader, List[PluginDef])] =
): IO[(AppConfig, Config, ClassLoader, List[PluginDef])] =
for {
(classLoader, pluginDefs) <- PluginsLoader(config).load
(classLoader, pluginDefs) <- toCatsIO(PluginsLoader(config).load)
_ <- logPlugins(pluginDefs)
enabledDefs = pluginDefs.filter(_.enabled)
_ <- validatePriority(enabledDefs)
Expand All @@ -75,19 +72,17 @@ object Main extends BIOApp {
(appConfig, mergedConfig) <- AppConfig.loadOrThrow(cfgPathOpt, configNames, classLoader)
} yield (appConfig, mergedConfig, classLoader, enabledDefs)

private def logPlugins(pluginDefs: List[PluginDef]): UIO[Unit] = {
private def logPlugins(pluginDefs: List[PluginDef]): IO[Unit] = {
def pluginLogEntry(pdef: PluginDef): String =
s"${pdef.info.name} - version: '${pdef.info.version}', enabled: '${pdef.enabled}'"

if (pluginDefs.isEmpty) UIO.delay(log.info("No plugins discovered."))
if (pluginDefs.isEmpty) logger.info("No plugins discovered.")
else
UIO.delay {
log.info(s"Discovered plugins: ${pluginDefs.map(p => pluginLogEntry(p)).mkString("\n- ", "\n- ", "")}")
}
logger.info(s"Discovered plugins: ${pluginDefs.map(p => pluginLogEntry(p)).mkString("\n- ", "\n- ", "")}")
}

private def validatePriority(pluginsDef: List[PluginDef]): Task[Unit] =
Task.raiseWhen(pluginsDef.map(_.priority).distinct.size != pluginsDef.size)(
private def validatePriority(pluginsDef: List[PluginDef]): IO[Unit] =
IO.raiseWhen(pluginsDef.map(_.priority).distinct.size != pluginsDef.size)(
PluginInitializationError(
"Several plugins have the same priority:" + pluginsDef
.map(p => s"name '${p.info.name}' priority '${p.priority}'")
Expand All @@ -96,16 +91,16 @@ object Main extends BIOApp {
) >>
(pluginsDef.find(p => p.priority > pluginsMaxPriority || p.priority < pluginsMinPriority) match {
case Some(pluginDef) =>
Task.raiseError(
IO.raiseError(
PluginInitializationError(
s"Plugin '$pluginDef' has a priority out of the allowed range [$pluginsMinPriority - $pluginsMaxPriority]"
)
)
case None => Task.unit
case None => IO.unit
})

private def validateDifferentName(pluginsDef: List[PluginDef]): Task[Unit] =
Task.raiseWhen(pluginsDef.map(_.info.name).distinct.size != pluginsDef.size)(
private def validateDifferentName(pluginsDef: List[PluginDef]): IO[Unit] =
IO.raiseWhen(pluginsDef.map(_.info.name).distinct.size != pluginsDef.size)(
PluginInitializationError(
s"Several plugins have the same name: ${pluginsDef.map(p => s"name '${p.info.name}'").mkString(",")}"
)
Expand Down Expand Up @@ -133,40 +128,36 @@ object Main extends BIOApp {
}
}

private def bootstrap(locator: Locator, plugins: List[Plugin]): Resource[Task, Unit] = {
private def bootstrap(locator: Locator, plugins: List[Plugin]): Resource[IO, Unit] = {
implicit val as: ActorSystemClassic = locator.get[ActorSystem[Nothing]].toClassic
implicit val cfg: AppConfig = locator.get[AppConfig]
val logger = locator.get[Logger]

logger.info("Booting up service....")

val acquire = Task
.fromFutureLike(
Task.delay(
Http()
.newServerAt(
cfg.http.interface,
cfg.http.port
)
.bindFlow(RouteResult.routeToFlow(routes(locator)))
)
)
.flatMap { binding =>
Task.delay(logger.infoN("Bound to {}:{}", binding.localAddress.getHostString, binding.localAddress.getPort))
}
.onErrorRecoverWith { th =>
th.printStackTrace()
Task.delay(
logger.error(
s"Failed to perform an http binding on ${cfg.http.interface}:${cfg.http.port}",
th
val startHttpServer = IO.fromFuture(
IO(
Http()
.newServerAt(
cfg.http.interface,
cfg.http.port
)
) >> Task
.traverse(plugins)(_.stop())
.timeout(30.seconds) >> KamonMonitoring.terminate
}
.bindFlow(RouteResult.routeToFlow(routes(locator)))
)
)

val acquire = {
for {
_ <- logger.info("Booting up service....")
binding <- startHttpServer
_ <- logger.info(s"Bound to ${binding.localAddress.getHostString}:${binding.localAddress.getPort}")
} yield ()
}.recoverWith { th =>
logger.error(th)(
s"Failed to perform an http binding on ${cfg.http.interface}:${cfg.http.port}"
) >> plugins
.traverse(_.stop())
.timeout(30.seconds) >> KamonMonitoring.terminate
}

val release = Task.deferFuture(as.terminate())
val release = IO.fromFuture(IO(as.terminate()))

Resource.make(acquire)(_ => release.void)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.plugin

import cats.effect.{IO, Resource}
import cats.effect.{ContextShift, IO, Resource, Timer}
import cats.syntax.traverse._
import cats.syntax.flatMap._
import cats.syntax.monadError._
Expand All @@ -24,9 +24,13 @@ object WiringInitializer {
def apply(
serviceModule: ModuleDef,
pluginsDef: List[PluginDef]
): Resource[IO, (List[Plugin], Locator)] = {
)(implicit contextShift: ContextShift[IO], timer: Timer[IO]): Resource[IO, (List[Plugin], Locator)] = {
val catsEffectModule = new ModuleDef {
make[ContextShift[IO]].fromValue(contextShift)
make[Timer[IO]].fromValue(timer)
}
val pluginsInfoModule = new ModuleDef { make[List[PluginDef]].from(pluginsDef) }
val appModules = (serviceModule :: pluginsInfoModule :: pluginsDef.map(_.module)).merge
val appModules = (catsEffectModule :: serviceModule :: pluginsInfoModule :: pluginsDef.map(_.module)).merge

// workaround for: java.lang.NoClassDefFoundError: zio/blocking/package$Blocking$Service
implicit val defaultModule: DefaultModule[IO] = DefaultModule.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import monix.bio.UIO
import monix.execution.Scheduler
import org.slf4j.{Logger, LoggerFactory}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt

/**
Expand Down Expand Up @@ -109,8 +108,6 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class

make[Clock[UIO]].from(Clock[UIO])
make[Clock[IO]].from(Clock.create[IO])
make[Timer[IO]].from(IO.timer(ExecutionContext.global))
make[ContextShift[IO]].from(IO.contextShift(ExecutionContext.global))
make[EvaluationExecution].from(EvaluationExecution(_, _))
make[UUIDF].from(UUIDF.random)
make[Scheduler].from(scheduler)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package ch.epfl.bluebrain.nexus.delta

import akka.http.scaladsl.server.Route
import cats.effect.{IO, Resource}
import cats.effect.{ContextShift, IO, Resource, Timer}
import ch.epfl.bluebrain.nexus.delta.plugin.PluginsLoader.PluginLoaderConfig
import ch.epfl.bluebrain.nexus.delta.sdk.plugin.PluginDef
import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie._
import ch.epfl.bluebrain.nexus.delta.wiring.DeltaModule
import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, ResourceFixture}
import ch.epfl.bluebrain.nexus.testkit.bio.ResourceFixture
import ch.epfl.bluebrain.nexus.testkit.ce.CatsEffectSuite
import ch.epfl.bluebrain.nexus.testkit.elasticsearch.ElasticSearchContainer
import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie._
import ch.epfl.bluebrain.nexus.testkit.postgres.PostgresContainer
import com.typesafe.config.impl.ConfigImpl
import izumi.distage.model.definition.{Module, ModuleDef}
Expand All @@ -24,7 +25,7 @@ import java.nio.file.{Files, Paths}
* - HOCON configuration files match their classes counterpart
* - Distage wiring is valid
*/
class MainSuite extends BioSuite with MainSuite.Fixture {
class MainSuite extends CatsEffectSuite with MainSuite.Fixture {

private val pluginsParentPath = Paths.get("target/plugins").toAbsolutePath
private val pluginLoaderConfig = PluginLoaderConfig(pluginsParentPath.toString)
Expand All @@ -37,9 +38,14 @@ class MainSuite extends BioSuite with MainSuite.Fixture {
}

test("yield a correct plan") {
val (cfg, config, cl, pDefs) = Main.loadPluginsAndConfig(pluginLoaderConfig).runSyncUnsafe()
val catsEffectModule = new ModuleDef {
make[ContextShift[IO]].fromValue(contextShift)
make[Timer[IO]].fromValue(timer)
}
val (cfg, config, cl, pDefs) = Main.loadPluginsAndConfig(pluginLoaderConfig).unsafeRunSync()
val pluginsInfoModule = new ModuleDef { make[List[PluginDef]].from(pDefs) }
val modules: Module = (DeltaModule(cfg, config, cl) :: pluginsInfoModule :: pDefs.map(_.module)).merge
val modules: Module =
(catsEffectModule :: DeltaModule(cfg, config, cl) :: pluginsInfoModule :: pDefs.map(_.module)).merge

PlanVerifier()
.verify[IO](
Expand All @@ -56,15 +62,15 @@ class MainSuite extends BioSuite with MainSuite.Fixture {
Main
.start(pluginLoaderConfig)
.use { locator =>
Task.delay(locator.get[Vector[Route]])
IO.delay(locator.get[Vector[Route]])
}
.void
}
}

object MainSuite {

trait Fixture { self: BioSuite =>
trait Fixture { self: CatsEffectSuite =>

// Overload config via system properties
private def acquire(postgres: PostgresContainer, elastic: ElasticSearchContainer) = Task.delay {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugin

import akka.http.scaladsl.testkit.ScalatestRouteTest
import cats.effect.{ContextShift, IO, Timer}
import ch.epfl.bluebrain.nexus.delta.plugin.PluginsLoader.PluginLoaderConfig
import ch.epfl.bluebrain.nexus.delta.sdk.PriorityRoute
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
Expand All @@ -14,6 +15,8 @@ import monix.execution.Scheduler.Implicits.global
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import scala.concurrent.ExecutionContext

class PluginLoaderSpec extends AnyWordSpecLike with ScalatestRouteTest with Matchers with IOValues {

private val baseUri = BaseUri.withoutPrefix("http://localhost")
Expand All @@ -22,6 +25,9 @@ class PluginLoaderSpec extends AnyWordSpecLike with ScalatestRouteTest with Matc
make[Scheduler].from(Scheduler.global)
}

implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

"A PluginLoader" should {
val config = PluginLoaderConfig("../plugins/test-plugin/target")
"load plugins from .jar in a directory" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ trait IndexingAction {
// We build and start the projections where the resource will apply
_ <- projections(project, elem)
.translate(taskToIoK)
// TODO make this configurable
.parEvalMap(5) {
.evalMap {
case s: SuccessElem[CompiledProjection] =>
runProjection(s.value, failed => errorsRef.update(_ ++ failed))
case _: DroppedElem => IO.unit
Expand All @@ -61,13 +60,14 @@ trait IndexingAction {
} yield errors
}

private def runProjection(compiled: CompiledProjection, saveFailedElems: List[FailedElem] => IO[Unit]) =
private def runProjection(compiled: CompiledProjection, saveFailedElems: List[FailedElem] => IO[Unit]) = toCatsIO {
for {
projection <- Projection(compiled, UIO.none, _ => UIO.unit, saveFailedElems(_).toUIO)
_ <- projection.waitForCompletion(timeout)
// We stop the projection if it has not complete yet
_ <- projection.stop()
} yield ()
}
}

object IndexingAction {
Expand Down

0 comments on commit 9136b71

Please sign in to comment.