diff --git a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/RetryStrategy.scala b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/RetryStrategy.scala index 4d1bbd0221..ac1e84f875 100644 --- a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/RetryStrategy.scala +++ b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/RetryStrategy.scala @@ -1,8 +1,8 @@ package ch.epfl.bluebrain.nexus.delta.kernel +import cats.effect.IO import com.typesafe.scalalogging.{Logger => ScalaLoggingLogger} -import monix.bio.{IO, UIO} -import cats.effect.{IO => CIO} +import monix.bio.{UIO, IO => BIO} import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toMonixBIOOps import pureconfig.ConfigReader import pureconfig.error.{CannotConvert, ConfigReaderFailures, ConvertFailure} @@ -27,9 +27,9 @@ import scala.util.control.NonFatal final case class RetryStrategy[E]( config: RetryStrategyConfig, retryWhen: E => Boolean, - onError: (E, RetryDetails) => IO[E, Unit] + onError: (E, RetryDetails) => BIO[E, Unit] ) { - val policy: RetryPolicy[IO[E, *]] = config.toPolicy[E] + val policy: RetryPolicy[BIO[E, *]] = config.toPolicy[E] } object RetryStrategy { @@ -37,7 +37,7 @@ object RetryStrategy { /** * Apply the provided strategy on the given io */ - def use[E, A](io: IO[E, A], retryStrategy: RetryStrategy[E]): IO[E, A] = + def use[E, A](io: BIO[E, A], retryStrategy: RetryStrategy[E]): BIO[E, A] = io.retryingOnSomeErrors( retryStrategy.retryWhen, retryStrategy.policy, @@ -47,7 +47,7 @@ object RetryStrategy { /** * Log errors when retrying */ - def logError[E](logger: ScalaLoggingLogger, action: String): (E, RetryDetails) => IO[E, Unit] = { + def logError[E](logger: ScalaLoggingLogger, action: String): (E, RetryDetails) => BIO[E, Unit] = { case (err, WillDelayAndRetry(nextDelay, retriesSoFar, _)) => val message = s"""Error $err while $action: retrying in ${nextDelay.toMillis}ms (retries so far: $retriesSoFar)""" UIO.delay(logger.warn(message)) @@ -59,7 +59,7 @@ object RetryStrategy { /** * Log errors when retrying */ - def logError[E](logger: org.typelevel.log4cats.Logger[CIO], action: String): (E, RetryDetails) => CIO[Unit] = { + def logError[E](logger: org.typelevel.log4cats.Logger[IO], action: String): (E, RetryDetails) => IO[Unit] = { case (err, WillDelayAndRetry(nextDelay, retriesSoFar, _)) => val message = s"""Error $err while $action: retrying in ${nextDelay.toMillis}ms (retries so far: $retriesSoFar)""" logger.warn(message) @@ -71,7 +71,7 @@ object RetryStrategy { /** * Log errors when retrying */ - def logError[E](logger: Logger, action: String): (E, RetryDetails) => IO[E, Unit] = { + def logError[E](logger: Logger, action: String): (E, RetryDetails) => BIO[E, Unit] = { case (err, WillDelayAndRetry(nextDelay, retriesSoFar, _)) => val message = s"""Error $err while $action: retrying in ${nextDelay.toMillis}ms (retries so far: $retriesSoFar)""" logger.warn(message) @@ -85,7 +85,7 @@ object RetryStrategy { * @param onError * what action to perform on error */ - def alwaysGiveUp[E](onError: (E, RetryDetails) => IO[E, Unit]): RetryStrategy[E] = + def alwaysGiveUp[E](onError: (E, RetryDetails) => BIO[E, Unit]): RetryStrategy[E] = RetryStrategy(RetryStrategyConfig.AlwaysGiveUp, _ => false, onError) /** @@ -103,7 +103,7 @@ object RetryStrategy { constant: FiniteDuration, maxRetries: Int, retryWhen: E => Boolean, - onError: (E, RetryDetails) => IO[E, Unit] + onError: (E, RetryDetails) => BIO[E, Unit] ): RetryStrategy[E] = RetryStrategy( RetryStrategyConfig.ConstantStrategyConfig(constant, maxRetries), @@ -141,7 +141,7 @@ object RetryStrategy { def retryOnNonFatal( config: RetryStrategyConfig, - logger: org.typelevel.log4cats.Logger[CIO], + logger: org.typelevel.log4cats.Logger[IO], action: String ): RetryStrategy[Throwable] = RetryStrategy( @@ -156,7 +156,7 @@ object RetryStrategy { * Configuration for a [[RetryStrategy]] */ sealed trait RetryStrategyConfig extends Product with Serializable { - def toPolicy[E]: RetryPolicy[IO[E, *]] + def toPolicy[E]: RetryPolicy[BIO[E, *]] } @@ -166,7 +166,7 @@ object RetryStrategyConfig { * Fails without retry */ case object AlwaysGiveUp extends RetryStrategyConfig { - override def toPolicy[E]: RetryPolicy[IO[E, *]] = alwaysGiveUp[IO[E, *]] + override def toPolicy[E]: RetryPolicy[BIO[E, *]] = alwaysGiveUp[BIO[E, *]] } /** @@ -177,8 +177,8 @@ object RetryStrategyConfig { * the maximum number of retries */ final case class ConstantStrategyConfig(delay: FiniteDuration, maxRetries: Int) extends RetryStrategyConfig { - override def toPolicy[E]: RetryPolicy[IO[E, *]] = - constantDelay[IO[E, *]](delay) join limitRetries(maxRetries) + override def toPolicy[E]: RetryPolicy[BIO[E, *]] = + constantDelay[BIO[E, *]](delay) join limitRetries(maxRetries) } /** @@ -187,8 +187,8 @@ object RetryStrategyConfig { * the interval before the retry will be attempted */ final case class OnceStrategyConfig(delay: FiniteDuration) extends RetryStrategyConfig { - override def toPolicy[E]: RetryPolicy[IO[E, *]] = - constantDelay[IO[E, *]](delay) join limitRetries(1) + override def toPolicy[E]: RetryPolicy[BIO[E, *]] = + constantDelay[BIO[E, *]](delay) join limitRetries(1) } /** @@ -202,8 +202,8 @@ object RetryStrategyConfig { */ final case class ExponentialStrategyConfig(initialDelay: FiniteDuration, maxDelay: FiniteDuration, maxRetries: Int) extends RetryStrategyConfig { - override def toPolicy[E]: RetryPolicy[IO[E, *]] = - capDelay[IO[E, *]](maxDelay, fullJitter(initialDelay)) join limitRetries(maxRetries) + override def toPolicy[E]: RetryPolicy[BIO[E, *]] = + capDelay[BIO[E, *]](maxDelay, fullJitter(initialDelay)) join limitRetries(maxRetries) } /** @@ -215,7 +215,7 @@ object RetryStrategyConfig { */ final case class MaximumCumulativeDelayConfig(threshold: FiniteDuration, delay: FiniteDuration) extends RetryStrategyConfig { - override def toPolicy[E]: RetryPolicy[IO[E, *]] = + override def toPolicy[E]: RetryPolicy[BIO[E, *]] = RetryPolicies.limitRetriesByCumulativeDelay( threshold, RetryPolicies.constantDelay(delay) diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutes.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutes.scala index 5fbcbb387a..307f3b5e74 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutes.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutes.scala @@ -2,7 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.Route -import cats.effect.{ContextShift, IO => CIO} +import cats.effect.{ContextShift, IO} import cats.implicits.catsSyntaxApplicativeError import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef.ActiveViewDef @@ -32,7 +32,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Pro import io.circe.Encoder import io.circe.generic.semiauto.deriveEncoder import io.circe.syntax._ -import monix.bio.IO +import monix.bio.{IO => BIO} import monix.execution.Scheduler class BlazegraphViewsIndexingRoutes( fetch: FetchIndexingView, @@ -44,7 +44,7 @@ class BlazegraphViewsIndexingRoutes( )(implicit baseUri: BaseUri, s: Scheduler, - c: ContextShift[CIO], + c: ContextShift[IO], cr: RemoteContextResolution, ordering: JsonKeyOrdering, pc: PaginationConfig @@ -137,7 +137,7 @@ class BlazegraphViewsIndexingRoutes( object BlazegraphViewsIndexingRoutes { - type FetchIndexingView = (IdSegment, ProjectRef) => IO[BlazegraphViewRejection, ActiveViewDef] + type FetchIndexingView = (IdSegment, ProjectRef) => BIO[BlazegraphViewRejection, ActiveViewDef] /** * @return @@ -153,7 +153,7 @@ object BlazegraphViewsIndexingRoutes { )(implicit baseUri: BaseUri, s: Scheduler, - c: ContextShift[CIO], + c: ContextShift[IO], cr: RemoteContextResolution, ordering: JsonKeyOrdering, pc: PaginationConfig diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala index 89c232244b..a3793b890e 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala @@ -4,10 +4,10 @@ import akka.actor.typed.ActorSystem import akka.http.scaladsl.client.RequestBuilding.{Get, Head} import akka.http.scaladsl.model.ContentTypes.`application/json` import akka.http.scaladsl.model.Uri.Query -import akka.http.scaladsl.model.headers.{`Last-Event-ID`, Accept} +import akka.http.scaladsl.model.headers.{Accept, `Last-Event-ID`} import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes} import akka.stream.alpakka.sse.scaladsl.EventSource -import cats.effect.ContextShift +import cats.effect.{ContextShift, IO} import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.MigrateEffectSyntax import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewSource.RemoteProjectSource import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.stream.CompositeBranch @@ -28,8 +28,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, RemainingElems} import com.typesafe.scalalogging.Logger import io.circe.parser.decode import fs2._ -import cats.effect.{IO => CIO} -import monix.bio.{IO, UIO} +import monix.bio.{UIO, IO => BIO} import monix.execution.Scheduler import scala.concurrent.Future @@ -90,7 +89,7 @@ object DeltaClient { )(implicit as: ActorSystem[Nothing], scheduler: Scheduler, - c: ContextShift[CIO] + c: ContextShift[IO] ) extends DeltaClient with MigrateEffectSyntax { @@ -122,7 +121,7 @@ object DeltaClient { for { authToken <- authTokenProvider(credentials).toBIO result <- client(Head(elemAddress(source)).withCredentials(authToken)) { - case resp if resp.status.isSuccess() => UIO.delay(resp.discardEntityBytes()) >> IO.unit + case resp if resp.status.isSuccess() => UIO.delay(resp.discardEntityBytes()) >> BIO.unit } } yield result } @@ -136,7 +135,7 @@ object DeltaClient { def send(request: HttpRequest): Future[HttpResponse] = { (for { authToken <- authTokenProvider(credentials).toBIO - result <- client[HttpResponse](request.withCredentials(authToken))(IO.pure(_)) + result <- client[HttpResponse](request.withCredentials(authToken))(BIO.pure(_)) } yield result).runToFuture } @@ -192,7 +191,7 @@ object DeltaClient { )(implicit as: ActorSystem[Nothing], sc: Scheduler, - c: ContextShift[CIO] + c: ContextShift[IO] ): DeltaClient = new DeltaClientImpl(client, authTokenProvider, credentials, retryDelay) } diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutes.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutes.scala index 0ca1827e60..ca093ce5a3 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutes.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutes.scala @@ -2,9 +2,8 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.routes import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.Route -import cats.effect.ContextShift +import cats.effect.{ContextShift, IO} import cats.syntax.all._ -import cats.effect.{IO => CIO} import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes.BlazegraphViewsDirectives import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewRejection._ @@ -32,7 +31,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{FailedElemLogRow, ProjectRe import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.projections.ProjectionErrors import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ -import monix.bio.IO +import monix.bio.{IO => BIO} import monix.execution.Scheduler class CompositeViewsIndexingRoutes( identities: Identities, @@ -47,7 +46,7 @@ class CompositeViewsIndexingRoutes( baseUri: BaseUri, paginationConfig: PaginationConfig, s: Scheduler, - c: ContextShift[CIO], + c: ContextShift[IO], cr: RemoteContextResolution, ordering: JsonKeyOrdering ) extends AuthDirectives(identities, aclCheck) @@ -238,12 +237,12 @@ class CompositeViewsIndexingRoutes( private def fetchProjection(view: ActiveViewDef, projectionId: IdSegment) = expandId(projectionId, view.project).flatMap { id => - IO.fromEither(view.projection(id)) + BIO.fromEither(view.projection(id)) } private def fetchSource(view: ActiveViewDef, sourceId: IdSegment) = expandId(sourceId, view.project).flatMap { id => - IO.fromEither(view.source(id)) + BIO.fromEither(view.source(id)) } } @@ -267,7 +266,7 @@ object CompositeViewsIndexingRoutes { baseUri: BaseUri, paginationConfig: PaginationConfig, s: Scheduler, - c: ContextShift[CIO], + c: ContextShift[IO], cr: RemoteContextResolution, ordering: JsonKeyOrdering ): Route = diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutes.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutes.scala index 5c8e40889d..159f0354ed 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutes.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutes.scala @@ -2,7 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.routes import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server._ -import cats.effect.{ContextShift, IO => CIO} +import cats.effect.{ContextShift, IO} import cats.implicits.catsSyntaxApplicativeError import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViewsQuery @@ -34,7 +34,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Pro import io.circe.Encoder import io.circe.generic.semiauto.deriveEncoder import io.circe.syntax._ -import monix.bio.IO +import monix.bio.{IO => BIO} import monix.execution.Scheduler /** @@ -65,7 +65,7 @@ final class ElasticSearchIndexingRoutes( baseUri: BaseUri, paginationConfig: PaginationConfig, s: Scheduler, - c: ContextShift[CIO], + c: ContextShift[IO], cr: RemoteContextResolution, ordering: JsonKeyOrdering ) extends AuthDirectives(identities, aclCheck) @@ -161,7 +161,7 @@ final class ElasticSearchIndexingRoutes( object ElasticSearchIndexingRoutes { - type FetchIndexingView = (IdSegment, ProjectRef) => IO[ElasticSearchViewRejection, ActiveViewDef] + type FetchIndexingView = (IdSegment, ProjectRef) => BIO[ElasticSearchViewRejection, ActiveViewDef] /** * @return @@ -179,7 +179,7 @@ object ElasticSearchIndexingRoutes { baseUri: BaseUri, paginationConfig: PaginationConfig, s: Scheduler, - c: ContextShift[CIO], + c: ContextShift[IO], cr: RemoteContextResolution, ordering: JsonKeyOrdering ): Route =