diff --git a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/RetryStrategy.scala b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/RetryStrategy.scala index 5fe9b652b2..d8162d0a48 100644 --- a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/RetryStrategy.scala +++ b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/RetryStrategy.scala @@ -1,9 +1,9 @@ package ch.epfl.bluebrain.nexus.delta.kernel import cats.effect.IO -import com.typesafe.scalalogging.{Logger => ScalaLoggingLogger} -import monix.bio.{IO => BIO, UIO} import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toMonixBIOOps +import com.typesafe.scalalogging.{Logger => ScalaLoggingLogger} +import monix.bio.{UIO, IO => BIO} import pureconfig.ConfigReader import pureconfig.error.{CannotConvert, ConfigReaderFailures, ConvertFailure} import pureconfig.generic.semiauto._ @@ -147,7 +147,7 @@ object RetryStrategy { RetryStrategy( config, (t: Throwable) => NonFatal(t), - (t: Throwable, d: RetryDetails) => logError(logger, action)(t, d).toBIO[Throwable] + (t: Throwable, d: RetryDetails) => logError[Throwable](logger, action)(t, d).toBIO ) } diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala index a81103a9dc..5ca244f855 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala @@ -2,6 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph import cats.effect.Clock import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.ioToTaskK import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent import ch.epfl.bluebrain.nexus.delta.kernel.utils.{IOUtils, UUIDF} import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphViews._ @@ -291,7 +292,7 @@ final class BlazegraphViews( * Return the existing indexing views in a project in a finite stream */ def currentIndexingViews(project: ProjectRef): ElemStream[IndexingViewDef] = - log.currentStates(Scope.Project(project)).evalMapFilter { envelope => + log.currentStates(Scope.Project(project)).translate(ioToTaskK).evalMapFilter { envelope => Task.pure(toIndexViewDef(envelope)) } @@ -299,7 +300,7 @@ final class BlazegraphViews( * Return all existing indexing views in a finite stream */ def currentIndexingViews: ElemStream[IndexingViewDef] = - log.currentStates(Scope.Root).evalMapFilter { envelope => + log.currentStates(Scope.Root).translate(ioToTaskK).evalMapFilter { envelope => Task.pure(toIndexViewDef(envelope)) } @@ -307,7 +308,7 @@ final class BlazegraphViews( * Return the indexing views in a non-ending stream */ def indexingViews(start: Offset): ElemStream[IndexingViewDef] = - log.states(Scope.Root, start).evalMapFilter { envelope => + log.states(Scope.Root, start).translate(ioToTaskK).evalMapFilter { envelope => Task.pure(toIndexViewDef(envelope)) } diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala index 7c7e2c650c..fe4088368e 100644 --- 
a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala @@ -1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews import cats.effect.Clock +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.ioToTaskK import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination import ch.epfl.bluebrain.nexus.delta.kernel.syntax.kamonSyntax @@ -330,19 +331,19 @@ final class CompositeViews private ( * Return all existing views for the given project in a finite stream */ def currentViews(project: ProjectRef): ElemStream[CompositeViewDef] = - log.currentStates(Scope.Project(project)).map(toCompositeViewDef) + log.currentStates(Scope.Project(project)).translate(ioToTaskK).map(toCompositeViewDef) /** * Return all existing indexing views in a finite stream */ def currentViews: ElemStream[CompositeViewDef] = - log.currentStates(Scope.Root).map(toCompositeViewDef) + log.currentStates(Scope.Root).translate(ioToTaskK).map(toCompositeViewDef) /** * Return the indexing views in a non-ending stream */ def views(start: Offset): ElemStream[CompositeViewDef] = - log.states(Scope.Root, start).map(toCompositeViewDef) + log.states(Scope.Root, start).translate(ioToTaskK).map(toCompositeViewDef) private def toCompositeViewDef(envelope: Envelope[CompositeViewState]) = envelope.toElem { v => Some(v.project) }.map { v => diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala index 4d660ace9e..a3793b890e 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala @@ -4,7 +4,7 @@ import akka.actor.typed.ActorSystem import akka.http.scaladsl.client.RequestBuilding.{Get, Head} import akka.http.scaladsl.model.ContentTypes.`application/json` import akka.http.scaladsl.model.Uri.Query -import akka.http.scaladsl.model.headers.{`Last-Event-ID`, Accept} +import akka.http.scaladsl.model.headers.{Accept, `Last-Event-ID`} import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes} import akka.stream.alpakka.sse.scaladsl.EventSource import cats.effect.{ContextShift, IO} @@ -28,7 +28,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, RemainingElems} import com.typesafe.scalalogging.Logger import io.circe.parser.decode import fs2._ -import monix.bio.{IO => BIO, UIO} +import monix.bio.{UIO, IO => BIO} import monix.execution.Scheduler import scala.concurrent.Future diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutes.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutes.scala index ca093ce5a3..d1b47a0d9c 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutes.scala +++ 
b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutes.scala @@ -4,6 +4,7 @@ import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.Route import cats.effect.{ContextShift, IO} import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes.BlazegraphViewsDirectives import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewRejection._ @@ -30,7 +31,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.{FailedElemLogRow, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.projections.ProjectionErrors -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import monix.bio.{IO => BIO} import monix.execution.Scheduler class CompositeViewsIndexingRoutes( diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViews.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViews.scala index 7f5c0ab7c2..3d28f027d2 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViews.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViews.scala @@ -2,6 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch import cats.effect.Clock import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.ioToTaskK import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent import ch.epfl.bluebrain.nexus.delta.kernel.utils.{IOUtils, UUIDF} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViews._ @@ -326,7 +327,7 @@ final class ElasticSearchViews private ( * Return the existing indexing views in a project in a finite stream */ def currentIndexingViews(project: ProjectRef): ElemStream[IndexingViewDef] = - log.currentStates(Scope.Project(project)).evalMapFilter { envelope => + log.currentStates(Scope.Project(project)).translate(ioToTaskK).evalMapFilter { envelope => Task.pure(toIndexViewDef(envelope)) } @@ -334,7 +335,7 @@ final class ElasticSearchViews private ( * Return all existing indexing views in a finite stream */ def currentIndexingViews: ElemStream[IndexingViewDef] = - log.currentStates(Scope.Root).evalMapFilter { envelope => + log.currentStates(Scope.Root).translate(ioToTaskK).evalMapFilter { envelope => Task.pure(toIndexViewDef(envelope)) } @@ -342,7 +343,7 @@ final class ElasticSearchViews private ( * Return the indexing views in a non-ending stream */ def indexingViews(start: Offset): ElemStream[IndexingViewDef] = - log.states(Scope.Root, start).evalMapFilter { envelope => + log.states(Scope.Root, start).translate(ioToTaskK).evalMapFilter { envelope => Task.pure(toIndexViewDef(envelope)) } diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/EventMetricsProjection.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/EventMetricsProjection.scala index c7e8965b9b..1d57552feb 100644 --- 
a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/EventMetricsProjection.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/EventMetricsProjection.scala @@ -1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch import cats.data.NonEmptyChain +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.ioToTaskK import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.ElasticSearchSink @@ -94,7 +95,7 @@ object EventMetricsProjection { ): Task[EventMetricsProjection] = { val source = Source { (offset: Offset) => - metrics(offset).map { e => e.toElem { m => Some(m.project) } } + metrics(offset).translate(ioToTaskK).map { e => e.toElem { m => Some(m.project) } } } val compiledProjection = diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/metrics/MetricsStream.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/metrics/MetricsStream.scala index b0d0c48f72..06c20e6903 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/metrics/MetricsStream.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/metrics/MetricsStream.scala @@ -1,5 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.metrics +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax import ch.epfl.bluebrain.nexus.delta.sdk.model.metrics.EventMetric.{ProjectScopedMetric, _} @@ -8,7 +9,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Envelope, Label import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import fs2.Stream import io.circe.{Json, JsonObject} -import monix.bio.Task import java.time.Instant @@ -120,7 +120,7 @@ object MetricsStream { Envelope(EntityType("entity"), nxv + "6", 1, metric6, Instant.EPOCH, Offset.At(6L)) ) - val metricsStream: Stream[Task, Envelope[ProjectScopedMetric]] = + val metricsStream: Stream[IO, Envelope[ProjectScopedMetric]] = Stream.emits(envelopes) } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala index 48a69042ad..3bb4b46dce 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala @@ -8,6 +8,7 @@ import cats.effect.Clock import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.RetryStrategy import ch.epfl.bluebrain.nexus.delta.kernel.cache.KeyValueStore +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.ioToTaskK import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent import ch.epfl.bluebrain.nexus.delta.kernel.utils.{IOUtils, UUIDF} import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.Files._ @@ -479,6 +480,7 @@ final class Files( ) stream <- log .states(Scope.root, offset) + .translate(ioToTaskK) .map { envelope => envelope.value match { case f diff --git 
a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/Storages.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/Storages.scala index 115a8c9c3f..d57ce1f118 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/Storages.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/Storages.scala @@ -3,6 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages import cats.effect.Clock import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.Mapper +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.ioToTaskK import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent import ch.epfl.bluebrain.nexus.delta.kernel.utils.{IOUtils, UUIDF} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.Storages._ @@ -290,7 +291,7 @@ final class Storages private ( * Return the existing storages in a project in a finite stream */ def currentStorages(project: ProjectRef): ElemStream[StorageState] = - log.currentStates(Scope.Project(project)).map { + log.currentStates(Scope.Project(project)).translate(ioToTaskK).map { _.toElem { s => Some(s.project) } } diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImpl.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImpl.scala index e511ff8913..8dd3ea67ca 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImpl.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImpl.scala @@ -30,6 +30,7 @@ final class AclsImpl private ( override def fetch(address: AclAddress): IO[AclNotFound, AclResource] = log .stateOr(address, AclNotFound(address)) + .toBIO[AclNotFound] .onErrorRecover { case AclNotFound(a) if a == AclAddress.Root => AclState.initial(minimum) } @@ -42,6 +43,7 @@ final class AclsImpl private ( override def fetchAt(address: AclAddress, rev: Int): IO[AclRejection.NotFound, AclResource] = log .stateOr(address, rev, AclNotFound(address), RevisionNotFound) + .toBIO[AclRejection.NotFound] .onErrorRecover { case AclNotFound(a) if a == AclAddress.Root && rev == 0 => AclState.initial(minimum) } @@ -51,6 +53,7 @@ final class AclsImpl private ( override def list(filter: AclAddressFilter): UIO[AclCollection] = { log .currentStates(_.toResource) + .translate(ioToTaskK) .filter { a => filter.matches(a.value.address) } @@ -88,9 +91,10 @@ final class AclsImpl private ( override def delete(address: AclAddress, rev: Int)(implicit caller: Subject): IO[AclRejection, AclResource] = eval(DeleteAcl(address, rev, caller)).span("deleteAcls") - private def eval(cmd: AclCommand): IO[AclRejection, AclResource] = log.evaluate(cmd.address, cmd).map(_._2.toResource) + private def eval(cmd: AclCommand): IO[AclRejection, AclResource] = + log.evaluate(cmd.address, cmd).toBIO[AclRejection].map(_._2.toResource) - override def purge(project: AclAddress): UIO[Unit] = log.delete(project) + override def purge(project: AclAddress): UIO[Unit] = log.delete(project).toBIO } object AclsImpl { diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationsImpl.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationsImpl.scala index 336c5cd621..8a7f350fd1 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationsImpl.scala +++ 
b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/organizations/OrganizationsImpl.scala @@ -77,6 +77,7 @@ final class OrganizationsImpl private ( SearchResults( log .currentStates(_.toResource) + .translate(ioToTaskK) .evalFilter(params.matches), pagination, ordering diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/permissions/PermissionsImpl.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/permissions/PermissionsImpl.scala index f5a67dc34e..25a34aa25d 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/permissions/PermissionsImpl.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/permissions/PermissionsImpl.scala @@ -27,7 +27,6 @@ final class PermissionsImpl private ( override def fetch: IO[PermissionsResource] = log .stateOr[PermissionsRejection](labelId, UnexpectedState) - .toCatsIO .handleErrorWith(_ => IO.pure(initial)) .map(_.toResource(minimum)) .span("fetchPermissions") @@ -40,7 +39,6 @@ final class PermissionsImpl private ( UnexpectedState, RevisionNotFound ) - .toCatsIO .map(_.toResource(minimum)) .span("fetchPermissionsAt") @@ -68,7 +66,6 @@ final class PermissionsImpl private ( private def eval(cmd: PermissionsCommand): IO[PermissionsResource] = log .evaluate(labelId, cmd) - .toCatsIO .map { case (_, state) => state.toResource(minimum) } diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImpl.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImpl.scala index a22390a7c7..e3968e0f31 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImpl.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImpl.scala @@ -103,11 +103,12 @@ final class ProjectsImpl private ( ).span("listProjects") override def currentRefs: Stream[Task, ProjectRef] = - log.currentStates(Scope.root).map(_.value.project) + log.currentStates(Scope.root).translate(ioToTaskK).map(_.value.project) - override def states(offset: Offset): ElemStream[ProjectState] = log.states(Scope.root, offset).map { - _.toElem { p => Some(p.project) } - } + override def states(offset: Offset): ElemStream[ProjectState] = + log.states(Scope.root, offset).translate(ioToTaskK).map { + _.toElem { p => Some(p.project) } + } private def eval(cmd: ProjectCommand): IO[ProjectRejection, ProjectResource] = log.evaluate(cmd.ref, cmd.ref, cmd).map(_._2.toResource(defaultApiMappings)) diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/realms/RealmsImpl.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/realms/RealmsImpl.scala index baa55bfe3f..85435238e1 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/realms/RealmsImpl.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/realms/RealmsImpl.scala @@ -70,7 +70,7 @@ final class RealmsImpl private (log: RealmsLog) extends Realms { ordering: Ordering[RealmResource] ): IO[SearchResults.UnscoredSearchResults[RealmResource]] = SearchResults( - log.currentStates(_.toResource).evalFilter(params.matches), + log.currentStates(_.toResource).translate(ioToTaskK).evalFilter(params.matches), pagination, ordering ).span("listRealms") diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLog.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLog.scala index 79f5b2794e..2affc11baa 100644 --- 
a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLog.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLog.scala @@ -5,7 +5,6 @@ import cats.effect.IO import cats.implicits._ import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.kernel.cache.LocalCache -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.taskToIoK import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.UnknownSseLabel import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling.defaultPrinter @@ -179,7 +178,6 @@ object SseEventLog { config.query, xas ) - .translate(taskToIoK) .evalMap(toServerSentEvent(_, fetchUuids)) } } diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImplSpec.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImplSpec.scala index 7ac0eec4e0..f1fd946d29 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImplSpec.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/acls/AclsImplSpec.scala @@ -15,6 +15,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Group, import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Identity, Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.DoobieScalaTestFixture +import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues import ch.epfl.bluebrain.nexus.testkit.{CirceLiteral, IOFixedClock, IOValues} import monix.bio.UIO import monix.execution.Scheduler @@ -27,6 +28,7 @@ class AclsImplSpec extends DoobieScalaTestFixture with IOValues with IOFixedClock + with CatsIOValues with Inspectors with Matchers with CancelAfterFailure diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLog.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLog.scala index a2f4f48ba9..4627dd9415 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLog.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLog.scala @@ -1,6 +1,8 @@ package ch.epfl.bluebrain.nexus.delta.sourcing +import cats.effect.IO import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.sourcing.EvaluationError.{EvaluationFailure, EvaluationTimeout} import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.GlobalEvent @@ -12,9 +14,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.state.State.GlobalState import doobie.implicits._ import doobie.postgres.sqlstate import fs2.Stream -import monix.bio.Cause.{Error, Termination} -import monix.bio.{IO, Task, UIO} -import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import scala.concurrent.duration.FiniteDuration @@ -35,7 +34,7 @@ trait GlobalEventLog[Id, S <: GlobalState, Command, E <: GlobalEvent, Rejection] * @param notFound * if no state is found, fails with this rejection */ - def stateOr[R <: Rejection](id: Id, notFound: => R): IO[R, S] + def stateOr[R <: Rejection](id: Id, notFound: => R): IO[S] /** * Get the current state for the entity with the given __id__ at the given __revision__ @@ -48,7 +47,7 @@ trait GlobalEventLog[Id, S <: GlobalState, Command, E <: GlobalEvent, Rejection] * @param 
invalidRevision * if the revision of the resulting state does not match with the one provided */ - def stateOr[R <: Rejection](id: Id, rev: Int, notFound: => R, invalidRevision: (Int, Int) => R): IO[R, S] + def stateOr[R <: Rejection](id: Id, rev: Int, notFound: => R, invalidRevision: (Int, Int) => R): IO[S] /** * Evaluates the argument __command__ in the context of entity identified by __id__. @@ -61,7 +60,7 @@ trait GlobalEventLog[Id, S <: GlobalState, Command, E <: GlobalEvent, Rejection] * the newly generated state and appended event if the command was evaluated successfully, or the rejection of the * __command__ otherwise */ - def evaluate(id: Id, command: Command): IO[Rejection, (E, S)] + def evaluate(id: Id, command: Command): IO[(E, S)] /** * Tests the evaluation the argument __command__ in the context of entity identified by __id__, without applying any @@ -75,13 +74,13 @@ trait GlobalEventLog[Id, S <: GlobalState, Command, E <: GlobalEvent, Rejection] * the state and event that would be generated in if the command was tested for evaluation successfully, or the * rejection of the __command__ in otherwise */ - def dryRun(id: Id, command: Command): IO[Rejection, (E, S)] + def dryRun(id: Id, command: Command): IO[(E, S)] /** * Delete both states and events for the given id * @param id */ - def delete(id: Id): UIO[Unit] + def delete(id: Id): IO[Unit] /** * Allow to stream all current events within [[Envelope]] s @@ -116,19 +115,19 @@ trait GlobalEventLog[Id, S <: GlobalState, Command, E <: GlobalEvent, Rejection] * @param f * the function to apply on each state */ - def currentStates[T](offset: Offset, f: S => T): Stream[Task, T] + def currentStates[T](offset: Offset, f: S => T): Stream[IO, T] /** * Allow to stream all latest states from the beginning * @param f * the function to apply on each state */ - def currentStates[T](f: S => T): Stream[Task, T] = currentStates(Offset.Start, f) + def currentStates[T](f: S => T): Stream[IO, T] = currentStates(Offset.Start, f) } object GlobalEventLog { - def apply[Id, S <: GlobalState, Command, E <: GlobalEvent, Rejection]( + def apply[Id, S <: GlobalState, Command, E <: GlobalEvent, Rejection <: Throwable]( definition: GlobalEntityDefinition[Id, S, Command, E, Rejection], config: EventLogConfig, xas: Transactors @@ -142,7 +141,7 @@ object GlobalEventLog { xas ) - def apply[Id, S <: GlobalState, Command, E <: GlobalEvent, Rejection]( + def apply[Id, S <: GlobalState, Command, E <: GlobalEvent, Rejection <: Throwable]( eventStore: GlobalEventStore[Id, E], stateStore: GlobalStateStore[Id, S], stateMachine: StateMachine[S, Command, E, Rejection], @@ -151,55 +150,54 @@ object GlobalEventLog { xas: Transactors ): GlobalEventLog[Id, S, Command, E, Rejection] = new GlobalEventLog[Id, S, Command, E, Rejection] { - override def stateOr[R <: Rejection](id: Id, notFound: => R): IO[R, S] = stateStore.get(id).toUIO.flatMap { - IO.fromOption(_, notFound) + override def stateOr[R <: Rejection](id: Id, notFound: => R): IO[S] = stateStore.get(id).flatMap { + IO.fromOption(_)(notFound) } - override def stateOr[R <: Rejection](id: Id, rev: Int, notFound: => R, invalidRevision: (Int, Int) => R): IO[R, S] = - stateMachine.computeState(eventStore.history(id, rev)).flatMap { + override def stateOr[R <: Rejection](id: Id, rev: Int, notFound: => R, invalidRevision: (Int, Int) => R): IO[S] = + stateMachine.computeState(eventStore.history(id, rev).translate(ioToTaskK)).toCatsIO.flatMap { case Some(s) if s.rev == rev => IO.pure(s) case Some(s) => 
IO.raiseError(invalidRevision(rev, s.rev)) case None => IO.raiseError(notFound) } - override def evaluate(id: Id, command: Command): IO[Rejection, (E, S)] = - stateStore.get(id).toUIO.flatMap { current => + override def evaluate(id: Id, command: Command): IO[(E, S)] = + stateStore.get(id).flatMap { current => stateMachine .evaluate(current, command, maxDuration) - .tapEval { case (event, state) => + .attempt + .toCatsIO + .adaptError { + case e: EvaluationTimeout[_] => e + case e: Throwable => EvaluationFailure(command, e) + } + .flatMap(IO.fromEither) + .flatTap { case (event, state) => (eventStore.save(event) >> stateStore.save(state)) .attemptSomeSqlState { case sqlstate.class23.UNIQUE_VIOLATION => onUniqueViolation(id, command) } - .transact(xas.write) - .hideErrors + .transact(xas.writeCE) .flatMap(IO.fromEither) } - .redeemCauseWith( - { - case Error(rejection) => IO.raiseError(rejection) - case Termination(e: EvaluationTimeout[_]) => IO.terminate(e) - case Termination(e) => IO.terminate(EvaluationFailure(command, e)) - }, - r => IO.pure(r) - ) } - override def dryRun(id: Id, command: Command): IO[Rejection, (E, S)] = - stateStore.get(id).toUIO.flatMap { current => + override def dryRun(id: Id, command: Command): IO[(E, S)] = + stateStore.get(id).flatMap { current => stateMachine.evaluate(current, command, maxDuration) } - override def delete(id: Id): UIO[Unit] = + override def delete(id: Id): IO[Unit] = (stateStore.delete(id) >> eventStore.delete(id)).transact(xas.write).hideErrors override def currentEvents(offset: Offset): EnvelopeStream[E] = eventStore.currentEvents(offset) override def events(offset: Offset): EnvelopeStream[E] = eventStore.events(offset) - override def currentStates[T](offset: Offset, f: S => T): Stream[Task, T] = currentStates(offset).map { e => - f(e.value) - } + override def currentStates[T](offset: Offset, f: S => T): Stream[IO, T] = + currentStates(offset).map { e => + f(e.value) + } override def currentStates(offset: Offset): EnvelopeStream[S] = stateStore.currentStates(offset) diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLog.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLog.scala index bfcfbf4d8a..4a0866d5b4 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLog.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLog.scala @@ -1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.sourcing import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.ioToTaskK import ch.epfl.bluebrain.nexus.delta.sourcing.EvaluationError.{EvaluationFailure, EvaluationTimeout} import ch.epfl.bluebrain.nexus.delta.sourcing.ScopedEntityDefinition.Tagger import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig @@ -323,7 +324,7 @@ object ScopedEventLog { stateStore.currentStates(scope, offset) override def currentStates[T](scope: Scope, offset: Offset, f: S => T): Stream[Task, T] = - currentStates(scope, offset).map { s => + currentStates(scope, offset).translate(ioToTaskK).map { s => f(s.value) } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Transactors.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Transactors.scala index 0bda9251a3..74bd89f991 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Transactors.scala +++ 
b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Transactors.scala @@ -31,8 +31,9 @@ final case class Transactors( cache: PartitionsCache )(implicit s: Scheduler) { - def readCE: Transactor[IO] = read.mapK(BIO.liftTo) - def writeCE: Transactor[IO] = write.mapK(BIO.liftTo) + def readCE: Transactor[IO] = read.mapK(BIO.liftTo) + def writeCE: Transactor[IO] = write.mapK(BIO.liftTo) + def streamingCE: Transactor[IO] = streaming.mapK(BIO.liftTo) def execDDL(ddl: String)(implicit cl: ClassLoader): Task[Unit] = ClasspathResourceUtils.ioContentOf(ddl).flatMap(Fragment.const0(_).update.run.transact(write)).void diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/GlobalEventStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/GlobalEventStore.scala index 402eafebb9..f3e7f2762c 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/GlobalEventStore.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/GlobalEventStore.scala @@ -1,6 +1,8 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.event +import cats.effect.IO import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.taskToIoK import ch.epfl.bluebrain.nexus.delta.sourcing.{Serializer, Transactors} import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.GlobalEvent @@ -13,7 +15,6 @@ import doobie.implicits._ import doobie.postgres.implicits._ import fs2.Stream import io.circe.Decoder -import monix.bio.Task /** * Allows to save and fetch [[GlobalEvent]] s from the database @@ -33,17 +34,17 @@ trait GlobalEventStore[Id, E <: GlobalEvent] { /** * Fetches the history for the global event up to the provided revision */ - def history(id: Id, to: Option[Int]): Stream[Task, E] + def history(id: Id, to: Option[Int]): Stream[IO, E] /** * Fetches the history for the global event up to the provided revision */ - def history(id: Id, to: Int): Stream[Task, E] = history(id, Some(to)) + def history(id: Id, to: Int): Stream[IO, E] = history(id, Some(to)) /** * Fetches the history for the global event up to the last existing revision */ - def history(id: Id): Stream[Task, E] = history(id, None) + def history(id: Id): Stream[IO, E] = history(id, None) /** * Fetches events from the given type from the provided offset. 
@@ -105,16 +106,16 @@ object GlobalEventStore { def delete(id: Id): ConnectionIO[Unit] = sql"""DELETE FROM global_events where type = $tpe and id = $id""".update.run.void - override def history(id: Id, to: Option[Int]): Stream[Task, E] = { + override def history(id: Id, to: Option[Int]): Stream[IO, E] = { val select = fr"SELECT value FROM public.global_events" ++ Fragments.whereAndOpt(Some(fr"type = $tpe"), Some(fr"id = $id"), to.map { t => fr" rev <= $t" }) ++ fr"ORDER BY rev" - select.query[E].streamWithChunkSize(config.batchSize).transact(xas.read) + select.query[E].streamWithChunkSize(config.batchSize).transact(xas.readCE) } - private def events(offset: Offset, strategy: RefreshStrategy): Stream[Task, Envelope[E]] = + private def events(offset: Offset, strategy: RefreshStrategy): Stream[IO, Envelope[E]] = StreamingQuery[Envelope[E]]( offset, offset => sql"""SELECT type, id, value, rev, instant, ordering FROM public.global_events @@ -124,11 +125,11 @@ object GlobalEventStore { _.offset, config.copy(refreshStrategy = strategy), xas - ) + ).translate(taskToIoK) - override def currentEvents(offset: Offset): Stream[Task, Envelope[E]] = events(offset, RefreshStrategy.Stop) + override def currentEvents(offset: Offset): Stream[IO, Envelope[E]] = events(offset, RefreshStrategy.Stop) - override def events(offset: Offset): Stream[Task, Envelope[E]] = events(offset, config.refreshStrategy) + override def events(offset: Offset): Stream[IO, Envelope[E]] = events(offset, config.refreshStrategy) } } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStore.scala index bacad6308d..69a2676e5b 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStore.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStore.scala @@ -1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.event import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.taskToIoK import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.ScopedEvent import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ @@ -132,7 +133,7 @@ object ScopedEventStore { scope: Scope, offset: Offset, strategy: RefreshStrategy - ): Stream[Task, Envelope[E]] = + ): EnvelopeStream[E] = StreamingQuery[Envelope[E]]( offset, offset => sql"""SELECT type, id, value, rev, instant, ordering FROM public.scoped_events @@ -142,12 +143,12 @@ object ScopedEventStore { _.offset, config.copy(refreshStrategy = strategy), xas - ) + ).translate(taskToIoK) - override def currentEvents(scope: Scope, offset: Offset): Stream[Task, Envelope[E]] = + override def currentEvents(scope: Scope, offset: Offset): EnvelopeStream[E] = events(scope, offset, RefreshStrategy.Stop) - override def events(scope: Scope, offset: Offset): Stream[Task, Envelope[E]] = + override def events(scope: Scope, offset: Offset): EnvelopeStream[E] = events(scope, offset, config.refreshStrategy) } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/Envelope.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/Envelope.scala index 7d9d375863..7ef778ebac 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/Envelope.scala +++ 
b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/Envelope.scala @@ -1,5 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.model +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.taskToIoK import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClassUtils import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.sourcing.{MultiDecoder, Transactors} @@ -111,5 +112,6 @@ object Envelope { // evalMapFilter re-chunks to 1, the following 2 statements do the same but preserve the chunks .evalMapChunk(e => decode(e.tpe, e.value).map(_.map(a => e.copy(value = a)))) .collect { case Some(e) => e } + .translate(taskToIoK) } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/package.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/package.scala index d92e6c5fb8..a78f4ee02f 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/package.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/package.scala @@ -1,5 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.sourcing +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem import fs2.Stream import fs2.Pipe @@ -7,7 +8,7 @@ import monix.bio.Task package object model { - type EnvelopeStream[Value] = Stream[Task, Envelope[Value]] + type EnvelopeStream[Value] = Stream[IO, Envelope[Value]] type ElemStream[Value] = Stream[Task, Elem[Value]] diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStore.scala index 4d43b772c7..6fceee744b 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStore.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/GlobalStateStore.scala @@ -2,6 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.state import cats.effect.IO import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.taskToIoK import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.sourcing.{Serializer, Transactors} import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig @@ -132,7 +133,7 @@ object GlobalStateStore { _.offset, config.copy(refreshStrategy = strategy), xas - ) + ).translate(taskToIoK) override def currentStates(offset: Offset): EnvelopeStream[S] = states(offset, RefreshStrategy.Stop) diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStore.scala index f49b62a7f5..c5188d562a 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStore.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStore.scala @@ -1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.state import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.taskToIoK import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.implicits.IriInstances @@ -291,7 +292,7 @@ object ScopedStateStore { _.offset, config.copy(refreshStrategy = 
strategy), xas - ) + ).translate(taskToIoK) override def currentStates(scope: Scope, tag: Tag, offset: Offset): EnvelopeStream[S] = states(scope, tag, offset, RefreshStrategy.Stop) diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Arithmetic.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Arithmetic.scala index 20596a97bb..1cfdd35889 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Arithmetic.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Arithmetic.scala @@ -10,6 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.GlobalEvent import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Subject} import ch.epfl.bluebrain.nexus.delta.sourcing.model.ResourceRef.Latest import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ResourceRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.rejection.Rejection import ch.epfl.bluebrain.nexus.delta.sourcing.state.State.GlobalState import io.circe.Codec import io.circe.generic.extras.Configuration @@ -87,7 +88,9 @@ object Arithmetic { } } - sealed trait ArithmeticRejection extends Product with Serializable + sealed trait ArithmeticRejection extends Rejection with Product with Serializable { + override def reason: String = this.toString + } object ArithmeticRejection { final case object NotFound extends ArithmeticRejection diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLogSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLogSuite.scala index 29a03296bf..cbcec36e8a 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLogSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/GlobalEventLogSuite.scala @@ -10,15 +10,15 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.EvaluationError.{EvaluationFailure import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.event.GlobalEventStore import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy import ch.epfl.bluebrain.nexus.delta.sourcing.state.GlobalStateStore -import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite -import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie +import ch.epfl.bluebrain.nexus.testkit.ce.CatsEffectSuite import munit.AnyFixture import scala.concurrent.duration._ -class GlobalEventLogSuite extends BioSuite with Doobie.Fixture { +class GlobalEventLogSuite extends CatsEffectSuite with Doobie.Fixture { override def munitFixtures: Seq[AnyFixture[_]] = List(doobie) @@ -62,74 +62,74 @@ class GlobalEventLogSuite extends BioSuite with Doobie.Fixture { private val id = nxv + "id" test("Raise an error with a non-existent " + id) { - eventLog.stateOr(nxv + "xxx", NotFound).error(NotFound) + eventLog.stateOr(nxv + "xxx", NotFound).intercept(NotFound) } test("Evaluate successfully a command and store both event and state for an initial state") { for { - _ <- eventLog.evaluate(id, Add(2)).assert((plus2, total1)) + _ <- eventLog.evaluate(id, Add(2)).assertEquals((plus2, total1)) _ <- eventStore.history(id).assert(plus2) - _ <- eventLog.stateOr(id, NotFound).assert(total1) + _ <- eventLog.stateOr(id, NotFound).assertEquals(total1) } yield () } test("Evaluate 
successfully another and store both event and state for an initial state") { for { - _ <- eventLog.evaluate(id, Add(3)).assert((plus3, total2)) + _ <- eventLog.evaluate(id, Add(3)).assertEquals((plus3, total2)) _ <- eventStore.history(id).assert(plus2, plus3) - _ <- eventLog.stateOr(id, NotFound).assert(total2) + _ <- eventLog.stateOr(id, NotFound).assertEquals(total2) } yield () } test("Reject a command and persist nothing") { for { - _ <- eventLog.evaluate(id, Subtract(8)).error(NegativeTotal(-3)) + _ <- eventLog.evaluate(id, Subtract(8)).intercept(NegativeTotal(-3)) _ <- eventStore.history(id).assert(plus2, plus3) - _ <- eventLog.stateOr(id, NotFound).assert(total2) + _ <- eventLog.stateOr(id, NotFound).assertEquals(total2) } yield () } test("Raise an error and persist nothing") { val boom = Boom("fail") for { - _ <- eventLog.evaluate(id, boom).terminated(EvaluationFailure(boom, "RuntimeException", boom.message)) + _ <- eventLog.evaluate(id, boom).intercept(EvaluationFailure(boom, "RuntimeException", boom.message)) _ <- eventStore.history(id).assert(plus2, plus3) - _ <- eventLog.stateOr(id, NotFound).assert(total2) + _ <- eventLog.stateOr(id, NotFound).assertEquals(total2) } yield () } test("Get a timeout and persist nothing") { for { - _ <- eventLog.evaluate(id, Never).terminated(EvaluationTimeout(Never, maxDuration)) + _ <- eventLog.evaluate(id, Never).intercept(EvaluationTimeout(Never, maxDuration)) _ <- eventStore.history(id).assert(plus2, plus3) - _ <- eventLog.stateOr(id, NotFound).assert(total2) + _ <- eventLog.stateOr(id, NotFound).assertEquals(total2) } yield () } test("Dry run successfully a command without persisting anything") { for { - _ <- eventLog.dryRun(id, Subtract(4)).assert((minus4, total3)) + _ <- eventLog.dryRun(id, Subtract(4)).assertEquals((minus4, total3)) _ <- eventStore.history(id).assert(plus2, plus3) - _ <- eventLog.stateOr(id, NotFound).assert(total2) + _ <- eventLog.stateOr(id, NotFound).assertEquals(total2) } yield () } test("Get state at the specified revision") { - eventLog.stateOr(id, 1, NotFound, RevisionNotFound).assert(total1) + eventLog.stateOr(id, 1, NotFound, RevisionNotFound).assertEquals(total1) } test("Raise an error with a non-existent " + id) { - eventLog.stateOr(nxv + "xxx", 1, NotFound, RevisionNotFound).error(NotFound) + eventLog.stateOr(nxv + "xxx", 1, NotFound, RevisionNotFound).intercept(NotFound) } test(s"Raise an error when providing a nonexistent revision") { - eventLog.stateOr(id, 10, NotFound, RevisionNotFound).error(RevisionNotFound(10, 2)) + eventLog.stateOr(id, 10, NotFound, RevisionNotFound).intercept(RevisionNotFound(10, 2)) } test(s"Delete events and state for $id") { for { _ <- eventLog.delete(id) - _ <- eventLog.stateOr(id, 1, NotFound, RevisionNotFound).error(NotFound) + _ <- eventLog.stateOr(id, 1, NotFound, RevisionNotFound).intercept(NotFound) _ <- eventLog.currentEvents(Offset.start).assertSize(0) } yield () } diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLogSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLogSuite.scala index 568059b14e..fc6d7dba19 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLogSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLogSuite.scala @@ -1,5 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.sourcing +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.ioToTaskK import 
ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.sourcing.EvaluationError.{EvaluationFailure, EvaluationTimeout} import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestCommand._ @@ -204,7 +205,13 @@ class ScopedEventLogSuite extends BioSuite with Doobie.Fixture { test("Stream continuously the current states") { for { queue <- Queue.unbounded[Task, Envelope[PullRequestState]] - _ <- eventLog.states(Scope.root, Offset.Start).through(queue.enqueue).compile.drain.timeout(500.millis) + _ <- eventLog + .states(Scope.root, Offset.Start) + .translate(ioToTaskK) + .through(queue.enqueue) + .compile + .drain + .timeout(500.millis) elems <- queue.tryDequeueChunk1(Int.MaxValue).map(opt => opt.map(_.toList).getOrElse(Nil)) _ = elems.assertSize(2) } yield () diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/EventStreamingSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/EventStreamingSuite.scala index 51274c660e..523b0cf83c 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/EventStreamingSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/EventStreamingSuite.scala @@ -12,17 +12,17 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.event.EventStreamingSuite.IdRev import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy import ch.epfl.bluebrain.nexus.delta.sourcing.{Arithmetic, MultiDecoder, PullRequest, Scope} -import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite -import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie +import ch.epfl.bluebrain.nexus.testkit.ce.CatsEffectSuite import doobie.implicits._ import io.circe.Decoder import munit.AnyFixture import java.time.Instant -class EventStreamingSuite extends BioSuite with Doobie.Fixture with Doobie.Assertions { +class EventStreamingSuite extends CatsEffectSuite with Doobie.Fixture with Doobie.Assertions { override def munitFixtures: Seq[AnyFixture[_]] = List(doobie) diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/GlobalEventStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/GlobalEventStoreSuite.scala index 5729bf1a0b..c190154437 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/GlobalEventStoreSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/GlobalEventStoreSuite.scala @@ -1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.event import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.sourcing.Arithmetic @@ -10,16 +11,16 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, User} import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Envelope, Label} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset -import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy -import 
ch.epfl.bluebrain.nexus.testkit.bio.BioSuite import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie +import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy +import ch.epfl.bluebrain.nexus.testkit.ce.CatsEffectSuite import doobie.implicits._ import munit.AnyFixture import java.time.Instant import scala.concurrent.duration._ -class GlobalEventStoreSuite extends BioSuite with Doobie.Fixture with Doobie.Assertions { +class GlobalEventStoreSuite extends CatsEffectSuite with Doobie.Fixture with Doobie.Assertions { override def munitFixtures: Seq[AnyFixture[_]] = List(doobie) @@ -46,18 +47,20 @@ class GlobalEventStoreSuite extends BioSuite with Doobie.Fixture with Doobie.Ass private val envelope3 = Envelope(Arithmetic.entityType, id, 3, event3, Instant.EPOCH, Offset.at(3L)) private val envelope4 = Envelope(Arithmetic.entityType, id2, 1, event4, Instant.EPOCH, Offset.at(4L)) - private def assertCount = sql"select count(*) from global_events".query[Int].unique.transact(xas.read).assert(4) + private def assertCount = + sql"select count(*) from global_events".query[Int].unique.transact(xas.readCE).assertEquals(4) test("Save events successfully") { for { - _ <- List(event1, event2, event3, event4).traverse(store.save).transact(xas.write) + _ <- List(event1, event2, event3, event4).traverse(store.save).transact(xas.writeCE) _ <- assertCount } yield () } test("Fail when the PK already exists") { for { - _ <- store.save(Plus(id, 2, 5, Instant.EPOCH, Anonymous)).transact(xas.write).expectUniqueViolation + _ <- + store.save(Plus(id, 2, 5, Instant.EPOCH, Anonymous)).transact(xas.writeCE).toUIO.expectUniqueViolation.toCatsIO _ <- assertCount } yield () } @@ -92,7 +95,7 @@ class GlobalEventStoreSuite extends BioSuite with Doobie.Fixture with Doobie.Ass test(s"Delete events for $id") { for { - _ <- store.delete(id).transact(xas.write) + _ <- store.delete(id).transact(xas.writeCE) _ <- store.history(id).assertSize(0) } yield () } diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStoreSuite.scala index a3545b8d52..1509cc47fb 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStoreSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStoreSuite.scala @@ -2,24 +2,30 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.event import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri -import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestEvent import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv +import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestEvent import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestEvent.{PullRequestCreated, PullRequestMerged, PullRequestUpdated} import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, User} import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Envelope, Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy import ch.epfl.bluebrain.nexus.delta.sourcing.{PullRequest, Scope} import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite -import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie 
+import ch.epfl.bluebrain.nexus.testkit.ce.{CatsRunContext, CatsStreamAssertions} import doobie.implicits._ import munit.AnyFixture import java.time.Instant import scala.concurrent.duration._ -class ScopedEventStoreSuite extends BioSuite with Doobie.Fixture with Doobie.Assertions { +class ScopedEventStoreSuite + extends BioSuite + with CatsRunContext + with CatsStreamAssertions + with Doobie.Fixture + with Doobie.Assertions { override def munitFixtures: Seq[AnyFixture[_]] = List(doobie) diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStoreSuite.scala index 2092d5fd50..d0ccef8238 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStoreSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStoreSuite.scala @@ -16,13 +16,19 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.state.ScopedStateStore.StateNotFou import ch.epfl.bluebrain.nexus.delta.sourcing.{EntityCheck, PullRequest, Scope} import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie +import ch.epfl.bluebrain.nexus.testkit.ce.{CatsRunContext, CatsStreamAssertions} import doobie.implicits._ import munit.AnyFixture import java.time.Instant import scala.concurrent.duration._ -class ScopedStateStoreSuite extends BioSuite with Doobie.Fixture with Doobie.Assertions { +class ScopedStateStoreSuite + extends BioSuite + with CatsRunContext + with CatsStreamAssertions + with Doobie.Fixture + with Doobie.Assertions { override def munitFixtures: Seq[AnyFixture[_]] = List(doobie) diff --git a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala index eba9949a19..64201b4ab7 100644 --- a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala +++ b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/bio/StreamAssertions.scala @@ -1,6 +1,5 @@ package ch.epfl.bluebrain.nexus.testkit.bio -import cats.effect.{ContextShift, IO, Timer} import fs2.Stream import monix.bio.Task import munit.{Assertions, Location} @@ -23,25 +22,4 @@ trait StreamAssertions { self: Assertions => def assert(expected: A*): Task[Unit] = assert(expected.toList) def assertEmpty: Task[Unit] = assert(List.empty) } - - implicit class CEStreamAssertionsOps[A](stream: Stream[IO, A])(implicit - loc: Location, - contextShift: ContextShift[IO], - timer: Timer[IO] - ) { - def assert(expected: List[A]): IO[Unit] = - stream.take(expected.size.toLong).timeout(3.seconds).mask.compile.toList.map { obtained => - assertEquals(obtained, expected, s"Got ${obtained.size} elements, ${expected.size} elements were expected.") - } - - def assertSize(expected: Int): IO[Unit] = - stream.take(expected.toLong).timeout(3.seconds).mask.compile.toList.map { obtained => - assertEquals(obtained.size, expected, s"Got ${obtained.size} elements, $expected elements were expected.") - } - - def assert(expected: A*): IO[Unit] = assert(expected.toList) - - def assertEmpty: IO[Unit] = assert(List.empty) - } - } diff --git a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsEffectSuite.scala b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsEffectSuite.scala index 39a87deca8..70402a7bb5 100644 --- 
a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsEffectSuite.scala +++ b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsEffectSuite.scala @@ -1,12 +1,11 @@ package ch.epfl.bluebrain.nexus.testkit.ce -import cats.effect.{ContextShift, IO, Timer} +import cats.effect.IO import ch.epfl.bluebrain.nexus.testkit.NexusSuite -import ch.epfl.bluebrain.nexus.testkit.bio.{CollectionAssertions, EitherAssertions, StreamAssertions} +import ch.epfl.bluebrain.nexus.testkit.bio.{CollectionAssertions, EitherAssertions} import monix.bio.{IO => BIO} import monix.execution.Scheduler -import scala.concurrent.ExecutionContext import scala.concurrent.duration.{DurationInt, FiniteDuration} /** @@ -15,15 +14,13 @@ import scala.concurrent.duration.{DurationInt, FiniteDuration} */ abstract class CatsEffectSuite extends NexusSuite + with CatsRunContext with CatsEffectAssertions - with StreamAssertions + with CatsStreamAssertions with CollectionAssertions with EitherAssertions { protected val ioTimeout: FiniteDuration = 45.seconds - implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) - implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) - override def munitValueTransforms: List[ValueTransform] = super.munitValueTransforms ++ List(munitIOTransform, munitBIOTransform) diff --git a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsRunContext.scala b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsRunContext.scala new file mode 100644 index 0000000000..c059c63123 --- /dev/null +++ b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsRunContext.scala @@ -0,0 +1,12 @@ +package ch.epfl.bluebrain.nexus.testkit.ce + +import cats.effect.{ContextShift, IO, Timer} + +import scala.concurrent.ExecutionContext + +trait CatsRunContext { + + implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) + implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) + +} diff --git a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsStreamAssertions.scala b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsStreamAssertions.scala new file mode 100644 index 0000000000..fa94ef56dc --- /dev/null +++ b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/ce/CatsStreamAssertions.scala @@ -0,0 +1,29 @@ +package ch.epfl.bluebrain.nexus.testkit.ce + +import cats.effect.IO +import fs2.Stream +import munit.{Assertions, Location} + +import scala.concurrent.duration.DurationInt + +trait CatsStreamAssertions { self: CatsRunContext with Assertions => + + implicit class CEStreamAssertionsOps[A](stream: Stream[IO, A])(implicit + loc: Location + ) { + def assert(expected: List[A]): IO[Unit] = + stream.take(expected.size.toLong).timeout(3.seconds).mask.compile.toList.map { obtained => + assertEquals(obtained, expected, s"Got ${obtained.size} elements, ${expected.size} elements were expected.") + } + + def assertSize(expected: Int): IO[Unit] = + stream.take(expected.toLong).timeout(3.seconds).mask.compile.toList.map { obtained => + assertEquals(obtained.size, expected, s"Got ${obtained.size} elements, $expected elements were expected.") + } + + def assert(expected: A*): IO[Unit] = assert(expected.toList) + + def assertEmpty: IO[Unit] = assert(List.empty) + } + +}
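Note on the stream changes above: most of them hinge on fs2's `translate`, which turns a `Stream[F, A]` into a `Stream[G, A]` given a natural transformation `F ~> G`. The `ioToTaskK` and `taskToIoK` values come from `ch.epfl.bluebrain.nexus.delta.kernel.effect.migration` and are not part of this diff; the sketch below only illustrates what such transformations between `monix.bio.Task` and `cats.effect.IO` could look like under Cats Effect 2, not the project's actual implementation.

```scala
import cats.effect.{ContextShift, IO}
import cats.~>
import fs2.Stream
import monix.bio.Task
import monix.execution.Scheduler

object TranslateSketch {

  // Run a monix Task and resume it as a cats-effect IO (hypothetical; the real
  // `taskToIoK` in the migration package may be implemented differently).
  def taskToIoK(implicit s: Scheduler, cs: ContextShift[IO]): Task ~> IO =
    new (Task ~> IO) {
      def apply[A](task: Task[A]): IO[A] = IO.fromFuture(IO(task.runToFuture))
    }

  // The opposite direction: suspend a cats-effect IO inside a monix Task.
  def ioToTaskK: IO ~> Task =
    new (IO ~> Task) {
      def apply[A](io: IO[A]): Task[A] = Task.deferFuture(io.unsafeToFuture())
    }

  // The pattern used throughout the diff: a store now returns Stream[IO, _],
  // and callers that still live in Task-land translate it back.
  def asTaskStream[A](states: Stream[IO, A])(implicit s: Scheduler, cs: ContextShift[IO]): Stream[Task, A] =
    states.translate(ioToTaskK)
}
```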
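The `GlobalEventLog` signatures lose the typed error channel (`monix.bio.IO[Rejection, A]` becomes `cats.effect.IO[A]`), which is why `Rejection` is now bounded by `Throwable` and why the test rejections gain a `reason`. A minimal sketch of the pattern with hypothetical names, assuming Cats Effect 2:

```scala
import cats.effect.IO
import cats.syntax.all._

// Hypothetical rejection hierarchy: once the bifunctor error channel is gone,
// rejections must be Throwables so IO.raiseError / recover can carry them.
sealed abstract class Rejection(val reason: String) extends Exception(reason)
final case class NotFound(id: String) extends Rejection(s"'$id' not found")

object ErrorChannelSketch {

  // Before: stateOr returned monix.bio.IO[R, S]; after, the rejection travels
  // in IO's single Throwable channel, as in `IO.fromOption(_)(notFound)`.
  def stateOr[S](fetch: IO[Option[S]], notFound: => Rejection): IO[S] =
    fetch.flatMap(IO.fromOption(_)(notFound))

  // Expected rejections pass through, unexpected failures are wrapped,
  // mirroring the `adaptError` + `IO.fromEither` chain in `evaluate`.
  def evaluate[A](run: IO[Either[Rejection, A]]): IO[A] =
    run
      .adaptError { case e if !e.isInstanceOf[Rejection] => new IllegalStateException("evaluation failed", e) }
      .flatMap(IO.fromEither)
}
```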
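On the test side, the implicit `ContextShift`/`Timer` instances and the `Stream[IO, _]` assertions move out of `CatsEffectSuite` into the standalone `CatsRunContext` and `CatsStreamAssertions` traits, so `BioSuite`-based suites such as `ScopedEventStoreSuite` can mix them in as well. A hypothetical minimal suite using them:

```scala
import cats.effect.IO
import ch.epfl.bluebrain.nexus.testkit.ce.{CatsRunContext, CatsStreamAssertions}
import fs2.Stream
import munit.FunSuite

// Hypothetical example; any munit suite can mix the two traits in to get
// the implicit ContextShift/Timer and the Stream[IO, _] assertions.
class StreamAssertionsExampleSuite extends FunSuite with CatsRunContext with CatsStreamAssertions {

  test("assert the elements of a cats-effect stream") {
    Stream.emits(List(1, 2, 3)).covary[IO].assert(1, 2, 3).unsafeRunSync()
  }
}
```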