Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate Search plugin to cats-effect #4315

Merged
merged 17 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.kernel.effect.migration

import cats.effect.IO
import cats.~>
import monix.bio.{IO => BIO, UIO}
import monix.bio.{IO => BIO, Task, UIO}
import monix.execution.Scheduler.Implicits.global

import scala.reflect.ClassTag
Expand All @@ -13,7 +13,8 @@ trait MigrateEffectSyntax {

implicit def toMonixBIOOps[A](io: IO[A]): CatsIOToBioOps[A] = new CatsIOToBioOps(io)

val ioToUioK: IO ~> UIO = λ[IO ~> UIO](_.toUIO)
val taskToIoK: Task ~> IO = λ[Task ~> IO](toCatsIO(_))
val ioToUioK: IO ~> UIO = λ[IO ~> UIO](_.toUIO)

}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing

import cats.syntax.all._
import cats.effect.IO
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toMonixBIOOps
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.{ActiveViewDef, DeprecatedViewDef}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.projections.CompositeProjections
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.stream.CompositeGraphStream
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ExecutionStrategy.TransientSingleNode
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CompiledProjection, PipeChain}
import monix.bio.{IO, Task}
import monix.bio.{IO => BIO, Task}

/**
* Handle the different life stages of a composite view projection
Expand Down Expand Up @@ -39,7 +41,7 @@ object CompositeProjectionLifeCycle {
* Hook that allows to capture changes to apply before starting the indexing of a composite view
*/
trait Hook {
def apply(view: ActiveViewDef): Option[Task[Unit]]
def apply(view: ActiveViewDef): Option[IO[Unit]]
}

/**
Expand Down Expand Up @@ -96,20 +98,20 @@ object CompositeProjectionLifeCycle {
}

private def detectHook(view: ActiveViewDef) = {
val initial: Option[Task[Unit]] = None
val initial: Option[IO[Unit]] = None
hooks.toList
.foldLeft(initial) { case (acc, hook) =>
(acc ++ hook(view)).reduceOption(_ >> _)
}
.map { task =>
Task.pure(CompiledProjection.fromTask(view.metadata, TransientSingleNode, task))
Task.pure(CompiledProjection.fromTask(view.metadata, TransientSingleNode, task.toUIO))
}
}

override def destroyOnIndexingChange(prev: ActiveViewDef, next: CompositeViewDef): Task[Unit] =
(prev, next) match {
case (prev, next) if prev.ref != next.ref =>
IO.terminate(new IllegalArgumentException(s"Different views were provided: '${prev.ref}' and '${next.ref}'"))
BIO.terminate(new IllegalArgumentException(s"Different views were provided: '${prev.ref}' and '${next.ref}'"))
olivergrabinski marked this conversation as resolved.
Show resolved Hide resolved
case (prev, _: DeprecatedViewDef) =>
logger.info(s"View '${prev.ref}' has been deprecated, cleaning up the current one.") >> destroyAll(prev)
case (prev, nextActive: ActiveViewDef) if prev.indexingRev != nextActive.indexingRev =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext.ContextRejection
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.rejection.Rejection
import io.circe.syntax._
import io.circe.{Encoder, Json, JsonObject}

Expand All @@ -33,7 +34,7 @@ import io.circe.{Encoder, Json, JsonObject}
* @param reason
* a descriptive message as to why the rejection occurred
*/
sealed abstract class CompositeViewRejection(val reason: String) extends Product with Serializable
sealed abstract class CompositeViewRejection(val reason: String) extends Rejection

object CompositeViewRejection {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,23 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing

import cats.data.NonEmptyMapImpl
import cats.effect.concurrent.Ref
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.CompositeViewsFixture
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeProjectionLifeCycle.Hook
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeProjectionLifeCycleSuite.DestroyResult
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeProjectionLifeCycleSuite.DestroyResult._
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.{ActiveViewDef, DeprecatedViewDef}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sdk.views.{IndexingRev, IndexingViewRef, ViewRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CompiledProjection, ExecutionStatus, ExecutionStrategy, Projection, ProjectionMetadata}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, PatienceConfig}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeProjectionLifeCycleSuite.DestroyResult._
import monix.bio.{Task, UIO}
import munit.Location

import concurrent.duration._
import java.util.UUID
import scala.concurrent.duration._

class CompositeProjectionLifeCycleSuite extends BioSuite with CompositeViewsFixture {

Expand All @@ -34,7 +35,7 @@ class CompositeProjectionLifeCycleSuite extends BioSuite with CompositeViewsFixt
private def createHook(name: String, test: ViewRef => Boolean, ref: Ref[Task, Set[String]]): Hook =
(view: CompositeViewDef.ActiveViewDef) => {
Option.when(test(view.ref)) {
ref.update {
ref.mapK(taskToIoK).update {
_ + name
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.search

import akka.http.scaladsl.model.Uri
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.CompositeViews
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.projectionIndex
Expand All @@ -14,7 +15,8 @@ import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress.{Project => Proje
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label
import io.circe.{Json, JsonObject}
import monix.bio.{IO, UIO}

import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._

trait Search {

Expand All @@ -24,7 +26,7 @@ trait Search {
* @param payload
* the query payload
*/
def query(payload: JsonObject, qp: Uri.Query)(implicit caller: Caller): IO[SearchRejection, Json]
def query(payload: JsonObject, qp: Uri.Query)(implicit caller: Caller): IO[Json]

/**
* Queries the underlying search indices for the provided suite that the ''caller'' has access to
Expand All @@ -34,14 +36,14 @@ trait Search {
* @param payload
* the query payload
*/
def query(suite: Label, payload: JsonObject, qp: Uri.Query)(implicit caller: Caller): IO[SearchRejection, Json]
def query(suite: Label, payload: JsonObject, qp: Uri.Query)(implicit caller: Caller): IO[Json]
}

object Search {

final case class TargetProjection(projection: ElasticSearchProjection, view: CompositeView)

private[search] type ListProjections = () => UIO[Seq[TargetProjection]]
private[search] type ListProjections = () => IO[Seq[TargetProjection]]

/**
* Constructs a new [[Search]] instance.
Expand All @@ -58,7 +60,7 @@ object Search {
compositeViews
.list(
Pagination.OnePage,
CompositeViewSearchParams(deprecated = Some(false), filter = v => UIO.pure(v.id == defaultViewId)),
CompositeViewSearchParams(deprecated = Some(false), filter = v => IO.pure(v.id == defaultViewId).toUIO),
olivergrabinski marked this conversation as resolved.
Show resolved Hide resolved
Ordering.by(_.createdAt)
)
.map(
Expand Down Expand Up @@ -91,27 +93,27 @@ object Search {
) =
for {
allProjections <- listProjections().map(_.filter(projectionPredicate))
accessibleIndices <- aclCheck.mapFilter[TargetProjection, String](
allProjections,
p => ProjectAcl(p.view.project) -> p.projection.permission,
p => projectionIndex(p.projection, p.view.uuid, prefix).value
accessibleIndices <- toCatsIO(
dantb marked this conversation as resolved.
Show resolved Hide resolved
aclCheck.mapFilter[TargetProjection, String](
allProjections,
p => ProjectAcl(p.view.project) -> p.projection.permission,
p => projectionIndex(p.projection, p.view.uuid, prefix).value
)
)
results <- client.search(payload, accessibleIndices, qp)().mapError(WrappedElasticSearchClientError)
results <- toCatsIO(client.search(payload, accessibleIndices, qp)().mapError(WrappedElasticSearchClientError))
} yield results

override def query(payload: JsonObject, qp: Uri.Query)(implicit caller: Caller): IO[SearchRejection, Json] =
override def query(payload: JsonObject, qp: Uri.Query)(implicit caller: Caller): IO[Json] =
query(_ => true, payload, qp)

override def query(suite: Label, payload: JsonObject, qp: Uri.Query)(implicit
caller: Caller
): IO[SearchRejection, Json] =
IO.fromOption(
suites.get(suite),
UnknownSuite(suite)
).flatMap { projects =>
): IO[Json] = {
IO.fromOption(suites.get(suite))(UnknownSuite(suite)).flatMap { projects =>
def predicate(p: TargetProjection): Boolean = projects.contains(p.view.project)
query(predicate(_), payload, qp)
}
}

}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.plugins.search

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.CompositeViews
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeProjectionLifeCycle.Hook
Expand All @@ -10,16 +11,17 @@ import ch.epfl.bluebrain.nexus.delta.plugins.search.model.defaultViewId
import ch.epfl.bluebrain.nexus.delta.sdk.Defaults
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import monix.bio.{Task, UIO}

import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._

final class SearchConfigHook(
defaults: Defaults,
config: IndexingConfig,
update: (ActiveViewDef, CompositeViewFields) => UIO[Unit]
update: (ActiveViewDef, CompositeViewFields) => IO[Unit]
) extends Hook {

private val defaultSearchViewFields = SearchViewFactory(defaults, config)
override def apply(view: ActiveViewDef): Option[Task[Unit]] =
private val defaultSearchViewFields = SearchViewFactory(defaults, config)
override def apply(view: ActiveViewDef): Option[IO[Unit]] =
Option.when(viewIsDefault(view) && configHasChanged(view))(update(view, defaultSearchViewFields))

private def configHasChanged(v: ActiveViewDef): Boolean = !SearchViewFactory.matches(v.value, defaults, config)
Expand All @@ -29,7 +31,7 @@ final class SearchConfigHook(

object SearchConfigHook {

private val logger: Logger = Logger[SearchConfigHook]
private val logger = Logger.cats[SearchConfigHook]

def apply(compositeViews: CompositeViews, defaults: Defaults, indexingConfig: IndexingConfig)(implicit
baseUri: BaseUri,
Expand All @@ -43,13 +45,12 @@ object SearchConfigHook {
private def update(views: CompositeViews)(implicit
subject: Subject,
baseUri: BaseUri
): (ActiveViewDef, CompositeViewFields) => UIO[Unit] = { (viewDef: ActiveViewDef, fields: CompositeViewFields) =>
views
.update(viewDef.ref.viewId, viewDef.ref.project, viewDef.rev, fields)
.redeemWith(
e => logger.error(s"Could not update view '${viewDef.ref}'. Reason: '${e.reason}'"),
_ => logger.info(s"Search view '${viewDef.ref}' has been successfully updated.")
)
): (ActiveViewDef, CompositeViewFields) => IO[Unit] = { (viewDef: ActiveViewDef, fields: CompositeViewFields) =>
toCatsIO(
views.update(viewDef.ref.viewId, viewDef.ref.project, viewDef.rev, fields)
)
.handleErrorWith(e => logger.error(s"Could not update view '${viewDef.ref}'. Message: '${e.getMessage}'"))
.flatMap(_ => logger.info(s"Search view '${viewDef.ref}' has been successfully updated."))
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.plugins.search

import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.CompositeViews
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeProjectionLifeCycle
Expand All @@ -9,17 +10,18 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk._
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.ce.CatsScopeInitialization
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.ServiceAccount
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import com.typesafe.config.Config
import distage.ModuleDef
import io.circe.syntax.EncoderOps
import izumi.distage.model.definition.Id
import monix.execution.Scheduler

class SearchPluginModule(priority: Int) extends ModuleDef {

make[SearchConfig].fromEffect { cfg => SearchConfig.load(cfg) }
make[SearchConfig].fromEffect { (cfg: Config) => SearchConfig.load(cfg).toUIO }

make[Search].from {
(
Expand All @@ -37,7 +39,9 @@ class SearchPluginModule(priority: Int) extends ModuleDef {
new SearchScopeInitialization(views, config.indexing, serviceAccount, config.defaults)(baseUri)
}

many[ScopeInitialization].ref[SearchScopeInitialization]
many[ScopeInitialization].add { (s: SearchScopeInitialization) =>
CatsScopeInitialization.toBioScope(s)
}

make[SearchRoutes].from {
(
Expand All @@ -46,10 +50,9 @@ class SearchPluginModule(priority: Int) extends ModuleDef {
search: Search,
config: SearchConfig,
baseUri: BaseUri,
s: Scheduler,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) => new SearchRoutes(identities, aclCheck, search, config.fields.asJson)(baseUri, s, cr, ordering)
) => new SearchRoutes(identities, aclCheck, search, config.fields.asJson)(baseUri, cr, ordering)
}

many[PriorityRoute].add { (route: SearchRoutes) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,26 @@ package ch.epfl.bluebrain.nexus.delta.plugins.search

import akka.http.scaladsl.server.Directives.{as, concat, entity, get, pathEndOrSingleSlash, pathPrefix, post}
import akka.http.scaladsl.server.Route
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.routes.ElasticSearchViewsDirectives.extractQueryParams
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.ce.DeltaDirectives
import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaDirectives}
import ch.epfl.bluebrain.nexus.delta.sdk.directives.AuthDirectives
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import io.circe.{Json, JsonObject}
import kamon.instrumentation.akka.http.TracingDirectives.operationName
import monix.bio.UIO
import monix.execution.Scheduler

class SearchRoutes(
identities: Identities,
aclCheck: AclCheck,
search: Search,
configFields: Json
)(implicit baseUri: BaseUri, s: Scheduler, cr: RemoteContextResolution, ordering: JsonKeyOrdering)
)(implicit baseUri: BaseUri, cr: RemoteContextResolution, ordering: JsonKeyOrdering)
extends AuthDirectives(identities, aclCheck)
with CirceUnmarshalling
with RdfMarshalling
Expand Down Expand Up @@ -50,7 +50,7 @@ class SearchRoutes(
// Get fields config
(pathPrefix("config") & get & pathEndOrSingleSlash) {
operationName(s"$prefixSegment/search/config") {
emit(UIO.pure(configFields: Json))
emit(IO.pure(configFields: Json))
}
}
)
Expand Down
Loading