diff --git a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/IndexingViewDefSuite.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/IndexingViewDefSuite.scala index 5b5030e769..613cac12a6 100644 --- a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/IndexingViewDefSuite.scala +++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/IndexingViewDefSuite.scala @@ -56,7 +56,7 @@ class IndexingViewDefSuite extends BioSuite { private val aggregate = AggregateBlazegraphViewValue(Some("viewName"), Some("viewDescription"), NonEmptySet.of(viewRef)) - private val sink = CacheSink.states[NTriples] + private val sink = CacheSink2.states[NTriples] private def state(v: BlazegraphViewValue) = BlazegraphViewState( id, @@ -156,7 +156,7 @@ class IndexingViewDefSuite extends BioSuite { for { compiled <- IndexingViewDef.compile( v, - _ => Operation.merge(FilterDeprecated.withConfig(()), FilterByType.withConfig(filterByTypeConfig)), + _ => OperationF.merge(FilterDeprecated.withConfig(()), FilterByType.withConfig(filterByTypeConfig)), GraphResourceStream.unsafeFromStream(PullRequestStream.generate(projectRef)), sink ) diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeViewDef.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeViewDef.scala index 452c89069a..16598cb21d 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeViewDef.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeViewDef.scala @@ -493,7 +493,7 @@ object CompositeViewDef { ): Either[ProjectionErr, Operation] = { //TODO Add leap on target val branchProgress = progress.branches.get(branch) - Operation + OperationF .merge(operation, closeBranch(branch, branchProgress.getOrElse(ProjectionProgress.NoProgress))) } @@ -519,9 +519,9 @@ object CompositeViewDef { for { pipes <- source.pipeChain.traverse(compilePipeChain) // We apply `Operation.tap` as we want to keep the GraphResource for the rest of the stream - tail <- Operation.merge(GraphResourceToNTriples, sink).map(_.tap) + tail <- OperationF.merge(GraphResourceToNTriples, sink).map(_.tap) chain = pipes.fold(NonEmptyChain.one(tail))(NonEmptyChain(_, tail)) - operation <- Operation.merge(chain) + operation <- OperationF.merge(chain) // We create the elem stream for the two types of branch // The main source produces an infinite stream and waits for new elements mainSource = graphStream.main(source, project) @@ -550,7 +550,7 @@ object CompositeViewDef { for { pipes <- target.pipeChain.traverse(compilePipeChain) chain = pipes.fold(tail)(NonEmptyChain.one(_) ++ tail) - result <- Operation.merge(chain) + result <- OperationF.merge(chain) } yield target.id -> result } diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/projections/CompositeProjections.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/projections/CompositeProjections.scala index f8dc446497..4fdf3eb515 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/projections/CompositeProjections.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/projections/CompositeProjections.scala @@ -110,7 +110,7 @@ object CompositeProjections { branch: CompositeBranch, progress: ProjectionProgress ): Operation = - Operation.fromFs2Pipe[Unit]( + OperationF.fromFs2Pipe[Task, Unit]( Projection.persist( progress, compositeProgressStore.save(view.indexingRef, branch, _), diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeViewDefSuite.scala b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeViewDefSuite.scala index 3c942cf64c..b16a6e7be6 100644 --- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeViewDefSuite.scala +++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeViewDefSuite.scala @@ -1,5 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing +import cats.effect.Concurrent import cats.effect.concurrent.Ref import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.CompositeViewsFixture import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeView.{Interval, RebuildStrategy} @@ -11,7 +12,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.FilterDeprecated -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{NoopSink, RemainingElems, Source} +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{NoopSink, RemainingElems, Source, SourceF} import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, PatienceConfig} import fs2.Stream import monix.bio.{Task, UIO} @@ -27,11 +28,13 @@ class CompositeViewDefSuite extends BioSuite with CompositeViewsFixture { test("Compile correctly the source") { - def makeSource(nameValue: String): Source = new Source { + def makeSource(nameValue: String): Source = new SourceF[Task] { override type Out = Unit override def outType: Typeable[Unit] = Typeable[Unit] override def apply(offset: Offset): ElemStream[Unit] = Stream.empty[Task] override def name: String = nameValue + + implicit override def concurrent: Concurrent[Task] = monix.bio.IO.catsAsync } val graphStream = new CompositeGraphStream { diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/IndexingViewDefSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/IndexingViewDefSuite.scala index 1bc11746e2..2bd12b39c6 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/IndexingViewDefSuite.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/IndexingViewDefSuite.scala @@ -76,7 +76,7 @@ class IndexingViewDefSuite extends BioSuite with CirceLiteral with Fixtures { ) private val aggregate = AggregateElasticSearchViewValue(NonEmptySet.of(viewRef)) - private val sink = CacheSink.states[Json] + private val sink = CacheSink2.states[Json] private val indexingRev = IndexingRev.init private val rev = 2 @@ -212,7 +212,7 @@ class IndexingViewDefSuite extends BioSuite with CirceLiteral with Fixtures { for { compiled <- IndexingViewDef.compile( v, - _ => Operation.merge(FilterDeprecated.withConfig(()), FilterByType.withConfig(filterByTypeConfig)), + _ => OperationF.merge(FilterDeprecated.withConfig(()), FilterByType.withConfig(filterByTypeConfig)), GraphResourceStream.unsafeFromStream(PullRequestStream.generate(projectRef)), sink ) diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/metrics/EventMetricsProjectionSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/metrics/EventMetricsProjectionSuite.scala index 81ac060e88..46eaed712e 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/metrics/EventMetricsProjectionSuite.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/metrics/EventMetricsProjectionSuite.scala @@ -3,7 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.metrics import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.metrics.MetricsStream._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.{EventMetricsProjection, Fixtures} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CacheSink, ProjectionProgress, SupervisorSetup} +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CacheSink2, ProjectionProgress, SupervisorSetup} import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, PatienceConfig} import io.circe.Json import io.circe.syntax.EncoderOps @@ -20,7 +20,7 @@ class EventMetricsProjectionSuite extends BioSuite with SupervisorSetup.Fixture implicit private val patienceConfig: PatienceConfig = PatienceConfig(2.seconds, 10.millis) private lazy val sv = supervisor().supervisor - private val sink = CacheSink.events[Json] + private val sink = CacheSink2.events[Json] test("Start the metrics projection") { for { diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsCoordinator.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsCoordinator.scala index 6318cf110e..a48fd4dffe 100644 --- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsCoordinator.scala +++ b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsCoordinator.scala @@ -3,6 +3,8 @@ package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics import cats.effect.IO import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.MigrateEffectSyntax import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.GraphAnalytics.{index, projectionName} import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.config.GraphAnalyticsConfig @@ -11,9 +13,8 @@ import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.SinkCatsEffect import ch.epfl.bluebrain.nexus.delta.sourcing.stream._ -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration import fs2.Stream import org.typelevel.log4cats.{Logger => Log4CatsLogger} @@ -46,10 +47,11 @@ object GraphAnalyticsCoordinator { fetchProjects: Offset => Stream[IO, Elem[ProjectDef]], analyticsStream: GraphAnalyticsStream, supervisor: Supervisor, - sink: ProjectRef => Sink, + sink: ProjectRef => SinkCatsEffect, createIndex: ProjectRef => IO[Unit], deleteIndex: ProjectRef => IO[Unit] - ) extends GraphAnalyticsCoordinator { + ) extends GraphAnalyticsCoordinator + with MigrateEffectSyntax { def run(offset: Offset): Stream[IO, Elem[Unit]] = fetchProjects(offset).evalMap { @@ -63,8 +65,8 @@ object GraphAnalyticsCoordinator { CompiledProjection.compile( analyticsMetadata(project), ExecutionStrategy.PersistentSingleNode, - Source(analyticsStream(project, _)), - sink(project) + Source(analyticsStream(project, _).translate(ioToTaskK)), + sink(project).mapK(ioToTaskK) ) ) // Start the analysis projection for the given project @@ -162,7 +164,7 @@ object GraphAnalyticsCoordinator { fetchProjects: Offset => Stream[IO, Elem[ProjectDef]], analyticsStream: GraphAnalyticsStream, supervisor: Supervisor, - sink: ProjectRef => Sink, + sink: ProjectRef => SinkCatsEffect, createIndex: ProjectRef => IO[Unit], deleteIndex: ProjectRef => IO[Unit] ): IO[GraphAnalyticsCoordinator] = { diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSink.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSink.scala index 13f19f58e9..300d883612 100644 --- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSink.scala +++ b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSink.scala @@ -1,5 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.indexing +import cats.effect.IO +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIO import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent import ch.epfl.bluebrain.nexus.delta.kernel.syntax.kamonSyntax import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh @@ -9,12 +11,11 @@ import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.indexing.GraphAnaly import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem} -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.SinkCatsEffect import fs2.Chunk import io.circe.JsonObject import io.circe.literal._ import io.circe.syntax.EncoderOps -import monix.bio.Task import shapeless.Typeable import scala.concurrent.duration.FiniteDuration @@ -35,7 +36,7 @@ final class GraphAnalyticsSink( override val chunkSize: Int, override val maxWindow: FiniteDuration, index: IndexLabel -) extends Sink { +) extends SinkCatsEffect { implicit private val kamonComponent: KamonMetricComponent = KamonMetricComponent("graph-analytics") @@ -71,7 +72,7 @@ final class GraphAnalyticsSink( private def documentId[A](elem: Elem[A]) = elem.id.toString // TODO: depends on Operation.Sink in the sourcing module being moved to CE - override def apply(elements: Chunk[Elem[GraphAnalyticsResult]]): Task[Chunk[Elem[Unit]]] = { + override def apply(elements: Chunk[Elem[GraphAnalyticsResult]]): IO[Chunk[Elem[Unit]]] = { val result = elements.foldLeft(GraphAnalyticsSink.empty) { case (acc, success: SuccessElem[GraphAnalyticsResult]) => success.value match { @@ -86,8 +87,10 @@ final class GraphAnalyticsSink( case (acc, _: FailedElem) => acc } - client.bulk(result.bulk, Refresh.True).map(ElasticSearchSink.markElems(_, elements, documentId)) <* - client.updateByQuery(relationshipsQuery(result.updates), Set(index.value)) + toCatsIO(for { + elems <- client.bulk(result.bulk, Refresh.True).map(ElasticSearchSink.markElems(_, elements, documentId)) + _ <- client.updateByQuery(relationshipsQuery(result.updates), Set(index.value)) + } yield elems) }.span("graphAnalyticsSink") } diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsStream.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsStream.scala index 1ce3998c09..49476c0f9a 100644 --- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsStream.scala +++ b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsStream.scala @@ -12,7 +12,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceState import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, EntityType, ProjectRef, Tag} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStreamCats, EntityType, ProjectRef, Tag} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.query.{SelectFilter, StreamingQuery} import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ @@ -30,7 +30,7 @@ trait GraphAnalyticsStream { * @param start * the offset to start with */ - def apply(project: ProjectRef, start: Offset): ElemStream[GraphAnalyticsResult] + def apply(project: ProjectRef, start: Offset): ElemStreamCats[GraphAnalyticsResult] } @@ -102,7 +102,9 @@ object GraphAnalyticsStream { case _ => IO.pure(Noop) } - StreamingQuery.elems(project, start, SelectFilter.latest, qc, xas, (a, b) => ioToTaskK.apply(decode(a, b))) + StreamingQuery + .elems(project, start, SelectFilter.latest, qc, xas, (a, b) => ioToTaskK.apply(decode(a, b))) + .translate(taskToIoK) } // $COVERAGE-ON$ diff --git a/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSinkSuite.scala b/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSinkSuite.scala index 13d9fbddf9..ab0fa173e9 100644 --- a/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSinkSuite.scala +++ b/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSinkSuite.scala @@ -1,7 +1,8 @@ package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.indexing import cats.effect.IO -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIO +import ch.epfl.bluebrain.nexus.delta.kernel.utils.CatsEffectsClasspathResourceUtils.ioJsonContentOf import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchClientSetup import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.IndexLabel import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.indexing.GraphAnalyticsResult.Index @@ -16,8 +17,9 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{FailedElem, SuccessElem} +import ch.epfl.bluebrain.nexus.testkit.CirceLiteral import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, PatienceConfig} -import ch.epfl.bluebrain.nexus.testkit.{CirceLiteral, TestHelpers} +import ch.epfl.bluebrain.nexus.testkit.ce.CatsEffectSuite import fs2.Chunk import io.circe.Json import munit.AnyFixture @@ -27,9 +29,10 @@ import scala.concurrent.duration._ class GraphAnalyticsSinkSuite extends BioSuite + with CatsEffectSuite with ElasticSearchClientSetup.Fixture - with CirceLiteral - with TestHelpers { + with CirceLiteral { + override val ioTimeout: FiniteDuration = 45.seconds implicit private val patienceConfig: PatienceConfig = PatienceConfig(5.seconds, 50.millis) @@ -89,9 +92,9 @@ class GraphAnalyticsSinkSuite test("Create the update script and the index") { for { script <- scriptContent - _ <- client.createScript(updateRelationshipsScriptId, script) + _ <- toCatsIO(client.createScript(updateRelationshipsScriptId, script)) mapping <- graphAnalyticsMappings - _ <- client.createIndex(index, Some(mapping), None).assert(true) + _ <- toCatsIO(client.createIndex(index, Some(mapping), None)).assert } yield () } @@ -124,12 +127,12 @@ class GraphAnalyticsSinkSuite deprecated = indexDeprecated(deprecatedResource, deprecatedResourceTypes) chunk = Chunk.seq(List(active1, active2, discarded, deprecated)) // We expect no error - _ <- sink(chunk).assert(chunk.map(_.void)) + _ <- sink(chunk).assertEquals(chunk.map(_.void)) // 3 documents should have been indexed correctly: // - `resource1` with the relationship to `resource3` resolved // - `resource2` with no reference resolved // - `deprecatedResource` with only metadata, resolution is skipped - _ <- client.count(index.value).eventually(3L) + _ <- toCatsIO(client.count(index.value).eventually(3L)) expected1 <- ioJsonContentOf("result/resource1.json") expected2 <- ioJsonContentOf("result/resource2.json") expectedDeprecated <- ioJsonContentOf("result/resource_deprecated.json") @@ -158,15 +161,15 @@ class GraphAnalyticsSinkSuite ) for { - _ <- sink(chunk).assert(chunk.map(_.void)) + _ <- sink(chunk).assertEquals(chunk.map(_.void)) // The reference to file1 should have been resolved and introduced as a relationship // The update query should not have an effect on the other resource - _ <- client.refresh(index) + _ <- toCatsIO(client.refresh(index)) expected1 <- ioJsonContentOf("result/resource1_updated.json") expected2 <- ioJsonContentOf("result/resource2.json") - _ <- client.count(index.value).eventually(3L) - _ <- client.getSource[Json](index, resource1.toString).eventually(expected1) - _ <- client.getSource[Json](index, resource2.toString).eventually(expected2) + _ <- toCatsIO(client.count(index.value).eventually(3L)) + _ <- toCatsIO(client.getSource[Json](index, resource1.toString).eventually(expected1)) + _ <- toCatsIO(client.getSource[Json](index, resource2.toString).eventually(expected2)) } yield () } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/package.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/package.scala index 86be79f268..d350860029 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/package.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/package.scala @@ -12,7 +12,9 @@ package object model { type ElemStream[Value] = Stream[Task, Elem[Value]] - type ElemPipe[In, Out] = Pipe[Task, Elem[In], Elem[Out]] + type ElemPipe[In, Out] = ElemPipeF[Task, In, Out] + + type ElemPipeF[F[_], In, Out] = Pipe[F, Elem[In], Elem[Out]] type ElemStreamCats[Value] = Stream[IO, Elem[Value]] diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/CompiledProjection.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/CompiledProjection.scala index 505e820f30..57d2162c59 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/CompiledProjection.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/CompiledProjection.scala @@ -4,6 +4,7 @@ import cats.data.NonEmptyChain import cats.effect.concurrent.Ref import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.OperationF.SinkF import fs2.Stream import fs2.concurrent.SignallingRef import monix.bio.Task @@ -51,7 +52,7 @@ object CompiledProjection { metadata: ProjectionMetadata, executionStrategy: ExecutionStrategy, source: Source, - sink: Sink + sink: SinkF[Task] ): Either[ProjectionErr, CompiledProjection] = source.through(sink).map { p => CompiledProjection(metadata, executionStrategy, offset => _ => _ => p.apply(offset).map(_.void)) @@ -68,7 +69,7 @@ object CompiledProjection { sink: Sink ): Either[ProjectionErr, CompiledProjection] = for { - operations <- Operation.merge(chain ++ NonEmptyChain.one(sink)) + operations <- OperationF.merge(chain ++ NonEmptyChain.one(sink)) result <- source.through(operations) } yield CompiledProjection(metadata, executionStrategy, offset => _ => _ => result.apply(offset).map(_.void)) diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Operation.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Operation.scala index be69673cad..9a1177baf7 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Operation.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Operation.scala @@ -1,24 +1,63 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.stream import cats.data.NonEmptyChain +import cats.effect.{Concurrent, ContextShift, ExitCase, Fiber, IO, Timer} import cats.syntax.all._ -import ch.epfl.bluebrain.nexus.delta.sourcing.model.ElemPipe +import cats.~> +import ch.epfl.bluebrain.nexus.delta.sourcing.model.ElemPipeF import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem} +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.OperationF.{PipeF, SinkF} import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.{LeapingNotAllowedErr, OperationInOutMatchErr} import com.typesafe.scalalogging.Logger import fs2.{Chunk, Pipe, Pull, Stream} import monix.bio.Task import shapeless.Typeable +import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration -/** - * Operations represent individual steps in a [[Projection]] where [[Elem]] values are processed. - * - * They are ultimately chained and attached to sources to complete the underlying projection [[Stream]] that is run. - */ -sealed trait Operation { self => +sealed trait OperationF[F[_]] { self => + + implicit def timer: Timer[F] + implicit def concurrent: Concurrent[F] + +// def mapK[G[_]](f: F ~> G, g: G ~> F): OperationF[G] = new OperationF[G] { +// override implicit def timer: Timer[G] = self.timer.mapK(f) +// +// override implicit def concurrent: Concurrent[G] = new Concurrent[G] { +// override def start[A](fa: G[A]): G[Fiber[G, A]] = ??? +// +// override def racePair[A, B](fa: G[A], fb: G[B]): G[Either[(A, Fiber[G, B]), (Fiber[G, A], B)]] = ??? +// +// override def async[A](k: (Either[Throwable, A] => Unit) => Unit): G[A] = ??? +// +// override def asyncF[A](k: (Either[Throwable, A] => Unit) => G[Unit]): G[A] = ??? +// +// override def suspend[A](thunk: => G[A]): G[A] = ??? +// +// override def bracketCase[A, B](acquire: G[A])(use: A => G[B])(release: (A, ExitCase[Throwable]) => G[Unit]): G[B] = ??? +// +// override def raiseError[A](e: Throwable): G[A] = ??? +// +// override def handleErrorWith[A](fa: G[A])(f: Throwable => G[A]): G[A] = ??? +// +// override def flatMap[A, B](fa: G[A])(f: A => G[B]): G[B] = ??? +// +// override def tailRecM[A, B](a: A)(f: A => G[Either[A, B]]): G[B] = ??? +// +// override def pure[A](x: A): G[A] = ??? +// } +// +// override type In = self.In +// override type Out = self.Out +// +// override def inType: Typeable[In] = self.inType +// override def outType: Typeable[Out] = self.outType +// +// override protected[stream] def asFs2: Pipe[G, Elem[In], Elem[Out]] = +// inputStream => self.asFs2(inputStream.translate(g)).translate(f) +// } /** * The underlying element type accepted by the Operation. @@ -48,18 +87,21 @@ sealed trait Operation { self => */ def outType: Typeable[Out] - protected[stream] def asFs2: fs2.Pipe[Task, Elem[In], Elem[Out]] + protected[stream] def asFs2: fs2.Pipe[F, Elem[In], Elem[Out]] - private[stream] def andThen(that: Operation): Either[OperationInOutMatchErr, Operation] = + private[stream] def andThen(that: OperationF[F]): Either[OperationInOutMatchErr[F], OperationF[F]] = Either.cond( self.outType.describe == that.inType.describe, - new Operation { + new OperationF[F] { + implicit val timer: Timer[F] = self.timer + implicit val concurrent: Concurrent[F] = self.concurrent + override type In = self.In override type Out = that.Out override def inType: Typeable[self.In] = self.inType override def outType: Typeable[that.Out] = that.outType - override protected[stream] def asFs2: Pipe[Task, Elem[Operation.this.In], Elem[that.Out]] = { stream => + override protected[stream] def asFs2: Pipe[F, Elem[OperationF.this.In], Elem[that.Out]] = { stream => stream .through(self.asFs2) .map { @@ -74,7 +116,10 @@ sealed trait Operation { self => /** * Send the values through the operation while returning original values */ - def tap: Operation = new Operation { + def tap: OperationF[F] = new OperationF[F] { + implicit val timer: Timer[F] = self.timer + implicit val concurrent: Concurrent[F] = self.concurrent + override type In = self.In override type Out = self.In @@ -82,7 +127,7 @@ sealed trait Operation { self => override def outType: Typeable[Out] = self.inType - override protected[stream] def asFs2: Pipe[Task, Elem[Operation.this.In], Elem[this.Out]] = + override protected[stream] def asFs2: Pipe[F, Elem[OperationF.this.In], Elem[this.Out]] = _.chunks .evalTap { chunk => Stream.chunk(chunk).through(self.asFs2).compile.drain @@ -98,7 +143,10 @@ sealed trait Operation { self => def debug( formatter: Elem[Out] => String = (elem: Elem[Out]) => elem.toString, logger: String => Unit = println(_) - ): Operation = new Operation { + ): OperationF[F] = new OperationF[F] { + implicit val timer: Timer[F] = self.timer + implicit val concurrent: Concurrent[F] = self.concurrent + override type In = self.In override type Out = self.Out @@ -106,7 +154,7 @@ sealed trait Operation { self => override def outType: Typeable[Out] = self.outType - override protected[stream] def asFs2: Pipe[Task, Elem[Operation.this.In], Elem[this.Out]] = + override protected[stream] def asFs2: Pipe[F, Elem[OperationF.this.In], Elem[this.Out]] = _.through(self.asFs2).debug(formatter, logger) } @@ -118,8 +166,11 @@ sealed trait Operation { self => * @param mapSkip * the function to apply when skipping an element */ - def leap[A](offset: Offset, mapSkip: self.In => A)(implicit ta: Typeable[A]): Either[ProjectionErr, Operation] = { - val pipe = new Operation { + def leap[A](offset: Offset, mapSkip: self.In => A)(implicit ta: Typeable[A]): Either[ProjectionErr, OperationF[F]] = { + val pipe = new OperationF[F] { + implicit val timer: Timer[F] = self.timer + implicit val concurrent: Concurrent[F] = self.concurrent + override type In = self.In override type Out = self.Out @@ -129,8 +180,8 @@ sealed trait Operation { self => override def outType: Typeable[Out] = self.outType - override protected[stream] def asFs2: Pipe[Task, Elem[Operation.this.In], Elem[this.Out]] = { - def go(s: fs2.Stream[Task, Elem[In]]): Pull[Task, Elem[this.Out], Unit] = { + override protected[stream] def asFs2: Pipe[F, Elem[OperationF.this.In], Elem[this.Out]] = { + def go(s: fs2.Stream[F, Elem[In]]): Pull[F, Elem[this.Out], Unit] = { s.pull.peek.flatMap { case Some((chunk, stream)) => val (before, after) = chunk.partitionEither { e => @@ -154,7 +205,7 @@ sealed trait Operation { self => Either.cond( ta.describe == self.outType.describe, pipe, - LeapingNotAllowedErr(self, ta) + LeapingNotAllowedErr[F, A](self, ta) ) } @@ -163,30 +214,40 @@ sealed trait Operation { self => * @param offset * the offset to reach before applying the operation */ - def identityLeap(offset: Offset): Either[ProjectionErr, Operation] = leap(offset, identity[self.In])(inType) + def identityLeap(offset: Offset): Either[ProjectionErr, OperationF[F]] = leap(offset, identity[self.In])(inType) } -object Operation { +/** + * Operations represent individual steps in a [[Projection]] where [[Elem]] values are processed. + * + * They are ultimately chained and attached to sources to complete the underlying projection [[Stream]] that is run. + */ +object OperationF { /** * Creates an operation from an fs2 Pipe * @param elemPipe * fs2 pipe */ - def fromFs2Pipe[I: Typeable](elemPipe: ElemPipe[I, Unit]): Operation = new Operation { + def fromFs2Pipe[F[_], I: Typeable]( + elemPipe: ElemPipeF[F, I, Unit] + )(implicit t: Timer[F], c: Concurrent[F]): OperationF[F] = new OperationF[F] { + implicit val timer: Timer[F] = t + implicit val concurrent: Concurrent[F] = c + override type In = I override type Out = Unit override def inType: Typeable[In] = Typeable[In] override def outType: Typeable[Out] = Typeable[Out] - override protected[stream] def asFs2: fs2.Pipe[Task, Elem[In], Elem[Out]] = elemPipe + override protected[stream] def asFs2: fs2.Pipe[F, Elem[In], Elem[Out]] = elemPipe } - def merge(first: Operation, others: Operation*): Either[ProjectionErr, Operation] = + def merge[F[_]](first: OperationF[F], others: OperationF[F]*): Either[ProjectionErr, OperationF[F]] = merge(NonEmptyChain(first, others: _*)) - def merge(operations: NonEmptyChain[Operation]): Either[ProjectionErr, Operation] = - operations.tail.foldLeftM[Either[ProjectionErr, *], Operation](operations.head) { case (acc, e) => + def merge[F[_]](operations: NonEmptyChain[OperationF[F]]): Either[ProjectionErr, OperationF[F]] = + operations.tail.foldLeftM[Either[ProjectionErr, *], OperationF[F]](operations.head) { case (acc, e) => acc.andThen(e) } @@ -200,7 +261,7 @@ object Operation { * * They are ultimately chained and attached to sources to complete the underlying projection [[Stream]] that is run. */ - trait Pipe extends Operation { + trait PipeF[F[_]] extends OperationF[F] { /** * @return @@ -218,7 +279,7 @@ object Operation { * @return * a new element (possibly failed, dropped) of type Out */ - def apply(element: SuccessElem[In]): Task[Elem[Out]] + def apply(element: SuccessElem[In]): F[Elem[Out]] /** * Checks if the provided envelope has a successful element value of type `I`. If true, it will return it in Right. @@ -233,8 +294,8 @@ object Operation { case _: FailedElem | _: DroppedElem => Left(element.asInstanceOf[Elem[O]]) } - protected[stream] def asFs2: fs2.Pipe[Task, Elem[In], Elem[Out]] = { - def go(s: fs2.Stream[Task, Elem[In]]): Pull[Task, Elem[Out], Unit] = { + protected[stream] def asFs2: fs2.Pipe[F, Elem[In], Elem[Out]] = { + def go(s: fs2.Stream[F, Elem[In]]): Pull[F, Elem[Out], Unit] = { s.pull.uncons1.flatMap { case Some((head, tail)) => partitionSuccess(head) match { @@ -242,8 +303,8 @@ object Operation { Pull .eval( apply(value) - .onErrorHandleWith { err => - Task + .handleErrorWith { err => + Concurrent[F] .delay( logger.error(s"Error while applying pipe $name on element ${value.id}", err) ) @@ -261,12 +322,15 @@ object Operation { } } - object Pipe { + object PipeF { /** * Create an identity pipe that just pass along the elem */ - def identity[A: Typeable]: Pipe = new Pipe { + def identity[F[_], A: Typeable](implicit t: Timer[F], c: Concurrent[F]): PipeF[F] = new PipeF[F] { + implicit val timer: Timer[F] = t + implicit val concurrent: Concurrent[F] = c + override def ref: PipeRef = PipeRef.unsafe("identity") override type In = A @@ -275,23 +339,69 @@ object Operation { override def inType: Typeable[In] = Typeable[In] override def outType: Typeable[Out] = Typeable[Out] - override def apply(element: SuccessElem[In]): Task[Elem[Out]] = Task.pure(element) + override def apply(element: SuccessElem[In]): F[Elem[Out]] = c.pure(element) } } - trait Sink extends Operation { + trait SinkF[F[_]] extends OperationF[F] { self => type Out = Unit + + def mapK[G[_]](f: F ~> G): SinkF[G] = new SinkF[G] { + override def chunkSize: Int = self.chunkSize + + override def maxWindow: FiniteDuration = self.maxWindow + + override def apply(elements: Chunk[Elem[In]]): G[Chunk[Elem[Unit]]] = + f(self.apply(elements)) + + implicit override def timer: Timer[G] = self.timer.mapK(f) + + implicit override def concurrent: Concurrent[G] = new Concurrent[G] { + override def start[A](fa: G[A]): G[Fiber[G, A]] = ??? + + override def racePair[A, B](fa: G[A], fb: G[B]): G[Either[(A, Fiber[G, B]), (Fiber[G, A], B)]] = ??? + + override def async[A](k: (Either[Throwable, A] => Unit) => Unit): G[A] = ??? + + override def asyncF[A](k: (Either[Throwable, A] => Unit) => G[Unit]): G[A] = ??? + + override def suspend[A](thunk: => G[A]): G[A] = ??? + + override def bracketCase[A, B](acquire: G[A])(use: A => G[B])( + release: (A, ExitCase[Throwable]) => G[Unit] + ): G[B] = ??? + + override def raiseError[A](e: Throwable): G[A] = ??? + + override def handleErrorWith[A](fa: G[A])(f: Throwable => G[A]): G[A] = ??? + + override def flatMap[A, B](fa: G[A])(f: A => G[B]): G[B] = ??? + + override def tailRecM[A, B](a: A)(f: A => G[Either[A, B]]): G[B] = ??? + + override def pure[A](x: A): G[A] = ??? + } + + override type In = self.In + + /** + * @return + * the Typeable instance for the accepted element type + */ + override def inType: Typeable[In] = self.inType + } + def outType: Typeable[Out] = Typeable[Unit] def chunkSize: Int def maxWindow: FiniteDuration - def apply(elements: Chunk[Elem[In]]): Task[Chunk[Elem[Unit]]] + def apply(elements: Chunk[Elem[In]]): F[Chunk[Elem[Unit]]] - protected[stream] def asFs2: fs2.Pipe[Task, Elem[In], Elem[Unit]] = + protected[stream] def asFs2: fs2.Pipe[F, Elem[In], Elem[Unit]] = _.groupWithin(chunkSize, maxWindow) .evalMap { chunk => apply(chunk) @@ -299,8 +409,38 @@ object Operation { .flatMap(Stream.chunk) } - type Aux[I, O] = Operation { + type Aux[F[_], I, O] = OperationF[F] { type In = I type Out = O } } + +object Operation { + + trait SinkTask extends SinkF[Task] { + implicit val timer: Timer[Task] = monix.bio.IO.timer + implicit val concurrent: Concurrent[Task] = monix.bio.IO.catsAsync + } + + type Sink = SinkTask + + trait PipeTask extends PipeF[Task] { + implicit val timer: Timer[Task] = monix.bio.IO.timer + implicit val concurrent: Concurrent[Task] = monix.bio.IO.catsAsync + } + + type Pipe = PipeTask + + trait SinkCatsEffect extends SinkF[IO] { + implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global) + implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) + implicit val concurrent: Concurrent[IO] = IO.ioConcurrentEffect + } + + trait PipeCatsEffect extends PipeF[IO] { + implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global) + implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) + implicit val concurrent: Concurrent[IO] = IO.ioConcurrentEffect + } + +} diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PipeChain.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PipeChain.scala index 0dfb7fde51..572b1f852f 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PipeChain.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PipeChain.scala @@ -36,7 +36,7 @@ object PipeChain { configured <- pipeChain.pipes.traverse { case (ref, cfg) => registry.lookup(ref).flatMap(_.withJsonLdConfig(cfg)) } - chained <- Operation.merge(configured) + chained <- OperationF.merge(configured) } yield chained def validate(pipeChain: PipeChain, registry: ReferenceRegistry): Either[ProjectionErr, Unit] = diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PipeDefCats.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PipeDefCats.scala new file mode 100644 index 0000000000..88a642669a --- /dev/null +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PipeDefCats.scala @@ -0,0 +1,65 @@ +package ch.epfl.bluebrain.nexus.delta.sourcing.stream + +import cats.implicits._ +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.ExpandedJsonLd +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.JsonLdDecoder +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.PipeCatsEffect +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.CouldNotDecodePipeConfigErr +import shapeless.Typeable + +/** + * Contract for definition for pipes. PipeDefs are used to construct [[PipeCatsEffect]] instances to be used when + * materializing runnable projections. They are indexed by their label in a registry such they can be looked up given a + * [[PipeRef]]. Provided with a configuration, pipe definitions can produce [[PipeCatsEffect]] instances. + */ +trait PipeDefCats { + + /** + * The type of the [[PipeCatsEffect]] that this definition produces. + */ + type PipeType <: PipeCatsEffect + + /** + * The required configuration type for producing a [[PipeCatsEffect]] of this type. + */ + type Config + + /** + * @return + * the Typeable instance for the required [[PipeCatsEffect]] configuration + */ + def configType: Typeable[Config] + + /** + * @return + * a json-ld decoder for the [[PipeCatsEffect]] configuration + */ + def configDecoder: JsonLdDecoder[Config] + + /** + * @return + * the unique reference for a pipe of this type + */ + def ref: PipeRef + + /** + * Produces a [[PipeCatsEffect]] instance given an expected configuration. + * + * @param config + * the configuration for the [[PipeDefCats]] + */ + def withConfig(config: Config): PipeType + + /** + * Attempts to construct a corresponding [[PipeCatsEffect]] instance by decoding the required configuration from a + * json-ld configuration. + * + * @param jsonLd + * the source configuration in the json-ld format + */ + def withJsonLdConfig(jsonLd: ExpandedJsonLd): Either[CouldNotDecodePipeConfigErr, PipeType] = + configDecoder(jsonLd) + .map(c => withConfig(c)) + .leftMap(e => CouldNotDecodePipeConfigErr(jsonLd, configType.describe, ref, e.reason)) + +} diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionErr.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionErr.scala index 53021e3f6b..1e70b521bc 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionErr.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionErr.scala @@ -28,7 +28,7 @@ object ProjectionErr { * @param that * the source being merged with self */ - final case class SourceOutMatchErr(self: Source, that: Source) extends ProjectionErr { + final case class SourceOutMatchErr[F[_]](self: SourceF[F], that: SourceF[F]) extends ProjectionErr { override def reason: String = s"Unable to match Out type '${self.outType.describe}' of source '${self.name}' to the Out type '${that.outType.describe}' of source '${that.name}'" } @@ -41,7 +41,7 @@ object ProjectionErr { * @param that * the operation to attach to the source */ - final case class SourceOutPipeInMatchErr(self: Source, that: Operation) extends ProjectionErr { + final case class SourceOutPipeInMatchErr[F[_]](self: SourceF[F], that: OperationF[F]) extends ProjectionErr { override def reason: String = s"Unable to match Out type '${self.outType.describe}' of source '${self.name}' to the In type '${that.inType.describe}' of pipe '${that.name}'" } @@ -76,7 +76,7 @@ object ProjectionErr { * @param that * the operation being merged with self */ - final case class OperationInOutMatchErr(self: Operation, that: Operation) extends ProjectionErr { + final case class OperationInOutMatchErr[F[_]](self: OperationF[F], that: OperationF[F]) extends ProjectionErr { override def reason: String = s"Unable to match Out type '${self.outType.describe}' of operation '${self.name}' to the In type '${that.inType.describe}' of operation '${that.name}'" } @@ -84,7 +84,7 @@ object ProjectionErr { /** * Leaping is only possible for an operation when we provide a skip function that aligns to the out type */ - final case class LeapingNotAllowedErr[A](self: Operation, skip: Typeable[A]) extends ProjectionErr { + final case class LeapingNotAllowedErr[F[_], A](self: OperationF[F], skip: Typeable[A]) extends ProjectionErr { override def reason: String = s"Unable to leap on operation '${self.name}' as skip type '${skip.describe}' does not match Out type '${self.outType.describe}'." } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Source.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Source.scala index a65b289121..2a17835b77 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Source.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Source.scala @@ -1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.stream import cats.data.NonEmptyChain +import cats.effect.Concurrent import cats.implicits._ import ch.epfl.bluebrain.nexus.delta.sourcing.model.ElemStream import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset @@ -18,7 +19,9 @@ import shapeless.Typeable * A [[Projection]] may make use of multiple [[Source]] s (at least one) that will be further chained with * [[Operation]] s and ultimately merged together. */ -trait Source { self => +trait SourceF[F[_]] { self => + + implicit def concurrent: Concurrent[F] /** * The underlying element type emitted by the Source. @@ -46,16 +49,17 @@ trait Source { self => * @return * an [[fs2.Stream]] of elements of this Source's Out type */ - def apply(offset: Offset): Stream[Task, Elem[Out]] + def apply(offset: Offset): Stream[F, Elem[Out]] - def through(operation: Operation): Either[SourceOutPipeInMatchErr, Source] = + def through(operation: OperationF[F]): Either[SourceOutPipeInMatchErr[F], SourceF[F]] = Either.cond( outType.describe == operation.inType.describe, - new Source { + new SourceF[F] { + implicit def concurrent: Concurrent[F] = self.concurrent override type Out = operation.Out override def outType: Typeable[operation.Out] = operation.outType - override def apply(offset: Offset): Stream[Task, Elem[operation.Out]] = + override def apply(offset: Offset): Stream[F, Elem[operation.Out]] = self .apply(offset) .map { @@ -71,14 +75,15 @@ trait Source { self => SourceOutPipeInMatchErr(self, operation) ) - private[stream] def merge(that: Source): Either[SourceOutMatchErr, Source] = + private[stream] def merge(that: SourceF[F]): Either[SourceOutMatchErr[F], SourceF[F]] = Either.cond( self.outType.describe == that.outType.describe, - new Source { + new SourceF[F] { + implicit def concurrent: Concurrent[F] = self.concurrent override type Out = self.Out override def outType: Typeable[self.Out] = self.outType - override def apply(offset: Offset): Stream[Task, Elem[Out]] = + override def apply(offset: Offset): Stream[F, Elem[Out]] = self .apply(offset) .merge(that.apply(offset).map { @@ -94,22 +99,23 @@ trait Source { self => ) def broadcastThrough( - operations: NonEmptyChain[Operation] - ): Either[SourceOutPipeInMatchErr, Source.Aux[Unit]] = + operations: NonEmptyChain[OperationF[F]] + ): Either[SourceOutPipeInMatchErr[F], SourceF.Aux[F, Unit]] = operations .traverse { operation => Either.cond( self.outType.describe == operation.inType.describe, - operation.asInstanceOf[Operation.Aux[self.Out, Unit]], + operation.asInstanceOf[OperationF.Aux[F, self.Out, Unit]], SourceOutPipeInMatchErr(self, operation) ) } .map { verified => - new Source { + new SourceF[F] { + implicit def concurrent: Concurrent[F] = self.concurrent override type Out = Unit override def outType: Typeable[Unit] = Typeable[Unit] - override def apply(offset: Offset): Stream[Task, Elem[Unit]] = + override def apply(offset: Offset): Stream[F, Elem[Unit]] = self .apply(offset) .broadcastThrough(verified.toList.map { _.asFs2 }: _*) @@ -118,6 +124,24 @@ trait Source { self => } +object SourceF { + + type Aux[F[_], O] = SourceF[F] { + type Out = O + } + + def apply[F[_], A: Typeable](stream: Offset => Stream[F, Elem[A]])(implicit c: Concurrent[F]): SourceF[F] = + new SourceF[F] { + override type Out = A + + override def outType: Typeable[A] = Typeable[A] + + override def apply(offset: Offset): Stream[F, Elem[A]] = stream(offset) + + implicit override def concurrent: Concurrent[F] = c + } +} + object Source { type Aux[O] = Source { @@ -131,5 +155,7 @@ object Source { override def outType: Typeable[A] = Typeable[A] override def apply(offset: Offset): Stream[Task, Elem[A]] = stream(offset) + + implicit override def concurrent: Concurrent[Task] = monix.bio.IO.catsAsync } } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/package.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/package.scala new file mode 100644 index 0000000000..2133e6895a --- /dev/null +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/package.scala @@ -0,0 +1,12 @@ +package ch.epfl.bluebrain.nexus.delta.sourcing + +import cats.effect.IO +import monix.bio.Task + +package object stream { + type Operation = OperationF[Task] + + type Source = SourceF[Task] + + type SourceCatsEffect = SourceF[IO] +} diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/GenericPipeCats.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/GenericPipeCats.scala new file mode 100644 index 0000000000..7bbcb073ef --- /dev/null +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/GenericPipeCats.scala @@ -0,0 +1,64 @@ +package ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes + +import cats.effect.IO +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.JsonLdDecoder +import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.PipeCatsEffect +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, PipeDefCats, PipeRef} +import shapeless.Typeable + +/** + * A generic pipe instance backed by a fn. + * @param label + * the pipe label + * @param fn + * the fn to apply to each element + * @tparam I + * the input element type + * @tparam O + * the output element type + */ +class GenericPipeCats[I: Typeable, O: Typeable] private[stream] ( + label: Label, + fn: SuccessElem[I] => IO[Elem[O]] +) extends PipeCatsEffect { + override val ref: PipeRef = PipeRef(label) + override type In = I + override type Out = O + override def inType: Typeable[In] = Typeable[In] + override def outType: Typeable[Out] = Typeable[Out] + + override def apply(element: SuccessElem[In]): IO[Elem[Out]] = fn(element) +} + +object GenericPipeCats { + + /** + * Lifts the argument fn to a pipe and associated definition. + * @param label + * the pipe/pipedef label + * @param fn + * the fn to apply to elements + * @tparam I + * the input element type + * @tparam O + * the output element type + */ + def apply[I: Typeable, O: Typeable](label: Label, fn: SuccessElem[I] => IO[Elem[O]]): PipeDefCats = + new GenericPipeDef(label, fn) + + private class GenericPipeDef[I: Typeable, O: Typeable] private[stream] ( + label: Label, + fn: SuccessElem[I] => IO[Elem[O]] + ) extends PipeDefCats { + override val ref: PipeRef = PipeRef(label) + override type PipeType = GenericPipeCats[I, O] + override type Config = Unit + override def configType: Typeable[Unit] = Typeable[Unit] + override def configDecoder: JsonLdDecoder[Unit] = JsonLdDecoder[Unit] + + override def withConfig(config: Unit): GenericPipeCats[I, O] = new GenericPipeCats[I, O](label, fn) + } + +} diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/CacheSink.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/CacheSink.scala index 3d96e75d85..7abcbe5e95 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/CacheSink.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/CacheSink.scala @@ -1,19 +1,49 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.stream +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem} -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.{Sink, SinkCatsEffect} import fs2.Chunk import monix.bio.Task import shapeless.Typeable -import collection.mutable.{Set => MutableSet} import scala.collection.concurrent.TrieMap import scala.collection.mutable +import scala.collection.mutable.{Set => MutableSet} import scala.concurrent.duration._ -final class CacheSink[A: Typeable] private (documentId: Elem[A] => Iri) extends Sink { +final class CacheSink[A: Typeable] private (documentId: Elem[A] => Iri) extends SinkCatsEffect { + + val successes: mutable.Map[Iri, A] = TrieMap.empty[Iri, A] + val dropped: MutableSet[Iri] = MutableSet.empty[Iri] + val failed: MutableSet[Iri] = MutableSet.empty[Iri] + + override type In = A + + override def inType: Typeable[A] = Typeable[A] + + override def apply(elements: Chunk[Elem[A]]): IO[Chunk[Elem[Unit]]] = IO.delay { + elements.map { + case s: SuccessElem[A] => + successes.put(documentId(s), s.value) + s.void + case d: DroppedElem => + dropped.add(documentId(d)) + d + case f: FailedElem => + failed.add(documentId(f)) + f + } + } + + override def chunkSize: Int = 1 + + override def maxWindow: FiniteDuration = 10.millis +} + +final class CacheSink2[A: Typeable] private (documentId: Elem[A] => Iri) extends Sink { val successes: mutable.Map[Iri, A] = TrieMap.empty[Iri, A] val dropped: MutableSet[Iri] = MutableSet.empty[Iri] @@ -55,3 +85,17 @@ object CacheSink { /** CacheSink for states */ def states[A: Typeable]: CacheSink[A] = new CacheSink(_.id) } + +object CacheSink2 { + + private val eventDocumentId: Elem[_] => Iri = elem => + elem.project match { + case Some(project) => iri"$project/${elem.id}:${elem.rev}" + case None => iri"${elem.id}:${elem.rev}" + } + + /** CacheSink for events */ + def events[A: Typeable]: CacheSink2[A] = new CacheSink2(eventDocumentId) + + def states[A: Typeable]: CacheSink2[A] = new CacheSink2(_.id) +} diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/OperationSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/OperationSuite.scala index f3935e7d6a..c19e576eae 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/OperationSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/OperationSuite.scala @@ -1,25 +1,26 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.stream +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Pipe +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.PipeCatsEffect import ch.epfl.bluebrain.nexus.delta.sourcing.stream.OperationSuite.{double, half, until} import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.{LeapingNotAllowedErr, OperationInOutMatchErr} -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.GenericPipe -import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, StreamAssertions} +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.GenericPipeCats +import ch.epfl.bluebrain.nexus.testkit.bio.StreamAssertions +import ch.epfl.bluebrain.nexus.testkit.ce.{CatsEffectSuite, CatsIOValues} import fs2.Stream -import monix.bio.Task import shapeless.Typeable import java.time.Instant -class OperationSuite extends BioSuite with StreamAssertions { +class OperationSuite extends CatsEffectSuite with StreamAssertions { test("Run the double stream") { val sink = CacheSink.states[Int] - val operations = Operation.merge(double, sink).rightValue + val operations = OperationF.merge[IO](double, sink).rightValue for { _ <- until(5).through(operations).rightValue.apply(Offset.Start).assertSize(5) @@ -31,7 +32,7 @@ class OperationSuite extends BioSuite with StreamAssertions { test("Run the half stream") { val sink = CacheSink.states[Double] - val operations = Operation.merge(half, sink).rightValue + val operations = OperationF.merge[IO](half, sink).rightValue for { _ <- until(5).through(operations).rightValue.apply(Offset.at(1L)).assertSize(4) @@ -43,7 +44,7 @@ class OperationSuite extends BioSuite with StreamAssertions { test("Fail as the input and output types don't match") { val sink = CacheSink.states[Int] - Operation.merge(half, sink).assertLeft(OperationInOutMatchErr(half, sink)) + OperationF.merge(half, sink).assertLeft(OperationInOutMatchErr(half, sink)) } test("Run the double stream as an tap operation") { @@ -52,8 +53,8 @@ class OperationSuite extends BioSuite with StreamAssertions { // We should have the originals here val sink2 = CacheSink.states[Int] - val tap = Operation.merge(double, sink1).rightValue.tap - val all = Operation.merge(tap, sink2).rightValue + val tap = OperationF.merge(double, sink1).rightValue.tap + val all = OperationF.merge(tap, sink2).rightValue for { _ <- until(5).through(all).rightValue.apply(Offset.Start).assertSize(5) @@ -70,8 +71,8 @@ class OperationSuite extends BioSuite with StreamAssertions { val sink = CacheSink.states[Int] val first = double.identityLeap(Offset.at(2L)).rightValue - val second = Operation.merge(double, sink).rightValue - val all = Operation.merge(first, second).rightValue + val second = OperationF.merge(double, sink).rightValue + val all = OperationF.merge(first, second).rightValue for { _ <- until(5).through(all).rightValue.apply(Offset.Start).assertSize(5) @@ -87,17 +88,18 @@ class OperationSuite extends BioSuite with StreamAssertions { } -object OperationSuite { +object OperationSuite extends CatsIOValues { private val numberType = EntityType("number") - private def until(n: Int): Source = Source(offset => - Stream.range(offset.value.toInt, n).covary[Task].map { i => + private def until(n: Int): SourceCatsEffect = SourceF(offset => + Stream.range(offset.value.toInt, n).covary[IO].map { i => SuccessElem(numberType, nxv + i.toString, None, Instant.EPOCH, Offset.at(i.toLong), i, 1) } ) - val double: Pipe = new GenericPipe[Int, Int](Label.unsafe("double"), _.evalMap { v => Task.pure(v * 2) }) - val half: Pipe = new GenericPipe[Int, Double](Label.unsafe("half"), _.evalMap { v => Task.pure((v.toDouble / 2)) }) + val double: PipeCatsEffect = new GenericPipeCats[Int, Int](Label.unsafe("double"), x => IO.pure(x.map { v => v * 2 })) + val half: PipeCatsEffect = + new GenericPipeCats[Int, Double](Label.unsafe("half"), x => IO.pure(x.map { v => v.toDouble / 2 })) } diff --git a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala index 238bdcad73..6fcf000495 100644 --- a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala +++ b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala @@ -1,26 +1,31 @@ package ch.epfl.bluebrain.nexus.testkit.bio +import cats.effect.{Concurrent, Timer} +import cats.syntax.all._ import fs2.Stream -import monix.bio.Task import munit.{Assertions, Location} import scala.concurrent.duration.DurationInt trait StreamAssertions { self: Assertions => - implicit class StreamAssertionsOps[A](stream: Stream[Task, A])(implicit loc: Location) { - def assert(expected: List[A]): Task[Unit] = + implicit class StreamAssertionsOps[F[_], A](stream: Stream[F, A])(implicit + loc: Location, + t: Timer[F], + c: Concurrent[F] + ) { + def assert(expected: List[A]): F[Unit] = stream.take(expected.size.toLong).timeout(3.seconds).mask.compile.toList.map { obtained => assertEquals(obtained, expected, s"Got ${obtained.size} elements, ${expected.size} elements were expected.") } - def assertSize(expected: Int): Task[Unit] = + def assertSize(expected: Int): F[Unit] = stream.take(expected.toLong).timeout(3.seconds).mask.compile.toList.map { obtained => assertEquals(obtained.size, expected, s"Got ${obtained.size} elements, $expected elements were expected.") } - def assert(expected: A*): Task[Unit] = assert(expected.toList) - def assertEmpty: Task[Unit] = assert(List.empty) + def assert(expected: A*): F[Unit] = assert(expected.toList) + def assertEmpty: F[Unit] = assert(List.empty) } } diff --git a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsEffectSuite.scala b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsEffectSuite.scala index caa2b963c3..7b2b6dcb84 100644 --- a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsEffectSuite.scala +++ b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsEffectSuite.scala @@ -12,7 +12,7 @@ import scala.concurrent.duration.{DurationInt, FiniteDuration} * Adapted from: * https://github.com/typelevel/munit-cats-effect/blob/main/core/src/main/scala/munit/CatsEffectSuite.scala */ -abstract class CatsEffectSuite +trait CatsEffectSuite extends NexusSuite with CatsEffectAssertions with StreamAssertions