diff --git a/CHANGES.txt b/CHANGES.txt index 528ed1ce..c3c91f1e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -3,9 +3,11 @@ 1, Improve Pair/Triple/Tuple... -2, Add firstResult/firstSuccessResult/lastResult/lastSuccessResult to AsyncExecutor. +2, Add firstResult/firstSuccessResult/lastResult/lastSuccessResult/parallelGet to CompletableFuture. -3, Improvements and bug fix. +3, Refactoring CompletableFuture/AsyncExecutor to keep consistent with CompletableFuture in JDK 8. + +4, Improvements and bug fix. ========Changes in 0.9.52========================================================================= diff --git a/lib/abacus-util-0.9.53.jar b/lib/abacus-util-0.9.53.jar index 59cf2e34..1c6caba8 100644 Binary files a/lib/abacus-util-0.9.53.jar and b/lib/abacus-util-0.9.53.jar differ diff --git a/lib/abacus-util-all-0.9.53.jar b/lib/abacus-util-all-0.9.53.jar index b1fc62ab..dc25d300 100644 Binary files a/lib/abacus-util-all-0.9.53.jar and b/lib/abacus-util-all-0.9.53.jar differ diff --git a/src/com/landawn/abacus/android/util/AsyncExecutor.java b/src/com/landawn/abacus/android/util/AsyncExecutor.java index f0f7f949..a0d4644c 100644 --- a/src/com/landawn/abacus/android/util/AsyncExecutor.java +++ b/src/com/landawn/abacus/android/util/AsyncExecutor.java @@ -16,23 +16,11 @@ package com.landawn.abacus.android.util; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executor; +import java.util.concurrent.FutureTask; import com.landawn.abacus.annotation.Beta; -import com.landawn.abacus.logging.Logger; -import com.landawn.abacus.logging.LoggerFactory; -import com.landawn.abacus.util.Callback; -import com.landawn.abacus.util.Holder; -import com.landawn.abacus.util.N; -import com.landawn.abacus.util.Optional; -import com.landawn.abacus.util.OptionalNullable; -import com.landawn.abacus.util.Pair; import com.landawn.abacus.util.Retry; import com.landawn.abacus.util.Retry.Retry0; import com.landawn.abacus.util.function.BiFunction; @@ -44,7 +32,7 @@ /** *
- * AsyncExecutor.executeInParallel(() -> { // download image }) + * AsyncExecutor.executeWithThreadPool(() -> { // download image }) .executeOnUiThread((e, image) -> {// refresh UI }) .execute((e, image) -> {// convert image to bitmap format}) .callbackOnUiThread((e, bitmap) -> {// update UI}); @@ -55,8 +43,10 @@ * @author Haiyang Li */ public class AsyncExecutor { - private static final Logger logger = LoggerFactory.getLogger(Asyn.class); - private static final Handler HANDLER = new Handler(Looper.getMainLooper()); + + static final UIExecutor UI_EXECUTOR = new UIExecutor(); + static final Executor SERIAL_EXECUTOR = AsyncTask.SERIAL_EXECUTOR; + static final Executor THREAD_POOL_EXECUTOR = AsyncTask.THREAD_POOL_EXECUTOR; private AsyncExecutor() { // Singleton @@ -69,7 +59,7 @@ private AsyncExecutor() { * @return */ public static CompletableFutureexecute(final Runnable action) { - return execute(new CompletableFuture (action, null)); + return execute(new FutureTask (action, null), SERIAL_EXECUTOR); } public static CompletableFuture execute(final Runnable action, final int retryTimes, final long retryInterval, @@ -82,17 +72,6 @@ public void run() { }); } - @Beta - static List > execute(final List extends Runnable> actions) { - final List > results = new ArrayList<>(actions.size()); - - for (Runnable cmd : actions) { - results.add(execute(cmd)); - } - - return results; - } - /** * The action will be asynchronously executed with {@code android.io.AsyncTask#SERIAL_EXECUTOR} in background. * @@ -100,7 +79,7 @@ static List > execute(final List extends Runnable> acti * @return */ public static CompletableFuture execute(final Callable action) { - return execute(new CompletableFuture<>(action)); + return execute(new FutureTask<>(action), SERIAL_EXECUTOR); } public static CompletableFuture execute(final Callable action, final int retryTimes, final long retryInterval, @@ -114,42 +93,19 @@ public T call() throws Exception { }); } - /** - * The actions will be asynchronously executed with {@code android.io.AsyncTask#SERIAL_EXECUTOR} in background. - * - * @param actions - * @return - */ - @Beta - static List > execute(final Collection extends Callable > actions) { - final List > results = new ArrayList<>(actions.size()); - - for (Callable cmd : actions) { - results.add(execute(cmd)); - } - - return results; - } - - private static CompletableFuture execute(final CompletableFuture callableFuture) { - AsyncTask.SERIAL_EXECUTOR.execute(callableFuture); - - return callableFuture; - } - /** * The action will be asynchronously executed with {@code android.io.AsyncTask#THREAD_POOL_EXECUTOR} in background. * * @param action * @return */ - public static CompletableFuture executeInParallel(final Runnable action) { - return executeInParallel(new CompletableFuture (action, null)); + public static CompletableFuture executeWithThreadPool(final Runnable action) { + return execute(new FutureTask (action, null), THREAD_POOL_EXECUTOR); } - public static CompletableFuture executeInParallel(final Runnable action, final int retryTimes, final long retryInterval, + public static CompletableFuture executeWithThreadPool(final Runnable action, final int retryTimes, final long retryInterval, final Function retryCondition) { - return executeInParallel(new Runnable() { + return executeWithThreadPool(new Runnable() { @Override public void run() { Retry.of(retryTimes, retryInterval, retryCondition).run(action); @@ -157,36 +113,19 @@ public void run() { }); } - /** - * The actions will be asynchronously executed with {@code android.io.AsyncTask#THREAD_POOL_EXECUTOR} in background. - * - * @param actions - * @return - */ - @Beta - static List > executeInParallel(final List extends Runnable> actions) { - final List > results = new ArrayList<>(actions.size()); - - for (Runnable cmd : actions) { - results.add(executeInParallel(cmd)); - } - - return results; - } - /** * The action will be asynchronously executed with {@code android.io.AsyncTask#THREAD_POOL_EXECUTOR} in background. * * @param action * @return */ - public static CompletableFuture executeInParallel(final Callable action) { - return executeInParallel(new CompletableFuture<>(action)); + public static CompletableFuture executeWithThreadPool(final Callable action) { + return execute(new FutureTask<>(action), THREAD_POOL_EXECUTOR); } - public static CompletableFuture executeInParallel(final Callable action, final int retryTimes, final long retryInterval, + public static CompletableFuture executeWithThreadPool(final Callable action, final int retryTimes, final long retryInterval, final BiFunction super T, Throwable, Boolean> retryCondition) { - return executeInParallel(new Callable () { + return executeWithThreadPool(new Callable () { @Override public T call() throws Exception { final Retry0 retry = Retry0.of(retryTimes, retryInterval, retryCondition); @@ -195,29 +134,6 @@ public T call() throws Exception { }); } - /** - * The actions will be asynchronously executed with {@code android.io.AsyncTask#THREAD_POOL_EXECUTOR} in background. - * - * @param actions - * @return - */ - @Beta - static List > executeInParallel(final Collection extends Callable > actions) { - final List > results = new ArrayList<>(actions.size()); - - for (Callable cmd : actions) { - results.add(executeInParallel(cmd)); - } - - return results; - } - - private static CompletableFuture executeInParallel(final CompletableFuture callableFuture) { - AsyncTask.THREAD_POOL_EXECUTOR.execute(callableFuture); - - return callableFuture; - } - /** * The action will be asynchronously executed in UI thread. * @@ -225,7 +141,7 @@ private static CompletableFuture executeInParallel(final CompletableFutur * @return */ public static CompletableFuture executeOnUiThread(final Runnable action) { - return executeOnUiThread(new CompletableFuture (action, null), 0); + return executeOnUiThread(action, 0); } /** @@ -236,7 +152,7 @@ public static CompletableFuture executeOnUiThread(final Runnable action) { * @return */ public static CompletableFuture executeOnUiThread(final Runnable action, final long delay) { - return executeOnUiThread(new CompletableFuture (action, null), delay); + return execute(new FutureTask (action, null), UI_EXECUTOR, delay); } public static CompletableFuture executeOnUiThread(final Runnable action, final int retryTimes, final long retryInterval, @@ -249,41 +165,6 @@ public void run() { }); } - /** - * The actions will be asynchronously executed in UI thread. - * - * @param actions - * @return - */ - @Beta - static List > executeOnUiThread(final List extends Runnable> actions) { - final List > results = new ArrayList<>(actions.size()); - - for (Runnable cmd : actions) { - results.add(executeOnUiThread(cmd)); - } - - return results; - } - - /** - * The actions will be asynchronously executed in UI thread. - * - * @param actions - * @param delay - * @return - */ - @Beta - static List > executeOnUiThread(final List extends Runnable> actions, final long delay) { - final List > results = new ArrayList<>(actions.size()); - - for (Runnable cmd : actions) { - results.add(executeOnUiThread(cmd, delay)); - } - - return results; - } - /** * The action will be asynchronously executed in UI thread. * @@ -291,7 +172,7 @@ static List > executeOnUiThread(final List extends Runn * @return */ public static CompletableFuture executeOnUiThread(final Callable action) { - return executeOnUiThread(new CompletableFuture<>(action), 0); + return executeOnUiThread(action, 0); } /** @@ -302,7 +183,7 @@ public static CompletableFuture executeOnUiThread(final Callable actio * @return */ public static CompletableFuture executeOnUiThread(final Callable action, final long delay) { - return executeOnUiThread(new CompletableFuture<>(action), delay); + return execute(new FutureTask<>(action), UI_EXECUTOR, delay); } public static CompletableFuture executeOnUiThread(final Callable action, final int retryTimes, final long retryInterval, @@ -316,633 +197,33 @@ public T call() throws Exception { }); } - /** - * The actions will be asynchronously executed in UI thread. - * - * @param actions - * @return - */ - @Beta - static List > executeOnUiThread(final Collection extends Callable > actions) { - final List > results = new ArrayList<>(actions.size()); - - for (Callable cmd : actions) { - results.add(executeOnUiThread(cmd)); - } - - return results; - } - - /** - * The actions will be asynchronously executed in UI thread. - * - * @param actions - * @param delay - * @return - */ - @Beta - static List > executeOnUiThread(final Collection extends Callable > actions, final long delay) { - final List > results = new ArrayList<>(actions.size()); - - for (Callable cmd : actions) { - results.add(executeOnUiThread(cmd, delay)); - } - - return results; - } - - private static CompletableFuture executeOnUiThread(final CompletableFuture callableFuture, final long delay) { - if (delay > 0) { - HANDLER.postDelayed(callableFuture, delay); - } else { - HANDLER.post(callableFuture); - } - - return callableFuture; - } - - /** - * Returns the first result, which could be an exception. - * - * @param a - * @return - */ - public static Optional > firstResult(final CompletableFuture extends T>... a) { - final BlockingQueue > queue = new ArrayBlockingQueue<>(a.length); - - for (CompletableFuture extends T> future : a) { - ((CompletableFuture ) future).callback(new Callback () { - @Override - public void on(Throwable e, T result) { - queue.offer(Pair.of(result, e)); - } - }); - } - - try { - Pair result = null; - - for (int i = 0, len = a.length; i < len; i++) { - result = queue.poll(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); - - if (result != null) { - return Optional.of(result); - } - } - } catch (InterruptedException e) { - // throw N.toRuntimeException(e); - logger.error("Thread is interrupted while retriving result from queue", e); - return Optional.empty(); - } - - return Optional.empty(); - } - - /** - * Returns the first result, which could be an exception. - * - * @param c - * @return - */ - public static Optional > firstResult(final Collection extends CompletableFuture extends T>> c) { - final BlockingQueue > queue = new ArrayBlockingQueue<>(c.size()); - - for (CompletableFuture extends T> future : c) { - ((CompletableFuture ) future).callback(new Callback () { - @Override - public void on(Throwable e, T result) { - queue.offer(Pair.of(result, e)); - } - }); - } - - try { - Pair result = null; - - for (int i = 0, len = c.size(); i < len; i++) { - result = queue.poll(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); - - if (result != null) { - return Optional.of(result); - } - } - } catch (InterruptedException e) { - // throw N.toRuntimeException(e); - logger.error("Thread is interrupted while retriving result from queue", e); - return Optional.empty(); - } - - return Optional.empty(); - } - - /** - * Returns the first result, which could be an exception. - * - * @param c - * @param maxTimeout - * @return - */ - public static Optional > firstResult(final Collection extends CompletableFuture extends T>> c, final long maxTimeout) { - final BlockingQueue > queue = new ArrayBlockingQueue<>(c.size()); - - for (CompletableFuture extends T> future : c) { - ((CompletableFuture ) future).callback(new Callback () { - @Override - public void on(Throwable e, T result) { - queue.offer(Pair.of(result, e)); - } - }); - } - - final long endTime = N.currentMillis() + maxTimeout; - - try { - Pair result = null; - - for (int i = 0, len = c.size(); i < len; i++) { - long timeout = endTime - N.currentMillis(); - - if (timeout <= 0) { - return Optional.empty(); - } - - result = queue.poll(timeout, TimeUnit.MILLISECONDS); - - if (result != null) { - return Optional.of(result); - } - } - } catch (InterruptedException e) { - // throw N.toRuntimeException(e); - logger.error("Thread is interrupted while retriving result from queue", e); - return Optional.empty(); - } - - return Optional.empty(); - } - - /** - * Returns the first non-exception result or empty if fail to get result for all futures. - * - * @param a - * @return - */ - public static OptionalNullable firstSuccessResult(final CompletableFuture extends T>... a) { - final BlockingQueue > queue = new ArrayBlockingQueue<>(a.length); - - for (CompletableFuture extends T> future : a) { - ((CompletableFuture ) future).callback(new Callback () { - @Override - public void on(Throwable e, T result) { - queue.offer(Pair.of(result, e)); - } - }); - } - - try { - Pair result = null; - - for (int i = 0, len = a.length; i < len; i++) { - result = queue.poll(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); - - if (result != null && result.right == null) { - return OptionalNullable.of(result.left); - } - } - } catch (InterruptedException e) { - // throw N.toRuntimeException(e); - logger.error("Thread is interrupted while retriving result from queue", e); - return OptionalNullable.empty(); - } - - return OptionalNullable.empty(); - } - - /** - * Returns the first non-exception result or empty if fail to get result for all futures. - * - * @param c - * @return - */ - public static OptionalNullable firstSuccessResult(final Collection extends CompletableFuture extends T>> c) { - final BlockingQueue > queue = new ArrayBlockingQueue<>(c.size()); - - for (CompletableFuture extends T> future : c) { - ((CompletableFuture ) future).callback(new Callback () { - @Override - public void on(Throwable e, T result) { - queue.offer(Pair.of(result, e)); - } - }); - } - - try { - Pair result = null; - - for (int i = 0, len = c.size(); i < len; i++) { - result = queue.poll(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); - - if (result != null && result.right == null) { - return OptionalNullable.of(result.left); - } - } - } catch (InterruptedException e) { - // throw N.toRuntimeException(e); - logger.error("Thread is interrupted while retriving result from queue", e); - return OptionalNullable.empty(); - } - - return OptionalNullable.empty(); - } - - /** - * Returns the first result, which could be an exception. - * - * @param c - * @param maxTimeout - * @return - */ - public static OptionalNullable firstSuccessResult(final Collection extends CompletableFuture extends T>> c, final long maxTimeout) { - final BlockingQueue > queue = new ArrayBlockingQueue<>(c.size()); - - for (CompletableFuture extends T> future : c) { - ((CompletableFuture ) future).callback(new Callback () { - @Override - public void on(Throwable e, T result) { - queue.offer(Pair.of(result, e)); - } - }); - } - - final long endTime = N.currentMillis() + maxTimeout; - - try { - Pair result = null; - - for (int i = 0, len = c.size(); i < len; i++) { - long timeout = endTime - N.currentMillis(); - - if (timeout <= 0) { - return OptionalNullable.empty(); - } - - result = queue.poll(timeout, TimeUnit.MILLISECONDS); - - if (result != null && result.right == null) { - return OptionalNullable.of(result.left); - } - } - } catch (InterruptedException e) { - // throw N.toRuntimeException(e); - logger.error("Thread is interrupted while retriving result from queue", e); - return OptionalNullable.empty(); - } - - return OptionalNullable.empty(); - } - - /** - * Returns the first result, which could be an exception. - * - * @param a - * @return - */ - public static Optional > lastResult(final CompletableFuture extends T>... a) { - final BlockingQueue > queue = new ArrayBlockingQueue<>(a.length); - - for (CompletableFuture extends T> future : a) { - ((CompletableFuture ) future).callback(new Callback () { - @Override - public void on(Throwable e, T result) { - queue.offer(Pair.of(result, e)); - } - }); - } - - final Holder > holder = new Holder<>((Pair ) N.NULL_MASK); - - try { - Pair result = null; - - for (int i = 0, len = a.length; i < len; i++) { - result = queue.poll(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); - - if (result != null) { - holder.setValue(result); - } - } - } catch (InterruptedException e) { - // throw N.toRuntimeException(e); - logger.error("Thread is interrupted while retriving result from queue", e); - return Optional.empty(); - } + private static CompletableFuture execute(final FutureTask futureTask, final Executor executor) { + executor.execute(futureTask); - if (holder.value == N.NULL_MASK) { - return Optional.empty(); - } else { - return Optional.of(holder.value); - } + return new CompletableFuture<>(futureTask, executor); } - /** - * Returns the first result, which could be an exception. - * - * @param c - * @return - */ - public static Optional > lastResult(final Collection extends CompletableFuture extends T>> c) { - final BlockingQueue > queue = new ArrayBlockingQueue<>(c.size()); - - for (CompletableFuture extends T> future : c) { - ((CompletableFuture ) future).callback(new Callback () { - @Override - public void on(Throwable e, T result) { - queue.offer(Pair.of(result, e)); - } - }); - } - - final Holder > holder = new Holder<>((Pair ) N.NULL_MASK); + private static CompletableFuture execute(final FutureTask futureTask, final UIExecutor executor, final long delay) { + executor.execute(futureTask, delay); - try { - Pair result = null; - - for (int i = 0, len = c.size(); i < len; i++) { - result = queue.poll(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); - - if (result != null) { - holder.setValue(result); - } - } - } catch (InterruptedException e) { - // throw N.toRuntimeException(e); - logger.error("Thread is interrupted while retriving result from queue", e); - return Optional.empty(); - } - - if (holder.value == N.NULL_MASK) { - return Optional.empty(); - } else { - return Optional.of(holder.value); - } + return new CompletableFuture<>(futureTask, executor); } - /** - * Returns the first result, which could be an exception. - * - * @param c - * @param maxTimeout - * @return - */ - public static Optional > lastResult(final Collection extends CompletableFuture extends T>> c, final long maxTimeout) { - final BlockingQueue > queue = new ArrayBlockingQueue<>(c.size()); + static final class UIExecutor implements Executor { + private static final Handler HANDLER = new Handler(Looper.getMainLooper()); - for (CompletableFuture extends T> future : c) { - ((CompletableFuture ) future).callback(new Callback () { - @Override - public void on(Throwable e, T result) { - queue.offer(Pair.of(result, e)); - } - }); + @Override + public void execute(Runnable command) { + HANDLER.post(command); } - final Holder > holder = new Holder<>((Pair ) N.NULL_MASK); - final long endTime = N.currentMillis() + maxTimeout; - - try { - Pair result = null; - - for (int i = 0, len = c.size(); i < len; i++) { - long timeout = endTime - N.currentMillis(); - - if (timeout <= 0) { - break; - } - - result = queue.poll(timeout, TimeUnit.MILLISECONDS); - - if (result != null) { - holder.setValue(result); - } + public void execute(Runnable command, final long delay) { + if (delay > 0) { + HANDLER.postDelayed(command, delay); + } else { + HANDLER.post(command); } - } catch (InterruptedException e) { - // throw N.toRuntimeException(e); - logger.error("Thread is interrupted while retriving result from queue", e); - return Optional.empty(); } - - if (holder.value == N.NULL_MASK) { - return Optional.empty(); - } else { - return Optional.of(holder.value); - } - } - - /** - * Returns the first non-exception result or empty if fail to get result for all futures. - * - * @param a - * @return - */ - public static OptionalNullable lastSuccessResult(final CompletableFuture extends T>... a) { - final BlockingQueue > queue = new ArrayBlockingQueue<>(a.length); - - for (CompletableFuture extends T> future : a) { - ((CompletableFuture ) future).callback(new Callback () { - @Override - public void on(Throwable e, T result) { - queue.offer(Pair.of(result, e)); - } - }); - } - - final Holder holder = new Holder<>((T) N.NULL_MASK); - - try { - Pair result = null; - - for (int i = 0, len = a.length; i < len; i++) { - result = queue.poll(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); - - if (result != null && result.right == null) { - holder.setValue(result.left); - } - } - } catch (InterruptedException e) { - // throw N.toRuntimeException(e); - logger.error("Thread is interrupted while retriving result from queue", e); - return OptionalNullable.empty(); - } - - if (holder.value == N.NULL_MASK) { - return OptionalNullable.empty(); - } else { - return OptionalNullable.of(holder.value); - } - } - - /** - * Returns the first non-exception result or empty if fail to get result for all futures. - * - * @param c - * @return - */ - public static OptionalNullable lastSuccessResult(final Collection extends CompletableFuture extends T>> c) { - final BlockingQueue > queue = new ArrayBlockingQueue<>(c.size()); - - for (CompletableFuture extends T> future : c) { - ((CompletableFuture ) future).callback(new Callback () { - @Override - public void on(Throwable e, T result) { - queue.offer(Pair.of(result, e)); - } - }); - } - - final Holder holder = new Holder<>((T) N.NULL_MASK); - - try { - Pair result = null; - - for (int i = 0, len = c.size(); i < len; i++) { - result = queue.poll(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); - - if (result != null && result.right == null) { - holder.setValue(result.left); - } - } - } catch (InterruptedException e) { - // throw N.toRuntimeException(e); - logger.error("Thread is interrupted while retriving result from queue", e); - return OptionalNullable.empty(); - } - - if (holder.value == N.NULL_MASK) { - return OptionalNullable.empty(); - } else { - return OptionalNullable.of(holder.value); - } - } - - /** - * Returns the first result, which could be an exception. - * - * @param c - * @param maxTimeout - * @return - */ - public static OptionalNullable lastSuccessResult(final Collection extends CompletableFuture extends T>> c, final long maxTimeout) { - final BlockingQueue > queue = new ArrayBlockingQueue<>(c.size()); - - for (CompletableFuture extends T> future : c) { - ((CompletableFuture ) future).callback(new Callback () { - @Override - public void on(Throwable e, T result) { - queue.offer(Pair.of(result, e)); - } - }); - } - - final Holder holder = new Holder<>((T) N.NULL_MASK); - final long endTime = N.currentMillis() + maxTimeout; - - try { - Pair result = null; - - for (int i = 0, len = c.size(); i < len; i++) { - long timeout = endTime - N.currentMillis(); - - if (timeout <= 0) { - break; - } - - result = queue.poll(timeout, TimeUnit.MILLISECONDS); - - if (result != null && result.right == null) { - holder.setValue(result.left); - } - } - } catch (InterruptedException e) { - // throw N.toRuntimeException(e); - logger.error("Thread is interrupted while retriving result from queue", e); - return OptionalNullable.empty(); - } - - if (holder.value == N.NULL_MASK) { - return OptionalNullable.empty(); - } else { - return OptionalNullable.of(holder.value); - } - } - - public static List > concat(final CompletableFuture extends T>... a) { - final List > queue = new ArrayList<>(a.length); - - for (CompletableFuture extends T> future : a) { - ((CompletableFuture ) future).get(new Callback () { - @Override - public void on(Throwable e, T result) { - queue.add(Pair.of(result, e)); - } - }); - } - - return queue; - } - - /** - * - * @param c - * @return - */ - public static List > concat(final Collection extends CompletableFuture extends T>> c) { - final List > queue = new ArrayList<>(c.size()); - - for (CompletableFuture extends T> future : c) { - ((CompletableFuture ) future).get(new Callback () { - @Override - public void on(Throwable e, T result) { - queue.add(Pair.of(result, e)); - } - }); - } - - return queue; - } - - public static BlockingQueue > parallelConcat(final CompletableFuture extends T>... a) { - final BlockingQueue > queue = new ArrayBlockingQueue<>(a.length); - - for (CompletableFuture extends T> future : a) { - ((CompletableFuture ) future).callback(new Callback () { - @Override - public void on(Throwable e, T result) { - queue.offer(Pair.of(result, e)); - } - }); - } - - return queue; - } - - /** - * - * @param c - * @return - */ - public static BlockingQueue > parallelConcat(final Collection extends CompletableFuture extends T>> c) { - final BlockingQueue > queue = new ArrayBlockingQueue<>(c.size()); - - for (CompletableFuture extends T> future : c) { - ((CompletableFuture ) future).callback(new Callback () { - @Override - public void on(Throwable e, T result) { - queue.offer(Pair.of(result, e)); - } - }); - } - - return queue; } /** diff --git a/src/com/landawn/abacus/android/util/CompletableFuture.java b/src/com/landawn/abacus/android/util/CompletableFuture.java index 45ab5ff2..dbddb221 100644 --- a/src/com/landawn/abacus/android/util/CompletableFuture.java +++ b/src/com/landawn/abacus/android/util/CompletableFuture.java @@ -14,66 +14,182 @@ package com.landawn.abacus.android.util; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.FutureTask; -import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import com.landawn.abacus.util.Callback; -import com.landawn.abacus.util.Callback2; +import com.landawn.abacus.android.util.AsyncExecutor.UIExecutor; +import com.landawn.abacus.logging.Logger; +import com.landawn.abacus.logging.LoggerFactory; +import com.landawn.abacus.util.MutableBoolean; import com.landawn.abacus.util.N; +import com.landawn.abacus.util.Optional; +import com.landawn.abacus.util.OptionalNullable; import com.landawn.abacus.util.Pair; -import com.landawn.abacus.util.ThreadMode; +import com.landawn.abacus.util.Tuple; +import com.landawn.abacus.util.Tuple.Tuple4; +import com.landawn.abacus.util.function.BiConsumer; +import com.landawn.abacus.util.function.BiFunction; +import com.landawn.abacus.util.function.Consumer; +import com.landawn.abacus.util.function.Function; /** + * The action
in all*run*
methods will be executed by the specified or defaultExecutor
. + * Theaction
in other methods will be executed in the thread where theget()
orget(timeout, unit)
method is called. + * * * @since 0.8 * * @author Haiyang Li */ -public class CompletableFutureimplements RunnableFuture { - private final FutureTask futureTask; - private volatile Callback callback; - private volatile ThreadMode threadMode; - private volatile boolean actionExecuted = false; +public class CompletableFuture implements Future { + private static final Logger logger = LoggerFactory.getLogger(CompletableFuture.class); + + private static final Consumer