Skip to content

Commit

Permalink
Publishing hang (#62)
Browse files Browse the repository at this point in the history
* Publishing hang in reactor publisher. The queue stop and never go back
  • Loading branch information
larousso authored Dec 8, 2023
1 parent 3772958 commit 73ad147
Show file tree
Hide file tree
Showing 18 changed files with 436 additions and 268 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ subprojects {
jooqAsyncVersion = "2.0.0"
functionalJsonVersion = "1.0.3"
kafkaVersion = "3.0.1"
reactorKafkaVersion = "1.3.12"
reactorVersion = "3.4.23"
reactorKafkaVersion = "1.3.22"
reactorVersion = "3.5.7"
vertxSqlVersion = "4.3.3"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -55,6 +56,13 @@ public static <E extends Event, Meta, Context> InMemoryEventStore<E, Meta, Conte
return new InMemoryEventStore<>(system);
}

@Override
public CompletionStage<Long> lastPublishedSequence() {
return CompletableFuture.completedStage(eventStore.stream().filter(e -> e.published).map(e -> e.sequenceNum)
.max(Comparator.comparingLong(e -> e))
.orElse(0L));
}

@Override
public Publisher<EventEnvelope<E, Meta, Context>> loadEventsUnpublished(Tuple0 tx, ConcurrentReplayStrategy concurrentReplayStrategy) {
return Source.<EventEnvelope<E, Meta, Context>>empty().runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), system);
Expand Down
1 change: 1 addition & 0 deletions thoth-core-reactor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ dependencies {
implementation "io.projectreactor.kafka:reactor-kafka:$reactorKafkaVersion"
implementation("io.vavr:vavr:$vavrVersion")
implementation("io.vavr:vavr-jackson:$vavrVersion")
implementation('org.slf4j:slf4j-api:2.0.7')
implementation("com.fasterxml.uuid:java-uuid-generator:3.1.5")
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion")
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jacksonVersion")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,106 +5,171 @@
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventPublisher;
import fr.maif.eventsourcing.EventStore;
import io.vavr.API;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.collection.List;
import io.vavr.collection.Stream;
import io.vavr.control.Option;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

public class InMemoryEventStore<E extends Event, Meta, Context> implements EventStore<Tuple0, E, Meta, Context> {
public class InMemoryEventStore<E extends Event, Meta, Context> implements EventStore<InMemoryEventStore.Transaction<E, Meta, Context>, E, Meta, Context> {

private java.util.List<EventEnvelope<E, Meta, Context>> eventStore = new ArrayList<>();
ConcurrentHashMap<Long, EventEnvelope<E, Meta, Context>> store = new ConcurrentHashMap<>();

private final Sinks.Many<EventEnvelope<E, Meta, Context>> queue;
private final Flux<EventEnvelope<E, Meta, Context>> realTimeEvents;
AtomicLong sequenceNums = new AtomicLong(0);
private final Supplier<CompletionStage<Tuple0>> markAsPublishedTx;
private final Supplier<CompletionStage<Tuple0>> markAsPublished;

private AtomicLong sequence_num = new AtomicLong(0);
private final static Supplier<CompletionStage<Tuple0>> NOOP = () -> CompletableFuture.completedStage(Tuple.empty());

private final ConcurrentHashMap<String, Long> offsets = new ConcurrentHashMap<>();
public InMemoryEventStore(Supplier<CompletionStage<Tuple0>> markAsPublishedTx,
Supplier<CompletionStage<Tuple0>> markAsPublished,
EventEnvelope<E, Meta, Context>... events) {

public InMemoryEventStore() {
this.queue = Sinks.many().multicast().onBackpressureBuffer(10000);
this.realTimeEvents = queue.asFlux();
this.markAsPublishedTx = markAsPublishedTx;
this.markAsPublished = markAsPublished;
Stream.of(events).forEach(e -> store.put(e.sequenceNum, e));
}

public static <E extends Event, Meta, Context> InMemoryEventStore<E, Meta, Context> create() {
return new InMemoryEventStore<>();
public InMemoryEventStore(EventEnvelope<E, Meta, Context>... events) {
this(NOOP, NOOP, events);
}

@SafeVarargs
public static <E extends Event, Meta, Context> InMemoryEventStore<E, Meta, Context> create(EventEnvelope<E, Meta, Context>... events) {
return new InMemoryEventStore<>(events);
}


public record Transaction<E extends Event, Meta, Context>(ArrayList<EventEnvelope<E, Meta, Context>> events,
ArrayList<EventEnvelope<E, Meta, Context>> toPublish) {

public Transaction() {
this(new ArrayList<>(), new ArrayList<>());
}

public static <E extends Event, Meta, Context> Transaction<E, Meta, Context> newTx() {
return new Transaction<>();
}

public Tuple0 addAll(java.util.List<EventEnvelope<E, Meta, Context>> events) {
this.events.addAll(events);
return API.Tuple();
}
}

@Override
public Publisher<EventEnvelope<E, Meta, Context>> loadEventsUnpublished(Tuple0 tx, ConcurrentReplayStrategy concurrentReplayStrategy) {
return Flux.empty();
public Publisher<EventEnvelope<E, Meta, Context>> loadEventsUnpublished(Transaction<E, Meta, Context> tx, ConcurrentReplayStrategy concurrentReplayStrategy) {
return Flux.fromIterable(store.values().stream().sorted(Comparator.comparingLong(e -> e.sequenceNum)).toList())
.filter(e -> Boolean.FALSE.equals(e.published));
}

@Override
public CompletionStage<EventEnvelope<E, Meta, Context>> markAsPublished(Tuple0 tx, EventEnvelope<E, Meta, Context> eventEnvelope) {
return markAsPublished(eventEnvelope);
public CompletionStage<Long> lastPublishedSequence() {
AtomicLong max = new AtomicLong(0L);
store.values().forEach(k -> {
if (k.published) {
max.accumulateAndGet(k.sequenceNum, Math::max);
}
});
return CompletableFuture.completedStage(max.get());
}

@Override
public CompletionStage<Tuple0> openTransaction() {
return CompletableFuture.completedStage(Tuple.empty());
public CompletionStage<Transaction<E, Meta, Context>> openTransaction() {
return CompletableFuture.completedStage(new Transaction<>());
}

@Override
public CompletionStage<Tuple0> commitOrRollback(Option<Throwable> of, Tuple0 tx) {
return CompletionStages.empty();
public CompletionStage<EventEnvelope<E, Meta, Context>> markAsPublished(Transaction<E, Meta, Context> tx, EventEnvelope<E, Meta, Context> eventEnvelope) {
return markAsPublishedTx.get().thenCompose(any -> {
tx.toPublish().add(eventEnvelope);
return CompletableFuture.completedStage(eventEnvelope);
});
}

@Override
public CompletionStage<EventEnvelope<E, Meta, Context>> markAsPublished(EventEnvelope<E, Meta, Context> eventEnvelope) {
return CompletableFuture.completedStage(
eventEnvelope.copy().withPublished(true).build()
return markAsPublished.get().thenCompose(any ->
CompletableFuture.completedStage(store.compute(eventEnvelope.sequenceNum, (k, event) -> {
if (event == null) {
return eventEnvelope.copy().withPublished(true).build();
} else {
return event.copy().withPublished(true).build();
}
}))
);
}

@Override
public CompletionStage<Long> nextSequence(Tuple0 tx) {
return CompletableFuture.completedStage(sequence_num.incrementAndGet());
public CompletionStage<Tuple0> persist(Transaction<E, Meta, Context> transactionContext, List<EventEnvelope<E, Meta, Context>> events) {
return CompletableFuture.completedStage(transactionContext.addAll(events.toJavaList()));
}

@Override
public CompletionStage<Tuple0> commitOrRollback(Option<Throwable> of, Transaction<E, Meta, Context> tx) {
if (of.isEmpty()) {
tx.events().forEach(event ->
store.put(event.sequenceNum, event)
);
tx.toPublish.forEach(e ->
store.computeIfPresent(e.sequenceNum, (k, event) -> event.copy().withPublished(true).build())
);
}
tx.events.clear();
tx.toPublish.clear();
return CompletableFuture.completedStage(API.Tuple());
}

@Override
public CompletionStage<Long> nextSequence(InMemoryEventStore.Transaction<E, Meta, Context> tx) {
long value = store.values().stream().map(e -> e.sequenceNum).max(Comparator.comparingLong(e -> e)).orElse(0L) + 1;
sequenceNums.incrementAndGet();
return CompletableFuture.completedStage(sequenceNums.accumulateAndGet(value, Math::max));
}

@Override
public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> events) {
events.forEach(queue::tryEmitNext);
return CompletionStages.empty();
events.forEach(e -> store.put(e.sequenceNum, e));
return CompletableFuture.completedStage(API.Tuple());
}

@Override
public Publisher<EventEnvelope<E, Meta, Context>> loadEvents(String id) {
return Flux.fromIterable(eventStore);
return Flux.fromIterable(store.values()).filter(e ->
e.entityId.equals(id)
);
}


@Override
public Publisher<EventEnvelope<E, Meta, Context>> loadAllEvents() {
return Flux.fromIterable(eventStore);
return Flux.fromIterable(store.values());
}

@Override
public Publisher<EventEnvelope<E, Meta, Context>> loadEventsByQuery(Tuple0 tx, Query query) {
public Publisher<EventEnvelope<E, Meta, Context>> loadEventsByQuery(InMemoryEventStore.Transaction<E, Meta, Context> tx, Query query) {
return loadEventsByQuery(query);
}

@Override
public Publisher<EventEnvelope<E, Meta, Context>> loadEventsByQuery(Query query) {
return Flux.fromIterable(eventStore)
return Flux.fromIterable(store.values())
.filter(e -> Option.of(query.entityId).map(id -> id.equals(e.entityId)).getOrElse(true));
}

@Override
public CompletionStage<Tuple0> persist(Tuple0 transactionContext, List<EventEnvelope<E, Meta, Context>> events) {
eventStore.addAll(events.toJavaList());
return CompletionStages.empty();
}

@Override
public EventPublisher<E, Meta, Context> eventPublisher() {
Expand Down
Loading

0 comments on commit 73ad147

Please sign in to comment.