Skip to content

Commit

Permalink
don't use CIO
Browse files Browse the repository at this point in the history
  • Loading branch information
shinyhappydan committed Oct 16, 2023
1 parent 91f6662 commit 2402545
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package ch.epfl.bluebrain.nexus.delta.kernel

import cats.effect.IO
import com.typesafe.scalalogging.{Logger => ScalaLoggingLogger}
import monix.bio.{IO, UIO}
import cats.effect.{IO => CIO}
import monix.bio.{UIO, IO => BIO}
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toMonixBIOOps
import pureconfig.ConfigReader
import pureconfig.error.{CannotConvert, ConfigReaderFailures, ConvertFailure}
Expand All @@ -27,17 +27,17 @@ import scala.util.control.NonFatal
final case class RetryStrategy[E](
config: RetryStrategyConfig,
retryWhen: E => Boolean,
onError: (E, RetryDetails) => IO[E, Unit]
onError: (E, RetryDetails) => BIO[E, Unit]
) {
val policy: RetryPolicy[IO[E, *]] = config.toPolicy[E]
val policy: RetryPolicy[BIO[E, *]] = config.toPolicy[E]
}

object RetryStrategy {

/**
* Apply the provided strategy on the given io
*/
def use[E, A](io: IO[E, A], retryStrategy: RetryStrategy[E]): IO[E, A] =
def use[E, A](io: BIO[E, A], retryStrategy: RetryStrategy[E]): BIO[E, A] =
io.retryingOnSomeErrors(
retryStrategy.retryWhen,
retryStrategy.policy,
Expand All @@ -47,7 +47,7 @@ object RetryStrategy {
/**
* Log errors when retrying
*/
def logError[E](logger: ScalaLoggingLogger, action: String): (E, RetryDetails) => IO[E, Unit] = {
def logError[E](logger: ScalaLoggingLogger, action: String): (E, RetryDetails) => BIO[E, Unit] = {
case (err, WillDelayAndRetry(nextDelay, retriesSoFar, _)) =>
val message = s"""Error $err while $action: retrying in ${nextDelay.toMillis}ms (retries so far: $retriesSoFar)"""
UIO.delay(logger.warn(message))
Expand All @@ -59,7 +59,7 @@ object RetryStrategy {
/**
* Log errors when retrying
*/
def logError[E](logger: org.typelevel.log4cats.Logger[CIO], action: String): (E, RetryDetails) => CIO[Unit] = {
def logError[E](logger: org.typelevel.log4cats.Logger[IO], action: String): (E, RetryDetails) => IO[Unit] = {
case (err, WillDelayAndRetry(nextDelay, retriesSoFar, _)) =>
val message = s"""Error $err while $action: retrying in ${nextDelay.toMillis}ms (retries so far: $retriesSoFar)"""
logger.warn(message)
Expand All @@ -71,7 +71,7 @@ object RetryStrategy {
/**
* Log errors when retrying
*/
def logError[E](logger: Logger, action: String): (E, RetryDetails) => IO[E, Unit] = {
def logError[E](logger: Logger, action: String): (E, RetryDetails) => BIO[E, Unit] = {
case (err, WillDelayAndRetry(nextDelay, retriesSoFar, _)) =>
val message = s"""Error $err while $action: retrying in ${nextDelay.toMillis}ms (retries so far: $retriesSoFar)"""
logger.warn(message)
Expand All @@ -85,7 +85,7 @@ object RetryStrategy {
* @param onError
* what action to perform on error
*/
def alwaysGiveUp[E](onError: (E, RetryDetails) => IO[E, Unit]): RetryStrategy[E] =
def alwaysGiveUp[E](onError: (E, RetryDetails) => BIO[E, Unit]): RetryStrategy[E] =
RetryStrategy(RetryStrategyConfig.AlwaysGiveUp, _ => false, onError)

/**
Expand All @@ -103,7 +103,7 @@ object RetryStrategy {
constant: FiniteDuration,
maxRetries: Int,
retryWhen: E => Boolean,
onError: (E, RetryDetails) => IO[E, Unit]
onError: (E, RetryDetails) => BIO[E, Unit]
): RetryStrategy[E] =
RetryStrategy(
RetryStrategyConfig.ConstantStrategyConfig(constant, maxRetries),
Expand Down Expand Up @@ -141,7 +141,7 @@ object RetryStrategy {

def retryOnNonFatal(
config: RetryStrategyConfig,
logger: org.typelevel.log4cats.Logger[CIO],
logger: org.typelevel.log4cats.Logger[IO],
action: String
): RetryStrategy[Throwable] =
RetryStrategy(
Expand All @@ -156,7 +156,7 @@ object RetryStrategy {
* Configuration for a [[RetryStrategy]]
*/
sealed trait RetryStrategyConfig extends Product with Serializable {
def toPolicy[E]: RetryPolicy[IO[E, *]]
def toPolicy[E]: RetryPolicy[BIO[E, *]]

}

Expand All @@ -166,7 +166,7 @@ object RetryStrategyConfig {
* Fails without retry
*/
case object AlwaysGiveUp extends RetryStrategyConfig {
override def toPolicy[E]: RetryPolicy[IO[E, *]] = alwaysGiveUp[IO[E, *]]
override def toPolicy[E]: RetryPolicy[BIO[E, *]] = alwaysGiveUp[BIO[E, *]]
}

/**
Expand All @@ -177,8 +177,8 @@ object RetryStrategyConfig {
* the maximum number of retries
*/
final case class ConstantStrategyConfig(delay: FiniteDuration, maxRetries: Int) extends RetryStrategyConfig {
override def toPolicy[E]: RetryPolicy[IO[E, *]] =
constantDelay[IO[E, *]](delay) join limitRetries(maxRetries)
override def toPolicy[E]: RetryPolicy[BIO[E, *]] =
constantDelay[BIO[E, *]](delay) join limitRetries(maxRetries)
}

/**
Expand All @@ -187,8 +187,8 @@ object RetryStrategyConfig {
* the interval before the retry will be attempted
*/
final case class OnceStrategyConfig(delay: FiniteDuration) extends RetryStrategyConfig {
override def toPolicy[E]: RetryPolicy[IO[E, *]] =
constantDelay[IO[E, *]](delay) join limitRetries(1)
override def toPolicy[E]: RetryPolicy[BIO[E, *]] =
constantDelay[BIO[E, *]](delay) join limitRetries(1)
}

/**
Expand All @@ -202,8 +202,8 @@ object RetryStrategyConfig {
*/
final case class ExponentialStrategyConfig(initialDelay: FiniteDuration, maxDelay: FiniteDuration, maxRetries: Int)
extends RetryStrategyConfig {
override def toPolicy[E]: RetryPolicy[IO[E, *]] =
capDelay[IO[E, *]](maxDelay, fullJitter(initialDelay)) join limitRetries(maxRetries)
override def toPolicy[E]: RetryPolicy[BIO[E, *]] =
capDelay[BIO[E, *]](maxDelay, fullJitter(initialDelay)) join limitRetries(maxRetries)
}

/**
Expand All @@ -215,7 +215,7 @@ object RetryStrategyConfig {
*/
final case class MaximumCumulativeDelayConfig(threshold: FiniteDuration, delay: FiniteDuration)
extends RetryStrategyConfig {
override def toPolicy[E]: RetryPolicy[IO[E, *]] =
override def toPolicy[E]: RetryPolicy[BIO[E, *]] =
RetryPolicies.limitRetriesByCumulativeDelay(
threshold,
RetryPolicies.constantDelay(delay)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import cats.effect.{ContextShift, IO => CIO}
import cats.effect.{ContextShift, IO}
import cats.implicits.catsSyntaxApplicativeError
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef.ActiveViewDef
Expand Down Expand Up @@ -32,7 +32,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Pro
import io.circe.Encoder
import io.circe.generic.semiauto.deriveEncoder
import io.circe.syntax._
import monix.bio.IO
import monix.bio.{IO => BIO}
import monix.execution.Scheduler
class BlazegraphViewsIndexingRoutes(
fetch: FetchIndexingView,
Expand All @@ -44,7 +44,7 @@ class BlazegraphViewsIndexingRoutes(
)(implicit
baseUri: BaseUri,
s: Scheduler,
c: ContextShift[CIO],
c: ContextShift[IO],
cr: RemoteContextResolution,
ordering: JsonKeyOrdering,
pc: PaginationConfig
Expand Down Expand Up @@ -137,7 +137,7 @@ class BlazegraphViewsIndexingRoutes(

object BlazegraphViewsIndexingRoutes {

type FetchIndexingView = (IdSegment, ProjectRef) => IO[BlazegraphViewRejection, ActiveViewDef]
type FetchIndexingView = (IdSegment, ProjectRef) => BIO[BlazegraphViewRejection, ActiveViewDef]

/**
* @return
Expand All @@ -153,7 +153,7 @@ object BlazegraphViewsIndexingRoutes {
)(implicit
baseUri: BaseUri,
s: Scheduler,
c: ContextShift[CIO],
c: ContextShift[IO],
cr: RemoteContextResolution,
ordering: JsonKeyOrdering,
pc: PaginationConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import akka.actor.typed.ActorSystem
import akka.http.scaladsl.client.RequestBuilding.{Get, Head}
import akka.http.scaladsl.model.ContentTypes.`application/json`
import akka.http.scaladsl.model.Uri.Query
import akka.http.scaladsl.model.headers.{`Last-Event-ID`, Accept}
import akka.http.scaladsl.model.headers.{Accept, `Last-Event-ID`}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.stream.alpakka.sse.scaladsl.EventSource
import cats.effect.ContextShift
import cats.effect.{ContextShift, IO}
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.MigrateEffectSyntax
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewSource.RemoteProjectSource
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.stream.CompositeBranch
Expand All @@ -28,8 +28,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, RemainingElems}
import com.typesafe.scalalogging.Logger
import io.circe.parser.decode
import fs2._
import cats.effect.{IO => CIO}
import monix.bio.{IO, UIO}
import monix.bio.{UIO, IO => BIO}
import monix.execution.Scheduler

import scala.concurrent.Future
Expand Down Expand Up @@ -90,7 +89,7 @@ object DeltaClient {
)(implicit
as: ActorSystem[Nothing],
scheduler: Scheduler,
c: ContextShift[CIO]
c: ContextShift[IO]
) extends DeltaClient
with MigrateEffectSyntax {

Expand Down Expand Up @@ -122,7 +121,7 @@ object DeltaClient {
for {
authToken <- authTokenProvider(credentials).toBIO
result <- client(Head(elemAddress(source)).withCredentials(authToken)) {
case resp if resp.status.isSuccess() => UIO.delay(resp.discardEntityBytes()) >> IO.unit
case resp if resp.status.isSuccess() => UIO.delay(resp.discardEntityBytes()) >> BIO.unit
}
} yield result
}
Expand All @@ -136,7 +135,7 @@ object DeltaClient {
def send(request: HttpRequest): Future[HttpResponse] = {
(for {
authToken <- authTokenProvider(credentials).toBIO
result <- client[HttpResponse](request.withCredentials(authToken))(IO.pure(_))
result <- client[HttpResponse](request.withCredentials(authToken))(BIO.pure(_))
} yield result).runToFuture
}

Expand Down Expand Up @@ -192,7 +191,7 @@ object DeltaClient {
)(implicit
as: ActorSystem[Nothing],
sc: Scheduler,
c: ContextShift[CIO]
c: ContextShift[IO]
): DeltaClient =
new DeltaClientImpl(client, authTokenProvider, credentials, retryDelay)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.routes

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import cats.effect.ContextShift
import cats.effect.{ContextShift, IO}
import cats.syntax.all._
import cats.effect.{IO => CIO}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes.BlazegraphViewsDirectives
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewRejection._
Expand Down Expand Up @@ -32,7 +31,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{FailedElemLogRow, ProjectRe
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.ProjectionErrors
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import monix.bio.IO
import monix.bio.{IO => BIO}
import monix.execution.Scheduler
class CompositeViewsIndexingRoutes(
identities: Identities,
Expand All @@ -47,7 +46,7 @@ class CompositeViewsIndexingRoutes(
baseUri: BaseUri,
paginationConfig: PaginationConfig,
s: Scheduler,
c: ContextShift[CIO],
c: ContextShift[IO],
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
) extends AuthDirectives(identities, aclCheck)
Expand Down Expand Up @@ -238,12 +237,12 @@ class CompositeViewsIndexingRoutes(

private def fetchProjection(view: ActiveViewDef, projectionId: IdSegment) =
expandId(projectionId, view.project).flatMap { id =>
IO.fromEither(view.projection(id))
BIO.fromEither(view.projection(id))
}

private def fetchSource(view: ActiveViewDef, sourceId: IdSegment) =
expandId(sourceId, view.project).flatMap { id =>
IO.fromEither(view.source(id))
BIO.fromEither(view.source(id))
}

}
Expand All @@ -267,7 +266,7 @@ object CompositeViewsIndexingRoutes {
baseUri: BaseUri,
paginationConfig: PaginationConfig,
s: Scheduler,
c: ContextShift[CIO],
c: ContextShift[IO],
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
): Route =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.routes

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import cats.effect.{ContextShift, IO => CIO}
import cats.effect.{ContextShift, IO}
import cats.implicits.catsSyntaxApplicativeError
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViewsQuery
Expand Down Expand Up @@ -34,7 +34,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Pro
import io.circe.Encoder
import io.circe.generic.semiauto.deriveEncoder
import io.circe.syntax._
import monix.bio.IO
import monix.bio.{IO => BIO}
import monix.execution.Scheduler

/**
Expand Down Expand Up @@ -65,7 +65,7 @@ final class ElasticSearchIndexingRoutes(
baseUri: BaseUri,
paginationConfig: PaginationConfig,
s: Scheduler,
c: ContextShift[CIO],
c: ContextShift[IO],
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
) extends AuthDirectives(identities, aclCheck)
Expand Down Expand Up @@ -161,7 +161,7 @@ final class ElasticSearchIndexingRoutes(

object ElasticSearchIndexingRoutes {

type FetchIndexingView = (IdSegment, ProjectRef) => IO[ElasticSearchViewRejection, ActiveViewDef]
type FetchIndexingView = (IdSegment, ProjectRef) => BIO[ElasticSearchViewRejection, ActiveViewDef]

/**
* @return
Expand All @@ -179,7 +179,7 @@ object ElasticSearchIndexingRoutes {
baseUri: BaseUri,
paginationConfig: PaginationConfig,
s: Scheduler,
c: ContextShift[CIO],
c: ContextShift[IO],
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
): Route =
Expand Down

0 comments on commit 2402545

Please sign in to comment.