From 59d0af48306f1d18fab5a1799fb6b1a29bffeac6 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Mon, 29 Jul 2024 23:16:42 +0200 Subject: [PATCH] Don't use Scala Futures in Java APIs Sketching out #1417 - incomplete and notably not bothering with binary compatibility yet, just to illustrate the idea. --- .../org/apache/pekko/dispatch/Future.scala | 5 +++ .../java/jdocs/future/ActorWithFuture.java | 5 +-- .../test/java/jdocs/future/FutureDocTest.java | 34 ++++++++----------- .../LambdaPersistencePluginDocTest.java | 23 +++++++------ .../test/java/jdocs/stream/FlowDocTest.java | 5 ++- .../journal/japi/AsyncWritePlugin.java | 7 ++-- .../snapshot/japi/SnapshotStorePlugin.java | 9 ++--- .../journal/japi/AsyncWriteJournal.scala | 8 +++-- .../snapshot/japi/SnapshotStore.scala | 9 ++--- 9 files changed, 55 insertions(+), 50 deletions(-) diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/Future.scala b/actor/src/main/scala/org/apache/pekko/dispatch/Future.scala index 3099bcf6f93..86e269d4f63 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/Future.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/Future.scala @@ -130,6 +130,7 @@ object Futures { * @param executor the execution context on which the future is run * @return the `Future` holding the result of the computation */ + //@deprecated("Use CompletionStage / CompletableFuture-based APIs instead", "1.1.0") def future[T](body: Callable[T], executor: ExecutionContext): Future[T] = Future(body.call)(executor) /** @@ -142,6 +143,7 @@ object Futures { /** * creates an already completed Promise with the specified exception */ + //@deprecated("Use CompletionStage / CompletableFuture-based APIs instead", "1.1.0") def failed[T](exception: Throwable): Future[T] = Future.failed(exception) /** @@ -151,6 +153,8 @@ object Futures { /** * Creates an already completed CompletionStage with the specified exception + * + * Note: prefer CompletableFuture.failedStage(ex) from Java 9 onwards */ def failedCompletionStage[T](ex: Throwable): CompletionStage[T] = { val f = CompletableFuture.completedFuture[T](null.asInstanceOf[T]) @@ -172,6 +176,7 @@ object Futures { /** * Returns a Future to the result of the first future in the list that is completed */ + //@deprecated("Use CompletableFuture.anyOf instead", "1.1.0") def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], executor: ExecutionContext): Future[T] = Future.firstCompletedOf(futures.asScala)(executor) diff --git a/docs/src/test/java/jdocs/future/ActorWithFuture.java b/docs/src/test/java/jdocs/future/ActorWithFuture.java index 9600b44011b..19553f1a1ce 100644 --- a/docs/src/test/java/jdocs/future/ActorWithFuture.java +++ b/docs/src/test/java/jdocs/future/ActorWithFuture.java @@ -15,11 +15,12 @@ // #context-dispatcher import org.apache.pekko.actor.AbstractActor; -import org.apache.pekko.dispatch.Futures; + +import java.util.concurrent.CompletableFuture; public class ActorWithFuture extends AbstractActor { ActorWithFuture() { - Futures.future(() -> "hello", getContext().dispatcher()); + CompletableFuture.supplyAsync(() -> "hello", getContext().dispatcher()); } @Override diff --git a/docs/src/test/java/jdocs/future/FutureDocTest.java b/docs/src/test/java/jdocs/future/FutureDocTest.java index 5627b1d4e12..f6059f4dd52 100644 --- a/docs/src/test/java/jdocs/future/FutureDocTest.java +++ b/docs/src/test/java/jdocs/future/FutureDocTest.java @@ -14,27 +14,18 @@ package jdocs.future; import org.apache.pekko.actor.typed.ActorSystem; -import org.apache.pekko.dispatch.Futures; import org.apache.pekko.pattern.Patterns; import org.apache.pekko.testkit.PekkoJUnitActorSystemResource; import org.apache.pekko.testkit.PekkoSpec; import org.apache.pekko.util.Timeout; -import org.apache.pekko.util.FutureConverters; import jdocs.AbstractJavaTest; import org.junit.ClassRule; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; import java.time.Duration; -import java.util.Arrays; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; +import java.util.concurrent.*; import static org.apache.pekko.actor.typed.javadsl.Adapter.toTyped; -import static org.apache.pekko.dispatch.Futures.future; // #imports // #imports @@ -50,7 +41,7 @@ public class FutureDocTest extends AbstractJavaTest { @Test(expected = java.util.concurrent.CompletionException.class) public void useAfter() throws Exception { - final ExecutionContext ec = system.executionContext(); + final Executor ex = system.executionContext(); // #after CompletionStage failWithException = CompletableFuture.supplyAsync( @@ -60,18 +51,21 @@ public void useAfter() throws Exception { CompletionStage delayed = Patterns.after(Duration.ofMillis(200), system, () -> failWithException); // #after - Future future = - future( + CompletionStage completionStage = + CompletableFuture.supplyAsync( () -> { - Thread.sleep(1000); - return "foo"; + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return "foo"; }, - ec); - Future result = - Futures.firstCompletedOf( - Arrays.>asList(future, FutureConverters.asScala(delayed)), ec); + ex); + CompletableFuture result = + CompletableFuture.anyOf(completionStage.toCompletableFuture(), delayed.toCompletableFuture()); Timeout timeout = Timeout.create(Duration.ofSeconds(2)); - Await.result(result, timeout.duration()); + result.toCompletableFuture().get(2, SECONDS); } @Test diff --git a/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java b/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java index b951163911a..72b83f51b35 100644 --- a/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java +++ b/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java @@ -33,6 +33,9 @@ import org.junit.runner.RunWith; import org.scalatestplus.junit.JUnitRunner; import scala.concurrent.Future; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.function.Consumer; import org.iq80.leveldb.util.FileUtils; import java.util.Optional; @@ -81,45 +84,45 @@ public Receive createReceive() { class MySnapshotStore extends SnapshotStore { @Override - public Future> doLoadAsync( + public CompletionStage> doLoadAsync( String persistenceId, SnapshotSelectionCriteria criteria) { return null; } @Override - public Future doSaveAsync(SnapshotMetadata metadata, Object snapshot) { + public CompletionStage doSaveAsync(SnapshotMetadata metadata, Object snapshot) { return null; } @Override - public Future doDeleteAsync(SnapshotMetadata metadata) { - return Futures.successful(null); + public CompletionStage doDeleteAsync(SnapshotMetadata metadata) { + return CompletableFuture.completedFuture(null); } @Override - public Future doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) { - return Futures.successful(null); + public CompletionStage doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) { + return CompletableFuture.completedFuture(null); } } class MyAsyncJournal extends AsyncWriteJournal { // #sync-journal-plugin-api @Override - public Future>> doAsyncWriteMessages( + public CompletionStage>> doAsyncWriteMessages( Iterable messages) { try { Iterable> result = new ArrayList>(); // blocking call here... // result.add(..) - return Futures.successful(result); + return CompletableFuture.completedFuture(result); } catch (Exception e) { - return Futures.failed(e); + return Futures.failedCompletionStage(e); } } // #sync-journal-plugin-api @Override - public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) { + public CompletionStage doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) { return null; } diff --git a/docs/src/test/java/jdocs/stream/FlowDocTest.java b/docs/src/test/java/jdocs/stream/FlowDocTest.java index 82f04a08bab..56cbbf28b9d 100644 --- a/docs/src/test/java/jdocs/stream/FlowDocTest.java +++ b/docs/src/test/java/jdocs/stream/FlowDocTest.java @@ -19,7 +19,6 @@ import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.actor.Cancellable; -import org.apache.pekko.dispatch.Futures; import org.apache.pekko.japi.Pair; import org.apache.pekko.stream.*; import org.apache.pekko.stream.javadsl.*; @@ -158,8 +157,8 @@ public void creatingSourcesSinks() throws Exception { list.add(3); Source.from(list); - // Create a source form a Future - Source.future(Futures.successful("Hello Streams!")); + // Create a source form a CompletionStage + Source.completionStage(CompletableFuture.completedFuture("Hello Streams!")); // Create a source from a single element Source.single("only one element"); diff --git a/persistence/src/main/java/org/apache/pekko/persistence/journal/japi/AsyncWritePlugin.java b/persistence/src/main/java/org/apache/pekko/persistence/journal/japi/AsyncWritePlugin.java index 3eba7b5cde0..639ffbd9083 100644 --- a/persistence/src/main/java/org/apache/pekko/persistence/journal/japi/AsyncWritePlugin.java +++ b/persistence/src/main/java/org/apache/pekko/persistence/journal/japi/AsyncWritePlugin.java @@ -14,8 +14,7 @@ package org.apache.pekko.persistence.journal.japi; import java.util.Optional; - -import scala.concurrent.Future; +import java.util.concurrent.CompletionStage; import org.apache.pekko.persistence.*; @@ -73,7 +72,7 @@ interface AsyncWritePlugin { * *

This call is protected with a circuit-breaker. */ - Future>> doAsyncWriteMessages(Iterable messages); + CompletionStage>> doAsyncWriteMessages(Iterable messages); /** * Java API, Plugin API: synchronously deletes all persistent messages up to `toSequenceNr`. @@ -82,6 +81,6 @@ interface AsyncWritePlugin { * * @see AsyncRecoveryPlugin */ - Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr); + CompletionStage doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr); // #async-write-plugin-api } diff --git a/persistence/src/main/java/org/apache/pekko/persistence/snapshot/japi/SnapshotStorePlugin.java b/persistence/src/main/java/org/apache/pekko/persistence/snapshot/japi/SnapshotStorePlugin.java index 20313bb3184..047ce99bbfd 100644 --- a/persistence/src/main/java/org/apache/pekko/persistence/snapshot/japi/SnapshotStorePlugin.java +++ b/persistence/src/main/java/org/apache/pekko/persistence/snapshot/japi/SnapshotStorePlugin.java @@ -19,6 +19,7 @@ import scala.concurrent.Future; import java.util.Optional; +import java.util.concurrent.CompletionStage; interface SnapshotStorePlugin { // #snapshot-store-plugin-api @@ -28,7 +29,7 @@ interface SnapshotStorePlugin { * @param persistenceId id of the persistent actor. * @param criteria selection criteria for loading. */ - Future> doLoadAsync( + CompletionStage> doLoadAsync( String persistenceId, SnapshotSelectionCriteria criteria); /** @@ -37,14 +38,14 @@ Future> doLoadAsync( * @param metadata snapshot metadata. * @param snapshot snapshot. */ - Future doSaveAsync(SnapshotMetadata metadata, Object snapshot); + CompletionStage doSaveAsync(SnapshotMetadata metadata, Object snapshot); /** * Java API, Plugin API: deletes the snapshot identified by `metadata`. * * @param metadata snapshot metadata. */ - Future doDeleteAsync(SnapshotMetadata metadata); + CompletionStage doDeleteAsync(SnapshotMetadata metadata); /** * Java API, Plugin API: deletes all snapshots matching `criteria`. @@ -52,6 +53,6 @@ Future> doLoadAsync( * @param persistenceId id of the persistent actor. * @param criteria selection criteria for deleting. */ - Future doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria); + CompletionStage doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria); // #snapshot-store-plugin-api } diff --git a/persistence/src/main/scala/org/apache/pekko/persistence/journal/japi/AsyncWriteJournal.scala b/persistence/src/main/scala/org/apache/pekko/persistence/journal/japi/AsyncWriteJournal.scala index ef74c5302a7..7fbb6e76059 100644 --- a/persistence/src/main/scala/org/apache/pekko/persistence/journal/japi/AsyncWriteJournal.scala +++ b/persistence/src/main/scala/org/apache/pekko/persistence/journal/japi/AsyncWriteJournal.scala @@ -21,6 +21,7 @@ import scala.util.Try import org.apache.pekko import pekko.persistence._ import pekko.persistence.journal.{ AsyncWriteJournal => SAsyncWriteJournal } +import pekko.util.FutureConverters._ import pekko.util.ccompat._ import pekko.util.ccompat.JavaConverters._ @@ -33,7 +34,7 @@ abstract class AsyncWriteJournal extends AsyncRecovery with SAsyncWriteJournal w import context.dispatcher final def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = - doAsyncWriteMessages(messages.asJava).map { results => + doAsyncWriteMessages(messages.asJava).asScala.map { results => results.asScala.iterator .map { r => if (r.isPresent) Failure(r.get) @@ -42,6 +43,7 @@ abstract class AsyncWriteJournal extends AsyncRecovery with SAsyncWriteJournal w .to(immutable.IndexedSeq) } - final def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long) = - doAsyncDeleteMessagesTo(persistenceId, toSequenceNr).map(_ => ()) + final def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = { + doAsyncDeleteMessagesTo(persistenceId, toSequenceNr).asScala.map(_ => ()) + } } diff --git a/persistence/src/main/scala/org/apache/pekko/persistence/snapshot/japi/SnapshotStore.scala b/persistence/src/main/scala/org/apache/pekko/persistence/snapshot/japi/SnapshotStore.scala index ca7f1e9d9fa..fd35c6af74b 100644 --- a/persistence/src/main/scala/org/apache/pekko/persistence/snapshot/japi/SnapshotStore.scala +++ b/persistence/src/main/scala/org/apache/pekko/persistence/snapshot/japi/SnapshotStore.scala @@ -16,6 +16,7 @@ package org.apache.pekko.persistence.snapshot.japi import scala.concurrent.Future import org.apache.pekko +import pekko.util.FutureConverters._ import pekko.japi.Util._ import pekko.persistence._ import pekko.persistence.snapshot.{ SnapshotStore => SSnapshotStore } @@ -29,15 +30,15 @@ abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin { override final def loadAsync( persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = - doLoadAsync(persistenceId, criteria).map(option) + doLoadAsync(persistenceId, criteria).asScala.map(option) override final def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = - doSaveAsync(metadata, snapshot).map(_ => ()) + doSaveAsync(metadata, snapshot).asScala.map(_ => ()) override final def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = - doDeleteAsync(metadata).map(_ => ()) + doDeleteAsync(metadata).asScala.map(_ => ()) override final def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = - doDeleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria).map(_ => ()) + doDeleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria).asScala.map(_ => ()) }