From 757c8756ee1ef78ac1910c187832310d4996b833 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Sat, 2 Apr 2022 18:03:56 -0700 Subject: [PATCH] upgrade zio version (#521) --- build.sbt | 2 +- .../zio/interop/test/CoreSummonSpec.scala | 2 +- .../stacktracer/InteropTracerSpec.scala | 2 +- .../zio/interop/CatsZManagedSyntaxSpec.scala | 6 +- .../scala/zio/interop/catzQueueSpec.scala | 33 +-- .../scala/zio/interop/fs2StreamSpec.scala | 6 +- .../src/test/scala/zio/interop/CatsSpec.scala | 18 +- .../test/scala/zio/interop/CatsSpecBase.scala | 72 +++--- .../test/scala/zio/interop/ZioSpecBase.scala | 5 +- .../src/main/scala/zio/interop/ZioAsync.scala | 4 +- .../src/main/scala/zio/interop/ZioAsync.scala | 4 +- .../src/main/scala/zio/interop/CHub.scala | 239 ------------------ .../src/main/scala/zio/interop/CQueue.scala | 185 -------------- .../src/main/scala/zio/interop/Dequeue.scala | 126 +++++++++ .../src/main/scala/zio/interop/Enqueue.scala | 98 +++++++ .../src/main/scala/zio/interop/Hub.scala | 126 +++++++++ .../src/main/scala/zio/interop/Queue.scala | 94 +++++++ .../src/main/scala/zio/interop/cats.scala | 36 +-- .../main/scala/zio/interop/catszmanaged.scala | 13 +- .../src/main/scala/zio/interop/package.scala | 12 - .../main/scala/zio/interop/stm/TQueue.scala | 14 +- .../src/main/scala/zio/interop/stm/TRef.scala | 12 +- .../zio/test/interop/CatsRunnableSpec.scala | 17 +- 23 files changed, 546 insertions(+), 580 deletions(-) delete mode 100644 zio-interop-cats/shared/src/main/scala/zio/interop/CHub.scala delete mode 100644 zio-interop-cats/shared/src/main/scala/zio/interop/CQueue.scala create mode 100644 zio-interop-cats/shared/src/main/scala/zio/interop/Dequeue.scala create mode 100644 zio-interop-cats/shared/src/main/scala/zio/interop/Enqueue.scala create mode 100644 zio-interop-cats/shared/src/main/scala/zio/interop/Hub.scala create mode 100644 zio-interop-cats/shared/src/main/scala/zio/interop/Queue.scala diff --git a/build.sbt b/build.sbt index 69ad0bdc..95a168ed 100644 --- a/build.sbt +++ b/build.sbt @@ -47,7 +47,7 @@ lazy val root = project unusedCompileDependenciesFilter -= moduleFilter("org.scala-js", "scalajs-library") ) -val zioVersion = "2.0.0-RC3" +val zioVersion = "2.0.0-RC4" val catsVersion = "2.6.1" val catsEffectVersion = "3.2.9" val catsMtlVersion = "1.2.1" diff --git a/core-only-test/shared/src/test/scala/zio/interop/test/CoreSummonSpec.scala b/core-only-test/shared/src/test/scala/zio/interop/test/CoreSummonSpec.scala index b659b3dc..77492baa 100644 --- a/core-only-test/shared/src/test/scala/zio/interop/test/CoreSummonSpec.scala +++ b/core-only-test/shared/src/test/scala/zio/interop/test/CoreSummonSpec.scala @@ -9,7 +9,7 @@ import zio.stream.interop.catz.core.* import zio.stream.{ Stream, ZStream } import zio.test.* -object CoreSummonSpec extends DefaultRunnableSpec { +object CoreSummonSpec extends ZIOSpecDefault { override def spec = suite("summons from catz.core work with only a cats-core dependency")( test("ZIO instances") { diff --git a/zio-interop-cats-tests/jvm/src/test/scala/zio/internal/stacktracer/InteropTracerSpec.scala b/zio-interop-cats-tests/jvm/src/test/scala/zio/internal/stacktracer/InteropTracerSpec.scala index 51b57700..a5781011 100644 --- a/zio-interop-cats-tests/jvm/src/test/scala/zio/internal/stacktracer/InteropTracerSpec.scala +++ b/zio-interop-cats-tests/jvm/src/test/scala/zio/internal/stacktracer/InteropTracerSpec.scala @@ -3,7 +3,7 @@ package zio.internal.stacktracer import zio.test._ import zio.ZTraceElement -object InteropTracerSpec extends DefaultRunnableSpec { +object InteropTracerSpec extends ZIOSpecDefault { private val myLambda: () => Any = () => () override def spec = diff --git a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsZManagedSyntaxSpec.scala b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsZManagedSyntaxSpec.scala index 5e20048e..2c16c2c5 100644 --- a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsZManagedSyntaxSpec.scala +++ b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsZManagedSyntaxSpec.scala @@ -246,7 +246,7 @@ object CatsZManagedSyntaxSpec extends CatsRunnableSpec { def man(x: Int): ZManaged[Any, Throwable, Unit] = ZManaged.acquireReleaseWith(ZIO.succeed(effects += x).unit)(_ => ZIO.succeed(effects += x + 1)) - val testCase = man(1).toResource[RIO[ZEnv, _]].use(_ => ZIO.unit) + val testCase = man(1).toResource[Task].use(_ => ZIO.unit) for { _ <- testCase effects <- ZIO.succeed(effects.toList) @@ -262,7 +262,7 @@ object CatsZManagedSyntaxSpec extends CatsRunnableSpec { ZIO.unit } - val testCase = man(1).toResource[RIO[ZEnv, _]].use(_ => ZIO.fail(new RuntimeException()).unit) + val testCase = man(1).toResource[Task].use(_ => ZIO.fail(new RuntimeException()).unit) for { _ <- testCase.orElse(ZIO.unit) effects <- ZIO.succeed(effects.toList) @@ -278,7 +278,7 @@ object CatsZManagedSyntaxSpec extends CatsRunnableSpec { ZIO.unit } - val testCase = man(1).toResource[RIO[ZEnv, _]].use(_ => ZIO.interrupt) + val testCase = man(1).toResource[Task].use(_ => ZIO.interrupt) for { _ <- testCase.orElse(ZIO.unit) effects <- ZIO.succeed(effects.toList) diff --git a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/catzQueueSpec.scala b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/catzQueueSpec.scala index 2fe26bc4..dfe7acdf 100644 --- a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/catzQueueSpec.scala +++ b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/catzQueueSpec.scala @@ -1,7 +1,6 @@ package zio.interop import cats.effect.kernel.Async -import cats.effect.std.Dispatcher import cats.effect.IO as CIO import cats.implicits.* import zio.test.Assertion.* @@ -10,7 +9,7 @@ import zio.test.interop.catz.test.* object catzQueueSpec extends CatsRunnableSpec { - def boundedQueueTest[F[+_]: Async: Dispatcher]: F[TestResult] = + def boundedQueueTest[F[+_]: Async]: F[TestResult] = for { q <- Queue.bounded[F, Int](1) _ <- q.offer(1) @@ -19,21 +18,21 @@ object catzQueueSpec extends CatsRunnableSpec { r2 <- q.takeAll } yield assert(r1)(equalTo(List(1))) && assert(r2)(equalTo(List(2))) - def droppingQueueTest[F[+_]: Async: Dispatcher]: F[TestResult] = + def droppingQueueTest[F[+_]: Async]: F[TestResult] = for { q <- Queue.dropping[F, Int](2) _ <- q.offerAll(List(1, 2, 3)) r <- q.takeAll } yield assert(r)(equalTo(List(1, 2))) - def slidingQueueTest[F[+_]: Async: Dispatcher]: F[TestResult] = + def slidingQueueTest[F[+_]: Async]: F[TestResult] = for { q <- Queue.sliding[F, Int](2) _ <- q.offerAll(List(1, 2, 3, 4)) r <- q.takeAll } yield assert(r)(equalTo(List(3, 4))) - def unboundedQueueTest[F[+_]: Async: Dispatcher]: F[TestResult] = + def unboundedQueueTest[F[+_]: Async]: F[TestResult] = for { q <- Queue.unbounded[F, Int] expected = Range.inclusive(0, 100) @@ -41,32 +40,10 @@ object catzQueueSpec extends CatsRunnableSpec { actual <- q.takeAll } yield assert(actual)(equalTo(expected.toList)) - def contramapQueueTest[F[+_]: Async: Dispatcher]: F[TestResult] = - for { - q <- Queue.unbounded[F, String] - q1 = q.contramap((i: Int) => i.toString) - data = Range.inclusive(0, 100) - _ <- q1.offerAll(data) - actual <- q1.takeAll - expected = data.map(_.toString) - } yield assert(actual)(equalTo(expected.toList)) - - def mapMQueueTest[F[+_]: Async: Dispatcher]: F[TestResult] = - for { - q <- Queue.unbounded[F, Int] - q1 = q.mapM(_.toString.pure[F]) - data = Range.inclusive(0, 100) - _ <- q1.offerAll(data) - actual <- q1.takeAll - expected = data.map(_.toString) - } yield assert(actual)(equalTo(expected.toList)) - def spec = suite("catzQueueSpec")( testF("can use a bounded queue from Cats Effect IO")(boundedQueueTest[CIO]), testF("can use a dropping queue from Cats Effect IO")(droppingQueueTest[CIO]), testF("can use a sliding queue from Cats Effect IO")(slidingQueueTest[CIO]), - testF("can use an unbounded queue from Cats Effect IO")(unboundedQueueTest[CIO]), - testF("can contramap a queue from Cats Effect IO")(contramapQueueTest[CIO]), - testF("can mapM a queue from Cats Effect IO")(mapMQueueTest[CIO]) + testF("can use an unbounded queue from Cats Effect IO")(unboundedQueueTest[CIO]) ) } diff --git a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/fs2StreamSpec.scala b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/fs2StreamSpec.scala index 37e9b560..9b7f28b0 100644 --- a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/fs2StreamSpec.scala +++ b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/fs2StreamSpec.scala @@ -1,14 +1,14 @@ package zio.interop import fs2.Stream -import zio.{ Chunk, Clock, RIO, Ref, Task } +import zio.{ Chunk, Ref, Task } import zio.stream.ZStream import zio.test.Assertion.{ equalTo, fails } import zio.test.* import zio.interop.catz.* import zio.Random.nextIntBetween -object fs2StreamSpec extends DefaultRunnableSpec { +object fs2StreamSpec extends ZIOSpecDefault { import zio.stream.interop.fs2z.* val exception: Throwable = new Exception("Failed") @@ -88,7 +88,7 @@ object fs2StreamSpec extends DefaultRunnableSpec { for { queueSize <- nextIntBetween(2, 128) result <- assertEqual( - fs2StreamFromChunk(chunk).covary[RIO[Clock, _]].toZStream(queueSize), + fs2StreamFromChunk(chunk).covary[Task].toZStream(queueSize), ZStream.fromChunk(chunk) ) } yield result diff --git a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpec.scala b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpec.scala index 845f52e1..2c44a676 100644 --- a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpec.scala +++ b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpec.scala @@ -13,27 +13,13 @@ import scala.concurrent.duration._ class CatsSpec extends ZioSpecBase { // ZIO tests - checkAllAsync( - "Async[RIO[Clock, _]]", - implicit tc => AsyncTests[RIO[Clock, _]].async[Int, Int, Int](100.millis) - ) checkAllAsync( "Async[Task]", - { implicit tc => - implicit val runtime: Runtime[Clock] = Runtime(environment, platform) - AsyncTests[Task].async[Int, Int, Int](100.millis) - } - ) - checkAllAsync( - "Temporal[RIO[Clock, _]]", - implicit tc => GenTemporalTests[RIO[Clock, _], Throwable].temporal[Int, Int, Int](100.millis) + implicit tc => AsyncTests[Task].async[Int, Int, Int](100.millis) ) checkAllAsync( "Temporal[Task]", - { implicit tc => - implicit val runtime: Runtime[Clock] = Runtime(environment, platform) - GenTemporalTests[Task, Throwable].temporal[Int, Int, Int](100.millis) - } + implicit tc => GenTemporalTests[Task, Throwable].temporal[Int, Int, Int](100.millis) ) checkAllAsync("GenSpawn[IO[Int, _], Int]", implicit tc => GenSpawnTests[IO[Int, _], Int].spawn[Int, Int, Int]) checkAllAsync("MonadError[IO[In t, _]]", implicit tc => MonadErrorTests[IO[Int, _], Int].monadError[Int, Int, Int]) diff --git a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala index 5fa8fc74..f47d8664 100644 --- a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala +++ b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala @@ -37,38 +37,39 @@ private[zio] trait CatsSpecBase blockingExecutor = Executor.fromExecutionContext(1024)(ticker.ctx) ) - def environment(implicit ticker: Ticker): ZEnvironment[ZEnv] = { - - val testClock = new Clock { - - def instant(implicit trace: ZTraceElement): UIO[Instant] = - ??? - def localDateTime(implicit trace: ZTraceElement): UIO[LocalDateTime] = - ??? - def currentTime(unit: => TimeUnit)(implicit trace: ZTraceElement): UIO[Long] = - ZIO.succeed(ticker.ctx.now().toUnit(unit).toLong) - - def currentDateTime(implicit trace: ZTraceElement): UIO[OffsetDateTime] = - ZIO.succeed(OffsetDateTime.ofInstant(Instant.ofEpochMilli(ticker.ctx.now().toMillis), ZoneOffset.UTC)) - - def nanoTime(implicit trace: ZTraceElement): UIO[Long] = - ZIO.succeed(ticker.ctx.now().toNanos) - - def sleep(duration: => Duration)(implicit trace: ZTraceElement): UIO[Unit] = duration.asScala match { - case finite: FiniteDuration => - ZIO.asyncInterrupt { cb => - val cancel = ticker.ctx.schedule(finite, () => cb(UIO.unit)) - Left(UIO.succeed(cancel())) - } - case infinite: Infinite => - ZIO.dieMessage(s"Unexpected infinite duration $infinite passed to Ticker") - } - - def scheduler(implicit trace: ZTraceElement): UIO[Scheduler] = - ??? + val environment: ZEnvironment[Any] = + ZEnvironment(()) + + def testClock(implicit ticker: Ticker) = new Clock { + + def instant(implicit trace: ZTraceElement): UIO[Instant] = + ??? + def localDateTime(implicit trace: ZTraceElement): UIO[LocalDateTime] = + ??? + def currentTime(unit: => TimeUnit)(implicit trace: ZTraceElement): UIO[Long] = + ZIO.succeed(ticker.ctx.now().toUnit(unit).toLong) + + def currentDateTime(implicit trace: ZTraceElement): UIO[OffsetDateTime] = + ZIO.succeed(OffsetDateTime.ofInstant(Instant.ofEpochMilli(ticker.ctx.now().toMillis), ZoneOffset.UTC)) + + def javaClock(implicit trace: zio.ZTraceElement): zio.UIO[java.time.Clock] = + ??? + + def nanoTime(implicit trace: ZTraceElement): UIO[Long] = + ZIO.succeed(ticker.ctx.now().toNanos) + + def sleep(duration: => Duration)(implicit trace: ZTraceElement): UIO[Unit] = duration.asScala match { + case finite: FiniteDuration => + ZIO.asyncInterrupt { cb => + val cancel = ticker.ctx.schedule(finite, () => cb(UIO.unit)) + Left(UIO.succeed(cancel())) + } + case infinite: Infinite => + ZIO.dieMessage(s"Unexpected infinite duration $infinite passed to Ticker") } - ZEnv.Services.live ++ ZEnvironment(testClock) + def scheduler(implicit trace: ZTraceElement): UIO[Scheduler] = + ??? } def unsafeRun[E, A](io: IO[E, A])(implicit ticker: Ticker): Exit[E, Option[A]] = @@ -84,15 +85,18 @@ private[zio] trait CatsSpecBase } implicit def runtime(implicit ticker: Ticker): Runtime[Any] = - Runtime(ZEnvironment(()), platform) + Runtime(environment, platform) implicit val arbitraryAny: Arbitrary[Any] = Arbitrary(Gen.const(())) + implicit def arbitraryChunk[A: Arbitrary]: Arbitrary[Chunk[A]] = + Arbitrary(Gen.listOf(Arbitrary.arbitrary[A]).map(Chunk.fromIterable)) + implicit val cogenForAny: Cogen[Any] = Cogen(_.hashCode.toLong) - implicit def arbitraryEnvironment(implicit ticker: Ticker): Arbitrary[ZEnvironment[ZEnv]] = + implicit val arbitraryEnvironment: Arbitrary[ZEnvironment[Any]] = Arbitrary(Gen.const(environment)) implicit val eqForNothing: Eq[Nothing] = @@ -122,8 +126,8 @@ private[zio] trait CatsSpecBase implicit def eqForURIO[R: Arbitrary: Tag, A: Eq](implicit ticker: Ticker): Eq[URIO[R, A]] = eqForZIO[R, Nothing, A] - implicit def execRIO(rio: RIO[ZEnv, Boolean])(implicit ticker: Ticker): Prop = - rio.provideEnvironment(environment).toEffect[CIO] + implicit def execTask(task: Task[Boolean])(implicit ticker: Ticker): Prop = + ZEnv.services.locallyWith(_.add(testClock))(task).toEffect[CIO] implicit def orderForUIOofFiniteDuration(implicit ticker: Ticker): Order[UIO[FiniteDuration]] = Order.by(unsafeRun(_).toEither.toOption) diff --git a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/ZioSpecBase.scala b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/ZioSpecBase.scala index 4d39934b..3cd65e8a 100644 --- a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/ZioSpecBase.scala +++ b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/ZioSpecBase.scala @@ -17,15 +17,12 @@ private[interop] trait ZioSpecBase extends CatsSpecBase with ZioSpecBaseLowPrior implicit def arbitraryURManaged[R: Cogen: Tag, A: Arbitrary]: Arbitrary[URManaged[R, A]] = zManagedArbitrary[R, Nothing, A] - - implicit def arbitraryClockAndBlocking(implicit ticker: Ticker): Arbitrary[Clock] = - Arbitrary(Arbitrary.arbitrary[ZEnvironment[ZEnv]].map(_.get[Clock])) } private[interop] trait ZioSpecBaseLowPriority { self: ZioSpecBase => implicit def arbitraryClock(implicit ticker: Ticker): Arbitrary[Clock] = - Arbitrary(Arbitrary.arbitrary[ZEnvironment[ZEnv]].map(_.get[Clock])) + Arbitrary(Gen.const(testClock)) implicit val cogenForClock: Cogen[Clock] = Cogen(_.hashCode.toLong) diff --git a/zio-interop-cats/js/src/main/scala/zio/interop/ZioAsync.scala b/zio-interop-cats/js/src/main/scala/zio/interop/ZioAsync.scala index 1e288ad8..2f36a528 100644 --- a/zio-interop-cats/js/src/main/scala/zio/interop/ZioAsync.scala +++ b/zio-interop-cats/js/src/main/scala/zio/interop/ZioAsync.scala @@ -1,11 +1,11 @@ package zio.interop import cats.effect.kernel.{ Async, Cont, Sync, Unique } -import zio.{ Clock, Promise, RIO, ZIO } +import zio.{ Promise, RIO, ZIO } import scala.concurrent.{ ExecutionContext, Future } -private class ZioAsync[R <: Clock] extends ZioTemporal[R, Throwable] with Async[RIO[R, _]] { +private class ZioAsync[R] extends ZioTemporal[R, Throwable] with Async[RIO[R, _]] { override final def evalOn[A](fa: F[A], ec: ExecutionContext): F[A] = fa.onExecutionContext(ec) diff --git a/zio-interop-cats/jvm/src/main/scala/zio/interop/ZioAsync.scala b/zio-interop-cats/jvm/src/main/scala/zio/interop/ZioAsync.scala index 73a0a974..e21fd78a 100644 --- a/zio-interop-cats/jvm/src/main/scala/zio/interop/ZioAsync.scala +++ b/zio-interop-cats/jvm/src/main/scala/zio/interop/ZioAsync.scala @@ -1,11 +1,11 @@ package zio.interop import cats.effect.kernel.{ Async, Cont, Sync, Unique } -import zio.{ Clock, Promise, RIO, ZIO } +import zio.{ Promise, RIO, ZIO } import scala.concurrent.{ ExecutionContext, Future } -private class ZioAsync[R <: Clock] extends ZioTemporal[R, Throwable] with Async[RIO[R, _]] { +private class ZioAsync[R] extends ZioTemporal[R, Throwable] with Async[RIO[R, _]] { override final def evalOn[A](fa: F[A], ec: ExecutionContext): F[A] = fa.onExecutionContext(ec) diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/CHub.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/CHub.scala deleted file mode 100644 index 88078702..00000000 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/CHub.scala +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Copyright 2021 John A. De Goes and the ZIO Contributors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package zio.interop - -import cats.effect.kernel.{ Async, Resource } -import cats.effect.std.Dispatcher -import zio.interop.catz.scopedSyntax -import zio.{ Runtime, ZDequeue, ZEnqueue, ZHub, ZQueue, ZTraceElement } - -/** - * A `CHub[F, A, B]` is an asynchronous message hub. Publishers can publish - * messages of type `A` to the hub and subscribers can subscribe to take - * messages of type `B` from the hub within the context of the effect `F`. - */ -sealed abstract class CHub[F[+_], -A, +B] extends Serializable { - - /** - * Waits for the hub to be shut down. - */ - def awaitShutdown(implicit trace: ZTraceElement): F[Unit] - - /** - * The maximum capacity of the hub. - */ - def capacity: Int - - /** - * Checks whether the hub is shut down. - */ - def isShutdown(implicit trace: ZTraceElement): F[Boolean] - - /** - * Publishes a message to the hub, returning whether the message was - * published to the hub. - */ - def publish(a: A)(implicit trace: ZTraceElement): F[Boolean] - - /** - * Publishes all of the specified messages to the hub, returning whether - * they were published to the hub. - */ - def publishAll(as: Iterable[A])(implicit trace: ZTraceElement): F[Boolean] - - /** - * Shuts down the hub. - */ - def shutdown(implicit trace: ZTraceElement): F[Unit] - - /** - * The current number of messages in the hub. - */ - def size(implicit trace: ZTraceElement): F[Int] - - /** - * Subscribes to receive messages from the hub. The resulting subscription - * can be evaluated multiple times within the scope of the resource to take a - * message from the hub each time. - */ - def subscribe(implicit trace: ZTraceElement): Resource[F, Dequeue[F, B]] - - /** - * Transforms messages published to the hub using the specified function. - */ - def contramap[C](f: C => A): CHub[F, C, B] - - /** - * Transforms messages published to the hub using the specified effectual - * function. - */ - def contramapM[C](f: C => F[A]): CHub[F, C, B] - - /** - * Transforms messages published to and taken from the hub using the - * specified functions. - */ - def dimap[C, D](f: C => A, g: B => D): CHub[F, C, D] - - /** - * Transforms messages published to and taken from the hub using the - * specified effectual functions. - */ - def dimapM[C, D]( - f: C => F[A], - g: B => F[D] - ): CHub[F, C, D] - - /** - * Filters messages published to the hub using the specified function. - */ - def filterInput[A1 <: A](f: A1 => Boolean): CHub[F, A1, B] - - /** - * Filters messages published to the hub using the specified effectual - * function. - */ - def filterInputM[A1 <: A]( - f: A1 => F[Boolean] - ): CHub[F, A1, B] - - /** - * Filters messages taken from the hub using the specified function. - */ - def filterOutput(f: B => Boolean): CHub[F, A, B] - - /** - * Filters messages taken from the hub using the specified effectual - * function. - */ - def filterOutputM( - f: B => F[Boolean] - ): CHub[F, A, B] - - /** - * Transforms messages taken from the hub using the specified function. - */ - def map[C](f: B => C): CHub[F, A, C] - - /** - * Transforms messages taken from the hub using the specified effectual - * function. - */ - def mapM[C](f: B => F[C]): CHub[F, A, C] - - /** - * Views the hub as a queue that can only be written to. - */ - def toQueue: Enqueue[F, A] -} - -object CHub { - - /** - * Creates a bounded hub with the back pressure strategy. The hub will retain - * messages until they have been taken by all subscribers, applying back - * pressure to publishers if the hub is at capacity. - * - * For best performance use capacities that are powers of two. - */ - def bounded[F[+_]: Async: Dispatcher, A]( - requestedCapacity: Int - )(implicit runtime: Runtime[Any], trace: ZTraceElement): F[Hub[F, A]] = - ZHub.bounded[A](requestedCapacity).map(CHub(_)).toEffect[F] - - /** - * Creates a bounded hub with the dropping strategy. The hub will drop new - * messages if the hub is at capacity. - * - * For best performance use capacities that are powers of two. - */ - def dropping[F[+_]: Async: Dispatcher, A]( - requestedCapacity: Int - )(implicit runtime: Runtime[Any], trace: ZTraceElement): F[Hub[F, A]] = - ZHub.dropping[A](requestedCapacity).map(CHub(_)).toEffect[F] - - /** - * Creates a bounded hub with the sliding strategy. The hub will add new - * messages and drop old messages if the hub is at capacity. - * - * For best performance use capacities that are powers of two. - */ - def sliding[F[+_]: Async: Dispatcher, A]( - requestedCapacity: Int - )(implicit runtime: Runtime[Any], trace: ZTraceElement): F[Hub[F, A]] = - ZHub.sliding[A](requestedCapacity).map(CHub(_)).toEffect[F] - - /** - * Creates an unbounded hub. - */ - def unbounded[F[+_]: Async: Dispatcher, A](implicit runtime: Runtime[Any], trace: ZTraceElement): F[Hub[F, A]] = - ZHub.unbounded[A].map(CHub(_)).toEffect[F] - - private def apply[F[+_]: Async: Dispatcher, A, B]( - hub: ZHub[Any, Any, Throwable, Throwable, A, B] - )(implicit runtime: Runtime[Any]): CHub[F, A, B] = - new CHub[F, A, B] { self => - def awaitShutdown(implicit trace: ZTraceElement): F[Unit] = - hub.awaitShutdown.toEffect[F] - def capacity: Int = - hub.capacity - def isShutdown(implicit trace: ZTraceElement): F[Boolean] = - hub.isShutdown.toEffect[F] - def publish(a: A)(implicit trace: ZTraceElement): F[Boolean] = - hub.publish(a).toEffect[F] - def publishAll(as: Iterable[A])(implicit trace: ZTraceElement): F[Boolean] = - hub.publishAll(as).toEffect[F] - def shutdown(implicit trace: ZTraceElement): F[Unit] = - hub.shutdown.toEffect[F] - def size(implicit trace: ZTraceElement): F[Int] = - hub.size.toEffect[F] - def subscribe(implicit trace: ZTraceElement): Resource[F, Dequeue[F, B]] = - Resource.scoped[F, Any, Dequeue[F, B]](hub.subscribe.map(Dequeue[F, B](_))) - def contramap[C](f: C => A): CHub[F, C, B] = - CHub(hub.contramap(f)) - def contramapM[C](f: C => F[A]): CHub[F, C, B] = - CHub(hub.contramapZIO(c => fromEffect(f(c)))) - def dimap[C, D](f: C => A, g: B => D): CHub[F, C, D] = - CHub(hub.dimap(f, g)) - def dimapM[C, D](f: C => F[A], g: B => F[D]): CHub[F, C, D] = - CHub(hub.dimapZIO(c => fromEffect(f(c)), b => fromEffect(g(b)))) - def filterInput[A1 <: A](f: A1 => Boolean): CHub[F, A1, B] = - CHub(hub.filterInput(f)) - def filterInputM[A1 <: A](f: A1 => F[Boolean]): CHub[F, A1, B] = - CHub(hub.filterInputZIO(a => fromEffect(f(a)))) - def filterOutput(f: B => Boolean): CHub[F, A, B] = - CHub(hub.filterOutput(f)) - def filterOutputM(f: B => F[Boolean]): CHub[F, A, B] = - CHub(hub.filterOutputZIO(a => fromEffect(f(a)))) - def map[C](f: B => C): CHub[F, A, C] = - CHub(hub.map(f)) - def mapM[C](f: B => F[C]): CHub[F, A, C] = - CHub(hub.mapZIO(a => fromEffect(f(a)))) - def toQueue: Enqueue[F, A] = - Enqueue(hub.toQueue) - } - - private def Dequeue[F[+_]: Async: Dispatcher, A]( - dequeue: ZDequeue[Any, Throwable, A] - )(implicit runtime: Runtime[Any]): Dequeue[F, A] = - CQueue[F, Nothing, A](dequeue.asInstanceOf[ZQueue[Any, Any, Throwable, Throwable, Nothing, A]]) - - private def Enqueue[F[+_]: Async: Dispatcher, A]( - enqueue: ZEnqueue[Any, Throwable, A] - )(implicit runtime: Runtime[Any]): Enqueue[F, A] = - CQueue[F, A, Nothing](enqueue.asInstanceOf[ZQueue[Any, Any, Throwable, Throwable, A, Nothing]]) -} diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/CQueue.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/CQueue.scala deleted file mode 100644 index 2847cb6d..00000000 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/CQueue.scala +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Copyright 2017-2019 John A. De Goes and the ZIO Contributors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package zio.interop - -import cats.effect.kernel.Async -import cats.effect.std.Dispatcher -import zio.{ Runtime, ZQueue, ZTraceElement } - -/** - * @see [[zio.ZQueue]] - */ -sealed abstract class CQueue[F[+_], -A, +B](private val underlying: ZQueue[Any, Any, Throwable, Throwable, A, B]) { - - /** - * @see [[ZQueue.awaitShutdown]] - */ - def awaitShutdown(implicit trace: ZTraceElement): F[Unit] - - /** - * @see [[ZQueue.capacity]] - */ - def capacity: Int - - /** - * @see [[ZQueue.isShutdown]] - */ - def isShutdown(implicit trace: ZTraceElement): F[Boolean] - - /** - * @see [[ZQueue.offer]] - */ - def offer(a: A)(implicit trace: ZTraceElement): F[Boolean] - - /** - * @see [[ZQueue.offerAll]] - */ - def offerAll(as: Iterable[A])(implicit trace: ZTraceElement): F[Boolean] - - /** - * @see [[ZQueue.shutdown]] - */ - def shutdown(implicit trace: ZTraceElement): F[Unit] - - /** - * @see [[ZQueue.size]] - */ - def size(implicit trace: ZTraceElement): F[Int] - - /** - * @see [[ZQueue.take]] - */ - def take(implicit trace: ZTraceElement): F[B] - - /** - * @see [[ZQueue.takeAll]] - */ - def takeAll(implicit trace: ZTraceElement): F[List[B]] - - /** - * @see [[ZQueue.takeUpTo]] - */ - def takeUpTo(max: Int)(implicit trace: ZTraceElement): F[List[B]] - - /** - * @see [[ZQueue.contramap]] - */ - def contramap[C](f: C => A): CQueue[F, C, B] - - /** - * @see [[ZQueue.contramapZIO]] - */ - def contramapM[C](f: C => F[A])(implicit trace: ZTraceElement): CQueue[F, C, B] - - /** - * @see [[ZQueue.filterInput]] - */ - def filterInput[A0 <: A](f: A0 => Boolean): CQueue[F, A0, B] - - /** - * @see [[ZQueue.filterInputZIO]] - */ - def filterInputM[A0 <: A](f: A0 => F[Boolean])(implicit trace: ZTraceElement): CQueue[F, A0, B] - - /** - * @see [[ZQueue.map]] - */ - def map[C](f: B => C): CQueue[F, A, C] - - /** - * @see [[ZQueue.mapZIO]] - */ - def mapM[C](f: B => F[C])(implicit trace: ZTraceElement): CQueue[F, A, C] - - /** - * @see [[ZQueue.poll]] - */ - def poll(implicit trace: ZTraceElement): F[Option[B]] -} - -object CQueue { - - /** - * @see ZioQueue.bounded - */ - final def bounded[F[+_]: Async: Dispatcher, A]( - capacity: Int - )(implicit R: Runtime[Any], trace: ZTraceElement): F[Queue[F, A]] = - ZQueue.bounded[A](capacity).map(CQueue(_)).toEffect[F] - - /** - * @see ZioQueue.dropping - */ - final def dropping[F[+_]: Async: Dispatcher, A]( - capacity: Int - )(implicit R: Runtime[Any], trace: ZTraceElement): F[Queue[F, A]] = - ZQueue.dropping[A](capacity).map(CQueue(_)).toEffect[F] - - /** - * @see ZioQueue.sliding - */ - final def sliding[F[+_]: Async: Dispatcher, A]( - capacity: Int - )(implicit R: Runtime[Any], trace: ZTraceElement): F[Queue[F, A]] = - ZQueue.sliding[A](capacity).map(CQueue(_)).toEffect[F] - - /** - * @see ZioQueue.unbounded - */ - final def unbounded[F[+_]: Async: Dispatcher, A](implicit R: Runtime[Any], trace: ZTraceElement): F[Queue[F, A]] = - ZQueue.unbounded[A].map(CQueue(_)).toEffect[F] - - private[interop] final def apply[F[+_]: Async: Dispatcher, A, B]( - underlying: ZQueue[Any, Any, Throwable, Throwable, A, B] - )(implicit runtime: Runtime[Any]): CQueue[F, A, B] = - new CQueue[F, A, B](underlying) { - def awaitShutdown(implicit trace: ZTraceElement): F[Unit] = - underlying.awaitShutdown.toEffect[F] - def capacity: Int = - underlying.capacity - def isShutdown(implicit trace: ZTraceElement): F[Boolean] = - underlying.isShutdown.toEffect[F] - def offer(a: A)(implicit trace: ZTraceElement): F[Boolean] = - underlying.offer(a).toEffect[F] - def offerAll(as: Iterable[A])(implicit trace: ZTraceElement): F[Boolean] = - underlying.offerAll(as).toEffect[F] - def shutdown(implicit trace: ZTraceElement): F[Unit] = - underlying.shutdown.toEffect[F] - def size(implicit trace: ZTraceElement): F[Int] = - underlying.size.toEffect[F] - def take(implicit trace: ZTraceElement): F[B] = - underlying.take.toEffect[F] - def takeAll(implicit trace: ZTraceElement): F[List[B]] = - underlying.takeAll.map(_.toList).toEffect[F] - def takeUpTo(max: Int)(implicit trace: ZTraceElement): F[List[B]] = - underlying.takeUpTo(max).map(_.toList).toEffect[F] - def contramap[C](f: C => A): CQueue[F, C, B] = - CQueue(underlying.contramap(f)) - def contramapM[C](f: C => F[A])(implicit trace: ZTraceElement): CQueue[F, C, B] = - CQueue(underlying.contramapZIO(c => fromEffect(f(c)))) - def filterInput[A0 <: A](f: A0 => Boolean): CQueue[F, A0, B] = - CQueue(underlying.filterInput(f)) - def filterInputM[A0 <: A](f: A0 => F[Boolean])(implicit trace: ZTraceElement): CQueue[F, A0, B] = - CQueue(underlying.filterInputZIO((a0: A0) => fromEffect(f(a0)))) - def map[C](f: B => C): CQueue[F, A, C] = - CQueue(underlying.map(f)) - def mapM[C](f: B => F[C])(implicit trace: ZTraceElement): CQueue[F, A, C] = - CQueue(underlying.mapZIO(b => fromEffect(f(b)))) - def poll(implicit trace: ZTraceElement): F[Option[B]] = - underlying.poll.toEffect[F] - } -} diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/Dequeue.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/Dequeue.scala new file mode 100644 index 00000000..83d09492 --- /dev/null +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/Dequeue.scala @@ -0,0 +1,126 @@ +/* + * Copyright 2017-2022 John A. De Goes and the ZIO Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.interop + +import cats.effect.kernel.Async +import zio.{ Dequeue as ZDequeue, Runtime, ZTraceElement } + +/** + * A queue that can only be dequeued. + */ +trait Dequeue[F[+_], +A] extends Serializable { + + /** + * @see [[Dequeue.awaitShutdown]] + */ + def awaitShutdown(implicit trace: ZTraceElement): F[Unit] + + /** + * @see [[Dequeue.capacity]] + */ + def capacity: Int + + /** + * @see [[Dequeue.isEmpty]] + */ + def isEmpty(implicit trace: ZTraceElement): F[Boolean] + + /** + * @see [[Dequeue.isFull]] + */ + def isFull(implicit trace: ZTraceElement): F[Boolean] + + /** + * @see [[Dequeue.isShutdown]] + */ + def isShutdown(implicit trace: ZTraceElement): F[Boolean] + + /** + * @see [[Dequeue.poll]] + */ + def poll(implicit trace: ZTraceElement): F[Option[A]] + + /** + * @see [[Dequeue.shutdown]] + */ + def shutdown(implicit trace: ZTraceElement): F[Unit] + + /** + * @see [[Dequeue.size]] + */ + def size(implicit trace: ZTraceElement): F[Int] + + /** + * @see [[Dequeue.take]] + */ + def take(implicit trace: ZTraceElement): F[A] + + /** + * @see [[Dequeue.takeAll]] + */ + def takeAll(implicit trace: ZTraceElement): F[List[A]] + + /** + * @see [[Dequeue.takeBetween]] + */ + def takeBetween(min: Int, max: Int)(implicit trace: ZTraceElement): F[List[A]] + + /** + * @see [[Dequeue.takeN]] + */ + def takeN(n: Int)(implicit trace: ZTraceElement): F[List[A]] + + /** + * @see [[Dequeue.takeUpTo]] + */ + def takeUpTo(max: Int)(implicit trace: ZTraceElement): F[List[A]] +} + +object Dequeue { + + private[interop] final def apply[F[+_]: Async, A]( + underlying: ZDequeue[A] + )(implicit runtime: Runtime[Any]): Dequeue[F, A] = + new Dequeue[F, A] { + def awaitShutdown(implicit trace: ZTraceElement): F[Unit] = + underlying.awaitShutdown.toEffect[F] + def capacity: Int = + underlying.capacity + def isEmpty(implicit trace: ZTraceElement): F[Boolean] = + underlying.isEmpty.toEffect[F] + def isFull(implicit trace: ZTraceElement): F[Boolean] = + underlying.isFull.toEffect[F] + def isShutdown(implicit trace: ZTraceElement): F[Boolean] = + underlying.isShutdown.toEffect[F] + def poll(implicit trace: ZTraceElement): F[Option[A]] = + underlying.poll.toEffect[F] + def shutdown(implicit trace: ZTraceElement): F[Unit] = + underlying.shutdown.toEffect[F] + def size(implicit trace: ZTraceElement): F[Int] = + underlying.size.toEffect[F] + def take(implicit trace: ZTraceElement): F[A] = + underlying.take.toEffect[F] + def takeAll(implicit trace: ZTraceElement): F[List[A]] = + underlying.takeAll.map(_.toList).toEffect[F] + def takeBetween(min: Int, max: Int)(implicit trace: ZTraceElement): F[List[A]] = + underlying.takeBetween(min, max).map(_.toList).toEffect[F] + def takeN(n: Int)(implicit trace: ZTraceElement): F[List[A]] = + underlying.takeN(n).map(_.toList).toEffect[F] + def takeUpTo(max: Int)(implicit trace: ZTraceElement): F[List[A]] = + underlying.takeUpTo(max).map(_.toList).toEffect[F] + } +} diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/Enqueue.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/Enqueue.scala new file mode 100644 index 00000000..d2d4e94c --- /dev/null +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/Enqueue.scala @@ -0,0 +1,98 @@ +/* + * Copyright 2017-2022 John A. De Goes and the ZIO Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.interop + +import cats.effect.kernel.Async +import zio.{ Enqueue as ZEnqueue, Runtime, ZTraceElement } + +/** + * A queue that can only be enqueued. + */ +trait Enqueue[F[+_], -A] extends Serializable { + + /** + * @see [[Enqueue.awaitShutdown]] + */ + def awaitShutdown(implicit trace: ZTraceElement): F[Unit] + + /** + * @see [[Enqueue.capacity]] + */ + def capacity: Int + + /** + * @see [[Enqueue.isEmpty]] + */ + def isEmpty(implicit trace: ZTraceElement): F[Boolean] + + /** + * @see [[Enqueue.isFull]] + */ + def isFull(implicit trace: ZTraceElement): F[Boolean] + + /** + * @see [[Enqueue.isShutdown]] + */ + def isShutdown(implicit trace: ZTraceElement): F[Boolean] + + /** + * @see [[Enqueue.offer]] + */ + def offer(a: A)(implicit trace: ZTraceElement): F[Boolean] + + /** + * @see [[Enqueue.offerAll]] + */ + def offerAll(as: Iterable[A])(implicit trace: ZTraceElement): F[Boolean] + + /** + * @see [[Enqueue.shutdown]] + */ + def shutdown(implicit trace: ZTraceElement): F[Unit] + + /** + * @see [[Enqueue.size]] + */ + def size(implicit trace: ZTraceElement): F[Int] +} + +object Enqueue { + + private[interop] final def apply[F[+_]: Async, A]( + underlying: ZEnqueue[A] + )(implicit runtime: Runtime[Any]): Enqueue[F, A] = + new Enqueue[F, A] { + def awaitShutdown(implicit trace: ZTraceElement): F[Unit] = + underlying.awaitShutdown.toEffect[F] + def capacity: Int = + underlying.capacity + def isEmpty(implicit trace: ZTraceElement): F[Boolean] = + underlying.isEmpty.toEffect[F] + def isFull(implicit trace: ZTraceElement): F[Boolean] = + underlying.isFull.toEffect[F] + def isShutdown(implicit trace: ZTraceElement): F[Boolean] = + underlying.isShutdown.toEffect[F] + def offer(a: A)(implicit trace: ZTraceElement): F[Boolean] = + underlying.offer(a).toEffect[F] + def offerAll(as: Iterable[A])(implicit trace: ZTraceElement): F[Boolean] = + underlying.offerAll(as).toEffect[F] + def shutdown(implicit trace: ZTraceElement): F[Unit] = + underlying.shutdown.toEffect[F] + def size(implicit trace: ZTraceElement): F[Int] = + underlying.size.toEffect[F] + } +} diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/Hub.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/Hub.scala new file mode 100644 index 00000000..a65dbe11 --- /dev/null +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/Hub.scala @@ -0,0 +1,126 @@ +/* + * Copyright 2021 John A. De Goes and the ZIO Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.interop + +import cats.effect.kernel.{ Async, Resource } +import zio.interop.catz.scopedSyntax +import zio.{ Hub as ZHub, Runtime, ZTraceElement } + +/** + * A `Hub[F, A]` is an asynchronous message hub. Publishers can publish + * messages to the hub and subscribers can subscribe to take messages of from + * the hub within the context of the effect `F`. + */ +abstract class Hub[F[+_], A] extends Enqueue[F, A] with Serializable { + + /** + * Checks whether the hub is shut down. + */ + def isShutdown(implicit trace: ZTraceElement): F[Boolean] + + /** + * Publishes a message to the hub, returning whether the message was + * published to the hub. + */ + def publish(a: A)(implicit trace: ZTraceElement): F[Boolean] + + /** + * Publishes all of the specified messages to the hub, returning whether + * they were published to the hub. + */ + def publishAll(as: Iterable[A])(implicit trace: ZTraceElement): F[Boolean] + + /** + * Subscribes to receive messages from the hub. The resulting subscription + * can be evaluated multiple times within the scope of the resource to take a + * message from the hub each time. + */ + def subscribe(implicit trace: ZTraceElement): Resource[F, Dequeue[F, A]] +} + +object Hub { + + /** + * Creates a bounded hub with the back pressure strategy. The hub will retain + * messages until they have been taken by all subscribers, applying back + * pressure to publishers if the hub is at capacity. + * + * For best performance use capacities that are powers of two. + */ + def bounded[F[+_]: Async, A]( + requestedCapacity: Int + )(implicit runtime: Runtime[Any], trace: ZTraceElement): F[Hub[F, A]] = + ZHub.bounded[A](requestedCapacity).map(Hub(_)).toEffect[F] + + /** + * Creates a bounded hub with the dropping strategy. The hub will drop new + * messages if the hub is at capacity. + * + * For best performance use capacities that are powers of two. + */ + def dropping[F[+_]: Async, A]( + requestedCapacity: Int + )(implicit runtime: Runtime[Any], trace: ZTraceElement): F[Hub[F, A]] = + ZHub.dropping[A](requestedCapacity).map(Hub(_)).toEffect[F] + + /** + * Creates a bounded hub with the sliding strategy. The hub will add new + * messages and drop old messages if the hub is at capacity. + * + * For best performance use capacities that are powers of two. + */ + def sliding[F[+_]: Async, A]( + requestedCapacity: Int + )(implicit runtime: Runtime[Any], trace: ZTraceElement): F[Hub[F, A]] = + ZHub.sliding[A](requestedCapacity).map(Hub(_)).toEffect[F] + + /** + * Creates an unbounded hub. + */ + def unbounded[F[+_]: Async, A](implicit runtime: Runtime[Any], trace: ZTraceElement): F[Hub[F, A]] = + ZHub.unbounded[A].map(Hub(_)).toEffect[F] + + private def apply[F[+_]: Async, A]( + hub: ZHub[A] + )(implicit runtime: Runtime[Any]): Hub[F, A] = + new Hub[F, A] { self => + def awaitShutdown(implicit trace: ZTraceElement): F[Unit] = + hub.awaitShutdown.toEffect[F] + def capacity: Int = + hub.capacity + def isEmpty(implicit trace: ZTraceElement): F[Boolean] = + hub.isEmpty.toEffect[F] + def isFull(implicit trace: ZTraceElement): F[Boolean] = + hub.isFull.toEffect[F] + def isShutdown(implicit trace: ZTraceElement): F[Boolean] = + hub.isShutdown.toEffect[F] + def publish(a: A)(implicit trace: ZTraceElement): F[Boolean] = + hub.publish(a).toEffect[F] + def publishAll(as: Iterable[A])(implicit trace: ZTraceElement): F[Boolean] = + hub.publishAll(as).toEffect[F] + def shutdown(implicit trace: ZTraceElement): F[Unit] = + hub.shutdown.toEffect[F] + def size(implicit trace: ZTraceElement): F[Int] = + hub.size.toEffect[F] + def subscribe(implicit trace: ZTraceElement): Resource[F, Dequeue[F, A]] = + Resource.scoped[F, Any, Dequeue[F, A]](hub.subscribe.map(Dequeue[F, A](_))) + def offer(a: A)(implicit trace: ZTraceElement): F[Boolean] = + publish(a) + def offerAll(as: Iterable[A])(implicit trace: ZTraceElement): F[Boolean] = + publishAll(as) + } +} diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/Queue.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/Queue.scala new file mode 100644 index 00000000..d6947fbc --- /dev/null +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/Queue.scala @@ -0,0 +1,94 @@ +/* + * Copyright 2017-2019 John A. De Goes and the ZIO Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.interop + +import cats.effect.kernel.Async +import zio.{ Queue as ZQueue, Runtime, ZTraceElement } + +/** + * @see [[zio.Queue]] + */ +abstract class Queue[F[+_], A] extends Dequeue[F, A] with Enqueue[F, A] + +object Queue { + + /** + * @see ZioQueue.bounded + */ + final def bounded[F[+_]: Async, A]( + capacity: Int + )(implicit R: Runtime[Any], trace: ZTraceElement): F[Queue[F, A]] = + ZQueue.bounded[A](capacity).map(Queue(_)).toEffect[F] + + /** + * @see ZioQueue.dropping + */ + final def dropping[F[+_]: Async, A]( + capacity: Int + )(implicit R: Runtime[Any], trace: ZTraceElement): F[Queue[F, A]] = + ZQueue.dropping[A](capacity).map(Queue(_)).toEffect[F] + + /** + * @see ZioQueue.sliding + */ + final def sliding[F[+_]: Async, A]( + capacity: Int + )(implicit R: Runtime[Any], trace: ZTraceElement): F[Queue[F, A]] = + ZQueue.sliding[A](capacity).map(Queue(_)).toEffect[F] + + /** + * @see ZioQueue.unbounded + */ + final def unbounded[F[+_]: Async, A](implicit R: Runtime[Any], trace: ZTraceElement): F[Queue[F, A]] = + ZQueue.unbounded[A].map(Queue(_)).toEffect[F] + + private[interop] final def apply[F[+_]: Async, A]( + underlying: ZQueue[A] + )(implicit runtime: Runtime[Any]): Queue[F, A] = + new Queue[F, A] { + def awaitShutdown(implicit trace: ZTraceElement): F[Unit] = + underlying.awaitShutdown.toEffect[F] + def capacity: Int = + underlying.capacity + def isEmpty(implicit trace: zio.ZTraceElement): F[Boolean] = + underlying.isEmpty.toEffect[F] + def isFull(implicit trace: zio.ZTraceElement): F[Boolean] = + underlying.isFull.toEffect[F] + def isShutdown(implicit trace: ZTraceElement): F[Boolean] = + underlying.isShutdown.toEffect[F] + def offer(a: A)(implicit trace: ZTraceElement): F[Boolean] = + underlying.offer(a).toEffect[F] + def offerAll(as: Iterable[A])(implicit trace: ZTraceElement): F[Boolean] = + underlying.offerAll(as).toEffect[F] + def shutdown(implicit trace: ZTraceElement): F[Unit] = + underlying.shutdown.toEffect[F] + def size(implicit trace: ZTraceElement): F[Int] = + underlying.size.toEffect[F] + def take(implicit trace: ZTraceElement): F[A] = + underlying.take.toEffect[F] + def takeAll(implicit trace: ZTraceElement): F[List[A]] = + underlying.takeAll.map(_.toList).toEffect[F] + def takeBetween(min: Int, max: Int)(implicit trace: zio.ZTraceElement): F[List[A]] = + underlying.takeBetween(min, max).map(_.toList).toEffect[F] + def takeN(n: Int)(implicit trace: zio.ZTraceElement): F[List[A]] = + underlying.takeN(n).map(_.toList).toEffect[F] + def takeUpTo(max: Int)(implicit trace: ZTraceElement): F[List[A]] = + underlying.takeUpTo(max).map(_.toList).toEffect[F] + def poll(implicit trace: ZTraceElement): F[Option[A]] = + underlying.poll.toEffect[F] + } +} diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala index 04bc9c49..a8eeb7af 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala @@ -23,7 +23,7 @@ import cats.effect.{ IO as CIO, LiftIO } import cats.kernel.{ CommutativeMonoid, CommutativeSemigroup } import cats.effect import cats.* -import zio.{ Clock, Fiber, ZEnvironment } +import zio.{ Fiber, Ref as ZRef, ZEnvironment } import zio.* import zio.Clock.{ currentTime, nanoTime } import zio.Duration @@ -44,14 +44,14 @@ object catz extends CatsEffectPlatform { * summon Cats Effect typeclasses without the ceremony of * * {{{ - * ZIO.runtime[Clock with Blocking].flatMap { implicit runtime => + * ZIO.runtime[Any].flatMap { implicit runtime => * implicit val asyncTask: Async[Task] = implicitly * ... * } * }}} */ object implicits { - implicit val rts: Runtime[Clock] = Runtime.default + implicit val rts: Runtime[Any] = Runtime.default } } @@ -64,7 +64,7 @@ abstract class CatsEffectPlatform with CatsZManagedSyntax { trait CatsApp extends ZIOAppDefault { - implicit override val runtime: Runtime[ZEnv] = super.runtime + implicit override val runtime: Runtime[Any] = super.runtime } val console: interop.console.cats.type = @@ -82,25 +82,25 @@ abstract class CatsEffectInstances extends CatsZioInstances { implicit final def liftIOInstance[R](implicit runtime: IORuntime): LiftIO[RIO[R, _]] = new ZioLiftIO - implicit final def asyncInstance[R <: Clock]: Async[RIO[R, _]] = + implicit final def asyncInstance[R]: Async[RIO[R, _]] = asyncInstance0.asInstanceOf[Async[RIO[R, _]]] - implicit final def temporalInstance[R <: Clock, E]: GenTemporal[ZIO[R, E, _], E] = + implicit final def temporalInstance[R, E]: GenTemporal[ZIO[R, E, _], E] = temporalInstance0.asInstanceOf[GenTemporal[ZIO[R, E, _], E]] implicit final def concurrentInstance[R, E]: GenConcurrent[ZIO[R, E, _], E] = concurrentInstance0.asInstanceOf[GenConcurrent[ZIO[R, E, _], E]] - implicit final def asyncRuntimeInstance[E](implicit runtime: Runtime[Clock]): Async[Task] = + implicit final def asyncRuntimeInstance[E](implicit runtime: Runtime[Any]): Async[Task] = new ZioRuntimeAsync - implicit final def temporalRuntimeInstance[E](implicit runtime: Runtime[Clock]): GenTemporal[IO[E, _], E] = + implicit final def temporalRuntimeInstance[E](implicit runtime: Runtime[Any]): GenTemporal[IO[E, _], E] = new ZioRuntimeTemporal[E] - private[this] val asyncInstance0: Async[RIO[Clock, _]] = + private[this] val asyncInstance0: Async[Task] = new ZioAsync - private[this] val temporalInstance0: Temporal[RIO[Clock, _]] = + private[this] val temporalInstance0: Temporal[Task] = new ZioTemporal private[this] val concurrentInstance0: Concurrent[Task] = @@ -290,7 +290,7 @@ private final class ZioDeferred[R, E, A](promise: Promise[E, A]) extends Deferre } } -private final class ZioRef[R, E, A](ref: ERef[E, A]) extends effect.Ref[ZIO[R, E, _], A] { +private final class ZioRef[R, E, A](ref: ZRef[A]) extends effect.Ref[ZIO[R, E, _], A] { type F[T] = ZIO[R, E, T] override def access: F[(A, A => F[Boolean])] = { @@ -354,7 +354,7 @@ private final class ZioRef[R, E, A](ref: ERef[E, A]) extends effect.Ref[ZIO[R, E ref.get(CoreTracer.newTrace) } -private class ZioTemporal[R <: Clock, E] extends ZioConcurrent[R, E] with GenTemporal[ZIO[R, E, _], E] { +private class ZioTemporal[R, E] extends ZioConcurrent[R, E] with GenTemporal[ZIO[R, E, _], E] { override final def sleep(time: FiniteDuration): F[Unit] = { implicit def trace: ZTraceElement = CoreTracer.newTrace @@ -375,12 +375,12 @@ private class ZioTemporal[R <: Clock, E] extends ZioConcurrent[R, E] with GenTem } } -private class ZioRuntimeTemporal[E](implicit runtime: Runtime[Clock]) +private class ZioRuntimeTemporal[E](implicit runtime: Runtime[Any]) extends ZioConcurrent[Any, E] with GenTemporal[IO[E, _], E] { - private[this] val underlying: GenTemporal[ZIO[Clock, E, _], E] = new ZioTemporal[Clock, E] - private[this] val clock: ZEnvironment[Clock] = runtime.environment + private[this] val underlying: GenTemporal[ZIO[Any, E, _], E] = new ZioTemporal[Any, E] + private[this] val clock: ZEnvironment[Any] = runtime.environment override final def sleep(time: FiniteDuration): F[Unit] = { implicit def trace: ZTraceElement = CoreTracer.newTrace @@ -401,10 +401,10 @@ private class ZioRuntimeTemporal[E](implicit runtime: Runtime[Clock]) } } -private class ZioRuntimeAsync(implicit runtime: Runtime[Clock]) extends ZioRuntimeTemporal[Throwable] with Async[Task] { +private class ZioRuntimeAsync(implicit runtime: Runtime[Any]) extends ZioRuntimeTemporal[Throwable] with Async[Task] { - private[this] val underlying: Async[RIO[Clock, _]] = new ZioAsync[Clock] - private[this] val environment: ZEnvironment[Clock] = runtime.environment + private[this] val underlying: Async[RIO[Any, _]] = new ZioAsync[Any] + private[this] val environment: ZEnvironment[Any] = runtime.environment override final def evalOn[A](fa: F[A], ec: ExecutionContext): F[A] = { implicit def trace: ZTraceElement = CoreTracer.newTrace diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/catszmanaged.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/catszmanaged.scala index 40cd2dfd..d7cfae8a 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/catszmanaged.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/catszmanaged.scala @@ -96,19 +96,10 @@ final class ZManagedSyntax[R, E, A](private val managed: ZManaged[R, E, A]) exte import zio.interop.catz.scopedSyntax def toResourceZIO(implicit trace: ZTraceElement): Resource[ZIO[R, E, _], A] = - Resource.scopedZIO[R, E, A](scoped(managed)) + Resource.scopedZIO[R, E, A](managed.scoped) def toResource[F[_]: Async](implicit R: Runtime[R], ev: E <:< Throwable, trace: ZTraceElement): Resource[F, A] = - Resource.scoped[F, R, A](scoped(managed).mapError(ev)) - - private def scoped[R, E, A](managed: ZManaged[R, E, A])(implicit trace: ZTraceElement): ZIO[R with Scope, E, A] = - for { - scope <- ZIO.scope - releaseMap <- ZManaged.ReleaseMap.make - _ <- scope.addFinalizerExit(releaseMap.releaseAll(_, ExecutionStrategy.Sequential)) - tuple <- ZManaged.currentReleaseMap.locally(releaseMap)(managed.zio) - (_, a) = tuple - } yield a + Resource.scoped[F, R, A](managed.scoped.mapError(ev)) } final class ScopedSyntax(private val self: Resource.type) extends AnyVal { diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala index 24868702..357434d2 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala @@ -24,18 +24,6 @@ import scala.concurrent.Future package object interop { - type Queue[F[+_], A] = CQueue[F, A, A] - val Queue: CQueue.type = CQueue - - /** A queue that can only be dequeued. */ - type Dequeue[F[+_], +A] = CQueue[F, Nothing, A] - - /** A queue that can only be enqueued. */ - type Enqueue[F[+_], -A] = CQueue[F, A, Nothing] - - type Hub[F[+_], A] = CHub[F, A, A] - val Hub: CHub.type = CHub - @inline private[interop] def toOutcome[R, E, A]( exit: Exit[E, A] )(implicit trace: ZTraceElement): Outcome[ZIO[R, E, _], E, A] = diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/stm/TQueue.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/stm/TQueue.scala index 588b717d..44fa80d8 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/stm/TQueue.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/stm/TQueue.scala @@ -30,43 +30,43 @@ final class TQueue[F[+_], A] private (underlying: ZTQueue[A]) { new TQueue(underlying) /** - * See [[zio.stm.ZTQueue#offer]] + * See [[zio.stm.TQueue#offer]] */ def offer(a: A): STM[F, Boolean] = new STM(underlying.offer(a)) /** - * See [[zio.stm.ZTQueue#offerAll]] + * See [[zio.stm.TQueue#offerAll]] */ def offerAll(as: List[A]): STM[F, Boolean] = new STM(underlying.offerAll(as)) /** - * See [[zio.stm.ZTQueue#poll]] + * See [[zio.stm.TQueue#poll]] */ def poll: STM[F, Option[A]] = new STM(underlying.poll) /** - * See [[zio.stm.ZTQueue#size]] + * See [[zio.stm.TQueue#size]] */ def size: STM[F, Int] = new STM(underlying.size) /** - * See [[zio.stm.ZTQueue#take]] + * See [[zio.stm.TQueue#take]] */ def take: STM[F, A] = new STM(underlying.take) /** - * See [[zio.stm.ZTQueue#takeAll]] + * See [[zio.stm.TQueue#takeAll]] */ def takeAll: STM[F, List[A]] = new STM(underlying.takeAll.map(_.toList)) /** - * See [[zio.stm.ZTQueue#takeUpTo]] + * See [[zio.stm.TQueue#takeUpTo]] */ def takeUpTo(max: Int): STM[F, List[A]] = new STM(underlying.takeUpTo(max).map(_.toList)) diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/stm/TRef.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/stm/TRef.scala index 05cc9d15..2a4dd3b0 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/stm/TRef.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/stm/TRef.scala @@ -26,7 +26,7 @@ import zio.stm.TRef as ZTRef final class TRef[F[+_], A] private (underlying: ZTRef[A]) { /** - * See [[zio.stm.ZTRef#get]] + * See [[zio.stm.TRef#get]] */ def get: STM[F, A] = new STM(underlying.get) @@ -38,19 +38,19 @@ final class TRef[F[+_], A] private (underlying: ZTRef[A]) { new TRef(underlying) /** - * See [[zio.stm.ZTRef.UnifiedSyntax#modify]] + * See [[zio.stm.TRef#modify]] */ def modify[B](f: A => (B, A)): STM[F, B] = new STM(underlying.modify(f)) /** - * See [[zio.stm.ZTRef.UnifiedSyntax#modifySome]] + * See [[zio.stm.TRef#modifySome]] */ def modifySome[B](default: B)(f: PartialFunction[A, (B, A)]): STM[F, B] = new STM(underlying.modifySome(default)(f)) /** - * See [[zio.stm.ZTRef#set]] + * See [[zio.stm.TRef#set]] */ def set(newValue: A): STM[F, Unit] = new STM(underlying.set(newValue)) @@ -59,13 +59,13 @@ final class TRef[F[+_], A] private (underlying: ZTRef[A]) { underlying.toString /** - * See [[zio.stm.ZTRef.UnifiedSyntax#update]] + * See [[zio.stm.TRef#update]] */ def update(f: A => A): STM[F, A] = new STM(underlying.updateAndGet(f)) /** - * See [[zio.stm.ZTRef.UnifiedSyntax#updateSome]] + * See [[zio.stm.TRef#updateSome]] */ def updateSome(f: PartialFunction[A, A]): STM[F, A] = new STM(underlying.updateSomeAndGet(f)) diff --git a/zio-test-interop-cats/shared/src/test/scala/zio/test/interop/CatsRunnableSpec.scala b/zio-test-interop-cats/shared/src/test/scala/zio/test/interop/CatsRunnableSpec.scala index 9581206e..469b2216 100644 --- a/zio-test-interop-cats/shared/src/test/scala/zio/test/interop/CatsRunnableSpec.scala +++ b/zio-test-interop-cats/shared/src/test/scala/zio/test/interop/CatsRunnableSpec.scala @@ -4,15 +4,15 @@ import cats.effect.std.Dispatcher import cats.effect.unsafe.{ IORuntime, IORuntimeConfig, Scheduler } import cats.effect.IO as CIO import zio.* -import zio.test.{ DefaultRunnableSpec, TestAspect } +import zio.test.{ TestAspect, ZIOSpecDefault } import scala.util.Success -abstract class CatsRunnableSpec extends DefaultRunnableSpec { +abstract class CatsRunnableSpec extends ZIOSpecDefault { private[this] var openDispatcher: Dispatcher[CIO] = _ private[this] var closeDispatcher: CIO[Unit] = _ - implicit val zioRuntime: Runtime[ZEnv] = + implicit val zioRuntime: Runtime[Any] = Runtime.default implicit val cioRuntime: IORuntime = @@ -27,14 +27,17 @@ abstract class CatsRunnableSpec extends DefaultRunnableSpec { openDispatcher.unsafeToFutureCancelable(fa) } - override val aspects = List( - TestAspect.timeout(1.minute), - TestAspect.beforeAll(ZIO.fromFuture { implicit ec => + runtime.unsafeRunToFuture { + ZIO.fromFuture { implicit ec => Dispatcher[CIO].allocated.unsafeToFuture().andThen { case Success((dispatcher, close)) => openDispatcher = dispatcher closeDispatcher = close } - }.orDie), + }.orDie + } + + override val aspects = Chunk( + TestAspect.timeout(1.minute), TestAspect.afterAll(ZIO.fromFuture(_ => closeDispatcher.unsafeToFuture()).orDie) ) }