diff --git a/build.sbt b/build.sbt
index 8e0efcefdc..2027409aa4 100755
--- a/build.sbt
+++ b/build.sbt
@@ -50,6 +50,7 @@ val mockitoVersion = "1.17.22"
 val monixVersion = "3.4.1"
 val monixBioVersion = "1.2.0"
 val munitVersion = "1.0.0-M8"
+val munitCEVersion = "1.0.7"
 val nimbusJoseJwtVersion = "9.31"
 val postgresJdbcVersion = "42.6.0"
 val pureconfigVersion = "0.17.4"
@@ -115,6 +116,7 @@ lazy val magnolia = "com.softwaremill.magnolia1_2" %% "magnolia"
 lazy val mockito = "org.mockito" %% "mockito-scala" % mockitoVersion
 lazy val monixBio = "io.monix" %% "monix-bio" % monixBioVersion
 lazy val munit = "org.scalameta" %% "munit" % munitVersion
+lazy val munitCE = "org.typelevel" %% "munit-cats-effect-2" % munitCEVersion
 lazy val nimbusJoseJwt = "com.nimbusds" % "nimbus-jose-jwt" % nimbusJoseJwtVersion
 lazy val pureconfig = "com.github.pureconfig" %% "pureconfig" % pureconfigVersion
 lazy val pureconfigCats = "com.github.pureconfig" %% "pureconfig-cats" % pureconfigVersion
@@ -329,6 +331,7 @@ lazy val sdk = project
       akkaTestKitTyped % Test,
       akkaHttpTestKit % Test,
       munit % Test,
+      munitCE % Test,
       scalaTest % Test
     ),
    addCompilerPlugin(kindProjector),
@@ -1018,6 +1021,7 @@ Global / excludeLintKeys += packageDoc / publishArtifact
 Global / excludeLintKeys += docs / paradoxRoots
 Global / excludeLintKeys += docs / Paradox / paradoxNavigationDepth
 Global / concurrentRestrictions += Tags.limit(Tags.Test, 1)
+Global / onChangedBuildSource := ReloadOnSourceChanges

 addCommandAlias(
   "review",
diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugin/WiringInitializer.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugin/WiringInitializer.scala
index 35fef91f40..df54c01c60 100644
--- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugin/WiringInitializer.scala
+++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugin/WiringInitializer.scala
@@ -4,7 +4,7 @@ import cats.effect.{IO, Resource}
 import cats.syntax.traverse._
 import cats.syntax.flatMap._
 import cats.syntax.monadError._
-import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
+import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIO
 import ch.epfl.bluebrain.nexus.delta.kernel.Logger
 import ch.epfl.bluebrain.nexus.delta.sdk.error.PluginError.PluginInitializationError
 import ch.epfl.bluebrain.nexus.delta.sdk.plugin.{Plugin, PluginDef}
diff --git a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/config/AppConfigSpec.scala b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/config/AppConfigSpec.scala
index bab0f8c5ac..ade05686ee 100644
--- a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/config/AppConfigSpec.scala
+++ b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/config/AppConfigSpec.scala
@@ -2,12 +2,13 @@ package ch.epfl.bluebrain.nexus.delta.config

 import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceUtils
 import ch.epfl.bluebrain.nexus.testkit.IOValues
+import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues
 import com.typesafe.config.impl.ConfigImpl
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpecLike

-class AppConfigSpec extends AnyWordSpecLike with Matchers with IOValues with BeforeAndAfterAll {
+class AppConfigSpec extends AnyWordSpecLike with Matchers with IOValues with CatsIOValues with BeforeAndAfterAll {

   implicit private val classLoader: ClassLoader = getClass.getClassLoader
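The new `munitCE` dependency above brings in munit-cats-effect-2, which lets munit suites return `IO` values directly instead of unsafely running them in every test. A minimal sketch of the usage pattern it enables, assuming only `munit.CatsEffectSuite` (suite and test names are illustrative):

import cats.effect.IO
import munit.CatsEffectSuite

class ExampleCESuite extends CatsEffectSuite {

  // IO-returning test bodies are executed by the suite itself;
  // `assertEquals` on an IO comes from CatsEffectAssertions
  test("an IO result can be asserted without unsafeRunSync") {
    IO.pure(21).map(_ * 2).assertEquals(42)
  }
}

The same `assertEquals`-on-`IO` syntax appears further down in the migrated `GraphAnalyticsSinkSuite`.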
diff --git a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/effect/migration/MigrateEffectSyntax.scala b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/effect/migration/MigrateEffectSyntax.scala
index c1c4d6d25a..1a54334266 100644
--- a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/effect/migration/MigrateEffectSyntax.scala
+++ b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/effect/migration/MigrateEffectSyntax.scala
@@ -16,6 +16,8 @@ trait MigrateEffectSyntax {
       io
     )

+  implicit def toMonixBIO[E <: Throwable, A](io: IO[A]): BIO[E, A] = io.toBIO
+
   implicit def toMonixBIOOps[A](io: IO[A]): CatsIOToBioOps[A] = new CatsIOToBioOps(io)

   val taskToIoK: Task ~> IO = λ[Task ~> IO](toCatsIO(_))
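`toBIO` and `CatsIOToBioOps` above are project helpers; the general interop they package can be sketched with monix-bio's cats-effect 2 integration. `IO.catsEffect` is recalled from the monix-bio API and should be checked against the pinned version (the codebase itself uses `monix.bio.IO.catsAsync` elsewhere for a `Concurrent[Task]`); refining the error type from `Throwable` to `E` is left to the project's own `toBIO`:

import cats.effect.{ConcurrentEffect, Effect, IO, LiftIO}
import monix.bio.{IO => BIO, Task}
import monix.execution.Scheduler

implicit val scheduler: Scheduler = Scheduler.global
// assumed monix-bio instance builder deriving CE2 instances from a Scheduler
implicit val taskEffect: ConcurrentEffect[Task] = BIO.catsEffect

// Task -> IO: any CE2 Effect can be re-run into cats.effect.IO
def taskToIo[A](task: Task[A]): IO[A] = Effect[Task].toIO(task)

// IO -> Task: ConcurrentEffect extends LiftIO, so lifting comes for free
def ioToTask[A](io: IO[A]): Task[A] = LiftIO[Task].liftIO(io)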
diff --git a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/syntax/IOSyntax.scala b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/syntax/IOSyntax.scala
index 590ef31e23..ff6e3e50b1 100644
--- a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/syntax/IOSyntax.scala
+++ b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/syntax/IOSyntax.scala
@@ -1,7 +1,7 @@
 package ch.epfl.bluebrain.nexus.delta.kernel.syntax
 import cats.Functor
 import cats.effect.IO
-import cats.syntax.functor._
+import cats.syntax.all._
 import ch.epfl.bluebrain.nexus.delta.kernel.RetryStrategy
 import com.typesafe.scalalogging.Logger
 import monix.bio.{IO => BIO, Task, UIO}
@@ -18,6 +18,8 @@ trait IOSyntax {
   implicit final def taskSyntaxLogErrors[A](task: Task[A]): TaskOps[A] = new TaskOps(task)

   implicit final def ioFunctorOps[A, F[_]: Functor](io: IO[F[A]]): IOFunctorOps[A, F] = new IOFunctorOps(io)
+
+  implicit final def ioOps[A](io: IO[A]): IOOps[A] = new IOOps(io)
 }

 final class BIORetryStrategyOps[E, A](private val io: BIO[E, A]) extends AnyVal {
@@ -66,3 +68,16 @@ final class IOFunctorOps[A, F[_]: Functor](private val io: IO[F[A]]) {
    */
   def mapValue[B](f: A => B): IO[F[B]] = io.map(_.map(f))
 }
+
+final class IOOps[A](private val io: IO[A]) {
+
+  /**
+    * Log errors before re-raising them
+    */
+  def logAndDiscardErrors(action: String)(implicit logger: Logger): IO[A] =
+    // TODO the BIO method this replicates claims to hide errors, but it relies on `terminate`, which moves them to
+    // the unchecked channel. A cats-effect IO[A] has no such channel: an error cannot be discarded while still
+    // producing an A, so this version can only log and re-raise.
+    io.onError { case ex =>
+      IO.delay(logger.warn(s"A Task is hiding an error while '$action'", ex))
+    }
+}
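The semantics the TODO questions are easier to see outside the syntax class: monix-bio can `terminate` a `Throwable` into its unchecked channel and return a `UIO`, while cats-effect 2 has no second error channel, so only two honest translations exist. A minimal sketch (helper names are illustrative):

import cats.effect.IO
import cats.syntax.all._
import com.typesafe.scalalogging.Logger

// what `logAndDiscardErrors` above actually does: log, then re-raise
def logAndRethrow[A](io: IO[A], action: String)(implicit logger: Logger): IO[A] =
  io.onError { case ex => IO.delay(logger.warn(s"Error while '$action'", ex)) }

// genuinely discarding the error forces a weaker result type
def logAndDrop[A](io: IO[A], action: String)(implicit logger: Logger): IO[Option[A]] =
  io.attempt.flatMap {
    case Right(a) => IO.pure(Some(a))
    case Left(ex) => IO.delay(logger.warn(s"Error while '$action'", ex)).map(_ => Option.empty[A])
  }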
diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphScopeInitialization.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphScopeInitialization.scala
index 2e595dfa91..83e5b46612 100644
--- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphScopeInitialization.scala
+++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphScopeInitialization.scala
@@ -1,7 +1,6 @@
 package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph
 import cats.effect.IO
-import cats.implicits._
 import ch.epfl.bluebrain.nexus.delta.kernel.Logger
 import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
 import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/client/BlazegraphClient.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/client/BlazegraphClient.scala
index 26cefc6958..4753615d81 100644
--- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/client/BlazegraphClient.scala
+++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/client/BlazegraphClient.scala
@@ -38,6 +38,7 @@ class BlazegraphClient(
   private val serviceVersion = """(buildVersion">)([^<]*)""".r
   private val serviceName = Name.unsafe("blazegraph")

+  // TODO: sort out memoization - maybe inject in files values?
   private val defaultProperties =
     ClasspathResourceUtils.ioPropertiesOf("blazegraph/index.properties").hideErrors.memoizeOnSuccess
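This memoization TODO, like the matching one in `GraphAnalyticsView` further down, stems from losing monix's `memoizeOnSuccess` in the migration. The closest cats-effect 2 building block is `Concurrent.memoize`, with the wrinkle that the cached action is produced inside an outer effect and therefore has to be allocated once and shared, e.g. at wiring time. A minimal sketch:

import cats.effect.{Concurrent, ContextShift, IO}
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val loadProperties: IO[String] = IO.delay {
  println("loading blazegraph properties...")
  "example properties"
}

val program: IO[Unit] = for {
  memoized <- Concurrent.memoize(loadProperties) // IO[IO[String]]
  _        <- memoized // evaluates and caches the result
  _        <- memoized // served from the cache, nothing printed
} yield ()

Note that, unlike `memoizeOnSuccess`, failures are cached as well, which is worth double-checking before wiring this in for classpath resources.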
diff --git a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/IndexingViewDefSuite.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/IndexingViewDefSuite.scala
index 5b5030e769..613cac12a6 100644
--- a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/IndexingViewDefSuite.scala
+++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/IndexingViewDefSuite.scala
@@ -56,7 +56,7 @@ class IndexingViewDefSuite extends BioSuite {
   private val aggregate =
     AggregateBlazegraphViewValue(Some("viewName"), Some("viewDescription"), NonEmptySet.of(viewRef))

-  private val sink = CacheSink.states[NTriples]
+  private val sink = CacheSink2.states[NTriples]

   private def state(v: BlazegraphViewValue) = BlazegraphViewState(
     id,
@@ -156,7 +156,7 @@ class IndexingViewDefSuite extends BioSuite {
     for {
       compiled <- IndexingViewDef.compile(
                     v,
-                    _ => Operation.merge(FilterDeprecated.withConfig(()), FilterByType.withConfig(filterByTypeConfig)),
+                    _ => OperationF.merge(FilterDeprecated.withConfig(()), FilterByType.withConfig(filterByTypeConfig)),
                     GraphResourceStream.unsafeFromStream(PullRequestStream.generate(projectRef)),
                     sink
                   )
diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeViewDef.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeViewDef.scala
index 452c89069a..16598cb21d 100644
--- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeViewDef.scala
+++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeViewDef.scala
@@ -493,7 +493,7 @@ object CompositeViewDef {
   ): Either[ProjectionErr, Operation] = {
     //TODO Add leap on target
     val branchProgress = progress.branches.get(branch)
-    Operation
+    OperationF
       .merge(operation, closeBranch(branch, branchProgress.getOrElse(ProjectionProgress.NoProgress)))
   }
@@ -519,9 +519,9 @@ object CompositeViewDef {
     for {
       pipes <- source.pipeChain.traverse(compilePipeChain)
       // We apply `Operation.tap` as we want to keep the GraphResource for the rest of the stream
-      tail <- Operation.merge(GraphResourceToNTriples, sink).map(_.tap)
+      tail <- OperationF.merge(GraphResourceToNTriples, sink).map(_.tap)
       chain = pipes.fold(NonEmptyChain.one(tail))(NonEmptyChain(_, tail))
-      operation <- Operation.merge(chain)
+      operation <- OperationF.merge(chain)
       // We create the elem stream for the two types of branch
       // The main source produces an infinite stream and waits for new elements
       mainSource = graphStream.main(source, project)
@@ -550,7 +550,7 @@ object CompositeViewDef {
     for {
       pipes <- target.pipeChain.traverse(compilePipeChain)
       chain = pipes.fold(tail)(NonEmptyChain.one(_) ++ tail)
-      result <- Operation.merge(chain)
+      result <- OperationF.merge(chain)
     } yield target.id -> result
   }
diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/projections/CompositeProjections.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/projections/CompositeProjections.scala
index f8dc446497..4fdf3eb515 100644
--- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/projections/CompositeProjections.scala
+++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/projections/CompositeProjections.scala
@@ -110,7 +110,7 @@ object CompositeProjections {
       branch: CompositeBranch,
       progress: ProjectionProgress
   ): Operation =
-    Operation.fromFs2Pipe[Unit](
+    OperationF.fromFs2Pipe[Task, Unit](
       Projection.persist(
         progress,
         compositeProgressStore.save(view.indexingRef, branch, _),
diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeViewDefSuite.scala b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeViewDefSuite.scala
index 3c942cf64c..b16a6e7be6 100644
--- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeViewDefSuite.scala
+++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeViewDefSuite.scala
@@ -1,5 +1,6 @@
 package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing

+import cats.effect.Concurrent
 import cats.effect.concurrent.Ref
 import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.CompositeViewsFixture
 import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeView.{Interval, RebuildStrategy}
@@ -11,7 +12,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef}
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
 import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
 import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.FilterDeprecated
-import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{NoopSink, RemainingElems, Source}
+import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{NoopSink, RemainingElems, Source, SourceF}
 import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, PatienceConfig}
 import fs2.Stream
 import monix.bio.{Task, UIO}
@@ -27,11 +28,13 @@ class CompositeViewDefSuite extends BioSuite with CompositeViewsFixture {

   test("Compile correctly the source") {

-    def makeSource(nameValue: String): Source = new Source {
+    def makeSource(nameValue: String): Source = new SourceF[Task] {
       override type Out = Unit
       override def outType: Typeable[Unit] = Typeable[Unit]
       override def apply(offset: Offset): ElemStream[Unit] = Stream.empty[Task]
       override def name: String = nameValue
+
+      implicit override def concurrent: Concurrent[Task] = monix.bio.IO.catsAsync
     }

     val graphStream = new CompositeGraphStream {
diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/migration/MigrateCompositeViewsSuite.scala b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/migration/MigrateCompositeViewsSuite.scala
index ba57456b0f..79b7216dc5 100644
--- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/migration/MigrateCompositeViewsSuite.scala
+++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/migration/MigrateCompositeViewsSuite.scala
@@ -9,18 +9,19 @@ import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewE
 import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.{CompositeViewEvent, CompositeViewState, CompositeViewValue}
 import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax
 import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
-import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, Tag}
-import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
-import doobie.postgres.implicits._
+import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, Tag}
 import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie
 import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite
+import doobie.postgres.implicits._
+import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
+
 import doobie.Get
-import munit.{AnyFixture, Location}
 import doobie.implicits._
-import io.circe.syntax.EncoderOps
 import io.circe.JsonObject
+import io.circe.syntax.EncoderOps
 import monix.bio.IO
+import munit.{AnyFixture, Location}

 import java.time.Instant
diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchScopeInitialization.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchScopeInitialization.scala
index 7014802a86..0dff2ea8a2 100644
--- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchScopeInitialization.scala
+++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchScopeInitialization.scala
@@ -1,7 +1,6 @@
 package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch
 import cats.effect.IO
-import cats.implicits._
 import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
 import ch.epfl.bluebrain.nexus.delta.kernel.syntax._
 import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViews.entityType
diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/IndexingViewDefSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/IndexingViewDefSuite.scala
index 1bc11746e2..2bd12b39c6 100644
--- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/IndexingViewDefSuite.scala
+++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/IndexingViewDefSuite.scala
@@ -76,7 +76,7 @@ class IndexingViewDefSuite extends BioSuite with CirceLiteral with Fixtures {
   )

   private val aggregate = AggregateElasticSearchViewValue(NonEmptySet.of(viewRef))
-  private val sink = CacheSink.states[Json]
+  private val sink = CacheSink2.states[Json]
   private val indexingRev = IndexingRev.init
   private val rev = 2

@@ -212,7 +212,7 @@ class IndexingViewDefSuite extends BioSuite with CirceLiteral with Fixtures {
     for {
       compiled <- IndexingViewDef.compile(
                     v,
-                    _ => Operation.merge(FilterDeprecated.withConfig(()), FilterByType.withConfig(filterByTypeConfig)),
+                    _ => OperationF.merge(FilterDeprecated.withConfig(()), FilterByType.withConfig(filterByTypeConfig)),
                     GraphResourceStream.unsafeFromStream(PullRequestStream.generate(projectRef)),
                     sink
                   )
diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/metrics/EventMetricsProjectionSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/metrics/EventMetricsProjectionSuite.scala
index 81ac060e88..46eaed712e 100644
--- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/metrics/EventMetricsProjectionSuite.scala
+++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/metrics/EventMetricsProjectionSuite.scala
@@ -3,7 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.metrics
 import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.metrics.MetricsStream._
 import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.{EventMetricsProjection, Fixtures}
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
-import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CacheSink, ProjectionProgress, SupervisorSetup}
+import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CacheSink2, ProjectionProgress, SupervisorSetup}
 import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, PatienceConfig}
 import io.circe.Json
 import io.circe.syntax.EncoderOps
@@ -20,7 +20,7 @@ class EventMetricsProjectionSuite extends BioSuite with SupervisorSetup.Fixture
   implicit private val patienceConfig: PatienceConfig = PatienceConfig(2.seconds, 10.millis)

   private lazy val sv = supervisor().supervisor
-  private val sink = CacheSink.events[Json]
+  private val sink = CacheSink2.events[Json]

   test("Start the metrics projection") {
     for {
diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalytics.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalytics.scala
index f765fca396..0b9f8a5d62 100644
--- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalytics.scala
+++ b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalytics.scala
@@ -17,19 +17,21 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
 import ch.epfl.bluebrain.nexus.delta.sdk.syntax._
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
 import io.circe.{Decoder, JsonObject}
-import monix.bio.IO
+import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
+import cats.effect.IO
+import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectContext

 trait GraphAnalytics {

   /**
     * The relationship statistics between different types for the passed ''projectRef''
     */
-  def relationships(projectRef: ProjectRef): IO[GraphAnalyticsRejection, AnalyticsGraph]
+  def relationships(projectRef: ProjectRef): IO[AnalyticsGraph]

   /**
     * The properties statistics of the passed ''project'' and type ''tpe''.
     */
-  def properties(projectRef: ProjectRef, tpe: IdSegment): IO[GraphAnalyticsRejection, PropertiesStatistics]
+  def properties(projectRef: ProjectRef, tpe: IdSegment): IO[PropertiesStatistics]

 }

@@ -43,14 +45,16 @@ object GraphAnalytics {
   ): GraphAnalytics =
     new GraphAnalytics {

-      private val expandIri: ExpandIri[InvalidPropertyType] = new ExpandIri(InvalidPropertyType.apply)
+      private val iriExpander: ExpandIri[InvalidPropertyType] = new ExpandIri(InvalidPropertyType.apply)

-      private def propertiesAggQueryFor(tpe: Iri) =
+      private def propertiesAggQueryFor(tpe: Iri): IO[JsonObject] =
         propertiesAggQuery(config).map(_.replace("@type" -> "{{type}}", tpe))

-      override def relationships(projectRef: ProjectRef): IO[GraphAnalyticsRejection, AnalyticsGraph] =
+      private def fetchProjectContext(ref: ProjectRef): IO[ProjectContext] = fetchContext.onRead(ref)
+
+      override def relationships(projectRef: ProjectRef): IO[AnalyticsGraph] =
         for {
-          _ <- fetchContext.onRead(projectRef)
+          _ <- fetchProjectContext(projectRef)
           query <- relationshipsAggQuery(config)
           stats <- client
                      .searchAs[AnalyticsGraph](QueryBuilder(query), index(prefix, projectRef).value, Query.Empty)
@@ -60,18 +64,20 @@ object GraphAnalytics {
       override def properties(
           projectRef: ProjectRef,
           tpe: IdSegment
-      ): IO[GraphAnalyticsRejection, PropertiesStatistics] = {
+      ): IO[PropertiesStatistics] = {

-        def search(tpe: Iri, idx: IndexLabel, query: JsonObject) = {
+        def search(tpe: Iri, idx: IndexLabel, query: JsonObject): IO[PropertiesStatistics] = {
           implicit val d: Decoder[PropertiesStatistics] = propertiesDecoderFromEsAggregations(tpe)
           client
             .searchAs[PropertiesStatistics](QueryBuilder(query).withTotalHits(true), idx.value, Query.Empty)
             .mapError(err => WrappedElasticSearchRejection(WrappedElasticSearchClientError(err)))
         }

+        def expandIri(pc: ProjectContext): IO[Iri] = iriExpander.apply(tpe, pc)
+
         for {
-          pc <- fetchContext.onRead(projectRef)
-          tpeIri <- expandIri(tpe, pc)
+          pc <- fetchProjectContext(projectRef)
+          tpeIri <- expandIri(pc)
           query <- propertiesAggQueryFor(tpeIri)
           stats <- search(tpeIri, index(prefix, projectRef), query)
         } yield stats
diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsCoordinator.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsCoordinator.scala
index 8ee9dc3790..a48fd4dffe 100644
--- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsCoordinator.scala
+++ b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsCoordinator.scala
@@ -1,19 +1,22 @@
 package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics

+import cats.effect.IO
 import cats.syntax.all._
 import ch.epfl.bluebrain.nexus.delta.kernel.Logger
+import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration
+import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.MigrateEffectSyntax
 import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
 import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.GraphAnalytics.{index, projectionName}
 import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.config.GraphAnalyticsConfig
 import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.indexing.{graphAnalyticsMappings, scriptContent, updateRelationshipsScriptId, GraphAnalyticsSink, GraphAnalyticsStream}
 import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
 import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects
-import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef}
+import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
-import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
+import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.SinkCatsEffect
 import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
 import fs2.Stream
-import monix.bio.Task
+import org.typelevel.log4cats.{Logger => Log4CatsLogger}

 sealed trait GraphAnalyticsCoordinator

@@ -21,7 +24,7 @@ object GraphAnalyticsCoordinator {

   /** If indexing is disabled we can only log */
   final private case object Noop extends GraphAnalyticsCoordinator {
-    def log: Task[Unit] =
+    def log: IO[Unit] =
       logger.info("Graph Analytics indexing has been disabled via config")
   }

@@ -41,73 +44,70 @@ object GraphAnalyticsCoordinator {
    *   how to delete an index
    */
   final private class Active(
-      fetchProjects: Offset => ElemStream[ProjectDef],
+      fetchProjects: Offset => Stream[IO, Elem[ProjectDef]],
       analyticsStream: GraphAnalyticsStream,
       supervisor: Supervisor,
-      sink: ProjectRef => Sink,
-      createIndex: ProjectRef => Task[Unit],
-      deleteIndex: ProjectRef => Task[Unit]
-  ) extends GraphAnalyticsCoordinator {
+      sink: ProjectRef => SinkCatsEffect,
+      createIndex: ProjectRef => IO[Unit],
+      deleteIndex: ProjectRef => IO[Unit]
+  ) extends GraphAnalyticsCoordinator
+      with MigrateEffectSyntax {

-    def run(offset: Offset): Stream[Task, Elem[Unit]] =
+    def run(offset: Offset): Stream[IO, Elem[Unit]] =
       fetchProjects(offset).evalMap {
         _.traverse {
           case p if p.markedForDeletion => destroy(p.ref)
           case p                        => start(p.ref)
         }
       }
-
-    private def compile(project: ProjectRef): Task[CompiledProjection] =
-      Task.fromEither(
+    private def compile(project: ProjectRef): IO[CompiledProjection] =
+      IO.fromEither(
         CompiledProjection.compile(
           analyticsMetadata(project),
           ExecutionStrategy.PersistentSingleNode,
-          Source(analyticsStream(project, _)),
-          sink(project)
+          Source(analyticsStream(project, _).translate(ioToTaskK)),
+          sink(project).mapK(ioToTaskK)
         )
       )

-    // Start the analysis projection for the given project
-    private def start(project: ProjectRef): Task[Unit] =
+    private def start(project: ProjectRef): IO[Unit] =
       for {
         compiled <- compile(project)
-        status   <- supervisor.describe(compiled.metadata.name)
+        status   <- migration.toCatsIO(supervisor.describe(compiled.metadata.name))
        _        <- status match {
                      case Some(value) if value.status == ExecutionStatus.Running =>
                        logger.info(s"Graph analysis of '$project' is already running.")
-                      case _ =>
-                        logger.info(s"Starting graph analysis of '$project'...") >>
-                          supervisor.run(
-                            compiled,
-                            createIndex(project)
-                          )
+                      case _ => startGraphAnalysis(compiled, project)
                    }
      } yield ()

+    private def startGraphAnalysis(compiled: CompiledProjection, project: ProjectRef): IO[ExecutionStatus] =
+      for {
+        _      <- logger.info(s"Starting graph analysis of '$project'...")
+        status <- migration.toCatsIO(supervisor.run(compiled, migration.toMonixBIO(createIndex(project))))
+      } yield status
     // Destroy the analysis for the given project and deletes the related Elasticsearch index
-    private def destroy(project: ProjectRef): Task[Unit] = {
+    private def destroy(project: ProjectRef): IO[Unit] = {
       logger.info(s"Project '$project' has been marked as deleted, stopping the graph analysis...") >>
-        supervisor
-          .destroy(
-            projectionName(project),
-            deleteIndex(project)
-          )
-          .void
+        migration.toCatsIO(
+          supervisor
+            .destroy(
+              projectionName(project),
+              migration.toMonixBIO(deleteIndex(project))
+            )
+            .void
+        )
     }
-  }
-
   final val id = nxv + "graph-analytics"

-  private val logger: Logger = Logger[GraphAnalyticsCoordinator]
+  private val logger: Log4CatsLogger[IO] = Logger.cats[GraphAnalyticsCoordinator]

   private[analytics] val metadata: ProjectionMetadata = ProjectionMetadata("system", "ga-coordinator", None, None)
-
-  private def analyticsMetadata(project: ProjectRef) = ProjectionMetadata(
+  private def analyticsMetadata(project: ProjectRef) = ProjectionMetadata(
     "ga",
     projectionName(project),
     Some(project),
     Some(id)
   )
-
   private[analytics] case class ProjectDef(ref: ProjectRef, markedForDeletion: Boolean)

 /**
@@ -129,10 +129,13 @@ object GraphAnalyticsCoordinator {
       supervisor: Supervisor,
       client: ElasticSearchClient,
       config: GraphAnalyticsConfig
-  ): Task[GraphAnalyticsCoordinator] =
+  ): IO[GraphAnalyticsCoordinator] =
     if (config.indexingEnabled) {
       val coordinator = apply(
-        projects.states(_).map(_.map { p => ProjectDef(p.project, p.markedForDeletion) }),
+        projects
+          .states(_)
+          .map(_.map { p => ProjectDef(p.project, p.markedForDeletion) })
+          .translate(migration.taskToIoK),
         analyticsStream,
         supervisor,
         ref =>
@@ -144,39 +147,39 @@ object GraphAnalyticsCoordinator {
           ),
         ref =>
           graphAnalyticsMappings.flatMap { mappings =>
-            client.createIndex(index(config.prefix, ref), Some(mappings), None)
+            migration.toCatsIO(client.createIndex(index(config.prefix, ref), Some(mappings), None))
           }.void,
-        ref => client.deleteIndex(index(config.prefix, ref)).void
+        ref => migration.toCatsIO(client.deleteIndex(index(config.prefix, ref)).void)
       )

       for {
         script <- scriptContent
-        _      <- client.createScript(updateRelationshipsScriptId, script)
+        _      <- migration.toCatsIO(client.createScript(updateRelationshipsScriptId, script))
         c      <- coordinator
       } yield c
     } else {
       Noop.log.as(Noop)
     }
-
   private[analytics] def apply(
-      fetchProjects: Offset => ElemStream[ProjectDef],
+      fetchProjects: Offset => Stream[IO, Elem[ProjectDef]],
       analyticsStream: GraphAnalyticsStream,
       supervisor: Supervisor,
-      sink: ProjectRef => Sink,
+      sink: ProjectRef => SinkCatsEffect,
-      createIndex: ProjectRef => Task[Unit],
-      deleteIndex: ProjectRef => Task[Unit]
-  ): Task[GraphAnalyticsCoordinator] = {
+      createIndex: ProjectRef => IO[Unit],
+      deleteIndex: ProjectRef => IO[Unit]
+  ): IO[GraphAnalyticsCoordinator] = {
     val coordinator = new Active(fetchProjects, analyticsStream, supervisor, sink, createIndex, deleteIndex)

-    supervisor
-      .run(
-        CompiledProjection.fromStream(
-          metadata,
-          ExecutionStrategy.EveryNode,
-          coordinator.run
+    migration.toCatsIO(
+      supervisor
+        .run(
+          CompiledProjection.fromStream(
+            metadata,
+            ExecutionStrategy.EveryNode,
+            coordinator.run(_).translate(migration.ioToTaskK)
+          )
         )
-      )
-      .as(coordinator)
+        .as(coordinator)
+    )
   }
-
 }
diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsPlugin.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsPlugin.scala
deleted file mode 100644
index 58b6cf0c67..0000000000
--- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsPlugin.scala
+++ /dev/null
@@ -1,9 +0,0 @@
-package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics
-
-import ch.epfl.bluebrain.nexus.delta.sdk.plugin.Plugin
-import monix.bio.Task
-
-object GraphAnalyticsPlugin extends Plugin {
-
-  override def stop(): Task[Unit] = Task.unit
-}
diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsPluginDef.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsPluginDef.scala
deleted file mode 100644
index 189ae589b3..0000000000
--- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsPluginDef.scala
+++ /dev/null
@@ -1,18 +0,0 @@
-package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics
-
-import ch.epfl.bluebrain.nexus.delta.sdk.model.ComponentDescription.PluginDescription
-import ch.epfl.bluebrain.nexus.delta.sdk.model.Name
-import ch.epfl.bluebrain.nexus.delta.sdk.plugin.{Plugin, PluginDef}
-import izumi.distage.model.Locator
-import izumi.distage.model.definition.ModuleDef
-import monix.bio.Task
-
-class GraphAnalyticsPluginDef extends PluginDef {
-
-  override def module: ModuleDef = new GraphAnalyticsPluginModule(priority)
-
-  override val info: PluginDescription = PluginDescription(Name.unsafe("graph-analytics"), BuildInfo.version)
-
-  override def initialize(locator: Locator): Task[Plugin] = Task.pure(GraphAnalyticsPlugin)
-
-}
diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsPluginModule.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsPluginModule.scala
index a67ff9ae24..f22e612edd 100644
--- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsPluginModule.scala
+++ b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsPluginModule.scala
@@ -21,7 +21,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.projections.Projections
 import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
 import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Supervisor
 import izumi.distage.model.definition.{Id, ModuleDef}
-import monix.execution.Scheduler

 /**
  * Graph analytics plugin wiring.
@@ -66,7 +65,6 @@ class GraphAnalyticsPluginModule(priority: Int) extends ModuleDef {
         projections: Projections,
         schemeDirectives: DeltaSchemeDirectives,
         baseUri: BaseUri,
-        s: Scheduler,
         cr: RemoteContextResolution @Id("aggregate"),
         ordering: JsonKeyOrdering,
         viewsQuery: GraphAnalyticsViewsQuery
@@ -80,7 +78,6 @@ class GraphAnalyticsPluginModule(priority: Int) extends ModuleDef {
         viewsQuery
       )(
         baseUri,
-        s,
         cr,
         ordering
       )
diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsViewsQuery.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsViewsQuery.scala
index 5e45bb7c04..c9a97eb7e6 100644
--- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsViewsQuery.scala
+++ b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsViewsQuery.scala
@@ -1,13 +1,13 @@
 package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics

 import akka.http.scaladsl.model.Uri
+import cats.effect.IO
+import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIO
 import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
-import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewRejection
 import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewRejection.WrappedElasticSearchClientError
 import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SortList
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
 import io.circe.{Json, JsonObject}
-import monix.bio.IO

 /** Allows to perform elasticsearch queries on Graph Analytics views */
 trait GraphAnalyticsViewsQuery {
@@ -21,7 +21,7 @@ trait GraphAnalyticsViewsQuery {
    * @param qp
    *   the extra query parameters for the elasticsearch index
    */
-  def query(projectRef: ProjectRef, query: JsonObject, qp: Uri.Query): IO[ElasticSearchViewRejection, Json]
+  def query(projectRef: ProjectRef, query: JsonObject, qp: Uri.Query): IO[Json]
 }

 /**
@@ -32,7 +32,7 @@ trait GraphAnalyticsViewsQuery {
  *   elasticsearch client
  */
 class GraphAnalyticsViewsQueryImpl(prefix: String, client: ElasticSearchClient) extends GraphAnalyticsViewsQuery {
-  override def query(projectRef: ProjectRef, query: JsonObject, qp: Uri.Query): IO[ElasticSearchViewRejection, Json] = {
+  override def query(projectRef: ProjectRef, query: JsonObject, qp: Uri.Query): IO[Json] = {
     val index = GraphAnalytics.index(prefix, projectRef)
     client.search(query, Set(index.value), qp)(SortList.empty).mapError(WrappedElasticSearchClientError)
   }
diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSink.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSink.scala
index d3897da345..300d883612 100644
--- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSink.scala
+++ b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSink.scala
@@ -1,5 +1,7 @@
 package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.indexing

+import cats.effect.IO
+import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIO
 import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
 import ch.epfl.bluebrain.nexus.delta.kernel.syntax.kamonSyntax
 import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh
@@ -9,12 +11,11 @@ import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.indexing.GraphAnaly
 import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
 import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem
 import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
-import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
+import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.SinkCatsEffect
 import fs2.Chunk
 import io.circe.JsonObject
 import io.circe.literal._
 import io.circe.syntax.EncoderOps
-import monix.bio.Task
 import shapeless.Typeable

 import scala.concurrent.duration.FiniteDuration
@@ -35,7 +36,7 @@ final class GraphAnalyticsSink(
     override val chunkSize: Int,
     override val maxWindow: FiniteDuration,
     index: IndexLabel
-) extends Sink {
+) extends SinkCatsEffect {

   implicit private val kamonComponent: KamonMetricComponent = KamonMetricComponent("graph-analytics")

@@ -70,7 +71,8 @@ final class GraphAnalyticsSink(

   private def documentId[A](elem: Elem[A]) = elem.id.toString

-  override def apply(elements: Chunk[Elem[GraphAnalyticsResult]]): Task[Chunk[Elem[Unit]]] = {
+  // TODO: depends on Operation.Sink in the sourcing module being moved to CE
+  override def apply(elements: Chunk[Elem[GraphAnalyticsResult]]): IO[Chunk[Elem[Unit]]] = {
     val result = elements.foldLeft(GraphAnalyticsSink.empty) {
       case (acc, success: SuccessElem[GraphAnalyticsResult]) =>
         success.value match {
@@ -85,8 +87,10 @@ final class GraphAnalyticsSink(
       case (acc, _: FailedElem) => acc
     }

-    client.bulk(result.bulk, Refresh.True).map(ElasticSearchSink.markElems(_, elements, documentId)) <*
-      client.updateByQuery(relationshipsQuery(result.updates), Set(index.value))
+    toCatsIO(for {
+      elems <- client.bulk(result.bulk, Refresh.True).map(ElasticSearchSink.markElems(_, elements, documentId))
+      _     <- client.updateByQuery(relationshipsQuery(result.updates), Set(index.value))
+    } yield elems)
   }.span("graphAnalyticsSink")
diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsStream.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsStream.scala
index 04dcabee64..49476c0f9a 100644
--- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsStream.scala
+++ b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsStream.scala
@@ -12,13 +12,14 @@ import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceState
 import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
 import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
 import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
-import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, EntityType, ProjectRef, Tag}
+import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStreamCats, EntityType, ProjectRef, Tag}
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
 import ch.epfl.bluebrain.nexus.delta.sourcing.query.{SelectFilter, StreamingQuery}
+import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
 import doobie._
 import doobie.implicits._
 import io.circe.Json
-import monix.bio.{Task, UIO}
+import cats.effect.IO

 trait GraphAnalyticsStream {

@@ -29,7 +30,7 @@ trait GraphAnalyticsStream {
    * @param start
    *   the offset to start with
    */
-  def apply(project: ProjectRef, start: Offset): ElemStream[GraphAnalyticsResult]
+  def apply(project: ProjectRef, start: Offset): ElemStreamCats[GraphAnalyticsResult]
 }

@@ -40,7 +41,7 @@ object GraphAnalyticsStream {
   /**
    * We look for a resource with these ids in this project and return their types if it can be found
    */
-  private def query(project: ProjectRef, xas: Transactors)(nel: NonEmptyList[Iri]): UIO[Map[Iri, Set[Iri]]] = {
+  private def query(project: ProjectRef, xas: Transactors)(nel: NonEmptyList[Iri]): IO[Map[Iri, Set[Iri]]] = {
     val inIds = Fragments.in(fr"id", nel)
     sql"""
          | SELECT id, value->'types'
@@ -52,7 +53,7 @@ object GraphAnalyticsStream {
          |""".stripMargin
       .query[(Iri, Option[Json])]
       .to[List]
-      .transact(xas.streaming)
+      .transact(xas.streamingCE)
       .map { l =>
         l.foldLeft(emptyMapType) { case (acc, (id, json)) =>
           val types = json.flatMap(_.as[Set[Iri]].toOption).getOrElse(Set.empty)
@@ -66,9 +67,9 @@ object GraphAnalyticsStream {
    */
   private[indexing] def findRelationships(project: ProjectRef, xas: Transactors, batchSize: Int)(
       ids: Set[Iri]
-  ): UIO[Map[Iri, Set[Iri]]] = {
+  ): IO[Map[Iri, Set[Iri]]] = {
     val groupIds = NonEmptyList.fromList(ids.toList).map(_.grouped(batchSize).toList)
-    val noop = UIO.pure(emptyMapType)
+    val noop = IO.pure(emptyMapType)
     groupIds.fold(noop) { list =>
       list.foldLeftM(emptyMapType) { case (acc, l) =>
         query(project, xas)(l).map(_ ++ acc)
@@ -84,40 +85,40 @@ object GraphAnalyticsStream {
     val relationshipBatch = 500

     // Decode the json payloads to [[GraphAnalyticsResult]] We only care for resources and files
-    def decode(entityType: EntityType, json: Json): Task[GraphAnalyticsResult] =
+    def decode(entityType: EntityType, json: Json): IO[GraphAnalyticsResult] =
       entityType match {
         case Files.entityType =>
-          Task.fromEither(FileState.serializer.codec.decodeJson(json)).map { s =>
+          IO.fromEither(FileState.serializer.codec.decodeJson(json)).map { s =>
             UpdateByQuery(s.id, s.types)
           }
         case Resources.entityType =>
-          Task.fromEither(ResourceState.serializer.codec.decodeJson(json)).flatMap {
-            case state if state.deprecated => deprecatedIndex(state)
+          IO.fromEither(ResourceState.serializer.codec.decodeJson(json)).flatMap {
+            case state if state.deprecated => IO.pure(deprecatedIndex(state))
             case state =>
               JsonLdDocument.fromExpanded(state.expanded, findRelationships(project, xas, relationshipBatch)).map {
                 doc => activeIndex(state, doc)
               }
           }
-        case _ => Task.pure(Noop)
+        case _ => IO.pure(Noop)
       }

-    StreamingQuery.elems(project, start, SelectFilter.latest, qc, xas, decode)
+    StreamingQuery
+      .elems(project, start, SelectFilter.latest, qc, xas, (a, b) => ioToTaskK.apply(decode(a, b)))
+      .translate(taskToIoK)
   }
   // $COVERAGE-ON$

   private def deprecatedIndex(state: ResourceState) =
-    Task.pure(
-      Index.deprecated(
-        state.project,
-        state.id,
-        state.remoteContexts,
-        state.rev,
-        state.types,
-        state.createdAt,
-        state.createdBy,
-        state.updatedAt,
-        state.updatedBy
-      )
+    Index.deprecated(
+      state.project,
+      state.id,
+      state.remoteContexts,
+      state.rev,
+      state.types,
+      state.createdAt,
+      state.createdBy,
+      state.updatedAt,
+      state.updatedBy
     )

   private def activeIndex(state: ResourceState, doc: JsonLdDocument) =
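The `.translate(taskToIoK)` / `ioToTaskK.apply` shuffle above is the load-bearing trick of the incremental migration: fs2 streams can change effect type through a natural transformation, so Task-based and IO-based plumbing interoperate without rewriting the stream logic. A self-contained sketch with plain fs2; `BIO.catsEffect` is an assumed name for monix-bio's instance builder, as in the earlier note:

import cats.effect.{ConcurrentEffect, Effect, IO}
import cats.~>
import fs2.Stream
import monix.bio.{IO => BIO, Task}
import monix.execution.Scheduler

implicit val scheduler: Scheduler = Scheduler.global
implicit val taskEffect: ConcurrentEffect[Task] = BIO.catsEffect // assumed

// analogous to the project's `taskToIoK`
val taskToIo: Task ~> IO = new (Task ~> IO) {
  def apply[A](task: Task[A]): IO[A] = Effect[Task].toIO(task)
}

// a Task stream becomes an IO stream without touching its elements
val taskStream: Stream[Task, Int] = Stream.emits(List(1, 2, 3)).covary[Task]
val ioStream: Stream[IO, Int] = taskStream.translate(taskToIo)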
diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsView.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsView.scala
index 16939df0bf..735f64b5f0 100644
--- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsView.scala
+++ b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsView.scala
@@ -1,10 +1,10 @@
 package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.indexing

-import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceUtils.ioJsonObjectContentOf
+import cats.effect.IO
+import ch.epfl.bluebrain.nexus.delta.kernel.utils.CatsEffectsClasspathResourceUtils.ioJsonObjectContentOf
 import ch.epfl.bluebrain.nexus.delta.sdk.syntax._
 import com.typesafe.scalalogging.Logger
 import io.circe.JsonObject
-import monix.bio.UIO

 /**
  * A graph analytics view information
@@ -22,8 +22,8 @@ object GraphAnalyticsView {

   private val mappings = ioJsonObjectContentOf("elasticsearch/mappings.json")

-  val default: UIO[GraphAnalyticsView] = mappings
+  // TODO: sort out memoization
+  val default: IO[GraphAnalyticsView] = mappings
     .map(GraphAnalyticsView(_))
     .logAndDiscardErrors("loading graph analytics mapping")
-    .memoize
 }
diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/package.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/package.scala
index 1c609d4ff5..ea6224c028 100644
--- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/package.scala
+++ b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/package.scala
@@ -1,9 +1,11 @@
 package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics

-import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceUtils.{ioContentOf, ioJsonObjectContentOf}
+import cats.effect.IO
+import ch.epfl.bluebrain.nexus.delta.kernel.utils.CatsEffectsClasspathResourceUtils.{ioContentOf, ioJsonObjectContentOf}
 import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.config.GraphAnalyticsConfig.TermAggregationsConfig
 import ch.epfl.bluebrain.nexus.delta.sdk.syntax._
 import com.typesafe.scalalogging.Logger
+import io.circe.JsonObject

 package object indexing {

@@ -12,21 +14,21 @@ package object indexing {

   val updateRelationshipsScriptId = "updateRelationships"

-  val scriptContent =
+  val scriptContent: IO[String] =
     ioContentOf("elasticsearch/update_relationships_script.painless")
       .logAndDiscardErrors("ElasticSearch script 'update_relationships_script.painless' template not found")

-  val graphAnalyticsMappings = ioJsonObjectContentOf("elasticsearch/mappings.json")
+  val graphAnalyticsMappings: IO[JsonObject] = ioJsonObjectContentOf("elasticsearch/mappings.json")
     .logAndDiscardErrors("ElasticSearch mapping 'mappings.json' template not found")

-  def propertiesAggQuery(config: TermAggregationsConfig) = ioJsonObjectContentOf(
+  def propertiesAggQuery(config: TermAggregationsConfig): IO[JsonObject] = ioJsonObjectContentOf(
     "elasticsearch/paths-properties-aggregations.json",
     "shard_size" -> config.shardSize,
     "size" -> config.size,
     "type" -> "{{type}}"
   ).logAndDiscardErrors("ElasticSearch 'paths-properties-aggregations.json' template not found")

-  def relationshipsAggQuery(config: TermAggregationsConfig) =
+  def relationshipsAggQuery(config: TermAggregationsConfig): IO[JsonObject] =
     ioJsonObjectContentOf(
       "elasticsearch/paths-relationships-aggregations.json",
       "shard_size" -> config.shardSize,
diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/model/GraphAnalyticsRejection.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/model/GraphAnalyticsRejection.scala
index dd98cba4ee..979bf092c0 100644
--- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/model/GraphAnalyticsRejection.scala
+++ b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/model/GraphAnalyticsRejection.scala
@@ -12,6 +12,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext.ContextRejection
 import ch.epfl.bluebrain.nexus.delta.sdk.syntax._
 import io.circe.syntax._
 import io.circe.{Encoder, JsonObject}
+import ch.epfl.bluebrain.nexus.delta.sourcing.rejection.Rejection

 /**
  * Enumeration of graph analytics rejection types.
@@ -19,7 +20,7 @@ import io.circe.{Encoder, JsonObject}
  * @param reason
  *   a descriptive message as to why the rejection occurred
  */
-sealed abstract class GraphAnalyticsRejection(val reason: String) extends Product with Serializable
+sealed abstract class GraphAnalyticsRejection(val reason: String) extends Rejection

 object GraphAnalyticsRejection {
diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/model/JsonLdDocument.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/model/JsonLdDocument.scala
index 5789433c60..49ad4d862c 100644
--- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/model/JsonLdDocument.scala
+++ b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/model/JsonLdDocument.scala
@@ -1,6 +1,7 @@
 package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.model

 import cats.Monoid
+import cats.effect.IO
 import cats.implicits._
 import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.model.JsonLdDocument.Reference
 import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.model.JsonLdEntry.ObjectEntry
@@ -11,7 +12,6 @@ import io.circe.generic.extras.Configuration
 import io.circe.generic.extras.semiauto.deriveConfiguredEncoder
 import io.circe.syntax.EncoderOps
 import io.circe.{Encoder, Json, JsonObject}
-import monix.bio.UIO

 import scala.annotation.nowarn

@@ -55,8 +55,8 @@ object JsonLdDocument {
    */
   def fromExpanded(
       expanded: ExpandedJsonLd,
-      findRelationships: Set[Iri] => UIO[Map[Iri, Set[Iri]]]
-  ): UIO[JsonLdDocument] = {
+      findRelationships: Set[Iri] => IO[Map[Iri, Set[Iri]]]
+  ): IO[JsonLdDocument] = {

     def innerEntry(json: Json, path: Vector[Iri], isInArray: Boolean): JsonLdDocument = {
diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/routes/GraphAnalyticsRoutes.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/routes/GraphAnalyticsRoutes.scala
index 283dd0b5ef..7afc651798 100644
--- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/routes/GraphAnalyticsRoutes.scala
+++ b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/routes/GraphAnalyticsRoutes.scala
@@ -2,14 +2,16 @@ package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.routes

 import akka.http.scaladsl.server.Directives.{as, concat, entity, get, pathEndOrSingleSlash, pathPrefix, post}
 import akka.http.scaladsl.server.Route
+import cats.effect.IO
 import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.routes.ElasticSearchViewsDirectives.extractQueryParams
 import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.permissions.query
 import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.{GraphAnalytics, GraphAnalyticsViewsQuery}
 import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
 import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
 import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
+import ch.epfl.bluebrain.nexus.delta.sdk.ce.DeltaDirectives.emit
 import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling
-import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives.{baseUriPrefix, emit, idSegment}
+import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives.{baseUriPrefix, idSegment}
 import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaSchemeDirectives}
 import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
 import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling
@@ -18,8 +20,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.resources.{read
 import ch.epfl.bluebrain.nexus.delta.sourcing.ProgressStatistics
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
 import io.circe.JsonObject
-import monix.bio.UIO
-import monix.execution.Scheduler

 /**
  * The graph analytics routes.
@@ -39,10 +39,10 @@ class GraphAnalyticsRoutes(
     identities: Identities,
     aclCheck: AclCheck,
     graphAnalytics: GraphAnalytics,
-    fetchStatistics: ProjectRef => UIO[ProgressStatistics],
+    fetchStatistics: ProjectRef => IO[ProgressStatistics],
     schemeDirectives: DeltaSchemeDirectives,
     viewsQuery: GraphAnalyticsViewsQuery
-)(implicit baseUri: BaseUri, s: Scheduler, cr: RemoteContextResolution, ordering: JsonKeyOrdering)
+)(implicit baseUri: BaseUri, cr: RemoteContextResolution, ordering: JsonKeyOrdering)
     extends AuthDirectives(identities, aclCheck)
     with CirceUnmarshalling
     with RdfMarshalling {
diff --git a/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsCoordinatorSuite.scala b/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsCoordinatorSuite.scala
index 2b290e2a07..06228246ed 100644
--- a/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsCoordinatorSuite.scala
+++ b/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsCoordinatorSuite.scala
@@ -1,28 +1,33 @@
 package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics

+import cats.effect.IO
+import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
 import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.GraphAnalyticsCoordinator.ProjectDef
 import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.indexing.GraphAnalyticsResult.Noop
 import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.indexing.{GraphAnalyticsResult, GraphAnalyticsStream}
 import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
 import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
 import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects
-import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef}
+import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
 import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
 import ch.epfl.bluebrain.nexus.delta.sourcing.stream.SupervisorSetup.unapply
-import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CacheSink, Elem, ExecutionStatus, ProjectionProgress, SupervisorSetup}
+import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
 import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, PatienceConfig}
-import munit.AnyFixture
+import ch.epfl.bluebrain.nexus.testkit.ce.CatsEffectSuite
 import fs2.Stream
 import fs2.concurrent.SignallingRef
-import monix.bio.Task
+import munit.AnyFixture

 import java.time.Instant
-import collection.mutable.{Set => MutableSet}
-import concurrent.duration._
+import scala.collection.mutable.{Set => MutableSet}
+import scala.concurrent.duration._

 class GraphAnalyticsCoordinatorSuite extends BioSuite with SupervisorSetup.Fixture {

+  private lazy val catsEffectSuite = new CatsEffectSuite {}
+  import catsEffectSuite._
+
   override def munitFixtures: Seq[AnyFixture[_]] = List(supervisor)

   implicit private val patienceConfig: PatienceConfig = PatienceConfig(5.seconds, 10.millis)
@@ -43,14 +48,14 @@ class GraphAnalyticsCoordinatorSuite extends BioSuite with SupervisorSetup.Fixtu
   private def failed[A](ref: ProjectRef, id: Iri, error: Throwable, offset: Long): Elem[A] =
     FailedElem(tpe = Projects.entityType, id, Some(ref), Instant.EPOCH, Offset.at(offset), error, 1)

-  private val resumeSignal = SignallingRef[Task, Boolean](false).runSyncUnsafe()
+  private val resumeSignal = SignallingRef[IO, Boolean](false).unsafeRunSync()

   // Stream 2 elements until signal is set to true and then 2 more
-  private def projectStream: ElemStream[ProjectDef] =
+  private def projectStream: Stream[IO, Elem[ProjectDef]] =
     Stream(
       success(project1, project1Id, ProjectDef(project1, markedForDeletion = false), 1L),
       success(project2, project2Id, ProjectDef(project2, markedForDeletion = false), 2L)
-    ) ++ Stream.never[Task].interruptWhen(resumeSignal) ++
+    ) ++ Stream.never[IO].interruptWhen(resumeSignal) ++
       Stream(
         success(project1, project1Id, ProjectDef(project1, markedForDeletion = false), 3L),
         success(project2, project2Id, ProjectDef(project2, markedForDeletion = true), 4L)
@@ -86,8 +91,8 @@ class GraphAnalyticsCoordinatorSuite extends BioSuite with SupervisorSetup.Fixtu
           graphAnalysisStream,
           sv,
           _ => sink,
-          (ref: ProjectRef) => Task.pure(createdIndices.add(ref)).void,
-          (ref: ProjectRef) => Task.pure(deletedIndices.add(ref)).void
+          (ref: ProjectRef) => IO.pure(createdIndices.add(ref)).void,
+          (ref: ProjectRef) => IO.pure(deletedIndices.add(ref)).void
         )
       _ <- sv.describe(GraphAnalyticsCoordinator.metadata.name)
             .map(_.map(_.progress))
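The signal-driven stream in the suite above is a pattern worth isolating: CE2's `unsafeRunSync()` replaces monix's `runSyncUnsafe()` for the eager allocation, and `interruptWhen` parks the stream until the test flips the signal. A reduced sketch with plain fs2 and cats-effect 2 (values are illustrative):

import cats.effect.{ContextShift, IO}
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

// eager allocation, as in the suite: run the IO to obtain the signal itself
val resumeSignal: SignallingRef[IO, Boolean] =
  SignallingRef[IO, Boolean](false).unsafeRunSync()

// emit 1 and 2, park until the signal flips to true, then emit 3 and 4
val stream: Stream[IO, Int] =
  Stream(1, 2) ++ Stream.never[IO].interruptWhen(resumeSignal) ++ Stream(3, 4)

// later, from the test body:
val resume: IO[Unit] = resumeSignal.set(true)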
HttpClientWorthRetry} import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContextDummy import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ +import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues import ch.epfl.bluebrain.nexus.testkit.elasticsearch.ElasticSearchContainer._ import ch.epfl.bluebrain.nexus.testkit.elasticsearch.ElasticSearchDocker import ch.epfl.bluebrain.nexus.testkit.{IOFixedClock, IOValues, TestHelpers} @@ -34,6 +35,7 @@ class GraphAnalyticsSpec(docker: ElasticSearchDocker) with AnyWordSpecLike with Matchers with TestHelpers + with CatsIOValues with IOValues with IOFixedClock with ConfigFixtures diff --git a/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/JsonLdDocumentSpec.scala b/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/JsonLdDocumentSpec.scala index 400fec5d4f..27b224e348 100644 --- a/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/JsonLdDocumentSpec.scala +++ b/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/JsonLdDocumentSpec.scala @@ -1,12 +1,14 @@ package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIO import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.model.JsonLdDocument import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.nxvFile import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.ExpandedJsonLd import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi} import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ -import ch.epfl.bluebrain.nexus.testkit.{CirceEq, IOValues, TestHelpers} +import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues +import ch.epfl.bluebrain.nexus.testkit.{CirceEq, TestHelpers} import io.circe.syntax.EncoderOps import monix.bio.UIO import org.scalatest.OptionValues @@ -17,14 +19,14 @@ class JsonLdDocumentSpec extends AnyWordSpecLike with Matchers with TestHelpers - with IOValues + with CatsIOValues with OptionValues with ContextFixtures with CirceEq { "A JsonLdDocument" should { implicit val jsonLdApi: JsonLdApi = JsonLdJavaApi.lenient val input = jsonContentOf("reconstructed-cell.json") - val expanded = ExpandedJsonLd(input).accepted + val expanded = toCatsIO(ExpandedJsonLd(input)).accepted "be generated from expanded Json resource" in { val nodeRef1 = iri"http://api.brain-map.org/api/v2/data/Structure/733" diff --git a/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSinkSuite.scala b/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSinkSuite.scala index f85565f2d9..ab0fa173e9 100644 --- a/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSinkSuite.scala +++ b/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSinkSuite.scala @@ -1,5 +1,8 @@ package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.indexing +import cats.effect.IO +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIO +import ch.epfl.bluebrain.nexus.delta.kernel.utils.CatsEffectsClasspathResourceUtils.ioJsonContentOf import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchClientSetup import 
ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.IndexLabel import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.indexing.GraphAnalyticsResult.Index @@ -14,11 +17,11 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{FailedElem, SuccessElem} +import ch.epfl.bluebrain.nexus.testkit.CirceLiteral import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, PatienceConfig} -import ch.epfl.bluebrain.nexus.testkit.{CirceLiteral, TestHelpers} +import ch.epfl.bluebrain.nexus.testkit.ce.CatsEffectSuite import fs2.Chunk import io.circe.Json -import monix.bio.{Task, UIO} import munit.AnyFixture import java.time.Instant @@ -26,9 +29,10 @@ import scala.concurrent.duration._ class GraphAnalyticsSinkSuite extends BioSuite + with CatsEffectSuite with ElasticSearchClientSetup.Fixture - with CirceLiteral - with TestHelpers { + with CirceLiteral { + override val ioTimeout: FiniteDuration = 45.seconds implicit private val patienceConfig: PatienceConfig = PatienceConfig(5.seconds, 50.millis) @@ -65,21 +69,19 @@ class GraphAnalyticsSinkSuite // File linked by 'resource1', resolved after an update by query private val file1 = iri"http://localhost/file1" - private def loadExpanded(path: String): UIO[ExpandedJsonLd] = + private def loadExpanded(path: String): IO[ExpandedJsonLd] = ioJsonContentOf(path) .flatMap { json => - Task.fromEither(ExpandedJsonLd.expanded(json)) + IO.fromEither(ExpandedJsonLd.expanded(json)) } - .memoizeOnSuccess - .hideErrors - private def getTypes(expandedJsonLd: ExpandedJsonLd): UIO[Set[Iri]] = - UIO.pure(expandedJsonLd.cursor.getTypes.getOrElse(Set.empty)) + private def getTypes(expandedJsonLd: ExpandedJsonLd): Set[Iri] = + expandedJsonLd.cursor.getTypes.getOrElse(Set.empty) - private val findRelationships: UIO[Map[Iri, Set[Iri]]] = { + private val findRelationships: IO[Map[Iri, Set[Iri]]] = { for { - resource1Types <- expanded1.flatMap(getTypes) - resource2Types <- expanded2.flatMap(getTypes) + resource1Types <- expanded1.map(getTypes) + resource2Types <- expanded2.map(getTypes) } yield Map( resource1 -> resource1Types, resource2 -> resource2Types, @@ -90,9 +92,9 @@ class GraphAnalyticsSinkSuite test("Create the update script and the index") { for { script <- scriptContent - _ <- client.createScript(updateRelationshipsScriptId, script) + _ <- toCatsIO(client.createScript(updateRelationshipsScriptId, script)) mapping <- graphAnalyticsMappings - _ <- client.createIndex(index, Some(mapping), None).assert(true) + _ <- toCatsIO(client.createIndex(index, Some(mapping), None)).assert } yield () } @@ -100,10 +102,10 @@ class GraphAnalyticsSinkSuite SuccessElem(Resources.entityType, id, Some(project), Instant.EPOCH, Offset.start, result, 1) test("Push index results") { - def indexActive(id: Iri, io: UIO[ExpandedJsonLd]) = { + def indexActive(id: Iri, io: IO[ExpandedJsonLd]) = { for { expanded <- io - types <- getTypes(expanded) + types = getTypes(expanded) doc <- JsonLdDocument.fromExpanded(expanded, _ => findRelationships) } yield { val result = @@ -125,12 +127,12 @@ class GraphAnalyticsSinkSuite deprecated = indexDeprecated(deprecatedResource, deprecatedResourceTypes) chunk = Chunk.seq(List(active1, active2, discarded, deprecated)) // We expect no error - _ <- sink(chunk).assert(chunk.map(_.void)) + _ <- sink(chunk).assertEquals(chunk.map(_.void)) // 3 documents should have been 
indexed correctly: // - `resource1` with the relationship to `resource3` resolved // - `resource2` with no reference resolved // - `deprecatedResource` with only metadata, resolution is skipped - _ <- client.count(index.value).eventually(3L) + _ <- toCatsIO(client.count(index.value).eventually(3L)) expected1 <- ioJsonContentOf("result/resource1.json") expected2 <- ioJsonContentOf("result/resource2.json") expectedDeprecated <- ioJsonContentOf("result/resource_deprecated.json") @@ -159,15 +161,15 @@ class GraphAnalyticsSinkSuite ) for { - _ <- sink(chunk).assert(chunk.map(_.void)) + _ <- sink(chunk).assertEquals(chunk.map(_.void)) // The reference to file1 should have been resolved and introduced as a relationship // The update query should not have an effect on the other resource - _ <- client.refresh(index) + _ <- toCatsIO(client.refresh(index)) expected1 <- ioJsonContentOf("result/resource1_updated.json") expected2 <- ioJsonContentOf("result/resource2.json") - _ <- client.count(index.value).eventually(3L) - _ <- client.getSource[Json](index, resource1.toString).eventually(expected1) - _ <- client.getSource[Json](index, resource2.toString).eventually(expected2) + _ <- toCatsIO(client.count(index.value).eventually(3L)) + _ <- toCatsIO(client.getSource[Json](index, resource1.toString).eventually(expected1)) + _ <- toCatsIO(client.getSource[Json](index, resource2.toString).eventually(expected2)) } yield () } diff --git a/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsStreamSuite.scala b/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsStreamSuite.scala index 1120d4c7ac..ba87433c06 100644 --- a/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsStreamSuite.scala +++ b/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsStreamSuite.scala @@ -1,5 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.indexing +import cats.effect.IO import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.indexing.GraphAnalyticsStreamSuite.Sample import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri @@ -9,10 +10,10 @@ import ch.epfl.bluebrain.nexus.delta.sdk.ConfigFixtures import ch.epfl.bluebrain.nexus.delta.sourcing.Serializer import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Identity, ProjectRef, ResourceRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.delta.sourcing.state.ScopedStateStore import ch.epfl.bluebrain.nexus.delta.sourcing.state.State.ScopedState -import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite -import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie +import ch.epfl.bluebrain.nexus.testkit.ce.CatsEffectSuite import doobie.implicits._ import io.circe.Codec import io.circe.generic.extras.Configuration @@ -22,7 +23,7 @@ import munit.AnyFixture import java.time.Instant import scala.annotation.nowarn -class GraphAnalyticsStreamSuite extends BioSuite with Doobie.Fixture with ConfigFixtures { +class GraphAnalyticsStreamSuite extends CatsEffectSuite with Doobie.Fixture with ConfigFixtures { override def munitFixtures: Seq[AnyFixture[_]] = List(doobie) @@ -50,16 +51,16 @@ class GraphAnalyticsStreamSuite extends BioSuite with 
Doobie.Fixture with Config val project2Samples = List(sample5) val expectedProject2 = project2Samples.map { s => s.id -> s.types }.toMap - def findRelationships(project: ProjectRef, ids: Set[Iri]) = + def findRelationships(project: ProjectRef, ids: Set[Iri]): IO[Map[Iri, Set[Iri]]] = GraphAnalyticsStream.findRelationships(project, xas, 2)(ids) for { // Saving samples - _ <- (project1Samples ++ project2Samples).traverse(sampleStore.unsafeSave).transact(xas.write) + _ <- (project1Samples ++ project2Samples).traverse(sampleStore.unsafeSave).transact(xas.writeCE) // Asserting relationships - _ <- findRelationships(project1, expectedProject1.keySet).assert(expectedProject1) - _ <- findRelationships(project2, expectedProject2.keySet).assert(expectedProject2) - _ <- findRelationships(project2, Set.empty).assert(Map.empty) + _ <- findRelationships(project1, expectedProject1.keySet).assertEquals(expectedProject1) + _ <- findRelationships(project2, expectedProject2.keySet).assertEquals(expectedProject2) + _ <- findRelationships(project2, Set.empty).assertEquals(Map.empty[Iri, Set[Iri]]) } yield () } diff --git a/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/routes/GraphAnalyticsRoutesSpec.scala b/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/routes/GraphAnalyticsRoutesSpec.scala index 5de6d74f72..ee0bba9c77 100644 --- a/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/routes/GraphAnalyticsRoutesSpec.scala +++ b/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/routes/GraphAnalyticsRoutesSpec.scala @@ -3,10 +3,11 @@ package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.routes import akka.http.scaladsl.model.StatusCodes import akka.http.scaladsl.model.headers.OAuth2BearerToken import akka.http.scaladsl.server.Route +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.model.AnalyticsGraph.{Edge, EdgePath, Node} import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.model.GraphAnalyticsRejection.ProjectContextRejection import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.model.PropertiesStatistics.Metadata -import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.model.{AnalyticsGraph, GraphAnalyticsRejection, PropertiesStatistics} +import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.model.{AnalyticsGraph, PropertiesStatistics} import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.{contexts, permissions, GraphAnalytics} import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.schema @@ -27,7 +28,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.utils.BaseRouteSpec import ch.epfl.bluebrain.nexus.delta.sourcing.ProgressStatistics import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Authenticated, Group} import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef -import monix.bio.{IO, UIO} import org.scalatest.CancelAfterFailure import java.time.Instant @@ -59,7 +59,7 @@ class GraphAnalyticsRoutesSpec extends BaseRouteSpec with CancelAfterFailure { private val graphAnalytics = new GraphAnalytics { - override def relationships(projectRef: ProjectRef): IO[GraphAnalyticsRejection, AnalyticsGraph] = + override def relationships(projectRef: ProjectRef): IO[AnalyticsGraph] = IO.raiseWhen(projectRef != project.ref)(projectNotFound(projectRef)) .as( AnalyticsGraph( @@ -68,7 
+68,7 @@ class GraphAnalyticsRoutesSpec extends BaseRouteSpec with CancelAfterFailure { ) ) - override def properties(projectRef: ProjectRef, tpe: IdSegment): IO[GraphAnalyticsRejection, PropertiesStatistics] = + override def properties(projectRef: ProjectRef, tpe: IdSegment): IO[PropertiesStatistics] = IO.raiseWhen(projectRef != project.ref)(projectNotFound(projectRef)) .as( PropertiesStatistics( @@ -86,7 +86,7 @@ class GraphAnalyticsRoutesSpec extends BaseRouteSpec with CancelAfterFailure { identities, aclCheck, graphAnalytics, - _ => UIO.pure(ProgressStatistics(0L, 0L, 0L, 10L, Some(Instant.EPOCH), None)), + _ => IO.pure(ProgressStatistics(0L, 0L, 0L, 10L, Some(Instant.EPOCH), None)), DeltaSchemeDirectives.empty, (_, _, _) => IO.pure(viewQueryResponse) ).routes diff --git a/delta/plugins/search/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/search/SearchScopeInitialization.scala b/delta/plugins/search/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/search/SearchScopeInitialization.scala index 598b8abfd8..eb4c1f5a57 100644 --- a/delta/plugins/search/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/search/SearchScopeInitialization.scala +++ b/delta/plugins/search/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/search/SearchScopeInitialization.scala @@ -1,7 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.search import cats.effect.IO -import cats.implicits._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.kernel.syntax._ diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/StorageScopeInitialization.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/StorageScopeInitialization.scala index aa79fe28a3..342bca1056 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/StorageScopeInitialization.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/StorageScopeInitialization.scala @@ -13,7 +13,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.Organization import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.Project import ch.epfl.bluebrain.nexus.delta.sdk.{Defaults, ScopeInitialization} import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity -import cats.implicits._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/OwnerPermissionsScopeInitialization.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/OwnerPermissionsScopeInitialization.scala index f06136d799..e5a81decc8 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/OwnerPermissionsScopeInitialization.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/OwnerPermissionsScopeInitialization.scala @@ -1,7 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.sdk.projects import cats.effect.IO -import cats.implicits._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resolvers/Resolvers.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resolvers/Resolvers.scala index 6b08e8a59a..8622c71e95 100644 
--- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resolvers/Resolvers.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resolvers/Resolvers.scala @@ -1,7 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.sdk.resolvers import cats.effect.{Clock, IO} -import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Transactors.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Transactors.scala index 0bda9251a3..332d858d37 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Transactors.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Transactors.scala @@ -4,6 +4,7 @@ import cats.effect.{Blocker, IO, Resource} import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.Secret import ch.epfl.bluebrain.nexus.delta.kernel.cache.{CacheConfig, KeyValueStore} +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceUtils import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors.PartitionsCache import ch.epfl.bluebrain.nexus.delta.sourcing.config.DatabaseConfig @@ -31,11 +32,12 @@ final case class Transactors( cache: PartitionsCache )(implicit s: Scheduler) { - def readCE: Transactor[IO] = read.mapK(BIO.liftTo) - def writeCE: Transactor[IO] = write.mapK(BIO.liftTo) + def readCE: Transactor[IO] = read.mapK(BIO.liftTo) + def writeCE: Transactor[IO] = write.mapK(BIO.liftTo) + def streamingCE: Transactor[IO] = streaming.mapK(BIO.liftTo) def execDDL(ddl: String)(implicit cl: ClassLoader): Task[Unit] = - ClasspathResourceUtils.ioContentOf(ddl).flatMap(Fragment.const0(_).update.run.transact(write)).void + ClasspathResourceUtils.ioContentOf(ddl).flatMap(Fragment.const0(_).update.run.transact(writeCE)).void def execDDLs(ddls: List[String])(implicit cl: ClassLoader): Task[Unit] = ddls.traverse(execDDL).void diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/package.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/package.scala index d92e6c5fb8..d350860029 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/package.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/package.scala @@ -4,6 +4,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem import fs2.Stream import fs2.Pipe import monix.bio.Task +import cats.effect.IO package object model { @@ -11,6 +12,10 @@ package object model { type ElemStream[Value] = Stream[Task, Elem[Value]] - type ElemPipe[In, Out] = Pipe[Task, Elem[In], Elem[Out]] + type ElemPipe[In, Out] = ElemPipeF[Task, In, Out] + + type ElemPipeF[F[_], In, Out] = Pipe[F, Elem[In], Elem[Out]] + + type ElemStreamCats[Value] = Stream[IO, Elem[Value]] } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/CompiledProjection.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/CompiledProjection.scala index 505e820f30..57d2162c59 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/CompiledProjection.scala +++ 
b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/CompiledProjection.scala @@ -4,6 +4,7 @@ import cats.data.NonEmptyChain import cats.effect.concurrent.Ref import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.OperationF.SinkF import fs2.Stream import fs2.concurrent.SignallingRef import monix.bio.Task @@ -51,7 +52,7 @@ object CompiledProjection { metadata: ProjectionMetadata, executionStrategy: ExecutionStrategy, source: Source, - sink: Sink + sink: SinkF[Task] ): Either[ProjectionErr, CompiledProjection] = source.through(sink).map { p => CompiledProjection(metadata, executionStrategy, offset => _ => _ => p.apply(offset).map(_.void)) @@ -68,7 +69,7 @@ object CompiledProjection { sink: Sink ): Either[ProjectionErr, CompiledProjection] = for { - operations <- Operation.merge(chain ++ NonEmptyChain.one(sink)) + operations <- OperationF.merge(chain ++ NonEmptyChain.one(sink)) result <- source.through(operations) } yield CompiledProjection(metadata, executionStrategy, offset => _ => _ => result.apply(offset).map(_.void)) diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Operation.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Operation.scala index be69673cad..9a1177baf7 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Operation.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Operation.scala @@ -1,24 +1,63 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.stream import cats.data.NonEmptyChain +import cats.effect.{Concurrent, ContextShift, ExitCase, Fiber, IO, Timer} import cats.syntax.all._ -import ch.epfl.bluebrain.nexus.delta.sourcing.model.ElemPipe +import cats.~> +import ch.epfl.bluebrain.nexus.delta.sourcing.model.ElemPipeF import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem} +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.OperationF.{PipeF, SinkF} import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.{LeapingNotAllowedErr, OperationInOutMatchErr} import com.typesafe.scalalogging.Logger import fs2.{Chunk, Pipe, Pull, Stream} import monix.bio.Task import shapeless.Typeable +import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration -/** - * Operations represent individual steps in a [[Projection]] where [[Elem]] values are processed. - * - * They are ultimately chained and attached to sources to complete the underlying projection [[Stream]] that is run. - */ -sealed trait Operation { self => +sealed trait OperationF[F[_]] { self => + + implicit def timer: Timer[F] + implicit def concurrent: Concurrent[F] + +// def mapK[G[_]](f: F ~> G, g: G ~> F): OperationF[G] = new OperationF[G] { +// override implicit def timer: Timer[G] = self.timer.mapK(f) +// +// override implicit def concurrent: Concurrent[G] = new Concurrent[G] { +// override def start[A](fa: G[A]): G[Fiber[G, A]] = ??? +// +// override def racePair[A, B](fa: G[A], fb: G[B]): G[Either[(A, Fiber[G, B]), (Fiber[G, A], B)]] = ??? +// +// override def async[A](k: (Either[Throwable, A] => Unit) => Unit): G[A] = ??? +// +// override def asyncF[A](k: (Either[Throwable, A] => Unit) => G[Unit]): G[A] = ??? 
+// +// override def suspend[A](thunk: => G[A]): G[A] = ??? +// +// override def bracketCase[A, B](acquire: G[A])(use: A => G[B])(release: (A, ExitCase[Throwable]) => G[Unit]): G[B] = ??? +// +// override def raiseError[A](e: Throwable): G[A] = ??? +// +// override def handleErrorWith[A](fa: G[A])(f: Throwable => G[A]): G[A] = ??? +// +// override def flatMap[A, B](fa: G[A])(f: A => G[B]): G[B] = ??? +// +// override def tailRecM[A, B](a: A)(f: A => G[Either[A, B]]): G[B] = ??? +// +// override def pure[A](x: A): G[A] = ??? +// } +// +// override type In = self.In +// override type Out = self.Out +// +// override def inType: Typeable[In] = self.inType +// override def outType: Typeable[Out] = self.outType +// +// override protected[stream] def asFs2: Pipe[G, Elem[In], Elem[Out]] = +// inputStream => self.asFs2(inputStream.translate(g)).translate(f) +// } /** * The underlying element type accepted by the Operation. @@ -48,18 +87,21 @@ sealed trait Operation { self => */ def outType: Typeable[Out] - protected[stream] def asFs2: fs2.Pipe[Task, Elem[In], Elem[Out]] + protected[stream] def asFs2: fs2.Pipe[F, Elem[In], Elem[Out]] - private[stream] def andThen(that: Operation): Either[OperationInOutMatchErr, Operation] = + private[stream] def andThen(that: OperationF[F]): Either[OperationInOutMatchErr[F], OperationF[F]] = Either.cond( self.outType.describe == that.inType.describe, - new Operation { + new OperationF[F] { + implicit val timer: Timer[F] = self.timer + implicit val concurrent: Concurrent[F] = self.concurrent + override type In = self.In override type Out = that.Out override def inType: Typeable[self.In] = self.inType override def outType: Typeable[that.Out] = that.outType - override protected[stream] def asFs2: Pipe[Task, Elem[Operation.this.In], Elem[that.Out]] = { stream => + override protected[stream] def asFs2: Pipe[F, Elem[OperationF.this.In], Elem[that.Out]] = { stream => stream .through(self.asFs2) .map { @@ -74,7 +116,10 @@ sealed trait Operation { self => /** * Send the values through the operation while returning original values */ - def tap: Operation = new Operation { + def tap: OperationF[F] = new OperationF[F] { + implicit val timer: Timer[F] = self.timer + implicit val concurrent: Concurrent[F] = self.concurrent + override type In = self.In override type Out = self.In @@ -82,7 +127,7 @@ sealed trait Operation { self => override def outType: Typeable[Out] = self.inType - override protected[stream] def asFs2: Pipe[Task, Elem[Operation.this.In], Elem[this.Out]] = + override protected[stream] def asFs2: Pipe[F, Elem[OperationF.this.In], Elem[this.Out]] = _.chunks .evalTap { chunk => Stream.chunk(chunk).through(self.asFs2).compile.drain @@ -98,7 +143,10 @@ sealed trait Operation { self => def debug( formatter: Elem[Out] => String = (elem: Elem[Out]) => elem.toString, logger: String => Unit = println(_) - ): Operation = new Operation { + ): OperationF[F] = new OperationF[F] { + implicit val timer: Timer[F] = self.timer + implicit val concurrent: Concurrent[F] = self.concurrent + override type In = self.In override type Out = self.Out @@ -106,7 +154,7 @@ sealed trait Operation { self => override def outType: Typeable[Out] = self.outType - override protected[stream] def asFs2: Pipe[Task, Elem[Operation.this.In], Elem[this.Out]] = + override protected[stream] def asFs2: Pipe[F, Elem[OperationF.this.In], Elem[this.Out]] = _.through(self.asFs2).debug(formatter, logger) } @@ -118,8 +166,11 @@ sealed trait Operation { self => * @param mapSkip * the function to apply when skipping 
an element */ - def leap[A](offset: Offset, mapSkip: self.In => A)(implicit ta: Typeable[A]): Either[ProjectionErr, Operation] = { - val pipe = new Operation { + def leap[A](offset: Offset, mapSkip: self.In => A)(implicit ta: Typeable[A]): Either[ProjectionErr, OperationF[F]] = { + val pipe = new OperationF[F] { + implicit val timer: Timer[F] = self.timer + implicit val concurrent: Concurrent[F] = self.concurrent + override type In = self.In override type Out = self.Out @@ -129,8 +180,8 @@ sealed trait Operation { self => override def outType: Typeable[Out] = self.outType - override protected[stream] def asFs2: Pipe[Task, Elem[Operation.this.In], Elem[this.Out]] = { - def go(s: fs2.Stream[Task, Elem[In]]): Pull[Task, Elem[this.Out], Unit] = { + override protected[stream] def asFs2: Pipe[F, Elem[OperationF.this.In], Elem[this.Out]] = { + def go(s: fs2.Stream[F, Elem[In]]): Pull[F, Elem[this.Out], Unit] = { s.pull.peek.flatMap { case Some((chunk, stream)) => val (before, after) = chunk.partitionEither { e => @@ -154,7 +205,7 @@ sealed trait Operation { self => Either.cond( ta.describe == self.outType.describe, pipe, - LeapingNotAllowedErr(self, ta) + LeapingNotAllowedErr[F, A](self, ta) ) } @@ -163,30 +214,40 @@ sealed trait Operation { self => * @param offset * the offset to reach before applying the operation */ - def identityLeap(offset: Offset): Either[ProjectionErr, Operation] = leap(offset, identity[self.In])(inType) + def identityLeap(offset: Offset): Either[ProjectionErr, OperationF[F]] = leap(offset, identity[self.In])(inType) } -object Operation { +/** + * Operations represent individual steps in a [[Projection]] where [[Elem]] values are processed. + * + * They are ultimately chained and attached to sources to complete the underlying projection [[Stream]] that is run. + */ +object OperationF { /** * Creates an operation from an fs2 Pipe * @param elemPipe * fs2 pipe */ - def fromFs2Pipe[I: Typeable](elemPipe: ElemPipe[I, Unit]): Operation = new Operation { + def fromFs2Pipe[F[_], I: Typeable]( + elemPipe: ElemPipeF[F, I, Unit] + )(implicit t: Timer[F], c: Concurrent[F]): OperationF[F] = new OperationF[F] { + implicit val timer: Timer[F] = t + implicit val concurrent: Concurrent[F] = c + override type In = I override type Out = Unit override def inType: Typeable[In] = Typeable[In] override def outType: Typeable[Out] = Typeable[Out] - override protected[stream] def asFs2: fs2.Pipe[Task, Elem[In], Elem[Out]] = elemPipe + override protected[stream] def asFs2: fs2.Pipe[F, Elem[In], Elem[Out]] = elemPipe } - def merge(first: Operation, others: Operation*): Either[ProjectionErr, Operation] = + def merge[F[_]](first: OperationF[F], others: OperationF[F]*): Either[ProjectionErr, OperationF[F]] = merge(NonEmptyChain(first, others: _*)) - def merge(operations: NonEmptyChain[Operation]): Either[ProjectionErr, Operation] = - operations.tail.foldLeftM[Either[ProjectionErr, *], Operation](operations.head) { case (acc, e) => + def merge[F[_]](operations: NonEmptyChain[OperationF[F]]): Either[ProjectionErr, OperationF[F]] = + operations.tail.foldLeftM[Either[ProjectionErr, *], OperationF[F]](operations.head) { case (acc, e) => acc.andThen(e) } @@ -200,7 +261,7 @@ object Operation { * * They are ultimately chained and attached to sources to complete the underlying projection [[Stream]] that is run. 
*/ - trait Pipe extends Operation { + trait PipeF[F[_]] extends OperationF[F] { /** * @return @@ -218,7 +279,7 @@ * @return * a new element (possibly failed, dropped) of type Out */ - def apply(element: SuccessElem[In]): Task[Elem[Out]] + def apply(element: SuccessElem[In]): F[Elem[Out]] /** * Checks if the provided envelope has a successful element value of type `I`. If true, it will return it in Right. @@ -233,8 +294,8 @@ case _: FailedElem | _: DroppedElem => Left(element.asInstanceOf[Elem[O]]) } - protected[stream] def asFs2: fs2.Pipe[Task, Elem[In], Elem[Out]] = { - def go(s: fs2.Stream[Task, Elem[In]]): Pull[Task, Elem[Out], Unit] = { + protected[stream] def asFs2: fs2.Pipe[F, Elem[In], Elem[Out]] = { + def go(s: fs2.Stream[F, Elem[In]]): Pull[F, Elem[Out], Unit] = { s.pull.uncons1.flatMap { case Some((head, tail)) => partitionSuccess(head) match { @@ -242,8 +303,8 @@ Pull .eval( apply(value) - .onErrorHandleWith { err => - Task + .handleErrorWith { err => + Concurrent[F] .delay( logger.error(s"Error while applying pipe $name on element ${value.id}", err) ) @@ -261,12 +322,15 @@ } } - object Pipe { + object PipeF { /** * Creates an identity pipe that just passes the elem along */ - def identity[A: Typeable]: Pipe = new Pipe { + def identity[F[_], A: Typeable](implicit t: Timer[F], c: Concurrent[F]): PipeF[F] = new PipeF[F] { + implicit val timer: Timer[F] = t + implicit val concurrent: Concurrent[F] = c + override def ref: PipeRef = PipeRef.unsafe("identity") override type In = A @@ -275,23 +339,69 @@ override def inType: Typeable[In] = Typeable[In] override def outType: Typeable[Out] = Typeable[Out] - override def apply(element: SuccessElem[In]): Task[Elem[Out]] = Task.pure(element) + override def apply(element: SuccessElem[In]): F[Elem[Out]] = c.pure(element) } } - trait Sink extends Operation { + trait SinkF[F[_]] extends OperationF[F] { self => type Out = Unit + + // A lawful Concurrent[G] cannot be synthesized from `F ~> G` alone, so the + // target effect's instances are required here rather than stubbed with `???` + // (a translation sketch follows at the end of this diff). + def mapK[G[_]](f: F ~> G)(implicit gTimer: Timer[G], gConcurrent: Concurrent[G]): SinkF[G] = new SinkF[G] { + override def chunkSize: Int = self.chunkSize + + override def maxWindow: FiniteDuration = self.maxWindow + + override def apply(elements: Chunk[Elem[In]]): G[Chunk[Elem[Unit]]] = + f(self.apply(elements)) + + implicit override def timer: Timer[G] = gTimer + + implicit override def concurrent: Concurrent[G] = gConcurrent + + override type In = self.In + + /** + * @return + * the Typeable instance for the accepted element type + */ + override def inType: Typeable[In] = self.inType + } + def outType: Typeable[Out] = Typeable[Unit] def chunkSize: Int def maxWindow: FiniteDuration - def apply(elements: Chunk[Elem[In]]): Task[Chunk[Elem[Unit]]] + def apply(elements: Chunk[Elem[In]]): F[Chunk[Elem[Unit]]] - protected[stream] def asFs2: fs2.Pipe[Task, Elem[In], Elem[Unit]] = + protected[stream] def asFs2: fs2.Pipe[F, Elem[In], Elem[Unit]] = _.groupWithin(chunkSize, maxWindow) .evalMap { chunk => apply(chunk) @@ -299,8 +409,38 @@ .flatMap(Stream.chunk) } - type Aux[I, O] = Operation { + type Aux[F[_], I, O] = OperationF[F] { type In = I type Out = O } } + +object Operation { + + trait SinkTask extends SinkF[Task] { + implicit val timer: Timer[Task] = monix.bio.IO.timer + implicit val concurrent: Concurrent[Task] = monix.bio.IO.catsAsync + } + + type Sink = SinkTask + + trait PipeTask extends PipeF[Task] { + implicit val timer: Timer[Task] = monix.bio.IO.timer + implicit val concurrent: Concurrent[Task] = monix.bio.IO.catsAsync + } + + type Pipe = PipeTask + + trait SinkCatsEffect extends SinkF[IO] { + implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global) + implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) + implicit val concurrent: Concurrent[IO] = IO.ioConcurrentEffect + } + + trait PipeCatsEffect extends PipeF[IO] { + implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global) + implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) + implicit val concurrent: Concurrent[IO] = IO.ioConcurrentEffect + } + +} diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PipeChain.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PipeChain.scala index 0dfb7fde51..572b1f852f 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PipeChain.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PipeChain.scala @@ -36,7 +36,7 @@ object PipeChain { configured <- pipeChain.pipes.traverse { case (ref, cfg) => registry.lookup(ref).flatMap(_.withJsonLdConfig(cfg)) } - chained <- Operation.merge(configured) + chained <- OperationF.merge(configured) } yield chained def validate(pipeChain: PipeChain, registry: ReferenceRegistry): Either[ProjectionErr, Unit] = diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PipeDefCats.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PipeDefCats.scala new file mode 100644 index 0000000000..88a642669a --- /dev/null +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PipeDefCats.scala @@ -0,0 +1,65 @@ +package ch.epfl.bluebrain.nexus.delta.sourcing.stream + +import cats.implicits._ +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.ExpandedJsonLd +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.JsonLdDecoder +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.PipeCatsEffect +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.CouldNotDecodePipeConfigErr +import shapeless.Typeable + +/** + * Contract for the definition of pipes. PipeDefs are used to construct [[PipeCatsEffect]] instances to be used when + * materializing runnable projections. They are indexed by their label in a registry such that they can be looked up given a + * [[PipeRef]]. Provided with a configuration, pipe definitions can produce [[PipeCatsEffect]] instances. + */ +trait PipeDefCats { + + /** + * The type of the [[PipeCatsEffect]] that this definition produces. + */ + type PipeType <: PipeCatsEffect + + /** + * The required configuration type for producing a [[PipeCatsEffect]] of this type. + */ + type Config + + /** + * @return + * the Typeable instance for the required [[PipeCatsEffect]] configuration + */ + def configType: Typeable[Config] + + /** + * @return + * a json-ld decoder for the [[PipeCatsEffect]] configuration + */ + def configDecoder: JsonLdDecoder[Config] + + /** + * @return + * the unique reference for a pipe of this type + */ + def ref: PipeRef + + /** + * Produces a [[PipeCatsEffect]] instance given an expected configuration. + * + * @param config + * the configuration for the [[PipeDefCats]] + */ + def withConfig(config: Config): PipeType + + /** + * Attempts to construct a corresponding [[PipeCatsEffect]] instance by decoding the required configuration from a + * json-ld configuration. + * + * @param jsonLd + * the source configuration in the json-ld format + */ + def withJsonLdConfig(jsonLd: ExpandedJsonLd): Either[CouldNotDecodePipeConfigErr, PipeType] = + configDecoder(jsonLd) + .map(c => withConfig(c)) + .leftMap(e => CouldNotDecodePipeConfigErr(jsonLd, configType.describe, ref, e.reason)) + +} diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionErr.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionErr.scala index 53021e3f6b..1e70b521bc 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionErr.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionErr.scala @@ -28,7 +28,7 @@ object ProjectionErr { * @param that * the source being merged with self */ - final case class SourceOutMatchErr(self: Source, that: Source) extends ProjectionErr { + final case class SourceOutMatchErr[F[_]](self: SourceF[F], that: SourceF[F]) extends ProjectionErr { override def reason: String = s"Unable to match Out type '${self.outType.describe}' of source '${self.name}' to the Out type '${that.outType.describe}' of source '${that.name}'" } @@ -41,7 +41,7 @@ * @param that * the operation to attach to the source */ - final case class SourceOutPipeInMatchErr(self: Source, that: Operation) extends ProjectionErr { + final case class SourceOutPipeInMatchErr[F[_]](self: SourceF[F], that: OperationF[F]) extends ProjectionErr { override def reason: String = s"Unable to match Out type '${self.outType.describe}' of source '${self.name}' to the In type '${that.inType.describe}' of pipe '${that.name}'" } @@ -76,7 +76,7 @@ * @param that * the operation being merged with self */ - final case class OperationInOutMatchErr(self: Operation, that: Operation) extends ProjectionErr { + final case class OperationInOutMatchErr[F[_]](self: OperationF[F], that: OperationF[F]) extends ProjectionErr { override def reason: String = s"Unable to match Out type '${self.outType.describe}' of operation '${self.name}' to the In type '${that.inType.describe}' of operation '${that.name}'" } @@ -84,7 +84,7 @@ /** * Leaping is only possible for an operation when we provide a skip function that aligns to the out type */ - final case
class LeapingNotAllowedErr[A](self: Operation, skip: Typeable[A]) extends ProjectionErr { + final case class LeapingNotAllowedErr[F[_], A](self: OperationF[F], skip: Typeable[A]) extends ProjectionErr { override def reason: String = s"Unable to leap on operation '${self.name}' as skip type '${skip.describe}' does not match Out type '${self.outType.describe}'." } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Source.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Source.scala index a65b289121..2a17835b77 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Source.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Source.scala @@ -1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.stream import cats.data.NonEmptyChain +import cats.effect.Concurrent import cats.implicits._ import ch.epfl.bluebrain.nexus.delta.sourcing.model.ElemStream import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset @@ -18,7 +19,9 @@ import shapeless.Typeable * A [[Projection]] may make use of multiple [[Source]] s (at least one) that will be further chained with * [[Operation]] s and ultimately merged together. */ -trait Source { self => +trait SourceF[F[_]] { self => + + implicit def concurrent: Concurrent[F] /** * The underlying element type emitted by the Source. @@ -46,16 +49,17 @@ trait Source { self => * @return * an [[fs2.Stream]] of elements of this Source's Out type */ - def apply(offset: Offset): Stream[Task, Elem[Out]] + def apply(offset: Offset): Stream[F, Elem[Out]] - def through(operation: Operation): Either[SourceOutPipeInMatchErr, Source] = + def through(operation: OperationF[F]): Either[SourceOutPipeInMatchErr[F], SourceF[F]] = Either.cond( outType.describe == operation.inType.describe, - new Source { + new SourceF[F] { + implicit def concurrent: Concurrent[F] = self.concurrent override type Out = operation.Out override def outType: Typeable[operation.Out] = operation.outType - override def apply(offset: Offset): Stream[Task, Elem[operation.Out]] = + override def apply(offset: Offset): Stream[F, Elem[operation.Out]] = self .apply(offset) .map { @@ -71,14 +75,15 @@ trait Source { self => SourceOutPipeInMatchErr(self, operation) ) - private[stream] def merge(that: Source): Either[SourceOutMatchErr, Source] = + private[stream] def merge(that: SourceF[F]): Either[SourceOutMatchErr[F], SourceF[F]] = Either.cond( self.outType.describe == that.outType.describe, - new Source { + new SourceF[F] { + implicit def concurrent: Concurrent[F] = self.concurrent override type Out = self.Out override def outType: Typeable[self.Out] = self.outType - override def apply(offset: Offset): Stream[Task, Elem[Out]] = + override def apply(offset: Offset): Stream[F, Elem[Out]] = self .apply(offset) .merge(that.apply(offset).map { @@ -94,22 +99,23 @@ trait Source { self => ) def broadcastThrough( - operations: NonEmptyChain[Operation] - ): Either[SourceOutPipeInMatchErr, Source.Aux[Unit]] = + operations: NonEmptyChain[OperationF[F]] + ): Either[SourceOutPipeInMatchErr[F], SourceF.Aux[F, Unit]] = operations .traverse { operation => Either.cond( self.outType.describe == operation.inType.describe, - operation.asInstanceOf[Operation.Aux[self.Out, Unit]], + operation.asInstanceOf[OperationF.Aux[F, self.Out, Unit]], SourceOutPipeInMatchErr(self, operation) ) } .map { verified => - new Source { + new SourceF[F] { + implicit def concurrent: Concurrent[F] = 
self.concurrent override type Out = Unit override def outType: Typeable[Unit] = Typeable[Unit] - override def apply(offset: Offset): Stream[Task, Elem[Unit]] = + override def apply(offset: Offset): Stream[F, Elem[Unit]] = self .apply(offset) .broadcastThrough(verified.toList.map { _.asFs2 }: _*) @@ -118,6 +124,24 @@ trait Source { self => } +object SourceF { + + type Aux[F[_], O] = SourceF[F] { + type Out = O + } + + def apply[F[_], A: Typeable](stream: Offset => Stream[F, Elem[A]])(implicit c: Concurrent[F]): SourceF[F] = + new SourceF[F] { + override type Out = A + + override def outType: Typeable[A] = Typeable[A] + + override def apply(offset: Offset): Stream[F, Elem[A]] = stream(offset) + + implicit override def concurrent: Concurrent[F] = c + } +} + object Source { type Aux[O] = Source { @@ -131,5 +155,7 @@ object Source { override def outType: Typeable[A] = Typeable[A] override def apply(offset: Offset): Stream[Task, Elem[A]] = stream(offset) + + implicit override def concurrent: Concurrent[Task] = monix.bio.IO.catsAsync } } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/package.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/package.scala new file mode 100644 index 0000000000..2133e6895a --- /dev/null +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/package.scala @@ -0,0 +1,12 @@ +package ch.epfl.bluebrain.nexus.delta.sourcing + +import cats.effect.IO +import monix.bio.Task + +package object stream { + type Operation = OperationF[Task] + + type Source = SourceF[Task] + + type SourceCatsEffect = SourceF[IO] +} diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/GenericPipeCats.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/GenericPipeCats.scala new file mode 100644 index 0000000000..7bbcb073ef --- /dev/null +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/GenericPipeCats.scala @@ -0,0 +1,64 @@ +package ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes + +import cats.effect.IO +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.JsonLdDecoder +import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.PipeCatsEffect +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, PipeDefCats, PipeRef} +import shapeless.Typeable + +/** + * A generic pipe instance backed by a fn. + * @param label + * the pipe label + * @param fn + * the fn to apply to each element + * @tparam I + * the input element type + * @tparam O + * the output element type + */ +class GenericPipeCats[I: Typeable, O: Typeable] private[stream] ( + label: Label, + fn: SuccessElem[I] => IO[Elem[O]] +) extends PipeCatsEffect { + override val ref: PipeRef = PipeRef(label) + override type In = I + override type Out = O + override def inType: Typeable[In] = Typeable[In] + override def outType: Typeable[Out] = Typeable[Out] + + override def apply(element: SuccessElem[In]): IO[Elem[Out]] = fn(element) +} + +object GenericPipeCats { + + /** + * Lifts the argument fn to a pipe and associated definition. 
+ * @param label + * the pipe/pipedef label + * @param fn + * the fn to apply to elements + * @tparam I + * the input element type + * @tparam O + * the output element type + */ + def apply[I: Typeable, O: Typeable](label: Label, fn: SuccessElem[I] => IO[Elem[O]]): PipeDefCats = + new GenericPipeDef(label, fn) + + private class GenericPipeDef[I: Typeable, O: Typeable] private[stream] ( + label: Label, + fn: SuccessElem[I] => IO[Elem[O]] + ) extends PipeDefCats { + override val ref: PipeRef = PipeRef(label) + override type PipeType = GenericPipeCats[I, O] + override type Config = Unit + override def configType: Typeable[Unit] = Typeable[Unit] + override def configDecoder: JsonLdDecoder[Unit] = JsonLdDecoder[Unit] + + override def withConfig(config: Unit): GenericPipeCats[I, O] = new GenericPipeCats[I, O](label, fn) + } + +} diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/CacheSink.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/CacheSink.scala index 3d96e75d85..7abcbe5e95 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/CacheSink.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/CacheSink.scala @@ -1,19 +1,49 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.stream +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem} -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.{Sink, SinkCatsEffect} import fs2.Chunk import monix.bio.Task import shapeless.Typeable -import collection.mutable.{Set => MutableSet} import scala.collection.concurrent.TrieMap import scala.collection.mutable +import scala.collection.mutable.{Set => MutableSet} import scala.concurrent.duration._ -final class CacheSink[A: Typeable] private (documentId: Elem[A] => Iri) extends Sink { +final class CacheSink[A: Typeable] private (documentId: Elem[A] => Iri) extends SinkCatsEffect { + + val successes: mutable.Map[Iri, A] = TrieMap.empty[Iri, A] + val dropped: MutableSet[Iri] = MutableSet.empty[Iri] + val failed: MutableSet[Iri] = MutableSet.empty[Iri] + + override type In = A + + override def inType: Typeable[A] = Typeable[A] + + override def apply(elements: Chunk[Elem[A]]): IO[Chunk[Elem[Unit]]] = IO.delay { + elements.map { + case s: SuccessElem[A] => + successes.put(documentId(s), s.value) + s.void + case d: DroppedElem => + dropped.add(documentId(d)) + d + case f: FailedElem => + failed.add(documentId(f)) + f + } + } + + override def chunkSize: Int = 1 + + override def maxWindow: FiniteDuration = 10.millis +} + +final class CacheSink2[A: Typeable] private (documentId: Elem[A] => Iri) extends Sink { val successes: mutable.Map[Iri, A] = TrieMap.empty[Iri, A] val dropped: MutableSet[Iri] = MutableSet.empty[Iri] @@ -55,3 +85,17 @@ object CacheSink { /** CacheSink for states */ def states[A: Typeable]: CacheSink[A] = new CacheSink(_.id) } + +object CacheSink2 { + + private val eventDocumentId: Elem[_] => Iri = elem => + elem.project match { + case Some(project) => iri"$project/${elem.id}:${elem.rev}" + case None => iri"${elem.id}:${elem.rev}" + } + + /** CacheSink for events */ + def events[A: Typeable]: CacheSink2[A] = new CacheSink2(eventDocumentId) + + def states[A: Typeable]: 
CacheSink2[A] = new CacheSink2(_.id) +} diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/OperationSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/OperationSuite.scala index f3935e7d6a..c19e576eae 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/OperationSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/OperationSuite.scala @@ -1,25 +1,26 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.stream +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Pipe +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.PipeCatsEffect import ch.epfl.bluebrain.nexus.delta.sourcing.stream.OperationSuite.{double, half, until} import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.{LeapingNotAllowedErr, OperationInOutMatchErr} -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.GenericPipe -import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, StreamAssertions} +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.GenericPipeCats +import ch.epfl.bluebrain.nexus.testkit.bio.StreamAssertions +import ch.epfl.bluebrain.nexus.testkit.ce.{CatsEffectSuite, CatsIOValues} import fs2.Stream -import monix.bio.Task import shapeless.Typeable import java.time.Instant -class OperationSuite extends BioSuite with StreamAssertions { +class OperationSuite extends CatsEffectSuite with StreamAssertions { test("Run the double stream") { val sink = CacheSink.states[Int] - val operations = Operation.merge(double, sink).rightValue + val operations = OperationF.merge[IO](double, sink).rightValue for { _ <- until(5).through(operations).rightValue.apply(Offset.Start).assertSize(5) @@ -31,7 +32,7 @@ test("Run the half stream") { val sink = CacheSink.states[Double] - val operations = Operation.merge(half, sink).rightValue + val operations = OperationF.merge[IO](half, sink).rightValue for { _ <- until(5).through(operations).rightValue.apply(Offset.at(1L)).assertSize(4) @@ -43,7 +44,7 @@ test("Fail as the input and output types don't match") { val sink = CacheSink.states[Int] - Operation.merge(half, sink).assertLeft(OperationInOutMatchErr(half, sink)) + OperationF.merge(half, sink).assertLeft(OperationInOutMatchErr(half, sink)) } test("Run the double stream as a tap operation") { @@ -52,8 +53,8 @@ // We should have the originals here val sink2 = CacheSink.states[Int] - val tap = Operation.merge(double, sink1).rightValue.tap - val all = Operation.merge(tap, sink2).rightValue + val tap = OperationF.merge(double, sink1).rightValue.tap + val all = OperationF.merge(tap, sink2).rightValue for { _ <- until(5).through(all).rightValue.apply(Offset.Start).assertSize(5) @@ -70,8 +71,8 @@ val sink = CacheSink.states[Int] val first = double.identityLeap(Offset.at(2L)).rightValue - val second = Operation.merge(double, sink).rightValue - val all = Operation.merge(first, second).rightValue + val second =
OperationF.merge(double, sink).rightValue + val all = OperationF.merge(first, second).rightValue for { _ <- until(5).through(all).rightValue.apply(Offset.Start).assertSize(5) @@ -87,17 +88,18 @@ class OperationSuite extends BioSuite with StreamAssertions { } -object OperationSuite { +object OperationSuite extends CatsIOValues { private val numberType = EntityType("number") - private def until(n: Int): Source = Source(offset => - Stream.range(offset.value.toInt, n).covary[Task].map { i => + private def until(n: Int): SourceCatsEffect = SourceF(offset => + Stream.range(offset.value.toInt, n).covary[IO].map { i => SuccessElem(numberType, nxv + i.toString, None, Instant.EPOCH, Offset.at(i.toLong), i, 1) } ) - val double: Pipe = new GenericPipe[Int, Int](Label.unsafe("double"), _.evalMap { v => Task.pure(v * 2) }) - val half: Pipe = new GenericPipe[Int, Double](Label.unsafe("half"), _.evalMap { v => Task.pure((v.toDouble / 2)) }) + val double: PipeCatsEffect = new GenericPipeCats[Int, Int](Label.unsafe("double"), x => IO.pure(x.map { v => v * 2 })) + val half: PipeCatsEffect = + new GenericPipeCats[Int, Double](Label.unsafe("half"), x => IO.pure(x.map { v => v.toDouble / 2 })) } diff --git a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala index eba9949a19..6fcf000495 100644 --- a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala +++ b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala @@ -1,47 +1,31 @@ package ch.epfl.bluebrain.nexus.testkit.bio -import cats.effect.{ContextShift, IO, Timer} +import cats.effect.{Concurrent, Timer} +import cats.syntax.all._ import fs2.Stream -import monix.bio.Task import munit.{Assertions, Location} import scala.concurrent.duration.DurationInt trait StreamAssertions { self: Assertions => - implicit class StreamAssertionsOps[A](stream: Stream[Task, A])(implicit loc: Location) { - def assert(expected: List[A]): Task[Unit] = - stream.take(expected.size.toLong).timeout(3.seconds).mask.compile.toList.map { obtained => - assertEquals(obtained, expected, s"Got ${obtained.size} elements, ${expected.size} elements were expected.") - } - - def assertSize(expected: Int): Task[Unit] = - stream.take(expected.toLong).timeout(3.seconds).mask.compile.toList.map { obtained => - assertEquals(obtained.size, expected, s"Got ${obtained.size} elements, $expected elements were expected.") - } - - def assert(expected: A*): Task[Unit] = assert(expected.toList) - def assertEmpty: Task[Unit] = assert(List.empty) - } - - implicit class CEStreamAssertionsOps[A](stream: Stream[IO, A])(implicit + implicit class StreamAssertionsOps[F[_], A](stream: Stream[F, A])(implicit loc: Location, - contextShift: ContextShift[IO], - timer: Timer[IO] + t: Timer[F], + c: Concurrent[F] ) { - def assert(expected: List[A]): IO[Unit] = + def assert(expected: List[A]): F[Unit] = stream.take(expected.size.toLong).timeout(3.seconds).mask.compile.toList.map { obtained => assertEquals(obtained, expected, s"Got ${obtained.size} elements, ${expected.size} elements were expected.") } - def assertSize(expected: Int): IO[Unit] = + def assertSize(expected: Int): F[Unit] = stream.take(expected.toLong).timeout(3.seconds).mask.compile.toList.map { obtained => assertEquals(obtained.size, expected, s"Got ${obtained.size} elements, $expected elements were expected.") } - def assert(expected: A*): IO[Unit] = 
diff --git a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala
index eba9949a19..6fcf000495 100644
--- a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala
+++ b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala
@@ -1,47 +1,31 @@
 package ch.epfl.bluebrain.nexus.testkit.bio
 
-import cats.effect.{ContextShift, IO, Timer}
+import cats.effect.{Concurrent, Timer}
+import cats.syntax.all._
 import fs2.Stream
-import monix.bio.Task
 import munit.{Assertions, Location}
 
 import scala.concurrent.duration.DurationInt
 
 trait StreamAssertions { self: Assertions =>
 
-  implicit class StreamAssertionsOps[A](stream: Stream[Task, A])(implicit loc: Location) {
-    def assert(expected: List[A]): Task[Unit] =
-      stream.take(expected.size.toLong).timeout(3.seconds).mask.compile.toList.map { obtained =>
-        assertEquals(obtained, expected, s"Got ${obtained.size} elements, ${expected.size} elements were expected.")
-      }
-
-    def assertSize(expected: Int): Task[Unit] =
-      stream.take(expected.toLong).timeout(3.seconds).mask.compile.toList.map { obtained =>
-        assertEquals(obtained.size, expected, s"Got ${obtained.size} elements, $expected elements were expected.")
-      }
-
-    def assert(expected: A*): Task[Unit] = assert(expected.toList)
-    def assertEmpty: Task[Unit] = assert(List.empty)
-  }
-
-  implicit class CEStreamAssertionsOps[A](stream: Stream[IO, A])(implicit
+  implicit class StreamAssertionsOps[F[_], A](stream: Stream[F, A])(implicit
       loc: Location,
-      contextShift: ContextShift[IO],
-      timer: Timer[IO]
+      t: Timer[F],
+      c: Concurrent[F]
   ) {
-    def assert(expected: List[A]): IO[Unit] =
+    def assert(expected: List[A]): F[Unit] =
       stream.take(expected.size.toLong).timeout(3.seconds).mask.compile.toList.map { obtained =>
         assertEquals(obtained, expected, s"Got ${obtained.size} elements, ${expected.size} elements were expected.")
       }
 
-    def assertSize(expected: Int): IO[Unit] =
+    def assertSize(expected: Int): F[Unit] =
       stream.take(expected.toLong).timeout(3.seconds).mask.compile.toList.map { obtained =>
         assertEquals(obtained.size, expected, s"Got ${obtained.size} elements, $expected elements were expected.")
       }
 
-    def assert(expected: A*): IO[Unit] = assert(expected.toList)
-
-    def assertEmpty: IO[Unit] = assert(List.empty)
+    def assert(expected: A*): F[Unit] = assert(expected.toList)
+    def assertEmpty: F[Unit] = assert(List.empty)
   }
 }
diff --git a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsEffectSuite.scala b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsEffectSuite.scala
index 39a87deca8..7b2b6dcb84 100644
--- a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsEffectSuite.scala
+++ b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsEffectSuite.scala
@@ -1,29 +1,25 @@
 package ch.epfl.bluebrain.nexus.testkit.ce
 
-import cats.effect.{ContextShift, IO, Timer}
+import cats.effect.IO
 import ch.epfl.bluebrain.nexus.testkit.NexusSuite
 import ch.epfl.bluebrain.nexus.testkit.bio.{CollectionAssertions, EitherAssertions, StreamAssertions}
 import monix.bio.{IO => BIO}
 import monix.execution.Scheduler
 
-import scala.concurrent.ExecutionContext
 import scala.concurrent.duration.{DurationInt, FiniteDuration}
 
 /**
   * Adapted from:
   * https://github.com/typelevel/munit-cats-effect/blob/main/core/src/main/scala/munit/CatsEffectSuite.scala
   */
-abstract class CatsEffectSuite
+trait CatsEffectSuite
     extends NexusSuite
     with CatsEffectAssertions
     with StreamAssertions
     with CollectionAssertions
-    with EitherAssertions {
-  protected val ioTimeout: FiniteDuration = 45.seconds
-
-  implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
-  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
-
+    with EitherAssertions
+    with CatsIOValues {
+  protected val ioTimeout: FiniteDuration = 45.seconds
 
   override def munitValueTransforms: List[ValueTransform] =
     super.munitValueTransforms ++ List(munitIOTransform, munitBIOTransform)
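The two testkit diffs above carry the main idea of the migration: the duplicated `Task`/`IO` assertion classes collapse into one implicit class polymorphic in `F[_]`, constrained only by what `Stream#timeout` and `compile` require (`Timer[F]` and `Concurrent[F]`), while `CatsEffectSuite` becomes a trait whose `munitValueTransforms` unwraps both returned `IO` and `BIO` test values. A self-contained sketch of the same type-class pattern — names here are illustrative, not the testkit's API:

```scala
import cats.effect.{Concurrent, ContextShift, IO, Timer}
import cats.syntax.all._
import fs2.Stream

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object PolymorphicAssertionSketch {

  // Written once against type classes: any F with Concurrent + Timer
  // instances (cats-effect IO, monix Task/BIO given a Scheduler) can use it.
  def sizeWithin[F[_]: Concurrent: Timer, A](stream: Stream[F, A], expected: Int): F[Boolean] =
    stream.take(expected.toLong).timeout(3.seconds).mask.compile.toList.map(_.size == expected)

  // Instantiated at IO; in cats-effect 2 the Concurrent[IO] instance itself
  // is derived from a ContextShift, which the suite previously declared by hand.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO]     = IO.timer(ExecutionContext.global)

  val check: IO[Boolean] = sizeWithin(Stream.range(0, 5).covary[IO], 5)
}
```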
diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/CompositeViewsLifeCycleSpec.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/CompositeViewsLifeCycleSpec.scala
index ce15eef591..187bd3ba5a 100644
--- a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/CompositeViewsLifeCycleSpec.scala
+++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/CompositeViewsLifeCycleSpec.scala
@@ -2,6 +2,7 @@ package ch.epfl.bluebrain.nexus.tests.kg
 
 import cats.data.NonEmptyMap
 import cats.effect.IO
+import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
 import ch.epfl.bluebrain.nexus.tests.BaseSpec
 import ch.epfl.bluebrain.nexus.tests.Identity.compositeviews.Jerry
 import ch.epfl.bluebrain.nexus.tests.iam.types.Permission.{Events, Organizations, Views}
@@ -9,10 +10,13 @@ import ch.epfl.bluebrain.nexus.tests.kg.CompositeViewsLifeCycleSpec.Spaces.Proje
 import ch.epfl.bluebrain.nexus.tests.kg.CompositeViewsLifeCycleSpec.{query, Spaces}
 import io.circe.generic.semiauto.deriveDecoder
 import io.circe.{Decoder, Json}
+import monix.bio.Task
 import org.scalactic.source.Position
 
 final class CompositeViewsLifeCycleSpec extends BaseSpec {
 
+  implicit private val classLoader: ClassLoader = getClass.getClassLoader
+
   private val orgId = genId()
   private val projId = genId()
   private val proj2Id = genId()
@@ -31,7 +35,7 @@ final class CompositeViewsLifeCycleSpec {
 
   private val viewEndpoint = s"/views/$orgId/$projId/composite"
 
-  private def createView(query: String, includeCrossProject: Boolean, includeSparqlProjection: Boolean) = {
+  private def createView(query: String, includeCrossProject: Boolean, includeSparqlProjection: Boolean): IO[Json] = {
     val includeCrossProjectOpt = Option.when(includeCrossProject)("cross_project" -> includeCrossProject.toString)
     val includeSparqlProjectionOpt = Option.when(includeSparqlProjection)("sparql" -> includeSparqlProjection.toString)
     val values = List(
@@ -39,18 +43,17 @@
       "proj" -> proj2Id,
       "query" -> query
     ) ++ includeCrossProjectOpt ++ includeSparqlProjectionOpt
-    IO(
-      jsonContentOf(
-        "/kg/views/composite/composite-view-lifecycle.json",
-        replacements(
-          Jerry,
-          values: _*
-        ): _*
-      )
+    ioJsonContentOf(
+      "/kg/views/composite/composite-view-lifecycle.json",
+      replacements(
+        Jerry,
+        values: _*
+      ): _*
     )
+  }
 
-  private def fetchSpaces = deltaClient.getJson[Spaces](s"$viewEndpoint/description", Jerry)
+  private def fetchSpaces: Task[Spaces] = deltaClient.getJson[Spaces](s"$viewEndpoint/description", Jerry)
 
   private def includeAllSpaces(spaces: Spaces)(implicit pos: Position) = {
     eventually { blazegraphDsl.includes(spaces.commonSpace) }
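In `CompositeViewsLifeCycleSpec` above, the payload is now built with `ioJsonContentOf` (hence the new implicit `classLoader`) instead of wrapping `jsonContentOf` in `IO(...)`, so a missing template or unparseable JSON fails the returned `IO` rather than throwing while the payload is constructed. A rough sketch of what such a helper can look like — an illustration under assumptions (`{key}`-style placeholders, circe for parsing), not the project's actual implementation:

```scala
import cats.effect.IO
import cats.syntax.all._
import io.circe.Json
import io.circe.parser.parse

import scala.io.Source

object IoJsonContentOfSketch {

  // Hypothetical stand-in for the testkit helper: load a classpath resource,
  // substitute `{key}` placeholders, parse as JSON - every failure surfaces
  // as a failed IO instead of a thrown exception.
  def ioJsonContentOf(path: String, attributes: (String, String)*)(implicit
      classLoader: ClassLoader
  ): IO[Json] =
    for {
      // ClassLoader resource paths take no leading '/'
      is   <- IO(classLoader.getResourceAsStream(path.stripPrefix("/")))
                .ensure(new IllegalArgumentException(s"Resource '$path' not found"))(_ != null)
      raw  <- IO(Source.fromInputStream(is).mkString)
      text  = attributes.foldLeft(raw) { case (acc, (k, v)) => acc.replace(s"{$k}", v) }
      json <- IO.fromEither(parse(text))
    } yield json
}
```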