diff --git a/jvm-loom-addon/src/test/scala/cpsloomtest/TestPE.scala b/jvm-loom-addon/src/test/scala/cpsloomtest/TestPE.scala index ea896b83..03199039 100644 --- a/jvm-loom-addon/src/test/scala/cpsloomtest/TestPE.scala +++ b/jvm-loom-addon/src/test/scala/cpsloomtest/TestPE.scala @@ -6,7 +6,7 @@ import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, ConcurrentLin import scala.util.* import scala.collection.mutable.{Map, Queue} import scala.collection.concurrent.TrieMap -import scala.concurrent.{Await, BlockContext, CanAwait, blocking} +import scala.concurrent.{Await, BlockContext, CanAwait, ExecutionException, blocking} import org.junit.{Ignore, Test} import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} @@ -39,20 +39,32 @@ object PoorManEffect { def error[T](e: Throwable): PoorManEffect[T] = Error(e) def map[A, B](fa: PoorManEffect[A])(f: A => B): PoorManEffect[B] = fa match - case Pure(t) => Pure(f(t)) + case Pure(t) => + inTry(Pure(f(t))) case Error(e) => Error(e) - case Thunk(th) => Thunk((d) => map(th(d))(f)) + case Thunk(th) => Thunk((d) => map(inTry(th(d)))(f)) def flatMap[A, B](fa: PoorManEffect[A])(f: A => PoorManEffect[B]): PoorManEffect[B] = fa match - case Pure(t) => f(t) + case Pure(t) => + inTry(f(t)) case Error(e) => Error(e) - case Thunk(th) => Thunk(d => flatMap(th(d))(f)) + case Thunk(th) => Thunk(d => flatMap(inTry(th(d)))(f)) def flatMapTry[A, B](fa: PoorManEffect[A])(f: Try[A] => PoorManEffect[B]): PoorManEffect[B] = fa match - case Pure(t) => f(Success(t)) - case Error(e) => f(Failure(e)) - case Thunk(th) => Thunk(d => flatMapTry(th(d))(f)) + case Pure(t) => + inTry(f(Success(t))) + case Error(e) => + inTry(f(Failure(e))) + case Thunk(th) => + Thunk(d => flatMapTry(inTry(th(d)))(f)) + + def inTry[X](op: =>PoorManEffect[X]): PoorManEffect[X] = + try + op + catch + case NonFatal(ex) => + Error(ex) } @@ -142,9 +154,12 @@ object PoorManEffect { } } } - val retval = thunk - submitWaiter.synchronized { - submitWaiter.notifyAll() + val retval = try { + thunk + } finally { + submitWaiter.synchronized { + submitWaiter.notifyAll() + } } retval } @@ -159,7 +174,8 @@ object PoorManEffect { val v = runQueue.dequeue() v.pe match case Pure(t) => setWaiterResult(v.id, Success(t)) - case Error(e) => setWaiterResult(v.id, Failure(e)) + case Error(e) => + setWaiterResult(v.id, Failure(e)) case Thunk(th) => // here we can have call of block-context. nThunksInProcess.incrementAndGet() @@ -203,7 +219,12 @@ object PoorManEffect { val runner = new Runner() val id0 = runner.submit[A](pe) val resultFuture = runner.listenSubmitted[A](id0) - runner.process() + try + runner.process() + catch + case NonFatal(ex) => + println(s"Impossible, excwption from process: ${ex.getMessage}") + throw ex runner.checkSubmitted[A](id0) match case Some(Success(t)) => t.asInstanceOf[A] case Some(Failure(e)) => throw e @@ -220,7 +241,11 @@ object PoorManEffect { throw new RuntimeException(s"process finished, but no result for id ${id0}") } else { blocking { - resultFuture.get() + try + resultFuture.get() + catch + case ex: ExecutionException => + throw ex.getCause() } } @@ -244,8 +269,13 @@ class PoorManEffectRuntimeAwait(rt:PoorManEffect.RunAPI) extends CpsRuntimeAwait val cf = rt.listenSubmitted[A](id) // here execution of main loop of runner.process will be moved to other virtual thread. blocking{ - val retval = cf.get() - rt.forgetSubmitted(id) + val retval = try + cf.get() + catch + case ex: ExecutionException => + throw ex.getCause() + finally + rt.forgetSubmitted(id) retval } } @@ -314,7 +344,7 @@ class TestPE { } @Test - def testPMECatchExceptionFromAwait(): Int = { + def testPMECatchExceptionFromAwait(): Unit = { val c = async[PoorManEffect] { val list0 = MyList.create(1, 2, 3, 4, 5) try { @@ -325,6 +355,10 @@ class TestPE { case ex: RuntimeException => assert(ex.getMessage() == "test") 2 + case ex: Throwable => + println("Catch non-tuntime exception") + ex.printStackTrace() + throw ex } } val r = PoorManEffect.run(c)