diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/ElasticSearchQuery.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/ElasticSearchQuery.scala index cd8c4c9803..35a1141c4e 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/ElasticSearchQuery.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/ElasticSearchQuery.scala @@ -4,7 +4,6 @@ import akka.http.scaladsl.model.Uri import akka.http.scaladsl.model.Uri.Query import cats.effect.IO import cats.syntax.all._ -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.projectionIndex import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.ElasticSearchProjection @@ -12,7 +11,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewR import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.AuthorizationFailed -import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient.HttpResult +import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegment import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SortList @@ -69,7 +68,7 @@ trait ElasticSearchQuery { object ElasticSearchQuery { private[compositeviews] type ElasticSearchClientQuery = - (JsonObject, Set[String], Query) => HttpResult[Json] + (JsonObject, Set[String], Query) => IO[Json] final def apply( aclCheck: AclCheck, @@ -101,7 +100,9 @@ object ElasticSearchQuery { _ <- aclCheck.authorizeForOr(project, projection.permission)(AuthorizationFailed(project, projection.permission)) index = projectionIndex(projection, view.uuid, prefix).value - search <- elasticSearchQuery(query, Set(index), qp).mapError(WrappedElasticSearchClientError) + search <- elasticSearchQuery(query, Set(index), qp).adaptError { case e: HttpClientError => + WrappedElasticSearchClientError(e) + } } yield search override def queryProjections( @@ -113,7 +114,9 @@ object ElasticSearchQuery { for { view <- fetchView(id, project) indices <- allowedProjections(view, project) - search <- elasticSearchQuery(query, indices, qp).mapError(WrappedElasticSearchClientError) + search <- elasticSearchQuery(query, indices, qp).adaptError { case e: HttpClientError => + WrappedElasticSearchClientError(e) + } } yield search private def fetchProjection(view: ActiveViewDef, projectionId: IdSegment) = diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/ValidateCompositeView.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/ValidateCompositeView.scala index 5c80cfad26..4372c675d4 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/ValidateCompositeView.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/ValidateCompositeView.scala @@ -2,7 +2,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews import cats.effect.IO import cats.syntax.all._ -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.client.DeltaClient import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.projectionIndex import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.{ElasticSearchProjection, SparqlProjection} @@ -20,6 +19,8 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects import java.util.UUID +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIOOps + /** * Validate an [[CompositeViewValue]] during command evaluation */ @@ -52,17 +53,16 @@ object ValidateCompositeView { IO.raiseWhen(!perms.contains(permission))(PermissionIsNotDefined(permission)) } - def validateIndex(es: ElasticSearchProjection, index: IndexLabel) = toCatsIO { + def validateIndex(es: ElasticSearchProjection, index: IndexLabel) = client .createIndex(index, Some(es.mapping), es.settings) - .mapError { + .adaptError { case err: HttpClientStatusError => InvalidElasticSearchProjectionPayload(err.jsonBody) - case err => WrappedElasticSearchClientError(err) + case err: HttpClientError => WrappedElasticSearchClientError(err) } .void - } - val checkRemoteEvent: RemoteProjectSource => IO[Unit] = deltaClient.checkElems + val checkRemoteEvent: RemoteProjectSource => IO[Unit] = deltaClient.checkElems(_).toCatsIO val validateSource: CompositeViewSource => IO[Unit] = { case _: ProjectSource => IO.unit diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeSpaces.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeSpaces.scala index 4d2ed1dbcc..90b82a4269 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeSpaces.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeSpaces.scala @@ -47,7 +47,7 @@ object CompositeSpaces { val result = view.value.projections.foldLeft[IO[Unit]](createCommon) { case (acc, e: ElasticSearchProjection) => val index = projectionIndex(e, view.uuid, prefix) - acc >> esClient.createIndex(index, Some(e.mapping), e.settings).toCatsIO.void + acc >> esClient.createIndex(index, Some(e.mapping), e.settings).void case (acc, s: SparqlProjection) => val namespace = projectionNamespace(s, view.uuid, prefix) acc >> blazeClient.createNamespace(namespace).toCatsIO.void @@ -69,7 +69,7 @@ object CompositeSpaces { projection match { case e: ElasticSearchProjection => val index = projectionIndex(e, view.uuid, prefix) - esClient.deleteIndex(index).toCatsIO.void + esClient.deleteIndex(index).void case s: SparqlProjection => val namespace = projectionNamespace(s, view.uuid, prefix) blazeClient.deleteNamespace(namespace).toCatsIO.void diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/ElasticSearchQuerySpec.scala b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/ElasticSearchQuerySpec.scala index e817b1c8ef..7f2f2abf62 100644 --- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/ElasticSearchQuerySpec.scala +++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/ElasticSearchQuerySpec.scala @@ -3,6 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews import akka.http.scaladsl.model.HttpRequest import akka.http.scaladsl.model.Uri.Query import cats.data.NonEmptyList +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.permissions import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.projectionIndex @@ -19,7 +20,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.AuthorizationFailed import ch.epfl.bluebrain.nexus.delta.sdk.generators.ProjectGen -import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient.HttpResult import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError.HttpUnexpectedError import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller import ch.epfl.bluebrain.nexus.delta.sdk.implicits._ @@ -32,7 +32,6 @@ import ch.epfl.bluebrain.nexus.testkit._ import ch.epfl.bluebrain.nexus.testkit.scalatest.ce.CatsEffectSpec import io.circe.syntax._ import io.circe.{Json, JsonObject} -import monix.bio.IO import org.scalatest.CancelAfterFailure import java.util.UUID @@ -126,7 +125,7 @@ class ElasticSearchQuerySpec extends CatsEffectSpec with CirceLiteral with Cance q: JsonObject, indices: Set[String], qp: Query - ): HttpResult[Json] = + ): IO[Json] = if (q == query) IO.pure(Json.arr(indices.foldLeft(Seq.empty[Json])((acc, idx) => acc :+ indexResults(idx).asJson): _*)) else IO.raiseError(HttpUnexpectedError(HttpRequest(), "")) diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeIndexingSuite.scala b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeIndexingSuite.scala index 4f94a0cdf7..a788d7543d 100644 --- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeIndexingSuite.scala +++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeIndexingSuite.scala @@ -87,7 +87,11 @@ trait CompositeIndexingFixture extends CatsEffectSuite with BioRunContext { ) private def resource(sinkConfig: SinkConfig): Resource[IO, Setup] = { - (Doobie.resource(), ElasticSearchClientSetup.resource(), BlazegraphClientSetup.resource()) + ( + Doobie.resource().mapK(taskToIoK), + ElasticSearchClientSetup.resource(), + BlazegraphClientSetup.resource().mapK(taskToIoK) + ) .parMapN { case (xas, esClient, bgClient) => val compositeRestartStore = new CompositeRestartStore(xas) val projections = @@ -96,7 +100,6 @@ trait CompositeIndexingFixture extends CatsEffectSuite with BioRunContext { val sinks = CompositeSinks(prefix, esClient, bgClient, compositeConfig.copy(sinkConfig = sinkConfig)) Setup(esClient, bgClient, projections, spaces, sinks) } - .mapK(taskToIoK) } def suiteLocalFixture(name: String, sinkConfig: SinkConfig): ResourceFixture.IOFixture[Setup] = @@ -336,12 +339,12 @@ abstract class CompositeIndexingSuite(sinkConfig: SinkConfig, query: SparqlConst _ <- spaces.init(view) _ <- bgClient.existsNamespace(commonNs).toCatsIO.assertEquals(true) _ <- bgClient.existsNamespace(sparqlNamespace).toCatsIO.assertEquals(true) - _ <- esClient.existsIndex(elasticIndex).toCatsIO.assertEquals(true) + _ <- esClient.existsIndex(elasticIndex).assertEquals(true) // Delete them on destroy _ <- spaces.destroyAll(view) _ <- bgClient.existsNamespace(commonNs).toCatsIO.assertEquals(false) _ <- bgClient.existsNamespace(sparqlNamespace).toCatsIO.assertEquals(false) - _ <- esClient.existsIndex(elasticIndex).toCatsIO.assertEquals(false) + _ <- esClient.existsIndex(elasticIndex).assertEquals(false) } yield () } @@ -361,17 +364,17 @@ abstract class CompositeIndexingSuite(sinkConfig: SinkConfig, query: SparqlConst _ <- spaces.init(view) _ <- bgClient.existsNamespace(commonNs).toCatsIO.assertEquals(true) _ <- bgClient.existsNamespace(sparqlNamespace).toCatsIO.assertEquals(true) - _ <- esClient.existsIndex(elasticIndex).toCatsIO.assertEquals(true) + _ <- esClient.existsIndex(elasticIndex).assertEquals(true) // Delete the blazegraph projection _ <- spaces.destroyProjection(view, blazegraphProjection) _ <- bgClient.existsNamespace(commonNs).toCatsIO.assertEquals(true) _ <- bgClient.existsNamespace(sparqlNamespace).toCatsIO.assertEquals(false) - _ <- esClient.existsIndex(elasticIndex).toCatsIO.assertEquals(true) + _ <- esClient.existsIndex(elasticIndex).assertEquals(true) // Delete the elasticsearch projection _ <- spaces.destroyProjection(view, elasticSearchProjection) _ <- bgClient.existsNamespace(commonNs).toCatsIO.assertEquals(true) _ <- bgClient.existsNamespace(sparqlNamespace).toCatsIO.assertEquals(false) - _ <- esClient.existsIndex(elasticIndex).toCatsIO.assertEquals(false) + _ <- esClient.existsIndex(elasticIndex).assertEquals(false) } yield () } @@ -574,14 +577,13 @@ abstract class CompositeIndexingSuite(sinkConfig: SinkConfig, query: SparqlConst private def checkElasticSearchDocuments(index: IndexLabel, expected: Json*): IO[Unit] = { val page = FromPagination(0, 5000) for { - _ <- esClient.refresh(index).toCatsIO + _ <- esClient.refresh(index) results <- esClient .search( QueryBuilder.empty.withSort(SortList(List(Sort("@id")))).withPage(page), Set(index.value), Query.Empty ) - .toCatsIO _ = assertEquals(results.sources.size, expected.size) _ = results.sources.zip(expected).foreach { case (obtained, expected) => obtained.asJson.equalsIgnoreArrayOrder(expected) diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchServiceDependency.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchServiceDependency.scala index b43b68de04..6deb870dae 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchServiceDependency.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchServiceDependency.scala @@ -4,7 +4,6 @@ import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient import ch.epfl.bluebrain.nexus.delta.sdk.ServiceDependency import ch.epfl.bluebrain.nexus.delta.sdk.model.ComponentDescription.ServiceDescription -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ /** * Describes the remote storage [[ServiceDependency]] providing a way to extract the [[ServiceDescription]] from a diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViewsQuery.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViewsQuery.scala index b40d47c660..fe7c32ace4 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViewsQuery.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViewsQuery.scala @@ -94,7 +94,9 @@ final class ElasticSearchViewsQueryImpl private[elasticsearch] ( for { view <- viewStore.fetch(id, project).toCatsIO indices <- extractIndices(view) - search <- client.search(query, indices, qp)(SortList.empty).mapError(WrappedElasticSearchClientError) + search <- client.search(query, indices, qp)(SortList.empty).adaptError { case e: HttpClientError => + WrappedElasticSearchClientError(e) + } } yield search } @@ -119,7 +121,7 @@ final class ElasticSearchViewsQueryImpl private[elasticsearch] ( _ <- aclCheck.authorizeForOr(project, permissions.write)(AuthorizationFailed(project, permissions.write)) view <- viewStore.fetch(id, project).toCatsIO idx <- indexOrError(view, id) - search <- client.mapping(IndexLabel.unsafe(idx)).toCatsIO.adaptError { case e: HttpClientError => + search <- client.mapping(IndexLabel.unsafe(idx)).adaptError { case e: HttpClientError => WrappedElasticSearchClientError(e) } } yield search diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/EventMetricsProjection.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/EventMetricsProjection.scala index 059e84c0c0..eca60fdbe6 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/EventMetricsProjection.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/EventMetricsProjection.scala @@ -2,7 +2,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch import cats.data.NonEmptyChain import cats.effect.{ContextShift, IO, Timer} -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.ElasticSearchSink diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ValidateElasticSearchView.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ValidateElasticSearchView.scala index e64e417c69..b2a8a5a17e 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ValidateElasticSearchView.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ValidateElasticSearchView.scala @@ -2,12 +2,10 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch import cats.effect.IO import cats.syntax.all._ -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewRejection.{InvalidElasticSearchIndexPayload, InvalidPipeline, InvalidViewReferences, PermissionIsNotDefined, TooManyViewReferences, WrappedElasticSearchClientError} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewValue.{AggregateElasticSearchViewValue, IndexingElasticSearchViewValue} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{DefaultMapping, DefaultSettings, ElasticSearchViewValue} -import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient.HttpResult import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError.HttpClientStatusError import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions @@ -55,7 +53,7 @@ object ValidateElasticSearchView { def apply( validatePipeChain: PipeChain => Either[ProjectionErr, Unit], fetchPermissionSet: IO[Set[Permission]], - createIndex: (IndexLabel, Option[JsonObject], Option[JsonObject]) => HttpResult[Unit], + createIndex: (IndexLabel, Option[JsonObject], Option[JsonObject]) => IO[Unit], prefix: String, maxViewRefs: Int, xas: Transactors, @@ -81,7 +79,7 @@ object ValidateElasticSearchView { IndexLabel.fromView(prefix, uuid, indexingRev), value.mapping.orElse(Some(defaultMapping.value)), value.settings.orElse(Some(defaultSettings.value)) - ).toCatsIO + ) .adaptError { case err: HttpClientStatusError => InvalidElasticSearchIndexPayload(err.jsonBody) case err: HttpClientError => WrappedElasticSearchClientError(err) diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/client/ElasticSearchClient.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/client/ElasticSearchClient.scala index 94f72dffbb..0c3a582962 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/client/ElasticSearchClient.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/client/ElasticSearchClient.scala @@ -6,16 +6,17 @@ import akka.http.scaladsl.model.StatusCodes.{BadRequest, Created, NotFound, OK} import akka.http.scaladsl.model.Uri.Query import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.BasicHttpCredentials +import cats.effect.IO import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.RetryStrategy import ch.epfl.bluebrain.nexus.delta.kernel.RetryStrategy.logError +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination import ch.epfl.bluebrain.nexus.delta.kernel.utils.UrlUtils import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.BulkResponse.MixedOutcomes.Outcome import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{EmptyResults, ResourcesSearchParams} import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceMarshalling._ -import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient.HttpResult import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError.{HttpClientStatusError, HttpUnexpectedError} import ch.epfl.bluebrain.nexus.delta.sdk.http.{HttpClient, HttpClientError} import ch.epfl.bluebrain.nexus.delta.sdk.model.ComponentDescription.ServiceDescription @@ -28,7 +29,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ import com.typesafe.scalalogging.Logger import io.circe._ import io.circe.syntax._ -import monix.bio.{IO, UIO} +import monix.bio.{IO => BIO} import scala.concurrent.duration._ import scala.reflect.ClassTag @@ -82,7 +83,7 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: /** * Fetches the service description information (name and version) */ - def serviceDescription: UIO[ServiceDescription] = + def serviceDescription: IO[ServiceDescription] = client .fromJsonTo[ResolvedServiceDescription](Get(endpoint).withHttpCredentials) .timeout(3.seconds) @@ -99,10 +100,10 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: * @return * ''true'' when the index exists and ''false'' when it doesn't, wrapped in an IO */ - def existsIndex(index: IndexLabel): HttpResult[Boolean] = - client(Head(endpoint / index.value).withHttpCredentials) { - case resp if resp.status == OK => IO.pure(true) - case resp if resp.status == NotFound => IO.pure(false) + def existsIndex(index: IndexLabel): IO[Boolean] = + client.run(Head(endpoint / index.value).withHttpCredentials) { + case resp if resp.status == OK => BIO.pure(true) + case resp if resp.status == NotFound => BIO.pure(false) } /** @@ -115,11 +116,11 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: * @return * ''true'' when the index has been created and ''false'' when it already existed, wrapped in an IO */ - def createIndex(index: IndexLabel, payload: JsonObject = JsonObject.empty): HttpResult[Boolean] = { + def createIndex(index: IndexLabel, payload: JsonObject = JsonObject.empty): IO[Boolean] = { existsIndex(index).flatMap { case false => - client(Put(endpoint / index.value, payload).withHttpCredentials) { - case resp if resp.status.isSuccess() => discardEntity(resp) >> IO.pure(true) + client.run(Put(endpoint / index.value, payload).withHttpCredentials) { + case resp if resp.status.isSuccess() => discardEntity(resp).as(true) } case true => IO.pure(false) @@ -138,7 +139,7 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: * @return * ''true'' when the index has been created and ''false'' when it already existed, wrapped in an IO */ - def createIndex(index: IndexLabel, mappings: Option[JsonObject], settings: Option[JsonObject]): HttpResult[Boolean] = + def createIndex(index: IndexLabel, mappings: Option[JsonObject], settings: Option[JsonObject]): IO[Boolean] = createIndex(index, JsonObject.empty.addIfExists("mappings", mappings).addIfExists("settings", settings)) /** @@ -151,10 +152,9 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: * @return * ''true'' when the index template has been created */ - def createIndexTemplate(name: String, template: JsonObject): HttpResult[Boolean] = - client(Put(endpoint / indexTemplate / name, template).withHttpCredentials) { - case resp if resp.status.isSuccess() => - discardEntity(resp) >> IO.pure(true) + def createIndexTemplate(name: String, template: JsonObject): IO[Boolean] = + client.run(Put(endpoint / indexTemplate / name, template).withHttpCredentials) { + case resp if resp.status.isSuccess() => discardEntity(resp).as(true) } /** @@ -165,10 +165,10 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: * @return * ''true'' when the index has been deleted and ''false'' when it didn't exist, wrapped in an IO */ - def deleteIndex(index: IndexLabel): HttpResult[Boolean] = - client(Delete(endpoint / index.value).withHttpCredentials) { - case resp if resp.status == OK => discardEntity(resp) >> IO.pure(true) - case resp if resp.status == NotFound => discardEntity(resp) >> IO.pure(false) + def deleteIndex(index: IndexLabel): IO[Boolean] = + client.run(Delete(endpoint / index.value).withHttpCredentials) { + case resp if resp.status == OK => discardEntity(resp).as(true) + case resp if resp.status == NotFound => discardEntity(resp).as(false) } /** @@ -185,8 +185,8 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: index: IndexLabel, id: String, payload: JsonObject - ): HttpResult[Unit] = - client(Put(endpoint / index.value / docPath / UrlUtils.encode(id), payload).withHttpCredentials) { + ): IO[Unit] = + client.run(Put(endpoint / index.value / docPath / UrlUtils.encode(id), payload).withHttpCredentials) { case resp if resp.status == Created || resp.status == OK => discardEntity(resp) } @@ -200,10 +200,10 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: * @return * ''true'' when the document has been deleted and ''false'' when it didn't exist, wrapped in an IO */ - def delete(index: IndexLabel, id: String): HttpResult[Boolean] = - client(Delete(endpoint / index.value / docPath / UrlUtils.encode(id)).withHttpCredentials) { - case resp if resp.status == OK => discardEntity(resp) >> IO.pure(true) - case resp if resp.status == NotFound => discardEntity(resp) >> IO.pure(false) + def delete(index: IndexLabel, id: String): IO[Boolean] = + client.run(Delete(endpoint / index.value / docPath / UrlUtils.encode(id)).withHttpCredentials) { + case resp if resp.status == OK => discardEntity(resp).as(true) + case resp if resp.status == NotFound => discardEntity(resp).as(false) } /** @@ -214,7 +214,7 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: * @param refresh * the value for the `refresh` Elasticsearch parameter */ - def bulk(ops: Seq[ElasticSearchBulk], refresh: Refresh = Refresh.False): HttpResult[BulkResponse] = { + def bulk(ops: Seq[ElasticSearchBulk], refresh: Refresh = Refresh.False): IO[BulkResponse] = { if (ops.isEmpty) IO.pure(BulkResponse.Success) else { val bulkEndpoint = (endpoint / bulkPath).withQuery(Query(refreshParam -> refresh.value)) @@ -223,22 +223,25 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: client .toJson(req) - .redeemWith( - err => IO.fromEither(BulkResponse(err)).mapError(_ => err), - json => IO.fromEither(BulkResponse(json)).mapError { err => HttpUnexpectedError(req, err.getMessage()) } - ) + .toCatsIO + .attemptNarrow[HttpClientError] + .flatMap { + case Left(e) => IO.fromEither(BulkResponse(e)).orRaise(e) + case Right(json) => + IO.fromEither(BulkResponse(json)).adaptError { e => HttpUnexpectedError(req, e.getMessage) } + } } } /** * Creates a script on Elasticsearch with the passed ''id'' and ''content'' */ - def createScript(id: String, content: String): HttpResult[Unit] = { + def createScript(id: String, content: String): IO[Unit] = { val payload = Json.obj("script" -> Json.obj("lang" -> "painless".asJson, "source" -> content.asJson)) val req = Put(endpoint / scriptPath / UrlUtils.encode(id), payload).withHttpCredentials - client.toJson(req).flatMap { json => - IO.unless(json.hcursor.get[Boolean]("acknowledged").contains(true))( - IO.raiseError(HttpClientStatusError(req, BadRequest, json.noSpaces)) + client.toJson(req).toCatsIO.flatMap { json => + IO.raiseWhen(!json.hcursor.get[Boolean]("acknowledged").contains(true))( + HttpClientStatusError(req, BadRequest, json.noSpaces) ) } } @@ -252,15 +255,14 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: * @param indices * the indices targeted by the update query */ - def updateByQuery(query: JsonObject, indices: Set[String]): HttpResult[Unit] = { + def updateByQuery(query: JsonObject, indices: Set[String]): IO[Unit] = { val (indexPath, q) = indexPathAndQuery(indices, QueryBuilder(query)) val updateEndpoint = (endpoint / indexPath / updateByQueryPath).withQuery(Uri.Query(defaultUpdateByQuery)) val req = Post(updateEndpoint, q.build).withHttpCredentials for { - json <- client.toJson(req) - taskId <- IO.fromEither( - json.hcursor.get[String]("task").leftMap(_ => HttpClientStatusError(req, BadRequest, json.noSpaces)) - ) + json <- client.toJson(req).toCatsIO + taskId <- + IO.fromEither(json.hcursor.get[String]("task")).orRaise(HttpClientStatusError(req, BadRequest, json.noSpaces)) taskReq = Get((endpoint / tasksPath / taskId).withQuery(Query(waitForCompletion -> "true"))).withHttpCredentials _ <- client.toJson(taskReq).retry(updateByQueryStrategy) } yield () @@ -275,7 +277,7 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: * @param index * the index targeted by the delete query */ - def deleteByQuery(query: JsonObject, index: IndexLabel): HttpResult[Unit] = { + def deleteByQuery(query: JsonObject, index: IndexLabel): IO[Unit] = { val deleteEndpoint = (endpoint / index.value / deleteByQueryPath).withQuery(Uri.Query(defaultDeleteByQuery)) val req = Post(deleteEndpoint, query).withHttpCredentials client.toJson(req).void @@ -288,7 +290,7 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: * @param id * the identifier of the document */ - def getSource[R: Decoder: ClassTag](index: IndexLabel, id: String): HttpResult[R] = { + def getSource[R: Decoder: ClassTag](index: IndexLabel, id: String): IO[R] = { val sourceEndpoint = endpoint / index.value / source / id val req = Get(sourceEndpoint).withHttpCredentials client.fromJsonTo[R](req) @@ -303,30 +305,28 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: * @param field * the field to extract */ - def multiGet[R: Decoder](index: IndexLabel, ids: Set[String], field: String): HttpResult[Map[String, Option[R]]] = { - if (ids.isEmpty) UIO.pure(Map.empty) + def multiGet[R: Decoder](index: IndexLabel, ids: Set[String], field: String): IO[Map[String, Option[R]]] = { + if (ids.isEmpty) IO.pure(Map.empty) else { val multiGetEndpoint = (endpoint / index.value / mget).withQuery(Uri.Query(Map("_source" -> field))) val req = Get(multiGetEndpoint, Json.obj("ids" -> ids.asJson)).withHttpCredentials client .toJson(req) + .toCatsIO .flatMap { json => - IO.fromOption( - json.hcursor.downField("docs").focus.flatMap(_.asArray).map { - _.mapFilter { r => - if (r.hcursor.get[Boolean]("found").contains(true)) - r.hcursor - .get[String]("_id") - .map { id => - id -> r.hcursor.downField("_source").get[R](field).toOption - } - .toOption - else None - }.toMap - }, - HttpClientStatusError(req, BadRequest, json.noSpaces) - ) + IO.fromOption(json.hcursor.downField("docs").focus.flatMap(_.asArray).map { + _.mapFilter { r => + if (r.hcursor.get[Boolean]("found").contains(true)) + r.hcursor + .get[String]("_id") + .map { id => + id -> r.hcursor.downField("_source").get[R](field).toOption + } + .toOption + else None + }.toMap + })(HttpClientStatusError(req, BadRequest, json.noSpaces)) } } } @@ -336,12 +336,11 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: * @param index * the index to use */ - def count(index: String): HttpResult[Long] = { + def count(index: String): IO[Long] = { val req = Get(endpoint / index / countPath).withHttpCredentials - client.toJson(req).flatMap { json => + client.toJson(req).toCatsIO.flatMap { json => val count = json.hcursor.downField("count").focus.flatMap(_.asNumber.flatMap(_.toLong)) - IO.fromOption( - count, + IO.fromOption(count)( HttpClientStatusError(req, BadRequest, json.noSpaces) ) } @@ -364,7 +363,7 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: def search(params: ResourcesSearchParams, indices: Set[String], qp: Query)( page: Pagination, sort: SortList - )(implicit base: BaseUri): HttpResult[SearchResults[JsonObject]] = + )(implicit base: BaseUri): IO[SearchResults[JsonObject]] = search( QueryBuilder(params).withPage(page).withTotalHits(true).withSort(sort), indices, @@ -392,7 +391,7 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: )( page: Pagination, sort: SortList - )(implicit base: BaseUri): HttpResult[SearchResults[JsonObject]] = + )(implicit base: BaseUri): IO[SearchResults[JsonObject]] = search( QueryBuilder(params).withPage(page).withTotalHits(true).withSort(sort), Set(index), @@ -413,7 +412,7 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: query: QueryBuilder, indices: Set[String], qp: Query - ): HttpResult[SearchResults[JsonObject]] = + ): IO[SearchResults[JsonObject]] = searchAs[SearchResults[JsonObject]](query, indices, qp)(SearchResults.empty) /** @@ -434,7 +433,7 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: qp: Query )( sort: SortList = SortList.empty - ): HttpResult[Json] = + ): IO[Json] = if (indices.isEmpty) IO.pure(esEmptyResults.value) else { val (indexPath, q) = indexPathAndQuery(indices, QueryBuilder(query)) @@ -457,7 +456,7 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: query: QueryBuilder, indices: Set[String], qp: Query - )(onEmpty: => T): HttpResult[T] = + )(onEmpty: => T): IO[T] = if (indices.isEmpty) IO.pure(onEmpty) else { @@ -480,7 +479,7 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: query: QueryBuilder, index: String, qp: Query - ): HttpResult[T] = { + ): IO[T] = { val searchEndpoint = (endpoint / index / searchPath).withQuery(Uri.Query(defaultQuery ++ qp.toMap)) client.fromJsonTo[T](Post(searchEndpoint, query.build).withHttpCredentials) } @@ -501,7 +500,7 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: */ def aggregate(params: ResourcesSearchParams, indices: Set[String], qp: Query, bucketSize: Int)(implicit base: BaseUri - ): HttpResult[AggregationResult] = { + ): IO[AggregationResult] = { val query = QueryBuilder(params).aggregation(bucketSize) val (indexPath, q) = indexPathAndQuery(indices, query) val searchEndpoint = (endpoint / indexPath / searchPath).withQuery(Uri.Query(defaultQuery ++ qp.toMap)) @@ -511,20 +510,19 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength: /** * Refresh the given index */ - def refresh(index: IndexLabel): HttpResult[Boolean] = - client(Post(endpoint / index.value / refreshPath).withHttpCredentials) { - case resp if resp.status.isSuccess() => - discardEntity(resp) >> IO.pure(true) + def refresh(index: IndexLabel): IO[Boolean] = + client.run(Post(endpoint / index.value / refreshPath).withHttpCredentials) { + case resp if resp.status.isSuccess() => discardEntity(resp).as(true) } /** * Obtain the mapping of the given index */ - def mapping(index: IndexLabel): HttpResult[Json] = + def mapping(index: IndexLabel): IO[Json] = client.toJson(Get(endpoint / index.value / mapping).withHttpCredentials) private def discardEntity(resp: HttpResponse) = - UIO.delay(resp.discardEntityBytes()) >> IO.unit + IO.delay(resp.discardEntityBytes()) >> IO.unit private def indexPathAndQuery(indices: Set[String], query: QueryBuilder): (String, QueryBuilder) = { val indexPath = indices.mkString(",") diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/deletion/EventMetricsDeletionTask.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/deletion/EventMetricsDeletionTask.scala index f970ceb0be..358249f6f0 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/deletion/EventMetricsDeletionTask.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/deletion/EventMetricsDeletionTask.scala @@ -1,7 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.deletion import cats.effect.IO -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.EventMetricsProjection import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient import ch.epfl.bluebrain.nexus.delta.sdk.deletion.ProjectDeletionTask @@ -24,10 +23,7 @@ final class EventMetricsDeletionTask(client: ElasticSearchClient, prefix: String searchByProject(project).flatMap { search => client .deleteByQuery(search, index) - .toCatsIO - .as( - ProjectDeletionReport.Stage("event-metrics", "Event metrics have been successfully deleted.") - ) + .as(ProjectDeletionReport.Stage("event-metrics", "Event metrics have been successfully deleted.")) } private[deletion] def searchByProject(project: ProjectRef) = IO.fromEither { diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinator.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinator.scala index 9853c81b66..f6d5c947d3 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinator.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinator.scala @@ -5,7 +5,6 @@ import cats.effect.{ContextShift, IO, Timer} import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.kernel.cache.LocalCache -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViews import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh @@ -153,7 +152,6 @@ object ElasticSearchCoordinator { (v: ActiveViewDef) => client .createIndex(v.index, Some(v.mapping), Some(v.settings)) - .toCatsIO .onError { e => logger.error(e)(s"Index for view '${v.ref.project}/${v.ref.viewId}' could not be created.") } diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSink.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSink.scala index 74c1ab5095..003b20983b 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSink.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSink.scala @@ -1,7 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing import cats.effect.IO -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent import ch.epfl.bluebrain.nexus.delta.kernel.syntax.kamonSyntax import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViews @@ -64,7 +63,6 @@ final class ElasticSearchSink private ( if (bulk.nonEmpty) { client .bulk(bulk, refresh) - .toCatsIO .map(ElasticSearchSink.markElems(_, elements, documentId)) } else { IO.pure(elements.map(_.void)) diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewsQuery.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewsQuery.scala index 3bed89891b..348cb28906 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewsQuery.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewsQuery.scala @@ -2,6 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.query import akka.http.scaladsl.model.Uri import cats.effect.IO +import cats.implicits.catsSyntaxMonadError import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.config.ElasticSearchViewsConfig @@ -10,6 +11,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.query.ElasticSearchQu import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress.{Project => ProjectAcl} import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.AuthorizationFailed +import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri import ch.epfl.bluebrain.nexus.delta.sdk.model.search.{AggregationResult, SearchResults} @@ -51,11 +53,11 @@ object DefaultViewsQuery { (request: DefaultSearchRequest, views: Set[IndexingView]) => client .search(request.params, views.map(_.index), Uri.Query.Empty)(request.pagination, request.sort) - .mapError(ElasticSearchClientError), + .adaptError { case e: HttpClientError => ElasticSearchClientError(e) }, (request: DefaultSearchRequest, views: Set[IndexingView]) => client .aggregate(request.params, views.map(_.index), Uri.Query.Empty, config.listingBucketSize) - .mapError(ElasticSearchClientError) + .adaptError { case e: HttpClientError => ElasticSearchClientError(e) } ) } diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchClientSetup.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchClientSetup.scala index 54ccb1f5d2..0bf7a2fd7d 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchClientSetup.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchClientSetup.scala @@ -2,16 +2,16 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch import akka.actor.ActorSystem import akka.http.scaladsl.model.headers.BasicHttpCredentials -import cats.effect.Resource +import cats.effect.{IO, Resource} +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.taskToIoK import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientSetup import ch.epfl.bluebrain.nexus.testkit.CirceLiteral import ch.epfl.bluebrain.nexus.testkit.bio.BioRunContext import ch.epfl.bluebrain.nexus.testkit.ce.CatsRunContext import ch.epfl.bluebrain.nexus.testkit.elasticsearch.ElasticSearchContainer -import ch.epfl.bluebrain.nexus.testkit.mu.bio.ResourceFixture -import ch.epfl.bluebrain.nexus.testkit.mu.bio.ResourceFixture.TaskFixture -import monix.bio.Task +import ch.epfl.bluebrain.nexus.testkit.mu.ce.ResourceFixture +import ch.epfl.bluebrain.nexus.testkit.mu.ce.ResourceFixture.IOFixture import monix.execution.Scheduler import munit.Suite @@ -29,8 +29,8 @@ object ElasticSearchClientSetup extends CirceLiteral with CatsRunContext with Fi } }""" - def resource()(implicit s: Scheduler): Resource[Task, ElasticSearchClient] = { - for { + def resource()(implicit s: Scheduler): Resource[IO, ElasticSearchClient] = { + (for { (httpClient, actorSystem) <- HttpClientSetup(compression = true) container <- ElasticSearchContainer.resource() } yield { @@ -42,16 +42,16 @@ object ElasticSearchClientSetup extends CirceLiteral with CatsRunContext with Fi 2000, emptyResults ) - } + }).mapK(taskToIoK) }.evalTap { client => client.createIndexTemplate("test_template", template) } - def suiteLocalFixture(name: String)(implicit s: Scheduler): TaskFixture[ElasticSearchClient] = + def suiteLocalFixture(name: String)(implicit s: Scheduler): IOFixture[ElasticSearchClient] = ResourceFixture.suiteLocal(name, resource()) trait Fixture { self: Suite with BioRunContext => - val esClient: ResourceFixture.TaskFixture[ElasticSearchClient] = + val esClient: IOFixture[ElasticSearchClient] = ElasticSearchClientSetup.suiteLocalFixture("esclient") } diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViewsQuerySuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViewsQuerySuite.scala index 4dc416d0c0..c0dd1e9981 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViewsQuerySuite.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViewsQuerySuite.scala @@ -276,9 +276,9 @@ class ElasticSearchViewsQuerySuite ElasticSearchBulk.Index(view.index, genString(), d) } } - _ <- client.bulk(bulk).toCatsIO + _ <- client.bulk(bulk) // We refresh explicitly - _ <- client.refresh(view.index).toCatsIO + _ <- client.refresh(view.index) } yield () }.void diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViewsSpec.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViewsSpec.scala index bf1f44ed75..24b0281ebc 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViewsSpec.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViewsSpec.scala @@ -29,7 +29,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.{FilterBySchema, Filt import ch.epfl.bluebrain.nexus.testkit.scalatest.ce.CatsEffectSpec import io.circe.Json import io.circe.literal._ -import monix.bio.UIO import java.time.Instant import java.util.UUID @@ -136,7 +135,7 @@ class ElasticSearchViewsSpec extends CatsEffectSpec with DoobieScalaTestFixture ValidateElasticSearchView( PipeChain.validate(_, registry), IO.pure(Set(queryPermissions)), - (_, _, _) => UIO.unit, + (_, _, _) => IO.unit, "prefix", 2, xas, diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ScalaTestElasticSearchClientSetup.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ScalaTestElasticSearchClientSetup.scala index 43faf3d9b0..175f894c17 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ScalaTestElasticSearchClientSetup.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ScalaTestElasticSearchClientSetup.scala @@ -33,7 +33,7 @@ trait ScalaTestElasticSearchClientSetup extends CirceLiteral with Fixtures { sel lazy val esClient = { val c = new ElasticSearchClient(HttpClient(), docker.esHostConfig.endpoint, 2000, emptyResults) - c.createIndexTemplate("test_template", template).runSyncUnsafe() + c.createIndexTemplate("test_template", template).unsafeRunSync() c } diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/client/ElasticSearchClientSpec.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/client/ElasticSearchClientSpec.scala index 2b4ec4991b..86a25a0368 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/client/ElasticSearchClientSpec.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/client/ElasticSearchClientSpec.scala @@ -4,42 +4,36 @@ import akka.actor.ActorSystem import akka.http.scaladsl.model.StatusCodes import akka.http.scaladsl.model.Uri.Query import akka.testkit.TestKit +import cats.implicits.catsSyntaxApplicativeError +import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ScalaTestElasticSearchClientSetup import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.BulkResponse.MixedOutcomes.Outcome import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.{BulkResponse, Refresh} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ResourcesSearchParams +import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError.HttpClientStatusError import ch.epfl.bluebrain.nexus.delta.sdk.model.ComponentDescription.ServiceDescription -import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, Name} -import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination import ch.epfl.bluebrain.nexus.delta.sdk.model.search.ResultEntry.ScoredResultEntry import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SearchResults.ScoredSearchResults import ch.epfl.bluebrain.nexus.delta.sdk.model.search.{AggregationResult, SearchResults, Sort, SortList} +import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, Name} import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label +import ch.epfl.bluebrain.nexus.testkit.CirceLiteral import ch.epfl.bluebrain.nexus.testkit.elasticsearch.ElasticSearchDocker -import ch.epfl.bluebrain.nexus.testkit.scalatest.EitherValues -import ch.epfl.bluebrain.nexus.testkit.scalatest.bio.BIOValues -import ch.epfl.bluebrain.nexus.testkit.{CirceLiteral, TestHelpers} +import ch.epfl.bluebrain.nexus.testkit.scalatest.ce.CatsEffectSpec import io.circe.{Json, JsonObject} -import org.scalatest.{DoNotDiscover, OptionValues} +import org.scalatest.DoNotDiscover import org.scalatest.concurrent.Eventually -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpecLike import scala.concurrent.duration._ @DoNotDiscover class ElasticSearchClientSpec(override val docker: ElasticSearchDocker) extends TestKit(ActorSystem("ElasticSearchClientSpec")) - with AnyWordSpecLike - with Matchers + with CatsEffectSpec with ScalaTestElasticSearchClientSetup - with EitherValues - with OptionValues with CirceLiteral - with TestHelpers - with BIOValues with Eventually { implicit override def patienceConfig: PatienceConfig = PatienceConfig(6.seconds, 100.millis) @@ -279,9 +273,9 @@ class ElasticSearchClientSpec(override val docker: ElasticSearchDocker) // Checking docs again newCount <- esClient.count(index.value) _ = newCount shouldEqual 1L - doc1 <- esClient.getSource[Json](index, "1").attempt + doc1 <- esClient.getSource[Json](index, "1").attemptNarrow[HttpClientError] _ = doc1.rightValue - doc2 <- esClient.getSource[Json](index, "2").attempt + doc2 <- esClient.getSource[Json](index, "2").attemptNarrow[HttpClientError] _ = doc2.leftValue.errorCode.value shouldEqual StatusCodes.NotFound } yield () }.accepted diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/deletion/EventMetricsDeletionTaskSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/deletion/EventMetricsDeletionTaskSuite.scala index d960e80a4f..5cdcdab246 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/deletion/EventMetricsDeletionTaskSuite.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/deletion/EventMetricsDeletionTaskSuite.scala @@ -1,7 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.deletion import akka.http.scaladsl.model.Uri.Query -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchBulk, QueryBuilder} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.{ElasticSearchClientSetup, EventMetricsProjection, Fixtures} import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Subject} @@ -50,14 +49,14 @@ class EventMetricsDeletionTaskSuite for { // Indexing and checking count - _ <- client.createIndex(index, Some(metricsMapping.value), Some(metricsSettings.value)).toCatsIO - _ <- client.bulk(operations).toCatsIO - _ <- client.refresh(index).toCatsIO - _ <- client.count(index.value).toCatsIO.assertEquals(4L) + _ <- client.createIndex(index, Some(metricsMapping.value), Some(metricsSettings.value)) + _ <- client.bulk(operations) + _ <- client.refresh(index) + _ <- client.count(index.value).assertEquals(4L) // Running the task and checking the index again _ <- task(projectToDelete) - _ <- client.refresh(index).toCatsIO - _ <- client.count(index.value).toCatsIO.assertEquals(2L) + _ <- client.refresh(index) + _ <- client.count(index.value).assertEquals(2L) _ <- countMetrics(projectToDelete).assertEquals(0L) _ <- countMetrics(anotherProject).assertEquals(2L) } yield () diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSinkSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSinkSuite.scala index a921e82d92..9fcec163a3 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSinkSuite.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSinkSuite.scala @@ -2,7 +2,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing import akka.http.scaladsl.model.StatusCodes import akka.http.scaladsl.model.Uri.Query -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchClientSetup import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{IndexLabel, QueryBuilder} @@ -58,7 +57,7 @@ class ElasticSearchSinkSuite DroppedElem(membersEntity, id, None, Instant.EPOCH, offset, rev) test("Create the index") { - client.createIndex(index, None, None).toCatsIO.assertEquals(true) + client.createIndex(index, None, None).assertEquals(true) } test("Index a chunk of documents and retrieve them") { @@ -68,7 +67,6 @@ class ElasticSearchSinkSuite _ <- sink.apply(chunk).assertEquals(chunk.map(_.void)) _ <- client .search(QueryBuilder.empty, Set(index.value), Query.Empty) - .toCatsIO .map(_.sources.toSet) .assertEquals(members.flatMap(_._2.asObject)) } yield () @@ -81,7 +79,6 @@ class ElasticSearchSinkSuite _ <- sink.apply(chunk).assertEquals(chunk.map(_.void)) _ <- client .search(QueryBuilder.empty, Set(index.value), Query.Empty) - .toCatsIO .map(_.sources.toSet) .assertEquals(Set(bob, judy).flatMap(_._2.asObject)) } yield () @@ -122,7 +119,6 @@ class ElasticSearchSinkSuite _ = assert(result.lift(2).flatMap(_.toOption).contains(())) _ <- client .search(QueryBuilder.empty, Set(index.value), Query.Empty) - .toCatsIO .map(_.sources.toSet) .assertEquals(Set(bob, judy, alice).flatMap(_._2.asObject)) } yield () @@ -138,9 +134,9 @@ class ElasticSearchSinkSuite val sink = ElasticSearchSink.states(client, 2, 50.millis, index, Refresh.True) for { - _ <- client.createIndex(index, None, None).toCatsIO.assertEquals(true) + _ <- client.createIndex(index, None, None).assertEquals(true) _ <- sink.apply(chunk).assertEquals(chunk.map(_.void)) - _ <- client.getSource[Json](index, charlie_2._1.toString).toCatsIO.assertEquals(charlie_2._2) + _ <- client.getSource[Json](index, charlie_2._1.toString).assertEquals(charlie_2._2) } yield () } @@ -157,11 +153,10 @@ class ElasticSearchSinkSuite val sink = ElasticSearchSink.states(client, 2, 50.millis, index, Refresh.True) for { - _ <- client.createIndex(index, None, None).toCatsIO.assertEquals(true) + _ <- client.createIndex(index, None, None).assertEquals(true) _ <- sink.apply(chunk).assertEquals(chunk.map(_.void)) _ <- client .getSource[Json](index, charlie._1.toString) - .toCatsIO .assertError[HttpClientError](_.errorCode.contains(StatusCodes.NotFound)) } yield () } diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewSearchSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewSearchSuite.scala index f4d733d3c4..ff15b066bd 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewSearchSuite.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewSearchSuite.scala @@ -1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.query import akka.http.scaladsl.model.Uri +import cats.effect.IO import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIOOps import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination @@ -29,7 +30,6 @@ import ch.epfl.bluebrain.nexus.testkit.mu.ce.CatsEffectSuite import ch.epfl.bluebrain.nexus.testkit.{CirceLiteral, TestHelpers} import io.circe.syntax.EncoderOps import io.circe.{Decoder, Json, JsonObject} -import monix.bio.UIO import munit.{AnyFixture, Location} import java.time.Instant @@ -146,7 +146,6 @@ class DefaultViewSearchSuite private def paginatedSearch(params: ResourcesSearchParams, pagination: Pagination, sort: SortList) = client .search(params, Set(defaultIndex.value), Uri.Query.Empty)(pagination, sort) - .toCatsIO private def aggregate(params: ResourcesSearchParams) = client.aggregate(params, Set(defaultIndex.value), Uri.Query.Empty, 100) @@ -294,9 +293,9 @@ object DefaultViewSearchSuite { ) } - def asDocument(implicit baseUri: BaseUri, rcr: RemoteContextResolution, jsonldApi: JsonLdApi): UIO[Json] = { + def asDocument(implicit baseUri: BaseUri, rcr: RemoteContextResolution, jsonldApi: JsonLdApi): IO[Json] = { val metadata = Resource.fileMetadataEncoder(Resource.Metadata(tag.toList)) - asResourceF.toCompactedJsonLd.map(_.json.deepMerge(metadata)).hideErrors + asResourceF.toCompactedJsonLd.map(_.json.deepMerge(metadata)).toCatsIO } } diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutesSpec.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutesSpec.scala index 1f8554f7ca..4aa30360e1 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutesSpec.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutesSpec.scala @@ -5,12 +5,12 @@ import akka.http.scaladsl.model.{MediaTypes, StatusCodes} import akka.http.scaladsl.server.Route import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF -import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.{ElasticSearchViews, ValidateElasticSearchView} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.IndexLabel import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.IndexingViewDef.ActiveViewDef import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewRejection.{InvalidResourceId, ViewNotFound} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{permissions => esPermissions, ElasticSearchViewRejection} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.routes.ElasticSearchIndexingRoutes.FetchIndexingView +import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.{ElasticSearchViews, ValidateElasticSearchView} import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress @@ -29,7 +29,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.FailedElem import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{PipeChain, ProjectionProgress} import ch.epfl.bluebrain.nexus.testkit.bio.IOFromMap import io.circe.JsonObject -import monix.bio.UIO import java.time.Instant import scala.concurrent.duration._ @@ -86,7 +85,7 @@ class ElasticSearchIndexingRoutesSpec extends ElasticSearchViewsRoutesFixtures w ValidateElasticSearchView( PipeChain.validate(_, registry), IO.pure(allowedPerms), - (_, _, _) => UIO.unit, + (_, _, _) => IO.unit, "prefix", 5, xas, diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchViewsRoutesSpec.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchViewsRoutesSpec.scala index 2ec0ee88bf..3dfeb8439a 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchViewsRoutesSpec.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchViewsRoutesSpec.scala @@ -24,7 +24,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Subject import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PipeChain import ch.epfl.bluebrain.nexus.testkit.bio.IOFromMap import io.circe.Json -import monix.bio.UIO class ElasticSearchViewsRoutesSpec extends ElasticSearchViewsRoutesFixtures with IOFromMap { @@ -67,7 +66,7 @@ class ElasticSearchViewsRoutesSpec extends ElasticSearchViewsRoutesFixtures with ValidateElasticSearchView( PipeChain.validate(_, registry), IO.pure(allowedPerms), - (_, _, _) => UIO.unit, + (_, _, _) => IO.unit, "prefix", 5, xas, diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalytics.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalytics.scala index 78cb6aa0cb..9d6a2b51e2 100644 --- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalytics.scala +++ b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalytics.scala @@ -13,6 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.model.GraphAnalytic import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.model.PropertiesStatistics.propertiesDecoderFromEsAggregations import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.model.{AnalyticsGraph, GraphAnalyticsRejection, PropertiesStatistics} import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri +import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.ExpandIri import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegment import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext @@ -55,8 +56,9 @@ object GraphAnalytics { query <- relationshipsAggQuery(config) stats <- client .searchAs[AnalyticsGraph](QueryBuilder(query), index(prefix, projectRef).value, Query.Empty) - .mapError(err => WrappedElasticSearchRejection(WrappedElasticSearchClientError(err))) - .toCatsIO + .adaptError { case e: HttpClientError => + WrappedElasticSearchRejection(WrappedElasticSearchClientError(e)) + } } yield stats override def properties( @@ -68,8 +70,7 @@ object GraphAnalytics { implicit val d: Decoder[PropertiesStatistics] = propertiesDecoderFromEsAggregations(tpe) client .searchAs[PropertiesStatistics](QueryBuilder(query).withTotalHits(true), idx.value, Query.Empty) - .mapError(err => WrappedElasticSearchRejection(WrappedElasticSearchClientError(err))) - .toCatsIO + .adaptError { case e: HttpClientError => WrappedElasticSearchRejection(WrappedElasticSearchClientError(e)) } } for { diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsCoordinator.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsCoordinator.scala index f78edfd2f5..c24210dcbc 100644 --- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsCoordinator.scala +++ b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsCoordinator.scala @@ -3,7 +3,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics import cats.effect.{ContextShift, IO, Timer} import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.GraphAnalytics.{index, projectionName} import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.config.GraphAnalyticsConfig @@ -145,14 +144,14 @@ object GraphAnalyticsCoordinator { ), ref => graphAnalyticsMappings.flatMap { mappings => - client.createIndex(index(config.prefix, ref), Some(mappings), None).toCatsIO + client.createIndex(index(config.prefix, ref), Some(mappings), None) }.void, - ref => client.deleteIndex(index(config.prefix, ref)).toCatsIO.void + ref => client.deleteIndex(index(config.prefix, ref)).void ) for { script <- scriptContent - _ <- client.createScript(updateRelationshipsScriptId, script).toCatsIO + _ <- client.createScript(updateRelationshipsScriptId, script) c <- coordinator } yield c } else { diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsViewsQuery.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsViewsQuery.scala index 4cab2609fd..0357443862 100644 --- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsViewsQuery.scala +++ b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsViewsQuery.scala @@ -2,9 +2,10 @@ package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics import akka.http.scaladsl.model.Uri import cats.effect.IO -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIOOps +import cats.implicits.catsSyntaxMonadError import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewRejection.WrappedElasticSearchClientError +import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SortList import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import io.circe.{Json, JsonObject} @@ -34,7 +35,9 @@ trait GraphAnalyticsViewsQuery { class GraphAnalyticsViewsQueryImpl(prefix: String, client: ElasticSearchClient) extends GraphAnalyticsViewsQuery { override def query(projectRef: ProjectRef, query: JsonObject, qp: Uri.Query): IO[Json] = { val index = GraphAnalytics.index(prefix, projectRef) - client.search(query, Set(index.value), qp)(SortList.empty).mapError(WrappedElasticSearchClientError).toCatsIO + client.search(query, Set(index.value), qp)(SortList.empty).adaptError { case e: HttpClientError => + WrappedElasticSearchClientError(e) + } } } diff --git a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSink.scala b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSink.scala index 1fa836fc7a..1a5b91ecbb 100644 --- a/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSink.scala +++ b/delta/plugins/graph-analytics/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSink.scala @@ -15,7 +15,6 @@ import fs2.Chunk import io.circe.JsonObject import io.circe.literal._ import io.circe.syntax.EncoderOps -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import shapeless.Typeable import scala.concurrent.duration.FiniteDuration @@ -86,8 +85,8 @@ final class GraphAnalyticsSink( case (acc, _: FailedElem) => acc } - client.bulk(result.bulk, Refresh.True).toCatsIO.map(ElasticSearchSink.markElems(_, elements, documentId)) <* - client.updateByQuery(relationshipsQuery(result.updates), Set(index.value)).toCatsIO + client.bulk(result.bulk, Refresh.True).map(ElasticSearchSink.markElems(_, elements, documentId)) <* + client.updateByQuery(relationshipsQuery(result.updates), Set(index.value)) }.span("graphAnalyticsSink") } diff --git a/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsSpec.scala b/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsSpec.scala index 9cfeeea780..9e9f496fc7 100644 --- a/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsSpec.scala +++ b/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsSpec.scala @@ -3,7 +3,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics import akka.actor.ActorSystem import akka.testkit.TestKit import ch.epfl.bluebrain.nexus.delta.kernel.RetryStrategyConfig -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toMonixBIOOps import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.Fixtures import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchBulk, ElasticSearchClient} import ch.epfl.bluebrain.nexus.delta.plugins.graph.analytics.config.GraphAnalyticsConfig.TermAggregationsConfig @@ -17,17 +16,14 @@ import ch.epfl.bluebrain.nexus.delta.sdk.generators.ProjectGen import ch.epfl.bluebrain.nexus.delta.sdk.http.{HttpClient, HttpClientConfig, HttpClientWorthRetry} import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContextDummy import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ +import ch.epfl.bluebrain.nexus.testkit.bio.IOFixedClock import ch.epfl.bluebrain.nexus.testkit.elasticsearch.ElasticSearchContainer._ import ch.epfl.bluebrain.nexus.testkit.elasticsearch.ElasticSearchDocker -import ch.epfl.bluebrain.nexus.testkit.scalatest.bio.BIOValues -import ch.epfl.bluebrain.nexus.testkit.TestHelpers -import ch.epfl.bluebrain.nexus.testkit.bio.IOFixedClock +import ch.epfl.bluebrain.nexus.testkit.scalatest.ce.CatsEffectSpec import monix.execution.Scheduler import org.scalatest.DoNotDiscover import org.scalatest.concurrent.Eventually -import org.scalatest.matchers.should.Matchers import org.scalatest.time.{Millis, Span} -import org.scalatest.wordspec.AnyWordSpecLike import java.util.UUID import scala.concurrent.duration._ @@ -35,10 +31,7 @@ import scala.concurrent.duration._ @DoNotDiscover class GraphAnalyticsSpec(docker: ElasticSearchDocker) extends TestKit(ActorSystem("GraphAnalyticsSpec")) - with AnyWordSpecLike - with Matchers - with TestHelpers - with BIOValues + with CatsEffectSpec with IOFixedClock with ConfigFixtures with Eventually @@ -83,7 +76,7 @@ class GraphAnalyticsSpec(docker: ElasticSearchDocker) } "fetch relationships" in eventually { - graphAnalytics.relationships(project.ref).toBIO[GraphAnalyticsRejection].accepted shouldEqual + graphAnalytics.relationships(project.ref).accepted shouldEqual AnalyticsGraph( List(Node(schema.Person, "Person", 3)), List(Edge(schema.Person, schema.Person, 3, Vector(EdgePath(schema + "brother", "brother")))) @@ -91,7 +84,7 @@ class GraphAnalyticsSpec(docker: ElasticSearchDocker) } "fetch properties" in { - graphAnalytics.properties(project.ref, schema.Person).toBIO[GraphAnalyticsRejection].accepted shouldEqual + graphAnalytics.properties(project.ref, schema.Person).accepted shouldEqual PropertiesStatistics( Metadata(schema.Person, "Person", 3), List( diff --git a/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSinkSuite.scala b/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSinkSuite.scala index 1eaca5a2d0..b1ec3c9c20 100644 --- a/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSinkSuite.scala +++ b/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSinkSuite.scala @@ -95,9 +95,9 @@ class GraphAnalyticsSinkSuite test("Create the update script and the index") { for { script <- scriptContent - _ <- client.createScript(updateRelationshipsScriptId, script).toCatsIO + _ <- client.createScript(updateRelationshipsScriptId, script) mapping <- graphAnalyticsMappings - _ <- client.createIndex(index, Some(mapping), None).toCatsIO.assertEquals(true) + _ <- client.createIndex(index, Some(mapping), None).assertEquals(true) } yield () } @@ -135,13 +135,13 @@ class GraphAnalyticsSinkSuite // - `resource1` with the relationship to `resource3` resolved // - `resource2` with no reference resolved // - `deprecatedResource` with only metadata, resolution is skipped - _ <- client.count(index.value).toCatsIO.eventually(3L) + _ <- client.count(index.value).eventually(3L) expected1 <- ioJsonContentOf("result/resource1.json") expected2 <- ioJsonContentOf("result/resource2.json") expectedDeprecated <- ioJsonContentOf("result/resource_deprecated.json") - _ <- client.getSource[Json](index, resource1.toString).toCatsIO.eventually(expected1) - _ <- client.getSource[Json](index, resource2.toString).toCatsIO.eventually(expected2) - _ <- client.getSource[Json](index, deprecatedResource.toString).toCatsIO.eventually(expectedDeprecated) + _ <- client.getSource[Json](index, resource1.toString).eventually(expected1) + _ <- client.getSource[Json](index, resource2.toString).eventually(expected2) + _ <- client.getSource[Json](index, deprecatedResource.toString).eventually(expectedDeprecated) } yield () } @@ -167,12 +167,12 @@ class GraphAnalyticsSinkSuite _ <- sink(chunk).assertEquals(chunk.map(_.void)) // The reference to file1 should have been resolved and introduced as a relationship // The update query should not have an effect on the other resource - _ <- client.refresh(index).toCatsIO + _ <- client.refresh(index) expected1 <- ioJsonContentOf("result/resource1_updated.json") expected2 <- ioJsonContentOf("result/resource2.json") - _ <- client.count(index.value).toCatsIO.eventually(3L) - _ <- client.getSource[Json](index, resource1.toString).toCatsIO.eventually(expected1) - _ <- client.getSource[Json](index, resource2.toString).toCatsIO.eventually(expected2) + _ <- client.count(index.value).eventually(3L) + _ <- client.getSource[Json](index, resource1.toString).eventually(expected1) + _ <- client.getSource[Json](index, resource2.toString).eventually(expected2) } yield () } diff --git a/delta/plugins/search/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/search/Search.scala b/delta/plugins/search/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/search/Search.scala index 7526a2f1f8..d66b497832 100644 --- a/delta/plugins/search/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/search/Search.scala +++ b/delta/plugins/search/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/search/Search.scala @@ -2,6 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.search import akka.http.scaladsl.model.Uri import cats.effect.IO +import cats.implicits.catsSyntaxMonadError import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.CompositeViews @@ -13,6 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.search.model.SearchRejection.{Unkno import ch.epfl.bluebrain.nexus.delta.plugins.search.model._ import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress.{Project => ProjectAcl} +import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label import io.circe.{Json, JsonObject} @@ -98,7 +100,9 @@ object Search { p => ProjectAcl(p.view.project) -> p.projection.permission, p => projectionIndex(p.projection, p.view.uuid, prefix).value ) - results <- client.search(payload, accessibleIndices, qp)().mapError(WrappedElasticSearchClientError) + results <- client.search(payload, accessibleIndices, qp)().adaptError { case e: HttpClientError => + WrappedElasticSearchClientError(e) + } } yield results override def query(payload: JsonObject, qp: Uri.Query)(implicit caller: Caller): IO[Json] = diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StoragesStatistics.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StoragesStatistics.scala index c811aaeeed..e047e2c92e 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StoragesStatistics.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StoragesStatistics.scala @@ -2,7 +2,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages import akka.http.scaladsl.model.Uri.Query import cats.effect.IO -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.EventMetricsProjection.eventMetricsIndex import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageStatEntry @@ -39,7 +38,7 @@ object StoragesStatistics { indexPrefix: String ): StoragesStatistics = { val search = (jsonObject: JsonObject) => - client.search(jsonObject, Set(eventMetricsIndex(indexPrefix).value), Query.Empty)().toCatsIO + client.search(jsonObject, Set(eventMetricsIndex(indexPrefix).value), Query.Empty)() (idSegment: IdSegment, project: ProjectRef) => { for { diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/statistics/StoragesStatisticsSuite.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/statistics/StoragesStatisticsSuite.scala index 157ed14c35..445c96ef50 100644 --- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/statistics/StoragesStatisticsSuite.scala +++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/statistics/StoragesStatisticsSuite.scala @@ -1,7 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.statistics import cats.effect.IO -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIOOps import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.EventMetricsProjection.eventMetricsIndex import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh @@ -45,7 +44,7 @@ class StoragesStatisticsSuite StoragesStatistics.apply(client, (storage, _) => IO.pure(Iri.unsafe(storage.toString)), indexPrefix) test("Run the event metrics projection") { - val createIndex = client.createIndex(index, Some(metricsMapping.value), Some(metricsSettings.value)).toCatsIO.void + val createIndex = client.createIndex(index, Some(metricsMapping.value), Some(metricsSettings.value)).void val metricsProjection = EventMetricsProjection(sink, sv, _ => metricsStream, createIndex) metricsProjection.accepted } diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/http/HttpClient.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/http/HttpClient.scala index fb90ce2fdc..9092b8419b 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/http/HttpClient.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/http/HttpClient.scala @@ -9,6 +9,7 @@ import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import akka.http.scaladsl.unmarshalling.FromEntityUnmarshaller import akka.stream.StreamTcpException import akka.util.ByteString +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toMonixBIOOps import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling._ import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient.HttpResult @@ -33,10 +34,13 @@ trait HttpClient { */ def apply[A](req: HttpRequest)(handleResponse: PartialFunction[HttpResponse, HttpResult[A]]): HttpResult[A] + def run[A](req: HttpRequest)(handleResponse: PartialFunction[HttpResponse, cats.effect.IO[A]]): HttpResult[A] = + apply(req) { case r if handleResponse.isDefinedAt(r) => handleResponse(r).toBIO[HttpClientError] } + /** * Execute the argument request and unmarshal the response Json response. */ - def toJson(req: HttpRequest): HttpResult[Json] = + def toJson(req: HttpRequest): HttpResult[Json] = fromJsonTo[Json](req) /** diff --git a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsRunContext.scala b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsRunContext.scala index d8f60b3a48..c059c63123 100644 --- a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsRunContext.scala +++ b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsRunContext.scala @@ -6,7 +6,7 @@ import scala.concurrent.ExecutionContext trait CatsRunContext { - implicit lazy val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) - implicit lazy val timer: Timer[IO] = IO.timer(ExecutionContext.global) + implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) + implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) }