Skip to content

Commit

Permalink
Merge pull request #249 from AL333Z/build-with-resource
Browse files Browse the repository at this point in the history
Add constructor from Resource
  • Loading branch information
rossabaker authored Sep 19, 2021
2 parents 70fcc9f + a7c79ae commit f2fb62f
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 83 deletions.
216 changes: 160 additions & 56 deletions core/src/main/scala/org/typelevel/keypool/KeyPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,11 @@ trait KeyPool[F[_], A, B] {
object KeyPool {

private[keypool] final class KeyPoolConcrete[F[_]: Temporal, A, B] private[keypool] (
private[keypool] val kpCreate: A => F[B],
private[keypool] val kpDestroy: B => F[Unit],
private[keypool] val kpRes: A => Resource[F, B],
private[keypool] val kpDefaultReuseState: Reusable,
private[keypool] val kpMaxPerKey: A => Int,
private[keypool] val kpMaxTotal: Int,
private[keypool] val kpVar: Ref[F, PoolMap[A, B]]
private[keypool] val kpVar: Ref[F, PoolMap[A, (B, F[Unit])]]
) extends KeyPool[F, A, B] {

def take(k: A): Resource[F, Managed[F, B]] =
Expand Down Expand Up @@ -91,17 +90,16 @@ object KeyPool {
* Make a 'KeyPool' inactive and destroy all idle resources.
*/
private[keypool] def destroy[F[_]: MonadThrow, A, B](
kpDestroy: B => F[Unit],
kpVar: Ref[F, PoolMap[A, B]]
kpVar: Ref[F, PoolMap[A, (B, F[Unit])]]
): F[Unit] = for {
m <- kpVar.getAndSet(PoolMap.closed[A, B])
m <- kpVar.getAndSet(PoolMap.closed[A, (B, F[Unit])])
_ <- m match {
case PoolClosed() => Applicative[F].unit
case PoolOpen(_, m2) =>
m2.toList.traverse_ { case (_, pl) =>
pl.toList
.traverse_ { case (_, r) =>
kpDestroy(r).attempt.void
r._2.attempt.void
}
}
}
Expand All @@ -112,18 +110,17 @@ object KeyPool {
* switches to PoolClosed.
*/
private[keypool] def reap[F[_], A, B](
destroy: B => F[Unit],
idleTimeAllowedInPoolNanos: FiniteDuration,
kpVar: Ref[F, PoolMap[A, B]],
kpVar: Ref[F, PoolMap[A, (B, F[Unit])]],
onReaperException: Throwable => F[Unit]
)(implicit F: Temporal[F]): F[Unit] = {
// We are going to do non-referentially tranpsarent things as we may be waiting for our modification to go through
// which may change the state depending on when the modification block is running atomically at the moment
def findStale(
now: FiniteDuration,
idleCount: Int,
m: Map[A, PoolList[B]]
): (PoolMap[A, B], List[(A, B)]) = {
m: Map[A, PoolList[(B, F[Unit])]]
): (PoolMap[A, (B, F[Unit])], List[(A, (B, F[Unit]))]) = {
val isNotStale: FiniteDuration => Boolean =
time =>
time + idleTimeAllowedInPoolNanos >= now // Time value is alright inside the KeyPool in nanos.
Expand All @@ -134,23 +131,24 @@ object KeyPool {
// [(key, PoolList resource)] ->
// (Map key (PoolList resource), [resource])
def findStale_(
toKeep: List[(A, PoolList[B])] => List[(A, PoolList[B])],
toDestroy: List[(A, B)] => List[(A, B)],
l: List[(A, PoolList[B])]
): (Map[A, PoolList[B]], List[(A, B)]) = {
toKeep: List[(A, PoolList[(B, F[Unit])])] => List[(A, PoolList[(B, F[Unit])])],
toDestroy: List[(A, (B, F[Unit]))] => List[(A, (B, F[Unit]))],
l: List[(A, PoolList[(B, F[Unit])])]
): (Map[A, PoolList[(B, F[Unit])]], List[(A, (B, F[Unit]))]) = {
l match {
case Nil => (toKeep(List.empty).toMap, toDestroy(List.empty))
case (key, pList) :: rest =>
// Can use span since we know everything will be ordered as the time is
// when it is placed back into the pool.
val (notStale, stale) = pList.toList.span(r => isNotStale(r._1))
val toDestroy_ : List[(A, B)] => List[(A, B)] = l =>
val toDestroy_ : List[(A, (B, F[Unit]))] => List[(A, (B, F[Unit]))] = l =>
toDestroy((stale.map(t => (key -> t._2)) ++ l))
val toKeep_ : List[(A, PoolList[B])] => List[(A, PoolList[B])] = l =>
PoolList.fromList(notStale) match {
case None => toKeep(l)
case Some(x) => toKeep((key, x) :: l)
}
val toKeep_ : List[(A, PoolList[(B, F[Unit])])] => List[(A, PoolList[(B, F[Unit])])] =
l =>
PoolList.fromList(notStale) match {
case None => toKeep(l)
case Some(x) => toKeep((key, x) :: l)
}
findStale_(toKeep_, toDestroy_, rest)
}
}
Expand All @@ -175,7 +173,7 @@ object KeyPool {
val (m_, toDestroy) = findStale(now, idleCount, m)
(
m_,
toDestroy.traverse_(r => destroy(r._2)).attempt.flatMap {
toDestroy.traverse_(_._2._2).attempt.flatMap {
case Left(t) => onReaperException(t)
// .handleErrorWith(t => F.delay(t.printStackTrace())) // CHEATING?
case Right(()) => F.unit
Expand All @@ -193,7 +191,7 @@ object KeyPool {
}

private[keypool] def state[F[_]: Functor, A, B](
kpVar: Ref[F, PoolMap[A, B]]
kpVar: Ref[F, PoolMap[A, (B, F[Unit])]]
): F[(Int, Map[A, Int])] =
kpVar.get.map(pm =>
pm match {
Expand All @@ -212,7 +210,8 @@ object KeyPool {
private[keypool] def put[F[_]: Temporal, A, B](
kp: KeyPoolConcrete[F, A, B],
k: A,
r: B
r: B,
destroy: F[Unit]
): F[Unit] = {
def addToList[Z](
now: FiniteDuration,
Expand All @@ -229,23 +228,24 @@ object KeyPool {
else (l, Some(x))
}
}
def go(now: FiniteDuration, pc: PoolMap[A, B]): (PoolMap[A, B], F[Unit]) = pc match {
case p @ PoolClosed() => (p, kp.kpDestroy(r))
case p @ PoolOpen(idleCount, m) =>
if (idleCount > kp.kpMaxTotal) (p, kp.kpDestroy(r))
else
m.get(k) match {
case None =>
val cnt_ = idleCount + 1
val m_ = PoolMap.open(cnt_, m + (k -> One(r, now)))
(m_, Applicative[F].pure(()))
case Some(l) =>
val (l_, mx) = addToList(now, kp.kpMaxPerKey(k), r, l)
val cnt_ = idleCount + mx.fold(1)(_ => 0)
val m_ = PoolMap.open(cnt_, m + (k -> l_))
(m_, mx.fold(Applicative[F].unit)(r => kp.kpDestroy(r)))
}
}
def go(now: FiniteDuration, pc: PoolMap[A, (B, F[Unit])]): (PoolMap[A, (B, F[Unit])], F[Unit]) =
pc match {
case p @ PoolClosed() => (p, destroy)
case p @ PoolOpen(idleCount, m) =>
if (idleCount > kp.kpMaxTotal) (p, destroy)
else
m.get(k) match {
case None =>
val cnt_ = idleCount + 1
val m_ = PoolMap.open(cnt_, m + (k -> One((r, destroy), now)))
(m_, Applicative[F].pure(()))
case Some(l) =>
val (l_, mx) = addToList(now, kp.kpMaxPerKey(k), (r, destroy), l)
val cnt_ = idleCount + mx.fold(1)(_ => 0)
val m_ = PoolMap.open(cnt_, m + (k -> l_))
(m_, mx.fold(Applicative[F].unit)(_ => destroy))
}
}

Clock[F].monotonic.flatMap { now =>
kp.kpVar.modify(pm => go(now, pm)).flatten
Expand All @@ -256,31 +256,135 @@ object KeyPool {
kp: KeyPoolConcrete[F, A, B],
k: A
): Resource[F, Managed[F, B]] = {
def go(pm: PoolMap[A, B]): (PoolMap[A, B], Option[B]) = pm match {
case p @ PoolClosed() => (p, None)
case pOrig @ PoolOpen(idleCount, m) =>
m.get(k) match {
case None => (pOrig, None)
case Some(One(a, _)) =>
(PoolMap.open(idleCount - 1, m - (k)), Some(a))
case Some(Cons(a, _, _, rest)) =>
(PoolMap.open(idleCount - 1, m + (k -> rest)), Some(a))
}
}
def go(pm: PoolMap[A, (B, F[Unit])]): (PoolMap[A, (B, F[Unit])], Option[(B, F[Unit])]) =
pm match {
case p @ PoolClosed() => (p, None)
case pOrig @ PoolOpen(idleCount, m) =>
m.get(k) match {
case None => (pOrig, None)
case Some(One(a, _)) =>
(PoolMap.open(idleCount - 1, m - (k)), Some(a))
case Some(Cons(a, _, _, rest)) =>
(PoolMap.open(idleCount - 1, m + (k -> rest)), Some(a))
}
}

for {
optR <- Resource.eval(kp.kpVar.modify(go))
releasedState <- Resource.eval(Ref[F].of[Reusable](kp.kpDefaultReuseState))
resource <- Resource.make(optR.fold(kp.kpCreate(k))(r => Applicative[F].pure(r))) {
resource <- Resource.make(optR.fold(kp.kpRes(k).allocated)(r => Applicative[F].pure(r))) {
resource =>
for {
reusable <- releasedState.get
out <- reusable match {
case Reusable.Reuse => put(kp, k, resource).attempt.void
case Reusable.DontReuse => kp.kpDestroy(resource).attempt.void
case Reusable.Reuse => put(kp, k, resource._1, resource._2).attempt.void
case Reusable.DontReuse => resource._2.attempt.void
}
} yield out
}
} yield new Managed(resource, optR.isDefined, releasedState)
} yield new Managed(resource._1, optR.isDefined, releasedState)
}

final class Builder[F[_]: Temporal, A, B] private (
val kpRes: A => Resource[F, B],
val kpDefaultReuseState: Reusable,
val idleTimeAllowedInPool: Duration,
val kpMaxPerKey: A => Int,
val kpMaxTotal: Int,
val onReaperException: Throwable => F[Unit]
) {
private def copy(
kpRes: A => Resource[F, B] = this.kpRes,
kpDefaultReuseState: Reusable = this.kpDefaultReuseState,
idleTimeAllowedInPool: Duration = this.idleTimeAllowedInPool,
kpMaxPerKey: A => Int = this.kpMaxPerKey,
kpMaxTotal: Int = this.kpMaxTotal,
onReaperException: Throwable => F[Unit] = this.onReaperException
): Builder[F, A, B] = new Builder[F, A, B](
kpRes,
kpDefaultReuseState,
idleTimeAllowedInPool,
kpMaxPerKey,
kpMaxTotal,
onReaperException
)

def doOnCreate(f: B => F[Unit]): Builder[F, A, B] =
copy(kpRes = { (k: A) => this.kpRes(k).flatMap(v => Resource.eval(f(v).attempt.void.as(v))) })

def doOnDestroy(f: B => F[Unit]): Builder[F, A, B] =
copy(kpRes = { (k: A) =>
this.kpRes(k).flatMap(v => Resource.make(Applicative[F].unit)(_ => f(v).attempt.void).as(v))
})

def withDefaultReuseState(defaultReuseState: Reusable) =
copy(kpDefaultReuseState = defaultReuseState)

def withIdleTimeAllowedInPool(duration: Duration) =
copy(idleTimeAllowedInPool = duration)

def withMaxPerKey(f: A => Int): Builder[F, A, B] =
copy(kpMaxPerKey = f)

def withMaxTotal(total: Int): Builder[F, A, B] =
copy(kpMaxTotal = total)

def withOnReaperException(f: Throwable => F[Unit]) =
copy(onReaperException = f)

def build: Resource[F, KeyPool[F, A, B]] = {
def keepRunning[Z](fa: F[Z]): F[Z] =
fa.onError { case e => onReaperException(e) }.attempt >> keepRunning(fa)
for {
kpVar <- Resource.make(
Ref[F].of[PoolMap[A, (B, F[Unit])]](PoolMap.open(0, Map.empty[A, PoolList[(B, F[Unit])]]))
)(kpVar => KeyPool.destroy(kpVar))
_ <- idleTimeAllowedInPool match {
case fd: FiniteDuration =>
val nanos = 0.seconds.max(fd)
Resource.make(
Concurrent[F].start(keepRunning(KeyPool.reap(nanos, kpVar, onReaperException)))
)(_.cancel)
case _ =>
Applicative[Resource[F, *]].unit
}
} yield new KeyPool.KeyPoolConcrete(
kpRes,
kpDefaultReuseState,
kpMaxPerKey,
kpMaxTotal,
kpVar
)
}

}

object Builder {
def apply[F[_]: Temporal, A, B](
res: A => Resource[F, B]
): Builder[F, A, B] = new Builder[F, A, B](
res,
Defaults.defaultReuseState,
Defaults.idleTimeAllowedInPool,
Defaults.maxPerKey,
Defaults.maxTotal,
Defaults.onReaperException[F]
)

def apply[F[_]: Temporal, A, B](
create: A => F[B],
destroy: B => F[Unit]
): Builder[F, A, B] =
apply(a => Resource.make(create(a))(destroy))

private object Defaults {
val defaultReuseState = Reusable.Reuse
val idleTimeAllowedInPool = 30.seconds
def maxPerKey[K](k: K): Int = Function.const(100)(k)
val maxTotal = 100
def onReaperException[F[_]: Applicative] = { (t: Throwable) =>
Function.const(Applicative[F].unit)(t)
}
}
}
}
13 changes: 6 additions & 7 deletions core/src/main/scala/org/typelevel/keypool/KeyPoolBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import cats.syntax.all._
import cats.effect.kernel._
import scala.concurrent.duration._

@deprecated("use KeyPool.Builder", "0.4.7")
final class KeyPoolBuilder[F[_]: Temporal, A, B] private (
val kpCreate: A => F[B],
val kpDestroy: B => F[Unit],
Expand Down Expand Up @@ -59,22 +60,19 @@ final class KeyPoolBuilder[F[_]: Temporal, A, B] private (
fa.onError { case e => onReaperException(e) }.attempt >> keepRunning(fa)
for {
kpVar <- Resource.make(
Ref[F].of[PoolMap[A, B]](PoolMap.open(0, Map.empty[A, PoolList[B]]))
)(kpVar => KeyPool.destroy(kpDestroy, kpVar))
Ref[F].of[PoolMap[A, (B, F[Unit])]](PoolMap.open(0, Map.empty[A, PoolList[(B, F[Unit])]]))
)(kpVar => KeyPool.destroy(kpVar))
_ <- idleTimeAllowedInPool match {
case fd: FiniteDuration =>
val nanos = 0.seconds.max(fd)
Resource.make(
Concurrent[F].start(
keepRunning(KeyPool.reap(kpDestroy, nanos, kpVar, onReaperException))
)
Concurrent[F].start(keepRunning(KeyPool.reap(nanos, kpVar, onReaperException)))
)(_.cancel)
case _ =>
Applicative[Resource[F, *]].unit
}
} yield new KeyPool.KeyPoolConcrete(
kpCreate,
kpDestroy,
(a: A) => Resource.make[F, B](kpCreate(a))(kpDestroy),
kpDefaultReuseState,
kpMaxPerKey,
kpMaxTotal,
Expand All @@ -85,6 +83,7 @@ final class KeyPoolBuilder[F[_]: Temporal, A, B] private (
}

object KeyPoolBuilder {
@deprecated("use KeyPool.Builder.apply", "0.4.7")
def apply[F[_]: Temporal, A, B](
create: A => F[B],
destroy: B => F[Unit]
Expand Down
Loading

0 comments on commit f2fb62f

Please sign in to comment.