Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate Search plugin to cats-effect #4315

Merged
merged 17 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.routes
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.headers.OAuth2BearerToken
import akka.http.scaladsl.server.Route
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.utils.{UUIDF, UrlUtils}
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck
Expand Down Expand Up @@ -35,7 +36,7 @@ class OrganizationsRoutesSpec extends BaseRouteSpec with IOFromMap {

private val aclChecker = AclSimpleCheck().accepted
private val aopd = new OwnerPermissionsScopeInitialization(
aclChecker.append,
acl => aclChecker.append(acl),
Set(orgsPermissions.write, orgsPermissions.read)
)
private lazy val orgs = OrganizationsImpl(Set(aopd), config, xas)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.kernel.effect.migration

import cats.effect.IO
import cats.~>
import monix.bio.{IO => BIO, UIO}
import monix.bio.{IO => BIO, Task, UIO}
import monix.execution.Scheduler.Implicits.global

import scala.reflect.ClassTag
Expand All @@ -13,7 +13,8 @@ trait MigrateEffectSyntax {

implicit def toMonixBIOOps[A](io: IO[A]): CatsIOToBioOps[A] = new CatsIOToBioOps(io)

val ioToUioK: IO ~> UIO = λ[IO ~> UIO](_.toUIO)
val taskToIoK: Task ~> IO = λ[Task ~> IO](toCatsIO(_))
val ioToUioK: IO ~> UIO = λ[IO ~> UIO](_.toUIO)

}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph

import cats.effect.IO
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
import ch.epfl.bluebrain.nexus.delta.kernel.syntax._
import ch.epfl.bluebrain.nexus.delta.kernel.syntax.kamonSyntax
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphViews.entityType
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewRejection.{ProjectContextRejection, ResourceAlreadyExists}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewValue.IndexingBlazegraphViewValue
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model._
import ch.epfl.bluebrain.nexus.delta.sdk.{Defaults, ScopeInitialization}
import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.ScopeInitializationFailed
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.ServiceAccount
import ch.epfl.bluebrain.nexus.delta.sdk.organizations.model.Organization
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.Project
import ch.epfl.bluebrain.nexus.delta.sdk.{Defaults, ScopeInitialization}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import com.typesafe.scalalogging.Logger
import monix.bio.{IO, UIO}

/**
* The default creation of the default SparqlView as part of the project initialization.
Expand All @@ -30,7 +32,7 @@ class BlazegraphScopeInitialization(
defaults: Defaults
) extends ScopeInitialization {

private val logger: Logger = Logger[BlazegraphScopeInitialization]
private val logger = Logger.cats[BlazegraphScopeInitialization]
implicit private val serviceAccountSubject: Subject = serviceAccount.subject
implicit private val kamonComponent: KamonMetricComponent = KamonMetricComponent(entityType.value)

Expand All @@ -45,23 +47,23 @@ class BlazegraphScopeInitialization(
permission = permissions.query
)

override def onProjectCreation(project: Project, subject: Identity.Subject): IO[ScopeInitializationFailed, Unit] =
override def onProjectCreation(project: Project, subject: Identity.Subject): IO[Unit] =
views
.create(defaultViewId, project.ref, defaultValue)
.void
.onErrorHandleWith {
case _: ResourceAlreadyExists => UIO.unit // nothing to do, view already exits
case _: ProjectContextRejection => UIO.unit // project or org are likely deprecated
.handleErrorWith {
case _: ResourceAlreadyExists => IO.unit // nothing to do, view already exits
case _: ProjectContextRejection => IO.unit // project or org are likely deprecated
case rej =>
val str =
s"Failed to create the default SparqlView for project '${project.ref}' due to '${rej.reason}'."
UIO.delay(logger.error(str)) >> IO.raiseError(ScopeInitializationFailed(str))
s"Failed to create the default SparqlView for project '${project.ref}' due to '${rej.getMessage}'."
logger.error(str) >> IO.raiseError(ScopeInitializationFailed(str))
}
.span("createDefaultSparqlView")

override def onOrganizationCreation(
organization: Organization,
subject: Identity.Subject
): IO[ScopeInitializationFailed, Unit] = IO.unit
): IO[Unit] = IO.unit

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext.ContextRejection
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.rejection.Rejection
import io.circe.syntax.EncoderOps
import io.circe.{Encoder, JsonObject}

sealed abstract class BlazegraphViewRejection(val reason: String) extends Product with Serializable
sealed abstract class BlazegraphViewRejection(val reason: String) extends Rejection

object BlazegraphViewRejection {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import ch.epfl.bluebrain.nexus.delta.sdk.{ConfigFixtures, Defaults}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Subject, User}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label
import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.DoobieScalaTestFixture
import ch.epfl.bluebrain.nexus.testkit.{IOFixedClock, IOValues, TestHelpers}
import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues
import ch.epfl.bluebrain.nexus.testkit.{IOFixedClock, TestHelpers}
import monix.bio.UIO
import monix.execution.Scheduler
import org.scalatest.Inspectors
Expand All @@ -27,7 +28,7 @@ class BlazegraphScopeInitializationSpec
with Matchers
with Inspectors
with IOFixedClock
with IOValues
with CatsIOValues
with TestHelpers
with ConfigFixtures
with Fixtures {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing

import cats.syntax.all._
import cats.effect.IO
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toMonixBIOOps
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.{ActiveViewDef, DeprecatedViewDef}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.projections.CompositeProjections
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.stream.CompositeGraphStream
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ExecutionStrategy.TransientSingleNode
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CompiledProjection, PipeChain}
import monix.bio.{IO, Task}
import monix.bio.{IO => BIO, Task}

/**
* Handle the different life stages of a composite view projection
Expand Down Expand Up @@ -39,7 +41,7 @@ object CompositeProjectionLifeCycle {
* Hook that allows to capture changes to apply before starting the indexing of a composite view
*/
trait Hook {
def apply(view: ActiveViewDef): Option[Task[Unit]]
def apply(view: ActiveViewDef): Option[IO[Unit]]
}

/**
Expand Down Expand Up @@ -96,20 +98,20 @@ object CompositeProjectionLifeCycle {
}

private def detectHook(view: ActiveViewDef) = {
val initial: Option[Task[Unit]] = None
val initial: Option[IO[Unit]] = None
hooks.toList
.foldLeft(initial) { case (acc, hook) =>
(acc ++ hook(view)).reduceOption(_ >> _)
}
.map { task =>
Task.pure(CompiledProjection.fromTask(view.metadata, TransientSingleNode, task))
Task.pure(CompiledProjection.fromTask(view.metadata, TransientSingleNode, task.toUIO))
}
}

override def destroyOnIndexingChange(prev: ActiveViewDef, next: CompositeViewDef): Task[Unit] =
(prev, next) match {
case (prev, next) if prev.ref != next.ref =>
IO.terminate(new IllegalArgumentException(s"Different views were provided: '${prev.ref}' and '${next.ref}'"))
BIO.terminate(new IllegalArgumentException(s"Different views were provided: '${prev.ref}' and '${next.ref}'"))
olivergrabinski marked this conversation as resolved.
Show resolved Hide resolved
case (prev, _: DeprecatedViewDef) =>
logger.info(s"View '${prev.ref}' has been deprecated, cleaning up the current one.") >> destroyAll(prev)
case (prev, nextActive: ActiveViewDef) if prev.indexingRev != nextActive.indexingRev =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext.ContextRejection
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.rejection.Rejection
import io.circe.syntax._
import io.circe.{Encoder, Json, JsonObject}

Expand All @@ -33,7 +34,7 @@ import io.circe.{Encoder, Json, JsonObject}
* @param reason
* a descriptive message as to why the rejection occurred
*/
sealed abstract class CompositeViewRejection(val reason: String) extends Product with Serializable
sealed abstract class CompositeViewRejection(val reason: String) extends Rejection

object CompositeViewRejection {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,23 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing

import cats.data.NonEmptyMapImpl
import cats.effect.concurrent.Ref
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.CompositeViewsFixture
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeProjectionLifeCycle.Hook
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeProjectionLifeCycleSuite.DestroyResult
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeProjectionLifeCycleSuite.DestroyResult._
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.{ActiveViewDef, DeprecatedViewDef}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sdk.views.{IndexingRev, IndexingViewRef, ViewRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CompiledProjection, ExecutionStatus, ExecutionStrategy, Projection, ProjectionMetadata}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, PatienceConfig}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeProjectionLifeCycleSuite.DestroyResult._
import monix.bio.{Task, UIO}
import munit.Location

import concurrent.duration._
import java.util.UUID
import scala.concurrent.duration._

class CompositeProjectionLifeCycleSuite extends BioSuite with CompositeViewsFixture {

Expand All @@ -34,7 +35,7 @@ class CompositeProjectionLifeCycleSuite extends BioSuite with CompositeViewsFixt
private def createHook(name: String, test: ViewRef => Boolean, ref: Ref[Task, Set[String]]): Hook =
(view: CompositeViewDef.ActiveViewDef) => {
Option.when(test(view.ref)) {
ref.update {
ref.mapK(taskToIoK).update {
_ + name
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch

import cats.effect.IO
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
import ch.epfl.bluebrain.nexus.delta.kernel.syntax._
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViews.entityType
Expand All @@ -15,8 +17,8 @@ import ch.epfl.bluebrain.nexus.delta.sdk.views.PipeStep
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.{DefaultLabelPredicates, SourceAsText}
import com.typesafe.scalalogging.Logger
import monix.bio.{IO, UIO}
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._

/**
* The default creation of the default ElasticSearchView as part of the project initialization.
Expand All @@ -32,7 +34,7 @@ class ElasticSearchScopeInitialization(
defaults: Defaults
) extends ScopeInitialization {

private val logger: Logger = Logger[ElasticSearchScopeInitialization]
private val logger = Logger.cats[ElasticSearchScopeInitialization]
implicit private val serviceAccountSubject: Subject = serviceAccount.subject
implicit private val kamonComponent: KamonMetricComponent = KamonMetricComponent(entityType.value)

Expand All @@ -48,23 +50,23 @@ class ElasticSearchScopeInitialization(
permission = permissions.query
)

override def onProjectCreation(project: Project, subject: Identity.Subject): IO[ScopeInitializationFailed, Unit] =
override def onProjectCreation(project: Project, subject: Identity.Subject): IO[Unit] =
views
.create(defaultViewId, project.ref, defaultValue)
.void
.onErrorHandleWith {
case _: ResourceAlreadyExists => UIO.unit // nothing to do, view already exits
case _: ProjectContextRejection => UIO.unit // project or org are likely deprecated
.handleErrorWith {
case _: ResourceAlreadyExists => IO.unit // nothing to do, view already exits
case _: ProjectContextRejection => IO.unit // project or org are likely deprecated
case rej =>
val str =
s"Failed to create the default ElasticSearchView for project '${project.ref}' due to '${rej.reason}'."
UIO.delay(logger.error(str)) >> IO.raiseError(ScopeInitializationFailed(str))
s"Failed to create the default ElasticSearchView for project '${project.ref}' due to '${rej.getMessage}'."
logger.error(str) >> IO.raiseError(ScopeInitializationFailed(str))
}
.span("createDefaultElasticSearchView")

override def onOrganizationCreation(
organization: Organization,
subject: Identity.Subject
): IO[ScopeInitializationFailed, Unit] = IO.unit
): IO[Unit] = IO.unit

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.syntax._
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.rejection.Rejection
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr
import io.circe.syntax._
import io.circe.{Encoder, Json, JsonObject}
Expand All @@ -30,7 +31,7 @@ import io.circe.{Encoder, Json, JsonObject}
* @param reason
* a descriptive message as to why the rejection occurred
*/
sealed abstract class ElasticSearchViewRejection(val reason: String) extends Product with Serializable
sealed abstract class ElasticSearchViewRejection(val reason: String) extends Rejection

object ElasticSearchViewRejection {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Subject, User}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label
import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.DoobieScalaTestFixture
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.{DefaultLabelPredicates, SourceAsText}
import ch.epfl.bluebrain.nexus.testkit.{EitherValuable, IOValues, TestHelpers}
import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues
import ch.epfl.bluebrain.nexus.testkit.{EitherValuable, TestHelpers}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.{Inspectors, OptionValues}
Expand All @@ -29,7 +30,7 @@ class ElasticSearchScopeInitializationSpec
with AnyWordSpecLike
with Matchers
with Inspectors
with IOValues
with CatsIOValues
with OptionValues
with EitherValuable
with TestHelpers
Expand Down
Loading