From f5d480fe9c37338956d420e74b2c5b727e9be1d8 Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Thu, 25 Jul 2024 08:32:02 +0300 Subject: [PATCH 1/6] Fix propagation of interruption from CIO to ZIO --- .../scala/zio/interop/CatsInteropSpec.scala | 24 ++++++++++++++++++- .../src/main/scala/zio/interop/package.scala | 20 +++++++++------- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala index f83c3c00..26c7e7d6 100644 --- a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala +++ b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala @@ -6,6 +6,9 @@ import zio.interop.catz.* import zio.test.* import zio.* +import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicBoolean + object CatsInteropSpec extends CatsRunnableSpec { def spec: Spec[Any, Throwable] = suite("Cats interop")( test("cats fiber wrapped in Resource can be canceled") { @@ -203,6 +206,25 @@ object CatsInteropSpec extends CatsRunnableSpec { !exception.get.getMessage.contains("Boxed Exception") && exception.get.getMessage.contains("The fiber was canceled") ) - } + }, + test("CIO propagates interruption to ZIO") { + ZIO.succeedBlocking { + val latch = new CompletableFuture[Unit]() + val ref = new AtomicBoolean(false) + val zioF: CIO[Nothing] = + (ZIO.yieldNow.as(latch.complete(())) *> ZIO.never) + .onInterrupt(ZIO.succeed(ref.set(true))) + .toEffect[CIO] + + val value = zioF.start + .productL(CIO.fromCompletableFuture(CIO(latch))) + .flatMap { (fib: cats.effect.FiberIO[Nothing]) => + fib.cancel *> CIO(ref.get()) + } + .unsafeRunSync() + + assertTrue(value) + } + } @@ TestAspect.nonFlaky(1000) ) } diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala index 046bfa0d..acab64c8 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala @@ -34,21 +34,25 @@ package object interop { @inline def toEffect[F[_], R, A](rio: RIO[R, A])(implicit R: Runtime[R], F: Async[F], trace: Trace): F[A] = F.defer { val interrupted = new AtomicBoolean(true) - F.async[Exit[Throwable, A]] { cb => + F.asyncCheckAttempt[Exit[Throwable, A]] { cb => Unsafe.unsafe { implicit unsafe => - val fiber = R.unsafe.fork { + val fiber = R.unsafe.runOrFork { signalOnNoExternalInterrupt { rio }(ZIO.succeed(interrupted.set(false))) } - fiber.unsafe - .addObserver(exit => cb(Right(exit))) - val cancelerEffect = F.delay { - val _ = fiber.interrupt + val out = fiber match { + case Left(fib) => + fib.unsafe.addObserver(exit => cb(Right(exit))) + Left(Some(F.async_[Unit] { cb => + fib.unsafe.addObserver(_ => cb(Right(()))) + fib.tellInterrupt(Cause.interrupt(fib.id)) + })) + case Right(v) => Right(v) // No need to invoke the callback, sync resumption will take place } - F.pure(Some(cancelerEffect)) - } + F.pure(out) + } }.flatMap { exit => toOutcomeThrowableOtherFiber(interrupted.get())(F.pure(_: A), exit) match { case Outcome.Succeeded(fa) => From ff8b98442949c9d38153fd487d686fafdba26bf8 Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Thu, 25 Jul 2024 08:38:10 +0300 Subject: [PATCH 2/6] Wrap method in F.delay --- .../src/main/scala/zio/interop/package.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala index acab64c8..516e732c 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala @@ -35,23 +35,23 @@ package object interop { F.defer { val interrupted = new AtomicBoolean(true) F.asyncCheckAttempt[Exit[Throwable, A]] { cb => - Unsafe.unsafe { implicit unsafe => - val fiber = R.unsafe.runOrFork { + F.delay { + implicit val unsafe: Unsafe = Unsafe.unsafe + + val out = R.unsafe.runOrFork { signalOnNoExternalInterrupt { rio }(ZIO.succeed(interrupted.set(false))) } - val out = fiber match { - case Left(fib) => - fib.unsafe.addObserver(exit => cb(Right(exit))) + out match { + case Left(fiber) => + fiber.unsafe.addObserver(exit => cb(Right(exit))) Left(Some(F.async_[Unit] { cb => - fib.unsafe.addObserver(_ => cb(Right(()))) - fib.tellInterrupt(Cause.interrupt(fib.id)) + fiber.unsafe.addObserver(_ => cb(Right(()))) + fiber.tellInterrupt(Cause.interrupt(fiber.id)) })) - case Right(v) => Right(v) // No need to invoke the callback, sync resumption will take place + case Right(v) => Right(v) // No need to invoke the callback, sync resumption will take place } - - F.pure(out) } }.flatMap { exit => toOutcomeThrowableOtherFiber(interrupted.get())(F.pure(_: A), exit) match { From 5a1b6bf1ab08d1caf3e4658e02b8c6dade6d0003 Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Thu, 25 Jul 2024 08:45:30 +0300 Subject: [PATCH 3/6] Allow interruption of interruption --- .../src/main/scala/zio/interop/package.scala | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala index 516e732c..f25998a5 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala @@ -45,10 +45,17 @@ package object interop { } out match { case Left(fiber) => - fiber.unsafe.addObserver(exit => cb(Right(exit))) - Left(Some(F.async_[Unit] { cb => - fiber.unsafe.addObserver(_ => cb(Right(()))) - fiber.tellInterrupt(Cause.interrupt(fiber.id)) + val completeCb = (exit: Exit[Throwable, A]) => cb(Right(exit)) + fiber.unsafe.addObserver(completeCb) + Left(Some(F.async[Unit] { cb => + F.delay { + val interruptCb = (_: Exit[Throwable, A]) => cb(Right(())) + fiber.unsafe.addObserver(interruptCb) + fiber.unsafe.removeObserver(completeCb) + fiber.tellInterrupt(Cause.interrupt(fiber.id)) + // Allow the interruption to be interrupted + Some(fiber.unsafe.removeObserver(interruptCb)) + } })) case Right(v) => Right(v) // No need to invoke the callback, sync resumption will take place } From 6e57f1389e3a45eba537cd14cb9a1c65da8d38a7 Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Thu, 25 Jul 2024 09:03:36 +0300 Subject: [PATCH 4/6] Fix compiling --- .../shared/src/main/scala/zio/interop/package.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala index f25998a5..382eb3f3 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala @@ -54,7 +54,7 @@ package object interop { fiber.unsafe.removeObserver(completeCb) fiber.tellInterrupt(Cause.interrupt(fiber.id)) // Allow the interruption to be interrupted - Some(fiber.unsafe.removeObserver(interruptCb)) + Some(F.delay(fiber.unsafe.removeObserver(interruptCb))) } })) case Right(v) => Right(v) // No need to invoke the callback, sync resumption will take place From cbe8bc5dd30fdc26e34adbe396cf3ae14af40fa4 Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Thu, 25 Jul 2024 09:05:42 +0300 Subject: [PATCH 5/6] Make sure to complete the interruption CB --- .../shared/src/main/scala/zio/interop/package.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala index 382eb3f3..bd083a0f 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala @@ -54,7 +54,10 @@ package object interop { fiber.unsafe.removeObserver(completeCb) fiber.tellInterrupt(Cause.interrupt(fiber.id)) // Allow the interruption to be interrupted - Some(F.delay(fiber.unsafe.removeObserver(interruptCb))) + Some(F.delay { + fiber.unsafe.removeObserver(interruptCb) + interruptCb(null) + }) } })) case Right(v) => Right(v) // No need to invoke the callback, sync resumption will take place From 0a0071cf9b59e475b010d7f6aac52104bf93ab50 Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Fri, 26 Jul 2024 19:53:12 +0300 Subject: [PATCH 6/6] Simplify logic by making interruption un-cancellable --- .../src/main/scala/zio/interop/package.scala | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala index bd083a0f..9736e6f7 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala @@ -47,18 +47,10 @@ package object interop { case Left(fiber) => val completeCb = (exit: Exit[Throwable, A]) => cb(Right(exit)) fiber.unsafe.addObserver(completeCb) - Left(Some(F.async[Unit] { cb => - F.delay { - val interruptCb = (_: Exit[Throwable, A]) => cb(Right(())) - fiber.unsafe.addObserver(interruptCb) - fiber.unsafe.removeObserver(completeCb) - fiber.tellInterrupt(Cause.interrupt(fiber.id)) - // Allow the interruption to be interrupted - Some(F.delay { - fiber.unsafe.removeObserver(interruptCb) - interruptCb(null) - }) - } + Left(Some(F.async_ { cb => + fiber.unsafe.addObserver(_ => cb(Right(()))) + fiber.unsafe.removeObserver(completeCb) + fiber.tellInterrupt(Cause.interrupt(fiber.id)) })) case Right(v) => Right(v) // No need to invoke the callback, sync resumption will take place }