Skip to content

Commit

Permalink
Pb With java future (#66)
Browse files Browse the repository at this point in the history
* Pb with java future
  • Loading branch information
larousso authored May 15, 2024
1 parent b4943b1 commit 51230a2
Show file tree
Hide file tree
Showing 12 changed files with 79 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.*;
import fr.maif.concurrent.CompletionStages;
import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventPublisher;
Expand Down Expand Up @@ -58,7 +59,7 @@ public static <E extends Event, Meta, Context> InMemoryEventStore<E, Meta, Conte

@Override
public CompletionStage<Long> lastPublishedSequence() {
return CompletableFuture.completedStage(eventStore.stream().filter(e -> e.published).map(e -> e.sequenceNum)
return CompletionStages.completedStage(eventStore.stream().filter(e -> e.published).map(e -> e.sequenceNum)
.max(Comparator.comparingLong(e -> e))
.orElse(0L));
}
Expand All @@ -75,7 +76,7 @@ public CompletionStage<EventEnvelope<E, Meta, Context>> markAsPublished(Tuple0 t

@Override
public CompletionStage<Tuple0> openTransaction() {
return CompletableFuture.completedStage(Tuple.empty());
return CompletionStages.empty();
}

@Override
Expand All @@ -85,14 +86,14 @@ public CompletionStage<Tuple0> commitOrRollback(Option<Throwable> of, Tuple0 tx)

@Override
public CompletionStage<EventEnvelope<E, Meta, Context>> markAsPublished(EventEnvelope<E, Meta, Context> eventEnvelope) {
return CompletableFuture.completedStage(
return CompletionStages.completedStage(
eventEnvelope.copy().withPublished(true).build()
);
}

@Override
public CompletionStage<Long> nextSequence(Tuple0 tx) {
return CompletableFuture.completedStage(sequence_num.incrementAndGet());
return CompletionStages.completedStage(sequence_num.incrementAndGet());
}

@Override
Expand Down
8 changes: 4 additions & 4 deletions thoth-core-akka/src/test/java/fr/maif/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public static class VikingCommandHandler implements CommandHandler<String, Vikin

@Override
public CompletionStage<Either<String, Events<VikingEvent, String>>> handleCommand(Tuple0 unit, Option<Viking> state, VikingCommand vikingCommand) {
return CompletableFuture.completedStage(
return CompletionStages.completedStage(
Match(vikingCommand).of(
Case(VikingCommand.CreateVikingV1.pattern(), e -> events("C", new VikingEvent.VikingCreated(e.id, e.name))),
Case(VikingCommand.UpdateVikingV1.pattern(), e -> events("U", new VikingEvent.VikingUpdated(e.id, e.name))),
Expand Down Expand Up @@ -307,12 +307,12 @@ public static class VikingSnapshot implements SnapshotStore<Viking, String, Tupl

@Override
public CompletionStage<Option<Viking>> getSnapshot(String entityId) {
return CompletableFuture.completedStage(Option.of(data.get(entityId)));
return CompletionStages.completedStage(Option.of(data.get(entityId)));
}

@Override
public CompletionStage<Option<Viking>> getSnapshot(Tuple0 transactionContext, String entityId) {
return CompletableFuture.completedStage(Option.of(data.get(entityId)));
return CompletionStages.completedStage(Option.of(data.get(entityId)));
}

@Override
Expand All @@ -321,7 +321,7 @@ public CompletionStage<Tuple0> persist(Tuple0 transactionContext, String id, Opt
Case($Some($()), s -> data.put(s.id, s)),
Case($None(), () -> data.remove(id))
);
return CompletableFuture.completedStage(Tuple.empty());
return CompletionStages.completedStage(Tuple.empty());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class InMemoryEventStore<E extends Event, Meta, Context> implements Event
private final Supplier<CompletionStage<Tuple0>> markAsPublishedTx;
private final Supplier<CompletionStage<Tuple0>> markAsPublished;

private final static Supplier<CompletionStage<Tuple0>> NOOP = () -> CompletableFuture.completedStage(Tuple.empty());
private final static Supplier<CompletionStage<Tuple0>> NOOP = () -> CompletionStages.completedStage(Tuple.empty());

public InMemoryEventStore(Supplier<CompletionStage<Tuple0>> markAsPublishedTx,
Supplier<CompletionStage<Tuple0>> markAsPublished,
Expand Down Expand Up @@ -84,26 +84,26 @@ public CompletionStage<Long> lastPublishedSequence() {
max.accumulateAndGet(k.sequenceNum, Math::max);
}
});
return CompletableFuture.completedStage(max.get());
return CompletionStages.completedStage(max.get());
}

@Override
public CompletionStage<Transaction<E, Meta, Context>> openTransaction() {
return CompletableFuture.completedStage(new Transaction<>());
return CompletionStages.completedStage(new Transaction<>());
}

@Override
public CompletionStage<EventEnvelope<E, Meta, Context>> markAsPublished(Transaction<E, Meta, Context> tx, EventEnvelope<E, Meta, Context> eventEnvelope) {
return markAsPublishedTx.get().thenCompose(any -> {
tx.toPublish().add(eventEnvelope);
return CompletableFuture.completedStage(eventEnvelope);
return CompletionStages.completedStage(eventEnvelope);
});
}

@Override
public CompletionStage<EventEnvelope<E, Meta, Context>> markAsPublished(EventEnvelope<E, Meta, Context> eventEnvelope) {
return markAsPublished.get().thenCompose(any ->
CompletableFuture.completedStage(store.compute(eventEnvelope.sequenceNum, (k, event) -> {
CompletionStages.completedStage(store.compute(eventEnvelope.sequenceNum, (k, event) -> {
if (event == null) {
return eventEnvelope.copy().withPublished(true).build();
} else {
Expand All @@ -115,7 +115,7 @@ public CompletionStage<EventEnvelope<E, Meta, Context>> markAsPublished(EventEnv

@Override
public CompletionStage<Tuple0> persist(Transaction<E, Meta, Context> transactionContext, List<EventEnvelope<E, Meta, Context>> events) {
return CompletableFuture.completedStage(transactionContext.addAll(events.toJavaList()));
return CompletionStages.completedStage(transactionContext.addAll(events.toJavaList()));
}

@Override
Expand All @@ -130,20 +130,20 @@ public CompletionStage<Tuple0> commitOrRollback(Option<Throwable> of, Transactio
}
tx.events.clear();
tx.toPublish.clear();
return CompletableFuture.completedStage(API.Tuple());
return CompletionStages.completedStage(API.Tuple());
}

@Override
public CompletionStage<Long> nextSequence(InMemoryEventStore.Transaction<E, Meta, Context> tx) {
long value = store.values().stream().map(e -> e.sequenceNum).max(Comparator.comparingLong(e -> e)).orElse(0L) + 1;
sequenceNums.incrementAndGet();
return CompletableFuture.completedStage(sequenceNums.accumulateAndGet(value, Math::max));
return CompletionStages.completedStage(sequenceNums.accumulateAndGet(value, Math::max));
}

@Override
public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> events) {
events.forEach(e -> store.put(e.sequenceNum, e));
return CompletableFuture.completedStage(API.Tuple());
return CompletionStages.completedStage(API.Tuple());
}

@Override
Expand Down
8 changes: 4 additions & 4 deletions thoth-core-reactor/src/test/java/fr/maif/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ public static class VikingCommandHandler implements CommandHandler<String, Vikin

@Override
public CompletionStage<Either<String, Events<VikingEvent, String>>> handleCommand(Transaction<VikingEvent, Tuple0, Tuple0> unit, Option<Viking> state, VikingCommand vikingCommand) {
return CompletableFuture.completedStage(
return CompletionStages.completedStage(
Match(vikingCommand).of(
Case(VikingCommand.CreateVikingV1.pattern(), e -> events("C", new VikingEvent.VikingCreated(e.id, e.name, e.age))),
Case(VikingCommand.UpdateVikingV1.pattern(), e -> events("U", new VikingEvent.VikingUpdated(e.id, e.name, e.age))),
Expand Down Expand Up @@ -364,12 +364,12 @@ public static class VikingSnapshot implements SnapshotStore<Viking, String, Tran

@Override
public CompletionStage<Option<Viking>> getSnapshot(String entityId) {
return CompletableFuture.completedStage(Option.of(data.get(entityId)));
return CompletionStages.completedStage(Option.of(data.get(entityId)));
}

@Override
public CompletionStage<Option<Viking>> getSnapshot(Transaction<VikingEvent, Tuple0, Tuple0> transactionContext, String entityId) {
return CompletableFuture.completedStage(Option.of(data.get(entityId)));
return CompletionStages.completedStage(Option.of(data.get(entityId)));
}

@Override
Expand All @@ -378,7 +378,7 @@ public CompletionStage<Tuple0> persist(Transaction<VikingEvent, Tuple0, Tuple0>
Case($Some($()), s -> data.put(s.id, s)),
Case($None(), () -> data.remove(id))
);
return CompletableFuture.completedStage(Tuple.empty());
return CompletionStages.completedStage(Tuple.empty());
}
}

Expand Down
35 changes: 23 additions & 12 deletions thoth-core/src/main/java/fr/maif/concurrent/CompletionStages.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,54 +15,65 @@

public class CompletionStages {

public static <U> CompletionStage<U> completedStage(U value) {
CompletableFuture<U> completableFuture = new CompletableFuture<>();
completableFuture.complete(value);
return completableFuture;
}
public static <U> CompletionStage<U> failedStage(Throwable e) {
CompletableFuture<U> completableFuture = new CompletableFuture<>();
completableFuture.completeExceptionally(e);
return completableFuture;
}

public static <T, U> CompletionStage<List<U>> traverse(List<T> elements, Function<T, CompletionStage<U>> handler) {
return elements.foldLeft(
CompletableFuture.completedStage(List.empty()),
completedStage(List.empty()),
(fResult, elt) ->
fResult.thenCompose(listResult -> handler.apply(elt).thenApply(listResult::append))
);
}

public static <T> CompletionStage<T> fromTry(Supplier<Try<T>> tryValue, Executor executor) {
return CompletableFuture.supplyAsync(() -> tryValue.get().fold(
CompletableFuture::<T>failedStage,
CompletableFuture::completedStage
CompletionStages::<T>failedStage,
CompletionStages::completedStage
), executor).thenCompose(identity());
}

public static <T> CompletionStage<T> fromTry(Supplier<Try<T>> tryValue) {
return CompletableFuture.supplyAsync(() -> tryValue.get().fold(
CompletableFuture::<T>failedStage,
CompletableFuture::completedStage
CompletionStages::<T>failedStage,
CompletionStages::completedStage
)).thenCompose(identity());
}

public static <T> CompletionStage<T> of(Supplier<T> tryValue, Executor executor) {
return CompletableFuture.supplyAsync(() -> {
try {
return CompletableFuture.completedStage(tryValue.get());
return completedStage(tryValue.get());
} catch (Exception e) {
return CompletableFuture.<T>failedStage(e);
return CompletionStages.<T>failedStage(e);
}
}, executor).thenCompose(identity());
}
public static <T> CompletionStage<T> of(Supplier<T> tryValue) {
return CompletableFuture.supplyAsync(() -> {
try {
return CompletableFuture.completedStage(tryValue.get());
return completedStage(tryValue.get());
} catch (Exception e) {
return CompletableFuture.<T>failedStage(e);
return CompletionStages.<T>failedStage(e);
}
}).thenCompose(identity());
}

public static <S> CompletionStage<S> successful(S value) {
return CompletableFuture.completedStage(value);
return completedStage(value);
}
public static <S> CompletionStage<S> failed(Throwable e) {
return CompletableFuture.failedStage(e);
return CompletionStages.failedStage(e);
}
public static CompletionStage<Tuple0> empty() {
return CompletableFuture.completedStage(Tuple.empty());
return completedStage(Tuple.empty());
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package fr.maif.eventsourcing;

import fr.maif.concurrent.CompletionStages;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.collection.List;
import io.vavr.concurrent.Future;
import io.vavr.control.Option;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public interface AggregateStore<S extends State<S>, Id, TxCtx> {
Expand All @@ -16,11 +15,11 @@ public interface AggregateStore<S extends State<S>, Id, TxCtx> {
CompletionStage<Option<S>> getAggregate(TxCtx ctx, Id entityId);

default CompletionStage<Tuple0> storeSnapshot(TxCtx transactionContext, Id id, Option<S> state) {
return CompletableFuture.completedStage(Tuple.empty());
return CompletionStages.completedStage(Tuple.empty());
}

default CompletionStage<Option<S>> getSnapshot(TxCtx transactionContext, Id id) {
return CompletableFuture.completedStage(Option.none());
return CompletionStages.completedStage(Option.none());
}

default <E extends Event> CompletionStage<Option<S>> buildAggregateAndStoreSnapshot(TxCtx ctx, EventHandler<S, E> eventHandler, Option<S> state, Id id, List<E> events, Option<Long> lastSequenceNum) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package fr.maif.eventsourcing;

import fr.maif.concurrent.CompletionStages;
import io.vavr.Tuple0;
import io.vavr.Tuple2;
import io.vavr.collection.List;
import io.vavr.concurrent.Future;
import io.vavr.control.Either;
import io.vavr.control.Option;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/**
Expand All @@ -33,11 +31,11 @@ public interface CommandHandler<Error, State, Command, E extends Event, Message,
CompletionStage<Either<Error, Events<E, Message>>> handleCommand(TxCtx ctx, Option<State> state, Command command);

default CompletionStage<Either<Error, Events<E, Tuple0>>> eventsAsync(E... events) {
return CompletableFuture.completedStage(Either.right(Events.events(List.of(events))));
return CompletionStages.completedStage(Either.right(Events.events(List.of(events))));
}

default CompletionStage<Either<Error, Events<E, Message>>> eventsAsync(Message message, E... events) {
return CompletableFuture.completedStage(Either.right(Events.events(message, List.of(events))));
return CompletionStages.completedStage(Either.right(Events.events(message, List.of(events))));
}

default Either<Error, Events<E, Tuple0>> events(E... events) {
Expand All @@ -53,7 +51,7 @@ default Either<Error, Events<E, Message>> fail(Error error) {
}

default CompletionStage<Either<Error, Events<E, Message>>> failAsync(Error error) {
return CompletableFuture.completedStage(Either.left(error));
return CompletionStages.completedStage(Either.left(error));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public CompletionStage<InTransactionResult<List<Either<Error, ProcessingSuccess<

public CompletionStage<List<Tuple3<C, Option<S>, Either<Error, Events<E, Message>>>>> traverseCommands(List<C> elements, BiFunction<C, List<E>, CompletionStage<Tuple3<C, Option<S>, Either<Error, Events<E, Message>>>>> handler) {
return elements.foldLeft(
CompletableFuture.completedStage(Tuple(List.<Tuple3<C, Option<S>, Either<Error, Events<E, Message>>>>empty(), List.<Events<E, Message>>empty())),
CompletionStages.completedStage(Tuple(List.<Tuple3<C, Option<S>, Either<Error, Events<E, Message>>>>empty(), List.<Events<E, Message>>empty())),
(fResult, elt) ->
fResult.thenCompose(listResult -> handler.apply(elt, listResult._2.flatMap(e -> e.events))
.thenApply(r ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ public <T> CompletionStage<T> withTransaction(Function<Connection, CompletionSta
.thenCompose(r -> commit(connection).thenApply(__ -> r))
.exceptionallyCompose(e -> {
LOGGER.error("Error, rollbacking, {}", e);
return rollback(connection).thenCompose(__ -> CompletableFuture.<T>failedStage(e));
return rollback(connection).thenCompose(__ -> CompletionStages.<T>failedStage(e));
})
.thenCompose(r -> closeConnection(connection).thenApply(__ -> r))
.exceptionallyCompose(e -> {
LOGGER.error("Error, closing connection, {}", e);
return closeConnection(connection).thenCompose(__ -> CompletableFuture.<T>failedStage(e));
return closeConnection(connection).thenCompose(__ -> CompletionStages.<T>failedStage(e));
})
);
}
Expand Down
8 changes: 4 additions & 4 deletions thoth-core/src/test/java/fr/maif/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public static class VikingCommandHandler implements CommandHandler<String, Vikin

@Override
public CompletionStage<Either<String, Events<VikingEvent, String>>> handleCommand(Tuple0 unit, Option<Viking> state, VikingCommand vikingCommand) {
return CompletableFuture.completedStage(
return CompletionStages.completedStage(
Match(vikingCommand).of(
Case(VikingCommand.CreateVikingV1.pattern(), e -> events("C", new VikingEvent.VikingCreated(e.id, e.name))),
Case(VikingCommand.UpdateVikingV1.pattern(), e -> events("U", new VikingEvent.VikingUpdated(e.id, e.name))),
Expand Down Expand Up @@ -293,12 +293,12 @@ public static class VikingSnapshot implements SnapshotStore<Viking, String, Tupl

@Override
public CompletionStage<Option<Viking>> getSnapshot(String entityId) {
return CompletableFuture.completedStage(Option.of(data.get(entityId)));
return CompletionStages.completedStage(Option.of(data.get(entityId)));
}

@Override
public CompletionStage<Option<Viking>> getSnapshot(Tuple0 transactionContext, String entityId) {
return CompletableFuture.completedStage(Option.of(data.get(entityId)));
return CompletionStages.completedStage(Option.of(data.get(entityId)));
}

@Override
Expand All @@ -307,7 +307,7 @@ public CompletionStage<Tuple0> persist(Tuple0 transactionContext, String id, Opt
Case($Some($()), s -> data.put(s.id, s)),
Case($None(), () -> data.remove(id))
);
return CompletableFuture.completedStage(Tuple.empty());
return CompletionStages.completedStage(Tuple.empty());
}
}

Expand Down
Loading

0 comments on commit 51230a2

Please sign in to comment.