Skip to content

Commit

Permalink
Migrate Elasticsearch Client to Cats Effect (#4464)
Browse files Browse the repository at this point in the history
  • Loading branch information
dantb authored Nov 7, 2023
1 parent d580a3a commit 1f42c1b
Show file tree
Hide file tree
Showing 35 changed files with 196 additions and 217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.Uri.Query
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.projectionIndex
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.ElasticSearchProjection
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewRejection.WrappedElasticSearchClientError
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.AuthorizationFailed
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient.HttpResult
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegment
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SortList
Expand Down Expand Up @@ -69,7 +68,7 @@ trait ElasticSearchQuery {
object ElasticSearchQuery {

private[compositeviews] type ElasticSearchClientQuery =
(JsonObject, Set[String], Query) => HttpResult[Json]
(JsonObject, Set[String], Query) => IO[Json]

final def apply(
aclCheck: AclCheck,
Expand Down Expand Up @@ -101,7 +100,9 @@ object ElasticSearchQuery {
_ <-
aclCheck.authorizeForOr(project, projection.permission)(AuthorizationFailed(project, projection.permission))
index = projectionIndex(projection, view.uuid, prefix).value
search <- elasticSearchQuery(query, Set(index), qp).mapError(WrappedElasticSearchClientError)
search <- elasticSearchQuery(query, Set(index), qp).adaptError { case e: HttpClientError =>
WrappedElasticSearchClientError(e)
}
} yield search

override def queryProjections(
Expand All @@ -113,7 +114,9 @@ object ElasticSearchQuery {
for {
view <- fetchView(id, project)
indices <- allowedProjections(view, project)
search <- elasticSearchQuery(query, indices, qp).mapError(WrappedElasticSearchClientError)
search <- elasticSearchQuery(query, indices, qp).adaptError { case e: HttpClientError =>
WrappedElasticSearchClientError(e)
}
} yield search

private def fetchProjection(view: ActiveViewDef, projectionId: IdSegment) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews

import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.client.DeltaClient
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.projectionIndex
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.{ElasticSearchProjection, SparqlProjection}
Expand All @@ -20,6 +19,8 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects

import java.util.UUID

import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIOOps

/**
* Validate an [[CompositeViewValue]] during command evaluation
*/
Expand Down Expand Up @@ -52,17 +53,16 @@ object ValidateCompositeView {
IO.raiseWhen(!perms.contains(permission))(PermissionIsNotDefined(permission))
}

def validateIndex(es: ElasticSearchProjection, index: IndexLabel) = toCatsIO {
def validateIndex(es: ElasticSearchProjection, index: IndexLabel) =
client
.createIndex(index, Some(es.mapping), es.settings)
.mapError {
.adaptError {
case err: HttpClientStatusError => InvalidElasticSearchProjectionPayload(err.jsonBody)
case err => WrappedElasticSearchClientError(err)
case err: HttpClientError => WrappedElasticSearchClientError(err)
}
.void
}

val checkRemoteEvent: RemoteProjectSource => IO[Unit] = deltaClient.checkElems
val checkRemoteEvent: RemoteProjectSource => IO[Unit] = deltaClient.checkElems(_).toCatsIO

val validateSource: CompositeViewSource => IO[Unit] = {
case _: ProjectSource => IO.unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ object CompositeSpaces {
val result = view.value.projections.foldLeft[IO[Unit]](createCommon) {
case (acc, e: ElasticSearchProjection) =>
val index = projectionIndex(e, view.uuid, prefix)
acc >> esClient.createIndex(index, Some(e.mapping), e.settings).toCatsIO.void
acc >> esClient.createIndex(index, Some(e.mapping), e.settings).void
case (acc, s: SparqlProjection) =>
val namespace = projectionNamespace(s, view.uuid, prefix)
acc >> blazeClient.createNamespace(namespace).toCatsIO.void
Expand All @@ -69,7 +69,7 @@ object CompositeSpaces {
projection match {
case e: ElasticSearchProjection =>
val index = projectionIndex(e, view.uuid, prefix)
esClient.deleteIndex(index).toCatsIO.void
esClient.deleteIndex(index).void
case s: SparqlProjection =>
val namespace = projectionNamespace(s, view.uuid, prefix)
blazeClient.deleteNamespace(namespace).toCatsIO.void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.Uri.Query
import cats.data.NonEmptyList
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.permissions
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.projectionIndex
Expand All @@ -19,7 +20,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.AuthorizationFailed
import ch.epfl.bluebrain.nexus.delta.sdk.generators.ProjectGen
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient.HttpResult
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError.HttpUnexpectedError
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
Expand All @@ -32,7 +32,6 @@ import ch.epfl.bluebrain.nexus.testkit._
import ch.epfl.bluebrain.nexus.testkit.scalatest.ce.CatsEffectSpec
import io.circe.syntax._
import io.circe.{Json, JsonObject}
import monix.bio.IO
import org.scalatest.CancelAfterFailure

import java.util.UUID
Expand Down Expand Up @@ -126,7 +125,7 @@ class ElasticSearchQuerySpec extends CatsEffectSpec with CirceLiteral with Cance
q: JsonObject,
indices: Set[String],
qp: Query
): HttpResult[Json] =
): IO[Json] =
if (q == query)
IO.pure(Json.arr(indices.foldLeft(Seq.empty[Json])((acc, idx) => acc :+ indexResults(idx).asJson): _*))
else IO.raiseError(HttpUnexpectedError(HttpRequest(), ""))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ trait CompositeIndexingFixture extends CatsEffectSuite with BioRunContext {
)

private def resource(sinkConfig: SinkConfig): Resource[IO, Setup] = {
(Doobie.resource(), ElasticSearchClientSetup.resource(), BlazegraphClientSetup.resource())
(
Doobie.resource().mapK(taskToIoK),
ElasticSearchClientSetup.resource(),
BlazegraphClientSetup.resource().mapK(taskToIoK)
)
.parMapN { case (xas, esClient, bgClient) =>
val compositeRestartStore = new CompositeRestartStore(xas)
val projections =
Expand All @@ -96,7 +100,6 @@ trait CompositeIndexingFixture extends CatsEffectSuite with BioRunContext {
val sinks = CompositeSinks(prefix, esClient, bgClient, compositeConfig.copy(sinkConfig = sinkConfig))
Setup(esClient, bgClient, projections, spaces, sinks)
}
.mapK(taskToIoK)
}

def suiteLocalFixture(name: String, sinkConfig: SinkConfig): ResourceFixture.IOFixture[Setup] =
Expand Down Expand Up @@ -336,12 +339,12 @@ abstract class CompositeIndexingSuite(sinkConfig: SinkConfig, query: SparqlConst
_ <- spaces.init(view)
_ <- bgClient.existsNamespace(commonNs).toCatsIO.assertEquals(true)
_ <- bgClient.existsNamespace(sparqlNamespace).toCatsIO.assertEquals(true)
_ <- esClient.existsIndex(elasticIndex).toCatsIO.assertEquals(true)
_ <- esClient.existsIndex(elasticIndex).assertEquals(true)
// Delete them on destroy
_ <- spaces.destroyAll(view)
_ <- bgClient.existsNamespace(commonNs).toCatsIO.assertEquals(false)
_ <- bgClient.existsNamespace(sparqlNamespace).toCatsIO.assertEquals(false)
_ <- esClient.existsIndex(elasticIndex).toCatsIO.assertEquals(false)
_ <- esClient.existsIndex(elasticIndex).assertEquals(false)
} yield ()
}

Expand All @@ -361,17 +364,17 @@ abstract class CompositeIndexingSuite(sinkConfig: SinkConfig, query: SparqlConst
_ <- spaces.init(view)
_ <- bgClient.existsNamespace(commonNs).toCatsIO.assertEquals(true)
_ <- bgClient.existsNamespace(sparqlNamespace).toCatsIO.assertEquals(true)
_ <- esClient.existsIndex(elasticIndex).toCatsIO.assertEquals(true)
_ <- esClient.existsIndex(elasticIndex).assertEquals(true)
// Delete the blazegraph projection
_ <- spaces.destroyProjection(view, blazegraphProjection)
_ <- bgClient.existsNamespace(commonNs).toCatsIO.assertEquals(true)
_ <- bgClient.existsNamespace(sparqlNamespace).toCatsIO.assertEquals(false)
_ <- esClient.existsIndex(elasticIndex).toCatsIO.assertEquals(true)
_ <- esClient.existsIndex(elasticIndex).assertEquals(true)
// Delete the elasticsearch projection
_ <- spaces.destroyProjection(view, elasticSearchProjection)
_ <- bgClient.existsNamespace(commonNs).toCatsIO.assertEquals(true)
_ <- bgClient.existsNamespace(sparqlNamespace).toCatsIO.assertEquals(false)
_ <- esClient.existsIndex(elasticIndex).toCatsIO.assertEquals(false)
_ <- esClient.existsIndex(elasticIndex).assertEquals(false)
} yield ()
}

Expand Down Expand Up @@ -574,14 +577,13 @@ abstract class CompositeIndexingSuite(sinkConfig: SinkConfig, query: SparqlConst
private def checkElasticSearchDocuments(index: IndexLabel, expected: Json*): IO[Unit] = {
val page = FromPagination(0, 5000)
for {
_ <- esClient.refresh(index).toCatsIO
_ <- esClient.refresh(index)
results <- esClient
.search(
QueryBuilder.empty.withSort(SortList(List(Sort("@id")))).withPage(page),
Set(index.value),
Query.Empty
)
.toCatsIO
_ = assertEquals(results.sources.size, expected.size)
_ = results.sources.zip(expected).foreach { case (obtained, expected) =>
obtained.asJson.equalsIgnoreArrayOrder(expected)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
import ch.epfl.bluebrain.nexus.delta.sdk.ServiceDependency
import ch.epfl.bluebrain.nexus.delta.sdk.model.ComponentDescription.ServiceDescription
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._

/**
* Describes the remote storage [[ServiceDependency]] providing a way to extract the [[ServiceDescription]] from a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ final class ElasticSearchViewsQueryImpl private[elasticsearch] (
for {
view <- viewStore.fetch(id, project).toCatsIO
indices <- extractIndices(view)
search <- client.search(query, indices, qp)(SortList.empty).mapError(WrappedElasticSearchClientError)
search <- client.search(query, indices, qp)(SortList.empty).adaptError { case e: HttpClientError =>
WrappedElasticSearchClientError(e)
}
} yield search
}

Expand All @@ -119,7 +121,7 @@ final class ElasticSearchViewsQueryImpl private[elasticsearch] (
_ <- aclCheck.authorizeForOr(project, permissions.write)(AuthorizationFailed(project, permissions.write))
view <- viewStore.fetch(id, project).toCatsIO
idx <- indexOrError(view, id)
search <- client.mapping(IndexLabel.unsafe(idx)).toCatsIO.adaptError { case e: HttpClientError =>
search <- client.mapping(IndexLabel.unsafe(idx)).adaptError { case e: HttpClientError =>
WrappedElasticSearchClientError(e)
}
} yield search
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch

import cats.data.NonEmptyChain
import cats.effect.{ContextShift, IO, Timer}
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.ElasticSearchSink
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch

import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewRejection.{InvalidElasticSearchIndexPayload, InvalidPipeline, InvalidViewReferences, PermissionIsNotDefined, TooManyViewReferences, WrappedElasticSearchClientError}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewValue.{AggregateElasticSearchViewValue, IndexingElasticSearchViewValue}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{DefaultMapping, DefaultSettings, ElasticSearchViewValue}
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient.HttpResult
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError.HttpClientStatusError
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions
Expand Down Expand Up @@ -55,7 +53,7 @@ object ValidateElasticSearchView {
def apply(
validatePipeChain: PipeChain => Either[ProjectionErr, Unit],
fetchPermissionSet: IO[Set[Permission]],
createIndex: (IndexLabel, Option[JsonObject], Option[JsonObject]) => HttpResult[Unit],
createIndex: (IndexLabel, Option[JsonObject], Option[JsonObject]) => IO[Unit],
prefix: String,
maxViewRefs: Int,
xas: Transactors,
Expand All @@ -81,7 +79,7 @@ object ValidateElasticSearchView {
IndexLabel.fromView(prefix, uuid, indexingRev),
value.mapping.orElse(Some(defaultMapping.value)),
value.settings.orElse(Some(defaultSettings.value))
).toCatsIO
)
.adaptError {
case err: HttpClientStatusError => InvalidElasticSearchIndexPayload(err.jsonBody)
case err: HttpClientError => WrappedElasticSearchClientError(err)
Expand Down
Loading

0 comments on commit 1f42c1b

Please sign in to comment.