From 719e4de8062f06d9ba109552eb1fb6f1c9ba4ab9 Mon Sep 17 00:00:00 2001 From: Regis Desgroppes Date: Fri, 5 Nov 2021 16:50:23 +0100 Subject: [PATCH] Allow to pass an `Executor` when converting `Future`s This is to circumvent #179. --- .../scala/compat/java8/FutureConverters.scala | 48 +++++++++++++++++-- .../java8/FutureConvertersImpl.scala | 30 ++++++------ 2 files changed, 61 insertions(+), 17 deletions(-) diff --git a/src/main/scala/scala/compat/java8/FutureConverters.scala b/src/main/scala/scala/compat/java8/FutureConverters.scala index 7f75ee0..f65a960 100644 --- a/src/main/scala/scala/compat/java8/FutureConverters.scala +++ b/src/main/scala/scala/compat/java8/FutureConverters.scala @@ -17,7 +17,7 @@ import scala.language.implicitConversions import scala.concurrent.java8.FuturesConvertersImpl._ import scala.concurrent.java8.FuturesConvertersImplCompat._ import scala.concurrent.{ Future, Promise, ExecutionContext, ExecutionContextExecutorService, ExecutionContextExecutor } -import java.util.concurrent.{ CompletionStage, Executor, ExecutorService } +import java.util.concurrent.{ CompletionStage, Executor, ExecutorService, ForkJoinPool } import java.util.function.Consumer /** @@ -59,16 +59,38 @@ object FutureConverters { * transformations to their asynchronous counterparts, i.e. * thenRun will internally call thenRunAsync. * + * Callbacks will run on ForkJoinPool.commonPool(), unless it does not + * support a parallelism level of at least two, in which case a new Thread + * is used. + * * @param f The Scala Future which may eventually supply the completion for * the returned CompletionStage * @return a CompletionStage that runs all callbacks asynchronously and does * not support the CompletableFuture interface */ - def toJava[T](f: Future[T]): CompletionStage[T] = { + def toJava[T](f: Future[T]): CompletionStage[T] = toJava(f, ForkJoinPool.commonPool()) + + /** + * Returns a CompletionStage that will be completed with the same value or + * exception as the given Scala Future when that completes. Since the Future is a read-only + * representation, this CompletionStage does not support the + * toCompletableFuture method. The semantics of Scala Future + * demand that all callbacks are invoked asynchronously by default, therefore + * the returned CompletionStage routes all calls to synchronous + * transformations to their asynchronous counterparts, i.e. + * thenRun will internally call thenRunAsync. + * + * @param f The Scala Future which may eventually supply the completion for + * the returned CompletionStage + * @param e The Java Executor onto which schedule the callbacks + * @return a CompletionStage that runs all callbacks asynchronously and does + * not support the CompletableFuture interface + */ + def toJava[T](f: Future[T], e: Executor): CompletionStage[T] = { f match { case p: P[T @unchecked] => p.wrapped case _ => - val cf = new CF[T](f) + val cf = new CF[T](f, e) implicit val ec = InternalCallbackExecutor f onComplete cf cf @@ -189,10 +211,30 @@ object FutureConverters { * transformations to their asynchronous counterparts, i.e. * thenRun will internally call thenRunAsync. * + * Callbacks will run on ForkJoinPool.commonPool(), unless it does not + * support a parallelism level of at least two, in which case a new Thread + * is used. + * * @return a CompletionStage that runs all callbacks asynchronously and does * not support the CompletableFuture interface */ def toJava: CompletionStage[T] = FutureConverters.toJava(__self) + + /** + * Returns a CompletionStage that will be completed with the same value or + * exception as the given Scala Future when that completes. Since the Future is a read-only + * representation, this CompletionStage does not support the + * toCompletableFuture method. The semantics of Scala Future + * demand that all callbacks are invoked asynchronously by default, therefore + * the returned CompletionStage routes all calls to synchronous + * transformations to their asynchronous counterparts, i.e. + * thenRun will internally call thenRunAsync. + * + * @param e The Java Executor onto which schedule the callbacks + * @return a CompletionStage that runs all callbacks asynchronously and does + * not support the CompletableFuture interface + */ + def toJava(e: Executor): CompletionStage[T] = FutureConverters.toJava(__self, e) } implicit def CompletionStageOps[T](cs: CompletionStage[T]): CompletionStageOps[T] = new CompletionStageOps(cs) diff --git a/src/main/scala/scala/concurrent/java8/FutureConvertersImpl.scala b/src/main/scala/scala/concurrent/java8/FutureConvertersImpl.scala index 3099d6e..9e0c3bf 100644 --- a/src/main/scala/scala/concurrent/java8/FutureConvertersImpl.scala +++ b/src/main/scala/scala/concurrent/java8/FutureConvertersImpl.scala @@ -23,7 +23,9 @@ import scala.util.{Failure, Success, Try} // TODO: make this private[scala] when genjavadoc allows for that. object FuturesConvertersImpl { - class CF[T](val wrapped: Future[T]) extends CompletableFuture[T] with (Try[T] => Unit) { + class CF[T](val wrapped: Future[T], executor: Executor) extends CompletableFuture[T] with (Try[T] => Unit) { + def this(wrapped: Future[T]) = this(wrapped, ForkJoinPool.commonPool()) + override def apply(t: Try[T]): Unit = t match { case Success(v) => complete(v) case Failure(e) => completeExceptionally(e) @@ -32,29 +34,29 @@ object FuturesConvertersImpl { /* * Ensure that completions of this future cannot hold the Scala Future’s completer hostage. */ - override def thenApply[U](fn: JF[_ >: T, _ <: U]): CompletableFuture[U] = thenApplyAsync(fn) + override def thenApply[U](fn: JF[_ >: T, _ <: U]): CompletableFuture[U] = thenApplyAsync(fn, executor) - override def thenAccept(fn: Consumer[_ >: T]): CompletableFuture[Void] = thenAcceptAsync(fn) + override def thenAccept(fn: Consumer[_ >: T]): CompletableFuture[Void] = thenAcceptAsync(fn, executor) - override def thenRun(fn: Runnable): CompletableFuture[Void] = thenRunAsync(fn) + override def thenRun(fn: Runnable): CompletableFuture[Void] = thenRunAsync(fn, executor) - override def thenCombine[U, V](cs: CompletionStage[_ <: U], fn: BiFunction[_ >: T, _ >: U, _ <: V]): CompletableFuture[V] = thenCombineAsync(cs, fn) + override def thenCombine[U, V](cs: CompletionStage[_ <: U], fn: BiFunction[_ >: T, _ >: U, _ <: V]): CompletableFuture[V] = thenCombineAsync(cs, fn, executor) - override def thenAcceptBoth[U](cs: CompletionStage[_ <: U], fn: BiConsumer[_ >: T, _ >: U]): CompletableFuture[Void] = thenAcceptBothAsync(cs, fn) + override def thenAcceptBoth[U](cs: CompletionStage[_ <: U], fn: BiConsumer[_ >: T, _ >: U]): CompletableFuture[Void] = thenAcceptBothAsync(cs, fn, executor) - override def runAfterBoth(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterBothAsync(cs, fn) + override def runAfterBoth(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterBothAsync(cs, fn, executor) - override def applyToEither[U](cs: CompletionStage[_ <: T], fn: JF[_ >: T, U]): CompletableFuture[U] = applyToEitherAsync(cs, fn) + override def applyToEither[U](cs: CompletionStage[_ <: T], fn: JF[_ >: T, U]): CompletableFuture[U] = applyToEitherAsync(cs, fn, executor) - override def acceptEither(cs: CompletionStage[_ <: T], fn: Consumer[_ >: T]): CompletableFuture[Void] = acceptEitherAsync(cs, fn) + override def acceptEither(cs: CompletionStage[_ <: T], fn: Consumer[_ >: T]): CompletableFuture[Void] = acceptEitherAsync(cs, fn, executor) - override def runAfterEither(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterEitherAsync(cs, fn) + override def runAfterEither(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterEitherAsync(cs, fn, executor) - override def thenCompose[U](fn: JF[_ >: T, _ <: CompletionStage[U]]): CompletableFuture[U] = thenComposeAsync(fn) + override def thenCompose[U](fn: JF[_ >: T, _ <: CompletionStage[U]]): CompletableFuture[U] = thenComposeAsync(fn, executor) - override def whenComplete(fn: BiConsumer[_ >: T, _ >: Throwable]): CompletableFuture[T] = whenCompleteAsync(fn) + override def whenComplete(fn: BiConsumer[_ >: T, _ >: Throwable]): CompletableFuture[T] = whenCompleteAsync(fn, executor) - override def handle[U](fn: BiFunction[_ >: T, Throwable, _ <: U]): CompletableFuture[U] = handleAsync(fn) + override def handle[U](fn: BiFunction[_ >: T, Throwable, _ <: U]): CompletableFuture[U] = handleAsync(fn, executor) override def exceptionally(fn: JF[Throwable, _ <: T]): CompletableFuture[T] = { val cf = new CompletableFuture[T] @@ -71,7 +73,7 @@ object FuturesConvertersImpl { if (n ne this) cf.complete(n.asInstanceOf[T]) } } - }) + }, executor) cf }