diff --git a/core/src/main/scala/org/typelevel/keypool/KeyPool.scala b/core/src/main/scala/org/typelevel/keypool/KeyPool.scala index 5915dc8d..ed337709 100644 --- a/core/src/main/scala/org/typelevel/keypool/KeyPool.scala +++ b/core/src/main/scala/org/typelevel/keypool/KeyPool.scala @@ -39,12 +39,11 @@ trait KeyPool[F[_], A, B] { object KeyPool { private[keypool] final class KeyPoolConcrete[F[_]: Temporal, A, B] private[keypool] ( - private[keypool] val kpCreate: A => F[B], - private[keypool] val kpDestroy: B => F[Unit], + private[keypool] val kpRes: A => Resource[F, B], private[keypool] val kpDefaultReuseState: Reusable, private[keypool] val kpMaxPerKey: A => Int, private[keypool] val kpMaxTotal: Int, - private[keypool] val kpVar: Ref[F, PoolMap[A, B]] + private[keypool] val kpVar: Ref[F, PoolMap[A, (B, F[Unit])]] ) extends KeyPool[F, A, B] { def take(k: A): Resource[F, Managed[F, B]] = @@ -91,17 +90,16 @@ object KeyPool { * Make a 'KeyPool' inactive and destroy all idle resources. */ private[keypool] def destroy[F[_]: MonadThrow, A, B]( - kpDestroy: B => F[Unit], - kpVar: Ref[F, PoolMap[A, B]] + kpVar: Ref[F, PoolMap[A, (B, F[Unit])]] ): F[Unit] = for { - m <- kpVar.getAndSet(PoolMap.closed[A, B]) + m <- kpVar.getAndSet(PoolMap.closed[A, (B, F[Unit])]) _ <- m match { case PoolClosed() => Applicative[F].unit case PoolOpen(_, m2) => m2.toList.traverse_ { case (_, pl) => pl.toList .traverse_ { case (_, r) => - kpDestroy(r).attempt.void + r._2.attempt.void } } } @@ -112,9 +110,8 @@ object KeyPool { * switches to PoolClosed. */ private[keypool] def reap[F[_], A, B]( - destroy: B => F[Unit], idleTimeAllowedInPoolNanos: FiniteDuration, - kpVar: Ref[F, PoolMap[A, B]], + kpVar: Ref[F, PoolMap[A, (B, F[Unit])]], onReaperException: Throwable => F[Unit] )(implicit F: Temporal[F]): F[Unit] = { // We are going to do non-referentially tranpsarent things as we may be waiting for our modification to go through @@ -122,8 +119,8 @@ object KeyPool { def findStale( now: FiniteDuration, idleCount: Int, - m: Map[A, PoolList[B]] - ): (PoolMap[A, B], List[(A, B)]) = { + m: Map[A, PoolList[(B, F[Unit])]] + ): (PoolMap[A, (B, F[Unit])], List[(A, (B, F[Unit]))]) = { val isNotStale: FiniteDuration => Boolean = time => time + idleTimeAllowedInPoolNanos >= now // Time value is alright inside the KeyPool in nanos. @@ -134,23 +131,24 @@ object KeyPool { // [(key, PoolList resource)] -> // (Map key (PoolList resource), [resource]) def findStale_( - toKeep: List[(A, PoolList[B])] => List[(A, PoolList[B])], - toDestroy: List[(A, B)] => List[(A, B)], - l: List[(A, PoolList[B])] - ): (Map[A, PoolList[B]], List[(A, B)]) = { + toKeep: List[(A, PoolList[(B, F[Unit])])] => List[(A, PoolList[(B, F[Unit])])], + toDestroy: List[(A, (B, F[Unit]))] => List[(A, (B, F[Unit]))], + l: List[(A, PoolList[(B, F[Unit])])] + ): (Map[A, PoolList[(B, F[Unit])]], List[(A, (B, F[Unit]))]) = { l match { case Nil => (toKeep(List.empty).toMap, toDestroy(List.empty)) case (key, pList) :: rest => // Can use span since we know everything will be ordered as the time is // when it is placed back into the pool. val (notStale, stale) = pList.toList.span(r => isNotStale(r._1)) - val toDestroy_ : List[(A, B)] => List[(A, B)] = l => + val toDestroy_ : List[(A, (B, F[Unit]))] => List[(A, (B, F[Unit]))] = l => toDestroy((stale.map(t => (key -> t._2)) ++ l)) - val toKeep_ : List[(A, PoolList[B])] => List[(A, PoolList[B])] = l => - PoolList.fromList(notStale) match { - case None => toKeep(l) - case Some(x) => toKeep((key, x) :: l) - } + val toKeep_ : List[(A, PoolList[(B, F[Unit])])] => List[(A, PoolList[(B, F[Unit])])] = + l => + PoolList.fromList(notStale) match { + case None => toKeep(l) + case Some(x) => toKeep((key, x) :: l) + } findStale_(toKeep_, toDestroy_, rest) } } @@ -175,7 +173,7 @@ object KeyPool { val (m_, toDestroy) = findStale(now, idleCount, m) ( m_, - toDestroy.traverse_(r => destroy(r._2)).attempt.flatMap { + toDestroy.traverse_(_._2._2).attempt.flatMap { case Left(t) => onReaperException(t) // .handleErrorWith(t => F.delay(t.printStackTrace())) // CHEATING? case Right(()) => F.unit @@ -193,7 +191,7 @@ object KeyPool { } private[keypool] def state[F[_]: Functor, A, B]( - kpVar: Ref[F, PoolMap[A, B]] + kpVar: Ref[F, PoolMap[A, (B, F[Unit])]] ): F[(Int, Map[A, Int])] = kpVar.get.map(pm => pm match { @@ -212,7 +210,8 @@ object KeyPool { private[keypool] def put[F[_]: Temporal, A, B]( kp: KeyPoolConcrete[F, A, B], k: A, - r: B + r: B, + destroy: F[Unit] ): F[Unit] = { def addToList[Z]( now: FiniteDuration, @@ -229,23 +228,24 @@ object KeyPool { else (l, Some(x)) } } - def go(now: FiniteDuration, pc: PoolMap[A, B]): (PoolMap[A, B], F[Unit]) = pc match { - case p @ PoolClosed() => (p, kp.kpDestroy(r)) - case p @ PoolOpen(idleCount, m) => - if (idleCount > kp.kpMaxTotal) (p, kp.kpDestroy(r)) - else - m.get(k) match { - case None => - val cnt_ = idleCount + 1 - val m_ = PoolMap.open(cnt_, m + (k -> One(r, now))) - (m_, Applicative[F].pure(())) - case Some(l) => - val (l_, mx) = addToList(now, kp.kpMaxPerKey(k), r, l) - val cnt_ = idleCount + mx.fold(1)(_ => 0) - val m_ = PoolMap.open(cnt_, m + (k -> l_)) - (m_, mx.fold(Applicative[F].unit)(r => kp.kpDestroy(r))) - } - } + def go(now: FiniteDuration, pc: PoolMap[A, (B, F[Unit])]): (PoolMap[A, (B, F[Unit])], F[Unit]) = + pc match { + case p @ PoolClosed() => (p, destroy) + case p @ PoolOpen(idleCount, m) => + if (idleCount > kp.kpMaxTotal) (p, destroy) + else + m.get(k) match { + case None => + val cnt_ = idleCount + 1 + val m_ = PoolMap.open(cnt_, m + (k -> One((r, destroy), now))) + (m_, Applicative[F].pure(())) + case Some(l) => + val (l_, mx) = addToList(now, kp.kpMaxPerKey(k), (r, destroy), l) + val cnt_ = idleCount + mx.fold(1)(_ => 0) + val m_ = PoolMap.open(cnt_, m + (k -> l_)) + (m_, mx.fold(Applicative[F].unit)(_ => destroy)) + } + } Clock[F].monotonic.flatMap { now => kp.kpVar.modify(pm => go(now, pm)).flatten @@ -256,31 +256,135 @@ object KeyPool { kp: KeyPoolConcrete[F, A, B], k: A ): Resource[F, Managed[F, B]] = { - def go(pm: PoolMap[A, B]): (PoolMap[A, B], Option[B]) = pm match { - case p @ PoolClosed() => (p, None) - case pOrig @ PoolOpen(idleCount, m) => - m.get(k) match { - case None => (pOrig, None) - case Some(One(a, _)) => - (PoolMap.open(idleCount - 1, m - (k)), Some(a)) - case Some(Cons(a, _, _, rest)) => - (PoolMap.open(idleCount - 1, m + (k -> rest)), Some(a)) - } - } + def go(pm: PoolMap[A, (B, F[Unit])]): (PoolMap[A, (B, F[Unit])], Option[(B, F[Unit])]) = + pm match { + case p @ PoolClosed() => (p, None) + case pOrig @ PoolOpen(idleCount, m) => + m.get(k) match { + case None => (pOrig, None) + case Some(One(a, _)) => + (PoolMap.open(idleCount - 1, m - (k)), Some(a)) + case Some(Cons(a, _, _, rest)) => + (PoolMap.open(idleCount - 1, m + (k -> rest)), Some(a)) + } + } for { optR <- Resource.eval(kp.kpVar.modify(go)) releasedState <- Resource.eval(Ref[F].of[Reusable](kp.kpDefaultReuseState)) - resource <- Resource.make(optR.fold(kp.kpCreate(k))(r => Applicative[F].pure(r))) { + resource <- Resource.make(optR.fold(kp.kpRes(k).allocated)(r => Applicative[F].pure(r))) { resource => for { reusable <- releasedState.get out <- reusable match { - case Reusable.Reuse => put(kp, k, resource).attempt.void - case Reusable.DontReuse => kp.kpDestroy(resource).attempt.void + case Reusable.Reuse => put(kp, k, resource._1, resource._2).attempt.void + case Reusable.DontReuse => resource._2.attempt.void } } yield out } - } yield new Managed(resource, optR.isDefined, releasedState) + } yield new Managed(resource._1, optR.isDefined, releasedState) + } + + final class Builder[F[_]: Temporal, A, B] private ( + val kpRes: A => Resource[F, B], + val kpDefaultReuseState: Reusable, + val idleTimeAllowedInPool: Duration, + val kpMaxPerKey: A => Int, + val kpMaxTotal: Int, + val onReaperException: Throwable => F[Unit] + ) { + private def copy( + kpRes: A => Resource[F, B] = this.kpRes, + kpDefaultReuseState: Reusable = this.kpDefaultReuseState, + idleTimeAllowedInPool: Duration = this.idleTimeAllowedInPool, + kpMaxPerKey: A => Int = this.kpMaxPerKey, + kpMaxTotal: Int = this.kpMaxTotal, + onReaperException: Throwable => F[Unit] = this.onReaperException + ): Builder[F, A, B] = new Builder[F, A, B]( + kpRes, + kpDefaultReuseState, + idleTimeAllowedInPool, + kpMaxPerKey, + kpMaxTotal, + onReaperException + ) + + def doOnCreate(f: B => F[Unit]): Builder[F, A, B] = + copy(kpRes = { (k: A) => this.kpRes(k).flatMap(v => Resource.eval(f(v).attempt.void.as(v))) }) + + def doOnDestroy(f: B => F[Unit]): Builder[F, A, B] = + copy(kpRes = { (k: A) => + this.kpRes(k).flatMap(v => Resource.make(Applicative[F].unit)(_ => f(v).attempt.void).as(v)) + }) + + def withDefaultReuseState(defaultReuseState: Reusable) = + copy(kpDefaultReuseState = defaultReuseState) + + def withIdleTimeAllowedInPool(duration: Duration) = + copy(idleTimeAllowedInPool = duration) + + def withMaxPerKey(f: A => Int): Builder[F, A, B] = + copy(kpMaxPerKey = f) + + def withMaxTotal(total: Int): Builder[F, A, B] = + copy(kpMaxTotal = total) + + def withOnReaperException(f: Throwable => F[Unit]) = + copy(onReaperException = f) + + def build: Resource[F, KeyPool[F, A, B]] = { + def keepRunning[Z](fa: F[Z]): F[Z] = + fa.onError { case e => onReaperException(e) }.attempt >> keepRunning(fa) + for { + kpVar <- Resource.make( + Ref[F].of[PoolMap[A, (B, F[Unit])]](PoolMap.open(0, Map.empty[A, PoolList[(B, F[Unit])]])) + )(kpVar => KeyPool.destroy(kpVar)) + _ <- idleTimeAllowedInPool match { + case fd: FiniteDuration => + val nanos = 0.seconds.max(fd) + Resource.make( + Concurrent[F].start(keepRunning(KeyPool.reap(nanos, kpVar, onReaperException))) + )(_.cancel) + case _ => + Applicative[Resource[F, *]].unit + } + } yield new KeyPool.KeyPoolConcrete( + kpRes, + kpDefaultReuseState, + kpMaxPerKey, + kpMaxTotal, + kpVar + ) + } + + } + + object Builder { + def apply[F[_]: Temporal, A, B]( + res: A => Resource[F, B] + ): Builder[F, A, B] = new Builder[F, A, B]( + res, + Defaults.defaultReuseState, + Defaults.idleTimeAllowedInPool, + Defaults.maxPerKey, + Defaults.maxTotal, + Defaults.onReaperException[F] + ) + + def apply[F[_]: Temporal, A, B]( + create: A => F[B], + destroy: B => F[Unit] + ): Builder[F, A, B] = + apply(a => Resource.make(create(a))(destroy)) + + private object Defaults { + val defaultReuseState = Reusable.Reuse + val idleTimeAllowedInPool = 30.seconds + def maxPerKey[K](k: K): Int = Function.const(100)(k) + val maxTotal = 100 + def onReaperException[F[_]: Applicative] = { (t: Throwable) => + Function.const(Applicative[F].unit)(t) + } + } } } diff --git a/core/src/main/scala/org/typelevel/keypool/KeyPoolBuilder.scala b/core/src/main/scala/org/typelevel/keypool/KeyPoolBuilder.scala index 81df37ee..2607c47d 100644 --- a/core/src/main/scala/org/typelevel/keypool/KeyPoolBuilder.scala +++ b/core/src/main/scala/org/typelevel/keypool/KeyPoolBuilder.scala @@ -6,6 +6,7 @@ import cats.syntax.all._ import cats.effect.kernel._ import scala.concurrent.duration._ +@deprecated("use KeyPool.Builder", "0.4.7") final class KeyPoolBuilder[F[_]: Temporal, A, B] private ( val kpCreate: A => F[B], val kpDestroy: B => F[Unit], @@ -59,22 +60,19 @@ final class KeyPoolBuilder[F[_]: Temporal, A, B] private ( fa.onError { case e => onReaperException(e) }.attempt >> keepRunning(fa) for { kpVar <- Resource.make( - Ref[F].of[PoolMap[A, B]](PoolMap.open(0, Map.empty[A, PoolList[B]])) - )(kpVar => KeyPool.destroy(kpDestroy, kpVar)) + Ref[F].of[PoolMap[A, (B, F[Unit])]](PoolMap.open(0, Map.empty[A, PoolList[(B, F[Unit])]])) + )(kpVar => KeyPool.destroy(kpVar)) _ <- idleTimeAllowedInPool match { case fd: FiniteDuration => val nanos = 0.seconds.max(fd) Resource.make( - Concurrent[F].start( - keepRunning(KeyPool.reap(kpDestroy, nanos, kpVar, onReaperException)) - ) + Concurrent[F].start(keepRunning(KeyPool.reap(nanos, kpVar, onReaperException))) )(_.cancel) case _ => Applicative[Resource[F, *]].unit } } yield new KeyPool.KeyPoolConcrete( - kpCreate, - kpDestroy, + (a: A) => Resource.make[F, B](kpCreate(a))(kpDestroy), kpDefaultReuseState, kpMaxPerKey, kpMaxTotal, @@ -85,6 +83,7 @@ final class KeyPoolBuilder[F[_]: Temporal, A, B] private ( } object KeyPoolBuilder { + @deprecated("use KeyPool.Builder.apply", "0.4.7") def apply[F[_]: Temporal, A, B]( create: A => F[B], destroy: B => F[Unit] diff --git a/core/src/test/scala/org/typelevel/keypool/KeyPoolSpec.scala b/core/src/test/scala/org/typelevel/keypool/KeyPoolSpec.scala index c205b124..7cb97046 100644 --- a/core/src/test/scala/org/typelevel/keypool/KeyPoolSpec.scala +++ b/core/src/test/scala/org/typelevel/keypool/KeyPoolSpec.scala @@ -13,10 +13,12 @@ class KeypoolSpec extends CatsEffectSuite { test("Keep Resources marked to be kept") { def nothing(ref: Ref[IO, Int]): IO[Unit] = ref.get.void - KeyPoolBuilder( - (i: Int) => Ref.of[IO, Int](i), - nothing - ).withDefaultReuseState(Reusable.Reuse) + KeyPool + .Builder( + (i: Int) => Ref.of[IO, Int](i), + nothing + ) + .withDefaultReuseState(Reusable.Reuse) .withIdleTimeAllowedInPool(Duration.Inf) .withMaxPerKey(Function.const(10)) .withMaxTotal(10) @@ -33,10 +35,12 @@ class KeypoolSpec extends CatsEffectSuite { test("Delete Resources marked to be deleted") { def nothing(ref: Ref[IO, Int]): IO[Unit] = ref.get.void - KeyPoolBuilder( - (i: Int) => Ref.of[IO, Int](i), - nothing - ).withDefaultReuseState(Reusable.DontReuse) + KeyPool + .Builder( + (i: Int) => Ref.of[IO, Int](i), + nothing + ) + .withDefaultReuseState(Reusable.DontReuse) .withIdleTimeAllowedInPool(Duration.Inf) .withMaxPerKey(Function.const(10)) .withMaxTotal(10) @@ -53,10 +57,12 @@ class KeypoolSpec extends CatsEffectSuite { test("Delete Resource when pool is full") { def nothing(ref: Ref[IO, Int]): IO[Unit] = ref.get.void - KeyPoolBuilder( - (i: Int) => Ref.of[IO, Int](i), - nothing - ).withDefaultReuseState(Reusable.Reuse) + KeyPool + .Builder( + (i: Int) => Ref.of[IO, Int](i), + nothing + ) + .withDefaultReuseState(Reusable.Reuse) .withIdleTimeAllowedInPool(Duration.Inf) .withMaxPerKey(Function.const(1)) .withMaxTotal(1) @@ -76,10 +82,12 @@ class KeypoolSpec extends CatsEffectSuite { test("Used Resource Cleaned Up By Reaper") { def nothing(ref: Ref[IO, Int]): IO[Unit] = ref.get.void - KeyPoolBuilder( - (i: Int) => Ref.of[IO, Int](i), - nothing - ).withDefaultReuseState(Reusable.Reuse) + KeyPool + .Builder( + (i: Int) => Ref.of[IO, Int](i), + nothing + ) + .withDefaultReuseState(Reusable.Reuse) .withIdleTimeAllowedInPool(Duration.Zero) .withMaxPerKey(Function.const(1)) .withMaxTotal(1) @@ -102,10 +110,12 @@ class KeypoolSpec extends CatsEffectSuite { def nothing(ref: Ref[IO, Int]): IO[Unit] = ref.get.void - KeyPoolBuilder( - (i: Int) => Ref.of[IO, Int](i), - nothing - ).withDefaultReuseState(Reusable.Reuse) + KeyPool + .Builder( + (i: Int) => Ref.of[IO, Int](i), + nothing + ) + .withDefaultReuseState(Reusable.Reuse) .withIdleTimeAllowedInPool(30.seconds) .withMaxPerKey(Function.const(1)) .withMaxTotal(1)