Skip to content

Commit

Permalink
Migrate graph analytics module to Cats Effect
Browse files Browse the repository at this point in the history
  • Loading branch information
dantb committed Oct 13, 2023
1 parent cb4b6b1 commit 0e7f7b0
Show file tree
Hide file tree
Showing 55 changed files with 352 additions and 318 deletions.
4 changes: 4 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ val mockitoVersion = "1.17.22"
val monixVersion = "3.4.1"
val monixBioVersion = "1.2.0"
val munitVersion = "1.0.0-M8"
val munitCEVersion = "1.0.7"
val nimbusJoseJwtVersion = "9.31"
val postgresJdbcVersion = "42.6.0"
val pureconfigVersion = "0.17.4"
Expand Down Expand Up @@ -115,6 +116,7 @@ lazy val magnolia = "com.softwaremill.magnolia1_2" %% "magnolia"
lazy val mockito = "org.mockito" %% "mockito-scala" % mockitoVersion
lazy val monixBio = "io.monix" %% "monix-bio" % monixBioVersion
lazy val munit = "org.scalameta" %% "munit" % munitVersion
lazy val munitCE = "org.typelevel" %% "munit-cats-effect-2" % munitCEVersion
lazy val nimbusJoseJwt = "com.nimbusds" % "nimbus-jose-jwt" % nimbusJoseJwtVersion
lazy val pureconfig = "com.github.pureconfig" %% "pureconfig" % pureconfigVersion
lazy val pureconfigCats = "com.github.pureconfig" %% "pureconfig-cats" % pureconfigVersion
Expand Down Expand Up @@ -329,6 +331,7 @@ lazy val sdk = project
akkaTestKitTyped % Test,
akkaHttpTestKit % Test,
munit % Test,
munitCE % Test,
scalaTest % Test
),
addCompilerPlugin(kindProjector),
Expand Down Expand Up @@ -1018,6 +1021,7 @@ Global / excludeLintKeys += packageDoc / publishArtifact
Global / excludeLintKeys += docs / paradoxRoots
Global / excludeLintKeys += docs / Paradox / paradoxNavigationDepth
Global / concurrentRestrictions += Tags.limit(Tags.Test, 1)
Global / onChangedBuildSource := ReloadOnSourceChanges

addCommandAlias(
"review",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import cats.effect.{IO, Resource}
import cats.syntax.traverse._
import cats.syntax.flatMap._
import cats.syntax.monadError._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIO
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.sdk.error.PluginError.PluginInitializationError
import ch.epfl.bluebrain.nexus.delta.sdk.plugin.{Plugin, PluginDef}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package ch.epfl.bluebrain.nexus.delta.config

import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceUtils
import ch.epfl.bluebrain.nexus.testkit.IOValues
import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues
import com.typesafe.config.impl.ConfigImpl
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

class AppConfigSpec extends AnyWordSpecLike with Matchers with IOValues with BeforeAndAfterAll {
class AppConfigSpec extends AnyWordSpecLike with Matchers with IOValues with CatsIOValues with BeforeAndAfterAll {

implicit private val classLoader: ClassLoader = getClass.getClassLoader

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ trait MigrateEffectSyntax {
implicit def toCatsIO[E <: Throwable, A](io: BIO[E, A]): IO[A] = io.to[IO]
implicit def toCatsIOOps[E <: Throwable, A](io: BIO[E, A]): MonixBioToCatsIOOps[E, A] = new MonixBioToCatsIOOps(io)

implicit def toMonixBIO[E <: Throwable, A](io: IO[A]): BIO[E, A] = io.toBIO

implicit def toMonixBIOOps[A](io: IO[A]): CatsIOToBioOps[A] = new CatsIOToBioOps(io)

val taskToIoK: Task ~> IO = λ[Task ~> IO](toCatsIO(_))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.kernel.syntax
import cats.Functor
import cats.effect.IO
import cats.syntax.functor._
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.RetryStrategy
import com.typesafe.scalalogging.Logger
import monix.bio.{IO => BIO, Task, UIO}
Expand All @@ -18,6 +18,8 @@ trait IOSyntax {
implicit final def taskSyntaxLogErrors[A](task: Task[A]): TaskOps[A] = new TaskOps(task)

implicit final def ioFunctorOps[A, F[_]: Functor](io: IO[F[A]]): IOFunctorOps[A, F] = new IOFunctorOps(io)

implicit final def ioOps[A](io: IO[A]): IOOps[A] = new IOOps(io)
}

final class BIORetryStrategyOps[E, A](private val io: BIO[E, A]) extends AnyVal {
Expand Down Expand Up @@ -66,3 +68,16 @@ final class IOFunctorOps[A, F[_]: Functor](private val io: IO[F[A]]) {
*/
def mapValue[B](f: A => B): IO[F[B]] = io.map(_.map(f))
}

final class IOOps[A](private val io: IO[A]) {

/**
* Log errors before hiding them
*/
def logAndDiscardErrors(action: String)(implicit logger: Logger): IO[A] =
// TODO the method this replicates says it hides errors, but it uses `terminate` - I'm not sure I understand the desired semantics?
// You can't discard errors and return an A at the same time?
io.onError { ex =>
IO.delay(logger.warn(s"A Task is hiding an error while '$action'", ex))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@ import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceError.{Invali
import com.github.jknack.handlebars.{EscapingStrategy, Handlebars}
import io.circe.parser.parse
import io.circe.{Json, JsonObject}
import monix.bio.IO
import cats.syntax.all._

import java.io.InputStream
import java.util.Properties
import scala.io.{Codec, Source}
import scala.jdk.CollectionConverters._
import cats.effect.IO
import io.circe.ParsingFailure

trait ClasspathResourceUtils {

final def absolutePath(resourcePath: String)(implicit classLoader: ClassLoader): IO[ResourcePathNotFound, String] =
IO.fromOption(
Option(getClass.getResource(resourcePath)) orElse Option(classLoader.getResource(resourcePath)),
final def absolutePath(resourcePath: String)(implicit classLoader: ClassLoader): IO[String] =
IO.fromOption(Option(getClass.getResource(resourcePath)) orElse Option(classLoader.getResource(resourcePath)))(
ResourcePathNotFound(resourcePath)
).map(_.getPath)

Expand All @@ -29,11 +30,11 @@ trait ClasspathResourceUtils {
* the content of the referenced resource as an [[InputStream]] or a [[ClasspathResourceError]] when the resource
* is not found
*/
def ioStreamOf(resourcePath: String)(implicit classLoader: ClassLoader): IO[ClasspathResourceError, InputStream] =
IO.deferAction { _ =>
def ioStreamOf(resourcePath: String)(implicit classLoader: ClassLoader): IO[InputStream] =
IO.defer {
lazy val fromClass = Option(getClass.getResourceAsStream(resourcePath))
val fromClassLoader = Option(classLoader.getResourceAsStream(resourcePath))
IO.fromOption(fromClass orElse fromClassLoader, ResourcePathNotFound(resourcePath))
IO.fromOption(fromClass orElse fromClassLoader)(ResourcePathNotFound(resourcePath))
}

/**
Expand All @@ -49,7 +50,7 @@ trait ClasspathResourceUtils {
final def ioContentOf(
resourcePath: String,
attributes: (String, Any)*
)(implicit classLoader: ClassLoader): IO[ClasspathResourceError, String] =
)(implicit classLoader: ClassLoader): IO[String] =
resourceAsTextFrom(resourcePath).map {
case text if attributes.isEmpty => text
case text => handleBars.compileInline(text).apply(attributes.toMap.asJava)
Expand All @@ -67,7 +68,7 @@ trait ClasspathResourceUtils {
*/
final def ioPropertiesOf(resourcePath: String)(implicit
classLoader: ClassLoader
): IO[ClasspathResourceError, Map[String, String]] =
): IO[Map[String, String]] =
ioStreamOf(resourcePath).map { is =>
val props = new Properties()
props.load(is)
Expand All @@ -87,10 +88,10 @@ trait ClasspathResourceUtils {
final def ioJsonContentOf(
resourcePath: String,
attributes: (String, Any)*
)(implicit classLoader: ClassLoader): IO[ClasspathResourceError, Json] =
)(implicit classLoader: ClassLoader): IO[Json] =
for {
text <- ioContentOf(resourcePath, attributes: _*)
json <- IO.fromEither(parse(text)).mapError(InvalidJson(resourcePath, text, _))
json <- IO.fromEither(parse(text)).adaptError { case e: ParsingFailure => InvalidJson(resourcePath, text, e) }
} yield json

/**
Expand All @@ -105,15 +106,15 @@ trait ClasspathResourceUtils {
*/
final def ioJsonObjectContentOf(resourcePath: String, attributes: (String, Any)*)(implicit
classLoader: ClassLoader
): IO[ClasspathResourceError, JsonObject] =
): IO[JsonObject] =
for {
json <- ioJsonContentOf(resourcePath, attributes: _*)
jsonObj <- IO.fromOption(json.asObject, InvalidJsonObject(resourcePath))
jsonObj <- IO.fromOption(json.asObject)(InvalidJsonObject(resourcePath))
} yield jsonObj

private def resourceAsTextFrom(resourcePath: String)(implicit
classLoader: ClassLoader
): IO[ClasspathResourceError, String] =
): IO[String] =
ioStreamOf(resourcePath).map(is => Source.fromInputStream(is)(Codec.UTF8).mkString)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
package ch.epfl.bluebrain.nexus.delta.kernel.utils

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceError.{InvalidJson, InvalidJsonObject, ResourcePathNotFound}
import io.circe.syntax._
import io.circe.{Json, JsonObject}
import monix.bio.IO
import monix.execution.Scheduler
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

class ClasspathResourceUtilsSpec extends AnyWordSpecLike with Matchers with ClasspathResourceUtils with ScalaFutures {
implicit private val sc: Scheduler = Scheduler.global
import ClasspathResourceUtils._

class ClasspathResourceUtilsSpec extends AnyWordSpecLike with Matchers with ScalaFutures {
implicit private val classLoader: ClassLoader = getClass.getClassLoader

private def accept[E, A](io: IO[E, A]): A =
io.attempt.runSyncUnsafe() match {
private def accept[A](io: IO[A]): A =
io.attempt.unsafeRunSync() match {
case Left(value) => fail(s"Expected Right, found Left with value '$value'")
case Right(value) => value
}

private def reject[E, A](io: IO[E, A]): E =
io.attempt.runSyncUnsafe() match {
private def reject[A](io: IO[A]): Throwable =
io.attempt.unsafeRunSync() match {
case Left(value) => value
case Right(value) => fail(s"Expected Left, found Right with value '$value'")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph

import cats.effect.IO
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph

import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination
import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceUtils.ioContentOf
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType.{Aux, SparqlResultsJson}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import akka.http.scaladsl.model.headers.{BasicHttpCredentials, HttpCredentials,
import akka.http.scaladsl.model.{HttpEntity, HttpHeader, Uri}
import akka.http.scaladsl.unmarshalling.FromEntityUnmarshaller
import akka.http.scaladsl.unmarshalling.PredefinedFromEntityUnmarshallers.stringUnmarshaller
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceUtils
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient.timeoutHeader
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlClientError.{InvalidCountRequest, WrappedHttpClientError}
Expand Down Expand Up @@ -38,8 +39,9 @@ class BlazegraphClient(
private val serviceVersion = """(buildVersion">)([^<]*)""".r
private val serviceName = Name.unsafe("blazegraph")

// TODODODODODO Sort out memoization - maybe inject in files values?
private val defaultProperties =
ClasspathResourceUtils.ioPropertiesOf("blazegraph/index.properties").hideErrors.memoizeOnSuccess
ClasspathResourceUtils.ioPropertiesOf("blazegraph/index.properties")

override def query[R <: SparqlQueryResponse](
indices: Iterable[String],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.migration

import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceUtils
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.CompositeViews
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.migration.MigrateCompositeViews.{eventsToMigrate, statesToMigrate}
Expand All @@ -9,18 +10,19 @@ import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewE
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.{CompositeViewEvent, CompositeViewState, CompositeViewValue}
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, Tag}
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import doobie.postgres.implicits._
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, Tag}
import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie
import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite
import doobie.postgres.implicits._
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._

import doobie.Get
import munit.{AnyFixture, Location}
import doobie.implicits._
import io.circe.syntax.EncoderOps
import io.circe.JsonObject
import monix.bio.IO
import io.circe.syntax.EncoderOps
import monix.bio.{IO, UIO}
import munit.{AnyFixture, Location}

import java.time.Instant

Expand Down Expand Up @@ -110,7 +112,7 @@ object MigrateCompositeViewsSuite extends ClasspathResourceUtils {
} yield (project, id, rev)
}

def loadEvent(jsonPath: String)(implicit xas: Transactors, classLoader: ClassLoader) = {
def loadEvent(jsonPath: String)(implicit xas: Transactors, classLoader: ClassLoader): IO[Unit, Unit] = {
def insert(project: ProjectRef, id: String, rev: Int, json: JsonObject) =
sql"""
| INSERT INTO scoped_events (
Expand All @@ -130,17 +132,17 @@ object MigrateCompositeViewsSuite extends ClasspathResourceUtils {
| $rev,
| ${json.asJson},
| ${Instant.EPOCH}
| )""".stripMargin.update.run.void.transact(xas.write)
| )""".stripMargin.update.run.void.transact(xas.write).hideErrors

for {
json <- ioJsonObjectContentOf(jsonPath)
json <- ioJsonObjectContentOf(jsonPath): UIO[JsonObject]
(project, id, rev) <- extractIdentifiers(json)
_ <- insert(project, id, rev, json)
} yield ()
}

def loadState(tag: Tag, jsonPath: String)(implicit xas: Transactors, classLoader: ClassLoader) = {
def insert(project: ProjectRef, id: String, rev: Int, json: JsonObject) =
def insert(project: ProjectRef, id: String, rev: Int, json: JsonObject): UIO[Unit] =
sql"""
| INSERT INTO scoped_states (
| type,
Expand All @@ -163,10 +165,10 @@ object MigrateCompositeViewsSuite extends ClasspathResourceUtils {
| ${json.asJson},
| ${false},
| ${Instant.EPOCH}
| )""".stripMargin.update.run.void.transact(xas.write)
| )""".stripMargin.update.run.void.transact(xas.write).hideErrors

for {
json <- ioJsonObjectContentOf(jsonPath)
json <- ioJsonObjectContentOf(jsonPath): UIO[JsonObject]
(project, id, rev) <- extractIdentifiers(json)
_ <- insert(project, id, rev, json)
} yield ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch

import akka.actor.typed.ActorSystem
import cats.effect.Clock
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toMonixBIO
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch

import cats.effect.IO
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
import ch.epfl.bluebrain.nexus.delta.kernel.syntax._
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViews.entityType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch

import cats.effect.Clock
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
import ch.epfl.bluebrain.nexus.delta.kernel.utils.{IOUtils, UUIDF}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViews._
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch

import cats.data.NonEmptyChain
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.ElasticSearchSink
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch

import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewRejection.{InvalidElasticSearchIndexPayload, InvalidPipeline, InvalidViewReferences, PermissionIsNotDefined, TooManyViewReferences, WrappedElasticSearchClientError}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewValue.{AggregateElasticSearchViewValue, IndexingElasticSearchViewValue}
Expand All @@ -14,7 +15,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{PipeChain, ProjectionErr}
import io.circe.JsonObject
import monix.bio.{IO, UIO}
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._

import java.util.UUID

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import akka.http.scaladsl.model.headers.BasicHttpCredentials
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.RetryStrategy
import ch.epfl.bluebrain.nexus.delta.kernel.RetryStrategy.logError
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UrlUtils
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.BulkResponse.MixedOutcomes.Outcome
Expand Down
Loading

0 comments on commit 0e7f7b0

Please sign in to comment.