
Effect parameterisation shithousery
dantb committed Oct 17, 2023
1 parent 9e62293 commit 5d05de9
Showing 23 changed files with 504 additions and 130 deletions.
@@ -56,7 +56,7 @@ class IndexingViewDefSuite extends BioSuite {

private val aggregate =
AggregateBlazegraphViewValue(Some("viewName"), Some("viewDescription"), NonEmptySet.of(viewRef))
- private val sink = CacheSink.states[NTriples]
+ private val sink = CacheSink2.states[NTriples]

private def state(v: BlazegraphViewValue) = BlazegraphViewState(
id,
@@ -156,7 +156,7 @@ class IndexingViewDefSuite extends BioSuite {
for {
compiled <- IndexingViewDef.compile(
v,
- _ => Operation.merge(FilterDeprecated.withConfig(()), FilterByType.withConfig(filterByTypeConfig)),
+ _ => OperationF.merge(FilterDeprecated.withConfig(()), FilterByType.withConfig(filterByTypeConfig)),
GraphResourceStream.unsafeFromStream(PullRequestStream.generate(projectRef)),
sink
)
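The change repeated throughout this commit: the Task-specific Operation gains an effect-polymorphic twin, OperationF[F[_]], and call sites move to it. A minimal sketch of the shape this implies, in which the old name survives as a Task fixing — names and signatures here are assumptions for illustration, not the verbatim source:

import cats.data.NonEmptyChain
import monix.bio.Task

object OperationSketch {
  sealed trait ProjectionErr
  trait OperationF[F[_]]
  type Operation = OperationF[Task] // legacy alias: existing Task-based code keeps compiling

  object OperationF {
    // pairwise merge, as used in the test above
    def merge[F[_]](first: OperationF[F], second: OperationF[F]): Either[ProjectionErr, OperationF[F]] = ???
    // chain merge, as used by CompositeViewDef and CompiledProjection below
    def merge[F[_]](chain: NonEmptyChain[OperationF[F]]): Either[ProjectionErr, OperationF[F]] = ???
  }
}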
@@ -493,7 +493,7 @@ object CompositeViewDef {
): Either[ProjectionErr, Operation] = {
//TODO Add leap on target
val branchProgress = progress.branches.get(branch)
- Operation
+ OperationF
.merge(operation, closeBranch(branch, branchProgress.getOrElse(ProjectionProgress.NoProgress)))
}

@@ -519,9 +519,9 @@
for {
pipes <- source.pipeChain.traverse(compilePipeChain)
// We apply `Operation.tap` as we want to keep the GraphResource for the rest of the stream
- tail <- Operation.merge(GraphResourceToNTriples, sink).map(_.tap)
+ tail <- OperationF.merge(GraphResourceToNTriples, sink).map(_.tap)
chain = pipes.fold(NonEmptyChain.one(tail))(NonEmptyChain(_, tail))
- operation <- Operation.merge(chain)
+ operation <- OperationF.merge(chain)
// We create the elem stream for the two types of branch
// The main source produces an infinite stream and waits for new elements
mainSource = graphStream.main(source, project)
@@ -550,7 +550,7 @@
for {
pipes <- target.pipeChain.traverse(compilePipeChain)
chain = pipes.fold(tail)(NonEmptyChain.one(_) ++ tail)
- result <- Operation.merge(chain)
+ result <- OperationF.merge(chain)
} yield target.id -> result
}

@@ -110,7 +110,7 @@ object CompositeProjections {
branch: CompositeBranch,
progress: ProjectionProgress
): Operation =
- Operation.fromFs2Pipe[Unit](
+ OperationF.fromFs2Pipe[Task, Unit](
Projection.persist(
progress,
compositeProgressStore.save(view.indexingRef, branch, _),
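fromFs2Pipe now takes the effect as an explicit type parameter, which is why the call above gains the Task argument. A guessed signature — the Elem-based pipe shape is an assumption:

import fs2.Pipe

object FromPipeSketch {
  trait Elem[+A]
  trait OperationF[F[_]]

  object OperationF {
    // old: fromFs2Pipe[Unit](pipe) with the effect hard-wired to Task
    // new: fromFs2Pipe[Task, Unit](pipe) with the effect supplied at the call site
    def fromFs2Pipe[F[_], A](pipe: Pipe[F, Elem[A], Elem[A]]): OperationF[F] = ???
  }
}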
@@ -1,5 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing

+ import cats.effect.Concurrent
import cats.effect.concurrent.Ref
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.CompositeViewsFixture
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeView.{Interval, RebuildStrategy}
@@ -11,7 +12,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.FilterDeprecated
- import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{NoopSink, RemainingElems, Source}
+ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{NoopSink, RemainingElems, Source, SourceF}
import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, PatienceConfig}
import fs2.Stream
import monix.bio.{Task, UIO}
@@ -27,11 +28,13 @@ class CompositeViewDefSuite extends BioSuite with CompositeViewsFixture {

test("Compile correctly the source") {

- def makeSource(nameValue: String): Source = new Source {
+ def makeSource(nameValue: String): Source = new SourceF[Task] {
override type Out = Unit
override def outType: Typeable[Unit] = Typeable[Unit]
override def apply(offset: Offset): ElemStream[Unit] = Stream.empty[Task]
override def name: String = nameValue

+ implicit override def concurrent: Concurrent[Task] = monix.bio.IO.catsAsync
}

val graphStream = new CompositeGraphStream {
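The stub is now built against SourceF[Task] and must supply Concurrent[Task] evidence itself. A sketch of a SourceF whose members mirror the stub above; carrying the Concurrent instance on the trait is an assumption about the design:

import cats.effect.Concurrent
import fs2.Stream
import shapeless.Typeable

object SourceSketch {
  trait Offset   // stand-in for sourcing.offset.Offset
  trait Elem[+A] // stand-in for sourcing.stream.Elem

  trait SourceF[F[_]] {
    type Out
    def outType: Typeable[Out]
    def apply(offset: Offset): Stream[F, Elem[Out]]
    def name: String
    implicit def concurrent: Concurrent[F] // evidence each instance provides
  }
}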
@@ -76,7 +76,7 @@ class IndexingViewDefSuite extends BioSuite with CirceLiteral with Fixtures {
)

private val aggregate = AggregateElasticSearchViewValue(NonEmptySet.of(viewRef))
- private val sink = CacheSink.states[Json]
+ private val sink = CacheSink2.states[Json]

private val indexingRev = IndexingRev.init
private val rev = 2
@@ -212,7 +212,7 @@ class IndexingViewDefSuite extends BioSuite with CirceLiteral with Fixtures {
for {
compiled <- IndexingViewDef.compile(
v,
- _ => Operation.merge(FilterDeprecated.withConfig(()), FilterByType.withConfig(filterByTypeConfig)),
+ _ => OperationF.merge(FilterDeprecated.withConfig(()), FilterByType.withConfig(filterByTypeConfig)),
GraphResourceStream.unsafeFromStream(PullRequestStream.generate(projectRef)),
sink
)
@@ -3,7 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.metrics
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.metrics.MetricsStream._
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.{EventMetricsProjection, Fixtures}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
- import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CacheSink, ProjectionProgress, SupervisorSetup}
+ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CacheSink2, ProjectionProgress, SupervisorSetup}
import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, PatienceConfig}
import io.circe.Json
import io.circe.syntax.EncoderOps
@@ -20,7 +20,7 @@ class EventMetricsProjectionSuite extends BioSuite with SupervisorSetup.Fixture
implicit private val patienceConfig: PatienceConfig = PatienceConfig(2.seconds, 10.millis)

private lazy val sv = supervisor().supervisor
- private val sink = CacheSink.events[Json]
+ private val sink = CacheSink2.events[Json]

test("Start the metrics projection") {
for {
@@ -3,6 +3,8 @@ package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
+ import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration
+ import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.MigrateEffectSyntax
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.GraphAnalytics.{index, projectionName}
import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.config.GraphAnalyticsConfig
@@ -11,9 +13,8 @@ import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
- import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
+ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.SinkCatsEffect
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
- import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration
import fs2.Stream
import org.typelevel.log4cats.{Logger => Log4CatsLogger}

@@ -46,10 +47,11 @@ object GraphAnalyticsCoordinator {
fetchProjects: Offset => Stream[IO, Elem[ProjectDef]],
analyticsStream: GraphAnalyticsStream,
supervisor: Supervisor,
- sink: ProjectRef => Sink,
+ sink: ProjectRef => SinkCatsEffect,
createIndex: ProjectRef => IO[Unit],
deleteIndex: ProjectRef => IO[Unit]
- ) extends GraphAnalyticsCoordinator {
+ ) extends GraphAnalyticsCoordinator
+     with MigrateEffectSyntax {

def run(offset: Offset): Stream[IO, Elem[Unit]] =
fetchProjects(offset).evalMap {
@@ -63,8 +65,8 @@ object GraphAnalyticsCoordinator {
CompiledProjection.compile(
analyticsMetadata(project),
ExecutionStrategy.PersistentSingleNode,
- Source(analyticsStream(project, _)),
- sink(project)
+ Source(analyticsStream(project, _).translate(ioToTaskK)),
+ sink(project).mapK(ioToTaskK)
)
)
// Start the analysis projection for the given project
@@ -162,7 +164,7 @@ object GraphAnalyticsCoordinator {
fetchProjects: Offset => Stream[IO, Elem[ProjectDef]],
analyticsStream: GraphAnalyticsStream,
supervisor: Supervisor,
- sink: ProjectRef => Sink,
+ sink: ProjectRef => SinkCatsEffect,
createIndex: ProjectRef => IO[Unit],
deleteIndex: ProjectRef => IO[Unit]
): IO[GraphAnalyticsCoordinator] = {
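The coordinator now lives in cats-effect IO while the projection machinery still runs on monix-bio Task, so values are carried across the boundary with a natural transformation: translate for streams, mapK for sinks. A plausible CE2/monix-bio rendering of the arrow — the project's real ioToTaskK lives in kernel.effect.migration and may differ:

import cats.~>
import cats.effect.IO
import monix.bio.Task

object MigrationArrowSketch {
  val ioToTaskK: IO ~> Task = new (IO ~> Task) {
    def apply[A](io: IO[A]): Task[A] = Task.deferFuture(io.unsafeToFuture())
  }
  // usage mirroring the diff:
  //   analyticsStream(project, _).translate(ioToTaskK)
  //   sink(project).mapK(ioToTaskK)
}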
@@ -1,5 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.indexing

+ import cats.effect.IO
+ import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIO
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
import ch.epfl.bluebrain.nexus.delta.kernel.syntax.kamonSyntax
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh
@@ -9,12 +11,11 @@ import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.indexing.GraphAnaly
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
- import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
+ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.SinkCatsEffect
import fs2.Chunk
import io.circe.JsonObject
import io.circe.literal._
import io.circe.syntax.EncoderOps
- import monix.bio.Task
import shapeless.Typeable

import scala.concurrent.duration.FiniteDuration
@@ -35,7 +36,7 @@ final class GraphAnalyticsSink(
override val chunkSize: Int,
override val maxWindow: FiniteDuration,
index: IndexLabel
- ) extends Sink {
+ ) extends SinkCatsEffect {

implicit private val kamonComponent: KamonMetricComponent =
KamonMetricComponent("graph-analytics")
@@ -71,7 +72,7 @@ final class GraphAnalyticsSink(
private def documentId[A](elem: Elem[A]) = elem.id.toString

// TODO: depends on Operation.Sink in the sourcing module being moved to CE
- override def apply(elements: Chunk[Elem[GraphAnalyticsResult]]): Task[Chunk[Elem[Unit]]] = {
+ override def apply(elements: Chunk[Elem[GraphAnalyticsResult]]): IO[Chunk[Elem[Unit]]] = {
val result = elements.foldLeft(GraphAnalyticsSink.empty) {
case (acc, success: SuccessElem[GraphAnalyticsResult]) =>
success.value match {
@@ -86,8 +87,10 @@ final class GraphAnalyticsSink(
case (acc, _: FailedElem) => acc
}

- client.bulk(result.bulk, Refresh.True).map(ElasticSearchSink.markElems(_, elements, documentId)) <*
-   client.updateByQuery(relationshipsQuery(result.updates), Set(index.value))
+ toCatsIO(for {
+   elems <- client.bulk(result.bulk, Refresh.True).map(ElasticSearchSink.markElems(_, elements, documentId))
+   _     <- client.updateByQuery(relationshipsQuery(result.updates), Set(index.value))
+ } yield elems)
}.span("graphAnalyticsSink")
}

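toCatsIO wraps the remaining monix-bio client calls so the sink can expose an IO result. One way such a helper can be written against CE2 (assumed; the real one is kernel.effect.migration.toCatsIO):

import cats.effect.{ContextShift, IO}
import monix.bio.Task
import monix.execution.Scheduler

object ToCatsIOSketch {
  def toCatsIO[A](task: Task[A])(implicit s: Scheduler, cs: ContextShift[IO]): IO[A] =
    IO.fromFuture(IO(task.runToFuture)) // run the Task, re-wrap its Future in IO
}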
@@ -12,7 +12,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceState
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
- import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, EntityType, ProjectRef, Tag}
+ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStreamCats, EntityType, ProjectRef, Tag}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.query.{SelectFilter, StreamingQuery}
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
@@ -30,7 +30,7 @@ trait GraphAnalyticsStream {
* @param start
* the offset to start with
*/
- def apply(project: ProjectRef, start: Offset): ElemStream[GraphAnalyticsResult]
+ def apply(project: ProjectRef, start: Offset): ElemStreamCats[GraphAnalyticsResult]

}

@@ -102,7 +102,9 @@ object GraphAnalyticsStream {
case _ => IO.pure(Noop)
}

- StreamingQuery.elems(project, start, SelectFilter.latest, qc, xas, (a, b) => ioToTaskK.apply(decode(a, b)))
+ StreamingQuery
+   .elems(project, start, SelectFilter.latest, qc, xas, (a, b) => ioToTaskK.apply(decode(a, b)))
+   .translate(taskToIoK)
}
// $COVERAGE-ON$

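taskToIoK goes the other way: the Task-based query stream is translated into IO to satisfy the new ElemStreamCats signature. translate is stock fs2 — it swaps a stream's effect through a FunctionK:

import cats.~>
import cats.effect.IO
import fs2.Stream
import monix.bio.Task

object TranslateSketch {
  // taskToIoK is assumed to be the Task ~> IO counterpart of ioToTaskK
  def toCatsStream[A](s: Stream[Task, A])(implicit taskToIoK: Task ~> IO): Stream[IO, A] =
    s.translate(taskToIoK)
}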
@@ -1,7 +1,8 @@
package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.indexing

+ import cats.effect.IO
- import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
+ import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIO
import ch.epfl.bluebrain.nexus.delta.kernel.utils.CatsEffectsClasspathResourceUtils.ioJsonContentOf
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchClientSetup
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.IndexLabel
import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.indexing.GraphAnalyticsResult.Index
@@ -16,8 +17,9 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{FailedElem, SuccessElem}
+ import ch.epfl.bluebrain.nexus.testkit.CirceLiteral
import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, PatienceConfig}
- import ch.epfl.bluebrain.nexus.testkit.{CirceLiteral, TestHelpers}
+ import ch.epfl.bluebrain.nexus.testkit.ce.CatsEffectSuite
import fs2.Chunk
import io.circe.Json
import munit.AnyFixture
@@ -27,9 +29,10 @@ import scala.concurrent.duration._

class GraphAnalyticsSinkSuite
extends BioSuite
+ with CatsEffectSuite
with ElasticSearchClientSetup.Fixture
- with CirceLiteral
- with TestHelpers {
+ with CirceLiteral {
+ override val ioTimeout: FiniteDuration = 45.seconds

implicit private val patienceConfig: PatienceConfig = PatienceConfig(5.seconds, 50.millis)

@@ -89,9 +92,9 @@ class GraphAnalyticsSinkSuite
test("Create the update script and the index") {
for {
script <- scriptContent
- _ <- client.createScript(updateRelationshipsScriptId, script)
+ _ <- toCatsIO(client.createScript(updateRelationshipsScriptId, script))
mapping <- graphAnalyticsMappings
- _ <- client.createIndex(index, Some(mapping), None).assert(true)
+ _ <- toCatsIO(client.createIndex(index, Some(mapping), None)).assert
} yield ()
}

@@ -124,12 +127,12 @@ class GraphAnalyticsSinkSuite
deprecated = indexDeprecated(deprecatedResource, deprecatedResourceTypes)
chunk = Chunk.seq(List(active1, active2, discarded, deprecated))
// We expect no error
- _ <- sink(chunk).assert(chunk.map(_.void))
+ _ <- sink(chunk).assertEquals(chunk.map(_.void))
// 3 documents should have been indexed correctly:
// - `resource1` with the relationship to `resource3` resolved
// - `resource2` with no reference resolved
// - `deprecatedResource` with only metadata, resolution is skipped
- _ <- client.count(index.value).eventually(3L)
+ _ <- toCatsIO(client.count(index.value).eventually(3L))
expected1 <- ioJsonContentOf("result/resource1.json")
expected2 <- ioJsonContentOf("result/resource2.json")
expectedDeprecated <- ioJsonContentOf("result/resource_deprecated.json")
@@ -158,15 +161,15 @@ class GraphAnalyticsSinkSuite
)

for {
- _ <- sink(chunk).assert(chunk.map(_.void))
+ _ <- sink(chunk).assertEquals(chunk.map(_.void))
// The reference to file1 should have been resolved and introduced as a relationship
// The update query should not have an effect on the other resource
- _ <- client.refresh(index)
+ _ <- toCatsIO(client.refresh(index))
expected1 <- ioJsonContentOf("result/resource1_updated.json")
expected2 <- ioJsonContentOf("result/resource2.json")
- _ <- client.count(index.value).eventually(3L)
- _ <- client.getSource[Json](index, resource1.toString).eventually(expected1)
- _ <- client.getSource[Json](index, resource2.toString).eventually(expected2)
+ _ <- toCatsIO(client.count(index.value).eventually(3L))
+ _ <- toCatsIO(client.getSource[Json](index, resource1.toString).eventually(expected1))
+ _ <- toCatsIO(client.getSource[Json](index, resource2.toString).eventually(expected2))
} yield ()
}

@@ -12,7 +12,9 @@ package object model {

type ElemStream[Value] = Stream[Task, Elem[Value]]

- type ElemPipe[In, Out] = Pipe[Task, Elem[In], Elem[Out]]
+ type ElemPipe[In, Out] = ElemPipeF[Task, In, Out]
+
+ type ElemPipeF[F[_], In, Out] = Pipe[F, Elem[In], Elem[Out]]

type ElemStreamCats[Value] = Stream[IO, Elem[Value]]

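This aliasing is what keeps the rest of the codebase compiling: parameterise the type by F, then redefine the old Task-specific name as a fixing of the new one. A self-contained illustration (Elem is a stand-in for the real type):

import fs2.Pipe
import monix.bio.Task

object AliasSketch {
  final case class Elem[A](value: A)

  type ElemPipeF[F[_], In, Out] = Pipe[F, Elem[In], Elem[Out]]
  type ElemPipe[In, Out]        = ElemPipeF[Task, In, Out]

  // a pipe written against the old alias still compiles unchanged:
  def noop[A]: ElemPipe[A, A] = identity
}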
@@ -4,6 +4,7 @@ import cats.data.NonEmptyChain
import cats.effect.concurrent.Ref
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
+ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.OperationF.SinkF
import fs2.Stream
import fs2.concurrent.SignallingRef
import monix.bio.Task
@@ -51,7 +52,7 @@ object CompiledProjection {
metadata: ProjectionMetadata,
executionStrategy: ExecutionStrategy,
source: Source,
- sink: Sink
+ sink: SinkF[Task]
): Either[ProjectionErr, CompiledProjection] =
source.through(sink).map { p =>
CompiledProjection(metadata, executionStrategy, offset => _ => _ => p.apply(offset).map(_.void))
@@ -68,7 +69,7 @@ object CompiledProjection {
sink: Sink
): Either[ProjectionErr, CompiledProjection] =
for {
- operations <- Operation.merge(chain ++ NonEmptyChain.one(sink))
+ operations <- OperationF.merge(chain ++ NonEmptyChain.one(sink))
result <- source.through(operations)
} yield CompiledProjection(metadata, executionStrategy, offset => _ => _ => result.apply(offset).map(_.void))

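Reading the imports in this commit together (Operation.Sink, Operation.SinkCatsEffect, OperationF.SinkF), a plausible picture of the sink hierarchy — assumed names, not confirmed by the visible diff:

import cats.effect.IO
import monix.bio.Task

object SinkAliasSketch {
  trait SinkF[F[_]]

  type Sink           = SinkF[Task] // the legacy monix-bio sink
  type SinkCatsEffect = SinkF[IO]   // its cats-effect counterpart
}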

