diff --git a/actor-tests/src/test/java/org/apache/pekko/dispatch/JavaFutureTests.java b/actor-tests/src/test/java/org/apache/pekko/dispatch/JavaFutureTests.java index 6af8c2b399c..d5f9b193b4c 100644 --- a/actor-tests/src/test/java/org/apache/pekko/dispatch/JavaFutureTests.java +++ b/actor-tests/src/test/java/org/apache/pekko/dispatch/JavaFutureTests.java @@ -36,6 +36,7 @@ import org.apache.pekko.testkit.PekkoSpec; +@SuppressWarnings("deprecation") public class JavaFutureTests extends JUnitSuite { @ClassRule diff --git a/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java b/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java index aa75a4f3a44..4f568facf75 100644 --- a/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java +++ b/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java @@ -19,6 +19,7 @@ import org.apache.pekko.testkit.PekkoSpec; import org.apache.pekko.testkit.TestProbe; import org.apache.pekko.util.Timeout; +import org.apache.pekko.util.FutureConverters; import org.junit.ClassRule; import org.junit.Test; import org.scalatestplus.junit.JUnitSuite; @@ -317,19 +318,21 @@ public void testCSRetry() throws Exception { } @Test(expected = IllegalStateException.class) - public void testAfterFailedCallable() throws Exception { - Callable> failedCallable = - () -> Futures.failed(new IllegalStateException("Illegal!")); + public void testAfterFailedCallable() throws Throwable { + Callable> failedCallable = + () -> Futures.failedCompletionStage(new IllegalStateException("Illegal!")); - Future delayedFuture = + CompletionStage delayedFuture = Patterns.after( - scala.concurrent.duration.Duration.create(200, "millis"), - system.scheduler(), - ec, + Duration.ofMillis(200), + system, failedCallable); - Future resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec); - Await.result(resultFuture, scala.concurrent.duration.FiniteDuration.apply(3, SECONDS)); + try { + delayedFuture.toCompletableFuture().get(3, SECONDS); + } catch (ExecutionException e) { + throw e.getCause(); + } } @Test(expected = IllegalStateException.class) @@ -340,7 +343,7 @@ public void testAfterFailedFuture() throws Exception { scala.concurrent.duration.Duration.create(200, "millis"), system.scheduler(), ec, - () -> Futures.failed(new IllegalStateException("Illegal!"))); + () -> FutureConverters.asScala(Futures.failedCompletionStage(new IllegalStateException("Illegal!")))); Future resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec); Await.result(resultFuture, FiniteDuration.apply(3, SECONDS)); @@ -391,7 +394,7 @@ public void testAfterFiniteDuration() throws Exception { ec, () -> Futures.successful("world")); - Future immediateFuture = Futures.future(() -> expected, ec); + Future immediateFuture = FutureConverters.asScala(CompletableFuture.completedFuture(expected)); Future resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture, immediateFuture), ec); diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/Future.scala b/actor/src/main/scala/org/apache/pekko/dispatch/Future.scala index 3099bcf6f93..5e894837e8b 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/Future.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/Future.scala @@ -130,6 +130,7 @@ object Futures { * @param executor the execution context on which the future is run * @return the `Future` holding the result of the computation */ + @deprecated("Use CompletionStage / CompletableFuture-based APIs instead", "1.1.0") def future[T](body: Callable[T], executor: ExecutionContext): Future[T] = Future(body.call)(executor) /** @@ -142,6 +143,7 @@ object Futures { /** * creates an already completed Promise with the specified exception */ + @deprecated("Use CompletionStage / CompletableFuture-based APIs instead", "1.1.0") def failed[T](exception: Throwable): Future[T] = Future.failed(exception) /** @@ -151,6 +153,8 @@ object Futures { /** * Creates an already completed CompletionStage with the specified exception + * + * Note: prefer CompletableFuture.failedStage(ex) from Java 9 onwards */ def failedCompletionStage[T](ex: Throwable): CompletionStage[T] = { val f = CompletableFuture.completedFuture[T](null.asInstanceOf[T]) @@ -172,6 +176,7 @@ object Futures { /** * Returns a Future to the result of the first future in the list that is completed */ + //@deprecated("Use CompletableFuture.anyOf instead", "1.1.0") def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], executor: ExecutionContext): Future[T] = Future.firstCompletedOf(futures.asScala)(executor) diff --git a/docs/src/test/java/jdocs/future/ActorWithFuture.java b/docs/src/test/java/jdocs/future/ActorWithFuture.java index 9600b44011b..19553f1a1ce 100644 --- a/docs/src/test/java/jdocs/future/ActorWithFuture.java +++ b/docs/src/test/java/jdocs/future/ActorWithFuture.java @@ -15,11 +15,12 @@ // #context-dispatcher import org.apache.pekko.actor.AbstractActor; -import org.apache.pekko.dispatch.Futures; + +import java.util.concurrent.CompletableFuture; public class ActorWithFuture extends AbstractActor { ActorWithFuture() { - Futures.future(() -> "hello", getContext().dispatcher()); + CompletableFuture.supplyAsync(() -> "hello", getContext().dispatcher()); } @Override diff --git a/docs/src/test/java/jdocs/future/FutureDocTest.java b/docs/src/test/java/jdocs/future/FutureDocTest.java index 5627b1d4e12..f6059f4dd52 100644 --- a/docs/src/test/java/jdocs/future/FutureDocTest.java +++ b/docs/src/test/java/jdocs/future/FutureDocTest.java @@ -14,27 +14,18 @@ package jdocs.future; import org.apache.pekko.actor.typed.ActorSystem; -import org.apache.pekko.dispatch.Futures; import org.apache.pekko.pattern.Patterns; import org.apache.pekko.testkit.PekkoJUnitActorSystemResource; import org.apache.pekko.testkit.PekkoSpec; import org.apache.pekko.util.Timeout; -import org.apache.pekko.util.FutureConverters; import jdocs.AbstractJavaTest; import org.junit.ClassRule; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; import java.time.Duration; -import java.util.Arrays; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; +import java.util.concurrent.*; import static org.apache.pekko.actor.typed.javadsl.Adapter.toTyped; -import static org.apache.pekko.dispatch.Futures.future; // #imports // #imports @@ -50,7 +41,7 @@ public class FutureDocTest extends AbstractJavaTest { @Test(expected = java.util.concurrent.CompletionException.class) public void useAfter() throws Exception { - final ExecutionContext ec = system.executionContext(); + final Executor ex = system.executionContext(); // #after CompletionStage failWithException = CompletableFuture.supplyAsync( @@ -60,18 +51,21 @@ public void useAfter() throws Exception { CompletionStage delayed = Patterns.after(Duration.ofMillis(200), system, () -> failWithException); // #after - Future future = - future( + CompletionStage completionStage = + CompletableFuture.supplyAsync( () -> { - Thread.sleep(1000); - return "foo"; + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return "foo"; }, - ec); - Future result = - Futures.firstCompletedOf( - Arrays.>asList(future, FutureConverters.asScala(delayed)), ec); + ex); + CompletableFuture result = + CompletableFuture.anyOf(completionStage.toCompletableFuture(), delayed.toCompletableFuture()); Timeout timeout = Timeout.create(Duration.ofSeconds(2)); - Await.result(result, timeout.duration()); + result.toCompletableFuture().get(2, SECONDS); } @Test diff --git a/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java b/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java index b951163911a..72b83f51b35 100644 --- a/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java +++ b/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java @@ -33,6 +33,9 @@ import org.junit.runner.RunWith; import org.scalatestplus.junit.JUnitRunner; import scala.concurrent.Future; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.function.Consumer; import org.iq80.leveldb.util.FileUtils; import java.util.Optional; @@ -81,45 +84,45 @@ public Receive createReceive() { class MySnapshotStore extends SnapshotStore { @Override - public Future> doLoadAsync( + public CompletionStage> doLoadAsync( String persistenceId, SnapshotSelectionCriteria criteria) { return null; } @Override - public Future doSaveAsync(SnapshotMetadata metadata, Object snapshot) { + public CompletionStage doSaveAsync(SnapshotMetadata metadata, Object snapshot) { return null; } @Override - public Future doDeleteAsync(SnapshotMetadata metadata) { - return Futures.successful(null); + public CompletionStage doDeleteAsync(SnapshotMetadata metadata) { + return CompletableFuture.completedFuture(null); } @Override - public Future doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) { - return Futures.successful(null); + public CompletionStage doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) { + return CompletableFuture.completedFuture(null); } } class MyAsyncJournal extends AsyncWriteJournal { // #sync-journal-plugin-api @Override - public Future>> doAsyncWriteMessages( + public CompletionStage>> doAsyncWriteMessages( Iterable messages) { try { Iterable> result = new ArrayList>(); // blocking call here... // result.add(..) - return Futures.successful(result); + return CompletableFuture.completedFuture(result); } catch (Exception e) { - return Futures.failed(e); + return Futures.failedCompletionStage(e); } } // #sync-journal-plugin-api @Override - public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) { + public CompletionStage doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) { return null; } diff --git a/docs/src/test/java/jdocs/stream/FlowDocTest.java b/docs/src/test/java/jdocs/stream/FlowDocTest.java index 82f04a08bab..56cbbf28b9d 100644 --- a/docs/src/test/java/jdocs/stream/FlowDocTest.java +++ b/docs/src/test/java/jdocs/stream/FlowDocTest.java @@ -19,7 +19,6 @@ import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.actor.Cancellable; -import org.apache.pekko.dispatch.Futures; import org.apache.pekko.japi.Pair; import org.apache.pekko.stream.*; import org.apache.pekko.stream.javadsl.*; @@ -158,8 +157,8 @@ public void creatingSourcesSinks() throws Exception { list.add(3); Source.from(list); - // Create a source form a Future - Source.future(Futures.successful("Hello Streams!")); + // Create a source form a CompletionStage + Source.completionStage(CompletableFuture.completedFuture("Hello Streams!")); // Create a source from a single element Source.single("only one element"); diff --git a/persistence/src/main/java/org/apache/pekko/persistence/journal/japi/AsyncWritePlugin.java b/persistence/src/main/java/org/apache/pekko/persistence/journal/japi/AsyncWritePlugin.java index 3eba7b5cde0..639ffbd9083 100644 --- a/persistence/src/main/java/org/apache/pekko/persistence/journal/japi/AsyncWritePlugin.java +++ b/persistence/src/main/java/org/apache/pekko/persistence/journal/japi/AsyncWritePlugin.java @@ -14,8 +14,7 @@ package org.apache.pekko.persistence.journal.japi; import java.util.Optional; - -import scala.concurrent.Future; +import java.util.concurrent.CompletionStage; import org.apache.pekko.persistence.*; @@ -73,7 +72,7 @@ interface AsyncWritePlugin { * *

This call is protected with a circuit-breaker. */ - Future>> doAsyncWriteMessages(Iterable messages); + CompletionStage>> doAsyncWriteMessages(Iterable messages); /** * Java API, Plugin API: synchronously deletes all persistent messages up to `toSequenceNr`. @@ -82,6 +81,6 @@ interface AsyncWritePlugin { * * @see AsyncRecoveryPlugin */ - Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr); + CompletionStage doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr); // #async-write-plugin-api } diff --git a/persistence/src/main/java/org/apache/pekko/persistence/snapshot/japi/SnapshotStorePlugin.java b/persistence/src/main/java/org/apache/pekko/persistence/snapshot/japi/SnapshotStorePlugin.java index 20313bb3184..047ce99bbfd 100644 --- a/persistence/src/main/java/org/apache/pekko/persistence/snapshot/japi/SnapshotStorePlugin.java +++ b/persistence/src/main/java/org/apache/pekko/persistence/snapshot/japi/SnapshotStorePlugin.java @@ -19,6 +19,7 @@ import scala.concurrent.Future; import java.util.Optional; +import java.util.concurrent.CompletionStage; interface SnapshotStorePlugin { // #snapshot-store-plugin-api @@ -28,7 +29,7 @@ interface SnapshotStorePlugin { * @param persistenceId id of the persistent actor. * @param criteria selection criteria for loading. */ - Future> doLoadAsync( + CompletionStage> doLoadAsync( String persistenceId, SnapshotSelectionCriteria criteria); /** @@ -37,14 +38,14 @@ Future> doLoadAsync( * @param metadata snapshot metadata. * @param snapshot snapshot. */ - Future doSaveAsync(SnapshotMetadata metadata, Object snapshot); + CompletionStage doSaveAsync(SnapshotMetadata metadata, Object snapshot); /** * Java API, Plugin API: deletes the snapshot identified by `metadata`. * * @param metadata snapshot metadata. */ - Future doDeleteAsync(SnapshotMetadata metadata); + CompletionStage doDeleteAsync(SnapshotMetadata metadata); /** * Java API, Plugin API: deletes all snapshots matching `criteria`. @@ -52,6 +53,6 @@ Future> doLoadAsync( * @param persistenceId id of the persistent actor. * @param criteria selection criteria for deleting. */ - Future doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria); + CompletionStage doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria); // #snapshot-store-plugin-api } diff --git a/persistence/src/main/scala/org/apache/pekko/persistence/journal/japi/AsyncWriteJournal.scala b/persistence/src/main/scala/org/apache/pekko/persistence/journal/japi/AsyncWriteJournal.scala index ef74c5302a7..7fbb6e76059 100644 --- a/persistence/src/main/scala/org/apache/pekko/persistence/journal/japi/AsyncWriteJournal.scala +++ b/persistence/src/main/scala/org/apache/pekko/persistence/journal/japi/AsyncWriteJournal.scala @@ -21,6 +21,7 @@ import scala.util.Try import org.apache.pekko import pekko.persistence._ import pekko.persistence.journal.{ AsyncWriteJournal => SAsyncWriteJournal } +import pekko.util.FutureConverters._ import pekko.util.ccompat._ import pekko.util.ccompat.JavaConverters._ @@ -33,7 +34,7 @@ abstract class AsyncWriteJournal extends AsyncRecovery with SAsyncWriteJournal w import context.dispatcher final def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = - doAsyncWriteMessages(messages.asJava).map { results => + doAsyncWriteMessages(messages.asJava).asScala.map { results => results.asScala.iterator .map { r => if (r.isPresent) Failure(r.get) @@ -42,6 +43,7 @@ abstract class AsyncWriteJournal extends AsyncRecovery with SAsyncWriteJournal w .to(immutable.IndexedSeq) } - final def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long) = - doAsyncDeleteMessagesTo(persistenceId, toSequenceNr).map(_ => ()) + final def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = { + doAsyncDeleteMessagesTo(persistenceId, toSequenceNr).asScala.map(_ => ()) + } } diff --git a/persistence/src/main/scala/org/apache/pekko/persistence/snapshot/japi/SnapshotStore.scala b/persistence/src/main/scala/org/apache/pekko/persistence/snapshot/japi/SnapshotStore.scala index ca7f1e9d9fa..fd35c6af74b 100644 --- a/persistence/src/main/scala/org/apache/pekko/persistence/snapshot/japi/SnapshotStore.scala +++ b/persistence/src/main/scala/org/apache/pekko/persistence/snapshot/japi/SnapshotStore.scala @@ -16,6 +16,7 @@ package org.apache.pekko.persistence.snapshot.japi import scala.concurrent.Future import org.apache.pekko +import pekko.util.FutureConverters._ import pekko.japi.Util._ import pekko.persistence._ import pekko.persistence.snapshot.{ SnapshotStore => SSnapshotStore } @@ -29,15 +30,15 @@ abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin { override final def loadAsync( persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = - doLoadAsync(persistenceId, criteria).map(option) + doLoadAsync(persistenceId, criteria).asScala.map(option) override final def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = - doSaveAsync(metadata, snapshot).map(_ => ()) + doSaveAsync(metadata, snapshot).asScala.map(_ => ()) override final def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = - doDeleteAsync(metadata).map(_ => ()) + doDeleteAsync(metadata).asScala.map(_ => ()) override final def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = - doDeleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria).map(_ => ()) + doDeleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria).asScala.map(_ => ()) }